1use crate::Context;
2#[cfg(test)]
3use crate::Ver;
4use crate::bic::sol;
5use crate::bic::sol::Solution;
6use crate::consensus::consensus::{self, Consensus};
7use crate::consensus::doms::attestation::EventAttestation;
8use crate::consensus::doms::entry::Entry;
9use crate::consensus::doms::{Attestation, EntrySummary};
10use crate::consensus::fabric::Fabric;
11use crate::node::anr::Anr;
12use crate::node::peers::HandshakeStatus;
13use crate::node::{anr, peers};
14use crate::utils::bls12_381;
15use crate::utils::misc::{TermExt, TermMap, Typename, get_unix_millis_now, parse_list, serialize_list};
16use crate::utils::safe_etf::encode_safe;
17use eetf::convert::TryAsRef;
18use eetf::{Atom, Binary, DecodeError as EtfDecodeError, EncodeError as EtfEncodeError, List, Map, Term};
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::io::Error as IoError;
22use std::net::{Ipv4Addr, SocketAddr};
23use tracing::instrument;
24use tracing::warn;
25
26#[async_trait::async_trait]
29pub trait Protocol: Typename + Debug + Send + Sync {
30 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error>
31 where
32 Self: Sized;
33 fn to_etf_bin(&self) -> Result<Vec<u8>, Error>;
35 async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error>;
37 async fn send_to_with_metrics(&self, ctx: &Context, dst: Ipv4Addr) -> Result<(), Error> {
40 let dst_addr = SocketAddr::new(std::net::IpAddr::V4(dst), ctx.config.udp_port);
41 let dst_anr = ctx.node_anrs.get_by_ip4(dst).await.ok_or(Error::NoAnrForDestination(dst))?;
42 let payload = self.to_etf_bin().inspect_err(|e| ctx.metrics.add_error(e))?;
43
44 let shards = ctx.reassembler.build_shards(&ctx.config, &payload, &dst_anr.pk).await?;
45 for shard in &shards {
46 ctx.socket.send_to_with_metrics(shard, dst_addr, &ctx.metrics).await?;
47 }
48
49 ctx.metrics.add_outgoing_proto(self.typename());
50 Ok(())
51 }
52}
53
54#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
55pub enum Error {
56 #[error(transparent)]
57 Io(#[from] IoError),
58 #[error(transparent)]
59 EtfDecode(#[from] EtfDecodeError),
60 #[error(transparent)]
61 EtfEncode(#[from] EtfEncodeError),
62 #[error(transparent)]
63 Bls(#[from] bls12_381::Error),
64 #[error(transparent)]
65 Tx(#[from] crate::consensus::doms::tx::Error),
66 #[error(transparent)]
67 Entry(#[from] crate::consensus::doms::entry::Error),
68 #[error(transparent)]
69 Archiver(#[from] crate::utils::archiver::Error),
70 #[error(transparent)]
71 Peers(#[from] peers::Error),
72 #[error(transparent)]
73 Consensus(#[from] consensus::Error),
74 #[error(transparent)]
75 Fabric(#[from] crate::consensus::fabric::Error),
76 #[error(transparent)]
77 Sol(#[from] sol::Error),
78 #[error(transparent)]
79 Att(#[from] crate::consensus::doms::attestation::Error),
80 #[error(transparent)]
81 Reassembler(#[from] crate::node::reassembler::Error),
82 #[error(transparent)]
83 Anr(#[from] anr::Error),
84 #[error("bad etf: {0}")]
85 BadEtf(&'static str),
86 #[error("No ANR found for destination IP: {0}")]
87 NoAnrForDestination(Ipv4Addr),
88}
89
90impl Typename for Error {
91 fn typename(&self) -> &'static str {
92 self.into()
93 }
94}
95
96#[derive(Debug, strum_macros::IntoStaticStr)]
98pub enum Instruction {
99 Noop { why: String },
100 SendNewPhoneWhoDisReply { dst: Ipv4Addr },
101 SendGetPeerAnrsReply { anrs: Vec<Anr>, dst: Ipv4Addr },
102 SendPingReply { ts_m: u64, dst: Ipv4Addr },
103
104 ValidTxs { txs: Vec<Vec<u8>> },
105 ReceivedSol { sol: Solution },
106 ReceivedEntry { entry: Entry },
107 ReceivedAttestation { attestation: Attestation },
108 ReceivedConsensus { consensus: Consensus },
109 ConsensusesPacked { packed: Vec<u8> },
110 CatchupEntryReq { heights: Vec<u64> },
111 CatchupTriReq { heights: Vec<u64> },
112 CatchupBiReq { heights: Vec<u64> },
113 CatchupAttestationReq { hashes: Vec<Vec<u8>> },
114 SpecialBusiness { business: Vec<u8> },
115 SpecialBusinessReply { business: Vec<u8> },
116 SolicitEntry { hash: Vec<u8> },
117 SolicitEntry2,
118}
119
120impl Typename for Instruction {
121 fn typename(&self) -> &'static str {
122 self.into()
123 }
124}
125
126#[instrument(skip(bin), name = "Proto::from_etf_validated")]
128pub fn parse_etf_bin(bin: &[u8]) -> Result<Box<dyn Protocol>, Error> {
129 let term = Term::decode(bin)?;
131 let map = term.get_term_map().ok_or(Error::BadEtf("map"))?;
132
133 let op_atom = map.get_atom("op").ok_or(Error::BadEtf("op"))?;
135 let proto: Box<dyn Protocol> = match op_atom.name.as_str() {
136 Ping::TYPENAME => Box::new(Ping::from_etf_map_validated(map)?),
137 PingReply::TYPENAME => Box::new(PingReply::from_etf_map_validated(map)?),
138 Entry::TYPENAME => Box::new(Entry::from_etf_map_validated(map)?),
139 EventTip::TYPENAME => Box::new(EventTip::from_etf_map_validated(map)?),
140 EventAttestation::TYPENAME => Box::new(EventAttestation::from_etf_map_validated(map)?),
141 Solution::TYPENAME => Box::new(Solution::from_etf_map_validated(map)?),
142 EventTx::TYPENAME => Box::new(EventTx::from_etf_map_validated(map)?),
143 GetPeerAnrs::TYPENAME => Box::new(GetPeerAnrs::from_etf_map_validated(map)?),
144 GetPeerAnrsReply::TYPENAME => Box::new(GetPeerAnrsReply::from_etf_map_validated(map)?),
145 NewPhoneWhoDis::TYPENAME => Box::new(NewPhoneWhoDis::from_etf_map_validated(map)?),
146 NewPhoneWhoDisReply::TYPENAME => Box::new(NewPhoneWhoDisReply::from_etf_map_validated(map)?),
147 SpecialBusiness::TYPENAME => Box::new(SpecialBusiness::from_etf_map_validated(map)?),
148 SpecialBusinessReply::TYPENAME => Box::new(SpecialBusinessReply::from_etf_map_validated(map)?),
149 Catchup::TYPENAME => Box::new(Catchup::from_etf_map_validated(map)?),
150 CatchupReply::TYPENAME => Box::new(CatchupReply::from_etf_map_validated(map)?),
151 _ => {
152 warn!("Unknown operation: {}", op_atom.name);
153 return Err(Error::BadEtf("op"));
154 }
155 };
156
157 Ok(proto)
158}
159
160#[derive(Debug)]
161pub struct EventTip {
162 pub temporal: EntrySummary,
163 pub rooted: EntrySummary,
164}
165
166impl Typename for EventTip {
167 fn typename(&self) -> &'static str {
168 Self::TYPENAME
169 }
170}
171
172#[async_trait::async_trait]
173impl Protocol for EventTip {
174 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
175 let temporal_term = map.get_term_map("temporal").ok_or(Error::BadEtf("temporal"))?;
176 let rooted_term = map.get_term_map("rooted").ok_or(Error::BadEtf("rooted"))?;
177 let temporal = EntrySummary::from_etf_term(&temporal_term)?;
178 let rooted = EntrySummary::from_etf_term(&rooted_term)?;
179
180 Ok(Self { temporal, rooted })
181 }
182 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
183 let mut m = HashMap::new();
184 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
185 m.insert(Term::Atom(Atom::from("temporal")), self.temporal.to_etf_term()?);
186 m.insert(Term::Atom(Atom::from("rooted")), self.rooted.to_etf_term()?);
187 let term = Term::from(Map { map: m });
188 let etf_data = encode_safe(&term);
189
190 Ok(etf_data)
191 }
192
193 #[instrument(skip(self, ctx), fields(src = %src), name = "EventTip::handle")]
194 async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
195 let signer = self.temporal.header.signer.to_vec();
198 let db = ctx.fabric.db();
199 let is_trainer = match crate::consensus::trainers_for_height(db, crate::consensus::chain_height(db)) {
200 Some(trainers) => trainers.iter().any(|pk| pk.as_slice() == signer),
201 None => false,
202 };
203
204 if is_trainer || ctx.is_peer_handshaked(src).await {
205 ctx.node_peers.update_peer_from_tip(ctx, src, self).await;
206 }
207
208 Ok(vec![Instruction::Noop { why: "event_tip handling not implemented".to_string() }])
209 }
210}
211
212impl EventTip {
213 pub const TYPENAME: &'static str = "event_tip";
214
215 pub fn new(temporal: EntrySummary, rooted: EntrySummary) -> Self {
216 Self { temporal, rooted }
217 }
218
219 pub fn from_current_tips() -> Result<Self, Error> {
220 Err(Error::Consensus(consensus::Error::NotImplemented("from_current_tips requires DB context")))
222 }
223
224 pub fn from_current_tips_db(fab: &Fabric) -> Result<Self, Error> {
226 fn entry_summary_by_hash(fab: &Fabric, hash: &[u8; 32]) -> EntrySummary {
228 if let Some(entry) = fab.get_entry_by_hash(hash) { entry.into() } else { EntrySummary::empty() }
229 }
230
231 let temporal_summary = match fab.get_temporal_hash()? {
232 Some(h) => entry_summary_by_hash(fab, &h),
233 None => EntrySummary::empty(),
234 };
235
236 let rooted_summary = match fab.get_rooted_hash()? {
237 Some(h) => entry_summary_by_hash(fab, &h),
238 None => EntrySummary::empty(),
239 };
240
241 Ok(Self { temporal: temporal_summary, rooted: rooted_summary })
242 }
243}
244
245#[derive(Debug)]
246pub struct Ping {
247 pub ts_m: u64,
248}
249
250#[derive(Debug)]
251pub struct PingReply {
252 pub ts_m: u64,
253 pub seen_time: u64,
254}
255
256#[derive(Debug)]
257pub struct ConsensusBulk {
258 pub consensuses_packed: Vec<u8>,
259}
260
261#[derive(Debug)]
262pub struct CatchupEntry {
263 pub heights: Vec<u64>,
264}
265
266#[derive(Debug)]
267pub struct CatchupTri {
268 pub heights: Vec<u64>,
269}
270
271#[derive(Debug)]
272pub struct CatchupBi {
273 pub heights: Vec<u64>,
274}
275
276#[derive(Debug)]
277pub struct CatchupAttestation {
278 pub hashes: Vec<Vec<u8>>,
279}
280
281#[derive(Debug)]
283pub struct Catchup {
284 pub heights: Vec<CatchupHeight>,
285}
286
287#[derive(Debug, Clone)]
288pub struct CatchupHeight {
289 pub height: u32,
290 pub c: Option<bool>, pub e: Option<bool>, pub a: Option<bool>, pub hashes: Option<Vec<Vec<u8>>>, }
295
296impl Catchup {
297 pub const TYPENAME: &'static str = "catchup";
298}
299
300impl Typename for Catchup {
301 fn typename(&self) -> &'static str {
302 Self::TYPENAME
303 }
304}
305
306#[async_trait::async_trait]
307impl Protocol for Catchup {
308 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
309 let height_flags_term = map.get_list("height_flags").ok_or(Error::BadEtf("height_flags"))?;
310 let mut height_flags = Vec::new();
311
312 for item in height_flags_term {
313 if let Some(flag_map) = item.get_term_map() {
314 let height = flag_map.get_integer::<u32>("height").ok_or(Error::BadEtf("height"))?;
315
316 let c = flag_map.get_atom("c").map(|a| a.name == "true");
317 let e = flag_map.get_atom("e").map(|a| a.name == "true");
318 let a = flag_map.get_atom("a").map(|a| a.name == "true");
319
320 let hashes = flag_map.get_list("hashes").map(|hashes_list| {
321 hashes_list.iter().filter_map(|h| h.get_binary().map(|bytes| bytes.to_vec())).collect()
322 });
323
324 height_flags.push(CatchupHeight { height, c, e, a, hashes });
325 }
326 }
327
328 Ok(Self { heights: height_flags })
329 }
330
331 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
332 let mut m = HashMap::new();
333 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
334
335 let height_flags_list: Vec<Term> = self
336 .heights
337 .iter()
338 .map(|flag| {
339 let mut flag_map = HashMap::new();
340 flag_map.insert(Term::Atom(Atom::from("height")), Term::FixInteger((flag.height as i32).into()));
341
342 if let Some(true) = flag.c {
343 flag_map.insert(Term::Atom(Atom::from("c")), Term::Atom(Atom::from("true")));
344 }
345 if let Some(true) = flag.e {
346 flag_map.insert(Term::Atom(Atom::from("e")), Term::Atom(Atom::from("true")));
347 }
348 if let Some(true) = flag.a {
349 flag_map.insert(Term::Atom(Atom::from("a")), Term::Atom(Atom::from("true")));
350 }
351 if let Some(ref hashes) = flag.hashes {
352 if !hashes.is_empty() {
353 let hashes_terms: Vec<Term> =
354 hashes.iter().map(|h| Term::Binary(Binary::from(h.clone()))).collect();
355 flag_map.insert(Term::Atom(Atom::from("hashes")), Term::List(List::from(hashes_terms)));
356 }
357 }
358 Term::Map(Map::from(flag_map))
359 })
360 .collect();
361
362 m.insert(Term::Atom(Atom::from("height_flags")), Term::List(List::from(height_flags_list)));
363
364 let etf_term = Term::Map(Map::from(m));
365 Ok(encode_safe(&etf_term))
366 }
367
368 async fn handle(&self, _ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
369 Ok(vec![Instruction::Noop { why: "catchup received".to_string() }])
371 }
372}
373
374#[derive(Debug)]
375pub struct CatchupReply {
376 pub heights: Vec<CatchupHeightReply>,
377}
378
379#[derive(Debug, Clone)]
380pub struct CatchupHeightReply {
381 pub height: u32,
382 pub entries: Option<Vec<Entry>>,
383 pub attestations: Option<Vec<Attestation>>,
384 pub consensuses: Option<Vec<Consensus>>,
385}
386
387impl CatchupReply {
388 pub const TYPENAME: &'static str = "catchup_reply";
389}
390
391impl Typename for CatchupReply {
392 fn typename(&self) -> &'static str {
393 Self::TYPENAME
394 }
395}
396
397#[async_trait::async_trait]
398impl Protocol for CatchupReply {
399 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
400 let tries_term = map.get_list("tries").ok_or(Error::BadEtf("tries"))?;
401 let mut tries = Vec::new();
402
403 for item in tries_term {
404 if let Some(trie_map) = item.get_term_map() {
405 let height = trie_map.get_integer::<u32>("height").ok_or(Error::BadEtf("height"))?;
406
407 let entries = trie_map.get_list("entries").and_then(|list| {
408 let parsed = parse_list(list, |bytes| Entry::unpack(bytes));
409 if parsed.is_empty() { None } else { Some(parsed) }
410 });
411
412 let attestations = trie_map.get_list("attestations").and_then(|list| {
413 let parsed = parse_list(list, |bytes| Attestation::from_etf_bin(bytes));
414 if parsed.is_empty() { None } else { Some(parsed) }
415 });
416
417 let consensuses = trie_map.get_list("consensuses").and_then(|list| {
418 let parsed = parse_list(list, |bytes| Consensus::from_etf_bin(bytes));
419 if parsed.is_empty() { None } else { Some(parsed) }
420 });
421
422 tries.push(CatchupHeightReply { height, entries, attestations, consensuses });
423 }
424 }
425
426 Ok(Self { heights: tries })
427 }
428
429 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
430 let mut m = HashMap::new();
431 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
432
433 let tries_list: Vec<Term> = self
434 .heights
435 .iter()
436 .map(|trie| {
437 let mut trie_map = HashMap::new();
438 trie_map.insert(Term::Atom(Atom::from("height")), Term::FixInteger((trie.height as i32).into()));
439
440 if let Some(ref entries) = trie.entries {
441 if let Some(term) = serialize_list(entries, |e| e.pack()) {
442 trie_map.insert(Term::Atom(Atom::from("entries")), term);
443 }
444 }
445
446 if let Some(ref attestations) = trie.attestations {
447 if let Some(term) = serialize_list(attestations, |a| a.to_etf_bin()) {
448 trie_map.insert(Term::Atom(Atom::from("attestations")), term);
449 }
450 }
451
452 if let Some(ref consensuses) = trie.consensuses {
453 if let Some(term) = serialize_list(consensuses, |c| c.to_etf_bin()) {
454 trie_map.insert(Term::Atom(Atom::from("consensuses")), term);
455 }
456 }
457
458 Term::Map(Map::from(trie_map))
459 })
460 .collect();
461
462 m.insert(Term::Atom(Atom::from("tries")), Term::List(List::from(tries_list)));
463
464 let etf_term = Term::Map(Map::from(m));
465 Ok(encode_safe(&etf_term))
466 }
467
468 async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
469 use tracing::{debug, info};
470
471 let mut instructions = Vec::new();
472 let rooted_tip_height = ctx.fabric.get_rooted_height()?.unwrap_or_default();
473
474 for trie in &self.heights {
475 if let Some(ref entries) = trie.entries {
477 debug!("Received {} entries at height {}", entries.len(), trie.height);
478 for entry in entries {
479 if entry.header.height >= rooted_tip_height {
480 instructions.push(Instruction::ReceivedEntry { entry: entry.clone() });
481 }
482 }
483 }
484
485 if let Some(ref attestations) = trie.attestations {
487 debug!("Received {} attestations at height {}", attestations.len(), trie.height);
488 for attestation in attestations {
489 instructions.push(Instruction::ReceivedAttestation { attestation: attestation.clone() });
490 }
491 }
492
493 if let Some(ref consensuses) = trie.consensuses {
495 debug!("Received {} consensuses at height {}", consensuses.len(), trie.height);
496 for consensus in consensuses {
497 instructions.push(Instruction::ReceivedConsensus { consensus: consensus.clone() });
498 }
499 }
500 }
501
502 if !instructions.is_empty() {
503 info!("Processed catchup_reply from {} with {} instructions", src, instructions.len());
504 }
505
506 Ok(instructions)
507 }
508}
509
510#[derive(Debug)]
511pub struct SolicitEntry {
512 pub hash: Vec<u8>,
513}
514
515#[derive(Debug)]
516pub struct SolicitEntry2;
517
518impl Typename for Ping {
519 fn typename(&self) -> &'static str {
520 Self::TYPENAME
521 }
522}
523
524#[async_trait::async_trait]
525impl Protocol for Ping {
526 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
527 let ts_m = map.get_integer("ts_m").ok_or(Error::BadEtf("ts_m"))?;
528 Ok(Self { ts_m })
529 }
530
531 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
532 let mut m = HashMap::new();
533 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
534 m.insert(Term::Atom(Atom::from("ts_m")), Term::from(eetf::BigInteger { value: self.ts_m.into() }));
535 let term = Term::from(Map { map: m });
536 let etf_data = encode_safe(&term);
537 Ok(etf_data)
538 }
539
540 #[instrument(skip(self, ctx), fields(src = %src), name = "Ping::handle")]
541 async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
542 ctx.node_peers.update_peer_ping_timestamp(src, self.ts_m).await;
543 Ok(vec![Instruction::SendPingReply { ts_m: self.ts_m, dst: src }])
544 }
545}
546
547impl Ping {
548 pub const TYPENAME: &'static str = "ping";
549
550 pub fn new() -> Self {
552 Self { ts_m: get_unix_millis_now() }
553 }
554
555 pub fn with_timestamp(ts_m: u64) -> Self {
557 Self { ts_m }
558 }
559}
560
561impl Typename for PingReply {
562 fn typename(&self) -> &'static str {
563 Self::TYPENAME
564 }
565}
566
567#[async_trait::async_trait]
568impl Protocol for PingReply {
569 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
570 let ts_m = map.get_integer("ts_m").ok_or(Error::BadEtf("ts_m"))?;
571 let seen_time_ms = get_unix_millis_now();
572 Ok(Self { ts_m: ts_m, seen_time: seen_time_ms })
574 }
575
576 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
577 let mut m = HashMap::new();
578 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
579 m.insert(Term::Atom(Atom::from("ts_m")), Term::from(eetf::BigInteger { value: self.ts_m.into() }));
580 let term = Term::from(Map { map: m });
581 let etf_data = encode_safe(&term);
582 Ok(etf_data)
583 }
584
585 #[instrument(skip(self, ctx), fields(src = %src), name = "PingReply::handle")]
586 async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
587 ctx.node_peers.update_peer_from_pong(src, self).await;
588 Ok(vec![Instruction::Noop { why: "pong processed".to_string() }])
589 }
590}
591
592impl PingReply {
593 pub const TYPENAME: &'static str = "ping_reply";
594}
595
596#[derive(Debug)]
597pub struct EventTx {
598 pub valid_txs: Vec<Vec<u8>>,
599}
600
601impl Typename for EventTx {
602 fn typename(&self) -> &'static str {
603 Self::TYPENAME
604 }
605}
606
607#[async_trait::async_trait]
608impl Protocol for EventTx {
609 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
610 let txs_list = map.get_list("txs_packed").ok_or(Error::BadEtf("txs_packed"))?;
612 let valid_txs = EventTx::get_valid_txs_from_list(txs_list)?;
613 Ok(Self { valid_txs })
614 }
615 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
616 let tx_terms: Vec<Term> = self.valid_txs.iter().map(|tx| Term::from(Binary { bytes: tx.clone() })).collect();
618 let mut m = HashMap::new();
619 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
620 m.insert(Term::Atom(Atom::from("txs_packed")), Term::from(List { elements: tx_terms }));
622 let term = Term::from(Map { map: m });
623 let etf_data = encode_safe(&term);
624 Ok(etf_data)
625 }
626
627 async fn handle(&self, _ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
628 Ok(vec![Instruction::Noop { why: "event_tx handling not implemented".to_string() }])
630 }
631}
632
633impl EventTx {
634 pub const TYPENAME: &'static str = "event_tx";
635
636 fn get_valid_txs_from_list(txs_list: &[Term]) -> Result<Vec<Vec<u8>>, Error> {
637 let mut good: Vec<Vec<u8>> = Vec::with_capacity(txs_list.len());
638
639 for t in txs_list {
640 let bin = if let Some(b) = TryAsRef::<Binary>::try_as_ref(t) {
642 b.bytes.as_slice()
643 } else {
644 continue;
646 };
647
648 match crate::consensus::doms::tx::validate(bin, false) {
650 Ok(_) => good.push(bin.to_vec()),
651 Err(e) => warn!("invalid tx in event_tx: {}", e),
652 }
653 }
654
655 Ok(good)
656 }
657}
658
659#[derive(Debug)]
660pub struct GetPeerAnrs {
661 pub has_peers_b3f4: Vec<[u8; 4]>,
662}
663
664impl Typename for GetPeerAnrs {
665 fn typename(&self) -> &'static str {
666 Self::TYPENAME
667 }
668}
669
670#[async_trait::async_trait]
671impl Protocol for GetPeerAnrs {
672 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
673 let list = map.get_list("hasPeersb3f4").ok_or(Error::BadEtf("hasPeersb3f4"))?;
674 let mut has_peers_b3f4 = Vec::<[u8; 4]>::new();
675 for t in list {
676 use std::convert::TryInto;
677 let b = t.get_binary().ok_or(Error::BadEtf("hasPeersb3f4"))?;
678 let b3f4 = b.try_into().map_err(|_| Error::BadEtf("hasPeersb3f4_length"))?;
679 has_peers_b3f4.push(b3f4);
680 }
681
682 Ok(Self { has_peers_b3f4 })
683 }
684
685 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
686 let b3f4_terms: Vec<Term> =
687 self.has_peers_b3f4.iter().map(|b3f4| Term::from(Binary { bytes: b3f4.to_vec() })).collect();
688 let mut m = HashMap::new();
689 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
690 m.insert(Term::Atom(Atom::from("hasPeersb3f4")), Term::from(List { elements: b3f4_terms }));
691 let term = Term::from(Map { map: m });
692 let etf_data = encode_safe(&term);
693
694 Ok(etf_data)
695 }
696
697 async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
698 let anrs = ctx.node_anrs.get_all_excluding_b3f4(&self.has_peers_b3f4).await;
699
700 Ok(vec![Instruction::SendGetPeerAnrsReply { anrs, dst: src }])
701 }
702}
703
704impl GetPeerAnrs {
705 pub const TYPENAME: &'static str = "get_peer_anrs";
706}
707
708#[derive(Debug)]
709pub struct GetPeerAnrsReply {
710 pub anrs: Vec<Anr>,
711}
712
713impl Typename for GetPeerAnrsReply {
714 fn typename(&self) -> &'static str {
715 Self::TYPENAME
716 }
717}
718
719#[async_trait::async_trait]
720impl Protocol for GetPeerAnrsReply {
721 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
722 let list = map.get_list("anrs").ok_or(Error::BadEtf("anrs"))?;
723 let mut anrs = Vec::new();
724 for term in list {
725 let anr_map = term.get_term_map().ok_or(Error::BadEtf("anr_map"))?;
726 let anr = Anr::from_etf_term_map(anr_map)?;
727 if anr.verify_signature() {
728 anrs.push(anr);
729 }
730 }
731 Ok(Self { anrs })
732 }
733
734 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
735 let anr_terms: Vec<Term> = self.anrs.iter().map(|anr| anr.to_etf_term()).collect();
736 let mut m = HashMap::new();
737 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
738 m.insert(Term::Atom(Atom::from("anrs")), Term::from(List { elements: anr_terms }));
739 let term = Term::from(Map { map: m });
740 let etf_data = encode_safe(&term);
741 Ok(etf_data)
742 }
743
744 async fn handle(&self, ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
745 for anr in &self.anrs {
746 ctx.node_anrs.insert(anr.clone()).await;
747 }
748 Ok(vec![Instruction::Noop { why: format!("inserted {} anrs", self.anrs.len()) }])
749 }
750}
751
752impl GetPeerAnrsReply {
753 pub const TYPENAME: &'static str = "get_peer_anrs_reply";
754}
755
756#[derive(Debug)]
757pub struct NewPhoneWhoDis {}
758
759impl Typename for NewPhoneWhoDis {
760 fn typename(&self) -> &'static str {
761 Self::TYPENAME
762 }
763}
764
765#[async_trait::async_trait]
766impl Protocol for NewPhoneWhoDis {
767 fn from_etf_map_validated(_map: TermMap) -> Result<Self, Error> {
768 Ok(Self {})
770 }
771
772 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
773 let mut map = TermMap::default();
774 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
775 Ok(encode_safe(&map.into_term()))
777 }
778
779 #[instrument(skip_all)]
780 async fn handle(&self, _ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
781 Ok(vec![Instruction::SendNewPhoneWhoDisReply { dst: src }])
783 }
784}
785
786impl NewPhoneWhoDis {
787 pub const TYPENAME: &'static str = "new_phone_who_dis";
788
789 pub fn new() -> Self {
791 Self {}
792 }
793}
794
795#[derive(Debug)]
796pub struct NewPhoneWhoDisReply {
797 pub anr: Anr,
798}
799
800impl Typename for NewPhoneWhoDisReply {
801 fn typename(&self) -> &'static str {
802 Self::TYPENAME
803 }
804}
805
806#[async_trait::async_trait]
807impl Protocol for NewPhoneWhoDisReply {
808 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
809 let anr_map = map.get_term_map("anr").ok_or(Error::BadEtf("anr"))?;
810 let anr = Anr::from_etf_term_map(anr_map)?;
811
812 if !anr.verify_signature() {
813 return Err(Error::BadEtf("anr_signature_invalid"));
814 }
815
816 Ok(Self { anr })
817 }
818
819 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
820 let mut map = TermMap::default();
821 map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
822 map.insert(Term::Atom(Atom::from("anr")), self.anr.to_etf_term());
823 Ok(encode_safe(&map.into_term()))
824 }
825
826 #[instrument(skip(self, ctx), fields(src = %src), name = "NewPhoneWhoDisReply::handle")]
827 async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
828 if src != self.anr.ip4 {
830 warn!("new_phone_who_dis_reply ip mismatched with anr {}", self.anr.ip4);
831 return Err(Error::BadEtf("anr_ip_mismatch"));
832 }
833
834 let now = crate::utils::misc::get_unix_secs_now();
836 let age_secs = now.saturating_sub(self.anr.ts);
837 if age_secs > 60 {
838 warn!("new_phone_who_dis_reply ANR too old: {} seconds", age_secs);
839 return Err(Error::BadEtf("anr_too_old"));
840 }
841
842 ctx.node_anrs.insert(self.anr.clone()).await;
844 ctx.node_anrs.set_handshaked(&self.anr.pk).await;
845 ctx.update_peer_from_anr(src, &self.anr.pk, &self.anr.version, Some(HandshakeStatus::Completed)).await;
846
847 Ok(vec![Instruction::Noop { why: "handshake completed".to_string() }])
848 }
849}
850
851impl NewPhoneWhoDisReply {
852 pub const TYPENAME: &'static str = "new_phone_who_dis_reply";
853
854 pub fn new(anr: Anr) -> Self {
855 Self { anr }
856 }
857}
858
859#[derive(Debug)]
860pub struct SpecialBusiness {
861 pub business: Vec<u8>,
862}
863
864impl Typename for SpecialBusiness {
865 fn typename(&self) -> &'static str {
866 Self::TYPENAME
867 }
868}
869
870#[async_trait::async_trait]
871impl Protocol for SpecialBusiness {
872 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
873 let business = map.get_binary::<Vec<u8>>("business").ok_or(Error::BadEtf("business"))?;
874 Ok(Self { business })
875 }
876
877 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
878 let mut m = HashMap::new();
879 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
880 m.insert(Term::Atom(Atom::from("business")), Term::from(Binary { bytes: self.business.clone() }));
881 let term = Term::from(Map { map: m });
882 let etf_data = encode_safe(&term);
883 Ok(etf_data)
884 }
885
886 async fn handle(&self, _ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
887 Ok(vec![Instruction::SpecialBusiness { business: self.business.clone() }])
890 }
891}
892
893impl SpecialBusiness {
894 pub const TYPENAME: &'static str = "special_business";
895}
896
897#[derive(Debug)]
898pub struct SpecialBusinessReply {
899 pub business: Vec<u8>,
900}
901
902impl Typename for SpecialBusinessReply {
903 fn typename(&self) -> &'static str {
904 Self::TYPENAME
905 }
906}
907
908#[async_trait::async_trait]
909impl Protocol for SpecialBusinessReply {
910 fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
911 let business = map.get_binary::<Vec<u8>>("business").ok_or(Error::BadEtf("business"))?;
912 Ok(Self { business })
913 }
914
915 fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
916 let mut m = HashMap::new();
917 m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
918 m.insert(Term::Atom(Atom::from("business")), Term::from(Binary { bytes: self.business.clone() }));
919 let term = Term::from(Map { map: m });
920 let etf_data = encode_safe(&term);
921 Ok(etf_data)
922 }
923
924 async fn handle(&self, _ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
925 Ok(vec![Instruction::SpecialBusinessReply { business: self.business.clone() }])
927 }
928}
929
930impl SpecialBusinessReply {
931 pub const TYPENAME: &'static str = "special_business_reply";
932}
933
934#[cfg(test)]
935mod tests {
936 use super::*;
937 use crate::config::Config;
938 use crate::consensus::doms::entry::{EntryHeader, EntrySummary};
939 use crate::utils::bls12_381::sign as bls_sign;
940
941 #[tokio::test]
942 async fn test_ping_etf_roundtrip() {
943 let _temporal = create_dummy_entry_summary();
945 let _rooted = create_dummy_entry_summary();
946 let ping = Ping::new();
947
948 let bin = ping.to_etf_bin().expect("should serialize");
950
951 let result = parse_etf_bin(&bin).expect("should deserialize");
953
954 assert_eq!(result.typename(), "ping");
956 }
957
958 #[tokio::test]
959 async fn test_pong_etf_roundtrip() {
960 let pong = PingReply { ts_m: 1234567890, seen_time: 9876543210 };
961
962 let bin = pong.to_etf_bin().expect("should serialize");
963 let result = parse_etf_bin(&bin).expect("should deserialize");
964
965 assert_eq!(result.typename(), "ping_reply");
967 }
968
969 #[tokio::test]
970 async fn test_txpool_etf_roundtrip() {
971 let event_tx = EventTx { valid_txs: vec![vec![1, 2, 3], vec![4, 5, 6]] };
972
973 let bin = event_tx.to_etf_bin().expect("should serialize");
974 let result = parse_etf_bin(&bin).expect("should deserialize");
975
976 assert_eq!(result.typename(), "event_tx");
977 }
978
979 #[tokio::test]
980 async fn test_peers_etf_roundtrip() {
981 let peers = GetPeerAnrs { has_peers_b3f4: vec![[192, 168, 1, 1], [10, 0, 0, 1]] };
982
983 let bin = peers.to_etf_bin().expect("should serialize");
984 let result = parse_etf_bin(&bin).expect("should deserialize");
985
986 assert_eq!(result.typename(), "get_peer_anrs");
987 }
988
989 #[tokio::test]
990 async fn test_new_phone_who_dis_roundtrip_and_handle() {
991 use crate::node::anr;
992 use crate::utils::{bls12_381 as bls, misc::get_unix_secs_now};
993 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
994
995 let sk = bls::generate_sk();
997 let pk = bls::get_public_key(&sk).expect("pk");
998 let pop = bls_sign(&sk, &pk, crate::consensus::DST_POP).expect("pop");
999 let ip = Ipv4Addr::new(127, 0, 0, 1);
1000 let _my_anr = anr::Anr::build(&sk, &pk, &pop, ip, Ver::new(1, 0, 0)).expect("anr");
1001
1002 let _challenge = get_unix_secs_now();
1004 let msg = NewPhoneWhoDis::new();
1005
1006 let bin = msg.to_etf_bin().expect("serialize");
1008 let parsed = parse_etf_bin(&bin).expect("deserialize");
1009 assert_eq!(parsed.typename(), "new_phone_who_dis");
1010
1011 let _src = SocketAddr::V4(SocketAddrV4::new(ip, 36969));
1013 }
1015
1016 #[tokio::test]
1017 async fn test_new_phone_who_dis_ip_mismatch_noop() {
1018 use crate::node::anr;
1019 use crate::utils::{bls12_381 as bls, misc::get_unix_secs_now};
1020 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
1021
1022 let sk = bls::generate_sk();
1023 let pk = bls::get_public_key(&sk).expect("pk");
1024 let pop = bls_sign(&sk, &pk, crate::consensus::DST_POP).expect("pop");
1025 let ip = Ipv4Addr::new(127, 0, 0, 1);
1026 let _my_anr = anr::Anr::build(&sk, &pk, &pop, ip, Ver::new(1, 0, 0)).expect("anr");
1027 let _challenge = get_unix_secs_now();
1028 let _msg = NewPhoneWhoDis::new();
1029
1030 let wrong_ip = Ipv4Addr::new(127, 0, 0, 2);
1032 let _src = SocketAddr::V4(SocketAddrV4::new(wrong_ip, 36969));
1033 }
1035
1036 fn create_dummy_entry_summary() -> EntrySummary {
1037 let header = EntryHeader {
1038 height: 1,
1039 slot: 1,
1040 prev_slot: 0,
1041 prev_hash: [0u8; 32],
1042 dr: [1u8; 32],
1043 vr: [2u8; 96],
1044 signer: [3u8; 48],
1045 txs_hash: [4u8; 32],
1046 };
1047
1048 EntrySummary { header, signature: [5u8; 96], mask: None }
1049 }
1050
1051 #[tokio::test]
1052 async fn test_new_phone_who_dis_v1_1_7_format() {
1053 let msg = NewPhoneWhoDis::new();
1055
1056 let bin = msg.to_etf_bin().expect("serialize NewPhoneWhoDis");
1058
1059 let parsed = parse_etf_bin(&bin).expect("deserialize NewPhoneWhoDis");
1061 assert_eq!(parsed.typename(), "new_phone_who_dis");
1062
1063 if let Ok(_parsed_msg) =
1065 NewPhoneWhoDis::from_etf_map_validated(Term::decode(&bin[..]).expect("decode").get_term_map().expect("map"))
1066 {
1067 assert!(true);
1069 } else {
1070 panic!("Failed to deserialize simplified NewPhoneWhoDis");
1071 }
1072 }
1073
1074 #[tokio::test]
1075 async fn test_ping_ts_m_field_validation() {
1076 let ping = Ping::with_timestamp(1234567890);
1077 let valid_bin = ping.to_etf_bin().expect("should serialize");
1078 let result = parse_etf_bin(&valid_bin);
1079 assert!(result.is_ok(), "Valid ping should parse successfully");
1080
1081 let ping_reply = PingReply { ts_m: 1234567890, seen_time: 9876543210 };
1082 let valid_bin = ping_reply.to_etf_bin().expect("should serialize");
1083 let result = parse_etf_bin(&valid_bin);
1084 assert!(result.is_ok(), "Valid ping_reply should parse successfully");
1085 }
1086
1087 #[tokio::test]
1088 async fn test_ping_complete_roundtrip_with_encryption() {
1089 use crate::node::reassembler::Message;
1091 use crate::utils::bls12_381 as bls;
1092
1093 let sender_sk = bls::generate_sk();
1095 let sender_pk = bls::get_public_key(&sender_sk).expect("sender pk");
1096
1097 let receiver_sk = bls::generate_sk();
1098 let receiver_pk = bls::get_public_key(&receiver_sk).expect("receiver pk");
1099
1100 let original_ping = Ping::with_timestamp(1234567890);
1102 let original_payload = original_ping.to_etf_bin().expect("serialize ping");
1103
1104 let shared_secret = bls::get_shared_secret(&receiver_pk, &sender_sk).expect("shared secret");
1106
1107 let version = Ver::new(1, 1, 7);
1109 let encrypted_messages =
1110 Message::encrypt(&sender_pk, &shared_secret, &original_payload, version).expect("encrypt message");
1111
1112 assert_eq!(encrypted_messages.len(), 1, "should create single encrypted message for small payload");
1114 let encrypted_msg = &encrypted_messages[0];
1115
1116 let udp_packet_bytes = encrypted_msg.to_bytes();
1118
1119 assert_eq!(&udp_packet_bytes[0..3], b"AMA", "UDP packet should start with AMA magic");
1121
1122 let received_encrypted_msg =
1124 Message::try_from(udp_packet_bytes.as_slice()).expect("deserialize encrypted message from UDP packet");
1125
1126 assert_eq!(received_encrypted_msg.version, encrypted_msg.version);
1128 assert_eq!(received_encrypted_msg.pk, encrypted_msg.pk);
1129 assert_eq!(received_encrypted_msg.shard_index, encrypted_msg.shard_index);
1130 assert_eq!(received_encrypted_msg.shard_total, encrypted_msg.shard_total);
1131 assert_eq!(received_encrypted_msg.ts_nano, encrypted_msg.ts_nano);
1132 assert_eq!(received_encrypted_msg.original_size, encrypted_msg.original_size);
1133 assert_eq!(received_encrypted_msg.payload, encrypted_msg.payload);
1134
1135 let decrypted_payload = received_encrypted_msg.decrypt(&shared_secret).expect("decrypt payload");
1137
1138 let parsed_proto = parse_etf_bin(&decrypted_payload).expect("parse decrypted payload");
1140
1141 assert_eq!(parsed_proto.typename(), "ping");
1143
1144 let decrypted_ping = Ping::from_etf_map_validated(
1146 Term::decode(decrypted_payload.as_slice()).expect("decode").get_term_map().expect("map"),
1147 )
1148 .expect("parse ping");
1149
1150 assert_eq!(original_ping.ts_m, decrypted_ping.ts_m, "ping timestamp should match after roundtrip");
1152 }
1153
1154 #[tokio::test]
1155 async fn test_protocol_send_to() {
1156 use crate::socket::MockSocket;
1158 use crate::utils::bls12_381 as bls;
1159 use std::net::Ipv4Addr;
1160 use std::sync::Arc;
1161
1162 let sk = bls::generate_sk();
1163 let pk = bls::get_public_key(&sk).expect("pk");
1164 let pop = bls_sign(&sk, &pk, crate::consensus::DST_POP).expect("pop");
1165
1166 let config = Config {
1167 work_folder: "/tmp/test_protocol_send_to".to_string(),
1168 version: Ver::new(1, 2, 3),
1169 offline: false,
1170 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1171 http_port: 3000,
1172 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1173 udp_port: 36969,
1174 public_ipv4: Some("127.0.0.1".to_string()),
1175 seed_ips: Vec::new(),
1176 seed_anrs: Vec::new(),
1177 other_nodes: Vec::new(),
1178 trust_factor: 0.8,
1179 max_peers: 100,
1180 trainer_sk: sk,
1181 trainer_pk: pk,
1182 trainer_pk_b58: String::new(),
1183 trainer_pop: pop.to_vec(),
1184 archival_node: false,
1185 autoupdate: false,
1186 computor_type: None,
1187 snapshot_height: 0,
1188 anr: None,
1189 anr_desc: None,
1190 anr_name: None,
1191 };
1192
1193 let dummy_socket = Arc::new(MockSocket::new());
1194 let target: Ipv4Addr = "127.0.0.1".parse().unwrap();
1195
1196 match Context::with_config_and_socket(config, dummy_socket).await {
1197 Ok(ctx) => {
1198 let pong = PingReply { ts_m: 12345, seen_time: 67890 };
1200
1201 let metrics_json_before = ctx.metrics.get_json();
1203 let sent_before = metrics_json_before.get("outgoing_protos");
1204
1205 match pong.send_to_with_metrics(&ctx, target).await {
1207 Ok(_) => {
1208 }
1210 Err(_) => {
1211 }
1213 }
1214
1215 let metrics_json_after = ctx.metrics.get_json();
1217 let sent_after = metrics_json_after.get("outgoing_protos").unwrap().as_object().unwrap();
1218 match sent_before {
1219 Some(obj) => {
1220 let sent_before_obj = obj.as_object().unwrap();
1221 let pong_before = sent_before_obj.get("ping_reply").map(|v| v.as_u64().unwrap()).unwrap_or(0);
1222 let pong_after = sent_after.get("ping_reply").map(|v| v.as_u64().unwrap()).unwrap_or(0);
1223 assert_eq!(
1224 pong_after,
1225 pong_before + 1,
1226 "Sent packet counter should increment even on send failure"
1227 );
1228 }
1229 None => {
1230 assert_eq!(sent_after.get("ping_reply").unwrap().as_u64().unwrap(), 1);
1232 }
1233 }
1234 }
1235 Err(_) => {
1236 }
1238 }
1239 }
1240
1241 #[tokio::test]
1242 async fn test_catchup_and_catchup_reply_etf_roundtrip() {
1243 let height_flag = CatchupHeight {
1245 height: 42,
1246 c: Some(true),
1247 e: None,
1248 a: Some(true),
1249 hashes: Some(vec![vec![1, 2, 3], vec![4, 5, 6]]),
1250 };
1251
1252 let catchup = Catchup { heights: vec![height_flag] };
1253
1254 let catchup_bin = catchup.to_etf_bin().expect("should serialize catchup");
1256 let parsed_catchup = parse_etf_bin(&catchup_bin).expect("should deserialize catchup");
1257 assert_eq!(parsed_catchup.typename(), "catchup");
1258
1259 let entry1 = Entry {
1261 hash: [1; 32],
1262 header: EntryHeader {
1263 height: 100,
1264 slot: 1,
1265 prev_slot: 0,
1266 prev_hash: [0; 32],
1267 dr: [2; 32],
1268 vr: [0; 96],
1269 signer: [3; 48],
1270 txs_hash: [4; 32],
1271 },
1272 signature: [5; 96],
1273 mask: None,
1274 txs: vec![vec![1, 2, 3, 4]],
1275 };
1276
1277 let attestation1 =
1278 Attestation { entry_hash: [6; 32], mutations_hash: [7; 32], signer: [8; 48], signature: [9; 96] };
1279
1280 let consensus1 = Consensus {
1281 entry_hash: [10; 32],
1282 mutations_hash: [11; 32],
1283 mask: Some(vec![true, false, true]),
1284 agg_sig: [12; 96],
1285 score: Some(0.95),
1286 };
1287
1288 let trie1 = CatchupHeightReply {
1289 height: 100,
1290 entries: Some(vec![entry1]),
1291 attestations: Some(vec![attestation1]),
1292 consensuses: None,
1293 };
1294 let trie2 =
1295 CatchupHeightReply { height: 101, entries: None, attestations: None, consensuses: Some(vec![consensus1]) };
1296 let catchup_reply = CatchupReply { heights: vec![trie1, trie2] };
1297
1298 let reply_bin = catchup_reply.to_etf_bin().expect("should serialize catchup_reply");
1300 let parsed_reply = parse_etf_bin(&reply_bin).expect("should deserialize catchup_reply");
1301 assert_eq!(parsed_reply.typename(), "catchup_reply");
1302 }
1303}