qudag_protocol/
rpc_server.rs

1use crate::ProtocolError;
2use qudag_crypto::ml_dsa::MlDsaPublicKey;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
10use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
11use tokio::time::{timeout, Duration};
12use tracing::{debug, error, info, warn};
13use uuid::Uuid;
14
15/// Extension trait for reading u32 from streams
16trait ReadU32Ext: AsyncReadExt + Unpin {
17    async fn read_u32(&mut self) -> std::io::Result<u32> {
18        let mut buf = [0u8; 4];
19        self.read_exact(&mut buf).await?;
20        Ok(u32::from_be_bytes(buf))
21    }
22}
23
24impl<T: AsyncReadExt + Unpin> ReadU32Ext for T {}
25
26/// RPC request
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct RpcRequest {
29    pub id: Uuid,
30    pub method: String,
31    pub params: serde_json::Value,
32}
33
34/// RPC response
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct RpcResponse {
37    pub id: Uuid,
38    pub result: Option<serde_json::Value>,
39    pub error: Option<RpcError>,
40}
41
42/// RPC error
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct RpcError {
45    pub code: i32,
46    pub message: String,
47    pub data: Option<serde_json::Value>,
48}
49
50/// RPC command types
51#[derive(Debug, Clone)]
52pub enum RpcCommand {
53    Stop,
54    GetStatus,
55    ListPeers,
56    AddPeer(String),
57    RemovePeer(String),
58    GetPeerInfo(String),
59    BanPeer(String),
60    UnbanPeer(String),
61    GetNetworkStats,
62    TestNetwork,
63}
64
65/// Peer information for RPC responses
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct PeerInfo {
68    pub id: String,
69    pub address: String,
70    pub connected_duration: u64,
71    pub messages_sent: u64,
72    pub messages_received: u64,
73    pub last_seen: u64,
74    pub status: String,
75    pub latency: Option<f64>,
76}
77
78/// Network statistics
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct NetworkStats {
81    pub total_connections: usize,
82    pub active_connections: usize,
83    pub messages_sent: u64,
84    pub messages_received: u64,
85    pub bytes_sent: u64,
86    pub bytes_received: u64,
87    pub average_latency: f64,
88    pub uptime: u64,
89}
90
91/// Network test result
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct NetworkTestResult {
94    pub peer_id: String,
95    pub address: String,
96    pub reachable: bool,
97    pub latency: Option<f64>,
98    pub error: Option<String>,
99}
100
101/// DAG statistics
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct DagStats {
104    pub vertex_count: usize,
105    pub edge_count: usize,
106    pub tip_count: usize,
107    pub finalized_height: u64,
108    pub pending_transactions: usize,
109}
110
111/// Memory statistics
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct MemoryStats {
114    pub total_allocated: usize,
115    pub current_usage: usize,
116    pub peak_usage: usize,
117}
118
119/// Node status with all metrics
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct NodeStatus {
122    pub node_id: String,
123    pub state: String,
124    pub uptime: u64,
125    pub peers: Vec<PeerInfo>,
126    pub network_stats: NetworkStats,
127    pub dag_stats: DagStats,
128    pub memory_usage: MemoryStats,
129}
130
131/// Transport type for RPC server
132#[derive(Debug, Clone)]
133pub enum RpcTransport {
134    /// TCP socket transport
135    Tcp(String),
136    /// Unix domain socket transport
137    Unix(String),
138}
139
140/// Forward declaration of NodeRunner to avoid circular imports
141type NodeRunnerHandle = Arc<RwLock<dyn NodeRunnerTrait + Send + Sync>>;
142
143/// Trait for NodeRunner operations that RPC server can call
144pub trait NodeRunnerTrait: Send + Sync + std::fmt::Debug {
145    fn get_status(
146        &self,
147    ) -> Pin<
148        Box<
149            dyn std::future::Future<
150                    Output = Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
151                > + Send,
152        >,
153    >;
154    fn get_connected_peers(
155        &self,
156    ) -> Pin<Box<dyn std::future::Future<Output = Vec<PeerInfo>> + Send>>;
157    fn dial_peer(
158        &self,
159        address: String,
160    ) -> Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;
161    fn disconnect_peer(
162        &self,
163        peer_id: &str,
164    ) -> Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;
165    fn get_network_stats(&self) -> Pin<Box<dyn std::future::Future<Output = NetworkStats> + Send>>;
166    fn shutdown(
167        &self,
168    ) -> Pin<
169        Box<
170            dyn std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
171                + Send,
172        >,
173    >;
174}
175
176/// RPC server for handling remote commands
177pub struct RpcServer {
178    transport: RpcTransport,
179    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
180    command_tx: mpsc::Sender<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
181    network_manager: Arc<RwLock<NetworkManager>>,
182    /// Handle to the running node for real operations
183    node_handle: Option<NodeRunnerHandle>,
184    /// Channel to send shutdown signal to the node
185    node_shutdown_tx: Option<oneshot::Sender<()>>,
186    auth_token: Option<String>,
187    rate_limiter: Arc<Mutex<RateLimiter>>,
188    auth_keys: Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
189    #[allow(dead_code)]
190    start_time: SystemTime,
191}
192
193/// Network manager for peer operations that can work with or without a real P2P node
194#[derive(Debug)]
195pub struct NetworkManager {
196    /// Mock peers for when no real node is connected
197    mock_peers: HashMap<String, PeerInfo>,
198    /// Banned peer addresses
199    banned_peers: std::collections::HashSet<String>,
200    /// Network statistics
201    network_stats: NetworkStats,
202    /// Start time for uptime calculation
203    start_time: SystemTime,
204    /// Handle to real node (if available)
205    node_handle: Option<NodeRunnerHandle>,
206}
207
208/// Rate limiter for RPC requests
209#[derive(Debug)]
210struct RateLimiter {
211    requests: HashMap<String, Vec<SystemTime>>,
212    max_requests_per_minute: usize,
213}
214
215impl NetworkManager {
216    fn new() -> Self {
217        Self {
218            mock_peers: HashMap::new(),
219            banned_peers: std::collections::HashSet::new(),
220            network_stats: NetworkStats {
221                total_connections: 0,
222                active_connections: 0,
223                messages_sent: 0,
224                messages_received: 0,
225                bytes_sent: 0,
226                bytes_received: 0,
227                average_latency: 0.0,
228                uptime: 0,
229            },
230            start_time: SystemTime::now(),
231            node_handle: None,
232        }
233    }
234
235    /// Set the handle to the real node for actual operations
236    pub fn set_node_handle(&mut self, handle: NodeRunnerHandle) {
237        self.node_handle = Some(handle);
238    }
239
240    async fn add_peer(&mut self, address: String) -> Result<(), String> {
241        if self.banned_peers.contains(&address) {
242            return Err("Peer is banned".to_string());
243        }
244
245        // Try to use real node if available
246        if let Some(node) = &self.node_handle {
247            let node_guard = node.read().await;
248            return node_guard.dial_peer(address).await;
249        }
250
251        // Fall back to mock behavior
252        let peer_id = format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8]);
253        let peer_info = PeerInfo {
254            id: peer_id.clone(),
255            address: address.clone(),
256            connected_duration: 0,
257            messages_sent: 0,
258            messages_received: 0,
259            last_seen: SystemTime::now()
260                .duration_since(UNIX_EPOCH)
261                .unwrap()
262                .as_secs(),
263            status: "Connected".to_string(),
264            latency: None,
265        };
266
267        self.mock_peers.insert(peer_id, peer_info);
268        self.network_stats.total_connections += 1;
269        self.network_stats.active_connections += 1;
270        Ok(())
271    }
272
273    async fn remove_peer(&mut self, peer_id: &str) -> Result<(), String> {
274        // Try to use real node if available
275        if let Some(node) = &self.node_handle {
276            let node_guard = node.read().await;
277            return node_guard.disconnect_peer(peer_id).await;
278        }
279
280        // Fall back to mock behavior
281        if self.mock_peers.remove(peer_id).is_some() {
282            self.network_stats.active_connections =
283                self.network_stats.active_connections.saturating_sub(1);
284            Ok(())
285        } else {
286            Err("Peer not found".to_string())
287        }
288    }
289
290    async fn get_peer_info(&self, peer_id: &str) -> Option<PeerInfo> {
291        // If we have a real node, get peer info from it
292        if let Some(node) = &self.node_handle {
293            let node_guard = node.read().await;
294            let connected_peers = node_guard.get_connected_peers().await;
295            return connected_peers.into_iter().find(|p| p.id == peer_id);
296        }
297
298        // Fall back to mock peers
299        self.mock_peers.get(peer_id).cloned()
300    }
301
302    async fn list_peers(&self) -> Vec<PeerInfo> {
303        // If we have a real node, get peers from it
304        if let Some(node) = &self.node_handle {
305            let node_guard = node.read().await;
306            return node_guard.get_connected_peers().await;
307        }
308
309        // Fall back to mock peers
310        self.mock_peers.values().cloned().collect()
311    }
312
313    async fn ban_peer(&mut self, peer_id: &str) -> Result<(), String> {
314        // Get peer address before removing
315        let peer_address = if let Some(node) = &self.node_handle {
316            let node_guard = node.read().await;
317            let connected_peers = node_guard.get_connected_peers().await;
318            connected_peers
319                .into_iter()
320                .find(|p| p.id == peer_id)
321                .map(|p| p.address)
322        } else {
323            self.mock_peers.get(peer_id).map(|p| p.address.clone())
324        };
325
326        if let Some(address) = peer_address {
327            self.banned_peers.insert(address);
328            self.remove_peer(peer_id).await?;
329            Ok(())
330        } else {
331            Err("Peer not found".to_string())
332        }
333    }
334
335    fn unban_peer(&mut self, address: &str) -> Result<(), String> {
336        if self.banned_peers.remove(address) {
337            Ok(())
338        } else {
339            Err("Peer is not banned".to_string())
340        }
341    }
342
343    async fn get_network_stats(&mut self) -> NetworkStats {
344        // If we have a real node, get stats from it
345        if let Some(node) = &self.node_handle {
346            let node_guard = node.read().await;
347            return node_guard.get_network_stats().await;
348        }
349
350        // Fall back to mock stats
351        self.network_stats.uptime = self.start_time.elapsed().unwrap_or_default().as_secs();
352        self.network_stats.clone()
353    }
354
355    async fn test_network(&self) -> Vec<NetworkTestResult> {
356        let mut results = Vec::new();
357
358        let peers = if let Some(node) = &self.node_handle {
359            let node_guard = node.read().await;
360            node_guard.get_connected_peers().await
361        } else {
362            self.mock_peers.values().cloned().collect()
363        };
364
365        for peer in peers {
366            let result = self.test_peer_connectivity(&peer).await;
367            results.push(result);
368        }
369
370        results
371    }
372
373    async fn test_peer_connectivity(&self, peer: &PeerInfo) -> NetworkTestResult {
374        // Simulate network test - in a real implementation this would do actual connectivity testing
375        let start = std::time::Instant::now();
376
377        // Try to parse address and test connectivity
378        match peer.address.parse::<std::net::SocketAddr>() {
379            Ok(addr) => {
380                match timeout(Duration::from_secs(5), tokio::net::TcpStream::connect(addr)).await {
381                    Ok(Ok(_)) => NetworkTestResult {
382                        peer_id: peer.id.clone(),
383                        address: peer.address.clone(),
384                        reachable: true,
385                        latency: Some(start.elapsed().as_millis() as f64),
386                        error: None,
387                    },
388                    Ok(Err(e)) => NetworkTestResult {
389                        peer_id: peer.id.clone(),
390                        address: peer.address.clone(),
391                        reachable: false,
392                        latency: None,
393                        error: Some(e.to_string()),
394                    },
395                    Err(_) => NetworkTestResult {
396                        peer_id: peer.id.clone(),
397                        address: peer.address.clone(),
398                        reachable: false,
399                        latency: None,
400                        error: Some("Connection timeout".to_string()),
401                    },
402                }
403            }
404            Err(e) => NetworkTestResult {
405                peer_id: peer.id.clone(),
406                address: peer.address.clone(),
407                reachable: false,
408                latency: None,
409                error: Some(format!("Invalid address: {}", e)),
410            },
411        }
412    }
413}
414
415impl RateLimiter {
416    fn new(max_requests_per_minute: usize) -> Self {
417        Self {
418            requests: HashMap::new(),
419            max_requests_per_minute,
420        }
421    }
422
423    fn check_rate_limit(&mut self, client_ip: &str) -> bool {
424        let now = SystemTime::now();
425        let requests = self.requests.entry(client_ip.to_string()).or_default();
426
427        // Remove requests older than 1 minute
428        requests.retain(|&time| now.duration_since(time).unwrap_or_default().as_secs() < 60);
429
430        if requests.len() >= self.max_requests_per_minute {
431            false
432        } else {
433            requests.push(now);
434            true
435        }
436    }
437}
438
439impl RpcServer {
440    /// Create new RPC server with TCP transport
441    pub fn new_tcp(
442        port: u16,
443    ) -> (
444        Self,
445        mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
446    ) {
447        let (command_tx, command_rx) = mpsc::channel(100);
448
449        let server = Self {
450            transport: RpcTransport::Tcp(format!("127.0.0.1:{}", port)),
451            shutdown_tx: None,
452            command_tx,
453            network_manager: Arc::new(RwLock::new(NetworkManager::new())),
454            node_handle: None,
455            node_shutdown_tx: None,
456            auth_token: std::env::var("RPC_AUTH_TOKEN").ok(),
457            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60))), // 60 requests per minute
458            auth_keys: Arc::new(RwLock::new(HashMap::new())),
459            start_time: SystemTime::now(),
460        };
461
462        (server, command_rx)
463    }
464
465    /// Create new RPC server with Unix socket transport
466    pub fn new_unix(
467        socket_path: String,
468    ) -> (
469        Self,
470        mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
471    ) {
472        let (command_tx, command_rx) = mpsc::channel(100);
473
474        let server = Self {
475            transport: RpcTransport::Unix(socket_path),
476            shutdown_tx: None,
477            command_tx,
478            network_manager: Arc::new(RwLock::new(NetworkManager::new())),
479            node_handle: None,
480            node_shutdown_tx: None,
481            auth_token: std::env::var("RPC_AUTH_TOKEN").ok(),
482            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60))),
483            auth_keys: Arc::new(RwLock::new(HashMap::new())),
484            start_time: SystemTime::now(),
485        };
486
487        (server, command_rx)
488    }
489
490    /// Create new RPC server with authentication
491    pub fn with_auth(
492        transport: RpcTransport,
493        auth_token: String,
494    ) -> (
495        Self,
496        mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
497    ) {
498        let (command_tx, command_rx) = mpsc::channel(100);
499
500        let server = Self {
501            transport,
502            shutdown_tx: None,
503            command_tx,
504            network_manager: Arc::new(RwLock::new(NetworkManager::new())),
505            node_handle: None,
506            node_shutdown_tx: None,
507            auth_token: Some(auth_token),
508            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60))),
509            auth_keys: Arc::new(RwLock::new(HashMap::new())),
510            start_time: SystemTime::now(),
511        };
512
513        (server, command_rx)
514    }
515
516    /// Set the node handle for real operations
517    pub async fn set_node_handle(&mut self, handle: NodeRunnerHandle) {
518        self.node_handle = Some(handle.clone());
519        let mut manager = self.network_manager.write().await;
520        manager.set_node_handle(handle);
521    }
522
523    /// Set the shutdown channel for stopping the node
524    pub fn set_shutdown_channel(&mut self, tx: oneshot::Sender<()>) {
525        self.node_shutdown_tx = Some(tx);
526    }
527
528    /// Add an authorized public key for ML-DSA authentication
529    pub async fn add_auth_key(&self, client_id: String, public_key: MlDsaPublicKey) {
530        let mut keys = self.auth_keys.write().await;
531        keys.insert(client_id, public_key);
532    }
533
534    /// Start RPC server
535    pub async fn start(&mut self) -> Result<(), ProtocolError> {
536        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
537        self.shutdown_tx = Some(shutdown_tx);
538
539        let command_tx = self.command_tx.clone();
540        let network_manager = Arc::clone(&self.network_manager);
541        // let node = self.node.clone();
542        // let dag = self.dag.clone();
543        let auth_token = self.auth_token.clone();
544        let auth_keys = Arc::clone(&self.auth_keys);
545        let rate_limiter = Arc::clone(&self.rate_limiter);
546        let transport = self.transport.clone();
547
548        tokio::spawn(async move {
549            match transport {
550                RpcTransport::Tcp(addr) => {
551                    let listener = match TcpListener::bind(&addr).await {
552                        Ok(l) => l,
553                        Err(e) => {
554                            error!("Failed to bind TCP listener: {}", e);
555                            return;
556                        }
557                    };
558
559                    info!(
560                        "RPC server listening on TCP: {}",
561                        listener.local_addr().unwrap()
562                    );
563
564                    loop {
565                        tokio::select! {
566                            Ok((stream, addr)) = listener.accept() => {
567                                debug!("New RPC connection from {}", addr);
568                                let command_tx = command_tx.clone();
569                                let network_manager = Arc::clone(&network_manager);
570                                // let node = node.clone();
571                                // let dag = dag.clone();
572                                let auth_token = auth_token.clone();
573                                let auth_keys = Arc::clone(&auth_keys);
574                                let rate_limiter = Arc::clone(&rate_limiter);
575                                let client_ip = addr.ip().to_string();
576
577                                tokio::spawn(async move {
578                                    // Check rate limit
579                                    {
580                                        let mut limiter = rate_limiter.lock().await;
581                                        if !limiter.check_rate_limit(&client_ip) {
582                                            warn!("Rate limit exceeded for client: {}", client_ip);
583                                            return;
584                                        }
585                                    }
586
587                                    if let Err(e) = handle_tcp_connection(
588                                        stream, command_tx, network_manager, auth_token, auth_keys
589                                    ).await {
590                                        error!("Error handling RPC connection: {}", e);
591                                    }
592                                });
593                            }
594                            _ = &mut shutdown_rx => {
595                                info!("RPC server shutting down");
596                                break;
597                            }
598                        }
599                    }
600                }
601                RpcTransport::Unix(path) => {
602                    // Remove existing socket file if it exists
603                    let _ = std::fs::remove_file(&path);
604
605                    let listener = match UnixListener::bind(&path) {
606                        Ok(l) => l,
607                        Err(e) => {
608                            error!("Failed to bind Unix listener: {}", e);
609                            return;
610                        }
611                    };
612
613                    info!("RPC server listening on Unix socket: {}", path);
614
615                    loop {
616                        tokio::select! {
617                            Ok((stream, _)) = listener.accept() => {
618                                debug!("New RPC connection on Unix socket");
619                                let command_tx = command_tx.clone();
620                                let network_manager = Arc::clone(&network_manager);
621                                // let node = node.clone();
622                                // let dag = dag.clone();
623                                let auth_token = auth_token.clone();
624                                let auth_keys = Arc::clone(&auth_keys);
625
626                                tokio::spawn(async move {
627                                    if let Err(e) = handle_unix_connection(
628                                        stream, command_tx, network_manager, auth_token, auth_keys
629                                    ).await {
630                                        error!("Error handling RPC connection: {}", e);
631                                    }
632                                });
633                            }
634                            _ = &mut shutdown_rx => {
635                                info!("RPC server shutting down");
636                                break;
637                            }
638                        }
639                    }
640                }
641            }
642        });
643
644        Ok(())
645    }
646
647    /// Stop RPC server
648    pub async fn stop(&mut self) -> Result<(), ProtocolError> {
649        if let Some(tx) = self.shutdown_tx.take() {
650            let _ = tx.send(());
651        }
652        Ok(())
653    }
654}
655
656/// Handle TCP RPC connection
657async fn handle_tcp_connection(
658    mut stream: TcpStream,
659    command_tx: mpsc::Sender<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
660    network_manager: Arc<RwLock<NetworkManager>>,
661    auth_token: Option<String>,
662    auth_keys: Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
663) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
664    // Read request with timeout
665    let request_len = timeout(Duration::from_secs(30), ReadU32Ext::read_u32(&mut stream))
666        .await??
667        .min(10 * 1024 * 1024); // Max 10MB request
668
669    let mut request_data = vec![0u8; request_len as usize];
670    timeout(
671        Duration::from_secs(30),
672        stream.read_exact(&mut request_data),
673    )
674    .await??;
675
676    let request: RpcRequest = serde_json::from_slice(&request_data)?;
677
678    let response =
679        handle_request(request, command_tx, network_manager, auth_token, auth_keys).await;
680
681    let response_data = serde_json::to_vec(&response)?;
682    stream
683        .write_all(&(response_data.len() as u32).to_be_bytes())
684        .await?;
685    stream.write_all(&response_data).await?;
686    stream.flush().await?;
687
688    Ok(())
689}
690
691/// Handle Unix socket RPC connection
692async fn handle_unix_connection(
693    mut stream: UnixStream,
694    command_tx: mpsc::Sender<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
695    network_manager: Arc<RwLock<NetworkManager>>,
696    auth_token: Option<String>,
697    auth_keys: Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
698) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
699    let request_len = timeout(Duration::from_secs(30), ReadU32Ext::read_u32(&mut stream))
700        .await??
701        .min(10 * 1024 * 1024);
702
703    let mut request_data = vec![0u8; request_len as usize];
704    timeout(
705        Duration::from_secs(30),
706        stream.read_exact(&mut request_data),
707    )
708    .await??;
709
710    let request: RpcRequest = serde_json::from_slice(&request_data)?;
711
712    let response =
713        handle_request(request, command_tx, network_manager, auth_token, auth_keys).await;
714
715    let response_data = serde_json::to_vec(&response)?;
716    stream
717        .write_all(&(response_data.len() as u32).to_be_bytes())
718        .await?;
719    stream.write_all(&response_data).await?;
720    stream.flush().await?;
721
722    Ok(())
723}
724
725/// Authenticate RPC request
726async fn authenticate_request(
727    request: &RpcRequest,
728    auth_token: &Option<String>,
729    auth_keys: &Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
730) -> bool {
731    // Check token-based auth first
732    if let Some(expected_token) = auth_token {
733        if let Some(provided_token) = request.params.get("auth_token").and_then(|v| v.as_str()) {
734            if provided_token == expected_token {
735                return true;
736            }
737        }
738    } else if auth_token.is_none() && auth_keys.read().await.is_empty() {
739        // No authentication required if both are empty
740        return true;
741    }
742
743    // Check ML-DSA signature-based auth
744    if let (Some(client_id), Some(signature)) = (
745        request.params.get("client_id").and_then(|v| v.as_str()),
746        request.params.get("signature").and_then(|v| v.as_str()),
747    ) {
748        let keys = auth_keys.read().await;
749        if let Some(public_key) = keys.get(client_id) {
750            // Verify signature over the request method and ID
751            let message = format!("{}:{}", request.method, request.id);
752            if let Ok(sig_bytes) = hex::decode(signature) {
753                if public_key.verify(message.as_bytes(), &sig_bytes).is_ok() {
754                    return true;
755                }
756            }
757        }
758    }
759
760    false
761}
762
763/// Handle RPC request
764async fn handle_request(
765    request: RpcRequest,
766    command_tx: mpsc::Sender<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
767    network_manager: Arc<RwLock<NetworkManager>>,
768    auth_token: Option<String>,
769    auth_keys: Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
770) -> RpcResponse {
771    // Authenticate request if auth is enabled
772    if !authenticate_request(&request, &auth_token, &auth_keys).await {
773        return RpcResponse {
774            id: request.id,
775            result: None,
776            error: Some(RpcError {
777                code: -32001,
778                message: "Authentication required".to_string(),
779                data: None,
780            }),
781        };
782    }
783    match request.method.as_str() {
784        "list_peers" => {
785            let manager = network_manager.read().await;
786            let peers = manager.list_peers().await;
787            RpcResponse {
788                id: request.id,
789                result: Some(serde_json::to_value(peers).unwrap()),
790                error: None,
791            }
792        }
793        "add_peer" => {
794            let address = match request.params.get("address").and_then(|v| v.as_str()) {
795                Some(addr) => addr.to_string(),
796                None => {
797                    return RpcResponse {
798                        id: request.id,
799                        result: None,
800                        error: Some(RpcError {
801                            code: -32602,
802                            message: "Invalid params: address required".to_string(),
803                            data: None,
804                        }),
805                    };
806                }
807            };
808
809            let mut manager = network_manager.write().await;
810            match manager.add_peer(address.clone()).await {
811                Ok(()) => RpcResponse {
812                    id: request.id,
813                    result: Some(
814                        serde_json::json!({"status": "success", "message": format!("Peer {} added", address)}),
815                    ),
816                    error: None,
817                },
818                Err(e) => RpcResponse {
819                    id: request.id,
820                    result: None,
821                    error: Some(RpcError {
822                        code: -32003,
823                        message: format!("Failed to add peer: {}", e),
824                        data: None,
825                    }),
826                },
827            }
828        }
829        "remove_peer" => {
830            let peer_id = match request.params.get("peer_id").and_then(|v| v.as_str()) {
831                Some(id) => id,
832                None => {
833                    return RpcResponse {
834                        id: request.id,
835                        result: None,
836                        error: Some(RpcError {
837                            code: -32602,
838                            message: "Invalid params: peer_id required".to_string(),
839                            data: None,
840                        }),
841                    };
842                }
843            };
844
845            let mut manager = network_manager.write().await;
846            match manager.remove_peer(peer_id).await {
847                Ok(()) => RpcResponse {
848                    id: request.id,
849                    result: Some(
850                        serde_json::json!({"status": "success", "message": format!("Peer {} removed", peer_id)}),
851                    ),
852                    error: None,
853                },
854                Err(e) => RpcResponse {
855                    id: request.id,
856                    result: None,
857                    error: Some(RpcError {
858                        code: -32003,
859                        message: format!("Failed to remove peer: {}", e),
860                        data: None,
861                    }),
862                },
863            }
864        }
865        "get_peer_info" => {
866            let peer_id = match request.params.get("peer_id").and_then(|v| v.as_str()) {
867                Some(id) => id,
868                None => {
869                    return RpcResponse {
870                        id: request.id,
871                        result: None,
872                        error: Some(RpcError {
873                            code: -32602,
874                            message: "Invalid params: peer_id required".to_string(),
875                            data: None,
876                        }),
877                    };
878                }
879            };
880
881            let manager = network_manager.read().await;
882            match manager.get_peer_info(peer_id).await {
883                Some(peer_info) => RpcResponse {
884                    id: request.id,
885                    result: Some(serde_json::to_value(peer_info).unwrap()),
886                    error: None,
887                },
888                None => RpcResponse {
889                    id: request.id,
890                    result: None,
891                    error: Some(RpcError {
892                        code: -32004,
893                        message: "Peer not found".to_string(),
894                        data: None,
895                    }),
896                },
897            }
898        }
899        "ban_peer" => {
900            let peer_id = match request.params.get("peer_id").and_then(|v| v.as_str()) {
901                Some(id) => id,
902                None => {
903                    return RpcResponse {
904                        id: request.id,
905                        result: None,
906                        error: Some(RpcError {
907                            code: -32602,
908                            message: "Invalid params: peer_id required".to_string(),
909                            data: None,
910                        }),
911                    };
912                }
913            };
914
915            let mut manager = network_manager.write().await;
916            match manager.ban_peer(peer_id).await {
917                Ok(()) => RpcResponse {
918                    id: request.id,
919                    result: Some(
920                        serde_json::json!({"status": "success", "message": format!("Peer {} banned", peer_id)}),
921                    ),
922                    error: None,
923                },
924                Err(e) => RpcResponse {
925                    id: request.id,
926                    result: None,
927                    error: Some(RpcError {
928                        code: -32003,
929                        message: format!("Failed to ban peer: {}", e),
930                        data: None,
931                    }),
932                },
933            }
934        }
935        "unban_peer" => {
936            let address = match request.params.get("address").and_then(|v| v.as_str()) {
937                Some(addr) => addr,
938                None => {
939                    return RpcResponse {
940                        id: request.id,
941                        result: None,
942                        error: Some(RpcError {
943                            code: -32602,
944                            message: "Invalid params: address required".to_string(),
945                            data: None,
946                        }),
947                    };
948                }
949            };
950
951            let mut manager = network_manager.write().await;
952            match manager.unban_peer(address) {
953                Ok(()) => RpcResponse {
954                    id: request.id,
955                    result: Some(
956                        serde_json::json!({"status": "success", "message": format!("Peer {} unbanned", address)}),
957                    ),
958                    error: None,
959                },
960                Err(e) => RpcResponse {
961                    id: request.id,
962                    result: None,
963                    error: Some(RpcError {
964                        code: -32003,
965                        message: format!("Failed to unban peer: {}", e),
966                        data: None,
967                    }),
968                },
969            }
970        }
971        "get_network_stats" => {
972            let mut manager = network_manager.write().await;
973            let stats = manager.get_network_stats().await;
974            RpcResponse {
975                id: request.id,
976                result: Some(serde_json::to_value(stats).unwrap()),
977                error: None,
978            }
979        }
980        "test_network" => {
981            let manager = network_manager.read().await;
982            let results = manager.test_network().await;
983            RpcResponse {
984                id: request.id,
985                result: Some(serde_json::to_value(results).unwrap()),
986                error: None,
987            }
988        }
989        "stop" => {
990            info!("Received stop request via RPC");
991
992            // Try to shutdown the node gracefully through command channel
993            let (tx, rx) = tokio::sync::oneshot::channel();
994            if let Err(_) = command_tx.send((RpcCommand::Stop, tx)).await {
995                return RpcResponse {
996                    id: request.id,
997                    result: None,
998                    error: Some(RpcError {
999                        code: -1,
1000                        message: "Failed to send stop command".to_string(),
1001                        data: None,
1002                    }),
1003                };
1004            }
1005
1006            match rx.await {
1007                Ok(result) => RpcResponse {
1008                    id: request.id,
1009                    result: Some(result),
1010                    error: None,
1011                },
1012                Err(_) => RpcResponse {
1013                    id: request.id,
1014                    result: None,
1015                    error: Some(RpcError {
1016                        code: -1,
1017                        message: "Command execution failed".to_string(),
1018                        data: None,
1019                    }),
1020                },
1021            }
1022        }
1023        "get_status" => {
1024            // Try to get status from real node if available
1025            let mut manager = network_manager.write().await;
1026
1027            // Check if we have a node handle to get real status from
1028            let real_status = if let Some(node) = &manager.node_handle {
1029                let node_guard = node.read().await;
1030                match node_guard.get_status().await {
1031                    Ok(status) => Some(status),
1032                    Err(e) => {
1033                        warn!("Failed to get real node status: {}", e);
1034                        None
1035                    }
1036                }
1037            } else {
1038                None
1039            };
1040
1041            // If we got real status, use it; otherwise build mock status
1042            let result = if let Some(status) = real_status {
1043                status
1044            } else {
1045                // Build mock status
1046                let mut status = NodeStatus {
1047                    node_id: "node_mock".to_string(),
1048                    state: "Mock".to_string(),
1049                    uptime: 0,
1050                    peers: vec![],
1051                    network_stats: NetworkStats {
1052                        total_connections: 0,
1053                        active_connections: 0,
1054                        messages_sent: 0,
1055                        messages_received: 0,
1056                        bytes_sent: 0,
1057                        bytes_received: 0,
1058                        average_latency: 0.0,
1059                        uptime: 0,
1060                    },
1061                    dag_stats: DagStats {
1062                        vertex_count: 0,
1063                        edge_count: 0,
1064                        tip_count: 0,
1065                        finalized_height: 0,
1066                        pending_transactions: 0,
1067                    },
1068                    memory_usage: MemoryStats {
1069                        total_allocated: 0,
1070                        current_usage: 0,
1071                        peak_usage: 0,
1072                    },
1073                };
1074
1075                // Get mock network stats
1076                status.peers = manager.list_peers().await;
1077                status.network_stats = manager.get_network_stats().await;
1078                status.uptime = manager.start_time.elapsed().unwrap_or_default().as_secs();
1079
1080                // Get memory stats
1081                #[cfg(target_os = "linux")]
1082                {
1083                    if let Ok(contents) = std::fs::read_to_string("/proc/self/status") {
1084                        for line in contents.lines() {
1085                            if line.starts_with("VmRSS:") {
1086                                if let Some(kb_str) = line.split_whitespace().nth(1) {
1087                                    if let Ok(kb) = kb_str.parse::<usize>() {
1088                                        status.memory_usage.current_usage = kb * 1024;
1089                                    }
1090                                }
1091                            } else if line.starts_with("VmPeak:") {
1092                                if let Some(kb_str) = line.split_whitespace().nth(1) {
1093                                    if let Ok(kb) = kb_str.parse::<usize>() {
1094                                        status.memory_usage.peak_usage = kb * 1024;
1095                                    }
1096                                }
1097                            }
1098                        }
1099                    }
1100                }
1101
1102                serde_json::to_value(status).unwrap()
1103            };
1104
1105            RpcResponse {
1106                id: request.id,
1107                result: Some(result),
1108                error: None,
1109            }
1110        }
1111        _ => RpcResponse {
1112            id: request.id,
1113            result: None,
1114            error: Some(RpcError {
1115                code: -32601,
1116                message: format!("Method '{}' not found", request.method),
1117                data: None,
1118            }),
1119        },
1120    }
1121}
1122
1123#[cfg(test)]
1124mod tests {
1125    use super::*;
1126
1127    #[test]
1128    fn test_rpc_request_serialization() {
1129        let request = RpcRequest {
1130            id: Uuid::new_v4(),
1131            method: "stop".to_string(),
1132            params: serde_json::Value::Null,
1133        };
1134
1135        let serialized = serde_json::to_string(&request).unwrap();
1136        let deserialized: RpcRequest = serde_json::from_str(&serialized).unwrap();
1137
1138        assert_eq!(request.method, deserialized.method);
1139    }
1140
1141    #[tokio::test]
1142    async fn test_network_manager_peer_operations() {
1143        let mut manager = NetworkManager::new();
1144
1145        // Test adding peer
1146        assert!(manager.add_peer("127.0.0.1:8001".to_string()).await.is_ok());
1147        assert_eq!(manager.list_peers().await.len(), 1);
1148
1149        // Test adding duplicate peer (should work)
1150        assert!(manager.add_peer("127.0.0.1:8002".to_string()).await.is_ok());
1151        assert_eq!(manager.list_peers().await.len(), 2);
1152
1153        // Test getting peer info
1154        let peers = manager.list_peers().await;
1155        let peer_id = peers[0].id.clone();
1156        assert!(manager.get_peer_info(&peer_id).await.is_some());
1157
1158        // Test removing peer
1159        assert!(manager.remove_peer(&peer_id).await.is_ok());
1160        assert_eq!(manager.list_peers().await.len(), 1);
1161
1162        // Test removing non-existent peer
1163        assert!(manager.remove_peer("invalid_id").await.is_err());
1164    }
1165
1166    #[tokio::test]
1167    async fn test_network_manager_ban_operations() {
1168        let mut manager = NetworkManager::new();
1169
1170        // Add a peer
1171        manager
1172            .add_peer("127.0.0.1:8001".to_string())
1173            .await
1174            .unwrap();
1175        let peer_id = manager.list_peers().await[0].id.clone();
1176
1177        // Ban the peer
1178        assert!(manager.ban_peer(&peer_id).await.is_ok());
1179        assert_eq!(manager.list_peers().await.len(), 0); // Should be removed
1180
1181        // Try to add the same address again (should fail)
1182        assert!(manager
1183            .add_peer("127.0.0.1:8001".to_string())
1184            .await
1185            .is_err());
1186
1187        // Unban the peer
1188        assert!(manager.unban_peer("127.0.0.1:8001").is_ok());
1189
1190        // Now adding should work again
1191        assert!(manager.add_peer("127.0.0.1:8001".to_string()).await.is_ok());
1192    }
1193
1194    #[test]
1195    fn test_rate_limiter() {
1196        let mut limiter = RateLimiter::new(2); // 2 requests per minute
1197
1198        // First two requests should pass
1199        assert!(limiter.check_rate_limit("127.0.0.1"));
1200        assert!(limiter.check_rate_limit("127.0.0.1"));
1201
1202        // Third request should fail
1203        assert!(!limiter.check_rate_limit("127.0.0.1"));
1204
1205        // Different IP should work
1206        assert!(limiter.check_rate_limit("127.0.0.2"));
1207    }
1208
1209    #[tokio::test]
1210    async fn test_authenticate_request() {
1211        let request_with_token = RpcRequest {
1212            id: Uuid::new_v4(),
1213            method: "test".to_string(),
1214            params: serde_json::json!({ "auth_token": "secret123" }),
1215        };
1216
1217        let request_without_token = RpcRequest {
1218            id: Uuid::new_v4(),
1219            method: "test".to_string(),
1220            params: serde_json::Value::Null,
1221        };
1222
1223        let auth_keys = Arc::new(RwLock::new(HashMap::new()));
1224
1225        // Test with auth enabled
1226        let auth_token = Some("secret123".to_string());
1227        assert!(authenticate_request(&request_with_token, &auth_token, &auth_keys).await);
1228        assert!(!authenticate_request(&request_without_token, &auth_token, &auth_keys).await);
1229
1230        // Test with auth disabled
1231        let no_auth = None;
1232        assert!(authenticate_request(&request_with_token, &no_auth, &auth_keys).await);
1233        assert!(authenticate_request(&request_without_token, &no_auth, &auth_keys).await);
1234    }
1235
1236    #[tokio::test]
1237    async fn test_rpc_server_creation() {
1238        let (server, _rx) = RpcServer::new_tcp(0); // Port 0 for automatic assignment
1239        match server.transport {
1240            RpcTransport::Tcp(addr) => assert!(addr.contains(":0")),
1241            _ => panic!("Expected TCP transport"),
1242        }
1243    }
1244
1245    #[tokio::test]
1246    async fn test_rpc_server_with_auth() {
1247        let (server, _rx) = RpcServer::with_auth(
1248            RpcTransport::Tcp("127.0.0.1:0".to_string()),
1249            "secret123".to_string(),
1250        );
1251        assert_eq!(server.auth_token, Some("secret123".to_string()));
1252    }
1253
1254    #[tokio::test]
1255    async fn test_network_test_functionality() {
1256        let manager = NetworkManager::new();
1257        let results = manager.test_network().await;
1258        assert!(results.is_empty()); // No peers to test
1259    }
1260
1261    #[tokio::test]
1262    async fn test_network_stats() {
1263        let mut manager = NetworkManager::new();
1264        let stats = manager.get_network_stats().await;
1265
1266        assert_eq!(stats.total_connections, 0);
1267        assert_eq!(stats.active_connections, 0);
1268        assert_eq!(stats.messages_sent, 0);
1269        assert_eq!(stats.messages_received, 0);
1270
1271        // Add a peer and check stats update
1272        manager
1273            .add_peer("127.0.0.1:8001".to_string())
1274            .await
1275            .unwrap();
1276        let updated_stats = manager.get_network_stats().await;
1277        assert_eq!(updated_stats.total_connections, 1);
1278        assert_eq!(updated_stats.active_connections, 1);
1279    }
1280
1281    #[test]
1282    fn test_peer_info_serialization() {
1283        let peer_info = PeerInfo {
1284            id: "test_peer".to_string(),
1285            address: "127.0.0.1:8001".to_string(),
1286            connected_duration: 300,
1287            messages_sent: 10,
1288            messages_received: 15,
1289            last_seen: 1234567890,
1290            status: "Connected".to_string(),
1291            latency: Some(25.5),
1292        };
1293
1294        let serialized = serde_json::to_string(&peer_info).unwrap();
1295        let deserialized: PeerInfo = serde_json::from_str(&serialized).unwrap();
1296
1297        assert_eq!(peer_info.id, deserialized.id);
1298        assert_eq!(peer_info.address, deserialized.address);
1299        assert_eq!(peer_info.status, deserialized.status);
1300        assert_eq!(peer_info.latency, deserialized.latency);
1301    }
1302
1303    #[test]
1304    fn test_network_stats_serialization() {
1305        let stats = NetworkStats {
1306            total_connections: 5,
1307            active_connections: 3,
1308            messages_sent: 100,
1309            messages_received: 95,
1310            bytes_sent: 1024,
1311            bytes_received: 2048,
1312            average_latency: 15.7,
1313            uptime: 3600,
1314        };
1315
1316        let serialized = serde_json::to_string(&stats).unwrap();
1317        let deserialized: NetworkStats = serde_json::from_str(&serialized).unwrap();
1318
1319        assert_eq!(stats.total_connections, deserialized.total_connections);
1320        assert_eq!(stats.active_connections, deserialized.active_connections);
1321        assert_eq!(stats.uptime, deserialized.uptime);
1322    }
1323
1324    #[test]
1325    fn test_rpc_error_codes() {
1326        // Test standard JSON-RPC error codes
1327        let method_not_found = RpcError {
1328            code: -32601,
1329            message: "Method not found".to_string(),
1330            data: None,
1331        };
1332
1333        let invalid_params = RpcError {
1334            code: -32602,
1335            message: "Invalid params".to_string(),
1336            data: None,
1337        };
1338
1339        let auth_required = RpcError {
1340            code: -32001,
1341            message: "Authentication required".to_string(),
1342            data: None,
1343        };
1344
1345        assert_eq!(method_not_found.code, -32601);
1346        assert_eq!(invalid_params.code, -32602);
1347        assert_eq!(auth_required.code, -32001);
1348    }
1349}