1#![allow(dead_code)]
15use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::{broadcast, RwLock};
22
/// Identifier of a remote node; the key of the monitor's node registry.
pub type NodeId = String;

/// Identifier of a remote task; the key of the monitor's task registry.
pub type RemoteTaskId = String;
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct RemoteNode {
36 pub id: NodeId,
38 pub name: String,
40 pub address: String,
42 pub status: NodeStatus,
44 pub tags: Vec<String>,
46 pub hardware: Option<HardwareInfo>,
48 pub active_agents: u32,
50 pub running_tasks: u32,
52 pub last_heartbeat: DateTime<Utc>,
54 pub registered_at: DateTime<Utc>,
56 pub metadata: HashMap<String, serde_json::Value>,
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63#[derive(Default)]
64pub enum NodeStatus {
65 Online,
67 Degraded,
69 Offline,
71 Maintenance,
73 #[default]
75 Unknown,
76}
77
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct HardwareInfo {
82 pub cpu_cores: u32,
84 pub ram_total: u64,
86 pub ram_available: u64,
88 pub gpus: Vec<GpuInfo>,
90 pub os: String,
92 pub arch: String,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct GpuInfo {
99 pub name: String,
101 pub vram: u64,
103 pub cuda_version: Option<String>,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct RemoteTask {
114 pub id: RemoteTaskId,
116 pub node_id: NodeId,
118 pub agent_id: String,
120 pub agent_name: String,
122 pub title: String,
124 pub description: Option<String>,
126 pub status: RemoteTaskStatus,
128 pub progress: f32,
130 pub progress_message: Option<String>,
132 pub current_step: Option<u32>,
134 pub total_steps: Option<u32>,
136 pub priority: TaskPriority,
138 pub started_at: DateTime<Utc>,
140 pub completed_at: Option<DateTime<Utc>>,
142 pub eta: Option<DateTime<Utc>>,
144 pub result: Option<TaskResult>,
146 pub error: Option<String>,
148 pub resources: ResourceUsage,
150 pub logs: Vec<TaskLogEntry>,
152 pub metadata: HashMap<String, serde_json::Value>,
154}
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum RemoteTaskStatus {
160 Queued,
162 Starting,
164 Running,
166 Paused,
168 Completed,
170 Failed,
172 Cancelled,
174 TimedOut,
176}
177
178#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
180#[serde(rename_all = "snake_case")]
181#[derive(Default)]
182pub enum TaskPriority {
183 Low = 0,
184 #[default]
185 Normal = 1,
186 High = 2,
187 Critical = 3,
188}
189
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct TaskResult {
194 pub success: bool,
196 pub output: Option<serde_json::Value>,
198 pub artifacts: Vec<TaskArtifact>,
200 pub metrics: TaskMetrics,
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct TaskArtifact {
207 pub name: String,
209 pub artifact_type: ArtifactType,
211 pub location: String,
213 pub size: Option<u64>,
215 pub checksum: Option<String>,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
221#[serde(rename_all = "snake_case")]
222pub enum ArtifactType {
223 File,
224 Directory,
225 Url,
226 Database,
227 Model,
228 Report,
229 Log,
230 Custom(String),
231}
232
233#[derive(Debug, Clone, Default, Serialize, Deserialize)]
235pub struct TaskMetrics {
236 pub duration_ms: u64,
238 pub tokens_used: Option<u64>,
240 pub api_calls: u32,
242 pub files_processed: u32,
244 pub errors_recovered: u32,
246 pub retries: u32,
248}
249
250#[derive(Debug, Clone, Default, Serialize, Deserialize)]
252pub struct ResourceUsage {
253 pub cpu_percent: f32,
255 pub memory_bytes: u64,
257 pub gpu_memory_bytes: Option<u64>,
259 pub network_tx_bytes: u64,
261 pub network_rx_bytes: u64,
263 pub disk_read_bytes: u64,
265 pub disk_write_bytes: u64,
267}
268
269#[derive(Debug, Clone, Serialize, Deserialize)]
271pub struct TaskLogEntry {
272 pub timestamp: DateTime<Utc>,
274 pub level: LogLevel,
276 pub message: String,
278 pub data: Option<serde_json::Value>,
280}
281
282#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
284#[serde(rename_all = "snake_case")]
285pub enum LogLevel {
286 Trace,
287 Debug,
288 Info,
289 Warn,
290 Error,
291}
292
293#[derive(Debug, Clone, Serialize, Deserialize)]
299#[serde(tag = "type", content = "data")]
300pub enum RemoteEvent {
301 NodeOnline(NodeId),
303 NodeOffline(NodeId),
305 NodeStatusChanged {
307 node_id: NodeId,
308 old_status: NodeStatus,
309 new_status: NodeStatus,
310 },
311 NodeHeartbeat {
313 node_id: NodeId,
314 timestamp: DateTime<Utc>,
315 },
316
317 TaskCreated(RemoteTask),
319 TaskStarted {
321 task_id: RemoteTaskId,
322 node_id: NodeId,
323 },
324 TaskProgress {
326 task_id: RemoteTaskId,
327 progress: f32,
328 message: Option<String>,
329 },
330 TaskStepCompleted {
332 task_id: RemoteTaskId,
333 step: u32,
334 total: u32,
335 description: Option<String>,
336 },
337 TaskCompleted {
339 task_id: RemoteTaskId,
340 result: TaskResult,
341 },
342 TaskFailed {
344 task_id: RemoteTaskId,
345 error: String,
346 },
347 TaskCancelled {
349 task_id: RemoteTaskId,
350 reason: Option<String>,
351 },
352 TaskLog {
354 task_id: RemoteTaskId,
355 entry: TaskLogEntry,
356 },
357
358 AgentRegistered {
360 node_id: NodeId,
361 agent_id: String,
362 agent_name: String,
363 },
364 AgentUnregistered { node_id: NodeId, agent_id: String },
366}
367
368#[derive(Debug, Clone, Serialize, Deserialize)]
374pub struct RemoteMonitorConfig {
375 pub bind_address: String,
377 pub port: u16,
379 pub tls_enabled: bool,
381 pub tls_cert_path: Option<String>,
383 pub tls_key_path: Option<String>,
385 pub auth_token: Option<String>,
387 pub heartbeat_interval_secs: u64,
389 pub node_timeout_secs: u64,
391 pub max_log_entries: usize,
393 pub metrics_enabled: bool,
395}
396
397impl Default for RemoteMonitorConfig {
398 fn default() -> Self {
399 Self {
400 bind_address: "0.0.0.0".to_string(),
401 port: 9876,
402 tls_enabled: false,
403 tls_cert_path: None,
404 tls_key_path: None,
405 auth_token: None,
406 heartbeat_interval_secs: 30,
407 node_timeout_secs: 90,
408 max_log_entries: 1000,
409 metrics_enabled: true,
410 }
411 }
412}
413
414pub struct RemoteMonitor {
416 config: RemoteMonitorConfig,
417 nodes: Arc<RwLock<HashMap<NodeId, RemoteNode>>>,
418 tasks: Arc<RwLock<HashMap<RemoteTaskId, RemoteTask>>>,
419 event_tx: broadcast::Sender<RemoteEvent>,
420}
421
422impl RemoteMonitor {
423 pub fn new(config: RemoteMonitorConfig) -> Self {
425 let (event_tx, _) = broadcast::channel(1000);
426
427 Self {
428 config,
429 nodes: Arc::new(RwLock::new(HashMap::new())),
430 tasks: Arc::new(RwLock::new(HashMap::new())),
431 event_tx,
432 }
433 }
434
435 pub fn subscribe(&self) -> broadcast::Receiver<RemoteEvent> {
437 self.event_tx.subscribe()
438 }
439
440 pub async fn register_node(&self, node: RemoteNode) -> Result<(), RemoteMonitorError> {
442 let node_id = node.id.clone();
443 let mut nodes = self.nodes.write().await;
444 nodes.insert(node_id.clone(), node);
445
446 let _ = self.event_tx.send(RemoteEvent::NodeOnline(node_id));
447 Ok(())
448 }
449
450 pub async fn unregister_node(&self, node_id: &str) -> Result<bool, RemoteMonitorError> {
452 let mut nodes = self.nodes.write().await;
453 let removed = nodes.remove(node_id).is_some();
454
455 if removed {
456 let _ = self
457 .event_tx
458 .send(RemoteEvent::NodeOffline(node_id.to_string()));
459 }
460
461 Ok(removed)
462 }
463
464 pub async fn heartbeat(&self, node_id: &str) -> Result<(), RemoteMonitorError> {
466 let mut nodes = self.nodes.write().await;
467
468 if let Some(node) = nodes.get_mut(node_id) {
469 node.last_heartbeat = Utc::now();
470 if node.status == NodeStatus::Offline || node.status == NodeStatus::Unknown {
471 let old_status = node.status;
472 node.status = NodeStatus::Online;
473 let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
474 node_id: node_id.to_string(),
475 old_status,
476 new_status: NodeStatus::Online,
477 });
478 }
479 let _ = self.event_tx.send(RemoteEvent::NodeHeartbeat {
480 node_id: node_id.to_string(),
481 timestamp: node.last_heartbeat,
482 });
483 Ok(())
484 } else {
485 Err(RemoteMonitorError::NodeNotFound(node_id.to_string()))
486 }
487 }
488
489 pub async fn get_node(&self, node_id: &str) -> Option<RemoteNode> {
491 let nodes = self.nodes.read().await;
492 nodes.get(node_id).cloned()
493 }
494
495 pub async fn list_nodes(&self) -> Vec<RemoteNode> {
497 let nodes = self.nodes.read().await;
498 nodes.values().cloned().collect()
499 }
500
501 pub async fn list_nodes_by_status(&self, status: NodeStatus) -> Vec<RemoteNode> {
503 let nodes = self.nodes.read().await;
504 nodes
505 .values()
506 .filter(|n| n.status == status)
507 .cloned()
508 .collect()
509 }
510
511 pub async fn create_task(&self, task: RemoteTask) -> Result<RemoteTaskId, RemoteMonitorError> {
513 {
515 let nodes = self.nodes.read().await;
516 if !nodes.contains_key(&task.node_id) {
517 return Err(RemoteMonitorError::NodeNotFound(task.node_id.clone()));
518 }
519 }
520
521 let task_id = task.id.clone();
522 let mut tasks = self.tasks.write().await;
523 tasks.insert(task_id.clone(), task.clone());
524
525 let _ = self.event_tx.send(RemoteEvent::TaskCreated(task));
526 Ok(task_id)
527 }
528
529 pub async fn update_task_status(
531 &self,
532 task_id: &str,
533 status: RemoteTaskStatus,
534 ) -> Result<(), RemoteMonitorError> {
535 let mut tasks = self.tasks.write().await;
536
537 if let Some(task) = tasks.get_mut(task_id) {
538 let old_status = task.status;
539 task.status = status;
540
541 match status {
542 RemoteTaskStatus::Running if old_status != RemoteTaskStatus::Running => {
543 let _ = self.event_tx.send(RemoteEvent::TaskStarted {
544 task_id: task_id.to_string(),
545 node_id: task.node_id.clone(),
546 });
547 }
548 RemoteTaskStatus::Completed => {
549 task.completed_at = Some(Utc::now());
550 task.progress = 1.0;
551 }
552 RemoteTaskStatus::Failed
553 | RemoteTaskStatus::Cancelled
554 | RemoteTaskStatus::TimedOut => {
555 task.completed_at = Some(Utc::now());
556 }
557 _ => {}
558 }
559
560 Ok(())
561 } else {
562 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
563 }
564 }
565
566 pub async fn update_task_progress(
568 &self,
569 task_id: &str,
570 progress: f32,
571 message: Option<String>,
572 ) -> Result<(), RemoteMonitorError> {
573 let mut tasks = self.tasks.write().await;
574
575 if let Some(task) = tasks.get_mut(task_id) {
576 task.progress = progress.clamp(0.0, 1.0);
577 task.progress_message = message.clone();
578
579 let _ = self.event_tx.send(RemoteEvent::TaskProgress {
580 task_id: task_id.to_string(),
581 progress: task.progress,
582 message,
583 });
584
585 Ok(())
586 } else {
587 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
588 }
589 }
590
591 pub async fn update_task_step(
593 &self,
594 task_id: &str,
595 step: u32,
596 total: u32,
597 description: Option<String>,
598 ) -> Result<(), RemoteMonitorError> {
599 let mut tasks = self.tasks.write().await;
600
601 if let Some(task) = tasks.get_mut(task_id) {
602 task.current_step = Some(step);
603 task.total_steps = Some(total);
604 task.progress = step as f32 / total as f32;
605
606 let _ = self.event_tx.send(RemoteEvent::TaskStepCompleted {
607 task_id: task_id.to_string(),
608 step,
609 total,
610 description,
611 });
612
613 Ok(())
614 } else {
615 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
616 }
617 }
618
619 pub async fn complete_task(
621 &self,
622 task_id: &str,
623 result: TaskResult,
624 ) -> Result<(), RemoteMonitorError> {
625 let node_id = {
626 let mut tasks = self.tasks.write().await;
627
628 if let Some(task) = tasks.get_mut(task_id) {
629 task.status = RemoteTaskStatus::Completed;
630 task.completed_at = Some(Utc::now());
631 task.progress = 1.0;
632 task.result = Some(result.clone());
633 task.node_id.clone()
634 } else {
635 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
636 }
637 };
638
639 let mut nodes = self.nodes.write().await;
641 if let Some(node) = nodes.get_mut(&node_id) {
642 node.running_tasks = node.running_tasks.saturating_sub(1);
643 }
644
645 let _ = self.event_tx.send(RemoteEvent::TaskCompleted {
646 task_id: task_id.to_string(),
647 result,
648 });
649
650 Ok(())
651 }
652
653 pub async fn fail_task(&self, task_id: &str, error: String) -> Result<(), RemoteMonitorError> {
655 let node_id = {
656 let mut tasks = self.tasks.write().await;
657
658 if let Some(task) = tasks.get_mut(task_id) {
659 task.status = RemoteTaskStatus::Failed;
660 task.completed_at = Some(Utc::now());
661 task.error = Some(error.clone());
662 task.node_id.clone()
663 } else {
664 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
665 }
666 };
667
668 let mut nodes = self.nodes.write().await;
670 if let Some(node) = nodes.get_mut(&node_id) {
671 node.running_tasks = node.running_tasks.saturating_sub(1);
672 }
673
674 let _ = self.event_tx.send(RemoteEvent::TaskFailed {
675 task_id: task_id.to_string(),
676 error,
677 });
678
679 Ok(())
680 }
681
682 pub async fn cancel_task(
684 &self,
685 task_id: &str,
686 reason: Option<String>,
687 ) -> Result<(), RemoteMonitorError> {
688 let node_id = {
689 let mut tasks = self.tasks.write().await;
690
691 if let Some(task) = tasks.get_mut(task_id) {
692 task.status = RemoteTaskStatus::Cancelled;
693 task.completed_at = Some(Utc::now());
694 task.node_id.clone()
695 } else {
696 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
697 }
698 };
699
700 let mut nodes = self.nodes.write().await;
702 if let Some(node) = nodes.get_mut(&node_id) {
703 node.running_tasks = node.running_tasks.saturating_sub(1);
704 }
705
706 let _ = self.event_tx.send(RemoteEvent::TaskCancelled {
707 task_id: task_id.to_string(),
708 reason,
709 });
710
711 Ok(())
712 }
713
714 pub async fn add_task_log(
716 &self,
717 task_id: &str,
718 entry: TaskLogEntry,
719 ) -> Result<(), RemoteMonitorError> {
720 let mut tasks = self.tasks.write().await;
721
722 if let Some(task) = tasks.get_mut(task_id) {
723 task.logs.push(entry.clone());
724
725 if task.logs.len() > self.config.max_log_entries {
727 let drain_count = task.logs.len() - self.config.max_log_entries;
728 task.logs.drain(0..drain_count);
729 }
730
731 let _ = self.event_tx.send(RemoteEvent::TaskLog {
732 task_id: task_id.to_string(),
733 entry,
734 });
735
736 Ok(())
737 } else {
738 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
739 }
740 }
741
742 pub async fn get_task(&self, task_id: &str) -> Option<RemoteTask> {
744 let tasks = self.tasks.read().await;
745 tasks.get(task_id).cloned()
746 }
747
748 pub async fn list_tasks(&self) -> Vec<RemoteTask> {
750 let tasks = self.tasks.read().await;
751 tasks.values().cloned().collect()
752 }
753
754 pub async fn list_tasks_by_node(&self, node_id: &str) -> Vec<RemoteTask> {
756 let tasks = self.tasks.read().await;
757 tasks
758 .values()
759 .filter(|t| t.node_id == node_id)
760 .cloned()
761 .collect()
762 }
763
764 pub async fn list_tasks_by_status(&self, status: RemoteTaskStatus) -> Vec<RemoteTask> {
766 let tasks = self.tasks.read().await;
767 tasks
768 .values()
769 .filter(|t| t.status == status)
770 .cloned()
771 .collect()
772 }
773
774 pub async fn list_tasks_by_agent(&self, agent_id: &str) -> Vec<RemoteTask> {
776 let tasks = self.tasks.read().await;
777 tasks
778 .values()
779 .filter(|t| t.agent_id == agent_id)
780 .cloned()
781 .collect()
782 }
783
784 pub async fn get_stats(&self) -> MonitorStats {
786 let nodes = self.nodes.read().await;
787 let tasks = self.tasks.read().await;
788
789 let online_nodes = nodes
790 .values()
791 .filter(|n| n.status == NodeStatus::Online)
792 .count();
793 let total_agents: u32 = nodes.values().map(|n| n.active_agents).sum();
794
795 let running_tasks = tasks
796 .values()
797 .filter(|t| t.status == RemoteTaskStatus::Running)
798 .count();
799 let queued_tasks = tasks
800 .values()
801 .filter(|t| t.status == RemoteTaskStatus::Queued)
802 .count();
803 let completed_tasks = tasks
804 .values()
805 .filter(|t| t.status == RemoteTaskStatus::Completed)
806 .count();
807 let failed_tasks = tasks
808 .values()
809 .filter(|t| t.status == RemoteTaskStatus::Failed)
810 .count();
811
812 MonitorStats {
813 total_nodes: nodes.len(),
814 online_nodes,
815 total_agents: total_agents as usize,
816 total_tasks: tasks.len(),
817 running_tasks,
818 queued_tasks,
819 completed_tasks,
820 failed_tasks,
821 }
822 }
823
824 pub async fn check_node_timeouts(&self) {
826 let timeout = chrono::Duration::seconds(self.config.node_timeout_secs as i64);
827 let now = Utc::now();
828
829 let mut nodes = self.nodes.write().await;
830 for node in nodes.values_mut() {
831 if node.status == NodeStatus::Online && now - node.last_heartbeat > timeout {
832 let old_status = node.status;
833 node.status = NodeStatus::Offline;
834 let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
835 node_id: node.id.clone(),
836 old_status,
837 new_status: NodeStatus::Offline,
838 });
839 }
840 }
841 }
842
843 pub fn config(&self) -> &RemoteMonitorConfig {
845 &self.config
846 }
847}
848
849#[derive(Debug, Clone, Serialize, Deserialize)]
851pub struct MonitorStats {
852 pub total_nodes: usize,
853 pub online_nodes: usize,
854 pub total_agents: usize,
855 pub total_tasks: usize,
856 pub running_tasks: usize,
857 pub queued_tasks: usize,
858 pub completed_tasks: usize,
859 pub failed_tasks: usize,
860}
861
862pub struct RemoteAgentClient {
868 pub node_id: NodeId,
870 server_url: String,
872 auth_token: Option<String>,
874 #[allow(dead_code)]
876 client: reqwest::Client,
877}
878
879impl RemoteAgentClient {
880 pub fn new(node_id: impl Into<String>, server_url: impl Into<String>) -> Self {
882 Self {
883 node_id: node_id.into(),
884 server_url: server_url.into(),
885 auth_token: None,
886 client: reqwest::Client::new(),
887 }
888 }
889
890 pub fn with_auth(mut self, token: impl Into<String>) -> Self {
892 self.auth_token = Some(token.into());
893 self
894 }
895
896 pub async fn heartbeat(&self) -> Result<(), RemoteMonitorError> {
898 let url = format!(
899 "{}/api/v1/nodes/{}/heartbeat",
900 self.server_url, self.node_id
901 );
902
903 let mut request = self.client.post(&url);
904 if let Some(ref token) = self.auth_token {
905 request = request.header("Authorization", format!("Bearer {}", token));
906 }
907
908 let response = request
909 .send()
910 .await
911 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
912
913 if !response.status().is_success() {
914 return Err(RemoteMonitorError::ApiError(
915 response.status().as_u16(),
916 response.text().await.unwrap_or_default(),
917 ));
918 }
919
920 Ok(())
921 }
922
923 pub async fn report_progress(
925 &self,
926 task_id: &str,
927 progress: f32,
928 message: Option<&str>,
929 ) -> Result<(), RemoteMonitorError> {
930 let url = format!("{}/api/v1/tasks/{}/progress", self.server_url, task_id);
931
932 let body = serde_json::json!({
933 "progress": progress,
934 "message": message,
935 });
936
937 let mut request = self.client.post(&url).json(&body);
938 if let Some(ref token) = self.auth_token {
939 request = request.header("Authorization", format!("Bearer {}", token));
940 }
941
942 let response = request
943 .send()
944 .await
945 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
946
947 if !response.status().is_success() {
948 return Err(RemoteMonitorError::ApiError(
949 response.status().as_u16(),
950 response.text().await.unwrap_or_default(),
951 ));
952 }
953
954 Ok(())
955 }
956
957 pub async fn report_step(
959 &self,
960 task_id: &str,
961 step: u32,
962 total: u32,
963 description: Option<&str>,
964 ) -> Result<(), RemoteMonitorError> {
965 let url = format!("{}/api/v1/tasks/{}/step", self.server_url, task_id);
966
967 let body = serde_json::json!({
968 "step": step,
969 "total": total,
970 "description": description,
971 });
972
973 let mut request = self.client.post(&url).json(&body);
974 if let Some(ref token) = self.auth_token {
975 request = request.header("Authorization", format!("Bearer {}", token));
976 }
977
978 let response = request
979 .send()
980 .await
981 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
982
983 if !response.status().is_success() {
984 return Err(RemoteMonitorError::ApiError(
985 response.status().as_u16(),
986 response.text().await.unwrap_or_default(),
987 ));
988 }
989
990 Ok(())
991 }
992
993 pub async fn report_completed(
995 &self,
996 task_id: &str,
997 result: TaskResult,
998 ) -> Result<(), RemoteMonitorError> {
999 let url = format!("{}/api/v1/tasks/{}/complete", self.server_url, task_id);
1000
1001 let mut request = self.client.post(&url).json(&result);
1002 if let Some(ref token) = self.auth_token {
1003 request = request.header("Authorization", format!("Bearer {}", token));
1004 }
1005
1006 let response = request
1007 .send()
1008 .await
1009 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1010
1011 if !response.status().is_success() {
1012 return Err(RemoteMonitorError::ApiError(
1013 response.status().as_u16(),
1014 response.text().await.unwrap_or_default(),
1015 ));
1016 }
1017
1018 Ok(())
1019 }
1020
1021 pub async fn report_failed(
1023 &self,
1024 task_id: &str,
1025 error: &str,
1026 ) -> Result<(), RemoteMonitorError> {
1027 let url = format!("{}/api/v1/tasks/{}/fail", self.server_url, task_id);
1028
1029 let body = serde_json::json!({
1030 "error": error,
1031 });
1032
1033 let mut request = self.client.post(&url).json(&body);
1034 if let Some(ref token) = self.auth_token {
1035 request = request.header("Authorization", format!("Bearer {}", token));
1036 }
1037
1038 let response = request
1039 .send()
1040 .await
1041 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1042
1043 if !response.status().is_success() {
1044 return Err(RemoteMonitorError::ApiError(
1045 response.status().as_u16(),
1046 response.text().await.unwrap_or_default(),
1047 ));
1048 }
1049
1050 Ok(())
1051 }
1052
1053 pub async fn send_log(
1055 &self,
1056 task_id: &str,
1057 level: LogLevel,
1058 message: &str,
1059 ) -> Result<(), RemoteMonitorError> {
1060 let url = format!("{}/api/v1/tasks/{}/log", self.server_url, task_id);
1061
1062 let body = serde_json::json!({
1063 "level": level,
1064 "message": message,
1065 "timestamp": Utc::now(),
1066 });
1067
1068 let mut request = self.client.post(&url).json(&body);
1069 if let Some(ref token) = self.auth_token {
1070 request = request.header("Authorization", format!("Bearer {}", token));
1071 }
1072
1073 let response = request
1074 .send()
1075 .await
1076 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1077
1078 if !response.status().is_success() {
1079 return Err(RemoteMonitorError::ApiError(
1080 response.status().as_u16(),
1081 response.text().await.unwrap_or_default(),
1082 ));
1083 }
1084
1085 Ok(())
1086 }
1087}
1088
1089pub struct RemoteTaskBuilder {
1095 task: RemoteTask,
1096}
1097
1098impl RemoteTaskBuilder {
1099 pub fn new(
1101 id: impl Into<String>,
1102 node_id: impl Into<String>,
1103 agent_id: impl Into<String>,
1104 ) -> Self {
1105 Self {
1106 task: RemoteTask {
1107 id: id.into(),
1108 node_id: node_id.into(),
1109 agent_id: agent_id.into(),
1110 agent_name: String::new(),
1111 title: String::new(),
1112 description: None,
1113 status: RemoteTaskStatus::Queued,
1114 progress: 0.0,
1115 progress_message: None,
1116 current_step: None,
1117 total_steps: None,
1118 priority: TaskPriority::Normal,
1119 started_at: Utc::now(),
1120 completed_at: None,
1121 eta: None,
1122 result: None,
1123 error: None,
1124 resources: ResourceUsage::default(),
1125 logs: Vec::new(),
1126 metadata: HashMap::new(),
1127 },
1128 }
1129 }
1130
1131 pub fn agent_name(mut self, name: impl Into<String>) -> Self {
1133 self.task.agent_name = name.into();
1134 self
1135 }
1136
1137 pub fn title(mut self, title: impl Into<String>) -> Self {
1139 self.task.title = title.into();
1140 self
1141 }
1142
1143 pub fn description(mut self, description: impl Into<String>) -> Self {
1145 self.task.description = Some(description.into());
1146 self
1147 }
1148
1149 pub fn priority(mut self, priority: TaskPriority) -> Self {
1151 self.task.priority = priority;
1152 self
1153 }
1154
1155 pub fn total_steps(mut self, steps: u32) -> Self {
1157 self.task.total_steps = Some(steps);
1158 self
1159 }
1160
1161 pub fn eta(mut self, eta: DateTime<Utc>) -> Self {
1163 self.task.eta = Some(eta);
1164 self
1165 }
1166
1167 pub fn metadata(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
1169 if let Ok(v) = serde_json::to_value(value) {
1170 self.task.metadata.insert(key.into(), v);
1171 }
1172 self
1173 }
1174
1175 pub fn build(self) -> RemoteTask {
1177 self.task
1178 }
1179}
1180
/// Errors returned by [`RemoteMonitor`] and [`RemoteAgentClient`].
#[derive(Debug, Clone)]
pub enum RemoteMonitorError {
    /// No node is registered under the given ID.
    NodeNotFound(String),
    /// No task is tracked under the given ID.
    TaskNotFound(String),
    /// The HTTP request could not be sent or completed.
    Network(String),
    /// The server answered with a non-success status code and the given body.
    ApiError(u16, String),
    /// Authentication was rejected.
    AuthError(String),
    /// The requested operation conflicts with the current state.
    InvalidState(String),
}

impl std::fmt::Display for RemoteMonitorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NodeNotFound(node_id) => write!(f, "Node not found: {}", node_id),
            Self::TaskNotFound(task_id) => write!(f, "Task not found: {}", task_id),
            Self::Network(detail) => write!(f, "Network error: {}", detail),
            Self::ApiError(status, body) => write!(f, "API error {}: {}", status, body),
            Self::AuthError(detail) => write!(f, "Authentication error: {}", detail),
            Self::InvalidState(detail) => write!(f, "Invalid state: {}", detail),
        }
    }
}

/// Leaf error: no underlying `source`.
impl std::error::Error for RemoteMonitorError {}
1216
/// Generates a process-unique task identifier of the form `task_<hex>`.
///
/// The hex payload is a nanosecond Unix timestamp, bumped forward when necessary so that
/// no two IDs issued by this process are ever equal.
pub fn generate_task_id() -> String {
    format!("task_{:x}", next_unique_id_nanos())
}

/// Generates a process-unique node identifier of the form `node_<hex>`.
///
/// Uses the same uniqueness guarantee as [`generate_task_id`].
pub fn generate_node_id() -> String {
    format!("node_{:x}", next_unique_id_nanos())
}

/// Returns the current Unix time in nanoseconds, raised if needed so it is strictly
/// greater than every value previously returned in this process.
///
/// Bare timestamps collide when two calls land in the same clock tick, which is likely on
/// platforms with coarse clocks. A clock set before 1970 yields 0 instead of panicking,
/// and the monotonic bump still keeps results unique.
fn next_unique_id_nanos() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static LAST_ISSUED: AtomicU64 = AtomicU64::new(0);

    // u64 nanoseconds cover dates up to ~2554; saturate beyond that.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX))
        .unwrap_or(0);

    let bump = |prev: u64| now.max(prev.saturating_add(1));
    // The closure always returns `Some`, so both arms carry the previous value.
    let prev = LAST_ISSUED
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| Some(bump(prev)))
        .unwrap_or_else(|prev| prev);
    bump(prev)
}
1240
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Online` node at `localhost:9876` with the given identity.
    fn make_node(id: &str, name: &str, tags: &[&str], active_agents: u32) -> RemoteNode {
        RemoteNode {
            id: id.to_string(),
            name: name.to_string(),
            address: "localhost:9876".to_string(),
            status: NodeStatus::Online,
            tags: tags.iter().map(|&tag| tag.to_string()).collect(),
            hardware: None,
            active_agents,
            running_tasks: 0,
            last_heartbeat: Utc::now(),
            registered_at: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    /// Creates a monitor with the default configuration.
    fn make_monitor() -> RemoteMonitor {
        RemoteMonitor::new(RemoteMonitorConfig::default())
    }

    #[tokio::test]
    async fn test_remote_monitor_creation() {
        let monitor = make_monitor();
        let stats = monitor.get_stats().await;

        assert_eq!(stats.total_nodes, 0);
        assert_eq!(stats.total_tasks, 0);
    }

    #[tokio::test]
    async fn test_node_registration() {
        let monitor = make_monitor();
        monitor
            .register_node(make_node("test-node", "Test Node", &["test"], 0))
            .await
            .unwrap();

        let node = monitor
            .get_node("test-node")
            .await
            .expect("node should be registered");
        assert_eq!(node.name, "Test Node");
    }

    #[tokio::test]
    async fn test_task_creation() {
        let monitor = make_monitor();
        monitor
            .register_node(make_node("test-node", "Test Node", &[], 1))
            .await
            .unwrap();

        let task = RemoteTaskBuilder::new("task-1", "test-node", "agent-1")
            .agent_name("Test Agent")
            .title("Test Task")
            .description("A test task")
            .priority(TaskPriority::High)
            .build();

        let task_id = monitor.create_task(task).await.unwrap();
        assert_eq!(task_id, "task-1");

        let stored = monitor
            .get_task("task-1")
            .await
            .expect("task should be stored");
        assert_eq!(stored.title, "Test Task");
    }

    #[tokio::test]
    async fn test_task_progress() {
        let monitor = make_monitor();
        monitor
            .register_node(make_node("node-1", "Node 1", &[], 1))
            .await
            .unwrap();

        let task = RemoteTaskBuilder::new("task-1", "node-1", "agent-1")
            .title("Progress Test")
            .build();
        monitor.create_task(task).await.unwrap();

        monitor
            .update_task_progress("task-1", 0.5, Some("Halfway done".to_string()))
            .await
            .unwrap();

        let updated = monitor.get_task("task-1").await.unwrap();
        assert!((updated.progress - 0.5).abs() < 0.01);
        assert_eq!(updated.progress_message.as_deref(), Some("Halfway done"));
    }

    #[test]
    fn test_task_builder() {
        let built = RemoteTaskBuilder::new("task-123", "node-1", "agent-1")
            .agent_name("My Agent")
            .title("Important Task")
            .description("Does important things")
            .priority(TaskPriority::Critical)
            .total_steps(5)
            .build();

        assert_eq!(built.id, "task-123");
        assert_eq!(built.node_id, "node-1");
        assert_eq!(built.agent_id, "agent-1");
        assert_eq!(built.agent_name, "My Agent");
        assert_eq!(built.title, "Important Task");
        assert_eq!(built.priority, TaskPriority::Critical);
        assert_eq!(built.total_steps, Some(5));
        assert_eq!(built.status, RemoteTaskStatus::Queued);
    }
}
1376}