1use std::sync::Arc;
10use tokio::sync::{watch, RwLock};
11
12use serde::de::DeserializeOwned;
13
14use durable_execution_sdk::{Operation, OperationStatus, OperationType};
15
16use crate::error::TestError;
17use crate::operation::{
18 CallbackDetails, CallbackSender, ContextDetails, DurableOperation, InvokeDetails, StepDetails,
19 WaitDetails,
20};
21use crate::types::WaitingOperationStatus;
22
/// Criterion used to pick out one operation for an [`OperationHandle`].
///
/// Derives `PartialEq`/`Eq` so matchers can be compared directly (for example with
/// `assert_eq!`) instead of through `matches!` guards.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationMatcher {
    /// Select the operation by its name.
    ByName(String),
    /// Select the operation by position.
    // NOTE(review): presumably a zero-based index into the execution's operations — confirm.
    ByIndex(usize),
    /// Select the operation by its unique id.
    ById(String),
    /// Select by name and an index.
    // NOTE(review): presumably the n-th operation carrying that name — confirm against the matcher logic.
    ByNameAndIndex(String, usize),
}
35
36pub struct OperationHandle {
64 pub(crate) matcher: OperationMatcher,
66 pub(crate) inner: Arc<RwLock<Option<Operation>>>,
68 pub(crate) status_tx: watch::Sender<Option<OperationStatus>>,
70 pub(crate) status_rx: watch::Receiver<Option<OperationStatus>>,
72 pub(crate) callback_sender: Arc<RwLock<Option<Arc<dyn CallbackSender>>>>,
77 pub(crate) all_operations: Arc<RwLock<Vec<Operation>>>,
79}
80
81impl Clone for OperationHandle {
82 fn clone(&self) -> Self {
83 Self {
84 matcher: self.matcher.clone(),
85 inner: Arc::clone(&self.inner),
86 status_tx: self.status_tx.clone(),
87 status_rx: self.status_rx.clone(),
88 callback_sender: Arc::clone(&self.callback_sender),
89 all_operations: Arc::clone(&self.all_operations),
90 }
91 }
92}
93
94impl OperationHandle {
95 pub fn new(matcher: OperationMatcher, all_operations: Arc<RwLock<Vec<Operation>>>) -> Self {
105 let (status_tx, status_rx) = watch::channel(None);
106 Self {
107 matcher,
108 inner: Arc::new(RwLock::new(None)),
109 status_tx,
110 status_rx,
111 callback_sender: Arc::new(RwLock::new(None)),
112 all_operations,
113 }
114 }
115
116 async fn get_durable_operation(&self) -> Result<DurableOperation, TestError> {
122 let inner = self.inner.read().await;
123 match inner.as_ref() {
124 Some(op) => Ok(DurableOperation::new(op.clone())),
125 None => Err(TestError::OperationNotFound(
126 "Operation not yet populated".into(),
127 )),
128 }
129 }
130
131 pub async fn get_id(&self) -> Result<String, TestError> {
138 let op = self.get_durable_operation().await?;
139 Ok(op.get_id().to_string())
140 }
141
142 pub async fn get_name(&self) -> Result<Option<String>, TestError> {
149 let op = self.get_durable_operation().await?;
150 Ok(op.get_name().map(|s| s.to_string()))
151 }
152
153 pub async fn get_type(&self) -> Result<OperationType, TestError> {
160 let op = self.get_durable_operation().await?;
161 Ok(op.get_type())
162 }
163
164 pub async fn get_status(&self) -> Result<OperationStatus, TestError> {
171 let op = self.get_durable_operation().await?;
172 Ok(op.get_status())
173 }
174
175 pub async fn get_step_details<T: DeserializeOwned>(&self) -> Result<StepDetails<T>, TestError> {
186 let op = self.get_durable_operation().await?;
187 op.get_step_details()
188 }
189
190 pub async fn get_callback_details<T: DeserializeOwned>(
201 &self,
202 ) -> Result<CallbackDetails<T>, TestError> {
203 let op = self.get_durable_operation().await?;
204 op.get_callback_details()
205 }
206
207 pub async fn get_wait_details(&self) -> Result<WaitDetails, TestError> {
214 let op = self.get_durable_operation().await?;
215 op.get_wait_details()
216 }
217
218 pub async fn get_invoke_details<T: DeserializeOwned>(
229 &self,
230 ) -> Result<InvokeDetails<T>, TestError> {
231 let op = self.get_durable_operation().await?;
232 op.get_invoke_details()
233 }
234
235 pub async fn get_context_details<T: DeserializeOwned>(
246 &self,
247 ) -> Result<ContextDetails<T>, TestError> {
248 let op = self.get_durable_operation().await?;
249 op.get_context_details()
250 }
251
252 pub async fn is_populated(&self) -> bool {
258 self.inner.read().await.is_some()
259 }
260
261 pub async fn wait_for_data(&self, status: WaitingOperationStatus) -> Result<(), TestError> {
290 if self.check_status_reached(status).await {
292 return Ok(());
293 }
294
295 let mut rx = self.status_rx.clone();
297 loop {
298 if rx.changed().await.is_err() {
300 if self.check_status_reached(status).await {
302 return Ok(());
303 }
304 return Err(TestError::execution_completed_early(
305 self.matcher_description(),
306 status,
307 ));
308 }
309
310 if self.check_status_reached(status).await {
312 return Ok(());
313 }
314 }
315 }
316
317 async fn check_status_reached(&self, target: WaitingOperationStatus) -> bool {
319 let inner = self.inner.read().await;
320 match inner.as_ref() {
321 None => false,
322 Some(op) => Self::status_matches_target(op, target),
323 }
324 }
325
326 fn status_matches_target(op: &Operation, target: WaitingOperationStatus) -> bool {
328 match target {
329 WaitingOperationStatus::Started => {
330 true
332 }
333 WaitingOperationStatus::Submitted => {
334 if op.operation_type == OperationType::Callback {
336 op.callback_details
337 .as_ref()
338 .map(|d| d.callback_id.is_some())
339 .unwrap_or(false)
340 } else {
341 true
343 }
344 }
345 WaitingOperationStatus::Completed => {
346 op.status.is_terminal()
348 }
349 }
350 }
351
352 pub async fn send_callback_success(&self, result: &str) -> Result<(), TestError> {
373 let callback_id = self.validate_callback_ready().await?;
374
375 let sender_guard = self.callback_sender.read().await;
376 match sender_guard.as_ref() {
377 Some(sender) => sender.send_success(&callback_id, result).await,
378 None => Err(TestError::result_not_available(
379 "No callback sender configured on this handle",
380 )),
381 }
382 }
383
384 pub async fn send_callback_failure(
401 &self,
402 error: &crate::types::TestResultError,
403 ) -> Result<(), TestError> {
404 let callback_id = self.validate_callback_ready().await?;
405
406 let sender_guard = self.callback_sender.read().await;
407 match sender_guard.as_ref() {
408 Some(sender) => sender.send_failure(&callback_id, error).await,
409 None => Err(TestError::result_not_available(
410 "No callback sender configured on this handle",
411 )),
412 }
413 }
414
415 pub async fn send_callback_heartbeat(&self) -> Result<(), TestError> {
428 let callback_id = self.validate_callback_ready().await?;
429
430 let sender_guard = self.callback_sender.read().await;
431 match sender_guard.as_ref() {
432 Some(sender) => sender.send_heartbeat(&callback_id).await,
433 None => Err(TestError::result_not_available(
434 "No callback sender configured on this handle",
435 )),
436 }
437 }
438
439 async fn validate_callback_ready(&self) -> Result<String, TestError> {
442 let inner = self.inner.read().await;
443 let op = inner
444 .as_ref()
445 .ok_or_else(|| TestError::OperationNotFound("Operation not yet populated".into()))?;
446
447 if op.operation_type != OperationType::Callback {
448 return Err(TestError::NotCallbackOperation);
449 }
450
451 op.callback_details
452 .as_ref()
453 .and_then(|d| d.callback_id.clone())
454 .ok_or_else(|| {
455 TestError::result_not_available(
456 "Callback ID not available — operation has not reached Submitted status",
457 )
458 })
459 }
460
461 pub async fn get_child_operations(&self) -> Result<Vec<DurableOperation>, TestError> {
483 let inner = self.inner.read().await;
484 let op = inner
485 .as_ref()
486 .ok_or_else(|| TestError::OperationNotFound("Operation not yet populated".into()))?;
487
488 let my_id = &op.operation_id;
489 let all_ops = self.all_operations.read().await;
490
491 let all_ops_arc = Arc::new(all_ops.clone());
494
495 let children = all_ops_arc
496 .iter()
497 .filter(|child| child.parent_id.as_deref() == Some(my_id))
498 .map(|child| {
499 DurableOperation::new(child.clone()).with_operations(Arc::clone(&all_ops_arc))
500 })
501 .collect();
502
503 Ok(children)
504 }
505
506 fn matcher_description(&self) -> String {
508 match &self.matcher {
509 OperationMatcher::ByName(name) => format!("name={}", name),
510 OperationMatcher::ByIndex(idx) => format!("index={}", idx),
511 OperationMatcher::ById(id) => format!("id={}", id),
512 OperationMatcher::ByNameAndIndex(name, idx) => format!("name={}, index={}", name, idx),
513 }
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520 use durable_execution_sdk::{
521 CallbackDetails as SdkCallbackDetails, ChainedInvokeDetails as SdkChainedInvokeDetails,
522 ContextDetails as SdkContextDetails, OperationType, StepDetails as SdkStepDetails,
523 WaitDetails as SdkWaitDetails,
524 };
525
526 fn create_step_operation() -> Operation {
528 let mut op = Operation::new("step-001".to_string(), OperationType::Step);
529 op.name = Some("my-step".to_string());
530 op.status = OperationStatus::Succeeded;
531 op.step_details = Some(SdkStepDetails {
532 result: Some(r#""hello""#.to_string()),
533 attempt: Some(1),
534 next_attempt_timestamp: None,
535 error: None,
536 payload: None,
537 });
538 op
539 }
540
541 fn create_callback_operation() -> Operation {
543 let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
544 op.name = Some("my-callback".to_string());
545 op.status = OperationStatus::Started;
546 op.callback_details = Some(SdkCallbackDetails {
547 callback_id: Some("cb-id-123".to_string()),
548 result: None,
549 error: None,
550 });
551 op
552 }
553
554 fn create_wait_operation() -> Operation {
556 let mut op = Operation::new("wait-001".to_string(), OperationType::Wait);
557 op.name = Some("my-wait".to_string());
558 op.status = OperationStatus::Succeeded;
559 op.start_timestamp = Some(1000);
560 op.wait_details = Some(SdkWaitDetails {
561 scheduled_end_timestamp: Some(6000),
562 });
563 op
564 }
565
566 fn create_invoke_operation() -> Operation {
568 let mut op = Operation::new("invoke-001".to_string(), OperationType::Invoke);
569 op.name = Some("my-invoke".to_string());
570 op.status = OperationStatus::Succeeded;
571 op.chained_invoke_details = Some(SdkChainedInvokeDetails {
572 result: Some(r#"42"#.to_string()),
573 error: None,
574 });
575 op
576 }
577
578 fn create_context_operation() -> Operation {
580 let mut op = Operation::new("ctx-001".to_string(), OperationType::Context);
581 op.name = Some("my-context".to_string());
582 op.status = OperationStatus::Succeeded;
583 op.context_details = Some(SdkContextDetails {
584 result: Some(r#""done""#.to_string()),
585 replay_children: None,
586 error: None,
587 });
588 op
589 }
590
591 async fn populate_handle(handle: &OperationHandle, op: Operation) {
593 let mut inner = handle.inner.write().await;
594 *inner = Some(op);
595 }
596
597 #[test]
598 fn test_operation_handle_new_by_name() {
599 let all_ops = Arc::new(RwLock::new(Vec::new()));
600 let handle = OperationHandle::new(OperationMatcher::ByName("test-op".into()), all_ops);
601
602 assert!(matches!(handle.matcher, OperationMatcher::ByName(ref n) if n == "test-op"));
603 assert!(handle.callback_sender.try_read().unwrap().is_none());
604 }
605
606 #[test]
607 fn test_operation_handle_new_by_index() {
608 let all_ops = Arc::new(RwLock::new(Vec::new()));
609 let handle = OperationHandle::new(OperationMatcher::ByIndex(3), all_ops);
610
611 assert!(matches!(handle.matcher, OperationMatcher::ByIndex(3)));
612 }
613
614 #[test]
615 fn test_operation_handle_new_by_id() {
616 let all_ops = Arc::new(RwLock::new(Vec::new()));
617 let handle = OperationHandle::new(OperationMatcher::ById("abc-123".into()), all_ops);
618
619 assert!(matches!(handle.matcher, OperationMatcher::ById(ref id) if id == "abc-123"));
620 }
621
622 #[tokio::test]
623 async fn test_operation_handle_starts_unpopulated() {
624 let all_ops = Arc::new(RwLock::new(Vec::new()));
625 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
626
627 let inner = handle.inner.read().await;
628 assert!(inner.is_none());
629 assert!(handle.status_rx.borrow().is_none());
630 }
631
632 #[test]
633 fn test_operation_handle_clone() {
634 let all_ops = Arc::new(RwLock::new(Vec::new()));
635 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
636 let cloned = handle.clone();
637
638 assert!(Arc::ptr_eq(&handle.inner, &cloned.inner));
640 assert!(Arc::ptr_eq(&handle.all_operations, &cloned.all_operations));
641 assert!(matches!(cloned.matcher, OperationMatcher::ByName(ref n) if n == "test"));
642 }
643
644 #[test]
645 fn test_operation_matcher_debug() {
646 let by_name = OperationMatcher::ByName("test".into());
647 let by_index = OperationMatcher::ByIndex(0);
648 let by_id = OperationMatcher::ById("id-1".into());
649
650 assert!(format!("{:?}", by_name).contains("ByName"));
652 assert!(format!("{:?}", by_index).contains("ByIndex"));
653 assert!(format!("{:?}", by_id).contains("ById"));
654 }
655
656 #[tokio::test]
661 async fn test_is_populated_false_when_new() {
662 let all_ops = Arc::new(RwLock::new(Vec::new()));
663 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
664 assert!(!handle.is_populated().await);
665 }
666
667 #[tokio::test]
668 async fn test_is_populated_true_after_population() {
669 let all_ops = Arc::new(RwLock::new(Vec::new()));
670 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
671 populate_handle(&handle, create_step_operation()).await;
672 assert!(handle.is_populated().await);
673 }
674
675 #[tokio::test]
676 async fn test_get_id_unpopulated_returns_error() {
677 let all_ops = Arc::new(RwLock::new(Vec::new()));
678 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
679 let result = handle.get_id().await;
680 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
681 }
682
683 #[tokio::test]
684 async fn test_get_id_populated() {
685 let all_ops = Arc::new(RwLock::new(Vec::new()));
686 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
687 populate_handle(&handle, create_step_operation()).await;
688 assert_eq!(handle.get_id().await.unwrap(), "step-001");
689 }
690
691 #[tokio::test]
692 async fn test_get_name_unpopulated_returns_error() {
693 let all_ops = Arc::new(RwLock::new(Vec::new()));
694 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
695 let result = handle.get_name().await;
696 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
697 }
698
699 #[tokio::test]
700 async fn test_get_name_populated() {
701 let all_ops = Arc::new(RwLock::new(Vec::new()));
702 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
703 populate_handle(&handle, create_step_operation()).await;
704 assert_eq!(
705 handle.get_name().await.unwrap(),
706 Some("my-step".to_string())
707 );
708 }
709
710 #[tokio::test]
711 async fn test_get_type_unpopulated_returns_error() {
712 let all_ops = Arc::new(RwLock::new(Vec::new()));
713 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
714 let result = handle.get_type().await;
715 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
716 }
717
718 #[tokio::test]
719 async fn test_get_type_populated() {
720 let all_ops = Arc::new(RwLock::new(Vec::new()));
721 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
722 populate_handle(&handle, create_step_operation()).await;
723 assert_eq!(handle.get_type().await.unwrap(), OperationType::Step);
724 }
725
726 #[tokio::test]
727 async fn test_get_status_unpopulated_returns_error() {
728 let all_ops = Arc::new(RwLock::new(Vec::new()));
729 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
730 let result = handle.get_status().await;
731 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
732 }
733
734 #[tokio::test]
735 async fn test_get_status_populated() {
736 let all_ops = Arc::new(RwLock::new(Vec::new()));
737 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
738 populate_handle(&handle, create_step_operation()).await;
739 assert_eq!(
740 handle.get_status().await.unwrap(),
741 OperationStatus::Succeeded
742 );
743 }
744
745 #[tokio::test]
746 async fn test_get_step_details_unpopulated_returns_error() {
747 let all_ops = Arc::new(RwLock::new(Vec::new()));
748 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
749 let result: Result<StepDetails<String>, _> = handle.get_step_details().await;
750 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
751 }
752
753 #[tokio::test]
754 async fn test_get_step_details_populated() {
755 let all_ops = Arc::new(RwLock::new(Vec::new()));
756 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
757 populate_handle(&handle, create_step_operation()).await;
758 let details: StepDetails<String> = handle.get_step_details().await.unwrap();
759 assert_eq!(details.result, Some("hello".to_string()));
760 assert_eq!(details.attempt, Some(1));
761 }
762
763 #[tokio::test]
764 async fn test_get_callback_details_unpopulated_returns_error() {
765 let all_ops = Arc::new(RwLock::new(Vec::new()));
766 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
767 let result: Result<CallbackDetails<String>, _> = handle.get_callback_details().await;
768 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
769 }
770
771 #[tokio::test]
772 async fn test_get_callback_details_populated() {
773 let all_ops = Arc::new(RwLock::new(Vec::new()));
774 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
775 populate_handle(&handle, create_callback_operation()).await;
776 let details: CallbackDetails<String> = handle.get_callback_details().await.unwrap();
777 assert_eq!(details.callback_id, Some("cb-id-123".to_string()));
778 }
779
780 #[tokio::test]
781 async fn test_get_wait_details_unpopulated_returns_error() {
782 let all_ops = Arc::new(RwLock::new(Vec::new()));
783 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
784 let result = handle.get_wait_details().await;
785 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
786 }
787
788 #[tokio::test]
789 async fn test_get_wait_details_populated() {
790 let all_ops = Arc::new(RwLock::new(Vec::new()));
791 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
792 populate_handle(&handle, create_wait_operation()).await;
793 let details = handle.get_wait_details().await.unwrap();
794 assert_eq!(details.wait_seconds, Some(5));
795 }
796
797 #[tokio::test]
798 async fn test_get_invoke_details_unpopulated_returns_error() {
799 let all_ops = Arc::new(RwLock::new(Vec::new()));
800 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
801 let result: Result<InvokeDetails<serde_json::Value>, _> = handle.get_invoke_details().await;
802 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
803 }
804
805 #[tokio::test]
806 async fn test_get_invoke_details_populated() {
807 let all_ops = Arc::new(RwLock::new(Vec::new()));
808 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
809 populate_handle(&handle, create_invoke_operation()).await;
810 let details: InvokeDetails<i32> = handle.get_invoke_details().await.unwrap();
811 assert_eq!(details.result, Some(42));
812 }
813
814 #[tokio::test]
815 async fn test_get_context_details_unpopulated_returns_error() {
816 let all_ops = Arc::new(RwLock::new(Vec::new()));
817 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
818 let result: Result<ContextDetails<String>, _> = handle.get_context_details().await;
819 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
820 }
821
822 #[tokio::test]
823 async fn test_get_context_details_populated() {
824 let all_ops = Arc::new(RwLock::new(Vec::new()));
825 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
826 populate_handle(&handle, create_context_operation()).await;
827 let details: ContextDetails<String> = handle.get_context_details().await.unwrap();
828 assert_eq!(details.result, Some("done".to_string()));
829 }
830
831 #[tokio::test]
832 async fn test_get_step_details_wrong_type_returns_type_mismatch() {
833 let all_ops = Arc::new(RwLock::new(Vec::new()));
834 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
835 populate_handle(&handle, create_wait_operation()).await;
836 let result: Result<StepDetails<String>, _> = handle.get_step_details().await;
837 assert!(matches!(
838 result,
839 Err(TestError::OperationTypeMismatch { .. })
840 ));
841 }
842
843 #[tokio::test]
848 async fn test_wait_for_data_started_resolves_immediately_when_populated() {
849 let all_ops = Arc::new(RwLock::new(Vec::new()));
850 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
851 populate_handle(&handle, create_step_operation()).await;
852 let result = handle.wait_for_data(WaitingOperationStatus::Started).await;
854 assert!(result.is_ok());
855 }
856
857 #[tokio::test]
858 async fn test_wait_for_data_completed_resolves_immediately_when_terminal() {
859 let all_ops = Arc::new(RwLock::new(Vec::new()));
860 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
861 populate_handle(&handle, create_step_operation()).await;
863 let result = handle
864 .wait_for_data(WaitingOperationStatus::Completed)
865 .await;
866 assert!(result.is_ok());
867 }
868
869 #[tokio::test]
870 async fn test_wait_for_data_submitted_resolves_for_callback_with_id() {
871 let all_ops = Arc::new(RwLock::new(Vec::new()));
872 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
873 populate_handle(&handle, create_callback_operation()).await;
875 let result = handle
876 .wait_for_data(WaitingOperationStatus::Submitted)
877 .await;
878 assert!(result.is_ok());
879 }
880
881 #[tokio::test]
882 async fn test_wait_for_data_submitted_resolves_for_non_callback() {
883 let all_ops = Arc::new(RwLock::new(Vec::new()));
884 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
885 populate_handle(&handle, create_step_operation()).await;
887 let result = handle
888 .wait_for_data(WaitingOperationStatus::Submitted)
889 .await;
890 assert!(result.is_ok());
891 }
892
893 #[tokio::test]
894 async fn test_wait_for_data_unpopulated_not_started_returns_error_on_channel_close() {
895 let all_ops = Arc::new(RwLock::new(Vec::new()));
904 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
905
906 assert!(
908 !handle
909 .check_status_reached(WaitingOperationStatus::Started)
910 .await
911 );
912 assert!(
913 !handle
914 .check_status_reached(WaitingOperationStatus::Submitted)
915 .await
916 );
917 assert!(
918 !handle
919 .check_status_reached(WaitingOperationStatus::Completed)
920 .await
921 );
922 }
923
924 #[tokio::test]
925 async fn test_wait_for_data_non_terminal_does_not_satisfy_completed() {
926 let all_ops = Arc::new(RwLock::new(Vec::new()));
927 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
928
929 let mut op = Operation::new("step-001".to_string(), OperationType::Step);
931 op.status = OperationStatus::Started;
932 populate_handle(&handle, op).await;
933
934 assert!(
936 handle
937 .check_status_reached(WaitingOperationStatus::Started)
938 .await
939 );
940 assert!(
941 !handle
942 .check_status_reached(WaitingOperationStatus::Completed)
943 .await
944 );
945 }
946
947 #[tokio::test]
948 async fn test_wait_for_data_callback_without_id_does_not_satisfy_submitted() {
949 let all_ops = Arc::new(RwLock::new(Vec::new()));
950 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
951
952 let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
954 op.status = OperationStatus::Started;
955 op.callback_details = Some(SdkCallbackDetails {
956 callback_id: None,
957 result: None,
958 error: None,
959 });
960 populate_handle(&handle, op).await;
961
962 assert!(
964 handle
965 .check_status_reached(WaitingOperationStatus::Started)
966 .await
967 );
968 assert!(
969 !handle
970 .check_status_reached(WaitingOperationStatus::Submitted)
971 .await
972 );
973 }
974
975 #[tokio::test]
976 async fn test_status_matches_target_all_terminal_statuses_satisfy_completed() {
977 let terminal_statuses = vec![
978 OperationStatus::Succeeded,
979 OperationStatus::Failed,
980 OperationStatus::Cancelled,
981 OperationStatus::TimedOut,
982 OperationStatus::Stopped,
983 ];
984
985 for status in terminal_statuses {
986 let mut op = Operation::new("op-001".to_string(), OperationType::Step);
987 op.status = status;
988 assert!(
989 OperationHandle::status_matches_target(&op, WaitingOperationStatus::Completed),
990 "Expected {:?} to satisfy Completed",
991 status
992 );
993 }
994 }
995
996 #[tokio::test]
997 async fn test_status_matches_target_non_terminal_does_not_satisfy_completed() {
998 let non_terminal_statuses = vec![
999 OperationStatus::Started,
1000 OperationStatus::Pending,
1001 OperationStatus::Ready,
1002 ];
1003
1004 for status in non_terminal_statuses {
1005 let mut op = Operation::new("op-001".to_string(), OperationType::Step);
1006 op.status = status;
1007 assert!(
1008 !OperationHandle::status_matches_target(&op, WaitingOperationStatus::Completed),
1009 "Expected {:?} to NOT satisfy Completed",
1010 status
1011 );
1012 }
1013 }
1014
1015 #[tokio::test]
1016 async fn test_wait_for_data_resolves_when_status_update_arrives() {
1017 let all_ops = Arc::new(RwLock::new(Vec::new()));
1018 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1019
1020 let handle_clone = handle.clone();
1021 tokio::spawn(async move {
1023 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1024 let mut op = create_step_operation();
1026 op.status = OperationStatus::Succeeded;
1027 {
1028 let mut inner = handle_clone.inner.write().await;
1029 *inner = Some(op);
1030 }
1031 let _ = handle_clone
1032 .status_tx
1033 .send(Some(OperationStatus::Succeeded));
1034 });
1035
1036 let result = handle
1037 .wait_for_data(WaitingOperationStatus::Completed)
1038 .await;
1039 assert!(result.is_ok());
1040 }
1041
1042 #[tokio::test]
1043 async fn test_wait_for_data_waits_through_non_terminal_updates() {
1044 let all_ops = Arc::new(RwLock::new(Vec::new()));
1045 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1046
1047 let handle_clone = handle.clone();
1048 tokio::spawn(async move {
1049 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1050 let mut op = create_step_operation();
1052 op.status = OperationStatus::Started;
1053 {
1054 let mut inner = handle_clone.inner.write().await;
1055 *inner = Some(op);
1056 }
1057 let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1058
1059 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1060 {
1062 let mut inner = handle_clone.inner.write().await;
1063 if let Some(ref mut op) = *inner {
1064 op.status = OperationStatus::Succeeded;
1065 }
1066 }
1067 let _ = handle_clone
1068 .status_tx
1069 .send(Some(OperationStatus::Succeeded));
1070 });
1071
1072 let result = handle
1073 .wait_for_data(WaitingOperationStatus::Completed)
1074 .await;
1075 assert!(result.is_ok());
1076 }
1077
1078 #[tokio::test]
1079 async fn test_wait_for_data_submitted_waits_for_callback_id() {
1080 let all_ops = Arc::new(RwLock::new(Vec::new()));
1081 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1082
1083 let handle_clone = handle.clone();
1084 tokio::spawn(async move {
1085 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1086 let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
1088 op.name = Some("my-callback".to_string());
1089 op.status = OperationStatus::Started;
1090 op.callback_details = Some(SdkCallbackDetails {
1091 callback_id: None,
1092 result: None,
1093 error: None,
1094 });
1095 {
1096 let mut inner = handle_clone.inner.write().await;
1097 *inner = Some(op);
1098 }
1099 let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1100
1101 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1102 {
1104 let mut inner = handle_clone.inner.write().await;
1105 if let Some(ref mut op) = *inner {
1106 if let Some(ref mut details) = op.callback_details {
1107 details.callback_id = Some("cb-id-123".to_string());
1108 }
1109 }
1110 }
1111 let _ = handle_clone.status_tx.send(Some(OperationStatus::Started));
1112 });
1113
1114 let result = handle
1115 .wait_for_data(WaitingOperationStatus::Submitted)
1116 .await;
1117 assert!(result.is_ok());
1118 }
1119
1120 #[tokio::test]
1121 async fn test_matcher_description() {
1122 let all_ops = Arc::new(RwLock::new(Vec::new()));
1123
1124 let handle =
1125 OperationHandle::new(OperationMatcher::ByName("my-op".into()), all_ops.clone());
1126 assert_eq!(handle.matcher_description(), "name=my-op");
1127
1128 let handle = OperationHandle::new(OperationMatcher::ByIndex(3), all_ops.clone());
1129 assert_eq!(handle.matcher_description(), "index=3");
1130
1131 let handle = OperationHandle::new(OperationMatcher::ById("abc-123".into()), all_ops);
1132 assert_eq!(handle.matcher_description(), "id=abc-123");
1133 }
1134
    /// Test double for `CallbackSender` that records every call instead of sending anything.
    ///
    /// The call logs sit behind `Arc`s, so a clone of the mock shares its logs with the
    /// original.
    #[derive(Clone)]
    struct MockCallbackSender {
        /// `(callback_id, result)` pairs recorded by `send_success`.
        success_calls: Arc<RwLock<Vec<(String, String)>>>,
        /// `(callback_id, error text)` pairs recorded by `send_failure`.
        failure_calls: Arc<RwLock<Vec<(String, String)>>>,
        /// Callback ids recorded by `send_heartbeat`.
        heartbeat_calls: Arc<RwLock<Vec<String>>>,
    }
1146
1147 impl MockCallbackSender {
1148 fn new() -> Self {
1149 Self {
1150 success_calls: Arc::new(RwLock::new(Vec::new())),
1151 failure_calls: Arc::new(RwLock::new(Vec::new())),
1152 heartbeat_calls: Arc::new(RwLock::new(Vec::new())),
1153 }
1154 }
1155 }
1156
1157 #[async_trait::async_trait]
1158 impl CallbackSender for MockCallbackSender {
1159 async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError> {
1160 self.success_calls
1161 .write()
1162 .await
1163 .push((callback_id.to_string(), result.to_string()));
1164 Ok(())
1165 }
1166
1167 async fn send_failure(
1168 &self,
1169 callback_id: &str,
1170 error: &crate::types::TestResultError,
1171 ) -> Result<(), TestError> {
1172 self.failure_calls
1173 .write()
1174 .await
1175 .push((callback_id.to_string(), error.to_string()));
1176 Ok(())
1177 }
1178
1179 async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError> {
1180 self.heartbeat_calls
1181 .write()
1182 .await
1183 .push(callback_id.to_string());
1184 Ok(())
1185 }
1186 }
1187
1188 async fn create_callback_handle_with_sender() -> (OperationHandle, MockCallbackSender) {
1190 let all_ops = Arc::new(RwLock::new(Vec::new()));
1191 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1192 let sender = MockCallbackSender::new();
1193 {
1194 let mut guard = handle.callback_sender.write().await;
1195 *guard = Some(Arc::new(sender.clone()));
1196 }
1197 populate_handle(&handle, create_callback_operation()).await;
1198 (handle, sender)
1199 }
1200
1201 #[tokio::test]
1202 async fn test_send_callback_success_unpopulated_returns_error() {
1203 let all_ops = Arc::new(RwLock::new(Vec::new()));
1204 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1205 let result = handle.send_callback_success("ok").await;
1206 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1207 }
1208
1209 #[tokio::test]
1210 async fn test_send_callback_success_non_callback_returns_error() {
1211 let all_ops = Arc::new(RwLock::new(Vec::new()));
1212 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1213 populate_handle(&handle, create_step_operation()).await;
1214 let result = handle.send_callback_success("ok").await;
1215 assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1216 }
1217
1218 #[tokio::test]
1219 async fn test_send_callback_success_no_callback_id_returns_error() {
1220 let all_ops = Arc::new(RwLock::new(Vec::new()));
1221 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1222 let mut op = Operation::new("cb-001".to_string(), OperationType::Callback);
1224 op.callback_details = Some(SdkCallbackDetails {
1225 callback_id: None,
1226 result: None,
1227 error: None,
1228 });
1229 populate_handle(&handle, op).await;
1230 let result = handle.send_callback_success("ok").await;
1231 assert!(matches!(result, Err(TestError::ResultNotAvailable(_))));
1232 }
1233
1234 #[tokio::test]
1235 async fn test_send_callback_success_no_sender_returns_error() {
1236 let all_ops = Arc::new(RwLock::new(Vec::new()));
1237 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1238 populate_handle(&handle, create_callback_operation()).await;
1239 let result = handle.send_callback_success("ok").await;
1241 assert!(matches!(result, Err(TestError::ResultNotAvailable(_))));
1242 }
1243
1244 #[tokio::test]
1245 async fn test_send_callback_success_delegates_to_sender() {
1246 let (handle, sender) = create_callback_handle_with_sender().await;
1247 let result = handle.send_callback_success("my-result").await;
1248 assert!(result.is_ok());
1249 let calls = sender.success_calls.read().await;
1250 assert_eq!(calls.len(), 1);
1251 assert_eq!(calls[0], ("cb-id-123".to_string(), "my-result".to_string()));
1252 }
1253
1254 #[tokio::test]
1255 async fn test_send_callback_failure_unpopulated_returns_error() {
1256 let all_ops = Arc::new(RwLock::new(Vec::new()));
1257 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1258 let error = crate::types::TestResultError::new("TestError", "something failed");
1259 let result = handle.send_callback_failure(&error).await;
1260 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1261 }
1262
1263 #[tokio::test]
1264 async fn test_send_callback_failure_non_callback_returns_error() {
1265 let all_ops = Arc::new(RwLock::new(Vec::new()));
1266 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1267 populate_handle(&handle, create_step_operation()).await;
1268 let error = crate::types::TestResultError::new("TestError", "something failed");
1269 let result = handle.send_callback_failure(&error).await;
1270 assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1271 }
1272
1273 #[tokio::test]
1274 async fn test_send_callback_failure_delegates_to_sender() {
1275 let (handle, sender) = create_callback_handle_with_sender().await;
1276 let error = crate::types::TestResultError::new("TestError", "something failed");
1277 let result = handle.send_callback_failure(&error).await;
1278 assert!(result.is_ok());
1279 let calls = sender.failure_calls.read().await;
1280 assert_eq!(calls.len(), 1);
1281 assert_eq!(calls[0].0, "cb-id-123");
1282 }
1283
1284 #[tokio::test]
1285 async fn test_send_callback_heartbeat_unpopulated_returns_error() {
1286 let all_ops = Arc::new(RwLock::new(Vec::new()));
1287 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1288 let result = handle.send_callback_heartbeat().await;
1289 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1290 }
1291
1292 #[tokio::test]
1293 async fn test_send_callback_heartbeat_non_callback_returns_error() {
1294 let all_ops = Arc::new(RwLock::new(Vec::new()));
1295 let handle = OperationHandle::new(OperationMatcher::ByName("test".into()), all_ops);
1296 populate_handle(&handle, create_step_operation()).await;
1297 let result = handle.send_callback_heartbeat().await;
1298 assert!(matches!(result, Err(TestError::NotCallbackOperation)));
1299 }
1300
1301 #[tokio::test]
1302 async fn test_send_callback_heartbeat_delegates_to_sender() {
1303 let (handle, sender) = create_callback_handle_with_sender().await;
1304 let result = handle.send_callback_heartbeat().await;
1305 assert!(result.is_ok());
1306 let calls = sender.heartbeat_calls.read().await;
1307 assert_eq!(calls.len(), 1);
1308 assert_eq!(calls[0], "cb-id-123");
1309 }
1310
1311 fn create_operation_with_parent(id: &str, name: &str, parent_id: Option<&str>) -> Operation {
1317 let mut op = Operation::new(id.to_string(), OperationType::Step);
1318 op.name = Some(name.to_string());
1319 op.status = OperationStatus::Succeeded;
1320 op.parent_id = parent_id.map(|s| s.to_string());
1321 op
1322 }
1323
1324 #[tokio::test]
1325 async fn test_get_child_operations_unpopulated_returns_error() {
1326 let all_ops = Arc::new(RwLock::new(Vec::new()));
1327 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1328
1329 let result = handle.get_child_operations().await;
1330 assert!(matches!(result, Err(TestError::OperationNotFound(_))));
1331 }
1332
1333 #[tokio::test]
1334 async fn test_get_child_operations_returns_matching_children() {
1335 let parent_op = create_operation_with_parent("parent-1", "parent", None);
1336 let child1 = create_operation_with_parent("child-1", "child-a", Some("parent-1"));
1337 let child2 = create_operation_with_parent("child-2", "child-b", Some("parent-1"));
1338 let unrelated = create_operation_with_parent("other-1", "other", Some("other-parent"));
1339
1340 let all_ops = Arc::new(RwLock::new(vec![
1341 parent_op.clone(),
1342 child1,
1343 child2,
1344 unrelated,
1345 ]));
1346
1347 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1348 populate_handle(&handle, parent_op).await;
1349
1350 let children = handle.get_child_operations().await.unwrap();
1351 assert_eq!(children.len(), 2);
1352 assert_eq!(children[0].get_id(), "child-1");
1353 assert_eq!(children[1].get_id(), "child-2");
1354 }
1355
1356 #[tokio::test]
1357 async fn test_get_child_operations_empty_when_no_children() {
1358 let parent_op = create_operation_with_parent("parent-1", "parent", None);
1359 let unrelated = create_operation_with_parent("other-1", "other", Some("other-parent"));
1360
1361 let all_ops = Arc::new(RwLock::new(vec![parent_op.clone(), unrelated]));
1362
1363 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1364 populate_handle(&handle, parent_op).await;
1365
1366 let children = handle.get_child_operations().await.unwrap();
1367 assert!(children.is_empty());
1368 }
1369
1370 #[tokio::test]
1371 async fn test_get_child_operations_preserves_execution_order() {
1372 let parent_op = create_operation_with_parent("parent-1", "parent", None);
1373 let child_c = create_operation_with_parent("child-c", "third", Some("parent-1"));
1374 let child_a = create_operation_with_parent("child-a", "first", Some("parent-1"));
1375 let child_b = create_operation_with_parent("child-b", "second", Some("parent-1"));
1376
1377 let all_ops = Arc::new(RwLock::new(vec![
1379 parent_op.clone(),
1380 child_c,
1381 child_a,
1382 child_b,
1383 ]));
1384
1385 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1386 populate_handle(&handle, parent_op).await;
1387
1388 let children = handle.get_child_operations().await.unwrap();
1389 assert_eq!(children.len(), 3);
1390 assert_eq!(children[0].get_id(), "child-c");
1391 assert_eq!(children[1].get_id(), "child-a");
1392 assert_eq!(children[2].get_id(), "child-b");
1393 }
1394
1395 #[tokio::test]
1396 async fn test_get_child_operations_children_support_recursive_enumeration() {
1397 let parent_op = create_operation_with_parent("parent-1", "parent", None);
1398 let child = create_operation_with_parent("child-1", "child", Some("parent-1"));
1399 let grandchild =
1400 create_operation_with_parent("grandchild-1", "grandchild", Some("child-1"));
1401
1402 let all_ops = Arc::new(RwLock::new(vec![parent_op.clone(), child, grandchild]));
1403
1404 let handle = OperationHandle::new(OperationMatcher::ByName("parent".into()), all_ops);
1405 populate_handle(&handle, parent_op).await;
1406
1407 let children = handle.get_child_operations().await.unwrap();
1408 assert_eq!(children.len(), 1);
1409 assert_eq!(children[0].get_id(), "child-1");
1410
1411 let grandchildren = children[0].get_child_operations();
1413 assert_eq!(grandchildren.len(), 1);
1414 assert_eq!(grandchildren[0].get_id(), "grandchild-1");
1415 }
1416}