#![allow(dead_code)]

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
22
/// Identifier of a node; used as the key of the monitor's node registry.
pub type NodeId = String;

/// Identifier of a task; used as the key of the monitor's task registry.
pub type RemoteTaskId = String;
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct RemoteNode {
36 pub id: NodeId,
38 pub name: String,
40 pub address: String,
42 pub status: NodeStatus,
44 pub tags: Vec<String>,
46 pub hardware: Option<HardwareInfo>,
48 pub active_agents: u32,
50 pub running_tasks: u32,
52 pub last_heartbeat: DateTime<Utc>,
54 pub registered_at: DateTime<Utc>,
56 pub metadata: HashMap<String, serde_json::Value>,
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum NodeStatus {
64 Online,
66 Degraded,
68 Offline,
70 Maintenance,
72 Unknown,
74}
75
76impl Default for NodeStatus {
77 fn default() -> Self {
78 NodeStatus::Unknown
79 }
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct HardwareInfo {
85 pub cpu_cores: u32,
87 pub ram_total: u64,
89 pub ram_available: u64,
91 pub gpus: Vec<GpuInfo>,
93 pub os: String,
95 pub arch: String,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct GpuInfo {
102 pub name: String,
104 pub vram: u64,
106 pub cuda_version: Option<String>,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct RemoteTask {
117 pub id: RemoteTaskId,
119 pub node_id: NodeId,
121 pub agent_id: String,
123 pub agent_name: String,
125 pub title: String,
127 pub description: Option<String>,
129 pub status: RemoteTaskStatus,
131 pub progress: f32,
133 pub progress_message: Option<String>,
135 pub current_step: Option<u32>,
137 pub total_steps: Option<u32>,
139 pub priority: TaskPriority,
141 pub started_at: DateTime<Utc>,
143 pub completed_at: Option<DateTime<Utc>>,
145 pub eta: Option<DateTime<Utc>>,
147 pub result: Option<TaskResult>,
149 pub error: Option<String>,
151 pub resources: ResourceUsage,
153 pub logs: Vec<TaskLogEntry>,
155 pub metadata: HashMap<String, serde_json::Value>,
157}
158
159#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
161#[serde(rename_all = "snake_case")]
162pub enum RemoteTaskStatus {
163 Queued,
165 Starting,
167 Running,
169 Paused,
171 Completed,
173 Failed,
175 Cancelled,
177 TimedOut,
179}
180
181#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
183#[serde(rename_all = "snake_case")]
184pub enum TaskPriority {
185 Low = 0,
186 Normal = 1,
187 High = 2,
188 Critical = 3,
189}
190
191impl Default for TaskPriority {
192 fn default() -> Self {
193 TaskPriority::Normal
194 }
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct TaskResult {
200 pub success: bool,
202 pub output: Option<serde_json::Value>,
204 pub artifacts: Vec<TaskArtifact>,
206 pub metrics: TaskMetrics,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct TaskArtifact {
213 pub name: String,
215 pub artifact_type: ArtifactType,
217 pub location: String,
219 pub size: Option<u64>,
221 pub checksum: Option<String>,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
227#[serde(rename_all = "snake_case")]
228pub enum ArtifactType {
229 File,
230 Directory,
231 Url,
232 Database,
233 Model,
234 Report,
235 Log,
236 Custom(String),
237}
238
239#[derive(Debug, Clone, Default, Serialize, Deserialize)]
241pub struct TaskMetrics {
242 pub duration_ms: u64,
244 pub tokens_used: Option<u64>,
246 pub api_calls: u32,
248 pub files_processed: u32,
250 pub errors_recovered: u32,
252 pub retries: u32,
254}
255
256#[derive(Debug, Clone, Default, Serialize, Deserialize)]
258pub struct ResourceUsage {
259 pub cpu_percent: f32,
261 pub memory_bytes: u64,
263 pub gpu_memory_bytes: Option<u64>,
265 pub network_tx_bytes: u64,
267 pub network_rx_bytes: u64,
269 pub disk_read_bytes: u64,
271 pub disk_write_bytes: u64,
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct TaskLogEntry {
278 pub timestamp: DateTime<Utc>,
280 pub level: LogLevel,
282 pub message: String,
284 pub data: Option<serde_json::Value>,
286}
287
288#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
290#[serde(rename_all = "snake_case")]
291pub enum LogLevel {
292 Trace,
293 Debug,
294 Info,
295 Warn,
296 Error,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
305#[serde(tag = "type", content = "data")]
306pub enum RemoteEvent {
307 NodeOnline(NodeId),
309 NodeOffline(NodeId),
311 NodeStatusChanged {
313 node_id: NodeId,
314 old_status: NodeStatus,
315 new_status: NodeStatus,
316 },
317 NodeHeartbeat {
319 node_id: NodeId,
320 timestamp: DateTime<Utc>,
321 },
322
323 TaskCreated(RemoteTask),
325 TaskStarted {
327 task_id: RemoteTaskId,
328 node_id: NodeId,
329 },
330 TaskProgress {
332 task_id: RemoteTaskId,
333 progress: f32,
334 message: Option<String>,
335 },
336 TaskStepCompleted {
338 task_id: RemoteTaskId,
339 step: u32,
340 total: u32,
341 description: Option<String>,
342 },
343 TaskCompleted {
345 task_id: RemoteTaskId,
346 result: TaskResult,
347 },
348 TaskFailed {
350 task_id: RemoteTaskId,
351 error: String,
352 },
353 TaskCancelled {
355 task_id: RemoteTaskId,
356 reason: Option<String>,
357 },
358 TaskLog {
360 task_id: RemoteTaskId,
361 entry: TaskLogEntry,
362 },
363
364 AgentRegistered {
366 node_id: NodeId,
367 agent_id: String,
368 agent_name: String,
369 },
370 AgentUnregistered { node_id: NodeId, agent_id: String },
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize)]
380pub struct RemoteMonitorConfig {
381 pub bind_address: String,
383 pub port: u16,
385 pub tls_enabled: bool,
387 pub tls_cert_path: Option<String>,
389 pub tls_key_path: Option<String>,
391 pub auth_token: Option<String>,
393 pub heartbeat_interval_secs: u64,
395 pub node_timeout_secs: u64,
397 pub max_log_entries: usize,
399 pub metrics_enabled: bool,
401}
402
403impl Default for RemoteMonitorConfig {
404 fn default() -> Self {
405 Self {
406 bind_address: "0.0.0.0".to_string(),
407 port: 9876,
408 tls_enabled: false,
409 tls_cert_path: None,
410 tls_key_path: None,
411 auth_token: None,
412 heartbeat_interval_secs: 30,
413 node_timeout_secs: 90,
414 max_log_entries: 1000,
415 metrics_enabled: true,
416 }
417 }
418}
419
420pub struct RemoteMonitor {
422 config: RemoteMonitorConfig,
423 nodes: Arc<RwLock<HashMap<NodeId, RemoteNode>>>,
424 tasks: Arc<RwLock<HashMap<RemoteTaskId, RemoteTask>>>,
425 event_tx: broadcast::Sender<RemoteEvent>,
426}
427
428impl RemoteMonitor {
429 pub fn new(config: RemoteMonitorConfig) -> Self {
431 let (event_tx, _) = broadcast::channel(1000);
432
433 Self {
434 config,
435 nodes: Arc::new(RwLock::new(HashMap::new())),
436 tasks: Arc::new(RwLock::new(HashMap::new())),
437 event_tx,
438 }
439 }
440
441 pub fn subscribe(&self) -> broadcast::Receiver<RemoteEvent> {
443 self.event_tx.subscribe()
444 }
445
446 pub async fn register_node(&self, node: RemoteNode) -> Result<(), RemoteMonitorError> {
448 let node_id = node.id.clone();
449 let mut nodes = self.nodes.write().await;
450 nodes.insert(node_id.clone(), node);
451
452 let _ = self.event_tx.send(RemoteEvent::NodeOnline(node_id));
453 Ok(())
454 }
455
456 pub async fn unregister_node(&self, node_id: &str) -> Result<bool, RemoteMonitorError> {
458 let mut nodes = self.nodes.write().await;
459 let removed = nodes.remove(node_id).is_some();
460
461 if removed {
462 let _ = self
463 .event_tx
464 .send(RemoteEvent::NodeOffline(node_id.to_string()));
465 }
466
467 Ok(removed)
468 }
469
470 pub async fn heartbeat(&self, node_id: &str) -> Result<(), RemoteMonitorError> {
472 let mut nodes = self.nodes.write().await;
473
474 if let Some(node) = nodes.get_mut(node_id) {
475 node.last_heartbeat = Utc::now();
476 if node.status == NodeStatus::Offline || node.status == NodeStatus::Unknown {
477 let old_status = node.status;
478 node.status = NodeStatus::Online;
479 let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
480 node_id: node_id.to_string(),
481 old_status,
482 new_status: NodeStatus::Online,
483 });
484 }
485 let _ = self.event_tx.send(RemoteEvent::NodeHeartbeat {
486 node_id: node_id.to_string(),
487 timestamp: node.last_heartbeat,
488 });
489 Ok(())
490 } else {
491 Err(RemoteMonitorError::NodeNotFound(node_id.to_string()))
492 }
493 }
494
495 pub async fn get_node(&self, node_id: &str) -> Option<RemoteNode> {
497 let nodes = self.nodes.read().await;
498 nodes.get(node_id).cloned()
499 }
500
501 pub async fn list_nodes(&self) -> Vec<RemoteNode> {
503 let nodes = self.nodes.read().await;
504 nodes.values().cloned().collect()
505 }
506
507 pub async fn list_nodes_by_status(&self, status: NodeStatus) -> Vec<RemoteNode> {
509 let nodes = self.nodes.read().await;
510 nodes
511 .values()
512 .filter(|n| n.status == status)
513 .cloned()
514 .collect()
515 }
516
517 pub async fn create_task(&self, task: RemoteTask) -> Result<RemoteTaskId, RemoteMonitorError> {
519 {
521 let nodes = self.nodes.read().await;
522 if !nodes.contains_key(&task.node_id) {
523 return Err(RemoteMonitorError::NodeNotFound(task.node_id.clone()));
524 }
525 }
526
527 let task_id = task.id.clone();
528 let mut tasks = self.tasks.write().await;
529 tasks.insert(task_id.clone(), task.clone());
530
531 let _ = self.event_tx.send(RemoteEvent::TaskCreated(task));
532 Ok(task_id)
533 }
534
535 pub async fn update_task_status(
537 &self,
538 task_id: &str,
539 status: RemoteTaskStatus,
540 ) -> Result<(), RemoteMonitorError> {
541 let mut tasks = self.tasks.write().await;
542
543 if let Some(task) = tasks.get_mut(task_id) {
544 let old_status = task.status;
545 task.status = status;
546
547 match status {
548 RemoteTaskStatus::Running if old_status != RemoteTaskStatus::Running => {
549 let _ = self.event_tx.send(RemoteEvent::TaskStarted {
550 task_id: task_id.to_string(),
551 node_id: task.node_id.clone(),
552 });
553 }
554 RemoteTaskStatus::Completed => {
555 task.completed_at = Some(Utc::now());
556 task.progress = 1.0;
557 }
558 RemoteTaskStatus::Failed
559 | RemoteTaskStatus::Cancelled
560 | RemoteTaskStatus::TimedOut => {
561 task.completed_at = Some(Utc::now());
562 }
563 _ => {}
564 }
565
566 Ok(())
567 } else {
568 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
569 }
570 }
571
572 pub async fn update_task_progress(
574 &self,
575 task_id: &str,
576 progress: f32,
577 message: Option<String>,
578 ) -> Result<(), RemoteMonitorError> {
579 let mut tasks = self.tasks.write().await;
580
581 if let Some(task) = tasks.get_mut(task_id) {
582 task.progress = progress.clamp(0.0, 1.0);
583 task.progress_message = message.clone();
584
585 let _ = self.event_tx.send(RemoteEvent::TaskProgress {
586 task_id: task_id.to_string(),
587 progress: task.progress,
588 message,
589 });
590
591 Ok(())
592 } else {
593 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
594 }
595 }
596
597 pub async fn update_task_step(
599 &self,
600 task_id: &str,
601 step: u32,
602 total: u32,
603 description: Option<String>,
604 ) -> Result<(), RemoteMonitorError> {
605 let mut tasks = self.tasks.write().await;
606
607 if let Some(task) = tasks.get_mut(task_id) {
608 task.current_step = Some(step);
609 task.total_steps = Some(total);
610 task.progress = step as f32 / total as f32;
611
612 let _ = self.event_tx.send(RemoteEvent::TaskStepCompleted {
613 task_id: task_id.to_string(),
614 step,
615 total,
616 description,
617 });
618
619 Ok(())
620 } else {
621 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
622 }
623 }
624
625 pub async fn complete_task(
627 &self,
628 task_id: &str,
629 result: TaskResult,
630 ) -> Result<(), RemoteMonitorError> {
631 let node_id = {
632 let mut tasks = self.tasks.write().await;
633
634 if let Some(task) = tasks.get_mut(task_id) {
635 task.status = RemoteTaskStatus::Completed;
636 task.completed_at = Some(Utc::now());
637 task.progress = 1.0;
638 task.result = Some(result.clone());
639 task.node_id.clone()
640 } else {
641 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
642 }
643 };
644
645 let mut nodes = self.nodes.write().await;
647 if let Some(node) = nodes.get_mut(&node_id) {
648 node.running_tasks = node.running_tasks.saturating_sub(1);
649 }
650
651 let _ = self.event_tx.send(RemoteEvent::TaskCompleted {
652 task_id: task_id.to_string(),
653 result,
654 });
655
656 Ok(())
657 }
658
659 pub async fn fail_task(&self, task_id: &str, error: String) -> Result<(), RemoteMonitorError> {
661 let node_id = {
662 let mut tasks = self.tasks.write().await;
663
664 if let Some(task) = tasks.get_mut(task_id) {
665 task.status = RemoteTaskStatus::Failed;
666 task.completed_at = Some(Utc::now());
667 task.error = Some(error.clone());
668 task.node_id.clone()
669 } else {
670 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
671 }
672 };
673
674 let mut nodes = self.nodes.write().await;
676 if let Some(node) = nodes.get_mut(&node_id) {
677 node.running_tasks = node.running_tasks.saturating_sub(1);
678 }
679
680 let _ = self.event_tx.send(RemoteEvent::TaskFailed {
681 task_id: task_id.to_string(),
682 error,
683 });
684
685 Ok(())
686 }
687
688 pub async fn cancel_task(
690 &self,
691 task_id: &str,
692 reason: Option<String>,
693 ) -> Result<(), RemoteMonitorError> {
694 let node_id = {
695 let mut tasks = self.tasks.write().await;
696
697 if let Some(task) = tasks.get_mut(task_id) {
698 task.status = RemoteTaskStatus::Cancelled;
699 task.completed_at = Some(Utc::now());
700 task.node_id.clone()
701 } else {
702 return Err(RemoteMonitorError::TaskNotFound(task_id.to_string()));
703 }
704 };
705
706 let mut nodes = self.nodes.write().await;
708 if let Some(node) = nodes.get_mut(&node_id) {
709 node.running_tasks = node.running_tasks.saturating_sub(1);
710 }
711
712 let _ = self.event_tx.send(RemoteEvent::TaskCancelled {
713 task_id: task_id.to_string(),
714 reason,
715 });
716
717 Ok(())
718 }
719
720 pub async fn add_task_log(
722 &self,
723 task_id: &str,
724 entry: TaskLogEntry,
725 ) -> Result<(), RemoteMonitorError> {
726 let mut tasks = self.tasks.write().await;
727
728 if let Some(task) = tasks.get_mut(task_id) {
729 task.logs.push(entry.clone());
730
731 if task.logs.len() > self.config.max_log_entries {
733 let drain_count = task.logs.len() - self.config.max_log_entries;
734 task.logs.drain(0..drain_count);
735 }
736
737 let _ = self.event_tx.send(RemoteEvent::TaskLog {
738 task_id: task_id.to_string(),
739 entry,
740 });
741
742 Ok(())
743 } else {
744 Err(RemoteMonitorError::TaskNotFound(task_id.to_string()))
745 }
746 }
747
748 pub async fn get_task(&self, task_id: &str) -> Option<RemoteTask> {
750 let tasks = self.tasks.read().await;
751 tasks.get(task_id).cloned()
752 }
753
754 pub async fn list_tasks(&self) -> Vec<RemoteTask> {
756 let tasks = self.tasks.read().await;
757 tasks.values().cloned().collect()
758 }
759
760 pub async fn list_tasks_by_node(&self, node_id: &str) -> Vec<RemoteTask> {
762 let tasks = self.tasks.read().await;
763 tasks
764 .values()
765 .filter(|t| t.node_id == node_id)
766 .cloned()
767 .collect()
768 }
769
770 pub async fn list_tasks_by_status(&self, status: RemoteTaskStatus) -> Vec<RemoteTask> {
772 let tasks = self.tasks.read().await;
773 tasks
774 .values()
775 .filter(|t| t.status == status)
776 .cloned()
777 .collect()
778 }
779
780 pub async fn list_tasks_by_agent(&self, agent_id: &str) -> Vec<RemoteTask> {
782 let tasks = self.tasks.read().await;
783 tasks
784 .values()
785 .filter(|t| t.agent_id == agent_id)
786 .cloned()
787 .collect()
788 }
789
790 pub async fn get_stats(&self) -> MonitorStats {
792 let nodes = self.nodes.read().await;
793 let tasks = self.tasks.read().await;
794
795 let online_nodes = nodes
796 .values()
797 .filter(|n| n.status == NodeStatus::Online)
798 .count();
799 let total_agents: u32 = nodes.values().map(|n| n.active_agents).sum();
800
801 let running_tasks = tasks
802 .values()
803 .filter(|t| t.status == RemoteTaskStatus::Running)
804 .count();
805 let queued_tasks = tasks
806 .values()
807 .filter(|t| t.status == RemoteTaskStatus::Queued)
808 .count();
809 let completed_tasks = tasks
810 .values()
811 .filter(|t| t.status == RemoteTaskStatus::Completed)
812 .count();
813 let failed_tasks = tasks
814 .values()
815 .filter(|t| t.status == RemoteTaskStatus::Failed)
816 .count();
817
818 MonitorStats {
819 total_nodes: nodes.len(),
820 online_nodes,
821 total_agents: total_agents as usize,
822 total_tasks: tasks.len(),
823 running_tasks,
824 queued_tasks,
825 completed_tasks,
826 failed_tasks,
827 }
828 }
829
830 pub async fn check_node_timeouts(&self) {
832 let timeout = chrono::Duration::seconds(self.config.node_timeout_secs as i64);
833 let now = Utc::now();
834
835 let mut nodes = self.nodes.write().await;
836 for node in nodes.values_mut() {
837 if node.status == NodeStatus::Online && now - node.last_heartbeat > timeout {
838 let old_status = node.status;
839 node.status = NodeStatus::Offline;
840 let _ = self.event_tx.send(RemoteEvent::NodeStatusChanged {
841 node_id: node.id.clone(),
842 old_status,
843 new_status: NodeStatus::Offline,
844 });
845 }
846 }
847 }
848
849 pub fn config(&self) -> &RemoteMonitorConfig {
851 &self.config
852 }
853}
854
855#[derive(Debug, Clone, Serialize, Deserialize)]
857pub struct MonitorStats {
858 pub total_nodes: usize,
859 pub online_nodes: usize,
860 pub total_agents: usize,
861 pub total_tasks: usize,
862 pub running_tasks: usize,
863 pub queued_tasks: usize,
864 pub completed_tasks: usize,
865 pub failed_tasks: usize,
866}
867
868pub struct RemoteAgentClient {
874 pub node_id: NodeId,
876 server_url: String,
878 auth_token: Option<String>,
880 #[allow(dead_code)]
882 client: reqwest::Client,
883}
884
885impl RemoteAgentClient {
886 pub fn new(node_id: impl Into<String>, server_url: impl Into<String>) -> Self {
888 Self {
889 node_id: node_id.into(),
890 server_url: server_url.into(),
891 auth_token: None,
892 client: reqwest::Client::new(),
893 }
894 }
895
896 pub fn with_auth(mut self, token: impl Into<String>) -> Self {
898 self.auth_token = Some(token.into());
899 self
900 }
901
902 pub async fn heartbeat(&self) -> Result<(), RemoteMonitorError> {
904 let url = format!(
905 "{}/api/v1/nodes/{}/heartbeat",
906 self.server_url, self.node_id
907 );
908
909 let mut request = self.client.post(&url);
910 if let Some(ref token) = self.auth_token {
911 request = request.header("Authorization", format!("Bearer {}", token));
912 }
913
914 let response = request
915 .send()
916 .await
917 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
918
919 if !response.status().is_success() {
920 return Err(RemoteMonitorError::ApiError(
921 response.status().as_u16(),
922 response.text().await.unwrap_or_default(),
923 ));
924 }
925
926 Ok(())
927 }
928
929 pub async fn report_progress(
931 &self,
932 task_id: &str,
933 progress: f32,
934 message: Option<&str>,
935 ) -> Result<(), RemoteMonitorError> {
936 let url = format!("{}/api/v1/tasks/{}/progress", self.server_url, task_id);
937
938 let body = serde_json::json!({
939 "progress": progress,
940 "message": message,
941 });
942
943 let mut request = self.client.post(&url).json(&body);
944 if let Some(ref token) = self.auth_token {
945 request = request.header("Authorization", format!("Bearer {}", token));
946 }
947
948 let response = request
949 .send()
950 .await
951 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
952
953 if !response.status().is_success() {
954 return Err(RemoteMonitorError::ApiError(
955 response.status().as_u16(),
956 response.text().await.unwrap_or_default(),
957 ));
958 }
959
960 Ok(())
961 }
962
963 pub async fn report_step(
965 &self,
966 task_id: &str,
967 step: u32,
968 total: u32,
969 description: Option<&str>,
970 ) -> Result<(), RemoteMonitorError> {
971 let url = format!("{}/api/v1/tasks/{}/step", self.server_url, task_id);
972
973 let body = serde_json::json!({
974 "step": step,
975 "total": total,
976 "description": description,
977 });
978
979 let mut request = self.client.post(&url).json(&body);
980 if let Some(ref token) = self.auth_token {
981 request = request.header("Authorization", format!("Bearer {}", token));
982 }
983
984 let response = request
985 .send()
986 .await
987 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
988
989 if !response.status().is_success() {
990 return Err(RemoteMonitorError::ApiError(
991 response.status().as_u16(),
992 response.text().await.unwrap_or_default(),
993 ));
994 }
995
996 Ok(())
997 }
998
999 pub async fn report_completed(
1001 &self,
1002 task_id: &str,
1003 result: TaskResult,
1004 ) -> Result<(), RemoteMonitorError> {
1005 let url = format!("{}/api/v1/tasks/{}/complete", self.server_url, task_id);
1006
1007 let mut request = self.client.post(&url).json(&result);
1008 if let Some(ref token) = self.auth_token {
1009 request = request.header("Authorization", format!("Bearer {}", token));
1010 }
1011
1012 let response = request
1013 .send()
1014 .await
1015 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1016
1017 if !response.status().is_success() {
1018 return Err(RemoteMonitorError::ApiError(
1019 response.status().as_u16(),
1020 response.text().await.unwrap_or_default(),
1021 ));
1022 }
1023
1024 Ok(())
1025 }
1026
1027 pub async fn report_failed(
1029 &self,
1030 task_id: &str,
1031 error: &str,
1032 ) -> Result<(), RemoteMonitorError> {
1033 let url = format!("{}/api/v1/tasks/{}/fail", self.server_url, task_id);
1034
1035 let body = serde_json::json!({
1036 "error": error,
1037 });
1038
1039 let mut request = self.client.post(&url).json(&body);
1040 if let Some(ref token) = self.auth_token {
1041 request = request.header("Authorization", format!("Bearer {}", token));
1042 }
1043
1044 let response = request
1045 .send()
1046 .await
1047 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1048
1049 if !response.status().is_success() {
1050 return Err(RemoteMonitorError::ApiError(
1051 response.status().as_u16(),
1052 response.text().await.unwrap_or_default(),
1053 ));
1054 }
1055
1056 Ok(())
1057 }
1058
1059 pub async fn send_log(
1061 &self,
1062 task_id: &str,
1063 level: LogLevel,
1064 message: &str,
1065 ) -> Result<(), RemoteMonitorError> {
1066 let url = format!("{}/api/v1/tasks/{}/log", self.server_url, task_id);
1067
1068 let body = serde_json::json!({
1069 "level": level,
1070 "message": message,
1071 "timestamp": Utc::now(),
1072 });
1073
1074 let mut request = self.client.post(&url).json(&body);
1075 if let Some(ref token) = self.auth_token {
1076 request = request.header("Authorization", format!("Bearer {}", token));
1077 }
1078
1079 let response = request
1080 .send()
1081 .await
1082 .map_err(|e| RemoteMonitorError::Network(e.to_string()))?;
1083
1084 if !response.status().is_success() {
1085 return Err(RemoteMonitorError::ApiError(
1086 response.status().as_u16(),
1087 response.text().await.unwrap_or_default(),
1088 ));
1089 }
1090
1091 Ok(())
1092 }
1093}
1094
1095pub struct RemoteTaskBuilder {
1101 task: RemoteTask,
1102}
1103
1104impl RemoteTaskBuilder {
1105 pub fn new(
1107 id: impl Into<String>,
1108 node_id: impl Into<String>,
1109 agent_id: impl Into<String>,
1110 ) -> Self {
1111 Self {
1112 task: RemoteTask {
1113 id: id.into(),
1114 node_id: node_id.into(),
1115 agent_id: agent_id.into(),
1116 agent_name: String::new(),
1117 title: String::new(),
1118 description: None,
1119 status: RemoteTaskStatus::Queued,
1120 progress: 0.0,
1121 progress_message: None,
1122 current_step: None,
1123 total_steps: None,
1124 priority: TaskPriority::Normal,
1125 started_at: Utc::now(),
1126 completed_at: None,
1127 eta: None,
1128 result: None,
1129 error: None,
1130 resources: ResourceUsage::default(),
1131 logs: Vec::new(),
1132 metadata: HashMap::new(),
1133 },
1134 }
1135 }
1136
1137 pub fn agent_name(mut self, name: impl Into<String>) -> Self {
1139 self.task.agent_name = name.into();
1140 self
1141 }
1142
1143 pub fn title(mut self, title: impl Into<String>) -> Self {
1145 self.task.title = title.into();
1146 self
1147 }
1148
1149 pub fn description(mut self, description: impl Into<String>) -> Self {
1151 self.task.description = Some(description.into());
1152 self
1153 }
1154
1155 pub fn priority(mut self, priority: TaskPriority) -> Self {
1157 self.task.priority = priority;
1158 self
1159 }
1160
1161 pub fn total_steps(mut self, steps: u32) -> Self {
1163 self.task.total_steps = Some(steps);
1164 self
1165 }
1166
1167 pub fn eta(mut self, eta: DateTime<Utc>) -> Self {
1169 self.task.eta = Some(eta);
1170 self
1171 }
1172
1173 pub fn metadata(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
1175 if let Ok(v) = serde_json::to_value(value) {
1176 self.task.metadata.insert(key.into(), v);
1177 }
1178 self
1179 }
1180
1181 pub fn build(self) -> RemoteTask {
1183 self.task
1184 }
1185}
1186
/// Errors returned by the monitor and by the agent client.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare errors directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteMonitorError {
    /// No node with the given id is registered.
    NodeNotFound(String),
    /// No task with the given id is known.
    TaskNotFound(String),
    /// The HTTP request could not be sent or its response not received.
    Network(String),
    /// The server answered with a non-success status: `(status code, body)`.
    ApiError(u16, String),
    /// Authentication failed.
    AuthError(String),
    /// The operation is not valid in the current state.
    InvalidState(String),
}

impl std::fmt::Display for RemoteMonitorError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
            Self::TaskNotFound(id) => write!(f, "Task not found: {}", id),
            Self::Network(e) => write!(f, "Network error: {}", e),
            Self::ApiError(code, msg) => write!(f, "API error {}: {}", code, msg),
            Self::AuthError(e) => write!(f, "Authentication error: {}", e),
            Self::InvalidState(e) => write!(f, "Invalid state: {}", e),
        }
    }
}

impl std::error::Error for RemoteMonitorError {}
1222
/// Generates a task id of the form `task_<hex>`.
///
/// The hex part is the current Unix time in nanoseconds followed by a
/// four-digit process-wide sequence number, so ids generated within the same
/// clock tick still differ. A system clock set before the Unix epoch yields a
/// zero timestamp instead of a panic.
pub fn generate_task_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    // Relaxed is enough: only the uniqueness of the counter value matters.
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed) & 0xffff;

    format!("task_{:x}{:04x}", nanos, sequence)
}
1236
/// Generates a node id of the form `node_<hex>`.
///
/// The hex part is the current Unix time in nanoseconds followed by a
/// four-digit process-wide sequence number, so ids generated within the same
/// clock tick still differ. A system clock set before the Unix epoch yields a
/// zero timestamp instead of a panic.
pub fn generate_node_id() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    // Relaxed is enough: only the uniqueness of the counter value matters.
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed) & 0xffff;

    format!("node_{:x}{:04x}", nanos, sequence)
}
1246
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an online node with no tags, one active agent and no running
    /// tasks; tests override individual fields with struct-update syntax.
    fn sample_node(id: &str, name: &str) -> RemoteNode {
        RemoteNode {
            id: id.to_string(),
            name: name.to_string(),
            address: "localhost:9876".to_string(),
            status: NodeStatus::Online,
            tags: vec![],
            hardware: None,
            active_agents: 1,
            running_tasks: 0,
            last_heartbeat: Utc::now(),
            registered_at: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    /// A fresh monitor reports no nodes and no tasks.
    #[tokio::test]
    async fn test_remote_monitor_creation() {
        let config = RemoteMonitorConfig::default();
        let monitor = RemoteMonitor::new(config);

        let stats = monitor.get_stats().await;
        assert_eq!(stats.total_nodes, 0);
        assert_eq!(stats.total_tasks, 0);
    }

    /// A registered node can be fetched back by id.
    #[tokio::test]
    async fn test_node_registration() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        let node = RemoteNode {
            tags: vec!["test".to_string()],
            active_agents: 0,
            ..sample_node("test-node", "Test Node")
        };

        monitor.register_node(node).await.unwrap();

        let retrieved = monitor.get_node("test-node").await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().name, "Test Node");
    }

    /// A task created on a registered node can be fetched back by id.
    #[tokio::test]
    async fn test_task_creation() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        // The node must exist before a task can be assigned to it.
        monitor
            .register_node(sample_node("test-node", "Test Node"))
            .await
            .unwrap();

        let task = RemoteTaskBuilder::new("task-1", "test-node", "agent-1")
            .agent_name("Test Agent")
            .title("Test Task")
            .description("A test task")
            .priority(TaskPriority::High)
            .build();

        let task_id = monitor.create_task(task).await.unwrap();
        assert_eq!(task_id, "task-1");

        let retrieved = monitor.get_task("task-1").await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().title, "Test Task");
    }

    /// A progress report updates both the fraction and the message.
    #[tokio::test]
    async fn test_task_progress() {
        let monitor = RemoteMonitor::new(RemoteMonitorConfig::default());

        monitor
            .register_node(sample_node("node-1", "Node 1"))
            .await
            .unwrap();

        let task = RemoteTaskBuilder::new("task-1", "node-1", "agent-1")
            .title("Progress Test")
            .build();
        monitor.create_task(task).await.unwrap();

        monitor
            .update_task_progress("task-1", 0.5, Some("Halfway done".to_string()))
            .await
            .unwrap();

        let task = monitor.get_task("task-1").await.unwrap();
        assert!((task.progress - 0.5).abs() < 0.01);
        assert_eq!(task.progress_message, Some("Halfway done".to_string()));
    }

    /// The builder stores every provided field and defaults to `Queued`.
    #[test]
    fn test_task_builder() {
        let task = RemoteTaskBuilder::new("task-123", "node-1", "agent-1")
            .agent_name("My Agent")
            .title("Important Task")
            .description("Does important things")
            .priority(TaskPriority::Critical)
            .total_steps(5)
            .build();

        assert_eq!(task.id, "task-123");
        assert_eq!(task.node_id, "node-1");
        assert_eq!(task.agent_id, "agent-1");
        assert_eq!(task.agent_name, "My Agent");
        assert_eq!(task.title, "Important Task");
        assert_eq!(task.priority, TaskPriority::Critical);
        assert_eq!(task.total_steps, Some(5));
        assert_eq!(task.status, RemoteTaskStatus::Queued);
    }
}
1382}