Skip to main content

durable_execution_sdk_testing/
operation_handle.rs

1//! Lazy operation handle for pre-run operation registration.
2//!
3//! An `OperationHandle` is a lazy reference to a named operation that is registered
4//! before `run()` and auto-populates with operation data during execution. This enables
5//! the idiomatic callback testing pattern: pre-register a handle, start a non-blocking
6//! run, wait for mid-execution status changes, send callback responses, and await the
7//! final result.
8
9use std::sync::Arc;
10use tokio::sync::{watch, RwLock};
11
12use serde::de::DeserializeOwned;
13
14use durable_execution_sdk::{Operation, OperationStatus, OperationType};
15
16use crate::error::TestError;
17use crate::operation::{
18    CallbackDetails, CallbackSender, ContextDetails, DurableOperation, InvokeDetails, StepDetails,
19    WaitDetails,
20};
21use crate::types::WaitingOperationStatus;
22
23/// How an `OperationHandle` matches against operations during execution.
24#[derive(Clone, Debug)]
25pub enum OperationMatcher {
26    /// Match by operation name (first operation with this name).
27    ByName(String),
28    /// Match by execution order index.
29    ByIndex(usize),
30    /// Match by unique operation ID.
31    ById(String),
32    /// Match by operation name and occurrence index (nth operation with this name).
33    ByNameAndIndex(String, usize),
34}
35
36/// A lazy handle to an operation that will be populated during execution.
37///
38/// Registered before `run()` via `get_operation_handle()`, `get_operation_handle_by_index()`,
39/// or `get_operation_handle_by_id()` on the `LocalDurableTestRunner`. The handle starts
40/// unpopulated and is filled with operation data when the orchestrator finds a matching
41/// operation during execution.
42///
43/// # Examples
44///
45/// ```ignore
46/// use durable_execution_sdk_testing::OperationHandle;
47///
48/// // Pre-register a handle before run()
49/// let handle = runner.get_operation_handle("my-callback");
50///
51/// // Start non-blocking execution
52/// let future = runner.run(input);
53///
54/// // Wait for the operation to reach Submitted status
55/// handle.wait_for_data(WaitingOperationStatus::Submitted).await?;
56///
57/// // Send callback response
58/// handle.send_callback_success("result").await?;
59///
60/// // Await the final result
61/// let result = future.await?;
62/// ```
63pub struct OperationHandle {
64    /// How this handle matches operations.
65    pub(crate) matcher: OperationMatcher,
66    /// Shared operation data, populated during execution.
67    pub(crate) inner: Arc<RwLock<Option<Operation>>>,
68    /// Watch channel sender for status notifications.
69    pub(crate) status_tx: watch::Sender<Option<OperationStatus>>,
70    /// Watch channel receiver for status notifications.
71    pub(crate) status_rx: watch::Receiver<Option<OperationStatus>>,
72    /// Callback sender for interacting with the checkpoint server.
73    /// Wrapped in Arc<RwLock<>> so that clones share the same sender reference.
74    /// This allows `with_handles()` to set the sender after the handle is cloned
75    /// by the test code, and the test's clone will see the update.
76    pub(crate) callback_sender: Arc<RwLock<Option<Arc<dyn CallbackSender>>>>,
77    /// Shared reference to all operations (for child enumeration).
78    pub(crate) all_operations: Arc<RwLock<Vec<Operation>>>,
79}
80
81impl Clone for OperationHandle {
82    fn clone(&self) -> Self {
83        Self {
84            matcher: self.matcher.clone(),
85            inner: Arc::clone(&self.inner),
86            status_tx: self.status_tx.clone(),
87            status_rx: self.status_rx.clone(),
88            callback_sender: Arc::clone(&self.callback_sender),
89            all_operations: Arc::clone(&self.all_operations),
90        }
91    }
92}
93
94impl OperationHandle {
95    /// Creates a new unpopulated `OperationHandle` with the given matcher.
96    ///
97    /// The handle starts with no operation data. It will be populated during
98    /// execution when the orchestrator finds an operation matching the matcher.
99    ///
100    /// # Arguments
101    ///
102    /// * `matcher` - How this handle should match against operations
103    /// * `all_operations` - Shared reference to all operations for child enumeration
104    pub fn new(matcher: OperationMatcher, all_operations: Arc<RwLock<Vec<Operation>>>) -> Self {
105        let (status_tx, status_rx) = watch::channel(None);
106        Self {
107            matcher,
108            inner: Arc::new(RwLock::new(None)),
109            status_tx,
110            status_rx,
111            callback_sender: Arc::new(RwLock::new(None)),
112            all_operations,
113        }
114    }
115
116    // =========================================================================
117    // Inspection Methods (Requirements 1.3, 1.6)
118    // =========================================================================
119
120    /// Helper to get the inner operation or return an error if unpopulated.
121    async fn get_durable_operation(&self) -> Result<DurableOperation, TestError> {
122        let inner = self.inner.read().await;
123        match inner.as_ref() {
124            Some(op) => Ok(DurableOperation::new(op.clone())),
125            None => Err(TestError::OperationNotFound(
126                "Operation not yet populated".into(),
127            )),
128        }
129    }
130
131    /// Gets the operation ID.
132    ///
133    /// # Returns
134    ///
135    /// - `Ok(String)` - The operation ID if the handle is populated
136    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
137    pub async fn get_id(&self) -> Result<String, TestError> {
138        let op = self.get_durable_operation().await?;
139        Ok(op.get_id().to_string())
140    }
141
142    /// Gets the operation name.
143    ///
144    /// # Returns
145    ///
146    /// - `Ok(Option<String>)` - The operation name if the handle is populated
147    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
148    pub async fn get_name(&self) -> Result<Option<String>, TestError> {
149        let op = self.get_durable_operation().await?;
150        Ok(op.get_name().map(|s| s.to_string()))
151    }
152
153    /// Gets the operation type.
154    ///
155    /// # Returns
156    ///
157    /// - `Ok(OperationType)` - The operation type if the handle is populated
158    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
159    pub async fn get_type(&self) -> Result<OperationType, TestError> {
160        let op = self.get_durable_operation().await?;
161        Ok(op.get_type())
162    }
163
164    /// Gets the operation status.
165    ///
166    /// # Returns
167    ///
168    /// - `Ok(OperationStatus)` - The operation status if the handle is populated
169    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
170    pub async fn get_status(&self) -> Result<OperationStatus, TestError> {
171        let op = self.get_durable_operation().await?;
172        Ok(op.get_status())
173    }
174
175    /// Gets step-specific details.
176    ///
177    /// # Type Parameters
178    ///
179    /// * `T` - The type to deserialize the result into
180    ///
181    /// # Returns
182    ///
183    /// - `Ok(StepDetails<T>)` - The step details if populated and is a Step operation
184    /// - `Err(TestError)` - If unpopulated or wrong operation type
185    pub async fn get_step_details<T: DeserializeOwned>(&self) -> Result<StepDetails<T>, TestError> {
186        let op = self.get_durable_operation().await?;
187        op.get_step_details()
188    }
189
190    /// Gets callback-specific details.
191    ///
192    /// # Type Parameters
193    ///
194    /// * `T` - The type to deserialize the result into
195    ///
196    /// # Returns
197    ///
198    /// - `Ok(CallbackDetails<T>)` - The callback details if populated and is a Callback operation
199    /// - `Err(TestError)` - If unpopulated or wrong operation type
200    pub async fn get_callback_details<T: DeserializeOwned>(
201        &self,
202    ) -> Result<CallbackDetails<T>, TestError> {
203        let op = self.get_durable_operation().await?;
204        op.get_callback_details()
205    }
206
207    /// Gets wait-specific details.
208    ///
209    /// # Returns
210    ///
211    /// - `Ok(WaitDetails)` - The wait details if populated and is a Wait operation
212    /// - `Err(TestError)` - If unpopulated or wrong operation type
213    pub async fn get_wait_details(&self) -> Result<WaitDetails, TestError> {
214        let op = self.get_durable_operation().await?;
215        op.get_wait_details()
216    }
217
218    /// Gets invoke-specific details.
219    ///
220    /// # Type Parameters
221    ///
222    /// * `T` - The type to deserialize the result into
223    ///
224    /// # Returns
225    ///
226    /// - `Ok(InvokeDetails<T>)` - The invoke details if populated and is an Invoke operation
227    /// - `Err(TestError)` - If unpopulated or wrong operation type
228    pub async fn get_invoke_details<T: DeserializeOwned>(
229        &self,
230    ) -> Result<InvokeDetails<T>, TestError> {
231        let op = self.get_durable_operation().await?;
232        op.get_invoke_details()
233    }
234
235    /// Gets context-specific details.
236    ///
237    /// # Type Parameters
238    ///
239    /// * `T` - The type to deserialize the result into
240    ///
241    /// # Returns
242    ///
243    /// - `Ok(ContextDetails<T>)` - The context details if populated and is a Context operation
244    /// - `Err(TestError)` - If unpopulated or wrong operation type
245    pub async fn get_context_details<T: DeserializeOwned>(
246        &self,
247    ) -> Result<ContextDetails<T>, TestError> {
248        let op = self.get_durable_operation().await?;
249        op.get_context_details()
250    }
251
252    /// Checks if the handle has been populated with operation data.
253    ///
254    /// # Returns
255    ///
256    /// `true` if the handle has been populated, `false` otherwise.
257    pub async fn is_populated(&self) -> bool {
258        self.inner.read().await.is_some()
259    }
260
261    // =========================================================================
262    // Async Waiting (Requirements 1.4, 4.1, 4.2, 4.3, 4.5)
263    // =========================================================================
264
265    /// Waits for the operation to reach the specified `WaitingOperationStatus`.
266    ///
267    /// Resolves immediately if the operation has already reached the target status.
268    /// If the watch channel closes (execution ended) before the target status is
269    /// reached, returns `Err(TestError::ExecutionCompletedEarly(...))`.
270    ///
271    /// # Arguments
272    ///
273    /// * `status` - The target status to wait for
274    ///
275    /// # Returns
276    ///
277    /// - `Ok(())` - When the operation reaches the target status
278    /// - `Err(TestError::ExecutionCompletedEarly)` - If execution ends before reaching the target
279    ///
280    /// # Examples
281    ///
282    /// ```ignore
283    /// // Wait for a callback to be ready for responses
284    /// handle.wait_for_data(WaitingOperationStatus::Submitted).await?;
285    ///
286    /// // Wait for an operation to complete
287    /// handle.wait_for_data(WaitingOperationStatus::Completed).await?;
288    /// ```
289    pub async fn wait_for_data(&self, status: WaitingOperationStatus) -> Result<(), TestError> {
290        // Check if the current status already satisfies the target
291        if self.check_status_reached(status).await {
292            return Ok(());
293        }
294
295        // Subscribe to the watch channel and wait for updates
296        let mut rx = self.status_rx.clone();
297        loop {
298            // Wait for the next status change
299            if rx.changed().await.is_err() {
300                // Channel closed — execution ended. Do a final check.
301                if self.check_status_reached(status).await {
302                    return Ok(());
303                }
304                return Err(TestError::execution_completed_early(
305                    self.matcher_description(),
306                    status,
307                ));
308            }
309
310            // Check if the new status satisfies the target
311            if self.check_status_reached(status).await {
312                return Ok(());
313            }
314        }
315    }
316
317    /// Checks whether the current operation state satisfies the target `WaitingOperationStatus`.
318    async fn check_status_reached(&self, target: WaitingOperationStatus) -> bool {
319        let inner = self.inner.read().await;
320        match inner.as_ref() {
321            None => false,
322            Some(op) => Self::status_matches_target(op, target),
323        }
324    }
325
326    /// Determines if an operation's current state matches the target waiting status.
327    fn status_matches_target(op: &Operation, target: WaitingOperationStatus) -> bool {
328        match target {
329            WaitingOperationStatus::Started => {
330                // Any populated operation has started
331                true
332            }
333            WaitingOperationStatus::Submitted => {
334                // For callbacks, Submitted means callback_id is available
335                if op.operation_type == OperationType::Callback {
336                    op.callback_details
337                        .as_ref()
338                        .map(|d| d.callback_id.is_some())
339                        .unwrap_or(false)
340                } else {
341                    // For non-callbacks, treat Submitted as Started
342                    true
343                }
344            }
345            WaitingOperationStatus::Completed => {
346                // Terminal status means completed
347                op.status.is_terminal()
348            }
349        }
350    }
351
352    // =========================================================================
353    // Callback Interaction Methods (Requirements 1.5, 5.4)
354    // =========================================================================
355
356    /// Sends a success response for a callback operation.
357    ///
358    /// Validates that the handle is populated, the operation is a callback type,
359    /// and the callback_id is available (Submitted status) before delegating to
360    /// the callback sender.
361    ///
362    /// # Arguments
363    ///
364    /// * `result` - The result value to send as a JSON string
365    ///
366    /// # Returns
367    ///
368    /// - `Ok(())` - If the callback response was sent successfully
369    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
370    /// - `Err(TestError::NotCallbackOperation)` - If the operation is not a callback
371    /// - `Err(TestError::ResultNotAvailable)` - If the callback_id is not yet available
372    pub async fn send_callback_success(&self, result: &str) -> Result<(), TestError> {
373        let callback_id = self.validate_callback_ready().await?;
374
375        let sender_guard = self.callback_sender.read().await;
376        match sender_guard.as_ref() {
377            Some(sender) => sender.send_success(&callback_id, result).await,
378            None => Err(TestError::result_not_available(
379                "No callback sender configured on this handle",
380            )),
381        }
382    }
383
384    /// Sends a failure response for a callback operation.
385    ///
386    /// Validates that the handle is populated, the operation is a callback type,
387    /// and the callback_id is available (Submitted status) before delegating to
388    /// the callback sender.
389    ///
390    /// # Arguments
391    ///
392    /// * `error` - The error information to send
393    ///
394    /// # Returns
395    ///
396    /// - `Ok(())` - If the callback failure was sent successfully
397    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
398    /// - `Err(TestError::NotCallbackOperation)` - If the operation is not a callback
399    /// - `Err(TestError::ResultNotAvailable)` - If the callback_id is not yet available
400    pub async fn send_callback_failure(
401        &self,
402        error: &crate::types::TestResultError,
403    ) -> Result<(), TestError> {
404        let callback_id = self.validate_callback_ready().await?;
405
406        let sender_guard = self.callback_sender.read().await;
407        match sender_guard.as_ref() {
408            Some(sender) => sender.send_failure(&callback_id, error).await,
409            None => Err(TestError::result_not_available(
410                "No callback sender configured on this handle",
411            )),
412        }
413    }
414
415    /// Sends a heartbeat for a callback operation.
416    ///
417    /// Validates that the handle is populated, the operation is a callback type,
418    /// and the callback_id is available (Submitted status) before delegating to
419    /// the callback sender.
420    ///
421    /// # Returns
422    ///
423    /// - `Ok(())` - If the heartbeat was sent successfully
424    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
425    /// - `Err(TestError::NotCallbackOperation)` - If the operation is not a callback
426    /// - `Err(TestError::ResultNotAvailable)` - If the callback_id is not yet available
427    pub async fn send_callback_heartbeat(&self) -> Result<(), TestError> {
428        let callback_id = self.validate_callback_ready().await?;
429
430        let sender_guard = self.callback_sender.read().await;
431        match sender_guard.as_ref() {
432            Some(sender) => sender.send_heartbeat(&callback_id).await,
433            None => Err(TestError::result_not_available(
434                "No callback sender configured on this handle",
435            )),
436        }
437    }
438
439    /// Validates that the handle is populated with a callback operation that has
440    /// a callback_id available (Submitted status). Returns the callback_id on success.
441    async fn validate_callback_ready(&self) -> Result<String, TestError> {
442        let inner = self.inner.read().await;
443        let op = inner
444            .as_ref()
445            .ok_or_else(|| TestError::OperationNotFound("Operation not yet populated".into()))?;
446
447        if op.operation_type != OperationType::Callback {
448            return Err(TestError::NotCallbackOperation);
449        }
450
451        op.callback_details
452            .as_ref()
453            .and_then(|d| d.callback_id.clone())
454            .ok_or_else(|| {
455                TestError::result_not_available(
456                    "Callback ID not available — operation has not reached Submitted status",
457                )
458            })
459    }
460
461    // =========================================================================
462    // Child Operation Methods (Requirements 3.1, 3.2, 3.3, 3.4)
463    // =========================================================================
464
465    /// Returns all child operations nested under this operation.
466    ///
467    /// Child operations are those whose `parent_id` matches this operation's `id`.
468    /// The returned operations preserve execution order (same order as in the
469    /// shared operations list).
470    ///
471    /// # Returns
472    ///
473    /// - `Ok(Vec<DurableOperation>)` - Child operations if the handle is populated
474    /// - `Err(TestError::OperationNotFound)` - If the handle is not yet populated
475    pub async fn get_child_operations(&self) -> Result<Vec<DurableOperation>, TestError> {
476        let inner = self.inner.read().await;
477        let op = inner
478            .as_ref()
479            .ok_or_else(|| TestError::OperationNotFound("Operation not yet populated".into()))?;
480
481        let my_id = &op.operation_id;
482        let all_ops = self.all_operations.read().await;
483
484        // Snapshot into an Arc<Vec<Operation>> so each child DurableOperation
485        // can enumerate its own children via with_operations().
486        let all_ops_arc = Arc::new(all_ops.clone());
487
488        let children = all_ops_arc
489            .iter()
490            .filter(|child| child.parent_id.as_deref() == Some(my_id))
491            .map(|child| {
492                DurableOperation::new(child.clone()).with_operations(Arc::clone(&all_ops_arc))
493            })
494            .collect();
495
496        Ok(children)
497    }
498
499    /// Returns a human-readable description of the matcher for error messages.
500    fn matcher_description(&self) -> String {
501        match &self.matcher {
502            OperationMatcher::ByName(name) => format!("name={}", name),
503            OperationMatcher::ByIndex(idx) => format!("index={}", idx),
504            OperationMatcher::ById(id) => format!("id={}", id),
505            OperationMatcher::ByNameAndIndex(name, idx) => format!("name={}, index={}", name, idx),
506        }
507    }
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513    use durable_execution_sdk::{
514        CallbackDetails as SdkCallbackDetails, ChainedInvokeDetails as SdkChainedInvokeDetails,
515        ContextDetails as SdkContextDetails, OperationType, StepDetails as SdkStepDetails,
516        WaitDetails as SdkWaitDetails,
517    };
518
519    /// Helper to create a step operation for testing.
520    fn create_step_operation() -> Operation {
521        let mut op = Operation::new("step-001".to_string(), OperationType::Step);
522        op.name = Some("my-step".to_string());
523        op.status = OperationStatus::Succeeded;
524        op.step_details = Some(SdkStepDetails {
525            result: Some(r#""hello""#.to_string()),
526            attempt: Some(1),
527            next_attempt_timestamp: None,
528            error: None,
529            payload: None,
530        });
531        op
532    }
533
534    /// Helper to create a callback operation for testing.
535    fn create_callback_operation() -> Operation {
536        let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
537        op.name = Some("my-callback".to_string());
538        op.status = OperationStatus::Started;
539        op.callback_details = Some(SdkCallbackDetails {
540            callback_id: Some("cb-id-123".to_string()),
541            result: None,
542            error: None,
543        });
544        op
545    }
546
547    /// Helper to create a wait operation for testing.
548    fn create_wait_operation() -> Operation {
549        let mut op = Operation::new("wait-001".to_string(), OperationType::Wait);
550        op.name = Some("my-wait".to_string());
551        op.status = OperationStatus::Succeeded;
552        op.start_timestamp = Some(1000);
553        op.wait_details = Some(SdkWaitDetails {
554            scheduled_end_timestamp: Some(6000),
555        });
556        op
557    }
558
559    /// Helper to create an invoke operation for testing.
560    fn create_invoke_operation() -> Operation {
561        let mut op = Operation::new("invoke-001".to_string(), OperationType::Invoke);
562        op.name = Some("my-invoke".to_string());
563        op.status = OperationStatus::Succeeded;
564        op.chained_invoke_details = Some(SdkChainedInvokeDetails {
565            result: Some(r#"42"#.to_string()),
566            error: None,
567        });
568        op
569    }
570
571    /// Helper to create a context operation for testing.
572    fn create_context_operation() -> Operation {
573        let mut op = Operation::new("ctx-001".to_string(), OperationType::Context);
574        op.name = Some("my-context".to_string());
575        op.status = OperationStatus::Succeeded;
576        op.context_details = Some(SdkContextDetails {
577            result: Some(r#""done""#.to_string()),
578            replay_children: None,
579            error: None,
580        });
581        op
582    }
583
584    /// Helper to populate a handle with an operation.
585    async fn populate_handle(handle: &OperationHandle, op: Operation) {
586        let mut inner = handle.inner.write().await;
587        *inner = Some(op);
588    }
589
590    #[test]
591    fn test_operation_handle_new_by_name() {
592        let all_ops = Arc::new(RwLock::new(Vec::new()));
593        let handle = OperationHandle::new(OperationMatcher::ByName("test-op".into()), all_ops);
594
595        assert!(matches!(handle.matcher, OperationMatcher::ByName(ref n) if n == "test-op"));
596        assert!(handle.callback_sender.try_read().unwrap().is_none());
597    }
598
599    #[test]
600    fn test_operation_handle_new_by_index() {
601        let all_ops = Arc::new(RwLock::new(Vec::new()));
602        let handle = OperationHandle::new(OperationMatcher::ByIndex(3), all_ops);
603
604        assert!(matches!(handle.matcher, OperationMatcher::ByIndex(3)));
605    }
606
607    #[test]
608    fn test_operation_handle_new_by_id() {
609        let all_ops = Arc::new(RwLock::new(Vec::new()));
610        let handle = OperationHandle::new(OperationMatcher::ById("abc-123".into()), all_ops);
611
612        assert!(matches!(handle.matcher, OperationMatcher::ById(ref id) if id == "abc-123"));
613    }
614
615    #[tokio::test]
616    async fn test_operation_handle_starts_unpopulated() {
617        let all_ops = Arc::new(RwLock::new(Vec::new()));
618        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
619
620        let inner = handle.inner.read().await;
621        assert!(inner.is_none());
622        assert!(handle.status_rx.borrow().is_none());
623    }
624
625    #[test]
626    fn test_operation_handle_clone() {
627        let all_ops = Arc::new(RwLock::new(Vec::new()));
628        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
629        let cloned = handle.clone();
630
631        // Cloned handle shares the same inner Arc
632        assert!(Arc::ptr_eq(&handle.inner, &cloned.inner));
633        assert!(Arc::ptr_eq(&handle.all_operations, &cloned.all_operations));
634        assert!(matches!(cloned.matcher, OperationMatcher::ByName(ref n) if n == "test"));
635    }
636
637    #[test]
638    fn test_operation_matcher_debug() {
639        let by_name = OperationMatcher::ByName("test".into());
640        let by_index = OperationMatcher::ByIndex(0);
641        let by_id = OperationMatcher::ById("id-1".into());
642
643        // Verify Debug is implemented and produces reasonable output
644        assert!(format!("{:?}", by_name).contains("ByName"));
645        assert!(format!("{:?}", by_index).contains("ByIndex"));
646        assert!(format!("{:?}", by_id).contains("ById"));
647    }
648
649    // =========================================================================
650    // Inspection Method Tests (Requirements 1.3, 1.6)
651    // =========================================================================
652
653    #[tokio::test]
654    async fn test_is_populated_false_when_new() {
655        let all_ops = Arc::new(RwLock::new(Vec::new()));
656        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
657        assert!(!handle.is_populated().await);
658    }
659
660    #[tokio::test]
661    async fn test_is_populated_true_after_population() {
662        let all_ops = Arc::new(RwLock::new(Vec::new()));
663        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
664        populate_handle(&handle, create_step_operation()).await;
665        assert!(handle.is_populated().await);
666    }
667
668    #[tokio::test]
669    async fn test_get_id_unpopulated_returns_error() {
670        let all_ops = Arc::new(RwLock::new(Vec::new()));
671        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
672        let result = handle.get_id().await;
673        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
674    }
675
676    #[tokio::test]
677    async fn test_get_id_populated() {
678        let all_ops = Arc::new(RwLock::new(Vec::new()));
679        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
680        populate_handle(&handle, create_step_operation()).await;
681        assert_eq!(handle.get_id().await.unwrap(), "step-001");
682    }
683
684    #[tokio::test]
685    async fn test_get_name_unpopulated_returns_error() {
686        let all_ops = Arc::new(RwLock::new(Vec::new()));
687        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
688        let result = handle.get_name().await;
689        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
690    }
691
692    #[tokio::test]
693    async fn test_get_name_populated() {
694        let all_ops = Arc::new(RwLock::new(Vec::new()));
695        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
696        populate_handle(&handle, create_step_operation()).await;
697        assert_eq!(
698            handle.get_name().await.unwrap(),
699            Some("my-step".to_string())
700        );
701    }
702
703    #[tokio::test]
704    async fn test_get_type_unpopulated_returns_error() {
705        let all_ops = Arc::new(RwLock::new(Vec::new()));
706        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
707        let result = handle.get_type().await;
708        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
709    }
710
711    #[tokio::test]
712    async fn test_get_type_populated() {
713        let all_ops = Arc::new(RwLock::new(Vec::new()));
714        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
715        populate_handle(&handle, create_step_operation()).await;
716        assert_eq!(handle.get_type().await.unwrap(), OperationType::Step);
717    }
718
719    #[tokio::test]
720    async fn test_get_status_unpopulated_returns_error() {
721        let all_ops = Arc::new(RwLock::new(Vec::new()));
722        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
723        let result = handle.get_status().await;
724        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
725    }
726
727    #[tokio::test]
728    async fn test_get_status_populated() {
729        let all_ops = Arc::new(RwLock::new(Vec::new()));
730        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
731        populate_handle(&handle, create_step_operation()).await;
732        assert_eq!(
733            handle.get_status().await.unwrap(),
734            OperationStatus::Succeeded
735        );
736    }
737
738    #[tokio::test]
739    async fn test_get_step_details_unpopulated_returns_error() {
740        let all_ops = Arc::new(RwLock::new(Vec::new()));
741        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
742        let result: Result<StepDetails<String>, _> = handle.get_step_details().await;
743        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
744    }
745
746    #[tokio::test]
747    async fn test_get_step_details_populated() {
748        let all_ops = Arc::new(RwLock::new(Vec::new()));
749        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
750        populate_handle(&handle, create_step_operation()).await;
751        let details: StepDetails<String> = handle.get_step_details().await.unwrap();
752        assert_eq!(details.result, Some("hello".to_string()));
753        assert_eq!(details.attempt, Some(1));
754    }
755
756    #[tokio::test]
757    async fn test_get_callback_details_unpopulated_returns_error() {
758        let all_ops = Arc::new(RwLock::new(Vec::new()));
759        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
760        let result: Result<CallbackDetails<String>, _> = handle.get_callback_details().await;
761        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
762    }
763
764    #[tokio::test]
765    async fn test_get_callback_details_populated() {
766        let all_ops = Arc::new(RwLock::new(Vec::new()));
767        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
768        populate_handle(&handle, create_callback_operation()).await;
769        let details: CallbackDetails<String> = handle.get_callback_details().await.unwrap();
770        assert_eq!(details.callback_id, Some("cb-id-123".to_string()));
771    }
772
773    #[tokio::test]
774    async fn test_get_wait_details_unpopulated_returns_error() {
775        let all_ops = Arc::new(RwLock::new(Vec::new()));
776        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
777        let result = handle.get_wait_details().await;
778        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
779    }
780
781    #[tokio::test]
782    async fn test_get_wait_details_populated() {
783        let all_ops = Arc::new(RwLock::new(Vec::new()));
784        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
785        populate_handle(&handle, create_wait_operation()).await;
786        let details = handle.get_wait_details().await.unwrap();
787        assert_eq!(details.wait_seconds, Some(5));
788    }
789
790    #[tokio::test]
791    async fn test_get_invoke_details_unpopulated_returns_error() {
792        let all_ops = Arc::new(RwLock::new(Vec::new()));
793        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
794        let result: Result<InvokeDetails<serde_json::Value>, _> = handle.get_invoke_details().await;
795        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
796    }
797
798    #[tokio::test]
799    async fn test_get_invoke_details_populated() {
800        let all_ops = Arc::new(RwLock::new(Vec::new()));
801        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
802        populate_handle(&handle, create_invoke_operation()).await;
803        let details: InvokeDetails<i32> = handle.get_invoke_details().await.unwrap();
804        assert_eq!(details.result, Some(42));
805    }
806
807    #[tokio::test]
808    async fn test_get_context_details_unpopulated_returns_error() {
809        let all_ops = Arc::new(RwLock::new(Vec::new()));
810        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
811        let result: Result<ContextDetails<String>, _> = handle.get_context_details().await;
812        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
813    }
814
815    #[tokio::test]
816    async fn test_get_context_details_populated() {
817        let all_ops = Arc::new(RwLock::new(Vec::new()));
818        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
819        populate_handle(&handle, create_context_operation()).await;
820        let details: ContextDetails<String> = handle.get_context_details().await.unwrap();
821        assert_eq!(details.result, Some("done".to_string()));
822    }
823
824    #[tokio::test]
825    async fn test_get_step_details_wrong_type_returns_type_mismatch() {
826        let all_ops = Arc::new(RwLock::new(Vec::new()));
827        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
828        populate_handle(&handle, create_wait_operation()).await;
829        let result: Result<StepDetails<String>, _> = handle.get_step_details().await;
830        assert!(matches!(
831            result,
832            Err(TestError::OperationTypeMismatch { .. })
833        ));
834    }
835
836    // =========================================================================
837    // wait_for_data Tests (Requirements 1.4, 4.1, 4.2, 4.3, 4.5)
838    // =========================================================================
839
840    #[tokio::test]
841    async fn test_wait_for_data_started_resolves_immediately_when_populated() {
842        let all_ops = Arc::new(RwLock::new(Vec::new()));
843        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
844        populate_handle(&handle, create_step_operation()).await;
845        // Any populated operation satisfies Started
846        let result = handle.wait_for_data(WaitingOperationStatus::Started).await;
847        assert!(result.is_ok());
848    }
849
850    #[tokio::test]
851    async fn test_wait_for_data_completed_resolves_immediately_when_terminal() {
852        let all_ops = Arc::new(RwLock::new(Vec::new()));
853        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
854        // create_step_operation has status Succeeded (terminal)
855        populate_handle(&handle, create_step_operation()).await;
856        let result = handle
857            .wait_for_data(WaitingOperationStatus::Completed)
858            .await;
859        assert!(result.is_ok());
860    }
861
862    #[tokio::test]
863    async fn test_wait_for_data_submitted_resolves_for_callback_with_id() {
864        let all_ops = Arc::new(RwLock::new(Vec::new()));
865        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
866        // create_callback_operation has callback_id set
867        populate_handle(&handle, create_callback_operation()).await;
868        let result = handle
869            .wait_for_data(WaitingOperationStatus::Submitted)
870            .await;
871        assert!(result.is_ok());
872    }
873
874    #[tokio::test]
875    async fn test_wait_for_data_submitted_resolves_for_non_callback() {
876        let all_ops = Arc::new(RwLock::new(Vec::new()));
877        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
878        // Non-callback operations treat Submitted as Started
879        populate_handle(&handle, create_step_operation()).await;
880        let result = handle
881            .wait_for_data(WaitingOperationStatus::Submitted)
882            .await;
883        assert!(result.is_ok());
884    }
885
886    #[tokio::test]
887    async fn test_wait_for_data_unpopulated_not_started_returns_error_on_channel_close() {
888        // Test the ExecutionCompletedEarly path by verifying that when the
889        // operation never reaches the target status and the channel closes,
890        // we get the appropriate error.
891        //
892        // In real usage, the OperationHandle holds a status_tx sender, which
893        // means the channel can't close while the handle exists. The channel-
894        // close path is a safety net for edge cases. We test the underlying
895        // logic via check_status_reached instead.
896        let all_ops = Arc::new(RwLock::new(Vec::new()));
897        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
898
899        // Unpopulated handle should not satisfy any status
900        assert!(
901            !handle
902                .check_status_reached(WaitingOperationStatus::Started)
903                .await
904        );
905        assert!(
906            !handle
907                .check_status_reached(WaitingOperationStatus::Submitted)
908                .await
909        );
910        assert!(
911            !handle
912                .check_status_reached(WaitingOperationStatus::Completed)
913                .await
914        );
915    }
916
917    #[tokio::test]
918    async fn test_wait_for_data_non_terminal_does_not_satisfy_completed() {
919        let all_ops = Arc::new(RwLock::new(Vec::new()));
920        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
921
922        // Populate with a non-terminal status
923        let mut op = Operation::new("step-001".to_string(), OperationType::Step);
924        op.status = OperationStatus::Started;
925        populate_handle(&handle, op).await;
926
927        // Started satisfies Started but not Completed
928        assert!(
929            handle
930                .check_status_reached(WaitingOperationStatus::Started)
931                .await
932        );
933        assert!(
934            !handle
935                .check_status_reached(WaitingOperationStatus::Completed)
936                .await
937        );
938    }
939
940    #[tokio::test]
941    async fn test_wait_for_data_callback_without_id_does_not_satisfy_submitted() {
942        let all_ops = Arc::new(RwLock::new(Vec::new()));
943        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
944
945        // Populate with a callback that has no callback_id
946        let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
947        op.status = OperationStatus::Started;
948        op.callback_details = Some(SdkCallbackDetails {
949            callback_id: None,
950            result: None,
951            error: None,
952        });
953        populate_handle(&handle, op).await;
954
955        // Started is satisfied, but Submitted is not (no callback_id)
956        assert!(
957            handle
958                .check_status_reached(WaitingOperationStatus::Started)
959                .await
960        );
961        assert!(
962            !handle
963                .check_status_reached(WaitingOperationStatus::Submitted)
964                .await
965        );
966    }
967
968    #[tokio::test]
969    async fn test_status_matches_target_all_terminal_statuses_satisfy_completed() {
970        let terminal_statuses = vec![
971            OperationStatus::Succeeded,
972            OperationStatus::Failed,
973            OperationStatus::Cancelled,
974            OperationStatus::TimedOut,
975            OperationStatus::Stopped,
976        ];
977
978        for status in terminal_statuses {
979            let mut op = Operation::new("op-001".to_string(), OperationType::Step);
980            op.status = status;
981            assert!(
982                OperationHandle::status_matches_target(&op, WaitingOperationStatus::Completed),
983                "Expected {:?} to satisfy Completed",
984                status
985            );
986        }
987    }
988
989    #[tokio::test]
990    async fn test_status_matches_target_non_terminal_does_not_satisfy_completed() {
991        let non_terminal_statuses = vec![
992            OperationStatus::Started,
993            OperationStatus::Pending,
994            OperationStatus::Ready,
995        ];
996
997        for status in non_terminal_statuses {
998            let mut op = Operation::new("op-001".to_string(), OperationType::Step);
999            op.status = status;
1000            assert!(
1001                !OperationHandle::status_matches_target(&op, WaitingOperationStatus::Completed),
1002                "Expected {:?} to NOT satisfy Completed",
1003                status
1004            );
1005        }
1006    }
1007
1008    #[tokio::test]
1009    async fn test_wait_for_data_resolves_when_status_update_arrives() {
1010        let all_ops = Arc::new(RwLock::new(Vec::new()));
1011        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1012
1013        let handle_clone = handle.clone();
1014        // Spawn a task that populates the handle and sends a status update after a delay
1015        tokio::spawn(async move {
1016            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1017            // Populate with a completed operation
1018            let mut op = create_step_operation();
1019            op.status = OperationStatus::Succeeded;
1020            {
1021                let mut inner = handle_clone.inner.write().await;
1022                *inner = Some(op);
1023            }
1024            let _ = handle_clone
1025                .status_tx
1026                .send(Some(OperationStatus::Succeeded));
1027        });
1028
1029        let result = handle
1030            .wait_for_data(WaitingOperationStatus::Completed)
1031            .await;
1032        assert!(result.is_ok());
1033    }
1034
1035    #[tokio::test]
1036    async fn test_wait_for_data_waits_through_non_terminal_updates() {
1037        let all_ops = Arc::new(RwLock::new(Vec::new()));
1038        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1039
1040        let handle_clone = handle.clone();
1041        tokio::spawn(async move {
1042            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1043            // First: populate with Started status (non-terminal)
1044            let mut op = create_step_operation();
1045            op.status = OperationStatus::Started;
1046            {
1047                let mut inner = handle_clone.inner.write().await;
1048                *inner = Some(op);
1049            }
1050            let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1051
1052            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1053            // Then: update to Succeeded (terminal)
1054            {
1055                let mut inner = handle_clone.inner.write().await;
1056                if let Some(ref mut op) = *inner {
1057                    op.status = OperationStatus::Succeeded;
1058                }
1059            }
1060            let _ = handle_clone
1061                .status_tx
1062                .send(Some(OperationStatus::Succeeded));
1063        });
1064
1065        let result = handle
1066            .wait_for_data(WaitingOperationStatus::Completed)
1067            .await;
1068        assert!(result.is_ok());
1069    }
1070
1071    #[tokio::test]
1072    async fn test_wait_for_data_submitted_waits_for_callback_id() {
1073        let all_ops = Arc::new(RwLock::new(Vec::new()));
1074        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1075
1076        let handle_clone = handle.clone();
1077        tokio::spawn(async move {
1078            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1079            // First: populate with callback but no callback_id
1080            let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
1081            op.name = Some("my-callback".to_string());
1082            op.status = OperationStatus::Started;
1083            op.callback_details = Some(SdkCallbackDetails {
1084                callback_id: None,
1085                result: None,
1086                error: None,
1087            });
1088            {
1089                let mut inner = handle_clone.inner.write().await;
1090                *inner = Some(op);
1091            }
1092            let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1093
1094            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1095            // Then: update with callback_id
1096            {
1097                let mut inner = handle_clone.inner.write().await;
1098                if let Some(ref mut op) = *inner {
1099                    if let Some(ref mut details) = op.callback_details {
1100                        details.callback_id = Some("cb-id-123".to_string());
1101                    }
1102                }
1103            }
1104            let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1105        });
1106
1107        let result = handle
1108            .wait_for_data(WaitingOperationStatus::Submitted)
1109            .await;
1110        assert!(result.is_ok());
1111    }
1112
1113    #[tokio::test]
1114    async fn test_matcher_description() {
1115        let all_ops = Arc::new(RwLock::new(Vec::new()));
1116
1117        let handle =
1118            OperationHandle::new(OperationMatcher::ByName("my-op".into()), all_ops.clone());
1119        assert_eq!(handle.matcher_description(), "name=my-op");
1120
1121        let handle = OperationHandle::new(OperationMatcher::ByIndex(3), all_ops.clone());
1122        assert_eq!(handle.matcher_description(), "index=3");
1123
1124        let handle = OperationHandle::new(OperationMatcher::ById("abc-123".into()), all_ops);
1125        assert_eq!(handle.matcher_description(), "id=abc-123");
1126    }
1127
1128    // =========================================================================
1129    // Callback Method Tests (Requirements 1.5, 5.4)
1130    // =========================================================================
1131
1132    /// Mock callback sender for testing.
1133    #[derive(Clone)]
1134    struct MockCallbackSender {
1135        success_calls: Arc<RwLock<Vec<(String, String)>>>,
1136        failure_calls: Arc<RwLock<Vec<(String, String)>>>,
1137        heartbeat_calls: Arc<RwLock<Vec<String>>>,
1138    }
1139
1140    impl MockCallbackSender {
1141        fn new() -> Self {
1142            Self {
1143                success_calls: Arc::new(RwLock::new(Vec::new())),
1144                failure_calls: Arc::new(RwLock::new(Vec::new())),
1145                heartbeat_calls: Arc::new(RwLock::new(Vec::new())),
1146            }
1147        }
1148    }
1149
1150    #[async_trait::async_trait]
1151    impl CallbackSender for MockCallbackSender {
1152        async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError> {
1153            self.success_calls
1154                .write()
1155                .await
1156                .push((callback_id.to_string(), result.to_string()));
1157            Ok(())
1158        }
1159
1160        async fn send_failure(
1161            &self,
1162            callback_id: &str,
1163            error: &crate::types::TestResultError,
1164        ) -> Result<(), TestError> {
1165            self.failure_calls
1166                .write()
1167                .await
1168                .push((callback_id.to_string(), error.to_string()));
1169            Ok(())
1170        }
1171
1172        async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError> {
1173            self.heartbeat_calls
1174                .write()
1175                .await
1176                .push(callback_id.to_string());
1177            Ok(())
1178        }
1179    }
1180
1181    /// Helper to create a handle with a mock callback sender and a populated callback operation.
1182    async fn create_callback_handle_with_sender() -> (OperationHandle, MockCallbackSender) {
1183        let all_ops = Arc::new(RwLock::new(Vec::new()));
1184        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1185        let sender = MockCallbackSender::new();
1186        {
1187            let mut guard = handle.callback_sender.write().await;
1188            *guard = Some(Arc::new(sender.clone()));
1189        }
1190        populate_handle(&handle, create_callback_operation()).await;
1191        (handle, sender)
1192    }
1193
1194    #[tokio::test]
1195    async fn test_send_callback_success_unpopulated_returns_error() {
1196        let all_ops = Arc::new(RwLock::new(Vec::new()));
1197        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1198        let result = handle.send_callback_success("ok").await;
1199        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1200    }
1201
1202    #[tokio::test]
1203    async fn test_send_callback_success_non_callback_returns_error() {
1204        let all_ops = Arc::new(RwLock::new(Vec::new()));
1205        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1206        populate_handle(&handle, create_step_operation()).await;
1207        let result = handle.send_callback_success("ok").await;
1208        assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1209    }
1210
1211    #[tokio::test]
1212    async fn test_send_callback_success_no_callback_id_returns_error() {
1213        let all_ops = Arc::new(RwLock::new(Vec::new()));
1214        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1215        // Callback without callback_id (not yet Submitted)
1216        let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
1217        op.callback_details = Some(SdkCallbackDetails {
1218            callback_id: None,
1219            result: None,
1220            error: None,
1221        });
1222        populate_handle(&handle, op).await;
1223        let result = handle.send_callback_success("ok").await;
1224        assert!(matches!(result, Err(TestError::ResultNotAvailable(_))));
1225    }
1226
1227    #[tokio::test]
1228    async fn test_send_callback_success_no_sender_returns_error() {
1229        let all_ops = Arc::new(RwLock::new(Vec::new()));
1230        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1231        populate_handle(&handle, create_callback_operation()).await;
1232        // No callback_sender configured
1233        let result = handle.send_callback_success("ok").await;
1234        assert!(matches!(result, Err(TestError::ResultNotAvailable(_))));
1235    }
1236
1237    #[tokio::test]
1238    async fn test_send_callback_success_delegates_to_sender() {
1239        let (handle, sender) = create_callback_handle_with_sender().await;
1240        let result = handle.send_callback_success("my-result").await;
1241        assert!(result.is_ok());
1242        let calls = sender.success_calls.read().await;
1243        assert_eq!(calls.len(), 1);
1244        assert_eq!(calls[0], ("cb-id-123".to_string(), "my-result".to_string()));
1245    }
1246
1247    #[tokio::test]
1248    async fn test_send_callback_failure_unpopulated_returns_error() {
1249        let all_ops = Arc::new(RwLock::new(Vec::new()));
1250        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1251        let error = crate::types::TestResultError::new("TestError", "something failed");
1252        let result = handle.send_callback_failure(&error).await;
1253        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1254    }
1255
1256    #[tokio::test]
1257    async fn test_send_callback_failure_non_callback_returns_error() {
1258        let all_ops = Arc::new(RwLock::new(Vec::new()));
1259        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1260        populate_handle(&handle, create_step_operation()).await;
1261        let error = crate::types::TestResultError::new("TestError", "something failed");
1262        let result = handle.send_callback_failure(&error).await;
1263        assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1264    }
1265
1266    #[tokio::test]
1267    async fn test_send_callback_failure_delegates_to_sender() {
1268        let (handle, sender) = create_callback_handle_with_sender().await;
1269        let error = crate::types::TestResultError::new("TestError", "something failed");
1270        let result = handle.send_callback_failure(&error).await;
1271        assert!(result.is_ok());
1272        let calls = sender.failure_calls.read().await;
1273        assert_eq!(calls.len(), 1);
1274        assert_eq!(calls[0].0, "cb-id-123");
1275    }
1276
1277    #[tokio::test]
1278    async fn test_send_callback_heartbeat_unpopulated_returns_error() {
1279        let all_ops = Arc::new(RwLock::new(Vec::new()));
1280        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1281        let result = handle.send_callback_heartbeat().await;
1282        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1283    }
1284
1285    #[tokio::test]
1286    async fn test_send_callback_heartbeat_non_callback_returns_error() {
1287        let all_ops = Arc::new(RwLock::new(Vec::new()));
1288        let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1289        populate_handle(&handle, create_step_operation()).await;
1290        let result = handle.send_callback_heartbeat().await;
1291        assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1292    }
1293
1294    #[tokio::test]
1295    async fn test_send_callback_heartbeat_delegates_to_sender() {
1296        let (handle, sender) = create_callback_handle_with_sender().await;
1297        let result = handle.send_callback_heartbeat().await;
1298        assert!(result.is_ok());
1299        let calls = sender.heartbeat_calls.read().await;
1300        assert_eq!(calls.len(), 1);
1301        assert_eq!(calls[0], "cb-id-123");
1302    }
1303
1304    // =========================================================================
1305    // get_child_operations() tests (Requirements 3.1, 3.2, 3.3, 3.4)
1306    // =========================================================================
1307
1308    /// Helper to create an operation with a parent_id for child enumeration tests.
1309    fn create_operation_with_parent(id: &str, name: &str, parent_id: Option<&str>) -> Operation {
1310        let mut op = Operation::new(id.to_string(), OperationType::Step);
1311        op.name = Some(name.to_string());
1312        op.status = OperationStatus::Succeeded;
1313        op.parent_id = parent_id.map(|s| s.to_string());
1314        op
1315    }
1316
1317    #[tokio::test]
1318    async fn test_get_child_operations_unpopulated_returns_error() {
1319        let all_ops = Arc::new(RwLock::new(Vec::new()));
1320        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1321
1322        let result = handle.get_child_operations().await;
1323        assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1324    }
1325
1326    #[tokio::test]
1327    async fn test_get_child_operations_returns_matching_children() {
1328        let parent_op = create_operation_with_parent("parent-1", "parent", None);
1329        let child1 = create_operation_with_parent("child-1", "child-a", Some("parent-1"));
1330        let child2 = create_operation_with_parent("child-2", "child-b", Some("parent-1"));
1331        let unrelated = create_operation_with_parent("other-1", "other", Some("other-parent"));
1332
1333        let all_ops = Arc::new(RwLock::new(vec![
1334            parent_op.clone(),
1335            child1,
1336            child2,
1337            unrelated,
1338        ]));
1339
1340        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1341        populate_handle(&handle, parent_op).await;
1342
1343        let children = handle.get_child_operations().await.unwrap();
1344        assert_eq!(children.len(), 2);
1345        assert_eq!(children[0].get_id(), "child-1");
1346        assert_eq!(children[1].get_id(), "child-2");
1347    }
1348
1349    #[tokio::test]
1350    async fn test_get_child_operations_empty_when_no_children() {
1351        let parent_op = create_operation_with_parent("parent-1", "parent", None);
1352        let unrelated = create_operation_with_parent("other-1", "other", Some("other-parent"));
1353
1354        let all_ops = Arc::new(RwLock::new(vec![parent_op.clone(), unrelated]));
1355
1356        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1357        populate_handle(&handle, parent_op).await;
1358
1359        let children = handle.get_child_operations().await.unwrap();
1360        assert!(children.is_empty());
1361    }
1362
1363    #[tokio::test]
1364    async fn test_get_child_operations_preserves_execution_order() {
1365        let parent_op = create_operation_with_parent("parent-1", "parent", None);
1366        let child_c = create_operation_with_parent("child-c", "third", Some("parent-1"));
1367        let child_a = create_operation_with_parent("child-a", "first", Some("parent-1"));
1368        let child_b = create_operation_with_parent("child-b", "second", Some("parent-1"));
1369
1370        // Insertion order: c, a, b — children should come back in that order
1371        let all_ops = Arc::new(RwLock::new(vec![
1372            parent_op.clone(),
1373            child_c,
1374            child_a,
1375            child_b,
1376        ]));
1377
1378        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1379        populate_handle(&handle, parent_op).await;
1380
1381        let children = handle.get_child_operations().await.unwrap();
1382        assert_eq!(children.len(), 3);
1383        assert_eq!(children[0].get_id(), "child-c");
1384        assert_eq!(children[1].get_id(), "child-a");
1385        assert_eq!(children[2].get_id(), "child-b");
1386    }
1387
1388    #[tokio::test]
1389    async fn test_get_child_operations_children_support_recursive_enumeration() {
1390        let parent_op = create_operation_with_parent("parent-1", "parent", None);
1391        let child = create_operation_with_parent("child-1", "child", Some("parent-1"));
1392        let grandchild =
1393            create_operation_with_parent("grandchild-1", "grandchild", Some("child-1"));
1394
1395        let all_ops = Arc::new(RwLock::new(vec![parent_op.clone(), child, grandchild]));
1396
1397        let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1398        populate_handle(&handle, parent_op).await;
1399
1400        let children = handle.get_child_operations().await.unwrap();
1401        assert_eq!(children.len(), 1);
1402        assert_eq!(children[0].get_id(), "child-1");
1403
1404        // Recursive: child should be able to enumerate its own children
1405        let grandchildren = children[0].get_child_operations();
1406        assert_eq!(grandchildren.len(), 1);
1407        assert_eq!(grandchildren[0].get_id(), "grandchild-1");
1408    }
1409}