1use crate::{
4 discovery::{DiscoveryConfig, DiscoveryManager},
5 error::{ChaincraftError, Result},
6 network::{PeerId, PeerInfo},
7 shared::{MessageType, SharedMessage, SharedObjectId, SharedObjectRegistry},
8 shared_object::{ApplicationObject, ApplicationObjectRegistry, SimpleSharedNumber},
9 storage::{MemoryStorage, Storage},
10};
11
12use serde::de::Error as SerdeDeError;
13use std::{collections::HashMap, sync::Arc};
14use tokio::sync::RwLock;
15
/// A single node: its identity, object registries, optional discovery
/// manager, storage backend, known peers, configuration and running flag.
pub struct ChaincraftNode {
    /// Identifier of this node.
    pub id: PeerId,
    /// Shared-object registry; the builder creates it empty.
    pub registry: Arc<RwLock<SharedObjectRegistry>>,
    /// Application-object registry; `create_shared_message_with_data` passes
    /// every message it creates to this registry for processing.
    pub app_objects: Arc<RwLock<ApplicationObjectRegistry>>,
    /// Optional discovery manager; when present it is told about peers added
    /// through `connect_to_peer_with_discovery`. The builder leaves it `None`.
    pub discovery: Option<DiscoveryManager>,
    /// Storage backend used to persist serialized messages by hash.
    pub storage: Arc<dyn Storage>,
    /// Known peers, keyed by peer id.
    pub peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
    /// Node settings (port, peer limit, consensus flag).
    pub config: NodeConfig,
    /// Running flag: set to `true` by `start` and to `false` by `stop`.
    pub running: Arc<RwLock<bool>>,
}
35
36impl ChaincraftNode {
37 pub fn new(id: PeerId, storage: Arc<dyn Storage>) -> Self {
39 Self::builder()
40 .with_id(id)
41 .with_storage(storage)
42 .build()
43 .expect("Failed to create node")
44 }
45
46 pub fn default() -> Self {
48 Self::new(PeerId::new(), Arc::new(MemoryStorage::new()))
49 }
50
51 pub fn new_default() -> Self {
53 Self::default()
54 }
55
56 pub fn builder() -> ChaincraftNodeBuilder {
58 ChaincraftNodeBuilder::new()
59 }
60
61 pub async fn start(&mut self) -> Result<()> {
63 self.storage.initialize().await?;
65
66 *self.running.write().await = true;
68
69 Ok(())
74 }
75
76 pub async fn stop(&mut self) -> Result<()> {
78 *self.running.write().await = false;
79 Ok(())
81 }
82
83 pub async fn close(&mut self) -> Result<()> {
85 self.stop().await
86 }
87
88 pub async fn is_running_async(&self) -> bool {
90 *self.running.read().await
91 }
92
93 pub async fn add_peer(&self, peer: PeerInfo) -> Result<()> {
95 let mut peers = self.peers.write().await;
96 peers.insert(peer.id.clone(), peer);
97 Ok(())
98 }
99
100 pub async fn remove_peer(&self, peer_id: &PeerId) -> Result<()> {
102 let mut peers = self.peers.write().await;
103 peers.remove(peer_id);
104 Ok(())
105 }
106
107 pub async fn connect_to_peer(&mut self, peer_addr: &str) -> Result<()> {
109 self.connect_to_peer_with_discovery(peer_addr, false).await
110 }
111
112 pub async fn connect_to_peer_with_discovery(
114 &mut self,
115 peer_addr: &str,
116 _discovery: bool,
117 ) -> Result<()> {
118 let parts: Vec<&str> = peer_addr.split(':').collect();
120 if parts.len() != 2 {
121 return Err(ChaincraftError::Network(crate::error::NetworkError::InvalidMessage {
122 reason: "Invalid peer address format".to_string(),
123 }));
124 }
125
126 let host = parts[0].to_string();
127 let port: u16 = parts[1].parse().map_err(|_| {
128 ChaincraftError::Network(crate::error::NetworkError::InvalidMessage {
129 reason: "Invalid port number".to_string(),
130 })
131 })?;
132
133 let peer_id = PeerId::new(); let socket_addr = format!("{}:{}", host, port).parse().map_err(|_| {
135 ChaincraftError::Network(crate::error::NetworkError::InvalidMessage {
136 reason: "Invalid socket address".to_string(),
137 })
138 })?;
139 let peer_info = PeerInfo::new(peer_id.clone(), socket_addr);
140
141 self.add_peer(peer_info.clone()).await?;
142
143 if let Some(discovery) = &self.discovery {
145 discovery.add_peer(peer_info).await?;
146 discovery.mark_connected(&peer_id).await?;
147 }
148
149 Ok(())
150 }
151
152 pub async fn get_peers(&self) -> Vec<PeerInfo> {
154 let peers = self.peers.read().await;
155 peers.values().cloned().collect()
156 }
157
158 pub fn peers(&self) -> Vec<PeerInfo> {
160 Vec::new()
163 }
164
165 pub fn id(&self) -> &PeerId {
167 &self.id
168 }
169
170 pub fn port(&self) -> u16 {
172 self.config.port
173 }
174
/// Returns the node's host address.
///
/// This is always the literal `"127.0.0.1"`; it is not read from
/// `NodeConfig` or any other state.
pub fn host(&self) -> &str {
    "127.0.0.1"
}
179
180 pub fn max_peers(&self) -> usize {
182 self.config.max_peers
183 }
184
185 pub async fn create_shared_message(&mut self, data: String) -> Result<String> {
187 let message_data = serde_json::to_value(&data).map_err(|e| {
188 ChaincraftError::Serialization(crate::error::SerializationError::Json(e))
189 })?;
190 let message =
191 SharedMessage::new(MessageType::Custom("user_message".to_string()), message_data);
192 let hash = message.hash.clone();
193 let json = message.to_json()?;
194 self.storage.put(&hash, json.as_bytes().to_vec()).await?;
195 Ok(hash)
196 }
197
/// Reports whether an object with the given hash exists.
///
/// NOTE(review): this is a placeholder. It always returns `true`, ignores
/// `_hash`, and never consults storage. Use [`ChaincraftNode::get_object`]
/// when an actual lookup is required.
pub fn has_object(&self, _hash: &str) -> bool {
    true
}
203
204 pub async fn get_object(&self, hash: &str) -> Result<String> {
206 if let Some(bytes) = self.storage.get(hash).await? {
207 let s = String::from_utf8(bytes).map_err(|e| {
208 ChaincraftError::Serialization(crate::error::SerializationError::Json(
209 SerdeDeError::custom(e),
210 ))
211 })?;
212 Ok(s)
213 } else {
214 Err(ChaincraftError::Storage(crate::error::StorageError::KeyNotFound {
215 key: hash.to_string(),
216 }))
217 }
218 }
219
/// Returns the size of the node's database.
///
/// NOTE(review): this is a placeholder. It always returns `1` and does not
/// inspect the storage backend.
pub fn db_size(&self) -> usize {
    1
}
226
227 pub async fn add_shared_object(
229 &self,
230 object: Box<dyn ApplicationObject>,
231 ) -> Result<SharedObjectId> {
232 let mut registry = self.app_objects.write().await;
233 let id = registry.register(object);
234 Ok(id)
235 }
236
237 pub async fn shared_objects(&self) -> Vec<Box<dyn ApplicationObject>> {
239 let registry = self.app_objects.read().await;
240 registry
241 .ids()
242 .into_iter()
243 .filter_map(|id| registry.get(&id))
244 .map(|obj| obj.clone_box())
245 .collect()
246 }
247
248 pub async fn shared_object_count(&self) -> usize {
250 let registry = self.app_objects.read().await;
251 registry.len()
252 }
253
254 pub async fn create_shared_message_with_data(
256 &mut self,
257 data: serde_json::Value,
258 ) -> Result<String> {
259 let message_type = if let Some(msg_type) = data.get("type").and_then(|t| t.as_str()) {
261 match msg_type {
262 "PEER_DISCOVERY" => MessageType::PeerDiscovery,
263 "REQUEST_LOCAL_PEERS" => MessageType::RequestLocalPeers,
264 "LOCAL_PEERS" => MessageType::LocalPeers,
265 "REQUEST_SHARED_OBJECT_UPDATE" => MessageType::RequestSharedObjectUpdate,
266 "SHARED_OBJECT_UPDATE" => MessageType::SharedObjectUpdate,
267 "GET" => MessageType::Get,
268 "SET" => MessageType::Set,
269 "DELETE" => MessageType::Delete,
270 "RESPONSE" => MessageType::Response,
271 "NOTIFICATION" => MessageType::Notification,
272 "HEARTBEAT" => MessageType::Heartbeat,
273 "ERROR" => MessageType::Error,
274 _ => MessageType::Custom(msg_type.to_string()),
275 }
276 } else {
277 MessageType::Custom("user_message".to_string())
278 };
279
280 let message = SharedMessage::new(message_type, data.clone());
281 let hash = message.hash.clone();
282 let json = message.to_json()?;
283 self.storage.put(&hash, json.as_bytes().to_vec()).await?;
285 let mut app_registry = self.app_objects.write().await;
287 let _processed = app_registry.process_message(message).await?;
288 Ok(hash)
289 }
290
291 pub async fn get_state(&self) -> Result<serde_json::Value> {
293 Ok(serde_json::json!({
294 "node_id": self.id.to_string(),
295 "running": *self.running.read().await,
296 "port": self.config.port,
297 "max_peers": self.config.max_peers,
298 "peer_count": self.peers.read().await.len(),
299 "messages": "stored", "shared_objects": self.shared_object_count().await
301 }))
302 }
303
304 pub async fn get_discovery_info(&self) -> serde_json::Value {
306 serde_json::json!({
307 "node_id": self.id.to_string(),
308 "host": self.host(),
309 "port": self.port(),
310 "max_peers": self.max_peers(),
311 "peer_count": self.peers.read().await.len()
312 })
313 }
314
315 pub fn set_port(&mut self, port: u16) {
317 self.config.port = port;
318 }
319
320 pub fn is_running(&self) -> bool {
322 futures::executor::block_on(async { *self.running.read().await })
324 }
325}
326
/// Settings for a node.
///
/// Derives `PartialEq`/`Eq` so that configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeConfig {
    /// Configured peer limit (default: 50).
    pub max_peers: usize,

    /// Port associated with the node (default: 8080).
    pub port: u16,

    /// Whether consensus is enabled (default: `true`).
    pub consensus_enabled: bool,
}

impl Default for NodeConfig {
    /// Returns the default settings: 50 peers, port 8080, consensus enabled.
    fn default() -> Self {
        Self {
            max_peers: 50,
            port: 8080,
            consensus_enabled: true,
        }
    }
}
349
/// Step-by-step constructor for [`ChaincraftNode`].
pub struct ChaincraftNodeBuilder {
    /// Node id to use; when `None`, `build` generates a new one.
    id: Option<PeerId>,
    /// Storage backend to use; when `None`, `build` creates in-memory storage.
    storage: Option<Arc<dyn Storage>>,
    /// Configuration handed to the built node.
    config: NodeConfig,
    /// Persistence flag set by `with_persistent_storage`.
    /// NOTE(review): `build` does not currently read this field.
    persistent: bool,
}
357
358impl ChaincraftNodeBuilder {
359 pub fn new() -> Self {
361 Self {
362 id: None,
363 storage: None,
364 config: NodeConfig::default(),
365 persistent: false,
366 }
367 }
368
369 pub fn with_id(mut self, id: PeerId) -> Self {
371 self.id = Some(id);
372 self
373 }
374
375 pub fn with_storage(mut self, storage: Arc<dyn Storage>) -> Self {
377 self.storage = Some(storage);
378 self
379 }
380
381 pub fn with_persistent_storage(mut self, persistent: bool) -> Self {
383 self.persistent = persistent;
384 self
385 }
386
387 pub fn with_config(mut self, config: NodeConfig) -> Self {
389 self.config = config;
390 self
391 }
392
393 pub fn port(mut self, port: u16) -> Self {
395 self.config.port = port;
396 self
397 }
398
399 pub fn max_peers(mut self, max_peers: usize) -> Self {
401 self.config.max_peers = max_peers;
402 self
403 }
404
405 pub fn build(self) -> Result<ChaincraftNode> {
407 let id = self.id.unwrap_or_else(|| {
409 use crate::network::PeerId;
410 PeerId::new()
411 });
412
413 let storage = self.storage.unwrap_or_else(|| {
415 use crate::storage::MemoryStorage;
416 Arc::new(MemoryStorage::new())
417 });
418
419 Ok(ChaincraftNode {
420 id,
421 registry: Arc::new(RwLock::new(SharedObjectRegistry::new())),
422 app_objects: Arc::new(RwLock::new(ApplicationObjectRegistry::new())),
423 discovery: None, storage,
425 peers: Arc::new(RwLock::new(HashMap::new())),
426 config: self.config,
427 running: Arc::new(RwLock::new(false)),
428 })
429 }
430}
431
432impl Default for ChaincraftNodeBuilder {
433 fn default() -> Self {
434 Self::new()
435 }
436}