1use anyhow::{Result, anyhow};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{RwLock, broadcast, oneshot};
17use tokio::task::JoinHandle;
18
19use crate::resource_locks::{ResourceScope, ResourceType};
20
/// Default period between heartbeat ticks of a tracked operation.
pub const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// Default number of heartbeat intervals that may pass without a heartbeat
/// before an operation stops being considered alive.
pub const DEFAULT_MAX_MISSED_HEARTBEATS: u32 = 3;

/// Maximum number of output lines retained per operation; older lines are discarded.
const MAX_OUTPUT_LINES: usize = 100;
29
30pub struct OperationTracker {
32 operations: RwLock<HashMap<String, ActiveOperation>>,
34 heartbeat_interval: Duration,
36 max_missed_heartbeats: u32,
38 event_sender: broadcast::Sender<OperationEvent>,
40 next_id: RwLock<u64>,
42}
43
44#[derive(Debug, Clone)]
46pub struct ActiveOperation {
47 pub operation_id: String,
49 pub agent_id: String,
51 pub resource_type: ResourceType,
53 pub scope: ResourceScope,
55 pub started_at: Instant,
57 pub last_heartbeat: Instant,
59 pub process_id: Option<u32>,
61 pub status: String,
63 pub output_lines: VecDeque<String>,
65 pub description: String,
67 pub completed: bool,
69}
70
71impl ActiveOperation {
72 pub fn is_heartbeat_alive(&self, heartbeat_interval: Duration, max_missed: u32) -> bool {
74 if self.completed {
75 return false;
76 }
77 let max_silence = heartbeat_interval * max_missed;
78 self.last_heartbeat.elapsed() < max_silence
79 }
80
81 pub fn elapsed(&self) -> Duration {
83 self.started_at.elapsed()
84 }
85
86 pub fn time_since_heartbeat(&self) -> Duration {
88 self.last_heartbeat.elapsed()
89 }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(tag = "type", rename_all = "snake_case")]
95pub enum OperationEvent {
96 Started {
98 operation_id: String,
100 agent_id: String,
102 resource_type: String,
104 scope: String,
106 description: String,
108 },
109 Heartbeat {
111 operation_id: String,
113 agent_id: String,
115 status: String,
117 elapsed_secs: u64,
119 },
120 Completed {
122 operation_id: String,
124 agent_id: String,
126 resource_type: String,
128 scope: String,
130 duration_secs: u64,
132 success: bool,
134 summary: String,
136 },
137 Stale {
139 operation_id: String,
141 agent_id: String,
143 resource_type: String,
145 scope: String,
147 last_heartbeat_secs_ago: u64,
149 },
150 ProcessTerminated {
152 operation_id: String,
154 agent_id: String,
156 process_id: u32,
158 exit_status: Option<i32>,
160 },
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct OperationStatus {
166 pub operation_id: String,
168 pub agent_id: String,
170 pub resource_type: String,
172 pub scope: String,
174 pub started_at_secs_ago: u64,
176 pub last_heartbeat_secs_ago: u64,
178 pub is_alive: bool,
180 pub status: String,
182 pub description: String,
184 pub process_id: Option<u32>,
186 pub recent_output: Vec<String>,
188}
189
190pub struct OperationHandle {
196 tracker: Arc<OperationTracker>,
197 operation_id: String,
198 agent_id: String,
199 _resource_type: ResourceType,
200 _scope: ResourceScope,
201 heartbeat_task: Option<JoinHandle<()>>,
202 completion_sender: Option<oneshot::Sender<OperationCompletion>>,
203}
204
/// Final outcome sent from an `OperationHandle` to the tracker.
#[derive(Debug)]
struct OperationCompletion {
    /// Whether the operation is reported as successful.
    success: bool,
    /// Summary text on success, or error text on failure.
    summary: String,
}
211
212impl OperationTracker {
213 pub fn new() -> Arc<Self> {
215 Self::with_config(DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_MAX_MISSED_HEARTBEATS)
216 }
217
218 pub fn with_config(heartbeat_interval: Duration, max_missed_heartbeats: u32) -> Arc<Self> {
220 let (event_sender, _) = broadcast::channel(256);
221 Arc::new(Self {
222 operations: RwLock::new(HashMap::new()),
223 heartbeat_interval,
224 max_missed_heartbeats,
225 event_sender,
226 next_id: RwLock::new(1),
227 })
228 }
229
230 pub fn subscribe(&self) -> broadcast::Receiver<OperationEvent> {
232 self.event_sender.subscribe()
233 }
234
235 async fn generate_id(&self) -> String {
237 let mut id = self.next_id.write().await;
238 let operation_id = format!("op-{}", *id);
239 *id += 1;
240 operation_id
241 }
242
243 pub async fn start_operation(
250 self: &Arc<Self>,
251 agent_id: &str,
252 resource_type: ResourceType,
253 scope: ResourceScope,
254 description: &str,
255 ) -> Result<OperationHandle> {
256 let operation_id = self.generate_id().await;
257 let now = Instant::now();
258
259 let operation = ActiveOperation {
260 operation_id: operation_id.clone(),
261 agent_id: agent_id.to_string(),
262 resource_type,
263 scope: scope.clone(),
264 started_at: now,
265 last_heartbeat: now,
266 process_id: None,
267 status: "Starting".to_string(),
268 output_lines: VecDeque::new(),
269 description: description.to_string(),
270 completed: false,
271 };
272
273 {
275 let mut ops = self.operations.write().await;
276 ops.insert(operation_id.clone(), operation);
277 }
278
279 let _ = self.event_sender.send(OperationEvent::Started {
281 operation_id: operation_id.clone(),
282 agent_id: agent_id.to_string(),
283 resource_type: format!("{:?}", resource_type),
284 scope: format!("{:?}", scope),
285 description: description.to_string(),
286 });
287
288 let (completion_sender, completion_receiver) = oneshot::channel::<OperationCompletion>();
290
291 let tracker = Arc::clone(self);
293 let op_id = operation_id.clone();
294 let agent = agent_id.to_string();
295 let heartbeat_interval = self.heartbeat_interval;
296
297 let heartbeat_task = tokio::spawn(async move {
298 let mut interval = tokio::time::interval(heartbeat_interval);
299
300 loop {
301 interval.tick().await;
302
303 let ops = tracker.operations.read().await;
305 if let Some(op) = ops.get(&op_id) {
306 if op.completed {
307 break;
308 }
309
310 if let Some(pid) = op.process_id
312 && !is_process_alive(pid)
313 {
314 drop(ops);
315 let _ = tracker
317 .event_sender
318 .send(OperationEvent::ProcessTerminated {
319 operation_id: op_id.clone(),
320 agent_id: agent.clone(),
321 process_id: pid,
322 exit_status: None, });
324 }
325 } else {
326 break;
328 }
329 }
330 });
331
332 let tracker_for_completion = Arc::clone(self);
334 let op_id_for_completion = operation_id.clone();
335 let agent_for_completion = agent_id.to_string();
336 let resource_type_for_completion = resource_type;
337 let scope_for_completion = scope.clone();
338
339 tokio::spawn(async move {
340 if let Ok(completion) = completion_receiver.await {
341 tracker_for_completion
342 .complete_operation_internal(
343 &op_id_for_completion,
344 &agent_for_completion,
345 resource_type_for_completion,
346 &scope_for_completion,
347 completion.success,
348 &completion.summary,
349 )
350 .await;
351 }
352 });
353
354 Ok(OperationHandle {
355 tracker: Arc::clone(self),
356 operation_id,
357 agent_id: agent_id.to_string(),
358 _resource_type: resource_type,
359 _scope: scope,
360 heartbeat_task: Some(heartbeat_task),
361 completion_sender: Some(completion_sender),
362 })
363 }
364
365 pub async fn heartbeat(&self, operation_id: &str, status: &str) -> Result<()> {
367 let mut ops = self.operations.write().await;
368 let op = ops
369 .get_mut(operation_id)
370 .ok_or_else(|| anyhow!("Operation {} not found", operation_id))?;
371
372 op.last_heartbeat = Instant::now();
373 op.status = status.to_string();
374
375 let _ = self.event_sender.send(OperationEvent::Heartbeat {
376 operation_id: operation_id.to_string(),
377 agent_id: op.agent_id.clone(),
378 status: status.to_string(),
379 elapsed_secs: op.elapsed().as_secs(),
380 });
381
382 Ok(())
383 }
384
385 pub async fn add_output(&self, operation_id: &str, line: &str) -> Result<()> {
387 let mut ops = self.operations.write().await;
388 let op = ops
389 .get_mut(operation_id)
390 .ok_or_else(|| anyhow!("Operation {} not found", operation_id))?;
391
392 op.output_lines.push_back(line.to_string());
393 if op.output_lines.len() > MAX_OUTPUT_LINES {
394 op.output_lines.pop_front();
395 }
396
397 Ok(())
398 }
399
400 pub async fn attach_process(&self, operation_id: &str, process_id: u32) -> Result<()> {
402 let mut ops = self.operations.write().await;
403 let op = ops
404 .get_mut(operation_id)
405 .ok_or_else(|| anyhow!("Operation {} not found", operation_id))?;
406
407 op.process_id = Some(process_id);
408 Ok(())
409 }
410
411 pub async fn is_alive(&self, operation_id: &str) -> bool {
413 let ops = self.operations.read().await;
414 if let Some(op) = ops.get(operation_id) {
415 if op.completed {
416 return false;
417 }
418
419 if !op.is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats) {
421 return false;
422 }
423
424 if let Some(pid) = op.process_id
426 && !is_process_alive(pid)
427 {
428 return false;
429 }
430
431 true
432 } else {
433 false
434 }
435 }
436
437 pub async fn get_status(&self, operation_id: &str) -> Option<OperationStatus> {
439 let ops = self.operations.read().await;
440 ops.get(operation_id).map(|op| OperationStatus {
441 operation_id: op.operation_id.clone(),
442 agent_id: op.agent_id.clone(),
443 resource_type: format!("{:?}", op.resource_type),
444 scope: format!("{:?}", op.scope),
445 started_at_secs_ago: op.elapsed().as_secs(),
446 last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
447 is_alive: op.is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
448 && op.process_id.is_none_or(is_process_alive),
449 status: op.status.clone(),
450 description: op.description.clone(),
451 process_id: op.process_id,
452 recent_output: op.output_lines.iter().cloned().collect(),
453 })
454 }
455
456 pub async fn list_operations(&self) -> Vec<OperationStatus> {
458 let ops = self.operations.read().await;
459 ops.values()
460 .filter(|op| !op.completed)
461 .map(|op| OperationStatus {
462 operation_id: op.operation_id.clone(),
463 agent_id: op.agent_id.clone(),
464 resource_type: format!("{:?}", op.resource_type),
465 scope: format!("{:?}", op.scope),
466 started_at_secs_ago: op.elapsed().as_secs(),
467 last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
468 is_alive: op
469 .is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
470 && op.process_id.is_none_or(is_process_alive),
471 status: op.status.clone(),
472 description: op.description.clone(),
473 process_id: op.process_id,
474 recent_output: op.output_lines.iter().cloned().collect(),
475 })
476 .collect()
477 }
478
479 pub async fn operations_for_agent(&self, agent_id: &str) -> Vec<OperationStatus> {
481 let ops = self.operations.read().await;
482 ops.values()
483 .filter(|op| op.agent_id == agent_id && !op.completed)
484 .map(|op| OperationStatus {
485 operation_id: op.operation_id.clone(),
486 agent_id: op.agent_id.clone(),
487 resource_type: format!("{:?}", op.resource_type),
488 scope: format!("{:?}", op.scope),
489 started_at_secs_ago: op.elapsed().as_secs(),
490 last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
491 is_alive: op
492 .is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
493 && op.process_id.is_none_or(is_process_alive),
494 status: op.status.clone(),
495 description: op.description.clone(),
496 process_id: op.process_id,
497 recent_output: op.output_lines.iter().cloned().collect(),
498 })
499 .collect()
500 }
501
502 pub async fn find_operation(
504 &self,
505 resource_type: ResourceType,
506 scope: &ResourceScope,
507 ) -> Option<OperationStatus> {
508 let ops = self.operations.read().await;
509 ops.values()
510 .find(|op| op.resource_type == resource_type && &op.scope == scope && !op.completed)
511 .map(|op| OperationStatus {
512 operation_id: op.operation_id.clone(),
513 agent_id: op.agent_id.clone(),
514 resource_type: format!("{:?}", op.resource_type),
515 scope: format!("{:?}", op.scope),
516 started_at_secs_ago: op.elapsed().as_secs(),
517 last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
518 is_alive: op
519 .is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
520 && op.process_id.is_none_or(is_process_alive),
521 status: op.status.clone(),
522 description: op.description.clone(),
523 process_id: op.process_id,
524 recent_output: op.output_lines.iter().cloned().collect(),
525 })
526 }
527
528 pub async fn cleanup_stale(&self) -> Vec<String> {
530 let mut ops = self.operations.write().await;
531 let mut stale = Vec::new();
532
533 ops.retain(|id, op| {
534 if op.completed {
535 return false; }
537
538 let is_alive = op
539 .is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
540 && op.process_id.is_none_or(is_process_alive);
541
542 if !is_alive {
543 stale.push(id.clone());
544 let _ = self.event_sender.send(OperationEvent::Stale {
545 operation_id: id.clone(),
546 agent_id: op.agent_id.clone(),
547 resource_type: format!("{:?}", op.resource_type),
548 scope: format!("{:?}", op.scope),
549 last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
550 });
551 false
552 } else {
553 true
554 }
555 });
556
557 stale
558 }
559
560 async fn complete_operation_internal(
562 &self,
563 operation_id: &str,
564 agent_id: &str,
565 resource_type: ResourceType,
566 scope: &ResourceScope,
567 success: bool,
568 summary: &str,
569 ) {
570 let duration_secs = {
571 let mut ops = self.operations.write().await;
572 if let Some(op) = ops.get_mut(operation_id) {
573 op.completed = true;
574 op.status = if success { "Completed" } else { "Failed" }.to_string();
575 op.elapsed().as_secs()
576 } else {
577 0
578 }
579 };
580
581 let _ = self.event_sender.send(OperationEvent::Completed {
582 operation_id: operation_id.to_string(),
583 agent_id: agent_id.to_string(),
584 resource_type: format!("{:?}", resource_type),
585 scope: format!("{:?}", scope),
586 duration_secs,
587 success,
588 summary: summary.to_string(),
589 });
590
591 let tracker = Arc::clone(&Arc::new(self.clone_inner().await));
593 let op_id = operation_id.to_string();
594 tokio::spawn(async move {
595 tokio::time::sleep(Duration::from_secs(5)).await;
596 let mut ops = tracker.operations.write().await;
597 ops.remove(&op_id);
598 });
599 }
600
601 async fn clone_inner(&self) -> OperationTrackerInner {
603 OperationTrackerInner {
604 operations: Arc::new(RwLock::new(self.operations.read().await.clone())),
605 }
606 }
607}
608
609struct OperationTrackerInner {
611 operations: Arc<RwLock<HashMap<String, ActiveOperation>>>,
612}
613
614impl Default for OperationTracker {
615 fn default() -> Self {
616 let (event_sender, _) = broadcast::channel(256);
617 Self {
618 operations: RwLock::new(HashMap::new()),
619 heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
620 max_missed_heartbeats: DEFAULT_MAX_MISSED_HEARTBEATS,
621 event_sender,
622 next_id: RwLock::new(1),
623 }
624 }
625}
626
627impl OperationHandle {
628 pub fn operation_id(&self) -> &str {
630 &self.operation_id
631 }
632
633 pub fn agent_id(&self) -> &str {
635 &self.agent_id
636 }
637
638 pub async fn update_status(&self, status: &str) -> Result<()> {
640 self.tracker.heartbeat(&self.operation_id, status).await
641 }
642
643 pub async fn add_output(&self, line: &str) -> Result<()> {
645 self.tracker.add_output(&self.operation_id, line).await
646 }
647
648 pub async fn attach_process(&self, process_id: u32) -> Result<()> {
650 self.tracker
651 .attach_process(&self.operation_id, process_id)
652 .await
653 }
654
655 pub fn complete(mut self, summary: &str) {
657 if let Some(sender) = self.completion_sender.take() {
658 let _ = sender.send(OperationCompletion {
659 success: true,
660 summary: summary.to_string(),
661 });
662 }
663 }
664
665 pub fn fail(mut self, error: &str) {
667 if let Some(sender) = self.completion_sender.take() {
668 let _ = sender.send(OperationCompletion {
669 success: false,
670 summary: error.to_string(),
671 });
672 }
673 }
674}
675
676impl Drop for OperationHandle {
677 fn drop(&mut self) {
678 if let Some(task) = self.heartbeat_task.take() {
680 task.abort();
681 }
682
683 if let Some(sender) = self.completion_sender.take() {
685 let _ = sender.send(OperationCompletion {
686 success: true,
687 summary: "Operation handle dropped".to_string(),
688 });
689 }
690 }
691}
692
/// Reports whether a process with the given id currently exists (Unix, via `kill(pid, 0)`).
///
/// Returns `false` for pid 0 and for values that do not fit a positive
/// `pid_t`: those would make `kill` address a process group rather than a
/// single process. A process that exists but may not be signalled by the
/// caller (`EPERM`) is reported as alive.
#[cfg(all(unix, feature = "native"))]
fn is_process_alive(pid: u32) -> bool {
    let Ok(pid) = libc::pid_t::try_from(pid) else {
        return false;
    };
    if pid == 0 {
        return false;
    }

    // SAFETY: `kill` takes plain integers and, with signal 0, only performs
    // the existence/permission check without delivering a signal.
    if unsafe { libc::kill(pid, 0) } == 0 {
        return true;
    }

    // EPERM: the target exists but we lack permission to signal it.
    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}
699
/// Reports whether a process with the given id currently exists (Windows).
///
/// Runs `tasklist` filtered to the pid and checks whether the pid appears in
/// its output. Returns `false` when the command cannot be run.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    let filter = format!("PID eq {}", pid);
    let needle = pid.to_string();

    let result = std::process::Command::new("tasklist")
        // `/NH` suppresses the column headers.
        .args(["/FI", filter.as_str(), "/NH"])
        .output();

    match result {
        Ok(output) => String::from_utf8_lossy(&output.stdout).contains(needle.as_str()),
        Err(_) => false,
    }
}
713
/// Fallback for targets that are neither Windows nor Unix with the `native` feature.
///
/// No process probe is available here, so every pid is reported as not alive.
// NOTE(review): this makes any operation with an attached process look dead
// on these targets — confirm that is the intended default.
#[cfg(all(not(windows), not(all(unix, feature = "native"))))]
fn is_process_alive(_pid: u32) -> bool {
    false
}
719
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Project scope rooted at `path`.
    fn project_scope(path: &str) -> ResourceScope {
        ResourceScope::Project(PathBuf::from(path))
    }

    /// Starts a `cargo build` operation for `agent-1` on the given scope.
    async fn start_build(
        tracker: &Arc<OperationTracker>,
        scope: &ResourceScope,
    ) -> OperationHandle {
        tracker
            .start_operation("agent-1", ResourceType::Build, scope.clone(), "cargo build")
            .await
            .unwrap()
    }

    /// A freshly started operation is alive and attributed to its agent.
    #[tokio::test]
    async fn test_start_operation() {
        let tracker = OperationTracker::new();
        let handle = start_build(&tracker, &project_scope("/test/project")).await;

        assert!(tracker.is_alive(handle.operation_id()).await);

        let snapshot = tracker.get_status(handle.operation_id()).await.unwrap();
        assert_eq!(snapshot.agent_id, "agent-1");
        assert!(snapshot.is_alive);
    }

    /// A heartbeat replaces the status text.
    #[tokio::test]
    async fn test_heartbeat_updates_status() {
        let tracker = OperationTracker::new();
        let handle = start_build(&tracker, &project_scope("/test/project")).await;

        handle.update_status("Compiling crate...").await.unwrap();

        let snapshot = tracker.get_status(handle.operation_id()).await.unwrap();
        assert_eq!(snapshot.status, "Compiling crate...");
    }

    /// Completing the handle makes the operation no longer alive.
    #[tokio::test]
    async fn test_operation_completion() {
        let tracker = OperationTracker::new();
        let handle = start_build(&tracker, &project_scope("/test/project")).await;

        let op_id = handle.operation_id().to_string();
        handle.complete("Build succeeded");

        // Give the completion task time to run.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(!tracker.is_alive(&op_id).await);
    }

    /// Without heartbeats the operation goes stale once the window has passed.
    #[tokio::test]
    async fn test_stale_detection() {
        let tracker = OperationTracker::with_config(Duration::from_millis(10), 2);
        let handle = start_build(&tracker, &project_scope("/test/project")).await;

        let op_id = handle.operation_id().to_string();

        // Skip `Drop` so the handle does not report completion.
        std::mem::forget(handle);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(!tracker.is_alive(&op_id).await);
    }

    /// All running operations are listed.
    #[tokio::test]
    async fn test_list_operations() {
        let tracker = OperationTracker::new();

        let _first = tracker
            .start_operation(
                "agent-1",
                ResourceType::Build,
                project_scope("/test/project1"),
                "build 1",
            )
            .await
            .unwrap();

        let _second = tracker
            .start_operation(
                "agent-2",
                ResourceType::Test,
                project_scope("/test/project2"),
                "test 2",
            )
            .await
            .unwrap();

        let listed = tracker.list_operations().await;
        assert_eq!(listed.len(), 2);
    }

    /// Lookup matches on both resource type and scope.
    #[tokio::test]
    async fn test_find_operation() {
        let tracker = OperationTracker::new();
        let scope = project_scope("/test/project");
        let _handle = start_build(&tracker, &scope).await;

        let hit = tracker.find_operation(ResourceType::Build, &scope).await;
        assert!(hit.is_some());
        assert_eq!(hit.unwrap().agent_id, "agent-1");

        let miss = tracker.find_operation(ResourceType::Test, &scope).await;
        assert!(miss.is_none());
    }

    /// Output lines are retained in insertion order.
    #[tokio::test]
    async fn test_output_lines() {
        let tracker = OperationTracker::new();
        let handle = start_build(&tracker, &project_scope("/test/project")).await;

        for line in ["Compiling foo v1.0.0", "Compiling bar v2.0.0"] {
            handle.add_output(line).await.unwrap();
        }

        let snapshot = tracker.get_status(handle.operation_id()).await.unwrap();
        assert_eq!(snapshot.recent_output.len(), 2);
        assert_eq!(snapshot.recent_output[0], "Compiling foo v1.0.0");
    }

    /// Subscribers receive the `Started` event.
    #[tokio::test]
    async fn test_event_subscription() {
        let tracker = OperationTracker::new();
        let mut events = tracker.subscribe();
        let _handle = start_build(&tracker, &project_scope("/test/project")).await;

        let OperationEvent::Started { agent_id, .. } = events.try_recv().unwrap() else {
            panic!("Expected Started event");
        };
        assert_eq!(agent_id, "agent-1");
    }
}