use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::select;
use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{debug, error, info, warn};

use qudag_network::{
    p2p::{NetworkConfig as P2PNetworkConfig, P2PEvent, P2PNode, QuDagResponse},
    DarkResolver, P2PHandle,
};

use qudag_dag::{Dag, DagMessage, VertexId};

use crate::types::{ProtocolError, ProtocolEvent};

#[derive(Debug, Clone)]
pub enum RpcTransport {
    Tcp(String),
    Unix(String),
}

#[derive(Debug, Clone)]
pub enum RpcCommand {
    Stop,
    GetStatus,
}

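/// Minimal in-process stand-in for the RPC server.
///
/// Note: this is a stub. `start` and `stop` are no-ops, and the command
/// channel's sender is dropped at construction, so the returned receiver
/// currently yields no commands.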
pub struct RpcServer {
    _transport: RpcTransport,
}

impl RpcServer {
    pub fn new_tcp(
        port: u16,
    ) -> (
        Self,
        tokio::sync::mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
    ) {
        let (_, rx) = tokio::sync::mpsc::channel(1);
        (
            Self {
                _transport: RpcTransport::Tcp(format!("127.0.0.1:{}", port)),
            },
            rx,
        )
    }

    pub fn new_unix(
        path: String,
    ) -> (
        Self,
        tokio::sync::mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
    ) {
        let (_, rx) = tokio::sync::mpsc::channel(1);
        (
            Self {
                _transport: RpcTransport::Unix(path),
            },
            rx,
        )
    }

    pub async fn start(&mut self) -> Result<(), String> {
        Ok(())
    }

    pub async fn stop(&mut self) -> Result<(), String> {
        Ok(())
    }
}

#[derive(Error, Debug)]
pub enum NodeRunnerError {
    #[error("Network error: {0}")]
    NetworkError(String),

    #[error("DAG error: {0}")]
    DagError(String),

    #[error("RPC error: {0}")]
    RpcError(String),

    #[error("Protocol error: {0}")]
    ProtocolError(#[from] ProtocolError),

    #[error("Node already started")]
    AlreadyStarted,

    #[error("Node not started")]
    NotStarted,

    #[error("Shutdown error: {0}")]
    ShutdownError(String),
}

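/// Configuration for [`NodeRunner`].
///
/// Illustrative construction overriding a single field (the Unix socket path
/// below is a hypothetical example; every other field falls back to `Default`):
///
/// ```ignore
/// let config = NodeRunnerConfig {
///     rpc_transport: RpcTransport::Unix("/tmp/qudag-rpc.sock".to_string()),
///     ..Default::default()
/// };
/// ```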
#[derive(Debug, Clone)]
pub struct NodeRunnerConfig {
    /// Configuration handed to the P2P node.
    pub p2p_config: P2PNetworkConfig,

    /// Transport the RPC server listens on (TCP address or Unix socket path).
    pub rpc_transport: RpcTransport,

    /// Maximum number of concurrent DAG operations (passed to `Dag::new`).
    pub max_dag_concurrent: usize,

    /// Whether to create a `DarkResolver` alongside the other components.
    pub enable_dark_resolver: bool,

    /// Grace period allowed for shutdown.
    pub shutdown_timeout: Duration,
}

impl Default for NodeRunnerConfig {
    fn default() -> Self {
        Self {
            p2p_config: P2PNetworkConfig::default(),
            rpc_transport: RpcTransport::Tcp("127.0.0.1:9090".to_string()),
            max_dag_concurrent: 100,
            enable_dark_resolver: true,
            shutdown_timeout: Duration::from_secs(30),
        }
    }
}

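/// Owns and drives the node's long-running components: the spawned P2P task,
/// the shared DAG, the RPC server stub, and the optional dark resolver.
///
/// Sketch of the intended lifecycle (marked `ignore`; assumes a Tokio runtime):
///
/// ```ignore
/// let mut runner = NodeRunner::new(NodeRunnerConfig::default());
/// runner.start().await?;
/// // `run` loops until `stop` (called from another task or a signal handler)
/// // fires the shutdown channel.
/// runner.run().await?;
/// ```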
pub struct NodeRunner {
    config: NodeRunnerConfig,

    /// Handle for interacting with the spawned P2P node.
    p2p_handle: Option<P2PHandle>,

    /// Join handle for the background task running the P2P event loop.
    p2p_task_handle:
        Option<tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>>,

    /// Shared DAG state.
    dag: Arc<RwLock<Dag>>,

    /// RPC server (currently a stub).
    rpc_server: Option<Arc<Mutex<RpcServer>>>,

    /// Receiver for RPC commands; held but not yet consumed.
    #[allow(dead_code)]
    rpc_command_rx: Option<
        tokio::sync::mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
    >,

    /// Optional dark address resolver.
    dark_resolver: Option<Arc<RwLock<DarkResolver>>>,

    /// Protocol event channel; the receiver is taken by `run`.
    #[allow(dead_code)]
    event_tx: mpsc::UnboundedSender<ProtocolEvent>,
    event_rx: Option<mpsc::UnboundedReceiver<ProtocolEvent>>,

    /// Fires the shutdown signal consumed by the event loop.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,

    /// Shared running flag.
    is_running: Arc<RwLock<bool>>,
}

impl NodeRunner {
    pub fn new(config: NodeRunnerConfig) -> Self {
        let (event_tx, event_rx) = mpsc::unbounded_channel();

        let dag = Arc::new(RwLock::new(Dag::new(config.max_dag_concurrent)));

        Self {
            config,
            p2p_handle: None,
            p2p_task_handle: None,
            dag,
            rpc_server: None,
            rpc_command_rx: None,
            dark_resolver: None,
            event_tx,
            event_rx: Some(event_rx),
            shutdown_tx: None,
            is_running: Arc::new(RwLock::new(false)),
        }
    }

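    /// Creates the P2P node (spawning its run loop on a background task), the
    /// RPC server for the configured transport, and, if enabled, the dark
    /// resolver.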
    async fn initialize_components(&mut self) -> Result<(), NodeRunnerError> {
        info!("Initializing node components...");

        let (mut p2p_node, p2p_handle) = P2PNode::new(self.config.p2p_config.clone())
            .await
            .map_err(|e| NodeRunnerError::NetworkError(e.to_string()))?;

        p2p_node
            .start()
            .await
            .map_err(|e| NodeRunnerError::NetworkError(e.to_string()))?;

        let p2p_task_handle = tokio::spawn(async move {
            p2p_node
                .run()
                .await
                .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
                    Box::new(std::io::Error::other(e.to_string()))
                })
        });

        self.p2p_handle = Some(p2p_handle);
        self.p2p_task_handle = Some(p2p_task_handle);

        let (rpc_server, rpc_command_rx) = match &self.config.rpc_transport {
            RpcTransport::Tcp(addr) => {
                let port = addr
                    .split(':')
                    .next_back()
                    .and_then(|p| p.parse::<u16>().ok())
                    .unwrap_or(9090);
                RpcServer::new_tcp(port)
            }
            RpcTransport::Unix(path) => RpcServer::new_unix(path.clone()),
        };
        self.rpc_server = Some(Arc::new(Mutex::new(rpc_server)));
        self.rpc_command_rx = Some(rpc_command_rx);

        if self.config.enable_dark_resolver {
            self.dark_resolver = Some(Arc::new(RwLock::new(DarkResolver::new())));
        }

        info!("All node components initialized successfully");
        Ok(())
    }

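    /// Initializes components on first use and marks the node as running.
    /// Returns [`NodeRunnerError::AlreadyStarted`] if called while running.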
    pub async fn start(&mut self) -> Result<(), NodeRunnerError> {
        if *self.is_running.read().await {
            return Err(NodeRunnerError::AlreadyStarted);
        }

        info!("Starting QuDAG node...");

        if self.p2p_handle.is_none() {
            self.initialize_components().await?;
        }

        if let Some(rpc_server) = &self.rpc_server {
            let mut server = rpc_server.lock().await;
            server
                .start()
                .await
                .map_err(|e| NodeRunnerError::RpcError(e.to_string()))?;
        }

        *self.is_running.write().await = true;

        info!("QuDAG node started successfully");
        Ok(())
    }

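    /// Main event loop: multiplexes P2P events, protocol events, and the
    /// shutdown signal with `select!`, returning once shutdown is requested.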
    pub async fn run(&mut self) -> Result<(), NodeRunnerError> {
        if !*self.is_running.read().await {
            return Err(NodeRunnerError::NotStarted);
        }

        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        self.shutdown_tx = Some(shutdown_tx);

        let mut event_rx = self.event_rx.take().ok_or(NodeRunnerError::NotStarted)?;

        let p2p_handle = self.p2p_handle.clone();

        info!("Node runner event loop started");

        loop {
            select! {
                p2p_event = async {
                    if let Some(ref handle) = p2p_handle {
                        handle.next_event().await
                    } else {
                        None::<P2PEvent>
                    }
                } => {
                    if let Some(event) = p2p_event {
                        if let Err(e) = self.handle_p2p_event(event).await {
                            error!("Error handling P2P event: {}", e);
                        }
                    }
                }

                Some(protocol_event) = event_rx.recv() => {
                    if let Err(e) = self.handle_protocol_event(protocol_event).await {
                        error!("Error handling protocol event: {}", e);
                    }
                }

                _ = &mut shutdown_rx => {
                    info!("Received shutdown signal");
                    break;
                }
            }
        }

        info!("Node runner event loop stopped");
        Ok(())
    }

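    /// Handles a single P2P event: gossip messages are wrapped in a
    /// `DagMessage` and submitted to the DAG; requests receive an empty
    /// placeholder response.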
    async fn handle_p2p_event(&self, event: P2PEvent) -> Result<(), NodeRunnerError> {
        match event {
            P2PEvent::MessageReceived {
                peer_id,
                topic,
                data,
            } => {
                debug!("Received message from peer {} on topic {}", peer_id, topic);

                let dag_message = DagMessage {
                    id: VertexId::new(),
                    payload: data,
                    parents: Default::default(),
                    timestamp: std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap()
                        .as_secs(),
                };

                let dag = self.dag.write().await;
                dag.submit_message(dag_message)
                    .await
                    .map_err(|e| NodeRunnerError::DagError(e.to_string()))?;
            }

            P2PEvent::PeerConnected(peer_id) => {
                info!("Peer connected: {}", peer_id);
            }

            P2PEvent::PeerDisconnected(peer_id) => {
                info!("Peer disconnected: {}", peer_id);
            }

            P2PEvent::RequestReceived {
                peer_id,
                request,
                channel,
            } => {
                debug!("Received request from peer {}: {:?}", peer_id, request);
                let response = QuDagResponse {
                    request_id: request.request_id,
                    payload: vec![],
                };
                let _ = channel.send(response);
            }

            _ => {
                debug!("Unhandled P2P event: {:?}", event);
            }
        }

        Ok(())
    }

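    /// Handles protocol-level events; currently these are only logged, with no
    /// network propagation.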
    async fn handle_protocol_event(&self, event: ProtocolEvent) -> Result<(), NodeRunnerError> {
        match event {
            ProtocolEvent::MessageReceived { id, .. } => {
                debug!("Message received: {:?}", id);

                if let Some(p2p_handle) = &self.p2p_handle {
                    // Placeholder: the handle is bound but no propagation happens yet.
                    let _handle = p2p_handle;
                }
            }

            ProtocolEvent::MessageFinalized { id, .. } => {
                info!("Message finalized: {:?}", id);
            }

            _ => {
                debug!("Unhandled protocol event: {:?}", event);
            }
        }

        Ok(())
    }

    async fn _handle_rpc_commands(
        mut _rx: tokio::sync::mpsc::Receiver<(
            RpcCommand,
            tokio::sync::oneshot::Sender<serde_json::Value>,
        )>,
        _event_tx: mpsc::UnboundedSender<ProtocolEvent>,
        _is_running: Arc<RwLock<bool>>,
    ) {
        // Intentionally empty: RPC command handling is not wired up yet.
    }

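    /// Stops the node: signals the event loop, stops the RPC server, aborts the
    /// P2P task, and clears the running flag. Returns `Ok(())` if already stopped.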
    pub async fn stop(&mut self) -> Result<(), NodeRunnerError> {
        if !*self.is_running.read().await {
            return Ok(());
        }

        info!("Stopping QuDAG node...");

        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }

        if let Some(rpc_server) = &self.rpc_server {
            let mut server = rpc_server.lock().await;
            server
                .stop()
                .await
                .map_err(|e| NodeRunnerError::RpcError(e.to_string()))?;
        }

        if let Some(task_handle) = self.p2p_task_handle.take() {
            task_handle.abort();
            if let Err(e) = task_handle.await {
                if !e.is_cancelled() {
                    warn!("P2P task shutdown error: {}", e);
                }
            }
        }

        self.p2p_handle = None;

        *self.is_running.write().await = false;

        info!("QuDAG node stopped successfully");
        Ok(())
    }

    pub fn p2p_handle(&self) -> &Option<P2PHandle> {
        &self.p2p_handle
    }

    pub fn dag(&self) -> &Arc<RwLock<Dag>> {
        &self.dag
    }

    pub fn rpc_server(&self) -> &Option<Arc<Mutex<RpcServer>>> {
        &self.rpc_server
    }

    pub fn dark_resolver(&self) -> &Option<Arc<RwLock<DarkResolver>>> {
        &self.dark_resolver
    }

    pub fn is_running(&self) -> &Arc<RwLock<bool>> {
        &self.is_running
    }

    pub fn config(&self) -> &NodeRunnerConfig {
        &self.config
    }

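    /// Builds a JSON status snapshot: running flag, DAG vertex count, peer
    /// information (when the P2P handle exists), and the dark resolver setting.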
    pub async fn status(&self) -> Result<serde_json::Value, NodeRunnerError> {
        let is_running = *self.is_running.read().await;

        let dag_stats = {
            let dag = self.dag.read().await;
            let vertices = dag.vertices.read().await;
            serde_json::json!({
                "vertex_count": vertices.len(),
                "tips": 0,
            })
        };

        let p2p_stats = if let Some(p2p_handle) = &self.p2p_handle {
            serde_json::json!({
                "peer_id": p2p_handle.local_peer_id().await.to_string(),
                "connected_peers": p2p_handle.connected_peers().await.len(),
            })
        } else {
            serde_json::Value::Null
        };

        Ok(serde_json::json!({
            "is_running": is_running,
            "dag": dag_stats,
            "p2p": p2p_stats,
            "dark_resolver_enabled": self.config.enable_dark_resolver,
        }))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_node_runner_creation() {
        let config = NodeRunnerConfig::default();
        let node_runner = NodeRunner::new(config);

        assert!(!*node_runner.is_running.read().await);
        assert!(node_runner.p2p_handle.is_none());
        assert!(node_runner.rpc_server.is_none());
    }

    #[tokio::test]
    async fn test_node_runner_start_stop() {
        let config = NodeRunnerConfig {
            rpc_transport: RpcTransport::Tcp("127.0.0.1:0".to_string()),
            ..Default::default()
        };

        let mut node_runner = NodeRunner::new(config);

        assert!(node_runner.start().await.is_ok());
        assert!(*node_runner.is_running.read().await);

        assert!(matches!(
            node_runner.start().await,
            Err(NodeRunnerError::AlreadyStarted)
        ));

        assert!(node_runner.stop().await.is_ok());
        assert!(!*node_runner.is_running.read().await);
    }

    #[tokio::test]
    async fn test_node_runner_status() {
        let config = NodeRunnerConfig::default();
        let node_runner = NodeRunner::new(config);

        let status = node_runner.status().await.unwrap();
        assert_eq!(status["is_running"], false);
        assert!(status["dag"].is_object());
        assert!(status["p2p"].is_null());
    }
}