chasm_cli/agency/
remote.rs

1// Copyright (c) 2024-2026 Nervosys LLC
2// SPDX-License-Identifier: Apache-2.0
3//! Remote Agent Monitoring System
4//!
5//! Enables monitoring of agent task progress across distributed machines.
6//!
7//! ## Features
8//!
9//! - **Real-time Progress**: WebSocket-based live updates
10//! - **Task Tracking**: Monitor task status, progress, and metrics
11//! - **Multi-machine**: Track agents across multiple remote hosts
12//! - **Heartbeat**: Automatic health monitoring with configurable intervals
13
14#![allow(dead_code)]
15//! - **Event Streaming**: Subscribe to specific agent or task events
16
17use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::{broadcast, RwLock};
22
23// =============================================================================
24// Core Types
25// =============================================================================
26
/// Opaque string key under which a [`RemoteNode`] is registered.
pub type NodeId = String;

/// Opaque string key under which a [`RemoteTask`] is tracked.
pub type RemoteTaskId = String;
32
33/// Remote node representing a machine running agents
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct RemoteNode {
36    /// Unique node identifier
37    pub id: NodeId,
38    /// Human-readable name
39    pub name: String,
40    /// Node address (hostname:port or IP:port)
41    pub address: String,
42    /// Node status
43    pub status: NodeStatus,
44    /// Node capabilities/tags
45    pub tags: Vec<String>,
46    /// Hardware info
47    pub hardware: Option<HardwareInfo>,
48    /// Number of active agents
49    pub active_agents: u32,
50    /// Number of running tasks
51    pub running_tasks: u32,
52    /// Last heartbeat received
53    pub last_heartbeat: DateTime<Utc>,
54    /// Node registered at
55    pub registered_at: DateTime<Utc>,
56    /// Custom metadata
57    pub metadata: HashMap<String, serde_json::Value>,
58}
59
60/// Node status
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63#[derive(Default)]
64pub enum NodeStatus {
65    /// Node is online and healthy
66    Online,
67    /// Node is online but degraded (high load, etc.)
68    Degraded,
69    /// Node is offline or unreachable
70    Offline,
71    /// Node is in maintenance mode
72    Maintenance,
73    /// Node status is unknown
74    #[default]
75    Unknown,
76}
77
78/// Hardware information for a node
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct HardwareInfo {
81    /// CPU cores
82    pub cpu_cores: u32,
83    /// Total RAM in bytes
84    pub ram_total: u64,
85    /// Available RAM in bytes
86    pub ram_available: u64,
87    /// GPU information
88    pub gpus: Vec<GpuInfo>,
89    /// Operating system
90    pub os: String,
91    /// Architecture (x86_64, arm64, etc.)
92    pub arch: String,
93}
94
95/// GPU information
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct GpuInfo {
98    /// GPU name/model
99    pub name: String,
100    /// VRAM in bytes
101    pub vram: u64,
102    /// CUDA version (if NVIDIA)
103    pub cuda_version: Option<String>,
104}
105
106// =============================================================================
107// Remote Task
108// =============================================================================
109
110/// A task running on a remote node
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct RemoteTask {
113    /// Unique task identifier
114    pub id: RemoteTaskId,
115    /// Node this task is running on
116    pub node_id: NodeId,
117    /// Agent executing this task
118    pub agent_id: String,
119    /// Agent name
120    pub agent_name: String,
121    /// Task title/description
122    pub title: String,
123    /// Detailed description
124    pub description: Option<String>,
125    /// Task status
126    pub status: RemoteTaskStatus,
127    /// Progress (0.0 - 1.0)
128    pub progress: f32,
129    /// Progress message
130    pub progress_message: Option<String>,
131    /// Current step (for multi-step tasks)
132    pub current_step: Option<u32>,
133    /// Total steps
134    pub total_steps: Option<u32>,
135    /// Task priority
136    pub priority: TaskPriority,
137    /// Task started at
138    pub started_at: DateTime<Utc>,
139    /// Task completed at
140    pub completed_at: Option<DateTime<Utc>>,
141    /// Estimated completion time
142    pub eta: Option<DateTime<Utc>>,
143    /// Task result (if completed)
144    pub result: Option<TaskResult>,
145    /// Error message (if failed)
146    pub error: Option<String>,
147    /// Resource usage
148    pub resources: ResourceUsage,
149    /// Task logs (recent entries)
150    pub logs: Vec<TaskLogEntry>,
151    /// Custom metadata
152    pub metadata: HashMap<String, serde_json::Value>,
153}
154
155/// Remote task status
156#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
157#[serde(rename_all = "snake_case")]
158pub enum RemoteTaskStatus {
159    /// Task is queued waiting to start
160    Queued,
161    /// Task is starting up
162    Starting,
163    /// Task is actively running
164    Running,
165    /// Task is paused
166    Paused,
167    /// Task completed successfully
168    Completed,
169    /// Task failed with error
170    Failed,
171    /// Task was cancelled
172    Cancelled,
173    /// Task timed out
174    TimedOut,
175}
176
177/// Task priority levels
178#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
179#[serde(rename_all = "snake_case")]
180#[derive(Default)]
181pub enum TaskPriority {
182    Low = 0,
183    #[default]
184    Normal = 1,
185    High = 2,
186    Critical = 3,
187}
188
189/// Task result
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct TaskResult {
192    /// Success flag
193    pub success: bool,
194    /// Output data
195    pub output: Option<serde_json::Value>,
196    /// Artifacts produced (file paths, URLs, etc.)
197    pub artifacts: Vec<TaskArtifact>,
198    /// Metrics collected during execution
199    pub metrics: TaskMetrics,
200}
201
202/// Task artifact (output files, etc.)
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct TaskArtifact {
205    /// Artifact name
206    pub name: String,
207    /// Artifact type
208    pub artifact_type: ArtifactType,
209    /// Location (path or URL)
210    pub location: String,
211    /// Size in bytes
212    pub size: Option<u64>,
213    /// Checksum (SHA256)
214    pub checksum: Option<String>,
215}
216
217/// Artifact types
218#[derive(Debug, Clone, Serialize, Deserialize)]
219#[serde(rename_all = "snake_case")]
220pub enum ArtifactType {
221    File,
222    Directory,
223    Url,
224    Database,
225    Model,
226    Report,
227    Log,
228    Custom(String),
229}
230
231/// Task execution metrics
232#[derive(Debug, Clone, Default, Serialize, Deserialize)]
233pub struct TaskMetrics {
234    /// Total execution time in milliseconds
235    pub duration_ms: u64,
236    /// Tokens used (for LLM tasks)
237    pub tokens_used: Option<u64>,
238    /// API calls made
239    pub api_calls: u32,
240    /// Files processed
241    pub files_processed: u32,
242    /// Errors encountered (but recovered)
243    pub errors_recovered: u32,
244    /// Retries performed
245    pub retries: u32,
246}
247
248/// Resource usage during task execution
249#[derive(Debug, Clone, Default, Serialize, Deserialize)]
250pub struct ResourceUsage {
251    /// CPU usage percentage (0-100)
252    pub cpu_percent: f32,
253    /// Memory usage in bytes
254    pub memory_bytes: u64,
255    /// GPU memory usage in bytes (if applicable)
256    pub gpu_memory_bytes: Option<u64>,
257    /// Network bytes sent
258    pub network_tx_bytes: u64,
259    /// Network bytes received
260    pub network_rx_bytes: u64,
261    /// Disk read bytes
262    pub disk_read_bytes: u64,
263    /// Disk write bytes
264    pub disk_write_bytes: u64,
265}
266
267/// Task log entry
268#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct TaskLogEntry {
270    /// Log timestamp
271    pub timestamp: DateTime<Utc>,
272    /// Log level
273    pub level: LogLevel,
274    /// Log message
275    pub message: String,
276    /// Optional structured data
277    pub data: Option<serde_json::Value>,
278}
279
280/// Log levels
281#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
282#[serde(rename_all = "snake_case")]
283pub enum LogLevel {
284    Trace,
285    Debug,
286    Info,
287    Warn,
288    Error,
289}
290
291// =============================================================================
292// Events
293// =============================================================================
294
295/// Events emitted by the remote monitoring system
296#[derive(Debug, Clone, Serialize, Deserialize)]
297#[serde(tag = "type", content = "data")]
298pub enum RemoteEvent {
299    /// Node came online
300    NodeOnline(NodeId),
301    /// Node went offline
302    NodeOffline(NodeId),
303    /// Node status changed
304    NodeStatusChanged {
305        node_id: NodeId,
306        old_status: NodeStatus,
307        new_status: NodeStatus,
308    },
309    /// Node heartbeat received
310    NodeHeartbeat {
311        node_id: NodeId,
312        timestamp: DateTime<Utc>,
313    },
314
315    /// Task was created/queued
316    TaskCreated(Box<RemoteTask>),
317    /// Task started running
318    TaskStarted {
319        task_id: RemoteTaskId,
320        node_id: NodeId,
321    },
322    /// Task progress updated
323    TaskProgress {
324        task_id: RemoteTaskId,
325        progress: f32,
326        message: Option<String>,
327    },
328    /// Task step completed
329    TaskStepCompleted {
330        task_id: RemoteTaskId,
331        step: u32,
332        total: u32,
333        description: Option<String>,
334    },
335    /// Task completed successfully
336    TaskCompleted {
337        task_id: RemoteTaskId,
338        result: TaskResult,
339    },
340    /// Task failed
341    TaskFailed {
342        task_id: RemoteTaskId,
343        error: String,
344    },
345    /// Task was cancelled
346    TaskCancelled {
347        task_id: RemoteTaskId,
348        reason: Option<String>,
349    },
350    /// Task log entry added
351    TaskLog {
352        task_id: RemoteTaskId,
353        entry: TaskLogEntry,
354    },
355
356    /// Agent registered on node
357    AgentRegistered {
358        node_id: NodeId,
359        agent_id: String,
360        agent_name: String,
361    },
362    /// Agent unregistered from node
363    AgentUnregistered { node_id: NodeId, agent_id: String },
364}
365
366// =============================================================================
367// Remote Monitor
368// =============================================================================
369
370/// Configuration for the remote monitor
371#[derive(Debug, Clone, Serialize, Deserialize)]
372pub struct RemoteMonitorConfig {
373    /// Server bind address
374    pub bind_address: String,
375    /// Server port
376    pub port: u16,
377    /// Enable TLS
378    pub tls_enabled: bool,
379    /// TLS certificate path
380    pub tls_cert_path: Option<String>,
381    /// TLS key path
382    pub tls_key_path: Option<String>,
383    /// Authentication token (for API access)
384    pub auth_token: Option<String>,
385    /// Heartbeat interval in seconds
386    pub heartbeat_interval_secs: u64,
387    /// Node timeout in seconds (mark offline after this)
388    pub node_timeout_secs: u64,
389    /// Maximum log entries per task
390    pub max_log_entries: usize,
391    /// Enable metrics collection
392    pub metrics_enabled: bool,
393}
394
395impl Default for RemoteMonitorConfig {
396    fn default() -> Self {
397        Self {
398            bind_address: "0.0.0.0".to_string(),
399            port: 9876,
400            tls_enabled: false,
401            tls_cert_path: None,
402            tls_key_path: None,
403            auth_token: None,
404            heartbeat_interval_secs: 30,
405            node_timeout_secs: 90,
406            max_log_entries: 1000,
407            metrics_enabled: true,
408        }
409    }
410}
411
412/// Remote monitoring server
413pub struct RemoteMonitor {
414    config: RemoteMonitorConfig,
415    nodes: Arc<RwLock<HashMap<NodeId, RemoteNode>>>,
416    tasks: Arc<RwLock<HashMap<RemoteTaskId, RemoteTask>>>,
417    event_tx: broadcast::Sender<RemoteEvent>,
418}
419
420impl RemoteMonitor {
421    /// Create a new remote monitor
422    pub fn new(config: RemoteMonitorConfig) -> Self {
423        let (event_tx, _) = broadcast::channel(1000);
424
425        Self {
426            config,
427            nodes: Arc::new(RwLock::new(HashMap::new())),
428            tasks: Arc::new(RwLock::new(HashMap::new())),
429            event_tx,
430        }
431    }
432
433    /// Subscribe to events
434    pub fn subscribe(&self) -> broadcast::Receiver<RemoteEvent> {
435        self.event_tx.subscribe()
436    }
437
438    /// Register a new node
439    pub async fn register_node(&self, node: RemoteNode) -> Result<(), RemoteMonitorError> {
440        let node_id = node.id.clone();
441        let mut nodes = self.nodes.write().await;
442        nodes.insert(node_id.clone(), node);
443
444        let _ = self.event_tx.send(RemoteEvent::NodeOnline(node_id));
445        Ok(())
446    }
447
448    /// Unregister a node
449    pub async fn unregister_node(&self, node_id: &str) -> Result<bool, RemoteMonitorError> {
450        let mut nodes = self.nodes.write().await;
451        let removed = nodes.remove(node_id).is_some();
452
453        if removed {
454            let _ = self
455                .event_tx
456                .send(RemoteEvent::NodeOffline(node_id.to_string()));
457        }
458
459        Ok(removed)
460    }
461
462    /// Update node heartbeat
463    pub async fn heartbeat(&self, node_id: &str) -> Result<(), RemoteMonitorError> {
464        let mut nodes = self.nodes.write().await;
465
466        if let Some(node) = nodes.get_mut(node_id) {
467            node.last_heartbeat = Utc::now();
468            if node.status == NodeStatus::Offline || node.status == NodeStatus::Unknown {
469                let old_status = node.status;
470                node.status = NodeStatus::Online;
471                let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
472                    node_id: node_id.to_string(),
473                    old_status,
474                    new_status: NodeStatus::Online,
475                });
476            }
477            let _ = self.event_tx.send(RemoteEvent::NodeHeartbeat {
478                node_id: node_id.to_string(),
479                timestamp: node.last_heartbeat,
480            });
481            Ok(())
482        } else {
483            Err(RemoteMonitorError::NodeNotFound(node_id.to_string()))
484        }
485    }
486
487    /// Get node by ID
488    pub async fn get_node(&self, node_id: &str) -> Option<RemoteNode> {
489        let nodes = self.nodes.read().await;
490        nodes.get(node_id).cloned()
491    }
492
493    /// List all nodes
494    pub async fn list_nodes(&self) -> Vec<RemoteNode> {
495        let nodes = self.nodes.read().await;
496        nodes.values().cloned().collect()
497    }
498
499    /// List nodes by status
500    pub async fn list_nodes_by_status(&self, status: NodeStatus) -> Vec<RemoteNode> {
501        let nodes = self.nodes.read().await;
502        nodes
503            .values()
504            .filter(|n| n.status == status)
505            .cloned()
506            .collect()
507    }
508
509    /// Create a new task
510    pub async fn create_task(&self, task: RemoteTask) -> Result<RemoteTaskId, RemoteMonitorError> {
511        // Verify node exists
512        {
513            let nodes = self.nodes.read().await;
514            if !nodes.contains_key(&task.node_id) {
515                return Err(RemoteMonitorError::NodeNotFound(task.node_id.clone()));
516            }
517        }
518
519        let task_id = task.id.clone();
520        let mut tasks = self.tasks.write().await;
521        tasks.insert(task_id.clone(), task.clone());
522
523        let _ = self.event_tx.send(RemoteEvent::TaskCreated(Box::new(task)));
524        Ok(task_id)
525    }
526
527    /// Update task status
528    pub async fn update_task_status(
529        &self,
530        task_id: &str,
531        status: RemoteTaskStatus,
532    ) -> Result<(), RemoteMonitorError> {
533        let mut tasks = self.tasks.write().await;
534
535        if let Some(task) = tasks.get_mut(task_id) {
536            let old_status = task.status;
537            task.status = status;
538
539            match status {
540                RemoteTaskStatus::Running if old_status != RemoteTaskStatus::Running => {
541                    let _ = self.event_tx.send(RemoteEvent::TaskStarted {
542                        task_id: task_id.to_string(),
543                        node_id: task.node_id.clone(),
544                    });
545                }
546                RemoteTaskStatus::Completed => {
547                    task.completed_at = Some(Utc::now());
548                    task.progress = 1.0;
549                }
550                RemoteTaskStatus::Failed
551                | RemoteTaskStatus::Cancelled
552                | RemoteTaskStatus::TimedOut => {
553                    task.completed_at = Some(Utc::now());
554                }
555                _ => {}
556            }
557
558            Ok(())
559        } else {
560            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
561        }
562    }
563
564    /// Update task progress
565    pub async fn update_task_progress(
566        &self,
567        task_id: &str,
568        progress: f32,
569        message: Option<String>,
570    ) -> Result<(), RemoteMonitorError> {
571        let mut tasks = self.tasks.write().await;
572
573        if let Some(task) = tasks.get_mut(task_id) {
574            task.progress = progress.clamp(0.0, 1.0);
575            task.progress_message = message.clone();
576
577            let _ = self.event_tx.send(RemoteEvent::TaskProgress {
578                task_id: task_id.to_string(),
579                progress: task.progress,
580                message,
581            });
582
583            Ok(())
584        } else {
585            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
586        }
587    }
588
589    /// Update task step
590    pub async fn update_task_step(
591        &self,
592        task_id: &str,
593        step: u32,
594        total: u32,
595        description: Option<String>,
596    ) -> Result<(), RemoteMonitorError> {
597        let mut tasks = self.tasks.write().await;
598
599        if let Some(task) = tasks.get_mut(task_id) {
600            task.current_step = Some(step);
601            task.total_steps = Some(total);
602            task.progress = step as f32 / total as f32;
603
604            let _ = self.event_tx.send(RemoteEvent::TaskStepCompleted {
605                task_id: task_id.to_string(),
606                step,
607                total,
608                description,
609            });
610
611            Ok(())
612        } else {
613            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
614        }
615    }
616
617    /// Complete a task
618    pub async fn complete_task(
619        &self,
620        task_id: &str,
621        result: TaskResult,
622    ) -> Result<(), RemoteMonitorError> {
623        let node_id = {
624            let mut tasks = self.tasks.write().await;
625
626            if let Some(task) = tasks.get_mut(task_id) {
627                task.status = RemoteTaskStatus::Completed;
628                task.completed_at = Some(Utc::now());
629                task.progress = 1.0;
630                task.result = Some(result.clone());
631                task.node_id.clone()
632            } else {
633                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
634            }
635        };
636
637        // Update node task count
638        let mut nodes = self.nodes.write().await;
639        if let Some(node) = nodes.get_mut(&node_id) {
640            node.running_tasks = node.running_tasks.saturating_sub(1);
641        }
642
643        let _ = self.event_tx.send(RemoteEvent::TaskCompleted {
644            task_id: task_id.to_string(),
645            result,
646        });
647
648        Ok(())
649    }
650
651    /// Fail a task
652    pub async fn fail_task(&self, task_id: &str, error: String) -> Result<(), RemoteMonitorError> {
653        let node_id = {
654            let mut tasks = self.tasks.write().await;
655
656            if let Some(task) = tasks.get_mut(task_id) {
657                task.status = RemoteTaskStatus::Failed;
658                task.completed_at = Some(Utc::now());
659                task.error = Some(error.clone());
660                task.node_id.clone()
661            } else {
662                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
663            }
664        };
665
666        // Update node task count
667        let mut nodes = self.nodes.write().await;
668        if let Some(node) = nodes.get_mut(&node_id) {
669            node.running_tasks = node.running_tasks.saturating_sub(1);
670        }
671
672        let _ = self.event_tx.send(RemoteEvent::TaskFailed {
673            task_id: task_id.to_string(),
674            error,
675        });
676
677        Ok(())
678    }
679
680    /// Cancel a task
681    pub async fn cancel_task(
682        &self,
683        task_id: &str,
684        reason: Option<String>,
685    ) -> Result<(), RemoteMonitorError> {
686        let node_id = {
687            let mut tasks = self.tasks.write().await;
688
689            if let Some(task) = tasks.get_mut(task_id) {
690                task.status = RemoteTaskStatus::Cancelled;
691                task.completed_at = Some(Utc::now());
692                task.node_id.clone()
693            } else {
694                return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
695            }
696        };
697
698        // Update node task count
699        let mut nodes = self.nodes.write().await;
700        if let Some(node) = nodes.get_mut(&node_id) {
701            node.running_tasks = node.running_tasks.saturating_sub(1);
702        }
703
704        let _ = self.event_tx.send(RemoteEvent::TaskCancelled {
705            task_id: task_id.to_string(),
706            reason,
707        });
708
709        Ok(())
710    }
711
712    /// Add log entry to task
713    pub async fn add_task_log(
714        &self,
715        task_id: &str,
716        entry: TaskLogEntry,
717    ) -> Result<(), RemoteMonitorError> {
718        let mut tasks = self.tasks.write().await;
719
720        if let Some(task) = tasks.get_mut(task_id) {
721            task.logs.push(entry.clone());
722
723            // Trim logs if over limit
724            if task.logs.len() > self.config.max_log_entries {
725                let drain_count = task.logs.len() - self.config.max_log_entries;
726                task.logs.drain(0..drain_count);
727            }
728
729            let _ = self.event_tx.send(RemoteEvent::TaskLog {
730                task_id: task_id.to_string(),
731                entry,
732            });
733
734            Ok(())
735        } else {
736            Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
737        }
738    }
739
740    /// Get task by ID
741    pub async fn get_task(&self, task_id: &str) -> Option<RemoteTask> {
742        let tasks = self.tasks.read().await;
743        tasks.get(task_id).cloned()
744    }
745
746    /// List all tasks
747    pub async fn list_tasks(&self) -> Vec<RemoteTask> {
748        let tasks = self.tasks.read().await;
749        tasks.values().cloned().collect()
750    }
751
752    /// List tasks by node
753    pub async fn list_tasks_by_node(&self, node_id: &str) -> Vec<RemoteTask> {
754        let tasks = self.tasks.read().await;
755        tasks
756            .values()
757            .filter(|t| t.node_id == node_id)
758            .cloned()
759            .collect()
760    }
761
762    /// List tasks by status
763    pub async fn list_tasks_by_status(&self, status: RemoteTaskStatus) -> Vec<RemoteTask> {
764        let tasks = self.tasks.read().await;
765        tasks
766            .values()
767            .filter(|t| t.status == status)
768            .cloned()
769            .collect()
770    }
771
772    /// List tasks by agent
773    pub async fn list_tasks_by_agent(&self, agent_id: &str) -> Vec<RemoteTask> {
774        let tasks = self.tasks.read().await;
775        tasks
776            .values()
777            .filter(|t| t.agent_id == agent_id)
778            .cloned()
779            .collect()
780    }
781
782    /// Get monitoring statistics
783    pub async fn get_stats(&self) -> MonitorStats {
784        let nodes = self.nodes.read().await;
785        let tasks = self.tasks.read().await;
786
787        let online_nodes = nodes
788            .values()
789            .filter(|n| n.status == NodeStatus::Online)
790            .count();
791        let total_agents: u32 = nodes.values().map(|n| n.active_agents).sum();
792
793        let running_tasks = tasks
794            .values()
795            .filter(|t| t.status == RemoteTaskStatus::Running)
796            .count();
797        let queued_tasks = tasks
798            .values()
799            .filter(|t| t.status == RemoteTaskStatus::Queued)
800            .count();
801        let completed_tasks = tasks
802            .values()
803            .filter(|t| t.status == RemoteTaskStatus::Completed)
804            .count();
805        let failed_tasks = tasks
806            .values()
807            .filter(|t| t.status == RemoteTaskStatus::Failed)
808            .count();
809
810        MonitorStats {
811            total_nodes: nodes.len(),
812            online_nodes,
813            total_agents: total_agents as usize,
814            total_tasks: tasks.len(),
815            running_tasks,
816            queued_tasks,
817            completed_tasks,
818            failed_tasks,
819        }
820    }
821
822    /// Check for timed out nodes and update their status
823    pub async fn check_node_timeouts(&self) {
824        let timeout = chrono::Duration::seconds(self.config.node_timeout_secs as i64);
825        let now = Utc::now();
826
827        let mut nodes = self.nodes.write().await;
828        for node in nodes.values_mut() {
829            if node.status == NodeStatus::Online && now - node.last_heartbeat > timeout {
830                let old_status = node.status;
831                node.status = NodeStatus::Offline;
832                let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
833                    node_id: node.id.clone(),
834                    old_status,
835                    new_status: NodeStatus::Offline,
836                });
837            }
838        }
839    }
840
841    /// Get configuration
842    pub fn config(&self) -> &RemoteMonitorConfig {
843        &self.config
844    }
845}
846
847/// Monitor statistics
848#[derive(Debug, Clone, Serialize, Deserialize)]
849pub struct MonitorStats {
850    pub total_nodes: usize,
851    pub online_nodes: usize,
852    pub total_agents: usize,
853    pub total_tasks: usize,
854    pub running_tasks: usize,
855    pub queued_tasks: usize,
856    pub completed_tasks: usize,
857    pub failed_tasks: usize,
858}
859
860// =============================================================================
861// Remote Agent Client
862// =============================================================================
863
864/// Client for remote agents to report progress
865pub struct RemoteAgentClient {
866    /// Node ID this client represents
867    pub node_id: NodeId,
868    /// Server URL
869    server_url: String,
870    /// Authentication token
871    auth_token: Option<String>,
872    /// HTTP client
873    #[allow(dead_code)]
874    client: reqwest::Client,
875}
876
877impl RemoteAgentClient {
878    /// Create a new remote agent client
879    pub fn new(node_id: impl Into<String>, server_url: impl Into<String>) -> Self {
880        Self {
881            node_id: node_id.into(),
882            server_url: server_url.into(),
883            auth_token: None,
884            client: reqwest::Client::new(),
885        }
886    }
887
888    /// Set authentication token
889    pub fn with_auth(mut self, token: impl Into<String>) -> Self {
890        self.auth_token = Some(token.into());
891        self
892    }
893
894    /// Send heartbeat
895    pub async fn heartbeat(&self) -> Result<(), RemoteMonitorError> {
896        let url = format!(
897            "{}/api/v1/nodes/{}/heartbeat",
898            self.server_url, self.node_id
899        );
900
901        let mut request = self.client.post(&url);
902        if let Some(ref token) = self.auth_token {
903            request = request.header("Authorization", format!("Bearer {}", token));
904        }
905
906        let response = request
907            .send()
908            .await
909            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
910
911        if !response.status().is_success() {
912            return Err(RemoteMonitorError::ApiError(
913                response.status().as_u16(),
914                response.text().await.unwrap_or_default(),
915            ));
916        }
917
918        Ok(())
919    }
920
921    /// Report task progress
922    pub async fn report_progress(
923        &self,
924        task_id: &str,
925        progress: f32,
926        message: Option<&str>,
927    ) -> Result<(), RemoteMonitorError> {
928        let url = format!("{}/api/v1/tasks/{}/progress", self.server_url, task_id);
929
930        let body = serde_json::json!({
931            "progress": progress,
932            "message": message,
933        });
934
935        let mut request = self.client.post(&url).json(&body);
936        if let Some(ref token) = self.auth_token {
937            request = request.header("Authorization", format!("Bearer {}", token));
938        }
939
940        let response = request
941            .send()
942            .await
943            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
944
945        if !response.status().is_success() {
946            return Err(RemoteMonitorError::ApiError(
947                response.status().as_u16(),
948                response.text().await.unwrap_or_default(),
949            ));
950        }
951
952        Ok(())
953    }
954
955    /// Report task step
956    pub async fn report_step(
957        &self,
958        task_id: &str,
959        step: u32,
960        total: u32,
961        description: Option<&str>,
962    ) -> Result<(), RemoteMonitorError> {
963        let url = format!("{}/api/v1/tasks/{}/step", self.server_url, task_id);
964
965        let body = serde_json::json!({
966            "step": step,
967            "total": total,
968            "description": description,
969        });
970
971        let mut request = self.client.post(&url).json(&body);
972        if let Some(ref token) = self.auth_token {
973            request = request.header("Authorization", format!("Bearer {}", token));
974        }
975
976        let response = request
977            .send()
978            .await
979            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
980
981        if !response.status().is_success() {
982            return Err(RemoteMonitorError::ApiError(
983                response.status().as_u16(),
984                response.text().await.unwrap_or_default(),
985            ));
986        }
987
988        Ok(())
989    }
990
991    /// Report task completion
992    pub async fn report_completed(
993        &self,
994        task_id: &str,
995        result: TaskResult,
996    ) -> Result<(), RemoteMonitorError> {
997        let url = format!("{}/api/v1/tasks/{}/complete", self.server_url, task_id);
998
999        let mut request = self.client.post(&url).json(&result);
1000        if let Some(ref token) = self.auth_token {
1001            request = request.header("Authorization", format!("Bearer {}", token));
1002        }
1003
1004        let response = request
1005            .send()
1006            .await
1007            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1008
1009        if !response.status().is_success() {
1010            return Err(RemoteMonitorError::ApiError(
1011                response.status().as_u16(),
1012                response.text().await.unwrap_or_default(),
1013            ));
1014        }
1015
1016        Ok(())
1017    }
1018
1019    /// Report task failure
1020    pub async fn report_failed(
1021        &self,
1022        task_id: &str,
1023        error: &str,
1024    ) -> Result<(), RemoteMonitorError> {
1025        let url = format!("{}/api/v1/tasks/{}/fail", self.server_url, task_id);
1026
1027        let body = serde_json::json!({
1028            "error": error,
1029        });
1030
1031        let mut request = self.client.post(&url).json(&body);
1032        if let Some(ref token) = self.auth_token {
1033            request = request.header("Authorization", format!("Bearer {}", token));
1034        }
1035
1036        let response = request
1037            .send()
1038            .await
1039            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1040
1041        if !response.status().is_success() {
1042            return Err(RemoteMonitorError::ApiError(
1043                response.status().as_u16(),
1044                response.text().await.unwrap_or_default(),
1045            ));
1046        }
1047
1048        Ok(())
1049    }
1050
1051    /// Send log entry
1052    pub async fn send_log(
1053        &self,
1054        task_id: &str,
1055        level: LogLevel,
1056        message: &str,
1057    ) -> Result<(), RemoteMonitorError> {
1058        let url = format!("{}/api/v1/tasks/{}/log", self.server_url, task_id);
1059
1060        let body = serde_json::json!({
1061            "level": level,
1062            "message": message,
1063            "timestamp": Utc::now(),
1064        });
1065
1066        let mut request = self.client.post(&url).json(&body);
1067        if let Some(ref token) = self.auth_token {
1068            request = request.header("Authorization", format!("Bearer {}", token));
1069        }
1070
1071        let response = request
1072            .send()
1073            .await
1074            .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1075
1076        if !response.status().is_success() {
1077            return Err(RemoteMonitorError::ApiError(
1078                response.status().as_u16(),
1079                response.text().await.unwrap_or_default(),
1080            ));
1081        }
1082
1083        Ok(())
1084    }
1085}
1086
1087// =============================================================================
1088// Task Builder
1089// =============================================================================
1090
1091/// Builder for creating remote tasks
1092pub struct RemoteTaskBuilder {
1093    task: RemoteTask,
1094}
1095
1096impl RemoteTaskBuilder {
1097    /// Create a new task builder
1098    pub fn new(
1099        id: impl Into<String>,
1100        node_id: impl Into<String>,
1101        agent_id: impl Into<String>,
1102    ) -> Self {
1103        Self {
1104            task: RemoteTask {
1105                id: id.into(),
1106                node_id: node_id.into(),
1107                agent_id: agent_id.into(),
1108                agent_name: String::new(),
1109                title: String::new(),
1110                description: None,
1111                status: RemoteTaskStatus::Queued,
1112                progress: 0.0,
1113                progress_message: None,
1114                current_step: None,
1115                total_steps: None,
1116                priority: TaskPriority::Normal,
1117                started_at: Utc::now(),
1118                completed_at: None,
1119                eta: None,
1120                result: None,
1121                error: None,
1122                resources: ResourceUsage::default(),
1123                logs: Vec::new(),
1124                metadata: HashMap::new(),
1125            },
1126        }
1127    }
1128
1129    /// Set agent name
1130    pub fn agent_name(mut self, name: impl Into<String>) -> Self {
1131        self.task.agent_name = name.into();
1132        self
1133    }
1134
1135    /// Set task title
1136    pub fn title(mut self, title: impl Into<String>) -> Self {
1137        self.task.title = title.into();
1138        self
1139    }
1140
1141    /// Set task description
1142    pub fn description(mut self, description: impl Into<String>) -> Self {
1143        self.task.description = Some(description.into());
1144        self
1145    }
1146
1147    /// Set task priority
1148    pub fn priority(mut self, priority: TaskPriority) -> Self {
1149        self.task.priority = priority;
1150        self
1151    }
1152
1153    /// Set total steps
1154    pub fn total_steps(mut self, steps: u32) -> Self {
1155        self.task.total_steps = Some(steps);
1156        self
1157    }
1158
1159    /// Set ETA
1160    pub fn eta(mut self, eta: DateTime<Utc>) -> Self {
1161        self.task.eta = Some(eta);
1162        self
1163    }
1164
1165    /// Add metadata
1166    pub fn metadata(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
1167        if let Ok(v) = serde_json::to_value(value) {
1168            self.task.metadata.insert(key.into(), v);
1169        }
1170        self
1171    }
1172
1173    /// Build the task
1174    pub fn build(self) -> RemoteTask {
1175        self.task
1176    }
1177}
1178
1179// =============================================================================
1180// Error Types
1181// =============================================================================
1182
/// Errors produced by the remote monitoring subsystem.
#[derive(Debug, Clone)]
pub enum RemoteMonitorError {
    /// No node is known under the given identifier.
    NodeNotFound(String),
    /// No task is known under the given identifier.
    TaskNotFound(String),
    /// The request could not be delivered (transport-level failure).
    Network(String),
    /// The server replied with a non-success HTTP status: `(status code, response body)`.
    ApiError(u16, String),
    /// Authentication failed.
    AuthError(String),
    /// An operation was attempted in a state that does not allow it.
    InvalidState(String),
}

impl std::fmt::Display for RemoteMonitorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NodeNotFound(node_id) => write!(f, "Node not found: {}", node_id),
            Self::TaskNotFound(task_id) => write!(f, "Task not found: {}", task_id),
            Self::Network(detail) => write!(f, "Network error: {}", detail),
            Self::ApiError(status, body) => write!(f, "API error {}: {}", status, body),
            Self::AuthError(detail) => write!(f, "Authentication error: {}", detail),
            Self::InvalidState(detail) => write!(f, "Invalid state: {}", detail),
        }
    }
}

impl std::error::Error for RemoteMonitorError {}
1214
1215// =============================================================================
1216// Helper Functions
1217// =============================================================================
1218
/// Return a nanosecond stamp that is unique and strictly increasing within this process.
///
/// The stamp is based on wall-clock time since the Unix epoch. It is always bumped
/// past the last value handed out, so two calls within the same clock tick, or after
/// the system clock steps backwards, never produce the same value. A clock reading
/// before the epoch is treated as 0 instead of panicking.
fn next_unique_nanos() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static LAST_ISSUED: AtomicU64 = AtomicU64::new(0);

    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX))
        .unwrap_or(0);

    // `fetch_update` retries on contention, so concurrent callers each observe a
    // distinct `previous` value and therefore compute distinct results.
    let previous = LAST_ISSUED
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |last| {
            Some(now.max(last.saturating_add(1)))
        })
        .unwrap_or_else(|last| last); // the closure always returns Some, so this is Ok
    now.max(previous.saturating_add(1))
}

/// Generate a unique task ID of the form `task_<hex nanoseconds>`.
///
/// IDs are unique within this process. They are not coordinated across processes
/// or machines.
pub fn generate_task_id() -> String {
    format!("task_{:x}", next_unique_nanos())
}

/// Generate a unique node ID of the form `node_<hex nanoseconds>`.
///
/// IDs are unique within this process. They are not coordinated across processes
/// or machines.
pub fn generate_node_id() -> String {
    format!("node_{:x}", next_unique_nanos())
}
1238
1239// =============================================================================
1240// Tests
1241// =============================================================================
1242
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an online node at `localhost:9876` with no hardware info and no running tasks.
    fn sample_node(id: &str, name: &str, tags: &[&str], active_agents: u32) -> RemoteNode {
        RemoteNode {
            id: id.to_string(),
            name: name.to_string(),
            address: "localhost:9876".to_string(),
            status: NodeStatus::Online,
            tags: tags.iter().map(|tag| tag.to_string()).collect(),
            hardware: None,
            active_agents,
            running_tasks: 0,
            last_heartbeat: Utc::now(),
            registered_at: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_remote_monitor_creation() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        let stats = monitor.get_stats().await;
        assert_eq!(stats.total_nodes, 0);
        assert_eq!(stats.total_tasks, 0);
    }

    #[tokio::test]
    async fn test_node_registration() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());
        monitor
            .register_node(sample_node("test-node", "Test Node", &["test"], 0))
            .await
            .unwrap();

        let found = monitor
            .get_node("test-node")
            .await
            .expect("node should be registered");
        assert_eq!(found.name, "Test Node");
    }

    #[tokio::test]
    async fn test_task_creation() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());
        // A task must belong to a registered node.
        monitor
            .register_node(sample_node("test-node", "Test Node", &[], 1))
            .await
            .unwrap();

        let new_task = RemoteTaskBuilder::new("task-1", "test-node", "agent-1")
            .agent_name("Test Agent")
            .title("Test Task")
            .description("A test task")
            .priority(TaskPriority::High)
            .build();

        let created_id = monitor.create_task(new_task).await.unwrap();
        assert_eq!(created_id, "task-1");

        let stored = monitor
            .get_task("task-1")
            .await
            .expect("task should be stored");
        assert_eq!(stored.title, "Test Task");
    }

    #[tokio::test]
    async fn test_task_progress() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());
        monitor
            .register_node(sample_node("node-1", "Node 1", &[], 1))
            .await
            .unwrap();

        let new_task = RemoteTaskBuilder::new("task-1", "node-1", "agent-1")
            .title("Progress Test")
            .build();
        monitor.create_task(new_task).await.unwrap();

        monitor
            .update_task_progress("task-1", 0.5, Some("Halfway done".to_string()))
            .await
            .unwrap();

        let updated = monitor.get_task("task-1").await.unwrap();
        assert!((updated.progress - 0.5).abs() < 0.01);
        assert_eq!(updated.progress_message, Some("Halfway done".to_string()));
    }

    #[test]
    fn test_task_builder() {
        let built = RemoteTaskBuilder::new("task-123", "node-1", "agent-1")
            .agent_name("My Agent")
            .title("Important Task")
            .description("Does important things")
            .priority(TaskPriority::Critical)
            .total_steps(5)
            .build();

        assert_eq!(built.id, "task-123");
        assert_eq!(built.node_id, "node-1");
        assert_eq!(built.agent_id, "agent-1");
        assert_eq!(built.agent_name, "My Agent");
        assert_eq!(built.title, "Important Task");
        assert_eq!(built.priority, TaskPriority::Critical);
        assert_eq!(built.total_steps, Some(5));
        assert_eq!(built.status, RemoteTaskStatus::Queued);
    }
}