1use crate::peer_manager::{PeerManager, PeerManagerConfig};
2use crate::rpc::{NodeStatus, RpcClient};
3use crate::CliError;
4use anyhow::Result;
5use serde::{Deserialize, Serialize};
6use serde_json;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::Mutex;
11use tokio::time::timeout;
12use tracing::{info, warn};
13
14#[derive(Debug, Clone)]
16pub struct StatusArgs {
17 pub port: u16,
18 pub format: OutputFormat,
19 pub timeout_seconds: u64,
20 pub verbose: bool,
21}
22
23impl Default for StatusArgs {
24 fn default() -> Self {
25 Self {
26 port: 8000,
27 format: OutputFormat::Text,
28 timeout_seconds: 30,
29 verbose: false,
30 }
31 }
32}
33
34#[derive(Debug, Clone, PartialEq)]
36pub enum OutputFormat {
37 Text,
38 Json,
39 Table,
40}
41
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
44pub struct NodeStatusResponse {
45 pub node_id: String,
46 pub state: NodeState,
47 pub uptime_seconds: u64,
48 pub connected_peers: Vec<PeerStatusInfo>,
49 pub network_stats: NetworkStatistics,
50 pub dag_stats: DagStatistics,
51 pub memory_usage: MemoryUsage,
52}
53
54#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
56pub enum NodeState {
57 Running,
58 Stopped,
59 Syncing,
60 Error(String),
61}
62
63#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
65pub struct PeerStatusInfo {
66 pub peer_id: String,
67 pub address: String,
68 pub connected_duration_seconds: u64,
69 pub messages_sent: u64,
70 pub messages_received: u64,
71 pub last_seen_timestamp: u64,
72}
73
74#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
76pub struct NetworkStatistics {
77 pub total_connections: usize,
78 pub active_connections: usize,
79 pub messages_sent: u64,
80 pub messages_received: u64,
81 pub bytes_sent: u64,
82 pub bytes_received: u64,
83 pub average_latency_ms: f64,
84}
85
86#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
88pub struct DagStatistics {
89 pub vertex_count: usize,
90 pub edge_count: usize,
91 pub tip_count: usize,
92 pub finalized_height: u64,
93 pub pending_transactions: usize,
94}
95
96#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
98pub struct MemoryUsage {
99 pub total_allocated_bytes: usize,
100 pub current_usage_bytes: usize,
101 pub peak_usage_bytes: usize,
102}
103
104pub async fn execute_status_command(args: StatusArgs) -> Result<String> {
106 validate_status_args(&args)?;
108
109 let client = RpcClient::new_tcp("127.0.0.1".to_string(), args.port)
111 .with_timeout(Duration::from_secs(args.timeout_seconds));
112
113 let is_connected = check_node_connectivity(args.port).await?;
115 if !is_connected {
116 return Err(anyhow::anyhow!(
117 "Connection refused: No node running on port {}",
118 args.port
119 ));
120 }
121
122 let rpc_status = client
124 .get_status()
125 .await
126 .map_err(|e| anyhow::anyhow!("Failed to get node status: {}", e))?;
127
128 let status_response = convert_rpc_status_to_response(rpc_status);
130
131 let output = format_status_output(&status_response, &args.format, args.verbose)?;
133
134 Ok(output)
135}
136
137fn validate_status_args(args: &StatusArgs) -> Result<()> {
139 if args.port == 0 {
140 return Err(anyhow::anyhow!("Port cannot be 0"));
141 }
142
143 if args.timeout_seconds == 0 {
147 return Err(anyhow::anyhow!("Timeout cannot be 0"));
148 }
149
150 if args.timeout_seconds > 300 {
151 return Err(anyhow::anyhow!(
152 "Timeout cannot be greater than 300 seconds"
153 ));
154 }
155
156 Ok(())
157}
158
159pub async fn check_node_connectivity(port: u16) -> Result<bool> {
161 match timeout(
162 Duration::from_secs(5),
163 tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port)),
164 )
165 .await
166 {
167 Ok(Ok(_)) => Ok(true),
168 Ok(Err(_)) => Ok(false),
169 Err(_) => Ok(false), }
171}
172
173fn convert_rpc_status_to_response(rpc_status: NodeStatus) -> NodeStatusResponse {
175 let state = match rpc_status.state.as_str() {
176 "Running" => NodeState::Running,
177 "Stopped" => NodeState::Stopped,
178 "Syncing" => NodeState::Syncing,
179 error_state if error_state.starts_with("Error") => {
180 let error_msg = error_state
181 .strip_prefix("Error(")
182 .unwrap_or("Unknown error")
183 .strip_suffix(")")
184 .unwrap_or("Unknown error");
185 NodeState::Error(error_msg.to_string())
186 }
187 _ => NodeState::Error(format!("Unknown state: {}", rpc_status.state)),
188 };
189
190 let connected_peers = rpc_status
191 .peers
192 .into_iter()
193 .map(|peer| PeerStatusInfo {
194 peer_id: peer.id,
195 address: peer.address,
196 connected_duration_seconds: peer.connected_duration,
197 messages_sent: peer.messages_sent,
198 messages_received: peer.messages_received,
199 last_seen_timestamp: peer.last_seen,
200 })
201 .collect();
202
203 let network_stats = NetworkStatistics {
204 total_connections: rpc_status.network_stats.total_connections,
205 active_connections: rpc_status.network_stats.active_connections,
206 messages_sent: rpc_status.network_stats.messages_sent,
207 messages_received: rpc_status.network_stats.messages_received,
208 bytes_sent: rpc_status.network_stats.bytes_sent,
209 bytes_received: rpc_status.network_stats.bytes_received,
210 average_latency_ms: rpc_status.network_stats.average_latency,
211 };
212
213 let dag_stats = DagStatistics {
214 vertex_count: rpc_status.dag_stats.vertex_count,
215 edge_count: rpc_status.dag_stats.edge_count,
216 tip_count: rpc_status.dag_stats.tip_count,
217 finalized_height: rpc_status.dag_stats.finalized_height,
218 pending_transactions: rpc_status.dag_stats.pending_transactions,
219 };
220
221 let memory_usage = MemoryUsage {
222 total_allocated_bytes: rpc_status.memory_usage.total_allocated,
223 current_usage_bytes: rpc_status.memory_usage.current_usage,
224 peak_usage_bytes: rpc_status.memory_usage.peak_usage,
225 };
226
227 NodeStatusResponse {
228 node_id: rpc_status.node_id,
229 state,
230 uptime_seconds: rpc_status.uptime,
231 connected_peers,
232 network_stats,
233 dag_stats,
234 memory_usage,
235 }
236}
237
238fn format_status_output(
240 status: &NodeStatusResponse,
241 format: &OutputFormat,
242 verbose: bool,
243) -> Result<String> {
244 match format {
245 OutputFormat::Json => {
246 if verbose {
247 Ok(serde_json::to_string_pretty(status)?)
248 } else {
249 Ok(serde_json::to_string(status)?)
250 }
251 }
252 OutputFormat::Text => format_status_as_text(status, verbose),
253 OutputFormat::Table => format_status_as_table(status, verbose),
254 }
255}
256
257fn format_status_as_text(status: &NodeStatusResponse, verbose: bool) -> Result<String> {
259 let mut output = String::new();
260
261 output.push_str(&format!("Node Status: {}", status.node_id));
262 output.push('\n');
263 output.push_str(&format!("State: {:?}", status.state));
264 output.push('\n');
265 output.push_str(&format!("Uptime: {} seconds", status.uptime_seconds));
266 output.push('\n');
267 output.push_str(&format!(
268 "Connected Peers: {}",
269 status.connected_peers.len()
270 ));
271 output.push('\n');
272
273 if verbose {
274 output.push_str("\nNetwork Statistics:\n");
275 output.push_str(&format!(
276 " Total Connections: {}",
277 status.network_stats.total_connections
278 ));
279 output.push('\n');
280 output.push_str(&format!(
281 " Active Connections: {}",
282 status.network_stats.active_connections
283 ));
284 output.push('\n');
285 output.push_str(&format!(
286 " Messages Sent: {}",
287 status.network_stats.messages_sent
288 ));
289 output.push('\n');
290 output.push_str(&format!(
291 " Messages Received: {}",
292 status.network_stats.messages_received
293 ));
294 output.push('\n');
295 output.push_str(&format!(
296 " Bytes Sent: {}",
297 status.network_stats.bytes_sent
298 ));
299 output.push('\n');
300 output.push_str(&format!(
301 " Bytes Received: {}",
302 status.network_stats.bytes_received
303 ));
304 output.push('\n');
305 output.push_str(&format!(
306 " Average Latency: {:.2} ms",
307 status.network_stats.average_latency_ms
308 ));
309 output.push('\n');
310
311 output.push_str("\nDAG Statistics:\n");
312 output.push_str(&format!(
313 " Vertex Count: {}",
314 status.dag_stats.vertex_count
315 ));
316 output.push('\n');
317 output.push_str(&format!(" Edge Count: {}", status.dag_stats.edge_count));
318 output.push('\n');
319 output.push_str(&format!(" Tip Count: {}", status.dag_stats.tip_count));
320 output.push('\n');
321 output.push_str(&format!(
322 " Finalized Height: {}",
323 status.dag_stats.finalized_height
324 ));
325 output.push('\n');
326 output.push_str(&format!(
327 " Pending Transactions: {}",
328 status.dag_stats.pending_transactions
329 ));
330 output.push('\n');
331
332 output.push_str("\nMemory Usage:\n");
333 output.push_str(&format!(
334 " Total Allocated: {} bytes",
335 status.memory_usage.total_allocated_bytes
336 ));
337 output.push('\n');
338 output.push_str(&format!(
339 " Current Usage: {} bytes",
340 status.memory_usage.current_usage_bytes
341 ));
342 output.push('\n');
343 output.push_str(&format!(
344 " Peak Usage: {} bytes",
345 status.memory_usage.peak_usage_bytes
346 ));
347 output.push('\n');
348
349 if !status.connected_peers.is_empty() {
350 output.push_str("\nConnected Peers:\n");
351 for peer in &status.connected_peers {
352 output.push_str(&format!(
353 " {}: {} ({}s connected)",
354 peer.peer_id, peer.address, peer.connected_duration_seconds
355 ));
356 output.push('\n');
357 }
358 }
359 }
360
361 Ok(output)
362}
363
364fn format_status_as_table(status: &NodeStatusResponse, verbose: bool) -> Result<String> {
366 let mut output = String::new();
367
368 output.push_str(
369 "┌──────────────────────────────────────────────────────────────────────────────┐\n",
370 );
371 output.push_str(&format!("│ Node Status: {:<62} │\n", status.node_id));
372 output.push_str(
373 "├──────────────────────────────────────────────────────────────────────────────┤\n",
374 );
375 output.push_str(&format!(
376 "│ State: {:<68} │\n",
377 format!("{:?}", status.state)
378 ));
379 output.push_str(&format!(
380 "│ Uptime: {:<67} │\n",
381 format!("{} seconds", status.uptime_seconds)
382 ));
383 output.push_str(&format!(
384 "│ Connected Peers: {:<60} │\n",
385 status.connected_peers.len()
386 ));
387
388 if verbose {
389 output.push_str(
390 "├──────────────────────────────────────────────────────────────────────────────┤\n",
391 );
392 output.push_str(
393 "│ Network Statistics │\n",
394 );
395 output.push_str(
396 "├──────────────────────────────────────────────────────────────────────────────┤\n",
397 );
398 output.push_str(&format!(
399 "│ Total Connections: {:<57} │\n",
400 status.network_stats.total_connections
401 ));
402 output.push_str(&format!(
403 "│ Active Connections: {:<56} │\n",
404 status.network_stats.active_connections
405 ));
406 output.push_str(&format!(
407 "│ Messages Sent: {:<61} │\n",
408 status.network_stats.messages_sent
409 ));
410 output.push_str(&format!(
411 "│ Messages Received: {:<57} │\n",
412 status.network_stats.messages_received
413 ));
414 output.push_str(&format!(
415 "│ Bytes Sent: {:<64} │\n",
416 status.network_stats.bytes_sent
417 ));
418 output.push_str(&format!(
419 "│ Bytes Received: {:<60} │\n",
420 status.network_stats.bytes_received
421 ));
422 output.push_str(&format!(
423 "│ Average Latency: {:<59} │\n",
424 format!("{:.2} ms", status.network_stats.average_latency_ms)
425 ));
426
427 output.push_str(
428 "├──────────────────────────────────────────────────────────────────────────────┤\n",
429 );
430 output.push_str(
431 "│ DAG Statistics │\n",
432 );
433 output.push_str(
434 "├──────────────────────────────────────────────────────────────────────────────┤\n",
435 );
436 output.push_str(&format!(
437 "│ Vertex Count: {:<62} │\n",
438 status.dag_stats.vertex_count
439 ));
440 output.push_str(&format!(
441 "│ Edge Count: {:<64} │\n",
442 status.dag_stats.edge_count
443 ));
444 output.push_str(&format!(
445 "│ Tip Count: {:<65} │\n",
446 status.dag_stats.tip_count
447 ));
448 output.push_str(&format!(
449 "│ Finalized Height: {:<58} │\n",
450 status.dag_stats.finalized_height
451 ));
452 output.push_str(&format!(
453 "│ Pending Transactions: {:<54} │\n",
454 status.dag_stats.pending_transactions
455 ));
456
457 output.push_str(
458 "├──────────────────────────────────────────────────────────────────────────────┤\n",
459 );
460 output.push_str(
461 "│ Memory Usage │\n",
462 );
463 output.push_str(
464 "├──────────────────────────────────────────────────────────────────────────────┤\n",
465 );
466 output.push_str(&format!(
467 "│ Total Allocated: {:<59} │\n",
468 format!("{} bytes", status.memory_usage.total_allocated_bytes)
469 ));
470 output.push_str(&format!(
471 "│ Current Usage: {:<61} │\n",
472 format!("{} bytes", status.memory_usage.current_usage_bytes)
473 ));
474 output.push_str(&format!(
475 "│ Peak Usage: {:<64} │\n",
476 format!("{} bytes", status.memory_usage.peak_usage_bytes)
477 ));
478 }
479
480 output.push_str(
481 "└──────────────────────────────────────────────────────────────────────────────┘\n",
482 );
483
484 Ok(output)
485}
486
487pub struct CommandRouter {
489 peer_manager: Option<Arc<Mutex<PeerManager>>>,
491}
492
493impl Default for CommandRouter {
494 fn default() -> Self {
495 Self::new()
496 }
497}
498
499impl CommandRouter {
500 pub fn new() -> Self {
502 Self { peer_manager: None }
503 }
504
505 pub async fn with_peer_manager() -> Result<Self, CliError> {
507 let config = PeerManagerConfig::default();
508 let peer_manager = PeerManager::new(config)
509 .await
510 .map_err(|e| CliError::Config(format!("Failed to initialize peer manager: {}", e)))?;
511
512 Ok(Self {
513 peer_manager: Some(Arc::new(Mutex::new(peer_manager))),
514 })
515 }
516
517 async fn get_peer_manager(&self) -> Result<Arc<Mutex<PeerManager>>, CliError> {
519 if let Some(ref pm) = self.peer_manager {
520 Ok(Arc::clone(pm))
521 } else {
522 Err(CliError::Config("Peer manager not initialized".to_string()))
523 }
524 }
525
526 pub async fn handle_node_status(args: StatusArgs) -> Result<String, CliError> {
528 info!("Executing node status command with port {}", args.port);
529
530 match execute_status_command(args).await {
531 Ok(output) => Ok(output),
532 Err(e) => Err(CliError::Command(e.to_string())),
533 }
534 }
535
536 pub async fn handle_peer_list(&self, port: Option<u16>) -> Result<(), CliError> {
538 info!("Executing peer list command");
539
540 if let Ok(peer_manager) = self.get_peer_manager().await {
542 let manager = peer_manager.lock().await;
543 match manager.list_peers().await {
544 Ok(peers) => {
545 if peers.is_empty() {
546 println!("No peers in database");
547 } else {
548 println!("Known Peers ({}):", peers.len());
549 println!(
550 "{:<16} {:<30} {:<12} {:<10} {:<12} {:<20}",
551 "Peer ID", "Address", "Trust", "Status", "Latency", "Nickname"
552 );
553 println!("{}", "-".repeat(110));
554
555 let now = std::time::SystemTime::now()
556 .duration_since(std::time::UNIX_EPOCH)
557 .unwrap()
558 .as_secs();
559
560 for peer in peers {
561 let id_short = if peer.id.len() > 16 {
562 format!("{}...", &peer.id[..13])
563 } else {
564 peer.id.clone()
565 };
566
567 let status = if now - peer.last_seen < 300 {
568 "Active"
569 } else {
570 "Inactive"
571 };
572
573 let latency = peer
574 .avg_latency_ms
575 .map(|l| format!("{:.1}ms", l))
576 .unwrap_or_else(|| "N/A".to_string());
577
578 let nickname = peer.nickname.unwrap_or_else(|| "-".to_string());
579
580 println!(
581 "{:<16} {:<30} {:<12} {:<10} {:<12} {:<20}",
582 id_short, peer.address, peer.trust_level, status, latency, nickname
583 );
584 }
585 }
586 return Ok(());
587 }
588 Err(e) => {
589 warn!("Failed to list peers from manager: {}", e);
590 }
592 }
593 }
594
595 let port = port.unwrap_or(8000);
597 let client =
598 RpcClient::new_tcp("127.0.0.1".to_string(), port).with_timeout(Duration::from_secs(30));
599
600 match client.list_peers().await {
601 Ok(peers) => {
602 if peers.is_empty() {
603 println!("No peers connected");
604 } else {
605 println!("Connected Peers ({}):", peers.len());
606 println!(
607 "{:<20} {:<30} {:<15} {:<12} {:<12}",
608 "Peer ID", "Address", "Status", "Messages In", "Messages Out"
609 );
610 println!("{}", "-".repeat(95));
611
612 for peer in peers {
613 println!(
614 "{:<20} {:<30} {:<15} {:<12} {:<12}",
615 peer.id,
616 peer.address,
617 peer.status,
618 peer.messages_received,
619 peer.messages_sent
620 );
621 }
622 }
623 Ok(())
624 }
625 Err(e) => {
626 warn!("Failed to fetch peer list: {}", e);
627 Err(CliError::Command(format!(
628 "Failed to fetch peer list: {}",
629 e
630 )))
631 }
632 }
633 }
634
635 pub async fn handle_peer_add(
637 &self,
638 address: String,
639 port: Option<u16>,
640 nickname: Option<String>,
641 ) -> Result<(), CliError> {
642 info!("Executing peer add command for address: {}", address);
643
644 if !is_valid_peer_address(&address) {
646 return Err(CliError::Command(format!(
647 "Invalid peer address format: {}",
648 address
649 )));
650 }
651
652 if let Ok(peer_manager) = self.get_peer_manager().await {
654 println!("Connecting to peer: {}", address);
655
656 let manager = peer_manager.lock().await;
657 match manager.add_peer(address.clone(), nickname.clone()).await {
658 Ok(peer_id) => {
659 println!("✓ Successfully connected to peer");
660 println!(" Peer ID: {}", peer_id);
661 println!(" Address: {}", address);
662 if let Some(nick) = nickname {
663 println!(" Nickname: {}", nick);
664 }
665
666 if let Err(e) = manager.save_peers().await {
668 warn!("Failed to save peer data: {}", e);
669 }
670
671 return Ok(());
672 }
673 Err(e) => {
674 warn!("Failed to add peer via manager: {}", e);
675 }
677 }
678 }
679
680 let port = port.unwrap_or(8000);
682 let client =
683 RpcClient::new_tcp("127.0.0.1".to_string(), port).with_timeout(Duration::from_secs(30));
684
685 match client.add_peer(address.clone()).await {
686 Ok(message) => {
687 println!("✓ {}", message);
688 Ok(())
689 }
690 Err(e) => {
691 warn!("Failed to add peer {}: {}", address, e);
692 Err(CliError::Command(format!("Failed to add peer: {}", e)))
693 }
694 }
695 }
696
697 pub async fn handle_peer_remove(
699 &self,
700 peer_id: String,
701 port: Option<u16>,
702 force: bool,
703 ) -> Result<(), CliError> {
704 info!("Executing peer remove command for peer: {}", peer_id);
705
706 if !force {
708 print!("Are you sure you want to remove peer {}? [y/N] ", peer_id);
709 use std::io::{self, Write};
710 io::stdout().flush().unwrap();
711
712 let mut response = String::new();
713 io::stdin().read_line(&mut response).unwrap();
714
715 if !response.trim().eq_ignore_ascii_case("y") {
716 println!("Operation cancelled");
717 return Ok(());
718 }
719 }
720
721 if let Ok(peer_manager) = self.get_peer_manager().await {
723 let manager = peer_manager.lock().await;
724 match manager.remove_peer(peer_id.clone()).await {
725 Ok(()) => {
726 println!("✓ Successfully removed peer: {}", peer_id);
727
728 if let Err(e) = manager.save_peers().await {
730 warn!("Failed to save peer data: {}", e);
731 }
732
733 return Ok(());
734 }
735 Err(e) => {
736 warn!("Failed to remove peer via manager: {}", e);
737 }
739 }
740 }
741
742 let port = port.unwrap_or(8000);
744 let client =
745 RpcClient::new_tcp("127.0.0.1".to_string(), port).with_timeout(Duration::from_secs(30));
746
747 match client.remove_peer(peer_id.clone()).await {
748 Ok(message) => {
749 println!("✓ {}", message);
750 Ok(())
751 }
752 Err(e) => {
753 warn!("Failed to remove peer {}: {}", peer_id, e);
754 Err(CliError::Command(format!("Failed to remove peer: {}", e)))
755 }
756 }
757 }
758
759 pub async fn handle_network_stats(
761 &self,
762 port: Option<u16>,
763 verbose: bool,
764 ) -> Result<(), CliError> {
765 info!("Executing network stats command");
766
767 let port = port.unwrap_or(8000);
768 let client =
769 RpcClient::new_tcp("127.0.0.1".to_string(), port).with_timeout(Duration::from_secs(30));
770
771 match client.get_network_stats().await {
772 Ok(stats) => {
773 println!("Network Statistics:");
774 println!("==================");
775 println!("Total Connections: {}", stats.total_connections);
776 println!("Active Connections: {}", stats.active_connections);
777 println!("Messages Sent: {}", stats.messages_sent);
778 println!("Messages Received: {}", stats.messages_received);
779
780 if verbose {
781 println!("Bytes Sent: {}", format_bytes(stats.bytes_sent));
782 println!("Bytes Received: {}", format_bytes(stats.bytes_received));
783 println!("Average Latency: {:.2} ms", stats.average_latency);
784 println!("Uptime: {}", format_duration(stats.uptime));
785 }
786
787 Ok(())
788 }
789 Err(e) => {
790 warn!("Failed to fetch network stats: {}", e);
791 Err(CliError::Command(format!(
792 "Failed to fetch network stats: {}",
793 e
794 )))
795 }
796 }
797 }
798
799 pub async fn handle_network_test(&self, port: Option<u16>) -> Result<(), CliError> {
801 info!("Executing network test command");
802
803 let port = port.unwrap_or(8000);
804 let client =
805 RpcClient::new_tcp("127.0.0.1".to_string(), port).with_timeout(Duration::from_secs(60)); println!("Testing network connectivity...");
808
809 match client.test_network().await {
810 Ok(results) => {
811 println!("\nNetwork Test Results:");
812 println!("====================\n");
813
814 if results.is_empty() {
815 println!("No peers to test");
816 return Ok(());
817 }
818
819 for result in results {
820 let status = if result.reachable {
821 "✓ REACHABLE"
822 } else {
823 "✗ UNREACHABLE"
824 };
825 println!("Peer: {} ({})", result.peer_id, result.address);
826 println!("Status: {}", status);
827
828 if let Some(latency) = result.latency {
829 println!("Latency: {:.2} ms", latency);
830 }
831
832 if let Some(error) = result.error {
833 println!("Error: {}", error);
834 }
835
836 println!();
837 }
838
839 Ok(())
840 }
841 Err(e) => {
842 warn!("Failed to run network test: {}", e);
843 Err(CliError::Command(format!(
844 "Failed to run network test: {}",
845 e
846 )))
847 }
848 }
849 }
850
851 pub async fn handle_peer_info(
853 &self,
854 peer_id: String,
855 port: Option<u16>,
856 ) -> Result<(), CliError> {
857 info!("Executing peer info command for peer: {}", peer_id);
858
859 let port = port.unwrap_or(8000);
860 let client =
861 RpcClient::new_tcp("127.0.0.1".to_string(), port).with_timeout(Duration::from_secs(30));
862
863 match client.get_peer_info(peer_id.clone()).await {
864 Ok(peer) => {
865 println!("Peer Information:");
866 println!("================\n");
867 println!("Peer ID: {}", peer.id);
868 println!("Address: {}", peer.address);
869 println!("Status: {}", peer.status);
870 println!("Connected Duration: {} seconds", peer.connected_duration);
871 println!("Messages Sent: {}", peer.messages_sent);
872 println!("Messages Received: {}", peer.messages_received);
873 println!("Last Seen: {} (timestamp)", peer.last_seen);
874
875 if let Some(latency) = peer.latency {
876 println!("Latency: {:.2} ms", latency);
877 }
878
879 Ok(())
880 }
881 Err(e) => {
882 warn!("Failed to get peer info for {}: {}", peer_id, e);
883 Err(CliError::Command(format!("Failed to get peer info: {}", e)))
884 }
885 }
886 }
887
888 pub async fn handle_peer_ban(
890 &self,
891 peer_id: String,
892 port: Option<u16>,
893 ) -> Result<(), CliError> {
894 info!("Executing peer ban command for peer: {}", peer_id);
895
896 if let Ok(peer_manager) = self.get_peer_manager().await {
898 let manager = peer_manager.lock().await;
899 match manager.ban_peer(peer_id.clone()).await {
900 Ok(()) => {
901 println!("✓ Successfully banned peer: {}", peer_id);
902 println!(" The peer has been blacklisted and disconnected");
903
904 if let Err(e) = manager.save_peers().await {
906 warn!("Failed to save peer data: {}", e);
907 }
908
909 return Ok(());
910 }
911 Err(e) => {
912 warn!("Failed to ban peer via manager: {}", e);
913 }
915 }
916 }
917
918 let port = port.unwrap_or(8000);
920 let client =
921 RpcClient::new_tcp("127.0.0.1".to_string(), port).with_timeout(Duration::from_secs(30));
922
923 match client.ban_peer(peer_id.clone()).await {
924 Ok(message) => {
925 println!("✓ {}", message);
926 Ok(())
927 }
928 Err(e) => {
929 warn!("Failed to ban peer {}: {}", peer_id, e);
930 Err(CliError::Command(format!("Failed to ban peer: {}", e)))
931 }
932 }
933 }
934
935 pub async fn handle_peer_unban(
937 &self,
938 address: String,
939 port: Option<u16>,
940 ) -> Result<(), CliError> {
941 info!("Executing peer unban command for address: {}", address);
942
943 if let Ok(peer_manager) = self.get_peer_manager().await {
945 let manager = peer_manager.lock().await;
946 match manager.unban_peer(address.clone()).await {
947 Ok(()) => {
948 println!("✓ Successfully unbanned peer with address: {}", address);
949 println!(" The peer can now connect again");
950
951 if let Err(e) = manager.save_peers().await {
953 warn!("Failed to save peer data: {}", e);
954 }
955
956 return Ok(());
957 }
958 Err(e) => {
959 warn!("Failed to unban peer via manager: {}", e);
960 }
962 }
963 }
964
965 let port = port.unwrap_or(8000);
967 let client =
968 RpcClient::new_tcp("127.0.0.1".to_string(), port).with_timeout(Duration::from_secs(30));
969
970 match client.unban_peer(address.clone()).await {
971 Ok(message) => {
972 println!("✓ {}", message);
973 Ok(())
974 }
975 Err(e) => {
976 warn!("Failed to unban peer {}: {}", address, e);
977 Err(CliError::Command(format!("Failed to unban peer: {}", e)))
978 }
979 }
980 }
981
982 pub async fn handle_peer_import(&self, path: PathBuf, merge: bool) -> Result<(), CliError> {
984 info!("Executing peer import command from: {:?}", path);
985
986 if !path.exists() {
987 return Err(CliError::Command(format!("File not found: {:?}", path)));
988 }
989
990 let peer_manager = self.get_peer_manager().await?;
991 let manager = peer_manager.lock().await;
992
993 match manager.import_peers(path.clone(), merge).await {
994 Ok(count) => {
995 println!("✓ Successfully imported {} peers from {:?}", count, path);
996 if merge {
997 println!(" Peers were merged with existing database");
998 } else {
999 println!(" Existing peer database was replaced");
1000 }
1001 Ok(())
1002 }
1003 Err(e) => {
1004 warn!("Failed to import peers: {}", e);
1005 Err(CliError::Command(format!("Failed to import peers: {}", e)))
1006 }
1007 }
1008 }
1009
1010 pub async fn handle_peer_export(
1012 &self,
1013 path: PathBuf,
1014 tags: Option<Vec<String>>,
1015 ) -> Result<(), CliError> {
1016 info!("Executing peer export command to: {:?}", path);
1017
1018 let peer_manager = self.get_peer_manager().await?;
1019 let manager = peer_manager.lock().await;
1020
1021 match manager.export_peers(path.clone(), tags.clone()).await {
1022 Ok(count) => {
1023 println!("✓ Successfully exported {} peers to {:?}", count, path);
1024 if let Some(t) = tags {
1025 println!(" Filtered by tags: {}", t.join(", "));
1026 }
1027 Ok(())
1028 }
1029 Err(e) => {
1030 warn!("Failed to export peers: {}", e);
1031 Err(CliError::Command(format!("Failed to export peers: {}", e)))
1032 }
1033 }
1034 }
1035
1036 pub async fn handle_peer_test(&self) -> Result<(), CliError> {
1038 info!("Executing peer test command");
1039
1040 let peer_manager = self.get_peer_manager().await?;
1041 let manager = peer_manager.lock().await;
1042
1043 println!("Testing connectivity to all known peers...");
1044 println!();
1045
1046 let progress_callback = |current: usize, total: usize| {
1047 print!("\rTesting peer {}/{}...", current, total);
1048 use std::io::{self, Write};
1049 io::stdout().flush().unwrap();
1050 };
1051
1052 match manager.test_all_peers(progress_callback).await {
1053 Ok(results) => {
1054 println!("\r\nTest Results:");
1055 println!("=============\n");
1056
1057 let mut success_count = 0;
1058 let mut total_latency = 0.0;
1059 let mut latency_count = 0;
1060
1061 for (peer_id, success, latency) in &results {
1062 let status = if *success {
1063 "✓ SUCCESS"
1064 } else {
1065 "✗ FAILED"
1066 };
1067 print!(
1068 "{:<16} {}",
1069 if peer_id.len() > 16 {
1070 format!("{}...", &peer_id[..13])
1071 } else {
1072 peer_id.clone()
1073 },
1074 status
1075 );
1076
1077 if let Some(lat) = latency {
1078 print!(" ({:.1}ms)", lat);
1079 total_latency += lat;
1080 latency_count += 1;
1081 }
1082 println!();
1083
1084 if *success {
1085 success_count += 1;
1086 }
1087 }
1088
1089 println!("\nSummary:");
1090 println!("--------");
1091 println!("Total peers tested: {}", results.len());
1092 println!(
1093 "Successful connections: {} ({:.1}%)",
1094 success_count,
1095 (success_count as f64 / results.len() as f64) * 100.0
1096 );
1097
1098 if latency_count > 0 {
1099 println!(
1100 "Average latency: {:.1}ms",
1101 total_latency / latency_count as f64
1102 );
1103 }
1104
1105 Ok(())
1106 }
1107 Err(e) => {
1108 warn!("Failed to test peers: {}", e);
1109 Err(CliError::Command(format!("Failed to test peers: {}", e)))
1110 }
1111 }
1112 }
1113}
1114
1115pub async fn start_node(
1118 data_dir: Option<PathBuf>,
1119 port: Option<u16>,
1120 peers: Vec<String>,
1121) -> Result<(), CliError> {
1122 use crate::node_manager::{NodeManager, NodeManagerConfig};
1123
1124 info!("Starting QuDAG node...");
1125
1126 let config = NodeManagerConfig::default();
1128 let manager = NodeManager::new(config)
1129 .map_err(|e| CliError::Node(format!("Failed to create node manager: {}", e)))?;
1130
1131 manager
1133 .start_node(port, data_dir, peers, true) .await
1135 .map_err(|e| CliError::Node(format!("Failed to start node: {}", e)))?;
1136
1137 Ok(())
1138}
1139
1140pub async fn stop_node() -> Result<(), CliError> {
1141 use crate::node_manager::{NodeManager, NodeManagerConfig};
1142
1143 info!("Stopping QuDAG node...");
1144
1145 let config = NodeManagerConfig::default();
1147 let manager = NodeManager::new(config)
1148 .map_err(|e| CliError::Node(format!("Failed to create node manager: {}", e)))?;
1149
1150 manager
1152 .stop_node(false) .await
1154 .map_err(|e| CliError::Node(format!("Failed to stop node: {}", e)))?;
1155
1156 Ok(())
1157}
1158
1159pub async fn show_status() -> Result<(), CliError> {
1160 use crate::node_manager::{NodeManager, NodeManagerConfig};
1161
1162 info!("Fetching node status...");
1163
1164 let config = NodeManagerConfig::default();
1166 let manager = NodeManager::new(config)
1167 .map_err(|e| CliError::Node(format!("Failed to create node manager: {}", e)))?;
1168
1169 let local_status = manager
1170 .get_status()
1171 .await
1172 .map_err(|e| CliError::Node(format!("Failed to get local status: {}", e)))?;
1173
1174 if local_status.is_running {
1175 let args = StatusArgs::default();
1177 match CommandRouter::handle_node_status(args).await {
1178 Ok(output) => {
1179 println!("{}", output);
1180 Ok(())
1181 }
1182 Err(e) => {
1183 warn!("Failed to get detailed status via RPC: {}", e);
1185 println!("Node Status:");
1186 println!("============");
1187 println!("Status: Running (PID: {})", local_status.pid.unwrap_or(0));
1188 println!("Port: {}", local_status.port);
1189 println!("Data Directory: {:?}", local_status.data_dir);
1190 println!("Log File: {:?}", local_status.log_file);
1191 if let Some(uptime) = local_status.uptime_seconds {
1192 println!("Uptime: {} seconds", uptime);
1193 }
1194 println!("\nNote: RPC connection failed, showing local status only");
1195 Ok(())
1196 }
1197 }
1198 } else {
1199 println!("Node Status:");
1200 println!("============");
1201 println!("Status: Not running");
1202 println!("Port: {} (configured)", local_status.port);
1203 println!("Data Directory: {:?}", local_status.data_dir);
1204 println!("Log File: {:?}", local_status.log_file);
1205 println!("\nUse 'qudag start' to start the node");
1206 Ok(())
1207 }
1208}
1209
1210pub async fn list_peers() -> Result<(), CliError> {
1211 let router = CommandRouter::with_peer_manager().await?;
1212 router.handle_peer_list(None).await
1213}
1214
1215pub async fn add_peer(address: String) -> Result<(), CliError> {
1216 let router = CommandRouter::with_peer_manager().await?;
1217 router.handle_peer_add(address, None, None).await
1218}
1219
1220pub async fn remove_peer(peer_id: String) -> Result<(), CliError> {
1221 let router = CommandRouter::with_peer_manager().await?;
1222 router.handle_peer_remove(peer_id, None, false).await
1223}
1224
1225pub async fn visualize_dag(
1226 output: Option<PathBuf>,
1227 format: Option<String>,
1228) -> Result<(), CliError> {
1229 info!("Generating DAG visualization...");
1230
1231 let output = output.unwrap_or_else(|| PathBuf::from("dag_visualization.dot"));
1232 let format = format.unwrap_or_else(|| "dot".to_string());
1233
1234 use std::fs::File;
1236 use std::io::Write;
1237
1238 let dot_content = r#"digraph DAG {
1239 node [shape=box];
1240 "genesis" -> "block1";
1241 "genesis" -> "block2";
1242 "block1" -> "block3";
1243 "block2" -> "block3";
1244}
1245"#;
1246
1247 let mut file = File::create(&output)
1248 .map_err(|e| CliError::Visualization(format!("Failed to create output file: {}", e)))?;
1249
1250 file.write_all(dot_content.as_bytes())
1251 .map_err(|e| CliError::Visualization(format!("Failed to write visualization: {}", e)))?;
1252
1253 info!(
1254 "DAG visualization saved to {:?} in {} format",
1255 output, format
1256 );
1257 Ok(())
1258}
1259
1260pub async fn show_network_stats() -> Result<(), CliError> {
1261 let router = CommandRouter::new();
1262 router.handle_network_stats(None, false).await
1263}
1264
1265pub async fn test_network() -> Result<(), CliError> {
1266 let router = CommandRouter::new();
1267 router.handle_network_test(None).await
1268}
1269
1270fn is_valid_peer_address(address: &str) -> bool {
1272 if let Some((host, port_str)) = address.rsplit_once(':') {
1274 if host.is_empty() || port_str.is_empty() {
1275 return false;
1276 }
1277
1278 if let Ok(port) = port_str.parse::<u16>() {
1280 if port == 0 {
1281 return false;
1282 }
1283 } else {
1284 return false;
1285 }
1286
1287 if host.parse::<std::net::IpAddr>().is_ok() {
1289 return true; }
1291
1292 if host.len() <= 253 && !host.is_empty() {
1294 return host
1295 .chars()
1296 .all(|c| c.is_alphanumeric() || c == '.' || c == '-');
1297 }
1298 }
1299
1300 false
1301}
1302
1303fn format_bytes(bytes: u64) -> String {
1305 const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
1306 let mut size = bytes as f64;
1307 let mut unit_index = 0;
1308
1309 while size >= 1024.0 && unit_index < UNITS.len() - 1 {
1310 size /= 1024.0;
1311 unit_index += 1;
1312 }
1313
1314 if unit_index == 0 {
1315 format!("{} {}", bytes, UNITS[unit_index])
1316 } else {
1317 format!("{:.2} {}", size, UNITS[unit_index])
1318 }
1319}
1320
1321fn format_duration(seconds: u64) -> String {
1323 let days = seconds / 86400;
1324 let hours = (seconds % 86400) / 3600;
1325 let minutes = (seconds % 3600) / 60;
1326 let secs = seconds % 60;
1327
1328 if days > 0 {
1329 format!("{}d {}h {}m {}s", days, hours, minutes, secs)
1330 } else if hours > 0 {
1331 format!("{}h {}m {}s", hours, minutes, secs)
1332 } else if minutes > 0 {
1333 format!("{}m {}s", minutes, secs)
1334 } else {
1335 format!("{}s", secs)
1336 }
1337}