1use std::sync::Arc;
7
8use chrono::{DateTime, TimeZone, Utc};
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10use tokio::sync::watch;
11
12use crate::error::TestError;
13use crate::types::{TestResultError, WaitingOperationStatus};
14use durable_execution_sdk::{Operation, OperationStatus, OperationType};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct StepDetails<T = serde_json::Value> {
26 pub attempt: Option<u32>,
28 pub next_attempt_timestamp: Option<DateTime<Utc>>,
30 pub result: Option<T>,
32 pub error: Option<TestResultError>,
34}
35
36impl<T> StepDetails<T> {
37 pub fn new() -> Self {
39 Self {
40 attempt: None,
41 next_attempt_timestamp: None,
42 result: None,
43 error: None,
44 }
45 }
46}
47
48impl<T> Default for StepDetails<T> {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct WaitDetails {
60 pub wait_seconds: Option<u64>,
62 pub scheduled_end_timestamp: Option<DateTime<Utc>>,
64}
65
66impl WaitDetails {
67 pub fn new() -> Self {
69 Self {
70 wait_seconds: None,
71 scheduled_end_timestamp: None,
72 }
73 }
74}
75
76impl Default for WaitDetails {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct CallbackDetails<T = serde_json::Value> {
92 pub callback_id: Option<String>,
94 pub result: Option<T>,
96 pub error: Option<TestResultError>,
98}
99
100impl<T> CallbackDetails<T> {
101 pub fn new() -> Self {
103 Self {
104 callback_id: None,
105 result: None,
106 error: None,
107 }
108 }
109}
110
111impl<T> Default for CallbackDetails<T> {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct InvokeDetails<T = serde_json::Value> {
127 pub result: Option<T>,
129 pub error: Option<TestResultError>,
131}
132
133impl<T> InvokeDetails<T> {
134 pub fn new() -> Self {
136 Self {
137 result: None,
138 error: None,
139 }
140 }
141}
142
143impl<T> Default for InvokeDetails<T> {
144 fn default() -> Self {
145 Self::new()
146 }
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct ContextDetails<T = serde_json::Value> {
159 pub result: Option<T>,
161 pub error: Option<TestResultError>,
163}
164
165impl<T> ContextDetails<T> {
166 pub fn new() -> Self {
168 Self {
169 result: None,
170 error: None,
171 }
172 }
173}
174
175impl<T> Default for ContextDetails<T> {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
181#[async_trait::async_trait]
185pub trait CallbackSender: Send + Sync {
186 async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError>;
188
189 async fn send_failure(
191 &self,
192 callback_id: &str,
193 error: &TestResultError,
194 ) -> Result<(), TestError>;
195
196 async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError>;
198}
199
200pub struct DurableOperation {
225 operation: Operation,
227 callback_sender: Option<Arc<dyn CallbackSender>>,
229 status_watcher: Option<watch::Receiver<OperationStatus>>,
231 all_operations: Option<Arc<Vec<Operation>>>,
233}
234
235impl std::fmt::Debug for DurableOperation {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 f.debug_struct("DurableOperation")
238 .field("operation", &self.operation)
239 .field("callback_sender", &self.callback_sender.is_some())
240 .field("status_watcher", &self.status_watcher.is_some())
241 .field(
242 "all_operations",
243 &self.all_operations.as_ref().map(|ops| ops.len()),
244 )
245 .finish()
246 }
247}
248
249impl Clone for DurableOperation {
250 fn clone(&self) -> Self {
251 Self {
252 operation: self.operation.clone(),
253 callback_sender: self.callback_sender.clone(),
254 status_watcher: self.status_watcher.clone(),
255 all_operations: self.all_operations.clone(),
256 }
257 }
258}
259
260impl DurableOperation {
261 pub fn new(operation: Operation) -> Self {
263 Self {
264 operation,
265 callback_sender: None,
266 status_watcher: None,
267 all_operations: None,
268 }
269 }
270
271 pub fn with_callback_sender(
273 operation: Operation,
274 callback_sender: Arc<dyn CallbackSender>,
275 ) -> Self {
276 Self {
277 operation,
278 callback_sender: Some(callback_sender),
279 status_watcher: None,
280 all_operations: None,
281 }
282 }
283
284 pub fn with_status_watcher(
286 operation: Operation,
287 status_watcher: watch::Receiver<OperationStatus>,
288 ) -> Self {
289 Self {
290 operation,
291 callback_sender: None,
292 status_watcher: Some(status_watcher),
293 all_operations: None,
294 }
295 }
296
297 pub fn with_all(
299 operation: Operation,
300 callback_sender: Option<Arc<dyn CallbackSender>>,
301 status_watcher: Option<watch::Receiver<OperationStatus>>,
302 ) -> Self {
303 Self {
304 operation,
305 callback_sender,
306 status_watcher,
307 all_operations: None,
308 }
309 }
310
311 pub fn get_id(&self) -> &str {
321 &self.operation.operation_id
322 }
323
324 pub fn get_parent_id(&self) -> Option<&str> {
330 self.operation.parent_id.as_deref()
331 }
332
333 pub fn get_name(&self) -> Option<&str> {
339 self.operation.name.as_deref()
340 }
341
342 pub fn get_type(&self) -> OperationType {
348 self.operation.operation_type
349 }
350
351 pub fn get_status(&self) -> OperationStatus {
357 self.operation.status
358 }
359
360 pub fn get_start_timestamp(&self) -> Option<DateTime<Utc>> {
366 self.operation
367 .start_timestamp
368 .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
369 }
370
371 pub fn get_end_timestamp(&self) -> Option<DateTime<Utc>> {
377 self.operation
378 .end_timestamp
379 .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
380 }
381
382 pub fn get_operation_data(&self) -> &Operation {
388 &self.operation
389 }
390
391 pub fn is_callback(&self) -> bool {
397 self.operation.operation_type == OperationType::Callback
398 }
399
400 pub fn is_completed(&self) -> bool {
406 self.operation.is_completed()
407 }
408
409 pub fn is_succeeded(&self) -> bool {
415 self.operation.is_succeeded()
416 }
417
418 pub fn is_failed(&self) -> bool {
424 self.operation.is_failed()
425 }
426
427 pub fn with_operations(mut self, all_operations: Arc<Vec<Operation>>) -> Self {
429 self.all_operations = Some(all_operations);
430 self
431 }
432}
433
434impl DurableOperation {
435 pub fn get_child_operations(&self) -> Vec<DurableOperation> {
450 let Some(all_ops) = &self.all_operations else {
451 return Vec::new();
452 };
453
454 let my_id = &self.operation.operation_id;
455
456 all_ops
457 .iter()
458 .filter(|op| op.parent_id.as_deref() == Some(my_id))
459 .map(|op| DurableOperation::new(op.clone()).with_operations(Arc::clone(all_ops)))
460 .collect()
461 }
462}
463
464impl DurableOperation {
465 pub fn get_step_details<T: DeserializeOwned>(&self) -> Result<StepDetails<T>, TestError> {
480 if self.operation.operation_type != OperationType::Step {
481 return Err(TestError::type_mismatch(
482 OperationType::Step,
483 self.operation.operation_type,
484 ));
485 }
486
487 let sdk_details = self.operation.step_details.as_ref();
488
489 let result = if let Some(details) = sdk_details {
490 if let Some(ref result_str) = details.result {
491 Some(serde_json::from_str(result_str)?)
492 } else {
493 None
494 }
495 } else {
496 None
497 };
498
499 let error = sdk_details
500 .and_then(|d| d.error.as_ref())
501 .map(|e| TestResultError::from(e.clone()));
502
503 let next_attempt_timestamp = sdk_details
504 .and_then(|d| d.next_attempt_timestamp)
505 .and_then(|ms| Utc.timestamp_millis_opt(ms).single());
506
507 Ok(StepDetails {
508 attempt: sdk_details.and_then(|d| d.attempt),
509 next_attempt_timestamp,
510 result,
511 error,
512 })
513 }
514
515 pub fn get_wait_details(&self) -> Result<WaitDetails, TestError> {
522 if self.operation.operation_type != OperationType::Wait {
523 return Err(TestError::type_mismatch(
524 OperationType::Wait,
525 self.operation.operation_type,
526 ));
527 }
528
529 let sdk_details = self.operation.wait_details.as_ref();
530
531 let scheduled_end_timestamp = sdk_details
532 .and_then(|d| d.scheduled_end_timestamp)
533 .and_then(|ms| Utc.timestamp_millis_opt(ms).single());
534
535 let wait_seconds = match (self.operation.start_timestamp, sdk_details) {
537 (Some(start), Some(details)) => details.scheduled_end_timestamp.map(|end| {
538 let duration_ms = end.saturating_sub(start);
539 (duration_ms / 1000) as u64
540 }),
541 _ => None,
542 };
543
544 Ok(WaitDetails {
545 wait_seconds,
546 scheduled_end_timestamp,
547 })
548 }
549
550 pub fn get_callback_details<T: DeserializeOwned>(
561 &self,
562 ) -> Result<CallbackDetails<T>, TestError> {
563 if self.operation.operation_type != OperationType::Callback {
564 return Err(TestError::type_mismatch(
565 OperationType::Callback,
566 self.operation.operation_type,
567 ));
568 }
569
570 let sdk_details = self.operation.callback_details.as_ref();
571
572 let result = if let Some(details) = sdk_details {
573 if let Some(ref result_str) = details.result {
574 Some(serde_json::from_str(result_str)?)
575 } else {
576 None
577 }
578 } else {
579 None
580 };
581
582 let error = sdk_details
583 .and_then(|d| d.error.as_ref())
584 .map(|e| TestResultError::from(e.clone()));
585
586 Ok(CallbackDetails {
587 callback_id: sdk_details.and_then(|d| d.callback_id.clone()),
588 result,
589 error,
590 })
591 }
592
593 pub fn get_invoke_details<T: DeserializeOwned>(&self) -> Result<InvokeDetails<T>, TestError> {
604 if self.operation.operation_type != OperationType::Invoke {
605 return Err(TestError::type_mismatch(
606 OperationType::Invoke,
607 self.operation.operation_type,
608 ));
609 }
610
611 let sdk_details = self.operation.chained_invoke_details.as_ref();
612
613 let result = if let Some(details) = sdk_details {
614 if let Some(ref result_str) = details.result {
615 Some(serde_json::from_str(result_str)?)
616 } else {
617 None
618 }
619 } else {
620 None
621 };
622
623 let error = sdk_details
624 .and_then(|d| d.error.as_ref())
625 .map(|e| TestResultError::from(e.clone()));
626
627 Ok(InvokeDetails { result, error })
628 }
629
630 pub fn get_context_details<T: DeserializeOwned>(&self) -> Result<ContextDetails<T>, TestError> {
641 if self.operation.operation_type != OperationType::Context {
642 return Err(TestError::type_mismatch(
643 OperationType::Context,
644 self.operation.operation_type,
645 ));
646 }
647
648 let sdk_details = self.operation.context_details.as_ref();
649
650 let result = if let Some(details) = sdk_details {
651 if let Some(ref result_str) = details.result {
652 Some(serde_json::from_str(result_str)?)
653 } else {
654 None
655 }
656 } else {
657 None
658 };
659
660 let error = sdk_details
661 .and_then(|d| d.error.as_ref())
662 .map(|e| TestResultError::from(e.clone()));
663
664 Ok(ContextDetails { result, error })
665 }
666}
667
668impl DurableOperation {
669 pub async fn send_callback_success(&self, result: &str) -> Result<(), TestError> {
684 if !self.is_callback() {
685 return Err(TestError::NotCallbackOperation);
686 }
687
688 let callback_id = self.get_callback_id()?;
689
690 if let Some(ref sender) = self.callback_sender {
691 sender.send_success(&callback_id, result).await
692 } else {
693 Ok(())
695 }
696 }
697
698 pub async fn send_callback_failure(&self, error: &TestResultError) -> Result<(), TestError> {
709 if !self.is_callback() {
710 return Err(TestError::NotCallbackOperation);
711 }
712
713 let callback_id = self.get_callback_id()?;
714
715 if let Some(ref sender) = self.callback_sender {
716 sender.send_failure(&callback_id, error).await
717 } else {
718 Ok(())
720 }
721 }
722
723 pub async fn send_callback_heartbeat(&self) -> Result<(), TestError> {
730 if !self.is_callback() {
731 return Err(TestError::NotCallbackOperation);
732 }
733
734 let callback_id = self.get_callback_id()?;
735
736 if let Some(ref sender) = self.callback_sender {
737 sender.send_heartbeat(&callback_id).await
738 } else {
739 Ok(())
741 }
742 }
743
744 fn get_callback_id(&self) -> Result<String, TestError> {
746 self.operation
747 .callback_details
748 .as_ref()
749 .and_then(|d| d.callback_id.clone())
750 .ok_or_else(|| {
751 TestError::result_not_available("Callback ID not available for this operation")
752 })
753 }
754}
755
756impl DurableOperation {
757 pub async fn wait_for_data(
772 &self,
773 target_status: WaitingOperationStatus,
774 ) -> Result<&Self, TestError> {
775 if self.has_reached_status(target_status) {
777 return Ok(self);
778 }
779
780 if let Some(ref watcher) = self.status_watcher {
782 let mut watcher = watcher.clone();
783
784 loop {
785 let current_status = *watcher.borrow();
787
788 if self.status_matches_target(current_status, target_status) {
789 return Ok(self);
790 }
791
792 if current_status.is_terminal()
794 && !self.status_matches_target(current_status, target_status)
795 {
796 return Err(TestError::execution_completed_early(
797 self.get_id(),
798 target_status,
799 ));
800 }
801
802 if watcher.changed().await.is_err() {
804 let final_status = *watcher.borrow();
806 if self.status_matches_target(final_status, target_status) {
807 return Ok(self);
808 }
809 return Err(TestError::execution_completed_early(
810 self.get_id(),
811 target_status,
812 ));
813 }
814 }
815 } else {
816 if self.has_reached_status(target_status) {
818 Ok(self)
819 } else {
820 Err(TestError::execution_completed_early(
821 self.get_id(),
822 target_status,
823 ))
824 }
825 }
826 }
827
828 fn has_reached_status(&self, target: WaitingOperationStatus) -> bool {
830 self.status_matches_target(self.operation.status, target)
831 }
832
833 fn status_matches_target(
835 &self,
836 current: OperationStatus,
837 target: WaitingOperationStatus,
838 ) -> bool {
839 match target {
840 WaitingOperationStatus::Started => {
841 true }
845 WaitingOperationStatus::Submitted => {
846 if self.is_callback() {
849 self.operation
850 .callback_details
851 .as_ref()
852 .map(|d| d.callback_id.is_some())
853 .unwrap_or(false)
854 } else {
855 true
857 }
858 }
859 WaitingOperationStatus::Completed => {
860 current.is_terminal()
862 }
863 }
864 }
865}
866
#[cfg(test)]
mod tests {
    use super::*;
    use durable_execution_sdk::{
        CallbackDetails as SdkCallbackDetails, ChainedInvokeDetails as SdkChainedInvokeDetails,
        ContextDetails as SdkContextDetails, Operation, OperationStatus, OperationType,
        StepDetails as SdkStepDetails, WaitDetails as SdkWaitDetails,
    };

    // ---- fixtures -----------------------------------------------------------------------

    /// Succeeded step operation with ID `"<name>-001"`, attempt 1 and the given raw result.
    fn create_step_operation(name: &str, result: Option<&str>) -> Operation {
        let mut operation = Operation::new(format!("{name}-001"), OperationType::Step);
        operation.name = Some(name.to_owned());
        operation.status = OperationStatus::Succeeded;
        operation.step_details = Some(SdkStepDetails {
            result: result.map(str::to_owned),
            attempt: Some(1),
            next_attempt_timestamp: None,
            error: None,
            payload: None,
        });
        operation
    }

    /// Succeeded wait operation that starts at 1000 ms and is scheduled to end at 6000 ms.
    fn create_wait_operation(name: &str) -> Operation {
        let mut operation = Operation::new(format!("{name}-001"), OperationType::Wait);
        operation.name = Some(name.to_owned());
        operation.status = OperationStatus::Succeeded;
        operation.start_timestamp = Some(1000);
        operation.wait_details = Some(SdkWaitDetails {
            scheduled_end_timestamp: Some(6000),
        });
        operation
    }

    /// Started callback operation carrying `callback_id` and no result or error.
    fn create_callback_operation(name: &str, callback_id: &str) -> Operation {
        let mut operation = Operation::new(format!("{name}-001"), OperationType::Callback);
        operation.name = Some(name.to_owned());
        operation.status = OperationStatus::Started;
        operation.callback_details = Some(SdkCallbackDetails {
            callback_id: Some(callback_id.to_owned()),
            result: None,
            error: None,
        });
        operation
    }

    /// Succeeded invoke operation with the given raw result.
    fn create_invoke_operation(name: &str, result: Option<&str>) -> Operation {
        let mut operation = Operation::new(format!("{name}-001"), OperationType::Invoke);
        operation.name = Some(name.to_owned());
        operation.status = OperationStatus::Succeeded;
        operation.chained_invoke_details = Some(SdkChainedInvokeDetails {
            result: result.map(str::to_owned),
            error: None,
        });
        operation
    }

    /// Succeeded context operation with the given raw result.
    fn create_context_operation(name: &str, result: Option<&str>) -> Operation {
        let mut operation = Operation::new(format!("{name}-001"), OperationType::Context);
        operation.name = Some(name.to_owned());
        operation.status = OperationStatus::Succeeded;
        operation.context_details = Some(SdkContextDetails {
            result: result.map(str::to_owned),
            replay_children: None,
            error: None,
        });
        operation
    }

    // ---- basic accessors ----------------------------------------------------------------

    #[test]
    fn test_get_id() {
        let wrapped = DurableOperation::new(create_step_operation("my-step", None));
        assert_eq!(wrapped.get_id(), "my-step-001");
    }

    #[test]
    fn test_get_name() {
        let wrapped = DurableOperation::new(create_step_operation("my-step", None));
        assert_eq!(wrapped.get_name(), Some("my-step"));
    }

    #[test]
    fn test_get_type() {
        let step = DurableOperation::new(create_step_operation("step", None));
        let wait = DurableOperation::new(create_wait_operation("wait"));
        let callback = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        assert_eq!(step.get_type(), OperationType::Step);
        assert_eq!(wait.get_type(), OperationType::Wait);
        assert_eq!(callback.get_type(), OperationType::Callback);
    }

    #[test]
    fn test_get_status() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));
        assert_eq!(wrapped.get_status(), OperationStatus::Succeeded);
    }

    #[test]
    fn test_is_callback() {
        let step = DurableOperation::new(create_step_operation("step", None));
        let callback = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        assert!(!step.is_callback());
        assert!(callback.is_callback());
    }

    #[test]
    fn test_is_completed() {
        let step_with_status = |name: &str, status: OperationStatus| {
            let mut operation = create_step_operation(name, None);
            operation.status = status;
            DurableOperation::new(operation)
        };

        assert!(step_with_status("step", OperationStatus::Succeeded).is_completed());
        assert!(!step_with_status("step2", OperationStatus::Started).is_completed());
    }

    // ---- typed detail getters -----------------------------------------------------------

    #[test]
    fn test_get_step_details_success() {
        let wrapped = DurableOperation::new(create_step_operation("step", Some(r#""hello""#)));

        let details = wrapped.get_step_details::<String>().unwrap();

        assert_eq!(details.attempt, Some(1));
        assert_eq!(details.result, Some("hello".to_string()));
        assert!(details.error.is_none());
    }

    #[test]
    fn test_get_step_details_wrong_type() {
        let wrapped = DurableOperation::new(create_wait_operation("wait"));

        let outcome = wrapped.get_step_details::<String>();

        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    #[test]
    fn test_get_wait_details_success() {
        let wrapped = DurableOperation::new(create_wait_operation("wait"));

        let details = wrapped.get_wait_details().unwrap();

        assert_eq!(details.wait_seconds, Some(5));
        assert!(details.scheduled_end_timestamp.is_some());
    }

    #[test]
    fn test_get_wait_details_wrong_type() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.get_wait_details();

        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    #[test]
    fn test_get_callback_details_success() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        let details = wrapped.get_callback_details::<String>().unwrap();

        assert_eq!(details.callback_id, Some("cb-123".to_string()));
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }

    #[test]
    fn test_get_callback_details_wrong_type() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.get_callback_details::<String>();

        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    #[test]
    fn test_get_invoke_details_success() {
        let wrapped =
            DurableOperation::new(create_invoke_operation("invoke", Some(r#"{"value": 42}"#)));

        let details = wrapped.get_invoke_details::<serde_json::Value>().unwrap();

        assert!(details.error.is_none());
        let payload = details.result.expect("invoke result should be present");
        assert_eq!(payload["value"], 42);
    }

    #[test]
    fn test_get_invoke_details_wrong_type() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.get_invoke_details::<String>();

        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    #[test]
    fn test_get_context_details_success() {
        let wrapped =
            DurableOperation::new(create_context_operation("context", Some(r#""done""#)));

        let details = wrapped.get_context_details::<String>().unwrap();

        assert_eq!(details.result, Some("done".to_string()));
        assert!(details.error.is_none());
    }

    #[test]
    fn test_get_context_details_wrong_type() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.get_context_details::<String>();

        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    // ---- callback senders ---------------------------------------------------------------

    #[tokio::test]
    async fn test_send_callback_success_on_callback_operation() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        // No sender is attached, so the call is expected to succeed as a no-op.
        let outcome = wrapped.send_callback_success(r#""result""#).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_send_callback_success_on_non_callback_operation() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.send_callback_success(r#""result""#).await;

        assert!(matches!(outcome, Err(TestError::NotCallbackOperation)));
    }

    #[tokio::test]
    async fn test_send_callback_failure_on_callback_operation() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));
        let failure = TestResultError::new("TestError", "Something went wrong");

        let outcome = wrapped.send_callback_failure(&failure).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_send_callback_failure_on_non_callback_operation() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));
        let failure = TestResultError::new("TestError", "Something went wrong");

        let outcome = wrapped.send_callback_failure(&failure).await;

        assert!(matches!(outcome, Err(TestError::NotCallbackOperation)));
    }

    #[tokio::test]
    async fn test_send_callback_heartbeat_on_callback_operation() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        let outcome = wrapped.send_callback_heartbeat().await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_send_callback_heartbeat_on_non_callback_operation() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.send_callback_heartbeat().await;

        assert!(matches!(outcome, Err(TestError::NotCallbackOperation)));
    }

    // ---- wait_for_data ------------------------------------------------------------------

    #[tokio::test]
    async fn test_wait_for_data_started_already_started() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.wait_for_data(WaitingOperationStatus::Started).await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_data_completed_already_completed() {
        let mut operation = create_step_operation("step", None);
        operation.status = OperationStatus::Succeeded;
        let wrapped = DurableOperation::new(operation);

        let outcome = wrapped
            .wait_for_data(WaitingOperationStatus::Completed)
            .await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_data_submitted_callback_with_id() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        let outcome = wrapped
            .wait_for_data(WaitingOperationStatus::Submitted)
            .await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_data_completed_not_yet_completed() {
        let mut operation = create_step_operation("step", None);
        operation.status = OperationStatus::Started;
        let wrapped = DurableOperation::new(operation);

        // Without a watcher the status can never change, so this must fail.
        let outcome = wrapped
            .wait_for_data(WaitingOperationStatus::Completed)
            .await;

        assert!(outcome.is_err());
    }

    // ---- child enumeration --------------------------------------------------------------

    /// Succeeded step operation with an explicit ID and optional parent ID.
    fn create_operation_with_parent(id: &str, name: &str, parent_id: Option<&str>) -> Operation {
        let mut operation = Operation::new(id.to_string(), OperationType::Step);
        operation.name = Some(name.to_owned());
        operation.status = OperationStatus::Succeeded;
        operation.parent_id = parent_id.map(str::to_owned);
        operation
    }

    #[test]
    fn test_get_child_operations_matching_parent_id() {
        let parent = create_operation_with_parent("parent-1", "parent", None);
        let all_ops = Arc::new(vec![
            parent.clone(),
            create_operation_with_parent("child-1", "child_a", Some("parent-1")),
            create_operation_with_parent("child-2", "child_b", Some("parent-1")),
            create_operation_with_parent("other-1", "other", Some("parent-2")),
        ]);

        let children = DurableOperation::new(parent)
            .with_operations(all_ops)
            .get_child_operations();

        let ids: Vec<&str> = children.iter().map(|child| child.get_id()).collect();
        assert_eq!(ids, ["child-1", "child-2"]);
    }

    #[test]
    fn test_get_child_operations_empty_when_no_children() {
        let parent = create_operation_with_parent("parent-1", "parent", None);
        let all_ops = Arc::new(vec![
            parent.clone(),
            create_operation_with_parent("other-1", "other", Some("parent-2")),
        ]);

        let children = DurableOperation::new(parent)
            .with_operations(all_ops)
            .get_child_operations();

        assert!(children.is_empty());
    }

    #[test]
    fn test_get_child_operations_preserves_order() {
        let parent = create_operation_with_parent("parent-1", "parent", None);
        // Deliberately not in alphabetical order.
        let all_ops = Arc::new(vec![
            parent.clone(),
            create_operation_with_parent("child-c", "third", Some("parent-1")),
            create_operation_with_parent("child-a", "first", Some("parent-1")),
            create_operation_with_parent("child-b", "second", Some("parent-1")),
        ]);

        let children = DurableOperation::new(parent)
            .with_operations(all_ops)
            .get_child_operations();

        let ids: Vec<&str> = children.iter().map(|child| child.get_id()).collect();
        assert_eq!(ids, ["child-c", "child-a", "child-b"]);
    }

    #[test]
    fn test_get_child_operations_without_all_operations() {
        let parent = create_operation_with_parent("parent-1", "parent", None);

        // No shared operation list is attached.
        let children = DurableOperation::new(parent).get_child_operations();

        assert!(children.is_empty());
    }

    #[test]
    fn test_get_child_operations_children_can_enumerate_grandchildren() {
        let parent = create_operation_with_parent("parent-1", "parent", None);
        let all_ops = Arc::new(vec![
            parent.clone(),
            create_operation_with_parent("child-1", "child", Some("parent-1")),
            create_operation_with_parent("grandchild-1", "grandchild", Some("child-1")),
        ]);

        let children = DurableOperation::new(parent)
            .with_operations(all_ops)
            .get_child_operations();

        assert_eq!(children.len(), 1);
        assert_eq!(children[0].get_id(), "child-1");

        // Children inherit the shared list, so they can look up their own children.
        let grandchildren = children[0].get_child_operations();
        assert_eq!(grandchildren.len(), 1);
        assert_eq!(grandchildren[0].get_id(), "grandchild-1");
    }

    // ---- defaults -----------------------------------------------------------------------

    #[test]
    fn test_step_details_default() {
        let details = StepDetails::<String>::default();
        assert!(details.attempt.is_none());
        assert!(details.next_attempt_timestamp.is_none());
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }

    #[test]
    fn test_wait_details_default() {
        let details = WaitDetails::default();
        assert!(details.wait_seconds.is_none());
        assert!(details.scheduled_end_timestamp.is_none());
    }

    #[test]
    fn test_callback_details_default() {
        let details = CallbackDetails::<String>::default();
        assert!(details.callback_id.is_none());
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }

    #[test]
    fn test_invoke_details_default() {
        let details = InvokeDetails::<String>::default();
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }

    #[test]
    fn test_context_details_default() {
        let details = ContextDetails::<String>::default();
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }
}
1387
1388#[cfg(test)]
1392mod property_tests {
1393 use super::*;
1394 use durable_execution_sdk::{
1395 CallbackDetails as SdkCallbackDetails, ChainedInvokeDetails as SdkChainedInvokeDetails,
1396 ContextDetails as SdkContextDetails, Operation, OperationStatus, OperationType,
1397 StepDetails as SdkStepDetails, WaitDetails as SdkWaitDetails,
1398 };
1399 use proptest::prelude::*;
1400
1401 fn operation_type_strategy() -> impl Strategy<Value = OperationType> {
1403 prop_oneof![
1404 Just(OperationType::Step),
1405 Just(OperationType::Wait),
1406 Just(OperationType::Callback),
1407 Just(OperationType::Invoke),
1408 Just(OperationType::Context),
1409 ]
1410 }
1411
1412 fn operation_status_strategy() -> impl Strategy<Value = OperationStatus> {
1414 prop_oneof![
1415 Just(OperationStatus::Started),
1416 Just(OperationStatus::Pending),
1417 Just(OperationStatus::Ready),
1418 Just(OperationStatus::Succeeded),
1419 Just(OperationStatus::Failed),
1420 Just(OperationStatus::Cancelled),
1421 Just(OperationStatus::TimedOut),
1422 Just(OperationStatus::Stopped),
1423 ]
1424 }
1425
1426 fn operation_id_strategy() -> impl Strategy<Value = String> {
1428 "[a-zA-Z0-9_-]{1,32}".prop_map(|s| s)
1429 }
1430
1431 fn optional_result_strategy() -> impl Strategy<Value = Option<String>> {
1433 prop_oneof![
1434 Just(None),
1435 Just(Some(r#""hello""#.to_string())),
1436 Just(Some(r#"42"#.to_string())),
1437 Just(Some(r#"{"key": "value"}"#.to_string())),
1438 Just(Some(r#"true"#.to_string())),
1439 ]
1440 }
1441
1442 fn optional_callback_id_strategy() -> impl Strategy<Value = Option<String>> {
1444 prop_oneof![Just(None), "[a-zA-Z0-9_-]{8,16}".prop_map(|s| Some(s)),]
1445 }
1446
1447 fn optional_timestamp_strategy() -> impl Strategy<Value = Option<i64>> {
1449 prop_oneof![
1450 Just(None),
1451 (1577836800000i64..1893456000000i64).prop_map(Some),
1452 ]
1453 }
1454
1455 fn create_operation_with_type(
1457 op_type: OperationType,
1458 op_id: String,
1459 status: OperationStatus,
1460 result: Option<String>,
1461 callback_id: Option<String>,
1462 start_ts: Option<i64>,
1463 end_ts: Option<i64>,
1464 ) -> Operation {
1465 let mut op = Operation::new(op_id, op_type);
1466 op.status = status;
1467 op.start_timestamp = start_ts;
1468 op.end_timestamp = end_ts;
1469
1470 match op_type {
1471 OperationType::Step => {
1472 op.step_details = Some(SdkStepDetails {
1473 result,
1474 attempt: Some(1),
1475 next_attempt_timestamp: None,
1476 error: None,
1477 payload: None,
1478 });
1479 }
1480 OperationType::Wait => {
1481 op.wait_details = Some(SdkWaitDetails {
1482 scheduled_end_timestamp: end_ts,
1483 });
1484 }
1485 OperationType::Callback => {
1486 op.callback_details = Some(SdkCallbackDetails {
1487 callback_id,
1488 result,
1489 error: None,
1490 });
1491 }
1492 OperationType::Invoke => {
1493 op.chained_invoke_details = Some(SdkChainedInvokeDetails {
1494 result,
1495 error: None,
1496 });
1497 }
1498 OperationType::Context => {
1499 op.context_details = Some(SdkContextDetails {
1500 result,
1501 replay_children: None,
1502 error: None,
1503 });
1504 }
1505 OperationType::Execution => {
1506 }
1508 }
1509
1510 op
1511 }
1512
1513 proptest! {
1524 #![proptest_config(ProptestConfig::with_cases(100))]
1525
1526 #[test]
1527 fn prop_type_specific_details_availability(
1528 op_type in operation_type_strategy(),
1529 op_id in operation_id_strategy(),
1530 status in operation_status_strategy(),
1531 result in optional_result_strategy(),
1532 callback_id in optional_callback_id_strategy(),
1533 start_ts in optional_timestamp_strategy(),
1534 end_ts in optional_timestamp_strategy(),
1535 ) {
1536 if op_type == OperationType::Execution {
1538 return Ok(());
1539 }
1540
1541 let op = create_operation_with_type(
1542 op_type, op_id, status, result, callback_id, start_ts, end_ts
1543 );
1544 let durable_op = DurableOperation::new(op);
1545
1546 match op_type {
1548 OperationType::Step => {
1549 let step_result: Result<StepDetails<serde_json::Value>, _> =
1551 durable_op.get_step_details();
1552 prop_assert!(step_result.is_ok(), "get_step_details should succeed for Step operation");
1553
1554 prop_assert!(durable_op.get_wait_details().is_err());
1556 prop_assert!(durable_op.get_callback_details::<serde_json::Value>().is_err());
1557 prop_assert!(durable_op.get_invoke_details::<serde_json::Value>().is_err());
1558 prop_assert!(durable_op.get_context_details::<serde_json::Value>().is_err());
1559 }
1560 OperationType::Wait => {
1561 prop_assert!(durable_op.get_wait_details().is_ok(),
1563 "get_wait_details should succeed for Wait operation");
1564
1565 prop_assert!(durable_op.get_step_details::<serde_json::Value>().is_err());
1567 prop_assert!(durable_op.get_callback_details::<serde_json::Value>().is_err());
1568 prop_assert!(durable_op.get_invoke_details::<serde_json::Value>().is_err());
1569 prop_assert!(durable_op.get_context_details::<serde_json::Value>().is_err());
1570 }
1571 OperationType::Callback => {
1572 let callback_result: Result<CallbackDetails<serde_json::Value>, _> =
1574 durable_op.get_callback_details();
1575 prop_assert!(callback_result.is_ok(),
1576 "get_callback_details should succeed for Callback operation");
1577
1578 prop_assert!(durable_op.get_step_details::<serde_json::Value>().is_err());
1580 prop_assert!(durable_op.get_wait_details().is_err());
1581 prop_assert!(durable_op.get_invoke_details::<serde_json::Value>().is_err());
1582 prop_assert!(durable_op.get_context_details::<serde_json::Value>().is_err());
1583 }
1584 OperationType::Invoke => {
1585 let invoke_result: Result<InvokeDetails<serde_json::Value>, _> =
1587 durable_op.get_invoke_details();
1588 prop_assert!(invoke_result.is_ok(),
1589 "get_invoke_details should succeed for Invoke operation");
1590
1591 prop_assert!(durable_op.get_step_details::<serde_json::Value>().is_err());
1593 prop_assert!(durable_op.get_wait_details().is_err());
1594 prop_assert!(durable_op.get_callback_details::<serde_json::Value>().is_err());
1595 prop_assert!(durable_op.get_context_details::<serde_json::Value>().is_err());
1596 }
1597 OperationType::Context => {
1598 let context_result: Result<ContextDetails<serde_json::Value>, _> =
1600 durable_op.get_context_details();
1601 prop_assert!(context_result.is_ok(),
1602 "get_context_details should succeed for Context operation");
1603
1604 prop_assert!(durable_op.get_step_details::<serde_json::Value>().is_err());
1606 prop_assert!(durable_op.get_wait_details().is_err());
1607 prop_assert!(durable_op.get_callback_details::<serde_json::Value>().is_err());
1608 prop_assert!(durable_op.get_invoke_details::<serde_json::Value>().is_err());
1609 }
1610 OperationType::Execution => {
1611 }
1613 }
1614 }
1615 }
1616
1617 proptest! {
1628 #![proptest_config(ProptestConfig::with_cases(100))]
1629
1630 #[test]
1631 fn prop_callback_method_type_safety(
1632 op_type in operation_type_strategy(),
1633 op_id in operation_id_strategy(),
1634 status in operation_status_strategy(),
1635 result in optional_result_strategy(),
1636 callback_id in optional_callback_id_strategy(),
1637 ) {
1638 if op_type == OperationType::Execution {
1640 return Ok(());
1641 }
1642
1643 let op = create_operation_with_type(
1644 op_type, op_id, status, result, callback_id.clone(), None, None
1645 );
1646 let durable_op = DurableOperation::new(op);
1647
1648 let rt = tokio::runtime::Runtime::new().unwrap();
1650
1651 if op_type == OperationType::Callback {
1652 if callback_id.is_some() {
1655 let success_result = rt.block_on(durable_op.send_callback_success(r#""test""#));
1656 prop_assert!(success_result.is_ok(),
1657 "send_callback_success should succeed for Callback operation with callback_id");
1658
1659 let error = TestResultError::new("TestError", "test");
1660 let failure_result = rt.block_on(durable_op.send_callback_failure(&error));
1661 prop_assert!(failure_result.is_ok(),
1662 "send_callback_failure should succeed for Callback operation with callback_id");
1663
1664 let heartbeat_result = rt.block_on(durable_op.send_callback_heartbeat());
1665 prop_assert!(heartbeat_result.is_ok(),
1666 "send_callback_heartbeat should succeed for Callback operation with callback_id");
1667 }
1668 } else {
1669 let success_result = rt.block_on(durable_op.send_callback_success(r#""test""#));
1671 prop_assert!(success_result.is_err(),
1672 "send_callback_success should fail for non-Callback operation");
1673 prop_assert!(matches!(success_result.unwrap_err(), TestError::NotCallbackOperation));
1674
1675 let error = TestResultError::new("TestError", "test");
1676 let failure_result = rt.block_on(durable_op.send_callback_failure(&error));
1677 prop_assert!(failure_result.is_err(),
1678 "send_callback_failure should fail for non-Callback operation");
1679 prop_assert!(matches!(failure_result.unwrap_err(), TestError::NotCallbackOperation));
1680
1681 let heartbeat_result = rt.block_on(durable_op.send_callback_heartbeat());
1682 prop_assert!(heartbeat_result.is_err(),
1683 "send_callback_heartbeat should fail for non-Callback operation");
1684 prop_assert!(matches!(heartbeat_result.unwrap_err(), TestError::NotCallbackOperation));
1685 }
1686 }
1687 }
1688}