1use crate::ProtocolError;
2use qudag_crypto::ml_dsa::MlDsaPublicKey;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
10use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
11use tokio::time::{timeout, Duration};
12use tracing::{debug, error, info, warn};
13use uuid::Uuid;
14
15trait ReadU32Ext: AsyncReadExt + Unpin {
17 async fn read_u32(&mut self) -> std::io::Result<u32> {
18 let mut buf = [0u8; 4];
19 self.read_exact(&mut buf).await?;
20 Ok(u32::from_be_bytes(buf))
21 }
22}
23
24impl<T: AsyncReadExt + Unpin> ReadU32Ext for T {}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct RpcRequest {
29 pub id: Uuid,
30 pub method: String,
31 pub params: serde_json::Value,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct RpcResponse {
37 pub id: Uuid,
38 pub result: Option<serde_json::Value>,
39 pub error: Option<RpcError>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct RpcError {
45 pub code: i32,
46 pub message: String,
47 pub data: Option<serde_json::Value>,
48}
49
50#[derive(Debug, Clone)]
52pub enum RpcCommand {
53 Stop,
54 GetStatus,
55 ListPeers,
56 AddPeer(String),
57 RemovePeer(String),
58 GetPeerInfo(String),
59 BanPeer(String),
60 UnbanPeer(String),
61 GetNetworkStats,
62 TestNetwork,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct PeerInfo {
68 pub id: String,
69 pub address: String,
70 pub connected_duration: u64,
71 pub messages_sent: u64,
72 pub messages_received: u64,
73 pub last_seen: u64,
74 pub status: String,
75 pub latency: Option<f64>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct NetworkStats {
81 pub total_connections: usize,
82 pub active_connections: usize,
83 pub messages_sent: u64,
84 pub messages_received: u64,
85 pub bytes_sent: u64,
86 pub bytes_received: u64,
87 pub average_latency: f64,
88 pub uptime: u64,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct NetworkTestResult {
94 pub peer_id: String,
95 pub address: String,
96 pub reachable: bool,
97 pub latency: Option<f64>,
98 pub error: Option<String>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct DagStats {
104 pub vertex_count: usize,
105 pub edge_count: usize,
106 pub tip_count: usize,
107 pub finalized_height: u64,
108 pub pending_transactions: usize,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct MemoryStats {
114 pub total_allocated: usize,
115 pub current_usage: usize,
116 pub peak_usage: usize,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct NodeStatus {
122 pub node_id: String,
123 pub state: String,
124 pub uptime: u64,
125 pub peers: Vec<PeerInfo>,
126 pub network_stats: NetworkStats,
127 pub dag_stats: DagStats,
128 pub memory_usage: MemoryStats,
129}
130
131#[derive(Debug, Clone)]
133pub enum RpcTransport {
134 Tcp(String),
136 Unix(String),
138}
139
140type NodeRunnerHandle = Arc<RwLock<dyn NodeRunnerTrait + Send + Sync>>;
142
143pub trait NodeRunnerTrait: Send + Sync + std::fmt::Debug {
145 fn get_status(
146 &self,
147 ) -> Pin<
148 Box<
149 dyn std::future::Future<
150 Output = Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>,
151 > + Send,
152 >,
153 >;
154 fn get_connected_peers(
155 &self,
156 ) -> Pin<Box<dyn std::future::Future<Output = Vec<PeerInfo>> + Send>>;
157 fn dial_peer(
158 &self,
159 address: String,
160 ) -> Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;
161 fn disconnect_peer(
162 &self,
163 peer_id: &str,
164 ) -> Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;
165 fn get_network_stats(&self) -> Pin<Box<dyn std::future::Future<Output = NetworkStats> + Send>>;
166 fn shutdown(
167 &self,
168 ) -> Pin<
169 Box<
170 dyn std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
171 + Send,
172 >,
173 >;
174}
175
176pub struct RpcServer {
178 transport: RpcTransport,
179 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
180 command_tx: mpsc::Sender<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
181 network_manager: Arc<RwLock<NetworkManager>>,
182 node_handle: Option<NodeRunnerHandle>,
184 node_shutdown_tx: Option<oneshot::Sender<()>>,
186 auth_token: Option<String>,
187 rate_limiter: Arc<Mutex<RateLimiter>>,
188 auth_keys: Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
189 #[allow(dead_code)]
190 start_time: SystemTime,
191}
192
193#[derive(Debug)]
195pub struct NetworkManager {
196 mock_peers: HashMap<String, PeerInfo>,
198 banned_peers: std::collections::HashSet<String>,
200 network_stats: NetworkStats,
202 start_time: SystemTime,
204 node_handle: Option<NodeRunnerHandle>,
206}
207
208#[derive(Debug)]
210struct RateLimiter {
211 requests: HashMap<String, Vec<SystemTime>>,
212 max_requests_per_minute: usize,
213}
214
215impl NetworkManager {
216 fn new() -> Self {
217 Self {
218 mock_peers: HashMap::new(),
219 banned_peers: std::collections::HashSet::new(),
220 network_stats: NetworkStats {
221 total_connections: 0,
222 active_connections: 0,
223 messages_sent: 0,
224 messages_received: 0,
225 bytes_sent: 0,
226 bytes_received: 0,
227 average_latency: 0.0,
228 uptime: 0,
229 },
230 start_time: SystemTime::now(),
231 node_handle: None,
232 }
233 }
234
235 pub fn set_node_handle(&mut self, handle: NodeRunnerHandle) {
237 self.node_handle = Some(handle);
238 }
239
240 async fn add_peer(&mut self, address: String) -> Result<(), String> {
241 if self.banned_peers.contains(&address) {
242 return Err("Peer is banned".to_string());
243 }
244
245 if let Some(node) = &self.node_handle {
247 let node_guard = node.read().await;
248 return node_guard.dial_peer(address).await;
249 }
250
251 let peer_id = format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8]);
253 let peer_info = PeerInfo {
254 id: peer_id.clone(),
255 address: address.clone(),
256 connected_duration: 0,
257 messages_sent: 0,
258 messages_received: 0,
259 last_seen: SystemTime::now()
260 .duration_since(UNIX_EPOCH)
261 .unwrap()
262 .as_secs(),
263 status: "Connected".to_string(),
264 latency: None,
265 };
266
267 self.mock_peers.insert(peer_id, peer_info);
268 self.network_stats.total_connections += 1;
269 self.network_stats.active_connections += 1;
270 Ok(())
271 }
272
273 async fn remove_peer(&mut self, peer_id: &str) -> Result<(), String> {
274 if let Some(node) = &self.node_handle {
276 let node_guard = node.read().await;
277 return node_guard.disconnect_peer(peer_id).await;
278 }
279
280 if self.mock_peers.remove(peer_id).is_some() {
282 self.network_stats.active_connections =
283 self.network_stats.active_connections.saturating_sub(1);
284 Ok(())
285 } else {
286 Err("Peer not found".to_string())
287 }
288 }
289
290 async fn get_peer_info(&self, peer_id: &str) -> Option<PeerInfo> {
291 if let Some(node) = &self.node_handle {
293 let node_guard = node.read().await;
294 let connected_peers = node_guard.get_connected_peers().await;
295 return connected_peers.into_iter().find(|p| p.id == peer_id);
296 }
297
298 self.mock_peers.get(peer_id).cloned()
300 }
301
302 async fn list_peers(&self) -> Vec<PeerInfo> {
303 if let Some(node) = &self.node_handle {
305 let node_guard = node.read().await;
306 return node_guard.get_connected_peers().await;
307 }
308
309 self.mock_peers.values().cloned().collect()
311 }
312
313 async fn ban_peer(&mut self, peer_id: &str) -> Result<(), String> {
314 let peer_address = if let Some(node) = &self.node_handle {
316 let node_guard = node.read().await;
317 let connected_peers = node_guard.get_connected_peers().await;
318 connected_peers
319 .into_iter()
320 .find(|p| p.id == peer_id)
321 .map(|p| p.address)
322 } else {
323 self.mock_peers.get(peer_id).map(|p| p.address.clone())
324 };
325
326 if let Some(address) = peer_address {
327 self.banned_peers.insert(address);
328 self.remove_peer(peer_id).await?;
329 Ok(())
330 } else {
331 Err("Peer not found".to_string())
332 }
333 }
334
335 fn unban_peer(&mut self, address: &str) -> Result<(), String> {
336 if self.banned_peers.remove(address) {
337 Ok(())
338 } else {
339 Err("Peer is not banned".to_string())
340 }
341 }
342
343 async fn get_network_stats(&mut self) -> NetworkStats {
344 if let Some(node) = &self.node_handle {
346 let node_guard = node.read().await;
347 return node_guard.get_network_stats().await;
348 }
349
350 self.network_stats.uptime = self.start_time.elapsed().unwrap_or_default().as_secs();
352 self.network_stats.clone()
353 }
354
355 async fn test_network(&self) -> Vec<NetworkTestResult> {
356 let mut results = Vec::new();
357
358 let peers = if let Some(node) = &self.node_handle {
359 let node_guard = node.read().await;
360 node_guard.get_connected_peers().await
361 } else {
362 self.mock_peers.values().cloned().collect()
363 };
364
365 for peer in peers {
366 let result = self.test_peer_connectivity(&peer).await;
367 results.push(result);
368 }
369
370 results
371 }
372
373 async fn test_peer_connectivity(&self, peer: &PeerInfo) -> NetworkTestResult {
374 let start = std::time::Instant::now();
376
377 match peer.address.parse::<std::net::SocketAddr>() {
379 Ok(addr) => {
380 match timeout(Duration::from_secs(5), tokio::net::TcpStream::connect(addr)).await {
381 Ok(Ok(_)) => NetworkTestResult {
382 peer_id: peer.id.clone(),
383 address: peer.address.clone(),
384 reachable: true,
385 latency: Some(start.elapsed().as_millis() as f64),
386 error: None,
387 },
388 Ok(Err(e)) => NetworkTestResult {
389 peer_id: peer.id.clone(),
390 address: peer.address.clone(),
391 reachable: false,
392 latency: None,
393 error: Some(e.to_string()),
394 },
395 Err(_) => NetworkTestResult {
396 peer_id: peer.id.clone(),
397 address: peer.address.clone(),
398 reachable: false,
399 latency: None,
400 error: Some("Connection timeout".to_string()),
401 },
402 }
403 }
404 Err(e) => NetworkTestResult {
405 peer_id: peer.id.clone(),
406 address: peer.address.clone(),
407 reachable: false,
408 latency: None,
409 error: Some(format!("Invalid address: {}", e)),
410 },
411 }
412 }
413}
414
415impl RateLimiter {
416 fn new(max_requests_per_minute: usize) -> Self {
417 Self {
418 requests: HashMap::new(),
419 max_requests_per_minute,
420 }
421 }
422
423 fn check_rate_limit(&mut self, client_ip: &str) -> bool {
424 let now = SystemTime::now();
425 let requests = self.requests.entry(client_ip.to_string()).or_default();
426
427 requests.retain(|&time| now.duration_since(time).unwrap_or_default().as_secs() < 60);
429
430 if requests.len() >= self.max_requests_per_minute {
431 false
432 } else {
433 requests.push(now);
434 true
435 }
436 }
437}
438
439impl RpcServer {
440 pub fn new_tcp(
442 port: u16,
443 ) -> (
444 Self,
445 mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
446 ) {
447 let (command_tx, command_rx) = mpsc::channel(100);
448
449 let server = Self {
450 transport: RpcTransport::Tcp(format!("127.0.0.1:{}", port)),
451 shutdown_tx: None,
452 command_tx,
453 network_manager: Arc::new(RwLock::new(NetworkManager::new())),
454 node_handle: None,
455 node_shutdown_tx: None,
456 auth_token: std::env::var("RPC_AUTH_TOKEN").ok(),
457 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60))), auth_keys: Arc::new(RwLock::new(HashMap::new())),
459 start_time: SystemTime::now(),
460 };
461
462 (server, command_rx)
463 }
464
465 pub fn new_unix(
467 socket_path: String,
468 ) -> (
469 Self,
470 mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
471 ) {
472 let (command_tx, command_rx) = mpsc::channel(100);
473
474 let server = Self {
475 transport: RpcTransport::Unix(socket_path),
476 shutdown_tx: None,
477 command_tx,
478 network_manager: Arc::new(RwLock::new(NetworkManager::new())),
479 node_handle: None,
480 node_shutdown_tx: None,
481 auth_token: std::env::var("RPC_AUTH_TOKEN").ok(),
482 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60))),
483 auth_keys: Arc::new(RwLock::new(HashMap::new())),
484 start_time: SystemTime::now(),
485 };
486
487 (server, command_rx)
488 }
489
490 pub fn with_auth(
492 transport: RpcTransport,
493 auth_token: String,
494 ) -> (
495 Self,
496 mpsc::Receiver<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
497 ) {
498 let (command_tx, command_rx) = mpsc::channel(100);
499
500 let server = Self {
501 transport,
502 shutdown_tx: None,
503 command_tx,
504 network_manager: Arc::new(RwLock::new(NetworkManager::new())),
505 node_handle: None,
506 node_shutdown_tx: None,
507 auth_token: Some(auth_token),
508 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(60))),
509 auth_keys: Arc::new(RwLock::new(HashMap::new())),
510 start_time: SystemTime::now(),
511 };
512
513 (server, command_rx)
514 }
515
516 pub async fn set_node_handle(&mut self, handle: NodeRunnerHandle) {
518 self.node_handle = Some(handle.clone());
519 let mut manager = self.network_manager.write().await;
520 manager.set_node_handle(handle);
521 }
522
523 pub fn set_shutdown_channel(&mut self, tx: oneshot::Sender<()>) {
525 self.node_shutdown_tx = Some(tx);
526 }
527
528 pub async fn add_auth_key(&self, client_id: String, public_key: MlDsaPublicKey) {
530 let mut keys = self.auth_keys.write().await;
531 keys.insert(client_id, public_key);
532 }
533
534 pub async fn start(&mut self) -> Result<(), ProtocolError> {
536 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
537 self.shutdown_tx = Some(shutdown_tx);
538
539 let command_tx = self.command_tx.clone();
540 let network_manager = Arc::clone(&self.network_manager);
541 let auth_token = self.auth_token.clone();
544 let auth_keys = Arc::clone(&self.auth_keys);
545 let rate_limiter = Arc::clone(&self.rate_limiter);
546 let transport = self.transport.clone();
547
548 tokio::spawn(async move {
549 match transport {
550 RpcTransport::Tcp(addr) => {
551 let listener = match TcpListener::bind(&addr).await {
552 Ok(l) => l,
553 Err(e) => {
554 error!("Failed to bind TCP listener: {}", e);
555 return;
556 }
557 };
558
559 info!(
560 "RPC server listening on TCP: {}",
561 listener.local_addr().unwrap()
562 );
563
564 loop {
565 tokio::select! {
566 Ok((stream, addr)) = listener.accept() => {
567 debug!("New RPC connection from {}", addr);
568 let command_tx = command_tx.clone();
569 let network_manager = Arc::clone(&network_manager);
570 let auth_token = auth_token.clone();
573 let auth_keys = Arc::clone(&auth_keys);
574 let rate_limiter = Arc::clone(&rate_limiter);
575 let client_ip = addr.ip().to_string();
576
577 tokio::spawn(async move {
578 {
580 let mut limiter = rate_limiter.lock().await;
581 if !limiter.check_rate_limit(&client_ip) {
582 warn!("Rate limit exceeded for client: {}", client_ip);
583 return;
584 }
585 }
586
587 if let Err(e) = handle_tcp_connection(
588 stream, command_tx, network_manager, auth_token, auth_keys
589 ).await {
590 error!("Error handling RPC connection: {}", e);
591 }
592 });
593 }
594 _ = &mut shutdown_rx => {
595 info!("RPC server shutting down");
596 break;
597 }
598 }
599 }
600 }
601 RpcTransport::Unix(path) => {
602 let _ = std::fs::remove_file(&path);
604
605 let listener = match UnixListener::bind(&path) {
606 Ok(l) => l,
607 Err(e) => {
608 error!("Failed to bind Unix listener: {}", e);
609 return;
610 }
611 };
612
613 info!("RPC server listening on Unix socket: {}", path);
614
615 loop {
616 tokio::select! {
617 Ok((stream, _)) = listener.accept() => {
618 debug!("New RPC connection on Unix socket");
619 let command_tx = command_tx.clone();
620 let network_manager = Arc::clone(&network_manager);
621 let auth_token = auth_token.clone();
624 let auth_keys = Arc::clone(&auth_keys);
625
626 tokio::spawn(async move {
627 if let Err(e) = handle_unix_connection(
628 stream, command_tx, network_manager, auth_token, auth_keys
629 ).await {
630 error!("Error handling RPC connection: {}", e);
631 }
632 });
633 }
634 _ = &mut shutdown_rx => {
635 info!("RPC server shutting down");
636 break;
637 }
638 }
639 }
640 }
641 }
642 });
643
644 Ok(())
645 }
646
647 pub async fn stop(&mut self) -> Result<(), ProtocolError> {
649 if let Some(tx) = self.shutdown_tx.take() {
650 let _ = tx.send(());
651 }
652 Ok(())
653 }
654}
655
656async fn handle_tcp_connection(
658 mut stream: TcpStream,
659 command_tx: mpsc::Sender<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
660 network_manager: Arc<RwLock<NetworkManager>>,
661 auth_token: Option<String>,
662 auth_keys: Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
663) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
664 let request_len = timeout(Duration::from_secs(30), ReadU32Ext::read_u32(&mut stream))
666 .await??
667 .min(10 * 1024 * 1024); let mut request_data = vec![0u8; request_len as usize];
670 timeout(
671 Duration::from_secs(30),
672 stream.read_exact(&mut request_data),
673 )
674 .await??;
675
676 let request: RpcRequest = serde_json::from_slice(&request_data)?;
677
678 let response =
679 handle_request(request, command_tx, network_manager, auth_token, auth_keys).await;
680
681 let response_data = serde_json::to_vec(&response)?;
682 stream
683 .write_all(&(response_data.len() as u32).to_be_bytes())
684 .await?;
685 stream.write_all(&response_data).await?;
686 stream.flush().await?;
687
688 Ok(())
689}
690
691async fn handle_unix_connection(
693 mut stream: UnixStream,
694 command_tx: mpsc::Sender<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
695 network_manager: Arc<RwLock<NetworkManager>>,
696 auth_token: Option<String>,
697 auth_keys: Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
698) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
699 let request_len = timeout(Duration::from_secs(30), ReadU32Ext::read_u32(&mut stream))
700 .await??
701 .min(10 * 1024 * 1024);
702
703 let mut request_data = vec![0u8; request_len as usize];
704 timeout(
705 Duration::from_secs(30),
706 stream.read_exact(&mut request_data),
707 )
708 .await??;
709
710 let request: RpcRequest = serde_json::from_slice(&request_data)?;
711
712 let response =
713 handle_request(request, command_tx, network_manager, auth_token, auth_keys).await;
714
715 let response_data = serde_json::to_vec(&response)?;
716 stream
717 .write_all(&(response_data.len() as u32).to_be_bytes())
718 .await?;
719 stream.write_all(&response_data).await?;
720 stream.flush().await?;
721
722 Ok(())
723}
724
725async fn authenticate_request(
727 request: &RpcRequest,
728 auth_token: &Option<String>,
729 auth_keys: &Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
730) -> bool {
731 if let Some(expected_token) = auth_token {
733 if let Some(provided_token) = request.params.get("auth_token").and_then(|v| v.as_str()) {
734 if provided_token == expected_token {
735 return true;
736 }
737 }
738 } else if auth_token.is_none() && auth_keys.read().await.is_empty() {
739 return true;
741 }
742
743 if let (Some(client_id), Some(signature)) = (
745 request.params.get("client_id").and_then(|v| v.as_str()),
746 request.params.get("signature").and_then(|v| v.as_str()),
747 ) {
748 let keys = auth_keys.read().await;
749 if let Some(public_key) = keys.get(client_id) {
750 let message = format!("{}:{}", request.method, request.id);
752 if let Ok(sig_bytes) = hex::decode(signature) {
753 if public_key.verify(message.as_bytes(), &sig_bytes).is_ok() {
754 return true;
755 }
756 }
757 }
758 }
759
760 false
761}
762
763async fn handle_request(
765 request: RpcRequest,
766 command_tx: mpsc::Sender<(RpcCommand, tokio::sync::oneshot::Sender<serde_json::Value>)>,
767 network_manager: Arc<RwLock<NetworkManager>>,
768 auth_token: Option<String>,
769 auth_keys: Arc<RwLock<HashMap<String, MlDsaPublicKey>>>,
770) -> RpcResponse {
771 if !authenticate_request(&request, &auth_token, &auth_keys).await {
773 return RpcResponse {
774 id: request.id,
775 result: None,
776 error: Some(RpcError {
777 code: -32001,
778 message: "Authentication required".to_string(),
779 data: None,
780 }),
781 };
782 }
783 match request.method.as_str() {
784 "list_peers" => {
785 let manager = network_manager.read().await;
786 let peers = manager.list_peers().await;
787 RpcResponse {
788 id: request.id,
789 result: Some(serde_json::to_value(peers).unwrap()),
790 error: None,
791 }
792 }
793 "add_peer" => {
794 let address = match request.params.get("address").and_then(|v| v.as_str()) {
795 Some(addr) => addr.to_string(),
796 None => {
797 return RpcResponse {
798 id: request.id,
799 result: None,
800 error: Some(RpcError {
801 code: -32602,
802 message: "Invalid params: address required".to_string(),
803 data: None,
804 }),
805 };
806 }
807 };
808
809 let mut manager = network_manager.write().await;
810 match manager.add_peer(address.clone()).await {
811 Ok(()) => RpcResponse {
812 id: request.id,
813 result: Some(
814 serde_json::json!({"status": "success", "message": format!("Peer {} added", address)}),
815 ),
816 error: None,
817 },
818 Err(e) => RpcResponse {
819 id: request.id,
820 result: None,
821 error: Some(RpcError {
822 code: -32003,
823 message: format!("Failed to add peer: {}", e),
824 data: None,
825 }),
826 },
827 }
828 }
829 "remove_peer" => {
830 let peer_id = match request.params.get("peer_id").and_then(|v| v.as_str()) {
831 Some(id) => id,
832 None => {
833 return RpcResponse {
834 id: request.id,
835 result: None,
836 error: Some(RpcError {
837 code: -32602,
838 message: "Invalid params: peer_id required".to_string(),
839 data: None,
840 }),
841 };
842 }
843 };
844
845 let mut manager = network_manager.write().await;
846 match manager.remove_peer(peer_id).await {
847 Ok(()) => RpcResponse {
848 id: request.id,
849 result: Some(
850 serde_json::json!({"status": "success", "message": format!("Peer {} removed", peer_id)}),
851 ),
852 error: None,
853 },
854 Err(e) => RpcResponse {
855 id: request.id,
856 result: None,
857 error: Some(RpcError {
858 code: -32003,
859 message: format!("Failed to remove peer: {}", e),
860 data: None,
861 }),
862 },
863 }
864 }
865 "get_peer_info" => {
866 let peer_id = match request.params.get("peer_id").and_then(|v| v.as_str()) {
867 Some(id) => id,
868 None => {
869 return RpcResponse {
870 id: request.id,
871 result: None,
872 error: Some(RpcError {
873 code: -32602,
874 message: "Invalid params: peer_id required".to_string(),
875 data: None,
876 }),
877 };
878 }
879 };
880
881 let manager = network_manager.read().await;
882 match manager.get_peer_info(peer_id).await {
883 Some(peer_info) => RpcResponse {
884 id: request.id,
885 result: Some(serde_json::to_value(peer_info).unwrap()),
886 error: None,
887 },
888 None => RpcResponse {
889 id: request.id,
890 result: None,
891 error: Some(RpcError {
892 code: -32004,
893 message: "Peer not found".to_string(),
894 data: None,
895 }),
896 },
897 }
898 }
899 "ban_peer" => {
900 let peer_id = match request.params.get("peer_id").and_then(|v| v.as_str()) {
901 Some(id) => id,
902 None => {
903 return RpcResponse {
904 id: request.id,
905 result: None,
906 error: Some(RpcError {
907 code: -32602,
908 message: "Invalid params: peer_id required".to_string(),
909 data: None,
910 }),
911 };
912 }
913 };
914
915 let mut manager = network_manager.write().await;
916 match manager.ban_peer(peer_id).await {
917 Ok(()) => RpcResponse {
918 id: request.id,
919 result: Some(
920 serde_json::json!({"status": "success", "message": format!("Peer {} banned", peer_id)}),
921 ),
922 error: None,
923 },
924 Err(e) => RpcResponse {
925 id: request.id,
926 result: None,
927 error: Some(RpcError {
928 code: -32003,
929 message: format!("Failed to ban peer: {}", e),
930 data: None,
931 }),
932 },
933 }
934 }
935 "unban_peer" => {
936 let address = match request.params.get("address").and_then(|v| v.as_str()) {
937 Some(addr) => addr,
938 None => {
939 return RpcResponse {
940 id: request.id,
941 result: None,
942 error: Some(RpcError {
943 code: -32602,
944 message: "Invalid params: address required".to_string(),
945 data: None,
946 }),
947 };
948 }
949 };
950
951 let mut manager = network_manager.write().await;
952 match manager.unban_peer(address) {
953 Ok(()) => RpcResponse {
954 id: request.id,
955 result: Some(
956 serde_json::json!({"status": "success", "message": format!("Peer {} unbanned", address)}),
957 ),
958 error: None,
959 },
960 Err(e) => RpcResponse {
961 id: request.id,
962 result: None,
963 error: Some(RpcError {
964 code: -32003,
965 message: format!("Failed to unban peer: {}", e),
966 data: None,
967 }),
968 },
969 }
970 }
971 "get_network_stats" => {
972 let mut manager = network_manager.write().await;
973 let stats = manager.get_network_stats().await;
974 RpcResponse {
975 id: request.id,
976 result: Some(serde_json::to_value(stats).unwrap()),
977 error: None,
978 }
979 }
980 "test_network" => {
981 let manager = network_manager.read().await;
982 let results = manager.test_network().await;
983 RpcResponse {
984 id: request.id,
985 result: Some(serde_json::to_value(results).unwrap()),
986 error: None,
987 }
988 }
989 "stop" => {
990 info!("Received stop request via RPC");
991
992 let (tx, rx) = tokio::sync::oneshot::channel();
994 if let Err(_) = command_tx.send((RpcCommand::Stop, tx)).await {
995 return RpcResponse {
996 id: request.id,
997 result: None,
998 error: Some(RpcError {
999 code: -1,
1000 message: "Failed to send stop command".to_string(),
1001 data: None,
1002 }),
1003 };
1004 }
1005
1006 match rx.await {
1007 Ok(result) => RpcResponse {
1008 id: request.id,
1009 result: Some(result),
1010 error: None,
1011 },
1012 Err(_) => RpcResponse {
1013 id: request.id,
1014 result: None,
1015 error: Some(RpcError {
1016 code: -1,
1017 message: "Command execution failed".to_string(),
1018 data: None,
1019 }),
1020 },
1021 }
1022 }
1023 "get_status" => {
1024 let mut manager = network_manager.write().await;
1026
1027 let real_status = if let Some(node) = &manager.node_handle {
1029 let node_guard = node.read().await;
1030 match node_guard.get_status().await {
1031 Ok(status) => Some(status),
1032 Err(e) => {
1033 warn!("Failed to get real node status: {}", e);
1034 None
1035 }
1036 }
1037 } else {
1038 None
1039 };
1040
1041 let result = if let Some(status) = real_status {
1043 status
1044 } else {
1045 let mut status = NodeStatus {
1047 node_id: "node_mock".to_string(),
1048 state: "Mock".to_string(),
1049 uptime: 0,
1050 peers: vec![],
1051 network_stats: NetworkStats {
1052 total_connections: 0,
1053 active_connections: 0,
1054 messages_sent: 0,
1055 messages_received: 0,
1056 bytes_sent: 0,
1057 bytes_received: 0,
1058 average_latency: 0.0,
1059 uptime: 0,
1060 },
1061 dag_stats: DagStats {
1062 vertex_count: 0,
1063 edge_count: 0,
1064 tip_count: 0,
1065 finalized_height: 0,
1066 pending_transactions: 0,
1067 },
1068 memory_usage: MemoryStats {
1069 total_allocated: 0,
1070 current_usage: 0,
1071 peak_usage: 0,
1072 },
1073 };
1074
1075 status.peers = manager.list_peers().await;
1077 status.network_stats = manager.get_network_stats().await;
1078 status.uptime = manager.start_time.elapsed().unwrap_or_default().as_secs();
1079
1080 #[cfg(target_os = "linux")]
1082 {
1083 if let Ok(contents) = std::fs::read_to_string("/proc/self/status") {
1084 for line in contents.lines() {
1085 if line.starts_with("VmRSS:") {
1086 if let Some(kb_str) = line.split_whitespace().nth(1) {
1087 if let Ok(kb) = kb_str.parse::<usize>() {
1088 status.memory_usage.current_usage = kb * 1024;
1089 }
1090 }
1091 } else if line.starts_with("VmPeak:") {
1092 if let Some(kb_str) = line.split_whitespace().nth(1) {
1093 if let Ok(kb) = kb_str.parse::<usize>() {
1094 status.memory_usage.peak_usage = kb * 1024;
1095 }
1096 }
1097 }
1098 }
1099 }
1100 }
1101
1102 serde_json::to_value(status).unwrap()
1103 };
1104
1105 RpcResponse {
1106 id: request.id,
1107 result: Some(result),
1108 error: None,
1109 }
1110 }
1111 _ => RpcResponse {
1112 id: request.id,
1113 result: None,
1114 error: Some(RpcError {
1115 code: -32601,
1116 message: format!("Method '{}' not found", request.method),
1117 data: None,
1118 }),
1119 },
1120 }
1121}
1122
1123#[cfg(test)]
1124mod tests {
1125 use super::*;
1126
1127 #[test]
1128 fn test_rpc_request_serialization() {
1129 let request = RpcRequest {
1130 id: Uuid::new_v4(),
1131 method: "stop".to_string(),
1132 params: serde_json::Value::Null,
1133 };
1134
1135 let serialized = serde_json::to_string(&request).unwrap();
1136 let deserialized: RpcRequest = serde_json::from_str(&serialized).unwrap();
1137
1138 assert_eq!(request.method, deserialized.method);
1139 }
1140
1141 #[tokio::test]
1142 async fn test_network_manager_peer_operations() {
1143 let mut manager = NetworkManager::new();
1144
1145 assert!(manager.add_peer("127.0.0.1:8001".to_string()).await.is_ok());
1147 assert_eq!(manager.list_peers().await.len(), 1);
1148
1149 assert!(manager.add_peer("127.0.0.1:8002".to_string()).await.is_ok());
1151 assert_eq!(manager.list_peers().await.len(), 2);
1152
1153 let peers = manager.list_peers().await;
1155 let peer_id = peers[0].id.clone();
1156 assert!(manager.get_peer_info(&peer_id).await.is_some());
1157
1158 assert!(manager.remove_peer(&peer_id).await.is_ok());
1160 assert_eq!(manager.list_peers().await.len(), 1);
1161
1162 assert!(manager.remove_peer("invalid_id").await.is_err());
1164 }
1165
1166 #[tokio::test]
1167 async fn test_network_manager_ban_operations() {
1168 let mut manager = NetworkManager::new();
1169
1170 manager
1172 .add_peer("127.0.0.1:8001".to_string())
1173 .await
1174 .unwrap();
1175 let peer_id = manager.list_peers().await[0].id.clone();
1176
1177 assert!(manager.ban_peer(&peer_id).await.is_ok());
1179 assert_eq!(manager.list_peers().await.len(), 0); assert!(manager
1183 .add_peer("127.0.0.1:8001".to_string())
1184 .await
1185 .is_err());
1186
1187 assert!(manager.unban_peer("127.0.0.1:8001").is_ok());
1189
1190 assert!(manager.add_peer("127.0.0.1:8001".to_string()).await.is_ok());
1192 }
1193
1194 #[test]
1195 fn test_rate_limiter() {
1196 let mut limiter = RateLimiter::new(2); assert!(limiter.check_rate_limit("127.0.0.1"));
1200 assert!(limiter.check_rate_limit("127.0.0.1"));
1201
1202 assert!(!limiter.check_rate_limit("127.0.0.1"));
1204
1205 assert!(limiter.check_rate_limit("127.0.0.2"));
1207 }
1208
1209 #[tokio::test]
1210 async fn test_authenticate_request() {
1211 let request_with_token = RpcRequest {
1212 id: Uuid::new_v4(),
1213 method: "test".to_string(),
1214 params: serde_json::json!({ "auth_token": "secret123" }),
1215 };
1216
1217 let request_without_token = RpcRequest {
1218 id: Uuid::new_v4(),
1219 method: "test".to_string(),
1220 params: serde_json::Value::Null,
1221 };
1222
1223 let auth_keys = Arc::new(RwLock::new(HashMap::new()));
1224
1225 let auth_token = Some("secret123".to_string());
1227 assert!(authenticate_request(&request_with_token, &auth_token, &auth_keys).await);
1228 assert!(!authenticate_request(&request_without_token, &auth_token, &auth_keys).await);
1229
1230 let no_auth = None;
1232 assert!(authenticate_request(&request_with_token, &no_auth, &auth_keys).await);
1233 assert!(authenticate_request(&request_without_token, &no_auth, &auth_keys).await);
1234 }
1235
1236 #[tokio::test]
1237 async fn test_rpc_server_creation() {
1238 let (server, _rx) = RpcServer::new_tcp(0); match server.transport {
1240 RpcTransport::Tcp(addr) => assert!(addr.contains(":0")),
1241 _ => panic!("Expected TCP transport"),
1242 }
1243 }
1244
1245 #[tokio::test]
1246 async fn test_rpc_server_with_auth() {
1247 let (server, _rx) = RpcServer::with_auth(
1248 RpcTransport::Tcp("127.0.0.1:0".to_string()),
1249 "secret123".to_string(),
1250 );
1251 assert_eq!(server.auth_token, Some("secret123".to_string()));
1252 }
1253
1254 #[tokio::test]
1255 async fn test_network_test_functionality() {
1256 let manager = NetworkManager::new();
1257 let results = manager.test_network().await;
1258 assert!(results.is_empty()); }
1260
1261 #[tokio::test]
1262 async fn test_network_stats() {
1263 let mut manager = NetworkManager::new();
1264 let stats = manager.get_network_stats().await;
1265
1266 assert_eq!(stats.total_connections, 0);
1267 assert_eq!(stats.active_connections, 0);
1268 assert_eq!(stats.messages_sent, 0);
1269 assert_eq!(stats.messages_received, 0);
1270
1271 manager
1273 .add_peer("127.0.0.1:8001".to_string())
1274 .await
1275 .unwrap();
1276 let updated_stats = manager.get_network_stats().await;
1277 assert_eq!(updated_stats.total_connections, 1);
1278 assert_eq!(updated_stats.active_connections, 1);
1279 }
1280
1281 #[test]
1282 fn test_peer_info_serialization() {
1283 let peer_info = PeerInfo {
1284 id: "test_peer".to_string(),
1285 address: "127.0.0.1:8001".to_string(),
1286 connected_duration: 300,
1287 messages_sent: 10,
1288 messages_received: 15,
1289 last_seen: 1234567890,
1290 status: "Connected".to_string(),
1291 latency: Some(25.5),
1292 };
1293
1294 let serialized = serde_json::to_string(&peer_info).unwrap();
1295 let deserialized: PeerInfo = serde_json::from_str(&serialized).unwrap();
1296
1297 assert_eq!(peer_info.id, deserialized.id);
1298 assert_eq!(peer_info.address, deserialized.address);
1299 assert_eq!(peer_info.status, deserialized.status);
1300 assert_eq!(peer_info.latency, deserialized.latency);
1301 }
1302
1303 #[test]
1304 fn test_network_stats_serialization() {
1305 let stats = NetworkStats {
1306 total_connections: 5,
1307 active_connections: 3,
1308 messages_sent: 100,
1309 messages_received: 95,
1310 bytes_sent: 1024,
1311 bytes_received: 2048,
1312 average_latency: 15.7,
1313 uptime: 3600,
1314 };
1315
1316 let serialized = serde_json::to_string(&stats).unwrap();
1317 let deserialized: NetworkStats = serde_json::from_str(&serialized).unwrap();
1318
1319 assert_eq!(stats.total_connections, deserialized.total_connections);
1320 assert_eq!(stats.active_connections, deserialized.active_connections);
1321 assert_eq!(stats.uptime, deserialized.uptime);
1322 }
1323
1324 #[test]
1325 fn test_rpc_error_codes() {
1326 let method_not_found = RpcError {
1328 code: -32601,
1329 message: "Method not found".to_string(),
1330 data: None,
1331 };
1332
1333 let invalid_params = RpcError {
1334 code: -32602,
1335 message: "Invalid params".to_string(),
1336 data: None,
1337 };
1338
1339 let auth_required = RpcError {
1340 code: -32001,
1341 message: "Authentication required".to_string(),
1342 data: None,
1343 };
1344
1345 assert_eq!(method_not_found.code, -32601);
1346 assert_eq!(invalid_params.code, -32602);
1347 assert_eq!(auth_required.code, -32001);
1348 }
1349}