amadeus_node/consensus/doms/
entry.rs

1use crate::Context;
2use crate::config::ENTRY_SIZE;
3/// Entry is a consensus block in Amadeus
4use crate::consensus::agg_sig::{DST_ENTRY, DST_VRF};
5use crate::consensus::doms::tx::TxU;
6use crate::consensus::fabric;
7use crate::node::protocol;
8use crate::node::protocol::Protocol;
9use crate::utils::bls12_381;
10use crate::utils::misc::{TermExt, TermMap, bitvec_to_bools, bools_to_bitvec, get_unix_millis_now};
11use crate::utils::safe_etf::{encode_safe, encode_safe_deterministic};
12use crate::utils::{archiver, blake3};
13use eetf::{Atom, Binary, FixInteger, Map, Term};
14use std::collections::HashMap;
15use std::fmt;
16use std::net::Ipv4Addr;
17// use tracing::{instrument, warn};
18
/// Upper bound on the number of transactions a single entry may carry.
const MAX_TXS: usize = 100;
20
21#[derive(Debug, thiserror::Error)]
22pub enum Error {
23    #[error(transparent)]
24    Io(#[from] std::io::Error),
25    #[error(transparent)]
26    EtfDecode(#[from] eetf::DecodeError),
27    #[error(transparent)]
28    EtfEncode(#[from] eetf::EncodeError),
29    #[error(transparent)]
30    BinDecode(#[from] bincode::error::DecodeError),
31    #[error(transparent)]
32    BinEncode(#[from] bincode::error::EncodeError),
33    #[error("invalid erlang etf: {0}")]
34    BadEtf(&'static str),
35    #[error("invalid signature")]
36    BadAggSignature,
37    #[error("wrong epoch or unsupported aggregate signature path")]
38    NoTrainers,
39    #[error("txs_hash invalid")]
40    BadTxsHash,
41    #[error(transparent)]
42    Tx(#[from] super::tx::Error),
43    #[error(transparent)]
44    Bls(#[from] bls12_381::Error),
45    #[error(transparent)]
46    Fabric(#[from] fabric::Error),
47    #[error(transparent)]
48    Archiver(#[from] archiver::Error),
49    #[error(transparent)]
50    RocksDb(#[from] crate::utils::rocksdb::Error),
51}
52
53/// Shared summary of an entry’s tip.
54#[derive(Debug, Clone)]
55pub struct EntrySummary {
56    pub header: EntryHeader,
57    pub signature: [u8; 96],
58    pub mask: Option<Vec<bool>>,
59}
60
61impl From<Entry> for EntrySummary {
62    fn from(entry: Entry) -> Self {
63        Self { header: entry.header, signature: entry.signature, mask: entry.mask }
64    }
65}
66
67impl EntrySummary {
68    /// Helper that reads an EntrySummary from an ETF term.
69    pub fn from_etf_term(map: &TermMap) -> Result<Self, Error> {
70        // allow empty map to represent "no tip" like the Elixir reference
71        if map.0.is_empty() {
72            return Ok(Self::empty());
73        }
74        let header_bin: Vec<u8> = map.get_binary("header").ok_or(Error::BadEtf("header"))?;
75        let signature = map.get_binary("signature").ok_or(Error::BadEtf("signature"))?;
76        let mask = map.get_binary("mask").map(bitvec_to_bools);
77        let header = EntryHeader::from_etf_bin(&header_bin).map_err(|_| Error::BadEtf("header"))?;
78        Ok(Self { header, signature, mask })
79    }
80
81    /// Convert EntrySummary to ETF term for encoding
82    pub fn to_etf_term(&self) -> Result<Term, Error> {
83        let mut m = HashMap::new();
84        m.insert(Term::Atom(Atom::from("header")), Term::from(Binary { bytes: self.header.to_etf_bin()? }));
85        m.insert(Term::Atom(Atom::from("signature")), Term::from(Binary { bytes: self.signature.to_vec() }));
86        if let Some(mask) = &self.mask {
87            m.insert(Term::Atom(Atom::from("mask")), Term::from(Binary { bytes: bools_to_bitvec(mask) }));
88        }
89        Ok(Term::from(Map { map: m }))
90    }
91
92    /// Empty summary placeholder used when tips are missing
93    pub fn empty() -> Self {
94        let header = EntryHeader {
95            height: 0,
96            slot: 0,
97            prev_slot: 0,
98            prev_hash: [0u8; 32],
99            dr: [0u8; 32],
100            vr: [0u8; 96],
101            signer: [0u8; 48],
102            txs_hash: [0u8; 32],
103        };
104        Self { header, signature: [0u8; 96], mask: None }
105    }
106}
107
/// Header of an entry; its deterministic ETF encoding is what gets hashed and signed.
#[derive(Clone)]
pub struct EntryHeader {
    /// Chain height; `u32` is ample for the foreseeable future.
    pub height: u32,
    /// Slot this entry was produced in.
    pub slot: u32,
    /// Slot of the previous entry; `-1` in the genesis entry.
    pub prev_slot: i32,
    /// Hash of the previous entry.
    pub prev_hash: [u8; 32],
    /// Deterministic random value.
    pub dr: [u8; 32],
    /// Verifiable random value.
    pub vr: [u8; 96],
    /// 48-byte public key of the entry's signer.
    pub signer: [u8; 48],
    /// Hash over the concatenated transaction binaries.
    pub txs_hash: [u8; 32],
}
119
120impl fmt::Debug for EntryHeader {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        f.debug_struct("EntryHeader")
123            .field("slot", &self.slot)
124            .field("dr", &bs58::encode(&self.dr).into_string())
125            .field("height", &self.height)
126            .field("prev_hash", &bs58::encode(&self.prev_hash).into_string())
127            .field("prev_slot", &self.prev_slot)
128            .field("signer", &bs58::encode(&self.signer).into_string())
129            .field("txs_hash", &bs58::encode(&self.txs_hash).into_string())
130            .field("vr", &bs58::encode(&self.vr).into_string())
131            .finish()
132    }
133}
134
135impl EntryHeader {
136    pub fn from_etf_bin(bin: &[u8]) -> Result<Self, Error> {
137        let term = Term::decode(bin).map_err(Error::EtfDecode)?;
138        let map = term.get_term_map().ok_or(Error::BadEtf("entry-header-map"))?;
139
140        let height = map.get_integer("height").ok_or(Error::BadEtf("height"))?;
141        let slot = map.get_integer("slot").ok_or(Error::BadEtf("slot"))?;
142        let prev_slot = map.get_integer("prev_slot").ok_or(Error::BadEtf("prev_slot"))?;
143        let prev_hash = map.get_binary("prev_hash").ok_or(Error::BadEtf("prev_hash"))?;
144        let dr = map.get_binary("dr").ok_or(Error::BadEtf("dr"))?;
145        let vr = map.get_binary("vr").ok_or(Error::BadEtf("vr"))?;
146        let signer = map.get_binary("signer").ok_or(Error::BadEtf("signer"))?;
147        let txs_hash = map.get_binary("txs_hash").ok_or(Error::BadEtf("txs_hash"))?;
148
149        Ok(EntryHeader { height, slot, prev_slot, prev_hash, dr, vr, signer, txs_hash })
150    }
151
152    // Always deterministic
153    pub fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
154        let mut map = HashMap::new();
155        map.insert(Term::Atom(Atom::from("height")), Term::from(FixInteger { value: self.height as i32 }));
156        map.insert(Term::Atom(Atom::from("slot")), Term::from(FixInteger { value: self.slot as i32 }));
157        map.insert(Term::Atom(Atom::from("prev_slot")), Term::from(FixInteger { value: self.prev_slot as i32 }));
158        map.insert(Term::Atom(Atom::from("prev_hash")), Term::from(Binary { bytes: self.prev_hash.to_vec() }));
159        map.insert(Term::Atom(Atom::from("dr")), Term::from(Binary { bytes: self.dr.to_vec() }));
160        map.insert(Term::Atom(Atom::from("vr")), Term::from(Binary { bytes: self.vr.to_vec() }));
161        map.insert(Term::Atom(Atom::from("signer")), Term::from(Binary { bytes: self.signer.to_vec() }));
162        map.insert(Term::Atom(Atom::from("txs_hash")), Term::from(Binary { bytes: self.txs_hash.to_vec() }));
163
164        let term = Term::Map(Map { map });
165        let out = encode_safe_deterministic(&term);
166        Ok(out)
167    }
168}
169
170#[derive(Clone)]
171pub struct Entry {
172    pub hash: [u8; 32],
173    pub header: EntryHeader,
174    pub signature: [u8; 96],
175    pub mask: Option<Vec<bool>>, // vec<bool> in rust is special - its a packed vec<u8>
176    pub txs: Vec<Vec<u8>>,       // list of tx binaries that can be empty
177}
178
179impl Entry {
180    /// Pack entry to ETF deterministic format (like Elixir Entry.pack/1)
181    pub fn pack(&self) -> Result<Vec<u8>, Error> {
182        let mut map = HashMap::new();
183
184        // Convert header to ETF binary first
185        let header_bin = self.header.to_etf_bin()?;
186        map.insert(Term::Atom(Atom::from("header")), Term::from(Binary { bytes: header_bin }));
187
188        // Convert txs to ETF list of binaries
189        let txs_terms: Vec<Term> = self.txs.iter().map(|tx| Term::from(Binary { bytes: tx.clone() })).collect();
190        map.insert(Term::Atom(Atom::from("txs")), Term::from(eetf::List { elements: txs_terms }));
191
192        map.insert(Term::Atom(Atom::from("hash")), Term::from(Binary { bytes: self.hash.to_vec() }));
193        map.insert(Term::Atom(Atom::from("signature")), Term::from(Binary { bytes: self.signature.to_vec() }));
194
195        // Handle optional mask
196        if let Some(mask) = &self.mask {
197            let mask_bytes = bools_to_bitvec(mask);
198            map.insert(Term::Atom(Atom::from("mask")), Term::from(Binary { bytes: mask_bytes }));
199        }
200
201        let term = Term::from(eetf::Map { map });
202        let out = encode_safe_deterministic(&term);
203        Ok(out)
204    }
205
206    /// Unpack entry from ETF deterministic format (like Elixir Entry.unpack/1)
207    pub fn unpack(entry_packed: &[u8]) -> Result<Self, Error> {
208        let term = Term::decode(entry_packed)?;
209        let map = term.get_term_map().ok_or(Error::BadEtf("entry"))?;
210
211        let hash = map.get_binary("hash").ok_or(Error::BadEtf("hash"))?;
212        let header_bin: Vec<u8> = map.get_binary("header").ok_or(Error::BadEtf("header"))?;
213        let signature = map.get_binary("signature").ok_or(Error::BadEtf("signature"))?;
214        let mask = map.get_binary("mask").map(bitvec_to_bools);
215        let txs: Vec<Vec<u8>> =
216            map.get_list("txs").unwrap_or_default().iter().filter_map(TermExt::get_binary).map(Into::into).collect();
217
218        let header = EntryHeader::from_etf_bin(&header_bin)?;
219
220        Ok(Entry { hash, header, signature, mask, txs })
221    }
222}
223
224impl TryFrom<&[u8]> for Entry {
225    type Error = Error;
226
227    fn try_from(bin: &[u8]) -> Result<Self, Self::Error> {
228        Self::unpack(bin)
229    }
230}
231
232impl TryInto<Vec<u8>> for Entry {
233    type Error = Error;
234
235    fn try_into(self) -> Result<Vec<u8>, Self::Error> {
236        self.pack()
237    }
238}
239
240impl crate::utils::misc::Typename for Entry {
241    fn typename(&self) -> &'static str {
242        Self::TYPENAME
243    }
244}
245
246#[async_trait::async_trait]
247impl Protocol for Entry {
248    fn from_etf_map_validated(map: TermMap) -> Result<Self, protocol::Error> {
249        let bin = map.get_binary("entry_packed").ok_or(Error::BadEtf("entry_packed"))?;
250        Entry::from_etf_bin_validated(bin, ENTRY_SIZE).map_err(Into::into)
251    }
252    fn to_etf_bin(&self) -> Result<Vec<u8>, protocol::Error> {
253        // encode entry using ETF deterministic format
254        let entry_bin: Vec<u8> = self.pack().map_err(|_| protocol::Error::BadEtf("entry"))?;
255
256        let mut m = HashMap::new();
257        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
258        m.insert(Term::Atom(Atom::from("entry_packed")), Term::from(Binary { bytes: entry_bin }));
259
260        let term = Term::from(Map { map: m });
261        let out = encode_safe(&term);
262        Ok(out)
263    }
264
265    async fn handle(&self, ctx: &Context, _src: Ipv4Addr) -> Result<Vec<protocol::Instruction>, protocol::Error> {
266        let height = self.header.height;
267
268        // compute rooted_tip_height if possible
269        let rooted_height = ctx
270            .fabric
271            .get_rooted_hash()
272            .ok()
273            .flatten()
274            .map(TryInto::try_into)
275            .and_then(|h| h.ok())
276            .and_then(|h| ctx.fabric.get_entry_by_hash(&h))
277            .map(|e| e.header.height)
278            .unwrap_or(0);
279
280        if height >= rooted_height {
281            let hash = self.hash;
282            let epoch = self.get_epoch();
283            let slot = self.header.slot; // height is the same as slot in amadeus
284            let bin: Vec<u8> = self.clone().try_into()?;
285
286            ctx.fabric.insert_entry(&hash, height, slot, &bin, get_unix_millis_now())?;
287            archiver::store(bin, format!("epoch-{}", epoch), format!("entry-{}", height)).await?;
288        }
289
290        Ok(vec![protocol::Instruction::Noop { why: "entry handling not implemented".to_string() }])
291    }
292}
293
294impl fmt::Debug for Entry {
295    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296        f.debug_struct("Entry")
297            .field("hash", &bs58::encode(&self.hash).into_string())
298            .field("header", &self.header)
299            .field("signature", &bs58::encode(&self.signature).into_string())
300            .field("txs", &self.txs.iter().map(|tx| bs58::encode(tx).into_string()).collect::<Vec<String>>())
301            .finish()
302    }
303}
304
305impl Entry {
306    pub const TYPENAME: &'static str = "event_entry";
307
308    pub fn to_etf_bin(&self) -> Result<Vec<u8>, protocol::Error> {
309        // encode entry using ETF deterministic format
310        let entry_bin: Vec<u8> = self.pack().map_err(|_| protocol::Error::BadEtf("entry"))?;
311
312        let mut m = HashMap::new();
313        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
314        m.insert(Term::Atom(Atom::from("entry_packed")), Term::from(Binary { bytes: entry_bin }));
315
316        let term = Term::from(Map { map: m });
317        let out = encode_safe(&term);
318        Ok(out)
319    }
320
321    pub fn from_etf_bin_validated(bin: &[u8], entry_size_limit: usize) -> Result<Entry, Error> {
322        if bin.len() >= entry_size_limit {
323            return Err(Error::BadEtf("entry_bin_too_large"));
324        }
325
326        // Validate deterministic ETF encoding first
327        let parsed_entry = Entry::unpack(bin)?;
328        let repacked = parsed_entry.pack()?;
329        if bin != repacked {
330            return Err(Error::BadEtf("not_deterministicly_encoded"));
331        }
332
333        // Validate header deterministic encoding
334        let header_repacked = parsed_entry.header.to_etf_bin()?;
335        // Note: We need to extract original header binary from the entry to compare
336        let term = Term::decode(bin)?;
337        let map = term.get_term_map().ok_or(Error::BadEtf("entry"))?;
338        let original_header_bin = map.get_binary("header").ok_or(Error::BadEtf("header"))?;
339        if original_header_bin != header_repacked {
340            return Err(Error::BadEtf("not_deterministicly_encoded_header"));
341        }
342
343        let parsed = ParsedEntry { entry: parsed_entry, header_bin: original_header_bin };
344        parsed.validate_signature()?;
345        let is_special = parsed.entry.mask.is_some();
346        parsed.entry.validate_contents(is_special)?;
347
348        Ok(parsed.entry)
349    }
350
351    fn validate_contents(&self, is_special_meeting_block: bool) -> Result<(), Error> {
352        if self.txs.len() > MAX_TXS {
353            return Err(Error::BadEtf("txs_len_over_100"));
354        }
355
356        let txs_bin = self.txs.iter().flatten().cloned().collect::<Vec<u8>>();
357        if self.header.txs_hash.as_slice() != blake3::hash(&txs_bin).as_slice() {
358            return Err(Error::BadTxsHash);
359        }
360
361        for txp in &self.txs {
362            super::tx::validate(txp, is_special_meeting_block)?;
363        }
364
365        Ok(())
366    }
367
368    /// Build next header skeleton similar to Entry.build_next/2.
369    /// This requires chain state (pk/sk), so we only provide a helper to derive next header fields given inputs.
370    pub fn build_next_header(&self, slot: u32, signer_pk: &[u8; 48], signer_sk: &[u8]) -> Result<EntryHeader, Error> {
371        // dr' = blake3(dr)
372        let dr = blake3::hash(&self.header.dr);
373        // vr' = sign(sk, prev_vr, DST_VRF)
374        let vr = bls12_381::sign(signer_sk, &self.header.vr, DST_VRF)?;
375
376        Ok(EntryHeader {
377            slot,
378            height: self.header.height + 1,
379            prev_slot: self.header.slot as i32,
380            prev_hash: self.hash,
381            dr,
382            vr,
383            signer: *signer_pk,
384            txs_hash: [0u8; 32], // to be filled when txs are known
385        })
386    }
387
388    pub fn get_epoch(&self) -> u32 {
389        self.header.height / 100_000
390    }
391
392    pub fn contains_tx(&self, tx_function: &str) -> bool {
393        self.txs.iter().any(|txp| {
394            if let Ok(txu) = TxU::from_vanilla(txp) {
395                if let Some(first) = txu.tx.actions.first() { first.function == tx_function } else { false }
396            } else {
397                false
398            }
399        })
400    }
401}
402
403#[derive(Debug, Clone)]
404struct ParsedEntry {
405    pub entry: Entry,
406    pub header_bin: Vec<u8>,
407}
408
409impl ParsedEntry {
410    fn from_etf_bin(bin: &[u8]) -> Result<Self, Error> {
411        let entry = Entry::unpack(bin)?;
412        let term = Term::decode(bin)?;
413        let map = term.get_term_map().ok_or(Error::BadEtf("entry"))?;
414        let header_bin: Vec<u8> = map.get_binary("header").ok_or(Error::BadEtf("header"))?;
415
416        Ok(ParsedEntry { entry, header_bin })
417    }
418
419    fn validate_signature(&self) -> Result<(), Error> {
420        if let Some(_mask) = &self.entry.mask {
421            // Aggregate signature path requires trainers from chain state (DB); not available here.
422            return Err(Error::NoTrainers);
423        } else {
424            let h = blake3::hash(&self.header_bin);
425            bls12_381::verify(&self.entry.header.signer, &self.entry.signature, &h, DST_ENTRY)?;
426        }
427
428        Ok(())
429    }
430}
431
432/// Get archived entries as a list of (epoch, height, entry_size) tuples by parsing filenames
433pub async fn get_archived_entries() -> Result<Vec<(u64, u64, u64)>, Error> {
434    let filenames_with_sizes = archiver::get_archived_filenames().await?;
435    let mut entries = Vec::new();
436
437    for (filename, file_size) in filenames_with_sizes {
438        if let Some((epoch, height)) = parse_entry_filename(&filename) {
439            entries.push((epoch, height, file_size));
440        }
441    }
442
443    // Sort by epoch first, then by height
444    entries.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
445    entries.dedup(); // Remove any duplicates
446
447    Ok(entries)
448}
449
/// Extracts `(epoch, height)` from an archive path such as `"epoch-{epoch}/entry-{height}"`.
///
/// The epoch is taken from the last `/`-separated segment of the form `epoch-<u64>`;
/// the height must come from the final segment, in the form `entry-<u64>`.
/// Returns `None` when either part is missing or not a valid `u64`.
fn parse_entry_filename(filename: &str) -> Option<(u64, u64)> {
    // Any segment may carry the epoch; the last one that parses wins.
    let epoch = filename
        .split('/')
        .filter_map(|segment| segment.strip_prefix("epoch-"))
        .filter_map(|digits| digits.parse::<u64>().ok())
        .last();

    // Only the final segment is considered for the height.
    let height = filename
        .rsplit('/')
        .next()
        .and_then(|leaf| leaf.strip_prefix("entry-"))
        .and_then(|digits| digits.parse::<u64>().ok());

    epoch.zip(height)
}
482
#[cfg(test)]
mod tests {
    use super::*;

    /// Valid and invalid archive paths for `parse_entry_filename`.
    #[test]
    fn test_parse_entry_filename() {
        // Valid filenames
        assert_eq!(parse_entry_filename("epoch-0/entry-12345"), Some((0, 12345)));
        assert_eq!(parse_entry_filename("epoch-123/entry-456"), Some((123, 456)));
        assert_eq!(parse_entry_filename("epoch-999/subdir/entry-789"), Some((999, 789)));

        // Invalid filenames
        assert_eq!(parse_entry_filename("not-epoch/entry-123"), None);
        assert_eq!(parse_entry_filename("epoch-123/not-entry"), None);
        assert_eq!(parse_entry_filename("epoch-abc/entry-123"), None);
        assert_eq!(parse_entry_filename("epoch-123/entry-def"), None);
        assert_eq!(parse_entry_filename("epoch-1/entry-2/"), None);
        assert_eq!(parse_entry_filename("random-file.txt"), None);
        assert_eq!(parse_entry_filename(""), None);
    }

    /// Smoke test: the listing must not panic and, when it succeeds, must be ordered by
    /// epoch and then height. The filesystem state is unknown, so no specific values are
    /// asserted, and an `Err` (e.g. archiver not initialized) is acceptable.
    #[tokio::test]
    async fn test_get_archived_entries_empty() {
        if let Ok(entries) = get_archived_entries().await {
            for pair in entries.windows(2) {
                let (prev, curr) = (pair[0], pair[1]);
                assert!((prev.0, prev.1) <= (curr.0, curr.1));
            }
        }
    }
}