use crate::{
    message::{Message, MessageError, MessageType},
    persistence::{
        MemoryBackend, PersistedState, PersistenceError, PersistenceManager, SqliteBackend,
        StatePersistence, StateProvider,
    },
    state::ProtocolStateMachine,
    types::{ProtocolError, ProtocolEvent},
};
use qudag_crypto::ml_kem::MlKem768;
use qudag_dag::Consensus;
use qudag_network::Transport;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct NodeConfig {
42 pub data_dir: PathBuf,
44 pub network_port: u16,
46 pub max_peers: usize,
48 pub initial_peers: Vec<String>,
50}
51
52impl Default for NodeConfig {
53 fn default() -> Self {
54 Self {
55 data_dir: PathBuf::from("./data"),
56 network_port: 8000,
57 max_peers: 50,
58 initial_peers: Vec::new(),
59 }
60 }
61}
62
63pub struct Node {
65 #[allow(dead_code)]
67 config: NodeConfig,
68 state_machine: Arc<RwLock<ProtocolStateMachine>>,
70 #[allow(dead_code)]
72 events: NodeEvents,
73 keys: Option<KeyPair>,
75 transport: Option<Arc<dyn Transport + Send + Sync>>,
77 #[allow(dead_code)]
79 consensus: Option<Arc<dyn Consensus + Send + Sync>>,
80 persistence: Option<PersistenceManager>,
82 pub node_id: Vec<u8>,
84}
85
86struct NodeEvents {
88 #[allow(dead_code)]
90 tx: mpsc::Sender<ProtocolEvent>,
91 #[allow(dead_code)]
93 rx: mpsc::Receiver<ProtocolEvent>,
94}
95
/// Raw key material produced by `Node::init_keys` (an `MlKem768` key pair
/// copied into byte vectors).
struct KeyPair {
    /// Serialized public key bytes.
    #[allow(dead_code)] // written by `init_keys`, not read in this file
    public_key: Vec<u8>,
    /// Serialized private key bytes.
    // NOTE(review): held in a plain `Vec<u8>`; nothing in this file wipes it
    // on drop.
    #[allow(dead_code)] // written by `init_keys`, not read in this file
    private_key: Vec<u8>,
}
105
106impl Node {
107 pub async fn new(config: NodeConfig) -> Result<Self, ProtocolError> {
109 let (tx, rx) = mpsc::channel(1000);
110
111 let node_id = Self::generate_node_id();
113
114 let state_machine = Arc::new(RwLock::new(ProtocolStateMachine::new(
116 crate::message::ProtocolVersion::CURRENT,
117 )));
118
119 Ok(Self {
120 config,
121 state_machine,
122 events: NodeEvents { tx, rx },
123 keys: None,
124 transport: None,
125 consensus: None,
126 persistence: None,
127 node_id,
128 })
129 }
130
131 pub async fn with_persistence(config: NodeConfig) -> Result<Self, ProtocolError> {
133 let mut node = Self::new(config.clone()).await?;
134
135 let backend: Arc<dyn StatePersistence> = if config.data_dir.join("state.db").exists() {
137 let db_path = config.data_dir.join("state.db");
139 Arc::new(SqliteBackend::new(db_path).await.map_err(|e| {
140 ProtocolError::Internal(format!("Failed to create SQLite backend: {}", e))
141 })?)
142 } else {
143 Arc::new(MemoryBackend::default())
145 };
146
147 let persistence_manager: PersistenceManager = backend;
149
150 if let Some(recovered_state) = persistence_manager
152 .recover_state()
153 .await
154 .map_err(|e| ProtocolError::Internal(format!("Failed to recover state: {}", e)))?
155 {
156 info!("Recovered state from persistence");
157
158 let _state_machine = node.state_machine.write().await;
160 debug!(
163 "Recovered {} peers and {} sessions",
164 recovered_state.peers.len(),
165 recovered_state.sessions.len()
166 );
167 }
168
169 node.persistence = Some(persistence_manager);
170 Ok(node)
171 }
172
173 fn generate_node_id() -> Vec<u8> {
174 use rand::RngCore;
175 let mut rng = rand::thread_rng();
176 let mut id = vec![0u8; 32];
177 rng.fill_bytes(&mut id);
178 id
179 }
180
181 pub async fn start(&mut self) -> Result<(), ProtocolError> {
183 info!("Starting node...");
184
185 self.init_keys().await?;
187
188 self.init_transport().await?;
190
191 self.init_consensus().await?;
193
194 let mut state_machine = self.state_machine.write().await;
196
197 state_machine
199 .transition_to(
200 crate::state::ProtocolState::Handshake(crate::state::HandshakeState::Waiting),
201 "Node starting handshake".to_string(),
202 )
203 .map_err(|e| ProtocolError::StateError(e.to_string()))?;
204
205 state_machine
207 .transition_to(
208 crate::state::ProtocolState::Handshake(crate::state::HandshakeState::InProgress),
209 "Handshake in progress".to_string(),
210 )
211 .map_err(|e| ProtocolError::StateError(e.to_string()))?;
212
213 state_machine
214 .transition_to(
215 crate::state::ProtocolState::Handshake(crate::state::HandshakeState::Processing),
216 "Processing handshake".to_string(),
217 )
218 .map_err(|e| ProtocolError::StateError(e.to_string()))?;
219
220 state_machine
221 .transition_to(
222 crate::state::ProtocolState::Handshake(crate::state::HandshakeState::Completed),
223 "Handshake completed".to_string(),
224 )
225 .map_err(|e| ProtocolError::StateError(e.to_string()))?;
226
227 state_machine
229 .transition_to(
230 crate::state::ProtocolState::Active(crate::state::ActiveState::Normal),
231 "Node started".to_string(),
232 )
233 .map_err(|e| ProtocolError::StateError(e.to_string()))?;
234
235 drop(state_machine);
236
237 info!("Node started successfully");
242 Ok(())
243 }
244
245 pub async fn stop(&mut self) -> Result<(), ProtocolError> {
247 info!("Stopping node...");
248
249 let mut state_machine = self.state_machine.write().await;
251 state_machine
252 .transition_to(
253 crate::state::ProtocolState::Shutdown,
254 "Node stopping".to_string(),
255 )
256 .map_err(|e| ProtocolError::StateError(e.to_string()))?;
257 drop(state_machine);
258
259 if let Some(persistence) = &self.persistence {
261 let state = self.get_current_state().await.map_err(|e| {
262 ProtocolError::Internal(format!("Failed to get state for save: {}", e))
263 })?;
264 persistence.save_state(&state).await.map_err(|e| {
265 ProtocolError::Internal(format!("Failed to save final state: {}", e))
266 })?;
267 }
268
269 if let Some(_transport) = &self.transport {
271 }
273
274 info!("Node stopped successfully");
275 Ok(())
276 }
277
278 pub async fn handle_message(&mut self, message: Message) -> Result<(), MessageError> {
280 debug!("Handling message: {:?}", message.msg_type);
281
282 match message.msg_type {
290 MessageType::Handshake(_) => self.handle_handshake(message).await?,
291 MessageType::Data(_) => self.handle_data(message).await?,
292 MessageType::Control(_) => self.handle_control(message).await?,
293 MessageType::Sync(_) => self.handle_sync(message).await?,
294 _ => return Err(MessageError::InvalidFormat),
295 }
296
297 Ok(())
298 }
299
300 async fn init_keys(&mut self) -> Result<(), ProtocolError> {
302 let (pk, sk) = MlKem768::keygen().map_err(|e| ProtocolError::CryptoError(e.to_string()))?;
304
305 self.keys = Some(KeyPair {
306 public_key: pk.as_bytes().to_vec(),
307 private_key: sk.as_bytes().to_vec(),
308 });
309
310 Ok(())
311 }
312
313 async fn init_transport(&mut self) -> Result<(), ProtocolError> {
315 Ok(())
317 }
318
319 async fn init_consensus(&mut self) -> Result<(), ProtocolError> {
321 Ok(())
323 }
324
325 async fn handle_handshake(&mut self, _message: Message) -> Result<(), MessageError> {
327 Ok(())
329 }
330
331 async fn handle_data(&mut self, _message: Message) -> Result<(), MessageError> {
333 Ok(())
335 }
336
337 async fn handle_control(&mut self, _message: Message) -> Result<(), MessageError> {
339 Ok(())
341 }
342
343 async fn handle_sync(&mut self, _message: Message) -> Result<(), MessageError> {
345 Ok(())
347 }
348
349 pub async fn get_state(&self) -> crate::state::ProtocolState {
351 self.state_machine.read().await.current_state().clone()
352 }
353
354 pub fn has_persistence(&self) -> bool {
356 self.persistence.is_some()
357 }
358
359 pub async fn save_state(&self) -> Result<(), ProtocolError> {
361 if let Some(persistence) = &self.persistence {
362 let state = self
363 .get_current_state()
364 .await
365 .map_err(|e| ProtocolError::Internal(format!("Failed to get state: {}", e)))?;
366 persistence
367 .save_state(&state)
368 .await
369 .map_err(|e| ProtocolError::Internal(format!("Failed to save state: {}", e)))?;
370 info!("State saved successfully");
371 } else {
372 warn!("No persistence backend configured");
373 }
374 Ok(())
375 }
376
377 pub async fn create_backup(&self, backup_path: PathBuf) -> Result<(), ProtocolError> {
379 if let Some(persistence) = &self.persistence {
380 persistence
381 .create_backup(&backup_path)
382 .await
383 .map_err(|e| ProtocolError::Internal(format!("Failed to create backup: {}", e)))?;
384 info!("Backup created at {:?}", backup_path);
385 } else {
386 return Err(ProtocolError::Internal(
387 "No persistence backend configured".to_string(),
388 ));
389 }
390 Ok(())
391 }
392
393 pub async fn restore_backup(&self, backup_path: PathBuf) -> Result<(), ProtocolError> {
395 if let Some(persistence) = &self.persistence {
396 persistence
397 .restore_backup(&backup_path)
398 .await
399 .map_err(|e| ProtocolError::Internal(format!("Failed to restore backup: {}", e)))?;
400 info!("Backup restored from {:?}", backup_path);
401 } else {
402 return Err(ProtocolError::Internal(
403 "No persistence backend configured".to_string(),
404 ));
405 }
406 Ok(())
407 }
408}
409
410impl Node {
411 pub async fn get_current_state(&self) -> Result<PersistedState, PersistenceError> {
413 let state_machine = self.state_machine.read().await;
414 let current_state = state_machine.current_state().clone();
415 let sessions = state_machine.get_sessions().clone();
416 let metrics = state_machine.get_metrics();
417
418 let peers = vec![];
420
421 let dag_state = crate::persistence::DagState {
423 vertices: HashMap::new(),
424 tips: std::collections::HashSet::new(),
425 voting_records: HashMap::new(),
426 last_checkpoint: None,
427 };
428
429 Ok(PersistedState {
430 version: crate::persistence::CURRENT_STATE_VERSION,
431 node_id: self.node_id.clone(),
432 protocol_state: current_state,
433 sessions,
434 peers,
435 dag_state,
436 metrics,
437 last_saved: std::time::SystemTime::now()
438 .duration_since(std::time::UNIX_EPOCH)
439 .unwrap()
440 .as_secs(),
441 })
442 }
443}
444
445pub struct NodeStateProvider {
447 node: Arc<RwLock<Node>>,
448}
449
450impl NodeStateProvider {
451 pub fn new(node: Arc<RwLock<Node>>) -> Self {
453 Self { node }
454 }
455}
456
457impl StateProvider for NodeStateProvider {
458 fn get_state_store(&self) -> Arc<dyn crate::persistence::StateStore + Send + Sync> {
459 if let Ok(node) = self.node.try_read() {
462 if let Some(persistence) = &node.persistence {
463 return persistence.clone();
465 }
466 }
467
468 Arc::new(crate::persistence::MemoryStateStore::new())
470 }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    /// A non-persistent node goes Initial -> Active(_) -> Shutdown across
    /// `start` and `stop`.
    #[tokio::test]
    async fn test_node_lifecycle() {
        let config = NodeConfig::default();
        let mut node = Node::new(config).await.unwrap();

        assert_eq!(node.get_state().await, crate::state::ProtocolState::Initial);

        node.start().await.unwrap();
        assert!(matches!(
            node.get_state().await,
            crate::state::ProtocolState::Active(_)
        ));

        node.stop().await.unwrap();
        assert_eq!(
            node.get_state().await,
            crate::state::ProtocolState::Shutdown
        );
    }

    /// Save, backup and shutdown all succeed on a persistent node, and a
    /// second node built from the same config also gets a backend.
    #[tokio::test]
    async fn test_node_persistence() {
        use tempfile::TempDir;

        // Fresh temp dir: no `state.db` exists when the first node is built,
        // so `with_persistence` selects the in-memory backend.
        let temp_dir = TempDir::new().unwrap();
        let config = NodeConfig {
            data_dir: temp_dir.path().to_path_buf(),
            ..Default::default()
        };

        let mut node = Node::with_persistence(config.clone()).await.unwrap();

        node.start().await.unwrap();

        node.save_state().await.unwrap();

        let backup_path = temp_dir.path().join("backup");
        std::fs::create_dir_all(&backup_path).unwrap();
        node.create_backup(backup_path.clone()).await.unwrap();

        node.stop().await.unwrap();

        // Only checks that a backend is attached; does not verify that the
        // first node's state was actually recovered.
        let node2 = Node::with_persistence(config).await.unwrap();
        assert!(node2.persistence.is_some());
    }
}