//! amadeus_node/context.rs

1use crate::node::anr::{Anr, NodeAnrs};
2use crate::node::peers::{HandshakeStatus, PeersSummary};
3use crate::node::protocol::*;
4use crate::node::protocol::{Catchup, CatchupHeight, Instruction, NewPhoneWhoDis, NewPhoneWhoDisReply, Typename};
5use crate::node::{anr, peers};
6use crate::socket::UdpSocketExt;
7use crate::utils::misc::format_duration;
8use crate::utils::{Hash, PublicKey};
9use crate::{SystemStats, Ver, config, consensus, get_system_stats, metrics, node, utils};
10use amadeus_utils::vecpak;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use std::net::{Ipv4Addr, SocketAddr};
14use std::sync::Arc;
15use tracing::{debug, info, instrument, warn};
16
/// Softfork status based on temporal-rooted height gap
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SoftforkStatus {
    /// No fork, healthy state (gap 0 or 1)
    // NOTE(review): serializes as the empty string "" rather than "healthy" —
    // confirm downstream consumers expect that.
    #[serde(rename = "")]
    Healthy,
    /// Minor fork (gap 2-10, may auto-resolve)
    Minor,
    /// Major fork (gap > 10, manual intervention needed)
    Major,
}
29
/// Unified error type for node-context operations. Each subsystem error is
/// wrapped transparently via `#[from]`; `IntoStaticStr` exposes variant names
/// for the [`Typename`] impl below.
#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
pub enum Error {
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    Fabric(#[from] consensus::fabric::Error),
    #[error(transparent)]
    Archiver(#[from] utils::archiver::Error),
    #[error(transparent)]
    Protocol(#[from] node::protocol::Error),
    #[error(transparent)]
    Config(#[from] config::Error),
    #[error(transparent)]
    Anr(#[from] anr::Error),
    #[error(transparent)]
    Peers(#[from] peers::Error),
    /// Catch-all for errors without a dedicated variant (e.g. DB access, decode).
    #[error("other error: {0}")]
    Other(String),
}
49
impl Typename for Error {
    /// Returns the variant name as a static string (provided by
    /// `strum_macros::IntoStaticStr` on the enum).
    fn typename(&self) -> &'static str {
        self.into()
    }
}
55
/// Shared node state: configuration, metrics, networking and consensus handles.
/// Built once via [`Context::with_config_and_socket`] and shared as `Arc<Context>`
/// across the background tasks it spawns.
pub struct Context {
    /// Node configuration (root path, seed IPs, keys, public IP).
    pub(crate) config: config::Config,
    /// Counters for UDP traffic, proto messages, tasks and uptime.
    pub(crate) metrics: metrics::Metrics,
    /// Reassembles Reed-Solomon UDP shards back into complete messages.
    pub(crate) reassembler: node::ReedSolomonReassembler,
    /// Known peers and their handshake/height state.
    pub(crate) peers: peers::NodePeers,
    /// Amadeus Node Records (peer identity/announcement data).
    pub(crate) anrs: NodeAnrs,
    /// Consensus/chain storage backend.
    pub(crate) fabric: crate::consensus::fabric::Fabric,
    /// UDP socket handle behind a trait object, so implementations can be swapped.
    pub(crate) socket: Arc<dyn UdpSocketExt>,
}
65
66impl Context {
    /// Builds the node [`Context`], seeds ANRs and peers from storage, and spawns
    /// every recurring background task (bootstrap, cleanup, ANR maintenance,
    /// broadcast, consensus, catchup, autoupdate).
    ///
    /// Panics if the configured root path is empty. Returns an error if storage,
    /// fabric or peer seeding fails.
    pub async fn with_config_and_socket(
        config: config::Config,
        socket: Arc<dyn UdpSocketExt>,
    ) -> Result<Arc<Self>, Error> {
        use crate::config::{
            ANR_PERIOD_MILLIS, BROADCAST_PERIOD_MILLIS, CATCHUP_PERIOD_MILLIS, CLEANUP_PERIOD_MILLIS,
            CONSENSUS_PERIOD_MILLIS,
        };
        use crate::consensus::fabric::Fabric;
        use crate::utils::archiver::init_storage;
        use metrics::Metrics;
        use node::ReedSolomonReassembler;
        use tokio::time::{Duration, interval};

        assert_ne!(config.get_root(), "");
        init_storage(&config.get_root()).await?;

        let fabric = Fabric::new(&config.get_root()).await?;
        let metrics = Metrics::new();
        let node_peers = peers::NodePeers::default();
        let node_anrs = NodeAnrs::new();
        let reassembler = ReedSolomonReassembler::new();

        node_anrs.seed(&config).await; // must be done before node_peers.seed()
        node_peers.seed(&fabric, &config, &node_anrs).await?;

        let ctx = Arc::new(Self { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket });

        // One-shot bootstrap: handshake with the configured seed nodes.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                if let Err(e) = ctx.bootstrap_task().await {
                    warn!("bootstrap task error: {e}");
                    // ctx.metrics.add_error(&e);
                }
            }
        });

        // Periodic housekeeping: stale shards/peers, rate-limit counters.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(CLEANUP_PERIOD_MILLIS));
                loop {
                    ticker.tick().await;
                    ctx.cleanup_task().await;
                }
            }
        });

        // Periodic ANR maintenance; the extra tick skips the interval's immediate
        // first fire so the task waits one full period before the first run.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(ANR_PERIOD_MILLIS));
                ticker.tick().await;
                loop {
                    ticker.tick().await;
                    if let Err(e) = ctx.anr_task().await {
                        warn!("anr task error: {e}");
                        // ctx.metrics.add_error(&e);
                    }
                }
            }
        });

        // Periodic ping + tip broadcast to all known peers.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(BROADCAST_PERIOD_MILLIS));
                loop {
                    ticker.tick().await;
                    if let Err(e) = ctx.broadcast_task().await {
                        // broadcast errors are expected when starting from scratch
                        warn!("broadcast task error: {e}");
                        //ctx.metrics.add_error(&e);
                    }
                }
            }
        });

        // Periodic consensus processing.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(CONSENSUS_PERIOD_MILLIS));
                loop {
                    ticker.tick().await;
                    if let Err(e) = ctx.consensus_task().await {
                        warn!("consensus task error: {e}");
                    }
                }
            }
        });

        // Catchup loop with an adaptive period: a short 30ms sleep while syncing,
        // the normal CATCHUP_PERIOD_MILLIS otherwise.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut is_syncing = false;
                loop {
                    let tick_ms = if is_syncing { 30 } else { CATCHUP_PERIOD_MILLIS };
                    tokio::time::sleep(Duration::from_millis(tick_ms)).await;

                    match ctx.catchup_task().await {
                        Ok(syncing) => is_syncing = syncing,
                        Err(e) => warn!("catchup task error: {e}"),
                    }
                }
            }
        });

        // Autoupdate check every 600_000 ms (10 minutes); currently a no-op stub.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(600_000));
                loop {
                    ticker.tick().await;
                    if let Err(e) = ctx.autoupdate_task().await {
                        warn!("autoupdate task error: {e}");
                        // ctx.metrics.add_error(&e);
                    }
                }
            }
        });

        Ok(ctx)
    }
191
192    #[instrument(skip(self), name = "bootstrap_task")]
193    async fn bootstrap_task(&self) -> Result<(), Error> {
194        let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
195
196        for ip in &self.config.seed_ips {
197            // Prefer encrypted handshake (requires ANR); if it fails, log and continue without aborting.
198            match new_phone_who_dis.send_to_with_metrics(self, *ip).await {
199                Ok(_) => {
200                    debug!("sent encrypted new_phone_who_dis to seed {ip}");
201                }
202                Err(e) => {
203                    // Handle gracefully: tests may run without seeded ANRs. Do not fail the whole task.
204                    warn!("failed to send encrypted new_phone_who_dis to seed {ip}: {e}");
205                }
206            }
207            // Mark handshake as initiated regardless of send outcome to reflect intent to connect.
208            self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
209        }
210
211        info!("sent new_phone_who_dis to {} seed nodes", self.config.seed_ips.len());
212        Ok(())
213    }
214
215    #[instrument(skip(self), name = "cleanup_task")]
216    async fn cleanup_task(&self) {
217        self.anrs.update_rate_limiting_counters().await;
218        let cleared_shards = self.reassembler.clear_stale().await;
219        let cleared_peers = self.peers.clear_stale(&self.fabric, &self.anrs).await;
220        if cleared_shards > 0 || cleared_peers > 0 {
221            debug!("cleared {} stale shards, {} stale peers", cleared_shards, cleared_peers);
222        }
223    }
224
225    #[instrument(skip(self), name = "anr_task")]
226    async fn anr_task(&self) -> Result<(), Error> {
227        let unverified_ips = self.anrs.get_random_not_verified(3).await;
228        if !unverified_ips.is_empty() {
229            let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
230            for ip in &unverified_ips {
231                new_phone_who_dis.send_to_with_metrics(self, *ip).await?;
232                self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
233            }
234        }
235
236        let verified_ips = self.anrs.get_random_verified(3).await;
237        if !verified_ips.is_empty() {
238            let get_peer_anrs = Protocol::GetPeerAnrs(GetPeerAnrs::new(self.anrs.get_all_b3f4().await));
239            for ip in &verified_ips {
240                self.send_message_to(&get_peer_anrs, *ip).await?;
241            }
242        }
243
244        info!("sent new_phone_who_dis to {} and get_peer_anrs to {} nodes", unverified_ips.len(), verified_ips.len());
245
246        Ok(())
247    }
248
249    #[instrument(skip(self), name = "broadcast_task")]
250    async fn broadcast_task(&self) -> Result<(), Error> {
251        let ping = Protocol::Ping(Ping::new());
252        let tip = Protocol::EventTip(EventTip::from_current_tips_db(&self.fabric)?);
253
254        let my_ip = self.config.get_public_ipv4();
255        let peers = self.peers.get_all().await?;
256        if !peers.is_empty() {
257            let mut sent_count = 0;
258            for peer in peers {
259                if peer.ip != my_ip {
260                    self.send_message_to(&ping, peer.ip).await?;
261                    self.send_message_to(&tip, peer.ip).await?;
262                    sent_count += 1;
263                }
264            }
265
266            debug!("sent {sent_count} ping and tip messages");
267        }
268
269        Ok(())
270    }
271
    /// Placeholder for the auto-update flow; currently a no-op.
    #[instrument(skip(self), name = "autoupdate_task")]
    async fn autoupdate_task(&self) -> Result<(), Error> {
        // TODO: check if updates are available, then download and verify update signature and
        // apply set the flag to make the node restart after it's slot
        Ok(())
    }
278
279    #[instrument(skip(self), name = "consensus_task")]
280    async fn consensus_task(&self) -> Result<(), Error> {
281        use consensus::consensus::{proc_consensus, proc_entries};
282        self.fabric.start_proc_consensus();
283        if let Err(e) = proc_entries(&self.fabric, &self.config, self).await {
284            warn!("proc_entries failed: {e}");
285        }
286
287        if let Err(e) = proc_consensus(&self.fabric) {
288            warn!("proc_consensus failed: {e}");
289        }
290        self.fabric.stop_proc_consensus();
291        Ok(())
292    }
293
294    #[instrument(skip(self), name = "catchup_task")]
295    async fn catchup_task(&self) -> Result<bool, Error> {
296        if self.fabric.is_proc_consensus() {
297            return Ok(true);
298        }
299
300        let temporal_height = match self.fabric.get_temporal_height() {
301            Ok(Some(h)) => h,
302            Ok(None) => 0,
303            Err(e) => return Err(e.into()),
304        };
305        let rooted_height = self.fabric.get_rooted_height()?.unwrap_or_default();
306        info!("Temporal: {} Rooted: {}", temporal_height, rooted_height);
307
308        let trainer_pks = self.fabric.trainers_for_height(temporal_height).unwrap_or_default();
309        let (peers_temporal, peers_rooted, peers_bft) = self.peers.get_heights(&trainer_pks).await?;
310
311        let behind_temporal = peers_temporal.saturating_sub(temporal_height);
312        let behind_rooted = peers_rooted.saturating_sub(rooted_height);
313        let behind_bft = peers_bft.saturating_sub(temporal_height);
314        let rooting_stuck = temporal_height.saturating_sub(rooted_height) > 1000;
315
316        if rooting_stuck {
317            warn!(
318                "Stopped syncing: getting {} consensuses starting {}",
319                temporal_height - rooted_height,
320                rooted_height + 1
321            );
322            let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(temporal_height, &trainer_pks).await?;
323            let heights: Vec<u64> = (rooted_height + 1..=temporal_height).take(200).collect();
324            let chunks: Vec<Vec<CatchupHeight>> = heights
325                .into_iter()
326                .map(|height| CatchupHeight { height, c: Some(true), e: None, a: None, hashes: None })
327                .collect::<Vec<_>>()
328                .chunks(20)
329                .map(|chunk| chunk.to_vec())
330                .collect();
331            self.fetch_heights(chunks, online_trainer_ips).await?;
332            return Ok(false);
333        }
334
335        if behind_bft > 0 {
336            info!("Behind BFT: Syncing {} entries", behind_bft);
337            let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_bft, &trainer_pks).await?;
338            let heights: Vec<u64> = (rooted_height + 1..=peers_bft).take(200).collect();
339            let chunks: Vec<Vec<CatchupHeight>> = heights
340                .into_iter()
341                .map(|height| CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: None })
342                .collect::<Vec<_>>()
343                .chunks(20)
344                .map(|chunk| chunk.to_vec())
345                .collect();
346            self.fetch_heights(chunks, online_trainer_ips).await?;
347            return Ok(false);
348        }
349
350        if behind_rooted > 0 {
351            info!("Behind rooted: Syncing {} entries", behind_rooted);
352            let online_trainer_ips = self.peers.get_trainer_ips_above_rooted(peers_rooted, &trainer_pks).await?;
353            let heights: Vec<u64> = (rooted_height + 1..=peers_rooted).take(200).collect();
354            let chunks: Vec<Vec<CatchupHeight>> = heights
355                .into_iter()
356                .map(|height| {
357                    let entries = self.fabric.entries_by_height(height).unwrap_or_default();
358                    let hashes = entries;
359                    CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: Some(hashes) }
360                })
361                .collect::<Vec<_>>()
362                .chunks(20)
363                .map(|chunk| chunk.to_vec())
364                .collect();
365            self.fetch_heights(chunks, online_trainer_ips).await?;
366            return Ok(false);
367        }
368
369        if behind_temporal > 0 {
370            info!("Behind temporal: Syncing {} entries", behind_temporal);
371            let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
372            let heights: Vec<u64> = (temporal_height..=peers_temporal).take(200).collect();
373            let chunks: Vec<Vec<CatchupHeight>> = heights
374                .into_iter()
375                .map(|height| {
376                    let entries = self.fabric.entries_by_height(height).unwrap_or_default();
377                    let hashes = entries;
378                    CatchupHeight { height, c: None, e: Some(true), a: Some(true), hashes: Some(hashes) }
379                })
380                .collect::<Vec<_>>()
381                .chunks(20)
382                .map(|chunk| chunk.to_vec())
383                .collect();
384            self.fetch_heights(chunks, online_trainer_ips).await?;
385            return Ok(false);
386        }
387
388        if behind_temporal == 0 {
389            info!("In sync: Fetching attestations for last entry");
390            let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
391            let entries = self.fabric.entries_by_height(temporal_height).unwrap_or_default();
392            let hashes = entries;
393            let chunk = vec![CatchupHeight {
394                height: temporal_height,
395                c: None,
396                e: Some(true),
397                a: Some(true),
398                hashes: Some(hashes),
399            }];
400            self.fetch_heights(vec![chunk], online_trainer_ips).await?;
401        }
402
403        Ok(behind_temporal > 0 || behind_rooted > 0 || behind_bft > 0 || rooting_stuck)
404    }
405
406    async fn fetch_heights(
407        &self,
408        chunks: Vec<Vec<CatchupHeight>>,
409        peers: Vec<std::net::Ipv4Addr>,
410    ) -> Result<(), Error> {
411        use rand::seq::SliceRandom;
412        let mut shuffled_peers = peers;
413        {
414            let mut rng = rand::rng();
415            shuffled_peers.shuffle(&mut rng);
416        }
417
418        for (chunk, peer_ip) in chunks.into_iter().zip(shuffled_peers.into_iter().cycle()) {
419            Protocol::Catchup(Catchup { height_flags: chunk }).send_to_with_metrics(self, peer_ip).await?;
420        }
421        Ok(())
422    }
423
    /// Renders the current metrics in Prometheus exposition format.
    pub fn get_prometheus_metrics(&self) -> String {
        self.metrics.get_prometheus()
    }
427
    /// Health-check payload: a static "ok" status plus the crate version and the
    /// uptime value reported by metrics.
    pub fn get_json_health(&self) -> Value {
        serde_json::json!({
            "status": "ok",
            "version": env!("CARGO_PKG_VERSION"),
            "uptime": self.metrics.get_uptime()
        })
    }
435
436    /// Convenience function to send UDP data with metrics tracking
437    pub async fn send_message_to(&self, message: &Protocol, dst: Ipv4Addr) -> Result<(), Error> {
438        message.send_to_with_metrics(self, dst).await.map_err(Into::into)
439    }
440
    /// Convenience function to receive UDP data with metrics tracking
    pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
        self.socket.recv_from_with_metrics(buf, &self.metrics).await
    }
445
446    pub async fn is_peer_handshaked(&self, ip: Ipv4Addr) -> bool {
447        if let Some(peer) = self.peers.by_ip(ip).await {
448            if let Some(ref pk) = peer.pk {
449                if self.anrs.is_handshaked(pk.as_ref()).await {
450                    return true;
451                }
452            }
453        }
454        false
455    }
456
457    pub async fn get_peers_summary(&self) -> Result<PeersSummary, Error> {
458        let my_ip = self.config.get_public_ipv4();
459        let temporal_height = self.get_temporal_height();
460        let trainer_pks = self.fabric.trainers_for_height(temporal_height + 1).unwrap_or_default();
461        self.peers.get_peers_summary(my_ip, &trainer_pks).await.map_err(Into::into)
462    }
463
464    pub fn get_softfork_status(&self) -> SoftforkStatus {
465        let temporal_height = self.get_temporal_height();
466        let rooted_height = self.get_rooted_height();
467        let gap = temporal_height.saturating_sub(rooted_height);
468
469        match gap {
470            0 | 1 => SoftforkStatus::Healthy,
471            2..=10 => SoftforkStatus::Minor,
472            _ => SoftforkStatus::Major,
473        }
474    }
475
    /// Borrows the node configuration.
    pub fn get_config(&self) -> &config::Config {
        &self.config
    }
479
480    pub fn get_socket(&self) -> Arc<dyn UdpSocketExt> {
481        self.socket.clone()
482    }
483
    /// Borrows the metrics registry.
    pub fn get_metrics(&self) -> &metrics::Metrics {
        &self.metrics
    }
487
    /// Takes a point-in-time snapshot of all metrics.
    pub fn get_metrics_snapshot(&self) -> metrics::MetricsSnapshot {
        self.metrics.get_snapshot()
    }
491
    /// Collects current system statistics via the crate-level helper.
    pub fn get_system_stats(&self) -> SystemStats {
        get_system_stats()
    }
495
    /// Returns the node uptime formatted as a human-readable duration string.
    pub fn get_uptime(&self) -> String {
        format_duration(self.metrics.get_uptime())
    }
499
    /// Add a task to the metrics tracker
    pub fn inc_tasks(&self) {
        self.metrics.inc_tasks();
    }
504
    /// Remove a task from the metrics tracker
    pub fn dec_tasks(&self) {
        self.metrics.dec_tasks();
    }
509
510    /// Get temporal height from fabric
511    pub fn get_temporal_height(&self) -> u64 {
512        self.fabric.get_temporal_height().ok().flatten().unwrap_or_default() as u64
513    }
514
515    /// Get rooted height from fabric
516    pub fn get_rooted_height(&self) -> u64 {
517        self.fabric.get_rooted_height().ok().flatten().unwrap_or_default() as u64
518    }
519
520    pub async fn get_entries(&self) -> Vec<(u64, u64, u64)> {
521        // Try to get real archived entries, fallback to sample data if it fails
522        tokio::task::spawn_blocking(|| {
523            tokio::runtime::Handle::current().block_on(async {
524                consensus::doms::entry::get_archived_entries().await.unwrap_or_else(|_| {
525                    // Fallback to sample data if archiver fails
526                    vec![
527                        (201, 20100123, 1024), // (epoch, height, size_bytes)
528                        (201, 20100456, 2048),
529                        (202, 20200789, 1536),
530                        (202, 20201012, 3072),
531                        (203, 20300345, 2560),
532                    ]
533                })
534            })
535        })
536        .await
537        .unwrap_or_else(|_| {
538            // Fallback if spawn_blocking fails
539            vec![
540                (201, 20100123, 1024),
541                (201, 20100456, 2048),
542                (202, 20200789, 1536),
543                (202, 20201012, 3072),
544                (203, 20300345, 2560),
545            ]
546        })
547    }
548
    /// Set handshake status for a peer by IP address
    pub async fn set_peer_handshake_status(&self, ip: Ipv4Addr, status: HandshakeStatus) -> Result<(), peers::Error> {
        self.peers.set_handshake_status(ip, status).await
    }
553
    /// Update peer information from ANR data
    ///
    /// Forwards the peer's public key, version and (optionally) a new handshake
    /// status to the peer table.
    pub async fn update_peer_from_anr(
        &self,
        ip: Ipv4Addr,
        pk: &PublicKey,
        version: &Ver,
        status: Option<HandshakeStatus>,
    ) {
        self.peers.update_peer_from_anr(ip, pk, version, status).await
    }
564
    /// Get all ANRs
    pub async fn get_all_anrs(&self) -> Vec<anr::Anr> {
        self.anrs.get_all().await
    }
569
570    /// Get ANR by public key (Base58 encoded)
571    pub async fn get_anr_by_pk_b58(&self, pk_b58: &str) -> Option<anr::Anr> {
572        if let Ok(pk_bytes) = bs58::decode(pk_b58).into_vec() {
573            let pk_array: Result<[u8; 48], _> = pk_bytes.try_into();
574            if let Ok(pk_array) = pk_array {
575                let pk = PublicKey::from(pk_array);
576                return self.get_anr_by_pk(&pk).await;
577            }
578        }
579        None
580    }
581
582    /// Get ANR by public key bytes
583    pub async fn get_anr_by_pk(&self, pk: &PublicKey) -> Option<anr::Anr> {
584        let all_anrs = self.anrs.get_all().await;
585        all_anrs.into_iter().find(|anr| &anr.pk == pk)
586    }
587
588    /// Get all handshaked ANRs (validators)
589    pub async fn get_validator_anrs(&self) -> Vec<anr::Anr> {
590        let all_anrs = self.anrs.get_all().await;
591        all_anrs.into_iter().filter(|anr| anr.handshaked).collect()
592    }
593
594    /// Get entries by height from fabric
595    pub fn get_entries_by_height(&self, height: u64) -> Result<Vec<consensus::doms::entry::Entry>, Error> {
596        let entries_raw = self.fabric.entries_by_height(height)?;
597        entries_raw
598            .into_iter()
599            .map(|raw| {
600                consensus::doms::entry::Entry::from_vecpak_bin(&raw)
601                    .map_err(|_| Error::Other("entry not vecpak in fabric".into()))
602            })
603            .collect()
604    }
605
606    /// Get temporal entry from fabric
607    pub fn get_temporal_entry(&self) -> Result<Option<consensus::doms::entry::Entry>, Error> {
608        self.fabric.get_temporal_entry().map_err(Into::into)
609    }
610
    /// Get trainers for height
    pub fn get_trainers_for_height(&self, height: u64) -> Option<Vec<PublicKey>> {
        self.fabric.trainers_for_height(height)
    }
615
    /// Get wallet balance - wrapper around fabric.chain_balance_symbol
    pub fn get_wallet_balance(&self, public_key: &PublicKey, symbol: &[u8]) -> i128 {
        self.fabric.chain_balance_symbol(public_key.as_ref(), symbol)
    }
620
621    /// Get contract state from CF_CONTRACTSTATE
622    pub fn get_contract_state(&self, contract: &PublicKey, key: &[u8]) -> Option<Vec<u8>> {
623        use amadeus_utils::constants::CF_CONTRACTSTATE;
624        let full_key = [b"bic:contract:" as &[u8], contract.as_ref(), b":" as &[u8], key].concat();
625        self.fabric.db().get(CF_CONTRACTSTATE, &full_key).ok().flatten()
626    }
627
    /// Get chain difficulty bits
    pub fn get_chain_diff_bits(&self) -> u32 {
        // NOTE(review): `as u32` will silently truncate if chain_diff_bits returns a
        // wider type with a large value — confirm the source type's range.
        self.fabric.chain_diff_bits() as u32
    }
632
    /// Get total sols
    pub fn get_chain_total_sols(&self) -> i128 {
        self.fabric.chain_total_sols() as i128
    }
637
638    /// Get all balances for a wallet using prefix scan
639    pub fn get_all_wallet_balances(&self, public_key: &PublicKey) -> Vec<(Vec<u8>, i128)> {
640        use amadeus_utils::constants::CF_CONTRACTSTATE;
641        let prefix = [b"account:" as &[u8], public_key.as_ref() as &[u8], b":balance:" as &[u8]].concat();
642        self.fabric
643            .db()
644            .iter_prefix(CF_CONTRACTSTATE, &prefix)
645            .unwrap_or_default()
646            .into_iter()
647            .filter_map(|(key, value)| {
648                if key.len() <= prefix.len() {
649                    return None;
650                }
651                let symbol = key[prefix.len()..].to_vec();
652                let amount = std::str::from_utf8(&value).ok()?.parse::<i128>().ok()?;
653                if amount > 0 { Some((symbol, amount)) } else { None }
654            })
655            .collect()
656    }
657
    /// Get entry by hash
    pub fn get_entry_by_hash(&self, hash: &Hash) -> Option<consensus::doms::entry::Entry> {
        self.fabric.get_entry_by_hash(hash)
    }
662
663    /// Get value from database column family
664    pub fn db_get(&self, cf: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
665        self.fabric.db().get(cf, key).map_err(|e| Error::Other(e.to_string()))
666    }
667
668    /// Iterate over keys with prefix in database column family
669    pub fn db_iter_prefix(&self, cf: &str, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Error> {
670        self.fabric.db().iter_prefix(cf, prefix).map_err(|e| Error::Other(e.to_string()))
671    }
672
    /// Reads UDP datagram and silently does parsing, validation and reassembly
    /// If the protocol message is complete, returns Some(Protocol)
    ///
    /// Pipeline: per-IP rate limit -> shard decryption/reassembly -> vecpak
    /// decode -> handshake/authorization check -> per-proto rate limit.
    pub async fn parse_udp(&self, buf: &[u8], src: Ipv4Addr) -> Option<Protocol> {
        self.metrics.add_incoming_udp_packet(buf.len());

        // `?` propagates a None from the limiter itself; the bool gates flooding IPs.
        if !self.anrs.is_within_udp_limit(src).await? {
            return None; // peer sends too many UDP packets
        }

        // Process encrypted message shards
        match self.reassembler.add_shard(buf, &self.config.get_sk()).await {
            Ok(Some((packet, pk))) => {
                match vecpak::from_slice::<Protocol>(&packet) {
                    Ok(proto) => {
                        self.peers.update_peer_from_proto(src, proto.typename()).await;
                        // Handshake messages are always allowed; anything else requires a
                        // completed handshake from a pk bound to this source IP.
                        let is_allowed =
                            matches!(proto, Protocol::NewPhoneWhoDis(_) | Protocol::NewPhoneWhoDisReply(_))
                                || self.anrs.handshaked_and_valid_ip4(pk.as_ref(), &src).await;

                        if !is_allowed {
                            // Force the peer to redo the handshake before being accepted.
                            self.anrs.unset_handshaked(pk.as_ref()).await;
                            warn!("handshake needed {src}");
                            return None; // neither handshake message nor handshaked peer
                        }

                        if !self.anrs.is_within_proto_limit(pk.as_ref(), proto.typename()).await? {
                            return None; // peer sends too many proto requests
                        }

                        // NOTE(review): `handle()` also calls add_incoming_proto, so messages
                        // passing through both paths are counted twice — confirm intent.
                        self.metrics.add_incoming_proto(proto.typename());
                        return Some(proto);
                    }
                    Err(e) => warn!("parse error: {e} {}", hex::encode(packet)),
                }
            }
            Ok(None) => {} // waiting for more shards, not an error
            Err(e) => warn!("bad udp frame from {src} - {e}"),
        }

        None
    }
714
    /// Dispatches a fully-parsed protocol message to its handler and returns the
    /// follow-up [`Instruction`]s; failures are logged before being returned.
    pub async fn handle(&self, message: Protocol, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
        // NOTE(review): parse_udp already counts the proto on the receive path, so
        // this increments the same counter a second time — confirm whether intended.
        self.metrics.add_incoming_proto(message.typename());
        message.handle(self, src).await.map_err(|e| {
            warn!("can't handle {}: {e}", message.typename());
            e.into()
        })
    }
722
    /// Executes a single instruction, logging (but still returning) any failure.
    pub async fn execute(&self, instruction: Instruction) -> Result<(), Error> {
        // Capture the typename before the instruction is moved into execute_inner.
        let name = instruction.typename();
        self.execute_inner(instruction).await.inspect_err(|e| warn!("can't execute {name}: {e}"))
    }
727
728    /// Handle instruction processing following the Elixir reference implementation
729    pub async fn execute_inner(&self, instruction: Instruction) -> Result<(), Error> {
730        match instruction {
731            Instruction::Noop { why } => {
732                debug!("noop: {why}");
733            }
734
735            Instruction::SendNewPhoneWhoDisReply { dst } => {
736                let anr = Anr::from_config(&self.config)?;
737                let reply = Protocol::NewPhoneWhoDisReply(NewPhoneWhoDisReply::new(anr));
738                self.send_message_to(&reply, dst).await?;
739            }
740
741            Instruction::SendGetPeerAnrsReply { dst, anrs } => {
742                let peers_v2 = Protocol::GetPeerAnrsReply(GetPeerAnrsReply::new(anrs));
743                self.send_message_to(&peers_v2, dst).await?;
744            }
745
746            Instruction::SendPingReply { ts_m, dst } => {
747                let pong = Protocol::PingReply(PingReply::new(ts_m));
748                self.send_message_to(&pong, dst).await?;
749            }
750
751            Instruction::ValidTxs { txs } => {
752                // Insert valid transactions into tx pool
753                info!("received {} valid transactions", txs.len());
754                // TODO: implement TXPool.insert(txs) equivalent
755            }
756
757            Instruction::ReceivedEntry { entry: _ } => {
758                // nothing here
759            }
760
761            Instruction::ReceivedAttestation { attestation } => {
762                // Handle received attestation (from catchup)
763                info!("received attestation for entry {:?}", &attestation.entry_hash[..8]);
764                // TODO: implement attestation validation and insertion
765                // Following Elixir implementation:
766                // - Validate attestation vs chain
767                // - Insert if valid, cache if invalid but structurally correct
768                debug!("Attestation handling not fully implemented yet");
769            }
770
771            Instruction::ReceivedConsensus { consensus } => {
772                let _ = self.fabric.insert_consensus(&consensus);
773            }
774
775            Instruction::SpecialBusiness { business: _ } => {
776                // Handle special business messages
777                info!("received special business");
778                // TODO: implement special business handling
779                // Following Elixir implementation:
780                // - Parse business operation (slash_trainer_tx, slash_trainer_entry)
781                // - Generate appropriate attestation/signature
782                // - Reply with special_business_reply
783            }
784
785            Instruction::SpecialBusinessReply { business: _ } => {
786                // Handle special business reply messages
787                info!("received special business reply");
788                // TODO: implement special business reply handling
789                // Following Elixir implementation:
790                // - Parse reply operation
791                // - Verify signatures
792                // - Forward to SpecialMeetingGen
793            }
794
795            Instruction::SolicitEntry { hash: _ } => {
796                // Handle solicit entry request
797                info!("received solicit entry request");
798                // TODO: implement solicit entry handling
799                // Following Elixir implementation:
800                // - Check if peer is authorized trainer
801                // - Compare entry scores
802                // - Potentially backstep temporal chain
803            }
804
805            Instruction::SolicitEntry2 => {
806                // Handle solicit entry2 request
807                info!("received solicit entry2 request");
808                // TODO: implement solicit entry2 handling
809                // Following Elixir implementation:
810                // - Check if peer is authorized trainer for next height
811                // - Get best entry for current height
812                // - Potentially rewind chain if needed
813            }
814        };
815
816        Ok(())
817    }
818}
819
#[cfg(test)]
mod tests {
    use super::*;
    use crate::socket::MockSocket;

    /// Build a minimal, valid `Config` for tests.
    ///
    /// Every test in this module previously duplicated this ~25-field struct
    /// literal plus the BLS key generation. Callers only vary `work_folder`
    /// (and occasionally `seed_ips`, which they override on the returned value).
    fn make_test_config(work_folder: String) -> config::Config {
        use crate::utils::bls12_381 as bls;
        use std::net::Ipv4Addr;

        // Fresh trainer keypair + proof-of-possession for each test config.
        let sk = bls::generate_sk();
        let pk = bls::get_public_key(&sk).expect("pk");
        let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");

        config::Config {
            work_folder,
            version: Ver::new(1, 2, 3),
            offline: false,
            http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
            http_port: 3000,
            udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
            udp_port: 36969,
            public_ipv4: Some("127.0.0.1".to_string()),
            seed_ips: Vec::new(),
            seed_anrs: Vec::new(),
            other_nodes: Vec::new(),
            trust_factor: 0.8,
            max_peers: 100,
            trainer_sk: sk,
            trainer_pk: pk,
            trainer_pk_b58: String::new(),
            trainer_pop: pop.to_vec(),
            archival_node: false,
            autoupdate: false,
            computor_type: None,
            snapshot_height: 0,
            anr: None,
            anr_desc: None,
            anr_name: None,
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn tokio_rwlock_allows_concurrent_reads() {
        // set up a tokio RwLock and verify concurrent read access without awaiting while holding a guard
        let lock = tokio::sync::RwLock::new(0usize);

        // acquire first read
        let r1 = lock.read().await;
        assert_eq!(*r1, 0);
        // try to acquire another read without await; should succeed when a read lock is already held
        let r2 = lock.try_read();
        assert!(r2.is_ok(), "try_read should succeed when another reader holds the lock");
        // drop the second read guard before attempting a write to avoid deadlock
        drop(r2);
        drop(r1);

        // now ensure we can write exclusively after dropping readers
        let mut w = lock.write().await;
        *w += 1;
        assert_eq!(*w, 1);
    }

    #[tokio::test]
    async fn test_anr_verification_request_creation() {
        // test that we can create an ANR verification request without errors
        use std::net::Ipv4Addr;

        let config = make_test_config("/tmp/test".to_string());
        let target_ip = Ipv4Addr::new(127, 0, 0, 1);

        // test that ANR creation doesn't panic and handles errors gracefully
        let my_anr =
            Anr::build(&config.trainer_sk, &config.trainer_pk, &config.trainer_pop, target_ip, Ver::new(1, 0, 0));
        assert!(my_anr.is_ok());
    }

    #[tokio::test]
    async fn test_get_random_unverified_anrs() {
        // test the ANR selection logic - create a test registry
        let registry = NodeAnrs::new();
        let result = registry.get_random_not_verified(3).await;

        // should not panic and should return a vector
        // should return at most 3 results as requested
        assert!(result.len() <= 3);
    }

    #[tokio::test]
    async fn test_cleanup_stale_manual_trigger() {
        // test that cleanup_stale can be called manually without error.
        // Use unique work folder to avoid OnceCell conflicts with other tests
        let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
        let mut config = make_test_config(format!("/tmp/test_cleanup_{}", unique_id));
        config.seed_ips = vec!["127.0.0.1".parse().unwrap()];

        // create context with test config
        let socket = Arc::new(MockSocket::new());
        match Context::with_config_and_socket(config, socket).await {
            Ok(ctx) => {
                // test cleanup_stale manual trigger - should not panic
                ctx.cleanup_task().await;
            }
            Err(_) => {
                // context creation failed - this can happen when running tests in parallel
                // due to archiver OnceCell conflicts. This is acceptable for this test.
            }
        }
    }

    #[tokio::test]
    async fn test_bootstrap_handshake_manual_trigger() {
        // test that bootstrap_handshake can be called manually
        let work_folder = format!("/tmp/test_bootstrap_{}", std::process::id());
        let mut config = make_test_config(work_folder);
        config.seed_ips = vec!["127.0.0.1".parse().unwrap()]; // test seed node

        // create context with test config
        let socket = Arc::new(MockSocket::new());
        let ctx = Context::with_config_and_socket(config, socket).await.expect("context creation");

        // test bootstrap_handshake manual trigger - should handle errors gracefully
        match ctx.bootstrap_task().await {
            Ok(()) => {
                // success case - message was sent
            }
            Err(_e) => {
                // failure case is ok for testing, might be due to network issues
                // the important thing is that it doesn't panic
            }
        }
    }

    #[tokio::test]
    async fn test_context_task_tracking() {
        // test that Context task tracking wrapper functions work
        let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
        let config = make_test_config(format!("/tmp/test_tasks_{}", unique_id));

        let socket = Arc::new(MockSocket::new());
        let metrics = metrics::Metrics::new();
        let node_peers = peers::NodePeers::default();
        let node_anrs = crate::node::anr::NodeAnrs::new();
        let reassembler = node::ReedSolomonReassembler::new();

        let fabric = crate::consensus::fabric::Fabric::new(&config.get_root()).await.unwrap();
        let ctx = Context { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket };

        // Test task tracking via Context wrapper methods
        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 0);

        ctx.inc_tasks();
        ctx.inc_tasks();
        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 2);

        ctx.dec_tasks();
        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 1);
    }

    #[tokio::test]
    async fn test_context_convenience_socket_functions() {
        // test that Context convenience functions work without panicking
        use std::net::Ipv4Addr;

        let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
        let config = make_test_config(format!("/tmp/test_convenience_{}", unique_id));
        let socket = Arc::new(MockSocket::new());

        match Context::with_config_and_socket(config, socket).await {
            Ok(context) => {
                let mut buf = [0u8; 1024];
                let target: Ipv4Addr = "127.0.0.1".parse().unwrap();

                let pong = Protocol::PingReply(PingReply { ts_m: 1234567890, seen_time: 1234567890123 });
                // Test send_to convenience function - should return error with MockSocket but not panic
                match context.send_message_to(&pong, target).await {
                    Ok(_) => {
                        // unexpected success with MockSocket
                    }
                    Err(_) => {
                        // expected error with MockSocket
                    }
                }

                // Test recv_from convenience function - should return error with MockSocket but not panic
                match context.recv_from(&mut buf).await {
                    Ok(_) => {
                        // unexpected success with MockSocket
                    }
                    Err(_) => {
                        // expected error with MockSocket
                    }
                }
            }
            Err(_) => {
                // context creation failed - this is acceptable for this test
            }
        }
    }
}