1use std::collections::HashSet;
27use std::future::Future;
28use std::pin::Pin;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::Arc;
31
32use serde::de::DeserializeOwned;
33use serde::Serialize;
34use tokio::sync::{Mutex, RwLock};
35
36use durable_execution_sdk::{
37 DurableContext, DurableError, DurableServiceClient, Operation, OperationStatus, OperationType,
38};
39
40use super::scheduler::{QueueScheduler, Scheduler};
41use super::types::ExecutionId;
42use super::worker_manager::CheckpointWorkerManager;
43use crate::operation::CallbackSender;
44use crate::operation_handle::{OperationHandle, OperationMatcher};
45use crate::types::{ExecutionStatus, Invocation, TestResultError};
46
/// Settings that control whether the orchestrator fast-forwards time.
#[derive(Debug, Clone, Default)]
pub struct SkipTimeConfig {
    /// When `true`, the orchestrator computes how far the clock must move for pending
    /// operations and calls `tokio::time::advance` to move it.
    pub enabled: bool,
}
53
/// Outcome of running a handler through the test orchestrator.
#[derive(Debug)]
pub struct TestExecutionResult<T> {
    /// Status of the execution at the time the result was produced.
    pub status: ExecutionStatus,
    /// Handler output; populated by [`TestExecutionResult::success`], `None` otherwise.
    pub result: Option<T>,
    /// Failure details; populated by [`TestExecutionResult::failure`], `None` otherwise.
    pub error: Option<TestResultError>,
    /// Operations reported for the execution.
    pub operations: Vec<Operation>,
    /// Handler invocations recorded for the execution.
    pub invocations: Vec<Invocation>,
    /// Identifier of the execution this result belongs to.
    pub execution_id: String,
}
70
71impl<T> TestExecutionResult<T> {
72 pub fn success(result: T, operations: Vec<Operation>, execution_id: String) -> Self {
74 Self {
75 status: ExecutionStatus::Succeeded,
76 result: Some(result),
77 error: None,
78 operations,
79 invocations: Vec::new(),
80 execution_id,
81 }
82 }
83
84 pub fn failure(
86 error: TestResultError,
87 operations: Vec<Operation>,
88 execution_id: String,
89 ) -> Self {
90 Self {
91 status: ExecutionStatus::Failed,
92 result: None,
93 error: Some(error),
94 operations,
95 invocations: Vec::new(),
96 execution_id,
97 }
98 }
99
100 pub fn running(operations: Vec<Operation>, execution_id: String) -> Self {
102 Self {
103 status: ExecutionStatus::Running,
104 result: None,
105 error: None,
106 operations,
107 invocations: Vec::new(),
108 execution_id,
109 }
110 }
111
112 pub fn with_invocation(mut self, invocation: Invocation) -> Self {
114 self.invocations.push(invocation);
115 self
116 }
117}
118
/// In-memory store of operations with lookup indexes by ID and by name.
#[derive(Debug, Default)]
pub struct OperationStorage {
    /// Operations in insertion order.
    operations: Vec<Operation>,
    /// Maps an operation ID to its position in `operations`.
    operations_by_id: std::collections::HashMap<String, usize>,
    /// Maps an operation name to the positions in `operations` that carry that name.
    operations_by_name: std::collections::HashMap<String, Vec<usize>>,
}
129
130impl OperationStorage {
131 pub fn new() -> Self {
133 Self::default()
134 }
135
136 pub fn add_operation(&mut self, operation: Operation) {
138 let index = self.operations.len();
139 let id = operation.operation_id.clone();
140 let name = operation.name.clone();
141
142 self.operations.push(operation);
143 self.operations_by_id.insert(id, index);
144
145 if let Some(name) = name {
146 self.operations_by_name.entry(name).or_default().push(index);
147 }
148 }
149
150 pub fn update_operation(&mut self, operation: Operation) {
152 let id = operation.operation_id.clone();
153 if let Some(&index) = self.operations_by_id.get(&id) {
154 self.operations[index] = operation;
155 } else {
156 self.add_operation(operation);
157 }
158 }
159
160 pub fn get_by_id(&self, id: &str) -> Option<&Operation> {
162 self.operations_by_id
163 .get(id)
164 .and_then(|&idx| self.operations.get(idx))
165 }
166
167 pub fn get_all(&self) -> &[Operation] {
169 &self.operations
170 }
171
172 pub fn clear(&mut self) {
174 self.operations.clear();
175 self.operations_by_id.clear();
176 self.operations_by_name.clear();
177 }
178
179 pub fn len(&self) -> usize {
181 self.operations.len()
182 }
183
184 pub fn is_empty(&self) -> bool {
186 self.operations.is_empty()
187 }
188}
189
/// Type-erased handler: takes the input and a [`DurableContext`] and returns a boxed,
/// `Send` future resolving to the handler's output or a [`DurableError`].
pub type BoxedHandler<I, O> = Box<
    dyn Fn(I, DurableContext) -> Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
        + Send
        + Sync,
>;
196
/// Drives a handler through one or more invocations against the checkpoint worker,
/// tracking operations, pending work and registered operation handles.
pub struct TestExecutionOrchestrator<I, O>
where
    I: DeserializeOwned + Send + Serialize + 'static,
    O: Serialize + DeserializeOwned + Send + 'static,
{
    /// The handler under test, boxed so the orchestrator does not depend on its concrete type.
    handler: BoxedHandler<I, O>,
    /// Shared store that mirrors the operations fetched from the checkpoint API.
    operation_storage: Arc<RwLock<OperationStorage>>,
    /// Client used to start executions/invocations and to read or update checkpoint data.
    checkpoint_api: Arc<CheckpointWorkerManager>,
    /// Time-skipping configuration.
    skip_time_config: SkipTimeConfig,
    /// Scheduler for deferred invocations; `new` installs a `QueueScheduler`.
    scheduler: Box<dyn Scheduler>,
    /// IDs of operations that the `handle_*_update` methods observed as still pending.
    pending_operations: HashSet<String>,
    /// Set to `true` while the handler future is being awaited.
    invocation_active: Arc<AtomicBool>,
    /// Execution ID returned by the checkpoint server when the execution was started.
    execution_id: Option<ExecutionId>,
    /// Most recent checkpoint token returned by the checkpoint server.
    checkpoint_token: Option<String>,
    /// Set to `true` once the execution reached a final outcome.
    execution_complete: Arc<AtomicBool>,
    /// Slot for the final handler result; `execute_handler` resets it to `None`.
    final_result: Arc<Mutex<Option<Result<O, DurableError>>>>,
    /// Handles that `populate_handles` refreshes with matching operation data.
    registered_handles: Vec<OperationHandle>,
    /// Snapshot of the latest operations, overwritten by `populate_handles`.
    shared_operations: Arc<RwLock<Vec<Operation>>>,
    /// Optional sender that `with_handles` attaches to each registered handle.
    callback_sender: Option<Arc<dyn CallbackSender>>,
}
239
240impl<I, O> TestExecutionOrchestrator<I, O>
241where
242 I: DeserializeOwned + Send + Serialize + Clone + 'static,
243 O: Serialize + DeserializeOwned + Send + 'static,
244{
245 pub fn new<F, Fut>(
254 handler: F,
255 operation_storage: Arc<RwLock<OperationStorage>>,
256 checkpoint_api: Arc<CheckpointWorkerManager>,
257 skip_time_config: SkipTimeConfig,
258 ) -> Self
259 where
260 F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
261 Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
262 {
263 let boxed_handler = Box::new(move |input: I, ctx: DurableContext| {
264 let fut = handler(input, ctx);
265 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
266 });
267
268 let scheduler: Box<dyn Scheduler> = Box::new(QueueScheduler::new());
270
271 Self {
272 handler: boxed_handler,
273 operation_storage,
274 checkpoint_api,
275 skip_time_config,
276 scheduler,
277 pending_operations: HashSet::new(),
278 invocation_active: Arc::new(AtomicBool::new(false)),
279 execution_id: None,
280 checkpoint_token: None,
281 execution_complete: Arc::new(AtomicBool::new(false)),
282 final_result: Arc::new(Mutex::new(None)),
283 registered_handles: Vec::new(),
284 shared_operations: Arc::new(RwLock::new(Vec::new())),
285 callback_sender: None,
286 }
287 }
288
289 pub fn with_handles(
297 mut self,
298 handles: Vec<OperationHandle>,
299 shared_operations: Arc<RwLock<Vec<Operation>>>,
300 callback_sender: Option<Arc<dyn CallbackSender>>,
301 ) -> Self {
302 if let Some(ref sender) = callback_sender {
306 for handle in &handles {
307 let sender_clone = Arc::clone(sender);
308 if let Ok(mut guard) = handle.callback_sender.try_write() {
309 *guard = Some(sender_clone);
310 }
311 }
312 }
313 self.registered_handles = handles;
314 self.shared_operations = shared_operations;
315 self.callback_sender = callback_sender;
316 self
317 }
318
319 pub fn is_time_skipping_enabled(&self) -> bool {
321 self.skip_time_config.enabled
322 }
323
324 pub fn execution_id(&self) -> Option<&str> {
326 self.execution_id.as_deref()
327 }
328
329 pub fn checkpoint_token(&self) -> Option<&str> {
331 self.checkpoint_token.as_deref()
332 }
333
334 pub fn is_execution_complete(&self) -> bool {
336 self.execution_complete.load(Ordering::SeqCst)
337 }
338
339 pub fn is_invocation_active(&self) -> bool {
341 self.invocation_active.load(Ordering::SeqCst)
342 }
343}
344
345impl<I, O> TestExecutionOrchestrator<I, O>
346where
347 I: DeserializeOwned + Send + Serialize + Clone + 'static,
348 O: Serialize + DeserializeOwned + Send + 'static,
349{
350 pub async fn execute_handler(
374 &mut self,
375 payload: I,
376 ) -> Result<TestExecutionResult<O>, crate::error::TestError> {
377 use super::types::{ApiType, StartDurableExecutionRequest};
378 use durable_execution_sdk::lambda::InitialExecutionState;
379 use durable_execution_sdk::state::ExecutionState;
380
381 self.operation_storage.write().await.clear();
383 self.pending_operations.clear();
384 self.execution_complete.store(false, Ordering::SeqCst);
385 *self.final_result.lock().await = None;
386
387 let payload_json = serde_json::to_string(&payload)?;
389
390 let invocation_id = uuid::Uuid::new_v4().to_string();
392 let start_request = StartDurableExecutionRequest {
393 invocation_id: invocation_id.clone(),
394 payload: Some(payload_json),
395 };
396 let start_payload = serde_json::to_string(&start_request)?;
397
398 let start_response = self
399 .checkpoint_api
400 .send_api_request(ApiType::StartDurableExecution, start_payload)
401 .await?;
402
403 if let Some(error) = start_response.error {
404 return Err(crate::error::TestError::CheckpointServerError(error));
405 }
406
407 let invocation_result: super::InvocationResult =
408 serde_json::from_str(&start_response.payload.ok_or_else(|| {
409 crate::error::TestError::CheckpointServerError(
410 "Empty response from checkpoint server".to_string(),
411 )
412 })?)?;
413
414 self.execution_id = Some(invocation_result.execution_id.clone());
415 self.checkpoint_token = Some(invocation_result.checkpoint_token.clone());
416
417 let execution_arn = invocation_result.execution_id.clone();
418 let checkpoint_token = invocation_result.checkpoint_token.clone();
419
420 let initial_state = InitialExecutionState::new();
422
423 let execution_state = Arc::new(ExecutionState::new(
425 &execution_arn,
426 &checkpoint_token,
427 initial_state,
428 self.checkpoint_api.clone(),
429 ));
430
431 let ctx = DurableContext::new(execution_state.clone());
433
434 let start_time = chrono::Utc::now();
436 let mut invocation = Invocation::with_start(start_time);
437
438 self.invocation_active.store(true, Ordering::SeqCst);
440 let handler_result = (self.handler)(payload.clone(), ctx).await;
441 self.invocation_active.store(false, Ordering::SeqCst);
442
443 let end_time = chrono::Utc::now();
445 invocation = invocation.with_end(end_time);
446
447 let operations = match self.checkpoint_api.get_operations(&execution_arn, "").await {
449 Ok(response) => {
450 let mut storage = self.operation_storage.write().await;
451 for op in &response.operations {
452 storage.update_operation(op.clone());
453 }
454 response.operations
455 }
456 Err(_) => Vec::new(),
457 };
458
459 self.populate_handles(&operations).await;
461
462 match handler_result {
464 Ok(result) => {
465 self.execution_complete.store(true, Ordering::SeqCst);
466 let mut test_result =
467 TestExecutionResult::success(result, operations, execution_arn);
468 test_result.invocations.push(invocation);
469 Ok(test_result)
470 }
471 Err(error) => {
472 if error.is_suspend() {
474 let test_result = self
476 .handle_pending_execution(payload, execution_arn, invocation)
477 .await?;
478 Ok(test_result)
479 } else {
480 self.execution_complete.store(true, Ordering::SeqCst);
481 let error_obj = durable_execution_sdk::ErrorObject::from(&error);
482 let test_error = TestResultError::new(error_obj.error_type, error.to_string());
483 let mut test_result =
484 TestExecutionResult::failure(test_error.clone(), operations, execution_arn);
485 test_result
486 .invocations
487 .push(invocation.with_error(test_error));
488 Ok(test_result)
489 }
490 }
491 }
492 }
493
494 async fn handle_pending_execution(
496 &mut self,
497 payload: I,
498 execution_arn: String,
499 initial_invocation: Invocation,
500 ) -> Result<TestExecutionResult<O>, crate::error::TestError> {
501 let mut invocations = vec![initial_invocation];
502 let mut iteration_count = 0;
503 const MAX_ITERATIONS: usize = 100; loop {
506 iteration_count += 1;
507 if iteration_count > MAX_ITERATIONS {
508 return Err(crate::error::TestError::CheckpointServerError(
509 "Maximum iteration count exceeded".to_string(),
510 ));
511 }
512
513 let mut operations = match self.checkpoint_api.get_operations(&execution_arn, "").await
515 {
516 Ok(response) => {
517 let mut storage = self.operation_storage.write().await;
518 for op in &response.operations {
519 storage.update_operation(op.clone());
520 }
521 response.operations
522 }
523 Err(_) => Vec::new(),
524 };
525
526 self.populate_handles(&operations).await;
528
529 let process_result = self.process_operations(&operations, &execution_arn);
531
532 match process_result {
533 ProcessOperationsResult::ExecutionSucceeded(result_str) => {
534 self.execution_complete.store(true, Ordering::SeqCst);
535 if let Ok(result) = serde_json::from_str::<O>(&result_str) {
536 let mut test_result =
537 TestExecutionResult::success(result, operations, execution_arn);
538 test_result.invocations = invocations;
539 return Ok(test_result);
540 }
541 let mut test_result = TestExecutionResult::running(operations, execution_arn);
543 test_result.invocations = invocations;
544 return Ok(test_result);
545 }
546 ProcessOperationsResult::ExecutionFailed(error) => {
547 self.execution_complete.store(true, Ordering::SeqCst);
548 let mut test_result =
549 TestExecutionResult::failure(error, operations, execution_arn);
550 test_result.invocations = invocations;
551 return Ok(test_result);
552 }
553 ProcessOperationsResult::NoPendingOperations => {
554 let mut test_result = TestExecutionResult::running(operations, execution_arn);
556 test_result.invocations = invocations;
557 return Ok(test_result);
558 }
559 ProcessOperationsResult::ShouldReinvoke(advance_time_ms) => {
560 if advance_time_ms.is_none() {
564 if !self.registered_handles.is_empty() {
568 loop {
569 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
570
571 let poll_operations = match self
573 .checkpoint_api
574 .get_operations(&execution_arn, "")
575 .await
576 {
577 Ok(response) => response.operations,
578 Err(_) => continue,
579 };
580
581 self.populate_handles(&poll_operations).await;
583
584 let all_callbacks_done =
586 self.pending_operations.iter().all(|op_id| {
587 poll_operations.iter().any(|op| {
588 &op.operation_id == op_id
589 && op.operation_type == OperationType::Callback
590 && matches!(
591 op.status,
592 OperationStatus::Succeeded
593 | OperationStatus::Failed
594 | OperationStatus::Cancelled
595 )
596 })
597 });
598
599 if all_callbacks_done {
600 operations = poll_operations;
601 break;
602 }
603 }
604 } else {
606 let mut test_result =
608 TestExecutionResult::running(operations, execution_arn);
609 test_result.invocations = invocations;
610 return Ok(test_result);
611 }
612 }
613
614 if let Some(advance_ms) = advance_time_ms {
616 if advance_ms > 0 {
617 tokio::time::advance(tokio::time::Duration::from_millis(advance_ms))
618 .await;
619 }
620 }
621
622 if self.skip_time_config.enabled {
627 for op in &operations {
628 if op.operation_type == OperationType::Wait
629 && op.status == OperationStatus::Started
630 {
631 let mut updated_operation = op.clone();
637 updated_operation.status = OperationStatus::Succeeded;
638 updated_operation.end_timestamp =
639 Some(chrono::Utc::now().timestamp_millis());
640
641 let update_request = super::types::UpdateCheckpointDataRequest {
642 execution_id: execution_arn.clone(),
643 operation_id: op.operation_id.clone(),
644 operation_data: updated_operation,
645 payload: None,
646 error: None,
647 };
648
649 let payload = serde_json::to_string(&update_request)?;
650 let _ = self
651 .checkpoint_api
652 .send_api_request(
653 super::types::ApiType::UpdateCheckpointData,
654 payload,
655 )
656 .await;
657 }
658 }
659 }
660 }
661 }
662
663 let new_invocation_id = uuid::Uuid::new_v4().to_string();
665 let start_invocation_request = super::types::StartInvocationRequest {
666 execution_id: execution_arn.clone(),
667 invocation_id: new_invocation_id.clone(),
668 };
669 let start_payload = serde_json::to_string(&start_invocation_request)?;
670
671 let start_response = self
672 .checkpoint_api
673 .send_api_request(super::types::ApiType::StartInvocation, start_payload)
674 .await?;
675
676 if let Some(error) = start_response.error {
677 return Err(crate::error::TestError::CheckpointServerError(error));
678 }
679
680 let invocation_result: super::InvocationResult =
681 serde_json::from_str(&start_response.payload.ok_or_else(|| {
682 crate::error::TestError::CheckpointServerError(
683 "Empty response from checkpoint server".to_string(),
684 )
685 })?)?;
686
687 self.checkpoint_token = Some(invocation_result.checkpoint_token.clone());
688
689 use durable_execution_sdk::lambda::InitialExecutionState;
692 use durable_execution_sdk::state::ExecutionState;
693
694 let current_operations: Vec<Operation> = invocation_result
696 .operation_events
697 .iter()
698 .map(|e| e.operation.clone())
699 .collect();
700 let initial_state = InitialExecutionState::with_operations(current_operations);
701 let execution_state = Arc::new(ExecutionState::new(
702 &execution_arn,
703 &invocation_result.checkpoint_token,
704 initial_state,
705 self.checkpoint_api.clone(),
706 ));
707
708 let ctx = DurableContext::new(execution_state);
709
710 let start_time = chrono::Utc::now();
712 let mut invocation = Invocation::with_start(start_time);
713
714 self.invocation_active.store(true, Ordering::SeqCst);
716 let handler_result = (self.handler)(payload.clone(), ctx).await;
717 self.invocation_active.store(false, Ordering::SeqCst);
718
719 let end_time = chrono::Utc::now();
720 invocation = invocation.with_end(end_time);
721
722 match handler_result {
723 Ok(result) => {
724 self.execution_complete.store(true, Ordering::SeqCst);
725 invocations.push(invocation);
726
727 let final_operations =
729 match self.checkpoint_api.get_operations(&execution_arn, "").await {
730 Ok(response) => response.operations,
731 Err(_) => Vec::new(),
732 };
733
734 self.populate_handles(&final_operations).await;
736
737 let mut test_result =
738 TestExecutionResult::success(result, final_operations, execution_arn);
739 test_result.invocations = invocations;
740 return Ok(test_result);
741 }
742 Err(error) => {
743 if error.is_suspend() {
744 invocations.push(invocation);
746 continue;
747 } else {
748 self.execution_complete.store(true, Ordering::SeqCst);
749 let error_obj = durable_execution_sdk::ErrorObject::from(&error);
750 let test_error =
751 TestResultError::new(error_obj.error_type, error.to_string());
752 invocations.push(invocation.with_error(test_error.clone()));
753
754 let final_operations =
755 match self.checkpoint_api.get_operations(&execution_arn, "").await {
756 Ok(response) => response.operations,
757 Err(_) => Vec::new(),
758 };
759
760 self.populate_handles(&final_operations).await;
762
763 let mut test_result = TestExecutionResult::failure(
764 test_error,
765 final_operations,
766 execution_arn,
767 );
768 test_result.invocations = invocations;
769 return Ok(test_result);
770 }
771 }
772 }
773 }
774 }
775
776 fn process_operations(
795 &mut self,
796 operations: &[Operation],
797 execution_id: &str,
798 ) -> ProcessOperationsResult {
799 if let Some(exec_result) = self.handle_execution_update(operations) {
801 return exec_result;
802 }
803
804 let mut has_pending_operations = false;
806 let mut earliest_scheduled_time: Option<i64> = None;
807
808 for operation in operations {
810 let result = self.process_operation(operation, execution_id);
811
812 match result {
813 OperationProcessResult::Pending(scheduled_time) => {
814 has_pending_operations = true;
815 if let Some(time) = scheduled_time {
816 match earliest_scheduled_time {
817 None => earliest_scheduled_time = Some(time),
818 Some(current) if time < current => earliest_scheduled_time = Some(time),
819 _ => {}
820 }
821 }
822 }
823 OperationProcessResult::Completed => {
824 }
826 OperationProcessResult::NotApplicable => {
827 }
829 }
830 }
831
832 if !has_pending_operations {
833 return ProcessOperationsResult::NoPendingOperations;
834 }
835
836 let advance_time_ms = if self.skip_time_config.enabled {
838 if let Some(end_ts) = earliest_scheduled_time {
839 let now_ms = chrono::Utc::now().timestamp_millis();
840 if end_ts > now_ms {
841 Some((end_ts - now_ms) as u64)
842 } else {
843 Some(0)
844 }
845 } else {
846 None
847 }
848 } else {
849 None
850 };
851
852 ProcessOperationsResult::ShouldReinvoke(advance_time_ms)
853 }
854
855 fn process_operation(
873 &mut self,
874 operation: &Operation,
875 execution_id: &str,
876 ) -> OperationProcessResult {
877 if operation.status.is_terminal() {
879 return OperationProcessResult::Completed;
880 }
881
882 match operation.operation_type {
883 OperationType::Wait => self.handle_wait_update(operation, execution_id),
884 OperationType::Step => self.handle_step_update(operation, execution_id),
885 OperationType::Callback => self.handle_callback_update(operation, execution_id),
886 OperationType::Execution => {
887 OperationProcessResult::NotApplicable
889 }
890 OperationType::Invoke | OperationType::Context => {
891 if operation.status == OperationStatus::Started {
893 OperationProcessResult::Pending(None)
894 } else {
895 OperationProcessResult::Completed
896 }
897 }
898 }
899 }
900
901 fn handle_wait_update(
923 &mut self,
924 operation: &Operation,
925 _execution_id: &str,
926 ) -> OperationProcessResult {
927 if operation.status != OperationStatus::Started {
929 return OperationProcessResult::Completed;
930 }
931
932 self.pending_operations
934 .insert(operation.operation_id.clone());
935
936 let scheduled_end_timestamp = operation
938 .wait_details
939 .as_ref()
940 .and_then(|details| details.scheduled_end_timestamp);
941
942 OperationProcessResult::Pending(scheduled_end_timestamp)
943 }
944
945 fn handle_step_update(
965 &mut self,
966 operation: &Operation,
967 _execution_id: &str,
968 ) -> OperationProcessResult {
969 if operation.status != OperationStatus::Pending
971 && operation.status != OperationStatus::Started
972 {
973 return OperationProcessResult::Completed;
974 }
975
976 self.pending_operations
978 .insert(operation.operation_id.clone());
979
980 let next_attempt_timestamp = operation
982 .step_details
983 .as_ref()
984 .and_then(|details| details.next_attempt_timestamp);
985
986 OperationProcessResult::Pending(next_attempt_timestamp)
987 }
988
989 fn handle_callback_update(
1008 &mut self,
1009 operation: &Operation,
1010 _execution_id: &str,
1011 ) -> OperationProcessResult {
1012 if operation.status != OperationStatus::Started {
1014 self.pending_operations.remove(&operation.operation_id);
1016 return OperationProcessResult::Completed;
1017 }
1018
1019 self.pending_operations
1021 .insert(operation.operation_id.clone());
1022
1023 OperationProcessResult::Pending(None)
1026 }
1027
1028 async fn populate_handles(&self, operations: &[Operation]) {
1049 {
1051 let mut shared_ops = self.shared_operations.write().await;
1052 shared_ops.clear();
1053 shared_ops.extend(operations.iter().cloned());
1054 }
1055
1056 for handle in &self.registered_handles {
1058 let matched_op = match &handle.matcher {
1059 OperationMatcher::ByName(name) => operations
1060 .iter()
1061 .find(|op| op.name.as_deref() == Some(name)),
1062 OperationMatcher::ByIndex(index) => operations.get(*index),
1063 OperationMatcher::ById(id) => operations.iter().find(|op| op.operation_id == *id),
1064 OperationMatcher::ByNameAndIndex(name, index) => operations
1065 .iter()
1066 .filter(|op| op.name.as_deref() == Some(name))
1067 .nth(*index),
1068 };
1069
1070 if let Some(op) = matched_op {
1071 {
1073 let mut inner = handle.inner.write().await;
1074 *inner = Some(op.clone());
1075 }
1076
1077 let _ = handle.status_tx.send(Some(op.status));
1079 }
1080 }
1081 }
1082
1083 pub fn schedule_invocation_at_timestamp(
1105 &mut self,
1106 timestamp_ms: i64,
1107 execution_id: &str,
1108 operation_id: &str,
1109 ) {
1110 let checkpoint_api = Arc::clone(&self.checkpoint_api);
1111 let execution_id_owned = execution_id.to_string();
1112 let operation_id_owned = operation_id.to_string();
1113 let skip_time_enabled = self.skip_time_config.enabled;
1114
1115 let timestamp = chrono::DateTime::from_timestamp_millis(timestamp_ms)
1117 .map(|dt| dt.with_timezone(&chrono::Utc));
1118
1119 let update_checkpoint: super::scheduler::CheckpointUpdateFn = Box::new(move || {
1121 let checkpoint_api = checkpoint_api;
1122 let execution_id = execution_id_owned;
1123 let operation_id = operation_id_owned;
1124
1125 Box::pin(async move {
1126 if skip_time_enabled {
1128 let now_ms = chrono::Utc::now().timestamp_millis();
1129 let target_ms = timestamp_ms;
1130 if target_ms > now_ms {
1131 let advance_duration =
1132 tokio::time::Duration::from_millis((target_ms - now_ms) as u64);
1133 tokio::time::advance(advance_duration).await;
1134 }
1135 }
1136
1137 let mut updated_operation = Operation::new(&operation_id, OperationType::Wait);
1139 updated_operation.status = OperationStatus::Succeeded;
1140 updated_operation.end_timestamp = Some(chrono::Utc::now().timestamp_millis());
1141
1142 let update_request = super::types::UpdateCheckpointDataRequest {
1143 execution_id: execution_id.clone(),
1144 operation_id: operation_id.clone(),
1145 operation_data: updated_operation,
1146 payload: None,
1147 error: None,
1148 };
1149
1150 let payload = serde_json::to_string(&update_request)
1151 .map_err(crate::error::TestError::SerializationError)?;
1152
1153 let response = checkpoint_api
1154 .send_api_request(super::types::ApiType::UpdateCheckpointData, payload)
1155 .await?;
1156
1157 if let Some(error) = response.error {
1158 return Err(crate::error::TestError::CheckpointServerError(error));
1159 }
1160
1161 Ok(())
1162 })
1163 });
1164
1165 let start_invocation: super::scheduler::BoxedAsyncFn = Box::new(|| {
1167 Box::pin(async {
1168 })
1171 });
1172
1173 let on_error: super::scheduler::ErrorHandler = Box::new(|error| {
1175 tracing::error!("Error during scheduled invocation: {:?}", error);
1176 });
1177
1178 self.scheduler.schedule_function(
1180 start_invocation,
1181 on_error,
1182 timestamp,
1183 Some(update_checkpoint),
1184 );
1185 }
1186
1187 pub fn schedule_invocation_with_update(
1201 &mut self,
1202 timestamp: Option<chrono::DateTime<chrono::Utc>>,
1203 update_checkpoint: Option<super::scheduler::CheckpointUpdateFn>,
1204 ) {
1205 let skip_time_enabled = self.skip_time_config.enabled;
1206
1207 let wrapped_update: Option<super::scheduler::CheckpointUpdateFn> = if skip_time_enabled {
1209 if let Some(ts) = timestamp {
1210 let original_update = update_checkpoint;
1211 Some(Box::new(move || {
1212 Box::pin(async move {
1213 let now = chrono::Utc::now();
1215 if ts > now {
1216 let duration = (ts - now).to_std().unwrap_or_default();
1217 tokio::time::advance(duration).await;
1218 }
1219
1220 if let Some(update_fn) = original_update {
1222 update_fn().await?;
1223 }
1224
1225 Ok(())
1226 })
1227 }))
1228 } else {
1229 update_checkpoint
1230 }
1231 } else {
1232 update_checkpoint
1233 };
1234
1235 let start_invocation: super::scheduler::BoxedAsyncFn = Box::new(|| {
1237 Box::pin(async {
1238 })
1240 });
1241
1242 let on_error: super::scheduler::ErrorHandler = Box::new(|error| {
1244 tracing::error!("Error during scheduled invocation: {:?}", error);
1245 });
1246
1247 self.scheduler
1249 .schedule_function(start_invocation, on_error, timestamp, wrapped_update);
1250 }
1251
1252 pub fn has_scheduled_functions(&self) -> bool {
1263 self.scheduler.has_scheduled_function()
1264 }
1265
1266 pub async fn invoke_handler(
1295 &mut self,
1296 payload: I,
1297 execution_id: &str,
1298 is_initial: bool,
1299 ) -> Result<InvokeHandlerResult<O>, crate::error::TestError> {
1300 use super::types::{ApiType, StartDurableExecutionRequest, StartInvocationRequest};
1301 use durable_execution_sdk::lambda::InitialExecutionState;
1302 use durable_execution_sdk::state::ExecutionState;
1303
1304 if self.skip_time_config.enabled && self.invocation_active.load(Ordering::SeqCst) {
1306 return Err(crate::error::TestError::CheckpointServerError(
1307 "Concurrent invocation detected in time-skip mode. Only one invocation can be active at a time.".to_string(),
1308 ));
1309 }
1310
1311 let invocation_id = uuid::Uuid::new_v4().to_string();
1313 let checkpoint_token = if is_initial {
1314 let payload_json = serde_json::to_string(&payload)?;
1316 let start_request = StartDurableExecutionRequest {
1317 invocation_id: invocation_id.clone(),
1318 payload: Some(payload_json),
1319 };
1320 let start_payload = serde_json::to_string(&start_request)?;
1321
1322 let start_response = self
1323 .checkpoint_api
1324 .send_api_request(ApiType::StartDurableExecution, start_payload)
1325 .await?;
1326
1327 if let Some(error) = start_response.error {
1328 return Err(crate::error::TestError::CheckpointServerError(error));
1329 }
1330
1331 let invocation_result: super::InvocationResult =
1332 serde_json::from_str(&start_response.payload.ok_or_else(|| {
1333 crate::error::TestError::CheckpointServerError(
1334 "Empty response from checkpoint server".to_string(),
1335 )
1336 })?)?;
1337
1338 self.execution_id = Some(invocation_result.execution_id.clone());
1340 self.checkpoint_token = Some(invocation_result.checkpoint_token.clone());
1341
1342 invocation_result.checkpoint_token
1343 } else {
1344 let start_invocation_request = StartInvocationRequest {
1346 execution_id: execution_id.to_string(),
1347 invocation_id: invocation_id.clone(),
1348 };
1349 let start_payload = serde_json::to_string(&start_invocation_request)?;
1350
1351 let start_response = self
1352 .checkpoint_api
1353 .send_api_request(ApiType::StartInvocation, start_payload)
1354 .await?;
1355
1356 if let Some(error) = start_response.error {
1357 return Err(crate::error::TestError::CheckpointServerError(error));
1358 }
1359
1360 let invocation_result: super::InvocationResult =
1361 serde_json::from_str(&start_response.payload.ok_or_else(|| {
1362 crate::error::TestError::CheckpointServerError(
1363 "Empty response from checkpoint server".to_string(),
1364 )
1365 })?)?;
1366
1367 self.checkpoint_token = Some(invocation_result.checkpoint_token.clone());
1369
1370 invocation_result.checkpoint_token
1371 };
1372
1373 let initial_state = InitialExecutionState::new();
1375 let execution_state = Arc::new(ExecutionState::new(
1376 execution_id,
1377 &checkpoint_token,
1378 initial_state,
1379 self.checkpoint_api.clone(),
1380 ));
1381
1382 let ctx = DurableContext::new(execution_state.clone());
1384
1385 let start_time = chrono::Utc::now();
1387 let mut invocation = Invocation::with_start(start_time);
1388
1389 self.invocation_active.store(true, Ordering::SeqCst);
1391
1392 let handler_result = (self.handler)(payload.clone(), ctx).await;
1394
1395 self.invocation_active.store(false, Ordering::SeqCst);
1397
1398 let end_time = chrono::Utc::now();
1400 invocation = invocation.with_end(end_time);
1401
1402 let operations = match self.checkpoint_api.get_operations(execution_id, "").await {
1404 Ok(response) => {
1405 let mut storage = self.operation_storage.write().await;
1406 for op in &response.operations {
1407 storage.update_operation(op.clone());
1408 }
1409 response.operations
1410 }
1411 Err(_) => Vec::new(),
1412 };
1413
1414 match handler_result {
1416 Ok(result) => {
1417 self.execution_complete.store(true, Ordering::SeqCst);
1419 Ok(InvokeHandlerResult::Succeeded {
1420 result,
1421 operations,
1422 invocation,
1423 })
1424 }
1425 Err(error) => {
1426 if error.is_suspend() {
1427 let process_result = self.process_operations(&operations, execution_id);
1429
1430 match process_result {
1431 ProcessOperationsResult::ExecutionSucceeded(result_str) => {
1432 self.execution_complete.store(true, Ordering::SeqCst);
1433 if let Ok(result) = serde_json::from_str::<O>(&result_str) {
1434 Ok(InvokeHandlerResult::Succeeded {
1435 result,
1436 operations,
1437 invocation,
1438 })
1439 } else {
1440 Ok(InvokeHandlerResult::Pending {
1441 operations,
1442 invocation,
1443 should_reinvoke: false,
1444 advance_time_ms: None,
1445 })
1446 }
1447 }
1448 ProcessOperationsResult::ExecutionFailed(test_error) => {
1449 self.execution_complete.store(true, Ordering::SeqCst);
1450 Ok(InvokeHandlerResult::Failed {
1451 error: test_error,
1452 operations,
1453 invocation,
1454 })
1455 }
1456 ProcessOperationsResult::NoPendingOperations => {
1457 Ok(InvokeHandlerResult::Pending {
1459 operations,
1460 invocation,
1461 should_reinvoke: false,
1462 advance_time_ms: None,
1463 })
1464 }
1465 ProcessOperationsResult::ShouldReinvoke(advance_time_ms) => {
1466 Ok(InvokeHandlerResult::Pending {
1468 operations,
1469 invocation,
1470 should_reinvoke: true,
1471 advance_time_ms,
1472 })
1473 }
1474 }
1475 } else {
1476 self.execution_complete.store(true, Ordering::SeqCst);
1478 let error_obj = durable_execution_sdk::ErrorObject::from(&error);
1479 let test_error = TestResultError::new(error_obj.error_type, error.to_string());
1480 let invocation_with_error = invocation.with_error(test_error.clone());
1481 Ok(InvokeHandlerResult::Failed {
1482 error: test_error,
1483 operations,
1484 invocation: invocation_with_error,
1485 })
1486 }
1487 }
1488 }
1489 }
1490
1491 pub fn flush_scheduled_functions(&mut self) {
1500 self.scheduler.flush_timers();
1501 }
1502
1503 pub async fn process_next_scheduled(&mut self) -> bool {
1509 self.scheduler.process_next().await
1510 }
1511
1512 fn handle_execution_update(&self, operations: &[Operation]) -> Option<ProcessOperationsResult> {
1530 let execution_op = operations
1532 .iter()
1533 .find(|op| op.operation_type == OperationType::Execution)?;
1534
1535 match execution_op.status {
1536 OperationStatus::Succeeded => {
1537 let result_str = execution_op.result.clone().unwrap_or_default();
1538 Some(ProcessOperationsResult::ExecutionSucceeded(result_str))
1539 }
1540 OperationStatus::Failed => {
1541 let error = if let Some(err) = &execution_op.error {
1542 TestResultError::new(err.error_type.clone(), err.error_message.clone())
1543 } else {
1544 TestResultError::new("ExecutionFailed", "Execution failed")
1545 };
1546 Some(ProcessOperationsResult::ExecutionFailed(error))
1547 }
1548 _ => None,
1549 }
1550 }
1551}
1552
1553#[derive(Debug)]
1557pub enum ProcessOperationsResult {
1558 ExecutionSucceeded(String),
1560 ExecutionFailed(TestResultError),
1562 NoPendingOperations,
1564 ShouldReinvoke(Option<u64>),
1566}
1567
/// Outcome of processing a single operation.
///
/// The type only holds plain data, so it derives `Clone`, `Copy`,
/// `PartialEq` and `Eq` in addition to `Debug`; this lets callers and tests
/// compare results directly instead of pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationProcessResult {
    /// The operation is still outstanding. The optional value is a timestamp
    /// (presumably epoch milliseconds at which the operation should be
    /// revisited — confirm against the code that produces it).
    Pending(Option<i64>),
    /// The operation has completed.
    Completed,
    /// The operation does not apply to this kind of processing.
    NotApplicable,
}
1580
1581#[derive(Debug)]
1585pub enum InvokeHandlerResult<T> {
1586 Succeeded {
1588 result: T,
1590 operations: Vec<Operation>,
1592 invocation: Invocation,
1594 },
1595 Failed {
1597 error: TestResultError,
1599 operations: Vec<Operation>,
1601 invocation: Invocation,
1603 },
1604 Pending {
1606 operations: Vec<Operation>,
1608 invocation: Invocation,
1610 should_reinvoke: bool,
1612 advance_time_ms: Option<u64>,
1614 },
1615}
1616
#[cfg(test)]
mod tests {
    //! Unit tests for the orchestrator's result types, operation storage and
    //! scheduling entry points.
    //!
    //! `use super::*` already brings the parent module's imports (`Arc`,
    //! `RwLock`, `AtomicBool`, `Ordering`, the SDK types, ...) into scope, so
    //! individual tests do not re-import them.

    use super::*;
    use durable_execution_sdk::{ErrorObject, StepDetails, WaitDetails};

    /// Builds a `TestExecutionOrchestrator` around a trivial handler that
    /// always returns `Ok("result")`, with time skipping set to `$skip_time`.
    ///
    /// This is a macro rather than a function so that the orchestrator's
    /// generic parameters (including the closure type) are inferred at each
    /// call site exactly as if the construction were written inline.
    macro_rules! trivial_orchestrator {
        ($skip_time:expr) => {{
            let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
            let handler =
                |_input: String, _ctx: DurableContext| async move { Ok("result".to_string()) };
            let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
            TestExecutionOrchestrator::new(
                handler,
                operation_storage,
                checkpoint_api,
                SkipTimeConfig {
                    enabled: $skip_time,
                },
            )
        }};
    }

    /// Creates a `Started` wait operation scheduled to end at `end_ts`.
    fn started_wait(id: &str, end_ts: i64) -> Operation {
        let mut op = Operation::new(id, OperationType::Wait);
        op.status = OperationStatus::Started;
        op.wait_details = Some(WaitDetails {
            scheduled_end_timestamp: Some(end_ts),
        });
        op
    }

    /// Returns the first operation of type `Execution`, if any.
    fn execution_operation(operations: &[Operation]) -> Option<&Operation> {
        operations
            .iter()
            .find(|op| op.operation_type == OperationType::Execution)
    }

    /// Smallest scheduled end timestamp among `Started` wait operations.
    fn earliest_started_wait_end(operations: &[Operation]) -> Option<i64> {
        operations
            .iter()
            .filter(|op| {
                op.operation_type == OperationType::Wait && op.status == OperationStatus::Started
            })
            .filter_map(|op| op.wait_details.as_ref()?.scheduled_end_timestamp)
            .min()
    }

    // ---- SkipTimeConfig -------------------------------------------------

    #[test]
    fn test_skip_time_config_default() {
        let config = SkipTimeConfig::default();
        assert!(!config.enabled);
    }

    // ---- OperationStorage -----------------------------------------------

    #[test]
    fn test_operation_storage_new() {
        let storage = OperationStorage::new();
        assert!(storage.is_empty());
        assert_eq!(storage.len(), 0);
    }

    #[test]
    fn test_operation_storage_add_and_get() {
        let mut storage = OperationStorage::new();

        storage.add_operation(Operation::new("op-1", OperationType::Step));

        assert_eq!(storage.len(), 1);
        assert!(storage.get_by_id("op-1").is_some());
    }

    #[test]
    fn test_operation_storage_update() {
        let mut storage = OperationStorage::new();

        let mut op = Operation::new("op-1", OperationType::Step);
        op.status = OperationStatus::Started;
        storage.add_operation(op);

        let mut updated_op = Operation::new("op-1", OperationType::Step);
        updated_op.status = OperationStatus::Succeeded;
        storage.update_operation(updated_op);

        // Updating must replace the stored entry, not add a second one.
        assert_eq!(storage.len(), 1);
        let retrieved = storage.get_by_id("op-1").unwrap();
        assert_eq!(retrieved.status, OperationStatus::Succeeded);
    }

    // ---- TestExecutionResult --------------------------------------------

    #[test]
    fn test_test_execution_result_success() {
        let result: TestExecutionResult<String> =
            TestExecutionResult::success("test".to_string(), vec![], "exec-1".to_string());
        assert_eq!(result.status, ExecutionStatus::Succeeded);
        assert_eq!(result.result, Some("test".to_string()));
        assert!(result.error.is_none());
    }

    #[test]
    fn test_test_execution_result_failure() {
        let error = TestResultError::new("TestError", "test error");
        let result: TestExecutionResult<String> =
            TestExecutionResult::failure(error, vec![], "exec-1".to_string());
        assert_eq!(result.status, ExecutionStatus::Failed);
        assert!(result.result.is_none());
        assert!(result.error.is_some());
    }

    #[test]
    fn test_test_execution_result_running() {
        let result: TestExecutionResult<String> =
            TestExecutionResult::running(vec![], "exec-1".to_string());
        assert_eq!(result.status, ExecutionStatus::Running);
        assert!(result.result.is_none());
        assert!(result.error.is_none());
    }

    // ---- ProcessOperationsResult ----------------------------------------

    #[test]
    fn test_process_operations_result_execution_succeeded() {
        let result = ProcessOperationsResult::ExecutionSucceeded("test result".to_string());
        match result {
            ProcessOperationsResult::ExecutionSucceeded(s) => assert_eq!(s, "test result"),
            _ => panic!("Expected ExecutionSucceeded"),
        }
    }

    #[test]
    fn test_process_operations_result_execution_failed() {
        let error = TestResultError::new("TestError", "test error");
        let result = ProcessOperationsResult::ExecutionFailed(error);
        match result {
            ProcessOperationsResult::ExecutionFailed(e) => {
                assert_eq!(e.error_type, Some("TestError".to_string()));
            }
            _ => panic!("Expected ExecutionFailed"),
        }
    }

    #[test]
    fn test_process_operations_result_no_pending() {
        let result = ProcessOperationsResult::NoPendingOperations;
        assert!(matches!(
            result,
            ProcessOperationsResult::NoPendingOperations
        ));
    }

    #[test]
    fn test_process_operations_result_should_reinvoke() {
        let result = ProcessOperationsResult::ShouldReinvoke(Some(1000));
        match result {
            ProcessOperationsResult::ShouldReinvoke(Some(ms)) => assert_eq!(ms, 1000),
            _ => panic!("Expected ShouldReinvoke with time"),
        }
    }

    // ---- OperationProcessResult -----------------------------------------

    #[test]
    fn test_operation_process_result_pending_with_timestamp() {
        let result = OperationProcessResult::Pending(Some(1234567890));
        match result {
            OperationProcessResult::Pending(Some(ts)) => assert_eq!(ts, 1234567890),
            _ => panic!("Expected Pending with timestamp"),
        }
    }

    #[test]
    fn test_operation_process_result_pending_without_timestamp() {
        let result = OperationProcessResult::Pending(None);
        match result {
            OperationProcessResult::Pending(None) => {}
            _ => panic!("Expected Pending without timestamp"),
        }
    }

    #[test]
    fn test_operation_process_result_completed() {
        let result = OperationProcessResult::Completed;
        assert!(matches!(result, OperationProcessResult::Completed));
    }

    #[test]
    fn test_operation_process_result_not_applicable() {
        let result = OperationProcessResult::NotApplicable;
        assert!(matches!(result, OperationProcessResult::NotApplicable));
    }

    // ---- handle_execution_update ----------------------------------------
    //
    // These tests build an orchestrator and call `handle_execution_update`
    // itself, in addition to checking the fixture they feed into it.

    #[tokio::test]
    async fn test_handle_execution_update_succeeded() {
        let mut exec_op = Operation::new("exec-1", OperationType::Execution);
        exec_op.status = OperationStatus::Succeeded;
        exec_op.result = Some("\"success\"".to_string());

        let operations = vec![exec_op];

        let exec = execution_operation(&operations).expect("execution operation should exist");
        assert_eq!(exec.status, OperationStatus::Succeeded);
        assert_eq!(exec.result, Some("\"success\"".to_string()));

        let orchestrator = trivial_orchestrator!(false);
        match orchestrator.handle_execution_update(&operations) {
            Some(ProcessOperationsResult::ExecutionSucceeded(payload)) => {
                assert_eq!(payload, "\"success\"");
            }
            other => panic!("Expected ExecutionSucceeded, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_handle_execution_update_failed() {
        let mut exec_op = Operation::new("exec-1", OperationType::Execution);
        exec_op.status = OperationStatus::Failed;
        exec_op.error = Some(ErrorObject {
            error_type: "TestError".to_string(),
            error_message: "Test error message".to_string(),
            stack_trace: None,
        });

        let operations = vec![exec_op];

        let exec = execution_operation(&operations).expect("execution operation should exist");
        assert_eq!(exec.status, OperationStatus::Failed);
        assert!(exec.error.is_some());

        let orchestrator = trivial_orchestrator!(false);
        match orchestrator.handle_execution_update(&operations) {
            Some(ProcessOperationsResult::ExecutionFailed(error)) => {
                assert_eq!(error.error_type, Some("TestError".to_string()));
            }
            other => panic!("Expected ExecutionFailed, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_handle_execution_update_still_running() {
        let mut exec_op = Operation::new("exec-1", OperationType::Execution);
        exec_op.status = OperationStatus::Started;

        let operations = vec![exec_op];

        let exec = execution_operation(&operations).expect("execution operation should exist");
        assert_eq!(exec.status, OperationStatus::Started);

        // A non-terminal execution operation produces no result.
        let orchestrator = trivial_orchestrator!(false);
        assert!(orchestrator.handle_execution_update(&operations).is_none());
    }

    // ---- Wait operations ------------------------------------------------

    #[test]
    fn test_wait_operation_started_with_timestamp() {
        let wait_op = started_wait("wait-1", 1234567890000);

        let details = wait_op
            .wait_details
            .as_ref()
            .expect("wait details should be set");
        assert_eq!(details.scheduled_end_timestamp, Some(1234567890000));
    }

    #[test]
    fn test_wait_operation_completed() {
        let mut wait_op = Operation::new("wait-1", OperationType::Wait);
        wait_op.status = OperationStatus::Succeeded;

        assert!(wait_op.status.is_terminal());
    }

    // ---- Step operations ------------------------------------------------

    #[test]
    fn test_step_operation_pending_retry() {
        let mut step_op = Operation::new("step-1", OperationType::Step);
        step_op.status = OperationStatus::Pending;
        step_op.step_details = Some(StepDetails {
            result: None,
            attempt: Some(1),
            next_attempt_timestamp: Some(1234567890000),
            error: None,
            payload: None,
        });

        let details = step_op
            .step_details
            .as_ref()
            .expect("step details should be set");
        assert_eq!(details.next_attempt_timestamp, Some(1234567890000));
        assert_eq!(details.attempt, Some(1));
    }

    #[test]
    fn test_step_operation_succeeded() {
        let mut step_op = Operation::new("step-1", OperationType::Step);
        step_op.status = OperationStatus::Succeeded;
        step_op.step_details = Some(StepDetails {
            result: Some("\"result\"".to_string()),
            attempt: Some(0),
            next_attempt_timestamp: None,
            error: None,
            payload: None,
        });

        assert!(step_op.status.is_terminal());
    }

    // ---- Callback operations --------------------------------------------

    #[test]
    fn test_callback_operation_started() {
        let mut callback_op = Operation::new("callback-1", OperationType::Callback);
        callback_op.status = OperationStatus::Started;

        assert_eq!(callback_op.status, OperationStatus::Started);
        assert!(!callback_op.status.is_terminal());
    }

    #[test]
    fn test_callback_operation_succeeded() {
        let mut callback_op = Operation::new("callback-1", OperationType::Callback);
        callback_op.status = OperationStatus::Succeeded;

        assert!(callback_op.status.is_terminal());
    }

    // ---- Operation type dispatch ----------------------------------------

    #[test]
    fn test_operation_type_dispatch_wait() {
        let op = Operation::new("op-1", OperationType::Wait);
        assert_eq!(op.operation_type, OperationType::Wait);
    }

    #[test]
    fn test_operation_type_dispatch_step() {
        let op = Operation::new("op-1", OperationType::Step);
        assert_eq!(op.operation_type, OperationType::Step);
    }

    #[test]
    fn test_operation_type_dispatch_callback() {
        let op = Operation::new("op-1", OperationType::Callback);
        assert_eq!(op.operation_type, OperationType::Callback);
    }

    #[test]
    fn test_operation_type_dispatch_execution() {
        let op = Operation::new("op-1", OperationType::Execution);
        assert_eq!(op.operation_type, OperationType::Execution);
    }

    #[test]
    fn test_operation_type_dispatch_invoke() {
        let op = Operation::new("op-1", OperationType::Invoke);
        assert_eq!(op.operation_type, OperationType::Invoke);
    }

    #[test]
    fn test_operation_type_dispatch_context() {
        let op = Operation::new("op-1", OperationType::Context);
        assert_eq!(op.operation_type, OperationType::Context);
    }

    // ---- Earliest scheduled time ----------------------------------------

    #[test]
    fn test_earliest_scheduled_time_single_wait() {
        let operations = vec![started_wait("wait-1", 1000)];

        assert_eq!(earliest_started_wait_end(&operations), Some(1000));
    }

    #[test]
    fn test_earliest_scheduled_time_multiple_waits() {
        let operations = vec![
            started_wait("wait-1", 2000),
            started_wait("wait-2", 1000),
            started_wait("wait-3", 3000),
        ];

        assert_eq!(earliest_started_wait_end(&operations), Some(1000));
    }

    // ---- Scheduling -----------------------------------------------------

    #[tokio::test]
    async fn test_schedule_invocation_at_timestamp_schedules_function() {
        let mut orchestrator = trivial_orchestrator!(false);

        let future_timestamp = chrono::Utc::now().timestamp_millis() + 1000;
        orchestrator.schedule_invocation_at_timestamp(future_timestamp, "exec-1", "wait-1");

        assert!(orchestrator.has_scheduled_functions());
    }

    #[tokio::test]
    async fn test_schedule_invocation_with_update_schedules_function() {
        let mut orchestrator = trivial_orchestrator!(false);

        orchestrator.schedule_invocation_with_update(None, None);

        assert!(orchestrator.has_scheduled_functions());
    }

    #[tokio::test]
    async fn test_flush_scheduled_functions_clears_queue() {
        let mut orchestrator = trivial_orchestrator!(false);

        let future_timestamp = chrono::Utc::now().timestamp_millis() + 1000;
        orchestrator.schedule_invocation_at_timestamp(future_timestamp, "exec-1", "wait-1");
        orchestrator.schedule_invocation_at_timestamp(future_timestamp + 1000, "exec-1", "wait-2");

        assert!(orchestrator.has_scheduled_functions());

        orchestrator.flush_scheduled_functions();

        assert!(!orchestrator.has_scheduled_functions());
    }

    #[tokio::test]
    async fn test_process_next_scheduled_processes_function() {
        let mut orchestrator = trivial_orchestrator!(false);

        orchestrator.schedule_invocation_with_update(None, None);

        let processed = orchestrator.process_next_scheduled().await;
        assert!(processed);

        assert!(!orchestrator.has_scheduled_functions());
    }

    #[tokio::test]
    async fn test_schedule_invocation_with_time_skipping_enabled() {
        let mut orchestrator = trivial_orchestrator!(true);

        assert!(orchestrator.is_time_skipping_enabled());

        let future_timestamp = chrono::Utc::now().timestamp_millis() + 5000;
        orchestrator.schedule_invocation_at_timestamp(future_timestamp, "exec-1", "wait-1");

        assert!(orchestrator.has_scheduled_functions());
    }

    // ---- InvokeHandlerResult --------------------------------------------

    #[test]
    fn test_invoke_handler_result_succeeded() {
        let invocation = Invocation::with_start(chrono::Utc::now());
        let result: InvokeHandlerResult<String> = InvokeHandlerResult::Succeeded {
            result: "test result".to_string(),
            operations: vec![],
            invocation,
        };

        match result {
            InvokeHandlerResult::Succeeded {
                result, operations, ..
            } => {
                assert_eq!(result, "test result");
                assert!(operations.is_empty());
            }
            _ => panic!("Expected Succeeded variant"),
        }
    }

    #[test]
    fn test_invoke_handler_result_failed() {
        let invocation = Invocation::with_start(chrono::Utc::now());
        let error = TestResultError::new("TestError", "test error message");
        let result: InvokeHandlerResult<String> = InvokeHandlerResult::Failed {
            error,
            operations: vec![],
            invocation,
        };

        match result {
            InvokeHandlerResult::Failed {
                error, operations, ..
            } => {
                assert_eq!(error.error_type, Some("TestError".to_string()));
                assert!(operations.is_empty());
            }
            _ => panic!("Expected Failed variant"),
        }
    }

    #[test]
    fn test_invoke_handler_result_pending_with_reinvoke() {
        let invocation = Invocation::with_start(chrono::Utc::now());
        let result: InvokeHandlerResult<String> = InvokeHandlerResult::Pending {
            operations: vec![],
            invocation,
            should_reinvoke: true,
            advance_time_ms: Some(5000),
        };

        match result {
            InvokeHandlerResult::Pending {
                should_reinvoke,
                advance_time_ms,
                ..
            } => {
                assert!(should_reinvoke);
                assert_eq!(advance_time_ms, Some(5000));
            }
            _ => panic!("Expected Pending variant"),
        }
    }

    #[test]
    fn test_invoke_handler_result_pending_without_reinvoke() {
        let invocation = Invocation::with_start(chrono::Utc::now());
        let result: InvokeHandlerResult<String> = InvokeHandlerResult::Pending {
            operations: vec![],
            invocation,
            should_reinvoke: false,
            advance_time_ms: None,
        };

        match result {
            InvokeHandlerResult::Pending {
                should_reinvoke,
                advance_time_ms,
                ..
            } => {
                assert!(!should_reinvoke);
                assert_eq!(advance_time_ms, None);
            }
            _ => panic!("Expected Pending variant"),
        }
    }

    // ---- Orchestrator initial state -------------------------------------

    #[tokio::test]
    async fn test_invoke_handler_creates_orchestrator_state() {
        let orchestrator = trivial_orchestrator!(false);

        assert!(orchestrator.execution_id().is_none());
        assert!(orchestrator.checkpoint_token().is_none());
        assert!(!orchestrator.is_execution_complete());
        assert!(!orchestrator.is_invocation_active());
    }

    #[tokio::test]
    async fn test_invoke_handler_time_skip_mode_prevents_concurrent() {
        let orchestrator = trivial_orchestrator!(true);

        assert!(orchestrator.is_time_skipping_enabled());

        assert!(!orchestrator.is_invocation_active());
    }

    #[tokio::test]
    async fn test_invoke_handler_tracks_invocation_active_state() {
        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();

        // Set by the handler when it runs; lets the test observe whether the
        // handler has been executed.
        let was_active = Arc::new(AtomicBool::new(false));
        let was_active_clone = Arc::clone(&was_active);

        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
        let orchestrator = TestExecutionOrchestrator::new(
            move |_input: String, _ctx: DurableContext| {
                let was_active = Arc::clone(&was_active_clone);
                async move {
                    was_active.store(true, Ordering::SeqCst);
                    Ok("result".to_string())
                }
            },
            operation_storage,
            checkpoint_api,
            SkipTimeConfig { enabled: false },
        );

        assert!(!orchestrator.is_invocation_active());
        // Constructing the orchestrator must not run the handler.
        assert!(
            !was_active.load(Ordering::SeqCst),
            "Handler should not run before any invocation"
        );
    }
}
2349
#[cfg(test)]
mod property_tests {
    //! Property-based tests for how `process_operations` treats wait
    //! operations when time skipping is enabled.

    use super::*;
    use durable_execution_sdk::{OperationType, WaitDetails};
    use proptest::prelude::*;

    /// Wait durations in whole seconds, from 1 to 60 inclusive.
    fn wait_duration_strategy() -> impl Strategy<Value = u64> {
        1u64..=60
    }

    /// Between one and three wait durations (see `wait_duration_strategy`).
    fn multiple_wait_durations_strategy() -> impl Strategy<Value = Vec<u64>> {
        prop::collection::vec(wait_duration_strategy(), 1..=3)
    }

    proptest! {
        // A started wait operation must be tracked as pending and, when a
        // re-invocation is requested, the advance time must be within one
        // second of the wait duration.
        #[test]
        fn prop_wait_operation_completion(wait_seconds in wait_duration_strategy()) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();

            rt.block_on(async {
                let now_ms = chrono::Utc::now().timestamp_millis();
                let scheduled_end_ms = now_ms + (wait_seconds as i64 * 1000);

                let mut wait_op = Operation::new("wait-test", OperationType::Wait);
                wait_op.status = OperationStatus::Started;
                wait_op.wait_details = Some(WaitDetails {
                    scheduled_end_timestamp: Some(scheduled_end_ms),
                });

                prop_assert!(wait_op.wait_details.is_some());
                let details = wait_op.wait_details.as_ref().unwrap();
                prop_assert_eq!(details.scheduled_end_timestamp, Some(scheduled_end_ms));

                let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
                let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));

                let handler = |_input: String, _ctx: DurableContext| async move {
                    Ok("result".to_string())
                };

                let mut orchestrator = TestExecutionOrchestrator::new(
                    handler,
                    operation_storage,
                    checkpoint_api,
                    SkipTimeConfig { enabled: true },
                );

                prop_assert!(orchestrator.is_time_skipping_enabled());

                let operations = vec![wait_op];
                let result = orchestrator.process_operations(&operations, "exec-test");

                match result {
                    ProcessOperationsResult::ShouldReinvoke(advance_time_ms) => {
                        prop_assert!(
                            advance_time_ms.is_some(),
                            "Should have advance time when time skipping is enabled"
                        );

                        if let Some(advance_ms) = advance_time_ms {
                            // Allow one second of slack either way for the
                            // wall-clock time that elapses during the test.
                            let expected_min = wait_seconds.saturating_sub(1) * 1000;
                            let expected_max = (wait_seconds + 1) * 1000;
                            prop_assert!(
                                advance_ms >= expected_min && advance_ms <= expected_max,
                                "Advance time {} should be approximately {} seconds ({}ms - {}ms)",
                                advance_ms, wait_seconds, expected_min, expected_max
                            );
                        }
                    }
                    ProcessOperationsResult::NoPendingOperations => {}
                    other => {
                        prop_assert!(
                            false,
                            "Expected ShouldReinvoke or NoPendingOperations, got {:?}",
                            other
                        );
                    }
                }

                prop_assert!(
                    orchestrator.pending_operations.contains("wait-test"),
                    "Wait operation should be tracked as pending"
                );

                Ok(())
            })?;
        }

        // With several started waits, every one must be tracked as pending
        // and any advance time must correspond to the shortest wait.
        #[test]
        fn prop_wait_operation_completion_multiple_waits(
            wait_durations in multiple_wait_durations_strategy()
        ) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();

            rt.block_on(async {
                let now_ms = chrono::Utc::now().timestamp_millis();

                let mut operations = Vec::new();
                for (i, &duration) in wait_durations.iter().enumerate() {
                    let scheduled_end_ms = now_ms + (duration as i64 * 1000);
                    let mut wait_op = Operation::new(&format!("wait-{}", i), OperationType::Wait);
                    wait_op.status = OperationStatus::Started;
                    wait_op.wait_details = Some(WaitDetails {
                        scheduled_end_timestamp: Some(scheduled_end_ms),
                    });
                    operations.push(wait_op);
                }

                let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
                let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));

                let handler = |_input: String, _ctx: DurableContext| async move {
                    Ok("result".to_string())
                };

                let mut orchestrator = TestExecutionOrchestrator::new(
                    handler,
                    operation_storage,
                    checkpoint_api,
                    SkipTimeConfig { enabled: true },
                );

                let result = orchestrator.process_operations(&operations, "exec-test");

                let min_duration = wait_durations.iter().min().copied().unwrap_or(0);

                match result {
                    ProcessOperationsResult::ShouldReinvoke(advance_time_ms) => {
                        if let Some(advance_ms) = advance_time_ms {
                            // One second of slack either way, as above.
                            let expected_min = min_duration.saturating_sub(1) * 1000;
                            let expected_max = (min_duration + 1) * 1000;
                            prop_assert!(
                                advance_ms >= expected_min && advance_ms <= expected_max,
                                "Advance time {} should be approximately {} seconds (min duration)",
                                advance_ms, min_duration
                            );
                        }
                    }
                    ProcessOperationsResult::NoPendingOperations => {}
                    other => {
                        prop_assert!(
                            false,
                            "Expected ShouldReinvoke or NoPendingOperations, got {:?}",
                            other
                        );
                    }
                }

                for (i, _) in wait_durations.iter().enumerate() {
                    let op_id = format!("wait-{}", i);
                    prop_assert!(
                        orchestrator.pending_operations.contains(&op_id),
                        "Wait operation {} should be tracked as pending",
                        op_id
                    );
                }

                Ok(())
            })?;
        }

        // A wait that has already succeeded must neither trigger a
        // re-invocation nor be tracked as pending.
        #[test]
        fn prop_wait_operation_completion_already_completed(wait_seconds in wait_duration_strategy()) {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();

            rt.block_on(async {
                let now_ms = chrono::Utc::now().timestamp_millis();
                let scheduled_end_ms = now_ms + (wait_seconds as i64 * 1000);

                let mut wait_op = Operation::new("wait-completed", OperationType::Wait);
                wait_op.status = OperationStatus::Succeeded;
                wait_op.wait_details = Some(WaitDetails {
                    scheduled_end_timestamp: Some(scheduled_end_ms),
                });
                wait_op.end_timestamp = Some(now_ms);

                let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
                let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));

                let handler = |_input: String, _ctx: DurableContext| async move {
                    Ok("result".to_string())
                };

                let mut orchestrator = TestExecutionOrchestrator::new(
                    handler,
                    operation_storage,
                    checkpoint_api,
                    SkipTimeConfig { enabled: true },
                );

                let operations = vec![wait_op];
                let result = orchestrator.process_operations(&operations, "exec-test");

                match result {
                    ProcessOperationsResult::NoPendingOperations => {}
                    ProcessOperationsResult::ShouldReinvoke(_) => {
                        prop_assert!(
                            false,
                            "Completed wait operation should not trigger re-invocation"
                        );
                    }
                    other => {
                        prop_assert!(
                            false,
                            "Expected NoPendingOperations for completed wait, got {:?}",
                            other
                        );
                    }
                }

                prop_assert!(
                    !orchestrator.pending_operations.contains("wait-completed"),
                    "Completed wait operation should not be tracked as pending"
                );

                Ok(())
            })?;
        }
    }
}