1use crate::{
4 discovery::{DiscoveryConfig, DiscoveryManager},
5 error::{ChaincraftError, NetworkError, Result},
6 network::{PeerId, PeerInfo},
7 shared::{MessageType, SharedMessage, SharedObjectId, SharedObjectRegistry},
8 shared_object::{ApplicationObject, ApplicationObjectRegistry, SimpleSharedNumber},
9 storage::{MemoryStorage, Storage},
10};
11use serde_json::json;
12
13use serde::de::Error as SerdeDeError;
14use std::{
15 collections::{HashMap, HashSet},
16 net::SocketAddr,
17 sync::{Arc, Mutex, OnceLock},
18 time::Duration,
19};
20use tokio::{net::UdpSocket, sync::RwLock};
21
/// Storage key under which the JSON-encoded peer list is persisted
/// (read by `load_persisted_peers`, written by `save_persisted_peers`).
const PEERS_KEY: &str = "__PEERS__";
/// Storage key under which the JSON-encoded ban list is persisted
/// (read by `load_banned_peers`, written by `save_banned_peers`).
const BANNED_PEERS_KEY: &str = "__BANNED_PEERS__";
26
/// JSON-serializable form of a peer as stored under `PEERS_KEY`.
#[derive(serde::Serialize, serde::Deserialize)]
struct PersistedPeer {
    /// Peer id rendered with `to_string()`; parsed back as a UUID on load.
    id: String,
    /// Peer socket address rendered with `to_string()`.
    address: String,
}
32
/// JSON-serializable ban record as stored under `BANNED_PEERS_KEY`.
#[derive(serde::Serialize, serde::Deserialize)]
struct BannedEntry {
    /// Banned socket address in string form.
    addr: String,
    /// Ban expiry as an RFC 3339 timestamp.
    expires_at: String,
}
38
/// A gossip node: identity, object registries, storage, peer and ban tables,
/// and the UDP socket used to exchange messages.
pub struct ChaincraftNode {
    /// Identifier of this node.
    pub id: PeerId,
    /// Shared-object registry; the builder creates it empty.
    pub registry: Arc<RwLock<SharedObjectRegistry>>,
    /// Application objects that locally created and received messages are
    /// processed against.
    pub app_objects: Arc<RwLock<ApplicationObjectRegistry>>,
    /// Optional discovery manager; the builder leaves this as `None`.
    pub discovery: Option<DiscoveryManager>,
    /// Backing store for messages and for the persisted peer/ban lists.
    pub storage: Arc<dyn Storage>,
    /// Known peers keyed by peer id.
    pub peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
    /// Addresses whose datagrams are dropped and that are skipped when
    /// broadcasting.
    pub banned_peers: Arc<RwLock<HashSet<SocketAddr>>>,
    /// Hashes of messages that the gossip loop periodically re-broadcasts.
    pub known_hashes: Arc<RwLock<HashSet<String>>>,
    /// Bound UDP socket; `None` until networking has been started.
    pub socket: Option<Arc<UdpSocket>>,
    /// Node configuration.
    pub config: NodeConfig,
    /// Flag polled by the background tasks; set by `start` and cleared by
    /// `stop`.
    pub running: Arc<RwLock<bool>>,
}
64
65impl ChaincraftNode {
66 pub fn new(id: PeerId, storage: Arc<dyn Storage>) -> Self {
68 Self::builder()
69 .with_id(id)
70 .with_storage(storage)
71 .build()
72 .expect("Failed to create node")
73 }
74
75 pub fn default() -> Self {
77 Self::new(PeerId::new(), Arc::new(MemoryStorage::new()))
78 }
79
80 pub fn new_default() -> Self {
82 Self::default()
83 }
84
85 pub fn builder() -> ChaincraftNodeBuilder {
87 ChaincraftNodeBuilder::new()
88 }
89
90 pub async fn start(&mut self) -> Result<()> {
92 self.storage.initialize().await?;
94
95 self.load_persisted_peers().await?;
97 self.load_banned_peers().await?;
98
99 {
101 let mut running = self.running.write().await;
102 *running = true;
103 }
104
105 self.start_networking().await?;
107
108 Ok(())
112 }
113
114 pub async fn stop(&mut self) -> Result<()> {
116 {
117 let mut running = self.running.write().await;
118 *running = false;
119 }
120 if self.config.local_discovery {
122 unregister_local_node(&self.id);
123 }
124 Ok(())
126 }
127
128 pub async fn close(&mut self) -> Result<()> {
130 self.stop().await
131 }
132
133 pub async fn is_running_async(&self) -> bool {
135 *self.running.read().await
136 }
137
138 pub async fn add_peer(&self, peer: PeerInfo) -> Result<()> {
140 let banned = self.banned_peers.read().await;
141 if banned.contains(&peer.address) {
142 return Err(ChaincraftError::Network(NetworkError::PeerBanned {
143 addr: peer.address,
144 expires_at: chrono::Utc::now() + chrono::Duration::days(365 * 10),
145 }));
146 }
147 drop(banned);
148
149 let mut peers = self.peers.write().await;
150 peers.insert(peer.id.clone(), peer.clone());
151 drop(peers);
152
153 if self.config.persist_peers {
154 self.save_persisted_peers().await?;
155 }
156 Ok(())
157 }
158
159 pub async fn remove_peer(&self, peer_id: &PeerId) -> Result<()> {
161 let mut peers = self.peers.write().await;
162 peers.remove(peer_id);
163 drop(peers);
164
165 if self.config.persist_peers {
166 self.save_persisted_peers().await?;
167 }
168 Ok(())
169 }
170
171 pub async fn ban_peer(&self, addr: SocketAddr, duration: Option<std::time::Duration>) -> Result<()> {
173 {
174 let mut banned = self.banned_peers.write().await;
175 banned.insert(addr);
176 }
177 self.save_banned_peers().await?;
178 Ok(())
179 }
180
181 pub async fn unban_peer(&self, addr: SocketAddr) -> Result<()> {
183 {
184 let mut banned = self.banned_peers.write().await;
185 banned.remove(&addr);
186 }
187 self.save_banned_peers().await?;
188 Ok(())
189 }
190
191 pub async fn is_banned(&self, addr: SocketAddr) -> bool {
193 self.banned_peers.read().await.contains(&addr)
194 }
195
196 async fn load_persisted_peers(&self) -> Result<()> {
198 if !self.config.persist_peers {
199 return Ok(());
200 }
201 let bytes = match self.storage.get(PEERS_KEY).await? {
202 Some(b) => b,
203 None => return Ok(()),
204 };
205 let persisted: Vec<PersistedPeer> = match serde_json::from_slice(&bytes) {
206 Ok(p) => p,
207 Err(_) => return Ok(()),
208 };
209 let mut peers = self.peers.write().await;
210 for p in persisted {
211 let addr: SocketAddr = match p.address.parse() {
212 Ok(a) => a,
213 Err(_) => continue,
214 };
215 let id = match uuid::Uuid::parse_str(&p.id) {
216 Ok(u) => PeerId::from_uuid(u),
217 Err(_) => PeerId::new(),
218 };
219 let info = PeerInfo::new(id, addr);
220 peers.insert(info.id.clone(), info);
221 }
222 Ok(())
223 }
224
225 async fn save_persisted_peers(&self) -> Result<()> {
227 let peers = self.peers.read().await;
228 let persisted: Vec<PersistedPeer> = peers
229 .values()
230 .map(|p| PersistedPeer {
231 id: p.id.to_string(),
232 address: p.address.to_string(),
233 })
234 .collect();
235 let json = serde_json::to_vec(&persisted).map_err(|e| {
236 ChaincraftError::Serialization(crate::error::SerializationError::Json(e))
237 })?;
238 self.storage.put(PEERS_KEY, json).await?;
239 Ok(())
240 }
241
242 async fn load_banned_peers(&self) -> Result<()> {
244 let bytes = match self.storage.get(BANNED_PEERS_KEY).await? {
245 Some(b) => b,
246 None => return Ok(()),
247 };
248 let entries: Vec<BannedEntry> = match serde_json::from_slice(&bytes) {
249 Ok(e) => e,
250 Err(_) => return Ok(()),
251 };
252 let now = chrono::Utc::now();
253 let mut banned = self.banned_peers.write().await;
254 for e in entries {
255 if let Ok(addr) = e.addr.parse::<SocketAddr>() {
256 let expires: chrono::DateTime<chrono::Utc> =
257 chrono::DateTime::parse_from_rfc3339(&e.expires_at)
258 .map(|dt| dt.with_timezone(&chrono::Utc))
259 .unwrap_or(now);
260 if expires > now {
261 banned.insert(addr);
262 }
263 }
264 }
265 Ok(())
266 }
267
268 async fn save_banned_peers(&self) -> Result<()> {
270 let banned = self.banned_peers.read().await;
271 let entries: Vec<BannedEntry> = banned
272 .iter()
273 .map(|addr| BannedEntry {
274 addr: addr.to_string(),
275 expires_at: (chrono::Utc::now() + chrono::Duration::days(365 * 10)).to_rfc3339(),
276 })
277 .collect();
278 let json = serde_json::to_vec(&entries).map_err(|e| {
279 ChaincraftError::Serialization(crate::error::SerializationError::Json(e))
280 })?;
281 self.storage.put(BANNED_PEERS_KEY, json).await?;
282 Ok(())
283 }
284
285 pub async fn connect_to_peer(&mut self, peer_addr: &str) -> Result<()> {
287 self.connect_to_peer_with_discovery(peer_addr, false).await
288 }
289
290 pub async fn connect_to_peer_with_discovery(
292 &mut self,
293 peer_addr: &str,
294 _discovery: bool,
295 ) -> Result<()> {
296 let socket_addr: SocketAddr = peer_addr.parse().map_err(|_| {
298 ChaincraftError::Network(NetworkError::InvalidMessage {
299 reason: "Invalid peer address format".to_string(),
300 })
301 })?;
302
303 if self.is_banned(socket_addr).await {
304 return Err(ChaincraftError::Network(NetworkError::PeerBanned {
305 addr: socket_addr,
306 expires_at: chrono::Utc::now(),
307 }));
308 }
309
310 let peer_id = PeerId::new(); let peer_info = PeerInfo::new(peer_id.clone(), socket_addr);
312
313 self.add_peer(peer_info.clone()).await?;
314
315 if let Some(discovery) = &self.discovery {
317 discovery.add_peer(peer_info).await?;
318 discovery.mark_connected(&peer_id).await?;
319 }
320
321 Ok(())
322 }
323
324 pub async fn get_peers(&self) -> Vec<PeerInfo> {
326 let peers = self.peers.read().await;
327 peers.values().cloned().collect()
328 }
329
330 pub fn peers(&self) -> Vec<PeerInfo> {
332 Vec::new()
334 }
335
    /// Returns this node's identifier.
    pub fn id(&self) -> &PeerId {
        &self.id
    }
340
    /// Returns the configured port. When the node was started with port `0`
    /// this is updated to the OS-assigned port once the socket is bound.
    pub fn port(&self) -> u16 {
        self.config.port
    }
345
346 pub fn host(&self) -> &str {
348 "127.0.0.1" }
350
    /// Returns the configured maximum number of peers.
    // NOTE(review): no code in this file enforces this limit when peers are
    // added — confirm whether enforcement lives elsewhere.
    pub fn max_peers(&self) -> usize {
        self.config.max_peers
    }
355
356 pub async fn create_shared_message(&mut self, data: String) -> Result<String> {
358 let message_data = serde_json::to_value(&data).map_err(|e| {
359 ChaincraftError::Serialization(crate::error::SerializationError::Json(e))
360 })?;
361 let message =
362 SharedMessage::new(MessageType::Custom("user_message".to_string()), message_data);
363 let hash = message.hash.clone();
364 let json = message.to_json()?;
365 self.storage.put(&hash, json.as_bytes().to_vec()).await?;
366
367 {
369 let mut set = self.known_hashes.write().await;
370 set.insert(hash.clone());
371 }
372
373 if let Some(socket) = &self.socket {
375 let peers = self.peers.clone();
376 let banned_peers = self.banned_peers.clone();
377 let socket = socket.clone();
378 let json_bytes = json.into_bytes();
379 tokio::spawn(async move {
380 if let Err(e) = broadcast_bytes(&socket, &peers, &banned_peers, &json_bytes).await {
381 tracing::warn!("Failed to broadcast message: {:?}", e);
382 }
383 });
384 }
385
386 Ok(hash)
387 }
388
    /// Reports whether the node has the object with the given hash.
    // NOTE(review): this always returns `true` and never consults storage;
    // `_hash` is unused. Looks like a placeholder — confirm before relying on it.
    pub fn has_object(&self, _hash: &str) -> bool {
        true
    }
394
395 pub async fn get_object(&self, hash: &str) -> Result<String> {
397 if let Some(bytes) = self.storage.get(hash).await? {
398 let s = String::from_utf8(bytes).map_err(|e| {
399 ChaincraftError::Serialization(crate::error::SerializationError::Json(
400 SerdeDeError::custom(e),
401 ))
402 })?;
403 Ok(s)
404 } else {
405 Err(ChaincraftError::Storage(crate::error::StorageError::KeyNotFound {
406 key: hash.to_string(),
407 }))
408 }
409 }
410
411 pub fn db_size(&self) -> usize {
413 futures::executor::block_on(async { self.storage.len().await.unwrap_or(0) })
415 }
416
417 pub async fn add_shared_object(
419 &self,
420 object: Box<dyn ApplicationObject>,
421 ) -> Result<SharedObjectId> {
422 let mut registry = self.app_objects.write().await;
423 let id = registry.register(object);
424 Ok(id)
425 }
426
427 pub async fn shared_objects(&self) -> Vec<Box<dyn ApplicationObject>> {
429 let registry = self.app_objects.read().await;
430 registry
431 .ids()
432 .into_iter()
433 .filter_map(|id| registry.get(&id))
434 .map(|obj| obj.clone_box())
435 .collect()
436 }
437
438 pub async fn shared_object_count(&self) -> usize {
440 let registry = self.app_objects.read().await;
441 registry.len()
442 }
443
444 pub async fn create_shared_message_with_data(
446 &mut self,
447 data: serde_json::Value,
448 ) -> Result<String> {
449 let message_type = if let Some(msg_type) = data.get("type").and_then(|t| t.as_str()) {
451 match msg_type {
452 "PEER_DISCOVERY" => MessageType::PeerDiscovery,
453 "REQUEST_LOCAL_PEERS" => MessageType::RequestLocalPeers,
454 "LOCAL_PEERS" => MessageType::LocalPeers,
455 "REQUEST_SHARED_OBJECT_UPDATE" => MessageType::RequestSharedObjectUpdate,
456 "SHARED_OBJECT_UPDATE" => MessageType::SharedObjectUpdate,
457 "GET" => MessageType::Get,
458 "SET" => MessageType::Set,
459 "DELETE" => MessageType::Delete,
460 "RESPONSE" => MessageType::Response,
461 "NOTIFICATION" => MessageType::Notification,
462 "HEARTBEAT" => MessageType::Heartbeat,
463 "ERROR" => MessageType::Error,
464 _ => MessageType::Custom(msg_type.to_string()),
465 }
466 } else {
467 MessageType::Custom("user_message".to_string())
468 };
469
470 let message = SharedMessage::new(message_type, data.clone());
471 let hash = message.hash.clone();
472 let json = message.to_json()?;
473 self.storage.put(&hash, json.as_bytes().to_vec()).await?;
475 let mut app_registry = self.app_objects.write().await;
477 let _processed = app_registry.process_message(message).await?;
478
479 if let Some(socket) = &self.socket {
481 let peers = self.peers.clone();
482 let banned_peers = self.banned_peers.clone();
483 let socket = socket.clone();
484 let json_bytes = json.into_bytes();
485 tokio::spawn(async move {
486 if let Err(e) = broadcast_bytes(&socket, &peers, &banned_peers, &json_bytes).await {
487 tracing::warn!("Failed to broadcast message: {:?}", e);
488 }
489 });
490 }
491
492 Ok(hash)
493 }
494
495 pub async fn get_state(&self) -> Result<serde_json::Value> {
497 Ok(serde_json::json!({
498 "node_id": self.id.to_string(),
499 "running": *self.running.read().await,
500 "port": self.config.port,
501 "max_peers": self.config.max_peers,
502 "peer_count": self.peers.read().await.len(),
503 "messages": "stored", "shared_objects": self.shared_object_count().await
505 }))
506 }
507
508 pub async fn get_discovery_info(&self) -> serde_json::Value {
510 serde_json::json!({
511 "node_id": self.id.to_string(),
512 "host": self.host(),
513 "port": self.port(),
514 "max_peers": self.max_peers(),
515 "peer_count": self.peers.read().await.len()
516 })
517 }
518
    /// Sets the configured port. The port is read when the socket is bound in
    /// `start_networking`; this method does not rebind an existing socket.
    pub fn set_port(&mut self, port: u16) {
        self.config.port = port;
    }
523
    /// Turns off the `local_discovery` configuration flag. The flag is read
    /// when networking starts and when the node stops.
    pub fn disable_local_discovery(&mut self) {
        self.config.local_discovery = false;
    }
528
529 pub fn is_running(&self) -> bool {
531 futures::executor::block_on(async { *self.running.read().await })
533 }
534}
535
536impl ChaincraftNode {
538 async fn start_networking(&mut self) -> Result<()> {
539 let bind_addr: SocketAddr = format!("{}:{}", self.host(), self.port())
542 .parse()
543 .map_err(|_| {
544 ChaincraftError::Config(format!(
545 "Invalid bind address {}:{}",
546 self.host(),
547 self.port()
548 ))
549 })?;
550
551 let socket = UdpSocket::bind(bind_addr)
552 .await
553 .map_err(|e| ChaincraftError::Network(NetworkError::BindFailed { addr: bind_addr, source: e }))?;
554
555 let register_addr = if self.config.port == 0 {
557 if let Ok(local_addr) = socket.local_addr() {
558 self.config.port = local_addr.port();
559 local_addr
560 } else {
561 bind_addr
562 }
563 } else {
564 bind_addr
565 };
566
567 let socket = Arc::new(socket);
568 self.socket = Some(socket.clone());
569
570 let running = self.running.clone();
571 let storage = self.storage.clone();
572 let app_objects = self.app_objects.clone();
573 let peers = self.peers.clone();
574 let known_hashes = self.known_hashes.clone();
575
576 if self.config.local_discovery {
578 register_local_node(self.id.clone(), register_addr);
579 }
580
581 let banned_peers = self.banned_peers.clone();
583 let known_hashes = self.known_hashes.clone();
584 {
585 let socket = socket.clone();
586 let running = running.clone();
587 let storage = storage.clone();
588 let app_objects = app_objects.clone();
589 let peers = peers.clone();
590 let banned_peers = banned_peers.clone();
591 let known_hashes = known_hashes.clone();
592 tokio::spawn(async move {
593 let mut buf = vec![0u8; 64 * 1024];
594 loop {
595 if !*running.read().await {
596 break;
597 }
598
599 let (len, addr) = match socket.recv_from(&mut buf).await {
600 Ok(res) => res,
601 Err(e) => {
602 if !*running.read().await {
603 break;
604 }
605 tracing::warn!("UDP recv_from error: {:?}", e);
606 continue;
607 }
608 };
609
610 let data = &buf[..len];
611 if let Err(e) = handle_incoming_datagram(
612 data,
613 addr,
614 &socket,
615 &storage,
616 &app_objects,
617 &peers,
618 &banned_peers,
619 Some(&known_hashes),
620 )
621 .await
622 {
623 tracing::warn!("Error handling incoming datagram from {}: {:?}", addr, e);
624 }
625 }
626 });
627 }
628
629 let node_id = self.id.clone();
631 let banned_peers = banned_peers.clone();
632 let local_discovery = self.config.local_discovery;
633 tokio::spawn(async move {
634 let interval = Duration::from_millis(500);
636 loop {
637 if !*running.read().await {
638 break;
639 }
640
641 if local_discovery {
644 if let Some(local_nodes) = snapshot_local_nodes() {
645 let banned_set: HashSet<SocketAddr> = {
646 let b = banned_peers.read().await;
647 b.iter().copied().collect()
648 };
649 let mut peers_guard = peers.write().await;
650 for (peer_id, addr) in local_nodes {
651 if peer_id == node_id {
652 continue;
653 }
654 if banned_set.contains(&addr) {
655 continue;
656 }
657 if peers_guard.values().any(|p| p.address == addr) {
658 continue;
659 }
660 let info = PeerInfo::new(peer_id.clone(), addr);
661 peers_guard.insert(peer_id, info);
662 }
663 }
664 }
665
666 let hashes: Vec<String> = {
668 let set = known_hashes.read().await;
669 set.iter().cloned().collect()
670 };
671
672 for hash in hashes {
673 if let Ok(Some(bytes)) = storage.get(&hash).await {
675 if let Err(e) = broadcast_bytes(&socket, &peers, &banned_peers, &bytes).await {
676 tracing::warn!("gossip broadcast failed for {}: {:?}", hash, e);
677 }
678 }
679 }
680
681 let peer_addrs: Vec<SocketAddr> = {
683 let p = peers.read().await;
684 p.values().map(|x| x.address).collect()
685 };
686 if !peer_addrs.is_empty() {
687 if let Some(&peer_addr) = peer_addrs.first() {
688 let req = json!({ "type": "REQUEST_DIGEST" });
689 if let Ok(bytes) = serde_json::to_vec(&req) {
690 let _ = socket.send_to(&bytes, peer_addr).await;
691 }
692 }
693 }
694
695 tokio::time::sleep(interval).await;
696 }
697 });
698
699 Ok(())
700 }
701}
702
703async fn broadcast_bytes(
705 socket: &Arc<UdpSocket>,
706 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
707 banned_peers: &Arc<RwLock<HashSet<SocketAddr>>>,
708 data: &[u8],
709) -> Result<()> {
710 let (peers_snapshot, banned_set): (Vec<SocketAddr>, HashSet<SocketAddr>) = {
711 let p = peers.read().await;
712 let b = banned_peers.read().await;
713 (
714 p.values().map(|x| x.address).collect(),
715 b.iter().copied().collect(),
716 )
717 };
718
719 for addr in peers_snapshot {
720 if banned_set.contains(&addr) {
721 continue;
722 }
723 if let Err(e) = socket.send_to(data, addr).await {
724 tracing::warn!("Failed to send UDP packet to {}: {:?}", addr, e);
725 }
726 }
727
728 Ok(())
729}
730
/// Process-wide registry of nodes started with local discovery enabled,
/// mapping each node id to the address it registered. Lazily initialized by
/// `local_registry`.
static LOCAL_NODES: OnceLock<Mutex<HashMap<PeerId, SocketAddr>>> = OnceLock::new();
736
737fn local_registry() -> &'static Mutex<HashMap<PeerId, SocketAddr>> {
738 LOCAL_NODES.get_or_init(|| Mutex::new(HashMap::new()))
739}
740
741fn register_local_node(id: PeerId, addr: SocketAddr) {
742 let registry = local_registry();
743 let mut guard = registry.lock().unwrap();
744 guard.insert(id, addr);
745}
746
747fn unregister_local_node(id: &PeerId) {
748 if let Some(registry) = LOCAL_NODES.get() {
749 let mut guard = registry.lock().unwrap();
750 guard.remove(id);
751 }
752}
753
754pub fn clear_local_registry() {
756 if let Some(registry) = LOCAL_NODES.get() {
757 let mut guard = registry.lock().unwrap();
758 guard.clear();
759 }
760}
761
762fn snapshot_local_nodes() -> Option<Vec<(PeerId, SocketAddr)>> {
763 let registry = LOCAL_NODES.get()?;
764 let guard = registry.lock().unwrap();
765 Some(guard.iter().map(|(id, addr)| (id.clone(), *addr)).collect())
766}
767
768async fn handle_digest_sync_control(
770 data: &[u8],
771 addr: SocketAddr,
772 socket: &Arc<UdpSocket>,
773 storage: &Arc<dyn Storage>,
774 app_objects: &Arc<RwLock<ApplicationObjectRegistry>>,
775 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
776 banned_peers: &Arc<RwLock<HashSet<SocketAddr>>>,
777 known_hashes: &Arc<RwLock<HashSet<String>>>,
778) -> Result<bool> {
779 let value: serde_json::Value = serde_json::from_slice(data).map_err(|_| {
780 ChaincraftError::Serialization(crate::error::SerializationError::Json(
781 serde_json::Error::custom("not json"),
782 ))
783 })?;
784 let msg_type = value.get("type").and_then(|t| t.as_str());
785 match msg_type {
786 Some("REQUEST_DIGEST") => {
787 let digest = {
788 let registry = app_objects.read().await;
789 let ids = registry.ids();
790 let mut digest = "".to_string();
791 for id in ids {
792 if let Some(obj) = registry.get(&id) {
793 if obj.is_merkleized() {
794 digest = obj.get_latest_digest().await.unwrap_or_default();
795 break;
796 }
797 }
798 }
799 digest
800 };
801 let resp = json!({ "type": "DIGEST_RESPONSE", "digest": digest });
802 let bytes = serde_json::to_vec(&resp).unwrap_or_default();
803 let _ = socket.send_to(&bytes, addr).await;
804 return Ok(true);
805 }
806 Some("REQUEST_MESSAGES_SINCE") => {
807 let since = value.get("digest").and_then(|d| d.as_str()).unwrap_or("");
808 let messages = {
809 let registry = app_objects.read().await;
810 let ids = registry.ids();
811 let mut msgs = Vec::new();
812 for id in ids {
813 if let Some(obj) = registry.get(&id) {
814 if obj.is_merkleized() {
815 msgs = obj.get_messages_since_digest(since).await.unwrap_or_default();
816 break;
817 }
818 }
819 }
820 msgs
821 };
822 let msg_ser: Vec<serde_json::Value> = messages
823 .iter()
824 .filter_map(|m| serde_json::to_value(m).ok())
825 .collect();
826 let resp = json!({ "type": "MESSAGES_RESPONSE", "messages": msg_ser });
827 let bytes = serde_json::to_vec(&resp).unwrap_or_default();
828 let _ = socket.send_to(&bytes, addr).await;
829 return Ok(true);
830 }
831 Some("DIGEST_RESPONSE") => {
832 let remote_digest = value.get("digest").and_then(|d| d.as_str()).unwrap_or("");
833 let our_digest = {
834 let registry = app_objects.read().await;
835 let ids = registry.ids();
836 let mut d = String::new();
837 for id in ids {
838 if let Some(obj) = registry.get(&id) {
839 if obj.is_merkleized() {
840 d = obj.get_latest_digest().await.unwrap_or_default();
841 break;
842 }
843 }
844 }
845 d
846 };
847 if remote_digest != our_digest {
848 let req = json!({ "type": "REQUEST_MESSAGES_SINCE", "digest": our_digest });
849 let bytes = serde_json::to_vec(&req).unwrap_or_default();
850 let _ = socket.send_to(&bytes, addr).await;
851 }
852 return Ok(true);
853 }
854 Some("MESSAGES_RESPONSE") => {
855 let messages: Vec<SharedMessage> = value
856 .get("messages")
857 .and_then(|m| m.as_array())
858 .map(|arr| {
859 arr.iter()
860 .filter_map(|v| serde_json::from_value(v.clone()).ok())
861 .collect::<Vec<_>>()
862 })
863 .unwrap_or_default();
864 for msg in messages {
865 if storage.exists(&msg.hash).await.unwrap_or(true) {
866 continue;
867 }
868 let json = msg.to_json().unwrap_or_default();
869 let _ = storage.put(&msg.hash, json.as_bytes().to_vec()).await;
870 {
871 let mut set = known_hashes.write().await;
872 set.insert(msg.hash.clone());
873 }
874 {
875 let mut registry = app_objects.write().await;
876 let _ = registry.process_message(msg.clone()).await;
877 }
878 let bytes = msg.to_json().unwrap_or_default().into_bytes();
879 let _ = broadcast_bytes(socket, peers, banned_peers, &bytes).await;
880 }
881 return Ok(true);
882 }
883 _ => {}
884 }
885 Ok(false)
886}
887
888async fn handle_incoming_datagram(
890 data: &[u8],
891 addr: SocketAddr,
892 socket: &Arc<UdpSocket>,
893 storage: &Arc<dyn Storage>,
894 app_objects: &Arc<RwLock<ApplicationObjectRegistry>>,
895 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
896 banned_peers: &Arc<RwLock<HashSet<SocketAddr>>>,
897 known_hashes: Option<&Arc<RwLock<HashSet<String>>>>,
898) -> Result<()> {
899 {
901 let banned = banned_peers.read().await;
902 if banned.contains(&addr) {
903 return Ok(()); }
905 }
906
907 if let Some(kh) = known_hashes {
909 if let Ok(true) =
910 handle_digest_sync_control(data, addr, socket, storage, app_objects, peers, banned_peers, kh).await
911 {
912 {
914 let mut guard = peers.write().await;
915 if !guard.values().any(|p| p.address == addr) {
916 let peer_id = PeerId::new();
917 let info = PeerInfo::new(peer_id.clone(), addr);
918 guard.insert(peer_id, info);
919 }
920 }
921 return Ok(());
922 }
923 }
924
925 let msg: SharedMessage = match serde_json::from_slice(data) {
927 Ok(m) => m,
928 Err(_) => return Ok(()),
929 };
930
931 if storage.exists(&msg.hash).await? {
933 return Ok(());
934 }
935
936 let json = msg.to_json()?;
938 storage.put(&msg.hash, json.as_bytes().to_vec()).await?;
939
940 {
942 let mut guard = peers.write().await;
943 if !guard.values().any(|p| p.address == addr) {
944 let peer_id = PeerId::new();
945 let info = PeerInfo::new(peer_id.clone(), addr);
946 guard.insert(peer_id, info);
947 }
948 }
949
950 {
952 let mut registry = app_objects.write().await;
953 let _ = registry.process_message(msg.clone()).await?;
954 }
955
956 let bytes = json.into_bytes();
958 broadcast_bytes(socket, peers, banned_peers, &bytes).await?;
959
960 Ok(())
961}
962
/// Configuration for a `ChaincraftNode`.
#[derive(Debug, Clone)]
pub struct NodeConfig {
    /// Maximum number of peers.
    pub max_peers: usize,

    /// UDP port to bind; `0` requests an OS-assigned port.
    pub port: u16,

    /// Host address for the node.
    pub host: String,

    /// Consensus toggle.
    // NOTE(review): not read anywhere in this file — confirm where it is used.
    pub consensus_enabled: bool,

    /// Whether the node registers itself in, and discovers peers from, the
    /// in-process local node registry.
    pub local_discovery: bool,
    /// Whether the peer list is loaded from and saved to storage.
    pub persist_peers: bool,
}

impl Default for NodeConfig {
    /// Defaults: 50 peers, port 8080 on `127.0.0.1`, with consensus, local
    /// discovery and peer persistence all enabled.
    fn default() -> Self {
        NodeConfig {
            max_peers: 50,
            port: 8080,
            host: String::from("127.0.0.1"),
            consensus_enabled: true,
            local_discovery: true,
            persist_peers: true,
        }
    }
}
996
/// Builder for `ChaincraftNode`.
pub struct ChaincraftNodeBuilder {
    /// Node id; a fresh one is generated at build time when unset.
    id: Option<PeerId>,
    /// Explicit storage backend; takes precedence over `persistent`.
    storage: Option<Arc<dyn Storage>>,
    /// Configuration handed to the node.
    config: NodeConfig,
    /// Requests on-disk storage when no explicit backend is given (only
    /// honoured when the `persistent` feature is enabled).
    persistent: bool,
}
1004
1005impl ChaincraftNodeBuilder {
1006 pub fn new() -> Self {
1008 Self {
1009 id: None,
1010 storage: None,
1011 config: NodeConfig::default(),
1012 persistent: false,
1013 }
1014 }
1015
1016 pub fn with_id(mut self, id: PeerId) -> Self {
1018 self.id = Some(id);
1019 self
1020 }
1021
1022 pub fn with_storage(mut self, storage: Arc<dyn Storage>) -> Self {
1024 self.storage = Some(storage);
1025 self
1026 }
1027
1028 pub fn with_persistent_storage(mut self, persistent: bool) -> Self {
1030 self.persistent = persistent;
1031 self
1032 }
1033
1034 pub fn with_config(mut self, config: NodeConfig) -> Self {
1036 self.config = config;
1037 self
1038 }
1039
1040 pub fn port(mut self, port: u16) -> Self {
1042 self.config.port = port;
1043 self
1044 }
1045
1046 pub fn host(mut self, host: impl Into<String>) -> Self {
1048 self.config.host = host.into();
1049 self
1050 }
1051
1052 pub fn local_discovery(mut self, enabled: bool) -> Self {
1054 self.config.local_discovery = enabled;
1055 self
1056 }
1057
1058 pub fn persist_peers(mut self, enabled: bool) -> Self {
1060 self.config.persist_peers = enabled;
1061 self
1062 }
1063
1064 pub fn max_peers(mut self, max_peers: usize) -> Self {
1066 self.config.max_peers = max_peers;
1067 self
1068 }
1069
1070 pub fn build(self) -> Result<ChaincraftNode> {
1072 let id = self.id.unwrap_or_else(|| {
1074 use crate::network::PeerId;
1075 PeerId::new()
1076 });
1077
1078 let storage: Arc<dyn Storage> = if let Some(storage) = self.storage {
1080 storage
1081 } else if self.persistent {
1082 #[cfg(feature = "persistent")]
1083 {
1084 use crate::storage::SledStorage;
1085 let path = format!("node_{}.db", self.config.port);
1087 Arc::new(SledStorage::open(path)?)
1088 }
1089 #[cfg(not(feature = "persistent"))]
1090 {
1091 Arc::new(MemoryStorage::new())
1092 }
1093 } else {
1094 Arc::new(MemoryStorage::new())
1095 };
1096
1097 Ok(ChaincraftNode {
1098 id,
1099 registry: Arc::new(RwLock::new(SharedObjectRegistry::new())),
1100 app_objects: Arc::new(RwLock::new(ApplicationObjectRegistry::new())),
1101 discovery: None, storage,
1103 peers: Arc::new(RwLock::new(HashMap::new())),
1104 banned_peers: Arc::new(RwLock::new(HashSet::new())),
1105 known_hashes: Arc::new(RwLock::new(HashSet::new())),
1106 socket: None,
1107 config: self.config,
1108 running: Arc::new(RwLock::new(false)),
1109 })
1110 }
1111}
1112
1113impl Default for ChaincraftNodeBuilder {
1114 fn default() -> Self {
1115 Self::new()
1116 }
1117}