1use crate::output::OutputFormat;
4use crate::progress::with_spinner;
5use anyhow::Result;
6use clap::{Args, Subcommand};
7use comfy_table::{presets::UTF8_FULL, Cell, Color, ContentArrangement, Table};
8use serde::{Deserialize, Serialize};
9use std::path::PathBuf;
10use std::time::Duration;
11
12#[derive(Debug, Args)]
14pub struct MonitorCommand {
15 #[command(subcommand)]
16 command: MonitorSubcommand,
17}
18
19#[derive(Debug, Subcommand)]
20enum MonitorSubcommand {
21 #[command(visible_aliases = &["resources", "htop"])]
23 Top {
24 #[arg(short = 'i', long, default_value = "2")]
26 interval: u64,
27
28 #[arg(short = 'n', long, default_value = "0")]
30 iterations: u64,
31
32 #[arg(short = 's', long, default_value = "cpu")]
34 sort_by: String,
35
36 #[arg(short = 'f', long)]
38 filter: Option<String>,
39 },
40
41 #[command(visible_aliases = &["repeat", "poll"])]
43 Watch {
44 command: Vec<String>,
46
47 #[arg(short = 'i', long, default_value = "2")]
49 interval: u64,
50
51 #[arg(short = 'n', long, default_value = "0")]
53 iterations: u64,
54
55 #[arg(short = 'd', long)]
57 differences: bool,
58 },
59
60 #[command(visible_aliases = &["stream", "log", "tail"])]
62 Events {
63 #[arg(short = 't', long)]
65 event_type: Option<String>,
66
67 #[arg(short = 'a', long)]
69 agent: Option<String>,
70
71 #[arg(short = 'n', long)]
73 node: Option<String>,
74
75 #[arg(short = 'f', long)]
77 follow: bool,
78
79 #[arg(short = 'l', long, default_value = "50")]
81 limit: usize,
82
83 #[arg(short = 'o', long)]
85 output: Option<PathBuf>,
86 },
87
88 #[command(visible_aliases = &["dash", "overview"])]
90 Dashboard {
91 #[arg(short = 'i', long, default_value = "5")]
93 interval: u64,
94
95 #[arg(short = 'o', long, value_enum)]
97 output: Option<OutputFormat>,
98 },
99}
100
101impl MonitorCommand {
102 pub async fn execute(&self, output_format: OutputFormat) -> Result<()> {
103 match &self.command {
104 MonitorSubcommand::Top {
105 interval,
106 iterations,
107 sort_by,
108 filter,
109 } => {
110 top_command(
111 *interval,
112 *iterations,
113 sort_by,
114 filter.as_deref(),
115 output_format,
116 )
117 .await
118 }
119 MonitorSubcommand::Watch {
120 command,
121 interval,
122 iterations,
123 differences,
124 } => watch_command(command, *interval, *iterations, *differences).await,
125 MonitorSubcommand::Events {
126 event_type,
127 agent,
128 node,
129 follow,
130 limit,
131 output,
132 } => {
133 events_command(
134 event_type.as_deref(),
135 agent.as_deref(),
136 node.as_deref(),
137 *follow,
138 *limit,
139 output.as_ref(),
140 output_format,
141 )
142 .await
143 }
144 MonitorSubcommand::Dashboard { interval, output } => {
145 let format = output.unwrap_or(output_format);
146 dashboard_command(*interval, format).await
147 }
148 }
149 }
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize)]
154struct ResourceSnapshot {
155 timestamp: chrono::DateTime<chrono::Utc>,
156 agents: Vec<AgentResource>,
157 system: SystemResource,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161struct AgentResource {
162 id: String,
163 name: String,
164 cpu_percent: f64,
165 memory_mb: f64,
166 network_kbps: f64,
167 status: String,
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
171struct SystemResource {
172 total_cpu_percent: f64,
173 total_memory_mb: f64,
174 total_memory_used_mb: f64,
175 total_agents: usize,
176 active_agents: usize,
177}
178
179async fn top_command(
180 interval: u64,
181 iterations: u64,
182 sort_by: &str,
183 filter: Option<&str>,
184 format: OutputFormat,
185) -> Result<()> {
186 with_spinner("Initializing resource monitor", async {
187 tokio::time::sleep(Duration::from_millis(500)).await;
188 Ok::<(), anyhow::Error>(())
189 })
190 .await?;
191
192 let mut iteration = 0u64;
193
194 loop {
195 let mut snapshot = fetch_resource_snapshot().await?;
196
197 if let Some(pattern) = filter {
199 snapshot
200 .agents
201 .retain(|a| a.name.contains(pattern) || a.id.contains(pattern));
202 }
203
204 match sort_by {
206 "cpu" => snapshot
207 .agents
208 .sort_by(|a, b| b.cpu_percent.partial_cmp(&a.cpu_percent).unwrap()),
209 "memory" => snapshot
210 .agents
211 .sort_by(|a, b| b.memory_mb.partial_cmp(&a.memory_mb).unwrap()),
212 "name" => snapshot.agents.sort_by(|a, b| a.name.cmp(&b.name)),
213 _ => {}
214 }
215
216 if matches!(format, OutputFormat::Table) {
218 print!("\x1B[2J\x1B[1;1H"); }
220
221 display_resource_snapshot(&snapshot, format)?;
223
224 iteration += 1;
225 if iterations > 0 && iteration >= iterations {
226 break;
227 }
228
229 tokio::time::sleep(Duration::from_secs(interval)).await;
231 }
232
233 Ok(())
234}
235
236async fn fetch_resource_snapshot() -> Result<ResourceSnapshot> {
237 use rand::Rng;
239 let mut rng = rand::rng();
240
241 let agents = vec![
242 AgentResource {
243 id: "agent-001".to_string(),
244 name: "web-server".to_string(),
245 cpu_percent: rng.random_range(5.0..95.0),
246 memory_mb: rng.random_range(50.0..500.0),
247 network_kbps: rng.random_range(10.0..1000.0),
248 status: "Running".to_string(),
249 },
250 AgentResource {
251 id: "agent-002".to_string(),
252 name: "database".to_string(),
253 cpu_percent: rng.random_range(5.0..95.0),
254 memory_mb: rng.random_range(100.0..800.0),
255 network_kbps: rng.random_range(10.0..1000.0),
256 status: "Running".to_string(),
257 },
258 AgentResource {
259 id: "agent-003".to_string(),
260 name: "cache".to_string(),
261 cpu_percent: rng.random_range(5.0..95.0),
262 memory_mb: rng.random_range(20.0..200.0),
263 network_kbps: rng.random_range(10.0..1000.0),
264 status: "Running".to_string(),
265 },
266 ];
267
268 let total_cpu = agents.iter().map(|a| a.cpu_percent).sum::<f64>() / agents.len() as f64;
269 let total_memory_used = agents.iter().map(|a| a.memory_mb).sum::<f64>();
270
271 Ok(ResourceSnapshot {
272 timestamp: chrono::Utc::now(),
273 agents,
274 system: SystemResource {
275 total_cpu_percent: total_cpu,
276 total_memory_mb: 16384.0,
277 total_memory_used_mb: total_memory_used,
278 total_agents: 3,
279 active_agents: 3,
280 },
281 })
282}
283
284fn display_resource_snapshot(snapshot: &ResourceSnapshot, format: OutputFormat) -> Result<()> {
285 match format {
286 OutputFormat::Json => {
287 println!("{}", serde_json::to_string_pretty(&snapshot)?);
288 }
289 OutputFormat::Yaml => {
290 println!("{}", serde_yaml::to_string(&snapshot)?);
291 }
292 OutputFormat::Quiet => {
293 for agent in &snapshot.agents {
294 println!("{}", agent.id);
295 }
296 }
297 OutputFormat::Table => {
298 println!(
300 "MielinOS Resource Monitor - {}",
301 snapshot.timestamp.format("%H:%M:%S")
302 );
303 println!(
304 "Agents: {} total, {} active | CPU: {:.1}% | Memory: {:.1}/{:.1} MB ({:.1}%)",
305 snapshot.system.total_agents,
306 snapshot.system.active_agents,
307 snapshot.system.total_cpu_percent,
308 snapshot.system.total_memory_used_mb,
309 snapshot.system.total_memory_mb,
310 (snapshot.system.total_memory_used_mb / snapshot.system.total_memory_mb) * 100.0
311 );
312 println!();
313
314 let mut table = Table::new();
316 table
317 .load_preset(UTF8_FULL)
318 .set_content_arrangement(ContentArrangement::Dynamic)
319 .set_header(vec![
320 "ID",
321 "Name",
322 "Status",
323 "CPU %",
324 "Memory (MB)",
325 "Network (KB/s)",
326 ]);
327
328 for agent in &snapshot.agents {
329 let cpu_cell = if agent.cpu_percent > 80.0 {
330 Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Red)
331 } else if agent.cpu_percent > 50.0 {
332 Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Yellow)
333 } else {
334 Cell::new(format!("{:.1}", agent.cpu_percent)).fg(Color::Green)
335 };
336
337 let status_cell = if agent.status == "Running" {
338 Cell::new(&agent.status).fg(Color::Green)
339 } else {
340 Cell::new(&agent.status).fg(Color::Red)
341 };
342
343 table.add_row(vec![
344 Cell::new(&agent.id),
345 Cell::new(&agent.name),
346 status_cell,
347 cpu_cell,
348 Cell::new(format!("{:.1}", agent.memory_mb)),
349 Cell::new(format!("{:.1}", agent.network_kbps)),
350 ]);
351 }
352
353 println!("{}", table);
354 }
355 }
356
357 Ok(())
358}
359
360async fn watch_command(
361 command: &[String],
362 interval: u64,
363 iterations: u64,
364 differences: bool,
365) -> Result<()> {
366 println!("Watching command: {}", command.join(" "));
367 println!("Interval: {}s, Press Ctrl+C to stop", interval);
368 println!();
369
370 let mut iteration = 0u64;
371 let mut previous_output = String::new();
372
373 loop {
374 let timestamp = chrono::Utc::now();
375
376 print!("\x1B[2J\x1B[1;1H");
378
379 println!(
380 "Every {}s: {} {}",
381 interval,
382 command.join(" "),
383 timestamp.format("%H:%M:%S")
384 );
385 println!();
386
387 let current_output = format!(
389 "Output from command: {}\nIteration: {}\nTimestamp: {}",
390 command.join(" "),
391 iteration + 1,
392 timestamp
393 );
394
395 if differences && !previous_output.is_empty() && previous_output != current_output {
396 println!("[Changes detected]");
397 }
398
399 println!("{}", current_output);
400 previous_output = current_output;
401
402 iteration += 1;
403 if iterations > 0 && iteration >= iterations {
404 break;
405 }
406
407 tokio::time::sleep(Duration::from_secs(interval)).await;
408 }
409
410 Ok(())
411}
412
413#[derive(Debug, Clone, Serialize, Deserialize)]
414struct Event {
415 timestamp: chrono::DateTime<chrono::Utc>,
416 event_type: String,
417 severity: String,
418 source: String,
419 message: String,
420 metadata: serde_json::Value,
421}
422
423async fn events_command(
424 event_type_filter: Option<&str>,
425 agent_filter: Option<&str>,
426 node_filter: Option<&str>,
427 follow: bool,
428 limit: usize,
429 output_file: Option<&PathBuf>,
430 format: OutputFormat,
431) -> Result<()> {
432 let events = fetch_events(event_type_filter, agent_filter, node_filter, limit).await?;
433
434 if let Some(path) = output_file {
436 let content = serde_json::to_string_pretty(&events)?;
437 std::fs::write(path, content)?;
438 println!("Events written to: {}", path.display());
439 return Ok(());
440 }
441
442 display_events(&events, format)?;
444
445 if follow {
447 println!("\n[Following events... Press Ctrl+C to stop]");
448 loop {
449 tokio::time::sleep(Duration::from_secs(2)).await;
450
451 let new_events = fetch_events(event_type_filter, agent_filter, node_filter, 5).await?;
453
454 for event in new_events {
455 display_event(&event, format)?;
456 }
457 }
458 }
459
460 Ok(())
461}
462
463async fn fetch_events(
464 event_type_filter: Option<&str>,
465 agent_filter: Option<&str>,
466 node_filter: Option<&str>,
467 limit: usize,
468) -> Result<Vec<Event>> {
469 use rand::Rng;
471 let mut rng = rand::rng();
472
473 let event_types = [
474 "agent.started",
475 "agent.stopped",
476 "migration.started",
477 "migration.completed",
478 "health.check",
479 ];
480 let sources = ["node-001", "node-002", "agent-001", "agent-002"];
481 let severities = ["info", "warning", "error"];
482
483 let mut events = Vec::new();
484
485 for i in 0..limit {
486 let event_type = event_types[rng.random_range(0..event_types.len())].to_string();
487 let source = sources[rng.random_range(0..sources.len())].to_string();
488 let severity = severities[rng.random_range(0..severities.len())].to_string();
489
490 if let Some(et_filter) = event_type_filter {
492 if !event_type.contains(et_filter) {
493 continue;
494 }
495 }
496
497 if let Some(agent_f) = agent_filter {
498 if !source.contains(agent_f) {
499 continue;
500 }
501 }
502
503 if let Some(node_f) = node_filter {
504 if !source.contains(node_f) {
505 continue;
506 }
507 }
508
509 events.push(Event {
510 timestamp: chrono::Utc::now() - chrono::Duration::seconds((limit - i) as i64 * 10),
511 event_type,
512 severity,
513 source,
514 message: format!("Event #{} - Sample event message", i + 1),
515 metadata: serde_json::json!({"index": i, "random": rng.random::<u32>()}),
516 });
517 }
518
519 Ok(events)
520}
521
522fn display_events(events: &[Event], format: OutputFormat) -> Result<()> {
523 match format {
524 OutputFormat::Json => {
525 println!("{}", serde_json::to_string_pretty(&events)?);
526 }
527 OutputFormat::Yaml => {
528 println!("{}", serde_yaml::to_string(&events)?);
529 }
530 OutputFormat::Quiet => {
531 for event in events {
532 println!("{}", event.event_type);
533 }
534 }
535 OutputFormat::Table => {
536 let mut table = Table::new();
537 table
538 .load_preset(UTF8_FULL)
539 .set_content_arrangement(ContentArrangement::Dynamic)
540 .set_header(vec!["Timestamp", "Type", "Severity", "Source", "Message"]);
541
542 for event in events {
543 let severity_cell = match event.severity.as_str() {
544 "error" => Cell::new(&event.severity).fg(Color::Red),
545 "warning" => Cell::new(&event.severity).fg(Color::Yellow),
546 _ => Cell::new(&event.severity).fg(Color::Green),
547 };
548
549 table.add_row(vec![
550 Cell::new(event.timestamp.format("%Y-%m-%d %H:%M:%S")),
551 Cell::new(&event.event_type),
552 severity_cell,
553 Cell::new(&event.source),
554 Cell::new(&event.message),
555 ]);
556 }
557
558 println!("{}", table);
559 }
560 }
561
562 Ok(())
563}
564
565fn display_event(event: &Event, format: OutputFormat) -> Result<()> {
566 match format {
567 OutputFormat::Json => {
568 println!("{}", serde_json::to_string(event)?);
569 }
570 OutputFormat::Yaml => {
571 println!("{}", serde_yaml::to_string(event)?);
572 }
573 OutputFormat::Quiet => {
574 println!("{}", event.event_type);
575 }
576 OutputFormat::Table => {
577 println!(
578 "{} [{}] {} - {} - {}",
579 event.timestamp.format("%H:%M:%S"),
580 event.severity.to_uppercase(),
581 event.event_type,
582 event.source,
583 event.message
584 );
585 }
586 }
587
588 Ok(())
589}
590
591async fn dashboard_command(interval: u64, format: OutputFormat) -> Result<()> {
592 loop {
593 let snapshot = fetch_resource_snapshot().await?;
594 let events = fetch_events(None, None, None, 10).await?;
595
596 if matches!(format, OutputFormat::Table) {
598 print!("\x1B[2J\x1B[1;1H");
599 }
600
601 println!("╔══════════════════════════════════════════════════════════════════╗");
602 println!(
603 "║ MielinOS System Dashboard - {} ║",
604 snapshot.timestamp.format("%H:%M:%S")
605 );
606 println!("╚══════════════════════════════════════════════════════════════════╝");
607 println!();
608
609 println!("┌─ System Overview ───────────────────────────────────────────────┐");
611 println!(
612 "│ Agents: {} total, {} active",
613 snapshot.system.total_agents, snapshot.system.active_agents
614 );
615 println!("│ CPU: {:.1}%", snapshot.system.total_cpu_percent);
616 println!(
617 "│ Memory: {:.1} / {:.1} MB ({:.1}%)",
618 snapshot.system.total_memory_used_mb,
619 snapshot.system.total_memory_mb,
620 (snapshot.system.total_memory_used_mb / snapshot.system.total_memory_mb) * 100.0
621 );
622 println!("└─────────────────────────────────────────────────────────────────┘");
623 println!();
624
625 println!("┌─ Top Agents (by CPU) ───────────────────────────────────────────┐");
627 for (i, agent) in snapshot.agents.iter().take(5).enumerate() {
628 println!(
629 "│ {}. {} - CPU: {:.1}%, Mem: {:.1}MB",
630 i + 1,
631 agent.name,
632 agent.cpu_percent,
633 agent.memory_mb
634 );
635 }
636 println!("└─────────────────────────────────────────────────────────────────┘");
637 println!();
638
639 println!("┌─ Recent Events ─────────────────────────────────────────────────┐");
641 for event in events.iter().take(5) {
642 println!(
643 "│ {} [{}] {}",
644 event.timestamp.format("%H:%M:%S"),
645 event.severity,
646 event.message
647 );
648 }
649 println!("└─────────────────────────────────────────────────────────────────┘");
650
651 println!("\nRefreshing in {}s... (Press Ctrl+C to exit)", interval);
652
653 tokio::time::sleep(Duration::from_secs(interval)).await;
654 }
655}
656
657pub async fn handle_monitor_command(command: MonitorCommand, format: OutputFormat) -> Result<()> {
659 command.execute(format).await
660}