1use std::sync::Arc;
10use tokio::sync::{watch, RwLock};
11
12use serde::de::DeserializeOwned;
13
14use durable_execution_sdk::{Operation, OperationStatus, OperationType};
15
16use crate::error::TestError;
17use crate::operation::{
18 CallbackDetails, CallbackSender, ContextDetails, DurableOperation, InvokeDetails, StepDetails,
19 WaitDetails,
20};
21use crate::types::WaitingOperationStatus;
22
/// Describes which operation an `OperationHandle` refers to.
///
/// The matching itself is implemented outside this type; the variants only
/// carry the selection criteria. `PartialEq`, `Eq` and `Hash` are derived so
/// matchers can be compared directly in assertions and used as map/set keys.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OperationMatcher {
    /// Select by operation name.
    ByName(String),
    /// Select by a numeric index (what the index counts over is decided by
    /// the matching code, which is not part of this type).
    ByIndex(usize),
    /// Select by operation id.
    ById(String),
    /// Select by a name combined with a numeric index.
    ByNameAndIndex(String, usize),
}
35
/// Handle to a single operation selected by an [`OperationMatcher`].
///
/// Every piece of mutable state sits behind an `Arc`, and the `Clone` impl
/// clones those `Arc`s, so clones of a handle observe the same operation
/// data, status channel, callback sender and operation list.
pub struct OperationHandle {
    /// Criteria this handle was created with.
    pub(crate) matcher: OperationMatcher,
    /// The matched operation; `None` until the handle has been populated.
    pub(crate) inner: Arc<RwLock<Option<Operation>>>,
    /// Sending half of the status watch channel (initial value `None`).
    pub(crate) status_tx: watch::Sender<Option<OperationStatus>>,
    /// Receiving half of the status watch channel; `wait_for_data` clones it
    /// to wait for change notifications.
    pub(crate) status_rx: watch::Receiver<Option<OperationStatus>>,
    /// Sender used by the `send_callback_*` methods; `None` until configured.
    pub(crate) callback_sender: Arc<RwLock<Option<Arc<dyn CallbackSender>>>>,
    /// Shared list of operations, read by `get_child_operations`.
    pub(crate) all_operations: Arc<RwLock<Vec<Operation>>>,
}
80
81impl Clone for OperationHandle {
82 fn clone(&self) -> Self {
83 Self {
84 matcher: self.matcher.clone(),
85 inner: Arc::clone(&self.inner),
86 status_tx: self.status_tx.clone(),
87 status_rx: self.status_rx.clone(),
88 callback_sender: Arc::clone(&self.callback_sender),
89 all_operations: Arc::clone(&self.all_operations),
90 }
91 }
92}
93
94impl OperationHandle {
95 pub fn new(matcher: OperationMatcher, all_operations: Arc<RwLock<Vec<Operation>>>) -> Self {
105 let (status_tx, status_rx) = watch::channel(None);
106 Self {
107 matcher,
108 inner: Arc::new(RwLock::new(None)),
109 status_tx,
110 status_rx,
111 callback_sender: Arc::new(RwLock::new(None)),
112 all_operations,
113 }
114 }
115
116 async fn get_durable_operation(&self) -> Result<DurableOperation, TestError> {
122 let inner = self.inner.read().await;
123 match inner.as_ref() {
124 Some(op) => Ok(DurableOperation::new(op.clone())),
125 None => Err(TestError::OperationNotFound(
126 "Operation not yet populated".into(),
127 )),
128 }
129 }
130
131 pub async fn get_id(&self) -> Result<String, TestError> {
138 let op = self.get_durable_operation().await?;
139 Ok(op.get_id().to_string())
140 }
141
142 pub async fn get_name(&self) -> Result<Option<String>, TestError> {
149 let op = self.get_durable_operation().await?;
150 Ok(op.get_name().map(|s| s.to_string()))
151 }
152
153 pub async fn get_type(&self) -> Result<OperationType, TestError> {
160 let op = self.get_durable_operation().await?;
161 Ok(op.get_type())
162 }
163
164 pub async fn get_status(&self) -> Result<OperationStatus, TestError> {
171 let op = self.get_durable_operation().await?;
172 Ok(op.get_status())
173 }
174
175 pub async fn get_step_details<T: DeserializeOwned>(&self) -> Result<StepDetails<T>, TestError> {
186 let op = self.get_durable_operation().await?;
187 op.get_step_details()
188 }
189
190 pub async fn get_callback_details<T: DeserializeOwned>(
201 &self,
202 ) -> Result<CallbackDetails<T>, TestError> {
203 let op = self.get_durable_operation().await?;
204 op.get_callback_details()
205 }
206
207 pub async fn get_wait_details(&self) -> Result<WaitDetails, TestError> {
214 let op = self.get_durable_operation().await?;
215 op.get_wait_details()
216 }
217
218 pub async fn get_invoke_details<T: DeserializeOwned>(
229 &self,
230 ) -> Result<InvokeDetails<T>, TestError> {
231 let op = self.get_durable_operation().await?;
232 op.get_invoke_details()
233 }
234
235 pub async fn get_context_details<T: DeserializeOwned>(
246 &self,
247 ) -> Result<ContextDetails<T>, TestError> {
248 let op = self.get_durable_operation().await?;
249 op.get_context_details()
250 }
251
252 pub async fn is_populated(&self) -> bool {
258 self.inner.read().await.is_some()
259 }
260
261 pub async fn wait_for_data(&self, status: WaitingOperationStatus) -> Result<(), TestError> {
290 if self.check_status_reached(status).await {
292 return Ok(());
293 }
294
295 let mut rx = self.status_rx.clone();
297 loop {
298 if rx.changed().await.is_err() {
300 if self.check_status_reached(status).await {
302 return Ok(());
303 }
304 return Err(TestError::execution_completed_early(
305 self.matcher_description(),
306 status,
307 ));
308 }
309
310 if self.check_status_reached(status).await {
312 return Ok(());
313 }
314 }
315 }
316
317 async fn check_status_reached(&self, target: WaitingOperationStatus) -> bool {
319 let inner = self.inner.read().await;
320 match inner.as_ref() {
321 None => false,
322 Some(op) => Self::status_matches_target(op, target),
323 }
324 }
325
326 fn status_matches_target(op: &Operation, target: WaitingOperationStatus) -> bool {
328 match target {
329 WaitingOperationStatus::Started => {
330 true
332 }
333 WaitingOperationStatus::Submitted => {
334 if op.operation_type == OperationType::Callback {
336 op.callback_details
337 .as_ref()
338 .map(|d| d.callback_id.is_some())
339 .unwrap_or(false)
340 } else {
341 true
343 }
344 }
345 WaitingOperationStatus::Completed => {
346 op.status.is_terminal()
348 }
349 }
350 }
351
352 pub async fn send_callback_success(&self, result: &str) -> Result<(), TestError> {
373 let callback_id = self.validate_callback_ready().await?;
374
375 let sender_guard = self.callback_sender.read().await;
376 match sender_guard.as_ref() {
377 Some(sender) => sender.send_success(&callback_id, result).await,
378 None => Err(TestError::result_not_available(
379 "No callback sender configured on this handle",
380 )),
381 }
382 }
383
384 pub async fn send_callback_failure(
401 &self,
402 error: &crate::types::TestResultError,
403 ) -> Result<(), TestError> {
404 let callback_id = self.validate_callback_ready().await?;
405
406 let sender_guard = self.callback_sender.read().await;
407 match sender_guard.as_ref() {
408 Some(sender) => sender.send_failure(&callback_id, error).await,
409 None => Err(TestError::result_not_available(
410 "No callback sender configured on this handle",
411 )),
412 }
413 }
414
415 pub async fn send_callback_heartbeat(&self) -> Result<(), TestError> {
428 let callback_id = self.validate_callback_ready().await?;
429
430 let sender_guard = self.callback_sender.read().await;
431 match sender_guard.as_ref() {
432 Some(sender) => sender.send_heartbeat(&callback_id).await,
433 None => Err(TestError::result_not_available(
434 "No callback sender configured on this handle",
435 )),
436 }
437 }
438
439 async fn validate_callback_ready(&self) -> Result<String, TestError> {
442 let inner = self.inner.read().await;
443 let op = inner
444 .as_ref()
445 .ok_or_else(|| TestError::OperationNotFound("Operation not yet populated".into()))?;
446
447 if op.operation_type != OperationType::Callback {
448 return Err(TestError::NotCallbackOperation);
449 }
450
451 op.callback_details
452 .as_ref()
453 .and_then(|d| d.callback_id.clone())
454 .ok_or_else(|| {
455 TestError::result_not_available(
456 "Callback ID not available — operation has not reached Submitted status",
457 )
458 })
459 }
460
461 pub async fn get_child_operations(&self) -> Result<Vec<DurableOperation>, TestError> {
476 let inner = self.inner.read().await;
477 let op = inner
478 .as_ref()
479 .ok_or_else(|| TestError::OperationNotFound("Operation not yet populated".into()))?;
480
481 let my_id = &op.operation_id;
482 let all_ops = self.all_operations.read().await;
483
484 let all_ops_arc = Arc::new(all_ops.clone());
487
488 let children = all_ops_arc
489 .iter()
490 .filter(|child| child.parent_id.as_deref() == Some(my_id))
491 .map(|child| {
492 DurableOperation::new(child.clone()).with_operations(Arc::clone(&all_ops_arc))
493 })
494 .collect();
495
496 Ok(children)
497 }
498
499 fn matcher_description(&self) -> String {
501 match &self.matcher {
502 OperationMatcher::ByName(name) => format!("name={}", name),
503 OperationMatcher::ByIndex(idx) => format!("index={}", idx),
504 OperationMatcher::ById(id) => format!("id={}", id),
505 OperationMatcher::ByNameAndIndex(name, idx) => format!("name={}, index={}", name, idx),
506 }
507 }
508}
509
510#[cfg(test)]
511mod tests {
512 use super::*;
513 use durable_execution_sdk::{
514 CallbackDetails as SdkCallbackDetails, ChainedInvokeDetails as SdkChainedInvokeDetails,
515 ContextDetails as SdkContextDetails, OperationType, StepDetails as SdkStepDetails,
516 WaitDetails as SdkWaitDetails,
517 };
518
519 fn create_step_operation() -> Operation {
521 let mut op = Operation::new("step-001".to_string(), OperationType::Step);
522 op.name = Some("my-step".to_string());
523 op.status = OperationStatus::Succeeded;
524 op.step_details = Some(SdkStepDetails {
525 result: Some(r#""hello""#.to_string()),
526 attempt: Some(1),
527 next_attempt_timestamp: None,
528 error: None,
529 payload: None,
530 });
531 op
532 }
533
534 fn create_callback_operation() -> Operation {
536 let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
537 op.name = Some("my-callback".to_string());
538 op.status = OperationStatus::Started;
539 op.callback_details = Some(SdkCallbackDetails {
540 callback_id: Some("cb-id-123".to_string()),
541 result: None,
542 error: None,
543 });
544 op
545 }
546
547 fn create_wait_operation() -> Operation {
549 let mut op = Operation::new("wait-001".to_string(), OperationType::Wait);
550 op.name = Some("my-wait".to_string());
551 op.status = OperationStatus::Succeeded;
552 op.start_timestamp = Some(1000);
553 op.wait_details = Some(SdkWaitDetails {
554 scheduled_end_timestamp: Some(6000),
555 });
556 op
557 }
558
559 fn create_invoke_operation() -> Operation {
561 let mut op = Operation::new("invoke-001".to_string(), OperationType::Invoke);
562 op.name = Some("my-invoke".to_string());
563 op.status = OperationStatus::Succeeded;
564 op.chained_invoke_details = Some(SdkChainedInvokeDetails {
565 result: Some(r#"42"#.to_string()),
566 error: None,
567 });
568 op
569 }
570
571 fn create_context_operation() -> Operation {
573 let mut op = Operation::new("ctx-001".to_string(), OperationType::Context);
574 op.name = Some("my-context".to_string());
575 op.status = OperationStatus::Succeeded;
576 op.context_details = Some(SdkContextDetails {
577 result: Some(r#""done""#.to_string()),
578 replay_children: None,
579 error: None,
580 });
581 op
582 }
583
584 async fn populate_handle(handle: &OperationHandle, op: Operation) {
586 let mut inner = handle.inner.write().await;
587 *inner = Some(op);
588 }
589
590 #[test]
591 fn test_operation_handle_new_by_name() {
592 let all_ops = Arc::new(RwLock::new(Vec::new()));
593 let handle = OperationHandle::new(OperationMatcher::ByName("test-op".into()), all_ops);
594
595 assert!(matches!(handle.matcher, OperationMatcher::ByName(ref n) if n == "test-op"));
596 assert!(handle.callback_sender.try_read().unwrap().is_none());
597 }
598
599 #[test]
600 fn test_operation_handle_new_by_index() {
601 let all_ops = Arc::new(RwLock::new(Vec::new()));
602 let handle = OperationHandle::new(OperationMatcher::ByIndex(3), all_ops);
603
604 assert!(matches!(handle.matcher, OperationMatcher::ByIndex(3)));
605 }
606
607 #[test]
608 fn test_operation_handle_new_by_id() {
609 let all_ops = Arc::new(RwLock::new(Vec::new()));
610 let handle = OperationHandle::new(OperationMatcher::ById("abc-123".into()), all_ops);
611
612 assert!(matches!(handle.matcher, OperationMatcher::ById(ref id) if id == "abc-123"));
613 }
614
615 #[tokio::test]
616 async fn test_operation_handle_starts_unpopulated() {
617 let all_ops = Arc::new(RwLock::new(Vec::new()));
618 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
619
620 let inner = handle.inner.read().await;
621 assert!(inner.is_none());
622 assert!(handle.status_rx.borrow().is_none());
623 }
624
625 #[test]
626 fn test_operation_handle_clone() {
627 let all_ops = Arc::new(RwLock::new(Vec::new()));
628 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
629 let cloned = handle.clone();
630
631 assert!(Arc::ptr_eq(&handle.inner, &cloned.inner));
633 assert!(Arc::ptr_eq(&handle.all_operations, &cloned.all_operations));
634 assert!(matches!(cloned.matcher, OperationMatcher::ByName(ref n) if n == "test"));
635 }
636
637 #[test]
638 fn test_operation_matcher_debug() {
639 let by_name = OperationMatcher::ByName("test".into());
640 let by_index = OperationMatcher::ByIndex(0);
641 let by_id = OperationMatcher::ById("id-1".into());
642
643 assert!(format!("{:?}", by_name).contains("ByName"));
645 assert!(format!("{:?}", by_index).contains("ByIndex"));
646 assert!(format!("{:?}", by_id).contains("ById"));
647 }
648
649 #[tokio::test]
654 async fn test_is_populated_false_when_new() {
655 let all_ops = Arc::new(RwLock::new(Vec::new()));
656 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
657 assert!(!handle.is_populated().await);
658 }
659
660 #[tokio::test]
661 async fn test_is_populated_true_after_population() {
662 let all_ops = Arc::new(RwLock::new(Vec::new()));
663 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
664 populate_handle(&handle, create_step_operation()).await;
665 assert!(handle.is_populated().await);
666 }
667
668 #[tokio::test]
669 async fn test_get_id_unpopulated_returns_error() {
670 let all_ops = Arc::new(RwLock::new(Vec::new()));
671 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
672 let result = handle.get_id().await;
673 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
674 }
675
676 #[tokio::test]
677 async fn test_get_id_populated() {
678 let all_ops = Arc::new(RwLock::new(Vec::new()));
679 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
680 populate_handle(&handle, create_step_operation()).await;
681 assert_eq!(handle.get_id().await.unwrap(), "step-001");
682 }
683
684 #[tokio::test]
685 async fn test_get_name_unpopulated_returns_error() {
686 let all_ops = Arc::new(RwLock::new(Vec::new()));
687 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
688 let result = handle.get_name().await;
689 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
690 }
691
692 #[tokio::test]
693 async fn test_get_name_populated() {
694 let all_ops = Arc::new(RwLock::new(Vec::new()));
695 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
696 populate_handle(&handle, create_step_operation()).await;
697 assert_eq!(
698 handle.get_name().await.unwrap(),
699 Some("my-step".to_string())
700 );
701 }
702
703 #[tokio::test]
704 async fn test_get_type_unpopulated_returns_error() {
705 let all_ops = Arc::new(RwLock::new(Vec::new()));
706 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
707 let result = handle.get_type().await;
708 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
709 }
710
711 #[tokio::test]
712 async fn test_get_type_populated() {
713 let all_ops = Arc::new(RwLock::new(Vec::new()));
714 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
715 populate_handle(&handle, create_step_operation()).await;
716 assert_eq!(handle.get_type().await.unwrap(), OperationType::Step);
717 }
718
719 #[tokio::test]
720 async fn test_get_status_unpopulated_returns_error() {
721 let all_ops = Arc::new(RwLock::new(Vec::new()));
722 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
723 let result = handle.get_status().await;
724 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
725 }
726
727 #[tokio::test]
728 async fn test_get_status_populated() {
729 let all_ops = Arc::new(RwLock::new(Vec::new()));
730 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
731 populate_handle(&handle, create_step_operation()).await;
732 assert_eq!(
733 handle.get_status().await.unwrap(),
734 OperationStatus::Succeeded
735 );
736 }
737
738 #[tokio::test]
739 async fn test_get_step_details_unpopulated_returns_error() {
740 let all_ops = Arc::new(RwLock::new(Vec::new()));
741 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
742 let result: Result<StepDetails<String>, _> = handle.get_step_details().await;
743 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
744 }
745
746 #[tokio::test]
747 async fn test_get_step_details_populated() {
748 let all_ops = Arc::new(RwLock::new(Vec::new()));
749 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
750 populate_handle(&handle, create_step_operation()).await;
751 let details: StepDetails<String> = handle.get_step_details().await.unwrap();
752 assert_eq!(details.result, Some("hello".to_string()));
753 assert_eq!(details.attempt, Some(1));
754 }
755
756 #[tokio::test]
757 async fn test_get_callback_details_unpopulated_returns_error() {
758 let all_ops = Arc::new(RwLock::new(Vec::new()));
759 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
760 let result: Result<CallbackDetails<String>, _> = handle.get_callback_details().await;
761 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
762 }
763
764 #[tokio::test]
765 async fn test_get_callback_details_populated() {
766 let all_ops = Arc::new(RwLock::new(Vec::new()));
767 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
768 populate_handle(&handle, create_callback_operation()).await;
769 let details: CallbackDetails<String> = handle.get_callback_details().await.unwrap();
770 assert_eq!(details.callback_id, Some("cb-id-123".to_string()));
771 }
772
773 #[tokio::test]
774 async fn test_get_wait_details_unpopulated_returns_error() {
775 let all_ops = Arc::new(RwLock::new(Vec::new()));
776 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
777 let result = handle.get_wait_details().await;
778 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
779 }
780
781 #[tokio::test]
782 async fn test_get_wait_details_populated() {
783 let all_ops = Arc::new(RwLock::new(Vec::new()));
784 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
785 populate_handle(&handle, create_wait_operation()).await;
786 let details = handle.get_wait_details().await.unwrap();
787 assert_eq!(details.wait_seconds, Some(5));
788 }
789
790 #[tokio::test]
791 async fn test_get_invoke_details_unpopulated_returns_error() {
792 let all_ops = Arc::new(RwLock::new(Vec::new()));
793 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
794 let result: Result<InvokeDetails<serde_json::Value>, _> = handle.get_invoke_details().await;
795 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
796 }
797
798 #[tokio::test]
799 async fn test_get_invoke_details_populated() {
800 let all_ops = Arc::new(RwLock::new(Vec::new()));
801 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
802 populate_handle(&handle, create_invoke_operation()).await;
803 let details: InvokeDetails<i32> = handle.get_invoke_details().await.unwrap();
804 assert_eq!(details.result, Some(42));
805 }
806
807 #[tokio::test]
808 async fn test_get_context_details_unpopulated_returns_error() {
809 let all_ops = Arc::new(RwLock::new(Vec::new()));
810 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
811 let result: Result<ContextDetails<String>, _> = handle.get_context_details().await;
812 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
813 }
814
815 #[tokio::test]
816 async fn test_get_context_details_populated() {
817 let all_ops = Arc::new(RwLock::new(Vec::new()));
818 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
819 populate_handle(&handle, create_context_operation()).await;
820 let details: ContextDetails<String> = handle.get_context_details().await.unwrap();
821 assert_eq!(details.result, Some("done".to_string()));
822 }
823
824 #[tokio::test]
825 async fn test_get_step_details_wrong_type_returns_type_mismatch() {
826 let all_ops = Arc::new(RwLock::new(Vec::new()));
827 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
828 populate_handle(&handle, create_wait_operation()).await;
829 let result: Result<StepDetails<String>, _> = handle.get_step_details().await;
830 assert!(matches!(
831 result,
832 Err(TestError::OperationTypeMismatch { .. })
833 ));
834 }
835
836 #[tokio::test]
841 async fn test_wait_for_data_started_resolves_immediately_when_populated() {
842 let all_ops = Arc::new(RwLock::new(Vec::new()));
843 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
844 populate_handle(&handle, create_step_operation()).await;
845 let result = handle.wait_for_data(WaitingOperationStatus::Started).await;
847 assert!(result.is_ok());
848 }
849
850 #[tokio::test]
851 async fn test_wait_for_data_completed_resolves_immediately_when_terminal() {
852 let all_ops = Arc::new(RwLock::new(Vec::new()));
853 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
854 populate_handle(&handle, create_step_operation()).await;
856 let result = handle
857 .wait_for_data(WaitingOperationStatus::Completed)
858 .await;
859 assert!(result.is_ok());
860 }
861
862 #[tokio::test]
863 async fn test_wait_for_data_submitted_resolves_for_callback_with_id() {
864 let all_ops = Arc::new(RwLock::new(Vec::new()));
865 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
866 populate_handle(&handle, create_callback_operation()).await;
868 let result = handle
869 .wait_for_data(WaitingOperationStatus::Submitted)
870 .await;
871 assert!(result.is_ok());
872 }
873
874 #[tokio::test]
875 async fn test_wait_for_data_submitted_resolves_for_non_callback() {
876 let all_ops = Arc::new(RwLock::new(Vec::new()));
877 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
878 populate_handle(&handle, create_step_operation()).await;
880 let result = handle
881 .wait_for_data(WaitingOperationStatus::Submitted)
882 .await;
883 assert!(result.is_ok());
884 }
885
886 #[tokio::test]
887 async fn test_wait_for_data_unpopulated_not_started_returns_error_on_channel_close() {
888 let all_ops = Arc::new(RwLock::new(Vec::new()));
897 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
898
899 assert!(
901 !handle
902 .check_status_reached(WaitingOperationStatus::Started)
903 .await
904 );
905 assert!(
906 !handle
907 .check_status_reached(WaitingOperationStatus::Submitted)
908 .await
909 );
910 assert!(
911 !handle
912 .check_status_reached(WaitingOperationStatus::Completed)
913 .await
914 );
915 }
916
917 #[tokio::test]
918 async fn test_wait_for_data_non_terminal_does_not_satisfy_completed() {
919 let all_ops = Arc::new(RwLock::new(Vec::new()));
920 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
921
922 let mut op = Operation::new("step-001".to_string(), OperationType::Step);
924 op.status = OperationStatus::Started;
925 populate_handle(&handle, op).await;
926
927 assert!(
929 handle
930 .check_status_reached(WaitingOperationStatus::Started)
931 .await
932 );
933 assert!(
934 !handle
935 .check_status_reached(WaitingOperationStatus::Completed)
936 .await
937 );
938 }
939
940 #[tokio::test]
941 async fn test_wait_for_data_callback_without_id_does_not_satisfy_submitted() {
942 let all_ops = Arc::new(RwLock::new(Vec::new()));
943 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
944
945 let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
947 op.status = OperationStatus::Started;
948 op.callback_details = Some(SdkCallbackDetails {
949 callback_id: None,
950 result: None,
951 error: None,
952 });
953 populate_handle(&handle, op).await;
954
955 assert!(
957 handle
958 .check_status_reached(WaitingOperationStatus::Started)
959 .await
960 );
961 assert!(
962 !handle
963 .check_status_reached(WaitingOperationStatus::Submitted)
964 .await
965 );
966 }
967
968 #[tokio::test]
969 async fn test_status_matches_target_all_terminal_statuses_satisfy_completed() {
970 let terminal_statuses = vec![
971 OperationStatus::Succeeded,
972 OperationStatus::Failed,
973 OperationStatus::Cancelled,
974 OperationStatus::TimedOut,
975 OperationStatus::Stopped,
976 ];
977
978 for status in terminal_statuses {
979 let mut op = Operation::new("op-001".to_string(), OperationType::Step);
980 op.status = status;
981 assert!(
982 OperationHandle::status_matches_target(&op, WaitingOperationStatus::Completed),
983 "Expected {:?} to satisfy Completed",
984 status
985 );
986 }
987 }
988
989 #[tokio::test]
990 async fn test_status_matches_target_non_terminal_does_not_satisfy_completed() {
991 let non_terminal_statuses = vec![
992 OperationStatus::Started,
993 OperationStatus::Pending,
994 OperationStatus::Ready,
995 ];
996
997 for status in non_terminal_statuses {
998 let mut op = Operation::new("op-001".to_string(), OperationType::Step);
999 op.status = status;
1000 assert!(
1001 !OperationHandle::status_matches_target(&op, WaitingOperationStatus::Completed),
1002 "Expected {:?} to NOT satisfy Completed",
1003 status
1004 );
1005 }
1006 }
1007
1008 #[tokio::test]
1009 async fn test_wait_for_data_resolves_when_status_update_arrives() {
1010 let all_ops = Arc::new(RwLock::new(Vec::new()));
1011 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1012
1013 let handle_clone = handle.clone();
1014 tokio::spawn(async move {
1016 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1017 let mut op = create_step_operation();
1019 op.status = OperationStatus::Succeeded;
1020 {
1021 let mut inner = handle_clone.inner.write().await;
1022 *inner = Some(op);
1023 }
1024 let _ = handle_clone
1025 .status_tx
1026 .send(Some(OperationStatus::Succeeded));
1027 });
1028
1029 let result = handle
1030 .wait_for_data(WaitingOperationStatus::Completed)
1031 .await;
1032 assert!(result.is_ok());
1033 }
1034
1035 #[tokio::test]
1036 async fn test_wait_for_data_waits_through_non_terminal_updates() {
1037 let all_ops = Arc::new(RwLock::new(Vec::new()));
1038 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1039
1040 let handle_clone = handle.clone();
1041 tokio::spawn(async move {
1042 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1043 let mut op = create_step_operation();
1045 op.status = OperationStatus::Started;
1046 {
1047 let mut inner = handle_clone.inner.write().await;
1048 *inner = Some(op);
1049 }
1050 let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1051
1052 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1053 {
1055 let mut inner = handle_clone.inner.write().await;
1056 if let Some(ref mut op) = *inner {
1057 op.status = OperationStatus::Succeeded;
1058 }
1059 }
1060 let _ = handle_clone
1061 .status_tx
1062 .send(Some(OperationStatus::Succeeded));
1063 });
1064
1065 let result = handle
1066 .wait_for_data(WaitingOperationStatus::Completed)
1067 .await;
1068 assert!(result.is_ok());
1069 }
1070
1071 #[tokio::test]
1072 async fn test_wait_for_data_submitted_waits_for_callback_id() {
1073 let all_ops = Arc::new(RwLock::new(Vec::new()));
1074 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1075
1076 let handle_clone = handle.clone();
1077 tokio::spawn(async move {
1078 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1079 let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
1081 op.name = Some("my-callback".to_string());
1082 op.status = OperationStatus::Started;
1083 op.callback_details = Some(SdkCallbackDetails {
1084 callback_id: None,
1085 result: None,
1086 error: None,
1087 });
1088 {
1089 let mut inner = handle_clone.inner.write().await;
1090 *inner = Some(op);
1091 }
1092 let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1093
1094 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1095 {
1097 let mut inner = handle_clone.inner.write().await;
1098 if let Some(ref mut op) = *inner {
1099 if let Some(ref mut details) = op.callback_details {
1100 details.callback_id = Some("cb-id-123".to_string());
1101 }
1102 }
1103 }
1104 let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1105 });
1106
1107 let result = handle
1108 .wait_for_data(WaitingOperationStatus::Submitted)
1109 .await;
1110 assert!(result.is_ok());
1111 }
1112
1113 #[tokio::test]
1114 async fn test_matcher_description() {
1115 let all_ops = Arc::new(RwLock::new(Vec::new()));
1116
1117 let handle =
1118 OperationHandle::new(OperationMatcher::ByName("my-op".into()), all_ops.clone());
1119 assert_eq!(handle.matcher_description(), "name=my-op");
1120
1121 let handle = OperationHandle::new(OperationMatcher::ByIndex(3), all_ops.clone());
1122 assert_eq!(handle.matcher_description(), "index=3");
1123
1124 let handle = OperationHandle::new(OperationMatcher::ById("abc-123".into()), all_ops);
1125 assert_eq!(handle.matcher_description(), "id=abc-123");
1126 }
1127
    /// Test double for `CallbackSender` that records every call.
    ///
    /// The call logs live behind `Arc`s, so a clone of the mock observes the
    /// calls made through any other clone.
    #[derive(Clone)]
    struct MockCallbackSender {
        /// `(callback_id, result)` for each `send_success` call.
        success_calls: Arc<RwLock<Vec<(String, String)>>>,
        /// `(callback_id, error.to_string())` for each `send_failure` call.
        failure_calls: Arc<RwLock<Vec<(String, String)>>>,
        /// `callback_id` for each `send_heartbeat` call.
        heartbeat_calls: Arc<RwLock<Vec<String>>>,
    }
1139
1140 impl MockCallbackSender {
1141 fn new() -> Self {
1142 Self {
1143 success_calls: Arc::new(RwLock::new(Vec::new())),
1144 failure_calls: Arc::new(RwLock::new(Vec::new())),
1145 heartbeat_calls: Arc::new(RwLock::new(Vec::new())),
1146 }
1147 }
1148 }
1149
1150 #[async_trait::async_trait]
1151 impl CallbackSender for MockCallbackSender {
1152 async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError> {
1153 self.success_calls
1154 .write()
1155 .await
1156 .push((callback_id.to_string(), result.to_string()));
1157 Ok(())
1158 }
1159
1160 async fn send_failure(
1161 &self,
1162 callback_id: &str,
1163 error: &crate::types::TestResultError,
1164 ) -> Result<(), TestError> {
1165 self.failure_calls
1166 .write()
1167 .await
1168 .push((callback_id.to_string(), error.to_string()));
1169 Ok(())
1170 }
1171
1172 async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError> {
1173 self.heartbeat_calls
1174 .write()
1175 .await
1176 .push(callback_id.to_string());
1177 Ok(())
1178 }
1179 }
1180
1181 async fn create_callback_handle_with_sender() -> (OperationHandle, MockCallbackSender) {
1183 let all_ops = Arc::new(RwLock::new(Vec::new()));
1184 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1185 let sender = MockCallbackSender::new();
1186 {
1187 let mut guard = handle.callback_sender.write().await;
1188 *guard = Some(Arc::new(sender.clone()));
1189 }
1190 populate_handle(&handle, create_callback_operation()).await;
1191 (handle, sender)
1192 }
1193
1194 #[tokio::test]
1195 async fn test_send_callback_success_unpopulated_returns_error() {
1196 let all_ops = Arc::new(RwLock::new(Vec::new()));
1197 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1198 let result = handle.send_callback_success("ok").await;
1199 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1200 }
1201
1202 #[tokio::test]
1203 async fn test_send_callback_success_non_callback_returns_error() {
1204 let all_ops = Arc::new(RwLock::new(Vec::new()));
1205 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1206 populate_handle(&handle, create_step_operation()).await;
1207 let result = handle.send_callback_success("ok").await;
1208 assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1209 }
1210
1211 #[tokio::test]
1212 async fn test_send_callback_success_no_callback_id_returns_error() {
1213 let all_ops = Arc::new(RwLock::new(Vec::new()));
1214 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1215 let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
1217 op.callback_details = Some(SdkCallbackDetails {
1218 callback_id: None,
1219 result: None,
1220 error: None,
1221 });
1222 populate_handle(&handle, op).await;
1223 let result = handle.send_callback_success("ok").await;
1224 assert!(matches!(result, Err(TestError::ResultNotAvailable(_))));
1225 }
1226
1227 #[tokio::test]
1228 async fn test_send_callback_success_no_sender_returns_error() {
1229 let all_ops = Arc::new(RwLock::new(Vec::new()));
1230 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1231 populate_handle(&handle, create_callback_operation()).await;
1232 let result = handle.send_callback_success("ok").await;
1234 assert!(matches!(result, Err(TestError::ResultNotAvailable(_))));
1235 }
1236
1237 #[tokio::test]
1238 async fn test_send_callback_success_delegates_to_sender() {
1239 let (handle, sender) = create_callback_handle_with_sender().await;
1240 let result = handle.send_callback_success("my-result").await;
1241 assert!(result.is_ok());
1242 let calls = sender.success_calls.read().await;
1243 assert_eq!(calls.len(), 1);
1244 assert_eq!(calls[0], ("cb-id-123".to_string(), "my-result".to_string()));
1245 }
1246
1247 #[tokio::test]
1248 async fn test_send_callback_failure_unpopulated_returns_error() {
1249 let all_ops = Arc::new(RwLock::new(Vec::new()));
1250 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1251 let error = crate::types::TestResultError::new("TestError", "something failed");
1252 let result = handle.send_callback_failure(&error).await;
1253 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1254 }
1255
1256 #[tokio::test]
1257 async fn test_send_callback_failure_non_callback_returns_error() {
1258 let all_ops = Arc::new(RwLock::new(Vec::new()));
1259 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1260 populate_handle(&handle, create_step_operation()).await;
1261 let error = crate::types::TestResultError::new("TestError", "something failed");
1262 let result = handle.send_callback_failure(&error).await;
1263 assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1264 }
1265
1266 #[tokio::test]
1267 async fn test_send_callback_failure_delegates_to_sender() {
1268 let (handle, sender) = create_callback_handle_with_sender().await;
1269 let error = crate::types::TestResultError::new("TestError", "something failed");
1270 let result = handle.send_callback_failure(&error).await;
1271 assert!(result.is_ok());
1272 let calls = sender.failure_calls.read().await;
1273 assert_eq!(calls.len(), 1);
1274 assert_eq!(calls[0].0, "cb-id-123");
1275 }
1276
1277 #[tokio::test]
1278 async fn test_send_callback_heartbeat_unpopulated_returns_error() {
1279 let all_ops = Arc::new(RwLock::new(Vec::new()));
1280 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1281 let result = handle.send_callback_heartbeat().await;
1282 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1283 }
1284
1285 #[tokio::test]
1286 async fn test_send_callback_heartbeat_non_callback_returns_error() {
1287 let all_ops = Arc::new(RwLock::new(Vec::new()));
1288 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1289 populate_handle(&handle, create_step_operation()).await;
1290 let result = handle.send_callback_heartbeat().await;
1291 assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1292 }
1293
1294 #[tokio::test]
1295 async fn test_send_callback_heartbeat_delegates_to_sender() {
1296 let (handle, sender) = create_callback_handle_with_sender().await;
1297 let result = handle.send_callback_heartbeat().await;
1298 assert!(result.is_ok());
1299 let calls = sender.heartbeat_calls.read().await;
1300 assert_eq!(calls.len(), 1);
1301 assert_eq!(calls[0], "cb-id-123");
1302 }
1303
1304 fn create_operation_with_parent(id: &str, name: &str, parent_id: Option<&str>) -> Operation {
1310 let mut op = Operation::new(id.to_string(), OperationType::Step);
1311 op.name = Some(name.to_string());
1312 op.status = OperationStatus::Succeeded;
1313 op.parent_id = parent_id.map(|s| s.to_string());
1314 op
1315 }
1316
1317 #[tokio::test]
1318 async fn test_get_child_operations_unpopulated_returns_error() {
1319 let all_ops = Arc::new(RwLock::new(Vec::new()));
1320 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1321
1322 let result = handle.get_child_operations().await;
1323 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1324 }
1325
1326 #[tokio::test]
1327 async fn test_get_child_operations_returns_matching_children() {
1328 let parent_op = create_operation_with_parent("parent-1", "parent", None);
1329 let child1 = create_operation_with_parent("child-1", "child-a", Some("parent-1"));
1330 let child2 = create_operation_with_parent("child-2", "child-b", Some("parent-1"));
1331 let unrelated = create_operation_with_parent("other-1", "other", Some("other-parent"));
1332
1333 let all_ops = Arc::new(RwLock::new(vec![
1334 parent_op.clone(),
1335 child1,
1336 child2,
1337 unrelated,
1338 ]));
1339
1340 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1341 populate_handle(&handle, parent_op).await;
1342
1343 let children = handle.get_child_operations().await.unwrap();
1344 assert_eq!(children.len(), 2);
1345 assert_eq!(children[0].get_id(), "child-1");
1346 assert_eq!(children[1].get_id(), "child-2");
1347 }
1348
1349 #[tokio::test]
1350 async fn test_get_child_operations_empty_when_no_children() {
1351 let parent_op = create_operation_with_parent("parent-1", "parent", None);
1352 let unrelated = create_operation_with_parent("other-1", "other", Some("other-parent"));
1353
1354 let all_ops = Arc::new(RwLock::new(vec![parent_op.clone(), unrelated]));
1355
1356 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1357 populate_handle(&handle, parent_op).await;
1358
1359 let children = handle.get_child_operations().await.unwrap();
1360 assert!(children.is_empty());
1361 }
1362
1363 #[tokio::test]
1364 async fn test_get_child_operations_preserves_execution_order() {
1365 let parent_op = create_operation_with_parent("parent-1", "parent", None);
1366 let child_c = create_operation_with_parent("child-c", "third", Some("parent-1"));
1367 let child_a = create_operation_with_parent("child-a", "first", Some("parent-1"));
1368 let child_b = create_operation_with_parent("child-b", "second", Some("parent-1"));
1369
1370 let all_ops = Arc::new(RwLock::new(vec![
1372 parent_op.clone(),
1373 child_c,
1374 child_a,
1375 child_b,
1376 ]));
1377
1378 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1379 populate_handle(&handle, parent_op).await;
1380
1381 let children = handle.get_child_operations().await.unwrap();
1382 assert_eq!(children.len(), 3);
1383 assert_eq!(children[0].get_id(), "child-c");
1384 assert_eq!(children[1].get_id(), "child-a");
1385 assert_eq!(children[2].get_id(), "child-b");
1386 }
1387
1388 #[tokio::test]
1389 async fn test_get_child_operations_children_support_recursive_enumeration() {
1390 let parent_op = create_operation_with_parent("parent-1", "parent", None);
1391 let child = create_operation_with_parent("child-1", "child", Some("parent-1"));
1392 let grandchild =
1393 create_operation_with_parent("grandchild-1", "grandchild", Some("child-1"));
1394
1395 let all_ops = Arc::new(RwLock::new(vec![parent_op.clone(), child, grandchild]));
1396
1397 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1398 populate_handle(&handle, parent_op).await;
1399
1400 let children = handle.get_child_operations().await.unwrap();
1401 assert_eq!(children.len(), 1);
1402 assert_eq!(children[0].get_id(), "child-1");
1403
1404 let grandchildren = children[0].get_child_operations();
1406 assert_eq!(grandchildren.len(), 1);
1407 assert_eq!(grandchildren[0].get_id(), "grandchild-1");
1408 }
1409}