1use std::sync::Arc;
7
8use chrono::{DateTime, TimeZone, Utc};
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10use tokio::sync::watch;
11
12use crate::error::TestError;
13use crate::types::{TestResultError, WaitingOperationStatus};
14use durable_execution_sdk::{Operation, OperationStatus, OperationType};
15
/// Typed view of a step operation's details, as produced by
/// `DurableOperation::get_step_details`.
///
/// `T` is the type the step's JSON result is deserialized into; it defaults to
/// `serde_json::Value`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepDetails<T = serde_json::Value> {
    /// Attempt number copied from the SDK step details, if present.
    pub attempt: Option<u32>,
    /// Next-attempt time, converted from the SDK's millisecond timestamp, if present.
    pub next_attempt_timestamp: Option<DateTime<Utc>>,
    /// Step result deserialized from the SDK's JSON result string, if one was recorded.
    pub result: Option<T>,
    /// Error converted from the SDK step details, if one was recorded.
    pub error: Option<TestResultError>,
}
35
36impl<T> StepDetails<T> {
37 pub fn new() -> Self {
39 Self {
40 attempt: None,
41 next_attempt_timestamp: None,
42 result: None,
43 error: None,
44 }
45 }
46}
47
48impl<T> Default for StepDetails<T> {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
/// Typed view of a wait operation's details, as produced by
/// `DurableOperation::get_wait_details`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WaitDetails {
    /// Whole seconds between the operation's start timestamp and its scheduled
    /// end; `None` when either timestamp is missing.
    pub wait_seconds: Option<u64>,
    /// Scheduled end of the wait, converted from the SDK's millisecond timestamp.
    pub scheduled_end_timestamp: Option<DateTime<Utc>>,
}
65
66impl WaitDetails {
67 pub fn new() -> Self {
69 Self {
70 wait_seconds: None,
71 scheduled_end_timestamp: None,
72 }
73 }
74}
75
76impl Default for WaitDetails {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
/// Typed view of a callback operation's details, as produced by
/// `DurableOperation::get_callback_details`.
///
/// `T` is the type the callback's JSON result is deserialized into; it
/// defaults to `serde_json::Value`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallbackDetails<T = serde_json::Value> {
    /// Callback ID copied from the SDK callback details, if present.
    pub callback_id: Option<String>,
    /// Callback result deserialized from the SDK's JSON result string, if one was recorded.
    pub result: Option<T>,
    /// Error converted from the SDK callback details, if one was recorded.
    pub error: Option<TestResultError>,
}
99
100impl<T> CallbackDetails<T> {
101 pub fn new() -> Self {
103 Self {
104 callback_id: None,
105 result: None,
106 error: None,
107 }
108 }
109}
110
111impl<T> Default for CallbackDetails<T> {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
/// Typed view of an invoke operation's details, as produced by
/// `DurableOperation::get_invoke_details`.
///
/// `T` is the type the invocation's JSON result is deserialized into; it
/// defaults to `serde_json::Value`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InvokeDetails<T = serde_json::Value> {
    /// Invocation result deserialized from the SDK's JSON result string, if one was recorded.
    pub result: Option<T>,
    /// Error converted from the SDK chained-invoke details, if one was recorded.
    pub error: Option<TestResultError>,
}
132
133impl<T> InvokeDetails<T> {
134 pub fn new() -> Self {
136 Self {
137 result: None,
138 error: None,
139 }
140 }
141}
142
143impl<T> Default for InvokeDetails<T> {
144 fn default() -> Self {
145 Self::new()
146 }
147}
148
/// Typed view of a context operation's details, as produced by
/// `DurableOperation::get_context_details`.
///
/// `T` is the type the context's JSON result is deserialized into; it
/// defaults to `serde_json::Value`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextDetails<T = serde_json::Value> {
    /// Context result deserialized from the SDK's JSON result string, if one was recorded.
    pub result: Option<T>,
    /// Error converted from the SDK context details, if one was recorded.
    pub error: Option<TestResultError>,
}
164
165impl<T> ContextDetails<T> {
166 pub fn new() -> Self {
168 Self {
169 result: None,
170 error: None,
171 }
172 }
173}
174
175impl<T> Default for ContextDetails<T> {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
/// Transport used by `DurableOperation` to deliver callback outcomes.
///
/// Implementations must be `Send + Sync` because the sender is shared behind
/// an `Arc<dyn CallbackSender>`.
#[async_trait::async_trait]
pub trait CallbackSender: Send + Sync {
    /// Reports success for the callback identified by `callback_id`.
    ///
    /// `result` is passed through unchanged from
    /// `DurableOperation::send_callback_success`.
    async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError>;

    /// Reports failure for the callback identified by `callback_id`, carrying `error`.
    async fn send_failure(
        &self,
        callback_id: &str,
        error: &TestResultError,
    ) -> Result<(), TestError>;

    /// Sends a heartbeat for the callback identified by `callback_id`.
    async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError>;
}
199
/// Test-side wrapper around a single SDK `Operation`.
///
/// Besides the operation snapshot it can optionally carry a callback sender,
/// a status watcher, and the shared list of all operations; each is only
/// needed by the subset of methods noted on the field.
pub struct DurableOperation {
    /// The wrapped SDK operation; all getters read from this snapshot.
    operation: Operation,
    /// Sender used by the `send_callback_*` methods; when `None` those calls
    /// return `Ok(())` without sending anything.
    callback_sender: Option<Arc<dyn CallbackSender>>,
    /// Status channel consulted by `wait_for_data` when the snapshot has not
    /// yet reached the requested status.
    status_watcher: Option<watch::Receiver<OperationStatus>>,
    /// Shared list of operations searched by `get_child_operations`.
    all_operations: Option<Arc<Vec<Operation>>>,
}
234
235impl std::fmt::Debug for DurableOperation {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 f.debug_struct("DurableOperation")
238 .field("operation", &self.operation)
239 .field("callback_sender", &self.callback_sender.is_some())
240 .field("status_watcher", &self.status_watcher.is_some())
241 .field(
242 "all_operations",
243 &self.all_operations.as_ref().map(|ops| ops.len()),
244 )
245 .finish()
246 }
247}
248
249impl Clone for DurableOperation {
250 fn clone(&self) -> Self {
251 Self {
252 operation: self.operation.clone(),
253 callback_sender: self.callback_sender.clone(),
254 status_watcher: self.status_watcher.clone(),
255 all_operations: self.all_operations.clone(),
256 }
257 }
258}
259
260impl DurableOperation {
261 pub fn new(operation: Operation) -> Self {
263 Self {
264 operation,
265 callback_sender: None,
266 status_watcher: None,
267 all_operations: None,
268 }
269 }
270
271 pub fn with_callback_sender(
273 operation: Operation,
274 callback_sender: Arc<dyn CallbackSender>,
275 ) -> Self {
276 Self {
277 operation,
278 callback_sender: Some(callback_sender),
279 status_watcher: None,
280 all_operations: None,
281 }
282 }
283
284 pub fn with_status_watcher(
286 operation: Operation,
287 status_watcher: watch::Receiver<OperationStatus>,
288 ) -> Self {
289 Self {
290 operation,
291 callback_sender: None,
292 status_watcher: Some(status_watcher),
293 all_operations: None,
294 }
295 }
296
297 pub fn with_all(
299 operation: Operation,
300 callback_sender: Option<Arc<dyn CallbackSender>>,
301 status_watcher: Option<watch::Receiver<OperationStatus>>,
302 ) -> Self {
303 Self {
304 operation,
305 callback_sender,
306 status_watcher,
307 all_operations: None,
308 }
309 }
310
311 pub fn get_id(&self) -> &str {
325 &self.operation.operation_id
326 }
327
328 pub fn get_parent_id(&self) -> Option<&str> {
334 self.operation.parent_id.as_deref()
335 }
336
337 pub fn get_name(&self) -> Option<&str> {
347 self.operation.name.as_deref()
348 }
349
350 pub fn get_type(&self) -> OperationType {
356 self.operation.operation_type
357 }
358
359 pub fn get_status(&self) -> OperationStatus {
365 self.operation.status
366 }
367
368 pub fn get_start_timestamp(&self) -> Option<DateTime<Utc>> {
374 self.operation
375 .start_timestamp
376 .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
377 }
378
379 pub fn get_end_timestamp(&self) -> Option<DateTime<Utc>> {
385 self.operation
386 .end_timestamp
387 .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
388 }
389
390 pub fn get_operation_data(&self) -> &Operation {
396 &self.operation
397 }
398
399 pub fn is_callback(&self) -> bool {
405 self.operation.operation_type == OperationType::Callback
406 }
407
408 pub fn is_completed(&self) -> bool {
414 self.operation.is_completed()
415 }
416
417 pub fn is_succeeded(&self) -> bool {
423 self.operation.is_succeeded()
424 }
425
426 pub fn is_failed(&self) -> bool {
432 self.operation.is_failed()
433 }
434
435 pub fn with_operations(mut self, all_operations: Arc<Vec<Operation>>) -> Self {
437 self.all_operations = Some(all_operations);
438 self
439 }
440}
441
442impl DurableOperation {
443 pub fn get_child_operations(&self) -> Vec<DurableOperation> {
464 let Some(all_ops) = &self.all_operations else {
465 return Vec::new();
466 };
467
468 let my_id = &self.operation.operation_id;
469
470 all_ops
471 .iter()
472 .filter(|op| op.parent_id.as_deref() == Some(my_id))
473 .map(|op| DurableOperation::new(op.clone()).with_operations(Arc::clone(all_ops)))
474 .collect()
475 }
476}
477
478impl DurableOperation {
479 pub fn get_step_details<T: DeserializeOwned>(&self) -> Result<StepDetails<T>, TestError> {
499 if self.operation.operation_type != OperationType::Step {
500 return Err(TestError::type_mismatch(
501 OperationType::Step,
502 self.operation.operation_type,
503 ));
504 }
505
506 let sdk_details = self.operation.step_details.as_ref();
507
508 let result = if let Some(details) = sdk_details {
509 if let Some(ref result_str) = details.result {
510 Some(serde_json::from_str(result_str)?)
511 } else {
512 None
513 }
514 } else {
515 None
516 };
517
518 let error = sdk_details
519 .and_then(|d| d.error.as_ref())
520 .map(|e| TestResultError::from(e.clone()));
521
522 let next_attempt_timestamp = sdk_details
523 .and_then(|d| d.next_attempt_timestamp)
524 .and_then(|ms| Utc.timestamp_millis_opt(ms).single());
525
526 Ok(StepDetails {
527 attempt: sdk_details.and_then(|d| d.attempt),
528 next_attempt_timestamp,
529 result,
530 error,
531 })
532 }
533
534 pub fn get_wait_details(&self) -> Result<WaitDetails, TestError> {
546 if self.operation.operation_type != OperationType::Wait {
547 return Err(TestError::type_mismatch(
548 OperationType::Wait,
549 self.operation.operation_type,
550 ));
551 }
552
553 let sdk_details = self.operation.wait_details.as_ref();
554
555 let scheduled_end_timestamp = sdk_details
556 .and_then(|d| d.scheduled_end_timestamp)
557 .and_then(|ms| Utc.timestamp_millis_opt(ms).single());
558
559 let wait_seconds = match (self.operation.start_timestamp, sdk_details) {
561 (Some(start), Some(details)) => details.scheduled_end_timestamp.map(|end| {
562 let duration_ms = end.saturating_sub(start);
563 (duration_ms / 1000) as u64
564 }),
565 _ => None,
566 };
567
568 Ok(WaitDetails {
569 wait_seconds,
570 scheduled_end_timestamp,
571 })
572 }
573
574 pub fn get_callback_details<T: DeserializeOwned>(
590 &self,
591 ) -> Result<CallbackDetails<T>, TestError> {
592 if self.operation.operation_type != OperationType::Callback {
593 return Err(TestError::type_mismatch(
594 OperationType::Callback,
595 self.operation.operation_type,
596 ));
597 }
598
599 let sdk_details = self.operation.callback_details.as_ref();
600
601 let result = if let Some(details) = sdk_details {
602 if let Some(ref result_str) = details.result {
603 Some(serde_json::from_str(result_str)?)
604 } else {
605 None
606 }
607 } else {
608 None
609 };
610
611 let error = sdk_details
612 .and_then(|d| d.error.as_ref())
613 .map(|e| TestResultError::from(e.clone()));
614
615 Ok(CallbackDetails {
616 callback_id: sdk_details.and_then(|d| d.callback_id.clone()),
617 result,
618 error,
619 })
620 }
621
622 pub fn get_invoke_details<T: DeserializeOwned>(&self) -> Result<InvokeDetails<T>, TestError> {
638 if self.operation.operation_type != OperationType::Invoke {
639 return Err(TestError::type_mismatch(
640 OperationType::Invoke,
641 self.operation.operation_type,
642 ));
643 }
644
645 let sdk_details = self.operation.chained_invoke_details.as_ref();
646
647 let result = if let Some(details) = sdk_details {
648 if let Some(ref result_str) = details.result {
649 Some(serde_json::from_str(result_str)?)
650 } else {
651 None
652 }
653 } else {
654 None
655 };
656
657 let error = sdk_details
658 .and_then(|d| d.error.as_ref())
659 .map(|e| TestResultError::from(e.clone()));
660
661 Ok(InvokeDetails { result, error })
662 }
663
664 pub fn get_context_details<T: DeserializeOwned>(&self) -> Result<ContextDetails<T>, TestError> {
680 if self.operation.operation_type != OperationType::Context {
681 return Err(TestError::type_mismatch(
682 OperationType::Context,
683 self.operation.operation_type,
684 ));
685 }
686
687 let sdk_details = self.operation.context_details.as_ref();
688
689 let result = if let Some(details) = sdk_details {
690 if let Some(ref result_str) = details.result {
691 Some(serde_json::from_str(result_str)?)
692 } else {
693 None
694 }
695 } else {
696 None
697 };
698
699 let error = sdk_details
700 .and_then(|d| d.error.as_ref())
701 .map(|e| TestResultError::from(e.clone()));
702
703 Ok(ContextDetails { result, error })
704 }
705}
706
707impl DurableOperation {
708 pub async fn send_callback_success(&self, result: &str) -> Result<(), TestError> {
728 if !self.is_callback() {
729 return Err(TestError::NotCallbackOperation);
730 }
731
732 let callback_id = self.get_callback_id()?;
733
734 if let Some(ref sender) = self.callback_sender {
735 sender.send_success(&callback_id, result).await
736 } else {
737 Ok(())
739 }
740 }
741
742 pub async fn send_callback_failure(&self, error: &TestResultError) -> Result<(), TestError> {
758 if !self.is_callback() {
759 return Err(TestError::NotCallbackOperation);
760 }
761
762 let callback_id = self.get_callback_id()?;
763
764 if let Some(ref sender) = self.callback_sender {
765 sender.send_failure(&callback_id, error).await
766 } else {
767 Ok(())
769 }
770 }
771
772 pub async fn send_callback_heartbeat(&self) -> Result<(), TestError> {
784 if !self.is_callback() {
785 return Err(TestError::NotCallbackOperation);
786 }
787
788 let callback_id = self.get_callback_id()?;
789
790 if let Some(ref sender) = self.callback_sender {
791 sender.send_heartbeat(&callback_id).await
792 } else {
793 Ok(())
795 }
796 }
797
798 fn get_callback_id(&self) -> Result<String, TestError> {
800 self.operation
801 .callback_details
802 .as_ref()
803 .and_then(|d| d.callback_id.clone())
804 .ok_or_else(|| {
805 TestError::result_not_available("Callback ID not available for this operation")
806 })
807 }
808}
809
810impl DurableOperation {
811 pub async fn wait_for_data(
833 &self,
834 target_status: WaitingOperationStatus,
835 ) -> Result<&Self, TestError> {
836 if self.has_reached_status(target_status) {
838 return Ok(self);
839 }
840
841 if let Some(ref watcher) = self.status_watcher {
843 let mut watcher = watcher.clone();
844
845 loop {
846 let current_status = *watcher.borrow();
848
849 if self.status_matches_target(current_status, target_status) {
850 return Ok(self);
851 }
852
853 if current_status.is_terminal()
855 && !self.status_matches_target(current_status, target_status)
856 {
857 return Err(TestError::execution_completed_early(
858 self.get_id(),
859 target_status,
860 ));
861 }
862
863 if watcher.changed().await.is_err() {
865 let final_status = *watcher.borrow();
867 if self.status_matches_target(final_status, target_status) {
868 return Ok(self);
869 }
870 return Err(TestError::execution_completed_early(
871 self.get_id(),
872 target_status,
873 ));
874 }
875 }
876 } else {
877 if self.has_reached_status(target_status) {
879 Ok(self)
880 } else {
881 Err(TestError::execution_completed_early(
882 self.get_id(),
883 target_status,
884 ))
885 }
886 }
887 }
888
889 fn has_reached_status(&self, target: WaitingOperationStatus) -> bool {
891 self.status_matches_target(self.operation.status, target)
892 }
893
894 fn status_matches_target(
896 &self,
897 current: OperationStatus,
898 target: WaitingOperationStatus,
899 ) -> bool {
900 match target {
901 WaitingOperationStatus::Started => {
902 true }
906 WaitingOperationStatus::Submitted => {
907 if self.is_callback() {
910 self.operation
911 .callback_details
912 .as_ref()
913 .map(|d| d.callback_id.is_some())
914 .unwrap_or(false)
915 } else {
916 true
918 }
919 }
920 WaitingOperationStatus::Completed => {
921 current.is_terminal()
923 }
924 }
925 }
926}
927
#[cfg(test)]
mod tests {
    use super::*;
    use durable_execution_sdk::{
        CallbackDetails as SdkCallbackDetails, ChainedInvokeDetails as SdkChainedInvokeDetails,
        ContextDetails as SdkContextDetails, Operation, OperationStatus, OperationType,
        StepDetails as SdkStepDetails, WaitDetails as SdkWaitDetails,
    };

    // ----- Fixtures ---------------------------------------------------------

    /// Builds an operation of `kind` with ID `"<name>-001"`, the given name and status.
    fn base_operation(name: &str, kind: OperationType, status: OperationStatus) -> Operation {
        let mut op = Operation::new(format!("{}-001", name), kind);
        op.name = Some(name.to_string());
        op.status = status;
        op
    }

    /// Succeeded step on attempt 1 with an optional JSON result string.
    fn create_step_operation(name: &str, result: Option<&str>) -> Operation {
        let mut op = base_operation(name, OperationType::Step, OperationStatus::Succeeded);
        op.step_details = Some(SdkStepDetails {
            result: result.map(String::from),
            attempt: Some(1),
            next_attempt_timestamp: None,
            error: None,
            payload: None,
        });
        op
    }

    /// Succeeded wait that starts at 1000 ms and is scheduled to end at 6000 ms.
    fn create_wait_operation(name: &str) -> Operation {
        let mut op = base_operation(name, OperationType::Wait, OperationStatus::Succeeded);
        op.start_timestamp = Some(1000);
        op.wait_details = Some(SdkWaitDetails {
            scheduled_end_timestamp: Some(6000),
        });
        op
    }

    /// Started callback with the given callback ID and no result or error.
    fn create_callback_operation(name: &str, callback_id: &str) -> Operation {
        let mut op = base_operation(name, OperationType::Callback, OperationStatus::Started);
        op.callback_details = Some(SdkCallbackDetails {
            callback_id: Some(callback_id.to_string()),
            result: None,
            error: None,
        });
        op
    }

    /// Succeeded invoke with an optional JSON result string.
    fn create_invoke_operation(name: &str, result: Option<&str>) -> Operation {
        let mut op = base_operation(name, OperationType::Invoke, OperationStatus::Succeeded);
        op.chained_invoke_details = Some(SdkChainedInvokeDetails {
            result: result.map(String::from),
            error: None,
        });
        op
    }

    /// Succeeded context with an optional JSON result string.
    fn create_context_operation(name: &str, result: Option<&str>) -> Operation {
        let mut op = base_operation(name, OperationType::Context, OperationStatus::Succeeded);
        op.context_details = Some(SdkContextDetails {
            result: result.map(String::from),
            replay_children: None,
            error: None,
        });
        op
    }

    // ----- Basic getters ----------------------------------------------------

    #[test]
    fn test_get_id() {
        let wrapped = DurableOperation::new(create_step_operation("my-step", None));
        assert_eq!(wrapped.get_id(), "my-step-001");
    }

    #[test]
    fn test_get_name() {
        let wrapped = DurableOperation::new(create_step_operation("my-step", None));
        assert_eq!(wrapped.get_name(), Some("my-step"));
    }

    #[test]
    fn test_get_type() {
        let cases = [
            (create_step_operation("step", None), OperationType::Step),
            (create_wait_operation("wait"), OperationType::Wait),
            (
                create_callback_operation("callback", "cb-123"),
                OperationType::Callback,
            ),
        ];

        for (operation, expected) in cases {
            assert_eq!(DurableOperation::new(operation).get_type(), expected);
        }
    }

    #[test]
    fn test_get_status() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));
        assert_eq!(wrapped.get_status(), OperationStatus::Succeeded);
    }

    #[test]
    fn test_is_callback() {
        let step = DurableOperation::new(create_step_operation("step", None));
        let callback = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        assert!(!step.is_callback());
        assert!(callback.is_callback());
    }

    #[test]
    fn test_is_completed() {
        let mut finished = create_step_operation("step", None);
        finished.status = OperationStatus::Succeeded;
        assert!(DurableOperation::new(finished).is_completed());

        let mut running = create_step_operation("step2", None);
        running.status = OperationStatus::Started;
        assert!(!DurableOperation::new(running).is_completed());
    }

    // ----- Typed details ----------------------------------------------------

    #[test]
    fn test_get_step_details_success() {
        let wrapped = DurableOperation::new(create_step_operation("step", Some(r#""hello""#)));

        let details = wrapped.get_step_details::<String>().unwrap();
        assert_eq!(details.attempt, Some(1));
        assert_eq!(details.result, Some("hello".to_string()));
        assert!(details.error.is_none());
    }

    #[test]
    fn test_get_step_details_wrong_type() {
        let wrapped = DurableOperation::new(create_wait_operation("wait"));

        let outcome = wrapped.get_step_details::<String>();
        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    #[test]
    fn test_get_wait_details_success() {
        let wrapped = DurableOperation::new(create_wait_operation("wait"));

        let details = wrapped.get_wait_details().unwrap();
        assert_eq!(details.wait_seconds, Some(5));
        assert!(details.scheduled_end_timestamp.is_some());
    }

    #[test]
    fn test_get_wait_details_wrong_type() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.get_wait_details();
        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    #[test]
    fn test_get_callback_details_success() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        let details = wrapped.get_callback_details::<String>().unwrap();
        assert_eq!(details.callback_id, Some("cb-123".to_string()));
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }

    #[test]
    fn test_get_callback_details_wrong_type() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.get_callback_details::<String>();
        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    #[test]
    fn test_get_invoke_details_success() {
        let wrapped =
            DurableOperation::new(create_invoke_operation("invoke", Some(r#"{"value": 42}"#)));

        let details = wrapped
            .get_invoke_details::<serde_json::Value>()
            .unwrap();
        assert!(details.error.is_none());
        assert!(details.result.is_some());
        assert_eq!(details.result.unwrap()["value"], 42);
    }

    #[test]
    fn test_get_invoke_details_wrong_type() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.get_invoke_details::<String>();
        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    #[test]
    fn test_get_context_details_success() {
        let wrapped = DurableOperation::new(create_context_operation("context", Some(r#""done""#)));

        let details = wrapped.get_context_details::<String>().unwrap();
        assert_eq!(details.result, Some("done".to_string()));
        assert!(details.error.is_none());
    }

    #[test]
    fn test_get_context_details_wrong_type() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.get_context_details::<String>();
        assert!(matches!(
            outcome,
            Err(TestError::OperationTypeMismatch { .. })
        ));
    }

    // ----- Callback sending -------------------------------------------------

    #[tokio::test]
    async fn test_send_callback_success_on_callback_operation() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        // No sender is attached, so the call is expected to succeed as a no-op.
        let outcome = wrapped.send_callback_success(r#""result""#).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_send_callback_success_on_non_callback_operation() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.send_callback_success(r#""result""#).await;
        assert!(matches!(outcome, Err(TestError::NotCallbackOperation)));
    }

    #[tokio::test]
    async fn test_send_callback_failure_on_callback_operation() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));
        let failure = TestResultError::new("TestError", "Something went wrong");

        let outcome = wrapped.send_callback_failure(&failure).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_send_callback_failure_on_non_callback_operation() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));
        let failure = TestResultError::new("TestError", "Something went wrong");

        let outcome = wrapped.send_callback_failure(&failure).await;
        assert!(matches!(outcome, Err(TestError::NotCallbackOperation)));
    }

    #[tokio::test]
    async fn test_send_callback_heartbeat_on_callback_operation() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        let outcome = wrapped.send_callback_heartbeat().await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_send_callback_heartbeat_on_non_callback_operation() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.send_callback_heartbeat().await;
        assert!(matches!(outcome, Err(TestError::NotCallbackOperation)));
    }

    // ----- wait_for_data ----------------------------------------------------

    #[tokio::test]
    async fn test_wait_for_data_started_already_started() {
        let wrapped = DurableOperation::new(create_step_operation("step", None));

        let outcome = wrapped.wait_for_data(WaitingOperationStatus::Started).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_data_completed_already_completed() {
        let mut op = create_step_operation("step", None);
        op.status = OperationStatus::Succeeded;
        let wrapped = DurableOperation::new(op);

        let outcome = wrapped
            .wait_for_data(WaitingOperationStatus::Completed)
            .await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_data_submitted_callback_with_id() {
        let wrapped = DurableOperation::new(create_callback_operation("callback", "cb-123"));

        let outcome = wrapped
            .wait_for_data(WaitingOperationStatus::Submitted)
            .await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_wait_for_data_completed_not_yet_completed() {
        let mut op = create_step_operation("step", None);
        op.status = OperationStatus::Started;
        let wrapped = DurableOperation::new(op);

        // No watcher is attached, so the wait fails immediately.
        let outcome = wrapped
            .wait_for_data(WaitingOperationStatus::Completed)
            .await;
        assert!(outcome.is_err());
    }

    // ----- Child enumeration ------------------------------------------------

    /// Succeeded step with an explicit ID, name, and optional parent ID.
    fn create_operation_with_parent(id: &str, name: &str, parent_id: Option<&str>) -> Operation {
        let mut op = Operation::new(id.to_string(), OperationType::Step);
        op.name = Some(name.to_string());
        op.status = OperationStatus::Succeeded;
        op.parent_id = parent_id.map(String::from);
        op
    }

    #[test]
    fn test_get_child_operations_matching_parent_id() {
        let parent = create_operation_with_parent("parent-1", "parent", None);
        let all_ops = Arc::new(vec![
            parent.clone(),
            create_operation_with_parent("child-1", "child_a", Some("parent-1")),
            create_operation_with_parent("child-2", "child_b", Some("parent-1")),
            create_operation_with_parent("other-1", "other", Some("parent-2")),
        ]);

        let children = DurableOperation::new(parent)
            .with_operations(all_ops)
            .get_child_operations();

        let ids: Vec<&str> = children.iter().map(|child| child.get_id()).collect();
        assert_eq!(ids, ["child-1", "child-2"]);
    }

    #[test]
    fn test_get_child_operations_empty_when_no_children() {
        let parent = create_operation_with_parent("parent-1", "parent", None);
        let all_ops = Arc::new(vec![
            parent.clone(),
            create_operation_with_parent("other-1", "other", Some("parent-2")),
        ]);

        let children = DurableOperation::new(parent)
            .with_operations(all_ops)
            .get_child_operations();

        assert!(children.is_empty());
    }

    #[test]
    fn test_get_child_operations_preserves_order() {
        let parent = create_operation_with_parent("parent-1", "parent", None);
        // Deliberately not in alphabetical order: list order must be preserved.
        let all_ops = Arc::new(vec![
            parent.clone(),
            create_operation_with_parent("child-c", "third", Some("parent-1")),
            create_operation_with_parent("child-a", "first", Some("parent-1")),
            create_operation_with_parent("child-b", "second", Some("parent-1")),
        ]);

        let children = DurableOperation::new(parent)
            .with_operations(all_ops)
            .get_child_operations();

        let ids: Vec<&str> = children.iter().map(|child| child.get_id()).collect();
        assert_eq!(ids, ["child-c", "child-a", "child-b"]);
    }

    #[test]
    fn test_get_child_operations_without_all_operations() {
        let parent = create_operation_with_parent("parent-1", "parent", None);

        let children = DurableOperation::new(parent).get_child_operations();

        assert!(children.is_empty());
    }

    #[test]
    fn test_get_child_operations_children_can_enumerate_grandchildren() {
        let parent = create_operation_with_parent("parent-1", "parent", None);
        let all_ops = Arc::new(vec![
            parent.clone(),
            create_operation_with_parent("child-1", "child", Some("parent-1")),
            create_operation_with_parent("grandchild-1", "grandchild", Some("child-1")),
        ]);

        let children = DurableOperation::new(parent)
            .with_operations(all_ops)
            .get_child_operations();
        assert_eq!(children.len(), 1);
        assert_eq!(children[0].get_id(), "child-1");

        // Children share the operation list, so they can look up their own children.
        let grandchildren = children[0].get_child_operations();
        assert_eq!(grandchildren.len(), 1);
        assert_eq!(grandchildren[0].get_id(), "grandchild-1");
    }

    // ----- Default values ---------------------------------------------------

    #[test]
    fn test_step_details_default() {
        let details = StepDetails::<String>::default();
        assert!(details.attempt.is_none());
        assert!(details.next_attempt_timestamp.is_none());
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }

    #[test]
    fn test_wait_details_default() {
        let details = WaitDetails::default();
        assert!(details.wait_seconds.is_none());
        assert!(details.scheduled_end_timestamp.is_none());
    }

    #[test]
    fn test_callback_details_default() {
        let details = CallbackDetails::<String>::default();
        assert!(details.callback_id.is_none());
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }

    #[test]
    fn test_invoke_details_default() {
        let details = InvokeDetails::<String>::default();
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }

    #[test]
    fn test_context_details_default() {
        let details = ContextDetails::<String>::default();
        assert!(details.result.is_none());
        assert!(details.error.is_none());
    }
}
1448
1449#[cfg(test)]
1453mod property_tests {
1454 use super::*;
1455 use durable_execution_sdk::{
1456 CallbackDetails as SdkCallbackDetails, ChainedInvokeDetails as SdkChainedInvokeDetails,
1457 ContextDetails as SdkContextDetails, Operation, OperationStatus, OperationType,
1458 StepDetails as SdkStepDetails, WaitDetails as SdkWaitDetails,
1459 };
1460 use proptest::prelude::*;
1461
    /// Strategy yielding one of the five operation types that carry typed
    /// details: `Step`, `Wait`, `Callback`, `Invoke`, or `Context`.
    ///
    /// `OperationType::Execution` is intentionally not generated here.
    fn operation_type_strategy() -> impl Strategy<Value = OperationType> {
        prop_oneof![
            Just(OperationType::Step),
            Just(OperationType::Wait),
            Just(OperationType::Callback),
            Just(OperationType::Invoke),
            Just(OperationType::Context),
        ]
    }
1472
    /// Strategy yielding one of the eight operation statuses listed below.
    fn operation_status_strategy() -> impl Strategy<Value = OperationStatus> {
        prop_oneof![
            Just(OperationStatus::Started),
            Just(OperationStatus::Pending),
            Just(OperationStatus::Ready),
            Just(OperationStatus::Succeeded),
            Just(OperationStatus::Failed),
            Just(OperationStatus::Cancelled),
            Just(OperationStatus::TimedOut),
            Just(OperationStatus::Stopped),
        ]
    }
1486
1487 fn operation_id_strategy() -> impl Strategy<Value = String> {
1489 "[a-zA-Z0-9_-]{1,32}".prop_map(|s| s)
1490 }
1491
1492 fn optional_result_strategy() -> impl Strategy<Value = Option<String>> {
1494 prop_oneof![
1495 Just(None),
1496 Just(Some(r#""hello""#.to_string())),
1497 Just(Some(r#"42"#.to_string())),
1498 Just(Some(r#"{"key": "value"}"#.to_string())),
1499 Just(Some(r#"true"#.to_string())),
1500 ]
1501 }
1502
1503 fn optional_callback_id_strategy() -> impl Strategy<Value = Option<String>> {
1505 prop_oneof![Just(None), "[a-zA-Z0-9_-]{8,16}".prop_map(|s| Some(s)),]
1506 }
1507
1508 fn optional_timestamp_strategy() -> impl Strategy<Value = Option<i64>> {
1510 prop_oneof![
1511 Just(None),
1512 (1577836800000i64..1893456000000i64).prop_map(Some),
1513 ]
1514 }
1515
1516 fn create_operation_with_type(
1518 op_type: OperationType,
1519 op_id: String,
1520 status: OperationStatus,
1521 result: Option<String>,
1522 callback_id: Option<String>,
1523 start_ts: Option<i64>,
1524 end_ts: Option<i64>,
1525 ) -> Operation {
1526 let mut op = Operation::new(op_id, op_type);
1527 op.status = status;
1528 op.start_timestamp = start_ts;
1529 op.end_timestamp = end_ts;
1530
1531 match op_type {
1532 OperationType::Step => {
1533 op.step_details = Some(SdkStepDetails {
1534 result,
1535 attempt: Some(1),
1536 next_attempt_timestamp: None,
1537 error: None,
1538 payload: None,
1539 });
1540 }
1541 OperationType::Wait => {
1542 op.wait_details = Some(SdkWaitDetails {
1543 scheduled_end_timestamp: end_ts,
1544 });
1545 }
1546 OperationType::Callback => {
1547 op.callback_details = Some(SdkCallbackDetails {
1548 callback_id,
1549 result,
1550 error: None,
1551 });
1552 }
1553 OperationType::Invoke => {
1554 op.chained_invoke_details = Some(SdkChainedInvokeDetails {
1555 result,
1556 error: None,
1557 });
1558 }
1559 OperationType::Context => {
1560 op.context_details = Some(SdkContextDetails {
1561 result,
1562 replay_children: None,
1563 error: None,
1564 });
1565 }
1566 OperationType::Execution => {
1567 }
1569 }
1570
1571 op
1572 }
1573
1574 proptest! {
1585 #![proptest_config(ProptestConfig::with_cases(100))]
1586
1587 #[test]
1588 fn prop_type_specific_details_availability(
1589 op_type in operation_type_strategy(),
1590 op_id in operation_id_strategy(),
1591 status in operation_status_strategy(),
1592 result in optional_result_strategy(),
1593 callback_id in optional_callback_id_strategy(),
1594 start_ts in optional_timestamp_strategy(),
1595 end_ts in optional_timestamp_strategy(),
1596 ) {
1597 if op_type == OperationType::Execution {
1599 return Ok(());
1600 }
1601
1602 let op = create_operation_with_type(
1603 op_type, op_id, status, result, callback_id, start_ts, end_ts
1604 );
1605 let durable_op = DurableOperation::new(op);
1606
1607 match op_type {
1609 OperationType::Step => {
1610 let step_result: Result<StepDetails<serde_json::Value>, _> =
1612 durable_op.get_step_details();
1613 prop_assert!(step_result.is_ok(), "get_step_details should succeed for Step operation");
1614
1615 prop_assert!(durable_op.get_wait_details().is_err());
1617 prop_assert!(durable_op.get_callback_details::<serde_json::Value>().is_err());
1618 prop_assert!(durable_op.get_invoke_details::<serde_json::Value>().is_err());
1619 prop_assert!(durable_op.get_context_details::<serde_json::Value>().is_err());
1620 }
1621 OperationType::Wait => {
1622 prop_assert!(durable_op.get_wait_details().is_ok(),
1624 "get_wait_details should succeed for Wait operation");
1625
1626 prop_assert!(durable_op.get_step_details::<serde_json::Value>().is_err());
1628 prop_assert!(durable_op.get_callback_details::<serde_json::Value>().is_err());
1629 prop_assert!(durable_op.get_invoke_details::<serde_json::Value>().is_err());
1630 prop_assert!(durable_op.get_context_details::<serde_json::Value>().is_err());
1631 }
1632 OperationType::Callback => {
1633 let callback_result: Result<CallbackDetails<serde_json::Value>, _> =
1635 durable_op.get_callback_details();
1636 prop_assert!(callback_result.is_ok(),
1637 "get_callback_details should succeed for Callback operation");
1638
1639 prop_assert!(durable_op.get_step_details::<serde_json::Value>().is_err());
1641 prop_assert!(durable_op.get_wait_details().is_err());
1642 prop_assert!(durable_op.get_invoke_details::<serde_json::Value>().is_err());
1643 prop_assert!(durable_op.get_context_details::<serde_json::Value>().is_err());
1644 }
1645 OperationType::Invoke => {
1646 let invoke_result: Result<InvokeDetails<serde_json::Value>, _> =
1648 durable_op.get_invoke_details();
1649 prop_assert!(invoke_result.is_ok(),
1650 "get_invoke_details should succeed for Invoke operation");
1651
1652 prop_assert!(durable_op.get_step_details::<serde_json::Value>().is_err());
1654 prop_assert!(durable_op.get_wait_details().is_err());
1655 prop_assert!(durable_op.get_callback_details::<serde_json::Value>().is_err());
1656 prop_assert!(durable_op.get_context_details::<serde_json::Value>().is_err());
1657 }
1658 OperationType::Context => {
1659 let context_result: Result<ContextDetails<serde_json::Value>, _> =
1661 durable_op.get_context_details();
1662 prop_assert!(context_result.is_ok(),
1663 "get_context_details should succeed for Context operation");
1664
1665 prop_assert!(durable_op.get_step_details::<serde_json::Value>().is_err());
1667 prop_assert!(durable_op.get_wait_details().is_err());
1668 prop_assert!(durable_op.get_callback_details::<serde_json::Value>().is_err());
1669 prop_assert!(durable_op.get_invoke_details::<serde_json::Value>().is_err());
1670 }
1671 OperationType::Execution => {
1672 }
1674 }
1675 }
1676 }
1677
1678 proptest! {
1689 #![proptest_config(ProptestConfig::with_cases(100))]
1690
1691 #[test]
1692 fn prop_callback_method_type_safety(
1693 op_type in operation_type_strategy(),
1694 op_id in operation_id_strategy(),
1695 status in operation_status_strategy(),
1696 result in optional_result_strategy(),
1697 callback_id in optional_callback_id_strategy(),
1698 ) {
1699 if op_type == OperationType::Execution {
1701 return Ok(());
1702 }
1703
1704 let op = create_operation_with_type(
1705 op_type, op_id, status, result, callback_id.clone(), None, None
1706 );
1707 let durable_op = DurableOperation::new(op);
1708
1709 let rt = tokio::runtime::Runtime::new().unwrap();
1711
1712 if op_type == OperationType::Callback {
1713 if callback_id.is_some() {
1716 let success_result = rt.block_on(durable_op.send_callback_success(r#""test""#));
1717 prop_assert!(success_result.is_ok(),
1718 "send_callback_success should succeed for Callback operation with callback_id");
1719
1720 let error = TestResultError::new("TestError", "test");
1721 let failure_result = rt.block_on(durable_op.send_callback_failure(&error));
1722 prop_assert!(failure_result.is_ok(),
1723 "send_callback_failure should succeed for Callback operation with callback_id");
1724
1725 let heartbeat_result = rt.block_on(durable_op.send_callback_heartbeat());
1726 prop_assert!(heartbeat_result.is_ok(),
1727 "send_callback_heartbeat should succeed for Callback operation with callback_id");
1728 }
1729 } else {
1730 let success_result = rt.block_on(durable_op.send_callback_success(r#""test""#));
1732 prop_assert!(success_result.is_err(),
1733 "send_callback_success should fail for non-Callback operation");
1734 prop_assert!(matches!(success_result.unwrap_err(), TestError::NotCallbackOperation));
1735
1736 let error = TestResultError::new("TestError", "test");
1737 let failure_result = rt.block_on(durable_op.send_callback_failure(&error));
1738 prop_assert!(failure_result.is_err(),
1739 "send_callback_failure should fail for non-Callback operation");
1740 prop_assert!(matches!(failure_result.unwrap_err(), TestError::NotCallbackOperation));
1741
1742 let heartbeat_result = rt.block_on(durable_op.send_callback_heartbeat());
1743 prop_assert!(heartbeat_result.is_err(),
1744 "send_callback_heartbeat should fail for non-Callback operation");
1745 prop_assert!(matches!(heartbeat_result.unwrap_err(), TestError::NotCallbackOperation));
1746 }
1747 }
1748 }
1749}