1#![allow(dead_code)]
15use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::{broadcast, RwLock};
22
/// Identifier of a remote node; used as the key of the monitor's node registry.
pub type NodeId = String;

/// Identifier of a task tracked by the monitor; used as the key of the
/// monitor's task registry.
pub type RemoteTaskId = String;
32
/// A machine registered with the [`RemoteMonitor`], as stored in its registry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteNode {
    /// Unique node identifier (registry key).
    pub id: NodeId,
    /// Human-readable display name.
    pub name: String,
    /// Network address of the node (the tests use `"host:port"`; the format is
    /// not validated here).
    pub address: String,
    /// Current health status; changed by heartbeats and timeout checks.
    pub status: NodeStatus,
    /// Free-form labels attached to the node.
    pub tags: Vec<String>,
    /// Hardware description, if the node supplied one.
    pub hardware: Option<HardwareInfo>,
    /// Number of agents active on the node; summed into `MonitorStats::total_agents`.
    pub active_agents: u32,
    /// Running-task counter; the monitor decrements it (saturating at 0) when
    /// one of the node's tasks finishes.
    pub running_tasks: u32,
    /// Time of the most recent heartbeat; compared against
    /// `RemoteMonitorConfig::node_timeout_secs` by the timeout check.
    pub last_heartbeat: DateTime<Utc>,
    /// Registration timestamp as supplied by the caller (the monitor stores it as-is).
    pub registered_at: DateTime<Utc>,
    /// Arbitrary extra JSON metadata.
    pub metadata: HashMap<String, serde_json::Value>,
}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63#[derive(Default)]
64pub enum NodeStatus {
65 Online,
67 Degraded,
69 Offline,
71 Maintenance,
73 #[default]
75 Unknown,
76}
77
/// Hardware description a node may attach to its [`RemoteNode`] record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HardwareInfo {
    /// Number of CPU cores.
    pub cpu_cores: u32,
    /// Total RAM (unit not specified here — presumably bytes; confirm).
    pub ram_total: u64,
    /// Available RAM, in the same unit as `ram_total`.
    pub ram_available: u64,
    /// GPUs installed on the node.
    pub gpus: Vec<GpuInfo>,
    /// Operating system name.
    pub os: String,
    /// CPU architecture name.
    pub arch: String,
}
94
/// Description of a single GPU listed in [`HardwareInfo::gpus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuInfo {
    /// GPU model name.
    pub name: String,
    /// Video memory (unit not specified here — presumably bytes; confirm).
    pub vram: u64,
    /// CUDA version string, when available.
    pub cuda_version: Option<String>,
}
105
/// A unit of work assigned to an agent on a [`RemoteNode`], as tracked by the
/// monitor. Typically assembled with [`RemoteTaskBuilder`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteTask {
    /// Unique task identifier (registry key).
    pub id: RemoteTaskId,
    /// Node the task belongs to; `RemoteMonitor::create_task` requires it to be registered.
    pub node_id: NodeId,
    /// Identifier of the agent executing the task.
    pub agent_id: String,
    /// Display name of the agent.
    pub agent_name: String,
    /// Short human-readable title.
    pub title: String,
    /// Optional longer description.
    pub description: Option<String>,
    /// Current lifecycle status.
    pub status: RemoteTaskStatus,
    /// Completion fraction; `update_task_progress` clamps it to `[0.0, 1.0]`
    /// and completing the task sets it to `1.0`.
    pub progress: f32,
    /// Message supplied with the latest progress update.
    pub progress_message: Option<String>,
    /// Latest step number reported via `update_task_step`.
    pub current_step: Option<u32>,
    /// Total number of steps, if known.
    pub total_steps: Option<u32>,
    /// Task priority.
    pub priority: TaskPriority,
    /// `RemoteTaskBuilder::new` sets this to the builder's construction time.
    pub started_at: DateTime<Utc>,
    /// Set when the task enters a terminal status.
    pub completed_at: Option<DateTime<Utc>>,
    /// Optional ETA supplied by the creator (`RemoteTaskBuilder::eta`).
    pub eta: Option<DateTime<Utc>>,
    /// Result recorded by `RemoteMonitor::complete_task`.
    pub result: Option<TaskResult>,
    /// Error message recorded by `RemoteMonitor::fail_task`.
    pub error: Option<String>,
    /// Resource-usage snapshot; `RemoteMonitor` never updates it.
    pub resources: ResourceUsage,
    /// Log entries, capped at `RemoteMonitorConfig::max_log_entries` (oldest dropped first).
    pub logs: Vec<TaskLogEntry>,
    /// Arbitrary extra JSON metadata.
    pub metadata: HashMap<String, serde_json::Value>,
}
154
/// Lifecycle status of a [`RemoteTask`], serialized in `snake_case`
/// (e.g. `"timed_out"`).
///
/// `Completed`, `Failed`, `Cancelled` and `TimedOut` are terminal: entering any
/// of them stamps `RemoteTask::completed_at`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RemoteTaskStatus {
    /// Waiting to run; the initial status set by `RemoteTaskBuilder`.
    Queued,
    /// Being started.
    Starting,
    /// Executing; entering it emits `RemoteEvent::TaskStarted`.
    Running,
    /// Suspended.
    Paused,
    /// Finished successfully (terminal).
    Completed,
    /// Finished with an error (terminal).
    Failed,
    /// Cancelled (terminal).
    Cancelled,
    /// Exceeded its time budget (terminal).
    TimedOut,
}
176
177#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
179#[serde(rename_all = "snake_case")]
180#[derive(Default)]
181pub enum TaskPriority {
182 Low = 0,
183 #[default]
184 Normal = 1,
185 High = 2,
186 Critical = 3,
187}
188
/// Outcome of a finished task, as passed to `RemoteMonitor::complete_task` and
/// `RemoteAgentClient::report_completed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Whether the task reported success.
    pub success: bool,
    /// Optional free-form JSON output.
    pub output: Option<serde_json::Value>,
    /// Artifacts produced by the task.
    pub artifacts: Vec<TaskArtifact>,
    /// Execution metrics.
    pub metrics: TaskMetrics,
}
201
/// Something produced by a task (file, URL, model, report, ...).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskArtifact {
    /// Artifact name.
    pub name: String,
    /// Kind of artifact.
    pub artifact_type: ArtifactType,
    /// Where the artifact lives (presumably a path or URL depending on
    /// `artifact_type` — confirm).
    pub location: String,
    /// Size, if known (unit not specified here — presumably bytes).
    pub size: Option<u64>,
    /// Checksum string, if known (algorithm not specified here).
    pub checksum: Option<String>,
}
216
/// Kind of a [`TaskArtifact`].
///
/// Serialized in `snake_case` with serde's default external tagging: unit
/// variants as plain strings (`"file"`, `"url"`, ...) and `Custom` as
/// `{"custom": "<name>"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ArtifactType {
    File,
    Directory,
    Url,
    Database,
    Model,
    Report,
    Log,
    /// Any other kind, named by the contained string.
    Custom(String),
}
230
/// Execution metrics attached to a [`TaskResult`]; all counters default to zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TaskMetrics {
    /// Wall-clock duration in milliseconds (per the `_ms` suffix).
    pub duration_ms: u64,
    /// Tokens consumed, when applicable.
    pub tokens_used: Option<u64>,
    /// Number of API calls made.
    pub api_calls: u32,
    /// Number of files processed.
    pub files_processed: u32,
    /// Number of errors the task recovered from.
    pub errors_recovered: u32,
    /// Number of retries performed.
    pub retries: u32,
}
247
/// Resource-usage snapshot for a [`RemoteTask`]; defaults to all zeros / `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// CPU usage as a percentage (scale — per core or total — not specified here).
    pub cpu_percent: f32,
    /// Memory in use, in bytes.
    pub memory_bytes: u64,
    /// GPU memory in use, in bytes, when a GPU is involved.
    pub gpu_memory_bytes: Option<u64>,
    /// Bytes sent over the network.
    pub network_tx_bytes: u64,
    /// Bytes received over the network.
    pub network_rx_bytes: u64,
    /// Bytes read from disk.
    pub disk_read_bytes: u64,
    /// Bytes written to disk.
    pub disk_write_bytes: u64,
}
266
/// A single log line attached to a [`RemoteTask`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskLogEntry {
    /// When the entry was produced.
    pub timestamp: DateTime<Utc>,
    /// Severity.
    pub level: LogLevel,
    /// Log message text.
    pub message: String,
    /// Optional structured payload.
    pub data: Option<serde_json::Value>,
}
279
/// Severity of a [`TaskLogEntry`], serialized in `snake_case`
/// (`"trace"`, `"debug"`, `"info"`, `"warn"`, `"error"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}
290
/// Notification broadcast by [`RemoteMonitor`] to subscribers.
///
/// Serialized adjacently tagged: `{"type": "<VariantName>", "data": <payload>}`.
/// Variant names are not renamed, so `type` carries the Rust variant name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum RemoteEvent {
    /// A node was registered (`register_node`).
    NodeOnline(NodeId),
    /// A node was removed (`unregister_node`).
    NodeOffline(NodeId),
    /// A node's status changed (heartbeat recovery or heartbeat timeout).
    NodeStatusChanged {
        node_id: NodeId,
        old_status: NodeStatus,
        new_status: NodeStatus,
    },
    /// A heartbeat was recorded for a node.
    NodeHeartbeat {
        node_id: NodeId,
        timestamp: DateTime<Utc>,
    },

    /// A task was created; boxed to keep the enum small.
    TaskCreated(Box<RemoteTask>),
    /// A task's status changed to `Running`.
    TaskStarted {
        task_id: RemoteTaskId,
        node_id: NodeId,
    },
    /// A task's progress fraction/message was updated.
    TaskProgress {
        task_id: RemoteTaskId,
        progress: f32,
        message: Option<String>,
    },
    /// A task reported reaching step `step` of `total`.
    TaskStepCompleted {
        task_id: RemoteTaskId,
        step: u32,
        total: u32,
        description: Option<String>,
    },
    /// A task completed with `result`.
    TaskCompleted {
        task_id: RemoteTaskId,
        result: TaskResult,
    },
    /// A task failed with `error`.
    TaskFailed {
        task_id: RemoteTaskId,
        error: String,
    },
    /// A task was cancelled, optionally with a reason.
    TaskCancelled {
        task_id: RemoteTaskId,
        reason: Option<String>,
    },
    /// A log entry was appended to a task.
    TaskLog {
        task_id: RemoteTaskId,
        entry: TaskLogEntry,
    },

    /// An agent was registered on a node (not emitted by `RemoteMonitor` itself).
    AgentRegistered {
        node_id: NodeId,
        agent_id: String,
        agent_name: String,
    },
    /// An agent was removed from a node (not emitted by `RemoteMonitor` itself).
    AgentUnregistered { node_id: NodeId, agent_id: String },
}
365
/// Configuration for [`RemoteMonitor`].
///
/// Of these settings, only `node_timeout_secs` (timeout check) and
/// `max_log_entries` (task log cap) are read by the `RemoteMonitor` methods in
/// this file. The listener, TLS, auth, heartbeat-interval and metrics settings
/// are presumably consumed by a server layer elsewhere — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteMonitorConfig {
    /// Address to bind the listener to (default `"0.0.0.0"`).
    pub bind_address: String,
    /// Listening port (default `9876`).
    pub port: u16,
    /// Whether TLS is enabled (default `false`).
    pub tls_enabled: bool,
    /// Path to the TLS certificate, if TLS is used.
    pub tls_cert_path: Option<String>,
    /// Path to the TLS private key, if TLS is used.
    pub tls_key_path: Option<String>,
    /// Optional authentication token (default `None`; not checked by `RemoteMonitor` itself).
    pub auth_token: Option<String>,
    /// Expected interval between node heartbeats, in seconds (default `30`).
    pub heartbeat_interval_secs: u64,
    /// Seconds without a heartbeat after which the timeout check marks a node
    /// `Offline` (default `90`).
    pub node_timeout_secs: u64,
    /// Maximum log entries retained per task; the oldest are dropped first (default `1000`).
    pub max_log_entries: usize,
    /// Whether metrics are enabled (default `true`).
    pub metrics_enabled: bool,
}
394
395impl Default for RemoteMonitorConfig {
396 fn default() -> Self {
397 Self {
398 bind_address: "0.0.0.0".to_string(),
399 port: 9876,
400 tls_enabled: false,
401 tls_cert_path: None,
402 tls_key_path: None,
403 auth_token: None,
404 heartbeat_interval_secs: 30,
405 node_timeout_secs: 90,
406 max_log_entries: 1000,
407 metrics_enabled: true,
408 }
409 }
410}
411
/// In-memory registry of remote nodes and their tasks that broadcasts a
/// [`RemoteEvent`] for each state change.
pub struct RemoteMonitor {
    /// Monitor settings (timeouts, log cap, ...).
    config: RemoteMonitorConfig,
    /// Registered nodes keyed by id, behind a tokio `RwLock`. NOTE(review): the
    /// `Arc` is never cloned within this file — presumably meant for sharing; confirm.
    nodes: Arc<RwLock<HashMap<NodeId, RemoteNode>>>,
    /// Tracked tasks keyed by id, behind a tokio `RwLock`.
    tasks: Arc<RwLock<HashMap<RemoteTaskId, RemoteTask>>>,
    /// Sending half of the event broadcast channel; see `subscribe`.
    event_tx: broadcast::Sender<RemoteEvent>,
}
419
420impl RemoteMonitor {
421 pub fn new(config: RemoteMonitorConfig) -> Self {
423 let (event_tx, _) = broadcast::channel(1000);
424
425 Self {
426 config,
427 nodes: Arc::new(RwLock::new(HashMap::new())),
428 tasks: Arc::new(RwLock::new(HashMap::new())),
429 event_tx,
430 }
431 }
432
433 pub fn subscribe(&self) -> broadcast::Receiver<RemoteEvent> {
435 self.event_tx.subscribe()
436 }
437
438 pub async fn register_node(&self, node: RemoteNode) -> Result<(), RemoteMonitorError> {
440 let node_id = node.id.clone();
441 let mut nodes = self.nodes.write().await;
442 nodes.insert(node_id.clone(), node);
443
444 let _ = self.event_tx.send(RemoteEvent::NodeOnline(node_id));
445 Ok(())
446 }
447
448 pub async fn unregister_node(&self, node_id: &str) -> Result<bool, RemoteMonitorError> {
450 let mut nodes = self.nodes.write().await;
451 let removed = nodes.remove(node_id).is_some();
452
453 if removed {
454 let _ = self
455 .event_tx
456 .send(RemoteEvent::NodeOffline(node_id.to_string()));
457 }
458
459 Ok(removed)
460 }
461
462 pub async fn heartbeat(&self, node_id: &str) -> Result<(), RemoteMonitorError> {
464 let mut nodes = self.nodes.write().await;
465
466 if let Some(node) = nodes.get_mut(node_id) {
467 node.last_heartbeat = Utc::now();
468 if node.status == NodeStatus::Offline || node.status == NodeStatus::Unknown {
469 let old_status = node.status;
470 node.status = NodeStatus::Online;
471 let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
472 node_id: node_id.to_string(),
473 old_status,
474 new_status: NodeStatus::Online,
475 });
476 }
477 let _ = self.event_tx.send(RemoteEvent::NodeHeartbeat {
478 node_id: node_id.to_string(),
479 timestamp: node.last_heartbeat,
480 });
481 Ok(())
482 } else {
483 Err(RemoteMonitorError::NodeNotFound(node_id.to_string()))
484 }
485 }
486
487 pub async fn get_node(&self, node_id: &str) -> Option<RemoteNode> {
489 let nodes = self.nodes.read().await;
490 nodes.get(node_id).cloned()
491 }
492
493 pub async fn list_nodes(&self) -> Vec<RemoteNode> {
495 let nodes = self.nodes.read().await;
496 nodes.values().cloned().collect()
497 }
498
499 pub async fn list_nodes_by_status(&self, status: NodeStatus) -> Vec<RemoteNode> {
501 let nodes = self.nodes.read().await;
502 nodes
503 .values()
504 .filter(|n| n.status == status)
505 .cloned()
506 .collect()
507 }
508
509 pub async fn create_task(&self, task: RemoteTask) -> Result<RemoteTaskId, RemoteMonitorError> {
511 {
513 let nodes = self.nodes.read().await;
514 if !nodes.contains_key(&task.node_id) {
515 return Err(RemoteMonitorError::NodeNotFound(task.node_id.clone()));
516 }
517 }
518
519 let task_id = task.id.clone();
520 let mut tasks = self.tasks.write().await;
521 tasks.insert(task_id.clone(), task.clone());
522
523 let _ = self.event_tx.send(RemoteEvent::TaskCreated(Box::new(task)));
524 Ok(task_id)
525 }
526
527 pub async fn update_task_status(
529 &self,
530 task_id: &str,
531 status: RemoteTaskStatus,
532 ) -> Result<(), RemoteMonitorError> {
533 let mut tasks = self.tasks.write().await;
534
535 if let Some(task) = tasks.get_mut(task_id) {
536 let old_status = task.status;
537 task.status = status;
538
539 match status {
540 RemoteTaskStatus::Running if old_status != RemoteTaskStatus::Running => {
541 let _ = self.event_tx.send(RemoteEvent::TaskStarted {
542 task_id: task_id.to_string(),
543 node_id: task.node_id.clone(),
544 });
545 }
546 RemoteTaskStatus::Completed => {
547 task.completed_at = Some(Utc::now());
548 task.progress = 1.0;
549 }
550 RemoteTaskStatus::Failed
551 | RemoteTaskStatus::Cancelled
552 | RemoteTaskStatus::TimedOut => {
553 task.completed_at = Some(Utc::now());
554 }
555 _ => {}
556 }
557
558 Ok(())
559 } else {
560 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
561 }
562 }
563
564 pub async fn update_task_progress(
566 &self,
567 task_id: &str,
568 progress: f32,
569 message: Option<String>,
570 ) -> Result<(), RemoteMonitorError> {
571 let mut tasks = self.tasks.write().await;
572
573 if let Some(task) = tasks.get_mut(task_id) {
574 task.progress = progress.clamp(0.0, 1.0);
575 task.progress_message = message.clone();
576
577 let _ = self.event_tx.send(RemoteEvent::TaskProgress {
578 task_id: task_id.to_string(),
579 progress: task.progress,
580 message,
581 });
582
583 Ok(())
584 } else {
585 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
586 }
587 }
588
589 pub async fn update_task_step(
591 &self,
592 task_id: &str,
593 step: u32,
594 total: u32,
595 description: Option<String>,
596 ) -> Result<(), RemoteMonitorError> {
597 let mut tasks = self.tasks.write().await;
598
599 if let Some(task) = tasks.get_mut(task_id) {
600 task.current_step = Some(step);
601 task.total_steps = Some(total);
602 task.progress = step as f32 / total as f32;
603
604 let _ = self.event_tx.send(RemoteEvent::TaskStepCompleted {
605 task_id: task_id.to_string(),
606 step,
607 total,
608 description,
609 });
610
611 Ok(())
612 } else {
613 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
614 }
615 }
616
617 pub async fn complete_task(
619 &self,
620 task_id: &str,
621 result: TaskResult,
622 ) -> Result<(), RemoteMonitorError> {
623 let node_id = {
624 let mut tasks = self.tasks.write().await;
625
626 if let Some(task) = tasks.get_mut(task_id) {
627 task.status = RemoteTaskStatus::Completed;
628 task.completed_at = Some(Utc::now());
629 task.progress = 1.0;
630 task.result = Some(result.clone());
631 task.node_id.clone()
632 } else {
633 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
634 }
635 };
636
637 let mut nodes = self.nodes.write().await;
639 if let Some(node) = nodes.get_mut(&node_id) {
640 node.running_tasks = node.running_tasks.saturating_sub(1);
641 }
642
643 let _ = self.event_tx.send(RemoteEvent::TaskCompleted {
644 task_id: task_id.to_string(),
645 result,
646 });
647
648 Ok(())
649 }
650
651 pub async fn fail_task(&self, task_id: &str, error: String) -> Result<(), RemoteMonitorError> {
653 let node_id = {
654 let mut tasks = self.tasks.write().await;
655
656 if let Some(task) = tasks.get_mut(task_id) {
657 task.status = RemoteTaskStatus::Failed;
658 task.completed_at = Some(Utc::now());
659 task.error = Some(error.clone());
660 task.node_id.clone()
661 } else {
662 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
663 }
664 };
665
666 let mut nodes = self.nodes.write().await;
668 if let Some(node) = nodes.get_mut(&node_id) {
669 node.running_tasks = node.running_tasks.saturating_sub(1);
670 }
671
672 let _ = self.event_tx.send(RemoteEvent::TaskFailed {
673 task_id: task_id.to_string(),
674 error,
675 });
676
677 Ok(())
678 }
679
680 pub async fn cancel_task(
682 &self,
683 task_id: &str,
684 reason: Option<String>,
685 ) -> Result<(), RemoteMonitorError> {
686 let node_id = {
687 let mut tasks = self.tasks.write().await;
688
689 if let Some(task) = tasks.get_mut(task_id) {
690 task.status = RemoteTaskStatus::Cancelled;
691 task.completed_at = Some(Utc::now());
692 task.node_id.clone()
693 } else {
694 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
695 }
696 };
697
698 let mut nodes = self.nodes.write().await;
700 if let Some(node) = nodes.get_mut(&node_id) {
701 node.running_tasks = node.running_tasks.saturating_sub(1);
702 }
703
704 let _ = self.event_tx.send(RemoteEvent::TaskCancelled {
705 task_id: task_id.to_string(),
706 reason,
707 });
708
709 Ok(())
710 }
711
712 pub async fn add_task_log(
714 &self,
715 task_id: &str,
716 entry: TaskLogEntry,
717 ) -> Result<(), RemoteMonitorError> {
718 let mut tasks = self.tasks.write().await;
719
720 if let Some(task) = tasks.get_mut(task_id) {
721 task.logs.push(entry.clone());
722
723 if task.logs.len() > self.config.max_log_entries {
725 let drain_count = task.logs.len() - self.config.max_log_entries;
726 task.logs.drain(0..drain_count);
727 }
728
729 let _ = self.event_tx.send(RemoteEvent::TaskLog {
730 task_id: task_id.to_string(),
731 entry,
732 });
733
734 Ok(())
735 } else {
736 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
737 }
738 }
739
740 pub async fn get_task(&self, task_id: &str) -> Option<RemoteTask> {
742 let tasks = self.tasks.read().await;
743 tasks.get(task_id).cloned()
744 }
745
746 pub async fn list_tasks(&self) -> Vec<RemoteTask> {
748 let tasks = self.tasks.read().await;
749 tasks.values().cloned().collect()
750 }
751
752 pub async fn list_tasks_by_node(&self, node_id: &str) -> Vec<RemoteTask> {
754 let tasks = self.tasks.read().await;
755 tasks
756 .values()
757 .filter(|t| t.node_id == node_id)
758 .cloned()
759 .collect()
760 }
761
762 pub async fn list_tasks_by_status(&self, status: RemoteTaskStatus) -> Vec<RemoteTask> {
764 let tasks = self.tasks.read().await;
765 tasks
766 .values()
767 .filter(|t| t.status == status)
768 .cloned()
769 .collect()
770 }
771
772 pub async fn list_tasks_by_agent(&self, agent_id: &str) -> Vec<RemoteTask> {
774 let tasks = self.tasks.read().await;
775 tasks
776 .values()
777 .filter(|t| t.agent_id == agent_id)
778 .cloned()
779 .collect()
780 }
781
782 pub async fn get_stats(&self) -> MonitorStats {
784 let nodes = self.nodes.read().await;
785 let tasks = self.tasks.read().await;
786
787 let online_nodes = nodes
788 .values()
789 .filter(|n| n.status == NodeStatus::Online)
790 .count();
791 let total_agents: u32 = nodes.values().map(|n| n.active_agents).sum();
792
793 let running_tasks = tasks
794 .values()
795 .filter(|t| t.status == RemoteTaskStatus::Running)
796 .count();
797 let queued_tasks = tasks
798 .values()
799 .filter(|t| t.status == RemoteTaskStatus::Queued)
800 .count();
801 let completed_tasks = tasks
802 .values()
803 .filter(|t| t.status == RemoteTaskStatus::Completed)
804 .count();
805 let failed_tasks = tasks
806 .values()
807 .filter(|t| t.status == RemoteTaskStatus::Failed)
808 .count();
809
810 MonitorStats {
811 total_nodes: nodes.len(),
812 online_nodes,
813 total_agents: total_agents as usize,
814 total_tasks: tasks.len(),
815 running_tasks,
816 queued_tasks,
817 completed_tasks,
818 failed_tasks,
819 }
820 }
821
822 pub async fn check_node_timeouts(&self) {
824 let timeout = chrono::Duration::seconds(self.config.node_timeout_secs as i64);
825 let now = Utc::now();
826
827 let mut nodes = self.nodes.write().await;
828 for node in nodes.values_mut() {
829 if node.status == NodeStatus::Online && now - node.last_heartbeat > timeout {
830 let old_status = node.status;
831 node.status = NodeStatus::Offline;
832 let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
833 node_id: node.id.clone(),
834 old_status,
835 new_status: NodeStatus::Offline,
836 });
837 }
838 }
839 }
840
841 pub fn config(&self) -> &RemoteMonitorConfig {
843 &self.config
844 }
845}
846
/// Aggregate counts returned by `RemoteMonitor::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorStats {
    /// Number of registered nodes.
    pub total_nodes: usize,
    /// Nodes whose status is exactly `Online` (`Degraded` is not counted).
    pub online_nodes: usize,
    /// Sum of `active_agents` over all nodes.
    pub total_agents: usize,
    /// Number of tracked tasks, in any status.
    pub total_tasks: usize,
    /// Tasks in `Running`.
    pub running_tasks: usize,
    /// Tasks in `Queued`.
    pub queued_tasks: usize,
    /// Tasks in `Completed`.
    pub completed_tasks: usize,
    /// Tasks in `Failed`.
    pub failed_tasks: usize,
}
859
860pub struct RemoteAgentClient {
866 pub node_id: NodeId,
868 server_url: String,
870 auth_token: Option<String>,
872 #[allow(dead_code)]
874 client: reqwest::Client,
875}
876
877impl RemoteAgentClient {
878 pub fn new(node_id: impl Into<String>, server_url: impl Into<String>) -> Self {
880 Self {
881 node_id: node_id.into(),
882 server_url: server_url.into(),
883 auth_token: None,
884 client: reqwest::Client::new(),
885 }
886 }
887
888 pub fn with_auth(mut self, token: impl Into<String>) -> Self {
890 self.auth_token = Some(token.into());
891 self
892 }
893
894 pub async fn heartbeat(&self) -> Result<(), RemoteMonitorError> {
896 let url = format!(
897 "{}/api/v1/nodes/{}/heartbeat",
898 self.server_url, self.node_id
899 );
900
901 let mut request = self.client.post(&url);
902 if let Some(ref token) = self.auth_token {
903 request = request.header("Authorization", format!("Bearer {}", token));
904 }
905
906 let response = request
907 .send()
908 .await
909 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
910
911 if !response.status().is_success() {
912 return Err(RemoteMonitorError::ApiError(
913 response.status().as_u16(),
914 response.text().await.unwrap_or_default(),
915 ));
916 }
917
918 Ok(())
919 }
920
921 pub async fn report_progress(
923 &self,
924 task_id: &str,
925 progress: f32,
926 message: Option<&str>,
927 ) -> Result<(), RemoteMonitorError> {
928 let url = format!("{}/api/v1/tasks/{}/progress", self.server_url, task_id);
929
930 let body = serde_json::json!({
931 "progress": progress,
932 "message": message,
933 });
934
935 let mut request = self.client.post(&url).json(&body);
936 if let Some(ref token) = self.auth_token {
937 request = request.header("Authorization", format!("Bearer {}", token));
938 }
939
940 let response = request
941 .send()
942 .await
943 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
944
945 if !response.status().is_success() {
946 return Err(RemoteMonitorError::ApiError(
947 response.status().as_u16(),
948 response.text().await.unwrap_or_default(),
949 ));
950 }
951
952 Ok(())
953 }
954
955 pub async fn report_step(
957 &self,
958 task_id: &str,
959 step: u32,
960 total: u32,
961 description: Option<&str>,
962 ) -> Result<(), RemoteMonitorError> {
963 let url = format!("{}/api/v1/tasks/{}/step", self.server_url, task_id);
964
965 let body = serde_json::json!({
966 "step": step,
967 "total": total,
968 "description": description,
969 });
970
971 let mut request = self.client.post(&url).json(&body);
972 if let Some(ref token) = self.auth_token {
973 request = request.header("Authorization", format!("Bearer {}", token));
974 }
975
976 let response = request
977 .send()
978 .await
979 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
980
981 if !response.status().is_success() {
982 return Err(RemoteMonitorError::ApiError(
983 response.status().as_u16(),
984 response.text().await.unwrap_or_default(),
985 ));
986 }
987
988 Ok(())
989 }
990
991 pub async fn report_completed(
993 &self,
994 task_id: &str,
995 result: TaskResult,
996 ) -> Result<(), RemoteMonitorError> {
997 let url = format!("{}/api/v1/tasks/{}/complete", self.server_url, task_id);
998
999 let mut request = self.client.post(&url).json(&result);
1000 if let Some(ref token) = self.auth_token {
1001 request = request.header("Authorization", format!("Bearer {}", token));
1002 }
1003
1004 let response = request
1005 .send()
1006 .await
1007 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1008
1009 if !response.status().is_success() {
1010 return Err(RemoteMonitorError::ApiError(
1011 response.status().as_u16(),
1012 response.text().await.unwrap_or_default(),
1013 ));
1014 }
1015
1016 Ok(())
1017 }
1018
1019 pub async fn report_failed(
1021 &self,
1022 task_id: &str,
1023 error: &str,
1024 ) -> Result<(), RemoteMonitorError> {
1025 let url = format!("{}/api/v1/tasks/{}/fail", self.server_url, task_id);
1026
1027 let body = serde_json::json!({
1028 "error": error,
1029 });
1030
1031 let mut request = self.client.post(&url).json(&body);
1032 if let Some(ref token) = self.auth_token {
1033 request = request.header("Authorization", format!("Bearer {}", token));
1034 }
1035
1036 let response = request
1037 .send()
1038 .await
1039 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1040
1041 if !response.status().is_success() {
1042 return Err(RemoteMonitorError::ApiError(
1043 response.status().as_u16(),
1044 response.text().await.unwrap_or_default(),
1045 ));
1046 }
1047
1048 Ok(())
1049 }
1050
1051 pub async fn send_log(
1053 &self,
1054 task_id: &str,
1055 level: LogLevel,
1056 message: &str,
1057 ) -> Result<(), RemoteMonitorError> {
1058 let url = format!("{}/api/v1/tasks/{}/log", self.server_url, task_id);
1059
1060 let body = serde_json::json!({
1061 "level": level,
1062 "message": message,
1063 "timestamp": Utc::now(),
1064 });
1065
1066 let mut request = self.client.post(&url).json(&body);
1067 if let Some(ref token) = self.auth_token {
1068 request = request.header("Authorization", format!("Bearer {}", token));
1069 }
1070
1071 let response = request
1072 .send()
1073 .await
1074 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1075
1076 if !response.status().is_success() {
1077 return Err(RemoteMonitorError::ApiError(
1078 response.status().as_u16(),
1079 response.text().await.unwrap_or_default(),
1080 ));
1081 }
1082
1083 Ok(())
1084 }
1085}
1086
/// Fluent builder for [`RemoteTask`].
///
/// Starts from a `Queued`, `Normal`-priority task with empty name/title and
/// `started_at` set to the time `RemoteTaskBuilder::new` is called.
pub struct RemoteTaskBuilder {
    /// The task being assembled; returned by `build`.
    task: RemoteTask,
}
1095
1096impl RemoteTaskBuilder {
1097 pub fn new(
1099 id: impl Into<String>,
1100 node_id: impl Into<String>,
1101 agent_id: impl Into<String>,
1102 ) -> Self {
1103 Self {
1104 task: RemoteTask {
1105 id: id.into(),
1106 node_id: node_id.into(),
1107 agent_id: agent_id.into(),
1108 agent_name: String::new(),
1109 title: String::new(),
1110 description: None,
1111 status: RemoteTaskStatus::Queued,
1112 progress: 0.0,
1113 progress_message: None,
1114 current_step: None,
1115 total_steps: None,
1116 priority: TaskPriority::Normal,
1117 started_at: Utc::now(),
1118 completed_at: None,
1119 eta: None,
1120 result: None,
1121 error: None,
1122 resources: ResourceUsage::default(),
1123 logs: Vec::new(),
1124 metadata: HashMap::new(),
1125 },
1126 }
1127 }
1128
1129 pub fn agent_name(mut self, name: impl Into<String>) -> Self {
1131 self.task.agent_name = name.into();
1132 self
1133 }
1134
1135 pub fn title(mut self, title: impl Into<String>) -> Self {
1137 self.task.title = title.into();
1138 self
1139 }
1140
1141 pub fn description(mut self, description: impl Into<String>) -> Self {
1143 self.task.description = Some(description.into());
1144 self
1145 }
1146
1147 pub fn priority(mut self, priority: TaskPriority) -> Self {
1149 self.task.priority = priority;
1150 self
1151 }
1152
1153 pub fn total_steps(mut self, steps: u32) -> Self {
1155 self.task.total_steps = Some(steps);
1156 self
1157 }
1158
1159 pub fn eta(mut self, eta: DateTime<Utc>) -> Self {
1161 self.task.eta = Some(eta);
1162 self
1163 }
1164
1165 pub fn metadata(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
1167 if let Ok(v) = serde_json::to_value(value) {
1168 self.task.metadata.insert(key.into(), v);
1169 }
1170 self
1171 }
1172
1173 pub fn build(self) -> RemoteTask {
1175 self.task
1176 }
1177}
1178
/// Errors returned by [`RemoteMonitor`] and [`RemoteAgentClient`] operations.
#[derive(Debug, Clone)]
pub enum RemoteMonitorError {
    /// No node is registered under the contained id.
    NodeNotFound(String),
    /// No task is tracked under the contained id.
    TaskNotFound(String),
    /// The HTTP request could not be sent; holds the transport error text.
    Network(String),
    /// The server replied with a non-success status: `(status code, response body)`.
    ApiError(u16, String),
    /// Authentication failure.
    AuthError(String),
    /// The requested operation is not valid for the target's current state.
    InvalidState(String),
}

impl std::fmt::Display for RemoteMonitorError {
    /// Renders a short human-readable message prefixed by the error category.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NodeNotFound(node) => write!(f, "Node not found: {}", node),
            Self::TaskNotFound(task) => write!(f, "Task not found: {}", task),
            Self::Network(cause) => write!(f, "Network error: {}", cause),
            Self::ApiError(status, body) => write!(f, "API error {}: {}", status, body),
            Self::AuthError(cause) => write!(f, "Authentication error: {}", cause),
            Self::InvalidState(detail) => write!(f, "Invalid state: {}", detail),
        }
    }
}

/// Marker impl: the `Display` output is the message; there is no source error.
impl std::error::Error for RemoteMonitorError {}
1214
/// Returns a wall-clock nanosecond stamp (since the Unix epoch) that is
/// strictly greater than every stamp previously returned in this process.
///
/// A raw clock reading can repeat: two calls inside one clock tick, coarse
/// platform timers, or a clock stepping backwards. Ids built from it would then
/// collide and silently overwrite entries keyed by them. A clock set before
/// 1970 is treated as 0 instead of panicking.
fn next_unique_nanos() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static LAST: AtomicU64 = AtomicU64::new(0);

    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX))
        .unwrap_or(0);

    // Atomically install max(now, previous + 1). The closure always returns
    // `Some`, so `fetch_update` cannot actually yield `Err`.
    let previous = LAST
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
            Some(now.max(prev.saturating_add(1)))
        })
        .unwrap_or_else(|prev| prev);
    now.max(previous.saturating_add(1))
}

/// Generates a task id of the form `task_<hex nanoseconds>`.
///
/// Unique within this process; ids from different processes may still collide.
pub fn generate_task_id() -> String {
    format!("task_{:x}", next_unique_nanos())
}

/// Generates a node id of the form `node_<hex nanoseconds>`.
///
/// Unique within this process; ids from different processes may still collide.
pub fn generate_node_id() -> String {
    format!("node_{:x}", next_unique_nanos())
}
1238
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Online` node at `localhost:9876` with no hardware info, no
    /// running tasks and both timestamps set to now.
    fn sample_node(id: &str, name: &str, active_agents: u32, tags: &[&str]) -> RemoteNode {
        RemoteNode {
            id: id.to_string(),
            name: name.to_string(),
            address: "localhost:9876".to_string(),
            status: NodeStatus::Online,
            tags: tags.iter().map(|tag| tag.to_string()).collect(),
            hardware: None,
            active_agents,
            running_tasks: 0,
            last_heartbeat: Utc::now(),
            registered_at: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    /// Creates a default-configured monitor with `node` already registered.
    async fn monitor_with(node: RemoteNode) -> RemoteMonitor {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());
        monitor.register_node(node).await.unwrap();
        monitor
    }

    #[tokio::test]
    async fn test_remote_monitor_creation() {
        let fresh = RemoteMonitor::new(RemoteMonitorConfig::default());

        let stats = fresh.get_stats().await;
        assert_eq!(stats.total_nodes, 0);
        assert_eq!(stats.total_tasks, 0);
    }

    #[tokio::test]
    async fn test_node_registration() {
        let monitor = monitor_with(sample_node("test-node", "Test Node", 0, &["test"])).await;

        let stored = monitor
            .get_node("test-node")
            .await
            .expect("node should be registered");
        assert_eq!(stored.name, "Test Node");
    }

    #[tokio::test]
    async fn test_task_creation() {
        let monitor = monitor_with(sample_node("test-node", "Test Node", 1, &[])).await;

        let task = RemoteTaskBuilder::new("task-1", "test-node", "agent-1")
            .agent_name("Test Agent")
            .title("Test Task")
            .description("A test task")
            .priority(TaskPriority::High)
            .build();
        let created_id = monitor.create_task(task).await.unwrap();
        assert_eq!(created_id, "task-1");

        let stored = monitor
            .get_task("task-1")
            .await
            .expect("task should be stored");
        assert_eq!(stored.title, "Test Task");
    }

    #[tokio::test]
    async fn test_task_progress() {
        let monitor = monitor_with(sample_node("node-1", "Node 1", 1, &[])).await;

        let task = RemoteTaskBuilder::new("task-1", "node-1", "agent-1")
            .title("Progress Test")
            .build();
        monitor.create_task(task).await.unwrap();

        monitor
            .update_task_progress("task-1", 0.5, Some("Halfway done".to_string()))
            .await
            .unwrap();

        let stored = monitor.get_task("task-1").await.unwrap();
        assert!((stored.progress - 0.5).abs() < 0.01);
        assert_eq!(stored.progress_message.as_deref(), Some("Halfway done"));
    }

    #[test]
    fn test_task_builder() {
        let built = RemoteTaskBuilder::new("task-123", "node-1", "agent-1")
            .agent_name("My Agent")
            .title("Important Task")
            .description("Does important things")
            .priority(TaskPriority::Critical)
            .total_steps(5)
            .build();

        assert_eq!(built.id, "task-123");
        assert_eq!(built.node_id, "node-1");
        assert_eq!(built.agent_id, "agent-1");
        assert_eq!(built.agent_name, "My Agent");
        assert_eq!(built.title, "Important Task");
        assert_eq!(built.priority, TaskPriority::Critical);
        assert_eq!(built.total_steps, Some(5));
        assert_eq!(built.status, RemoteTaskStatus::Queued);
    }
}