amadeus_node/node/
protocol.rs

1use crate::Context;
2#[cfg(test)]
3use crate::Ver;
4use crate::bic::sol;
5use crate::bic::sol::Solution;
6use crate::consensus::consensus::{self, Consensus};
7use crate::consensus::doms::attestation::EventAttestation;
8use crate::consensus::doms::entry::Entry;
9use crate::consensus::doms::{Attestation, EntrySummary};
10use crate::consensus::fabric::Fabric;
11use crate::node::anr::Anr;
12use crate::node::peers::HandshakeStatus;
13use crate::node::{anr, peers};
14use crate::utils::bls12_381;
15use crate::utils::misc::{TermExt, TermMap, Typename, get_unix_millis_now, parse_list, serialize_list};
16use crate::utils::safe_etf::encode_safe;
17use eetf::convert::TryAsRef;
18use eetf::{Atom, Binary, DecodeError as EtfDecodeError, EncodeError as EtfEncodeError, List, Map, Term};
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::io::Error as IoError;
22use std::net::{Ipv4Addr, SocketAddr};
23use tracing::instrument;
24use tracing::warn;
25
26/// Every object that has this trait must be convertible from an Erlang ETF
27/// Binary representation and must be able to handle itself as a message
28#[async_trait::async_trait]
29pub trait Protocol: Typename + Debug + Send + Sync {
30    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error>
31    where
32        Self: Sized;
33    /// Convert to ETF binary format for network transmission
34    fn to_etf_bin(&self) -> Result<Vec<u8>, Error>;
35    /// Handle a message returning instructions for upper layers
36    async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error>;
37    /// Send this protocol message to a destination using encrypted format (v1.1.7+)
38    /// REQUIRES ANR to be available - use send_to_legacy_with_metrics for bootstrap messages
39    async fn send_to_with_metrics(&self, ctx: &Context, dst: Ipv4Addr) -> Result<(), Error> {
40        let dst_addr = SocketAddr::new(std::net::IpAddr::V4(dst), ctx.config.udp_port);
41        let dst_anr = ctx.node_anrs.get_by_ip4(dst).await.ok_or(Error::NoAnrForDestination(dst))?;
42        let payload = self.to_etf_bin().inspect_err(|e| ctx.metrics.add_error(e))?;
43
44        let shards = ctx.reassembler.build_shards(&ctx.config, &payload, &dst_anr.pk).await?;
45        for shard in &shards {
46            ctx.socket.send_to_with_metrics(shard, dst_addr, &ctx.metrics).await?;
47        }
48
49        ctx.metrics.add_outgoing_proto(self.typename());
50        Ok(())
51    }
52}
53
54#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
55pub enum Error {
56    #[error(transparent)]
57    Io(#[from] IoError),
58    #[error(transparent)]
59    EtfDecode(#[from] EtfDecodeError),
60    #[error(transparent)]
61    EtfEncode(#[from] EtfEncodeError),
62    #[error(transparent)]
63    Bls(#[from] bls12_381::Error),
64    #[error(transparent)]
65    Tx(#[from] crate::consensus::doms::tx::Error),
66    #[error(transparent)]
67    Entry(#[from] crate::consensus::doms::entry::Error),
68    #[error(transparent)]
69    Archiver(#[from] crate::utils::archiver::Error),
70    #[error(transparent)]
71    Peers(#[from] peers::Error),
72    #[error(transparent)]
73    Consensus(#[from] consensus::Error),
74    #[error(transparent)]
75    Fabric(#[from] crate::consensus::fabric::Error),
76    #[error(transparent)]
77    Sol(#[from] sol::Error),
78    #[error(transparent)]
79    Att(#[from] crate::consensus::doms::attestation::Error),
80    #[error(transparent)]
81    Reassembler(#[from] crate::node::reassembler::Error),
82    #[error(transparent)]
83    Anr(#[from] anr::Error),
84    #[error("bad etf: {0}")]
85    BadEtf(&'static str),
86    #[error("No ANR found for destination IP: {0}")]
87    NoAnrForDestination(Ipv4Addr),
88}
89
90impl Typename for Error {
91    fn typename(&self) -> &'static str {
92        self.into()
93    }
94}
95
96/// Result of handling an incoming message.
97#[derive(Debug, strum_macros::IntoStaticStr)]
98pub enum Instruction {
99    Noop { why: String },
100    SendNewPhoneWhoDisReply { dst: Ipv4Addr },
101    SendGetPeerAnrsReply { anrs: Vec<Anr>, dst: Ipv4Addr },
102    SendPingReply { ts_m: u64, dst: Ipv4Addr },
103
104    ValidTxs { txs: Vec<Vec<u8>> },
105    ReceivedSol { sol: Solution },
106    ReceivedEntry { entry: Entry },
107    ReceivedAttestation { attestation: Attestation },
108    ReceivedConsensus { consensus: Consensus },
109    ConsensusesPacked { packed: Vec<u8> },
110    CatchupEntryReq { heights: Vec<u64> },
111    CatchupTriReq { heights: Vec<u64> },
112    CatchupBiReq { heights: Vec<u64> },
113    CatchupAttestationReq { hashes: Vec<Vec<u8>> },
114    SpecialBusiness { business: Vec<u8> },
115    SpecialBusinessReply { business: Vec<u8> },
116    SolicitEntry { hash: Vec<u8> },
117    SolicitEntry2,
118}
119
120impl Typename for Instruction {
121    fn typename(&self) -> &'static str {
122        self.into()
123    }
124}
125
126/// Does proto parsing and validation
127#[instrument(skip(bin), name = "Proto::from_etf_validated")]
128pub fn parse_etf_bin(bin: &[u8]) -> Result<Box<dyn Protocol>, Error> {
129    // TODO: this function is a main UDP router and is subject to refactoring
130    let term = Term::decode(bin)?;
131    let map = term.get_term_map().ok_or(Error::BadEtf("map"))?;
132
133    // `op` determines the variant
134    let op_atom = map.get_atom("op").ok_or(Error::BadEtf("op"))?;
135    let proto: Box<dyn Protocol> = match op_atom.name.as_str() {
136        Ping::TYPENAME => Box::new(Ping::from_etf_map_validated(map)?),
137        PingReply::TYPENAME => Box::new(PingReply::from_etf_map_validated(map)?),
138        Entry::TYPENAME => Box::new(Entry::from_etf_map_validated(map)?),
139        EventTip::TYPENAME => Box::new(EventTip::from_etf_map_validated(map)?),
140        EventAttestation::TYPENAME => Box::new(EventAttestation::from_etf_map_validated(map)?),
141        Solution::TYPENAME => Box::new(Solution::from_etf_map_validated(map)?),
142        EventTx::TYPENAME => Box::new(EventTx::from_etf_map_validated(map)?),
143        GetPeerAnrs::TYPENAME => Box::new(GetPeerAnrs::from_etf_map_validated(map)?),
144        GetPeerAnrsReply::TYPENAME => Box::new(GetPeerAnrsReply::from_etf_map_validated(map)?),
145        NewPhoneWhoDis::TYPENAME => Box::new(NewPhoneWhoDis::from_etf_map_validated(map)?),
146        NewPhoneWhoDisReply::TYPENAME => Box::new(NewPhoneWhoDisReply::from_etf_map_validated(map)?),
147        SpecialBusiness::TYPENAME => Box::new(SpecialBusiness::from_etf_map_validated(map)?),
148        SpecialBusinessReply::TYPENAME => Box::new(SpecialBusinessReply::from_etf_map_validated(map)?),
149        Catchup::TYPENAME => Box::new(Catchup::from_etf_map_validated(map)?),
150        CatchupReply::TYPENAME => Box::new(CatchupReply::from_etf_map_validated(map)?),
151        _ => {
152            warn!("Unknown operation: {}", op_atom.name);
153            return Err(Error::BadEtf("op"));
154        }
155    };
156
157    Ok(proto)
158}
159
160#[derive(Debug)]
161pub struct EventTip {
162    pub temporal: EntrySummary,
163    pub rooted: EntrySummary,
164}
165
166impl Typename for EventTip {
167    fn typename(&self) -> &'static str {
168        Self::TYPENAME
169    }
170}
171
172#[async_trait::async_trait]
173impl Protocol for EventTip {
174    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
175        let temporal_term = map.get_term_map("temporal").ok_or(Error::BadEtf("temporal"))?;
176        let rooted_term = map.get_term_map("rooted").ok_or(Error::BadEtf("rooted"))?;
177        let temporal = EntrySummary::from_etf_term(&temporal_term)?;
178        let rooted = EntrySummary::from_etf_term(&rooted_term)?;
179
180        Ok(Self { temporal, rooted })
181    }
182    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
183        let mut m = HashMap::new();
184        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
185        m.insert(Term::Atom(Atom::from("temporal")), self.temporal.to_etf_term()?);
186        m.insert(Term::Atom(Atom::from("rooted")), self.rooted.to_etf_term()?);
187        let term = Term::from(Map { map: m });
188        let etf_data = encode_safe(&term);
189
190        Ok(etf_data)
191    }
192
193    #[instrument(skip(self, ctx), fields(src = %src), name = "EventTip::handle")]
194    async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
195        // TODO: validate temporal and rooted entry signatures like in Elixir
196
197        let signer = self.temporal.header.signer.to_vec();
198        let db = ctx.fabric.db();
199        let is_trainer = match crate::consensus::trainers_for_height(db, crate::consensus::chain_height(db)) {
200            Some(trainers) => trainers.iter().any(|pk| pk.as_slice() == signer),
201            None => false,
202        };
203
204        if is_trainer || ctx.is_peer_handshaked(src).await {
205            ctx.node_peers.update_peer_from_tip(ctx, src, self).await;
206        }
207
208        Ok(vec![Instruction::Noop { why: "event_tip handling not implemented".to_string() }])
209    }
210}
211
212impl EventTip {
213    pub const TYPENAME: &'static str = "event_tip";
214
215    pub fn new(temporal: EntrySummary, rooted: EntrySummary) -> Self {
216        Self { temporal, rooted }
217    }
218
219    pub fn from_current_tips() -> Result<Self, Error> {
220        // Deprecated: use from_current_tips_db with an explicit DB handle
221        Err(Error::Consensus(consensus::Error::NotImplemented("from_current_tips requires DB context")))
222    }
223
224    /// Build EventTip from the current temporal/rooted tips in Fabric using the provided Fabric handle
225    pub fn from_current_tips_db(fab: &Fabric) -> Result<Self, Error> {
226        // Helper to load EntrySummary by tip hash, or return empty summary if missing
227        fn entry_summary_by_hash(fab: &Fabric, hash: &[u8; 32]) -> EntrySummary {
228            if let Some(entry) = fab.get_entry_by_hash(hash) { entry.into() } else { EntrySummary::empty() }
229        }
230
231        let temporal_summary = match fab.get_temporal_hash()? {
232            Some(h) => entry_summary_by_hash(fab, &h),
233            None => EntrySummary::empty(),
234        };
235
236        let rooted_summary = match fab.get_rooted_hash()? {
237            Some(h) => entry_summary_by_hash(fab, &h),
238            None => EntrySummary::empty(),
239        };
240
241        Ok(Self { temporal: temporal_summary, rooted: rooted_summary })
242    }
243}
244
/// `ping` message carrying the sender's timestamp.
#[derive(Debug)]
pub struct Ping {
    /// Timestamp in unix milliseconds.
    pub ts_m: u64,
}
249
/// `ping_reply` message.
#[derive(Debug)]
pub struct PingReply {
    /// Timestamp carried in the `ts_m` field of the message.
    pub ts_m: u64,
    /// Local time in unix milliseconds at which the reply was parsed; not sent on the wire.
    pub seen_time: u64,
}
255
/// Container for a packed batch of consensuses.
#[derive(Debug)]
pub struct ConsensusBulk {
    /// Packed consensuses as raw bytes.
    pub consensuses_packed: Vec<u8>,
}
260
/// Catch-up request for entries, keyed by height.
#[derive(Debug)]
pub struct CatchupEntry {
    /// Heights being requested.
    pub heights: Vec<u64>,
}
265
/// Catch-up "tri" request, keyed by height.
#[derive(Debug)]
pub struct CatchupTri {
    /// Heights being requested.
    pub heights: Vec<u64>,
}
270
/// Catch-up "bi" request, keyed by height.
#[derive(Debug)]
pub struct CatchupBi {
    /// Heights being requested.
    pub heights: Vec<u64>,
}
275
/// Catch-up request for attestations, keyed by hash.
#[derive(Debug)]
pub struct CatchupAttestation {
    /// Hashes being requested, as raw bytes.
    pub hashes: Vec<Vec<u8>>,
}
280
281/// Requests information about selected heights (<1000 heights per request)
282#[derive(Debug)]
283pub struct Catchup {
284    pub heights: Vec<CatchupHeight>,
285}
286
/// What is being requested for a single height in a `catchup` message.
#[derive(Debug, Clone)]
pub struct CatchupHeight {
    /// Height the request refers to.
    pub height: u32,
    /// Consensus flag (matches Elixir).
    pub c: Option<bool>,
    /// Entries flag (matches Elixir).
    pub e: Option<bool>,
    /// Attestations flag (matches Elixir).
    pub a: Option<bool>,
    /// Entry hashes the responder should skip (matches Elixir).
    pub hashes: Option<Vec<Vec<u8>>>,
}
295
296impl Catchup {
297    pub const TYPENAME: &'static str = "catchup";
298}
299
300impl Typename for Catchup {
301    fn typename(&self) -> &'static str {
302        Self::TYPENAME
303    }
304}
305
306#[async_trait::async_trait]
307impl Protocol for Catchup {
308    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
309        let height_flags_term = map.get_list("height_flags").ok_or(Error::BadEtf("height_flags"))?;
310        let mut height_flags = Vec::new();
311
312        for item in height_flags_term {
313            if let Some(flag_map) = item.get_term_map() {
314                let height = flag_map.get_integer::<u32>("height").ok_or(Error::BadEtf("height"))?;
315
316                let c = flag_map.get_atom("c").map(|a| a.name == "true");
317                let e = flag_map.get_atom("e").map(|a| a.name == "true");
318                let a = flag_map.get_atom("a").map(|a| a.name == "true");
319
320                let hashes = flag_map.get_list("hashes").map(|hashes_list| {
321                    hashes_list.iter().filter_map(|h| h.get_binary().map(|bytes| bytes.to_vec())).collect()
322                });
323
324                height_flags.push(CatchupHeight { height, c, e, a, hashes });
325            }
326        }
327
328        Ok(Self { heights: height_flags })
329    }
330
331    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
332        let mut m = HashMap::new();
333        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
334
335        let height_flags_list: Vec<Term> = self
336            .heights
337            .iter()
338            .map(|flag| {
339                let mut flag_map = HashMap::new();
340                flag_map.insert(Term::Atom(Atom::from("height")), Term::FixInteger((flag.height as i32).into()));
341
342                if let Some(true) = flag.c {
343                    flag_map.insert(Term::Atom(Atom::from("c")), Term::Atom(Atom::from("true")));
344                }
345                if let Some(true) = flag.e {
346                    flag_map.insert(Term::Atom(Atom::from("e")), Term::Atom(Atom::from("true")));
347                }
348                if let Some(true) = flag.a {
349                    flag_map.insert(Term::Atom(Atom::from("a")), Term::Atom(Atom::from("true")));
350                }
351                if let Some(ref hashes) = flag.hashes {
352                    if !hashes.is_empty() {
353                        let hashes_terms: Vec<Term> =
354                            hashes.iter().map(|h| Term::Binary(Binary::from(h.clone()))).collect();
355                        flag_map.insert(Term::Atom(Atom::from("hashes")), Term::List(List::from(hashes_terms)));
356                    }
357                }
358                Term::Map(Map::from(flag_map))
359            })
360            .collect();
361
362        m.insert(Term::Atom(Atom::from("height_flags")), Term::List(List::from(height_flags_list)));
363
364        let etf_term = Term::Map(Map::from(m));
365        Ok(encode_safe(&etf_term))
366    }
367
368    async fn handle(&self, _ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
369        // TODO: implement catchup handling logic
370        Ok(vec![Instruction::Noop { why: "catchup received".to_string() }])
371    }
372}
373
374#[derive(Debug)]
375pub struct CatchupReply {
376    pub heights: Vec<CatchupHeightReply>,
377}
378
379#[derive(Debug, Clone)]
380pub struct CatchupHeightReply {
381    pub height: u32,
382    pub entries: Option<Vec<Entry>>,
383    pub attestations: Option<Vec<Attestation>>,
384    pub consensuses: Option<Vec<Consensus>>,
385}
386
387impl CatchupReply {
388    pub const TYPENAME: &'static str = "catchup_reply";
389}
390
391impl Typename for CatchupReply {
392    fn typename(&self) -> &'static str {
393        Self::TYPENAME
394    }
395}
396
397#[async_trait::async_trait]
398impl Protocol for CatchupReply {
399    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
400        let tries_term = map.get_list("tries").ok_or(Error::BadEtf("tries"))?;
401        let mut tries = Vec::new();
402
403        for item in tries_term {
404            if let Some(trie_map) = item.get_term_map() {
405                let height = trie_map.get_integer::<u32>("height").ok_or(Error::BadEtf("height"))?;
406
407                let entries = trie_map.get_list("entries").and_then(|list| {
408                    let parsed = parse_list(list, |bytes| Entry::unpack(bytes));
409                    if parsed.is_empty() { None } else { Some(parsed) }
410                });
411
412                let attestations = trie_map.get_list("attestations").and_then(|list| {
413                    let parsed = parse_list(list, |bytes| Attestation::from_etf_bin(bytes));
414                    if parsed.is_empty() { None } else { Some(parsed) }
415                });
416
417                let consensuses = trie_map.get_list("consensuses").and_then(|list| {
418                    let parsed = parse_list(list, |bytes| Consensus::from_etf_bin(bytes));
419                    if parsed.is_empty() { None } else { Some(parsed) }
420                });
421
422                tries.push(CatchupHeightReply { height, entries, attestations, consensuses });
423            }
424        }
425
426        Ok(Self { heights: tries })
427    }
428
429    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
430        let mut m = HashMap::new();
431        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
432
433        let tries_list: Vec<Term> = self
434            .heights
435            .iter()
436            .map(|trie| {
437                let mut trie_map = HashMap::new();
438                trie_map.insert(Term::Atom(Atom::from("height")), Term::FixInteger((trie.height as i32).into()));
439
440                if let Some(ref entries) = trie.entries {
441                    if let Some(term) = serialize_list(entries, |e| e.pack()) {
442                        trie_map.insert(Term::Atom(Atom::from("entries")), term);
443                    }
444                }
445
446                if let Some(ref attestations) = trie.attestations {
447                    if let Some(term) = serialize_list(attestations, |a| a.to_etf_bin()) {
448                        trie_map.insert(Term::Atom(Atom::from("attestations")), term);
449                    }
450                }
451
452                if let Some(ref consensuses) = trie.consensuses {
453                    if let Some(term) = serialize_list(consensuses, |c| c.to_etf_bin()) {
454                        trie_map.insert(Term::Atom(Atom::from("consensuses")), term);
455                    }
456                }
457
458                Term::Map(Map::from(trie_map))
459            })
460            .collect();
461
462        m.insert(Term::Atom(Atom::from("tries")), Term::List(List::from(tries_list)));
463
464        let etf_term = Term::Map(Map::from(m));
465        Ok(encode_safe(&etf_term))
466    }
467
468    async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
469        use tracing::{debug, info};
470
471        let mut instructions = Vec::new();
472        let rooted_tip_height = ctx.fabric.get_rooted_height()?.unwrap_or_default();
473
474        for trie in &self.heights {
475            // Handle entries - insert if height >= rooted_tip_height
476            if let Some(ref entries) = trie.entries {
477                debug!("Received {} entries at height {}", entries.len(), trie.height);
478                for entry in entries {
479                    if entry.header.height >= rooted_tip_height {
480                        instructions.push(Instruction::ReceivedEntry { entry: entry.clone() });
481                    }
482                }
483            }
484
485            // Handle attestations - validate and insert
486            if let Some(ref attestations) = trie.attestations {
487                debug!("Received {} attestations at height {}", attestations.len(), trie.height);
488                for attestation in attestations {
489                    instructions.push(Instruction::ReceivedAttestation { attestation: attestation.clone() });
490                }
491            }
492
493            // Handle consensuses - validate and insert
494            if let Some(ref consensuses) = trie.consensuses {
495                debug!("Received {} consensuses at height {}", consensuses.len(), trie.height);
496                for consensus in consensuses {
497                    instructions.push(Instruction::ReceivedConsensus { consensus: consensus.clone() });
498                }
499            }
500        }
501
502        if !instructions.is_empty() {
503            info!("Processed catchup_reply from {} with {} instructions", src, instructions.len());
504        }
505
506        Ok(instructions)
507    }
508}
509
/// Solicitation for a single entry identified by hash.
#[derive(Debug)]
pub struct SolicitEntry {
    /// Hash of the entry, as raw bytes.
    pub hash: Vec<u8>,
}
514
/// Second solicitation variant; carries no data.
#[derive(Debug)]
pub struct SolicitEntry2;
517
518impl Typename for Ping {
519    fn typename(&self) -> &'static str {
520        Self::TYPENAME
521    }
522}
523
524#[async_trait::async_trait]
525impl Protocol for Ping {
526    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
527        let ts_m = map.get_integer("ts_m").ok_or(Error::BadEtf("ts_m"))?;
528        Ok(Self { ts_m })
529    }
530
531    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
532        let mut m = HashMap::new();
533        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
534        m.insert(Term::Atom(Atom::from("ts_m")), Term::from(eetf::BigInteger { value: self.ts_m.into() }));
535        let term = Term::from(Map { map: m });
536        let etf_data = encode_safe(&term);
537        Ok(etf_data)
538    }
539
540    #[instrument(skip(self, ctx), fields(src = %src), name = "Ping::handle")]
541    async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
542        ctx.node_peers.update_peer_ping_timestamp(src, self.ts_m).await;
543        Ok(vec![Instruction::SendPingReply { ts_m: self.ts_m, dst: src }])
544    }
545}
546
547impl Ping {
548    pub const TYPENAME: &'static str = "ping";
549
550    /// Create a new Ping with current timestamp (v1.1.7+ simplified format)
551    pub fn new() -> Self {
552        Self { ts_m: get_unix_millis_now() }
553    }
554
555    /// Create Ping with specific timestamp
556    pub fn with_timestamp(ts_m: u64) -> Self {
557        Self { ts_m }
558    }
559}
560
561impl Typename for PingReply {
562    fn typename(&self) -> &'static str {
563        Self::TYPENAME
564    }
565}
566
567#[async_trait::async_trait]
568impl Protocol for PingReply {
569    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
570        let ts_m = map.get_integer("ts_m").ok_or(Error::BadEtf("ts_m"))?;
571        let seen_time_ms = get_unix_millis_now();
572        // check what else must be validated
573        Ok(Self { ts_m: ts_m, seen_time: seen_time_ms })
574    }
575
576    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
577        let mut m = HashMap::new();
578        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
579        m.insert(Term::Atom(Atom::from("ts_m")), Term::from(eetf::BigInteger { value: self.ts_m.into() }));
580        let term = Term::from(Map { map: m });
581        let etf_data = encode_safe(&term);
582        Ok(etf_data)
583    }
584
585    #[instrument(skip(self, ctx), fields(src = %src), name = "PingReply::handle")]
586    async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
587        ctx.node_peers.update_peer_from_pong(src, self).await;
588        Ok(vec![Instruction::Noop { why: "pong processed".to_string() }])
589    }
590}
591
592impl PingReply {
593    pub const TYPENAME: &'static str = "ping_reply";
594}
595
/// `event_tx` message: a batch of packed transactions.
#[derive(Debug)]
pub struct EventTx {
    /// Packed transactions, one byte vector each.
    pub valid_txs: Vec<Vec<u8>>,
}
600
601impl Typename for EventTx {
602    fn typename(&self) -> &'static str {
603        Self::TYPENAME
604    }
605}
606
607#[async_trait::async_trait]
608impl Protocol for EventTx {
609    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
610        // txs_packed is a list of binary transaction packets, not a single binary
611        let txs_list = map.get_list("txs_packed").ok_or(Error::BadEtf("txs_packed"))?;
612        let valid_txs = EventTx::get_valid_txs_from_list(txs_list)?;
613        Ok(Self { valid_txs })
614    }
615    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
616        // create list of transaction binaries (txs_packed is directly a list of binaries)
617        let tx_terms: Vec<Term> = self.valid_txs.iter().map(|tx| Term::from(Binary { bytes: tx.clone() })).collect();
618        let mut m = HashMap::new();
619        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
620        // txs_packed is directly a list of binary terms, not a binary containing an encoded list
621        m.insert(Term::Atom(Atom::from("txs_packed")), Term::from(List { elements: tx_terms }));
622        let term = Term::from(Map { map: m });
623        let etf_data = encode_safe(&term);
624        Ok(etf_data)
625    }
626
627    async fn handle(&self, _ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
628        // TODO: update ETS-like tx pool with valid_txs
629        Ok(vec![Instruction::Noop { why: "event_tx handling not implemented".to_string() }])
630    }
631}
632
633impl EventTx {
634    pub const TYPENAME: &'static str = "event_tx";
635
636    fn get_valid_txs_from_list(txs_list: &[Term]) -> Result<Vec<Vec<u8>>, Error> {
637        let mut good: Vec<Vec<u8>> = Vec::with_capacity(txs_list.len());
638
639        for t in txs_list {
640            // each item must be a binary (a packed transaction)
641            let bin = if let Some(b) = TryAsRef::<Binary>::try_as_ref(t) {
642                b.bytes.as_slice()
643            } else {
644                // skip non-binary entries silently
645                continue;
646            };
647
648            // validate basic tx rules, special-meeting context is false in gossip path
649            match crate::consensus::doms::tx::validate(bin, false) {
650                Ok(_) => good.push(bin.to_vec()),
651                Err(e) => warn!("invalid tx in event_tx: {}", e),
652            }
653        }
654
655        Ok(good)
656    }
657}
658
/// `get_peer_anrs` message: asks a peer for ANRs the sender does not have yet.
#[derive(Debug)]
pub struct GetPeerAnrs {
    /// 4-byte identifiers of ANRs the sender already has; these are excluded from the reply.
    pub has_peers_b3f4: Vec<[u8; 4]>,
}
663
664impl Typename for GetPeerAnrs {
665    fn typename(&self) -> &'static str {
666        Self::TYPENAME
667    }
668}
669
670#[async_trait::async_trait]
671impl Protocol for GetPeerAnrs {
672    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
673        let list = map.get_list("hasPeersb3f4").ok_or(Error::BadEtf("hasPeersb3f4"))?;
674        let mut has_peers_b3f4 = Vec::<[u8; 4]>::new();
675        for t in list {
676            use std::convert::TryInto;
677            let b = t.get_binary().ok_or(Error::BadEtf("hasPeersb3f4"))?;
678            let b3f4 = b.try_into().map_err(|_| Error::BadEtf("hasPeersb3f4_length"))?;
679            has_peers_b3f4.push(b3f4);
680        }
681
682        Ok(Self { has_peers_b3f4 })
683    }
684
685    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
686        let b3f4_terms: Vec<Term> =
687            self.has_peers_b3f4.iter().map(|b3f4| Term::from(Binary { bytes: b3f4.to_vec() })).collect();
688        let mut m = HashMap::new();
689        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
690        m.insert(Term::Atom(Atom::from("hasPeersb3f4")), Term::from(List { elements: b3f4_terms }));
691        let term = Term::from(Map { map: m });
692        let etf_data = encode_safe(&term);
693
694        Ok(etf_data)
695    }
696
697    async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
698        let anrs = ctx.node_anrs.get_all_excluding_b3f4(&self.has_peers_b3f4).await;
699
700        Ok(vec![Instruction::SendGetPeerAnrsReply { anrs, dst: src }])
701    }
702}
703
704impl GetPeerAnrs {
705    pub const TYPENAME: &'static str = "get_peer_anrs";
706}
707
708#[derive(Debug)]
709pub struct GetPeerAnrsReply {
710    pub anrs: Vec<Anr>,
711}
712
713impl Typename for GetPeerAnrsReply {
714    fn typename(&self) -> &'static str {
715        Self::TYPENAME
716    }
717}
718
719#[async_trait::async_trait]
720impl Protocol for GetPeerAnrsReply {
721    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
722        let list = map.get_list("anrs").ok_or(Error::BadEtf("anrs"))?;
723        let mut anrs = Vec::new();
724        for term in list {
725            let anr_map = term.get_term_map().ok_or(Error::BadEtf("anr_map"))?;
726            let anr = Anr::from_etf_term_map(anr_map)?;
727            if anr.verify_signature() {
728                anrs.push(anr);
729            }
730        }
731        Ok(Self { anrs })
732    }
733
734    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
735        let anr_terms: Vec<Term> = self.anrs.iter().map(|anr| anr.to_etf_term()).collect();
736        let mut m = HashMap::new();
737        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
738        m.insert(Term::Atom(Atom::from("anrs")), Term::from(List { elements: anr_terms }));
739        let term = Term::from(Map { map: m });
740        let etf_data = encode_safe(&term);
741        Ok(etf_data)
742    }
743
744    async fn handle(&self, ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
745        for anr in &self.anrs {
746            ctx.node_anrs.insert(anr.clone()).await;
747        }
748        Ok(vec![Instruction::Noop { why: format!("inserted {} anrs", self.anrs.len()) }])
749    }
750}
751
752impl GetPeerAnrsReply {
753    pub const TYPENAME: &'static str = "get_peer_anrs_reply";
754}
755
/// `new_phone_who_dis` message; carries no fields (v1.1.7+).
#[derive(Debug)]
pub struct NewPhoneWhoDis {}
758
759impl Typename for NewPhoneWhoDis {
760    fn typename(&self) -> &'static str {
761        Self::TYPENAME
762    }
763}
764
765#[async_trait::async_trait]
766impl Protocol for NewPhoneWhoDis {
767    fn from_etf_map_validated(_map: TermMap) -> Result<Self, Error> {
768        // v1.1.7+ simplified - no fields to parse
769        Ok(Self {})
770    }
771
772    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
773        let mut map = TermMap::default();
774        map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
775        // v1.1.7+ - no additional fields
776        Ok(encode_safe(&map.into_term()))
777    }
778
779    #[instrument(skip_all)]
780    async fn handle(&self, _ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
781        // v1.1.7+ simplified - respond with NewPhoneWhoDisReply
782        Ok(vec![Instruction::SendNewPhoneWhoDisReply { dst: src }])
783    }
784}
785
786impl NewPhoneWhoDis {
787    pub const TYPENAME: &'static str = "new_phone_who_dis";
788
789    /// Create new NewPhoneWhoDis message (v1.1.7+ simplified)
790    pub fn new() -> Self {
791        Self {}
792    }
793}
794
795#[derive(Debug)]
796pub struct NewPhoneWhoDisReply {
797    pub anr: Anr,
798}
799
800impl Typename for NewPhoneWhoDisReply {
801    fn typename(&self) -> &'static str {
802        Self::TYPENAME
803    }
804}
805
806#[async_trait::async_trait]
807impl Protocol for NewPhoneWhoDisReply {
808    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
809        let anr_map = map.get_term_map("anr").ok_or(Error::BadEtf("anr"))?;
810        let anr = Anr::from_etf_term_map(anr_map)?;
811
812        if !anr.verify_signature() {
813            return Err(Error::BadEtf("anr_signature_invalid"));
814        }
815
816        Ok(Self { anr })
817    }
818
819    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
820        let mut map = TermMap::default();
821        map.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
822        map.insert(Term::Atom(Atom::from("anr")), self.anr.to_etf_term());
823        Ok(encode_safe(&map.into_term()))
824    }
825
826    #[instrument(skip(self, ctx), fields(src = %src), name = "NewPhoneWhoDisReply::handle")]
827    async fn handle(&self, ctx: &Context, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
828        // SECURITY: ip address spoofing protection
829        if src != self.anr.ip4 {
830            warn!("new_phone_who_dis_reply ip mismatched with anr {}", self.anr.ip4);
831            return Err(Error::BadEtf("anr_ip_mismatch"));
832        }
833
834        // Check if ANR timestamp is fresh (within 60 seconds like Elixir)
835        let now = crate::utils::misc::get_unix_secs_now();
836        let age_secs = now.saturating_sub(self.anr.ts);
837        if age_secs > 60 {
838            warn!("new_phone_who_dis_reply ANR too old: {} seconds", age_secs);
839            return Err(Error::BadEtf("anr_too_old"));
840        }
841
842        // Insert ANR and mark as handshaked
843        ctx.node_anrs.insert(self.anr.clone()).await;
844        ctx.node_anrs.set_handshaked(&self.anr.pk).await;
845        ctx.update_peer_from_anr(src, &self.anr.pk, &self.anr.version, Some(HandshakeStatus::Completed)).await;
846
847        Ok(vec![Instruction::Noop { why: "handshake completed".to_string() }])
848    }
849}
850
851impl NewPhoneWhoDisReply {
852    pub const TYPENAME: &'static str = "new_phone_who_dis_reply";
853
854    pub fn new(anr: Anr) -> Self {
855        Self { anr }
856    }
857}
858
/// Message with op `special_business`, carrying a binary payload.
#[derive(Debug)]
pub struct SpecialBusiness {
    /// Raw bytes sent under the `business` key of the ETF map.
    pub business: Vec<u8>,
}
863
864impl Typename for SpecialBusiness {
865    fn typename(&self) -> &'static str {
866        Self::TYPENAME
867    }
868}
869
870#[async_trait::async_trait]
871impl Protocol for SpecialBusiness {
872    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
873        let business = map.get_binary::<Vec<u8>>("business").ok_or(Error::BadEtf("business"))?;
874        Ok(Self { business })
875    }
876
877    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
878        let mut m = HashMap::new();
879        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
880        m.insert(Term::Atom(Atom::from("business")), Term::from(Binary { bytes: self.business.clone() }));
881        let term = Term::from(Map { map: m });
882        let etf_data = encode_safe(&term);
883        Ok(etf_data)
884    }
885
886    async fn handle(&self, _ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
887        // TODO: Implement special business handling logic
888        // For now, just pass the business data to the state handler
889        Ok(vec![Instruction::SpecialBusiness { business: self.business.clone() }])
890    }
891}
892
impl SpecialBusiness {
    /// Name written under the `op` key when this message is encoded.
    pub const TYPENAME: &'static str = "special_business";
}
896
/// Message with op `special_business_reply`, carrying a binary payload.
#[derive(Debug)]
pub struct SpecialBusinessReply {
    /// Raw bytes sent under the `business` key of the ETF map.
    pub business: Vec<u8>,
}
901
902impl Typename for SpecialBusinessReply {
903    fn typename(&self) -> &'static str {
904        Self::TYPENAME
905    }
906}
907
908#[async_trait::async_trait]
909impl Protocol for SpecialBusinessReply {
910    fn from_etf_map_validated(map: TermMap) -> Result<Self, Error> {
911        let business = map.get_binary::<Vec<u8>>("business").ok_or(Error::BadEtf("business"))?;
912        Ok(Self { business })
913    }
914
915    fn to_etf_bin(&self) -> Result<Vec<u8>, Error> {
916        let mut m = HashMap::new();
917        m.insert(Term::Atom(Atom::from("op")), Term::Atom(Atom::from(Self::TYPENAME)));
918        m.insert(Term::Atom(Atom::from("business")), Term::from(Binary { bytes: self.business.clone() }));
919        let term = Term::from(Map { map: m });
920        let etf_data = encode_safe(&term);
921        Ok(etf_data)
922    }
923
924    async fn handle(&self, _ctx: &Context, _src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
925        // TODO: Implement special business reply handling logic
926        Ok(vec![Instruction::SpecialBusinessReply { business: self.business.clone() }])
927    }
928}
929
impl SpecialBusinessReply {
    /// Name written under the `op` key when this message is encoded.
    pub const TYPENAME: &'static str = "special_business_reply";
}
933
934#[cfg(test)]
935mod tests {
936    use super::*;
937    use crate::config::Config;
938    use crate::consensus::doms::entry::{EntryHeader, EntrySummary};
939    use crate::utils::bls12_381::sign as bls_sign;
940
941    #[tokio::test]
942    async fn test_ping_etf_roundtrip() {
943        // create a sample ping message
944        let _temporal = create_dummy_entry_summary();
945        let _rooted = create_dummy_entry_summary();
946        let ping = Ping::new();
947
948        // serialize to ETF (now compressed by default)
949        let bin = ping.to_etf_bin().expect("should serialize");
950
951        // deserialize back
952        let result = parse_etf_bin(&bin).expect("should deserialize");
953
954        // check that we get the right type
955        assert_eq!(result.typename(), "ping");
956    }
957
958    #[tokio::test]
959    async fn test_pong_etf_roundtrip() {
960        let pong = PingReply { ts_m: 1234567890, seen_time: 9876543210 };
961
962        let bin = pong.to_etf_bin().expect("should serialize");
963        let result = parse_etf_bin(&bin).expect("should deserialize");
964
965        // check that the result type is Pong
966        assert_eq!(result.typename(), "ping_reply");
967    }
968
969    #[tokio::test]
970    async fn test_txpool_etf_roundtrip() {
971        let event_tx = EventTx { valid_txs: vec![vec![1, 2, 3], vec![4, 5, 6]] };
972
973        let bin = event_tx.to_etf_bin().expect("should serialize");
974        let result = parse_etf_bin(&bin).expect("should deserialize");
975
976        assert_eq!(result.typename(), "event_tx");
977    }
978
979    #[tokio::test]
980    async fn test_peers_etf_roundtrip() {
981        let peers = GetPeerAnrs { has_peers_b3f4: vec![[192, 168, 1, 1], [10, 0, 0, 1]] };
982
983        let bin = peers.to_etf_bin().expect("should serialize");
984        let result = parse_etf_bin(&bin).expect("should deserialize");
985
986        assert_eq!(result.typename(), "get_peer_anrs");
987    }
988
989    #[tokio::test]
990    async fn test_new_phone_who_dis_roundtrip_and_handle() {
991        use crate::node::anr;
992        use crate::utils::{bls12_381 as bls, misc::get_unix_secs_now};
993        use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
994
995        // build a valid ANR for 127.0.0.1
996        let sk = bls::generate_sk();
997        let pk = bls::get_public_key(&sk).expect("pk");
998        let pop = bls_sign(&sk, &pk, crate::consensus::DST_POP).expect("pop");
999        let ip = Ipv4Addr::new(127, 0, 0, 1);
1000        let _my_anr = anr::Anr::build(&sk, &pk, &pop, ip, Ver::new(1, 0, 0)).expect("anr");
1001
1002        // construct message
1003        let _challenge = get_unix_secs_now();
1004        let msg = NewPhoneWhoDis::new();
1005
1006        // roundtrip serialize/deserialize
1007        let bin = msg.to_etf_bin().expect("serialize");
1008        let parsed = parse_etf_bin(&bin).expect("deserialize");
1009        assert_eq!(parsed.typename(), "new_phone_who_dis");
1010
1011        // Test that message can be parsed (handle_inner now does state updates directly)
1012        let _src = SocketAddr::V4(SocketAddrV4::new(ip, 36969));
1013        // Protocol handle_inner now manages state internally and returns Noop
1014    }
1015
1016    #[tokio::test]
1017    async fn test_new_phone_who_dis_ip_mismatch_noop() {
1018        use crate::node::anr;
1019        use crate::utils::{bls12_381 as bls, misc::get_unix_secs_now};
1020        use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
1021
1022        let sk = bls::generate_sk();
1023        let pk = bls::get_public_key(&sk).expect("pk");
1024        let pop = bls_sign(&sk, &pk, crate::consensus::DST_POP).expect("pop");
1025        let ip = Ipv4Addr::new(127, 0, 0, 1);
1026        let _my_anr = anr::Anr::build(&sk, &pk, &pop, ip, Ver::new(1, 0, 0)).expect("anr");
1027        let _challenge = get_unix_secs_now();
1028        let _msg = NewPhoneWhoDis::new();
1029
1030        // Test with mismatched source ip - should be handled internally now
1031        let wrong_ip = Ipv4Addr::new(127, 0, 0, 2);
1032        let _src = SocketAddr::V4(SocketAddrV4::new(wrong_ip, 36969));
1033        // Protocol handle_inner now manages validation internally
1034    }
1035
1036    fn create_dummy_entry_summary() -> EntrySummary {
1037        let header = EntryHeader {
1038            height: 1,
1039            slot: 1,
1040            prev_slot: 0,
1041            prev_hash: [0u8; 32],
1042            dr: [1u8; 32],
1043            vr: [2u8; 96],
1044            signer: [3u8; 48],
1045            txs_hash: [4u8; 32],
1046        };
1047
1048        EntrySummary { header, signature: [5u8; 96], mask: None }
1049    }
1050
1051    #[tokio::test]
1052    async fn test_new_phone_who_dis_v1_1_7_format() {
1053        // Test the simplified v1.1.7+ NewPhoneWhoDis format (no ANR, no challenge)
1054        let msg = NewPhoneWhoDis::new();
1055
1056        // Serialize to ETF
1057        let bin = msg.to_etf_bin().expect("serialize NewPhoneWhoDis");
1058
1059        // Deserialize back
1060        let parsed = parse_etf_bin(&bin).expect("deserialize NewPhoneWhoDis");
1061        assert_eq!(parsed.typename(), "new_phone_who_dis");
1062
1063        // Verify it deserializes correctly as NewPhoneWhoDis
1064        if let Ok(_parsed_msg) =
1065            NewPhoneWhoDis::from_etf_map_validated(Term::decode(&bin[..]).expect("decode").get_term_map().expect("map"))
1066        {
1067            // Success - the simplified format works correctly
1068            assert!(true);
1069        } else {
1070            panic!("Failed to deserialize simplified NewPhoneWhoDis");
1071        }
1072    }
1073
1074    #[tokio::test]
1075    async fn test_ping_ts_m_field_validation() {
1076        let ping = Ping::with_timestamp(1234567890);
1077        let valid_bin = ping.to_etf_bin().expect("should serialize");
1078        let result = parse_etf_bin(&valid_bin);
1079        assert!(result.is_ok(), "Valid ping should parse successfully");
1080
1081        let ping_reply = PingReply { ts_m: 1234567890, seen_time: 9876543210 };
1082        let valid_bin = ping_reply.to_etf_bin().expect("should serialize");
1083        let result = parse_etf_bin(&valid_bin);
1084        assert!(result.is_ok(), "Valid ping_reply should parse successfully");
1085    }
1086
1087    #[tokio::test]
1088    async fn test_ping_complete_roundtrip_with_encryption() {
1089        // test creates ping, encrypts it as UDP packet, decrypts it, parses and compares with initial
1090        use crate::node::reassembler::Message;
1091        use crate::utils::bls12_381 as bls;
1092
1093        // create sender and receiver key pairs
1094        let sender_sk = bls::generate_sk();
1095        let sender_pk = bls::get_public_key(&sender_sk).expect("sender pk");
1096
1097        let receiver_sk = bls::generate_sk();
1098        let receiver_pk = bls::get_public_key(&receiver_sk).expect("receiver pk");
1099
1100        // create original ping message
1101        let original_ping = Ping::with_timestamp(1234567890);
1102        let original_payload = original_ping.to_etf_bin().expect("serialize ping");
1103
1104        // compute shared secret for encryption
1105        let shared_secret = bls::get_shared_secret(&receiver_pk, &sender_sk).expect("shared secret");
1106
1107        // encrypt the payload
1108        let version = Ver::new(1, 1, 7);
1109        let encrypted_messages =
1110            Message::encrypt(&sender_pk, &shared_secret, &original_payload, version).expect("encrypt message");
1111
1112        // should be single message for small payload
1113        assert_eq!(encrypted_messages.len(), 1, "should create single encrypted message for small payload");
1114        let encrypted_msg = &encrypted_messages[0];
1115
1116        // convert Message to UDP packet format (MessageV2)
1117        let udp_packet_bytes = encrypted_msg.to_bytes();
1118
1119        // verify UDP packet starts with "AMA" magic
1120        assert_eq!(&udp_packet_bytes[0..3], b"AMA", "UDP packet should start with AMA magic");
1121
1122        // parse UDP packet back to Message (simulating network reception)
1123        let received_encrypted_msg =
1124            Message::try_from(udp_packet_bytes.as_slice()).expect("deserialize encrypted message from UDP packet");
1125
1126        // verify the received message matches the original encrypted message
1127        assert_eq!(received_encrypted_msg.version, encrypted_msg.version);
1128        assert_eq!(received_encrypted_msg.pk, encrypted_msg.pk);
1129        assert_eq!(received_encrypted_msg.shard_index, encrypted_msg.shard_index);
1130        assert_eq!(received_encrypted_msg.shard_total, encrypted_msg.shard_total);
1131        assert_eq!(received_encrypted_msg.ts_nano, encrypted_msg.ts_nano);
1132        assert_eq!(received_encrypted_msg.original_size, encrypted_msg.original_size);
1133        assert_eq!(received_encrypted_msg.payload, encrypted_msg.payload);
1134
1135        // decrypt payload at receiver side
1136        let decrypted_payload = received_encrypted_msg.decrypt(&shared_secret).expect("decrypt payload");
1137
1138        // parse the decrypted payload back to ping
1139        let parsed_proto = parse_etf_bin(&decrypted_payload).expect("parse decrypted payload");
1140
1141        // verify it's a ping with correct typename
1142        assert_eq!(parsed_proto.typename(), "ping");
1143
1144        // parse again to get the actual ping struct for comparison
1145        let decrypted_ping = Ping::from_etf_map_validated(
1146            Term::decode(decrypted_payload.as_slice()).expect("decode").get_term_map().expect("map"),
1147        )
1148        .expect("parse ping");
1149
1150        // compare original and decrypted ping
1151        assert_eq!(original_ping.ts_m, decrypted_ping.ts_m, "ping timestamp should match after roundtrip");
1152    }
1153
1154    #[tokio::test]
1155    async fn test_protocol_send_to() {
1156        // Test that Protocol trait's send_to method works with Context convenience functions
1157        use crate::socket::MockSocket;
1158        use crate::utils::bls12_381 as bls;
1159        use std::net::Ipv4Addr;
1160        use std::sync::Arc;
1161
1162        let sk = bls::generate_sk();
1163        let pk = bls::get_public_key(&sk).expect("pk");
1164        let pop = bls_sign(&sk, &pk, crate::consensus::DST_POP).expect("pop");
1165
1166        let config = Config {
1167            work_folder: "/tmp/test_protocol_send_to".to_string(),
1168            version: Ver::new(1, 2, 3),
1169            offline: false,
1170            http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1171            http_port: 3000,
1172            udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1173            udp_port: 36969,
1174            public_ipv4: Some("127.0.0.1".to_string()),
1175            seed_ips: Vec::new(),
1176            seed_anrs: Vec::new(),
1177            other_nodes: Vec::new(),
1178            trust_factor: 0.8,
1179            max_peers: 100,
1180            trainer_sk: sk,
1181            trainer_pk: pk,
1182            trainer_pk_b58: String::new(),
1183            trainer_pop: pop.to_vec(),
1184            archival_node: false,
1185            autoupdate: false,
1186            computor_type: None,
1187            snapshot_height: 0,
1188            anr: None,
1189            anr_desc: None,
1190            anr_name: None,
1191        };
1192
1193        let dummy_socket = Arc::new(MockSocket::new());
1194        let target: Ipv4Addr = "127.0.0.1".parse().unwrap();
1195
1196        match Context::with_config_and_socket(config, dummy_socket).await {
1197            Ok(ctx) => {
1198                // Create a Pong message to test with
1199                let pong = PingReply { ts_m: 12345, seen_time: 67890 };
1200
1201                // Check metrics before sending
1202                let metrics_json_before = ctx.metrics.get_json();
1203                let sent_before = metrics_json_before.get("outgoing_protos");
1204
1205                // Test Protocol::send_to method - should return error with MockSocket but not panic
1206                match pong.send_to_with_metrics(&ctx, target).await {
1207                    Ok(_) => {
1208                        // unexpected success with MockSocket
1209                    }
1210                    Err(_) => {
1211                        // expected error with MockSocket - the important thing is that it compiled and didn't panic
1212                    }
1213                }
1214
1215                // Check that sent packet counter was incremented even when send fails
1216                let metrics_json_after = ctx.metrics.get_json();
1217                let sent_after = metrics_json_after.get("outgoing_protos").unwrap().as_object().unwrap();
1218                match sent_before {
1219                    Some(obj) => {
1220                        let sent_before_obj = obj.as_object().unwrap();
1221                        let pong_before = sent_before_obj.get("ping_reply").map(|v| v.as_u64().unwrap()).unwrap_or(0);
1222                        let pong_after = sent_after.get("ping_reply").map(|v| v.as_u64().unwrap()).unwrap_or(0);
1223                        assert_eq!(
1224                            pong_after,
1225                            pong_before + 1,
1226                            "Sent packet counter should increment even on send failure"
1227                        );
1228                    }
1229                    None => {
1230                        // no sent packets before, should have 1 pong now
1231                        assert_eq!(sent_after.get("ping_reply").unwrap().as_u64().unwrap(), 1);
1232                    }
1233                }
1234            }
1235            Err(_) => {
1236                // context creation failed - this is acceptable for this test
1237            }
1238        }
1239    }
1240
1241    #[tokio::test]
1242    async fn test_catchup_and_catchup_reply_etf_roundtrip() {
1243        // Test Catchup
1244        let height_flag = CatchupHeight {
1245            height: 42,
1246            c: Some(true),
1247            e: None,
1248            a: Some(true),
1249            hashes: Some(vec![vec![1, 2, 3], vec![4, 5, 6]]),
1250        };
1251
1252        let catchup = Catchup { heights: vec![height_flag] };
1253
1254        // Test Catchup serialization
1255        let catchup_bin = catchup.to_etf_bin().expect("should serialize catchup");
1256        let parsed_catchup = parse_etf_bin(&catchup_bin).expect("should deserialize catchup");
1257        assert_eq!(parsed_catchup.typename(), "catchup");
1258
1259        // Test CatchupReply with actual structs
1260        let entry1 = Entry {
1261            hash: [1; 32],
1262            header: EntryHeader {
1263                height: 100,
1264                slot: 1,
1265                prev_slot: 0,
1266                prev_hash: [0; 32],
1267                dr: [2; 32],
1268                vr: [0; 96],
1269                signer: [3; 48],
1270                txs_hash: [4; 32],
1271            },
1272            signature: [5; 96],
1273            mask: None,
1274            txs: vec![vec![1, 2, 3, 4]],
1275        };
1276
1277        let attestation1 =
1278            Attestation { entry_hash: [6; 32], mutations_hash: [7; 32], signer: [8; 48], signature: [9; 96] };
1279
1280        let consensus1 = Consensus {
1281            entry_hash: [10; 32],
1282            mutations_hash: [11; 32],
1283            mask: Some(vec![true, false, true]),
1284            agg_sig: [12; 96],
1285            score: Some(0.95),
1286        };
1287
1288        let trie1 = CatchupHeightReply {
1289            height: 100,
1290            entries: Some(vec![entry1]),
1291            attestations: Some(vec![attestation1]),
1292            consensuses: None,
1293        };
1294        let trie2 =
1295            CatchupHeightReply { height: 101, entries: None, attestations: None, consensuses: Some(vec![consensus1]) };
1296        let catchup_reply = CatchupReply { heights: vec![trie1, trie2] };
1297
1298        // Test CatchupReply serialization
1299        let reply_bin = catchup_reply.to_etf_bin().expect("should serialize catchup_reply");
1300        let parsed_reply = parse_etf_bin(&reply_bin).expect("should deserialize catchup_reply");
1301        assert_eq!(parsed_reply.typename(), "catchup_reply");
1302    }
1303}