// qudag_protocol/node_runner.rs

1use std::sync::Arc;
2use std::time::Duration;
3use thiserror::Error;
4use tokio::select;
5use tokio::sync::{mpsc, Mutex, RwLock};
6use tracing::{debug, error, info, warn};
7
8// Import network components
9use qudag_network::{
10    p2p::{NetworkConfig as P2PNetworkConfig, P2PEvent, P2PNode, QuDagResponse},
11    DarkResolver, P2PHandle,
12};
13
14// Import DAG components
15use qudag_dag::{Dag, DagMessage, VertexId};
16
// Minimal RPC types for NodeRunner integration

/// Transport over which the RPC server accepts connections.
#[derive(Debug, Clone)]
pub enum RpcTransport {
    /// TCP socket address string, e.g. `"127.0.0.1:9090"`.
    Tcp(String),
    /// Unix domain socket path.
    Unix(String),
}
23
/// Commands the RPC layer can issue to a running node.
#[derive(Debug, Clone)]
pub enum RpcCommand {
    /// Request a graceful node shutdown.
    Stop,
    /// Request the current node status snapshot.
    GetStatus,
}
29
30// Minimal RPC server placeholder
31pub struct RpcServer {
32    _transport: RpcTransport,
33}
34
35impl RpcServer {
36    pub fn new_tcp(
37        port: u16,
38    ) -> (
39        Self,
40        tokio::sync::mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
41    ) {
42        let (_, rx) = tokio::sync::mpsc::channel(1);
43        (
44            Self {
45                _transport: RpcTransport::Tcp(format!("127.0.0.1:{}", port)),
46            },
47            rx,
48        )
49    }
50
51    pub fn new_unix(
52        path: String,
53    ) -> (
54        Self,
55        tokio::sync::mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
56    ) {
57        let (_, rx) = tokio::sync::mpsc::channel(1);
58        (
59            Self {
60                _transport: RpcTransport::Unix(path),
61            },
62            rx,
63        )
64    }
65
66    pub async fn start(&mut self) -> Result<(), String> {
67        // Placeholder implementation
68        Ok(())
69    }
70
71    pub async fn stop(&mut self) -> Result<(), String> {
72        // Placeholder implementation
73        Ok(())
74    }
75}
76
77// Import protocol types
78use crate::types::{ProtocolError, ProtocolEvent};
79
80/// Errors that can occur during node operations
81#[derive(Error, Debug)]
82pub enum NodeRunnerError {
83    #[error("Network error: {0}")]
84    NetworkError(String),
85
86    #[error("DAG error: {0}")]
87    DagError(String),
88
89    #[error("RPC error: {0}")]
90    RpcError(String),
91
92    #[error("Protocol error: {0}")]
93    ProtocolError(#[from] ProtocolError),
94
95    #[error("Node already started")]
96    AlreadyStarted,
97
98    #[error("Node not started")]
99    NotStarted,
100
101    #[error("Shutdown error: {0}")]
102    ShutdownError(String),
103}
104
105/// Configuration for the NodeRunner
106#[derive(Debug, Clone)]
107pub struct NodeRunnerConfig {
108    /// P2P network configuration
109    pub p2p_config: P2PNetworkConfig,
110
111    /// RPC server transport configuration
112    pub rpc_transport: RpcTransport,
113
114    /// Maximum concurrent DAG messages
115    pub max_dag_concurrent: usize,
116
117    /// Enable dark resolver
118    pub enable_dark_resolver: bool,
119
120    /// Node shutdown timeout
121    pub shutdown_timeout: Duration,
122}
123
124impl Default for NodeRunnerConfig {
125    fn default() -> Self {
126        Self {
127            p2p_config: P2PNetworkConfig::default(),
128            rpc_transport: RpcTransport::Tcp("127.0.0.1:9090".to_string()),
129            max_dag_concurrent: 100,
130            enable_dark_resolver: true,
131            shutdown_timeout: Duration::from_secs(30),
132        }
133    }
134}
135
136/// The main node integration coordinator
137pub struct NodeRunner {
138    /// Configuration
139    config: NodeRunnerConfig,
140
141    /// P2P network handle
142    p2p_handle: Option<P2PHandle>,
143
144    /// P2P node task handle
145    p2p_task_handle:
146        Option<tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>>,
147
148    /// DAG consensus
149    dag: Arc<RwLock<Dag>>,
150
151    /// RPC server
152    rpc_server: Option<Arc<Mutex<RpcServer>>>,
153
154    /// RPC command receiver
155    #[allow(dead_code)]
156    rpc_command_rx: Option<
157        tokio::sync::mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
158    >,
159
160    /// Dark resolver for .dark addresses
161    dark_resolver: Option<Arc<RwLock<DarkResolver>>>,
162
163    /// Event channel for protocol events
164    #[allow(dead_code)]
165    event_tx: mpsc::UnboundedSender<ProtocolEvent>,
166    event_rx: Option<mpsc::UnboundedReceiver<ProtocolEvent>>,
167
168    /// Shutdown signal
169    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
170
171    /// Node state
172    is_running: Arc<RwLock<bool>>,
173}
174
175impl NodeRunner {
176    /// Create a new NodeRunner instance
177    pub fn new(config: NodeRunnerConfig) -> Self {
178        let (event_tx, event_rx) = mpsc::unbounded_channel();
179
180        // Create DAG instance
181        let dag = Arc::new(RwLock::new(Dag::new(config.max_dag_concurrent)));
182
183        Self {
184            config,
185            p2p_handle: None,
186            p2p_task_handle: None,
187            dag,
188            rpc_server: None,
189            rpc_command_rx: None,
190            dark_resolver: None,
191            event_tx,
192            event_rx: Some(event_rx),
193            shutdown_tx: None,
194            is_running: Arc::new(RwLock::new(false)),
195        }
196    }
197
198    /// Initialize all components
199    async fn initialize_components(&mut self) -> Result<(), NodeRunnerError> {
200        info!("Initializing node components...");
201
202        // Initialize P2P node
203        let (mut p2p_node, p2p_handle) = P2PNode::new(self.config.p2p_config.clone())
204            .await
205            .map_err(|e| NodeRunnerError::NetworkError(e.to_string()))?;
206
207        // Start the P2P node
208        p2p_node
209            .start()
210            .await
211            .map_err(|e| NodeRunnerError::NetworkError(e.to_string()))?;
212
213        // Spawn the P2P node task
214        let p2p_task_handle = tokio::spawn(async move {
215            p2p_node
216                .run()
217                .await
218                .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
219                    Box::new(std::io::Error::other(e.to_string()))
220                })
221        });
222
223        self.p2p_handle = Some(p2p_handle);
224        self.p2p_task_handle = Some(p2p_task_handle);
225
226        // Initialize RPC server
227        let (rpc_server, rpc_command_rx) = match &self.config.rpc_transport {
228            RpcTransport::Tcp(addr) => {
229                let port = addr
230                    .split(':')
231                    .next_back()
232                    .and_then(|p| p.parse::<u16>().ok())
233                    .unwrap_or(9090);
234                RpcServer::new_tcp(port)
235            }
236            RpcTransport::Unix(path) => RpcServer::new_unix(path.clone()),
237        };
238        self.rpc_server = Some(Arc::new(Mutex::new(rpc_server)));
239        self.rpc_command_rx = Some(rpc_command_rx);
240
241        // Initialize dark resolver if enabled
242        if self.config.enable_dark_resolver {
243            self.dark_resolver = Some(Arc::new(RwLock::new(DarkResolver::new())));
244        }
245
246        info!("All node components initialized successfully");
247        Ok(())
248    }
249
250    /// Start the node and all its components
251    pub async fn start(&mut self) -> Result<(), NodeRunnerError> {
252        // Check if already running
253        if *self.is_running.read().await {
254            return Err(NodeRunnerError::AlreadyStarted);
255        }
256
257        info!("Starting QuDAG node...");
258
259        // Initialize components if not already done
260        if self.p2p_handle.is_none() {
261            self.initialize_components().await?;
262        }
263
264        // Start RPC server
265        if let Some(rpc_server) = &self.rpc_server {
266            let mut server = rpc_server.lock().await;
267            server
268                .start()
269                .await
270                .map_err(|e| NodeRunnerError::RpcError(e.to_string()))?;
271        }
272
273        // Mark as running
274        *self.is_running.write().await = true;
275
276        info!("QuDAG node started successfully");
277        Ok(())
278    }
279
280    /// Main event loop that bridges P2P messages to DAG
281    pub async fn run(&mut self) -> Result<(), NodeRunnerError> {
282        if !*self.is_running.read().await {
283            return Err(NodeRunnerError::NotStarted);
284        }
285
286        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
287        self.shutdown_tx = Some(shutdown_tx);
288
289        let mut event_rx = self.event_rx.take().ok_or(NodeRunnerError::NotStarted)?;
290
291        // Get P2P events from the handle if available
292        let p2p_handle = self.p2p_handle.clone();
293
294        info!("Node runner event loop started");
295
296        loop {
297            select! {
298                // Handle P2P events (if P2P handle is available)
299                p2p_event = async {
300                    if let Some(ref handle) = p2p_handle {
301                        handle.next_event().await
302                    } else {
303                        None::<P2PEvent>
304                    }
305                } => {
306                    if let Some(event) = p2p_event {
307                        if let Err(e) = self.handle_p2p_event(event).await {
308                            error!("Error handling P2P event: {}", e);
309                        }
310                    }
311                }
312
313                // Handle protocol events
314                Some(protocol_event) = event_rx.recv() => {
315                    if let Err(e) = self.handle_protocol_event(protocol_event).await {
316                        error!("Error handling protocol event: {}", e);
317                    }
318                }
319
320                // Handle shutdown signal
321                _ = &mut shutdown_rx => {
322                    info!("Received shutdown signal");
323                    break;
324                }
325            }
326        }
327
328        info!("Node runner event loop stopped");
329        Ok(())
330    }
331
332    /// Handle P2P network events
333    async fn handle_p2p_event(&self, event: P2PEvent) -> Result<(), NodeRunnerError> {
334        match event {
335            P2PEvent::MessageReceived {
336                peer_id,
337                topic,
338                data,
339            } => {
340                debug!("Received message from peer {} on topic {}", peer_id, topic);
341
342                // Convert P2P message to DAG message
343                let dag_message = DagMessage {
344                    id: VertexId::new(),
345                    payload: data,
346                    parents: Default::default(), // TODO: Extract parents from message
347                    timestamp: std::time::SystemTime::now()
348                        .duration_since(std::time::UNIX_EPOCH)
349                        .unwrap()
350                        .as_secs(),
351                };
352
353                // Submit to DAG
354                let dag = self.dag.write().await;
355                dag.submit_message(dag_message)
356                    .await
357                    .map_err(|e| NodeRunnerError::DagError(e.to_string()))?;
358            }
359
360            P2PEvent::PeerConnected(peer_id) => {
361                info!("Peer connected: {}", peer_id);
362                // TODO: Update peer tracking
363            }
364
365            P2PEvent::PeerDisconnected(peer_id) => {
366                info!("Peer disconnected: {}", peer_id);
367                // TODO: Update peer tracking
368            }
369
370            P2PEvent::RequestReceived {
371                peer_id,
372                request,
373                channel,
374            } => {
375                debug!("Received request from peer {}: {:?}", peer_id, request);
376                // TODO: Handle custom requests
377                let response = QuDagResponse {
378                    request_id: request.request_id,
379                    payload: vec![],
380                };
381                let _ = channel.send(response);
382            }
383
384            _ => {
385                debug!("Unhandled P2P event: {:?}", event);
386            }
387        }
388
389        Ok(())
390    }
391
392    /// Handle protocol events
393    async fn handle_protocol_event(&self, event: ProtocolEvent) -> Result<(), NodeRunnerError> {
394        match event {
395            ProtocolEvent::MessageReceived { id, .. } => {
396                debug!("Message received: {:?}", id);
397
398                // Broadcast consensus result to network
399                if let Some(p2p_handle) = &self.p2p_handle {
400                    // TODO: Implement broadcast_consensus_result using p2p_handle
401                    let _handle = p2p_handle;
402                    // handle.publish("consensus", consensus_data).await?;
403                }
404            }
405
406            ProtocolEvent::MessageFinalized { id, .. } => {
407                info!("Message finalized: {:?}", id);
408                // TODO: Handle finalization completion
409            }
410
411            _ => {
412                debug!("Unhandled protocol event: {:?}", event);
413            }
414        }
415
416        Ok(())
417    }
418
419    /// Handle RPC commands (placeholder implementation)
420    async fn _handle_rpc_commands(
421        mut _rx: tokio::sync::mpsc::Receiver<(
422            RpcCommand,
423            tokio::sync::oneshot::Sender<serde_json::Value>,
424        )>,
425        _event_tx: mpsc::UnboundedSender<ProtocolEvent>,
426        _is_running: Arc<RwLock<bool>>,
427    ) {
428        // Placeholder implementation for RPC command handling
429        // In a real implementation, this would process RPC commands
430    }
431
432    /// Gracefully stop the node and all its components
433    pub async fn stop(&mut self) -> Result<(), NodeRunnerError> {
434        if !*self.is_running.read().await {
435            return Ok(());
436        }
437
438        info!("Stopping QuDAG node...");
439
440        // Send shutdown signal
441        if let Some(tx) = self.shutdown_tx.take() {
442            let _ = tx.send(());
443        }
444
445        // Stop RPC server
446        if let Some(rpc_server) = &self.rpc_server {
447            let mut server = rpc_server.lock().await;
448            server
449                .stop()
450                .await
451                .map_err(|e| NodeRunnerError::RpcError(e.to_string()))?;
452        }
453
454        // Stop P2P node by canceling the task
455        if let Some(task_handle) = self.p2p_task_handle.take() {
456            task_handle.abort();
457            if let Err(e) = task_handle.await {
458                if !e.is_cancelled() {
459                    warn!("P2P task shutdown error: {}", e);
460                }
461            }
462        }
463
464        // Drop P2P handle
465        self.p2p_handle = None;
466
467        // Mark as stopped
468        *self.is_running.write().await = false;
469
470        info!("QuDAG node stopped successfully");
471        Ok(())
472    }
473
474    /// Get a reference to the P2P handle
475    pub fn p2p_handle(&self) -> &Option<P2PHandle> {
476        &self.p2p_handle
477    }
478
479    /// Get a reference to the DAG
480    pub fn dag(&self) -> &Arc<RwLock<Dag>> {
481        &self.dag
482    }
483
484    /// Get a reference to the RPC server
485    pub fn rpc_server(&self) -> &Option<Arc<Mutex<RpcServer>>> {
486        &self.rpc_server
487    }
488
489    /// Get a reference to the dark resolver
490    pub fn dark_resolver(&self) -> &Option<Arc<RwLock<DarkResolver>>> {
491        &self.dark_resolver
492    }
493
494    /// Get a reference to the running state
495    pub fn is_running(&self) -> &Arc<RwLock<bool>> {
496        &self.is_running
497    }
498
499    /// Get the node configuration
500    pub fn config(&self) -> &NodeRunnerConfig {
501        &self.config
502    }
503
504    /// Get the current node status
505    pub async fn status(&self) -> Result<serde_json::Value, NodeRunnerError> {
506        let is_running = *self.is_running.read().await;
507
508        let dag_stats = {
509            let dag = self.dag.read().await;
510            let vertices = dag.vertices.read().await;
511            serde_json::json!({
512                "vertex_count": vertices.len(),
513                "tips": 0, // TODO: Implement get_tips method
514            })
515        };
516
517        let p2p_stats = if let Some(p2p_handle) = &self.p2p_handle {
518            serde_json::json!({
519                "peer_id": p2p_handle.local_peer_id().await.to_string(),
520                "connected_peers": p2p_handle.connected_peers().await.len(),
521            })
522        } else {
523            serde_json::Value::Null
524        };
525
526        Ok(serde_json::json!({
527            "is_running": is_running,
528            "dag": dag_stats,
529            "p2p": p2p_stats,
530            "dark_resolver_enabled": self.config.enable_dark_resolver,
531        }))
532    }
533}
534
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh runner is stopped and has no network/RPC components yet.
    #[tokio::test]
    async fn test_node_runner_creation() {
        let config = NodeRunnerConfig::default();
        let node_runner = NodeRunner::new(config);

        assert!(!*node_runner.is_running.read().await);
        assert!(node_runner.p2p_handle.is_none());
        assert!(node_runner.rpc_server.is_none());
    }

    /// start() succeeds once, refuses a second call, and stop() resets state.
    #[tokio::test]
    async fn test_node_runner_start_stop() {
        let config = NodeRunnerConfig {
            // Port 0 lets the OS pick a free port so tests don't collide.
            rpc_transport: RpcTransport::Tcp("127.0.0.1:0".to_string()),
            ..Default::default()
        };

        let mut node_runner = NodeRunner::new(config);

        // Should be able to start
        assert!(node_runner.start().await.is_ok());
        assert!(*node_runner.is_running.read().await);

        // Should not be able to start again
        assert!(matches!(
            node_runner.start().await,
            Err(NodeRunnerError::AlreadyStarted)
        ));

        // Should be able to stop
        assert!(node_runner.stop().await.is_ok());
        assert!(!*node_runner.is_running.read().await);
    }

    /// status() on a fresh runner reports not-running, DAG stats, null P2P.
    #[tokio::test]
    async fn test_node_runner_status() {
        let config = NodeRunnerConfig::default();
        let node_runner = NodeRunner::new(config);

        let status = node_runner.status().await.unwrap();
        assert_eq!(status["is_running"], false);
        assert!(status["dag"].is_object());
        assert!(status["p2p"].is_null());
    }
}