1use crate::Context;
2use crate::config::ENTRY_SIZE;
3use crate::consensus::agg_sig::{DST_ENTRY, DST_VRF};
5use crate::consensus::doms::tx::TxU;
6use crate::consensus::fabric;
7use crate::node::protocol;
8use crate::node::protocol::Protocol;
9use crate::utils::bls12_381;
10use crate::utils::misc::{TermExt, TermMap, bitvec_to_bools, bools_to_bitvec, get_unix_millis_now};
11use crate::utils::safe_etf::{encode_safe, encode_safe_deterministic};
12use crate::utils::{archiver, blake3};
13use eetf::{Atom, Binary, FixInteger, Map, Term};
14use std::collections::HashMap;
15use std::fmt;
16use std::net::Ipv4Addr;
17const MAX_TXS: usize = 100; #[derive(Debug, thiserror::Error)]
22pub enum Error {
23 #[error(transparent)]
24 Io(#[from] std::io::Error),
25 #[error(transparent)]
26 EtfDecode(#[from] eetf::DecodeError),
27 #[error(transparent)]
28 EtfEncode(#[from] eetf::EncodeError),
29 #[error(transparent)]
30 BinDecode(#[from] bincode::error::DecodeError),
31 #[error(transparent)]
32 BinEncode(#[from] bincode::error::EncodeError),
33 #[error("invalid erlang etf: {0}")]
34 BadEtf(&'static str),
35 #[error("invalid signature")]
36 BadAggSignature,
37 #[error("wrong epoch or unsupported aggregate signature path")]
38 NoTrainers,
39 #[error("txs_hash invalid")]
40 BadTxsHash,
41 #[error(transparent)]
42 Tx(#[from] super::tx::Error),
43 #[error(transparent)]
44 Bls(#[from] bls12_381::Error),
45 #[error(transparent)]
46 Fabric(#[from] fabric::Error),
47 #[error(transparent)]
48 Archiver(#[from] archiver::Error),
49 #[error(transparent)]
50 RocksDb(#[from] crate::utils::rocksdb::Error),
51}
52
53#[derive(Debug, Clone)]
55pub struct EntrySummary {
56 pub header: EntryHeader,
57 pub signature: [u8; 96],
58 pub mask: Option<Vec<bool>>,
59}
60
61impl From<Entry> for EntrySummary {
62 fn from(entry: Entry) -> Self {
63 Self { header: entry.header, signature: entry.signature, mask: entry.mask }
64 }
65}
66
67impl EntrySummary {
68 pub fn from_etf_term(map: &TermMap) -> Result<Self, Error> {
70 if map.0.is_empty() {
72 return Ok(Self::empty());
73 }
74 let header_bin: Vec<u8> = map.get_binary("header").ok_or(Error::BadEtf("header"))?;
75 let signature = map.get_binary("signature").ok_or(Error::BadEtf("signature"))?;
76 let mask = map.get_binary("mask").map(bitvec_to_bools);
77 let header = EntryHeader::from_etf_bin(&header_bin).map_err(|_| Error::BadEtf("header"))?;
78 Ok(Self { header, signature, mask })
79 }
80
81 pub fn to_etf_term(&self) -> Result<Term, Error> {
83 let mut m = HashMap::new();
84 m.insert(Term::Atom(Atom::from("header")), Term::from(Binary { bytes: self.header.to_etf_bin()? }));
85 m.insert(Term::Atom(Atom::from("signature")), Term::from(Binary { bytes: self.signature.to_vec() }));
86 if let Some(mask) = &self.mask {
87 m.insert(Term::Atom(Atom::from("mask")), Term::from(Binary { bytes: bools_to_bitvec(mask) }));
88 }
89 Ok(Term::from(Map { map: m }))
90 }
91
92 pub fn empty() -> Self {
94 let header = EntryHeader {
95 height: 0,
96 slot: 0,
97 prev_slot: 0,
98 prev_hash: [0u8; 32],
99 dr: [0u8; 32],
100 vr: [0u8; 96],
101 signer: [0u8; 48],
102 txs_hash: [0u8; 32],
103 };
104 Self { header, signature: [0u8; 96], mask: None }
105 }
106}
107
108#[derive(Clone)]
109pub struct EntryHeader {
110 pub height: u32, pub slot: u32,
112 pub prev_slot: i32, pub prev_hash: [u8; 32],
114 pub dr: [u8; 32], pub vr: [u8; 96], pub signer: [u8; 48],
117 pub txs_hash: [u8; 32],
118}
119
120impl fmt::Debug for EntryHeader {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 f.debug_struct("EntryHeader")
123 .field("slot", &self.slot)
124 .field("dr", &bs58::encode(&self.dr).into_string())
125 .field("height", &self.height)
126 .field("prev_hash", &bs58::encode(&self.prev_hash).into_string())
127 .field("prev_slot", &self.prev_slot)
128 .field("signer", &bs58::encode(&self.signer).into_string())
129 .field("txs_hash", &bs58::encode(&self.txs_hash).into_string())
130 .field("vr", &bs58::encode(&self.vr).into_string())
131 .finish()
132 }
133}
134
135impl EntryHeader {
136 pub fn from_etf_bin(bin: &[u8]) -> Result<Self, Error> {
137 let term = Term::decode(bin).map_err(Error::EtfDecode)?;
138 let map = term.get_term_map().ok_or(Error::BadEtf("entry-header-map"))?;
139
140 let height = map.get_integer("height").ok_or(Error::BadEtf("height"))?;
141 let slot = map.get_integer("slot").ok_or(Error::BadEtf("slot"))?;
142 let prev_slot = map.get_integer("prev_slot").ok_or(Error::BadEtf("prev_slot"))?;
143 let prev_hash = map.get_binary("prev_hash").ok_or(Error::BadEtf("prev_hash"))?;
144 let dr = map.get_binary("dr").ok_or(Error::BadEtf("dr"))?;
145 let vr = map.get_binary("vr").ok_or(Error::BadEtf("vr"))?;
146 let signer = map.get_binary("signer").ok_or(Error::BadEtf("signer"))?;
147 let txs_hash = map.get_binary("txs_hash").ok_or(Error::BadEtf("txs_hash"))?;
148
149 Ok(EntryHeader { height, slot, prev_slot, prev_hash, dr, vr, signer, txs_hash })
150 }
151
152 pub fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
154 let mut map = HashMap::new();
155 map.insert(Term::Atom(Atom::from("height")), Term::from(FixInteger { value: self.height as i32 }));
156 map.insert(Term::Atom(Atom::from("slot")), Term::from(FixInteger { value: self.slot as i32 }));
157 map.insert(Term::Atom(Atom::from("prev_slot")), Term::from(FixInteger { value: self.prev_slot as i32 }));
158 map.insert(Term::Atom(Atom::from("prev_hash")), Term::from(Binary { bytes: self.prev_hash.to_vec() }));
159 map.insert(Term::Atom(Atom::from("dr")), Term::from(Binary { bytes: self.dr.to_vec() }));
160 map.insert(Term::Atom(Atom::from("vr")), Term::from(Binary { bytes: self.vr.to_vec() }));
161 map.insert(Term::Atom(Atom::from("signer")), Term::from(Binary { bytes: self.signer.to_vec() }));
162 map.insert(Term::Atom(Atom::from("txs_hash")), Term::from(Binary { bytes: self.txs_hash.to_vec() }));
163
164 let term = Term::Map(Map { map });
165 let out = encode_safe_deterministic(&term);
166 Ok(out)
167 }
168}
169
170#[derive(Clone)]
171pub struct Entry {
172 pub hash: [u8; 32],
173 pub header: EntryHeader,
174 pub signature: [u8; 96],
175 pub mask: Option<Vec<bool>>, pub txs: Vec<Vec<u8>>, }
178
179impl Entry {
180 pub fn pack(&self) -> Result<Vec<u8>, Error> {
182 let mut map = HashMap::new();
183
184 let header_bin = self.header.to_etf_bin()?;
186 map.insert(Term::Atom(Atom::from("header")), Term::from(Binary { bytes: header_bin }));
187
188 let txs_terms: Vec<Term> = self.txs.iter().map(|tx| Term::from(Binary { bytes: tx.clone() })).collect();
190 map.insert(Term::Atom(Atom::from("txs")), Term::from(eetf::List { elements: txs_terms }));
191
192 map.insert(Term::Atom(Atom::from("hash")), Term::from(Binary { bytes: self.hash.to_vec() }));
193 map.insert(Term::Atom(Atom::from("signature")), Term::from(Binary { bytes: self.signature.to_vec() }));
194
195 if let Some(mask) = &self.mask {
197 let mask_bytes = bools_to_bitvec(mask);
198 map.insert(Term::Atom(Atom::from("mask")), Term::from(Binary { bytes: mask_bytes }));
199 }
200
201 let term = Term::from(eetf::Map { map });
202 let out = encode_safe_deterministic(&term);
203 Ok(out)
204 }
205
206 pub fn unpack(entry_packed: &[u8]) -> Result<Self, Error> {
208 let term = Term::decode(entry_packed)?;
209 let map = term.get_term_map().ok_or(Error::BadEtf("entry"))?;
210
211 let hash = map.get_binary("hash").ok_or(Error::BadEtf("hash"))?;
212 let header_bin: Vec<u8> = map.get_binary("header").ok_or(Error::BadEtf("header"))?;
213 let signature = map.get_binary("signature").ok_or(Error::BadEtf("signature"))?;
214 let mask = map.get_binary("mask").map(bitvec_to_bools);
215 let txs: Vec<Vec<u8>> =
216 map.get_list("txs").unwrap_or_default().iter().filter_map(TermExt::get_binary).map(Into::into).collect();
217
218 let header = EntryHeader::from_etf_bin(&header_bin)?;
219
220 Ok(Entry { hash, header, signature, mask, txs })
221 }
222}
223
224impl TryFrom<&[u8]> for Entry {
225 type Error = Error;
226
227 fn try_from(bin: &[u8]) -> Result<Self, Self::Error> {
228 Self::unpack(bin)
229 }
230}
231
232impl TryInto<Vec<u8>> for Entry {
233 type Error = Error;
234
235 fn try_into(self) -> Result<Vec<u8>, Self::Error> {
236 self.pack()
237 }
238}
239
240impl crate::utils::misc::Typename for Entry {
241 fn typename(&self) -> &'static str {
242 Self::TYPENAME
243 }
244}
245
246#[async_trait::async_trait]
247impl Protocol for Entry {
248 fn from_etf_map_validated(map: TermMap) -> Result<Self, protocol::Error> {
249 let bin = map.get_binary("entry_packed").ok_or(Error::BadEtf("entry_packed"))?;
250 Entry::from_etf_bin_validated(bin, ENTRY_SIZE).map_err(Into::into)
251 }
252 fn to_etf_bin(&self) -> Result<Vec<u8>, protocol::Error> {
253 let entry_bin: Vec<u8> = self.pack().map_err(|_| protocol::Error::BadEtf("entry"))?;
255
256 let mut m = HashMap::new();
257 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
258 m.insert(Term::Atom(Atom::from("entry_packed")), Term::from(Binary { bytes: entry_bin }));
259
260 let term = Term::from(Map { map: m });
261 let out = encode_safe(&term);
262 Ok(out)
263 }
264
265 async fn handle(&self, ctx: &Context, _src: Ipv4Addr) -> Result<Vec<protocol::Instruction>, protocol::Error> {
266 let height = self.header.height;
267
268 let rooted_height = ctx
270 .fabric
271 .get_rooted_hash()
272 .ok()
273 .flatten()
274 .map(TryInto::try_into)
275 .and_then(|h| h.ok())
276 .and_then(|h| ctx.fabric.get_entry_by_hash(&h))
277 .map(|e| e.header.height)
278 .unwrap_or(0);
279
280 if height >= rooted_height {
281 let hash = self.hash;
282 let epoch = self.get_epoch();
283 let slot = self.header.slot; let bin: Vec<u8> = self.clone().try_into()?;
285
286 ctx.fabric.insert_entry(&hash, height, slot, &bin, get_unix_millis_now())?;
287 archiver::store(bin, format!("epoch-{}", epoch), format!("entry-{}", height)).await?;
288 }
289
290 Ok(vec![protocol::Instruction::Noop { why: "entry handling not implemented".to_string() }])
291 }
292}
293
294impl fmt::Debug for Entry {
295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296 f.debug_struct("Entry")
297 .field("hash", &bs58::encode(&self.hash).into_string())
298 .field("header", &self.header)
299 .field("signature", &bs58::encode(&self.signature).into_string())
300 .field("txs", &self.txs.iter().map(|tx| bs58::encode(tx).into_string()).collect::<Vec<String>>())
301 .finish()
302 }
303}
304
305impl Entry {
306 pub const TYPENAME: &'static str = "event_entry";
307
308 pub fn to_etf_bin(&self) -> Result<Vec<u8>, protocol::Error> {
309 let entry_bin: Vec<u8> = self.pack().map_err(|_| protocol::Error::BadEtf("entry"))?;
311
312 let mut m = HashMap::new();
313 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
314 m.insert(Term::Atom(Atom::from("entry_packed")), Term::from(Binary { bytes: entry_bin }));
315
316 let term = Term::from(Map { map: m });
317 let out = encode_safe(&term);
318 Ok(out)
319 }
320
321 pub fn from_etf_bin_validated(bin: &[u8], entry_size_limit: usize) -> Result<Entry, Error> {
322 if bin.len() >= entry_size_limit {
323 return Err(Error::BadEtf("entry_bin_too_large"));
324 }
325
326 let parsed_entry = Entry::unpack(bin)?;
328 let repacked = parsed_entry.pack()?;
329 if bin != repacked {
330 return Err(Error::BadEtf("not_deterministicly_encoded"));
331 }
332
333 let header_repacked = parsed_entry.header.to_etf_bin()?;
335 let term = Term::decode(bin)?;
337 let map = term.get_term_map().ok_or(Error::BadEtf("entry"))?;
338 let original_header_bin = map.get_binary("header").ok_or(Error::BadEtf("header"))?;
339 if original_header_bin != header_repacked {
340 return Err(Error::BadEtf("not_deterministicly_encoded_header"));
341 }
342
343 let parsed = ParsedEntry { entry: parsed_entry, header_bin: original_header_bin };
344 parsed.validate_signature()?;
345 let is_special = parsed.entry.mask.is_some();
346 parsed.entry.validate_contents(is_special)?;
347
348 Ok(parsed.entry)
349 }
350
351 fn validate_contents(&self, is_special_meeting_block: bool) -> Result<(), Error> {
352 if self.txs.len() > MAX_TXS {
353 return Err(Error::BadEtf("txs_len_over_100"));
354 }
355
356 let txs_bin = self.txs.iter().flatten().cloned().collect::<Vec<u8>>();
357 if self.header.txs_hash.as_slice() != blake3::hash(&txs_bin).as_slice() {
358 return Err(Error::BadTxsHash);
359 }
360
361 for txp in &self.txs {
362 super::tx::validate(txp, is_special_meeting_block)?;
363 }
364
365 Ok(())
366 }
367
368 pub fn build_next_header(&self, slot: u32, signer_pk: &[u8; 48], signer_sk: &[u8]) -> Result<EntryHeader, Error> {
371 let dr = blake3::hash(&self.header.dr);
373 let vr = bls12_381::sign(signer_sk, &self.header.vr, DST_VRF)?;
375
376 Ok(EntryHeader {
377 slot,
378 height: self.header.height + 1,
379 prev_slot: self.header.slot as i32,
380 prev_hash: self.hash,
381 dr,
382 vr,
383 signer: *signer_pk,
384 txs_hash: [0u8; 32], })
386 }
387
388 pub fn get_epoch(&self) -> u32 {
389 self.header.height / 100_000
390 }
391
392 pub fn contains_tx(&self, tx_function: &str) -> bool {
393 self.txs.iter().any(|txp| {
394 if let Ok(txu) = TxU::from_vanilla(txp) {
395 if let Some(first) = txu.tx.actions.first() { first.function == tx_function } else { false }
396 } else {
397 false
398 }
399 })
400 }
401}
402
403#[derive(Debug, Clone)]
404struct ParsedEntry {
405 pub entry: Entry,
406 pub header_bin: Vec<u8>,
407}
408
409impl ParsedEntry {
410 fn from_etf_bin(bin: &[u8]) -> Result<Self, Error> {
411 let entry = Entry::unpack(bin)?;
412 let term = Term::decode(bin)?;
413 let map = term.get_term_map().ok_or(Error::BadEtf("entry"))?;
414 let header_bin: Vec<u8> = map.get_binary("header").ok_or(Error::BadEtf("header"))?;
415
416 Ok(ParsedEntry { entry, header_bin })
417 }
418
419 fn validate_signature(&self) -> Result<(), Error> {
420 if let Some(_mask) = &self.entry.mask {
421 return Err(Error::NoTrainers);
423 } else {
424 let h = blake3::hash(&self.header_bin);
425 bls12_381::verify(&self.entry.header.signer, &self.entry.signature, &h, DST_ENTRY)?;
426 }
427
428 Ok(())
429 }
430}
431
432pub async fn get_archived_entries() -> Result<Vec<(u64, u64, u64)>, Error> {
434 let filenames_with_sizes = archiver::get_archived_filenames().await?;
435 let mut entries = Vec::new();
436
437 for (filename, file_size) in filenames_with_sizes {
438 if let Some((epoch, height)) = parse_entry_filename(&filename) {
439 entries.push((epoch, height, file_size));
440 }
441 }
442
443 entries.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
445 entries.dedup(); Ok(entries)
448}
449
450fn parse_entry_filename(filename: &str) -> Option<(u64, u64)> {
453 let parts: Vec<&str> = filename.split('/').collect();
455
456 let mut epoch = None;
457 let mut height = None;
458
459 for part in &parts {
461 if let Some(epoch_str) = part.strip_prefix("epoch-") {
462 if let Ok(e) = epoch_str.parse::<u64>() {
463 epoch = Some(e);
464 }
465 }
466 }
467
468 if let Some(filename_part) = parts.last() {
470 if let Some(height_str) = filename_part.strip_prefix("entry-") {
471 if let Ok(h) = height_str.parse::<u64>() {
472 height = Some(h);
473 }
474 }
475 }
476
477 match (epoch, height) {
478 (Some(e), Some(h)) => Some((e, h)),
479 _ => None,
480 }
481}
482
483#[cfg(test)]
484mod tests {
485 use super::*;
486
487 #[test]
488 fn test_parse_entry_filename() {
489 assert_eq!(parse_entry_filename("epoch-0/entry-12345"), Some((0, 12345)));
491 assert_eq!(parse_entry_filename("epoch-123/entry-456"), Some((123, 456)));
492 assert_eq!(parse_entry_filename("epoch-999/subdir/entry-789"), Some((999, 789)));
493
494 assert_eq!(parse_entry_filename("not-epoch/entry-123"), None);
496 assert_eq!(parse_entry_filename("epoch-123/not-entry"), None);
497 assert_eq!(parse_entry_filename("epoch-abc/entry-123"), None);
498 assert_eq!(parse_entry_filename("epoch-123/entry-def"), None);
499 assert_eq!(parse_entry_filename("random-file.txt"), None);
500 assert_eq!(parse_entry_filename(""), None);
501 }
502
503 #[tokio::test]
504 async fn test_get_archived_entries_empty() {
505 let result = get_archived_entries().await;
508 match result {
511 Ok(entries) => {
512 for i in 1..entries.len() {
514 let prev = entries[i - 1];
515 let curr = entries[i];
516 assert!(prev.0 < curr.0 || (prev.0 == curr.0 && prev.1 <= curr.1));
517 assert!(curr.2 > 0 || curr.2 == 0); }
520 }
521 Err(_) => {
522 }
524 }
525 }
526}