Skip to main content

brainwires_agent/
operation_tracker.rs

//! Operation tracker with heartbeat-based liveness checking
//!
//! Replaces fixed timeouts with active liveness monitoring. Operations remain
//! valid as long as:
//! 1. The holding agent is still running (heartbeats received)
//! 2. Any attached external process (e.g., cargo build) is still alive
//!
//! This allows long-running operations (30+ minute builds) to hold locks
//! without arbitrary timeout expiration.
10
11use anyhow::{Result, anyhow};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{RwLock, broadcast, oneshot};
17use tokio::task::JoinHandle;
18
19use crate::resource_locks::{ResourceScope, ResourceType};
20
/// Interval at which heartbeats are expected by default (5 seconds).
pub const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// Default number of heartbeat intervals that may elapse in silence before an
/// operation is considered dead (3 intervals = 15 seconds at the default interval).
pub const DEFAULT_MAX_MISSED_HEARTBEATS: u32 = 3;

/// Cap on how many output lines are retained for each operation.
const MAX_OUTPUT_LINES: usize = 100;
29
30/// Tracks active operations with heartbeat-based liveness checking
31pub struct OperationTracker {
32    /// Active operations indexed by operation_id
33    operations: RwLock<HashMap<String, ActiveOperation>>,
34    /// Heartbeat interval
35    heartbeat_interval: Duration,
36    /// Max missed heartbeats before considered dead
37    max_missed_heartbeats: u32,
38    /// Notification channel for operation events
39    event_sender: broadcast::Sender<OperationEvent>,
40    /// Counter for generating unique operation IDs
41    next_id: RwLock<u64>,
42}
43
44/// Information about an active operation
45#[derive(Debug, Clone)]
46pub struct ActiveOperation {
47    /// Unique operation ID
48    pub operation_id: String,
49    /// ID of the agent performing the operation
50    pub agent_id: String,
51    /// Type of resource being used
52    pub resource_type: ResourceType,
53    /// Scope of the operation
54    pub scope: ResourceScope,
55    /// When the operation started
56    pub started_at: Instant,
57    /// Last heartbeat received
58    pub last_heartbeat: Instant,
59    /// Process ID if running an external command (e.g., cargo build)
60    pub process_id: Option<u32>,
61    /// Current status message
62    pub status: String,
63    /// Recent output lines from the operation
64    pub output_lines: VecDeque<String>,
65    /// Description of what the operation is doing
66    pub description: String,
67    /// Whether the operation has been explicitly completed
68    pub completed: bool,
69}
70
71impl ActiveOperation {
72    /// Check if the operation is still alive based on heartbeat
73    pub fn is_heartbeat_alive(&self, heartbeat_interval: Duration, max_missed: u32) -> bool {
74        if self.completed {
75            return false;
76        }
77        let max_silence = heartbeat_interval * max_missed;
78        self.last_heartbeat.elapsed() < max_silence
79    }
80
81    /// Get elapsed time since operation started
82    pub fn elapsed(&self) -> Duration {
83        self.started_at.elapsed()
84    }
85
86    /// Get time since last heartbeat
87    pub fn time_since_heartbeat(&self) -> Duration {
88        self.last_heartbeat.elapsed()
89    }
90}
91
92/// Events emitted by the operation tracker
93#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(tag = "type", rename_all = "snake_case")]
95pub enum OperationEvent {
96    /// Operation started.
97    Started {
98        /// Unique operation identifier.
99        operation_id: String,
100        /// Agent performing the operation.
101        agent_id: String,
102        /// Type of resource being used.
103        resource_type: String,
104        /// Scope of the operation.
105        scope: String,
106        /// Human-readable description.
107        description: String,
108    },
109    /// Heartbeat received with status update.
110    Heartbeat {
111        /// Unique operation identifier.
112        operation_id: String,
113        /// Agent performing the operation.
114        agent_id: String,
115        /// Current status message.
116        status: String,
117        /// Seconds elapsed since operation started.
118        elapsed_secs: u64,
119    },
120    /// Operation completed.
121    Completed {
122        /// Unique operation identifier.
123        operation_id: String,
124        /// Agent that performed the operation.
125        agent_id: String,
126        /// Type of resource that was used.
127        resource_type: String,
128        /// Scope of the operation.
129        scope: String,
130        /// Total duration in seconds.
131        duration_secs: u64,
132        /// Whether the operation succeeded.
133        success: bool,
134        /// Summary of the outcome.
135        summary: String,
136    },
137    /// Operation detected as stale (no heartbeats).
138    Stale {
139        /// Unique operation identifier.
140        operation_id: String,
141        /// Agent that was performing the operation.
142        agent_id: String,
143        /// Type of resource that was held.
144        resource_type: String,
145        /// Scope of the operation.
146        scope: String,
147        /// Seconds since last heartbeat.
148        last_heartbeat_secs_ago: u64,
149    },
150    /// Process attached to operation terminated.
151    ProcessTerminated {
152        /// Unique operation identifier.
153        operation_id: String,
154        /// Agent that owned the process.
155        agent_id: String,
156        /// OS process identifier.
157        process_id: u32,
158        /// Exit status code if available.
159        exit_status: Option<i32>,
160    },
161}
162
163/// Status of an operation for querying
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct OperationStatus {
166    /// Unique operation identifier.
167    pub operation_id: String,
168    /// Agent performing the operation.
169    pub agent_id: String,
170    /// Type of resource being used.
171    pub resource_type: String,
172    /// Scope of the operation.
173    pub scope: String,
174    /// Seconds since the operation started.
175    pub started_at_secs_ago: u64,
176    /// Seconds since the last heartbeat.
177    pub last_heartbeat_secs_ago: u64,
178    /// Whether the operation is still alive.
179    pub is_alive: bool,
180    /// Current status message.
181    pub status: String,
182    /// Human-readable description.
183    pub description: String,
184    /// Attached OS process identifier, if any.
185    pub process_id: Option<u32>,
186    /// Recent output lines from the operation.
187    pub recent_output: Vec<String>,
188}
189
190/// Handle returned when starting an operation
191///
192/// The handle spawns a background heartbeat task and provides methods
193/// to update status and attach processes. When dropped, it signals
194/// completion of the operation.
195pub struct OperationHandle {
196    tracker: Arc<OperationTracker>,
197    operation_id: String,
198    agent_id: String,
199    _resource_type: ResourceType,
200    _scope: ResourceScope,
201    heartbeat_task: Option<JoinHandle<()>>,
202    completion_sender: Option<oneshot::Sender<OperationCompletion>>,
203}
204
/// Outcome message passed from an `OperationHandle` to the tracker when the
/// operation is completed, failed, or the handle is dropped.
#[derive(Debug)]
struct OperationCompletion {
    /// Whether the operation is reported as successful.
    success: bool,
    /// Short description of the outcome (or the error text on failure).
    summary: String,
}
211
212impl OperationTracker {
213    /// Create a new operation tracker with default settings
214    pub fn new() -> Arc<Self> {
215        Self::with_config(DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_MAX_MISSED_HEARTBEATS)
216    }
217
218    /// Create a new operation tracker with custom settings
219    pub fn with_config(heartbeat_interval: Duration, max_missed_heartbeats: u32) -> Arc<Self> {
220        let (event_sender, _) = broadcast::channel(256);
221        Arc::new(Self {
222            operations: RwLock::new(HashMap::new()),
223            heartbeat_interval,
224            max_missed_heartbeats,
225            event_sender,
226            next_id: RwLock::new(1),
227        })
228    }
229
230    /// Subscribe to operation events
231    pub fn subscribe(&self) -> broadcast::Receiver<OperationEvent> {
232        self.event_sender.subscribe()
233    }
234
235    /// Generate a unique operation ID
236    async fn generate_id(&self) -> String {
237        let mut id = self.next_id.write().await;
238        let operation_id = format!("op-{}", *id);
239        *id += 1;
240        operation_id
241    }
242
243    /// Start tracking a new operation
244    ///
245    /// Returns an OperationHandle that:
246    /// - Automatically sends heartbeats
247    /// - Can have a process attached for liveness monitoring
248    /// - Signals completion when dropped
249    pub async fn start_operation(
250        self: &Arc<Self>,
251        agent_id: &str,
252        resource_type: ResourceType,
253        scope: ResourceScope,
254        description: &str,
255    ) -> Result<OperationHandle> {
256        let operation_id = self.generate_id().await;
257        let now = Instant::now();
258
259        let operation = ActiveOperation {
260            operation_id: operation_id.clone(),
261            agent_id: agent_id.to_string(),
262            resource_type,
263            scope: scope.clone(),
264            started_at: now,
265            last_heartbeat: now,
266            process_id: None,
267            status: "Starting".to_string(),
268            output_lines: VecDeque::new(),
269            description: description.to_string(),
270            completed: false,
271        };
272
273        // Store the operation
274        {
275            let mut ops = self.operations.write().await;
276            ops.insert(operation_id.clone(), operation);
277        }
278
279        // Emit started event
280        let _ = self.event_sender.send(OperationEvent::Started {
281            operation_id: operation_id.clone(),
282            agent_id: agent_id.to_string(),
283            resource_type: format!("{:?}", resource_type),
284            scope: format!("{:?}", scope),
285            description: description.to_string(),
286        });
287
288        // Create completion channel
289        let (completion_sender, completion_receiver) = oneshot::channel::<OperationCompletion>();
290
291        // Spawn heartbeat task
292        let tracker = Arc::clone(self);
293        let op_id = operation_id.clone();
294        let agent = agent_id.to_string();
295        let heartbeat_interval = self.heartbeat_interval;
296
297        let heartbeat_task = tokio::spawn(async move {
298            let mut interval = tokio::time::interval(heartbeat_interval);
299
300            loop {
301                interval.tick().await;
302
303                // Check if operation still exists and check process liveness
304                let ops = tracker.operations.read().await;
305                if let Some(op) = ops.get(&op_id) {
306                    if op.completed {
307                        break;
308                    }
309
310                    // Check process liveness if attached
311                    if let Some(pid) = op.process_id
312                        && !is_process_alive(pid)
313                    {
314                        drop(ops);
315                        // Process died - mark operation as potentially stale
316                        let _ = tracker
317                            .event_sender
318                            .send(OperationEvent::ProcessTerminated {
319                                operation_id: op_id.clone(),
320                                agent_id: agent.clone(),
321                                process_id: pid,
322                                exit_status: None, // Could try to get this
323                            });
324                    }
325                } else {
326                    // Operation was removed
327                    break;
328                }
329            }
330        });
331
332        // Spawn a task to wait for completion signal
333        let tracker_for_completion = Arc::clone(self);
334        let op_id_for_completion = operation_id.clone();
335        let agent_for_completion = agent_id.to_string();
336        let resource_type_for_completion = resource_type;
337        let scope_for_completion = scope.clone();
338
339        tokio::spawn(async move {
340            if let Ok(completion) = completion_receiver.await {
341                tracker_for_completion
342                    .complete_operation_internal(
343                        &op_id_for_completion,
344                        &agent_for_completion,
345                        resource_type_for_completion,
346                        &scope_for_completion,
347                        completion.success,
348                        &completion.summary,
349                    )
350                    .await;
351            }
352        });
353
354        Ok(OperationHandle {
355            tracker: Arc::clone(self),
356            operation_id,
357            agent_id: agent_id.to_string(),
358            _resource_type: resource_type,
359            _scope: scope,
360            heartbeat_task: Some(heartbeat_task),
361            completion_sender: Some(completion_sender),
362        })
363    }
364
365    /// Update heartbeat and status for an operation
366    pub async fn heartbeat(&self, operation_id: &str, status: &str) -> Result<()> {
367        let mut ops = self.operations.write().await;
368        let op = ops
369            .get_mut(operation_id)
370            .ok_or_else(|| anyhow!("Operation {} not found", operation_id))?;
371
372        op.last_heartbeat = Instant::now();
373        op.status = status.to_string();
374
375        let _ = self.event_sender.send(OperationEvent::Heartbeat {
376            operation_id: operation_id.to_string(),
377            agent_id: op.agent_id.clone(),
378            status: status.to_string(),
379            elapsed_secs: op.elapsed().as_secs(),
380        });
381
382        Ok(())
383    }
384
385    /// Add output line to an operation
386    pub async fn add_output(&self, operation_id: &str, line: &str) -> Result<()> {
387        let mut ops = self.operations.write().await;
388        let op = ops
389            .get_mut(operation_id)
390            .ok_or_else(|| anyhow!("Operation {} not found", operation_id))?;
391
392        op.output_lines.push_back(line.to_string());
393        if op.output_lines.len() > MAX_OUTPUT_LINES {
394            op.output_lines.pop_front();
395        }
396
397        Ok(())
398    }
399
400    /// Attach a process ID to an operation for liveness monitoring
401    pub async fn attach_process(&self, operation_id: &str, process_id: u32) -> Result<()> {
402        let mut ops = self.operations.write().await;
403        let op = ops
404            .get_mut(operation_id)
405            .ok_or_else(|| anyhow!("Operation {} not found", operation_id))?;
406
407        op.process_id = Some(process_id);
408        Ok(())
409    }
410
411    /// Check if an operation is still alive
412    pub async fn is_alive(&self, operation_id: &str) -> bool {
413        let ops = self.operations.read().await;
414        if let Some(op) = ops.get(operation_id) {
415            if op.completed {
416                return false;
417            }
418
419            // Check heartbeat
420            if !op.is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats) {
421                return false;
422            }
423
424            // Check process if attached
425            if let Some(pid) = op.process_id
426                && !is_process_alive(pid)
427            {
428                return false;
429            }
430
431            true
432        } else {
433            false
434        }
435    }
436
437    /// Get status of an operation
438    pub async fn get_status(&self, operation_id: &str) -> Option<OperationStatus> {
439        let ops = self.operations.read().await;
440        ops.get(operation_id).map(|op| OperationStatus {
441            operation_id: op.operation_id.clone(),
442            agent_id: op.agent_id.clone(),
443            resource_type: format!("{:?}", op.resource_type),
444            scope: format!("{:?}", op.scope),
445            started_at_secs_ago: op.elapsed().as_secs(),
446            last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
447            is_alive: op.is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
448                && op.process_id.is_none_or(is_process_alive),
449            status: op.status.clone(),
450            description: op.description.clone(),
451            process_id: op.process_id,
452            recent_output: op.output_lines.iter().cloned().collect(),
453        })
454    }
455
456    /// Get all active operations
457    pub async fn list_operations(&self) -> Vec<OperationStatus> {
458        let ops = self.operations.read().await;
459        ops.values()
460            .filter(|op| !op.completed)
461            .map(|op| OperationStatus {
462                operation_id: op.operation_id.clone(),
463                agent_id: op.agent_id.clone(),
464                resource_type: format!("{:?}", op.resource_type),
465                scope: format!("{:?}", op.scope),
466                started_at_secs_ago: op.elapsed().as_secs(),
467                last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
468                is_alive: op
469                    .is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
470                    && op.process_id.is_none_or(is_process_alive),
471                status: op.status.clone(),
472                description: op.description.clone(),
473                process_id: op.process_id,
474                recent_output: op.output_lines.iter().cloned().collect(),
475            })
476            .collect()
477    }
478
479    /// Get operations for a specific agent
480    pub async fn operations_for_agent(&self, agent_id: &str) -> Vec<OperationStatus> {
481        let ops = self.operations.read().await;
482        ops.values()
483            .filter(|op| op.agent_id == agent_id && !op.completed)
484            .map(|op| OperationStatus {
485                operation_id: op.operation_id.clone(),
486                agent_id: op.agent_id.clone(),
487                resource_type: format!("{:?}", op.resource_type),
488                scope: format!("{:?}", op.scope),
489                started_at_secs_ago: op.elapsed().as_secs(),
490                last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
491                is_alive: op
492                    .is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
493                    && op.process_id.is_none_or(is_process_alive),
494                status: op.status.clone(),
495                description: op.description.clone(),
496                process_id: op.process_id,
497                recent_output: op.output_lines.iter().cloned().collect(),
498            })
499            .collect()
500    }
501
502    /// Find operations by resource type and scope
503    pub async fn find_operation(
504        &self,
505        resource_type: ResourceType,
506        scope: &ResourceScope,
507    ) -> Option<OperationStatus> {
508        let ops = self.operations.read().await;
509        ops.values()
510            .find(|op| op.resource_type == resource_type && &op.scope == scope && !op.completed)
511            .map(|op| OperationStatus {
512                operation_id: op.operation_id.clone(),
513                agent_id: op.agent_id.clone(),
514                resource_type: format!("{:?}", op.resource_type),
515                scope: format!("{:?}", op.scope),
516                started_at_secs_ago: op.elapsed().as_secs(),
517                last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
518                is_alive: op
519                    .is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
520                    && op.process_id.is_none_or(is_process_alive),
521                status: op.status.clone(),
522                description: op.description.clone(),
523                process_id: op.process_id,
524                recent_output: op.output_lines.iter().cloned().collect(),
525            })
526    }
527
528    /// Clean up stale operations (those with expired heartbeats)
529    pub async fn cleanup_stale(&self) -> Vec<String> {
530        let mut ops = self.operations.write().await;
531        let mut stale = Vec::new();
532
533        ops.retain(|id, op| {
534            if op.completed {
535                return false; // Remove completed operations
536            }
537
538            let is_alive = op
539                .is_heartbeat_alive(self.heartbeat_interval, self.max_missed_heartbeats)
540                && op.process_id.is_none_or(is_process_alive);
541
542            if !is_alive {
543                stale.push(id.clone());
544                let _ = self.event_sender.send(OperationEvent::Stale {
545                    operation_id: id.clone(),
546                    agent_id: op.agent_id.clone(),
547                    resource_type: format!("{:?}", op.resource_type),
548                    scope: format!("{:?}", op.scope),
549                    last_heartbeat_secs_ago: op.time_since_heartbeat().as_secs(),
550                });
551                false
552            } else {
553                true
554            }
555        });
556
557        stale
558    }
559
560    /// Internal method to complete an operation
561    async fn complete_operation_internal(
562        &self,
563        operation_id: &str,
564        agent_id: &str,
565        resource_type: ResourceType,
566        scope: &ResourceScope,
567        success: bool,
568        summary: &str,
569    ) {
570        let duration_secs = {
571            let mut ops = self.operations.write().await;
572            if let Some(op) = ops.get_mut(operation_id) {
573                op.completed = true;
574                op.status = if success { "Completed" } else { "Failed" }.to_string();
575                op.elapsed().as_secs()
576            } else {
577                0
578            }
579        };
580
581        let _ = self.event_sender.send(OperationEvent::Completed {
582            operation_id: operation_id.to_string(),
583            agent_id: agent_id.to_string(),
584            resource_type: format!("{:?}", resource_type),
585            scope: format!("{:?}", scope),
586            duration_secs,
587            success,
588            summary: summary.to_string(),
589        });
590
591        // Remove the operation after a short delay to allow event processing
592        let tracker = Arc::clone(&Arc::new(self.clone_inner().await));
593        let op_id = operation_id.to_string();
594        tokio::spawn(async move {
595            tokio::time::sleep(Duration::from_secs(5)).await;
596            let mut ops = tracker.operations.write().await;
597            ops.remove(&op_id);
598        });
599    }
600
601    /// Clone inner state for spawned tasks
602    async fn clone_inner(&self) -> OperationTrackerInner {
603        OperationTrackerInner {
604            operations: Arc::new(RwLock::new(self.operations.read().await.clone())),
605        }
606    }
607}
608
609/// Inner state for cloning
610struct OperationTrackerInner {
611    operations: Arc<RwLock<HashMap<String, ActiveOperation>>>,
612}
613
614impl Default for OperationTracker {
615    fn default() -> Self {
616        let (event_sender, _) = broadcast::channel(256);
617        Self {
618            operations: RwLock::new(HashMap::new()),
619            heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
620            max_missed_heartbeats: DEFAULT_MAX_MISSED_HEARTBEATS,
621            event_sender,
622            next_id: RwLock::new(1),
623        }
624    }
625}
626
627impl OperationHandle {
628    /// Get the operation ID
629    pub fn operation_id(&self) -> &str {
630        &self.operation_id
631    }
632
633    /// Get the agent ID
634    pub fn agent_id(&self) -> &str {
635        &self.agent_id
636    }
637
638    /// Update status (sends heartbeat)
639    pub async fn update_status(&self, status: &str) -> Result<()> {
640        self.tracker.heartbeat(&self.operation_id, status).await
641    }
642
643    /// Add output line
644    pub async fn add_output(&self, line: &str) -> Result<()> {
645        self.tracker.add_output(&self.operation_id, line).await
646    }
647
648    /// Attach a process for liveness monitoring
649    pub async fn attach_process(&self, process_id: u32) -> Result<()> {
650        self.tracker
651            .attach_process(&self.operation_id, process_id)
652            .await
653    }
654
655    /// Mark operation as completed successfully
656    pub fn complete(mut self, summary: &str) {
657        if let Some(sender) = self.completion_sender.take() {
658            let _ = sender.send(OperationCompletion {
659                success: true,
660                summary: summary.to_string(),
661            });
662        }
663    }
664
665    /// Mark operation as failed
666    pub fn fail(mut self, error: &str) {
667        if let Some(sender) = self.completion_sender.take() {
668            let _ = sender.send(OperationCompletion {
669                success: false,
670                summary: error.to_string(),
671            });
672        }
673    }
674}
675
676impl Drop for OperationHandle {
677    fn drop(&mut self) {
678        // Cancel heartbeat task
679        if let Some(task) = self.heartbeat_task.take() {
680            task.abort();
681        }
682
683        // Send completion if not already sent
684        if let Some(sender) = self.completion_sender.take() {
685            let _ = sender.send(OperationCompletion {
686                success: true,
687                summary: "Operation handle dropped".to_string(),
688            });
689        }
690    }
691}
692
/// Check if a process is still alive.
///
/// Probes the process with signal 0, which performs the existence and
/// permission checks of `kill(2)` without delivering a signal.
#[cfg(all(unix, feature = "native"))]
fn is_process_alive(pid: u32) -> bool {
    // `kill` treats 0 and negative values as process-group / broadcast
    // targets, so a PID that is 0 or does not fit a positive `pid_t` can
    // never name a single process.
    let pid = match libc::pid_t::try_from(pid) {
        Ok(pid) if pid > 0 => pid,
        _ => return false,
    };

    // SAFETY: `kill` with signal 0 only checks for existence and permission;
    // it sends nothing and does not access any memory of this process.
    if unsafe { libc::kill(pid, 0) } == 0 {
        return true;
    }

    // EPERM means the process exists but we are not allowed to signal it,
    // so it is alive. Any other error (notably ESRCH) means it is gone.
    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}
699
/// Check if a process is still alive (Windows).
///
/// Runs `tasklist` filtered on the PID and reports the process as alive when
/// the PID appears in the command's standard output. If `tasklist` cannot be
/// run, the process is reported as not alive.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use std::process::Command;

    let pid_text = pid.to_string();
    let filter = format!("PID eq {}", pid);

    // "/NH" suppresses the column headers so only matching rows are printed.
    match Command::new("tasklist")
        .args(["/FI", filter.as_str(), "/NH"])
        .output()
    {
        Ok(output) => String::from_utf8_lossy(&output.stdout).contains(&pid_text),
        Err(_) => false,
    }
}
713
/// Fallback used where no process probe is compiled in (neither Windows nor
/// Unix with the `native` feature, e.g. WASM).
///
/// Always returns `false`.
// NOTE(review): because "cannot check" is reported as "not alive", any
// operation with an attached PID is treated as dead on these targets -
// confirm this is the intended behavior.
#[cfg(all(not(windows), not(all(unix, feature = "native"))))]
fn is_process_alive(_pid: u32) -> bool {
    false
}
719
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Project scope rooted at `path`.
    fn project_scope(path: &str) -> ResourceScope {
        ResourceScope::Project(PathBuf::from(path))
    }

    /// Start the standard fixture: `agent-1` running "cargo build" on `/test/project`.
    async fn start_build(tracker: &Arc<OperationTracker>) -> OperationHandle {
        tracker
            .start_operation(
                "agent-1",
                ResourceType::Build,
                project_scope("/test/project"),
                "cargo build",
            )
            .await
            .unwrap()
    }

    /// A freshly started operation is alive and reports its agent.
    #[tokio::test]
    async fn test_start_operation() {
        let tracker = OperationTracker::new();
        let handle = start_build(&tracker).await;
        let op_id = handle.operation_id();

        assert!(tracker.is_alive(op_id).await);

        let status = tracker.get_status(op_id).await.unwrap();
        assert_eq!(status.agent_id, "agent-1");
        assert!(status.is_alive);
    }

    /// A status update through the handle is visible in the tracker.
    #[tokio::test]
    async fn test_heartbeat_updates_status() {
        let tracker = OperationTracker::new();
        let handle = start_build(&tracker).await;

        handle.update_status("Compiling crate...").await.unwrap();

        let status = tracker.get_status(handle.operation_id()).await.unwrap();
        assert_eq!(status.status, "Compiling crate...");
    }

    /// Completing an operation makes it no longer alive.
    #[tokio::test]
    async fn test_operation_completion() {
        let tracker = OperationTracker::new();
        let handle = start_build(&tracker).await;
        let op_id = handle.operation_id().to_string();

        handle.complete("Build succeeded");

        // Give the completion task time to run.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(!tracker.is_alive(&op_id).await);
    }

    /// An operation that stops sending heartbeats is no longer alive.
    #[tokio::test]
    async fn test_stale_detection() {
        // 10ms heartbeat, 2 missed allowed = 20ms of tolerated silence.
        let tracker = OperationTracker::with_config(Duration::from_millis(10), 2);
        let handle = start_build(&tracker).await;
        let op_id = handle.operation_id().to_string();

        // Leak the handle so no completion is ever sent - simulates a crash.
        std::mem::forget(handle);

        // Wait past the heartbeat timeout.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(!tracker.is_alive(&op_id).await);
    }

    /// Every running operation shows up in the listing.
    #[tokio::test]
    async fn test_list_operations() {
        let tracker = OperationTracker::new();

        let _build = tracker
            .start_operation(
                "agent-1",
                ResourceType::Build,
                project_scope("/test/project1"),
                "build 1",
            )
            .await
            .unwrap();
        let _test = tracker
            .start_operation(
                "agent-2",
                ResourceType::Test,
                project_scope("/test/project2"),
                "test 2",
            )
            .await
            .unwrap();

        assert_eq!(tracker.list_operations().await.len(), 2);
    }

    /// Lookup by resource type and scope finds only matching operations.
    #[tokio::test]
    async fn test_find_operation() {
        let tracker = OperationTracker::new();
        let scope = project_scope("/test/project");
        let _handle = start_build(&tracker).await;

        let found = tracker.find_operation(ResourceType::Build, &scope).await;
        assert!(found.is_some());
        assert_eq!(found.unwrap().agent_id, "agent-1");

        // Same scope, different resource type: nothing registered.
        let not_found = tracker.find_operation(ResourceType::Test, &scope).await;
        assert!(not_found.is_none());
    }

    /// Output lines are retained in insertion order.
    #[tokio::test]
    async fn test_output_lines() {
        let tracker = OperationTracker::new();
        let handle = start_build(&tracker).await;

        for line in ["Compiling foo v1.0.0", "Compiling bar v2.0.0"] {
            handle.add_output(line).await.unwrap();
        }

        let status = tracker.get_status(handle.operation_id()).await.unwrap();
        assert_eq!(status.recent_output.len(), 2);
        assert_eq!(status.recent_output[0], "Compiling foo v1.0.0");
    }

    /// Subscribers receive a `Started` event for a new operation.
    #[tokio::test]
    async fn test_event_subscription() {
        let tracker = OperationTracker::new();
        let mut receiver = tracker.subscribe();
        let _handle = start_build(&tracker).await;

        let OperationEvent::Started { agent_id, .. } = receiver.try_recv().unwrap() else {
            panic!("Expected Started event");
        };
        assert_eq!(agent_id, "agent-1");
    }
}