1use crate::{
4 discovery::{DiscoveryConfig, DiscoveryManager},
5 error::{ChaincraftError, Result},
6 network::{PeerId, PeerInfo},
7 shared::{MessageType, SharedMessage, SharedObjectId, SharedObjectRegistry},
8 shared_object::{ApplicationObject, ApplicationObjectRegistry, SimpleSharedNumber},
9 storage::{MemoryStorage, Storage},
10};
11
12use serde::de::Error as SerdeDeError;
13use std::{
14 collections::HashMap,
15 sync::Arc,
16};
17use tokio::sync::RwLock;
18
19pub struct ChaincraftNode {
21 pub id: PeerId,
23 pub registry: Arc<RwLock<SharedObjectRegistry>>,
25 pub app_objects: Arc<RwLock<ApplicationObjectRegistry>>,
27 pub discovery: Option<DiscoveryManager>,
29 pub storage: Arc<dyn Storage>,
31 pub peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
33 pub config: NodeConfig,
35 pub running: Arc<RwLock<bool>>,
37}
38
39impl ChaincraftNode {
40 pub fn new(id: PeerId, storage: Arc<dyn Storage>) -> Self {
42 Self::builder()
43 .with_id(id)
44 .with_storage(storage)
45 .build()
46 .expect("Failed to create node")
47 }
48
49 pub fn default() -> Self {
51 Self::new(PeerId::new(), Arc::new(MemoryStorage::new()))
52 }
53
54 pub fn new_default() -> Self {
56 Self::default()
57 }
58
59 pub fn builder() -> ChaincraftNodeBuilder {
61 ChaincraftNodeBuilder::new()
62 }
63
64 pub async fn start(&mut self) -> Result<()> {
66 self.storage.initialize().await?;
68
69 *self.running.write().await = true;
71
72 Ok(())
77 }
78
79 pub async fn stop(&mut self) -> Result<()> {
81 *self.running.write().await = false;
82 Ok(())
84 }
85
86 pub async fn close(&mut self) -> Result<()> {
88 self.stop().await
89 }
90
91 pub async fn is_running_async(&self) -> bool {
93 *self.running.read().await
94 }
95
96 pub async fn add_peer(&self, peer: PeerInfo) -> Result<()> {
98 let mut peers = self.peers.write().await;
99 peers.insert(peer.id.clone(), peer);
100 Ok(())
101 }
102
103 pub async fn remove_peer(&self, peer_id: &PeerId) -> Result<()> {
105 let mut peers = self.peers.write().await;
106 peers.remove(peer_id);
107 Ok(())
108 }
109
110 pub async fn connect_to_peer(&mut self, peer_addr: &str) -> Result<()> {
112 self.connect_to_peer_with_discovery(peer_addr, false).await
113 }
114
115 pub async fn connect_to_peer_with_discovery(
117 &mut self,
118 peer_addr: &str,
119 _discovery: bool,
120 ) -> Result<()> {
121 let parts: Vec<&str> = peer_addr.split(':').collect();
123 if parts.len() != 2 {
124 return Err(ChaincraftError::Network(crate::error::NetworkError::InvalidMessage {
125 reason: "Invalid peer address format".to_string(),
126 }));
127 }
128
129 let host = parts[0].to_string();
130 let port: u16 = parts[1].parse().map_err(|_| {
131 ChaincraftError::Network(crate::error::NetworkError::InvalidMessage {
132 reason: "Invalid port number".to_string(),
133 })
134 })?;
135
136 let peer_id = PeerId::new(); let socket_addr = format!("{}:{}", host, port).parse().map_err(|_| {
138 ChaincraftError::Network(crate::error::NetworkError::InvalidMessage {
139 reason: "Invalid socket address".to_string(),
140 })
141 })?;
142 let peer_info = PeerInfo::new(peer_id.clone(), socket_addr);
143
144 self.add_peer(peer_info.clone()).await?;
145
146 if let Some(discovery) = &self.discovery {
148 discovery.add_peer(peer_info).await?;
149 discovery.mark_connected(&peer_id).await?;
150 }
151
152 Ok(())
153 }
154
155 pub async fn get_peers(&self) -> Vec<PeerInfo> {
157 let peers = self.peers.read().await;
158 peers.values().cloned().collect()
159 }
160
161 pub fn peers(&self) -> Vec<PeerInfo> {
163 Vec::new()
166 }
167
168 pub fn id(&self) -> &PeerId {
170 &self.id
171 }
172
173 pub fn port(&self) -> u16 {
175 self.config.port
176 }
177
178 pub fn host(&self) -> &str {
180 "127.0.0.1" }
182
183 pub fn max_peers(&self) -> usize {
185 self.config.max_peers
186 }
187
188 pub async fn create_shared_message(&mut self, data: String) -> Result<String> {
190 let message_data = serde_json::to_value(&data).map_err(|e| {
191 ChaincraftError::Serialization(crate::error::SerializationError::Json(e))
192 })?;
193 let message =
194 SharedMessage::new(MessageType::Custom("user_message".to_string()), message_data);
195 let hash = message.hash.clone();
196 let json = message.to_json()?;
197 self.storage.put(&hash, json.as_bytes().to_vec()).await?;
198 Ok(hash)
199 }
200
201 pub fn has_object(&self, _hash: &str) -> bool {
203 true
205 }
206
207 pub async fn get_object(&self, hash: &str) -> Result<String> {
209 if let Some(bytes) = self.storage.get(hash).await? {
210 let s = String::from_utf8(bytes).map_err(|e| {
211 ChaincraftError::Serialization(crate::error::SerializationError::Json(
212 SerdeDeError::custom(e),
213 ))
214 })?;
215 Ok(s)
216 } else {
217 Err(ChaincraftError::Storage(crate::error::StorageError::KeyNotFound {
218 key: hash.to_string(),
219 }))
220 }
221 }
222
223 pub fn db_size(&self) -> usize {
225 1
228 }
229
230 pub async fn add_shared_object(
232 &self,
233 object: Box<dyn ApplicationObject>,
234 ) -> Result<SharedObjectId> {
235 let mut registry = self.app_objects.write().await;
236 let id = registry.register(object);
237 Ok(id)
238 }
239
240 pub async fn shared_objects(&self) -> Vec<Box<dyn ApplicationObject>> {
242 let registry = self.app_objects.read().await;
243 registry
244 .ids()
245 .into_iter()
246 .filter_map(|id| registry.get(&id))
247 .map(|obj| obj.clone_box())
248 .collect()
249 }
250
251 pub async fn shared_object_count(&self) -> usize {
253 let registry = self.app_objects.read().await;
254 registry.len()
255 }
256
257 pub async fn create_shared_message_with_data(
259 &mut self,
260 data: serde_json::Value,
261 ) -> Result<String> {
262 let message_type = if let Some(msg_type) = data.get("type").and_then(|t| t.as_str()) {
264 match msg_type {
265 "PEER_DISCOVERY" => MessageType::PeerDiscovery,
266 "REQUEST_LOCAL_PEERS" => MessageType::RequestLocalPeers,
267 "LOCAL_PEERS" => MessageType::LocalPeers,
268 "REQUEST_SHARED_OBJECT_UPDATE" => MessageType::RequestSharedObjectUpdate,
269 "SHARED_OBJECT_UPDATE" => MessageType::SharedObjectUpdate,
270 "GET" => MessageType::Get,
271 "SET" => MessageType::Set,
272 "DELETE" => MessageType::Delete,
273 "RESPONSE" => MessageType::Response,
274 "NOTIFICATION" => MessageType::Notification,
275 "HEARTBEAT" => MessageType::Heartbeat,
276 "ERROR" => MessageType::Error,
277 _ => MessageType::Custom(msg_type.to_string()),
278 }
279 } else {
280 MessageType::Custom("user_message".to_string())
281 };
282
283 let message = SharedMessage::new(message_type, data.clone());
284 let hash = message.hash.clone();
285 let json = message.to_json()?;
286 self.storage.put(&hash, json.as_bytes().to_vec()).await?;
288 let mut app_registry = self.app_objects.write().await;
290 let _processed = app_registry.process_message(message).await?;
291 Ok(hash)
292 }
293
294 pub async fn get_state(&self) -> Result<serde_json::Value> {
296 Ok(serde_json::json!({
297 "node_id": self.id.to_string(),
298 "running": *self.running.read().await,
299 "port": self.config.port,
300 "max_peers": self.config.max_peers,
301 "peer_count": self.peers.read().await.len(),
302 "messages": "stored", "shared_objects": self.shared_object_count().await
304 }))
305 }
306
307 pub async fn get_discovery_info(&self) -> serde_json::Value {
309 serde_json::json!({
310 "node_id": self.id.to_string(),
311 "host": self.host(),
312 "port": self.port(),
313 "max_peers": self.max_peers(),
314 "peer_count": self.peers.read().await.len()
315 })
316 }
317
318 pub fn set_port(&mut self, port: u16) {
320 self.config.port = port;
321 }
322
323 pub fn is_running(&self) -> bool {
325 futures::executor::block_on(async { *self.running.read().await })
327 }
328}
329
/// Tunable settings of a node.
#[derive(Clone, Debug)]
pub struct NodeConfig {
    /// Maximum number of peers.
    pub max_peers: usize,

    /// Port associated with the node.
    pub port: u16,

    /// Whether consensus is enabled.
    pub consensus_enabled: bool,
}
342
343impl Default for NodeConfig {
344 fn default() -> Self {
345 Self {
346 max_peers: 50,
347 port: 8080,
348 consensus_enabled: true,
349 }
350 }
351}
352
353pub struct ChaincraftNodeBuilder {
355 id: Option<PeerId>,
356 storage: Option<Arc<dyn Storage>>,
357 config: NodeConfig,
358 persistent: bool,
359}
360
361impl ChaincraftNodeBuilder {
362 pub fn new() -> Self {
364 Self {
365 id: None,
366 storage: None,
367 config: NodeConfig::default(),
368 persistent: false,
369 }
370 }
371
372 pub fn with_id(mut self, id: PeerId) -> Self {
374 self.id = Some(id);
375 self
376 }
377
378 pub fn with_storage(mut self, storage: Arc<dyn Storage>) -> Self {
380 self.storage = Some(storage);
381 self
382 }
383
384 pub fn with_persistent_storage(mut self, persistent: bool) -> Self {
386 self.persistent = persistent;
387 self
388 }
389
390 pub fn with_config(mut self, config: NodeConfig) -> Self {
392 self.config = config;
393 self
394 }
395
396 pub fn port(mut self, port: u16) -> Self {
398 self.config.port = port;
399 self
400 }
401
402 pub fn max_peers(mut self, max_peers: usize) -> Self {
404 self.config.max_peers = max_peers;
405 self
406 }
407
408 pub fn build(self) -> Result<ChaincraftNode> {
410 let id = self.id.unwrap_or_else(|| {
412 use crate::network::PeerId;
413 PeerId::new()
414 });
415
416 let storage = self.storage.unwrap_or_else(|| {
418 use crate::storage::MemoryStorage;
419 Arc::new(MemoryStorage::new())
420 });
421
422 Ok(ChaincraftNode {
423 id,
424 registry: Arc::new(RwLock::new(SharedObjectRegistry::new())),
425 app_objects: Arc::new(RwLock::new(ApplicationObjectRegistry::new())),
426 discovery: None, storage,
428 peers: Arc::new(RwLock::new(HashMap::new())),
429 config: self.config,
430 running: Arc::new(RwLock::new(false)),
431 })
432 }
433}
434
435impl Default for ChaincraftNodeBuilder {
436 fn default() -> Self {
437 Self::new()
438 }
439}