qudag_cli/
rpc.rs

1use anyhow::{anyhow, Result};
2use qudag_crypto::ml_dsa::MlDsaKeyPair;
3use qudag_protocol::NodeConfig;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use tokio::io::AsyncWriteExt;
7use tokio::net::{TcpStream, UnixStream};
8use tokio::sync::Mutex;
9use tokio::time::{sleep, timeout, Duration};
10use tracing::{debug, warn};
11use uuid::Uuid;
12
13/// RPC request
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct RpcRequest {
16    pub id: Uuid,
17    pub method: String,
18    pub params: serde_json::Value,
19}
20
21/// RPC response
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct RpcResponse {
24    pub id: Uuid,
25    pub result: Option<serde_json::Value>,
26    pub error: Option<RpcError>,
27}
28
29/// RPC error
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct RpcError {
32    pub code: i32,
33    pub message: String,
34    pub data: Option<serde_json::Value>,
35}
36
37/// Node status information
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct NodeStatus {
40    pub node_id: String,
41    pub state: String,
42    pub uptime: u64,
43    pub peers: Vec<PeerInfo>,
44    pub network_stats: NetworkStats,
45    pub dag_stats: DagStats,
46    pub memory_usage: MemoryStats,
47}
48
49/// Peer information
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct PeerInfo {
52    pub id: String,
53    pub address: String,
54    pub connected_duration: u64,
55    pub messages_sent: u64,
56    pub messages_received: u64,
57    pub last_seen: u64,
58    pub status: String,
59    pub latency: Option<f64>,
60}
61
62/// Network statistics
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct NetworkStats {
65    pub total_connections: usize,
66    pub active_connections: usize,
67    pub messages_sent: u64,
68    pub messages_received: u64,
69    pub bytes_sent: u64,
70    pub bytes_received: u64,
71    pub average_latency: f64,
72    pub uptime: u64,
73}
74
75/// DAG statistics
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct DagStats {
78    pub vertex_count: usize,
79    pub edge_count: usize,
80    pub tip_count: usize,
81    pub finalized_height: u64,
82    pub pending_transactions: usize,
83}
84
85/// Memory statistics
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct MemoryStats {
88    pub total_allocated: usize,
89    pub current_usage: usize,
90    pub peak_usage: usize,
91}
92
93/// Wallet information
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct WalletInfo {
96    pub public_key: String,
97    pub balance: u64,
98    pub address: String,
99    pub key_type: String,
100}
101
102/// Network test result
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct NetworkTestResult {
105    pub peer_id: String,
106    pub address: String,
107    pub reachable: bool,
108    pub latency: Option<f64>,
109    pub error: Option<String>,
110}
111
112/// Trait for async read/write operations
113#[async_trait::async_trait]
114trait AsyncReadWrite: Send + Sync {
115    async fn read_u32(&mut self) -> Result<u32>;
116    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()>;
117    async fn write_u32(&mut self, val: u32) -> Result<()>;
118    async fn write_all(&mut self, buf: &[u8]) -> Result<()>;
119    async fn flush(&mut self) -> Result<()>;
120}
121
122#[async_trait::async_trait]
123impl AsyncReadWrite for TcpStream {
124    async fn read_u32(&mut self) -> Result<u32> {
125        let mut buf = [0u8; 4];
126        tokio::io::AsyncReadExt::read_exact(self, &mut buf).await?;
127        Ok(u32::from_be_bytes(buf))
128    }
129
130    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
131        tokio::io::AsyncReadExt::read_exact(self, buf).await?;
132        Ok(())
133    }
134
135    async fn write_u32(&mut self, val: u32) -> Result<()> {
136        AsyncWriteExt::write_all(self, &val.to_be_bytes()).await?;
137        Ok(())
138    }
139
140    async fn write_all(&mut self, buf: &[u8]) -> Result<()> {
141        AsyncWriteExt::write_all(self, buf).await?;
142        Ok(())
143    }
144
145    async fn flush(&mut self) -> Result<()> {
146        AsyncWriteExt::flush(self).await?;
147        Ok(())
148    }
149}
150
151#[async_trait::async_trait]
152impl AsyncReadWrite for UnixStream {
153    async fn read_u32(&mut self) -> Result<u32> {
154        let mut buf = [0u8; 4];
155        tokio::io::AsyncReadExt::read_exact(self, &mut buf).await?;
156        Ok(u32::from_be_bytes(buf))
157    }
158
159    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
160        tokio::io::AsyncReadExt::read_exact(self, buf).await?;
161        Ok(())
162    }
163
164    async fn write_u32(&mut self, val: u32) -> Result<()> {
165        AsyncWriteExt::write_all(self, &val.to_be_bytes()).await?;
166        Ok(())
167    }
168
169    async fn write_all(&mut self, buf: &[u8]) -> Result<()> {
170        AsyncWriteExt::write_all(self, buf).await?;
171        Ok(())
172    }
173
174    async fn flush(&mut self) -> Result<()> {
175        AsyncWriteExt::flush(self).await?;
176        Ok(())
177    }
178}
179
180/// Transport type for RPC client
181#[derive(Debug, Clone)]
182pub enum RpcTransport {
183    /// TCP socket transport
184    Tcp { host: String, port: u16 },
185    /// Unix domain socket transport
186    Unix { path: String },
187}
188
189/// Connection pool for RPC client
190#[derive(Debug)]
191struct ConnectionPool {
192    transport: RpcTransport,
193    connections: Arc<Mutex<Vec<TcpStream>>>,
194    unix_connections: Arc<Mutex<Vec<UnixStream>>>,
195    #[allow(dead_code)]
196    max_connections: usize,
197}
198
199/// RPC client for communicating with QuDAG nodes
200pub struct RpcClient {
201    transport: RpcTransport,
202    timeout: Duration,
203    retry_attempts: u32,
204    retry_delay: Duration,
205    pool: Option<ConnectionPool>,
206    auth_token: Option<String>,
207    auth_key: Option<MlDsaKeyPair>,
208    client_id: Option<String>,
209}
210
211impl RpcClient {
212    /// Create new RPC client with TCP transport
213    pub fn new_tcp(host: String, port: u16) -> Self {
214        Self {
215            transport: RpcTransport::Tcp { host, port },
216            timeout: Duration::from_secs(30),
217            retry_attempts: 3,
218            retry_delay: Duration::from_millis(500),
219            pool: None,
220            auth_token: None,
221            auth_key: None,
222            client_id: None,
223        }
224    }
225
226    /// Create new RPC client with Unix socket transport
227    pub fn new_unix(path: String) -> Self {
228        Self {
229            transport: RpcTransport::Unix { path },
230            timeout: Duration::from_secs(30),
231            retry_attempts: 3,
232            retry_delay: Duration::from_millis(500),
233            pool: None,
234            auth_token: None,
235            auth_key: None,
236            client_id: None,
237        }
238    }
239
240    /// Set request timeout
241    pub fn with_timeout(mut self, timeout: Duration) -> Self {
242        self.timeout = timeout;
243        self
244    }
245
246    /// Set retry configuration
247    pub fn with_retry(mut self, attempts: u32, delay: Duration) -> Self {
248        self.retry_attempts = attempts;
249        self.retry_delay = delay;
250        self
251    }
252
253    /// Enable connection pooling
254    pub fn with_pool(mut self, max_connections: usize) -> Self {
255        self.pool = Some(ConnectionPool {
256            transport: self.transport.clone(),
257            connections: Arc::new(Mutex::new(Vec::new())),
258            unix_connections: Arc::new(Mutex::new(Vec::new())),
259            max_connections,
260        });
261        self
262    }
263
264    /// Set authentication token
265    pub fn with_auth_token(mut self, token: String) -> Self {
266        self.auth_token = Some(token);
267        self
268    }
269
270    /// Set ML-DSA authentication
271    pub fn with_ml_dsa_auth(mut self, client_id: String, keypair: MlDsaKeyPair) -> Self {
272        self.client_id = Some(client_id);
273        self.auth_key = Some(keypair);
274        self
275    }
276
277    /// Connect to the RPC server
278    async fn connect(&self) -> Result<Box<dyn AsyncReadWrite>> {
279        match &self.transport {
280            RpcTransport::Tcp { host, port } => {
281                let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
282                Ok(Box::new(stream))
283            }
284            RpcTransport::Unix { path } => {
285                let stream = UnixStream::connect(path).await?;
286                Ok(Box::new(stream))
287            }
288        }
289    }
290
291    /// Get connection from pool or create new one
292    async fn get_connection(&self) -> Result<Box<dyn AsyncReadWrite>> {
293        if let Some(pool) = &self.pool {
294            match &pool.transport {
295                RpcTransport::Tcp { host, port } => {
296                    let mut conns = pool.connections.lock().await;
297                    if let Some(conn) = conns.pop() {
298                        // TODO: Check if connection is still alive
299                        return Ok(Box::new(conn));
300                    }
301                    drop(conns);
302                    // Create new connection
303                    let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
304                    Ok(Box::new(stream))
305                }
306                RpcTransport::Unix { path } => {
307                    let mut conns = pool.unix_connections.lock().await;
308                    if let Some(conn) = conns.pop() {
309                        return Ok(Box::new(conn));
310                    }
311                    drop(conns);
312                    let stream = UnixStream::connect(path).await?;
313                    Ok(Box::new(stream))
314                }
315            }
316        } else {
317            self.connect().await
318        }
319    }
320
321    /// Send RPC request with retry logic
322    async fn send_request(
323        &self,
324        method: &str,
325        mut params: serde_json::Value,
326    ) -> Result<serde_json::Value> {
327        // Add authentication to params if configured
328        if let Some(token) = &self.auth_token {
329            params["auth_token"] = serde_json::Value::String(token.clone());
330        } else if let (Some(client_id), Some(keypair)) = (&self.client_id, &self.auth_key) {
331            let request_id = Uuid::new_v4();
332            let message = format!("{}:{}", method, request_id);
333            let mut rng = rand::thread_rng();
334            let signature = keypair.sign(message.as_bytes(), &mut rng)?;
335            params["client_id"] = serde_json::Value::String(client_id.clone());
336            params["signature"] = serde_json::Value::String(hex::encode(signature));
337        }
338
339        let mut last_error = None;
340
341        for attempt in 0..self.retry_attempts {
342            if attempt > 0 {
343                sleep(self.retry_delay).await;
344                debug!(
345                    "Retrying RPC request, attempt {}/{}",
346                    attempt + 1,
347                    self.retry_attempts
348                );
349            }
350
351            match self.send_request_once(method, params.clone()).await {
352                Ok(result) => return Ok(result),
353                Err(e) => {
354                    warn!("RPC request failed: {}", e);
355                    last_error = Some(e);
356                }
357            }
358        }
359
360        Err(last_error.unwrap_or_else(|| anyhow!("All retry attempts failed")))
361    }
362
363    /// Send RPC request once (no retry)
364    async fn send_request_once(
365        &self,
366        method: &str,
367        params: serde_json::Value,
368    ) -> Result<serde_json::Value> {
369        let request = RpcRequest {
370            id: Uuid::new_v4(),
371            method: method.to_string(),
372            params,
373        };
374
375        let request_data = serde_json::to_vec(&request)?;
376
377        // Get connection
378        let mut stream = timeout(self.timeout, self.get_connection())
379            .await
380            .map_err(|_| anyhow!("Connection timeout"))??;
381
382        // Send request
383        timeout(self.timeout, async {
384            stream.write_u32(request_data.len() as u32).await?;
385            stream.write_all(&request_data).await?;
386            stream.flush().await?;
387            Ok::<(), anyhow::Error>(())
388        })
389        .await
390        .map_err(|_| anyhow!("Request send timeout"))??;
391
392        // Read response
393        let response_len = timeout(self.timeout, stream.read_u32())
394            .await
395            .map_err(|_| anyhow!("Response read timeout"))??;
396
397        if response_len > 10 * 1024 * 1024 {
398            return Err(anyhow!("Response too large: {} bytes", response_len));
399        }
400
401        let mut response_data = vec![0u8; response_len as usize];
402        timeout(self.timeout, stream.read_exact(&mut response_data))
403            .await
404            .map_err(|_| anyhow!("Response read timeout"))??;
405
406        let response: RpcResponse = serde_json::from_slice(&response_data)?;
407
408        if let Some(error) = response.error {
409            return Err(anyhow!("RPC error {}: {}", error.code, error.message));
410        }
411
412        response.result.ok_or_else(|| anyhow!("Empty response"))
413    }
414
415    /// Get node status
416    pub async fn get_status(&self) -> Result<NodeStatus> {
417        let result = self
418            .send_request("get_status", serde_json::Value::Null)
419            .await?;
420        Ok(serde_json::from_value(result)?)
421    }
422
423    /// Start node
424    pub async fn start_node(&self, config: NodeConfig) -> Result<()> {
425        let params = serde_json::to_value(config)?;
426        self.send_request("start", params).await?;
427        Ok(())
428    }
429
430    /// Stop node
431    pub async fn stop_node(&self) -> Result<()> {
432        self.send_request("stop", serde_json::Value::Null).await?;
433        Ok(())
434    }
435
436    /// Restart node
437    pub async fn restart_node(&self) -> Result<()> {
438        self.send_request("restart", serde_json::Value::Null)
439            .await?;
440        Ok(())
441    }
442
443    /// Add peer
444    pub async fn add_peer(&self, address: String) -> Result<String> {
445        let params = serde_json::json!({ "address": address });
446        let result = self.send_request("add_peer", params).await?;
447        Ok(serde_json::from_value::<serde_json::Value>(result)?
448            .get("message")
449            .and_then(|v| v.as_str())
450            .unwrap_or("Peer added successfully")
451            .to_string())
452    }
453
454    /// Remove peer
455    pub async fn remove_peer(&self, peer_id: String) -> Result<String> {
456        let params = serde_json::json!({ "peer_id": peer_id });
457        let result = self.send_request("remove_peer", params).await?;
458        Ok(serde_json::from_value::<serde_json::Value>(result)?
459            .get("message")
460            .and_then(|v| v.as_str())
461            .unwrap_or("Peer removed successfully")
462            .to_string())
463    }
464
465    /// List peers
466    pub async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
467        let result = self
468            .send_request("list_peers", serde_json::Value::Null)
469            .await?;
470        Ok(serde_json::from_value(result)?)
471    }
472
473    /// Get peer information
474    pub async fn get_peer_info(&self, peer_id: String) -> Result<PeerInfo> {
475        let params = serde_json::json!({ "peer_id": peer_id });
476        let result = self.send_request("get_peer_info", params).await?;
477        Ok(serde_json::from_value(result)?)
478    }
479
480    /// Ban peer
481    pub async fn ban_peer(&self, peer_id: String) -> Result<String> {
482        let params = serde_json::json!({ "peer_id": peer_id });
483        let result = self.send_request("ban_peer", params).await?;
484        Ok(serde_json::from_value::<serde_json::Value>(result)?
485            .get("message")
486            .and_then(|v| v.as_str())
487            .unwrap_or("Peer banned successfully")
488            .to_string())
489    }
490
491    /// Unban peer
492    pub async fn unban_peer(&self, address: String) -> Result<String> {
493        let params = serde_json::json!({ "address": address });
494        let result = self.send_request("unban_peer", params).await?;
495        Ok(serde_json::from_value::<serde_json::Value>(result)?
496            .get("message")
497            .and_then(|v| v.as_str())
498            .unwrap_or("Peer unbanned successfully")
499            .to_string())
500    }
501
502    /// Get network statistics
503    pub async fn get_network_stats(&self) -> Result<NetworkStats> {
504        let result = self
505            .send_request("get_network_stats", serde_json::Value::Null)
506            .await?;
507        Ok(serde_json::from_value(result)?)
508    }
509
510    /// Test network connectivity
511    pub async fn test_network(&self) -> Result<Vec<NetworkTestResult>> {
512        let result = self
513            .send_request("test_network", serde_json::Value::Null)
514            .await?;
515        Ok(serde_json::from_value(result)?)
516    }
517
518    /// Get wallet information
519    pub async fn get_wallet_info(&self) -> Result<WalletInfo> {
520        let result = self
521            .send_request("get_wallet_info", serde_json::Value::Null)
522            .await?;
523        Ok(serde_json::from_value(result)?)
524    }
525
526    /// Create new wallet
527    pub async fn create_wallet(&self, password: String) -> Result<String> {
528        let params = serde_json::json!({ "password": password });
529        let result = self.send_request("create_wallet", params).await?;
530        Ok(serde_json::from_value(result)?)
531    }
532
533    /// Import wallet from seed
534    pub async fn import_wallet(&self, seed: String, password: String) -> Result<()> {
535        let params = serde_json::json!({ "seed": seed, "password": password });
536        self.send_request("import_wallet", params).await?;
537        Ok(())
538    }
539
540    /// Export wallet seed
541    pub async fn export_wallet(&self, password: String) -> Result<String> {
542        let params = serde_json::json!({ "password": password });
543        let result = self.send_request("export_wallet", params).await?;
544        Ok(serde_json::from_value(result)?)
545    }
546
547    /// Get DAG visualization data
548    pub async fn get_dag_data(&self) -> Result<serde_json::Value> {
549        self.send_request("get_dag_data", serde_json::Value::Null)
550            .await
551    }
552
553    /// Debug network
554    pub async fn debug_network(&self) -> Result<serde_json::Value> {
555        self.send_request("debug_network", serde_json::Value::Null)
556            .await
557    }
558
559    /// Debug consensus
560    pub async fn debug_consensus(&self) -> Result<serde_json::Value> {
561        self.send_request("debug_consensus", serde_json::Value::Null)
562            .await
563    }
564
565    /// Debug performance
566    pub async fn debug_performance(&self) -> Result<serde_json::Value> {
567        self.send_request("debug_performance", serde_json::Value::Null)
568            .await
569    }
570
571    /// Security audit
572    pub async fn security_audit(&self) -> Result<serde_json::Value> {
573        self.send_request("security_audit", serde_json::Value::Null)
574            .await
575    }
576
577    /// Get configuration
578    pub async fn get_config(&self) -> Result<serde_json::Value> {
579        self.send_request("get_config", serde_json::Value::Null)
580            .await
581    }
582
583    /// Update configuration
584    pub async fn update_config(&self, config: serde_json::Value) -> Result<()> {
585        self.send_request("update_config", config).await?;
586        Ok(())
587    }
588
589    /// Validate configuration
590    pub async fn validate_config(&self, config: serde_json::Value) -> Result<bool> {
591        let params = serde_json::json!({ "config": config });
592        let result = self.send_request("validate_config", params).await?;
593        Ok(serde_json::from_value(result)?)
594    }
595}
596
597/// Check if node is running
598pub async fn is_node_running(port: u16) -> bool {
599    TcpStream::connect(format!("127.0.0.1:{}", port))
600        .await
601        .is_ok()
602}
603
604/// Wait for node to start
605pub async fn wait_for_node_start(port: u16, timeout_secs: u64) -> Result<()> {
606    let start = std::time::Instant::now();
607    let timeout_duration = Duration::from_secs(timeout_secs);
608
609    while start.elapsed() < timeout_duration {
610        if is_node_running(port).await {
611            return Ok(());
612        }
613        tokio::time::sleep(Duration::from_millis(500)).await;
614    }
615
616    Err(anyhow!(
617        "Node failed to start within {} seconds",
618        timeout_secs
619    ))
620}
621
622#[cfg(test)]
623mod tests {
624    use super::*;
625
626    #[test]
627    fn test_rpc_request_serialization() {
628        let request = RpcRequest {
629            id: Uuid::new_v4(),
630            method: "test_method".to_string(),
631            params: serde_json::json!({"key": "value"}),
632        };
633
634        let serialized = serde_json::to_string(&request).unwrap();
635        let deserialized: RpcRequest = serde_json::from_str(&serialized).unwrap();
636
637        assert_eq!(request.method, deserialized.method);
638    }
639
640    #[test]
641    fn test_rpc_response_serialization() {
642        let response = RpcResponse {
643            id: Uuid::new_v4(),
644            result: Some(serde_json::json!({"status": "ok"})),
645            error: None,
646        };
647
648        let serialized = serde_json::to_string(&response).unwrap();
649        let deserialized: RpcResponse = serde_json::from_str(&serialized).unwrap();
650
651        assert!(deserialized.result.is_some());
652        assert!(deserialized.error.is_none());
653    }
654}