chasm_cli/agency/
remote.rs

1// Copyright (c) 2024-2026 Nervosys LLC
2// SPDX-License-Identifier: Apache-2.0
3//! Remote Agent Monitoring System
4//!
5//! Enables monitoring of agent task progress across distributed machines.
6//!
7//! ## Features
8//!
9//! - **Real-time Progress**: WebSocket-based live updates
10//! - **Task Tracking**: Monitor task status, progress, and metrics
11//! - **Multi-machine**: Track agents across multiple remote hosts
12//! - **Heartbeat**: Automatic health monitoring with configurable intervals
13
14#![allow(dead_code)]
15//! - **Event Streaming**: Subscribe to specific agent or task events
16
17use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::{broadcast, RwLock};
22
23// =============================================================================
24// Core Types
25// =============================================================================
26
/// Unique identifier for remote nodes.
///
/// Plain alias for `String`; it is the key type of the monitor's node map.
pub type NodeId = String;

/// Unique identifier for remote tasks.
///
/// Plain alias for `String`; it is the key type of the monitor's task map.
pub type RemoteTaskId = String;
32
/// Remote node representing a machine running agents.
///
/// Instances are stored by [`RemoteMonitor`] in a map keyed on `id` and are
/// handed out to callers as clones. The type is (de)serializable with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteNode {
    /// Unique node identifier (also the key under which the monitor stores the node)
    pub id: NodeId,
    /// Human-readable name
    pub name: String,
    /// Node address (hostname:port or IP:port)
    pub address: String,
    /// Node status; flipped to `Online` by a heartbeat and to `Offline` by the
    /// monitor's timeout check
    pub status: NodeStatus,
    /// Node capabilities/tags
    pub tags: Vec<String>,
    /// Hardware info (`None` when not reported)
    pub hardware: Option<HardwareInfo>,
    /// Number of active agents; summed across nodes for `MonitorStats::total_agents`
    pub active_agents: u32,
    /// Number of running tasks; the monitor decrements it (saturating at 0) when
    /// a task on this node is completed, failed or cancelled
    pub running_tasks: u32,
    /// Last heartbeat received; compared against the configured node timeout
    pub last_heartbeat: DateTime<Utc>,
    /// Node registered at
    pub registered_at: DateTime<Utc>,
    /// Custom metadata (arbitrary JSON values keyed by string)
    pub metadata: HashMap<String, serde_json::Value>,
}
59
60/// Node status
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum NodeStatus {
64    /// Node is online and healthy
65    Online,
66    /// Node is online but degraded (high load, etc.)
67    Degraded,
68    /// Node is offline or unreachable
69    Offline,
70    /// Node is in maintenance mode
71    Maintenance,
72    /// Node status is unknown
73    Unknown,
74}
75
76impl Default for NodeStatus {
77    fn default() -> Self {
78        NodeStatus::Unknown
79    }
80}
81
/// Hardware information for a node.
///
/// Attached to a [`RemoteNode`] through its optional `hardware` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HardwareInfo {
    /// CPU cores
    pub cpu_cores: u32,
    /// Total RAM in bytes
    pub ram_total: u64,
    /// Available RAM in bytes
    pub ram_available: u64,
    /// GPU information (empty when the node has no GPU entries)
    pub gpus: Vec<GpuInfo>,
    /// Operating system
    pub os: String,
    /// Architecture (x86_64, arm64, etc.)
    pub arch: String,
}
98
/// GPU information for a single device listed in [`HardwareInfo::gpus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuInfo {
    /// GPU name/model
    pub name: String,
    /// VRAM in bytes
    pub vram: u64,
    /// CUDA version (if NVIDIA); `None` otherwise
    pub cuda_version: Option<String>,
}
109
110// =============================================================================
111// Remote Task
112// =============================================================================
113
/// A task running on a remote node.
///
/// Stored by [`RemoteMonitor`] in a map keyed on `id`; the monitor's update
/// methods mutate the stored copy and callers receive clones.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteTask {
    /// Unique task identifier
    pub id: RemoteTaskId,
    /// Node this task is running on (must be registered when the task is created)
    pub node_id: NodeId,
    /// Agent executing this task
    pub agent_id: String,
    /// Agent name
    pub agent_name: String,
    /// Task title/description
    pub title: String,
    /// Detailed description
    pub description: Option<String>,
    /// Task status
    pub status: RemoteTaskStatus,
    /// Progress (0.0 - 1.0)
    pub progress: f32,
    /// Progress message
    pub progress_message: Option<String>,
    /// Current step (for multi-step tasks)
    pub current_step: Option<u32>,
    /// Total steps
    pub total_steps: Option<u32>,
    /// Task priority
    pub priority: TaskPriority,
    /// Task started at
    pub started_at: DateTime<Utc>,
    /// Task completed at; set by the monitor when the task reaches a
    /// completed, failed, cancelled or timed-out status
    pub completed_at: Option<DateTime<Utc>>,
    /// Estimated completion time
    pub eta: Option<DateTime<Utc>>,
    /// Task result (if completed)
    pub result: Option<TaskResult>,
    /// Error message (if failed)
    pub error: Option<String>,
    /// Resource usage
    pub resources: ResourceUsage,
    /// Task logs (recent entries); the monitor trims the oldest entries once
    /// the configured `max_log_entries` is exceeded
    pub logs: Vec<TaskLogEntry>,
    /// Custom metadata (arbitrary JSON values keyed by string)
    pub metadata: HashMap<String, serde_json::Value>,
}
158
/// Remote task status.
///
/// Serialized in `snake_case` (for example `"timed_out"`). `Completed`,
/// `Failed`, `Cancelled` and `TimedOut` are the statuses for which the monitor
/// stamps `completed_at` on the task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RemoteTaskStatus {
    /// Task is queued waiting to start
    Queued,
    /// Task is starting up
    Starting,
    /// Task is actively running
    Running,
    /// Task is paused
    Paused,
    /// Task completed successfully
    Completed,
    /// Task failed with error
    Failed,
    /// Task was cancelled
    Cancelled,
    /// Task timed out
    TimedOut,
}
180
181/// Task priority levels
182#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
183#[serde(rename_all = "snake_case")]
184pub enum TaskPriority {
185    Low = 0,
186    Normal = 1,
187    High = 2,
188    Critical = 3,
189}
190
191impl Default for TaskPriority {
192    fn default() -> Self {
193        TaskPriority::Normal
194    }
195}
196
/// Task result.
///
/// Stored on [`RemoteTask::result`] by `RemoteMonitor::complete_task` and
/// carried in the `TaskCompleted` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Success flag
    pub success: bool,
    /// Output data (arbitrary JSON)
    pub output: Option<serde_json::Value>,
    /// Artifacts produced (file paths, URLs, etc.)
    pub artifacts: Vec<TaskArtifact>,
    /// Metrics collected during execution
    pub metrics: TaskMetrics,
}
209
/// Task artifact (output files, etc.) listed in [`TaskResult::artifacts`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskArtifact {
    /// Artifact name
    pub name: String,
    /// Artifact type
    pub artifact_type: ArtifactType,
    /// Location (path or URL)
    pub location: String,
    /// Size in bytes (`None` when unknown)
    pub size: Option<u64>,
    /// Checksum (SHA256); `None` when not provided
    pub checksum: Option<String>,
}
224
/// Artifact types.
///
/// Serialized in `snake_case`. `Custom` carries a free-form type name for
/// anything the fixed variants do not cover.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ArtifactType {
    /// A single file
    File,
    /// A directory
    Directory,
    /// A URL
    Url,
    /// A database
    Database,
    /// A model
    Model,
    /// A report
    Report,
    /// A log
    Log,
    /// Any other kind, identified by the contained string
    Custom(String),
}
238
/// Task execution metrics.
///
/// Derives `Default`, so every counter starts at zero and `tokens_used`
/// starts as `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TaskMetrics {
    /// Total execution time in milliseconds
    pub duration_ms: u64,
    /// Tokens used (for LLM tasks)
    pub tokens_used: Option<u64>,
    /// API calls made
    pub api_calls: u32,
    /// Files processed
    pub files_processed: u32,
    /// Errors encountered (but recovered)
    pub errors_recovered: u32,
    /// Retries performed
    pub retries: u32,
}
255
/// Resource usage during task execution.
///
/// Derives `Default`, so all numeric fields start at zero and
/// `gpu_memory_bytes` starts as `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// CPU usage percentage (0-100)
    pub cpu_percent: f32,
    /// Memory usage in bytes
    pub memory_bytes: u64,
    /// GPU memory usage in bytes (if applicable)
    pub gpu_memory_bytes: Option<u64>,
    /// Network bytes sent
    pub network_tx_bytes: u64,
    /// Network bytes received
    pub network_rx_bytes: u64,
    /// Disk read bytes
    pub disk_read_bytes: u64,
    /// Disk write bytes
    pub disk_write_bytes: u64,
}
274
/// Task log entry.
///
/// Appended to [`RemoteTask::logs`] by `RemoteMonitor::add_task_log` and
/// carried in the `TaskLog` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskLogEntry {
    /// Log timestamp
    pub timestamp: DateTime<Utc>,
    /// Log level
    pub level: LogLevel,
    /// Log message
    pub message: String,
    /// Optional structured data (arbitrary JSON)
    pub data: Option<serde_json::Value>,
}
287
/// Log levels for [`TaskLogEntry`].
///
/// Serialized in `snake_case` (for example `"warn"`). The enum derives
/// equality only; it has no derived ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogLevel {
    /// Most verbose level
    Trace,
    /// Debugging detail
    Debug,
    /// Informational message
    Info,
    /// Warning
    Warn,
    /// Error
    Error,
}
298
299// =============================================================================
300// Events
301// =============================================================================
302
/// Events emitted by the remote monitoring system.
///
/// Events are published on the monitor's broadcast channel. The serde
/// attributes make the enum adjacently tagged: the variant name goes in a
/// `"type"` field and the payload in a `"data"` field. Variant names are not
/// renamed, so they serialize as written (for example `"TaskProgress"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum RemoteEvent {
    /// Node came online (sent when a node is registered)
    NodeOnline(NodeId),
    /// Node went offline (sent when a node is unregistered)
    NodeOffline(NodeId),
    /// Node status changed
    NodeStatusChanged {
        /// Node whose status changed
        node_id: NodeId,
        /// Status before the change
        old_status: NodeStatus,
        /// Status after the change
        new_status: NodeStatus,
    },
    /// Node heartbeat received
    NodeHeartbeat {
        /// Node that sent the heartbeat
        node_id: NodeId,
        /// Time the monitor recorded for the heartbeat
        timestamp: DateTime<Utc>,
    },

    /// Task was created/queued (carries a full copy of the task)
    TaskCreated(RemoteTask),
    /// Task started running
    TaskStarted {
        /// Task that started
        task_id: RemoteTaskId,
        /// Node the task runs on
        node_id: NodeId,
    },
    /// Task progress updated
    TaskProgress {
        /// Task being reported on
        task_id: RemoteTaskId,
        /// Progress value as stored on the task
        progress: f32,
        /// Optional progress message
        message: Option<String>,
    },
    /// Task step completed
    TaskStepCompleted {
        /// Task being reported on
        task_id: RemoteTaskId,
        /// Step number reported
        step: u32,
        /// Total number of steps reported
        total: u32,
        /// Optional description of the step
        description: Option<String>,
    },
    /// Task completed successfully
    TaskCompleted {
        /// Task that completed
        task_id: RemoteTaskId,
        /// Result attached to the task
        result: TaskResult,
    },
    /// Task failed
    TaskFailed {
        /// Task that failed
        task_id: RemoteTaskId,
        /// Error message stored on the task
        error: String,
    },
    /// Task was cancelled
    TaskCancelled {
        /// Task that was cancelled
        task_id: RemoteTaskId,
        /// Optional cancellation reason
        reason: Option<String>,
    },
    /// Task log entry added
    TaskLog {
        /// Task the entry belongs to
        task_id: RemoteTaskId,
        /// The log entry that was appended
        entry: TaskLogEntry,
    },

    /// Agent registered on node
    AgentRegistered {
        /// Node hosting the agent
        node_id: NodeId,
        /// Agent identifier
        agent_id: String,
        /// Agent name
        agent_name: String,
    },
    /// Agent unregistered from node
    AgentUnregistered { node_id: NodeId, agent_id: String },
}
373
374// =============================================================================
375// Remote Monitor
376// =============================================================================
377
/// Configuration for the remote monitor.
///
/// In the part of the file visible here, [`RemoteMonitor`] reads only
/// `node_timeout_secs` and `max_log_entries`; the network, TLS, auth and
/// metrics settings are presumably consumed by the server layer elsewhere
/// (NOTE(review): confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteMonitorConfig {
    /// Server bind address
    pub bind_address: String,
    /// Server port
    pub port: u16,
    /// Enable TLS
    pub tls_enabled: bool,
    /// TLS certificate path
    pub tls_cert_path: Option<String>,
    /// TLS key path
    pub tls_key_path: Option<String>,
    /// Authentication token (for API access)
    pub auth_token: Option<String>,
    /// Heartbeat interval in seconds
    pub heartbeat_interval_secs: u64,
    /// Node timeout in seconds (mark offline after this)
    pub node_timeout_secs: u64,
    /// Maximum log entries per task; older entries are dropped first
    pub max_log_entries: usize,
    /// Enable metrics collection
    pub metrics_enabled: bool,
}
402
403impl Default for RemoteMonitorConfig {
404    fn default() -> Self {
405        Self {
406            bind_address: "0.0.0.0".to_string(),
407            port: 9876,
408            tls_enabled: false,
409            tls_cert_path: None,
410            tls_key_path: None,
411            auth_token: None,
412            heartbeat_interval_secs: 30,
413            node_timeout_secs: 90,
414            max_log_entries: 1000,
415            metrics_enabled: true,
416        }
417    }
418}
419
/// Remote monitoring server.
///
/// Holds the registries of known nodes and tasks plus the broadcast sender on
/// which every state change is published as a [`RemoteEvent`].
pub struct RemoteMonitor {
    /// Settings supplied at construction
    config: RemoteMonitorConfig,
    /// Registered nodes keyed by node id, behind an async read/write lock
    nodes: Arc<RwLock<HashMap<NodeId, RemoteNode>>>,
    /// Known tasks keyed by task id, behind an async read/write lock
    tasks: Arc<RwLock<HashMap<RemoteTaskId, RemoteTask>>>,
    /// Sending half of the event broadcast channel
    event_tx: broadcast::Sender<RemoteEvent>,
}
427
428impl RemoteMonitor {
429    /// Create a new remote monitor
430    pub fn new(config: RemoteMonitorConfig) -> Self {
431        let (event_tx, _) = broadcast::channel(1000);
432
433        Self {
434            config,
435            nodes: Arc::new(RwLock::new(HashMap::new())),
436            tasks: Arc::new(RwLock::new(HashMap::new())),
437            event_tx,
438        }
439    }
440
441    /// Subscribe to events
442    pub fn subscribe(&self) -> broadcast::Receiver<RemoteEvent> {
443        self.event_tx.subscribe()
444    }
445
446    /// Register a new node
447    pub async fn register_node(&self, node: RemoteNode) -> Result<(), RemoteMonitorError> {
448        let node_id = node.id.clone();
449        let mut nodes = self.nodes.write().await;
450        nodes.insert(node_id.clone(), node);
451
452        let _ = self.event_tx.send(RemoteEvent::NodeOnline(node_id));
453        Ok(())
454    }
455
456    /// Unregister a node
457    pub async fn unregister_node(&self, node_id: &str) -> Result<bool, RemoteMonitorError> {
458        let mut nodes = self.nodes.write().await;
459        let removed = nodes.remove(node_id).is_some();
460
461        if removed {
462            let _ = self
463                .event_tx
464                .send(RemoteEvent::NodeOffline(node_id.to_string()));
465        }
466
467        Ok(removed)
468    }
469
470    /// Update node heartbeat
471    pub async fn heartbeat(&self, node_id: &str) -> Result<(), RemoteMonitorError> {
472        let mut nodes = self.nodes.write().await;
473
474        if let Some(node) = nodes.get_mut(node_id) {
475            node.last_heartbeat = Utc::now();
476            if node.status == NodeStatus::Offline || node.status == NodeStatus::Unknown {
477                let old_status = node.status;
478                node.status = NodeStatus::Online;
479                let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
480                    node_id: node_id.to_string(),
481                    old_status,
482                    new_status: NodeStatus::Online,
483                });
484            }
485            let _ = self.event_tx.send(RemoteEvent::NodeHeartbeat {
486                node_id: node_id.to_string(),
487                timestamp: node.last_heartbeat,
488            });
489            Ok(())
490        } else {
491            Err(RemoteMonitorError::NodeNotFound(node_id.to_string()))
492        }
493    }
494
495    /// Get node by ID
496    pub async fn get_node(&self, node_id: &str) -> Option<RemoteNode> {
497        let nodes = self.nodes.read().await;
498        nodes.get(node_id).cloned()
499    }
500
501    /// List all nodes
502    pub async fn list_nodes(&self) -> Vec<RemoteNode> {
503        let nodes = self.nodes.read().await;
504        nodes.values().cloned().collect()
505    }
506
507    /// List nodes by status
508    pub async fn list_nodes_by_status(&self, status: NodeStatus) -> Vec<RemoteNode> {
509        let nodes = self.nodes.read().await;
510        nodes
511            .values()
512            .filter(|n| n.status == status)
513            .cloned()
514            .collect()
515    }
516
517    /// Create a new task
518    pub async fn create_task(&self, task: RemoteTask) -> Result<RemoteTaskId, RemoteMonitorError> {
519        // Verify node exists
520        {
521            let nodes = self.nodes.read().await;
522            if !nodes.contains_key(&task.node_id) {
523                return Err(RemoteMonitorError::NodeNotFound(task.node_id.clone()));
524            }
525        }
526
527        let task_id = task.id.clone();
528        let mut tasks = self.tasks.write().await;
529        tasks.insert(task_id.clone(), task.clone());
530
531        let _ = self.event_tx.send(RemoteEvent::TaskCreated(task));
532        Ok(task_id)
533    }
534
535    /// Update task status
536    pub async fn update_task_status(
537        &self,
538        task_id: &str,
539        status: RemoteTaskStatus,
540    ) -> Result<(), RemoteMonitorError> {
541        let mut tasks = self.tasks.write().await;
542
543        if let Some(task) = tasks.get_mut(task_id) {
544            let old_status = task.status;
545            task.status = status;
546
547            match status {
548                RemoteTaskStatus::Running if old_status != RemoteTaskStatus::Running => {
549                    let _ = self.event_tx.send(RemoteEvent::TaskStarted {
550                        task_id: task_id.to_string(),
551                        node_id: task.node_id.clone(),
552                    });
553                }
554                RemoteTaskStatus::Completed => {
555                    task.completed_at = Some(Utc::now());
556                    task.progress = 1.0;
557                }
558                RemoteTaskStatus::Failed
559                | RemoteTaskStatus::Cancelled
560                | RemoteTaskStatus::TimedOut => {
561                    task.completed_at = Some(Utc::now());
562                }
563                _ => {}
564            }
565
566            Ok(())
567        } else {
568            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
569        }
570    }
571
572    /// Update task progress
573    pub async fn update_task_progress(
574        &self,
575        task_id: &str,
576        progress: f32,
577        message: Option<String>,
578    ) -> Result<(), RemoteMonitorError> {
579        let mut tasks = self.tasks.write().await;
580
581        if let Some(task) = tasks.get_mut(task_id) {
582            task.progress = progress.clamp(0.0, 1.0);
583            task.progress_message = message.clone();
584
585            let _ = self.event_tx.send(RemoteEvent::TaskProgress {
586                task_id: task_id.to_string(),
587                progress: task.progress,
588                message,
589            });
590
591            Ok(())
592        } else {
593            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
594        }
595    }
596
597    /// Update task step
598    pub async fn update_task_step(
599        &self,
600        task_id: &str,
601        step: u32,
602        total: u32,
603        description: Option<String>,
604    ) -> Result<(), RemoteMonitorError> {
605        let mut tasks = self.tasks.write().await;
606
607        if let Some(task) = tasks.get_mut(task_id) {
608            task.current_step = Some(step);
609            task.total_steps = Some(total);
610            task.progress = step as f32 / total as f32;
611
612            let _ = self.event_tx.send(RemoteEvent::TaskStepCompleted {
613                task_id: task_id.to_string(),
614                step,
615                total,
616                description,
617            });
618
619            Ok(())
620        } else {
621            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
622        }
623    }
624
625    /// Complete a task
626    pub async fn complete_task(
627        &self,
628        task_id: &str,
629        result: TaskResult,
630    ) -> Result<(), RemoteMonitorError> {
631        let node_id = {
632            let mut tasks = self.tasks.write().await;
633
634            if let Some(task) = tasks.get_mut(task_id) {
635                task.status = RemoteTaskStatus::Completed;
636                task.completed_at = Some(Utc::now());
637                task.progress = 1.0;
638                task.result = Some(result.clone());
639                task.node_id.clone()
640            } else {
641                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
642            }
643        };
644
645        // Update node task count
646        let mut nodes = self.nodes.write().await;
647        if let Some(node) = nodes.get_mut(&node_id) {
648            node.running_tasks = node.running_tasks.saturating_sub(1);
649        }
650
651        let _ = self.event_tx.send(RemoteEvent::TaskCompleted {
652            task_id: task_id.to_string(),
653            result,
654        });
655
656        Ok(())
657    }
658
659    /// Fail a task
660    pub async fn fail_task(&self, task_id: &str, error: String) -> Result<(), RemoteMonitorError> {
661        let node_id = {
662            let mut tasks = self.tasks.write().await;
663
664            if let Some(task) = tasks.get_mut(task_id) {
665                task.status = RemoteTaskStatus::Failed;
666                task.completed_at = Some(Utc::now());
667                task.error = Some(error.clone());
668                task.node_id.clone()
669            } else {
670                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
671            }
672        };
673
674        // Update node task count
675        let mut nodes = self.nodes.write().await;
676        if let Some(node) = nodes.get_mut(&node_id) {
677            node.running_tasks = node.running_tasks.saturating_sub(1);
678        }
679
680        let _ = self.event_tx.send(RemoteEvent::TaskFailed {
681            task_id: task_id.to_string(),
682            error,
683        });
684
685        Ok(())
686    }
687
688    /// Cancel a task
689    pub async fn cancel_task(
690        &self,
691        task_id: &str,
692        reason: Option<String>,
693    ) -> Result<(), RemoteMonitorError> {
694        let node_id = {
695            let mut tasks = self.tasks.write().await;
696
697            if let Some(task) = tasks.get_mut(task_id) {
698                task.status = RemoteTaskStatus::Cancelled;
699                task.completed_at = Some(Utc::now());
700                task.node_id.clone()
701            } else {
702                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
703            }
704        };
705
706        // Update node task count
707        let mut nodes = self.nodes.write().await;
708        if let Some(node) = nodes.get_mut(&node_id) {
709            node.running_tasks = node.running_tasks.saturating_sub(1);
710        }
711
712        let _ = self.event_tx.send(RemoteEvent::TaskCancelled {
713            task_id: task_id.to_string(),
714            reason,
715        });
716
717        Ok(())
718    }
719
720    /// Add log entry to task
721    pub async fn add_task_log(
722        &self,
723        task_id: &str,
724        entry: TaskLogEntry,
725    ) -> Result<(), RemoteMonitorError> {
726        let mut tasks = self.tasks.write().await;
727
728        if let Some(task) = tasks.get_mut(task_id) {
729            task.logs.push(entry.clone());
730
731            // Trim logs if over limit
732            if task.logs.len() > self.config.max_log_entries {
733                let drain_count = task.logs.len() - self.config.max_log_entries;
734                task.logs.drain(0..drain_count);
735            }
736
737            let _ = self.event_tx.send(RemoteEvent::TaskLog {
738                task_id: task_id.to_string(),
739                entry,
740            });
741
742            Ok(())
743        } else {
744            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
745        }
746    }
747
748    /// Get task by ID
749    pub async fn get_task(&self, task_id: &str) -> Option<RemoteTask> {
750        let tasks = self.tasks.read().await;
751        tasks.get(task_id).cloned()
752    }
753
754    /// List all tasks
755    pub async fn list_tasks(&self) -> Vec<RemoteTask> {
756        let tasks = self.tasks.read().await;
757        tasks.values().cloned().collect()
758    }
759
760    /// List tasks by node
761    pub async fn list_tasks_by_node(&self, node_id: &str) -> Vec<RemoteTask> {
762        let tasks = self.tasks.read().await;
763        tasks
764            .values()
765            .filter(|t| t.node_id == node_id)
766            .cloned()
767            .collect()
768    }
769
770    /// List tasks by status
771    pub async fn list_tasks_by_status(&self, status: RemoteTaskStatus) -> Vec<RemoteTask> {
772        let tasks = self.tasks.read().await;
773        tasks
774            .values()
775            .filter(|t| t.status == status)
776            .cloned()
777            .collect()
778    }
779
780    /// List tasks by agent
781    pub async fn list_tasks_by_agent(&self, agent_id: &str) -> Vec<RemoteTask> {
782        let tasks = self.tasks.read().await;
783        tasks
784            .values()
785            .filter(|t| t.agent_id == agent_id)
786            .cloned()
787            .collect()
788    }
789
790    /// Get monitoring statistics
791    pub async fn get_stats(&self) -> MonitorStats {
792        let nodes = self.nodes.read().await;
793        let tasks = self.tasks.read().await;
794
795        let online_nodes = nodes
796            .values()
797            .filter(|n| n.status == NodeStatus::Online)
798            .count();
799        let total_agents: u32 = nodes.values().map(|n| n.active_agents).sum();
800
801        let running_tasks = tasks
802            .values()
803            .filter(|t| t.status == RemoteTaskStatus::Running)
804            .count();
805        let queued_tasks = tasks
806            .values()
807            .filter(|t| t.status == RemoteTaskStatus::Queued)
808            .count();
809        let completed_tasks = tasks
810            .values()
811            .filter(|t| t.status == RemoteTaskStatus::Completed)
812            .count();
813        let failed_tasks = tasks
814            .values()
815            .filter(|t| t.status == RemoteTaskStatus::Failed)
816            .count();
817
818        MonitorStats {
819            total_nodes: nodes.len(),
820            online_nodes,
821            total_agents: total_agents as usize,
822            total_tasks: tasks.len(),
823            running_tasks,
824            queued_tasks,
825            completed_tasks,
826            failed_tasks,
827        }
828    }
829
830    /// Check for timed out nodes and update their status
831    pub async fn check_node_timeouts(&self) {
832        let timeout = chrono::Duration::seconds(self.config.node_timeout_secs as i64);
833        let now = Utc::now();
834
835        let mut nodes = self.nodes.write().await;
836        for node in nodes.values_mut() {
837            if node.status == NodeStatus::Online && now - node.last_heartbeat > timeout {
838                let old_status = node.status;
839                node.status = NodeStatus::Offline;
840                let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
841                    node_id: node.id.clone(),
842                    old_status,
843                    new_status: NodeStatus::Offline,
844                });
845            }
846        }
847    }
848
849    /// Get configuration
850    pub fn config(&self) -> &RemoteMonitorConfig {
851        &self.config
852    }
853}
854
/// Monitor statistics.
///
/// Snapshot of node and task counts produced by `RemoteMonitor::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorStats {
    /// Number of registered nodes
    pub total_nodes: usize,
    /// Number of nodes whose status is `Online`
    pub online_nodes: usize,
    /// Sum of `active_agents` over all registered nodes
    pub total_agents: usize,
    /// Number of known tasks, in any status
    pub total_tasks: usize,
    /// Number of tasks whose status is `Running`
    pub running_tasks: usize,
    /// Number of tasks whose status is `Queued`
    pub queued_tasks: usize,
    /// Number of tasks whose status is `Completed`
    pub completed_tasks: usize,
    /// Number of tasks whose status is `Failed`
    pub failed_tasks: usize,
}
867
868// =============================================================================
869// Remote Agent Client
870// =============================================================================
871
/// Client for remote agents to report progress.
///
/// Each reporting method issues an HTTP POST against `server_url` and, when
/// an auth token is set, adds an `Authorization: Bearer <token>` header.
pub struct RemoteAgentClient {
    /// Node ID this client represents
    pub node_id: NodeId,
    /// Server URL; used as the prefix of every endpoint path
    server_url: String,
    /// Authentication token; `None` until `with_auth` is called
    auth_token: Option<String>,
    /// HTTP client
    // NOTE(review): this allow looks redundant, since the reporting methods
    // in this file use `client`. Confirm before removing.
    #[allow(dead_code)]
    client: reqwest::Client,
}
884
885impl RemoteAgentClient {
886    /// Create a new remote agent client
887    pub fn new(node_id: impl Into<String>, server_url: impl Into<String>) -> Self {
888        Self {
889            node_id: node_id.into(),
890            server_url: server_url.into(),
891            auth_token: None,
892            client: reqwest::Client::new(),
893        }
894    }
895
896    /// Set authentication token
897    pub fn with_auth(mut self, token: impl Into<String>) -> Self {
898        self.auth_token = Some(token.into());
899        self
900    }
901
902    /// Send heartbeat
903    pub async fn heartbeat(&self) -> Result<(), RemoteMonitorError> {
904        let url = format!(
905            "{}/api/v1/nodes/{}/heartbeat",
906            self.server_url, self.node_id
907        );
908
909        let mut request = self.client.post(&url);
910        if let Some(ref token) = self.auth_token {
911            request = request.header("Authorization", format!("Bearer {}", token));
912        }
913
914        let response = request
915            .send()
916            .await
917            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
918
919        if !response.status().is_success() {
920            return Err(RemoteMonitorError::ApiError(
921                response.status().as_u16(),
922                response.text().await.unwrap_or_default(),
923            ));
924        }
925
926        Ok(())
927    }
928
929    /// Report task progress
930    pub async fn report_progress(
931        &self,
932        task_id: &str,
933        progress: f32,
934        message: Option<&str>,
935    ) -> Result<(), RemoteMonitorError> {
936        let url = format!("{}/api/v1/tasks/{}/progress", self.server_url, task_id);
937
938        let body = serde_json::json!({
939            "progress": progress,
940            "message": message,
941        });
942
943        let mut request = self.client.post(&url).json(&body);
944        if let Some(ref token) = self.auth_token {
945            request = request.header("Authorization", format!("Bearer {}", token));
946        }
947
948        let response = request
949            .send()
950            .await
951            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
952
953        if !response.status().is_success() {
954            return Err(RemoteMonitorError::ApiError(
955                response.status().as_u16(),
956                response.text().await.unwrap_or_default(),
957            ));
958        }
959
960        Ok(())
961    }
962
963    /// Report task step
964    pub async fn report_step(
965        &self,
966        task_id: &str,
967        step: u32,
968        total: u32,
969        description: Option<&str>,
970    ) -> Result<(), RemoteMonitorError> {
971        let url = format!("{}/api/v1/tasks/{}/step", self.server_url, task_id);
972
973        let body = serde_json::json!({
974            "step": step,
975            "total": total,
976            "description": description,
977        });
978
979        let mut request = self.client.post(&url).json(&body);
980        if let Some(ref token) = self.auth_token {
981            request = request.header("Authorization", format!("Bearer {}", token));
982        }
983
984        let response = request
985            .send()
986            .await
987            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
988
989        if !response.status().is_success() {
990            return Err(RemoteMonitorError::ApiError(
991                response.status().as_u16(),
992                response.text().await.unwrap_or_default(),
993            ));
994        }
995
996        Ok(())
997    }
998
999    /// Report task completion
1000    pub async fn report_completed(
1001        &self,
1002        task_id: &str,
1003        result: TaskResult,
1004    ) -> Result<(), RemoteMonitorError> {
1005        let url = format!("{}/api/v1/tasks/{}/complete", self.server_url, task_id);
1006
1007        let mut request = self.client.post(&url).json(&result);
1008        if let Some(ref token) = self.auth_token {
1009            request = request.header("Authorization", format!("Bearer {}", token));
1010        }
1011
1012        let response = request
1013            .send()
1014            .await
1015            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1016
1017        if !response.status().is_success() {
1018            return Err(RemoteMonitorError::ApiError(
1019                response.status().as_u16(),
1020                response.text().await.unwrap_or_default(),
1021            ));
1022        }
1023
1024        Ok(())
1025    }
1026
1027    /// Report task failure
1028    pub async fn report_failed(
1029        &self,
1030        task_id: &str,
1031        error: &str,
1032    ) -> Result<(), RemoteMonitorError> {
1033        let url = format!("{}/api/v1/tasks/{}/fail", self.server_url, task_id);
1034
1035        let body = serde_json::json!({
1036            "error": error,
1037        });
1038
1039        let mut request = self.client.post(&url).json(&body);
1040        if let Some(ref token) = self.auth_token {
1041            request = request.header("Authorization", format!("Bearer {}", token));
1042        }
1043
1044        let response = request
1045            .send()
1046            .await
1047            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1048
1049        if !response.status().is_success() {
1050            return Err(RemoteMonitorError::ApiError(
1051                response.status().as_u16(),
1052                response.text().await.unwrap_or_default(),
1053            ));
1054        }
1055
1056        Ok(())
1057    }
1058
1059    /// Send log entry
1060    pub async fn send_log(
1061        &self,
1062        task_id: &str,
1063        level: LogLevel,
1064        message: &str,
1065    ) -> Result<(), RemoteMonitorError> {
1066        let url = format!("{}/api/v1/tasks/{}/log", self.server_url, task_id);
1067
1068        let body = serde_json::json!({
1069            "level": level,
1070            "message": message,
1071            "timestamp": Utc::now(),
1072        });
1073
1074        let mut request = self.client.post(&url).json(&body);
1075        if let Some(ref token) = self.auth_token {
1076            request = request.header("Authorization", format!("Bearer {}", token));
1077        }
1078
1079        let response = request
1080            .send()
1081            .await
1082            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1083
1084        if !response.status().is_success() {
1085            return Err(RemoteMonitorError::ApiError(
1086                response.status().as_u16(),
1087                response.text().await.unwrap_or_default(),
1088            ));
1089        }
1090
1091        Ok(())
1092    }
1093}
1094
1095// =============================================================================
1096// Task Builder
1097// =============================================================================
1098
1099/// Builder for creating remote tasks
1100pub struct RemoteTaskBuilder {
1101    task: RemoteTask,
1102}
1103
1104impl RemoteTaskBuilder {
1105    /// Create a new task builder
1106    pub fn new(
1107        id: impl Into<String>,
1108        node_id: impl Into<String>,
1109        agent_id: impl Into<String>,
1110    ) -> Self {
1111        Self {
1112            task: RemoteTask {
1113                id: id.into(),
1114                node_id: node_id.into(),
1115                agent_id: agent_id.into(),
1116                agent_name: String::new(),
1117                title: String::new(),
1118                description: None,
1119                status: RemoteTaskStatus::Queued,
1120                progress: 0.0,
1121                progress_message: None,
1122                current_step: None,
1123                total_steps: None,
1124                priority: TaskPriority::Normal,
1125                started_at: Utc::now(),
1126                completed_at: None,
1127                eta: None,
1128                result: None,
1129                error: None,
1130                resources: ResourceUsage::default(),
1131                logs: Vec::new(),
1132                metadata: HashMap::new(),
1133            },
1134        }
1135    }
1136
1137    /// Set agent name
1138    pub fn agent_name(mut self, name: impl Into<String>) -> Self {
1139        self.task.agent_name = name.into();
1140        self
1141    }
1142
1143    /// Set task title
1144    pub fn title(mut self, title: impl Into<String>) -> Self {
1145        self.task.title = title.into();
1146        self
1147    }
1148
1149    /// Set task description
1150    pub fn description(mut self, description: impl Into<String>) -> Self {
1151        self.task.description = Some(description.into());
1152        self
1153    }
1154
1155    /// Set task priority
1156    pub fn priority(mut self, priority: TaskPriority) -> Self {
1157        self.task.priority = priority;
1158        self
1159    }
1160
1161    /// Set total steps
1162    pub fn total_steps(mut self, steps: u32) -> Self {
1163        self.task.total_steps = Some(steps);
1164        self
1165    }
1166
1167    /// Set ETA
1168    pub fn eta(mut self, eta: DateTime<Utc>) -> Self {
1169        self.task.eta = Some(eta);
1170        self
1171    }
1172
1173    /// Add metadata
1174    pub fn metadata(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
1175        if let Ok(v) = serde_json::to_value(value) {
1176            self.task.metadata.insert(key.into(), v);
1177        }
1178        self
1179    }
1180
1181    /// Build the task
1182    pub fn build(self) -> RemoteTask {
1183        self.task
1184    }
1185}
1186
1187// =============================================================================
1188// Error Types
1189// =============================================================================
1190
/// Remote monitor errors
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be
/// boxed as `Box<dyn Error>` or propagated with `?`. `PartialEq`/`Eq` are
/// derived so that callers and tests can compare errors directly (for example
/// with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteMonitorError {
    /// Node not found; carries the identifier that was looked up
    NodeNotFound(String),
    /// Task not found; carries the identifier that was looked up
    TaskNotFound(String),
    /// Network error; carries a description of the transport failure
    Network(String),
    /// API error; carries the HTTP status code and the response body text
    ApiError(u16, String),
    /// Authentication error; carries a description of the failure
    AuthError(String),
    /// Invalid state; carries a description of the violated expectation
    InvalidState(String),
}

impl std::fmt::Display for RemoteMonitorError {
    /// Formats the error as a short label followed by its payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
            Self::TaskNotFound(id) => write!(f, "Task not found: {}", id),
            Self::Network(e) => write!(f, "Network error: {}", e),
            Self::ApiError(code, msg) => write!(f, "API error {}: {}", code, msg),
            Self::AuthError(e) => write!(f, "Authentication error: {}", e),
            Self::InvalidState(e) => write!(f, "Invalid state: {}", e),
        }
    }
}

impl std::error::Error for RemoteMonitorError {}
1222
1223// =============================================================================
1224// Helper Functions
1225// =============================================================================
1226
/// Generate a unique task ID
///
/// The ID has the form `task_<nanos>_<seq>`, where `<nanos>` is the current
/// time in nanoseconds since the Unix epoch and `<seq>` is a process-wide
/// sequence number, both in lowercase hex.
///
/// The sequence number keeps IDs distinct when several are generated within
/// the same clock tick (system clocks are often coarser than a nanosecond).
/// IDs are therefore unique within a process; across processes or machines
/// uniqueness still depends on the timestamps differing.
///
/// Never panics: if the system clock reports a time before the Unix epoch,
/// the timestamp component falls back to `0`.
pub fn generate_task_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only atomicity of the increment matters for uniqueness, so `Relaxed`
    // ordering is sufficient.
    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("task_{:x}_{:x}", timestamp, sequence)
}
1236
/// Generate a unique node ID
///
/// The ID has the form `node_<nanos>_<seq>`, where `<nanos>` is the current
/// time in nanoseconds since the Unix epoch and `<seq>` is a process-wide
/// sequence number, both in lowercase hex.
///
/// The sequence number keeps IDs distinct when several are generated within
/// the same clock tick (system clocks are often coarser than a nanosecond).
/// IDs are therefore unique within a process; across processes or machines
/// uniqueness still depends on the timestamps differing.
///
/// Never panics: if the system clock reports a time before the Unix epoch,
/// the timestamp component falls back to `0`.
pub fn generate_node_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only atomicity of the increment matters for uniqueness, so `Relaxed`
    // ordering is sufficient.
    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("node_{:x}_{:x}", timestamp, sequence)
}
1246
1247// =============================================================================
1248// Tests
1249// =============================================================================
1250
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an online node at `localhost:9876` with no hardware info, no
    /// running tasks, empty metadata, and both timestamps set to "now".
    fn online_node(id: &str, name: &str, tags: &[&str], active_agents: u32) -> RemoteNode {
        RemoteNode {
            id: id.to_string(),
            name: name.to_string(),
            address: "localhost:9876".to_string(),
            status: NodeStatus::Online,
            tags: tags.iter().map(|tag| tag.to_string()).collect(),
            hardware: None,
            active_agents,
            running_tasks: 0,
            last_heartbeat: Utc::now(),
            registered_at: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    /// A freshly created monitor reports no nodes and no tasks.
    #[tokio::test]
    async fn test_remote_monitor_creation() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        let stats = monitor.get_stats().await;
        assert_eq!(stats.total_nodes, 0);
        assert_eq!(stats.total_tasks, 0);
    }

    /// A registered node can be looked up again by its ID.
    #[tokio::test]
    async fn test_node_registration() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        monitor
            .register_node(online_node("test-node", "Test Node", &["test"], 0))
            .await
            .unwrap();

        let name = monitor
            .get_node("test-node")
            .await
            .map(|node| node.name.clone());
        assert_eq!(name.as_deref(), Some("Test Node"));
    }

    /// A task created on a registered node is stored under its own ID.
    #[tokio::test]
    async fn test_task_creation() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        // The node must exist before a task can be attached to it.
        monitor
            .register_node(online_node("test-node", "Test Node", &[], 1))
            .await
            .unwrap();

        let task = RemoteTaskBuilder::new("task-1", "test-node", "agent-1")
            .agent_name("Test Agent")
            .title("Test Task")
            .description("A test task")
            .priority(TaskPriority::High)
            .build();

        let task_id = monitor.create_task(task).await.unwrap();
        assert_eq!(task_id, "task-1");

        let title = monitor
            .get_task("task-1")
            .await
            .map(|stored| stored.title.clone());
        assert_eq!(title.as_deref(), Some("Test Task"));
    }

    /// Progress updates are reflected on the stored task.
    #[tokio::test]
    async fn test_task_progress() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        monitor
            .register_node(online_node("node-1", "Node 1", &[], 1))
            .await
            .unwrap();

        let task = RemoteTaskBuilder::new("task-1", "node-1", "agent-1")
            .title("Progress Test")
            .build();
        monitor.create_task(task).await.unwrap();

        monitor
            .update_task_progress("task-1", 0.5, Some("Halfway done".to_string()))
            .await
            .unwrap();

        let updated = monitor.get_task("task-1").await.unwrap();
        assert!((updated.progress - 0.5).abs() < 0.01);
        assert_eq!(updated.progress_message.as_deref(), Some("Halfway done"));
    }

    /// The builder carries every configured field through to the built task.
    #[test]
    fn test_task_builder() {
        let built = RemoteTaskBuilder::new("task-123", "node-1", "agent-1")
            .agent_name("My Agent")
            .title("Important Task")
            .description("Does important things")
            .priority(TaskPriority::Critical)
            .total_steps(5)
            .build();

        // Identity fields passed to `new`.
        assert_eq!(built.id, "task-123");
        assert_eq!(built.node_id, "node-1");
        assert_eq!(built.agent_id, "agent-1");

        // Fields set through the chained setters.
        assert_eq!(built.agent_name, "My Agent");
        assert_eq!(built.title, "Important Task");
        assert_eq!(built.priority, TaskPriority::Critical);
        assert_eq!(built.total_steps, Some(5));

        // Untouched default.
        assert_eq!(built.status, RemoteTaskStatus::Queued);
    }
}