1use crate::consensus::doms::attestation::Attestation;
2use crate::consensus::doms::entry::Entry;
3use crate::utils::misc::{TermExt, bin_to_bitvec, bitvec_to_bin};
4use crate::utils::rocksdb::RocksDb;
5use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
6use crate::utils::{Hash, PublicKey, Signature};
7use amadeus_utils::constants::{CF_ATTESTATION, CF_ENTRY, CF_ENTRY_META, CF_SYSCONF, CF_TX, CF_TX_ACCOUNT_NONCE};
8use amadeus_utils::misc::get_bits_percentage;
9use amadeus_utils::rocksdb::{Direction, IteratorMode, ReadOptions};
10use amadeus_utils::vecpak::{Term, decode};
11use bitvec::prelude::*;
12use std::collections::HashMap;
13use tracing::Instrument;
14
/// Errors returned by the fabric storage layer defined in this file.
///
/// Most variants transparently wrap an error from a lower layer; the last
/// two carry a short static tag that names what was wrong.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Error surfaced by the RocksDB layer.
    #[error(transparent)]
    RocksDb(#[from] amadeus_utils::rocksdb::RocksDbError),
    /// Failure reported by `eetf` while decoding a term.
    #[error(transparent)]
    EtfDecode(#[from] eetf::DecodeError),
    /// Failure reported by `eetf` while encoding a term.
    #[error(transparent)]
    EtfEncode(#[from] eetf::EncodeError),
    /// Failure reported by `bincode` while decoding.
    #[error(transparent)]
    BinDecode(#[from] bincode::error::DecodeError),
    /// Failure reported by `bincode` while encoding.
    #[error(transparent)]
    BinEncode(#[from] bincode::error::EncodeError),
    /// A spawned tokio task could not be joined.
    #[error(transparent)]
    Join(#[from] tokio::task::JoinError),
    /// Error raised by the attestation module.
    #[error(transparent)]
    Att(#[from] crate::consensus::doms::attestation::Error),
    /// A stored key-value cell was missing or malformed; the tag names the cell.
    #[error("invalid kv cell: {0}")]
    KvCell(&'static str),
    /// A lookup, decoding or validation step failed; the tag names the step.
    #[error("invalid etf: {0}")]
    BadEtf(&'static str),
}
38
39async fn init_kvdb(base: &str) -> Result<RocksDb, Error> {
41 let long_init_hint = tokio::spawn(
42 async {
43 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
44 }
45 .instrument(tracing::Span::current()),
46 );
47
48 let path = format!("{}/db/fabric", base);
49 let db = RocksDb::open(path).await.unwrap(); long_init_hint.abort();
51
52 Ok(db)
53}
54
/// Storage facade over a [`RocksDb`] handle.
///
/// All entry, attestation, consensus and chain-state accessors in this file
/// are methods on this type. The type is `Clone`; `cleanup` clones it to hand
/// a copy to each spawned task.
#[derive(Debug, Clone)]
pub struct Fabric {
    /// Underlying database handle used by every method.
    db: RocksDb,
}
60
61impl Fabric {
62 pub async fn new(base: &str) -> Result<Self, Error> {
64 let db = init_kvdb(base).await?;
66 Ok(Self { db })
67 }
68
69 pub fn with_db(db: RocksDb) -> Self {
71 Self { db }
72 }
73
74 pub fn db(&self) -> &RocksDb {
75 &self.db
76 }
77
78 pub async fn cleanup(&self) {
80 use crate::consensus::chain_epoch;
81 let db = &self.db;
82
83 let next_epoch = if let Ok(Some(bin)) = self.db.get(CF_SYSCONF, b"finality_clean_next_epoch")
85 && let Ok(bytes) = bin.try_into()
86 {
87 u32::from_be_bytes(bytes)
88 } else {
89 0u32
90 };
91
92 let cur_epoch = chain_epoch(db);
93 if next_epoch >= cur_epoch.saturating_sub(1) {
94 return; }
96
97 let start_height = next_epoch.saturating_mul(100_000);
99 let _end_height = start_height + 99_999;
100
101 let mut handles = Vec::with_capacity(10);
103 for idx in 0..10u64 {
104 let s = (start_height as u64) + idx * 10_000;
105 let e = s + 9_999;
106 let fab = self.clone();
108 handles.push(tokio::spawn(async move {
109 fab.clean_muts_rev_range(s, e).ok();
110 }));
111 }
112 for h in handles {
113 let _ = h.await;
114 }
115
116 let cf_sysconf = db.inner.cf_handle(CF_SYSCONF).unwrap();
117
118 let next_epoch_be = (next_epoch + 1).to_be_bytes();
119 let txn = db.begin_transaction();
120 let _ = txn.put_cf(&cf_sysconf, b"finality_clean_next_epoch", &next_epoch_be);
121 let _ = txn.commit();
122 }
123
124 pub fn insert_entry(
126 &self,
127 hash: &Hash,
128 height: u64,
129 slot: u64,
130 entry_bin: &[u8],
131 seen_millis: u64,
132 ) -> Result<(), Error> {
133 use amadeus_utils::database::pad_integer;
134
135 let cf_entry = self.db.inner.cf_handle(CF_ENTRY).unwrap();
136 let cf_entry_meta = self.db.inner.cf_handle(CF_ENTRY_META).unwrap();
137
138 let txn = self.db.begin_transaction();
139 if txn.get_cf(&cf_entry, hash)?.is_none() {
140 txn.put_cf(&cf_entry, hash, entry_bin)?;
141
142 let seentime_key = format!("entry:{}:seentime", hex::encode(hash));
143 txn.put_cf(&cf_entry_meta, seentime_key.as_bytes(), seen_millis.to_string().as_bytes())?;
144 }
145
146 let height_key = format!("by_height:{}:{}", pad_integer(height), hex::encode(hash));
148 txn.put_cf(&cf_entry_meta, height_key.as_bytes(), hash)?;
149
150 let slot_key = format!("by_slot:{}:{}", pad_integer(slot), hex::encode(hash));
151 txn.put_cf(&cf_entry_meta, slot_key.as_bytes(), hash)?;
152
153 txn.commit()?;
154 Ok(())
155 }
156
157 pub fn entries_by_height(&self, height: u64) -> Result<Vec<Vec<u8>>, Error> {
158 use amadeus_utils::database::pad_integer;
159
160 let height_prefix = format!("by_height:{}:", pad_integer(height));
161 let mut out = Vec::new();
162 for (_, v) in self.db.iter_prefix(CF_ENTRY_META, height_prefix.as_bytes())?.iter() {
163 if let Some(entry_bin) = self.db.get(CF_ENTRY, &v)? {
164 out.push(entry_bin);
165 }
166 }
167
168 Ok(out)
169 }
170
171 pub fn entries_by_slot(&self, slot: u64) -> Result<Vec<Vec<u8>>, Error> {
172 use amadeus_utils::database::pad_integer;
173
174 let slot_prefix = format!("by_slot:{}:", pad_integer(slot));
175 let mut out = Vec::new();
176 for (_, v) in self.db.iter_prefix(CF_ENTRY_META, slot_prefix.as_bytes())?.iter() {
177 if let Some(entry_bin) = self.db.get(CF_ENTRY, &v)? {
178 out.push(entry_bin);
179 }
180 }
181
182 Ok(out)
183 }
184
185 pub fn get_entry_by_hash(&self, hash: &Hash) -> Option<Entry> {
186 let bin = self.db.get(CF_ENTRY, hash.as_ref()).ok()??;
187 Entry::from_vecpak_bin(&bin).ok()
188 }
189
190 pub fn my_attestation_by_entryhash(&self, hash: &[u8]) -> Result<Option<Attestation>, Error> {
191 use amadeus_utils::database::pad_integer;
192
193 let hash_array: [u8; 32] = hash.try_into().map_err(|_| Error::BadEtf("hash_len"))?;
194 let entry = self.get_entry_by_hash(&Hash::from(hash_array));
195 let entry = entry.ok_or(Error::BadEtf("entry_not_found"))?;
196
197 let my_signer = self.db.get(CF_SYSCONF, b"trainer_pk")?.ok_or(Error::BadEtf("no_trainer_pk"))?;
198
199 let prefix = format!(
200 "attestation:{}:{}:{}:",
201 pad_integer(entry.header.height),
202 hex::encode(hash),
203 hex::encode(&my_signer)
204 );
205
206 for (_, value) in self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?.iter() {
207 if let Some(att) = Attestation::from_vecpak_bin(value) {
208 return Ok(Some(att));
209 }
210 }
211
212 Ok(None)
213 }
214
215 pub fn get_or_resign_my_attestation(
216 &self,
217 config: &crate::config::Config,
218 entry_hash: &Hash,
219 ) -> Result<Option<Attestation>, Error> {
220 use amadeus_utils::database::pad_integer;
221
222 let entry = self.get_entry_by_hash(entry_hash).ok_or(Error::BadEtf("entry_not_found"))?;
223 let my_pk = config.get_pk();
224
225 let prefix = format!(
226 "attestation:{}:{}:{}:",
227 pad_integer(entry.header.height),
228 hex::encode(entry_hash),
229 hex::encode(&my_pk)
230 );
231
232 for (_, value) in self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?.iter() {
233 if let Some(att) = Attestation::from_vecpak_bin(value) {
234 if att.signer.as_ref() as &[u8] == my_pk.as_ref() as &[u8] {
235 return Ok(Some(att));
236 }
237 let sk = config.get_sk();
238 let new_a = Attestation::sign_with(my_pk.as_ref(), &sk, entry_hash, &att.mutations_hash)?;
239
240 let key = format!(
241 "attestation:{}:{}:{}:{}",
242 pad_integer(entry.header.height),
243 hex::encode(entry_hash),
244 hex::encode(&my_pk),
245 hex::encode(&new_a.mutations_hash)
246 );
247 self.db.put(CF_ATTESTATION, key.as_bytes(), &new_a.to_vecpak_bin())?;
248
249 return Ok(Some(new_a));
250 }
251 }
252
253 Ok(None)
254 }
255
256 pub fn insert_consensus(&self, consensus: &crate::consensus::consensus::Consensus) -> Result<(), Error> {
257 use amadeus_utils::vecpak::{self, Term as VTerm};
258
259 let key =
260 format!("consensus:{}:{}", hex::encode(&consensus.entry_hash), hex::encode(&consensus.mutations_hash));
261
262 if let Some(existing_bin) = self.db.get(CF_ATTESTATION, key.as_bytes())? {
263 if let Ok(existing_term) = decode(&existing_bin) {
264 if let Some(existing_mask) = extract_mask_from_consensus_term(&existing_term) {
265 let consensus_mask = consensus.mask();
266 if existing_mask.all()
267 || (!consensus_mask.is_empty() && existing_mask.count_ones() >= consensus_mask.count_ones())
268 {
269 return Ok(());
270 }
271 }
272 }
273 }
274
275 let mask = self.validate_consensus(&consensus)?;
276
277 let consensus_term = VTerm::PropList(vec![
278 (VTerm::Binary(b"mask".to_vec()), VTerm::Binary(bitvec_to_bin(&mask))),
279 (VTerm::Binary(b"agg_sig".to_vec()), VTerm::Binary(consensus.aggsig.aggsig.clone())),
280 ]);
281
282 self.db.put(CF_ATTESTATION, key.as_bytes(), &vecpak::encode(consensus_term))?;
283
284 Ok(())
285 }
286
 /// Validates a consensus record and returns the signer mask that was verified.
 ///
 /// Steps, in order:
 /// 1. Load the entry for `consensus.entry_hash` and the trainers for its height.
 /// 2. Use the consensus mask, or an all-ones mask of `trainers.len()` bits
 ///    when the consensus mask is empty.
 /// 3. Require `get_bits_percentage(mask, trainers.len())` to be at least 0.67.
 /// 4. Aggregate the public keys selected by the mask and verify the
 ///    signature over `entry_hash || mutations_hash` with `DST_ATT`.
 ///
 /// # Errors
 /// - `BadEtf("invalid_entry")` when the entry cannot be loaded.
 /// - `KvCell("trainers_for_height")` / `KvCell("trainers_for_height:empty")`
 ///   when the trainer set is missing or empty.
 /// - `BadEtf("consensus_too_low")` when the score is below 0.67.
 /// - `BadEtf("bls_aggregate_failed")`, `BadEtf("invalid_signature_length")`
 ///   or `BadEtf("invalid_signature")` when signature checking fails.
 pub fn validate_consensus(
     &self,
     consensus: &crate::consensus::consensus::Consensus,
 ) -> Result<BitVec<u8, Msb0>, Error> {
     use crate::utils::bls12_381 as bls;
     use amadeus_runtime::consensus::unmask_trainers;
     use amadeus_utils::constants::DST_ATT;

     // Signed message: the 32-byte entry hash followed by the 32-byte mutations hash.
     let mut to_sign = [0u8; 64];
     to_sign[..32].copy_from_slice(consensus.entry_hash.as_ref());
     to_sign[32..].copy_from_slice(consensus.mutations_hash.as_ref());

     let entry = self.get_entry_by_hash(&consensus.entry_hash).ok_or(Error::BadEtf("invalid_entry"))?;
     let trainers = self.trainers_for_height(entry.header.height).ok_or(Error::KvCell("trainers_for_height"))?;
     if trainers.is_empty() {
         return Err(Error::KvCell("trainers_for_height:empty"));
     }

     // An empty mask is replaced by one bit set per trainer.
     let consensus_mask = consensus.mask();
     let mask = if consensus_mask.is_empty() { bitvec![u8, Msb0; 1; trainers.len()] } else { consensus_mask };

     let score = get_bits_percentage(&mask, trainers.len());
     if score < 0.67 {
         return Err(Error::BadEtf("consensus_too_low"));
     }

     // Verify the aggregate signature against the keys selected by the mask.
     let signed_pks = unmask_trainers(&mask, &trainers);
     let agg_pk = bls::aggregate_public_keys(&signed_pks).map_err(|_| Error::BadEtf("bls_aggregate_failed"))?;
     let sig = consensus.signature().ok_or(Error::BadEtf("invalid_signature_length"))?;
     bls::verify(&*agg_pk, &*sig, &to_sign, DST_ATT).map_err(|_| Error::BadEtf("invalid_signature"))?;

     Ok(mask)
 }
331
332 pub fn best_consensus_by_entryhash(
333 &self,
334 trainers: &[PublicKey],
335 entry_hash: &[u8],
336 ) -> Result<(Option<[u8; 32]>, Option<f64>, Option<StoredConsensus>), Error> {
337 let prefix = format!("consensus:{}:", hex::encode(entry_hash));
338 let items = self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?;
339
340 if items.is_empty() {
341 return Ok((None, None, None));
342 }
343
344 let mut consensuses = Vec::new();
345 for (key, value) in items {
346 if let Ok(key_str) = std::str::from_utf8(&key) {
347 let parts: Vec<&str> = key_str.split(':').collect();
348 if parts.len() >= 3 {
349 if let Ok(mutations_hash) = hex::decode(parts[2]) {
350 if mutations_hash.len() == 32 {
351 if let Some(stored) = parse_stored_consensus(&value) {
352 let mut hash_array = [0u8; 32];
353 hash_array.copy_from_slice(&mutations_hash);
354 consensuses.push((hash_array, stored));
355 }
356 }
357 }
358 }
359 }
360 }
361
362 let best = consensuses
363 .into_iter()
364 .map(|(hash, consensus)| {
365 let score = get_bits_percentage(&consensus.mask, trainers.len());
366 (hash, score, consensus)
367 })
368 .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
369
370 Ok(best.map_or((None, None, None), |(h, s, c)| (Some(h), Some(s), Some(c))))
371 }
372
373 pub fn set_temporal_hash_height(&self, entry: &Entry) -> Result<(), Error> {
375 let cf_sysconf = self.db.inner.cf_handle(CF_SYSCONF).unwrap();
376
377 let txn = self.db.begin_transaction();
378 txn.put_cf(&cf_sysconf, b"temporal_tip", &entry.hash)?;
379 let height_term = encode_safe_deterministic(&u64_to_term(entry.header.height));
380 txn.put_cf(&cf_sysconf, b"temporal_height", &height_term)?;
381 txn.commit()?;
382
383 Ok(())
384 }
385
386 pub fn get_temporal_entry(&self) -> Result<Option<Entry>, Error> {
387 Ok(self.get_temporal_hash()?.and_then(|h| self.get_entry_by_hash(&Hash::from(h))))
388 }
389
390 pub fn get_temporal_hash(&self) -> Result<Option<[u8; 32]>, Error> {
391 match self.db.get(CF_SYSCONF, b"temporal_tip")? {
392 Some(rt) => Ok(Some(rt.try_into().map_err(|_| Error::KvCell("temporal_tip"))?)),
393 None => Ok(None),
394 }
395 }
396
397 pub fn get_temporal_height(&self) -> Result<Option<u64>, Error> {
398 if let Some(entry) = self.get_temporal_entry()? {
400 return Ok(Some(entry.header.height));
401 }
402
403 match self.db.get(CF_SYSCONF, b"temporal_height")? {
404 Some(hb) => {
405 if hb.len() == 8 {
407 let arr: [u8; 8] = hb.try_into().map_err(|_| Error::KvCell("temporal_height"))?;
408 return Ok(Some(u64::from_be_bytes(arr)));
409 }
410 if hb.len() == 4 {
412 let arr: [u8; 4] = hb.try_into().map_err(|_| Error::KvCell("temporal_height"))?;
413 return Ok(Some(u32::from_be_bytes(arr) as u64));
414 }
415 if let Ok(term) = eetf::Term::decode(&mut std::io::Cursor::new(&hb)) {
417 if let Some(height) = TermExt::get_integer(&term) {
418 return Ok(Some(height as u64));
419 }
420 }
421 Err(Error::KvCell("temporal_height"))
422 }
423 None => Ok(None),
424 }
425 }
426
427 pub fn set_rooted_hash_height(&self, entry: &Entry) -> Result<(), Error> {
429 let cf_sysconf = self.db.inner.cf_handle(CF_SYSCONF).unwrap();
430
431 let txn = self.db.begin_transaction();
432 txn.put_cf(&cf_sysconf, b"rooted_tip", &entry.hash)?;
433 txn.put_cf(&cf_sysconf, b"rooted_height", entry.header.height.to_string().as_bytes())?;
434 txn.commit()?;
435
436 Ok(())
437 }
438
439 pub fn get_rooted_entry(&self) -> Result<Option<Entry>, Error> {
440 Ok(self.get_rooted_hash()?.and_then(|h| self.get_entry_by_hash(&Hash::from(h))))
441 }
442
443 pub fn get_rooted_hash(&self) -> Result<Option<[u8; 32]>, Error> {
444 match self.db.get(CF_SYSCONF, b"rooted_tip")? {
445 Some(rt) => Ok(Some(rt.try_into().map_err(|_| Error::KvCell("rooted_tip"))?)),
446 None => Ok(None),
447 }
448 }
449
450 pub fn get_rooted_height(&self) -> Result<Option<u64>, Error> {
451 if let Some(entry) = self.get_rooted_entry()? {
453 return Ok(Some(entry.header.height));
454 }
455
456 match self.db.get(CF_SYSCONF, b"rooted_height")? {
457 Some(hb) => {
458 if let Ok(s) = std::str::from_utf8(&hb) {
460 if let Ok(height) = s.parse::<u64>() {
461 return Ok(Some(height));
462 }
463 }
464 if let Ok(term) = eetf::Term::decode(&mut std::io::Cursor::new(&hb)) {
466 if let Some(height) = TermExt::get_integer(&term) {
467 return Ok(Some(height as u64));
468 }
469 }
470 Err(Error::KvCell("rooted_height"))
471 }
472 None => Ok(None),
473 }
474 }
475
476 pub fn get_temporal_height_or_0(&self) -> u64 {
478 self.get_temporal_height().ok().flatten().unwrap_or(0)
479 }
480
481 pub fn get_chain_epoch_or_0(&self) -> u64 {
482 self.get_temporal_height_or_0() / 100_000
483 }
484
485 pub fn trainers_for_height(&self, height: u64) -> Option<Vec<PublicKey>> {
486 amadeus_runtime::consensus::bic::epoch::trainers_for_height(self.db(), height)
487 }
488
489 pub fn get_muts_rev(&self, hash: &Hash) -> Result<Option<Vec<u8>>, Error> {
490 let key = format!("entry:{}:muts_rev", hex::encode(hash));
491 Ok(self.db.get(CF_ENTRY_META, key.as_bytes())?)
492 }
493
494 pub fn put_muts_rev(&self, hash: &Hash, data: &[u8]) -> Result<(), Error> {
495 let key = format!("entry:{}:muts_rev", hex::encode(hash));
496 self.db.put(CF_ENTRY_META, key.as_bytes(), data)?;
497 Ok(())
498 }
499
500 pub fn delete_muts_rev(&self, hash: &Hash) -> Result<(), Error> {
501 let key = format!("entry:{}:muts_rev", hex::encode(hash));
502 self.db.delete(CF_ENTRY_META, key.as_bytes())?;
503 Ok(())
504 }
505
506 pub fn get_muts(&self, hash: &Hash) -> Result<Option<Vec<u8>>, Error> {
507 let key = format!("entry:{}:muts", hex::encode(hash));
508 Ok(self.db.get(CF_ENTRY_META, key.as_bytes())?)
509 }
510
511 pub fn put_muts(&self, hash: &Hash, data: &[u8]) -> Result<(), Error> {
512 let key = format!("entry:{}:muts", hex::encode(hash));
513 self.db.put(CF_ENTRY_META, key.as_bytes(), data)?;
514 Ok(())
515 }
516
517 pub fn put_attestation(&self, hash: &Hash, data: &[u8]) -> Result<(), Error> {
518 let attestation = Attestation::from_vecpak_bin(data).ok_or(Error::BadEtf("attestation_unpack_failed"))?;
519 let entry = self.get_entry_by_hash(hash).ok_or(Error::BadEtf("entry_not_found"))?;
520
521 let key = format!(
522 "attestation:{}:{}:{}:{}",
523 amadeus_utils::database::pad_integer(entry.header.height),
524 hex::encode(hash),
525 hex::encode(&attestation.signer),
526 hex::encode(&attestation.mutations_hash)
527 );
528 self.db.put(CF_ATTESTATION, key.as_bytes(), &attestation.to_vecpak_bin())?;
529
530 Ok(())
531 }
532
533 pub fn delete_attestation(&self, hash: &Hash) -> Result<(), Error> {
534 let entry = self.get_entry_by_hash(hash).ok_or(Error::BadEtf("entry_not_found"))?;
535 let my_signer = self.db.get(CF_SYSCONF, b"trainer_pk")?.ok_or(Error::BadEtf("no_trainer_pk"))?;
536
537 let prefix = format!(
538 "attestation:{}:{}:{}:",
539 amadeus_utils::database::pad_integer(entry.header.height),
540 hex::encode(hash),
541 hex::encode(&my_signer)
542 );
543
544 for (key, _) in self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?.iter() {
545 self.db.delete(CF_ATTESTATION, key)?;
546 }
547
548 Ok(())
549 }
550
551 pub fn put_entry_seen_time(&self, hash: &Hash, seen_time: u64) -> Result<(), Error> {
552 let key = format!("entry:{}:seentime", hex::encode(hash));
553 self.db.put(CF_ENTRY_META, key.as_bytes(), seen_time.to_string().as_bytes())?;
554 Ok(())
555 }
556
557 pub fn delete_entry_seen_time(&self, hash: &Hash) -> Result<(), Error> {
558 let key = format!("entry:{}:seentime", hex::encode(hash));
559 self.db.delete(CF_ENTRY_META, key.as_bytes())?;
560 Ok(())
561 }
562
563 pub fn get_entry_seen_time(&self, hash: &Hash) -> Result<Option<u64>, Error> {
564 let key = format!("entry:{}:seentime", hex::encode(hash));
565 if let Some(bin) = self.db.get(CF_ENTRY_META, key.as_bytes())? {
566 if let Ok(s) = std::str::from_utf8(&bin) {
567 if let Ok(val) = s.parse::<u64>() {
568 return Ok(Some(val));
569 }
570 }
571 return Err(Error::BadEtf("seen_time_format"));
572 }
573 Ok(None)
574 }
575
576 pub fn delete_consensus(&self, hash: &Hash) -> Result<(), Error> {
577 let prefix = format!("consensus:{}:", hex::encode(hash));
578 for (key, _) in self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?.iter() {
579 self.db.delete(CF_ATTESTATION, &key)?;
580 }
581 Ok(())
582 }
583
584 pub fn delete_entry(&self, hash: &Hash) -> Result<(), Error> {
585 self.db.delete(CF_ENTRY, hash.as_ref())?;
586 Ok(())
587 }
588
589 pub fn delete_entry_by_height(&self, height_key: &[u8]) -> Result<(), Error> {
590 self.db.delete(CF_ENTRY_META, height_key)?;
591 Ok(())
592 }
593
594 pub fn delete_entry_by_slot(&self, slot_key: &[u8]) -> Result<(), Error> {
595 self.db.delete(CF_ENTRY_META, slot_key)?;
596 Ok(())
597 }
598
599 pub fn put_tx_metadata(&self, key: &[u8], tx: &[u8]) -> Result<(), Error> {
600 let cf_tx = self.db.inner.cf_handle(CF_TX).unwrap();
601
602 let txn = self.db.begin_transaction();
603 txn.put_cf(&cf_tx, key, tx)?;
604 txn.commit()?;
605
606 Ok(())
607 }
608
609 pub fn delete_tx_metadata(&self, hash: &Hash) -> Result<(), Error> {
610 let cf_tx = self.db.inner.cf_handle(CF_TX).unwrap();
611
612 let txn = self.db.begin_transaction();
613 txn.delete_cf(&cf_tx, hash.as_ref() as &[u8])?;
614 txn.commit()?;
615
616 Ok(())
617 }
618
619 pub fn put_tx_account_nonce(&self, key: &[u8], tx_hash: &Hash) -> Result<(), Error> {
620 let cf_nonce = self.db.inner.cf_handle(CF_TX_ACCOUNT_NONCE).unwrap();
621
622 let txn = self.db.begin_transaction();
623 txn.put_cf(&cf_nonce, key, tx_hash.as_ref() as &[u8])?;
624 txn.commit()?;
625
626 Ok(())
627 }
628
629 pub fn delete_tx_account_nonce(&self, key: &[u8]) -> Result<(), Error> {
630 let cf_nonce = self.db.inner.cf_handle(CF_TX_ACCOUNT_NONCE).unwrap();
631
632 let txn = self.db.begin_transaction();
633 txn.delete_cf(&cf_nonce, key)?;
634 txn.commit()?;
635
636 Ok(())
637 }
638
639 pub fn put_entry_raw(&self, hash: &Hash, data: &[u8]) -> Result<(), Error> {
640 let cf_entry = self.db.inner.cf_handle(CF_ENTRY).unwrap();
641
642 let txn = self.db.begin_transaction();
643 txn.put_cf(&cf_entry, hash, data)?;
644 txn.commit()?;
645
646 Ok(())
647 }
648
649 pub fn get_entry_raw(&self, hash: &Hash) -> Result<Option<Vec<u8>>, Error> {
650 let entry_cf = CF_ENTRY;
651 Ok(self.db.get(entry_cf, hash.as_ref())?)
652 }
653
 /// Deletes the `entry:{hash}:muts_rev` records of entries indexed at heights `start..=end`.
 ///
 /// Scans `CF_ENTRY_META` forward from `by_height:{start}:` and stops at the
 /// first key that is not below `by_height:{end + 1}:`. The value of each
 /// `by_height:` row (the entry hash) is collected, then the matching
 /// `muts_rev` key is deleted in the same transaction. The transaction is
 /// committed only when at least one hash was collected.
 ///
 /// # Errors
 /// Returns iterator and commit errors. Errors from individual deletes are
 /// ignored.
 fn clean_muts_rev_range(&self, start: u64, end: u64) -> Result<(), crate::utils::rocksdb::Error> {
     use amadeus_utils::database::pad_integer;

     let cf_entry_meta = self.db.inner.cf_handle(CF_ENTRY_META).unwrap();

     // Half-open key range: [by_height:{start}:, by_height:{end + 1}:)
     let start_key = format!("by_height:{}:", pad_integer(start));
     let end_key = format!("by_height:{}:", pad_integer(end + 1));

     let txn = self.db.begin_transaction();
     let mut opts = ReadOptions::default();
     opts.set_total_order_seek(true);
     let iter =
         txn.iterator_cf_opt(&cf_entry_meta, opts, IteratorMode::From(start_key.as_bytes(), Direction::Forward));

     // First pass: collect the entry hashes referenced by the index rows.
     let mut deleted_hashes = Vec::new();
     for item in iter {
         let (k, v) = item?;
         if k.as_ref() >= end_key.as_bytes() {
             break;
         }
         if let Ok(key_str) = std::str::from_utf8(&k) {
             if key_str.starts_with("by_height:") {
                 deleted_hashes.push(v.to_vec());
             }
         }
     }

     // Second pass: stage the deletes; a failed delete does not abort the rest.
     let ops = deleted_hashes.len();
     for hash in deleted_hashes {
         let muts_rev_key = format!("entry:{}:muts_rev", hex::encode(&hash));
         let _ = txn.delete_cf(&cf_entry_meta, muts_rev_key.as_bytes());
     }

     // Skip the commit when nothing was staged.
     if ops > 0 {
         txn.commit()?;
     }

     Ok(())
 }
693
694 pub fn are_we_trainer(&self, config: &crate::config::Config) -> bool {
696 let Some(h) = self.get_temporal_height().ok().flatten() else { return false };
697 let Some(trainers) = self.trainers_for_height(h + 1) else { return false };
698 trainers.iter().any(|pk| pk == &config.get_pk())
699 }
700
701 pub fn get_trainer_for_slot(&self, height: u64, slot: u64) -> Option<PublicKey> {
703 let trainers = self.trainers_for_height(height)?;
704 if trainers.is_empty() {
705 return None;
706 }
707 let idx = slot.rem_euclid(trainers.len() as u64) as usize;
708 trainers.get(idx).copied()
709 }
710
711 pub fn get_trainer_for_current_slot(&self) -> Option<PublicKey> {
712 let h = self.get_temporal_height().ok()??;
713 self.get_trainer_for_slot(h, h)
714 }
715
716 pub fn get_trainer_for_next_slot(&self) -> Option<PublicKey> {
717 let h = self.get_temporal_height().ok()??;
718 self.get_trainer_for_slot(h + 1, h + 1)
719 }
720
721 pub fn are_we_trainer_for_next_slot(&self, config: &crate::config::Config) -> bool {
722 match self.get_trainer_for_next_slot() {
723 Some(pk) => pk == config.get_pk(),
724 None => false,
725 }
726 }
727
728 pub fn is_in_chain(&self, target_hash: &Hash) -> bool {
729 let target_entry = match self.get_entry_by_hash(target_hash) {
731 Some(e) => e,
732 None => return false,
733 };
734
735 let target_height = target_entry.header.height;
736
737 let tip_hash = match self.get_temporal_hash() {
739 Ok(Some(h)) => h,
740 _ => return false,
741 };
742 let tip_entry = match self.get_entry_by_hash(&Hash::from(tip_hash)) {
743 Some(e) => e,
744 None => return false,
745 };
746
747 let tip_height = tip_entry.header.height;
748
749 if tip_height < target_height {
751 return false;
752 }
753
754 self.is_in_chain_internal(&tip_entry.hash, target_hash, target_height)
756 }
757
758 fn is_in_chain_internal(&self, current_hash: &Hash, target_hash: &Hash, target_height: u64) -> bool {
759 if current_hash == target_hash {
761 return true;
762 }
763
764 let current_entry = match self.get_entry_by_hash(current_hash) {
766 Some(e) => e,
767 None => return false,
768 };
769
770 if current_entry.header.height <= target_height {
772 return false;
773 }
774
775 self.is_in_chain_internal(¤t_entry.header.prev_hash, target_hash, target_height)
777 }
778
779 pub fn validate_entry_slot_trainer(&self, entry: &Entry, prev_slot: u64) -> bool {
781 let next_slot = entry.header.slot;
782 let slot_trainer = self.get_trainer_for_slot(entry.header.height, next_slot);
783
784 if (next_slot as i64) - (prev_slot as i64) != 1 {
786 return false;
787 }
788
789 match slot_trainer {
791 Some(expected_trainer) if entry.header.signer == expected_trainer => true,
792 Some(_) if entry.mask.is_some() => {
793 let trainers = self.trainers_for_height(entry.header.height).unwrap_or_default();
795 let score = get_bits_percentage(entry.mask.as_ref().unwrap(), trainers.len());
796 score >= 0.67
797 }
798 _ => false,
799 }
800 }
801
802 pub fn start_proc_consensus(&self) {
803 let cf_sysconf = self.db.inner.cf_handle(CF_SYSCONF).unwrap();
804
805 let txn = self.db.begin_transaction();
806 let _ = txn.put_cf(&cf_sysconf, b"proc_consensus", &[1]);
807 let _ = txn.commit();
808 }
809
810 pub fn stop_proc_consensus(&self) {
811 let cf_sysconf = self.db.inner.cf_handle(CF_SYSCONF).unwrap();
812
813 let txn = self.db.begin_transaction();
814 let _ = txn.put_cf(&cf_sysconf, b"proc_consensus", &[0]);
815 let _ = txn.commit();
816 }
817
818 pub fn is_proc_consensus(&self) -> bool {
819 self.db.get(CF_SYSCONF, b"proc_consensus").ok().flatten().map_or(false, |v| v[0] == 1)
820 }
821
822 pub fn chain_nonce(&self, public_key: &[u8]) -> Option<u64> {
826 let cf = self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE)?;
827 let key = format!("account:{}:nonce", hex::encode(public_key));
828 self.db
829 .inner
830 .get_cf(&cf, key.as_bytes())
831 .ok()
832 .flatten()
833 .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
834 }
835
836 pub fn chain_balance(&self, public_key: &[u8]) -> i128 {
838 let cf = match self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
839 Some(cf) => cf,
840 None => return 0,
841 };
842 let key = format!("account:{}:balance:AMA", hex::encode(public_key));
843 self.db
844 .inner
845 .get_cf(&cf, key.as_bytes())
846 .ok()
847 .flatten()
848 .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<i128>().ok()))
849 .unwrap_or(0)
850 }
851
852 pub fn chain_diff_bits(&self) -> u64 {
854 let cf = match self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
855 Some(cf) => cf,
856 None => return 128, };
858 self.db
859 .inner
860 .get_cf(&cf, b"bic:sol:diff")
861 .ok()
862 .flatten()
863 .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
864 .unwrap_or(128)
865 }
866
867 pub fn chain_segment_vr_hash(&self) -> Option<Vec<u8>> {
869 let cf = self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE)?;
870 self.db.inner.get_cf(&cf, b"segment:vr_hash").ok().flatten()
871 }
872
873 pub fn chain_balance_symbol(&self, public_key: &[u8], symbol: &[u8]) -> i128 {
875 let cf = match self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
876 Some(cf) => cf,
877 None => return 0,
878 };
879 let key = format!("account:{}:balance:{}", hex::encode(public_key), std::str::from_utf8(symbol).unwrap_or(""));
880 self.db
881 .inner
882 .get_cf(&cf, key.as_bytes())
883 .ok()
884 .flatten()
885 .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<i128>().ok()))
886 .unwrap_or(0)
887 }
888
889 pub fn chain_total_sols(&self) -> u64 {
891 let cf = match self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
892 Some(cf) => cf,
893 None => return 0,
894 };
895 self.db
896 .inner
897 .get_cf(&cf, b"bic:sol:total")
898 .ok()
899 .flatten()
900 .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
901 .unwrap_or(0)
902 }
903}
904
905pub mod chain_queries {
907 use crate::utils::rocksdb::RocksDb;
908
909 pub fn chain_nonce(db: &RocksDb, public_key: &[u8]) -> Option<u64> {
911 let cf = db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE)?;
912 let key = format!("account:{}:nonce", hex::encode(public_key));
913 db.inner
914 .get_cf(&cf, key.as_bytes())
915 .ok()
916 .flatten()
917 .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
918 }
919
920 pub fn chain_balance(db: &RocksDb, public_key: &[u8]) -> i128 {
922 let cf = match db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
923 Some(cf) => cf,
924 None => return 0,
925 };
926 let key = format!("account:{}:balance:AMA", hex::encode(public_key));
927 db.inner
928 .get_cf(&cf, key.as_bytes())
929 .ok()
930 .flatten()
931 .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<i128>().ok()))
932 .unwrap_or(0)
933 }
934
935 pub fn chain_diff_bits(db: &RocksDb) -> u64 {
937 let cf = match db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
938 Some(cf) => cf,
939 None => return 128, };
941 db.inner
942 .get_cf(&cf, b"bic:sol:diff")
943 .ok()
944 .flatten()
945 .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
946 .unwrap_or(128)
947 }
948
949 pub fn chain_segment_vr_hash(db: &RocksDb) -> Option<Vec<u8>> {
951 let cf = db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE)?;
952 db.inner.get_cf(&cf, b"segment:vr_hash").ok().flatten()
953 }
954}
955
/// Consensus record as read back from storage by `parse_stored_consensus`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoredConsensus {
    /// Bit mask decoded from the stored `mask` binary via `bin_to_bitvec`.
    pub mask: BitVec<u8, Msb0>,
    /// Signature decoded from the stored `agg_sig` binary.
    pub agg_sig: Signature,
}
961
962#[allow(dead_code)]
963fn pack_consensus_map(map: &HashMap<[u8; 32], StoredConsensus>) -> Result<Vec<u8>, Error> {
964 use amadeus_utils::vecpak::{self, Term as VTerm};
965
966 let mut entries = Vec::new();
967 for (mut_hash, v) in map.iter() {
968 let mask_bytes = bitvec_to_bin(&v.mask);
969 let consensus_data = VTerm::PropList(vec![
970 (VTerm::Binary(b"mask".to_vec()), VTerm::Binary(mask_bytes)),
971 (VTerm::Binary(b"agg_sig".to_vec()), VTerm::Binary(v.agg_sig.to_vec())),
972 ]);
973 entries.push((VTerm::Binary(mut_hash.to_vec()), consensus_data));
974 }
975 let term = VTerm::PropList(entries);
976 Ok(vecpak::encode(term))
977}
978
979fn extract_mask_from_consensus_term(term: &Term) -> Option<BitVec<u8, Msb0>> {
980 use amadeus_utils::vecpak::VecpakExt;
981
982 let map = term.get_proplist_map()?;
983 let mask_bytes: Vec<u8> = map.get_binary(b"mask")?;
984 Some(bin_to_bitvec(mask_bytes))
985}
986
987fn parse_stored_consensus(bin: &[u8]) -> Option<StoredConsensus> {
988 use amadeus_utils::vecpak::VecpakExt;
989
990 let term = decode(bin).ok()?;
991 let map = term.get_proplist_map()?;
992
993 let mask_bytes: Vec<u8> = map.get_binary(b"mask")?;
994 let mask = bin_to_bitvec(mask_bytes);
995 let agg_sig: Signature = map.get_binary(b"agg_sig")?;
996
997 Some(StoredConsensus { mask, agg_sig })
998}
999
#[cfg(test)]
mod tests {
    use super::*;

    /// Two entries share a height but sit in different slots; each index must
    /// return exactly the entries stored under it.
    #[tokio::test]
    async fn test_height_slot_indexing() {
        let db_dir = format!("target/test_fabric_{}", std::process::id());
        let fab = Fabric::new(&db_dir).await.unwrap();

        let height: u64 = 12345;
        let seen_time: u64 = 1234567890;
        let (hash_a, bin_a, slot_a) = (Hash::new([1; 32]), vec![1u8, 2, 3, 4], 67890u64);
        let (hash_b, bin_b, slot_b) = (Hash::new([2; 32]), vec![5u8, 6, 7, 8], 67891u64);

        fab.insert_entry(&hash_a, height, slot_a, &bin_a, seen_time).unwrap();
        fab.insert_entry(&hash_b, height, slot_b, &bin_b, seen_time).unwrap();

        // Both entries are reachable through the shared height.
        let at_height = fab.entries_by_height(height).unwrap();
        assert_eq!(at_height.len(), 2);
        assert!(at_height.contains(&bin_a));
        assert!(at_height.contains(&bin_b));

        // Each slot resolves to exactly its own entry.
        for (slot, expected) in [(slot_a, &bin_a), (slot_b, &bin_b)] {
            let at_slot = fab.entries_by_slot(slot).unwrap();
            assert_eq!(at_slot.len(), 1);
            assert_eq!(&at_slot[0], expected);
        }

        // Unused height and slot yield nothing.
        assert!(fab.entries_by_height(99999).unwrap().is_empty());
        assert!(fab.entries_by_slot(99999).unwrap().is_empty());
    }

    /// Cleaning heights 100..=102 removes `muts_rev` for those entries only.
    #[tokio::test]
    async fn test_clean_muts_rev_range() {
        let db_dir = format!("target/test_clean_muts_{}", std::process::id());
        let fab = Fabric::new(&db_dir).await.unwrap();

        let h0: Hash = Hash::new([0; 32]);
        let h1: Hash = Hash::new([1; 32]);
        let h2: Hash = Hash::new([2; 32]);
        let h3: Hash = Hash::new([3; 32]);
        let h4: Hash = Hash::new([4; 32]);

        // (hash, height, slot, entry bytes), inserted in the same order as the heights.
        let fixtures: [(&Hash, u64, u64, [u8; 1]); 5] = [
            (&h0, 99, 999, [0]),
            (&h1, 100, 1000, [1]),
            (&h2, 101, 1001, [2]),
            (&h3, 102, 1002, [3]),
            (&h4, 103, 1003, [4]),
        ];
        for (hash, height, slot, bin) in fixtures.iter() {
            fab.insert_entry(hash, *height, *slot, bin, 0).unwrap();
        }

        let payloads: [(&Hash, &[u8]); 5] =
            [(&h0, b"data0"), (&h1, b"data1"), (&h2, b"data2"), (&h3, b"data3"), (&h4, b"data4")];
        for (hash, data) in payloads.iter() {
            fab.put_muts_rev(hash, data).unwrap();
        }

        fab.clean_muts_rev_range(100, 102).unwrap();

        // Heights outside the range keep their record; heights inside lose it.
        assert!(fab.get_muts_rev(&h0).unwrap().is_some());
        assert!(fab.get_muts_rev(&h1).unwrap().is_none());
        assert!(fab.get_muts_rev(&h2).unwrap().is_none());
        assert!(fab.get_muts_rev(&h3).unwrap().is_none());
        assert!(fab.get_muts_rev(&h4).unwrap().is_some());
    }
}
1076}