//! Protocol node implementation (`qudag_protocol/node.rs`): node lifecycle,
//! state-machine transitions, and optional state persistence.

1use crate::{
2    message::{Message, MessageError, MessageType},
3    persistence::{
4        MemoryBackend, PersistedState, PersistenceError, PersistenceManager, SqliteBackend,
5        StatePersistence, StateProvider,
6    },
7    state::ProtocolStateMachine,
8    types::{ProtocolError, ProtocolEvent},
9};
10use qudag_crypto::ml_kem::MlKem768;
11use qudag_dag::Consensus;
12use qudag_network::Transport;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::PathBuf;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18use tracing::{debug, info, warn};
19
/// Node configuration
///
/// Controls where the node keeps its data and how it participates in the
/// network. All fields are public and serializable, so configurations can
/// be loaded from or written to disk.
///
/// # Examples
///
/// ```rust
/// use qudag_protocol::NodeConfig;
/// use std::path::PathBuf;
///
/// // Create default configuration
/// let config = NodeConfig::default();
/// assert_eq!(config.network_port, 8000);
///
/// // Create custom configuration
/// let custom_config = NodeConfig {
///     data_dir: PathBuf::from("/custom/data"),
///     network_port: 9000,
///     max_peers: 100,
///     initial_peers: vec!["peer1:8000".to_string(), "peer2:8000".to_string()],
/// };
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Data directory; persistent state lives here (e.g. `<data_dir>/state.db`).
    pub data_dir: PathBuf,
    /// Network port (default: 8000).
    pub network_port: u16,
    /// Maximum number of peers.
    pub max_peers: usize,
    /// Initial peer addresses, as `host:port` strings.
    // NOTE(review): not yet consumed anywhere in this file — presumably used
    // by the (TODO) transport initialization; confirm against the transport.
    pub initial_peers: Vec<String>,
}
51
52impl Default for NodeConfig {
53    fn default() -> Self {
54        Self {
55            data_dir: PathBuf::from("./data"),
56            network_port: 8000,
57            max_peers: 50,
58            initial_peers: Vec::new(),
59        }
60    }
61}
62
/// Protocol node with persistence support
///
/// Owns the protocol state machine plus optional subsystems that are wired
/// up later: keys are generated by `start()`, the persistence manager by
/// `with_persistence()`; transport and consensus initialization are still
/// TODO, so those fields currently stay `None`.
pub struct Node {
    /// Node configuration
    #[allow(dead_code)]
    config: NodeConfig,
    /// Protocol state machine; all lifecycle transitions go through it.
    state_machine: Arc<RwLock<ProtocolStateMachine>>,
    /// Event channels (both halves of the node's protocol-event channel)
    #[allow(dead_code)]
    events: NodeEvents,
    /// Cryptographic keys (ML-KEM-768); `None` until `start()` runs `init_keys`.
    keys: Option<KeyPair>,
    /// Network transport; `None` until initialized (currently a TODO).
    transport: Option<Arc<dyn Transport + Send + Sync>>,
    /// Consensus engine; `None` until initialized (currently a TODO).
    #[allow(dead_code)]
    consensus: Option<Arc<dyn Consensus + Send + Sync>>,
    /// Persistence manager; set only by `with_persistence()`.
    persistence: Option<PersistenceManager>,
    /// Node ID: 32 random bytes generated at construction.
    pub node_id: Vec<u8>,
}
85
/// Node event channels
///
/// Both halves of the bounded protocol-event channel (capacity 1000,
/// created in `Node::new`). Held on the node but not yet consumed anywhere
/// in this file.
struct NodeEvents {
    /// Event sender
    #[allow(dead_code)]
    tx: mpsc::Sender<ProtocolEvent>,
    /// Event receiver
    #[allow(dead_code)]
    rx: mpsc::Receiver<ProtocolEvent>,
}
95
/// Cryptographic key pair
///
/// Raw ML-KEM-768 key material produced by `Node::init_keys`.
///
/// NOTE(review): the private key is a plain `Vec<u8>` with no zeroization
/// on drop — consider a zeroizing wrapper for the secret material.
struct KeyPair {
    /// Public key bytes (ML-KEM-768 encapsulation key).
    #[allow(dead_code)]
    public_key: Vec<u8>,
    /// Private key bytes (ML-KEM-768 decapsulation key).
    #[allow(dead_code)]
    private_key: Vec<u8>,
}
105
106impl Node {
107    /// Create new node
108    pub async fn new(config: NodeConfig) -> Result<Self, ProtocolError> {
109        let (tx, rx) = mpsc::channel(1000);
110
111        // Generate node ID
112        let node_id = Self::generate_node_id();
113
114        // Initialize state machine
115        let state_machine = Arc::new(RwLock::new(ProtocolStateMachine::new(
116            crate::message::ProtocolVersion::CURRENT,
117        )));
118
119        Ok(Self {
120            config,
121            state_machine,
122            events: NodeEvents { tx, rx },
123            keys: None,
124            transport: None,
125            consensus: None,
126            persistence: None,
127            node_id,
128        })
129    }
130
131    /// Create new node with persistence
132    pub async fn with_persistence(config: NodeConfig) -> Result<Self, ProtocolError> {
133        let mut node = Self::new(config.clone()).await?;
134
135        // Create persistence backend based on configuration
136        let backend: Arc<dyn StatePersistence> = if config.data_dir.join("state.db").exists() {
137            // Use SQLite for lightweight persistence
138            let db_path = config.data_dir.join("state.db");
139            Arc::new(SqliteBackend::new(db_path).await.map_err(|e| {
140                ProtocolError::Internal(format!("Failed to create SQLite backend: {}", e))
141            })?)
142        } else {
143            // Use memory backend for testing
144            Arc::new(MemoryBackend::default())
145        };
146
147        // Create persistence manager
148        let persistence_manager: PersistenceManager = backend;
149
150        // Try to recover state
151        if let Some(recovered_state) = persistence_manager
152            .recover_state()
153            .await
154            .map_err(|e| ProtocolError::Internal(format!("Failed to recover state: {}", e)))?
155        {
156            info!("Recovered state from persistence");
157
158            // Restore state machine
159            let _state_machine = node.state_machine.write().await;
160            // TODO: Implement proper state restoration
161            // For now, just log the recovery
162            debug!(
163                "Recovered {} peers and {} sessions",
164                recovered_state.peers.len(),
165                recovered_state.sessions.len()
166            );
167        }
168
169        node.persistence = Some(persistence_manager);
170        Ok(node)
171    }
172
173    fn generate_node_id() -> Vec<u8> {
174        use rand::RngCore;
175        let mut rng = rand::thread_rng();
176        let mut id = vec![0u8; 32];
177        rng.fill_bytes(&mut id);
178        id
179    }
180
181    /// Start node
182    pub async fn start(&mut self) -> Result<(), ProtocolError> {
183        info!("Starting node...");
184
185        // Initialize cryptographic keys
186        self.init_keys().await?;
187
188        // Initialize network transport
189        self.init_transport().await?;
190
191        // Initialize consensus engine
192        self.init_consensus().await?;
193
194        // Update state machine - transition through proper states
195        let mut state_machine = self.state_machine.write().await;
196
197        // First transition to Handshake
198        state_machine
199            .transition_to(
200                crate::state::ProtocolState::Handshake(crate::state::HandshakeState::Waiting),
201                "Node starting handshake".to_string(),
202            )
203            .map_err(|e| ProtocolError::StateError(e.to_string()))?;
204
205        // Skip through handshake states for now
206        state_machine
207            .transition_to(
208                crate::state::ProtocolState::Handshake(crate::state::HandshakeState::InProgress),
209                "Handshake in progress".to_string(),
210            )
211            .map_err(|e| ProtocolError::StateError(e.to_string()))?;
212
213        state_machine
214            .transition_to(
215                crate::state::ProtocolState::Handshake(crate::state::HandshakeState::Processing),
216                "Processing handshake".to_string(),
217            )
218            .map_err(|e| ProtocolError::StateError(e.to_string()))?;
219
220        state_machine
221            .transition_to(
222                crate::state::ProtocolState::Handshake(crate::state::HandshakeState::Completed),
223                "Handshake completed".to_string(),
224            )
225            .map_err(|e| ProtocolError::StateError(e.to_string()))?;
226
227        // Now transition to active
228        state_machine
229            .transition_to(
230                crate::state::ProtocolState::Active(crate::state::ActiveState::Normal),
231                "Node started".to_string(),
232            )
233            .map_err(|e| ProtocolError::StateError(e.to_string()))?;
234
235        drop(state_machine);
236
237        // Start auto-save if persistence is enabled
238        // Note: Auto-save would need to be started externally with a proper Arc<Node>
239        // to avoid self-referential issues
240
241        info!("Node started successfully");
242        Ok(())
243    }
244
245    /// Stop node
246    pub async fn stop(&mut self) -> Result<(), ProtocolError> {
247        info!("Stopping node...");
248
249        // Update state machine
250        let mut state_machine = self.state_machine.write().await;
251        state_machine
252            .transition_to(
253                crate::state::ProtocolState::Shutdown,
254                "Node stopping".to_string(),
255            )
256            .map_err(|e| ProtocolError::StateError(e.to_string()))?;
257        drop(state_machine);
258
259        // Save final state if persistence is enabled
260        if let Some(persistence) = &self.persistence {
261            let state = self.get_current_state().await.map_err(|e| {
262                ProtocolError::Internal(format!("Failed to get state for save: {}", e))
263            })?;
264            persistence.save_state(&state).await.map_err(|e| {
265                ProtocolError::Internal(format!("Failed to save final state: {}", e))
266            })?;
267        }
268
269        // Stop components
270        if let Some(_transport) = &self.transport {
271            // TODO: Implement transport stop method
272        }
273
274        info!("Node stopped successfully");
275        Ok(())
276    }
277
278    /// Handle incoming message
279    pub async fn handle_message(&mut self, message: Message) -> Result<(), MessageError> {
280        debug!("Handling message: {:?}", message.msg_type);
281
282        // Verify message
283        // TODO: Get proper public key for verification
284        // if !message.verify(&proper_public_key)? {
285        //     return Err(MessageError::InvalidSignature);
286        // }
287
288        // Process message
289        match message.msg_type {
290            MessageType::Handshake(_) => self.handle_handshake(message).await?,
291            MessageType::Data(_) => self.handle_data(message).await?,
292            MessageType::Control(_) => self.handle_control(message).await?,
293            MessageType::Sync(_) => self.handle_sync(message).await?,
294            _ => return Err(MessageError::InvalidFormat),
295        }
296
297        Ok(())
298    }
299
300    // Initialize cryptographic keys
301    async fn init_keys(&mut self) -> Result<(), ProtocolError> {
302        // Generate ML-KEM key pair
303        let (pk, sk) = MlKem768::keygen().map_err(|e| ProtocolError::CryptoError(e.to_string()))?;
304
305        self.keys = Some(KeyPair {
306            public_key: pk.as_bytes().to_vec(),
307            private_key: sk.as_bytes().to_vec(),
308        });
309
310        Ok(())
311    }
312
313    // Initialize network transport
314    async fn init_transport(&mut self) -> Result<(), ProtocolError> {
315        // TODO: Initialize transport
316        Ok(())
317    }
318
319    // Initialize consensus engine
320    async fn init_consensus(&mut self) -> Result<(), ProtocolError> {
321        // TODO: Initialize consensus
322        Ok(())
323    }
324
325    // Handle handshake message
326    async fn handle_handshake(&mut self, _message: Message) -> Result<(), MessageError> {
327        // TODO: Implement handshake
328        Ok(())
329    }
330
331    // Handle data message
332    async fn handle_data(&mut self, _message: Message) -> Result<(), MessageError> {
333        // TODO: Implement data handling
334        Ok(())
335    }
336
337    // Handle control message
338    async fn handle_control(&mut self, _message: Message) -> Result<(), MessageError> {
339        // TODO: Implement control handling
340        Ok(())
341    }
342
343    // Handle sync message
344    async fn handle_sync(&mut self, _message: Message) -> Result<(), MessageError> {
345        // TODO: Implement sync handling
346        Ok(())
347    }
348
349    /// Get current node state
350    pub async fn get_state(&self) -> crate::state::ProtocolState {
351        self.state_machine.read().await.current_state().clone()
352    }
353
354    /// Check if persistence is enabled
355    pub fn has_persistence(&self) -> bool {
356        self.persistence.is_some()
357    }
358
359    /// Save current state
360    pub async fn save_state(&self) -> Result<(), ProtocolError> {
361        if let Some(persistence) = &self.persistence {
362            let state = self
363                .get_current_state()
364                .await
365                .map_err(|e| ProtocolError::Internal(format!("Failed to get state: {}", e)))?;
366            persistence
367                .save_state(&state)
368                .await
369                .map_err(|e| ProtocolError::Internal(format!("Failed to save state: {}", e)))?;
370            info!("State saved successfully");
371        } else {
372            warn!("No persistence backend configured");
373        }
374        Ok(())
375    }
376
377    /// Create backup
378    pub async fn create_backup(&self, backup_path: PathBuf) -> Result<(), ProtocolError> {
379        if let Some(persistence) = &self.persistence {
380            persistence
381                .create_backup(&backup_path)
382                .await
383                .map_err(|e| ProtocolError::Internal(format!("Failed to create backup: {}", e)))?;
384            info!("Backup created at {:?}", backup_path);
385        } else {
386            return Err(ProtocolError::Internal(
387                "No persistence backend configured".to_string(),
388            ));
389        }
390        Ok(())
391    }
392
393    /// Restore from backup
394    pub async fn restore_backup(&self, backup_path: PathBuf) -> Result<(), ProtocolError> {
395        if let Some(persistence) = &self.persistence {
396            persistence
397                .restore_backup(&backup_path)
398                .await
399                .map_err(|e| ProtocolError::Internal(format!("Failed to restore backup: {}", e)))?;
400            info!("Backup restored from {:?}", backup_path);
401        } else {
402            return Err(ProtocolError::Internal(
403                "No persistence backend configured".to_string(),
404            ));
405        }
406        Ok(())
407    }
408}
409
410impl Node {
411    /// Get current state for persistence
412    pub async fn get_current_state(&self) -> Result<PersistedState, PersistenceError> {
413        let state_machine = self.state_machine.read().await;
414        let current_state = state_machine.current_state().clone();
415        let sessions = state_machine.get_sessions().clone();
416        let metrics = state_machine.get_metrics();
417
418        // TODO: Get actual peer list from network transport
419        let peers = vec![];
420
421        // TODO: Get actual DAG state from consensus engine
422        let dag_state = crate::persistence::DagState {
423            vertices: HashMap::new(),
424            tips: std::collections::HashSet::new(),
425            voting_records: HashMap::new(),
426            last_checkpoint: None,
427        };
428
429        Ok(PersistedState {
430            version: crate::persistence::CURRENT_STATE_VERSION,
431            node_id: self.node_id.clone(),
432            protocol_state: current_state,
433            sessions,
434            peers,
435            dag_state,
436            metrics,
437            last_saved: std::time::SystemTime::now()
438                .duration_since(std::time::UNIX_EPOCH)
439                .unwrap()
440                .as_secs(),
441        })
442    }
443}
444
/// Wrapper to provide StateProvider implementation for Node
///
/// Holds a shared handle to the node so the synchronous `StateProvider`
/// trait can reach the node's persistence manager without owning the node.
pub struct NodeStateProvider {
    // Shared, lock-guarded handle to the wrapped node.
    node: Arc<RwLock<Node>>,
}
449
450impl NodeStateProvider {
451    /// Create new state provider for a node
452    pub fn new(node: Arc<RwLock<Node>>) -> Self {
453        Self { node }
454    }
455}
456
457impl StateProvider for NodeStateProvider {
458    fn get_state_store(&self) -> Arc<dyn crate::persistence::StateStore + Send + Sync> {
459        // Since this is a sync method but we need to access an async RwLock,
460        // we'll use try_read() which is non-blocking
461        if let Ok(node) = self.node.try_read() {
462            if let Some(persistence) = &node.persistence {
463                // Return the actual state store from the persistence manager's backend
464                return persistence.clone();
465            }
466        }
467
468        // Fallback to memory store if no persistence is configured or lock failed
469        Arc::new(crate::persistence::MemoryStateStore::new())
470    }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_node_lifecycle() {
        let mut node = Node::new(NodeConfig::default()).await.unwrap();

        // A freshly constructed node begins in the Initial state.
        assert_eq!(node.get_state().await, crate::state::ProtocolState::Initial);

        // Starting drives the state machine into Active(_).
        node.start().await.unwrap();
        let running_state = node.get_state().await;
        assert!(matches!(
            running_state,
            crate::state::ProtocolState::Active(_)
        ));

        // Stopping lands in Shutdown.
        node.stop().await.unwrap();
        assert_eq!(
            node.get_state().await,
            crate::state::ProtocolState::Shutdown
        );
    }

    #[tokio::test]
    async fn test_node_persistence() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let config = NodeConfig {
            data_dir: temp_dir.path().to_path_buf(),
            ..Default::default()
        };

        // Build and start a node backed by a persistence manager.
        let mut node = Node::with_persistence(config.clone()).await.unwrap();
        node.start().await.unwrap();

        // Persist the current state, then snapshot it into a backup dir.
        node.save_state().await.unwrap();
        let backup_path = temp_dir.path().join("backup");
        std::fs::create_dir_all(&backup_path).unwrap();
        node.create_backup(backup_path.clone()).await.unwrap();

        node.stop().await.unwrap();

        // A second node over the same data dir also gets a persistence
        // backend attached.
        let node2 = Node::with_persistence(config).await.unwrap();
        assert!(node2.persistence.is_some());
    }
}