//! amadeus_node/context.rs — node `Context`: shared state, background tasks
//! and protocol message handling.
1use crate::node::anr::{Anr, NodeAnrs};
2use crate::node::peers::{HandshakeStatus, PeersSummary};
3use crate::node::protocol::*;
4use crate::node::protocol::{Catchup, CatchupHeight, Instruction, NewPhoneWhoDis, NewPhoneWhoDisReply, Typename};
5use crate::node::{anr, peers};
6use crate::socket::UdpSocketExt;
7use crate::utils::misc::format_duration;
8use crate::utils::{Hash, PublicKey};
9use crate::{SystemStats, Ver, config, consensus, get_system_stats, metrics, node, utils};
10use amadeus_utils::vecpak;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use std::net::{Ipv4Addr, SocketAddr};
14use std::sync::Arc;
15use tracing::{debug, info, instrument, warn};
16
/// Softfork status based on temporal-rooted height gap
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SoftforkStatus {
    /// No fork, healthy state (gap 0 or 1)
    // Serializes as the empty string, overriding the lowercase rename —
    // NOTE(review): presumably to match an existing API contract; confirm.
    #[serde(rename = "")]
    Healthy,
    /// Minor fork (gap 2-10, may auto-resolve)
    Minor,
    /// Major fork (gap > 10, manual intervention needed)
    Major,
}
29
/// Aggregated error type for `Context` operations; each variant transparently
/// wraps one subsystem's error, and `IntoStaticStr` exposes the variant name
/// (used by the `Typename` impl below for metrics/labeling).
#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
pub enum Error {
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    Fabric(#[from] consensus::fabric::Error),
    #[error(transparent)]
    Archiver(#[from] utils::archiver::Error),
    #[error(transparent)]
    Protocol(#[from] node::protocol::Error),
    #[error(transparent)]
    Config(#[from] config::Error),
    #[error(transparent)]
    Anr(#[from] anr::Error),
    #[error(transparent)]
    Peers(#[from] peers::Error),
    /// Catch-all for failures without a dedicated variant.
    #[error("other error: {0}")]
    Other(String),
}
49
impl Typename for Error {
    /// Returns the variant name as a static string (via the strum
    /// `IntoStaticStr` derive on [`Error`]).
    fn typename(&self) -> &'static str {
        self.into()
    }
}
55
/// Shared node state: configuration, metrics, networking and consensus
/// subsystems. Built once by [`Context::with_config_and_socket`] and shared
/// behind an `Arc` by every background task.
pub struct Context {
    /// Node configuration (storage root, keys, seed IPs, ...).
    pub(crate) config: config::Config,
    /// Counters exposed via Prometheus / health endpoints.
    pub(crate) metrics: metrics::Metrics,
    /// Reassembles Reed-Solomon-sharded UDP messages.
    pub(crate) reassembler: node::ReedSolomonReassembler,
    /// Known peers and their handshake/height state.
    pub(crate) peers: peers::NodePeers,
    /// Amadeus node records (peer identity/announcement data).
    pub(crate) anrs: NodeAnrs,
    /// Consensus/chain storage backend.
    pub(crate) fabric: crate::consensus::fabric::Fabric,
    /// UDP socket abstraction behind a trait object.
    pub(crate) socket: Arc<dyn UdpSocketExt>,
}
65
66impl Context {
    /// Build the node `Context`, seed ANRs and peers, and spawn all background
    /// tasks (bootstrap, cleanup, ANR discovery, broadcast, consensus, catchup,
    /// autoupdate). Each task holds its own `Arc<Self>`.
    ///
    /// # Panics
    /// Panics if the configured storage root is empty.
    ///
    /// # Errors
    /// Fails if storage init, fabric creation, or peer seeding fails.
    pub async fn with_config_and_socket(
        config: config::Config,
        socket: Arc<dyn UdpSocketExt>,
    ) -> Result<Arc<Self>, Error> {
        use crate::config::{
            ANR_PERIOD_MILLIS, BROADCAST_PERIOD_MILLIS, CATCHUP_PERIOD_MILLIS, CLEANUP_PERIOD_MILLIS,
            CONSENSUS_PERIOD_MILLIS,
        };
        use crate::consensus::fabric::Fabric;
        use crate::utils::archiver::init_storage;
        use metrics::Metrics;
        use node::ReedSolomonReassembler;
        use tokio::time::{Duration, interval};

        // A configured storage root is a hard precondition.
        assert_ne!(config.get_root(), "");
        init_storage(&config.get_root()).await?;

        let fabric = Fabric::new(&config.get_root()).await?;
        let metrics = Metrics::new();
        let node_peers = peers::NodePeers::default();
        let node_anrs = NodeAnrs::new();
        let reassembler = ReedSolomonReassembler::new();

        node_anrs.seed(&config).await; // must be done before node_peers.seed()
        node_peers.seed(&fabric, &config, &node_anrs).await?;

        let ctx = Arc::new(Self { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket });

        // One-shot bootstrap: initiate handshakes with the configured seeds.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                if let Err(e) = ctx.bootstrap_task().await {
                    warn!("bootstrap task error: {e}");
                    // ctx.metrics.add_error(&e);
                }
            }
        });

        // Periodic housekeeping: rate-limit windows, stale shards and peers.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(CLEANUP_PERIOD_MILLIS));
                loop {
                    ticker.tick().await;
                    ctx.cleanup_task().await;
                }
            }
        });

        // ANR discovery. The extra initial tick delays the first run by one
        // full period, since a tokio interval's first tick fires immediately.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(ANR_PERIOD_MILLIS));
                ticker.tick().await;
                loop {
                    ticker.tick().await;
                    if let Err(e) = ctx.anr_task().await {
                        warn!("anr task error: {e}");
                        // ctx.metrics.add_error(&e);
                    }
                }
            }
        });

        // Periodic ping + chain-tip broadcast to all known peers.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(BROADCAST_PERIOD_MILLIS));
                loop {
                    ticker.tick().await;
                    if let Err(e) = ctx.broadcast_task().await {
                        // broadcast errors are expected when starting from scratch
                        warn!("broadcast task error: {e}");
                        //ctx.metrics.add_error(&e);
                    }
                }
            }
        });

        // Consensus processing loop.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(CONSENSUS_PERIOD_MILLIS));
                loop {
                    ticker.tick().await;
                    if let Err(e) = ctx.consensus_task().await {
                        warn!("consensus task error: {e}");
                    }
                }
            }
        });

        // Catchup loop with an adaptive period: a tight 30 ms while syncing,
        // the configured CATCHUP_PERIOD_MILLIS otherwise.
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut is_syncing = false;
                loop {
                    let tick_ms = if is_syncing { 30 } else { CATCHUP_PERIOD_MILLIS };
                    tokio::time::sleep(Duration::from_millis(tick_ms)).await;

                    match ctx.catchup_task().await {
                        Ok(syncing) => is_syncing = syncing,
                        Err(e) => warn!("catchup task error: {e}"),
                    }
                }
            }
        });

        // Autoupdate check every 10 minutes (currently a stub).
        tokio::spawn({
            let ctx = ctx.clone();
            async move {
                let mut ticker = interval(Duration::from_millis(600_000));
                loop {
                    ticker.tick().await;
                    if let Err(e) = ctx.autoupdate_task().await {
                        warn!("autoupdate task error: {e}");
                        // ctx.metrics.add_error(&e);
                    }
                }
            }
        });

        Ok(ctx)
    }
191
192    #[instrument(skip(self), name = "bootstrap_task")]
193    async fn bootstrap_task(&self) -> Result<(), Error> {
194        let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
195
196        for ip in &self.config.seed_ips {
197            // Prefer encrypted handshake (requires ANR); if it fails, log and continue without aborting.
198            match new_phone_who_dis.send_to_with_metrics(self, *ip).await {
199                Ok(_) => {
200                    debug!("sent encrypted new_phone_who_dis to seed {ip}");
201                }
202                Err(e) => {
203                    // Handle gracefully: tests may run without seeded ANRs. Do not fail the whole task.
204                    warn!("failed to send encrypted new_phone_who_dis to seed {ip}: {e}");
205                }
206            }
207            // Mark handshake as initiated regardless of send outcome to reflect intent to connect.
208            self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
209        }
210
211        info!("sent new_phone_who_dis to {} seed nodes", self.config.seed_ips.len());
212        Ok(())
213    }
214
215    #[instrument(skip(self), name = "cleanup_task")]
216    async fn cleanup_task(&self) {
217        self.anrs.update_rate_limiting_counters().await;
218        let cleared_shards = self.reassembler.clear_stale().await;
219        let cleared_peers = self.peers.clear_stale(&self.fabric, &self.anrs).await;
220        if cleared_shards > 0 || cleared_peers > 0 {
221            debug!("cleared {} stale shards, {} stale peers", cleared_shards, cleared_peers);
222        }
223    }
224
225    #[instrument(skip(self), name = "anr_task")]
226    async fn anr_task(&self) -> Result<(), Error> {
227        let unverified_ips = self.anrs.get_random_not_verified(3).await;
228        if !unverified_ips.is_empty() {
229            let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
230            for ip in &unverified_ips {
231                new_phone_who_dis.send_to_with_metrics(self, *ip).await?;
232                self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
233            }
234        }
235
236        let verified_ips = self.anrs.get_random_verified(3).await;
237        if !verified_ips.is_empty() {
238            let get_peer_anrs = Protocol::GetPeerAnrs(GetPeerAnrs::new(self.anrs.get_all_b3f4().await));
239            for ip in &verified_ips {
240                self.send_message_to(&get_peer_anrs, *ip).await?;
241            }
242        }
243
244        info!("sent new_phone_who_dis to {} and get_peer_anrs to {} nodes", unverified_ips.len(), verified_ips.len());
245
246        Ok(())
247    }
248
249    #[instrument(skip(self), name = "broadcast_task")]
250    async fn broadcast_task(&self) -> Result<(), Error> {
251        let ping = Protocol::Ping(Ping::new());
252        let tip = Protocol::EventTip(EventTip::from_current_tips_db(&self.fabric)?);
253
254        let my_ip = self.config.get_public_ipv4();
255        let peers = self.peers.get_all().await?;
256        if !peers.is_empty() {
257            let mut sent_count = 0;
258            for peer in peers {
259                if peer.ip != my_ip {
260                    self.send_message_to(&ping, peer.ip).await?;
261                    self.send_message_to(&tip, peer.ip).await?;
262                    sent_count += 1;
263                }
264            }
265
266            debug!("sent {sent_count} ping and tip messages");
267        }
268
269        Ok(())
270    }
271
    /// Self-update check; currently a no-op stub.
    #[instrument(skip(self), name = "autoupdate_task")]
    async fn autoupdate_task(&self) -> Result<(), Error> {
        // TODO: check if updates are available, then download and verify update signature and
        // apply set the flag to make the node restart after it's slot
        Ok(())
    }
278
    /// Run one round of consensus processing: entries first, then consensus.
    ///
    /// The start/stop flag brackets the whole round; `catchup_task` checks
    /// `is_proc_consensus()` and backs off while this is in flight. Failures
    /// of either step are logged but do not abort the round.
    #[instrument(skip(self), name = "consensus_task")]
    async fn consensus_task(&self) -> Result<(), Error> {
        use consensus::consensus::{proc_consensus, proc_entries};
        self.fabric.start_proc_consensus();
        if let Err(e) = proc_entries(&self.fabric, &self.config, self).await {
            warn!("proc_entries failed: {e}");
        }

        if let Err(e) = proc_consensus(&self.fabric) {
            warn!("proc_consensus failed: {e}");
        }
        // Always clear the flag, even if a step above logged an error.
        self.fabric.stop_proc_consensus();
        Ok(())
    }
293
294    #[instrument(skip(self), name = "catchup_task")]
295    async fn catchup_task(&self) -> Result<bool, Error> {
296        if self.fabric.is_proc_consensus() {
297            return Ok(true);
298        }
299
300        let temporal_height = match self.fabric.get_temporal_height() {
301            Ok(Some(h)) => h,
302            Ok(None) => 0,
303            Err(e) => return Err(e.into()),
304        };
305        let rooted_height = self.fabric.get_rooted_height()?.unwrap_or_default();
306        info!("Temporal: {} Rooted: {}", temporal_height, rooted_height);
307
308        let trainer_pks = self.fabric.trainers_for_height(temporal_height).unwrap_or_default();
309        let (peers_temporal, peers_rooted, peers_bft) = self.peers.get_heights(&trainer_pks).await?;
310
311        let behind_temporal = peers_temporal.saturating_sub(temporal_height);
312        let behind_rooted = peers_rooted.saturating_sub(rooted_height);
313        let behind_bft = peers_bft.saturating_sub(temporal_height);
314        let rooting_stuck = temporal_height.saturating_sub(rooted_height) > 1000;
315
316        if rooting_stuck {
317            warn!(
318                "Stopped syncing: getting {} consensuses starting {}",
319                temporal_height - rooted_height,
320                rooted_height + 1
321            );
322            let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(temporal_height, &trainer_pks).await?;
323            let heights: Vec<u64> = (rooted_height + 1..=temporal_height).take(200).collect();
324            let chunks: Vec<Vec<CatchupHeight>> = heights
325                .into_iter()
326                .map(|height| CatchupHeight { height, c: Some(true), e: None, a: None, hashes: None })
327                .collect::<Vec<_>>()
328                .chunks(20)
329                .map(|chunk| chunk.to_vec())
330                .collect();
331            self.fetch_heights(chunks, online_trainer_ips).await?;
332            return Ok(false);
333        }
334
335        if behind_bft > 0 {
336            info!("Behind BFT: Syncing {} entries", behind_bft);
337            let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_bft, &trainer_pks).await?;
338            let heights: Vec<u64> = (rooted_height + 1..=peers_bft).take(200).collect();
339            let chunks: Vec<Vec<CatchupHeight>> = heights
340                .into_iter()
341                .map(|height| CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: None })
342                .collect::<Vec<_>>()
343                .chunks(20)
344                .map(|chunk| chunk.to_vec())
345                .collect();
346            self.fetch_heights(chunks, online_trainer_ips).await?;
347            return Ok(false);
348        }
349
350        if behind_rooted > 0 {
351            info!("Behind rooted: Syncing {} entries", behind_rooted);
352            let online_trainer_ips = self.peers.get_trainer_ips_above_rooted(peers_rooted, &trainer_pks).await?;
353            let heights: Vec<u64> = (rooted_height + 1..=peers_rooted).take(200).collect();
354            let chunks: Vec<Vec<CatchupHeight>> = heights
355                .into_iter()
356                .map(|height| {
357                    let entries = self.fabric.entries_by_height(height).unwrap_or_default();
358                    let hashes = entries;
359                    CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: Some(hashes) }
360                })
361                .collect::<Vec<_>>()
362                .chunks(20)
363                .map(|chunk| chunk.to_vec())
364                .collect();
365            self.fetch_heights(chunks, online_trainer_ips).await?;
366            return Ok(false);
367        }
368
369        if behind_temporal > 0 {
370            info!("Behind temporal: Syncing {} entries", behind_temporal);
371            let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
372            let heights: Vec<u64> = (temporal_height..=peers_temporal).take(200).collect();
373            let chunks: Vec<Vec<CatchupHeight>> = heights
374                .into_iter()
375                .map(|height| {
376                    let entries = self.fabric.entries_by_height(height).unwrap_or_default();
377                    let hashes = entries;
378                    CatchupHeight { height, c: None, e: Some(true), a: Some(true), hashes: Some(hashes) }
379                })
380                .collect::<Vec<_>>()
381                .chunks(20)
382                .map(|chunk| chunk.to_vec())
383                .collect();
384            self.fetch_heights(chunks, online_trainer_ips).await?;
385            return Ok(false);
386        }
387
388        if behind_temporal == 0 {
389            info!("In sync: Fetching attestations for last entry");
390            let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
391            let entries = self.fabric.entries_by_height(temporal_height).unwrap_or_default();
392            let hashes = entries;
393            let chunk = vec![CatchupHeight {
394                height: temporal_height,
395                c: None,
396                e: Some(true),
397                a: Some(true),
398                hashes: Some(hashes),
399            }];
400            self.fetch_heights(vec![chunk], online_trainer_ips).await?;
401        }
402
403        Ok(behind_temporal > 0 || behind_rooted > 0 || behind_bft > 0 || rooting_stuck)
404    }
405
406    async fn fetch_heights(
407        &self,
408        chunks: Vec<Vec<CatchupHeight>>,
409        peers: Vec<std::net::Ipv4Addr>,
410    ) -> Result<(), Error> {
411        use rand::seq::SliceRandom;
412        let mut shuffled_peers = peers;
413        {
414            let mut rng = rand::rng();
415            shuffled_peers.shuffle(&mut rng);
416        }
417
418        for (chunk, peer_ip) in chunks.into_iter().zip(shuffled_peers.into_iter().cycle()) {
419            Protocol::Catchup(Catchup { height_flags: chunk }).send_to_with_metrics(self, peer_ip).await?;
420        }
421        Ok(())
422    }
423
    /// Render the current metrics in Prometheus text exposition format.
    pub fn get_prometheus_metrics(&self) -> String {
        self.metrics.get_prometheus()
    }
427
    /// Health-endpoint payload: static "ok" status, crate version and uptime.
    pub fn get_json_health(&self) -> Value {
        serde_json::json!({
            "status": "ok",
            "version": env!("CARGO_PKG_VERSION"),
            "uptime": self.metrics.get_uptime()
        })
    }
435
    /// Convenience function to send UDP data with metrics tracking
    ///
    /// Sends `message` to `dst`, converting the protocol-level error into
    /// this module's [`Error`].
    pub async fn send_message_to(&self, message: &Protocol, dst: Ipv4Addr) -> Result<(), Error> {
        message.send_to_with_metrics(self, dst).await.map_err(Into::into)
    }
440
    /// Convenience function to receive UDP data with metrics tracking
    ///
    /// Awaits one datagram into `buf`; returns the byte count and source addr.
    pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
        self.socket.recv_from_with_metrics(buf, &self.metrics).await
    }
445
446    pub async fn is_peer_handshaked(&self, ip: Ipv4Addr) -> bool {
447        if let Some(peer) = self.peers.by_ip(ip).await {
448            if let Some(ref pk) = peer.pk {
449                if self.anrs.is_handshaked(pk.as_ref()).await {
450                    return true;
451                }
452            }
453        }
454        false
455    }
456
    /// Build a summary of known peers; the trainer set is taken for the next
    /// height (`temporal_height + 1`), and our own IP is passed so the
    /// summary can identify this node.
    pub async fn get_peers_summary(&self) -> Result<PeersSummary, Error> {
        let my_ip = self.config.get_public_ipv4();
        let temporal_height = self.get_temporal_height();
        let trainer_pks = self.fabric.trainers_for_height(temporal_height + 1).unwrap_or_default();
        self.peers.get_peers_summary(my_ip, &trainer_pks).await.map_err(Into::into)
    }
463
464    pub fn get_softfork_status(&self) -> SoftforkStatus {
465        let temporal_height = self.get_temporal_height();
466        let rooted_height = self.get_rooted_height();
467        let gap = temporal_height.saturating_sub(rooted_height);
468
469        match gap {
470            0 | 1 => SoftforkStatus::Healthy,
471            2..=10 => SoftforkStatus::Minor,
472            _ => SoftforkStatus::Major,
473        }
474    }
475
    /// Borrow the node configuration.
    pub fn get_config(&self) -> &config::Config {
        &self.config
    }
479
    /// Clone a handle to the node's UDP socket.
    pub fn get_socket(&self) -> Arc<dyn UdpSocketExt> {
        self.socket.clone()
    }
483
    /// Borrow the live metrics registry.
    pub fn get_metrics(&self) -> &metrics::Metrics {
        &self.metrics
    }
487
    /// Take a point-in-time snapshot of the metrics.
    pub fn get_metrics_snapshot(&self) -> metrics::MetricsSnapshot {
        self.metrics.get_snapshot()
    }
491
    /// Sample current system statistics (delegates to the crate-level helper;
    /// does not read `self`).
    pub fn get_system_stats(&self) -> SystemStats {
        get_system_stats()
    }
495
    /// Node uptime formatted as a human-readable duration string.
    pub fn get_uptime(&self) -> String {
        format_duration(self.metrics.get_uptime())
    }
499
    /// Add a task to the metrics tracker
    pub fn inc_tasks(&self) {
        self.metrics.inc_tasks();
    }
504
    /// Remove a task from the metrics tracker
    pub fn dec_tasks(&self) {
        self.metrics.dec_tasks();
    }
509
    /// Get temporal height from fabric
    ///
    /// Fabric errors and an absent height both collapse to 0.
    pub fn get_temporal_height(&self) -> u64 {
        self.fabric.get_temporal_height().ok().flatten().unwrap_or_default() as u64
    }
514
    /// Get rooted height from fabric
    ///
    /// Fabric errors and an absent height both collapse to 0.
    pub fn get_rooted_height(&self) -> u64 {
        self.fabric.get_rooted_height().ok().flatten().unwrap_or_default() as u64
    }
519
520    pub async fn get_entries(&self) -> Vec<(u64, u64, u64)> {
521        // Try to get real archived entries, fallback to sample data if it fails
522        tokio::task::spawn_blocking(|| {
523            tokio::runtime::Handle::current().block_on(async {
524                consensus::doms::entry::get_archived_entries().await.unwrap_or_else(|_| {
525                    // Fallback to sample data if archiver fails
526                    vec![
527                        (201, 20100123, 1024), // (epoch, height, size_bytes)
528                        (201, 20100456, 2048),
529                        (202, 20200789, 1536),
530                        (202, 20201012, 3072),
531                        (203, 20300345, 2560),
532                    ]
533                })
534            })
535        })
536        .await
537        .unwrap_or_else(|_| {
538            // Fallback if spawn_blocking fails
539            vec![
540                (201, 20100123, 1024),
541                (201, 20100456, 2048),
542                (202, 20200789, 1536),
543                (202, 20201012, 3072),
544                (203, 20300345, 2560),
545            ]
546        })
547    }
548
    /// Set handshake status for a peer by IP address
    pub async fn set_peer_handshake_status(&self, ip: Ipv4Addr, status: HandshakeStatus) -> Result<(), peers::Error> {
        self.peers.set_handshake_status(ip, status).await
    }
553
    /// Update peer information from ANR data
    ///
    /// Records the peer's public key and version; `status` optionally updates
    /// the handshake state at the same time.
    pub async fn update_peer_from_anr(
        &self,
        ip: Ipv4Addr,
        pk: &PublicKey,
        version: &Ver,
        status: Option<HandshakeStatus>,
    ) {
        self.peers.update_peer_from_anr(ip, pk, version, status).await
    }
564
    /// Get all ANRs
    pub async fn get_all_anrs(&self) -> Vec<anr::Anr> {
        self.anrs.get_all().await
    }
569
570    /// Get ANR by public key (Base58 encoded)
571    pub async fn get_anr_by_pk_b58(&self, pk_b58: &str) -> Option<anr::Anr> {
572        if let Ok(pk_bytes) = bs58::decode(pk_b58).into_vec() {
573            let pk_array: Result<[u8; 48], _> = pk_bytes.try_into();
574            if let Ok(pk_array) = pk_array {
575                let pk = PublicKey::from(pk_array);
576                return self.get_anr_by_pk(&pk).await;
577            }
578        }
579        None
580    }
581
582    /// Get ANR by public key bytes
583    pub async fn get_anr_by_pk(&self, pk: &PublicKey) -> Option<anr::Anr> {
584        let all_anrs = self.anrs.get_all().await;
585        all_anrs.into_iter().find(|anr| &anr.pk == pk)
586    }
587
588    /// Get all handshaked ANRs (validators)
589    pub async fn get_validator_anrs(&self) -> Vec<anr::Anr> {
590        let all_anrs = self.anrs.get_all().await;
591        all_anrs.into_iter().filter(|anr| anr.handshaked).collect()
592    }
593
594    /// Get entries by height from fabric
595    pub fn get_entries_by_height(&self, height: u64) -> Result<Vec<consensus::doms::entry::Entry>, Error> {
596        let entries_raw = self.fabric.entries_by_height(height)?;
597        entries_raw
598            .into_iter()
599            .map(|raw| {
600                consensus::doms::entry::Entry::from_vecpak_bin(&raw)
601                    .map_err(|_| Error::Other("entry not vecpak in fabric".into()))
602            })
603            .collect()
604    }
605
    /// Get temporal entry from fabric
    pub fn get_temporal_entry(&self) -> Result<Option<consensus::doms::entry::Entry>, Error> {
        self.fabric.get_temporal_entry().map_err(Into::into)
    }
610
    /// Get trainers for height
    ///
    /// `None` when the fabric has no trainer set for that height.
    pub fn get_trainers_for_height(&self, height: u64) -> Option<Vec<PublicKey>> {
        self.fabric.trainers_for_height(height)
    }
615
    /// Get node's public key
    pub fn get_public_key(&self) -> PublicKey {
        self.config.get_pk()
    }
620
    /// Get wallet balance - wrapper around fabric.chain_balance_symbol
    pub fn get_wallet_balance(&self, public_key: &PublicKey, symbol: &[u8]) -> i128 {
        self.fabric.chain_balance_symbol(public_key.as_ref(), symbol)
    }
625
    /// Get contract state from CF_CONTRACTSTATE
    ///
    /// Key layout: `bic:contract:<contract_pk>:<key>`. Returns `None` both
    /// for a missing key and on a database error (errors are swallowed).
    pub fn get_contract_state(&self, contract: &PublicKey, key: &[u8]) -> Option<Vec<u8>> {
        use amadeus_utils::constants::CF_CONTRACTSTATE;
        let full_key = [b"bic:contract:" as &[u8], contract.as_ref(), b":" as &[u8], key].concat();
        self.fabric.db().get(CF_CONTRACTSTATE, &full_key).ok().flatten()
    }
632
    /// Get chain difficulty bits
    // NOTE(review): the `as u32` cast truncates if the fabric value is wider
    // than 32 bits — confirm the source type's range.
    pub fn get_chain_diff_bits(&self) -> u32 {
        self.fabric.chain_diff_bits() as u32
    }
637
    /// Get total sols
    ///
    /// Widened to `i128` from the fabric's native type.
    pub fn get_chain_total_sols(&self) -> i128 {
        self.fabric.chain_total_sols() as i128
    }
642
643    /// Get all balances for a wallet using prefix scan
644    pub fn get_all_wallet_balances(&self, public_key: &PublicKey) -> Vec<(Vec<u8>, i128)> {
645        use amadeus_utils::constants::CF_CONTRACTSTATE;
646        let prefix = [b"account:" as &[u8], public_key.as_ref() as &[u8], b":balance:" as &[u8]].concat();
647        self.fabric
648            .db()
649            .iter_prefix(CF_CONTRACTSTATE, &prefix)
650            .unwrap_or_default()
651            .into_iter()
652            .filter_map(|(key, value)| {
653                if key.len() <= prefix.len() {
654                    return None;
655                }
656                let symbol = key[prefix.len()..].to_vec();
657                let amount = std::str::from_utf8(&value).ok()?.parse::<i128>().ok()?;
658                if amount > 0 { Some((symbol, amount)) } else { None }
659            })
660            .collect()
661    }
662
    /// Get entry by hash
    pub fn get_entry_by_hash(&self, hash: &Hash) -> Option<consensus::doms::entry::Entry> {
        self.fabric.get_entry_by_hash(hash)
    }
667
    /// Get value from database column family
    ///
    /// Database errors are flattened into `Error::Other` with their message.
    pub fn db_get(&self, cf: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
        self.fabric.db().get(cf, key).map_err(|e| Error::Other(e.to_string()))
    }
672
    /// Iterate over keys with prefix in database column family
    ///
    /// Returns all matching `(key, value)` pairs; database errors are
    /// flattened into `Error::Other` with their message.
    pub fn db_iter_prefix(&self, cf: &str, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Error> {
        self.fabric.db().iter_prefix(cf, prefix).map_err(|e| Error::Other(e.to_string()))
    }
677
    /// Reads UDP datagram and silently does parsing, validation and reassembly
    /// If the protocol message is complete, returns Some(Protocol)
    ///
    /// Pipeline: per-IP rate limit -> decrypt/reassemble shards ->
    /// vecpak-decode `Protocol` -> handshake check -> per-proto rate limit.
    /// Any failure yields `None` (with at most a log line); the `?` operators
    /// here act on `Option`.
    pub async fn parse_udp(&self, buf: &[u8], src: Ipv4Addr) -> Option<Protocol> {
        self.metrics.add_incoming_udp_packet(buf.len());

        if !self.anrs.is_within_udp_limit(src).await? {
            return None; // peer sends too many UDP packets
        }

        // Process encrypted message shards
        match self.reassembler.add_shard(buf, &self.config.get_sk()).await {
            Ok(Some((packet, pk))) => {
                match vecpak::from_slice::<Protocol>(&packet) {
                    Ok(proto) => {
                        self.peers.update_peer_from_proto(src, proto.typename()).await;
                        // Handshake messages are always accepted; anything else
                        // requires a completed handshake from a matching IP.
                        let is_allowed =
                            matches!(proto, Protocol::NewPhoneWhoDis(_) | Protocol::NewPhoneWhoDisReply(_))
                                || self.anrs.handshaked_and_valid_ip4(pk.as_ref(), &src).await;

                        if !is_allowed {
                            // Invalidate the stale handshake so the peer redoes it.
                            self.anrs.unset_handshaked(pk.as_ref()).await;
                            warn!("handshake needed {src}");
                            return None; // neither handshake message nor handshaked peer
                        }

                        if !self.anrs.is_within_proto_limit(pk.as_ref(), proto.typename()).await? {
                            return None; // peer sends too many proto requests
                        }

                        // NOTE(review): handle() also calls add_incoming_proto;
                        // if callers feed this return value into handle(), the
                        // counter increments twice per message — confirm intent.
                        self.metrics.add_incoming_proto(proto.typename());
                        return Some(proto);
                    }
                    Err(e) => warn!("parse error: {e} {}", hex::encode(packet)),
                }
            }
            Ok(None) => {} // waiting for more shards, not an error
            Err(e) => warn!("bad udp frame from {src} - {e}"),
        }

        None
    }
719
    /// Dispatch a decoded protocol message to its handler and return the
    /// follow-up instructions to execute.
    ///
    /// NOTE(review): parse_udp already counts add_incoming_proto for the
    /// messages it returns; counting again here double-counts those — confirm.
    pub async fn handle(&self, message: Protocol, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
        self.metrics.add_incoming_proto(message.typename());
        message.handle(self, src).await.map_err(|e| {
            warn!("can't handle {}: {e}", message.typename());
            e.into()
        })
    }
727
    /// Execute a single instruction, logging (and still propagating) failure.
    pub async fn execute(&self, instruction: Instruction) -> Result<(), Error> {
        // Capture the name before the instruction is moved into execute_inner.
        let name = instruction.typename();
        self.execute_inner(instruction).await.inspect_err(|e| warn!("can't execute {name}: {e}"))
    }
732
733    /// Handle instruction processing following the Elixir reference implementation
734    pub async fn execute_inner(&self, instruction: Instruction) -> Result<(), Error> {
735        match instruction {
736            Instruction::Noop { why } => {
737                debug!("noop: {why}");
738            }
739
740            Instruction::SendNewPhoneWhoDisReply { dst } => {
741                let anr = Anr::from_config(&self.config)?;
742                let reply = Protocol::NewPhoneWhoDisReply(NewPhoneWhoDisReply::new(anr));
743                self.send_message_to(&reply, dst).await?;
744            }
745
746            Instruction::SendGetPeerAnrsReply { dst, anrs } => {
747                let peers_v2 = Protocol::GetPeerAnrsReply(GetPeerAnrsReply::new(anrs));
748                self.send_message_to(&peers_v2, dst).await?;
749            }
750
751            Instruction::SendPingReply { ts_m, dst } => {
752                let pong = Protocol::PingReply(PingReply::new(ts_m));
753                self.send_message_to(&pong, dst).await?;
754            }
755
756            Instruction::ValidTxs { txs } => {
757                // Insert valid transactions into tx pool
758                info!("received {} valid transactions", txs.len());
759                // TODO: implement TXPool.insert(txs) equivalent
760            }
761
762            Instruction::ReceivedEntry { entry: _ } => {
763                // nothing here
764            }
765
766            Instruction::ReceivedAttestation { attestation } => {
767                // Handle received attestation (from catchup)
768                info!("received attestation for entry {:?}", &attestation.entry_hash[..8]);
769                // TODO: implement attestation validation and insertion
770                // Following Elixir implementation:
771                // - Validate attestation vs chain
772                // - Insert if valid, cache if invalid but structurally correct
773                debug!("Attestation handling not fully implemented yet");
774            }
775
776            Instruction::ReceivedConsensus { consensus } => {
777                let _ = self.fabric.insert_consensus(&consensus);
778            }
779
780            Instruction::SpecialBusiness { business: _ } => {
781                // Handle special business messages
782                info!("received special business");
783                // TODO: implement special business handling
784                // Following Elixir implementation:
785                // - Parse business operation (slash_trainer_tx, slash_trainer_entry)
786                // - Generate appropriate attestation/signature
787                // - Reply with special_business_reply
788            }
789
790            Instruction::SpecialBusinessReply { business: _ } => {
791                // Handle special business reply messages
792                info!("received special business reply");
793                // TODO: implement special business reply handling
794                // Following Elixir implementation:
795                // - Parse reply operation
796                // - Verify signatures
797                // - Forward to SpecialMeetingGen
798            }
799
800            Instruction::SolicitEntry { hash: _ } => {
801                // Handle solicit entry request
802                info!("received solicit entry request");
803                // TODO: implement solicit entry handling
804                // Following Elixir implementation:
805                // - Check if peer is authorized trainer
806                // - Compare entry scores
807                // - Potentially backstep temporal chain
808            }
809
810            Instruction::SolicitEntry2 => {
811                // Handle solicit entry2 request
812                info!("received solicit entry2 request");
813                // TODO: implement solicit entry2 handling
814                // Following Elixir implementation:
815                // - Check if peer is authorized trainer for next height
816                // - Get best entry for current height
817                // - Potentially rewind chain if needed
818            }
819        };
820
821        Ok(())
822    }
823}
824
#[cfg(test)]
mod tests {
    use super::*;
    use crate::socket::MockSocket;

    /// Build a minimal test `Config` rooted at `work_folder`, with a freshly
    /// generated BLS trainer keypair and proof-of-possession.
    ///
    /// Shared by the context tests below to avoid duplicating the
    /// field-by-field config setup. Tests that need seed IPs mutate
    /// `seed_ips` on the returned value.
    fn test_config(work_folder: String) -> config::Config {
        use crate::utils::bls12_381 as bls;

        let sk = bls::generate_sk();
        let pk = bls::get_public_key(&sk).expect("pk");
        let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");

        config::Config {
            work_folder,
            version: Ver::new(1, 2, 3),
            offline: false,
            http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
            http_port: 3000,
            udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
            udp_port: 36969,
            public_ipv4: Some("127.0.0.1".to_string()),
            seed_ips: Vec::new(),
            seed_anrs: Vec::new(),
            other_nodes: Vec::new(),
            trust_factor: 0.8,
            max_peers: 100,
            trainer_sk: sk,
            trainer_pk: pk,
            trainer_pk_b58: String::new(),
            trainer_pop: pop.to_vec(),
            archival_node: false,
            autoupdate: false,
            computor_type: None,
            snapshot_height: 0,
            anr: None,
            anr_desc: None,
            anr_name: None,
        }
    }

    /// Unique per-process, per-call work folder so parallel tests don't
    /// collide (the archiver uses a OnceCell keyed by path).
    fn unique_work_folder(prefix: &str) -> String {
        format!("/tmp/{}_{}_{}", prefix, std::process::id(), utils::misc::get_unix_nanos_now())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn tokio_rwlock_allows_concurrent_reads() {
        // set up a tokio RwLock and verify concurrent read access without awaiting while holding a guard
        let lock = tokio::sync::RwLock::new(0usize);

        // acquire first read
        let r1 = lock.read().await;
        assert_eq!(*r1, 0);
        // try to acquire another read without await; should succeed when a read lock is already held
        let r2 = lock.try_read();
        assert!(r2.is_ok(), "try_read should succeed when another reader holds the lock");
        // drop the second read guard before attempting a write to avoid deadlock
        drop(r2);
        drop(r1);

        // now ensure we can write exclusively after dropping readers
        let mut w = lock.write().await;
        *w += 1;
        assert_eq!(*w, 1);
    }

    #[tokio::test]
    async fn test_anr_verification_request_creation() {
        // test that we can create an ANR verification request without errors
        let config = test_config("/tmp/test".to_string());
        let target_ip = Ipv4Addr::new(127, 0, 0, 1);

        // test that ANR creation doesn't panic and handles errors gracefully
        let my_anr =
            Anr::build(&config.trainer_sk, &config.trainer_pk, &config.trainer_pop, target_ip, Ver::new(1, 0, 0));
        assert!(my_anr.is_ok());
    }

    #[tokio::test]
    async fn test_get_random_unverified_anrs() {
        // test the ANR selection logic - create a test registry
        let registry = NodeAnrs::new();
        let result = registry.get_random_not_verified(3).await;

        // should not panic and should return at most 3 results as requested
        assert!(result.len() <= 3);
    }

    #[tokio::test]
    async fn test_cleanup_stale_manual_trigger() {
        // test that cleanup_stale can be called manually without error
        let mut config = test_config(unique_work_folder("test_cleanup"));
        config.seed_ips = vec!["127.0.0.1".parse().unwrap()];

        // create context with test config
        let socket = Arc::new(MockSocket::new());
        match Context::with_config_and_socket(config, socket).await {
            Ok(ctx) => {
                // test cleanup_stale manual trigger - should not panic
                ctx.cleanup_task().await;
            }
            Err(_) => {
                // context creation failed - this can happen when running tests in parallel
                // due to archiver OnceCell conflicts. This is acceptable for this test.
            }
        }
    }

    #[tokio::test]
    async fn test_bootstrap_handshake_manual_trigger() {
        // test that bootstrap_handshake can be called manually
        let mut config = test_config(unique_work_folder("test_bootstrap"));
        config.seed_ips = vec!["127.0.0.1".parse().unwrap()]; // test seed node

        // create context with test config
        let socket = Arc::new(MockSocket::new());
        let ctx = Context::with_config_and_socket(config, socket).await.expect("context creation");

        // bootstrap may fail (e.g. network issues) — the important thing is
        // that it doesn't panic, so the result is deliberately ignored.
        let _ = ctx.bootstrap_task().await;
    }

    #[tokio::test]
    async fn test_context_task_tracking() {
        // test that Context task tracking wrapper functions work
        let config = test_config(unique_work_folder("test_tasks"));

        let socket = Arc::new(MockSocket::new());
        let metrics = metrics::Metrics::new();
        let node_peers = peers::NodePeers::default();
        let node_anrs = crate::node::anr::NodeAnrs::new();
        let reassembler = node::ReedSolomonReassembler::new();

        let fabric = crate::consensus::fabric::Fabric::new(&config.get_root()).await.unwrap();
        let ctx = Context { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket };

        // Test task tracking via Context wrapper methods
        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 0);

        ctx.inc_tasks();
        ctx.inc_tasks();
        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 2);

        ctx.dec_tasks();
        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 1);
    }

    #[tokio::test]
    async fn test_context_convenience_socket_functions() {
        // test that Context convenience functions work without panicking
        let config = test_config(unique_work_folder("test_convenience"));
        let socket = Arc::new(MockSocket::new());

        match Context::with_config_and_socket(config, socket).await {
            Ok(context) => {
                let mut buf = [0u8; 1024];
                let target: Ipv4Addr = "127.0.0.1".parse().unwrap();

                let pong = Protocol::PingReply(PingReply { ts_m: 1234567890, seen_time: 1234567890123 });
                // send_message_to and recv_from are expected to error with
                // MockSocket; either outcome is fine as long as neither panics.
                let _ = context.send_message_to(&pong, target).await;
                let _ = context.recv_from(&mut buf).await;
            }
            Err(_) => {
                // context creation failed - this is acceptable for this test
            }
        }
    }
}