Skip to main content

chaincraft_rust/
node.rs

1//! Chaincraft node implementation
2
3use crate::{
4    discovery::{DiscoveryConfig, DiscoveryManager},
5    error::{ChaincraftError, NetworkError, Result},
6    network::{PeerId, PeerInfo},
7    shared::{MessageType, SharedMessage, SharedObjectId, SharedObjectRegistry},
8    shared_object::{ApplicationObject, ApplicationObjectRegistry, SimpleSharedNumber},
9    storage::{MemoryStorage, Storage},
10};
11use serde_json::json;
12
13use serde::de::Error as SerdeDeError;
14use std::{
15    collections::{HashMap, HashSet},
16    net::SocketAddr,
17    sync::{Arc, Mutex, OnceLock},
18    time::Duration,
19};
20use tokio::{net::UdpSocket, sync::RwLock};
21
22/// Storage key for persisted peers (equivalent to Python PEERS in DB)
23const PEERS_KEY: &str = "__PEERS__";
24/// Storage key for banned peers (equivalent to Python BANNED_PEERS in DB)
25const BANNED_PEERS_KEY: &str = "__BANNED_PEERS__";
26
27#[derive(serde::Serialize, serde::Deserialize)]
28struct PersistedPeer {
29    id: String,
30    address: String,
31}
32
33#[derive(serde::Serialize, serde::Deserialize)]
34struct BannedEntry {
35    addr: String,
36    expires_at: String,
37}
38
39/// Main node structure for Chaincraft network
40pub struct ChaincraftNode {
41    /// Unique identifier for this node
42    pub id: PeerId,
43    /// Registry of shared objects
44    pub registry: Arc<RwLock<SharedObjectRegistry>>,
45    /// Registry of application objects
46    pub app_objects: Arc<RwLock<ApplicationObjectRegistry>>,
47    /// Discovery manager
48    pub discovery: Option<DiscoveryManager>,
49    /// Storage backend
50    pub storage: Arc<dyn Storage>,
51    /// Connected peers
52    pub peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
53    /// Banned peer addresses (loaded from and saved to storage)
54    pub banned_peers: Arc<RwLock<HashSet<SocketAddr>>>,
55    /// Known message hashes for gossip
56    pub known_hashes: Arc<RwLock<HashSet<String>>>,
57    /// UDP socket for networking (if initialized)
58    pub socket: Option<Arc<UdpSocket>>,
59    /// Node configuration
60    pub config: NodeConfig,
61    /// Running flag
62    pub running: Arc<RwLock<bool>>,
63}
64
65impl ChaincraftNode {
66    /// Create a new Chaincraft node
67    pub fn new(id: PeerId, storage: Arc<dyn Storage>) -> Self {
68        Self::builder()
69            .with_id(id)
70            .with_storage(storage)
71            .build()
72            .expect("Failed to create node")
73    }
74
75    /// Create a new Chaincraft node with default settings
76    pub fn default() -> Self {
77        Self::new(PeerId::new(), Arc::new(MemoryStorage::new()))
78    }
79
80    /// Create a new Chaincraft node with default settings (alias for compatibility with examples)
81    pub fn new_default() -> Self {
82        Self::default()
83    }
84
85    /// Create a new node builder
86    pub fn builder() -> ChaincraftNodeBuilder {
87        ChaincraftNodeBuilder::new()
88    }
89
90    /// Start the node
91    pub async fn start(&mut self) -> Result<()> {
92        // Initialize storage
93        self.storage.initialize().await?;
94
95        // Load persisted peers and banned peers from storage
96        self.load_persisted_peers().await?;
97        self.load_banned_peers().await?;
98
99        // Set running status
100        {
101            let mut running = self.running.write().await;
102            *running = true;
103        }
104
105        // Start networking (UDP-based, minimal implementation)
106        self.start_networking().await?;
107
108        // TODO: Start consensus
109        // TODO: Start API server
110
111        Ok(())
112    }
113
114    /// Stop the node
115    pub async fn stop(&mut self) -> Result<()> {
116        {
117            let mut running = self.running.write().await;
118            *running = false;
119        }
120        // Unregister from local discovery
121        if self.config.local_discovery {
122            unregister_local_node(&self.id);
123        }
124        // Socket tasks observe the running flag and exit gracefully
125        Ok(())
126    }
127
128    /// Close the node (alias for stop)
129    pub async fn close(&mut self) -> Result<()> {
130        self.stop().await
131    }
132
133    /// Check if the node is running (async version)
134    pub async fn is_running_async(&self) -> bool {
135        *self.running.read().await
136    }
137
138    /// Add a peer to the node's peer list. Rejects if peer address is banned.
139    pub async fn add_peer(&self, peer: PeerInfo) -> Result<()> {
140        let banned = self.banned_peers.read().await;
141        if banned.contains(&peer.address) {
142            return Err(ChaincraftError::Network(NetworkError::PeerBanned {
143                addr: peer.address,
144                expires_at: chrono::Utc::now() + chrono::Duration::days(365 * 10),
145            }));
146        }
147        drop(banned);
148
149        let mut peers = self.peers.write().await;
150        peers.insert(peer.id.clone(), peer.clone());
151        drop(peers);
152
153        if self.config.persist_peers {
154            self.save_persisted_peers().await?;
155        }
156        Ok(())
157    }
158
159    /// Remove a peer from the node's peer list
160    pub async fn remove_peer(&self, peer_id: &PeerId) -> Result<()> {
161        let mut peers = self.peers.write().await;
162        peers.remove(peer_id);
163        drop(peers);
164
165        if self.config.persist_peers {
166            self.save_persisted_peers().await?;
167        }
168        Ok(())
169    }
170
171    /// Ban a peer address for a duration. Persisted to storage.
172    pub async fn ban_peer(&self, addr: SocketAddr, duration: Option<std::time::Duration>) -> Result<()> {
173        {
174            let mut banned = self.banned_peers.write().await;
175            banned.insert(addr);
176        }
177        self.save_banned_peers().await?;
178        Ok(())
179    }
180
181    /// Unban a peer address. Persisted to storage.
182    pub async fn unban_peer(&self, addr: SocketAddr) -> Result<()> {
183        {
184            let mut banned = self.banned_peers.write().await;
185            banned.remove(&addr);
186        }
187        self.save_banned_peers().await?;
188        Ok(())
189    }
190
191    /// Check if an address is banned
192    pub async fn is_banned(&self, addr: SocketAddr) -> bool {
193        self.banned_peers.read().await.contains(&addr)
194    }
195
196    /// Load persisted peers from storage (equivalent to Python PEERS in DB)
197    async fn load_persisted_peers(&self) -> Result<()> {
198        if !self.config.persist_peers {
199            return Ok(());
200        }
201        let bytes = match self.storage.get(PEERS_KEY).await? {
202            Some(b) => b,
203            None => return Ok(()),
204        };
205        let persisted: Vec<PersistedPeer> = match serde_json::from_slice(&bytes) {
206            Ok(p) => p,
207            Err(_) => return Ok(()),
208        };
209        let mut peers = self.peers.write().await;
210        for p in persisted {
211            let addr: SocketAddr = match p.address.parse() {
212                Ok(a) => a,
213                Err(_) => continue,
214            };
215            let id = match uuid::Uuid::parse_str(&p.id) {
216                Ok(u) => PeerId::from_uuid(u),
217                Err(_) => PeerId::new(),
218            };
219            let info = PeerInfo::new(id, addr);
220            peers.insert(info.id.clone(), info);
221        }
222        Ok(())
223    }
224
225    /// Save peers to storage
226    async fn save_persisted_peers(&self) -> Result<()> {
227        let peers = self.peers.read().await;
228        let persisted: Vec<PersistedPeer> = peers
229            .values()
230            .map(|p| PersistedPeer {
231                id: p.id.to_string(),
232                address: p.address.to_string(),
233            })
234            .collect();
235        let json = serde_json::to_vec(&persisted).map_err(|e| {
236            ChaincraftError::Serialization(crate::error::SerializationError::Json(e))
237        })?;
238        self.storage.put(PEERS_KEY, json).await?;
239        Ok(())
240    }
241
242    /// Load banned peers from storage (equivalent to Python BANNED_PEERS in DB)
243    async fn load_banned_peers(&self) -> Result<()> {
244        let bytes = match self.storage.get(BANNED_PEERS_KEY).await? {
245            Some(b) => b,
246            None => return Ok(()),
247        };
248        let entries: Vec<BannedEntry> = match serde_json::from_slice(&bytes) {
249            Ok(e) => e,
250            Err(_) => return Ok(()),
251        };
252        let now = chrono::Utc::now();
253        let mut banned = self.banned_peers.write().await;
254        for e in entries {
255            if let Ok(addr) = e.addr.parse::<SocketAddr>() {
256                let expires: chrono::DateTime<chrono::Utc> =
257                    chrono::DateTime::parse_from_rfc3339(&e.expires_at)
258                        .map(|dt| dt.with_timezone(&chrono::Utc))
259                        .unwrap_or(now);
260                if expires > now {
261                    banned.insert(addr);
262                }
263            }
264        }
265        Ok(())
266    }
267
268    /// Save banned peers to storage
269    async fn save_banned_peers(&self) -> Result<()> {
270        let banned = self.banned_peers.read().await;
271        let entries: Vec<BannedEntry> = banned
272            .iter()
273            .map(|addr| BannedEntry {
274                addr: addr.to_string(),
275                expires_at: (chrono::Utc::now() + chrono::Duration::days(365 * 10)).to_rfc3339(),
276            })
277            .collect();
278        let json = serde_json::to_vec(&entries).map_err(|e| {
279            ChaincraftError::Serialization(crate::error::SerializationError::Json(e))
280        })?;
281        self.storage.put(BANNED_PEERS_KEY, json).await?;
282        Ok(())
283    }
284
285    /// Connect to a peer
286    pub async fn connect_to_peer(&mut self, peer_addr: &str) -> Result<()> {
287        self.connect_to_peer_with_discovery(peer_addr, false).await
288    }
289
290    /// Connect to a peer with optional discovery
291    pub async fn connect_to_peer_with_discovery(
292        &mut self,
293        peer_addr: &str,
294        _discovery: bool,
295    ) -> Result<()> {
296        // Parse address and create PeerInfo
297        let socket_addr: SocketAddr = peer_addr.parse().map_err(|_| {
298            ChaincraftError::Network(NetworkError::InvalidMessage {
299                reason: "Invalid peer address format".to_string(),
300            })
301        })?;
302
303        if self.is_banned(socket_addr).await {
304            return Err(ChaincraftError::Network(NetworkError::PeerBanned {
305                addr: socket_addr,
306                expires_at: chrono::Utc::now(),
307            }));
308        }
309
310        let peer_id = PeerId::new(); // Generate a new peer ID for this address
311        let peer_info = PeerInfo::new(peer_id.clone(), socket_addr);
312
313        self.add_peer(peer_info.clone()).await?;
314
315        // If discovery is available, notify it about the new peer
316        if let Some(discovery) = &self.discovery {
317            discovery.add_peer(peer_info).await?;
318            discovery.mark_connected(&peer_id).await?;
319        }
320
321        Ok(())
322    }
323
324    /// Get all connected peers
325    pub async fn get_peers(&self) -> Vec<PeerInfo> {
326        let peers = self.peers.read().await;
327        peers.values().cloned().collect()
328    }
329
330    /// Get connected peers synchronously (for compatibility)
331    pub fn peers(&self) -> Vec<PeerInfo> {
332        // For now we expose an empty list synchronously; async get_peers should be used.
333        Vec::new()
334    }
335
336    /// Get the node's ID
337    pub fn id(&self) -> &PeerId {
338        &self.id
339    }
340
341    /// Get the node's port
342    pub fn port(&self) -> u16 {
343        self.config.port
344    }
345
346    /// Get the node's host
347    pub fn host(&self) -> &str {
348        "127.0.0.1" // Default host
349    }
350
351    /// Get maximum peers
352    pub fn max_peers(&self) -> usize {
353        self.config.max_peers
354    }
355
356    /// Create a shared message
357    pub async fn create_shared_message(&mut self, data: String) -> Result<String> {
358        let message_data = serde_json::to_value(&data).map_err(|e| {
359            ChaincraftError::Serialization(crate::error::SerializationError::Json(e))
360        })?;
361        let message =
362            SharedMessage::new(MessageType::Custom("user_message".to_string()), message_data);
363        let hash = message.hash.clone();
364        let json = message.to_json()?;
365        self.storage.put(&hash, json.as_bytes().to_vec()).await?;
366
367        // Track this hash for gossip
368        {
369            let mut set = self.known_hashes.write().await;
370            set.insert(hash.clone());
371        }
372
373        // Broadcast to peers if networking is enabled
374        if let Some(socket) = &self.socket {
375            let peers = self.peers.clone();
376            let banned_peers = self.banned_peers.clone();
377            let socket = socket.clone();
378            let json_bytes = json.into_bytes();
379            tokio::spawn(async move {
380                if let Err(e) = broadcast_bytes(&socket, &peers, &banned_peers, &json_bytes).await {
381                    tracing::warn!("Failed to broadcast message: {:?}", e);
382                }
383            });
384        }
385
386        Ok(hash)
387    }
388
389    /// Check if the node has a specific object
390    pub fn has_object(&self, _hash: &str) -> bool {
391        // Simplified implementation for testing
392        true
393    }
394
395    /// Get an object by hash
396    pub async fn get_object(&self, hash: &str) -> Result<String> {
397        if let Some(bytes) = self.storage.get(hash).await? {
398            let s = String::from_utf8(bytes).map_err(|e| {
399                ChaincraftError::Serialization(crate::error::SerializationError::Json(
400                    SerdeDeError::custom(e),
401                ))
402            })?;
403            Ok(s)
404        } else {
405            Err(ChaincraftError::Storage(crate::error::StorageError::KeyNotFound {
406                key: hash.to_string(),
407            }))
408        }
409    }
410
411    /// Get the database size
412    pub fn db_size(&self) -> usize {
413        // Query underlying storage length synchronously for tests
414        futures::executor::block_on(async { self.storage.len().await.unwrap_or(0) })
415    }
416
417    /// Add a shared object (application object)
418    pub async fn add_shared_object(
419        &self,
420        object: Box<dyn ApplicationObject>,
421    ) -> Result<SharedObjectId> {
422        let mut registry = self.app_objects.write().await;
423        let id = registry.register(object);
424        Ok(id)
425    }
426
427    /// Get shared objects (for compatibility with Python tests)
428    pub async fn shared_objects(&self) -> Vec<Box<dyn ApplicationObject>> {
429        let registry = self.app_objects.read().await;
430        registry
431            .ids()
432            .into_iter()
433            .filter_map(|id| registry.get(&id))
434            .map(|obj| obj.clone_box())
435            .collect()
436    }
437
438    /// Get shared object count
439    pub async fn shared_object_count(&self) -> usize {
440        let registry = self.app_objects.read().await;
441        registry.len()
442    }
443
444    /// Create shared message with application object processing
445    pub async fn create_shared_message_with_data(
446        &mut self,
447        data: serde_json::Value,
448    ) -> Result<String> {
449        // Extract message type from data if present, otherwise use default
450        let message_type = if let Some(msg_type) = data.get("type").and_then(|t| t.as_str()) {
451            match msg_type {
452                "PEER_DISCOVERY" => MessageType::PeerDiscovery,
453                "REQUEST_LOCAL_PEERS" => MessageType::RequestLocalPeers,
454                "LOCAL_PEERS" => MessageType::LocalPeers,
455                "REQUEST_SHARED_OBJECT_UPDATE" => MessageType::RequestSharedObjectUpdate,
456                "SHARED_OBJECT_UPDATE" => MessageType::SharedObjectUpdate,
457                "GET" => MessageType::Get,
458                "SET" => MessageType::Set,
459                "DELETE" => MessageType::Delete,
460                "RESPONSE" => MessageType::Response,
461                "NOTIFICATION" => MessageType::Notification,
462                "HEARTBEAT" => MessageType::Heartbeat,
463                "ERROR" => MessageType::Error,
464                _ => MessageType::Custom(msg_type.to_string()),
465            }
466        } else {
467            MessageType::Custom("user_message".to_string())
468        };
469
470        let message = SharedMessage::new(message_type, data.clone());
471        let hash = message.hash.clone();
472        let json = message.to_json()?;
473        // Store before processing
474        self.storage.put(&hash, json.as_bytes().to_vec()).await?;
475        // Process message through application objects
476        let mut app_registry = self.app_objects.write().await;
477        let _processed = app_registry.process_message(message).await?;
478
479        // Broadcast to peers over UDP if networking is enabled
480        if let Some(socket) = &self.socket {
481            let peers = self.peers.clone();
482            let banned_peers = self.banned_peers.clone();
483            let socket = socket.clone();
484            let json_bytes = json.into_bytes();
485            tokio::spawn(async move {
486                if let Err(e) = broadcast_bytes(&socket, &peers, &banned_peers, &json_bytes).await {
487                    tracing::warn!("Failed to broadcast message: {:?}", e);
488                }
489            });
490        }
491
492        Ok(hash)
493    }
494
495    /// Get node state for testing/debugging
496    pub async fn get_state(&self) -> Result<serde_json::Value> {
497        Ok(serde_json::json!({
498            "node_id": self.id.to_string(),
499            "running": *self.running.read().await,
500            "port": self.config.port,
501            "max_peers": self.config.max_peers,
502            "peer_count": self.peers.read().await.len(),
503            "messages": "stored", // Simplified for testing
504            "shared_objects": self.shared_object_count().await
505        }))
506    }
507
508    /// Get discovery info for testing
509    pub async fn get_discovery_info(&self) -> serde_json::Value {
510        serde_json::json!({
511            "node_id": self.id.to_string(),
512            "host": self.host(),
513            "port": self.port(),
514            "max_peers": self.max_peers(),
515            "peer_count": self.peers.read().await.len()
516        })
517    }
518
519    /// Set port for testing
520    pub fn set_port(&mut self, port: u16) {
521        self.config.port = port;
522    }
523
524    /// Disable local discovery (for single-node tests)
525    pub fn disable_local_discovery(&mut self) {
526        self.config.local_discovery = false;
527    }
528
529    /// Check if node is running (sync version for compatibility)
530    pub fn is_running(&self) -> bool {
531        // For tests, we'll use a blocking approach
532        futures::executor::block_on(async { *self.running.read().await })
533    }
534}
535
536/// Helper: start UDP networking for this node.
537impl ChaincraftNode {
538    async fn start_networking(&mut self) -> Result<()> {
539        // Bind UDP socket to configured host/port. If port is 0, the OS will
540        // choose an ephemeral port for us.
541        let bind_addr: SocketAddr = format!("{}:{}", self.host(), self.port())
542            .parse()
543            .map_err(|_| {
544                ChaincraftError::Config(format!(
545                    "Invalid bind address {}:{}",
546                    self.host(),
547                    self.port()
548                ))
549            })?;
550
551        let socket = UdpSocket::bind(bind_addr)
552            .await
553            .map_err(|e| ChaincraftError::Network(NetworkError::BindFailed { addr: bind_addr, source: e }))?;
554
555        // If we bound to port 0, update the config with the actual port chosen
556        let register_addr = if self.config.port == 0 {
557            if let Ok(local_addr) = socket.local_addr() {
558                self.config.port = local_addr.port();
559                local_addr
560            } else {
561                bind_addr
562            }
563        } else {
564            bind_addr
565        };
566
567        let socket = Arc::new(socket);
568        self.socket = Some(socket.clone());
569
570        let running = self.running.clone();
571        let storage = self.storage.clone();
572        let app_objects = self.app_objects.clone();
573        let peers = self.peers.clone();
574        let known_hashes = self.known_hashes.clone();
575
576        // Register this node for local discovery (in-process registry)
577        if self.config.local_discovery {
578            register_local_node(self.id.clone(), register_addr);
579        }
580
581        // Receive loop
582        let banned_peers = self.banned_peers.clone();
583        let known_hashes = self.known_hashes.clone();
584        {
585            let socket = socket.clone();
586            let running = running.clone();
587            let storage = storage.clone();
588            let app_objects = app_objects.clone();
589            let peers = peers.clone();
590            let banned_peers = banned_peers.clone();
591            let known_hashes = known_hashes.clone();
592            tokio::spawn(async move {
593                let mut buf = vec![0u8; 64 * 1024];
594                loop {
595                    if !*running.read().await {
596                        break;
597                    }
598
599                    let (len, addr) = match socket.recv_from(&mut buf).await {
600                        Ok(res) => res,
601                        Err(e) => {
602                            if !*running.read().await {
603                                break;
604                            }
605                            tracing::warn!("UDP recv_from error: {:?}", e);
606                            continue;
607                        }
608                    };
609
610                    let data = &buf[..len];
611                    if let Err(e) = handle_incoming_datagram(
612                        data,
613                        addr,
614                        &socket,
615                        &storage,
616                        &app_objects,
617                        &peers,
618                        &banned_peers,
619                        Some(&known_hashes),
620                    )
621                    .await
622                    {
623                        tracing::warn!("Error handling incoming datagram from {}: {:?}", addr, e);
624                    }
625                }
626            });
627        }
628
629        // Gossip + local discovery loop
630        let node_id = self.id.clone();
631        let banned_peers = banned_peers.clone();
632        let local_discovery = self.config.local_discovery;
633        tokio::spawn(async move {
634            // Simple fixed interval; could be made configurable.
635            let interval = Duration::from_millis(500);
636            loop {
637                if !*running.read().await {
638                    break;
639                }
640
641                // Local discovery: pull all locally-registered nodes and ensure
642                // they appear in our peers map. Only when local_discovery is enabled.
643                if local_discovery {
644                    if let Some(local_nodes) = snapshot_local_nodes() {
645                        let banned_set: HashSet<SocketAddr> = {
646                            let b = banned_peers.read().await;
647                            b.iter().copied().collect()
648                        };
649                        let mut peers_guard = peers.write().await;
650                        for (peer_id, addr) in local_nodes {
651                            if peer_id == node_id {
652                                continue;
653                            }
654                            if banned_set.contains(&addr) {
655                                continue;
656                            }
657                            if peers_guard.values().any(|p| p.address == addr) {
658                                continue;
659                            }
660                            let info = PeerInfo::new(peer_id.clone(), addr);
661                            peers_guard.insert(peer_id, info);
662                        }
663                    }
664                }
665
666                // Snapshot known hashes
667                let hashes: Vec<String> = {
668                    let set = known_hashes.read().await;
669                    set.iter().cloned().collect()
670                };
671
672                for hash in hashes {
673                    // Fetch stored JSON and rebroadcast it
674                    if let Ok(Some(bytes)) = storage.get(&hash).await {
675                        if let Err(e) = broadcast_bytes(&socket, &peers, &banned_peers, &bytes).await {
676                            tracing::warn!("gossip broadcast failed for {}: {:?}", hash, e);
677                        }
678                    }
679                }
680
681                // Digest-based sync: periodically request latest digest from a peer
682                let peer_addrs: Vec<SocketAddr> = {
683                    let p = peers.read().await;
684                    p.values().map(|x| x.address).collect()
685                };
686                if !peer_addrs.is_empty() {
687                    if let Some(&peer_addr) = peer_addrs.first() {
688                        let req = json!({ "type": "REQUEST_DIGEST" });
689                        if let Ok(bytes) = serde_json::to_vec(&req) {
690                            let _ = socket.send_to(&bytes, peer_addr).await;
691                        }
692                    }
693                }
694
695                tokio::time::sleep(interval).await;
696            }
697        });
698
699        Ok(())
700    }
701}
702
703/// Broadcast raw bytes to all known peers.
704async fn broadcast_bytes(
705    socket: &Arc<UdpSocket>,
706    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
707    banned_peers: &Arc<RwLock<HashSet<SocketAddr>>>,
708    data: &[u8],
709) -> Result<()> {
710    let (peers_snapshot, banned_set): (Vec<SocketAddr>, HashSet<SocketAddr>) = {
711        let p = peers.read().await;
712        let b = banned_peers.read().await;
713        (
714            p.values().map(|x| x.address).collect(),
715            b.iter().copied().collect(),
716        )
717    };
718
719    for addr in peers_snapshot {
720        if banned_set.contains(&addr) {
721            continue;
722        }
723        if let Err(e) = socket.send_to(data, addr).await {
724            tracing::warn!("Failed to send UDP packet to {}: {:?}", addr, e);
725        }
726    }
727
728    Ok(())
729}
730
731// -----------------------------------------------------------------------------
732// Local discovery (in-process registry)
733// -----------------------------------------------------------------------------
734
735static LOCAL_NODES: OnceLock<Mutex<HashMap<PeerId, SocketAddr>>> = OnceLock::new();
736
737fn local_registry() -> &'static Mutex<HashMap<PeerId, SocketAddr>> {
738    LOCAL_NODES.get_or_init(|| Mutex::new(HashMap::new()))
739}
740
741fn register_local_node(id: PeerId, addr: SocketAddr) {
742    let registry = local_registry();
743    let mut guard = registry.lock().unwrap();
744    guard.insert(id, addr);
745}
746
747fn unregister_local_node(id: &PeerId) {
748    if let Some(registry) = LOCAL_NODES.get() {
749        let mut guard = registry.lock().unwrap();
750        guard.remove(id);
751    }
752}
753
754/// Clear all entries from the local registry. Useful for test isolation.
755pub fn clear_local_registry() {
756    if let Some(registry) = LOCAL_NODES.get() {
757        let mut guard = registry.lock().unwrap();
758        guard.clear();
759    }
760}
761
762fn snapshot_local_nodes() -> Option<Vec<(PeerId, SocketAddr)>> {
763    let registry = LOCAL_NODES.get()?;
764    let guard = registry.lock().unwrap();
765    Some(guard.iter().map(|(id, addr)| (id.clone(), *addr)).collect())
766}
767
768/// Handle digest-sync control messages (REQUEST_DIGEST, REQUEST_MESSAGES_SINCE, etc.)
769async fn handle_digest_sync_control(
770    data: &[u8],
771    addr: SocketAddr,
772    socket: &Arc<UdpSocket>,
773    storage: &Arc<dyn Storage>,
774    app_objects: &Arc<RwLock<ApplicationObjectRegistry>>,
775    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
776    banned_peers: &Arc<RwLock<HashSet<SocketAddr>>>,
777    known_hashes: &Arc<RwLock<HashSet<String>>>,
778) -> Result<bool> {
779    let value: serde_json::Value = serde_json::from_slice(data).map_err(|_| {
780        ChaincraftError::Serialization(crate::error::SerializationError::Json(
781            serde_json::Error::custom("not json"),
782        ))
783    })?;
784    let msg_type = value.get("type").and_then(|t| t.as_str());
785    match msg_type {
786        Some("REQUEST_DIGEST") => {
787            let digest = {
788                let registry = app_objects.read().await;
789                let ids = registry.ids();
790                let mut digest = "".to_string();
791                for id in ids {
792                    if let Some(obj) = registry.get(&id) {
793                        if obj.is_merkleized() {
794                            digest = obj.get_latest_digest().await.unwrap_or_default();
795                            break;
796                        }
797                    }
798                }
799                digest
800            };
801            let resp = json!({ "type": "DIGEST_RESPONSE", "digest": digest });
802            let bytes = serde_json::to_vec(&resp).unwrap_or_default();
803            let _ = socket.send_to(&bytes, addr).await;
804            return Ok(true);
805        }
806        Some("REQUEST_MESSAGES_SINCE") => {
807            let since = value.get("digest").and_then(|d| d.as_str()).unwrap_or("");
808            let messages = {
809                let registry = app_objects.read().await;
810                let ids = registry.ids();
811                let mut msgs = Vec::new();
812                for id in ids {
813                    if let Some(obj) = registry.get(&id) {
814                        if obj.is_merkleized() {
815                            msgs = obj.get_messages_since_digest(since).await.unwrap_or_default();
816                            break;
817                        }
818                    }
819                }
820                msgs
821            };
822            let msg_ser: Vec<serde_json::Value> = messages
823                .iter()
824                .filter_map(|m| serde_json::to_value(m).ok())
825                .collect();
826            let resp = json!({ "type": "MESSAGES_RESPONSE", "messages": msg_ser });
827            let bytes = serde_json::to_vec(&resp).unwrap_or_default();
828            let _ = socket.send_to(&bytes, addr).await;
829            return Ok(true);
830        }
831        Some("DIGEST_RESPONSE") => {
832            let remote_digest = value.get("digest").and_then(|d| d.as_str()).unwrap_or("");
833            let our_digest = {
834                let registry = app_objects.read().await;
835                let ids = registry.ids();
836                let mut d = String::new();
837                for id in ids {
838                    if let Some(obj) = registry.get(&id) {
839                        if obj.is_merkleized() {
840                            d = obj.get_latest_digest().await.unwrap_or_default();
841                            break;
842                        }
843                    }
844                }
845                d
846            };
847            if remote_digest != our_digest {
848                let req = json!({ "type": "REQUEST_MESSAGES_SINCE", "digest": our_digest });
849                let bytes = serde_json::to_vec(&req).unwrap_or_default();
850                let _ = socket.send_to(&bytes, addr).await;
851            }
852            return Ok(true);
853        }
854        Some("MESSAGES_RESPONSE") => {
855            let messages: Vec<SharedMessage> = value
856                .get("messages")
857                .and_then(|m| m.as_array())
858                .map(|arr| {
859                    arr.iter()
860                        .filter_map(|v| serde_json::from_value(v.clone()).ok())
861                        .collect::<Vec<_>>()
862                })
863                .unwrap_or_default();
864            for msg in messages {
865                if storage.exists(&msg.hash).await.unwrap_or(true) {
866                    continue;
867                }
868                let json = msg.to_json().unwrap_or_default();
869                let _ = storage.put(&msg.hash, json.as_bytes().to_vec()).await;
870                {
871                    let mut set = known_hashes.write().await;
872                    set.insert(msg.hash.clone());
873                }
874                {
875                    let mut registry = app_objects.write().await;
876                    let _ = registry.process_message(msg.clone()).await;
877                }
878                let bytes = msg.to_json().unwrap_or_default().into_bytes();
879                let _ = broadcast_bytes(socket, peers, banned_peers, &bytes).await;
880            }
881            return Ok(true);
882        }
883        _ => {}
884    }
885    Ok(false)
886}
887
888/// Handle an incoming UDP datagram.
889async fn handle_incoming_datagram(
890    data: &[u8],
891    addr: SocketAddr,
892    socket: &Arc<UdpSocket>,
893    storage: &Arc<dyn Storage>,
894    app_objects: &Arc<RwLock<ApplicationObjectRegistry>>,
895    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
896    banned_peers: &Arc<RwLock<HashSet<SocketAddr>>>,
897    known_hashes: Option<&Arc<RwLock<HashSet<String>>>>,
898) -> Result<()> {
899    // Reject traffic from banned peers
900    {
901        let banned = banned_peers.read().await;
902        if banned.contains(&addr) {
903            return Ok(()); // Silently ignore
904        }
905    }
906
907    // Try digest-sync control messages first
908    if let Some(kh) = known_hashes {
909        if let Ok(true) =
910            handle_digest_sync_control(data, addr, socket, storage, app_objects, peers, banned_peers, kh).await
911        {
912            // Ensure peer is recorded
913            {
914                let mut guard = peers.write().await;
915                if !guard.values().any(|p| p.address == addr) {
916                    let peer_id = PeerId::new();
917                    let info = PeerInfo::new(peer_id.clone(), addr);
918                    guard.insert(peer_id, info);
919                }
920            }
921            return Ok(());
922        }
923    }
924
925    // Try to parse as SharedMessage JSON
926    let msg: SharedMessage = match serde_json::from_slice(data) {
927        Ok(m) => m,
928        Err(_) => return Ok(()),
929    };
930
931    // Deduplicate using storage: if we already have this hash, ignore
932    if storage.exists(&msg.hash).await? {
933        return Ok(());
934    }
935
936    // Store message
937    let json = msg.to_json()?;
938    storage.put(&msg.hash, json.as_bytes().to_vec()).await?;
939
940    // Ensure peer is recorded (only if not banned)
941    {
942        let mut guard = peers.write().await;
943        if !guard.values().any(|p| p.address == addr) {
944            let peer_id = PeerId::new();
945            let info = PeerInfo::new(peer_id.clone(), addr);
946            guard.insert(peer_id, info);
947        }
948    }
949
950    // Process through application objects
951    {
952        let mut registry = app_objects.write().await;
953        let _ = registry.process_message(msg.clone()).await?;
954    }
955
956    // Broadcast to other peers so the message propagates
957    let bytes = json.into_bytes();
958    broadcast_bytes(socket, peers, banned_peers, &bytes).await?;
959
960    Ok(())
961}
962
963/// Node configuration
964#[derive(Debug, Clone)]
965pub struct NodeConfig {
966    /// Maximum number of peers to connect to
967    pub max_peers: usize,
968
969    /// Network port to listen on
970    pub port: u16,
971
972    /// Host to bind on
973    pub host: String,
974
975    /// Enable consensus participation
976    pub consensus_enabled: bool,
977
978    /// Enable local discovery of peers within the same process
979    pub local_discovery: bool,
980    /// Persist peers to storage and load on start (equivalent to Python PEERS in DB)
981    pub persist_peers: bool,
982}
983
984impl Default for NodeConfig {
985    fn default() -> Self {
986        Self {
987            max_peers: 50,
988            port: 8080,
989            host: "127.0.0.1".to_string(),
990            consensus_enabled: true,
991            local_discovery: true,
992            persist_peers: true,
993        }
994    }
995}
996
997/// Builder for Chaincraft nodes
998pub struct ChaincraftNodeBuilder {
999    id: Option<PeerId>,
1000    storage: Option<Arc<dyn Storage>>,
1001    config: NodeConfig,
1002    persistent: bool,
1003}
1004
1005impl ChaincraftNodeBuilder {
1006    /// Create a new node builder
1007    pub fn new() -> Self {
1008        Self {
1009            id: None,
1010            storage: None,
1011            config: NodeConfig::default(),
1012            persistent: false,
1013        }
1014    }
1015
1016    /// Set the node ID
1017    pub fn with_id(mut self, id: PeerId) -> Self {
1018        self.id = Some(id);
1019        self
1020    }
1021
1022    /// Set the storage backend
1023    pub fn with_storage(mut self, storage: Arc<dyn Storage>) -> Self {
1024        self.storage = Some(storage);
1025        self
1026    }
1027
1028    /// Set persistent storage option
1029    pub fn with_persistent_storage(mut self, persistent: bool) -> Self {
1030        self.persistent = persistent;
1031        self
1032    }
1033
1034    /// Set the node configuration
1035    pub fn with_config(mut self, config: NodeConfig) -> Self {
1036        self.config = config;
1037        self
1038    }
1039
1040    /// Set the port
1041    pub fn port(mut self, port: u16) -> Self {
1042        self.config.port = port;
1043        self
1044    }
1045
1046    /// Set the host
1047    pub fn host(mut self, host: impl Into<String>) -> Self {
1048        self.config.host = host.into();
1049        self
1050    }
1051
1052    /// Enable or disable local discovery
1053    pub fn local_discovery(mut self, enabled: bool) -> Self {
1054        self.config.local_discovery = enabled;
1055        self
1056    }
1057
1058    /// Enable or disable persisting peers to storage
1059    pub fn persist_peers(mut self, enabled: bool) -> Self {
1060        self.config.persist_peers = enabled;
1061        self
1062    }
1063
1064    /// Set the maximum peers
1065    pub fn max_peers(mut self, max_peers: usize) -> Self {
1066        self.config.max_peers = max_peers;
1067        self
1068    }
1069
1070    /// Build the node
1071    pub fn build(self) -> Result<ChaincraftNode> {
1072        // Generate a new random ID if not provided
1073        let id = self.id.unwrap_or_else(|| {
1074            use crate::network::PeerId;
1075            PeerId::new()
1076        });
1077
1078        // Select storage backend: explicit, persistent (on-disk), or in-memory.
1079        let storage: Arc<dyn Storage> = if let Some(storage) = self.storage {
1080            storage
1081        } else if self.persistent {
1082            #[cfg(feature = "persistent")]
1083            {
1084                use crate::storage::SledStorage;
1085                // Use port-based file name similar to Python's node_<port>.db
1086                let path = format!("node_{}.db", self.config.port);
1087                Arc::new(SledStorage::open(path)?)
1088            }
1089            #[cfg(not(feature = "persistent"))]
1090            {
1091                Arc::new(MemoryStorage::new())
1092            }
1093        } else {
1094            Arc::new(MemoryStorage::new())
1095        };
1096
1097        Ok(ChaincraftNode {
1098            id,
1099            registry: Arc::new(RwLock::new(SharedObjectRegistry::new())),
1100            app_objects: Arc::new(RwLock::new(ApplicationObjectRegistry::new())),
1101            discovery: None, // Will be initialized during start if needed
1102            storage,
1103            peers: Arc::new(RwLock::new(HashMap::new())),
1104            banned_peers: Arc::new(RwLock::new(HashSet::new())),
1105            known_hashes: Arc::new(RwLock::new(HashSet::new())),
1106            socket: None,
1107            config: self.config,
1108            running: Arc::new(RwLock::new(false)),
1109        })
1110    }
1111}
1112
1113impl Default for ChaincraftNodeBuilder {
1114    fn default() -> Self {
1115        Self::new()
1116    }
1117}