Skip to main content

durable_execution_sdk_testing/
operation_handle.rs

//! Lazy operation handle for pre-run operation registration.
//!
//! An `OperationHandle` is a lazy reference to a named operation that is registered
//! before `run()` and auto-populates with operation data during execution. This enables
//! the idiomatic callback testing pattern: pre-register a handle, start a non-blocking
//! run, wait for mid-execution status changes, send callback responses, and await the
//! final result.
8
9use std::sync::Arc;
10use tokio::sync::{watch, RwLock};
11
12use serde::de::DeserializeOwned;
13
14use durable_execution_sdk::{Operation, OperationStatus, OperationType};
15
16use crate::error::TestError;
17use crate::operation::{
18    CallbackDetails, CallbackSender, ContextDetails, DurableOperation, InvokeDetails, StepDetails,
19    WaitDetails,
20};
21use crate::types::WaitingOperationStatus;
22
/// Strategy an `OperationHandle` uses to pick out its operation during execution.
#[derive(Debug, Clone)]
pub enum OperationMatcher {
    /// Selects the first operation carrying this name.
    ByName(String),
    /// Selects the operation at this execution-order index.
    ByIndex(usize),
    /// Selects the operation with this unique operation ID.
    ById(String),
    /// Selects the nth operation carrying this name (name, occurrence index).
    ByNameAndIndex(String, usize),
}
35
36/// A lazy handle to an operation that will be populated during execution.
37///
38/// Registered before `run()` via `get_operation_handle()`, `get_operation_handle_by_index()`,
39/// or `get_operation_handle_by_id()` on the `LocalDurableTestRunner`. The handle starts
40/// unpopulated and is filled with operation data when the orchestrator finds a matching
41/// operation during execution.
42///
43/// # Examples
44///
45/// ```ignore
46/// use durable_execution_sdk_testing::OperationHandle;
47///
48/// // Pre-register a handle before run()
49/// let handle = runner.get_operation_handle("my-callback");
50///
51/// // Start non-blocking execution
52/// let future = runner.run(input);
53///
54/// // Wait for the operation to reach Submitted status
55/// handle.wait_for_data(WaitingOperationStatus::Submitted).await?;
56///
57/// // Send callback response
58/// handle.send_callback_success("result").await?;
59///
60/// // Await the final result
61/// let result = future.await?;
62/// ```
63pub struct OperationHandle {
64    /// How this handle matches operations.
65    pub(crate) matcher: OperationMatcher,
66    /// Shared operation data, populated during execution.
67    pub(crate) inner: Arc<RwLock<Option<Operation>>>,
68    /// Watch channel sender for status notifications.
69    pub(crate) status_tx: watch::Sender<Option<OperationStatus>>,
70    /// Watch channel receiver for status notifications.
71    pub(crate) status_rx: watch::Receiver<Option<OperationStatus>>,
72    /// Callback sender for interacting with the checkpoint server.
73    /// Wrapped in Arc<RwLock<>> so that clones share the same sender reference.
74    /// This allows `with_handles()` to set the sender after the handle is cloned
75    /// by the test code, and the test's clone will see the update.
76    pub(crate) callback_sender: Arc<RwLock<Option<Arc<dyn CallbackSender>>>>,
77    /// Shared reference to all operations (for child enumeration).
78    pub(crate) all_operations: Arc<RwLock<Vec<Operation>>>,
79}
80
81impl Clone for OperationHandle {
82    fn clone(&self) -> Self {
83        Self {
84            matcher: self.matcher.clone(),
85            inner: Arc::clone(&self.inner),
86            status_tx: self.status_tx.clone(),
87            status_rx: self.status_rx.clone(),
88            callback_sender: Arc::clone(&self.callback_sender),
89            all_operations: Arc::clone(&self.all_operations),
90        }
91    }
92}
93
94impl OperationHandle {
95    /// Creates a new unpopulated `OperationHandle` with the given matcher.
96    ///
97    /// The handle starts with no operation data. It will be populated during
98    /// execution when the orchestrator finds an operation matching the matcher.
99    ///
100    /// # Arguments
101    ///
102    /// * `matcher` - How this handle should match against operations
103    /// * `all_operations` - Shared reference to all operations for child enumeration
104    pub fn new(matcher: OperationMatcher, all_operations: Arc<RwLock<Vec<Operation>>>) -> Self {
105        let (status_tx, status_rx) = watch::channel(None);
106        Self {
107            matcher,
108            inner: Arc::new(RwLock::new(None)),
109            status_tx,
110            status_rx,
111            callback_sender: Arc::new(RwLock::new(None)),
112            all_operations,
113        }
114    }
115
116    // =========================================================================
117    // Inspection Methods (Requirements 1.3, 1.6)
118    // =========================================================================
119
120    /// Helper to get the inner operation or return an error if unpopulated.
121    async fn get_durable_operation(&self) -> Result<DurableOperation, TestError> {
122        let inner = self.inner.read().await;
123        match inner.as_ref() {
124            Some(op) => Ok(DurableOperation::new(op.clone())),
125            None => Err(TestError::OperationNotFound(
126                "Operation not yet populated".into(),
127            )),
128        }
129    }
130
131    /// Gets the operation ID.
132    ///
133    /// # Returns
134    ///
135    /// - `Ok(String)` - The operation ID if the handle is populated
136    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
137    pub async fn get_id(&self) -> Result<String, TestError> {
138        let op = self.get_durable_operation().await?;
139        Ok(op.get_id().to_string())
140    }
141
142    /// Gets the operation name.
143    ///
144    /// # Returns
145    ///
146    /// - `Ok(Option<String>)` - The operation name if the handle is populated
147    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
148    pub async fn get_name(&self) -> Result<Option<String>, TestError> {
149        let op = self.get_durable_operation().await?;
150        Ok(op.get_name().map(|s| s.to_string()))
151    }
152
153    /// Gets the operation type.
154    ///
155    /// # Returns
156    ///
157    /// - `Ok(OperationType)` - The operation type if the handle is populated
158    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
159    pub async fn get_type(&self) -> Result<OperationType, TestError> {
160        let op = self.get_durable_operation().await?;
161        Ok(op.get_type())
162    }
163
164    /// Gets the operation status.
165    ///
166    /// # Returns
167    ///
168    /// - `Ok(OperationStatus)` - The operation status if the handle is populated
169    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
170    pub async fn get_status(&self) -> Result<OperationStatus, TestError> {
171        let op = self.get_durable_operation().await?;
172        Ok(op.get_status())
173    }
174
175    /// Gets step-specific details.
176    ///
177    /// # Type Parameters
178    ///
179    /// * `T` - The type to deserialize the result into
180    ///
181    /// # Returns
182    ///
183    /// - `Ok(StepDetails<T>)` - The step details if populated and is a Step operation
184    /// - `Err(TestError)` - If unpopulated or wrong operation type
185    pub async fn get_step_details<T: DeserializeOwned>(&self) -> Result<StepDetails<T>, TestError> {
186        let op = self.get_durable_operation().await?;
187        op.get_step_details()
188    }
189
190    /// Gets callback-specific details.
191    ///
192    /// # Type Parameters
193    ///
194    /// * `T` - The type to deserialize the result into
195    ///
196    /// # Returns
197    ///
198    /// - `Ok(CallbackDetails<T>)` - The callback details if populated and is a Callback operation
199    /// - `Err(TestError)` - If unpopulated or wrong operation type
200    pub async fn get_callback_details<T: DeserializeOwned>(
201        &self,
202    ) -> Result<CallbackDetails<T>, TestError> {
203        let op = self.get_durable_operation().await?;
204        op.get_callback_details()
205    }
206
207    /// Gets wait-specific details.
208    ///
209    /// # Returns
210    ///
211    /// - `Ok(WaitDetails)` - The wait details if populated and is a Wait operation
212    /// - `Err(TestError)` - If unpopulated or wrong operation type
213    pub async fn get_wait_details(&self) -> Result<WaitDetails, TestError> {
214        let op = self.get_durable_operation().await?;
215        op.get_wait_details()
216    }
217
218    /// Gets invoke-specific details.
219    ///
220    /// # Type Parameters
221    ///
222    /// * `T` - The type to deserialize the result into
223    ///
224    /// # Returns
225    ///
226    /// - `Ok(InvokeDetails<T>)` - The invoke details if populated and is an Invoke operation
227    /// - `Err(TestError)` - If unpopulated or wrong operation type
228    pub async fn get_invoke_details<T: DeserializeOwned>(
229        &self,
230    ) -> Result<InvokeDetails<T>, TestError> {
231        let op = self.get_durable_operation().await?;
232        op.get_invoke_details()
233    }
234
235    /// Gets context-specific details.
236    ///
237    /// # Type Parameters
238    ///
239    /// * `T` - The type to deserialize the result into
240    ///
241    /// # Returns
242    ///
243    /// - `Ok(ContextDetails<T>)` - The context details if populated and is a Context operation
244    /// - `Err(TestError)` - If unpopulated or wrong operation type
245    pub async fn get_context_details<T: DeserializeOwned>(
246        &self,
247    ) -> Result<ContextDetails<T>, TestError> {
248        let op = self.get_durable_operation().await?;
249        op.get_context_details()
250    }
251
252    /// Checks if the handle has been populated with operation data.
253    ///
254    /// # Returns
255    ///
256    /// `true` if the handle has been populated, `false` otherwise.
257    pub async fn is_populated(&self) -> bool {
258        self.inner.read().await.is_some()
259    }
260
261    // =========================================================================
262    // Async Waiting (Requirements 1.4, 4.1, 4.2, 4.3, 4.5)
263    // =========================================================================
264
265    /// Waits for the operation to reach the specified `WaitingOperationStatus`.
266    ///
267    /// Resolves immediately if the operation has already reached the target status.
268    /// If the watch channel closes (execution ended) before the target status is
269    /// reached, returns `Err(TestError::ExecutionCompletedEarly(...))`.
270    ///
271    /// # Arguments
272    ///
273    /// * `status` - The target status to wait for
274    ///
275    /// # Returns
276    ///
277    /// - `Ok(())` - When the operation reaches the target status
278    /// - `Err(TestError::ExecutionCompletedEarly)` - If execution ends before reaching the target
279    ///
280    /// # Examples
281    ///
282    /// ```ignore
283    /// // Wait for a callback to be ready for responses
284    /// handle.wait_for_data(WaitingOperationStatus::Submitted).await?;
285    ///
286    /// // Wait for an operation to complete
287    /// handle.wait_for_data(WaitingOperationStatus::Completed).await?;
288    /// ```
289    pub async fn wait_for_data(&self, status: WaitingOperationStatus) -> Result<(), TestError> {
290        // Check if the current status already satisfies the target
291        if self.check_status_reached(status).await {
292            return Ok(());
293        }
294
295        // Subscribe to the watch channel and wait for updates
296        let mut rx = self.status_rx.clone();
297        loop {
298            // Wait for the next status change
299            if rx.changed().await.is_err() {
300                // Channel closed — execution ended. Do a final check.
301                if self.check_status_reached(status).await {
302                    return Ok(());
303                }
304                return Err(TestError::execution_completed_early(
305                    self.matcher_description(),
306                    status,
307                ));
308            }
309
310            // Check if the new status satisfies the target
311            if self.check_status_reached(status).await {
312                return Ok(());
313            }
314        }
315    }
316
317    /// Checks whether the current operation state satisfies the target `WaitingOperationStatus`.
318    async fn check_status_reached(&self, target: WaitingOperationStatus) -> bool {
319        let inner = self.inner.read().await;
320        match inner.as_ref() {
321            None => false,
322            Some(op) => Self::status_matches_target(op, target),
323        }
324    }
325
326    /// Determines if an operation's current state matches the target waiting status.
327    fn status_matches_target(op: &Operation, target: WaitingOperationStatus) -> bool {
328        match target {
329            WaitingOperationStatus::Started => {
330                // Any populated operation has started
331                true
332            }
333            WaitingOperationStatus::Submitted => {
334                // For callbacks, Submitted means callback_id is available
335                if op.operation_type == OperationType::Callback {
336                    op.callback_details
337                        .as_ref()
338                        .map(|d| d.callback_id.is_some())
339                        .unwrap_or(false)
340                } else {
341                    // For non-callbacks, treat Submitted as Started
342                    true
343                }
344            }
345            WaitingOperationStatus::Completed => {
346                // Terminal status means completed
347                op.status.is_terminal()
348            }
349        }
350    }
351
352    // =========================================================================
353    // Callback Interaction Methods (Requirements 1.5, 5.4)
354    // =========================================================================
355
356    /// Sends a success response for a callback operation.
357    ///
358    /// Validates that the handle is populated, the operation is a callback type,
359    /// and the callback_id is available (Submitted status) before delegating to
360    /// the callback sender.
361    ///
362    /// # Arguments
363    ///
364    /// * `result` - The result value to send as a JSON string
365    ///
366    /// # Returns
367    ///
368    /// - `Ok(())` - If the callback response was sent successfully
369    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
370    /// - `Err(TestError::NotCallbackOperation)` - If the operation is not a callback
371    /// - `Err(TestError::ResultNotAvailable)` - If the callback_id is not yet available
372    pub async fn send_callback_success(&self, result: &str) -> Result<(), TestError> {
373        let callback_id = self.validate_callback_ready().await?;
374
375        let sender_guard = self.callback_sender.read().await;
376        match sender_guard.as_ref() {
377            Some(sender) => sender.send_success(&callback_id, result).await,
378            None => Err(TestError::result_not_available(
379                "No callback sender configured on this handle",
380            )),
381        }
382    }
383
384    /// Sends a failure response for a callback operation.
385    ///
386    /// Validates that the handle is populated, the operation is a callback type,
387    /// and the callback_id is available (Submitted status) before delegating to
388    /// the callback sender.
389    ///
390    /// # Arguments
391    ///
392    /// * `error` - The error information to send
393    ///
394    /// # Returns
395    ///
396    /// - `Ok(())` - If the callback failure was sent successfully
397    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
398    /// - `Err(TestError::NotCallbackOperation)` - If the operation is not a callback
399    /// - `Err(TestError::ResultNotAvailable)` - If the callback_id is not yet available
400    pub async fn send_callback_failure(
401        &self,
402        error: &crate::types::TestResultError,
403    ) -> Result<(), TestError> {
404        let callback_id = self.validate_callback_ready().await?;
405
406        let sender_guard = self.callback_sender.read().await;
407        match sender_guard.as_ref() {
408            Some(sender) => sender.send_failure(&callback_id, error).await,
409            None => Err(TestError::result_not_available(
410                "No callback sender configured on this handle",
411            )),
412        }
413    }
414
415    /// Sends a heartbeat for a callback operation.
416    ///
417    /// Validates that the handle is populated, the operation is a callback type,
418    /// and the callback_id is available (Submitted status) before delegating to
419    /// the callback sender.
420    ///
421    /// # Returns
422    ///
423    /// - `Ok(())` - If the heartbeat was sent successfully
424    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
425    /// - `Err(TestError::NotCallbackOperation)` - If the operation is not a callback
426    /// - `Err(TestError::ResultNotAvailable)` - If the callback_id is not yet available
427    pub async fn send_callback_heartbeat(&self) -> Result<(), TestError> {
428        let callback_id = self.validate_callback_ready().await?;
429
430        let sender_guard = self.callback_sender.read().await;
431        match sender_guard.as_ref() {
432            Some(sender) => sender.send_heartbeat(&callback_id).await,
433            None => Err(TestError::result_not_available(
434                "No callback sender configured on this handle",
435            )),
436        }
437    }
438
439    /// Validates that the handle is populated with a callback operation that has
440    /// a callback_id available (Submitted status). Returns the callback_id on success.
441    async fn validate_callback_ready(&self) -> Result<String, TestError> {
442        let inner = self.inner.read().await;
443        let op = inner
444            .as_ref()
445            .ok_or_else(|| TestError::OperationNotFound("Operation not yet populated".into()))?;
446
447        if op.operation_type != OperationType::Callback {
448            return Err(TestError::NotCallbackOperation);
449        }
450
451        op.callback_details
452            .as_ref()
453            .and_then(|d| d.callback_id.clone())
454            .ok_or_else(|| {
455                TestError::result_not_available(
456                    "Callback ID not available — operation has not reached Submitted status",
457                )
458            })
459    }
460
461    // =========================================================================
462    // Child Operation Methods (Requirements 3.1, 3.2, 3.3, 3.4)
463    // =========================================================================
464
465    /// Returns all child operations nested under this operation.
466    ///
467    /// Child operations are those whose `parent_id` matches this operation's `id`.
468    /// The returned operations preserve execution order (same order as in the
469    /// shared operations list).
470    ///
471    /// # Returns
472    ///
473    /// - `Ok(Vec<DurableOperation>)` - Child operations if the handle is populated
474    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
475    ///
476    /// # Requirements
477    ///
478    /// - 3.1: Returns all operations whose parent_id matches this operation's id
479    /// - 3.2: Returns empty Vec when no children exist
480    /// - 3.3: Returned children support get_child_operations() for nested hierarchies
481    /// - 3.4: Preserves execution order of child operations
482    pub async fn get_child_operations(&self) -> Result<Vec<DurableOperation>, TestError> {
483        let inner = self.inner.read().await;
484        let op = inner
485            .as_ref()
486            .ok_or_else(|| TestError::OperationNotFound("Operation not yet populated".into()))?;
487
488        let my_id = &op.operation_id;
489        let all_ops = self.all_operations.read().await;
490
491        // Snapshot into an Arc<Vec<Operation>> so each child DurableOperation
492        // can enumerate its own children via with_operations().
493        let all_ops_arc = Arc::new(all_ops.clone());
494
495        let children = all_ops_arc
496            .iter()
497            .filter(|child| child.parent_id.as_deref() == Some(my_id))
498            .map(|child| {
499                DurableOperation::new(child.clone()).with_operations(Arc::clone(&all_ops_arc))
500            })
501            .collect();
502
503        Ok(children)
504    }
505
506    /// Returns a human-readable description of the matcher for error messages.
507    fn matcher_description(&self) -> String {
508        match &self.matcher {
509            OperationMatcher::ByName(name) => format!("name={}", name),
510            OperationMatcher::ByIndex(idx) => format!("index={}", idx),
511            OperationMatcher::ById(id) => format!("id={}", id),
512            OperationMatcher::ByNameAndIndex(name, idx) => format!("name={}, index={}", name, idx),
513        }
514    }
515}
516
517#[cfg(test)]
518mod tests {
519    use super::*;
520    use durable_execution_sdk::{
521        CallbackDetails as SdkCallbackDetails, ChainedInvokeDetails as SdkChainedInvokeDetails,
522        ContextDetails as SdkContextDetails, OperationType, StepDetails as SdkStepDetails,
523        WaitDetails as SdkWaitDetails,
524    };
525
526    /// Helper to create a step operation for testing.
527    fn create_step_operation() -> Operation {
528        let mut op = Operation::new("step-001".to_string(), OperationType::Step);
529        op.name = Some("my-step".to_string());
530        op.status = OperationStatus::Succeeded;
531        op.step_details = Some(SdkStepDetails {
532            result: Some(r#""hello""#.to_string()),
533            attempt: Some(1),
534            next_attempt_timestamp: None,
535            error: None,
536            payload: None,
537        });
538        op
539    }
540
541    /// Helper to create a callback operation for testing.
542    fn create_callback_operation() -> Operation {
543        let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
544        op.name = Some("my-callback".to_string());
545        op.status = OperationStatus::Started;
546        op.callback_details = Some(SdkCallbackDetails {
547            callback_id: Some("cb-id-123".to_string()),
548            result: None,
549            error: None,
550        });
551        op
552    }
553
554    /// Helper to create a wait operation for testing.
555    fn create_wait_operation() -> Operation {
556        let mut op = Operation::new("wait-001".to_string(), OperationType::Wait);
557        op.name = Some("my-wait".to_string());
558        op.status = OperationStatus::Succeeded;
559        op.start_timestamp = Some(1000);
560        op.wait_details = Some(SdkWaitDetails {
561            scheduled_end_timestamp: Some(6000),
562        });
563        op
564    }
565
566    /// Helper to create an invoke operation for testing.
567    fn create_invoke_operation() -> Operation {
568        let mut op = Operation::new("invoke-001".to_string(), OperationType::Invoke);
569        op.name = Some("my-invoke".to_string());
570        op.status = OperationStatus::Succeeded;
571        op.chained_invoke_details = Some(SdkChainedInvokeDetails {
572            result: Some(r#"42"#.to_string()),
573            error: None,
574        });
575        op
576    }
577
578    /// Helper to create a context operation for testing.
579    fn create_context_operation() -> Operation {
580        let mut op = Operation::new("ctx-001".to_string(), OperationType::Context);
581        op.name = Some("my-context".to_string());
582        op.status = OperationStatus::Succeeded;
583        op.context_details = Some(SdkContextDetails {
584            result: Some(r#""done""#.to_string()),
585            replay_children: None,
586            error: None,
587        });
588        op
589    }
590
591    /// Helper to populate a handle with an operation.
592    async fn populate_handle(handle: &OperationHandle, op: Operation) {
593        let mut inner = handle.inner.write().await;
594        *inner = Some(op);
595    }
596
597    #[test]
598    fn test_operation_handle_new_by_name() {
599        let all_ops = Arc::new(RwLock::new(Vec::new()));
600        let handle = OperationHandle::new(OperationMatcher::ByName("test-op".into()), all_ops);
601
602        assert!(matches!(handle.matcher, OperationMatcher::ByName(ref n) if n == "test-op"));
603        assert!(handle.callback_sender.try_read().unwrap().is_none());
604    }
605
606    #[test]
607    fn test_operation_handle_new_by_index() {
608        let all_ops = Arc::new(RwLock::new(Vec::new()));
609        let handle = OperationHandle::new(OperationMatcher::ByIndex(3), all_ops);
610
611        assert!(matches!(handle.matcher, OperationMatcher::ByIndex(3)));
612    }
613
614    #[test]
615    fn test_operation_handle_new_by_id() {
616        let all_ops = Arc::new(RwLock::new(Vec::new()));
617        let handle = OperationHandle::new(OperationMatcher::ById("abc-123".into()), all_ops);
618
619        assert!(matches!(handle.matcher, OperationMatcher::ById(ref id) if id == "abc-123"));
620    }
621
622    #[tokio::test]
623    async fn test_operation_handle_starts_unpopulated() {
624        let all_ops = Arc::new(RwLock::new(Vec::new()));
625        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
626
627        let inner = handle.inner.read().await;
628        assert!(inner.is_none());
629        assert!(handle.status_rx.borrow().is_none());
630    }
631
632    #[test]
633    fn test_operation_handle_clone() {
634        let all_ops = Arc::new(RwLock::new(Vec::new()));
635        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
636        let cloned = handle.clone();
637
638        // Cloned handle shares the same inner Arc
639        assert!(Arc::ptr_eq(&handle.inner, &cloned.inner));
640        assert!(Arc::ptr_eq(&handle.all_operations, &cloned.all_operations));
641        assert!(matches!(cloned.matcher, OperationMatcher::ByName(ref n) if n == "test"));
642    }
643
644    #[test]
645    fn test_operation_matcher_debug() {
646        let by_name = OperationMatcher::ByName("test".into());
647        let by_index = OperationMatcher::ByIndex(0);
648        let by_id = OperationMatcher::ById("id-1".into());
649
650        // Verify Debug is implemented and produces reasonable output
651        assert!(format!("{:?}", by_name).contains("ByName"));
652        assert!(format!("{:?}", by_index).contains("ByIndex"));
653        assert!(format!("{:?}", by_id).contains("ById"));
654    }
655
656    // =========================================================================
657    // Inspection Method Tests (Requirements 1.3, 1.6)
658    // =========================================================================
659
660    #[tokio::test]
661    async fn test_is_populated_false_when_new() {
662        let all_ops = Arc::new(RwLock::new(Vec::new()));
663        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
664        assert!(!handle.is_populated().await);
665    }
666
667    #[tokio::test]
668    async fn test_is_populated_true_after_population() {
669        let all_ops = Arc::new(RwLock::new(Vec::new()));
670        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
671        populate_handle(&handle, create_step_operation()).await;
672        assert!(handle.is_populated().await);
673    }
674
675    #[tokio::test]
676    async fn test_get_id_unpopulated_returns_error() {
677        let all_ops = Arc::new(RwLock::new(Vec::new()));
678        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
679        let result = handle.get_id().await;
680        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
681    }
682
683    #[tokio::test]
684    async fn test_get_id_populated() {
685        let all_ops = Arc::new(RwLock::new(Vec::new()));
686        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
687        populate_handle(&handle, create_step_operation()).await;
688        assert_eq!(handle.get_id().await.unwrap(), "step-001");
689    }
690
691    #[tokio::test]
692    async fn test_get_name_unpopulated_returns_error() {
693        let all_ops = Arc::new(RwLock::new(Vec::new()));
694        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
695        let result = handle.get_name().await;
696        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
697    }
698
699    #[tokio::test]
700    async fn test_get_name_populated() {
701        let all_ops = Arc::new(RwLock::new(Vec::new()));
702        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
703        populate_handle(&handle, create_step_operation()).await;
704        assert_eq!(
705            handle.get_name().await.unwrap(),
706            Some("my-step".to_string())
707        );
708    }
709
710    #[tokio::test]
711    async fn test_get_type_unpopulated_returns_error() {
712        let all_ops = Arc::new(RwLock::new(Vec::new()));
713        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
714        let result = handle.get_type().await;
715        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
716    }
717
718    #[tokio::test]
719    async fn test_get_type_populated() {
720        let all_ops = Arc::new(RwLock::new(Vec::new()));
721        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
722        populate_handle(&handle, create_step_operation()).await;
723        assert_eq!(handle.get_type().await.unwrap(), OperationType::Step);
724    }
725
726    #[tokio::test]
727    async fn test_get_status_unpopulated_returns_error() {
728        let all_ops = Arc::new(RwLock::new(Vec::new()));
729        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
730        let result = handle.get_status().await;
731        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
732    }
733
734    #[tokio::test]
735    async fn test_get_status_populated() {
736        let all_ops = Arc::new(RwLock::new(Vec::new()));
737        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
738        populate_handle(&handle, create_step_operation()).await;
739        assert_eq!(
740            handle.get_status().await.unwrap(),
741            OperationStatus::Succeeded
742        );
743    }
744
745    #[tokio::test]
746    async fn test_get_step_details_unpopulated_returns_error() {
747        let all_ops = Arc::new(RwLock::new(Vec::new()));
748        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
749        let result: Result<StepDetails<String>, _> = handle.get_step_details().await;
750        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
751    }
752
753    #[tokio::test]
754    async fn test_get_step_details_populated() {
755        let all_ops = Arc::new(RwLock::new(Vec::new()));
756        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
757        populate_handle(&handle, create_step_operation()).await;
758        let details: StepDetails<String> = handle.get_step_details().await.unwrap();
759        assert_eq!(details.result, Some("hello".to_string()));
760        assert_eq!(details.attempt, Some(1));
761    }
762
763    #[tokio::test]
764    async fn test_get_callback_details_unpopulated_returns_error() {
765        let all_ops = Arc::new(RwLock::new(Vec::new()));
766        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
767        let result: Result<CallbackDetails<String>, _> = handle.get_callback_details().await;
768        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
769    }
770
771    #[tokio::test]
772    async fn test_get_callback_details_populated() {
773        let all_ops = Arc::new(RwLock::new(Vec::new()));
774        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
775        populate_handle(&handle, create_callback_operation()).await;
776        let details: CallbackDetails<String> = handle.get_callback_details().await.unwrap();
777        assert_eq!(details.callback_id, Some("cb-id-123".to_string()));
778    }
779
780    #[tokio::test]
781    async fn test_get_wait_details_unpopulated_returns_error() {
782        let all_ops = Arc::new(RwLock::new(Vec::new()));
783        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
784        let result = handle.get_wait_details().await;
785        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
786    }
787
788    #[tokio::test]
789    async fn test_get_wait_details_populated() {
790        let all_ops = Arc::new(RwLock::new(Vec::new()));
791        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
792        populate_handle(&handle, create_wait_operation()).await;
793        let details = handle.get_wait_details().await.unwrap();
794        assert_eq!(details.wait_seconds, Some(5));
795    }
796
797    #[tokio::test]
798    async fn test_get_invoke_details_unpopulated_returns_error() {
799        let all_ops = Arc::new(RwLock::new(Vec::new()));
800        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
801        let result: Result<InvokeDetails<serde_json::Value>, _> = handle.get_invoke_details().await;
802        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
803    }
804
805    #[tokio::test]
806    async fn test_get_invoke_details_populated() {
807        let all_ops = Arc::new(RwLock::new(Vec::new()));
808        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
809        populate_handle(&handle, create_invoke_operation()).await;
810        let details: InvokeDetails<i32> = handle.get_invoke_details().await.unwrap();
811        assert_eq!(details.result, Some(42));
812    }
813
814    #[tokio::test]
815    async fn test_get_context_details_unpopulated_returns_error() {
816        let all_ops = Arc::new(RwLock::new(Vec::new()));
817        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
818        let result: Result<ContextDetails<String>, _> = handle.get_context_details().await;
819        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
820    }
821
822    #[tokio::test]
823    async fn test_get_context_details_populated() {
824        let all_ops = Arc::new(RwLock::new(Vec::new()));
825        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
826        populate_handle(&handle, create_context_operation()).await;
827        let details: ContextDetails<String> = handle.get_context_details().await.unwrap();
828        assert_eq!(details.result, Some("done".to_string()));
829    }
830
831    #[tokio::test]
832    async fn test_get_step_details_wrong_type_returns_type_mismatch() {
833        let all_ops = Arc::new(RwLock::new(Vec::new()));
834        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
835        populate_handle(&handle, create_wait_operation()).await;
836        let result: Result<StepDetails<String>, _> = handle.get_step_details().await;
837        assert!(matches!(
838            result,
839            Err(TestError::OperationTypeMismatch { .. })
840        ));
841    }
842
843    // =========================================================================
844    // wait_for_data Tests (Requirements 1.4, 4.1, 4.2, 4.3, 4.5)
845    // =========================================================================
846
847    #[tokio::test]
848    async fn test_wait_for_data_started_resolves_immediately_when_populated() {
849        let all_ops = Arc::new(RwLock::new(Vec::new()));
850        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
851        populate_handle(&handle, create_step_operation()).await;
852        // Any populated operation satisfies Started
853        let result = handle.wait_for_data(WaitingOperationStatus::Started).await;
854        assert!(result.is_ok());
855    }
856
857    #[tokio::test]
858    async fn test_wait_for_data_completed_resolves_immediately_when_terminal() {
859        let all_ops = Arc::new(RwLock::new(Vec::new()));
860        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
861        // create_step_operation has status Succeeded (terminal)
862        populate_handle(&handle, create_step_operation()).await;
863        let result = handle
864            .wait_for_data(WaitingOperationStatus::Completed)
865            .await;
866        assert!(result.is_ok());
867    }
868
869    #[tokio::test]
870    async fn test_wait_for_data_submitted_resolves_for_callback_with_id() {
871        let all_ops = Arc::new(RwLock::new(Vec::new()));
872        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
873        // create_callback_operation has callback_id set
874        populate_handle(&handle, create_callback_operation()).await;
875        let result = handle
876            .wait_for_data(WaitingOperationStatus::Submitted)
877            .await;
878        assert!(result.is_ok());
879    }
880
881    #[tokio::test]
882    async fn test_wait_for_data_submitted_resolves_for_non_callback() {
883        let all_ops = Arc::new(RwLock::new(Vec::new()));
884        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
885        // Non-callback operations treat Submitted as Started
886        populate_handle(&handle, create_step_operation()).await;
887        let result = handle
888            .wait_for_data(WaitingOperationStatus::Submitted)
889            .await;
890        assert!(result.is_ok());
891    }
892
893    #[tokio::test]
894    async fn test_wait_for_data_unpopulated_not_started_returns_error_on_channel_close() {
895        // Test the ExecutionCompletedEarly path by verifying that when the
896        // operation never reaches the target status and the channel closes,
897        // we get the appropriate error.
898        //
899        // In real usage, the OperationHandle holds a status_tx sender, which
900        // means the channel can't close while the handle exists. The channel-
901        // close path is a safety net for edge cases. We test the underlying
902        // logic via check_status_reached instead.
903        let all_ops = Arc::new(RwLock::new(Vec::new()));
904        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
905
906        // Unpopulated handle should not satisfy any status
907        assert!(
908            !handle
909                .check_status_reached(WaitingOperationStatus::Started)
910                .await
911        );
912        assert!(
913            !handle
914                .check_status_reached(WaitingOperationStatus::Submitted)
915                .await
916        );
917        assert!(
918            !handle
919                .check_status_reached(WaitingOperationStatus::Completed)
920                .await
921        );
922    }
923
924    #[tokio::test]
925    async fn test_wait_for_data_non_terminal_does_not_satisfy_completed() {
926        let all_ops = Arc::new(RwLock::new(Vec::new()));
927        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
928
929        // Populate with a non-terminal status
930        let mut op = Operation::new("step-001".to_string(), OperationType::Step);
931        op.status = OperationStatus::Started;
932        populate_handle(&handle, op).await;
933
934        // Started satisfies Started but not Completed
935        assert!(
936            handle
937                .check_status_reached(WaitingOperationStatus::Started)
938                .await
939        );
940        assert!(
941            !handle
942                .check_status_reached(WaitingOperationStatus::Completed)
943                .await
944        );
945    }
946
947    #[tokio::test]
948    async fn test_wait_for_data_callback_without_id_does_not_satisfy_submitted() {
949        let all_ops = Arc::new(RwLock::new(Vec::new()));
950        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
951
952        // Populate with a callback that has no callback_id
953        let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
954        op.status = OperationStatus::Started;
955        op.callback_details = Some(SdkCallbackDetails {
956            callback_id: None,
957            result: None,
958            error: None,
959        });
960        populate_handle(&handle, op).await;
961
962        // Started is satisfied, but Submitted is not (no callback_id)
963        assert!(
964            handle
965                .check_status_reached(WaitingOperationStatus::Started)
966                .await
967        );
968        assert!(
969            !handle
970                .check_status_reached(WaitingOperationStatus::Submitted)
971                .await
972        );
973    }
974
975    #[tokio::test]
976    async fn test_status_matches_target_all_terminal_statuses_satisfy_completed() {
977        let terminal_statuses = vec![
978            OperationStatus::Succeeded,
979            OperationStatus::Failed,
980            OperationStatus::Cancelled,
981            OperationStatus::TimedOut,
982            OperationStatus::Stopped,
983        ];
984
985        for status in terminal_statuses {
986            let mut op = Operation::new("op-001".to_string(), OperationType::Step);
987            op.status = status;
988            assert!(
989                OperationHandle::status_matches_target(&op, WaitingOperationStatus::Completed),
990                "Expected {:?} to satisfy Completed",
991                status
992            );
993        }
994    }
995
996    #[tokio::test]
997    async fn test_status_matches_target_non_terminal_does_not_satisfy_completed() {
998        let non_terminal_statuses = vec![
999            OperationStatus::Started,
1000            OperationStatus::Pending,
1001            OperationStatus::Ready,
1002        ];
1003
1004        for status in non_terminal_statuses {
1005            let mut op = Operation::new("op-001".to_string(), OperationType::Step);
1006            op.status = status;
1007            assert!(
1008                !OperationHandle::status_matches_target(&op, WaitingOperationStatus::Completed),
1009                "Expected {:?} to NOT satisfy Completed",
1010                status
1011            );
1012        }
1013    }
1014
1015    #[tokio::test]
1016    async fn test_wait_for_data_resolves_when_status_update_arrives() {
1017        let all_ops = Arc::new(RwLock::new(Vec::new()));
1018        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1019
1020        let handle_clone = handle.clone();
1021        // Spawn a task that populates the handle and sends a status update after a delay
1022        tokio::spawn(async move {
1023            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1024            // Populate with a completed operation
1025            let mut op = create_step_operation();
1026            op.status = OperationStatus::Succeeded;
1027            {
1028                let mut inner = handle_clone.inner.write().await;
1029                *inner = Some(op);
1030            }
1031            let _ = handle_clone
1032                .status_tx
1033                .send(Some(OperationStatus::Succeeded));
1034        });
1035
1036        let result = handle
1037            .wait_for_data(WaitingOperationStatus::Completed)
1038            .await;
1039        assert!(result.is_ok());
1040    }
1041
1042    #[tokio::test]
1043    async fn test_wait_for_data_waits_through_non_terminal_updates() {
1044        let all_ops = Arc::new(RwLock::new(Vec::new()));
1045        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1046
1047        let handle_clone = handle.clone();
1048        tokio::spawn(async move {
1049            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1050            // First: populate with Started status (non-terminal)
1051            let mut op = create_step_operation();
1052            op.status = OperationStatus::Started;
1053            {
1054                let mut inner = handle_clone.inner.write().await;
1055                *inner = Some(op);
1056            }
1057            let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1058
1059            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1060            // Then: update to Succeeded (terminal)
1061            {
1062                let mut inner = handle_clone.inner.write().await;
1063                if let Some(ref mut op) = *inner {
1064                    op.status = OperationStatus::Succeeded;
1065                }
1066            }
1067            let _ = handle_clone
1068                .status_tx
1069                .send(Some(OperationStatus::Succeeded));
1070        });
1071
1072        let result = handle
1073            .wait_for_data(WaitingOperationStatus::Completed)
1074            .await;
1075        assert!(result.is_ok());
1076    }
1077
1078    #[tokio::test]
1079    async fn test_wait_for_data_submitted_waits_for_callback_id() {
1080        let all_ops = Arc::new(RwLock::new(Vec::new()));
1081        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1082
1083        let handle_clone = handle.clone();
1084        tokio::spawn(async move {
1085            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1086            // First: populate with callback but no callback_id
1087            let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
1088            op.name = Some("my-callback".to_string());
1089            op.status = OperationStatus::Started;
1090            op.callback_details = Some(SdkCallbackDetails {
1091                callback_id: None,
1092                result: None,
1093                error: None,
1094            });
1095            {
1096                let mut inner = handle_clone.inner.write().await;
1097                *inner = Some(op);
1098            }
1099            let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1100
1101            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1102            // Then: update with callback_id
1103            {
1104                let mut inner = handle_clone.inner.write().await;
1105                if let Some(ref mut op) = *inner {
1106                    if let Some(ref mut details) = op.callback_details {
1107                        details.callback_id = Some("cb-id-123".to_string());
1108                    }
1109                }
1110            }
1111            let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1112        });
1113
1114        let result = handle
1115            .wait_for_data(WaitingOperationStatus::Submitted)
1116            .await;
1117        assert!(result.is_ok());
1118    }
1119
1120    #[tokio::test]
1121    async fn test_matcher_description() {
1122        let all_ops = Arc::new(RwLock::new(Vec::new()));
1123
1124        let handle =
1125            OperationHandle::new(OperationMatcher::ByName("my-op".into()), all_ops.clone());
1126        assert_eq!(handle.matcher_description(), "name=my-op");
1127
1128        let handle = OperationHandle::new(OperationMatcher::ByIndex(3), all_ops.clone());
1129        assert_eq!(handle.matcher_description(), "index=3");
1130
1131        let handle = OperationHandle::new(OperationMatcher::ById("abc-123".into()), all_ops);
1132        assert_eq!(handle.matcher_description(), "id=abc-123");
1133    }
1134
1135    // =========================================================================
1136    // Callback Method Tests (Requirements 1.5, 5.4)
1137    // =========================================================================
1138
1139    /// Mock callback sender for testing.
1140    #[derive(Clone)]
1141    struct MockCallbackSender {
1142        success_calls: Arc<RwLock<Vec<(String, String)>>>,
1143        failure_calls: Arc<RwLock<Vec<(String, String)>>>,
1144        heartbeat_calls: Arc<RwLock<Vec<String>>>,
1145    }
1146
1147    impl MockCallbackSender {
1148        fn new() -> Self {
1149            Self {
1150                success_calls: Arc::new(RwLock::new(Vec::new())),
1151                failure_calls: Arc::new(RwLock::new(Vec::new())),
1152                heartbeat_calls: Arc::new(RwLock::new(Vec::new())),
1153            }
1154        }
1155    }
1156
1157    #[async_trait::async_trait]
1158    impl CallbackSender for MockCallbackSender {
1159        async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError> {
1160            self.success_calls
1161                .write()
1162                .await
1163                .push((callback_id.to_string(), result.to_string()));
1164            Ok(())
1165        }
1166
1167        async fn send_failure(
1168            &self,
1169            callback_id: &str,
1170            error: &crate::types::TestResultError,
1171        ) -> Result<(), TestError> {
1172            self.failure_calls
1173                .write()
1174                .await
1175                .push((callback_id.to_string(), error.to_string()));
1176            Ok(())
1177        }
1178
1179        async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError> {
1180            self.heartbeat_calls
1181                .write()
1182                .await
1183                .push(callback_id.to_string());
1184            Ok(())
1185        }
1186    }
1187
1188    /// Helper to create a handle with a mock callback sender and a populated callback operation.
1189    async fn create_callback_handle_with_sender() -> (OperationHandle, MockCallbackSender) {
1190        let all_ops = Arc::new(RwLock::new(Vec::new()));
1191        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1192        let sender = MockCallbackSender::new();
1193        {
1194            let mut guard = handle.callback_sender.write().await;
1195            *guard = Some(Arc::new(sender.clone()));
1196        }
1197        populate_handle(&handle, create_callback_operation()).await;
1198        (handle, sender)
1199    }
1200
1201    #[tokio::test]
1202    async fn test_send_callback_success_unpopulated_returns_error() {
1203        let all_ops = Arc::new(RwLock::new(Vec::new()));
1204        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1205        let result = handle.send_callback_success("ok").await;
1206        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1207    }
1208
1209    #[tokio::test]
1210    async fn test_send_callback_success_non_callback_returns_error() {
1211        let all_ops = Arc::new(RwLock::new(Vec::new()));
1212        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1213        populate_handle(&handle, create_step_operation()).await;
1214        let result = handle.send_callback_success("ok").await;
1215        assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1216    }
1217
1218    #[tokio::test]
1219    async fn test_send_callback_success_no_callback_id_returns_error() {
1220        let all_ops = Arc::new(RwLock::new(Vec::new()));
1221        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1222        // Callback without callback_id (not yet Submitted)
1223        let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
1224        op.callback_details = Some(SdkCallbackDetails {
1225            callback_id: None,
1226            result: None,
1227            error: None,
1228        });
1229        populate_handle(&handle, op).await;
1230        let result = handle.send_callback_success("ok").await;
1231        assert!(matches!(result, Err(TestError::ResultNotAvailable(_))));
1232    }
1233
1234    #[tokio::test]
1235    async fn test_send_callback_success_no_sender_returns_error() {
1236        let all_ops = Arc::new(RwLock::new(Vec::new()));
1237        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1238        populate_handle(&handle, create_callback_operation()).await;
1239        // No callback_sender configured
1240        let result = handle.send_callback_success("ok").await;
1241        assert!(matches!(result, Err(TestError::ResultNotAvailable(_))));
1242    }
1243
1244    #[tokio::test]
1245    async fn test_send_callback_success_delegates_to_sender() {
1246        let (handle, sender) = create_callback_handle_with_sender().await;
1247        let result = handle.send_callback_success("my-result").await;
1248        assert!(result.is_ok());
1249        let calls = sender.success_calls.read().await;
1250        assert_eq!(calls.len(), 1);
1251        assert_eq!(calls[0], ("cb-id-123".to_string(), "my-result".to_string()));
1252    }
1253
1254    #[tokio::test]
1255    async fn test_send_callback_failure_unpopulated_returns_error() {
1256        let all_ops = Arc::new(RwLock::new(Vec::new()));
1257        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1258        let error = crate::types::TestResultError::new("TestError", "something failed");
1259        let result = handle.send_callback_failure(&error).await;
1260        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1261    }
1262
1263    #[tokio::test]
1264    async fn test_send_callback_failure_non_callback_returns_error() {
1265        let all_ops = Arc::new(RwLock::new(Vec::new()));
1266        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1267        populate_handle(&handle, create_step_operation()).await;
1268        let error = crate::types::TestResultError::new("TestError", "something failed");
1269        let result = handle.send_callback_failure(&error).await;
1270        assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1271    }
1272
1273    #[tokio::test]
1274    async fn test_send_callback_failure_delegates_to_sender() {
1275        let (handle, sender) = create_callback_handle_with_sender().await;
1276        let error = crate::types::TestResultError::new("TestError", "something failed");
1277        let result = handle.send_callback_failure(&error).await;
1278        assert!(result.is_ok());
1279        let calls = sender.failure_calls.read().await;
1280        assert_eq!(calls.len(), 1);
1281        assert_eq!(calls[0].0, "cb-id-123");
1282    }
1283
1284    #[tokio::test]
1285    async fn test_send_callback_heartbeat_unpopulated_returns_error() {
1286        let all_ops = Arc::new(RwLock::new(Vec::new()));
1287        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1288        let result = handle.send_callback_heartbeat().await;
1289        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1290    }
1291
1292    #[tokio::test]
1293    async fn test_send_callback_heartbeat_non_callback_returns_error() {
1294        let all_ops = Arc::new(RwLock::new(Vec::new()));
1295        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1296        populate_handle(&handle, create_step_operation()).await;
1297        let result = handle.send_callback_heartbeat().await;
1298        assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1299    }
1300
1301    #[tokio::test]
1302    async fn test_send_callback_heartbeat_delegates_to_sender() {
1303        let (handle, sender) = create_callback_handle_with_sender().await;
1304        let result = handle.send_callback_heartbeat().await;
1305        assert!(result.is_ok());
1306        let calls = sender.heartbeat_calls.read().await;
1307        assert_eq!(calls.len(), 1);
1308        assert_eq!(calls[0], "cb-id-123");
1309    }
1310
1311    // =========================================================================
1312    // get_child_operations() tests (Requirements 3.1, 3.2, 3.3, 3.4)
1313    // =========================================================================
1314
1315    /// Helper to create an operation with a parent_id for child enumeration tests.
1316    fn create_operation_with_parent(id: &str, name: &str, parent_id: Option<&str>) -> Operation {
1317        let mut op = Operation::new(id.to_string(), OperationType::Step);
1318        op.name = Some(name.to_string());
1319        op.status = OperationStatus::Succeeded;
1320        op.parent_id = parent_id.map(|s| s.to_string());
1321        op
1322    }
1323
1324    #[tokio::test]
1325    async fn test_get_child_operations_unpopulated_returns_error() {
1326        let all_ops = Arc::new(RwLock::new(Vec::new()));
1327        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1328
1329        let result = handle.get_child_operations().await;
1330        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1331    }
1332
1333    #[tokio::test]
1334    async fn test_get_child_operations_returns_matching_children() {
1335        let parent_op = create_operation_with_parent("parent-1", "parent", None);
1336        let child1 = create_operation_with_parent("child-1", "child-a", Some("parent-1"));
1337        let child2 = create_operation_with_parent("child-2", "child-b", Some("parent-1"));
1338        let unrelated = create_operation_with_parent("other-1", "other", Some("other-parent"));
1339
1340        let all_ops = Arc::new(RwLock::new(vec![
1341            parent_op.clone(),
1342            child1,
1343            child2,
1344            unrelated,
1345        ]));
1346
1347        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1348        populate_handle(&handle, parent_op).await;
1349
1350        let children = handle.get_child_operations().await.unwrap();
1351        assert_eq!(children.len(), 2);
1352        assert_eq!(children[0].get_id(), "child-1");
1353        assert_eq!(children[1].get_id(), "child-2");
1354    }
1355
1356    #[tokio::test]
1357    async fn test_get_child_operations_empty_when_no_children() {
1358        let parent_op = create_operation_with_parent("parent-1", "parent", None);
1359        let unrelated = create_operation_with_parent("other-1", "other", Some("other-parent"));
1360
1361        let all_ops = Arc::new(RwLock::new(vec![parent_op.clone(), unrelated]));
1362
1363        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1364        populate_handle(&handle, parent_op).await;
1365
1366        let children = handle.get_child_operations().await.unwrap();
1367        assert!(children.is_empty());
1368    }
1369
1370    #[tokio::test]
1371    async fn test_get_child_operations_preserves_execution_order() {
1372        let parent_op = create_operation_with_parent("parent-1", "parent", None);
1373        let child_c = create_operation_with_parent("child-c", "third", Some("parent-1"));
1374        let child_a = create_operation_with_parent("child-a", "first", Some("parent-1"));
1375        let child_b = create_operation_with_parent("child-b", "second", Some("parent-1"));
1376
1377        // Insertion order: c, a, b — children should come back in that order
1378        let all_ops = Arc::new(RwLock::new(vec![
1379            parent_op.clone(),
1380            child_c,
1381            child_a,
1382            child_b,
1383        ]));
1384
1385        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1386        populate_handle(&handle, parent_op).await;
1387
1388        let children = handle.get_child_operations().await.unwrap();
1389        assert_eq!(children.len(), 3);
1390        assert_eq!(children[0].get_id(), "child-c");
1391        assert_eq!(children[1].get_id(), "child-a");
1392        assert_eq!(children[2].get_id(), "child-b");
1393    }
1394
1395    #[tokio::test]
1396    async fn test_get_child_operations_children_support_recursive_enumeration() {
1397        let parent_op = create_operation_with_parent("parent-1", "parent", None);
1398        let child = create_operation_with_parent("child-1", "child", Some("parent-1"));
1399        let grandchild =
1400            create_operation_with_parent("grandchild-1", "grandchild", Some("child-1"));
1401
1402        let all_ops = Arc::new(RwLock::new(vec![parent_op.clone(), child, grandchild]));
1403
1404        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1405        populate_handle(&handle, parent_op).await;
1406
1407        let children = handle.get_child_operations().await.unwrap();
1408        assert_eq!(children.len(), 1);
1409        assert_eq!(children[0].get_id(), "child-1");
1410
1411        // Recursive: child should be able to enumerate its own children
1412        let grandchildren = children[0].get_child_operations();
1413        assert_eq!(grandchildren.len(), 1);
1414        assert_eq!(grandchildren[0].get_id(), "grandchild-1");
1415    }
1416}