chasm_cli/agency/
remote.rs

1// Copyright (c) 2024-2026 Nervosys LLC
2// SPDX-License-Identifier: Apache-2.0
3//! Remote Agent Monitoring System
4//!
5//! Enables monitoring of agent task progress across distributed machines.
6//!
7//! ## Features
8//!
9//! - **Real-time Progress**: WebSocket-based live updates
10//! - **Task Tracking**: Monitor task status, progress, and metrics
11//! - **Multi-machine**: Track agents across multiple remote hosts
12//! - **Heartbeat**: Automatic health monitoring with configurable intervals
13
14#![allow(dead_code)]
15//! - **Event Streaming**: Subscribe to specific agent or task events
16
17use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::{broadcast, RwLock};
22
23// =============================================================================
24// Core Types
25// =============================================================================
26
/// Unique identifier for remote nodes.
///
/// A plain `String` alias; it is the key type of the node map held by
/// [`RemoteMonitor`] and is carried inside node-related [`RemoteEvent`]s.
pub type NodeId = String;
29
/// Unique identifier for remote tasks.
///
/// A plain `String` alias; it is the key type of the task map held by
/// [`RemoteMonitor`] and is carried inside task-related [`RemoteEvent`]s.
pub type RemoteTaskId = String;
32
/// Remote node representing a machine running agents.
///
/// Instances are stored by [`RemoteMonitor::register_node`] exactly as
/// supplied by the caller, keyed by [`RemoteNode::id`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteNode {
    /// Unique node identifier (used as the key in the monitor's node map)
    pub id: NodeId,
    /// Human-readable name
    pub name: String,
    /// Node address (hostname:port or IP:port)
    pub address: String,
    /// Node status.
    ///
    /// [`RemoteMonitor::heartbeat`] promotes `Offline`/`Unknown` to `Online`;
    /// [`RemoteMonitor::check_node_timeouts`] demotes `Online` to `Offline`.
    pub status: NodeStatus,
    /// Node capabilities/tags
    pub tags: Vec<String>,
    /// Hardware info, if the node reported any
    pub hardware: Option<HardwareInfo>,
    /// Number of active agents (summed across nodes by `RemoteMonitor::get_stats`)
    pub active_agents: u32,
    /// Number of running tasks.
    ///
    /// Decremented (saturating at zero) when a task on this node is
    /// completed, failed or cancelled through the monitor.
    // NOTE(review): nothing in the reviewed part of this file increments this
    // counter; presumably the node reports it — confirm against callers.
    pub running_tasks: u32,
    /// Last heartbeat received (refreshed by `RemoteMonitor::heartbeat`)
    pub last_heartbeat: DateTime<Utc>,
    /// Node registered at (caller-supplied; `register_node` does not set it)
    pub registered_at: DateTime<Utc>,
    /// Custom metadata as arbitrary JSON values
    pub metadata: HashMap<String, serde_json::Value>,
}
59
60/// Node status
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63#[derive(Default)]
64pub enum NodeStatus {
65    /// Node is online and healthy
66    Online,
67    /// Node is online but degraded (high load, etc.)
68    Degraded,
69    /// Node is offline or unreachable
70    Offline,
71    /// Node is in maintenance mode
72    Maintenance,
73    /// Node status is unknown
74    #[default]
75    Unknown,
76}
77
78
/// Hardware information for a node.
///
/// Plain data carried in [`RemoteNode::hardware`]; the monitor code in this
/// file stores it without interpreting any of the fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HardwareInfo {
    /// CPU cores
    pub cpu_cores: u32,
    /// Total RAM in bytes
    pub ram_total: u64,
    /// Available RAM in bytes
    pub ram_available: u64,
    /// GPU information (empty when the node has no GPUs to report)
    pub gpus: Vec<GpuInfo>,
    /// Operating system
    pub os: String,
    /// Architecture (x86_64, arm64, etc.)
    pub arch: String,
}
95
/// GPU information.
///
/// One entry of [`HardwareInfo::gpus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuInfo {
    /// GPU name/model
    pub name: String,
    /// VRAM in bytes
    pub vram: u64,
    /// CUDA version (if NVIDIA); `None` otherwise
    pub cuda_version: Option<String>,
}
106
107// =============================================================================
108// Remote Task
109// =============================================================================
110
/// A task running on a remote node.
///
/// Stored by [`RemoteMonitor::create_task`] keyed by [`RemoteTask::id`] and
/// mutated afterwards by the monitor's `update_*`, `complete_task`,
/// `fail_task`, `cancel_task` and `add_task_log` methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteTask {
    /// Unique task identifier (used as the key in the monitor's task map)
    pub id: RemoteTaskId,
    /// Node this task is running on; must be registered when the task is created
    pub node_id: NodeId,
    /// Agent executing this task
    pub agent_id: String,
    /// Agent name
    pub agent_name: String,
    /// Task title/description
    pub title: String,
    /// Detailed description
    pub description: Option<String>,
    /// Task status
    pub status: RemoteTaskStatus,
    /// Progress (0.0 - 1.0); forced to 1.0 when the task is marked completed
    pub progress: f32,
    /// Progress message (replaced on every `update_task_progress` call)
    pub progress_message: Option<String>,
    /// Current step (for multi-step tasks)
    pub current_step: Option<u32>,
    /// Total steps
    pub total_steps: Option<u32>,
    /// Task priority
    pub priority: TaskPriority,
    /// Task started at
    pub started_at: DateTime<Utc>,
    /// Task completed at; set when the task reaches a terminal status
    pub completed_at: Option<DateTime<Utc>>,
    /// Estimated completion time
    pub eta: Option<DateTime<Utc>>,
    /// Task result (if completed); set by `complete_task`
    pub result: Option<TaskResult>,
    /// Error message (if failed); set by `fail_task`
    pub error: Option<String>,
    /// Resource usage
    pub resources: ResourceUsage,
    /// Task logs (recent entries); `add_task_log` drops the oldest entries
    /// once the configured `max_log_entries` is exceeded
    pub logs: Vec<TaskLogEntry>,
    /// Custom metadata as arbitrary JSON values
    pub metadata: HashMap<String, serde_json::Value>,
}
155
/// Remote task status.
///
/// Variants are (de)serialized in `snake_case` (so `TimedOut` becomes
/// `"timed_out"`). The monitor stamps `completed_at` when a task moves to
/// `Completed`, `Failed`, `Cancelled` or `TimedOut`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RemoteTaskStatus {
    /// Task is queued waiting to start
    Queued,
    /// Task is starting up
    Starting,
    /// Task is actively running
    Running,
    /// Task is paused
    Paused,
    /// Task completed successfully
    Completed,
    /// Task failed with error
    Failed,
    /// Task was cancelled
    Cancelled,
    /// Task timed out
    TimedOut,
}
177
178/// Task priority levels
179#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
180#[serde(rename_all = "snake_case")]
181#[derive(Default)]
182pub enum TaskPriority {
183    Low = 0,
184    #[default]
185    Normal = 1,
186    High = 2,
187    Critical = 3,
188}
189
190
/// Task result.
///
/// Stored on the task by [`RemoteMonitor::complete_task`] and carried in
/// [`RemoteEvent::TaskCompleted`]; also the JSON body sent by
/// `RemoteAgentClient::report_completed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Success flag
    pub success: bool,
    /// Output data as an arbitrary JSON value
    pub output: Option<serde_json::Value>,
    /// Artifacts produced (file paths, URLs, etc.)
    pub artifacts: Vec<TaskArtifact>,
    /// Metrics collected during execution
    pub metrics: TaskMetrics,
}
203
/// Task artifact (output files, etc.).
///
/// One entry of [`TaskResult::artifacts`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskArtifact {
    /// Artifact name
    pub name: String,
    /// Artifact type
    pub artifact_type: ArtifactType,
    /// Location (path or URL)
    pub location: String,
    /// Size in bytes, when known
    pub size: Option<u64>,
    /// Checksum (SHA256), when known
    // NOTE(review): the encoding (hex vs. base64) is not established anywhere
    // in this file — confirm with producers.
    pub checksum: Option<String>,
}
218
/// Artifact types.
///
/// Variant names are (de)serialized in `snake_case`. `Custom` carries a
/// free-form label for kinds not covered by the fixed variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ArtifactType {
    /// A single file
    File,
    /// A directory
    Directory,
    /// A URL
    Url,
    /// A database
    Database,
    /// A model
    Model,
    /// A report
    Report,
    /// A log
    Log,
    /// Any other kind, identified by the contained string
    Custom(String),
}
232
/// Task execution metrics.
///
/// Carried in [`TaskResult::metrics`]. The derived `Default` zeroes every
/// counter and leaves `tokens_used` as `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TaskMetrics {
    /// Total execution time in milliseconds
    pub duration_ms: u64,
    /// Tokens used (for LLM tasks)
    pub tokens_used: Option<u64>,
    /// API calls made
    pub api_calls: u32,
    /// Files processed
    pub files_processed: u32,
    /// Errors encountered (but recovered)
    pub errors_recovered: u32,
    /// Retries performed
    pub retries: u32,
}
249
/// Resource usage during task execution.
///
/// Carried in [`RemoteTask::resources`]. The derived `Default` zeroes every
/// field and leaves `gpu_memory_bytes` as `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// CPU usage percentage (0-100)
    pub cpu_percent: f32,
    /// Memory usage in bytes
    pub memory_bytes: u64,
    /// GPU memory usage in bytes (if applicable)
    pub gpu_memory_bytes: Option<u64>,
    /// Network bytes sent
    pub network_tx_bytes: u64,
    /// Network bytes received
    pub network_rx_bytes: u64,
    /// Disk read bytes
    pub disk_read_bytes: u64,
    /// Disk write bytes
    pub disk_write_bytes: u64,
}
268
/// Task log entry.
///
/// Appended to [`RemoteTask::logs`] by [`RemoteMonitor::add_task_log`] and
/// carried in [`RemoteEvent::TaskLog`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskLogEntry {
    /// Log timestamp
    pub timestamp: DateTime<Utc>,
    /// Log level
    pub level: LogLevel,
    /// Log message
    pub message: String,
    /// Optional structured data as an arbitrary JSON value
    pub data: Option<serde_json::Value>,
}
281
/// Log levels.
///
/// Variants are (de)serialized in `snake_case`. No ordering traits are
/// derived, so levels can be compared for equality only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogLevel {
    /// Trace-level entry
    Trace,
    /// Debug-level entry
    Debug,
    /// Informational entry
    Info,
    /// Warning entry
    Warn,
    /// Error entry
    Error,
}
292
293// =============================================================================
294// Events
295// =============================================================================
296
/// Events emitted by the remote monitoring system.
///
/// Broadcast to every receiver obtained from [`RemoteMonitor::subscribe`].
/// Serialized in serde's adjacently tagged form: the variant name goes in a
/// `"type"` field and its payload in a `"data"` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum RemoteEvent {
    /// Node came online (sent by `RemoteMonitor::register_node`)
    NodeOnline(NodeId),
    /// Node went offline (sent by `RemoteMonitor::unregister_node`)
    NodeOffline(NodeId),
    /// Node status changed (sent by `heartbeat` and `check_node_timeouts`)
    NodeStatusChanged {
        node_id: NodeId,
        old_status: NodeStatus,
        new_status: NodeStatus,
    },
    /// Node heartbeat received
    NodeHeartbeat {
        node_id: NodeId,
        timestamp: DateTime<Utc>,
    },

    /// Task was created/queued; carries a full copy of the task
    TaskCreated(RemoteTask),
    /// Task started running (sent when the status first changes to `Running`)
    TaskStarted {
        task_id: RemoteTaskId,
        node_id: NodeId,
    },
    /// Task progress updated; `progress` is the value stored on the task
    TaskProgress {
        task_id: RemoteTaskId,
        progress: f32,
        message: Option<String>,
    },
    /// Task step completed
    TaskStepCompleted {
        task_id: RemoteTaskId,
        step: u32,
        total: u32,
        description: Option<String>,
    },
    /// Task completed successfully
    TaskCompleted {
        task_id: RemoteTaskId,
        result: TaskResult,
    },
    /// Task failed
    TaskFailed {
        task_id: RemoteTaskId,
        error: String,
    },
    /// Task was cancelled; `reason` is carried only here, not stored on the task
    TaskCancelled {
        task_id: RemoteTaskId,
        reason: Option<String>,
    },
    /// Task log entry added
    TaskLog {
        task_id: RemoteTaskId,
        entry: TaskLogEntry,
    },

    /// Agent registered on node
    // NOTE(review): no emitter for the two agent events is visible in the
    // reviewed part of this file — confirm where they are produced.
    AgentRegistered {
        node_id: NodeId,
        agent_id: String,
        agent_name: String,
    },
    /// Agent unregistered from node
    AgentUnregistered { node_id: NodeId, agent_id: String },
}
367
368// =============================================================================
369// Remote Monitor
370// =============================================================================
371
/// Configuration for the remote monitor.
///
/// See the `Default` implementation for the out-of-the-box values.
// NOTE(review): in the reviewed part of this file `RemoteMonitor` reads only
// `node_timeout_secs` and `max_log_entries`; the network/TLS/auth fields are
// presumably consumed by a server layer defined elsewhere — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteMonitorConfig {
    /// Server bind address
    pub bind_address: String,
    /// Server port
    pub port: u16,
    /// Enable TLS
    pub tls_enabled: bool,
    /// TLS certificate path
    pub tls_cert_path: Option<String>,
    /// TLS key path
    pub tls_key_path: Option<String>,
    /// Authentication token (for API access)
    pub auth_token: Option<String>,
    /// Heartbeat interval in seconds
    pub heartbeat_interval_secs: u64,
    /// Node timeout in seconds (mark offline after this).
    ///
    /// `RemoteMonitor::check_node_timeouts` compares it against the time
    /// elapsed since each online node's last heartbeat.
    pub node_timeout_secs: u64,
    /// Maximum log entries per task; older entries are dropped first
    pub max_log_entries: usize,
    /// Enable metrics collection
    pub metrics_enabled: bool,
}
396
397impl Default for RemoteMonitorConfig {
398    fn default() -> Self {
399        Self {
400            bind_address: "0.0.0.0".to_string(),
401            port: 9876,
402            tls_enabled: false,
403            tls_cert_path: None,
404            tls_key_path: None,
405            auth_token: None,
406            heartbeat_interval_secs: 30,
407            node_timeout_secs: 90,
408            max_log_entries: 1000,
409            metrics_enabled: true,
410        }
411    }
412}
413
/// Remote monitoring server.
///
/// Holds the registry of known nodes and tasks and broadcasts a
/// [`RemoteEvent`] for the state changes made through its methods.
pub struct RemoteMonitor {
    /// Settings supplied at construction; exposed through `config()`
    config: RemoteMonitorConfig,
    /// Registered nodes keyed by node id, behind an async read/write lock
    nodes: Arc<RwLock<HashMap<NodeId, RemoteNode>>>,
    /// Known tasks keyed by task id, behind an async read/write lock
    tasks: Arc<RwLock<HashMap<RemoteTaskId, RemoteTask>>>,
    /// Sending half of the event broadcast channel; `subscribe()` hands out receivers
    event_tx: broadcast::Sender<RemoteEvent>,
}
421
422impl RemoteMonitor {
423    /// Create a new remote monitor
424    pub fn new(config: RemoteMonitorConfig) -> Self {
425        let (event_tx, _) = broadcast::channel(1000);
426
427        Self {
428            config,
429            nodes: Arc::new(RwLock::new(HashMap::new())),
430            tasks: Arc::new(RwLock::new(HashMap::new())),
431            event_tx,
432        }
433    }
434
435    /// Subscribe to events
436    pub fn subscribe(&self) -> broadcast::Receiver<RemoteEvent> {
437        self.event_tx.subscribe()
438    }
439
440    /// Register a new node
441    pub async fn register_node(&self, node: RemoteNode) -> Result<(), RemoteMonitorError> {
442        let node_id = node.id.clone();
443        let mut nodes = self.nodes.write().await;
444        nodes.insert(node_id.clone(), node);
445
446        let _ = self.event_tx.send(RemoteEvent::NodeOnline(node_id));
447        Ok(())
448    }
449
450    /// Unregister a node
451    pub async fn unregister_node(&self, node_id: &str) -> Result<bool, RemoteMonitorError> {
452        let mut nodes = self.nodes.write().await;
453        let removed = nodes.remove(node_id).is_some();
454
455        if removed {
456            let _ = self
457                .event_tx
458                .send(RemoteEvent::NodeOffline(node_id.to_string()));
459        }
460
461        Ok(removed)
462    }
463
464    /// Update node heartbeat
465    pub async fn heartbeat(&self, node_id: &str) -> Result<(), RemoteMonitorError> {
466        let mut nodes = self.nodes.write().await;
467
468        if let Some(node) = nodes.get_mut(node_id) {
469            node.last_heartbeat = Utc::now();
470            if node.status == NodeStatus::Offline || node.status == NodeStatus::Unknown {
471                let old_status = node.status;
472                node.status = NodeStatus::Online;
473                let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
474                    node_id: node_id.to_string(),
475                    old_status,
476                    new_status: NodeStatus::Online,
477                });
478            }
479            let _ = self.event_tx.send(RemoteEvent::NodeHeartbeat {
480                node_id: node_id.to_string(),
481                timestamp: node.last_heartbeat,
482            });
483            Ok(())
484        } else {
485            Err(RemoteMonitorError::NodeNotFound(node_id.to_string()))
486        }
487    }
488
489    /// Get node by ID
490    pub async fn get_node(&self, node_id: &str) -> Option<RemoteNode> {
491        let nodes = self.nodes.read().await;
492        nodes.get(node_id).cloned()
493    }
494
495    /// List all nodes
496    pub async fn list_nodes(&self) -> Vec<RemoteNode> {
497        let nodes = self.nodes.read().await;
498        nodes.values().cloned().collect()
499    }
500
501    /// List nodes by status
502    pub async fn list_nodes_by_status(&self, status: NodeStatus) -> Vec<RemoteNode> {
503        let nodes = self.nodes.read().await;
504        nodes
505            .values()
506            .filter(|n| n.status == status)
507            .cloned()
508            .collect()
509    }
510
511    /// Create a new task
512    pub async fn create_task(&self, task: RemoteTask) -> Result<RemoteTaskId, RemoteMonitorError> {
513        // Verify node exists
514        {
515            let nodes = self.nodes.read().await;
516            if !nodes.contains_key(&task.node_id) {
517                return Err(RemoteMonitorError::NodeNotFound(task.node_id.clone()));
518            }
519        }
520
521        let task_id = task.id.clone();
522        let mut tasks = self.tasks.write().await;
523        tasks.insert(task_id.clone(), task.clone());
524
525        let _ = self.event_tx.send(RemoteEvent::TaskCreated(task));
526        Ok(task_id)
527    }
528
529    /// Update task status
530    pub async fn update_task_status(
531        &self,
532        task_id: &str,
533        status: RemoteTaskStatus,
534    ) -> Result<(), RemoteMonitorError> {
535        let mut tasks = self.tasks.write().await;
536
537        if let Some(task) = tasks.get_mut(task_id) {
538            let old_status = task.status;
539            task.status = status;
540
541            match status {
542                RemoteTaskStatus::Running if old_status != RemoteTaskStatus::Running => {
543                    let _ = self.event_tx.send(RemoteEvent::TaskStarted {
544                        task_id: task_id.to_string(),
545                        node_id: task.node_id.clone(),
546                    });
547                }
548                RemoteTaskStatus::Completed => {
549                    task.completed_at = Some(Utc::now());
550                    task.progress = 1.0;
551                }
552                RemoteTaskStatus::Failed
553                | RemoteTaskStatus::Cancelled
554                | RemoteTaskStatus::TimedOut => {
555                    task.completed_at = Some(Utc::now());
556                }
557                _ => {}
558            }
559
560            Ok(())
561        } else {
562            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
563        }
564    }
565
566    /// Update task progress
567    pub async fn update_task_progress(
568        &self,
569        task_id: &str,
570        progress: f32,
571        message: Option<String>,
572    ) -> Result<(), RemoteMonitorError> {
573        let mut tasks = self.tasks.write().await;
574
575        if let Some(task) = tasks.get_mut(task_id) {
576            task.progress = progress.clamp(0.0, 1.0);
577            task.progress_message = message.clone();
578
579            let _ = self.event_tx.send(RemoteEvent::TaskProgress {
580                task_id: task_id.to_string(),
581                progress: task.progress,
582                message,
583            });
584
585            Ok(())
586        } else {
587            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
588        }
589    }
590
591    /// Update task step
592    pub async fn update_task_step(
593        &self,
594        task_id: &str,
595        step: u32,
596        total: u32,
597        description: Option<String>,
598    ) -> Result<(), RemoteMonitorError> {
599        let mut tasks = self.tasks.write().await;
600
601        if let Some(task) = tasks.get_mut(task_id) {
602            task.current_step = Some(step);
603            task.total_steps = Some(total);
604            task.progress = step as f32 / total as f32;
605
606            let _ = self.event_tx.send(RemoteEvent::TaskStepCompleted {
607                task_id: task_id.to_string(),
608                step,
609                total,
610                description,
611            });
612
613            Ok(())
614        } else {
615            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
616        }
617    }
618
619    /// Complete a task
620    pub async fn complete_task(
621        &self,
622        task_id: &str,
623        result: TaskResult,
624    ) -> Result<(), RemoteMonitorError> {
625        let node_id = {
626            let mut tasks = self.tasks.write().await;
627
628            if let Some(task) = tasks.get_mut(task_id) {
629                task.status = RemoteTaskStatus::Completed;
630                task.completed_at = Some(Utc::now());
631                task.progress = 1.0;
632                task.result = Some(result.clone());
633                task.node_id.clone()
634            } else {
635                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
636            }
637        };
638
639        // Update node task count
640        let mut nodes = self.nodes.write().await;
641        if let Some(node) = nodes.get_mut(&node_id) {
642            node.running_tasks = node.running_tasks.saturating_sub(1);
643        }
644
645        let _ = self.event_tx.send(RemoteEvent::TaskCompleted {
646            task_id: task_id.to_string(),
647            result,
648        });
649
650        Ok(())
651    }
652
653    /// Fail a task
654    pub async fn fail_task(&self, task_id: &str, error: String) -> Result<(), RemoteMonitorError> {
655        let node_id = {
656            let mut tasks = self.tasks.write().await;
657
658            if let Some(task) = tasks.get_mut(task_id) {
659                task.status = RemoteTaskStatus::Failed;
660                task.completed_at = Some(Utc::now());
661                task.error = Some(error.clone());
662                task.node_id.clone()
663            } else {
664                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
665            }
666        };
667
668        // Update node task count
669        let mut nodes = self.nodes.write().await;
670        if let Some(node) = nodes.get_mut(&node_id) {
671            node.running_tasks = node.running_tasks.saturating_sub(1);
672        }
673
674        let _ = self.event_tx.send(RemoteEvent::TaskFailed {
675            task_id: task_id.to_string(),
676            error,
677        });
678
679        Ok(())
680    }
681
682    /// Cancel a task
683    pub async fn cancel_task(
684        &self,
685        task_id: &str,
686        reason: Option<String>,
687    ) -> Result<(), RemoteMonitorError> {
688        let node_id = {
689            let mut tasks = self.tasks.write().await;
690
691            if let Some(task) = tasks.get_mut(task_id) {
692                task.status = RemoteTaskStatus::Cancelled;
693                task.completed_at = Some(Utc::now());
694                task.node_id.clone()
695            } else {
696                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
697            }
698        };
699
700        // Update node task count
701        let mut nodes = self.nodes.write().await;
702        if let Some(node) = nodes.get_mut(&node_id) {
703            node.running_tasks = node.running_tasks.saturating_sub(1);
704        }
705
706        let _ = self.event_tx.send(RemoteEvent::TaskCancelled {
707            task_id: task_id.to_string(),
708            reason,
709        });
710
711        Ok(())
712    }
713
714    /// Add log entry to task
715    pub async fn add_task_log(
716        &self,
717        task_id: &str,
718        entry: TaskLogEntry,
719    ) -> Result<(), RemoteMonitorError> {
720        let mut tasks = self.tasks.write().await;
721
722        if let Some(task) = tasks.get_mut(task_id) {
723            task.logs.push(entry.clone());
724
725            // Trim logs if over limit
726            if task.logs.len() > self.config.max_log_entries {
727                let drain_count = task.logs.len() - self.config.max_log_entries;
728                task.logs.drain(0..drain_count);
729            }
730
731            let _ = self.event_tx.send(RemoteEvent::TaskLog {
732                task_id: task_id.to_string(),
733                entry,
734            });
735
736            Ok(())
737        } else {
738            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
739        }
740    }
741
742    /// Get task by ID
743    pub async fn get_task(&self, task_id: &str) -> Option<RemoteTask> {
744        let tasks = self.tasks.read().await;
745        tasks.get(task_id).cloned()
746    }
747
748    /// List all tasks
749    pub async fn list_tasks(&self) -> Vec<RemoteTask> {
750        let tasks = self.tasks.read().await;
751        tasks.values().cloned().collect()
752    }
753
754    /// List tasks by node
755    pub async fn list_tasks_by_node(&self, node_id: &str) -> Vec<RemoteTask> {
756        let tasks = self.tasks.read().await;
757        tasks
758            .values()
759            .filter(|t| t.node_id == node_id)
760            .cloned()
761            .collect()
762    }
763
764    /// List tasks by status
765    pub async fn list_tasks_by_status(&self, status: RemoteTaskStatus) -> Vec<RemoteTask> {
766        let tasks = self.tasks.read().await;
767        tasks
768            .values()
769            .filter(|t| t.status == status)
770            .cloned()
771            .collect()
772    }
773
774    /// List tasks by agent
775    pub async fn list_tasks_by_agent(&self, agent_id: &str) -> Vec<RemoteTask> {
776        let tasks = self.tasks.read().await;
777        tasks
778            .values()
779            .filter(|t| t.agent_id == agent_id)
780            .cloned()
781            .collect()
782    }
783
784    /// Get monitoring statistics
785    pub async fn get_stats(&self) -> MonitorStats {
786        let nodes = self.nodes.read().await;
787        let tasks = self.tasks.read().await;
788
789        let online_nodes = nodes
790            .values()
791            .filter(|n| n.status == NodeStatus::Online)
792            .count();
793        let total_agents: u32 = nodes.values().map(|n| n.active_agents).sum();
794
795        let running_tasks = tasks
796            .values()
797            .filter(|t| t.status == RemoteTaskStatus::Running)
798            .count();
799        let queued_tasks = tasks
800            .values()
801            .filter(|t| t.status == RemoteTaskStatus::Queued)
802            .count();
803        let completed_tasks = tasks
804            .values()
805            .filter(|t| t.status == RemoteTaskStatus::Completed)
806            .count();
807        let failed_tasks = tasks
808            .values()
809            .filter(|t| t.status == RemoteTaskStatus::Failed)
810            .count();
811
812        MonitorStats {
813            total_nodes: nodes.len(),
814            online_nodes,
815            total_agents: total_agents as usize,
816            total_tasks: tasks.len(),
817            running_tasks,
818            queued_tasks,
819            completed_tasks,
820            failed_tasks,
821        }
822    }
823
824    /// Check for timed out nodes and update their status
825    pub async fn check_node_timeouts(&self) {
826        let timeout = chrono::Duration::seconds(self.config.node_timeout_secs as i64);
827        let now = Utc::now();
828
829        let mut nodes = self.nodes.write().await;
830        for node in nodes.values_mut() {
831            if node.status == NodeStatus::Online && now - node.last_heartbeat > timeout {
832                let old_status = node.status;
833                node.status = NodeStatus::Offline;
834                let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
835                    node_id: node.id.clone(),
836                    old_status,
837                    new_status: NodeStatus::Offline,
838                });
839            }
840        }
841    }
842
843    /// Get configuration
844    pub fn config(&self) -> &RemoteMonitorConfig {
845        &self.config
846    }
847}
848
/// Monitor statistics.
///
/// A point-in-time snapshot produced by [`RemoteMonitor::get_stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorStats {
    /// Number of registered nodes
    pub total_nodes: usize,
    /// Number of registered nodes whose status is `Online`
    pub online_nodes: usize,
    /// Sum of `active_agents` over all registered nodes
    pub total_agents: usize,
    /// Number of known tasks, in any status
    pub total_tasks: usize,
    /// Number of tasks whose status is `Running`
    pub running_tasks: usize,
    /// Number of tasks whose status is `Queued`
    pub queued_tasks: usize,
    /// Number of tasks whose status is `Completed`
    pub completed_tasks: usize,
    /// Number of tasks whose status is `Failed`
    pub failed_tasks: usize,
}
861
862// =============================================================================
863// Remote Agent Client
864// =============================================================================
865
866/// Client for remote agents to report progress
867pub struct RemoteAgentClient {
868    /// Node ID this client represents
869    pub node_id: NodeId,
870    /// Server URL
871    server_url: String,
872    /// Authentication token
873    auth_token: Option<String>,
874    /// HTTP client
875    #[allow(dead_code)]
876    client: reqwest::Client,
877}
878
879impl RemoteAgentClient {
880    /// Create a new remote agent client
881    pub fn new(node_id: impl Into<String>, server_url: impl Into<String>) -> Self {
882        Self {
883            node_id: node_id.into(),
884            server_url: server_url.into(),
885            auth_token: None,
886            client: reqwest::Client::new(),
887        }
888    }
889
890    /// Set authentication token
891    pub fn with_auth(mut self, token: impl Into<String>) -> Self {
892        self.auth_token = Some(token.into());
893        self
894    }
895
896    /// Send heartbeat
897    pub async fn heartbeat(&self) -> Result<(), RemoteMonitorError> {
898        let url = format!(
899            "{}/api/v1/nodes/{}/heartbeat",
900            self.server_url, self.node_id
901        );
902
903        let mut request = self.client.post(&url);
904        if let Some(ref token) = self.auth_token {
905            request = request.header("Authorization", format!("Bearer {}", token));
906        }
907
908        let response = request
909            .send()
910            .await
911            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
912
913        if !response.status().is_success() {
914            return Err(RemoteMonitorError::ApiError(
915                response.status().as_u16(),
916                response.text().await.unwrap_or_default(),
917            ));
918        }
919
920        Ok(())
921    }
922
923    /// Report task progress
924    pub async fn report_progress(
925        &self,
926        task_id: &str,
927        progress: f32,
928        message: Option<&str>,
929    ) -> Result<(), RemoteMonitorError> {
930        let url = format!("{}/api/v1/tasks/{}/progress", self.server_url, task_id);
931
932        let body = serde_json::json!({
933            "progress": progress,
934            "message": message,
935        });
936
937        let mut request = self.client.post(&url).json(&body);
938        if let Some(ref token) = self.auth_token {
939            request = request.header("Authorization", format!("Bearer {}", token));
940        }
941
942        let response = request
943            .send()
944            .await
945            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
946
947        if !response.status().is_success() {
948            return Err(RemoteMonitorError::ApiError(
949                response.status().as_u16(),
950                response.text().await.unwrap_or_default(),
951            ));
952        }
953
954        Ok(())
955    }
956
957    /// Report task step
958    pub async fn report_step(
959        &self,
960        task_id: &str,
961        step: u32,
962        total: u32,
963        description: Option<&str>,
964    ) -> Result<(), RemoteMonitorError> {
965        let url = format!("{}/api/v1/tasks/{}/step", self.server_url, task_id);
966
967        let body = serde_json::json!({
968            "step": step,
969            "total": total,
970            "description": description,
971        });
972
973        let mut request = self.client.post(&url).json(&body);
974        if let Some(ref token) = self.auth_token {
975            request = request.header("Authorization", format!("Bearer {}", token));
976        }
977
978        let response = request
979            .send()
980            .await
981            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
982
983        if !response.status().is_success() {
984            return Err(RemoteMonitorError::ApiError(
985                response.status().as_u16(),
986                response.text().await.unwrap_or_default(),
987            ));
988        }
989
990        Ok(())
991    }
992
993    /// Report task completion
994    pub async fn report_completed(
995        &self,
996        task_id: &str,
997        result: TaskResult,
998    ) -> Result<(), RemoteMonitorError> {
999        let url = format!("{}/api/v1/tasks/{}/complete", self.server_url, task_id);
1000
1001        let mut request = self.client.post(&url).json(&result);
1002        if let Some(ref token) = self.auth_token {
1003            request = request.header("Authorization", format!("Bearer {}", token));
1004        }
1005
1006        let response = request
1007            .send()
1008            .await
1009            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1010
1011        if !response.status().is_success() {
1012            return Err(RemoteMonitorError::ApiError(
1013                response.status().as_u16(),
1014                response.text().await.unwrap_or_default(),
1015            ));
1016        }
1017
1018        Ok(())
1019    }
1020
1021    /// Report task failure
1022    pub async fn report_failed(
1023        &self,
1024        task_id: &str,
1025        error: &str,
1026    ) -> Result<(), RemoteMonitorError> {
1027        let url = format!("{}/api/v1/tasks/{}/fail", self.server_url, task_id);
1028
1029        let body = serde_json::json!({
1030            "error": error,
1031        });
1032
1033        let mut request = self.client.post(&url).json(&body);
1034        if let Some(ref token) = self.auth_token {
1035            request = request.header("Authorization", format!("Bearer {}", token));
1036        }
1037
1038        let response = request
1039            .send()
1040            .await
1041            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1042
1043        if !response.status().is_success() {
1044            return Err(RemoteMonitorError::ApiError(
1045                response.status().as_u16(),
1046                response.text().await.unwrap_or_default(),
1047            ));
1048        }
1049
1050        Ok(())
1051    }
1052
1053    /// Send log entry
1054    pub async fn send_log(
1055        &self,
1056        task_id: &str,
1057        level: LogLevel,
1058        message: &str,
1059    ) -> Result<(), RemoteMonitorError> {
1060        let url = format!("{}/api/v1/tasks/{}/log", self.server_url, task_id);
1061
1062        let body = serde_json::json!({
1063            "level": level,
1064            "message": message,
1065            "timestamp": Utc::now(),
1066        });
1067
1068        let mut request = self.client.post(&url).json(&body);
1069        if let Some(ref token) = self.auth_token {
1070            request = request.header("Authorization", format!("Bearer {}", token));
1071        }
1072
1073        let response = request
1074            .send()
1075            .await
1076            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1077
1078        if !response.status().is_success() {
1079            return Err(RemoteMonitorError::ApiError(
1080                response.status().as_u16(),
1081                response.text().await.unwrap_or_default(),
1082            ));
1083        }
1084
1085        Ok(())
1086    }
1087}
1088
1089// =============================================================================
1090// Task Builder
1091// =============================================================================
1092
1093/// Builder for creating remote tasks
1094pub struct RemoteTaskBuilder {
1095    task: RemoteTask,
1096}
1097
1098impl RemoteTaskBuilder {
1099    /// Create a new task builder
1100    pub fn new(
1101        id: impl Into<String>,
1102        node_id: impl Into<String>,
1103        agent_id: impl Into<String>,
1104    ) -> Self {
1105        Self {
1106            task: RemoteTask {
1107                id: id.into(),
1108                node_id: node_id.into(),
1109                agent_id: agent_id.into(),
1110                agent_name: String::new(),
1111                title: String::new(),
1112                description: None,
1113                status: RemoteTaskStatus::Queued,
1114                progress: 0.0,
1115                progress_message: None,
1116                current_step: None,
1117                total_steps: None,
1118                priority: TaskPriority::Normal,
1119                started_at: Utc::now(),
1120                completed_at: None,
1121                eta: None,
1122                result: None,
1123                error: None,
1124                resources: ResourceUsage::default(),
1125                logs: Vec::new(),
1126                metadata: HashMap::new(),
1127            },
1128        }
1129    }
1130
1131    /// Set agent name
1132    pub fn agent_name(mut self, name: impl Into<String>) -> Self {
1133        self.task.agent_name = name.into();
1134        self
1135    }
1136
1137    /// Set task title
1138    pub fn title(mut self, title: impl Into<String>) -> Self {
1139        self.task.title = title.into();
1140        self
1141    }
1142
1143    /// Set task description
1144    pub fn description(mut self, description: impl Into<String>) -> Self {
1145        self.task.description = Some(description.into());
1146        self
1147    }
1148
1149    /// Set task priority
1150    pub fn priority(mut self, priority: TaskPriority) -> Self {
1151        self.task.priority = priority;
1152        self
1153    }
1154
1155    /// Set total steps
1156    pub fn total_steps(mut self, steps: u32) -> Self {
1157        self.task.total_steps = Some(steps);
1158        self
1159    }
1160
1161    /// Set ETA
1162    pub fn eta(mut self, eta: DateTime<Utc>) -> Self {
1163        self.task.eta = Some(eta);
1164        self
1165    }
1166
1167    /// Add metadata
1168    pub fn metadata(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
1169        if let Ok(v) = serde_json::to_value(value) {
1170            self.task.metadata.insert(key.into(), v);
1171        }
1172        self
1173    }
1174
1175    /// Build the task
1176    pub fn build(self) -> RemoteTask {
1177        self.task
1178    }
1179}
1180
1181// =============================================================================
1182// Error Types
1183// =============================================================================
1184
/// Errors reported by the remote monitoring code.
///
/// Each variant renders through `Display` as a fixed prefix followed by its payload.
#[derive(Clone, Debug)]
pub enum RemoteMonitorError {
    /// Node not found; rendered as `Node not found: <payload>`
    NodeNotFound(String),
    /// Task not found; rendered as `Task not found: <payload>`
    TaskNotFound(String),
    /// Network error; rendered as `Network error: <payload>`
    Network(String),
    /// API error carrying a numeric code and a message; rendered as
    /// `API error <code>: <message>`
    ApiError(u16, String),
    /// Authentication error; rendered as `Authentication error: <payload>`
    AuthError(String),
    /// Invalid state; rendered as `Invalid state: <payload>`
    InvalidState(String),
}

impl std::fmt::Display for RemoteMonitorError {
    /// Writes the human-readable message for this error into `f`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::NodeNotFound(ref node) => write!(f, "Node not found: {}", node),
            Self::TaskNotFound(ref task) => write!(f, "Task not found: {}", task),
            Self::Network(ref cause) => write!(f, "Network error: {}", cause),
            Self::ApiError(status, ref body) => write!(f, "API error {}: {}", status, body),
            Self::AuthError(ref cause) => write!(f, "Authentication error: {}", cause),
            Self::InvalidState(ref cause) => write!(f, "Invalid state: {}", cause),
        }
    }
}

// No source error is wrapped, so the default `Error` methods are sufficient.
impl std::error::Error for RemoteMonitorError {}
1216
1217// =============================================================================
1218// Helper Functions
1219// =============================================================================
1220
/// Generate a unique task ID
///
/// The ID has the form `task_<nanos>_<seq>`, both parts in lowercase hexadecimal:
/// `<nanos>` is the number of nanoseconds since the Unix epoch and `<seq>` is a
/// process-wide counter incremented on every call.
///
/// The counter is what makes IDs distinct within a process: the timestamp alone can
/// repeat when the system clock is coarse or when calls happen back to back. If the
/// system clock reports a time before the Unix epoch, the timestamp part is `0`
/// instead of panicking.
pub fn generate_task_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only atomicity of the increment matters here, so `Relaxed` ordering is enough.
    static NEXT_SEQ: AtomicUsize = AtomicUsize::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);

    format!("task_{:x}_{:x}", timestamp, seq)
}
1230
/// Generate a unique node ID
///
/// The ID has the form `node_<nanos>_<seq>`, both parts in lowercase hexadecimal:
/// `<nanos>` is the number of nanoseconds since the Unix epoch and `<seq>` is a
/// process-wide counter incremented on every call.
///
/// The counter is what makes IDs distinct within a process: the timestamp alone can
/// repeat when the system clock is coarse or when calls happen back to back. If the
/// system clock reports a time before the Unix epoch, the timestamp part is `0`
/// instead of panicking.
pub fn generate_node_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only atomicity of the increment matters here, so `Relaxed` ordering is enough.
    static NEXT_SEQ: AtomicUsize = AtomicUsize::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);

    format!("node_{:x}_{:x}", timestamp, seq)
}
1240
1241// =============================================================================
1242// Tests
1243// =============================================================================
1244
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an online node at `localhost:9876` with no hardware info, no running
    /// tasks and empty metadata; only the values the tests vary are parameters.
    fn online_node(id: &str, name: &str, tags: Vec<String>, active_agents: u32) -> RemoteNode {
        RemoteNode {
            id: id.to_string(),
            name: name.to_string(),
            address: "localhost:9876".to_string(),
            status: NodeStatus::Online,
            tags,
            hardware: None,
            active_agents,
            running_tasks: 0,
            last_heartbeat: Utc::now(),
            registered_at: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    /// A freshly created monitor reports no nodes and no tasks.
    #[tokio::test]
    async fn test_remote_monitor_creation() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        let stats = monitor.get_stats().await;
        assert_eq!(stats.total_nodes, 0);
        assert_eq!(stats.total_tasks, 0);
    }

    /// A registered node can be looked up again by its ID.
    #[tokio::test]
    async fn test_node_registration() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        let node = online_node("test-node", "Test Node", vec!["test".to_string()], 0);
        monitor.register_node(node).await.unwrap();

        let found = monitor.get_node("test-node").await;
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "Test Node");
    }

    /// Creating a task returns its ID and makes it retrievable.
    #[tokio::test]
    async fn test_task_creation() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        // The task below refers to this node, so register it first.
        let node = online_node("test-node", "Test Node", Vec::new(), 1);
        monitor.register_node(node).await.unwrap();

        let task = RemoteTaskBuilder::new("task-1", "test-node", "agent-1")
            .agent_name("Test Agent")
            .title("Test Task")
            .description("A test task")
            .priority(TaskPriority::High)
            .build();

        let created_id = monitor.create_task(task).await.unwrap();
        assert_eq!(created_id, "task-1");

        let found = monitor.get_task("task-1").await;
        assert!(found.is_some());
        assert_eq!(found.unwrap().title, "Test Task");
    }

    /// A progress update is reflected in the stored task.
    #[tokio::test]
    async fn test_task_progress() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        let node = online_node("node-1", "Node 1", Vec::new(), 1);
        monitor.register_node(node).await.unwrap();

        let task = RemoteTaskBuilder::new("task-1", "node-1", "agent-1")
            .title("Progress Test")
            .build();
        monitor.create_task(task).await.unwrap();

        monitor
            .update_task_progress("task-1", 0.5, Some("Halfway done".to_string()))
            .await
            .unwrap();

        let updated = monitor.get_task("task-1").await.unwrap();
        // Progress is a float, so compare with a tolerance rather than exactly.
        assert!((updated.progress - 0.5).abs() < 0.01);
        assert_eq!(updated.progress_message, Some("Halfway done".to_string()));
    }

    /// The builder stores every value it is given and defaults the status to `Queued`.
    #[test]
    fn test_task_builder() {
        let built = RemoteTaskBuilder::new("task-123", "node-1", "agent-1")
            .agent_name("My Agent")
            .title("Important Task")
            .description("Does important things")
            .priority(TaskPriority::Critical)
            .total_steps(5)
            .build();

        assert_eq!(built.id, "task-123");
        assert_eq!(built.node_id, "node-1");
        assert_eq!(built.agent_id, "agent-1");
        assert_eq!(built.agent_name, "My Agent");
        assert_eq!(built.title, "Important Task");
        assert_eq!(built.priority, TaskPriority::Critical);
        assert_eq!(built.total_steps, Some(5));
        assert_eq!(built.status, RemoteTaskStatus::Queued);
    }
}