1use anyhow::{anyhow, Result};
2use qudag_crypto::ml_dsa::MlDsaKeyPair;
3use qudag_protocol::NodeConfig;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use tokio::io::AsyncWriteExt;
7use tokio::net::{TcpStream, UnixStream};
8use tokio::sync::Mutex;
9use tokio::time::{sleep, timeout, Duration};
10use tracing::{debug, warn};
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct RpcRequest {
16 pub id: Uuid,
17 pub method: String,
18 pub params: serde_json::Value,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct RpcResponse {
24 pub id: Uuid,
25 pub result: Option<serde_json::Value>,
26 pub error: Option<RpcError>,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct RpcError {
32 pub code: i32,
33 pub message: String,
34 pub data: Option<serde_json::Value>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct NodeStatus {
40 pub node_id: String,
41 pub state: String,
42 pub uptime: u64,
43 pub peers: Vec<PeerInfo>,
44 pub network_stats: NetworkStats,
45 pub dag_stats: DagStats,
46 pub memory_usage: MemoryStats,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct PeerInfo {
52 pub id: String,
53 pub address: String,
54 pub connected_duration: u64,
55 pub messages_sent: u64,
56 pub messages_received: u64,
57 pub last_seen: u64,
58 pub status: String,
59 pub latency: Option<f64>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct NetworkStats {
65 pub total_connections: usize,
66 pub active_connections: usize,
67 pub messages_sent: u64,
68 pub messages_received: u64,
69 pub bytes_sent: u64,
70 pub bytes_received: u64,
71 pub average_latency: f64,
72 pub uptime: u64,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct DagStats {
78 pub vertex_count: usize,
79 pub edge_count: usize,
80 pub tip_count: usize,
81 pub finalized_height: u64,
82 pub pending_transactions: usize,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct MemoryStats {
88 pub total_allocated: usize,
89 pub current_usage: usize,
90 pub peak_usage: usize,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct WalletInfo {
96 pub public_key: String,
97 pub balance: u64,
98 pub address: String,
99 pub key_type: String,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct NetworkTestResult {
105 pub peer_id: String,
106 pub address: String,
107 pub reachable: bool,
108 pub latency: Option<f64>,
109 pub error: Option<String>,
110}
111
112#[async_trait::async_trait]
114trait AsyncReadWrite: Send + Sync {
115 async fn read_u32(&mut self) -> Result<u32>;
116 async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()>;
117 async fn write_u32(&mut self, val: u32) -> Result<()>;
118 async fn write_all(&mut self, buf: &[u8]) -> Result<()>;
119 async fn flush(&mut self) -> Result<()>;
120}
121
122#[async_trait::async_trait]
123impl AsyncReadWrite for TcpStream {
124 async fn read_u32(&mut self) -> Result<u32> {
125 let mut buf = [0u8; 4];
126 tokio::io::AsyncReadExt::read_exact(self, &mut buf).await?;
127 Ok(u32::from_be_bytes(buf))
128 }
129
130 async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
131 tokio::io::AsyncReadExt::read_exact(self, buf).await?;
132 Ok(())
133 }
134
135 async fn write_u32(&mut self, val: u32) -> Result<()> {
136 AsyncWriteExt::write_all(self, &val.to_be_bytes()).await?;
137 Ok(())
138 }
139
140 async fn write_all(&mut self, buf: &[u8]) -> Result<()> {
141 AsyncWriteExt::write_all(self, buf).await?;
142 Ok(())
143 }
144
145 async fn flush(&mut self) -> Result<()> {
146 AsyncWriteExt::flush(self).await?;
147 Ok(())
148 }
149}
150
151#[async_trait::async_trait]
152impl AsyncReadWrite for UnixStream {
153 async fn read_u32(&mut self) -> Result<u32> {
154 let mut buf = [0u8; 4];
155 tokio::io::AsyncReadExt::read_exact(self, &mut buf).await?;
156 Ok(u32::from_be_bytes(buf))
157 }
158
159 async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
160 tokio::io::AsyncReadExt::read_exact(self, buf).await?;
161 Ok(())
162 }
163
164 async fn write_u32(&mut self, val: u32) -> Result<()> {
165 AsyncWriteExt::write_all(self, &val.to_be_bytes()).await?;
166 Ok(())
167 }
168
169 async fn write_all(&mut self, buf: &[u8]) -> Result<()> {
170 AsyncWriteExt::write_all(self, buf).await?;
171 Ok(())
172 }
173
174 async fn flush(&mut self) -> Result<()> {
175 AsyncWriteExt::flush(self).await?;
176 Ok(())
177 }
178}
179
180#[derive(Debug, Clone)]
182pub enum RpcTransport {
183 Tcp { host: String, port: u16 },
185 Unix { path: String },
187}
188
189#[derive(Debug)]
191struct ConnectionPool {
192 transport: RpcTransport,
193 connections: Arc<Mutex<Vec<TcpStream>>>,
194 unix_connections: Arc<Mutex<Vec<UnixStream>>>,
195 #[allow(dead_code)]
196 max_connections: usize,
197}
198
199pub struct RpcClient {
201 transport: RpcTransport,
202 timeout: Duration,
203 retry_attempts: u32,
204 retry_delay: Duration,
205 pool: Option<ConnectionPool>,
206 auth_token: Option<String>,
207 auth_key: Option<MlDsaKeyPair>,
208 client_id: Option<String>,
209}
210
211impl RpcClient {
212 pub fn new_tcp(host: String, port: u16) -> Self {
214 Self {
215 transport: RpcTransport::Tcp { host, port },
216 timeout: Duration::from_secs(30),
217 retry_attempts: 3,
218 retry_delay: Duration::from_millis(500),
219 pool: None,
220 auth_token: None,
221 auth_key: None,
222 client_id: None,
223 }
224 }
225
226 pub fn new_unix(path: String) -> Self {
228 Self {
229 transport: RpcTransport::Unix { path },
230 timeout: Duration::from_secs(30),
231 retry_attempts: 3,
232 retry_delay: Duration::from_millis(500),
233 pool: None,
234 auth_token: None,
235 auth_key: None,
236 client_id: None,
237 }
238 }
239
240 pub fn with_timeout(mut self, timeout: Duration) -> Self {
242 self.timeout = timeout;
243 self
244 }
245
246 pub fn with_retry(mut self, attempts: u32, delay: Duration) -> Self {
248 self.retry_attempts = attempts;
249 self.retry_delay = delay;
250 self
251 }
252
253 pub fn with_pool(mut self, max_connections: usize) -> Self {
255 self.pool = Some(ConnectionPool {
256 transport: self.transport.clone(),
257 connections: Arc::new(Mutex::new(Vec::new())),
258 unix_connections: Arc::new(Mutex::new(Vec::new())),
259 max_connections,
260 });
261 self
262 }
263
264 pub fn with_auth_token(mut self, token: String) -> Self {
266 self.auth_token = Some(token);
267 self
268 }
269
270 pub fn with_ml_dsa_auth(mut self, client_id: String, keypair: MlDsaKeyPair) -> Self {
272 self.client_id = Some(client_id);
273 self.auth_key = Some(keypair);
274 self
275 }
276
277 async fn connect(&self) -> Result<Box<dyn AsyncReadWrite>> {
279 match &self.transport {
280 RpcTransport::Tcp { host, port } => {
281 let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
282 Ok(Box::new(stream))
283 }
284 RpcTransport::Unix { path } => {
285 let stream = UnixStream::connect(path).await?;
286 Ok(Box::new(stream))
287 }
288 }
289 }
290
291 async fn get_connection(&self) -> Result<Box<dyn AsyncReadWrite>> {
293 if let Some(pool) = &self.pool {
294 match &pool.transport {
295 RpcTransport::Tcp { host, port } => {
296 let mut conns = pool.connections.lock().await;
297 if let Some(conn) = conns.pop() {
298 return Ok(Box::new(conn));
300 }
301 drop(conns);
302 let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
304 Ok(Box::new(stream))
305 }
306 RpcTransport::Unix { path } => {
307 let mut conns = pool.unix_connections.lock().await;
308 if let Some(conn) = conns.pop() {
309 return Ok(Box::new(conn));
310 }
311 drop(conns);
312 let stream = UnixStream::connect(path).await?;
313 Ok(Box::new(stream))
314 }
315 }
316 } else {
317 self.connect().await
318 }
319 }
320
321 async fn send_request(
323 &self,
324 method: &str,
325 mut params: serde_json::Value,
326 ) -> Result<serde_json::Value> {
327 if let Some(token) = &self.auth_token {
329 params["auth_token"] = serde_json::Value::String(token.clone());
330 } else if let (Some(client_id), Some(keypair)) = (&self.client_id, &self.auth_key) {
331 let request_id = Uuid::new_v4();
332 let message = format!("{}:{}", method, request_id);
333 let mut rng = rand::thread_rng();
334 let signature = keypair.sign(message.as_bytes(), &mut rng)?;
335 params["client_id"] = serde_json::Value::String(client_id.clone());
336 params["signature"] = serde_json::Value::String(hex::encode(signature));
337 }
338
339 let mut last_error = None;
340
341 for attempt in 0..self.retry_attempts {
342 if attempt > 0 {
343 sleep(self.retry_delay).await;
344 debug!(
345 "Retrying RPC request, attempt {}/{}",
346 attempt + 1,
347 self.retry_attempts
348 );
349 }
350
351 match self.send_request_once(method, params.clone()).await {
352 Ok(result) => return Ok(result),
353 Err(e) => {
354 warn!("RPC request failed: {}", e);
355 last_error = Some(e);
356 }
357 }
358 }
359
360 Err(last_error.unwrap_or_else(|| anyhow!("All retry attempts failed")))
361 }
362
363 async fn send_request_once(
365 &self,
366 method: &str,
367 params: serde_json::Value,
368 ) -> Result<serde_json::Value> {
369 let request = RpcRequest {
370 id: Uuid::new_v4(),
371 method: method.to_string(),
372 params,
373 };
374
375 let request_data = serde_json::to_vec(&request)?;
376
377 let mut stream = timeout(self.timeout, self.get_connection())
379 .await
380 .map_err(|_| anyhow!("Connection timeout"))??;
381
382 timeout(self.timeout, async {
384 stream.write_u32(request_data.len() as u32).await?;
385 stream.write_all(&request_data).await?;
386 stream.flush().await?;
387 Ok::<(), anyhow::Error>(())
388 })
389 .await
390 .map_err(|_| anyhow!("Request send timeout"))??;
391
392 let response_len = timeout(self.timeout, stream.read_u32())
394 .await
395 .map_err(|_| anyhow!("Response read timeout"))??;
396
397 if response_len > 10 * 1024 * 1024 {
398 return Err(anyhow!("Response too large: {} bytes", response_len));
399 }
400
401 let mut response_data = vec![0u8; response_len as usize];
402 timeout(self.timeout, stream.read_exact(&mut response_data))
403 .await
404 .map_err(|_| anyhow!("Response read timeout"))??;
405
406 let response: RpcResponse = serde_json::from_slice(&response_data)?;
407
408 if let Some(error) = response.error {
409 return Err(anyhow!("RPC error {}: {}", error.code, error.message));
410 }
411
412 response.result.ok_or_else(|| anyhow!("Empty response"))
413 }
414
415 pub async fn get_status(&self) -> Result<NodeStatus> {
417 let result = self
418 .send_request("get_status", serde_json::Value::Null)
419 .await?;
420 Ok(serde_json::from_value(result)?)
421 }
422
423 pub async fn start_node(&self, config: NodeConfig) -> Result<()> {
425 let params = serde_json::to_value(config)?;
426 self.send_request("start", params).await?;
427 Ok(())
428 }
429
430 pub async fn stop_node(&self) -> Result<()> {
432 self.send_request("stop", serde_json::Value::Null).await?;
433 Ok(())
434 }
435
436 pub async fn restart_node(&self) -> Result<()> {
438 self.send_request("restart", serde_json::Value::Null)
439 .await?;
440 Ok(())
441 }
442
443 pub async fn add_peer(&self, address: String) -> Result<String> {
445 let params = serde_json::json!({ "address": address });
446 let result = self.send_request("add_peer", params).await?;
447 Ok(serde_json::from_value::<serde_json::Value>(result)?
448 .get("message")
449 .and_then(|v| v.as_str())
450 .unwrap_or("Peer added successfully")
451 .to_string())
452 }
453
454 pub async fn remove_peer(&self, peer_id: String) -> Result<String> {
456 let params = serde_json::json!({ "peer_id": peer_id });
457 let result = self.send_request("remove_peer", params).await?;
458 Ok(serde_json::from_value::<serde_json::Value>(result)?
459 .get("message")
460 .and_then(|v| v.as_str())
461 .unwrap_or("Peer removed successfully")
462 .to_string())
463 }
464
465 pub async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
467 let result = self
468 .send_request("list_peers", serde_json::Value::Null)
469 .await?;
470 Ok(serde_json::from_value(result)?)
471 }
472
473 pub async fn get_peer_info(&self, peer_id: String) -> Result<PeerInfo> {
475 let params = serde_json::json!({ "peer_id": peer_id });
476 let result = self.send_request("get_peer_info", params).await?;
477 Ok(serde_json::from_value(result)?)
478 }
479
480 pub async fn ban_peer(&self, peer_id: String) -> Result<String> {
482 let params = serde_json::json!({ "peer_id": peer_id });
483 let result = self.send_request("ban_peer", params).await?;
484 Ok(serde_json::from_value::<serde_json::Value>(result)?
485 .get("message")
486 .and_then(|v| v.as_str())
487 .unwrap_or("Peer banned successfully")
488 .to_string())
489 }
490
491 pub async fn unban_peer(&self, address: String) -> Result<String> {
493 let params = serde_json::json!({ "address": address });
494 let result = self.send_request("unban_peer", params).await?;
495 Ok(serde_json::from_value::<serde_json::Value>(result)?
496 .get("message")
497 .and_then(|v| v.as_str())
498 .unwrap_or("Peer unbanned successfully")
499 .to_string())
500 }
501
502 pub async fn get_network_stats(&self) -> Result<NetworkStats> {
504 let result = self
505 .send_request("get_network_stats", serde_json::Value::Null)
506 .await?;
507 Ok(serde_json::from_value(result)?)
508 }
509
510 pub async fn test_network(&self) -> Result<Vec<NetworkTestResult>> {
512 let result = self
513 .send_request("test_network", serde_json::Value::Null)
514 .await?;
515 Ok(serde_json::from_value(result)?)
516 }
517
518 pub async fn get_wallet_info(&self) -> Result<WalletInfo> {
520 let result = self
521 .send_request("get_wallet_info", serde_json::Value::Null)
522 .await?;
523 Ok(serde_json::from_value(result)?)
524 }
525
526 pub async fn create_wallet(&self, password: String) -> Result<String> {
528 let params = serde_json::json!({ "password": password });
529 let result = self.send_request("create_wallet", params).await?;
530 Ok(serde_json::from_value(result)?)
531 }
532
533 pub async fn import_wallet(&self, seed: String, password: String) -> Result<()> {
535 let params = serde_json::json!({ "seed": seed, "password": password });
536 self.send_request("import_wallet", params).await?;
537 Ok(())
538 }
539
540 pub async fn export_wallet(&self, password: String) -> Result<String> {
542 let params = serde_json::json!({ "password": password });
543 let result = self.send_request("export_wallet", params).await?;
544 Ok(serde_json::from_value(result)?)
545 }
546
547 pub async fn get_dag_data(&self) -> Result<serde_json::Value> {
549 self.send_request("get_dag_data", serde_json::Value::Null)
550 .await
551 }
552
553 pub async fn debug_network(&self) -> Result<serde_json::Value> {
555 self.send_request("debug_network", serde_json::Value::Null)
556 .await
557 }
558
559 pub async fn debug_consensus(&self) -> Result<serde_json::Value> {
561 self.send_request("debug_consensus", serde_json::Value::Null)
562 .await
563 }
564
565 pub async fn debug_performance(&self) -> Result<serde_json::Value> {
567 self.send_request("debug_performance", serde_json::Value::Null)
568 .await
569 }
570
571 pub async fn security_audit(&self) -> Result<serde_json::Value> {
573 self.send_request("security_audit", serde_json::Value::Null)
574 .await
575 }
576
577 pub async fn get_config(&self) -> Result<serde_json::Value> {
579 self.send_request("get_config", serde_json::Value::Null)
580 .await
581 }
582
583 pub async fn update_config(&self, config: serde_json::Value) -> Result<()> {
585 self.send_request("update_config", config).await?;
586 Ok(())
587 }
588
589 pub async fn validate_config(&self, config: serde_json::Value) -> Result<bool> {
591 let params = serde_json::json!({ "config": config });
592 let result = self.send_request("validate_config", params).await?;
593 Ok(serde_json::from_value(result)?)
594 }
595}
596
597pub async fn is_node_running(port: u16) -> bool {
599 TcpStream::connect(format!("127.0.0.1:{}", port))
600 .await
601 .is_ok()
602}
603
604pub async fn wait_for_node_start(port: u16, timeout_secs: u64) -> Result<()> {
606 let start = std::time::Instant::now();
607 let timeout_duration = Duration::from_secs(timeout_secs);
608
609 while start.elapsed() < timeout_duration {
610 if is_node_running(port).await {
611 return Ok(());
612 }
613 tokio::time::sleep(Duration::from_millis(500)).await;
614 }
615
616 Err(anyhow!(
617 "Node failed to start within {} seconds",
618 timeout_secs
619 ))
620}
621
622#[cfg(test)]
623mod tests {
624 use super::*;
625
626 #[test]
627 fn test_rpc_request_serialization() {
628 let request = RpcRequest {
629 id: Uuid::new_v4(),
630 method: "test_method".to_string(),
631 params: serde_json::json!({"key": "value"}),
632 };
633
634 let serialized = serde_json::to_string(&request).unwrap();
635 let deserialized: RpcRequest = serde_json::from_str(&serialized).unwrap();
636
637 assert_eq!(request.method, deserialized.method);
638 }
639
640 #[test]
641 fn test_rpc_response_serialization() {
642 let response = RpcResponse {
643 id: Uuid::new_v4(),
644 result: Some(serde_json::json!({"status": "ok"})),
645 error: None,
646 };
647
648 let serialized = serde_json::to_string(&response).unwrap();
649 let deserialized: RpcResponse = serde_json::from_str(&serialized).unwrap();
650
651 assert!(deserialized.result.is_some());
652 assert!(deserialized.error.is_none());
653 }
654}