1use std::collections::{HashMap, HashSet};
8use std::sync::atomic::{AtomicU8, Ordering};
9use std::sync::Arc;
10use std::time::Duration as StdDuration;
11
12use tokio::sync::{Mutex, RwLock};
13
14use crate::client::SharedDurableServiceClient;
15use crate::error::{DurableError, ErrorObject};
16use crate::operation::{Operation, OperationStatus, OperationType, OperationUpdate};
17use crate::types::ExecutionArn;
18
19use super::batcher::{
20 create_checkpoint_queue, CheckpointBatcher, CheckpointBatcherConfig, CheckpointSender,
21};
22use super::checkpoint_result::CheckpointedResult;
23use super::replay_status::ReplayStatus;
24
/// In-memory state for one durable execution: identity, checkpoint token,
/// a local cache of known operations, replay tracking and checkpoint routing.
pub struct ExecutionState {
    /// ARN of the durable execution; sent with every service-client call.
    durable_execution_arn: String,

    /// Current checkpoint token. Held in an `Arc` so the same cell can be handed
    /// to a `CheckpointBatcher` (see `with_batcher_and_mode`).
    checkpoint_token: Arc<RwLock<String>>,

    /// Local cache of operations, keyed by `operation_id`.
    operations: RwLock<HashMap<String, Operation>>,

    /// Client used for `get_operations` and `checkpoint` calls.
    service_client: SharedDurableServiceClient,

    /// Current `ReplayStatus`, stored as its `u8` discriminant.
    replay_status: AtomicU8,

    /// IDs that have been passed to `track_replay`.
    replayed_operations: RwLock<HashSet<String>>,

    /// Pagination marker for the next page of operations; `None` when there is
    /// nothing more to load.
    next_marker: RwLock<Option<String>>,

    /// Set of parent operation IDs that were marked done via `mark_parent_done`.
    parent_done_lock: Mutex<HashSet<String>>,

    /// Sender side of the checkpoint queue; `None` unless the state was built
    /// with one of the `with_batcher*` constructors.
    checkpoint_sender: Option<CheckpointSender>,

    /// Clone of the first operation of type `Execution` found in the initial
    /// state, if any.
    execution_operation: Option<Operation>,

    /// Checkpointing mode selected at construction time.
    checkpointing_mode: crate::config::CheckpointingMode,
}
71
72impl ExecutionState {
73 pub fn new(
87 durable_execution_arn: impl Into<String>,
88 checkpoint_token: impl Into<String>,
89 initial_state: crate::lambda::InitialExecutionState,
90 service_client: SharedDurableServiceClient,
91 ) -> Self {
92 Self::with_checkpointing_mode(
93 durable_execution_arn,
94 checkpoint_token,
95 initial_state,
96 service_client,
97 crate::config::CheckpointingMode::default(),
98 )
99 }
100
101 pub fn with_checkpointing_mode(
111 durable_execution_arn: impl Into<String>,
112 checkpoint_token: impl Into<String>,
113 initial_state: crate::lambda::InitialExecutionState,
114 service_client: SharedDurableServiceClient,
115 checkpointing_mode: crate::config::CheckpointingMode,
116 ) -> Self {
117 let execution_operation = initial_state
119 .operations
120 .iter()
121 .find(|op| op.operation_type == OperationType::Execution)
122 .cloned();
123
124 let operations: HashMap<String, Operation> = initial_state
126 .operations
127 .into_iter()
128 .map(|op| (op.operation_id.clone(), op))
129 .collect();
130
131 let replay_status = if operations.is_empty() {
133 ReplayStatus::New
134 } else {
135 ReplayStatus::Replay
136 };
137
138 Self {
139 durable_execution_arn: durable_execution_arn.into(),
140 checkpoint_token: Arc::new(RwLock::new(checkpoint_token.into())),
141 operations: RwLock::new(operations),
142 service_client,
143 replay_status: AtomicU8::new(replay_status as u8),
144 replayed_operations: RwLock::new(HashSet::new()),
145 next_marker: RwLock::new(initial_state.next_marker),
146 parent_done_lock: Mutex::new(HashSet::new()),
147 checkpoint_sender: None,
148 execution_operation,
149 checkpointing_mode,
150 }
151 }
152
153 pub fn with_batcher(
164 durable_execution_arn: impl Into<String>,
165 checkpoint_token: impl Into<String>,
166 initial_state: crate::lambda::InitialExecutionState,
167 service_client: SharedDurableServiceClient,
168 batcher_config: CheckpointBatcherConfig,
169 queue_buffer_size: usize,
170 ) -> (Self, CheckpointBatcher) {
171 Self::with_batcher_and_mode(
172 durable_execution_arn,
173 checkpoint_token,
174 initial_state,
175 service_client,
176 batcher_config,
177 queue_buffer_size,
178 crate::config::CheckpointingMode::default(),
179 )
180 }
181
182 pub fn with_batcher_and_mode(
192 durable_execution_arn: impl Into<String>,
193 checkpoint_token: impl Into<String>,
194 initial_state: crate::lambda::InitialExecutionState,
195 service_client: SharedDurableServiceClient,
196 batcher_config: CheckpointBatcherConfig,
197 queue_buffer_size: usize,
198 checkpointing_mode: crate::config::CheckpointingMode,
199 ) -> (Self, CheckpointBatcher) {
200 let arn: String = durable_execution_arn.into();
201 let token: String = checkpoint_token.into();
202
203 let execution_operation = initial_state
205 .operations
206 .iter()
207 .find(|op| op.operation_type == OperationType::Execution)
208 .cloned();
209
210 let operations: HashMap<String, Operation> = initial_state
212 .operations
213 .into_iter()
214 .map(|op| (op.operation_id.clone(), op))
215 .collect();
216
217 let replay_status = if operations.is_empty() {
219 ReplayStatus::New
220 } else {
221 ReplayStatus::Replay
222 };
223
224 let (sender, rx) = create_checkpoint_queue(queue_buffer_size);
226 let checkpoint_token = Arc::new(RwLock::new(token));
227
228 let batcher = CheckpointBatcher::new(
230 batcher_config,
231 rx,
232 service_client.clone(),
233 arn.clone(),
234 checkpoint_token.clone(),
235 );
236
237 let state = Self {
238 durable_execution_arn: arn,
239 checkpoint_token,
240 operations: RwLock::new(operations),
241 service_client,
242 replay_status: AtomicU8::new(replay_status as u8),
243 replayed_operations: RwLock::new(HashSet::new()),
244 next_marker: RwLock::new(initial_state.next_marker),
245 parent_done_lock: Mutex::new(HashSet::new()),
246 checkpoint_sender: Some(sender),
247 execution_operation,
248 checkpointing_mode,
249 };
250
251 (state, batcher)
252 }
253
254 pub fn durable_execution_arn(&self) -> &str {
256 &self.durable_execution_arn
257 }
258
259 #[inline]
261 pub fn durable_execution_arn_typed(&self) -> ExecutionArn {
262 ExecutionArn::from(self.durable_execution_arn.clone())
263 }
264
265 pub async fn checkpoint_token(&self) -> String {
267 self.checkpoint_token.read().await.clone()
268 }
269
270 pub async fn set_checkpoint_token(&self, token: impl Into<String>) {
272 let mut guard = self.checkpoint_token.write().await;
273 *guard = token.into();
274 }
275
276 pub fn replay_status(&self) -> ReplayStatus {
287 ReplayStatus::from(self.replay_status.load(Ordering::Acquire))
290 }
291
292 pub fn is_replay(&self) -> bool {
294 self.replay_status().is_replay()
295 }
296
297 pub fn is_new(&self) -> bool {
299 self.replay_status().is_new()
300 }
301
302 pub fn checkpointing_mode(&self) -> crate::config::CheckpointingMode {
304 self.checkpointing_mode
305 }
306
307 pub fn is_eager_checkpointing(&self) -> bool {
309 self.checkpointing_mode.is_eager()
310 }
311
312 pub fn is_batched_checkpointing(&self) -> bool {
314 self.checkpointing_mode.is_batched()
315 }
316
317 pub fn is_optimistic_checkpointing(&self) -> bool {
319 self.checkpointing_mode.is_optimistic()
320 }
321
322 pub fn execution_operation(&self) -> Option<&Operation> {
328 self.execution_operation.as_ref()
329 }
330
331 pub fn get_original_input_raw(&self) -> Option<&str> {
337 self.execution_operation
338 .as_ref()
339 .and_then(|op| op.execution_details.as_ref())
340 .and_then(|details| details.input_payload.as_deref())
341 }
342
343 pub fn execution_operation_id(&self) -> Option<&str> {
345 self.execution_operation
346 .as_ref()
347 .map(|op| op.operation_id.as_str())
348 }
349
350 pub async fn complete_execution_success(
357 &self,
358 result: Option<String>,
359 ) -> Result<(), DurableError> {
360 let execution_id =
361 self.execution_operation_id()
362 .ok_or_else(|| DurableError::Validation {
363 message: "Cannot complete execution: no EXECUTION operation exists".to_string(),
364 })?;
365
366 let update = OperationUpdate::succeed(execution_id, OperationType::Execution, result);
367
368 self.create_checkpoint(update, true).await
369 }
370
371 pub async fn complete_execution_failure(&self, error: ErrorObject) -> Result<(), DurableError> {
377 let execution_id =
378 self.execution_operation_id()
379 .ok_or_else(|| DurableError::Validation {
380 message: "Cannot complete execution: no EXECUTION operation exists".to_string(),
381 })?;
382
383 let update = OperationUpdate::fail(execution_id, OperationType::Execution, error);
384
385 self.create_checkpoint(update, true).await
386 }
387
388 pub async fn get_checkpoint_result(&self, operation_id: &str) -> CheckpointedResult {
390 let operations = self.operations.read().await;
391 CheckpointedResult::new(operations.get(operation_id).cloned())
392 }
393
394 pub async fn track_replay(&self, operation_id: &str) {
396 {
397 let mut replayed = self.replayed_operations.write().await;
398 replayed.insert(operation_id.to_string());
399 }
400
401 let (replayed_count, total_count) = {
402 let replayed = self.replayed_operations.read().await;
403 let operations = self.operations.read().await;
404 (replayed.len(), operations.len())
405 };
406
407 if replayed_count >= total_count {
408 let has_more = self.next_marker.read().await.is_some();
409 if !has_more {
410 self.replay_status
417 .store(ReplayStatus::New as u8, Ordering::Release);
418 }
419 }
420 }
421
422 pub async fn load_more_operations(&self) -> Result<bool, DurableError> {
428 let marker = {
429 let guard = self.next_marker.read().await;
430 match guard.as_ref() {
431 Some(m) => m.clone(),
432 None => return Ok(false),
433 }
434 };
435
436 let response = self.get_operations_with_retry(&marker).await?;
437
438 {
439 let mut operations = self.operations.write().await;
440 for op in response.operations {
441 operations.insert(op.operation_id.clone(), op);
442 }
443 }
444
445 {
446 let mut next_marker = self.next_marker.write().await;
447 *next_marker = response.next_marker;
448 }
449
450 Ok(true)
451 }
452
453 async fn get_operations_with_retry(
455 &self,
456 marker: &str,
457 ) -> Result<crate::client::GetOperationsResponse, DurableError> {
458 const MAX_RETRIES: u32 = 5;
459 const INITIAL_DELAY_MS: u64 = 100;
460 const MAX_DELAY_MS: u64 = 10_000;
461 const BACKOFF_MULTIPLIER: u64 = 2;
462
463 let mut attempt = 0;
464 let mut delay_ms = INITIAL_DELAY_MS;
465
466 loop {
467 let result = self
468 .service_client
469 .get_operations(&self.durable_execution_arn, marker)
470 .await;
471
472 match result {
473 Ok(response) => return Ok(response),
474 Err(error) if error.is_throttling() => {
475 attempt += 1;
476 if attempt > MAX_RETRIES {
477 tracing::warn!(
478 attempt = attempt,
479 "GetOperations throttling: max retries exceeded"
480 );
481 return Err(error);
482 }
483
484 let actual_delay = error.get_retry_after_ms().unwrap_or(delay_ms);
485 tracing::debug!(
486 attempt = attempt,
487 delay_ms = actual_delay,
488 "GetOperations throttled, retrying"
489 );
490 tokio::time::sleep(StdDuration::from_millis(actual_delay)).await;
491 delay_ms = (delay_ms * BACKOFF_MULTIPLIER).min(MAX_DELAY_MS);
492 }
493 Err(error) => return Err(error),
494 }
495 }
496 }
497
498 pub async fn load_all_operations(&self) -> Result<(), DurableError> {
500 while self.load_more_operations().await? {}
501 Ok(())
502 }
503
504 pub async fn has_more_operations(&self) -> bool {
506 self.next_marker.read().await.is_some()
507 }
508
509 pub async fn operation_count(&self) -> usize {
511 self.operations.read().await.len()
512 }
513
514 pub fn service_client(&self) -> &SharedDurableServiceClient {
516 &self.service_client
517 }
518
519 pub async fn add_operation(&self, operation: Operation) {
521 let mut operations = self.operations.write().await;
522 operations.insert(operation.operation_id.clone(), operation);
523 }
524
525 pub async fn update_operation(
527 &self,
528 operation_id: &str,
529 update_fn: impl FnOnce(&mut Operation),
530 ) {
531 let mut operations = self.operations.write().await;
532 if let Some(op) = operations.get_mut(operation_id) {
533 update_fn(op);
534 }
535 }
536
537 pub async fn has_operation(&self, operation_id: &str) -> bool {
539 self.operations.read().await.contains_key(operation_id)
540 }
541
542 pub async fn get_operation(&self, operation_id: &str) -> Option<Operation> {
544 self.operations.read().await.get(operation_id).cloned()
545 }
546
547 pub async fn mark_parent_done(&self, parent_id: &str) {
549 let mut done_parents = self.parent_done_lock.lock().await;
550 done_parents.insert(parent_id.to_string());
551 }
552
553 pub async fn is_parent_done(&self, parent_id: &str) -> bool {
555 let done_parents = self.parent_done_lock.lock().await;
556 done_parents.contains(parent_id)
557 }
558
559 pub async fn is_orphaned(&self, parent_id: Option<&str>) -> bool {
561 match parent_id {
562 Some(pid) => self.is_parent_done(pid).await,
563 None => false,
564 }
565 }
566
567 pub async fn create_checkpoint(
575 &self,
576 operation: OperationUpdate,
577 is_sync: bool,
578 ) -> Result<(), DurableError> {
579 if let Some(ref parent_id) = operation.parent_id {
581 if self.is_parent_done(parent_id).await {
582 return Err(DurableError::OrphanedChild {
583 message: format!(
584 "Cannot checkpoint operation {} - parent {} has completed",
585 operation.operation_id, parent_id
586 ),
587 operation_id: operation.operation_id.clone(),
588 });
589 }
590 }
591
592 let effective_is_sync = match self.checkpointing_mode {
594 crate::config::CheckpointingMode::Eager => true,
595 crate::config::CheckpointingMode::Batched => is_sync,
596 crate::config::CheckpointingMode::Optimistic => is_sync,
597 };
598
599 if self.checkpointing_mode.is_eager() {
601 return self.checkpoint_direct(operation, effective_is_sync).await;
602 }
603
604 if let Some(ref sender) = self.checkpoint_sender {
606 let result = sender
607 .checkpoint(operation.clone(), effective_is_sync)
608 .await;
609 if result.is_ok() {
610 self.update_local_cache_from_update(&operation).await;
611 }
612 return result;
613 }
614
615 self.checkpoint_direct(operation, effective_is_sync).await
617 }
618
619 async fn checkpoint_direct(
621 &self,
622 operation: OperationUpdate,
623 _is_sync: bool,
624 ) -> Result<(), DurableError> {
625 let token = self.checkpoint_token.read().await.clone();
626 let response = self
627 .service_client
628 .checkpoint(&self.durable_execution_arn, &token, vec![operation.clone()])
629 .await?;
630
631 {
632 let mut token_guard = self.checkpoint_token.write().await;
633 *token_guard = response.checkpoint_token;
634 }
635
636 if let Some(ref new_state) = response.new_execution_state {
638 self.update_local_cache_from_response(new_state).await;
639 } else {
640 self.update_local_cache_from_update(&operation).await;
641 }
642 Ok(())
643 }
644
645 pub async fn create_checkpoint_with_response(
648 &self,
649 operation: OperationUpdate,
650 ) -> Result<crate::client::CheckpointResponse, DurableError> {
651 if let Some(ref parent_id) = operation.parent_id {
653 if self.is_parent_done(parent_id).await {
654 return Err(DurableError::OrphanedChild {
655 message: format!(
656 "Cannot checkpoint operation {} - parent {} has completed",
657 operation.operation_id, parent_id
658 ),
659 operation_id: operation.operation_id.clone(),
660 });
661 }
662 }
663
664 let token = self.checkpoint_token.read().await.clone();
665 let response = self
666 .service_client
667 .checkpoint(&self.durable_execution_arn, &token, vec![operation.clone()])
668 .await?;
669
670 tracing::debug!(
671 has_new_state = response.new_execution_state.is_some(),
672 num_operations = response
673 .new_execution_state
674 .as_ref()
675 .map(|s| s.operations.len())
676 .unwrap_or(0),
677 "Checkpoint response received"
678 );
679
680 {
681 let mut token_guard = self.checkpoint_token.write().await;
682 *token_guard = response.checkpoint_token.clone();
683 }
684
685 if let Some(ref new_state) = response.new_execution_state {
687 self.update_local_cache_from_response(new_state).await;
688 } else {
689 self.update_local_cache_from_update(&operation).await;
690 }
691
692 Ok(response)
693 }
694
695 async fn update_local_cache_from_response(&self, new_state: &crate::client::NewExecutionState) {
697 let mut operations = self.operations.write().await;
698 for op in &new_state.operations {
699 operations.insert(op.operation_id.clone(), op.clone());
700 }
701 }
702
703 pub async fn checkpoint_sync(&self, operation: OperationUpdate) -> Result<(), DurableError> {
705 self.create_checkpoint(operation, true).await
706 }
707
708 pub async fn checkpoint_async(&self, operation: OperationUpdate) -> Result<(), DurableError> {
710 self.create_checkpoint(operation, false).await
711 }
712
713 pub async fn checkpoint_optimal(
715 &self,
716 operation: OperationUpdate,
717 prefer_sync: bool,
718 ) -> Result<(), DurableError> {
719 let is_sync = match self.checkpointing_mode {
720 crate::config::CheckpointingMode::Eager => true,
721 crate::config::CheckpointingMode::Batched => prefer_sync,
722 crate::config::CheckpointingMode::Optimistic => prefer_sync,
723 };
724 self.create_checkpoint(operation, is_sync).await
725 }
726
727 pub fn should_use_async_checkpoint(&self) -> bool {
729 match self.checkpointing_mode {
730 crate::config::CheckpointingMode::Eager => false,
731 crate::config::CheckpointingMode::Batched => true,
732 crate::config::CheckpointingMode::Optimistic => true,
733 }
734 }
735
736 pub fn prioritizes_performance(&self) -> bool {
738 self.checkpointing_mode.is_optimistic()
739 }
740
741 pub fn prioritizes_durability(&self) -> bool {
743 self.checkpointing_mode.is_eager()
744 }
745
746 async fn update_local_cache_from_update(&self, update: &OperationUpdate) {
748 let mut operations = self.operations.write().await;
749
750 match update.action {
751 crate::operation::OperationAction::Start => {
752 let mut op = Operation::new(&update.operation_id, update.operation_type);
753 op.parent_id = update.parent_id.clone();
754 op.name = update.name.clone();
755 operations.insert(update.operation_id.clone(), op);
756 }
757 crate::operation::OperationAction::Succeed => {
758 if let Some(op) = operations.get_mut(&update.operation_id) {
759 op.status = OperationStatus::Succeeded;
760 op.result = update.result.clone();
761 } else {
762 let mut op = Operation::new(&update.operation_id, update.operation_type);
763 op.status = OperationStatus::Succeeded;
764 op.result = update.result.clone();
765 op.parent_id = update.parent_id.clone();
766 op.name = update.name.clone();
767 operations.insert(update.operation_id.clone(), op);
768 }
769 }
770 crate::operation::OperationAction::Fail => {
771 if let Some(op) = operations.get_mut(&update.operation_id) {
772 op.status = OperationStatus::Failed;
773 op.error = update.error.clone();
774 } else {
775 let mut op = Operation::new(&update.operation_id, update.operation_type);
776 op.status = OperationStatus::Failed;
777 op.error = update.error.clone();
778 op.parent_id = update.parent_id.clone();
779 op.name = update.name.clone();
780 operations.insert(update.operation_id.clone(), op);
781 }
782 }
783 crate::operation::OperationAction::Cancel => {
784 if let Some(op) = operations.get_mut(&update.operation_id) {
785 op.status = OperationStatus::Cancelled;
786 } else {
787 let mut op = Operation::new(&update.operation_id, update.operation_type);
788 op.status = OperationStatus::Cancelled;
789 op.parent_id = update.parent_id.clone();
790 op.name = update.name.clone();
791 operations.insert(update.operation_id.clone(), op);
792 }
793 }
794 crate::operation::OperationAction::Retry => {
795 if let Some(op) = operations.get_mut(&update.operation_id) {
796 op.status = OperationStatus::Pending;
797 if update.result.is_some() || update.step_options.is_some() {
798 let step_details =
799 op.step_details
800 .get_or_insert(crate::operation::StepDetails {
801 result: None,
802 attempt: None,
803 next_attempt_timestamp: None,
804 error: None,
805 payload: None,
806 });
807 if update.result.is_some() {
808 step_details.payload = update.result.clone();
809 }
810 step_details.attempt = Some(step_details.attempt.unwrap_or(0) + 1);
811 }
812 if update.error.is_some() {
813 op.error = update.error.clone();
814 }
815 } else {
816 let mut op = Operation::new(&update.operation_id, update.operation_type);
817 op.status = OperationStatus::Pending;
818 op.parent_id = update.parent_id.clone();
819 op.name = update.name.clone();
820 op.error = update.error.clone();
821 if update.result.is_some() || update.step_options.is_some() {
822 op.step_details = Some(crate::operation::StepDetails {
823 result: None,
824 attempt: Some(1),
825 next_attempt_timestamp: None,
826 error: None,
827 payload: update.result.clone(),
828 });
829 }
830 operations.insert(update.operation_id.clone(), op);
831 }
832 }
833 }
834 }
835
836 pub fn shared_checkpoint_token(&self) -> Arc<RwLock<String>> {
838 self.checkpoint_token.clone()
839 }
840
841 pub fn has_checkpoint_sender(&self) -> bool {
843 self.checkpoint_sender.is_some()
844 }
845
846 pub async fn load_child_operations(
853 &self,
854 parent_id: &str,
855 ) -> Result<Vec<Operation>, DurableError> {
856 let operations = self.operations.read().await;
857 let children: Vec<Operation> = operations
858 .values()
859 .filter(|op| op.parent_id.as_deref() == Some(parent_id))
860 .cloned()
861 .collect();
862
863 Ok(children)
864 }
865
866 pub async fn get_child_operations(&self, parent_id: &str) -> Vec<Operation> {
868 let operations = self.operations.read().await;
869 operations
870 .values()
871 .filter(|op| op.parent_id.as_deref() == Some(parent_id))
872 .cloned()
873 .collect()
874 }
875
876 pub async fn has_replay_children(&self, operation_id: &str) -> bool {
882 let operations = self.operations.read().await;
883 operations
884 .get(operation_id)
885 .filter(|op| op.operation_type == OperationType::Context)
886 .and_then(|op| op.context_details.as_ref())
887 .and_then(|details| details.replay_children)
888 .unwrap_or(false)
889 }
890}
891
892impl std::fmt::Debug for ExecutionState {
894 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
895 f.debug_struct("ExecutionState")
896 .field("durable_execution_arn", &self.durable_execution_arn)
897 .field("replay_status", &self.replay_status())
898 .finish_non_exhaustive()
899 }
900}
901
902#[cfg(test)]
903mod tests {
904 use super::*;
905 use std::sync::Arc;
906
907 use crate::client::{
908 CheckpointResponse, GetOperationsResponse, MockDurableServiceClient,
909 SharedDurableServiceClient,
910 };
911 use crate::error::ErrorObject;
912 use crate::lambda::InitialExecutionState;
913 use crate::operation::{
914 ContextDetails, ExecutionDetails, Operation, OperationStatus, OperationType,
915 OperationUpdate,
916 };
917
918 fn create_mock_client() -> SharedDurableServiceClient {
919 Arc::new(MockDurableServiceClient::new())
920 }
921
922 fn create_test_operation(id: &str, status: OperationStatus) -> Operation {
923 let mut op = Operation::new(id, OperationType::Step);
924 op.status = status;
925 op
926 }
927
928 fn create_execution_operation(input_payload: Option<&str>) -> Operation {
929 let mut op = Operation::new("exec-123", OperationType::Execution);
930 op.status = OperationStatus::Started;
931 op.execution_details = Some(ExecutionDetails {
932 input_payload: input_payload.map(|s| s.to_string()),
933 });
934 op
935 }
936
937 #[tokio::test]
940 async fn test_execution_state_new_empty() {
941 let client = create_mock_client();
942 let state = ExecutionState::new(
943 "arn:test",
944 "token-123",
945 InitialExecutionState::new(),
946 client,
947 );
948
949 assert_eq!(state.durable_execution_arn(), "arn:test");
950 assert_eq!(state.checkpoint_token().await, "token-123");
951 assert!(state.is_new());
952 assert!(!state.is_replay());
953 assert_eq!(state.operation_count().await, 0);
954 assert!(!state.has_more_operations().await);
955 }
956
957 #[tokio::test]
958 async fn test_execution_state_new_with_operations() {
959 let client = create_mock_client();
960 let ops = vec![
961 create_test_operation("op-1", OperationStatus::Succeeded),
962 create_test_operation("op-2", OperationStatus::Succeeded),
963 ];
964 let initial_state = InitialExecutionState::with_operations(ops);
965
966 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
967
968 assert!(state.is_replay());
969 assert!(!state.is_new());
970 assert_eq!(state.operation_count().await, 2);
971 }
972
973 #[tokio::test]
974 async fn test_get_checkpoint_result_exists() {
975 let client = create_mock_client();
976 let mut op = create_test_operation("op-1", OperationStatus::Succeeded);
977 op.result = Some(r#"{"value": 42}"#.to_string());
978 let initial_state = InitialExecutionState::with_operations(vec![op]);
979
980 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
981
982 let result = state.get_checkpoint_result("op-1").await;
983 assert!(result.is_existent());
984 assert!(result.is_succeeded());
985 assert_eq!(result.result(), Some(r#"{"value": 42}"#));
986 }
987
988 #[tokio::test]
989 async fn test_get_checkpoint_result_not_exists() {
990 let client = create_mock_client();
991 let state = ExecutionState::new(
992 "arn:test",
993 "token-123",
994 InitialExecutionState::new(),
995 client,
996 );
997
998 let result = state.get_checkpoint_result("non-existent").await;
999 assert!(!result.is_existent());
1000 }
1001
1002 #[tokio::test]
1003 async fn test_track_replay_transitions_to_new() {
1004 let client = create_mock_client();
1005 let ops = vec![
1006 create_test_operation("op-1", OperationStatus::Succeeded),
1007 create_test_operation("op-2", OperationStatus::Succeeded),
1008 ];
1009 let initial_state = InitialExecutionState::with_operations(ops);
1010
1011 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1012
1013 assert!(state.is_replay());
1014 state.track_replay("op-1").await;
1015 assert!(state.is_replay());
1016 state.track_replay("op-2").await;
1017 assert!(state.is_new());
1018 }
1019
1020 #[tokio::test]
1021 async fn test_track_replay_with_pagination() {
1022 let client = Arc::new(
1023 MockDurableServiceClient::new().with_get_operations_response(Ok(
1024 GetOperationsResponse {
1025 operations: vec![create_test_operation("op-3", OperationStatus::Succeeded)],
1026 next_marker: None,
1027 },
1028 )),
1029 );
1030
1031 let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1032 let mut initial_state = InitialExecutionState::with_operations(ops);
1033 initial_state.next_marker = Some("marker-1".to_string());
1034
1035 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1036
1037 state.track_replay("op-1").await;
1038 assert!(state.is_replay());
1039 }
1040
1041 #[tokio::test]
1042 async fn test_set_checkpoint_token() {
1043 let client = create_mock_client();
1044 let state = ExecutionState::new(
1045 "arn:test",
1046 "token-123",
1047 InitialExecutionState::new(),
1048 client,
1049 );
1050
1051 assert_eq!(state.checkpoint_token().await, "token-123");
1052 state.set_checkpoint_token("token-456").await;
1053 assert_eq!(state.checkpoint_token().await, "token-456");
1054 }
1055
1056 #[tokio::test]
1057 async fn test_add_operation() {
1058 let client = create_mock_client();
1059 let state = ExecutionState::new(
1060 "arn:test",
1061 "token-123",
1062 InitialExecutionState::new(),
1063 client,
1064 );
1065
1066 assert!(!state.has_operation("op-1").await);
1067
1068 let op = create_test_operation("op-1", OperationStatus::Succeeded);
1069 state.add_operation(op).await;
1070
1071 assert!(state.has_operation("op-1").await);
1072 assert_eq!(state.operation_count().await, 1);
1073 }
1074
1075 #[tokio::test]
1076 async fn test_update_operation() {
1077 let client = create_mock_client();
1078 let ops = vec![create_test_operation("op-1", OperationStatus::Started)];
1079 let initial_state = InitialExecutionState::with_operations(ops);
1080
1081 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1082
1083 let op = state.get_operation("op-1").await.unwrap();
1084 assert_eq!(op.status, OperationStatus::Started);
1085
1086 state
1087 .update_operation("op-1", |op| {
1088 op.status = OperationStatus::Succeeded;
1089 op.result = Some("done".to_string());
1090 })
1091 .await;
1092
1093 let op = state.get_operation("op-1").await.unwrap();
1094 assert_eq!(op.status, OperationStatus::Succeeded);
1095 assert_eq!(op.result, Some("done".to_string()));
1096 }
1097
1098 #[tokio::test]
1099 async fn test_load_more_operations() {
1100 let client = Arc::new(
1101 MockDurableServiceClient::new().with_get_operations_response(Ok(
1102 GetOperationsResponse {
1103 operations: vec![
1104 create_test_operation("op-2", OperationStatus::Succeeded),
1105 create_test_operation("op-3", OperationStatus::Succeeded),
1106 ],
1107 next_marker: None,
1108 },
1109 )),
1110 );
1111
1112 let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1113 let mut initial_state = InitialExecutionState::with_operations(ops);
1114 initial_state.next_marker = Some("marker-1".to_string());
1115
1116 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1117
1118 assert_eq!(state.operation_count().await, 1);
1119 assert!(state.has_more_operations().await);
1120
1121 let loaded = state.load_more_operations().await.unwrap();
1122 assert!(loaded);
1123
1124 assert_eq!(state.operation_count().await, 3);
1125 assert!(!state.has_more_operations().await);
1126 }
1127
1128 #[tokio::test]
1129 async fn test_load_more_operations_no_more() {
1130 let client = create_mock_client();
1131 let state = ExecutionState::new(
1132 "arn:test",
1133 "token-123",
1134 InitialExecutionState::new(),
1135 client,
1136 );
1137
1138 let loaded = state.load_more_operations().await.unwrap();
1139 assert!(!loaded);
1140 }
1141
1142 #[tokio::test]
1143 async fn test_load_all_operations() {
1144 let client = Arc::new(
1145 MockDurableServiceClient::new()
1146 .with_get_operations_response(Ok(GetOperationsResponse {
1147 operations: vec![create_test_operation("op-2", OperationStatus::Succeeded)],
1148 next_marker: Some("marker-2".to_string()),
1149 }))
1150 .with_get_operations_response(Ok(GetOperationsResponse {
1151 operations: vec![create_test_operation("op-3", OperationStatus::Succeeded)],
1152 next_marker: None,
1153 })),
1154 );
1155
1156 let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1157 let mut initial_state = InitialExecutionState::with_operations(ops);
1158 initial_state.next_marker = Some("marker-1".to_string());
1159
1160 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1161
1162 assert_eq!(state.operation_count().await, 1);
1163 state.load_all_operations().await.unwrap();
1164
1165 assert_eq!(state.operation_count().await, 3);
1166 assert!(!state.has_more_operations().await);
1167 }
1168
1169 #[tokio::test]
1170 async fn test_mark_parent_done() {
1171 let client = create_mock_client();
1172 let state = ExecutionState::new(
1173 "arn:test",
1174 "token-123",
1175 InitialExecutionState::new(),
1176 client,
1177 );
1178
1179 assert!(!state.is_parent_done("parent-1").await);
1180 state.mark_parent_done("parent-1").await;
1181 assert!(state.is_parent_done("parent-1").await);
1182 assert!(!state.is_parent_done("parent-2").await);
1183 }
1184
1185 #[tokio::test]
1186 async fn test_is_orphaned() {
1187 let client = create_mock_client();
1188 let state = ExecutionState::new(
1189 "arn:test",
1190 "token-123",
1191 InitialExecutionState::new(),
1192 client,
1193 );
1194
1195 assert!(!state.is_orphaned(None).await);
1196 assert!(!state.is_orphaned(Some("parent-1")).await);
1197
1198 state.mark_parent_done("parent-1").await;
1199
1200 assert!(state.is_orphaned(Some("parent-1")).await);
1201 assert!(!state.is_orphaned(Some("parent-2")).await);
1202 }
1203
1204 #[tokio::test]
1205 async fn test_debug_impl() {
1206 let client = create_mock_client();
1207 let state = ExecutionState::new(
1208 "arn:test",
1209 "token-123",
1210 InitialExecutionState::new(),
1211 client,
1212 );
1213
1214 let debug_str = format!("{:?}", state);
1215 assert!(debug_str.contains("ExecutionState"));
1216 assert!(debug_str.contains("arn:test"));
1217 }
1218
1219 #[tokio::test]
1220 async fn test_create_checkpoint_direct() {
1221 let client = Arc::new(
1222 MockDurableServiceClient::new()
1223 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1224 );
1225
1226 let state = ExecutionState::new(
1227 "arn:test",
1228 "token-123",
1229 InitialExecutionState::new(),
1230 client,
1231 );
1232
1233 let update = OperationUpdate::start("op-1", OperationType::Step);
1234 let result = state.create_checkpoint(update, true).await;
1235
1236 assert!(result.is_ok());
1237 assert_eq!(state.checkpoint_token().await, "new-token");
1238 assert!(state.has_operation("op-1").await);
1239 }
1240
1241 #[tokio::test]
1242 async fn test_create_checkpoint_updates_local_cache_on_succeed() {
1243 let client = Arc::new(
1244 MockDurableServiceClient::new()
1245 .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
1246 .with_checkpoint_response(Ok(CheckpointResponse::new("token-2"))),
1247 );
1248
1249 let state = ExecutionState::new(
1250 "arn:test",
1251 "token-123",
1252 InitialExecutionState::new(),
1253 client,
1254 );
1255
1256 let update = OperationUpdate::start("op-1", OperationType::Step);
1257 state.create_checkpoint(update, true).await.unwrap();
1258
1259 let op = state.get_operation("op-1").await.unwrap();
1260 assert_eq!(op.status, OperationStatus::Started);
1261
1262 let update = OperationUpdate::succeed(
1263 "op-1",
1264 OperationType::Step,
1265 Some(r#"{"result": "ok"}"#.to_string()),
1266 );
1267 state.create_checkpoint(update, true).await.unwrap();
1268
1269 let op = state.get_operation("op-1").await.unwrap();
1270 assert_eq!(op.status, OperationStatus::Succeeded);
1271 assert_eq!(op.result, Some(r#"{"result": "ok"}"#.to_string()));
1272 }
1273
1274 #[tokio::test]
1275 async fn test_create_checkpoint_rejects_orphaned_child() {
1276 let client = create_mock_client();
1277 let state = ExecutionState::new(
1278 "arn:test",
1279 "token-123",
1280 InitialExecutionState::new(),
1281 client,
1282 );
1283
1284 state.mark_parent_done("parent-1").await;
1285
1286 let update =
1287 OperationUpdate::start("child-1", OperationType::Step).with_parent_id("parent-1");
1288 let result = state.create_checkpoint(update, true).await;
1289
1290 assert!(result.is_err());
1291 match result.unwrap_err() {
1292 crate::error::DurableError::OrphanedChild { operation_id, .. } => {
1293 assert_eq!(operation_id, "child-1");
1294 }
1295 _ => panic!("Expected OrphanedChild error"),
1296 }
1297 }
1298
1299 #[tokio::test]
1300 async fn test_with_batcher_creates_state_and_batcher() {
1301 let client = Arc::new(
1302 MockDurableServiceClient::new()
1303 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1304 );
1305
1306 let (state, mut batcher) = ExecutionState::with_batcher(
1307 "arn:test",
1308 "token-123",
1309 InitialExecutionState::new(),
1310 client,
1311 CheckpointBatcherConfig {
1312 max_batch_time_ms: 10,
1313 ..Default::default()
1314 },
1315 100,
1316 );
1317
1318 assert!(state.has_checkpoint_sender());
1319 assert_eq!(state.durable_execution_arn(), "arn:test");
1320
1321 let batcher_handle = tokio::spawn(async move {
1322 batcher.run().await;
1323 });
1324
1325 let update = OperationUpdate::start("op-1", OperationType::Step);
1326 let result = state.create_checkpoint(update, true).await;
1327
1328 drop(state);
1329 batcher_handle.await.unwrap();
1330
1331 assert!(result.is_ok());
1332 }
1333
1334 #[tokio::test]
1335 async fn test_checkpoint_sync_convenience_method() {
1336 let client = Arc::new(
1337 MockDurableServiceClient::new()
1338 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1339 );
1340
1341 let state = ExecutionState::new(
1342 "arn:test",
1343 "token-123",
1344 InitialExecutionState::new(),
1345 client,
1346 );
1347
1348 let update = OperationUpdate::start("op-1", OperationType::Step);
1349 let result = state.checkpoint_sync(update).await;
1350
1351 assert!(result.is_ok());
1352 }
1353
1354 #[tokio::test]
1355 async fn test_checkpoint_async_convenience_method() {
1356 let client = Arc::new(
1357 MockDurableServiceClient::new()
1358 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1359 );
1360
1361 let state = ExecutionState::new(
1362 "arn:test",
1363 "token-123",
1364 InitialExecutionState::new(),
1365 client,
1366 );
1367
1368 let update = OperationUpdate::start("op-1", OperationType::Step);
1369 let result = state.checkpoint_async(update).await;
1370
1371 assert!(result.is_ok());
1372 }
1373
1374 #[tokio::test]
1375 async fn test_shared_checkpoint_token() {
1376 let client = create_mock_client();
1377 let state = ExecutionState::new(
1378 "arn:test",
1379 "token-123",
1380 InitialExecutionState::new(),
1381 client,
1382 );
1383
1384 let shared_token = state.shared_checkpoint_token();
1385 assert_eq!(*shared_token.read().await, "token-123");
1386
1387 {
1388 let mut guard = shared_token.write().await;
1389 *guard = "modified-token".to_string();
1390 }
1391
1392 assert_eq!(state.checkpoint_token().await, "modified-token");
1393 }
1394
1395 #[tokio::test]
1398 async fn test_load_child_operations_returns_children() {
1399 let client = create_mock_client();
1400
1401 let mut parent_op = Operation::new("parent-ctx", OperationType::Context);
1402 parent_op.status = OperationStatus::Succeeded;
1403
1404 let mut child1 = create_test_operation("child-1", OperationStatus::Succeeded);
1405 child1.parent_id = Some("parent-ctx".to_string());
1406
1407 let mut child2 = create_test_operation("child-2", OperationStatus::Succeeded);
1408 child2.parent_id = Some("parent-ctx".to_string());
1409
1410 let mut other_child = create_test_operation("other-child", OperationStatus::Succeeded);
1411 other_child.parent_id = Some("other-parent".to_string());
1412
1413 let initial_state =
1414 InitialExecutionState::with_operations(vec![parent_op, child1, child2, other_child]);
1415
1416 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1417
1418 let children = state.load_child_operations("parent-ctx").await.unwrap();
1419
1420 assert_eq!(children.len(), 2);
1421 let child_ids: Vec<&str> = children.iter().map(|c| c.operation_id.as_str()).collect();
1422 assert!(child_ids.contains(&"child-1"));
1423 assert!(child_ids.contains(&"child-2"));
1424 }
1425
1426 #[tokio::test]
1427 async fn test_load_child_operations_no_children() {
1428 let client = create_mock_client();
1429
1430 let parent_op = Operation::new("parent-ctx", OperationType::Context);
1431 let initial_state = InitialExecutionState::with_operations(vec![parent_op]);
1432
1433 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1434
1435 let children = state.load_child_operations("parent-ctx").await.unwrap();
1436
1437 assert!(children.is_empty());
1438 }
1439
1440 #[tokio::test]
1441 async fn test_get_child_operations_returns_cached_children() {
1442 let client = create_mock_client();
1443
1444 let mut parent_op = Operation::new("parent-ctx", OperationType::Context);
1445 parent_op.status = OperationStatus::Succeeded;
1446
1447 let mut child1 = create_test_operation("child-1", OperationStatus::Succeeded);
1448 child1.parent_id = Some("parent-ctx".to_string());
1449
1450 let mut child2 = create_test_operation("child-2", OperationStatus::Succeeded);
1451 child2.parent_id = Some("parent-ctx".to_string());
1452
1453 let initial_state = InitialExecutionState::with_operations(vec![parent_op, child1, child2]);
1454
1455 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1456
1457 let children = state.get_child_operations("parent-ctx").await;
1458
1459 assert_eq!(children.len(), 2);
1460 }
1461
1462 #[tokio::test]
1463 async fn test_get_child_operations_empty_for_nonexistent_parent() {
1464 let client = create_mock_client();
1465 let state = ExecutionState::new(
1466 "arn:test",
1467 "token-123",
1468 InitialExecutionState::new(),
1469 client,
1470 );
1471
1472 let children = state.get_child_operations("nonexistent-parent").await;
1473 assert!(children.is_empty());
1474 }
1475
1476 #[tokio::test]
1477 async fn test_has_replay_children_true() {
1478 let client = create_mock_client();
1479
1480 let mut ctx_op = Operation::new("ctx-with-replay", OperationType::Context);
1481 ctx_op.status = OperationStatus::Succeeded;
1482 ctx_op.context_details = Some(ContextDetails {
1483 result: None,
1484 replay_children: Some(true),
1485 error: None,
1486 });
1487
1488 let initial_state = InitialExecutionState::with_operations(vec![ctx_op]);
1489 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1490
1491 assert!(state.has_replay_children("ctx-with-replay").await);
1492 }
1493
1494 #[tokio::test]
1495 async fn test_has_replay_children_false_when_not_set() {
1496 let client = create_mock_client();
1497
1498 let mut ctx_op = Operation::new("ctx-no-replay", OperationType::Context);
1499 ctx_op.status = OperationStatus::Succeeded;
1500 ctx_op.context_details = Some(ContextDetails {
1501 result: None,
1502 replay_children: None,
1503 error: None,
1504 });
1505
1506 let initial_state = InitialExecutionState::with_operations(vec![ctx_op]);
1507 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1508
1509 assert!(!state.has_replay_children("ctx-no-replay").await);
1510 }
1511
1512 #[tokio::test]
1515 async fn test_checkpointing_mode_default_is_batched() {
1516 let client = create_mock_client();
1517 let state = ExecutionState::new(
1518 "arn:test",
1519 "token-123",
1520 InitialExecutionState::new(),
1521 client,
1522 );
1523
1524 assert_eq!(
1525 state.checkpointing_mode(),
1526 crate::config::CheckpointingMode::Batched
1527 );
1528 assert!(state.is_batched_checkpointing());
1529 assert!(!state.is_eager_checkpointing());
1530 assert!(!state.is_optimistic_checkpointing());
1531 }
1532
1533 #[tokio::test]
1534 async fn test_checkpointing_mode_eager() {
1535 let client = create_mock_client();
1536 let state = ExecutionState::with_checkpointing_mode(
1537 "arn:test",
1538 "token-123",
1539 InitialExecutionState::new(),
1540 client,
1541 crate::config::CheckpointingMode::Eager,
1542 );
1543
1544 assert_eq!(
1545 state.checkpointing_mode(),
1546 crate::config::CheckpointingMode::Eager
1547 );
1548 assert!(state.is_eager_checkpointing());
1549 assert!(!state.is_batched_checkpointing());
1550 assert!(!state.is_optimistic_checkpointing());
1551 assert!(state.prioritizes_durability());
1552 assert!(!state.prioritizes_performance());
1553 assert!(!state.should_use_async_checkpoint());
1554 }
1555
1556 #[tokio::test]
1557 async fn test_checkpointing_mode_optimistic() {
1558 let client = create_mock_client();
1559 let state = ExecutionState::with_checkpointing_mode(
1560 "arn:test",
1561 "token-123",
1562 InitialExecutionState::new(),
1563 client,
1564 crate::config::CheckpointingMode::Optimistic,
1565 );
1566
1567 assert_eq!(
1568 state.checkpointing_mode(),
1569 crate::config::CheckpointingMode::Optimistic
1570 );
1571 assert!(state.is_optimistic_checkpointing());
1572 assert!(!state.is_eager_checkpointing());
1573 assert!(!state.is_batched_checkpointing());
1574 assert!(state.prioritizes_performance());
1575 assert!(!state.prioritizes_durability());
1576 assert!(state.should_use_async_checkpoint());
1577 }
1578
1579 #[tokio::test]
1580 async fn test_checkpointing_mode_batched_helpers() {
1581 let client = create_mock_client();
1582 let state = ExecutionState::with_checkpointing_mode(
1583 "arn:test",
1584 "token-123",
1585 InitialExecutionState::new(),
1586 client,
1587 crate::config::CheckpointingMode::Batched,
1588 );
1589
1590 assert!(!state.prioritizes_durability());
1591 assert!(!state.prioritizes_performance());
1592 assert!(state.should_use_async_checkpoint());
1593 }
1594
1595 #[tokio::test]
1596 async fn test_checkpoint_optimal_eager_mode() {
1597 let client = Arc::new(
1598 MockDurableServiceClient::new()
1599 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1600 );
1601
1602 let state = ExecutionState::with_checkpointing_mode(
1603 "arn:test",
1604 "token-123",
1605 InitialExecutionState::new(),
1606 client,
1607 crate::config::CheckpointingMode::Eager,
1608 );
1609
1610 let update = OperationUpdate::start("op-1", OperationType::Step);
1611 let result = state.checkpoint_optimal(update, false).await;
1612
1613 assert!(result.is_ok());
1614 }
1615
1616 #[tokio::test]
1617 async fn test_eager_mode_bypasses_batcher() {
1618 let client = Arc::new(
1619 MockDurableServiceClient::new()
1620 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1621 );
1622
1623 let (state, mut batcher) = ExecutionState::with_batcher_and_mode(
1624 "arn:test",
1625 "token-123",
1626 InitialExecutionState::new(),
1627 client,
1628 CheckpointBatcherConfig {
1629 max_batch_time_ms: 10,
1630 ..Default::default()
1631 },
1632 100,
1633 crate::config::CheckpointingMode::Eager,
1634 );
1635
1636 let batcher_handle = tokio::spawn(async move {
1637 batcher.run().await;
1638 });
1639
1640 let update = OperationUpdate::start("op-1", OperationType::Step);
1641 let result = state.create_checkpoint(update, false).await;
1642
1643 drop(state);
1644 batcher_handle.await.unwrap();
1645
1646 assert!(result.is_ok());
1647 }
1648
1649 #[tokio::test]
1652 async fn test_execution_operation_recognized() {
1653 let client = create_mock_client();
1654 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1655 let step_op = Operation::new("step-1", OperationType::Step);
1656
1657 let initial_state = InitialExecutionState::with_operations(vec![exec_op, step_op]);
1658 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1659
1660 assert!(state.execution_operation().is_some());
1661 let exec = state.execution_operation().unwrap();
1662 assert_eq!(exec.operation_type, OperationType::Execution);
1663 assert_eq!(exec.operation_id, "exec-123");
1664 }
1665
1666 #[tokio::test]
1667 async fn test_execution_operation_not_present() {
1668 let client = create_mock_client();
1669 let step_op = Operation::new("step-1", OperationType::Step);
1670
1671 let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1672 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1673
1674 assert!(state.execution_operation().is_none());
1675 }
1676
1677 #[tokio::test]
1678 async fn test_get_original_input_raw() {
1679 let client = create_mock_client();
1680 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1681
1682 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1683 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1684
1685 let input = state.get_original_input_raw();
1686 assert!(input.is_some());
1687 assert_eq!(input.unwrap(), r#"{"order_id": "123"}"#);
1688 }
1689
1690 #[tokio::test]
1691 async fn test_get_original_input_raw_no_payload() {
1692 let client = create_mock_client();
1693 let exec_op = create_execution_operation(None);
1694
1695 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1696 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1697
1698 let input = state.get_original_input_raw();
1699 assert!(input.is_none());
1700 }
1701
1702 #[tokio::test]
1703 async fn test_get_original_input_raw_no_execution_operation() {
1704 let client = create_mock_client();
1705 let step_op = Operation::new("step-1", OperationType::Step);
1706
1707 let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1708 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1709
1710 let input = state.get_original_input_raw();
1711 assert!(input.is_none());
1712 }
1713
1714 #[tokio::test]
1715 async fn test_execution_operation_id() {
1716 let client = create_mock_client();
1717 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1718
1719 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1720 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1721
1722 let id = state.execution_operation_id();
1723 assert!(id.is_some());
1724 assert_eq!(id.unwrap(), "exec-123");
1725 }
1726
1727 #[tokio::test]
1728 async fn test_complete_execution_success() {
1729 let client = Arc::new(
1730 MockDurableServiceClient::new()
1731 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1732 );
1733
1734 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1735 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1736 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1737
1738 let result = state
1739 .complete_execution_success(Some(r#"{"status": "completed"}"#.to_string()))
1740 .await;
1741 assert!(result.is_ok());
1742
1743 let op = state.get_operation("exec-123").await.unwrap();
1744 assert_eq!(op.status, OperationStatus::Succeeded);
1745 assert_eq!(op.result, Some(r#"{"status": "completed"}"#.to_string()));
1746 }
1747
1748 #[tokio::test]
1749 async fn test_complete_execution_success_no_execution_operation() {
1750 let client = create_mock_client();
1751 let step_op = Operation::new("step-1", OperationType::Step);
1752
1753 let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1754 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1755
1756 let result = state
1757 .complete_execution_success(Some("result".to_string()))
1758 .await;
1759 assert!(result.is_err());
1760 match result.unwrap_err() {
1761 crate::error::DurableError::Validation { message } => {
1762 assert!(message.contains("no EXECUTION operation"));
1763 }
1764 _ => panic!("Expected Validation error"),
1765 }
1766 }
1767
1768 #[tokio::test]
1769 async fn test_complete_execution_failure() {
1770 let client = Arc::new(
1771 MockDurableServiceClient::new()
1772 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1773 );
1774
1775 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1776 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1777 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1778
1779 let error = ErrorObject::new("ProcessingError", "Order processing failed");
1780 let result = state.complete_execution_failure(error).await;
1781 assert!(result.is_ok());
1782
1783 let op = state.get_operation("exec-123").await.unwrap();
1784 assert_eq!(op.status, OperationStatus::Failed);
1785 assert!(op.error.is_some());
1786 assert_eq!(op.error.as_ref().unwrap().error_type, "ProcessingError");
1787 }
1788
1789 #[tokio::test]
1790 async fn test_complete_execution_failure_no_execution_operation() {
1791 let client = create_mock_client();
1792 let step_op = Operation::new("step-1", OperationType::Step);
1793
1794 let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1795 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1796
1797 let error = ErrorObject::new("TestError", "Test message");
1798 let result = state.complete_execution_failure(error).await;
1799 assert!(result.is_err());
1800 match result.unwrap_err() {
1801 crate::error::DurableError::Validation { message } => {
1802 assert!(message.contains("no EXECUTION operation"));
1803 }
1804 _ => panic!("Expected Validation error"),
1805 }
1806 }
1807
1808 #[tokio::test]
1809 async fn test_with_batcher_recognizes_execution_operation() {
1810 let client = Arc::new(MockDurableServiceClient::new());
1811 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1812
1813 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1814 let (state, _batcher) = ExecutionState::with_batcher(
1815 "arn:test",
1816 "token-123",
1817 initial_state,
1818 client,
1819 CheckpointBatcherConfig::default(),
1820 100,
1821 );
1822
1823 assert!(state.execution_operation().is_some());
1824 let exec = state.execution_operation().unwrap();
1825 assert_eq!(exec.operation_type, OperationType::Execution);
1826 assert_eq!(
1827 state.get_original_input_raw(),
1828 Some(r#"{"order_id": "123"}"#)
1829 );
1830 }
1831
1832 mod property_tests {
1835 use super::*;
1836 use proptest::prelude::*;
1837
1838 proptest! {
1839 #![proptest_config(ProptestConfig::with_cases(100))]
1840
1841 #[test]
1842 fn prop_orphaned_child_checkpoint_fails(
1843 parent_id in "[a-z]{5,10}",
1844 child_id in "[a-z]{5,10}",
1845 ) {
1846 let rt = tokio::runtime::Runtime::new().unwrap();
1847 let result: Result<(), TestCaseError> = rt.block_on(async {
1848 let client = Arc::new(
1849 MockDurableServiceClient::new()
1850 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1851 );
1852
1853 let state = ExecutionState::new(
1854 "arn:test",
1855 "token-123",
1856 InitialExecutionState::new(),
1857 client,
1858 );
1859
1860 state.mark_parent_done(&parent_id).await;
1861
1862 let update = OperationUpdate::start(&child_id, OperationType::Step)
1863 .with_parent_id(&parent_id);
1864 let checkpoint_result = state.create_checkpoint(update, true).await;
1865
1866 match checkpoint_result {
1867 Err(DurableError::OrphanedChild { operation_id, .. }) => {
1868 if operation_id != child_id {
1869 return Err(TestCaseError::fail(format!(
1870 "Expected operation_id '{}' in OrphanedChild error, got '{}'",
1871 child_id, operation_id
1872 )));
1873 }
1874 }
1875 Ok(_) => {
1876 return Err(TestCaseError::fail(
1877 "Expected OrphanedChild error, but checkpoint succeeded"
1878 ));
1879 }
1880 Err(other) => {
1881 return Err(TestCaseError::fail(format!(
1882 "Expected OrphanedChild error, got {:?}",
1883 other
1884 )));
1885 }
1886 }
1887
1888 Ok(())
1889 });
1890 result?;
1891 }
1892
1893 #[test]
1894 fn prop_non_orphaned_child_checkpoint_succeeds(
1895 parent_id in "[a-z]{5,10}",
1896 child_id in "[a-z]{5,10}",
1897 ) {
1898 let rt = tokio::runtime::Runtime::new().unwrap();
1899 let result: Result<(), TestCaseError> = rt.block_on(async {
1900 let client = Arc::new(
1901 MockDurableServiceClient::new()
1902 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1903 );
1904
1905 let state = ExecutionState::new(
1906 "arn:test",
1907 "token-123",
1908 InitialExecutionState::new(),
1909 client,
1910 );
1911
1912 let update = OperationUpdate::start(&child_id, OperationType::Step)
1913 .with_parent_id(&parent_id);
1914 let checkpoint_result = state.create_checkpoint(update, true).await;
1915
1916 if let Err(e) = checkpoint_result {
1917 return Err(TestCaseError::fail(format!(
1918 "Expected checkpoint to succeed for non-orphaned child, got error: {:?}",
1919 e
1920 )));
1921 }
1922
1923 Ok(())
1924 });
1925 result?;
1926 }
1927
1928 #[test]
1929 fn prop_root_operation_never_orphaned(
1930 operation_id in "[a-z]{5,10}",
1931 ) {
1932 let rt = tokio::runtime::Runtime::new().unwrap();
1933 let result: Result<(), TestCaseError> = rt.block_on(async {
1934 let client = Arc::new(
1935 MockDurableServiceClient::new()
1936 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1937 );
1938
1939 let state = ExecutionState::new(
1940 "arn:test",
1941 "token-123",
1942 InitialExecutionState::new(),
1943 client,
1944 );
1945
1946 let update = OperationUpdate::start(&operation_id, OperationType::Step);
1947 let checkpoint_result = state.create_checkpoint(update, true).await;
1948
1949 if let Err(e) = checkpoint_result {
1950 return Err(TestCaseError::fail(format!(
1951 "Expected checkpoint to succeed for root operation, got error: {:?}",
1952 e
1953 )));
1954 }
1955
1956 Ok(())
1957 });
1958 result?;
1959 }
1960
1961 #[test]
1962 fn prop_marking_parent_done_affects_future_checkpoints(
1963 parent_id in "[a-z]{5,10}",
1964 child_id_before in "[a-z]{5,10}",
1965 child_id_after in "[a-z]{5,10}",
1966 ) {
1967 let rt = tokio::runtime::Runtime::new().unwrap();
1968 let result: Result<(), TestCaseError> = rt.block_on(async {
1969 let client = Arc::new(
1970 MockDurableServiceClient::new()
1971 .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
1972 .with_checkpoint_response(Ok(CheckpointResponse::new("token-2")))
1973 );
1974
1975 let state = ExecutionState::new(
1976 "arn:test",
1977 "token-123",
1978 InitialExecutionState::new(),
1979 client,
1980 );
1981
1982 let update_before = OperationUpdate::start(&child_id_before, OperationType::Step)
1983 .with_parent_id(&parent_id);
1984 let result_before = state.create_checkpoint(update_before, true).await;
1985
1986 if let Err(e) = result_before {
1987 return Err(TestCaseError::fail(format!(
1988 "Expected first checkpoint to succeed, got error: {:?}",
1989 e
1990 )));
1991 }
1992
1993 state.mark_parent_done(&parent_id).await;
1994
1995 let update_after = OperationUpdate::start(&child_id_after, OperationType::Step)
1996 .with_parent_id(&parent_id);
1997 let result_after = state.create_checkpoint(update_after, true).await;
1998
1999 match result_after {
2000 Err(DurableError::OrphanedChild { .. }) => {
2001 }
2003 Ok(_) => {
2004 return Err(TestCaseError::fail(
2005 "Expected OrphanedChild error after marking parent done"
2006 ));
2007 }
2008 Err(other) => {
2009 return Err(TestCaseError::fail(format!(
2010 "Expected OrphanedChild error, got {:?}",
2011 other
2012 )));
2013 }
2014 }
2015
2016 Ok(())
2017 });
2018 result?;
2019 }
2020 }
2021 }
2022}