use crate::output::OutputFormat;
use crate::progress::with_spinner;
use anyhow::Result;
use clap::{Args, Subcommand};
use comfy_table::{presets::UTF8_FULL, Cell, Color, ContentArrangement, Table};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::Duration;
#[derive(Debug, Args)]
pub struct MonitorCommand {
#[command(subcommand)]
command: MonitorSubcommand,
}
#[derive(Debug, Subcommand)]
enum MonitorSubcommand {
#[command(visible_aliases = &["resources", "htop"])]
Top {
#[arg(short = 'i', long, default_value = "2")]
interval: u64,
#[arg(short = 'n', long, default_value = "0")]
iterations: u64,
#[arg(short = 's', long, default_value = "cpu")]
sort_by: String,
#[arg(short = 'f', long)]
filter: Option<String>,
},
#[command(visible_aliases = &["repeat", "poll"])]
Watch {
command: Vec<String>,
#[arg(short = 'i', long, default_value = "2")]
interval: u64,
#[arg(short = 'n', long, default_value = "0")]
iterations: u64,
#[arg(short = 'd', long)]
differences: bool,
},
#[command(visible_aliases = &["stream", "log", "tail"])]
Events {
#[arg(short = 't', long)]
event_type: Option<String>,
#[arg(short = 'a', long)]
agent: Option<String>,
#[arg(short = 'n', long)]
node: Option<String>,
#[arg(short = 'f', long)]
follow: bool,
#[arg(short = 'l', long, default_value = "50")]
limit: usize,
#[arg(short = 'o', long)]
output: Option<PathBuf>,
},
#[command(visible_aliases = &["dash", "overview"])]
Dashboard {
#[arg(short = 'i', long, default_value = "5")]
interval: u64,
#[arg(short = 'o', long, value_enum)]
output: Option<OutputFormat>,
},
}
impl MonitorCommand {
pub async fn execute(&self, output_format: OutputFormat) -> Result<()> {
match &self.command {
MonitorSubcommand::Top {
interval,
iterations,
sort_by,
filter,
} => {
top_command(
*interval,
*iterations,
sort_by,
filter.as_deref(),
output_format,
)
.await
}
MonitorSubcommand::Watch {
command,
interval,
iterations,
differences,
} => watch_command(command, *interval, *iterations, *differences).await,
MonitorSubcommand::Events {
event_type,
agent,
node,
follow,
limit,
output,
} => {
events_command(
event_type.as_deref(),
agent.as_deref(),
node.as_deref(),
*follow,
*limit,
output.as_ref(),
output_format,
)
.await
}
MonitorSubcommand::Dashboard { interval, output } => {
let format = output.unwrap_or(output_format);
dashboard_command(*interval, format).await
}
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ResourceSnapshot {
timestamp: chrono::DateTime<chrono::Utc>,
agents: Vec<AgentResource>,
system: SystemResource,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct AgentResource {
id: String,
name: String,
cpu_percent: f64,
memory_mb: f64,
network_kbps: f64,
status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SystemResource {
total_cpu_percent: f64,
total_memory_mb: f64,
total_memory_used_mb: f64,
total_agents: usize,
active_agents: usize,
}
async fn top_command(
interval: u64,
iterations: u64,
sort_by: &str,
filter: Option<&str>,
format: OutputFormat,
) -> Result<()> {
with_spinner("Initializing resource monitor", async {
tokio::time::sleep(Duration::from_millis(500)).await;
Ok::<(), anyhow::Error>(())
})
.await?;
let mut iteration = 0u64;
loop {
let mut snapshot = fetch_resource_snapshot().await?;
if let Some(pattern) = filter {
snapshot
.agents
.retain(|a| a.name.contains(pattern) || a.id.contains(pattern));
}
match sort_by {
"cpu" => snapshot
.agents
.sort_by(|a, b| b.cpu_percent.partial_cmp(&a.cpu_percent).unwrap()),
"memory" => snapshot
.agents
.sort_by(|a, b| b.memory_mb.partial_cmp(&a.memory_mb).unwrap()),
"name" => snapshot.agents.sort_by(|a, b| a.name.cmp(&b.name)),
_ => {}
}
if matches!(format, OutputFormat::Table) {
print!("\x1B[2J\x1B[1;1H"); }
display_resource_snapshot(&snapshot, format)?;
iteration += 1;
if iterations > 0 && iteration >= iterations {
break;
}
tokio::time::sleep(Duration::from_secs(interval)).await;
}
Ok(())
}
async fn fetch_resource_snapshot() -> Result<ResourceSnapshot> {
use rand::Rng;
let mut rng = rand::rng();
let agents = vec![
AgentResource {
id: "agent-001".to_string(),
name: "web-server".to_string(),
cpu_percent: rng.random_range(5.0..95.0),
memory_mb: rng.random_range(50.0..500.0),
network_kbps: rng.random_range(10.0..1000.0),
status: "Running".to_string(),
},
AgentResource {
id: "agent-002".to_string(),
name: "database".to_string(),
cpu_percent: rng.random_range(5.0..95.0),
memory_mb: rng.random_range(100.0..800.0),
network_kbps: rng.random_range(10.0..1000.0),
status: "Running".to_string(),
},
AgentResource {
id: "agent-003".to_string(),
name: "cache".to_string(),
cpu_percent: rng.random_range(5.0..95.0),
memory_mb: rng.random_range(20.0..200.0),
network_kbps: rng.random_range(10.0..1000.0),
status: "Running".to_string(),
},
];
let total_cpu = agents.iter().map(|a| a.cpu_percent).sum::<f64>() / agents.len() as f64;
let total_memory_used = agents.iter().map(|a| a.memory_mb).sum::<f64>();
Ok(ResourceSnapshot {
timestamp: chrono::Utc::now(),
agents,
system: SystemResource {
total_cpu_percent: total_cpu,
total_memory_mb: 16384.0,
total_memory_used_mb: total_memory_used,
total_agents: 3,
active_agents: 3,
},
})
}
fn display_resource_snapshot(snapshot: &ResourceSnapshot, format: OutputFormat) -> Result<()> {
match format {
OutputFormat::Json => {
println!("{}", serde_json::to_string_pretty(&snapshot)?);
}
OutputFormat::Yaml => {
println!("{}", serde_yaml::to_string(&snapshot)?);
}
OutputFormat::Quiet => {
for agent in &snapshot.agents {
println!("{}", agent.id);
}
}
OutputFormat::Table => {
println!(
"MielinOS Resource Monitor - {}",
snapshot.timestamp.format("%H:%M:%S")
);
println!(
"Agents: {} total, {} active | CPU: {:.1}% | Memory: {:.1}/{:.1} MB ({:.1}%)",
snapshot.system.total_agents,
snapshot.system.active_agents,
snapshot.system.total_cpu_percent,
snapshot.system.total_memory_used_mb,
snapshot.system.total_memory_mb,
(snapshot.system.total_memory_used_mb / snapshot.system.total_memory_mb) * 100.0
);
println!();
let mut table = Table::new();
table
.load_preset(UTF8_FULL)
.set_content_arrangement(ContentArrangement::Dynamic)
.set_header(vec![
"ID",
"Name",
"Status",
"CPU %",
"Memory (MB)",
"Network (KB/s)",
]);
for agent in &snapshot.agents {
let cpu_cell = if agent.cpu_percent > 80.0 {
Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Red)
} else if agent.cpu_percent > 50.0 {
Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Yellow)
} else {
Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Green)
};
let status_cell = if agent.status == "Running" {
Cell::new(&agent.status).fg(Color::Green)
} else {
Cell::new(&agent.status).fg(Color::Red)
};
table.add_row(vec![
Cell::new(&agent.id),
Cell::new(&agent.name),
status_cell,
cpu_cell,
Cell::new(format!("{:.1}", agent.memory_mb)),
Cell::new(format!("{:.1}", agent.network_kbps)),
]);
}
println!("{}", table);
}
}
Ok(())
}
async fn watch_command(
command: &[String],
interval: u64,
iterations: u64,
differences: bool,
) -> Result<()> {
println!("Watching command: {}", command.join(" "));
println!("Interval: {}s, Press Ctrl+C to stop", interval);
println!();
let mut iteration = 0u64;
let mut previous_output = String::new();
loop {
let timestamp = chrono::Utc::now();
print!("\x1B[2J\x1B[1;1H");
println!(
"Every {}s: {} {}",
interval,
command.join(" "),
timestamp.format("%H:%M:%S")
);
println!();
let current_output = format!(
"Output from command: {}\nIteration: {}\nTimestamp: {}",
command.join(" "),
iteration + 1,
timestamp
);
if differences && !previous_output.is_empty() && previous_output != current_output {
println!("[Changes detected]");
}
println!("{}", current_output);
previous_output = current_output;
iteration += 1;
if iterations > 0 && iteration >= iterations {
break;
}
tokio::time::sleep(Duration::from_secs(interval)).await;
}
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Event {
timestamp: chrono::DateTime<chrono::Utc>,
event_type: String,
severity: String,
source: String,
message: String,
metadata: serde_json::Value,
}
async fn events_command(
event_type_filter: Option<&str>,
agent_filter: Option<&str>,
node_filter: Option<&str>,
follow: bool,
limit: usize,
output_file: Option<&PathBuf>,
format: OutputFormat,
) -> Result<()> {
let events = fetch_events(event_type_filter, agent_filter, node_filter, limit).await?;
if let Some(path) = output_file {
let content = serde_json::to_string_pretty(&events)?;
std::fs::write(path, content)?;
println!("Events written to: {}", path.display());
return Ok(());
}
display_events(&events, format)?;
if follow {
println!("\n[Following events... Press Ctrl+C to stop]");
loop {
tokio::time::sleep(Duration::from_secs(2)).await;
let new_events = fetch_events(event_type_filter, agent_filter, node_filter, 5).await?;
for event in new_events {
display_event(&event, format)?;
}
}
}
Ok(())
}
async fn fetch_events(
event_type_filter: Option<&str>,
agent_filter: Option<&str>,
node_filter: Option<&str>,
limit: usize,
) -> Result<Vec<Event>> {
use rand::Rng;
let mut rng = rand::rng();
let event_types = [
"agent.started",
"agent.stopped",
"migration.started",
"migration.completed",
"health.check",
];
let sources = ["node-001", "node-002", "agent-001", "agent-002"];
let severities = ["info", "warning", "error"];
let mut events = Vec::new();
for i in 0..limit {
let event_type = event_types[rng.random_range(0..event_types.len())].to_string();
let source = sources[rng.random_range(0..sources.len())].to_string();
let severity = severities[rng.random_range(0..severities.len())].to_string();
if let Some(et_filter) = event_type_filter {
if !event_type.contains(et_filter) {
continue;
}
}
if let Some(agent_f) = agent_filter {
if !source.contains(agent_f) {
continue;
}
}
if let Some(node_f) = node_filter {
if !source.contains(node_f) {
continue;
}
}
events.push(Event {
timestamp: chrono::Utc::now() - chrono::Duration::seconds((limit - i) as i64 * 10),
event_type,
severity,
source,
message: format!("Event #{} - Sample event message", i + 1),
metadata: serde_json::json!({"index": i, "random": rng.random::<u32>()}),
});
}
Ok(events)
}
fn display_events(events: &[Event], format: OutputFormat) -> Result<()> {
match format {
OutputFormat::Json => {
println!("{}", serde_json::to_string_pretty(&events)?);
}
OutputFormat::Yaml => {
println!("{}", serde_yaml::to_string(&events)?);
}
OutputFormat::Quiet => {
for event in events {
println!("{}", event.event_type);
}
}
OutputFormat::Table => {
let mut table = Table::new();
table
.load_preset(UTF8_FULL)
.set_content_arrangement(ContentArrangement::Dynamic)
.set_header(vec!["Timestamp", "Type", "Severity", "Source", "Message"]);
for event in events {
let severity_cell = match event.severity.as_str() {
"error" => Cell::new(&event.severity).fg(Color::Red),
"warning" => Cell::new(&event.severity).fg(Color::Yellow),
_ => Cell::new(&event.severity).fg(Color::Green),
};
table.add_row(vec![
Cell::new(event.timestamp.format("%Y-%m-%d %H:%M:%S")),
Cell::new(&event.event_type),
severity_cell,
Cell::new(&event.source),
Cell::new(&event.message),
]);
}
println!("{}", table);
}
}
Ok(())
}
fn display_event(event: &Event, format: OutputFormat) -> Result<()> {
match format {
OutputFormat::Json => {
println!("{}", serde_json::to_string(event)?);
}
OutputFormat::Yaml => {
println!("{}", serde_yaml::to_string(event)?);
}
OutputFormat::Quiet => {
println!("{}", event.event_type);
}
OutputFormat::Table => {
println!(
"{} [{}] {} - {} - {}",
event.timestamp.format("%H:%M:%S"),
event.severity.to_uppercase(),
event.event_type,
event.source,
event.message
);
}
}
Ok(())
}
async fn dashboard_command(interval: u64, format: OutputFormat) -> Result<()> {
loop {
let snapshot = fetch_resource_snapshot().await?;
let events = fetch_events(None, None, None, 10).await?;
if matches!(format, OutputFormat::Table) {
print!("\x1B[2J\x1B[1;1H");
}
println!("╔══════════════════════════════════════════════════════════════════╗");
println!(
"║ MielinOS System Dashboard - {} ║",
snapshot.timestamp.format("%H:%M:%S")
);
println!("╚══════════════════════════════════════════════════════════════════╝");
println!();
println!("┌─ System Overview ───────────────────────────────────────────────┐");
println!(
"│ Agents: {} total, {} active",
snapshot.system.total_agents, snapshot.system.active_agents
);
println!("│ CPU: {:.1}%", snapshot.system.total_cpu_percent);
println!(
"│ Memory: {:.1} / {:.1} MB ({:.1}%)",
snapshot.system.total_memory_used_mb,
snapshot.system.total_memory_mb,
(snapshot.system.total_memory_used_mb / snapshot.system.total_memory_mb) * 100.0
);
println!("└─────────────────────────────────────────────────────────────────┘");
println!();
println!("┌─ Top Agents (by CPU) ───────────────────────────────────────────┐");
for (i, agent) in snapshot.agents.iter().take(5).enumerate() {
println!(
"│ {}. {} - CPU: {:.1}%, Mem: {:.1}MB",
i + 1,
agent.name,
agent.cpu_percent,
agent.memory_mb
);
}
println!("└─────────────────────────────────────────────────────────────────┘");
println!();
println!("┌─ Recent Events ─────────────────────────────────────────────────┐");
for event in events.iter().take(5) {
println!(
"│ {} [{}] {}",
event.timestamp.format("%H:%M:%S"),
event.severity,
event.message
);
}
println!("└─────────────────────────────────────────────────────────────────┘");
println!("\nRefreshing in {}s... (Press Ctrl+C to exit)", interval);
tokio::time::sleep(Duration::from_secs(interval)).await;
}
}
pub async fn handle_monitor_command(command: MonitorCommand, format: OutputFormat) -> Result<()> {
command.execute(format).await
}