mielin_cli/commands/monitor.rs

1//! Monitoring command implementations
2
3use crate::output::OutputFormat;
4use crate::progress::with_spinner;
5use anyhow::Result;
6use clap::{Args, Subcommand};
7use comfy_table::{presets::UTF8_FULL, Cell, Color, ContentArrangement, Table};
8use serde::{Deserialize, Serialize};
9use std::path::PathBuf;
10use std::time::Duration;
11
12/// Monitoring commands
13#[derive(Debug, Args)]
14pub struct MonitorCommand {
15    #[command(subcommand)]
16    command: MonitorSubcommand,
17}
18
19#[derive(Debug, Subcommand)]
20enum MonitorSubcommand {
21    /// Real-time resource monitoring (like top)
22    #[command(visible_aliases = &["resources", "htop"])]
23    Top {
24        /// Refresh interval in seconds
25        #[arg(short = 'i', long, default_value = "2")]
26        interval: u64,
27
28        /// Number of iterations (0 = infinite)
29        #[arg(short = 'n', long, default_value = "0")]
30        iterations: u64,
31
32        /// Sort by field (cpu, memory, name)
33        #[arg(short = 's', long, default_value = "cpu")]
34        sort_by: String,
35
36        /// Filter by agent name pattern
37        #[arg(short = 'f', long)]
38        filter: Option<String>,
39    },
40
41    /// Watch command output (refresh periodically)
42    #[command(visible_aliases = &["repeat", "poll"])]
43    Watch {
44        /// Command to watch (e.g., "node list")
45        command: Vec<String>,
46
47        /// Refresh interval in seconds
48        #[arg(short = 'i', long, default_value = "2")]
49        interval: u64,
50
51        /// Number of iterations (0 = infinite)
52        #[arg(short = 'n', long, default_value = "0")]
53        iterations: u64,
54
55        /// Highlight differences between iterations
56        #[arg(short = 'd', long)]
57        differences: bool,
58    },
59
60    /// Stream events from the system
61    #[command(visible_aliases = &["stream", "log", "tail"])]
62    Events {
63        /// Filter by event type
64        #[arg(short = 't', long)]
65        event_type: Option<String>,
66
67        /// Filter by agent ID
68        #[arg(short = 'a', long)]
69        agent: Option<String>,
70
71        /// Filter by node ID
72        #[arg(short = 'n', long)]
73        node: Option<String>,
74
75        /// Follow mode (stream continuously)
76        #[arg(short = 'f', long)]
77        follow: bool,
78
79        /// Show last N events
80        #[arg(short = 'l', long, default_value = "50")]
81        limit: usize,
82
83        /// Output to file
84        #[arg(short = 'o', long)]
85        output: Option<PathBuf>,
86    },
87
88    /// Show system metrics dashboard
89    #[command(visible_aliases = &["dash", "overview"])]
90    Dashboard {
91        /// Refresh interval in seconds
92        #[arg(short = 'i', long, default_value = "5")]
93        interval: u64,
94
95        /// Output format
96        #[arg(short = 'o', long, value_enum)]
97        output: Option<OutputFormat>,
98    },
99}
100
101impl MonitorCommand {
102    pub async fn execute(&self, output_format: OutputFormat) -> Result<()> {
103        match &self.command {
104            MonitorSubcommand::Top {
105                interval,
106                iterations,
107                sort_by,
108                filter,
109            } => {
110                top_command(
111                    *interval,
112                    *iterations,
113                    sort_by,
114                    filter.as_deref(),
115                    output_format,
116                )
117                .await
118            }
119            MonitorSubcommand::Watch {
120                command,
121                interval,
122                iterations,
123                differences,
124            } => watch_command(command, *interval, *iterations, *differences).await,
125            MonitorSubcommand::Events {
126                event_type,
127                agent,
128                node,
129                follow,
130                limit,
131                output,
132            } => {
133                events_command(
134                    event_type.as_deref(),
135                    agent.as_deref(),
136                    node.as_deref(),
137                    *follow,
138                    *limit,
139                    output.as_ref(),
140                    output_format,
141                )
142                .await
143            }
144            MonitorSubcommand::Dashboard { interval, output } => {
145                let format = output.unwrap_or(output_format);
146                dashboard_command(*interval, format).await
147            }
148        }
149    }
150}
151
152/// Resource monitoring data
153#[derive(Debug, Clone, Serialize, Deserialize)]
154struct ResourceSnapshot {
155    timestamp: chrono::DateTime<chrono::Utc>,
156    agents: Vec<AgentResource>,
157    system: SystemResource,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161struct AgentResource {
162    id: String,
163    name: String,
164    cpu_percent: f64,
165    memory_mb: f64,
166    network_kbps: f64,
167    status: String,
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
171struct SystemResource {
172    total_cpu_percent: f64,
173    total_memory_mb: f64,
174    total_memory_used_mb: f64,
175    total_agents: usize,
176    active_agents: usize,
177}
178
179async fn top_command(
180    interval: u64,
181    iterations: u64,
182    sort_by: &str,
183    filter: Option<&str>,
184    format: OutputFormat,
185) -> Result<()> {
186    with_spinner("Initializing resource monitor", async {
187        tokio::time::sleep(Duration::from_millis(500)).await;
188        Ok::<(), anyhow::Error>(())
189    })
190    .await?;
191
192    let mut iteration = 0u64;
193
194    loop {
195        let mut snapshot = fetch_resource_snapshot().await?;
196
197        // Filter agents if pattern provided
198        if let Some(pattern) = filter {
199            snapshot
200                .agents
201                .retain(|a| a.name.contains(pattern) || a.id.contains(pattern));
202        }
203
204        // Sort agents
205        match sort_by {
206            "cpu" => snapshot
207                .agents
208                .sort_by(|a, b| b.cpu_percent.partial_cmp(&a.cpu_percent).unwrap()),
209            "memory" => snapshot
210                .agents
211                .sort_by(|a, b| b.memory_mb.partial_cmp(&a.memory_mb).unwrap()),
212            "name" => snapshot.agents.sort_by(|a, b| a.name.cmp(&b.name)),
213            _ => {}
214        }
215
216        // Clear screen for top-like display (only in table mode)
217        if matches!(format, OutputFormat::Table) {
218            print!("\x1B[2J\x1B[1;1H"); // Clear screen and move cursor to top
219        }
220
221        // Display snapshot
222        display_resource_snapshot(&snapshot, format)?;
223
224        iteration += 1;
225        if iterations > 0 && iteration >= iterations {
226            break;
227        }
228
229        // Wait for next iteration
230        tokio::time::sleep(Duration::from_secs(interval)).await;
231    }
232
233    Ok(())
234}
235
236async fn fetch_resource_snapshot() -> Result<ResourceSnapshot> {
237    // Stub implementation - would fetch actual data from daemon
238    use rand::Rng;
239    let mut rng = rand::rng();
240
241    let agents = vec![
242        AgentResource {
243            id: "agent-001".to_string(),
244            name: "web-server".to_string(),
245            cpu_percent: rng.random_range(5.0..95.0),
246            memory_mb: rng.random_range(50.0..500.0),
247            network_kbps: rng.random_range(10.0..1000.0),
248            status: "Running".to_string(),
249        },
250        AgentResource {
251            id: "agent-002".to_string(),
252            name: "database".to_string(),
253            cpu_percent: rng.random_range(5.0..95.0),
254            memory_mb: rng.random_range(100.0..800.0),
255            network_kbps: rng.random_range(10.0..1000.0),
256            status: "Running".to_string(),
257        },
258        AgentResource {
259            id: "agent-003".to_string(),
260            name: "cache".to_string(),
261            cpu_percent: rng.random_range(5.0..95.0),
262            memory_mb: rng.random_range(20.0..200.0),
263            network_kbps: rng.random_range(10.0..1000.0),
264            status: "Running".to_string(),
265        },
266    ];
267
268    let total_cpu = agents.iter().map(|a| a.cpu_percent).sum::<f64>() / agents.len() as f64;
269    let total_memory_used = agents.iter().map(|a| a.memory_mb).sum::<f64>();
270
271    Ok(ResourceSnapshot {
272        timestamp: chrono::Utc::now(),
273        agents,
274        system: SystemResource {
275            total_cpu_percent: total_cpu,
276            total_memory_mb: 16384.0,
277            total_memory_used_mb: total_memory_used,
278            total_agents: 3,
279            active_agents: 3,
280        },
281    })
282}
283
284fn display_resource_snapshot(snapshot: &ResourceSnapshot, format: OutputFormat) -> Result<()> {
285    match format {
286        OutputFormat::Json => {
287            println!("{}", serde_json::to_string_pretty(&snapshot)?);
288        }
289        OutputFormat::Yaml => {
290            println!("{}", serde_yaml::to_string(&snapshot)?);
291        }
292        OutputFormat::Quiet => {
293            for agent in &snapshot.agents {
294                println!("{}", agent.id);
295            }
296        }
297        OutputFormat::Table => {
298            // System summary
299            println!(
300                "MielinOS Resource Monitor - {}",
301                snapshot.timestamp.format("%H:%M:%S")
302            );
303            println!(
304                "Agents: {} total, {} active | CPU: {:.1}% | Memory: {:.1}/{:.1} MB ({:.1}%)",
305                snapshot.system.total_agents,
306                snapshot.system.active_agents,
307                snapshot.system.total_cpu_percent,
308                snapshot.system.total_memory_used_mb,
309                snapshot.system.total_memory_mb,
310                (snapshot.system.total_memory_used_mb / snapshot.system.total_memory_mb) * 100.0
311            );
312            println!();
313
314            // Agent table
315            let mut table = Table::new();
316            table
317                .load_preset(UTF8_FULL)
318                .set_content_arrangement(ContentArrangement::Dynamic)
319                .set_header(vec![
320                    "ID",
321                    "Name",
322                    "Status",
323                    "CPU %",
324                    "Memory (MB)",
325                    "Network (KB/s)",
326                ]);
327
328            for agent in &snapshot.agents {
329                let cpu_cell = if agent.cpu_percent > 80.0 {
330                    Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Red)
331                } else if agent.cpu_percent > 50.0 {
332                    Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Yellow)
333                } else {
334                    Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Green)
335                };
336
337                let status_cell = if agent.status == "Running" {
338                    Cell::new(&agent.status).fg(Color::Green)
339                } else {
340                    Cell::new(&agent.status).fg(Color::Red)
341                };
342
343                table.add_row(vec![
344                    Cell::new(&agent.id),
345                    Cell::new(&agent.name),
346                    status_cell,
347                    cpu_cell,
348                    Cell::new(format!("{:.1}", agent.memory_mb)),
349                    Cell::new(format!("{:.1}", agent.network_kbps)),
350                ]);
351            }
352
353            println!("{}", table);
354        }
355    }
356
357    Ok(())
358}
359
360async fn watch_command(
361    command: &[String],
362    interval: u64,
363    iterations: u64,
364    differences: bool,
365) -> Result<()> {
366    println!("Watching command: {}", command.join(" "));
367    println!("Interval: {}s, Press Ctrl+C to stop", interval);
368    println!();
369
370    let mut iteration = 0u64;
371    let mut previous_output = String::new();
372
373    loop {
374        let timestamp = chrono::Utc::now();
375
376        // Clear screen for watch-like display
377        print!("\x1B[2J\x1B[1;1H");
378
379        println!(
380            "Every {}s: {}  {}",
381            interval,
382            command.join(" "),
383            timestamp.format("%H:%M:%S")
384        );
385        println!();
386
387        // Execute command (stub implementation)
388        let current_output = format!(
389            "Output from command: {}\nIteration: {}\nTimestamp: {}",
390            command.join(" "),
391            iteration + 1,
392            timestamp
393        );
394
395        if differences && !previous_output.is_empty() && previous_output != current_output {
396            println!("[Changes detected]");
397        }
398
399        println!("{}", current_output);
400        previous_output = current_output;
401
402        iteration += 1;
403        if iterations > 0 && iteration >= iterations {
404            break;
405        }
406
407        tokio::time::sleep(Duration::from_secs(interval)).await;
408    }
409
410    Ok(())
411}
412
413#[derive(Debug, Clone, Serialize, Deserialize)]
414struct Event {
415    timestamp: chrono::DateTime<chrono::Utc>,
416    event_type: String,
417    severity: String,
418    source: String,
419    message: String,
420    metadata: serde_json::Value,
421}
422
423async fn events_command(
424    event_type_filter: Option<&str>,
425    agent_filter: Option<&str>,
426    node_filter: Option<&str>,
427    follow: bool,
428    limit: usize,
429    output_file: Option<&PathBuf>,
430    format: OutputFormat,
431) -> Result<()> {
432    let events = fetch_events(event_type_filter, agent_filter, node_filter, limit).await?;
433
434    // Write to file if specified
435    if let Some(path) = output_file {
436        let content = serde_json::to_string_pretty(&events)?;
437        std::fs::write(path, content)?;
438        println!("Events written to: {}", path.display());
439        return Ok(());
440    }
441
442    // Display events
443    display_events(&events, format)?;
444
445    // Follow mode - stream events continuously
446    if follow {
447        println!("\n[Following events... Press Ctrl+C to stop]");
448        loop {
449            tokio::time::sleep(Duration::from_secs(2)).await;
450
451            // Fetch new events (stub - would stream from daemon)
452            let new_events = fetch_events(event_type_filter, agent_filter, node_filter, 5).await?;
453
454            for event in new_events {
455                display_event(&event, format)?;
456            }
457        }
458    }
459
460    Ok(())
461}
462
463async fn fetch_events(
464    event_type_filter: Option<&str>,
465    agent_filter: Option<&str>,
466    node_filter: Option<&str>,
467    limit: usize,
468) -> Result<Vec<Event>> {
469    // Stub implementation - would fetch actual events from daemon
470    use rand::Rng;
471    let mut rng = rand::rng();
472
473    let event_types = [
474        "agent.started",
475        "agent.stopped",
476        "migration.started",
477        "migration.completed",
478        "health.check",
479    ];
480    let sources = ["node-001", "node-002", "agent-001", "agent-002"];
481    let severities = ["info", "warning", "error"];
482
483    let mut events = Vec::new();
484
485    for i in 0..limit {
486        let event_type = event_types[rng.random_range(0..event_types.len())].to_string();
487        let source = sources[rng.random_range(0..sources.len())].to_string();
488        let severity = severities[rng.random_range(0..severities.len())].to_string();
489
490        // Apply filters
491        if let Some(et_filter) = event_type_filter {
492            if !event_type.contains(et_filter) {
493                continue;
494            }
495        }
496
497        if let Some(agent_f) = agent_filter {
498            if !source.contains(agent_f) {
499                continue;
500            }
501        }
502
503        if let Some(node_f) = node_filter {
504            if !source.contains(node_f) {
505                continue;
506            }
507        }
508
509        events.push(Event {
510            timestamp: chrono::Utc::now() - chrono::Duration::seconds((limit - i) as i64 * 10),
511            event_type,
512            severity,
513            source,
514            message: format!("Event #{} - Sample event message", i + 1),
515            metadata: serde_json::json!({"index": i, "random": rng.random::<u32>()}),
516        });
517    }
518
519    Ok(events)
520}
521
522fn display_events(events: &[Event], format: OutputFormat) -> Result<()> {
523    match format {
524        OutputFormat::Json => {
525            println!("{}", serde_json::to_string_pretty(&events)?);
526        }
527        OutputFormat::Yaml => {
528            println!("{}", serde_yaml::to_string(&events)?);
529        }
530        OutputFormat::Quiet => {
531            for event in events {
532                println!("{}", event.event_type);
533            }
534        }
535        OutputFormat::Table => {
536            let mut table = Table::new();
537            table
538                .load_preset(UTF8_FULL)
539                .set_content_arrangement(ContentArrangement::Dynamic)
540                .set_header(vec!["Timestamp", "Type", "Severity", "Source", "Message"]);
541
542            for event in events {
543                let severity_cell = match event.severity.as_str() {
544                    "error" => Cell::new(&event.severity).fg(Color::Red),
545                    "warning" => Cell::new(&event.severity).fg(Color::Yellow),
546                    _ => Cell::new(&event.severity).fg(Color::Green),
547                };
548
549                table.add_row(vec![
550                    Cell::new(event.timestamp.format("%Y-%m-%d %H:%M:%S")),
551                    Cell::new(&event.event_type),
552                    severity_cell,
553                    Cell::new(&event.source),
554                    Cell::new(&event.message),
555                ]);
556            }
557
558            println!("{}", table);
559        }
560    }
561
562    Ok(())
563}
564
565fn display_event(event: &Event, format: OutputFormat) -> Result<()> {
566    match format {
567        OutputFormat::Json => {
568            println!("{}", serde_json::to_string(event)?);
569        }
570        OutputFormat::Yaml => {
571            println!("{}", serde_yaml::to_string(event)?);
572        }
573        OutputFormat::Quiet => {
574            println!("{}", event.event_type);
575        }
576        OutputFormat::Table => {
577            println!(
578                "{} [{}] {} - {} - {}",
579                event.timestamp.format("%H:%M:%S"),
580                event.severity.to_uppercase(),
581                event.event_type,
582                event.source,
583                event.message
584            );
585        }
586    }
587
588    Ok(())
589}
590
591async fn dashboard_command(interval: u64, format: OutputFormat) -> Result<()> {
592    loop {
593        let snapshot = fetch_resource_snapshot().await?;
594        let events = fetch_events(None, None, None, 10).await?;
595
596        // Clear screen
597        if matches!(format, OutputFormat::Table) {
598            print!("\x1B[2J\x1B[1;1H");
599        }
600
601        println!("╔══════════════════════════════════════════════════════════════════╗");
602        println!(
603            "║           MielinOS System Dashboard - {}            ║",
604            snapshot.timestamp.format("%H:%M:%S")
605        );
606        println!("╚══════════════════════════════════════════════════════════════════╝");
607        println!();
608
609        // System overview
610        println!("┌─ System Overview ───────────────────────────────────────────────┐");
611        println!(
612            "│ Agents:  {} total, {} active",
613            snapshot.system.total_agents, snapshot.system.active_agents
614        );
615        println!("│ CPU:     {:.1}%", snapshot.system.total_cpu_percent);
616        println!(
617            "│ Memory:  {:.1} / {:.1} MB ({:.1}%)",
618            snapshot.system.total_memory_used_mb,
619            snapshot.system.total_memory_mb,
620            (snapshot.system.total_memory_used_mb / snapshot.system.total_memory_mb) * 100.0
621        );
622        println!("└─────────────────────────────────────────────────────────────────┘");
623        println!();
624
625        // Top agents
626        println!("┌─ Top Agents (by CPU) ───────────────────────────────────────────┐");
627        for (i, agent) in snapshot.agents.iter().take(5).enumerate() {
628            println!(
629                "│ {}. {} - CPU: {:.1}%, Mem: {:.1}MB",
630                i + 1,
631                agent.name,
632                agent.cpu_percent,
633                agent.memory_mb
634            );
635        }
636        println!("└─────────────────────────────────────────────────────────────────┘");
637        println!();
638
639        // Recent events
640        println!("┌─ Recent Events ─────────────────────────────────────────────────┐");
641        for event in events.iter().take(5) {
642            println!(
643                "│ {} [{}] {}",
644                event.timestamp.format("%H:%M:%S"),
645                event.severity,
646                event.message
647            );
648        }
649        println!("└─────────────────────────────────────────────────────────────────┘");
650
651        println!("\nRefreshing in {}s... (Press Ctrl+C to exit)", interval);
652
653        tokio::time::sleep(Duration::from_secs(interval)).await;
654    }
655}
656
657/// Handle monitor command
658pub async fn handle_monitor_command(command: MonitorCommand, format: OutputFormat) -> Result<()> {
659    command.execute(format).await
660}