Skip to main content

amadeus_node/consensus/
fabric.rs

1use crate::consensus::doms::attestation::Attestation;
2use crate::consensus::doms::entry::Entry;
3use crate::utils::misc::{TermExt, bin_to_bitvec, bitvec_to_bin};
4use crate::utils::rocksdb::RocksDb;
5use crate::utils::safe_etf::{encode_safe_deterministic, u64_to_term};
6use crate::utils::{Hash, PublicKey, Signature};
7use amadeus_utils::constants::{CF_ATTESTATION, CF_ENTRY, CF_ENTRY_META, CF_SYSCONF, CF_TX, CF_TX_ACCOUNT_NONCE};
8use amadeus_utils::misc::get_bits_percentage;
9use amadeus_utils::rocksdb::{Direction, IteratorMode, ReadOptions};
10use amadeus_utils::vecpak::{Term, decode};
11use bitvec::prelude::*;
12use std::collections::HashMap;
13use tracing::Instrument;
14
15#[derive(Debug, thiserror::Error)]
16pub enum Error {
17    #[error(transparent)]
18    RocksDb(#[from] amadeus_utils::rocksdb::RocksDbError),
19    #[error(transparent)]
20    EtfDecode(#[from] eetf::DecodeError),
21    #[error(transparent)]
22    EtfEncode(#[from] eetf::EncodeError),
23    #[error(transparent)]
24    BinDecode(#[from] bincode::error::DecodeError),
25    #[error(transparent)]
26    BinEncode(#[from] bincode::error::EncodeError),
27    #[error(transparent)]
28    Join(#[from] tokio::task::JoinError),
29    // #[error(transparent)]
30    // Entry(#[from] consensus::entry::Error),
31    #[error(transparent)]
32    Att(#[from] crate::consensus::doms::attestation::Error),
33    #[error("invalid kv cell: {0}")]
34    KvCell(&'static str),
35    #[error("invalid etf: {0}")]
36    BadEtf(&'static str),
37}
38
39/// Initialize Fabric DB area (creates/open RocksDB with the required CFs)
40async fn init_kvdb(base: &str) -> Result<RocksDb, Error> {
41    let long_init_hint = tokio::spawn(
42        async {
43            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
44        }
45        .instrument(tracing::Span::current()),
46    );
47
48    let path = format!("{}/db/fabric", base);
49    let db = RocksDb::open(path).await.unwrap(); // nothing to do if db fails
50    long_init_hint.abort();
51
52    Ok(db)
53}
54
55// New Fabric struct that owns the RocksDb handle
56#[derive(Debug, Clone)]
57pub struct Fabric {
58    db: RocksDb,
59}
60
61impl Fabric {
62    /// Create Fabric by opening RocksDb at base/fabric
63    pub async fn new(base: &str) -> Result<Self, Error> {
64        // Previously init_kvdb(base) + Fabric::new(db)
65        let db = init_kvdb(base).await?;
66        Ok(Self { db })
67    }
68
69    /// Create Fabric from an already opened RocksDb handle
70    pub fn with_db(db: RocksDb) -> Self {
71        Self { db }
72    }
73
74    pub fn db(&self) -> &RocksDb {
75        &self.db
76    }
77
78    // Perform a single periodic cleanup step: if an epoch is ready to be cleaned, clean it
79    pub async fn cleanup(&self) {
80        use crate::consensus::chain_epoch;
81        let db = &self.db;
82
83        // read progress
84        let next_epoch = if let Ok(Some(bin)) = self.db.get(CF_SYSCONF, b"finality_clean_next_epoch")
85            && let Ok(bytes) = bin.try_into()
86        {
87            u32::from_be_bytes(bytes)
88        } else {
89            0u32
90        };
91
92        let cur_epoch = chain_epoch(db);
93        if next_epoch >= cur_epoch.saturating_sub(1) {
94            return; // nothing to do yet
95        }
96
97        // Clean one full epoch range [E*100_000 .. E*100_000 + 99_999]
98        let start_height = next_epoch.saturating_mul(100_000);
99        let _end_height = start_height + 99_999;
100
101        // Process in 10 shards of 10_000 heights to avoid long DB stalls
102        let mut handles = Vec::with_capacity(10);
103        for idx in 0..10u64 {
104            let s = (start_height as u64) + idx * 10_000;
105            let e = s + 9_999;
106            // spawn blocking work inline (db is sync API; wrap in spawn_blocking if needed later)
107            let fab = self.clone();
108            handles.push(tokio::spawn(async move {
109                fab.clean_muts_rev_range(s, e).ok();
110            }));
111        }
112        for h in handles {
113            let _ = h.await;
114        }
115
116        let cf_sysconf = db.inner.cf_handle(CF_SYSCONF).unwrap();
117
118        let next_epoch_be = (next_epoch + 1).to_be_bytes();
119        let txn = db.begin_transaction();
120        let _ = txn.put_cf(&cf_sysconf, b"finality_clean_next_epoch", &next_epoch_be);
121        let _ = txn.commit();
122    }
123
124    // Methods migrated from free functions
125    pub fn insert_entry(
126        &self,
127        hash: &Hash,
128        height: u64,
129        slot: u64,
130        entry_bin: &[u8],
131        seen_millis: u64,
132    ) -> Result<(), Error> {
133        use amadeus_utils::database::pad_integer;
134
135        let cf_entry = self.db.inner.cf_handle(CF_ENTRY).unwrap();
136        let cf_entry_meta = self.db.inner.cf_handle(CF_ENTRY_META).unwrap();
137
138        let txn = self.db.begin_transaction();
139        if txn.get_cf(&cf_entry, hash)?.is_none() {
140            txn.put_cf(&cf_entry, hash, entry_bin)?;
141
142            let seentime_key = format!("entry:{}:seentime", hex::encode(hash));
143            txn.put_cf(&cf_entry_meta, seentime_key.as_bytes(), seen_millis.to_string().as_bytes())?;
144        }
145
146        // ALWAYS index by height and slot, even if entry already exists
147        let height_key = format!("by_height:{}:{}", pad_integer(height), hex::encode(hash));
148        txn.put_cf(&cf_entry_meta, height_key.as_bytes(), hash)?;
149
150        let slot_key = format!("by_slot:{}:{}", pad_integer(slot), hex::encode(hash));
151        txn.put_cf(&cf_entry_meta, slot_key.as_bytes(), hash)?;
152
153        txn.commit()?;
154        Ok(())
155    }
156
157    pub fn entries_by_height(&self, height: u64) -> Result<Vec<Vec<u8>>, Error> {
158        use amadeus_utils::database::pad_integer;
159
160        let height_prefix = format!("by_height:{}:", pad_integer(height));
161        let mut out = Vec::new();
162        for (_, v) in self.db.iter_prefix(CF_ENTRY_META, height_prefix.as_bytes())?.iter() {
163            if let Some(entry_bin) = self.db.get(CF_ENTRY, &v)? {
164                out.push(entry_bin);
165            }
166        }
167
168        Ok(out)
169    }
170
171    pub fn entries_by_slot(&self, slot: u64) -> Result<Vec<Vec<u8>>, Error> {
172        use amadeus_utils::database::pad_integer;
173
174        let slot_prefix = format!("by_slot:{}:", pad_integer(slot));
175        let mut out = Vec::new();
176        for (_, v) in self.db.iter_prefix(CF_ENTRY_META, slot_prefix.as_bytes())?.iter() {
177            if let Some(entry_bin) = self.db.get(CF_ENTRY, &v)? {
178                out.push(entry_bin);
179            }
180        }
181
182        Ok(out)
183    }
184
185    pub fn get_entry_by_hash(&self, hash: &Hash) -> Option<Entry> {
186        let bin = self.db.get(CF_ENTRY, hash.as_ref()).ok()??;
187        Entry::from_vecpak_bin(&bin).ok()
188    }
189
190    pub fn my_attestation_by_entryhash(&self, hash: &[u8]) -> Result<Option<Attestation>, Error> {
191        use amadeus_utils::database::pad_integer;
192
193        let hash_array: [u8; 32] = hash.try_into().map_err(|_| Error::BadEtf("hash_len"))?;
194        let entry = self.get_entry_by_hash(&Hash::from(hash_array));
195        let entry = entry.ok_or(Error::BadEtf("entry_not_found"))?;
196
197        let my_signer = self.db.get(CF_SYSCONF, b"trainer_pk")?.ok_or(Error::BadEtf("no_trainer_pk"))?;
198
199        let prefix = format!(
200            "attestation:{}:{}:{}:",
201            pad_integer(entry.header.height),
202            hex::encode(hash),
203            hex::encode(&my_signer)
204        );
205
206        for (_, value) in self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?.iter() {
207            if let Some(att) = Attestation::from_vecpak_bin(value) {
208                return Ok(Some(att));
209            }
210        }
211
212        Ok(None)
213    }
214
215    pub fn get_or_resign_my_attestation(
216        &self,
217        config: &crate::config::Config,
218        entry_hash: &Hash,
219    ) -> Result<Option<Attestation>, Error> {
220        use amadeus_utils::database::pad_integer;
221
222        let entry = self.get_entry_by_hash(entry_hash).ok_or(Error::BadEtf("entry_not_found"))?;
223        let my_pk = config.get_pk();
224
225        let prefix = format!(
226            "attestation:{}:{}:{}:",
227            pad_integer(entry.header.height),
228            hex::encode(entry_hash),
229            hex::encode(&my_pk)
230        );
231
232        for (_, value) in self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?.iter() {
233            if let Some(att) = Attestation::from_vecpak_bin(value) {
234                if att.signer.as_ref() as &[u8] == my_pk.as_ref() as &[u8] {
235                    return Ok(Some(att));
236                }
237                let sk = config.get_sk();
238                let new_a = Attestation::sign_with(my_pk.as_ref(), &sk, entry_hash, &att.mutations_hash)?;
239
240                let key = format!(
241                    "attestation:{}:{}:{}:{}",
242                    pad_integer(entry.header.height),
243                    hex::encode(entry_hash),
244                    hex::encode(&my_pk),
245                    hex::encode(&new_a.mutations_hash)
246                );
247                self.db.put(CF_ATTESTATION, key.as_bytes(), &new_a.to_vecpak_bin())?;
248
249                return Ok(Some(new_a));
250            }
251        }
252
253        Ok(None)
254    }
255
256    pub fn insert_consensus(&self, consensus: &crate::consensus::consensus::Consensus) -> Result<(), Error> {
257        use amadeus_utils::vecpak::{self, Term as VTerm};
258
259        let key =
260            format!("consensus:{}:{}", hex::encode(&consensus.entry_hash), hex::encode(&consensus.mutations_hash));
261
262        if let Some(existing_bin) = self.db.get(CF_ATTESTATION, key.as_bytes())? {
263            if let Ok(existing_term) = decode(&existing_bin) {
264                if let Some(existing_mask) = extract_mask_from_consensus_term(&existing_term) {
265                    let consensus_mask = consensus.mask();
266                    if existing_mask.all()
267                        || (!consensus_mask.is_empty() && existing_mask.count_ones() >= consensus_mask.count_ones())
268                    {
269                        return Ok(());
270                    }
271                }
272            }
273        }
274
275        let mask = self.validate_consensus(&consensus)?;
276
277        let consensus_term = VTerm::PropList(vec![
278            (VTerm::Binary(b"mask".to_vec()), VTerm::Binary(bitvec_to_bin(&mask))),
279            (VTerm::Binary(b"agg_sig".to_vec()), VTerm::Binary(consensus.aggsig.aggsig.clone())),
280        ]);
281
282        self.db.put(CF_ATTESTATION, key.as_bytes(), &vecpak::encode(consensus_term))?;
283
284        Ok(())
285    }
286
287    /// Validate consensus vs chain state:
288    /// - Entry must exist and not be in the future vs current temporal_height
289    /// - Aggregate signature must verify against the set of trainers unmasked by `mask`
290    ///
291    /// On success, sets consensus.score = Some(score) and returns Ok(())
292    pub fn validate_consensus(
293        &self,
294        consensus: &crate::consensus::consensus::Consensus,
295    ) -> Result<BitVec<u8, Msb0>, Error> {
296        use crate::utils::bls12_381 as bls;
297        use amadeus_runtime::consensus::unmask_trainers;
298        use amadeus_utils::constants::DST_ATT;
299
300        let mut to_sign = [0u8; 64];
301        to_sign[..32].copy_from_slice(consensus.entry_hash.as_ref());
302        to_sign[32..].copy_from_slice(consensus.mutations_hash.as_ref());
303
304        let entry = self.get_entry_by_hash(&consensus.entry_hash).ok_or(Error::BadEtf("invalid_entry"))?;
305        //let curr_h = self.get_temporal_height()?.ok_or(Error::KvCell("temporal_height_missing"))?;
306
307        // if entry.header.height > curr_h {
308        //     return Err(Error::BadEtf("too_far_in_future"));
309        // }
310
311        let trainers = self.trainers_for_height(entry.header.height).ok_or(Error::KvCell("trainers_for_height"))?;
312        if trainers.is_empty() {
313            return Err(Error::KvCell("trainers_for_height:empty"));
314        }
315
316        let consensus_mask = consensus.mask();
317        let mask = if consensus_mask.is_empty() { bitvec![u8, Msb0; 1; trainers.len()] } else { consensus_mask };
318
319        let score = get_bits_percentage(&mask, trainers.len());
320        if score < 0.67 {
321            return Err(Error::BadEtf("consensus_too_low"));
322        }
323
324        let signed_pks = unmask_trainers(&mask, &trainers);
325        let agg_pk = bls::aggregate_public_keys(&signed_pks).map_err(|_| Error::BadEtf("bls_aggregate_failed"))?;
326        let sig = consensus.signature().ok_or(Error::BadEtf("invalid_signature_length"))?;
327        bls::verify(&*agg_pk, &*sig, &to_sign, DST_ATT).map_err(|_| Error::BadEtf("invalid_signature"))?;
328
329        Ok(mask)
330    }
331
332    pub fn best_consensus_by_entryhash(
333        &self,
334        trainers: &[PublicKey],
335        entry_hash: &[u8],
336    ) -> Result<(Option<[u8; 32]>, Option<f64>, Option<StoredConsensus>), Error> {
337        let prefix = format!("consensus:{}:", hex::encode(entry_hash));
338        let items = self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?;
339
340        if items.is_empty() {
341            return Ok((None, None, None));
342        }
343
344        let mut consensuses = Vec::new();
345        for (key, value) in items {
346            if let Ok(key_str) = std::str::from_utf8(&key) {
347                let parts: Vec<&str> = key_str.split(':').collect();
348                if parts.len() >= 3 {
349                    if let Ok(mutations_hash) = hex::decode(parts[2]) {
350                        if mutations_hash.len() == 32 {
351                            if let Some(stored) = parse_stored_consensus(&value) {
352                                let mut hash_array = [0u8; 32];
353                                hash_array.copy_from_slice(&mutations_hash);
354                                consensuses.push((hash_array, stored));
355                            }
356                        }
357                    }
358                }
359            }
360        }
361
362        let best = consensuses
363            .into_iter()
364            .map(|(hash, consensus)| {
365                let score = get_bits_percentage(&consensus.mask, trainers.len());
366                (hash, score, consensus)
367            })
368            .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
369
370        Ok(best.map_or((None, None, None), |(h, s, c)| (Some(h), Some(s), Some(c))))
371    }
372
373    /// Sets temporal entry hash and height
374    pub fn set_temporal_hash_height(&self, entry: &Entry) -> Result<(), Error> {
375        let cf_sysconf = self.db.inner.cf_handle(CF_SYSCONF).unwrap();
376
377        let txn = self.db.begin_transaction();
378        txn.put_cf(&cf_sysconf, b"temporal_tip", &entry.hash)?;
379        let height_term = encode_safe_deterministic(&u64_to_term(entry.header.height));
380        txn.put_cf(&cf_sysconf, b"temporal_height", &height_term)?;
381        txn.commit()?;
382
383        Ok(())
384    }
385
386    pub fn get_temporal_entry(&self) -> Result<Option<Entry>, Error> {
387        Ok(self.get_temporal_hash()?.and_then(|h| self.get_entry_by_hash(&Hash::from(h))))
388    }
389
390    pub fn get_temporal_hash(&self) -> Result<Option<[u8; 32]>, Error> {
391        match self.db.get(CF_SYSCONF, b"temporal_tip")? {
392            Some(rt) => Ok(Some(rt.try_into().map_err(|_| Error::KvCell("temporal_tip"))?)),
393            None => Ok(None),
394        }
395    }
396
397    pub fn get_temporal_height(&self) -> Result<Option<u64>, Error> {
398        // prioritize the actual entry height over stored value (may be stale)
399        if let Some(entry) = self.get_temporal_entry()? {
400            return Ok(Some(entry.header.height));
401        }
402
403        match self.db.get(CF_SYSCONF, b"temporal_height")? {
404            Some(hb) => {
405                // Try u64 big-endian bytes (8 bytes)
406                if hb.len() == 8 {
407                    let arr: [u8; 8] = hb.try_into().map_err(|_| Error::KvCell("temporal_height"))?;
408                    return Ok(Some(u64::from_be_bytes(arr)));
409                }
410                // Try u32 big-endian bytes (4 bytes) for backward compatibility
411                if hb.len() == 4 {
412                    let arr: [u8; 4] = hb.try_into().map_err(|_| Error::KvCell("temporal_height"))?;
413                    return Ok(Some(u32::from_be_bytes(arr) as u64));
414                }
415                // Try ETF term (for Elixir compatibility)
416                if let Ok(term) = eetf::Term::decode(&mut std::io::Cursor::new(&hb)) {
417                    if let Some(height) = TermExt::get_integer(&term) {
418                        return Ok(Some(height as u64));
419                    }
420                }
421                Err(Error::KvCell("temporal_height"))
422            }
423            None => Ok(None),
424        }
425    }
426
427    /// Sets rooted entry hash and height
428    pub fn set_rooted_hash_height(&self, entry: &Entry) -> Result<(), Error> {
429        let cf_sysconf = self.db.inner.cf_handle(CF_SYSCONF).unwrap();
430
431        let txn = self.db.begin_transaction();
432        txn.put_cf(&cf_sysconf, b"rooted_tip", &entry.hash)?;
433        txn.put_cf(&cf_sysconf, b"rooted_height", entry.header.height.to_string().as_bytes())?;
434        txn.commit()?;
435
436        Ok(())
437    }
438
439    pub fn get_rooted_entry(&self) -> Result<Option<Entry>, Error> {
440        Ok(self.get_rooted_hash()?.and_then(|h| self.get_entry_by_hash(&Hash::from(h))))
441    }
442
443    pub fn get_rooted_hash(&self) -> Result<Option<[u8; 32]>, Error> {
444        match self.db.get(CF_SYSCONF, b"rooted_tip")? {
445            Some(rt) => Ok(Some(rt.try_into().map_err(|_| Error::KvCell("rooted_tip"))?)),
446            None => Ok(None),
447        }
448    }
449
450    pub fn get_rooted_height(&self) -> Result<Option<u64>, Error> {
451        // prioritize the actual entry height over stored value (may be stale)
452        if let Some(entry) = self.get_rooted_entry()? {
453            return Ok(Some(entry.header.height));
454        }
455
456        match self.db.get(CF_SYSCONF, b"rooted_height")? {
457            Some(hb) => {
458                // Try string format (primary format)
459                if let Ok(s) = std::str::from_utf8(&hb) {
460                    if let Ok(height) = s.parse::<u64>() {
461                        return Ok(Some(height));
462                    }
463                }
464                // Try ETF term (for Elixir compatibility / migration)
465                if let Ok(term) = eetf::Term::decode(&mut std::io::Cursor::new(&hb)) {
466                    if let Some(height) = TermExt::get_integer(&term) {
467                        return Ok(Some(height as u64));
468                    }
469                }
470                Err(Error::KvCell("rooted_height"))
471            }
472            None => Ok(None),
473        }
474    }
475
476    // Convenience wrappers for NodePeers and other components to avoid direct RocksDb usage
477    pub fn get_temporal_height_or_0(&self) -> u64 {
478        self.get_temporal_height().ok().flatten().unwrap_or(0)
479    }
480
481    pub fn get_chain_epoch_or_0(&self) -> u64 {
482        self.get_temporal_height_or_0() / 100_000
483    }
484
485    pub fn trainers_for_height(&self, height: u64) -> Option<Vec<PublicKey>> {
486        amadeus_runtime::consensus::bic::epoch::trainers_for_height(self.db(), height)
487    }
488
489    pub fn get_muts_rev(&self, hash: &Hash) -> Result<Option<Vec<u8>>, Error> {
490        let key = format!("entry:{}:muts_rev", hex::encode(hash));
491        Ok(self.db.get(CF_ENTRY_META, key.as_bytes())?)
492    }
493
494    pub fn put_muts_rev(&self, hash: &Hash, data: &[u8]) -> Result<(), Error> {
495        let key = format!("entry:{}:muts_rev", hex::encode(hash));
496        self.db.put(CF_ENTRY_META, key.as_bytes(), data)?;
497        Ok(())
498    }
499
500    pub fn delete_muts_rev(&self, hash: &Hash) -> Result<(), Error> {
501        let key = format!("entry:{}:muts_rev", hex::encode(hash));
502        self.db.delete(CF_ENTRY_META, key.as_bytes())?;
503        Ok(())
504    }
505
506    pub fn get_muts(&self, hash: &Hash) -> Result<Option<Vec<u8>>, Error> {
507        let key = format!("entry:{}:muts", hex::encode(hash));
508        Ok(self.db.get(CF_ENTRY_META, key.as_bytes())?)
509    }
510
511    pub fn put_muts(&self, hash: &Hash, data: &[u8]) -> Result<(), Error> {
512        let key = format!("entry:{}:muts", hex::encode(hash));
513        self.db.put(CF_ENTRY_META, key.as_bytes(), data)?;
514        Ok(())
515    }
516
517    pub fn put_attestation(&self, hash: &Hash, data: &[u8]) -> Result<(), Error> {
518        let attestation = Attestation::from_vecpak_bin(data).ok_or(Error::BadEtf("attestation_unpack_failed"))?;
519        let entry = self.get_entry_by_hash(hash).ok_or(Error::BadEtf("entry_not_found"))?;
520
521        let key = format!(
522            "attestation:{}:{}:{}:{}",
523            amadeus_utils::database::pad_integer(entry.header.height),
524            hex::encode(hash),
525            hex::encode(&attestation.signer),
526            hex::encode(&attestation.mutations_hash)
527        );
528        self.db.put(CF_ATTESTATION, key.as_bytes(), &attestation.to_vecpak_bin())?;
529
530        Ok(())
531    }
532
533    pub fn delete_attestation(&self, hash: &Hash) -> Result<(), Error> {
534        let entry = self.get_entry_by_hash(hash).ok_or(Error::BadEtf("entry_not_found"))?;
535        let my_signer = self.db.get(CF_SYSCONF, b"trainer_pk")?.ok_or(Error::BadEtf("no_trainer_pk"))?;
536
537        let prefix = format!(
538            "attestation:{}:{}:{}:",
539            amadeus_utils::database::pad_integer(entry.header.height),
540            hex::encode(hash),
541            hex::encode(&my_signer)
542        );
543
544        for (key, _) in self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?.iter() {
545            self.db.delete(CF_ATTESTATION, key)?;
546        }
547
548        Ok(())
549    }
550
551    pub fn put_entry_seen_time(&self, hash: &Hash, seen_time: u64) -> Result<(), Error> {
552        let key = format!("entry:{}:seentime", hex::encode(hash));
553        self.db.put(CF_ENTRY_META, key.as_bytes(), seen_time.to_string().as_bytes())?;
554        Ok(())
555    }
556
557    pub fn delete_entry_seen_time(&self, hash: &Hash) -> Result<(), Error> {
558        let key = format!("entry:{}:seentime", hex::encode(hash));
559        self.db.delete(CF_ENTRY_META, key.as_bytes())?;
560        Ok(())
561    }
562
563    pub fn get_entry_seen_time(&self, hash: &Hash) -> Result<Option<u64>, Error> {
564        let key = format!("entry:{}:seentime", hex::encode(hash));
565        if let Some(bin) = self.db.get(CF_ENTRY_META, key.as_bytes())? {
566            if let Ok(s) = std::str::from_utf8(&bin) {
567                if let Ok(val) = s.parse::<u64>() {
568                    return Ok(Some(val));
569                }
570            }
571            return Err(Error::BadEtf("seen_time_format"));
572        }
573        Ok(None)
574    }
575
576    pub fn delete_consensus(&self, hash: &Hash) -> Result<(), Error> {
577        let prefix = format!("consensus:{}:", hex::encode(hash));
578        for (key, _) in self.db.iter_prefix(CF_ATTESTATION, prefix.as_bytes())?.iter() {
579            self.db.delete(CF_ATTESTATION, &key)?;
580        }
581        Ok(())
582    }
583
584    pub fn delete_entry(&self, hash: &Hash) -> Result<(), Error> {
585        self.db.delete(CF_ENTRY, hash.as_ref())?;
586        Ok(())
587    }
588
589    pub fn delete_entry_by_height(&self, height_key: &[u8]) -> Result<(), Error> {
590        self.db.delete(CF_ENTRY_META, height_key)?;
591        Ok(())
592    }
593
594    pub fn delete_entry_by_slot(&self, slot_key: &[u8]) -> Result<(), Error> {
595        self.db.delete(CF_ENTRY_META, slot_key)?;
596        Ok(())
597    }
598
599    pub fn put_tx_metadata(&self, key: &[u8], tx: &[u8]) -> Result<(), Error> {
600        let cf_tx = self.db.inner.cf_handle(CF_TX).unwrap();
601
602        let txn = self.db.begin_transaction();
603        txn.put_cf(&cf_tx, key, tx)?;
604        txn.commit()?;
605
606        Ok(())
607    }
608
609    pub fn delete_tx_metadata(&self, hash: &Hash) -> Result<(), Error> {
610        let cf_tx = self.db.inner.cf_handle(CF_TX).unwrap();
611
612        let txn = self.db.begin_transaction();
613        txn.delete_cf(&cf_tx, hash.as_ref() as &[u8])?;
614        txn.commit()?;
615
616        Ok(())
617    }
618
619    pub fn put_tx_account_nonce(&self, key: &[u8], tx_hash: &Hash) -> Result<(), Error> {
620        let cf_nonce = self.db.inner.cf_handle(CF_TX_ACCOUNT_NONCE).unwrap();
621
622        let txn = self.db.begin_transaction();
623        txn.put_cf(&cf_nonce, key, tx_hash.as_ref() as &[u8])?;
624        txn.commit()?;
625
626        Ok(())
627    }
628
629    pub fn delete_tx_account_nonce(&self, key: &[u8]) -> Result<(), Error> {
630        let cf_nonce = self.db.inner.cf_handle(CF_TX_ACCOUNT_NONCE).unwrap();
631
632        let txn = self.db.begin_transaction();
633        txn.delete_cf(&cf_nonce, key)?;
634        txn.commit()?;
635
636        Ok(())
637    }
638
639    pub fn put_entry_raw(&self, hash: &Hash, data: &[u8]) -> Result<(), Error> {
640        let cf_entry = self.db.inner.cf_handle(CF_ENTRY).unwrap();
641
642        let txn = self.db.begin_transaction();
643        txn.put_cf(&cf_entry, hash, data)?;
644        txn.commit()?;
645
646        Ok(())
647    }
648
649    pub fn get_entry_raw(&self, hash: &Hash) -> Result<Option<Vec<u8>>, Error> {
650        let entry_cf = CF_ENTRY;
651        Ok(self.db.get(entry_cf, hash.as_ref())?)
652    }
653
654    fn clean_muts_rev_range(&self, start: u64, end: u64) -> Result<(), crate::utils::rocksdb::Error> {
655        use amadeus_utils::database::pad_integer;
656
657        let cf_entry_meta = self.db.inner.cf_handle(CF_ENTRY_META).unwrap();
658
659        let start_key = format!("by_height:{}:", pad_integer(start));
660        let end_key = format!("by_height:{}:", pad_integer(end + 1));
661
662        let txn = self.db.begin_transaction();
663        let mut opts = ReadOptions::default();
664        opts.set_total_order_seek(true);
665        let iter =
666            txn.iterator_cf_opt(&cf_entry_meta, opts, IteratorMode::From(start_key.as_bytes(), Direction::Forward));
667
668        let mut deleted_hashes = Vec::new();
669        for item in iter {
670            let (k, v) = item?;
671            if k.as_ref() >= end_key.as_bytes() {
672                break;
673            }
674            if let Ok(key_str) = std::str::from_utf8(&k) {
675                if key_str.starts_with("by_height:") {
676                    deleted_hashes.push(v.to_vec());
677                }
678            }
679        }
680
681        let ops = deleted_hashes.len();
682        for hash in deleted_hashes {
683            let muts_rev_key = format!("entry:{}:muts_rev", hex::encode(&hash));
684            let _ = txn.delete_cf(&cf_entry_meta, muts_rev_key.as_bytes());
685        }
686
687        if ops > 0 {
688            txn.commit()?;
689        }
690
691        Ok(())
692    }
693
694    /// Return true if our trainer_pk is included in trainers_for_height(chain_height()+1)
695    pub fn are_we_trainer(&self, config: &crate::config::Config) -> bool {
696        let Some(h) = self.get_temporal_height().ok().flatten() else { return false };
697        let Some(trainers) = self.trainers_for_height(h + 1) else { return false };
698        trainers.iter().any(|pk| pk == &config.get_pk())
699    }
700
701    /// Select trainer for a slot from the roster for the corresponding height
702    pub fn get_trainer_for_slot(&self, height: u64, slot: u64) -> Option<PublicKey> {
703        let trainers = self.trainers_for_height(height)?;
704        if trainers.is_empty() {
705            return None;
706        }
707        let idx = slot.rem_euclid(trainers.len() as u64) as usize;
708        trainers.get(idx).copied()
709    }
710
711    pub fn get_trainer_for_current_slot(&self) -> Option<PublicKey> {
712        let h = self.get_temporal_height().ok()??;
713        self.get_trainer_for_slot(h, h)
714    }
715
716    pub fn get_trainer_for_next_slot(&self) -> Option<PublicKey> {
717        let h = self.get_temporal_height().ok()??;
718        self.get_trainer_for_slot(h + 1, h + 1)
719    }
720
721    pub fn are_we_trainer_for_next_slot(&self, config: &crate::config::Config) -> bool {
722        match self.get_trainer_for_next_slot() {
723            Some(pk) => pk == config.get_pk(),
724            None => false,
725        }
726    }
727
728    pub fn is_in_chain(&self, target_hash: &Hash) -> bool {
729        // check if entry exists
730        let target_entry = match self.get_entry_by_hash(target_hash) {
731            Some(e) => e,
732            None => return false,
733        };
734
735        let target_height = target_entry.header.height;
736
737        // get tip entry
738        let tip_hash = match self.get_temporal_hash() {
739            Ok(Some(h)) => h,
740            _ => return false,
741        };
742        let tip_entry = match self.get_entry_by_hash(&Hash::from(tip_hash)) {
743            Some(e) => e,
744            None => return false,
745        };
746
747        let tip_height = tip_entry.header.height;
748
749        // if target is higher than tip, it can't be in chain
750        if tip_height < target_height {
751            return false;
752        }
753
754        // walk back from tip to target height
755        self.is_in_chain_internal(&tip_entry.hash, target_hash, target_height)
756    }
757
758    fn is_in_chain_internal(&self, current_hash: &Hash, target_hash: &Hash, target_height: u64) -> bool {
759        // check if we found the target
760        if current_hash == target_hash {
761            return true;
762        }
763
764        // get current entry
765        let current_entry = match self.get_entry_by_hash(current_hash) {
766            Some(e) => e,
767            None => return false,
768        };
769
770        // if we're below target height, target is not in chain
771        if current_entry.header.height <= target_height {
772            return false;
773        }
774
775        // continue walking back
776        self.is_in_chain_internal(&current_entry.header.prev_hash, target_hash, target_height)
777    }
778
779    /// Check if entry is in its designated slot
780    pub fn validate_entry_slot_trainer(&self, entry: &Entry, prev_slot: u64) -> bool {
781        let next_slot = entry.header.slot;
782        let slot_trainer = self.get_trainer_for_slot(entry.header.height, next_slot);
783
784        // check incremental slot
785        if (next_slot as i64) - (prev_slot as i64) != 1 {
786            return false;
787        }
788
789        // check trainer authorization
790        match slot_trainer {
791            Some(expected_trainer) if entry.header.signer == expected_trainer => true,
792            Some(_) if entry.mask.is_some() => {
793                // aggregate signature path - check if score >= 0.67
794                let trainers = self.trainers_for_height(entry.header.height).unwrap_or_default();
795                let score = get_bits_percentage(entry.mask.as_ref().unwrap(), trainers.len());
796                score >= 0.67
797            }
798            _ => false,
799        }
800    }
801
802    pub fn start_proc_consensus(&self) {
803        let cf_sysconf = self.db.inner.cf_handle(CF_SYSCONF).unwrap();
804
805        let txn = self.db.begin_transaction();
806        let _ = txn.put_cf(&cf_sysconf, b"proc_consensus", &[1]);
807        let _ = txn.commit();
808    }
809
810    pub fn stop_proc_consensus(&self) {
811        let cf_sysconf = self.db.inner.cf_handle(CF_SYSCONF).unwrap();
812
813        let txn = self.db.begin_transaction();
814        let _ = txn.put_cf(&cf_sysconf, b"proc_consensus", &[0]);
815        let _ = txn.commit();
816    }
817
818    pub fn is_proc_consensus(&self) -> bool {
819        self.db.get(CF_SYSCONF, b"proc_consensus").ok().flatten().map_or(false, |v| v[0] == 1)
820    }
821
822    // Chain state query functions - read from CF_CONTRACTSTATE column family
823
824    /// Get the chain nonce for a given public key
825    pub fn chain_nonce(&self, public_key: &[u8]) -> Option<u64> {
826        let cf = self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE)?;
827        let key = format!("account:{}:nonce", hex::encode(public_key));
828        self.db
829            .inner
830            .get_cf(&cf, key.as_bytes())
831            .ok()
832            .flatten()
833            .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
834    }
835
836    /// Get the chain balance for a given public key (native AMA token)
837    pub fn chain_balance(&self, public_key: &[u8]) -> i128 {
838        let cf = match self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
839            Some(cf) => cf,
840            None => return 0,
841        };
842        let key = format!("account:{}:balance:AMA", hex::encode(public_key));
843        self.db
844            .inner
845            .get_cf(&cf, key.as_bytes())
846            .ok()
847            .flatten()
848            .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<i128>().ok()))
849            .unwrap_or(0)
850    }
851
852    /// Get the chain difficulty bits
853    pub fn chain_diff_bits(&self) -> u64 {
854        let cf = match self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
855            Some(cf) => cf,
856            None => return 128, // default difficulty
857        };
858        self.db
859            .inner
860            .get_cf(&cf, b"bic:sol:diff")
861            .ok()
862            .flatten()
863            .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
864            .unwrap_or(128)
865    }
866
867    /// Get the chain segment VR hash
868    pub fn chain_segment_vr_hash(&self) -> Option<Vec<u8>> {
869        let cf = self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE)?;
870        self.db.inner.get_cf(&cf, b"segment:vr_hash").ok().flatten()
871    }
872
873    /// Get balance for a specific account and symbol from the chain state
874    pub fn chain_balance_symbol(&self, public_key: &[u8], symbol: &[u8]) -> i128 {
875        let cf = match self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
876            Some(cf) => cf,
877            None => return 0,
878        };
879        let key = format!("account:{}:balance:{}", hex::encode(public_key), std::str::from_utf8(symbol).unwrap_or(""));
880        self.db
881            .inner
882            .get_cf(&cf, key.as_bytes())
883            .ok()
884            .flatten()
885            .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<i128>().ok()))
886            .unwrap_or(0)
887    }
888
889    /// Get the total number of solutions from the chain state
890    pub fn chain_total_sols(&self) -> u64 {
891        let cf = match self.db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
892            Some(cf) => cf,
893            None => return 0,
894        };
895        self.db
896            .inner
897            .get_cf(&cf, b"bic:sol:total")
898            .ok()
899            .flatten()
900            .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
901            .unwrap_or(0)
902    }
903}
904
905// Standalone chain query functions for use when only RocksDb is available
906pub mod chain_queries {
907    use crate::utils::rocksdb::RocksDb;
908
909    /// Get the chain nonce for a given public key
910    pub fn chain_nonce(db: &RocksDb, public_key: &[u8]) -> Option<u64> {
911        let cf = db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE)?;
912        let key = format!("account:{}:nonce", hex::encode(public_key));
913        db.inner
914            .get_cf(&cf, key.as_bytes())
915            .ok()
916            .flatten()
917            .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
918    }
919
920    /// Get the chain balance for a given public key (native AMA token)
921    pub fn chain_balance(db: &RocksDb, public_key: &[u8]) -> i128 {
922        let cf = match db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
923            Some(cf) => cf,
924            None => return 0,
925        };
926        let key = format!("account:{}:balance:AMA", hex::encode(public_key));
927        db.inner
928            .get_cf(&cf, key.as_bytes())
929            .ok()
930            .flatten()
931            .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<i128>().ok()))
932            .unwrap_or(0)
933    }
934
935    /// Get the chain difficulty bits
936    pub fn chain_diff_bits(db: &RocksDb) -> u64 {
937        let cf = match db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE) {
938            Some(cf) => cf,
939            None => return 128, // default difficulty
940        };
941        db.inner
942            .get_cf(&cf, b"bic:sol:diff")
943            .ok()
944            .flatten()
945            .and_then(|bytes| std::str::from_utf8(&bytes).ok().and_then(|s| s.parse::<u64>().ok()))
946            .unwrap_or(128)
947    }
948
949    /// Get the chain segment VR hash
950    pub fn chain_segment_vr_hash(db: &RocksDb) -> Option<Vec<u8>> {
951        let cf = db.inner.cf_handle(amadeus_utils::constants::CF_CONTRACTSTATE)?;
952        db.inner.get_cf(&cf, b"segment:vr_hash").ok().flatten()
953    }
954}
955
956#[derive(Debug, Clone, PartialEq, Eq)]
957pub struct StoredConsensus {
958    pub mask: BitVec<u8, Msb0>,
959    pub agg_sig: Signature,
960}
961
962#[allow(dead_code)]
963fn pack_consensus_map(map: &HashMap<[u8; 32], StoredConsensus>) -> Result<Vec<u8>, Error> {
964    use amadeus_utils::vecpak::{self, Term as VTerm};
965
966    let mut entries = Vec::new();
967    for (mut_hash, v) in map.iter() {
968        let mask_bytes = bitvec_to_bin(&v.mask);
969        let consensus_data = VTerm::PropList(vec![
970            (VTerm::Binary(b"mask".to_vec()), VTerm::Binary(mask_bytes)),
971            (VTerm::Binary(b"agg_sig".to_vec()), VTerm::Binary(v.agg_sig.to_vec())),
972        ]);
973        entries.push((VTerm::Binary(mut_hash.to_vec()), consensus_data));
974    }
975    let term = VTerm::PropList(entries);
976    Ok(vecpak::encode(term))
977}
978
979fn extract_mask_from_consensus_term(term: &Term) -> Option<BitVec<u8, Msb0>> {
980    use amadeus_utils::vecpak::VecpakExt;
981
982    let map = term.get_proplist_map()?;
983    let mask_bytes: Vec<u8> = map.get_binary(b"mask")?;
984    Some(bin_to_bitvec(mask_bytes))
985}
986
987fn parse_stored_consensus(bin: &[u8]) -> Option<StoredConsensus> {
988    use amadeus_utils::vecpak::VecpakExt;
989
990    let term = decode(bin).ok()?;
991    let map = term.get_proplist_map()?;
992
993    let mask_bytes: Vec<u8> = map.get_binary(b"mask")?;
994    let mask = bin_to_bitvec(mask_bytes);
995    let agg_sig: Signature = map.get_binary(b"agg_sig")?;
996
997    Some(StoredConsensus { mask, agg_sig })
998}
999
1000#[cfg(test)]
1001mod tests {
1002    use super::*;
1003
1004    #[tokio::test]
1005    async fn test_height_slot_indexing() {
1006        // initialize db for testing
1007        let test_path = format!("target/test_fabric_{}", std::process::id());
1008        let fab = Fabric::new(&test_path).await.unwrap();
1009
1010        // create test entry data
1011        let entry_hash1: Hash = Hash::new([1; 32]);
1012        let entry_hash2: Hash = Hash::new([2; 32]);
1013        let entry_bin1 = vec![1, 2, 3, 4];
1014        let entry_bin2 = vec![5, 6, 7, 8];
1015        let height = 12345;
1016        let slot1 = 67890;
1017        let slot2 = 67891;
1018        let seen_time = 1234567890;
1019
1020        // insert two entries with same height but different slots
1021        fab.insert_entry(&entry_hash1, height, slot1, &entry_bin1, seen_time).unwrap();
1022        fab.insert_entry(&entry_hash2, height, slot2, &entry_bin2, seen_time).unwrap();
1023
1024        // test querying by height should return both entries
1025        let entries = fab.entries_by_height(height as u64).unwrap();
1026        assert_eq!(entries.len(), 2);
1027        assert!(entries.contains(&entry_bin1));
1028        assert!(entries.contains(&entry_bin2));
1029
1030        // test querying by slot should return one entry each
1031        let entries_slot1 = fab.entries_by_slot(slot1).unwrap();
1032        assert_eq!(entries_slot1.len(), 1);
1033        assert_eq!(entries_slot1[0], entry_bin1);
1034
1035        let entries_slot2 = fab.entries_by_slot(slot2).unwrap();
1036        assert_eq!(entries_slot2.len(), 1);
1037        assert_eq!(entries_slot2[0], entry_bin2);
1038
1039        // test querying non-existent height/slot returns empty
1040        let empty_entries = fab.entries_by_height(99999).unwrap();
1041        assert!(empty_entries.is_empty());
1042
1043        let empty_slot = fab.entries_by_slot(99999).unwrap();
1044        assert!(empty_slot.is_empty());
1045    }
1046
1047    #[tokio::test]
1048    async fn test_clean_muts_rev_range() {
1049        let test_path = format!("target/test_clean_muts_{}", std::process::id());
1050        let fab = Fabric::new(&test_path).await.unwrap();
1051
1052        let h0: Hash = Hash::new([0; 32]);
1053        let h1: Hash = Hash::new([1; 32]);
1054        let h2: Hash = Hash::new([2; 32]);
1055        let h3: Hash = Hash::new([3; 32]);
1056        let h4: Hash = Hash::new([4; 32]);
1057        fab.insert_entry(&h0, 99, 999, &[0], 0).unwrap();
1058        fab.insert_entry(&h1, 100, 1000, &[1], 0).unwrap();
1059        fab.insert_entry(&h2, 101, 1001, &[2], 0).unwrap();
1060        fab.insert_entry(&h3, 102, 1002, &[3], 0).unwrap();
1061        fab.insert_entry(&h4, 103, 1003, &[4], 0).unwrap();
1062        fab.put_muts_rev(&h0, b"data0").unwrap();
1063        fab.put_muts_rev(&h1, b"data1").unwrap();
1064        fab.put_muts_rev(&h2, b"data2").unwrap();
1065        fab.put_muts_rev(&h3, b"data3").unwrap();
1066        fab.put_muts_rev(&h4, b"data4").unwrap();
1067
1068        fab.clean_muts_rev_range(100, 102).unwrap();
1069
1070        assert!(fab.get_muts_rev(&h0).unwrap().is_some());
1071        assert!(fab.get_muts_rev(&h1).unwrap().is_none());
1072        assert!(fab.get_muts_rev(&h2).unwrap().is_none());
1073        assert!(fab.get_muts_rev(&h3).unwrap().is_none());
1074        assert!(fab.get_muts_rev(&h4).unwrap().is_some());
1075    }
1076}