1use std::collections::{HashMap, HashSet};
8use std::sync::atomic::{AtomicU8, Ordering};
9use std::sync::Arc;
10use std::time::Duration as StdDuration;
11
12use tokio::sync::{Mutex, RwLock};
13
14use crate::client::SharedDurableServiceClient;
15use crate::error::{DurableError, ErrorObject};
16use crate::operation::{Operation, OperationStatus, OperationType, OperationUpdate};
17use crate::types::ExecutionArn;
18
19use super::batcher::{
20 create_checkpoint_queue, CheckpointBatcher, CheckpointBatcherConfig, CheckpointSender,
21};
22use super::checkpoint_result::CheckpointedResult;
23use super::replay_status::ReplayStatus;
24
25pub struct ExecutionState {
30 durable_execution_arn: String,
32
33 checkpoint_token: Arc<RwLock<String>>,
35
36 operations: RwLock<HashMap<String, Operation>>,
38
39 service_client: SharedDurableServiceClient,
41
42 replay_status: AtomicU8,
44
45 replayed_operations: RwLock<HashSet<String>>,
47
48 next_marker: RwLock<Option<String>>,
50
51 parent_done_lock: Mutex<HashSet<String>>,
53
54 checkpoint_sender: Option<CheckpointSender>,
56
57 execution_operation: Option<Operation>,
60
61 checkpointing_mode: crate::config::CheckpointingMode,
63}
64
65impl ExecutionState {
66 pub fn new(
75 durable_execution_arn: impl Into<String>,
76 checkpoint_token: impl Into<String>,
77 initial_state: crate::lambda::InitialExecutionState,
78 service_client: SharedDurableServiceClient,
79 ) -> Self {
80 Self::with_checkpointing_mode(
81 durable_execution_arn,
82 checkpoint_token,
83 initial_state,
84 service_client,
85 crate::config::CheckpointingMode::default(),
86 )
87 }
88
89 pub fn with_checkpointing_mode(
91 durable_execution_arn: impl Into<String>,
92 checkpoint_token: impl Into<String>,
93 initial_state: crate::lambda::InitialExecutionState,
94 service_client: SharedDurableServiceClient,
95 checkpointing_mode: crate::config::CheckpointingMode,
96 ) -> Self {
97 let execution_operation = initial_state
99 .operations
100 .iter()
101 .find(|op| op.operation_type == OperationType::Execution)
102 .cloned();
103
104 let operations: HashMap<String, Operation> = initial_state
106 .operations
107 .into_iter()
108 .map(|op| (op.operation_id.clone(), op))
109 .collect();
110
111 let replay_status = if operations.is_empty() {
113 ReplayStatus::New
114 } else {
115 ReplayStatus::Replay
116 };
117
118 Self {
119 durable_execution_arn: durable_execution_arn.into(),
120 checkpoint_token: Arc::new(RwLock::new(checkpoint_token.into())),
121 operations: RwLock::new(operations),
122 service_client,
123 replay_status: AtomicU8::new(replay_status as u8),
124 replayed_operations: RwLock::new(HashSet::new()),
125 next_marker: RwLock::new(initial_state.next_marker),
126 parent_done_lock: Mutex::new(HashSet::new()),
127 checkpoint_sender: None,
128 execution_operation,
129 checkpointing_mode,
130 }
131 }
132
133 pub fn with_batcher(
139 durable_execution_arn: impl Into<String>,
140 checkpoint_token: impl Into<String>,
141 initial_state: crate::lambda::InitialExecutionState,
142 service_client: SharedDurableServiceClient,
143 batcher_config: CheckpointBatcherConfig,
144 queue_buffer_size: usize,
145 ) -> (Self, CheckpointBatcher) {
146 Self::with_batcher_and_mode(
147 durable_execution_arn,
148 checkpoint_token,
149 initial_state,
150 service_client,
151 batcher_config,
152 queue_buffer_size,
153 crate::config::CheckpointingMode::default(),
154 )
155 }
156
157 pub fn with_batcher_and_mode(
159 durable_execution_arn: impl Into<String>,
160 checkpoint_token: impl Into<String>,
161 initial_state: crate::lambda::InitialExecutionState,
162 service_client: SharedDurableServiceClient,
163 batcher_config: CheckpointBatcherConfig,
164 queue_buffer_size: usize,
165 checkpointing_mode: crate::config::CheckpointingMode,
166 ) -> (Self, CheckpointBatcher) {
167 let arn: String = durable_execution_arn.into();
168 let token: String = checkpoint_token.into();
169
170 let execution_operation = initial_state
172 .operations
173 .iter()
174 .find(|op| op.operation_type == OperationType::Execution)
175 .cloned();
176
177 let operations: HashMap<String, Operation> = initial_state
179 .operations
180 .into_iter()
181 .map(|op| (op.operation_id.clone(), op))
182 .collect();
183
184 let replay_status = if operations.is_empty() {
186 ReplayStatus::New
187 } else {
188 ReplayStatus::Replay
189 };
190
191 let (sender, rx) = create_checkpoint_queue(queue_buffer_size);
193 let checkpoint_token = Arc::new(RwLock::new(token));
194
195 let batcher = CheckpointBatcher::new(
197 batcher_config,
198 rx,
199 service_client.clone(),
200 arn.clone(),
201 checkpoint_token.clone(),
202 );
203
204 let state = Self {
205 durable_execution_arn: arn,
206 checkpoint_token,
207 operations: RwLock::new(operations),
208 service_client,
209 replay_status: AtomicU8::new(replay_status as u8),
210 replayed_operations: RwLock::new(HashSet::new()),
211 next_marker: RwLock::new(initial_state.next_marker),
212 parent_done_lock: Mutex::new(HashSet::new()),
213 checkpoint_sender: Some(sender),
214 execution_operation,
215 checkpointing_mode,
216 };
217
218 (state, batcher)
219 }
220
221 pub fn durable_execution_arn(&self) -> &str {
223 &self.durable_execution_arn
224 }
225
226 #[inline]
228 pub fn durable_execution_arn_typed(&self) -> ExecutionArn {
229 ExecutionArn::from(self.durable_execution_arn.clone())
230 }
231
232 pub async fn checkpoint_token(&self) -> String {
234 self.checkpoint_token.read().await.clone()
235 }
236
237 pub async fn set_checkpoint_token(&self, token: impl Into<String>) {
239 let mut guard = self.checkpoint_token.write().await;
240 *guard = token.into();
241 }
242
243 pub fn replay_status(&self) -> ReplayStatus {
254 ReplayStatus::from(self.replay_status.load(Ordering::Acquire))
257 }
258
259 pub fn is_replay(&self) -> bool {
261 self.replay_status().is_replay()
262 }
263
264 pub fn is_new(&self) -> bool {
266 self.replay_status().is_new()
267 }
268
269 pub fn checkpointing_mode(&self) -> crate::config::CheckpointingMode {
271 self.checkpointing_mode
272 }
273
274 pub fn is_eager_checkpointing(&self) -> bool {
276 self.checkpointing_mode.is_eager()
277 }
278
279 pub fn is_batched_checkpointing(&self) -> bool {
281 self.checkpointing_mode.is_batched()
282 }
283
284 pub fn is_optimistic_checkpointing(&self) -> bool {
286 self.checkpointing_mode.is_optimistic()
287 }
288
289 pub fn execution_operation(&self) -> Option<&Operation> {
291 self.execution_operation.as_ref()
292 }
293
294 pub fn get_original_input_raw(&self) -> Option<&str> {
296 self.execution_operation
297 .as_ref()
298 .and_then(|op| op.execution_details.as_ref())
299 .and_then(|details| details.input_payload.as_deref())
300 }
301
302 pub fn execution_operation_id(&self) -> Option<&str> {
304 self.execution_operation
305 .as_ref()
306 .map(|op| op.operation_id.as_str())
307 }
308
309 pub async fn complete_execution_success(
311 &self,
312 result: Option<String>,
313 ) -> Result<(), DurableError> {
314 let execution_id =
315 self.execution_operation_id()
316 .ok_or_else(|| DurableError::Validation {
317 message: "Cannot complete execution: no EXECUTION operation exists".to_string(),
318 })?;
319
320 let update = OperationUpdate::succeed(execution_id, OperationType::Execution, result);
321
322 self.create_checkpoint(update, true).await
323 }
324
325 pub async fn complete_execution_failure(&self, error: ErrorObject) -> Result<(), DurableError> {
327 let execution_id =
328 self.execution_operation_id()
329 .ok_or_else(|| DurableError::Validation {
330 message: "Cannot complete execution: no EXECUTION operation exists".to_string(),
331 })?;
332
333 let update = OperationUpdate::fail(execution_id, OperationType::Execution, error);
334
335 self.create_checkpoint(update, true).await
336 }
337
338 pub async fn get_checkpoint_result(&self, operation_id: &str) -> CheckpointedResult {
340 let operations = self.operations.read().await;
341 CheckpointedResult::new(operations.get(operation_id).cloned())
342 }
343
344 pub async fn track_replay(&self, operation_id: &str) {
346 {
347 let mut replayed = self.replayed_operations.write().await;
348 replayed.insert(operation_id.to_string());
349 }
350
351 let (replayed_count, total_count) = {
352 let replayed = self.replayed_operations.read().await;
353 let operations = self.operations.read().await;
354 (replayed.len(), operations.len())
355 };
356
357 if replayed_count >= total_count {
358 let has_more = self.next_marker.read().await.is_some();
359 if !has_more {
360 self.replay_status
367 .store(ReplayStatus::New as u8, Ordering::Release);
368 }
369 }
370 }
371
372 pub async fn load_more_operations(&self) -> Result<bool, DurableError> {
374 let marker = {
375 let guard = self.next_marker.read().await;
376 match guard.as_ref() {
377 Some(m) => m.clone(),
378 None => return Ok(false),
379 }
380 };
381
382 let response = self.get_operations_with_retry(&marker).await?;
383
384 {
385 let mut operations = self.operations.write().await;
386 for op in response.operations {
387 operations.insert(op.operation_id.clone(), op);
388 }
389 }
390
391 {
392 let mut next_marker = self.next_marker.write().await;
393 *next_marker = response.next_marker;
394 }
395
396 Ok(true)
397 }
398
399 async fn get_operations_with_retry(
401 &self,
402 marker: &str,
403 ) -> Result<crate::client::GetOperationsResponse, DurableError> {
404 const MAX_RETRIES: u32 = 5;
405 const INITIAL_DELAY_MS: u64 = 100;
406 const MAX_DELAY_MS: u64 = 10_000;
407 const BACKOFF_MULTIPLIER: u64 = 2;
408
409 let mut attempt = 0;
410 let mut delay_ms = INITIAL_DELAY_MS;
411
412 loop {
413 let result = self
414 .service_client
415 .get_operations(&self.durable_execution_arn, marker)
416 .await;
417
418 match result {
419 Ok(response) => return Ok(response),
420 Err(error) if error.is_throttling() => {
421 attempt += 1;
422 if attempt > MAX_RETRIES {
423 tracing::warn!(
424 attempt = attempt,
425 "GetOperations throttling: max retries exceeded"
426 );
427 return Err(error);
428 }
429
430 let actual_delay = error.get_retry_after_ms().unwrap_or(delay_ms);
431 tracing::debug!(
432 attempt = attempt,
433 delay_ms = actual_delay,
434 "GetOperations throttled, retrying"
435 );
436 tokio::time::sleep(StdDuration::from_millis(actual_delay)).await;
437 delay_ms = (delay_ms * BACKOFF_MULTIPLIER).min(MAX_DELAY_MS);
438 }
439 Err(error) => return Err(error),
440 }
441 }
442 }
443
444 pub async fn load_all_operations(&self) -> Result<(), DurableError> {
446 while self.load_more_operations().await? {}
447 Ok(())
448 }
449
450 pub async fn has_more_operations(&self) -> bool {
452 self.next_marker.read().await.is_some()
453 }
454
455 pub async fn operation_count(&self) -> usize {
457 self.operations.read().await.len()
458 }
459
460 pub fn service_client(&self) -> &SharedDurableServiceClient {
462 &self.service_client
463 }
464
465 pub async fn add_operation(&self, operation: Operation) {
467 let mut operations = self.operations.write().await;
468 operations.insert(operation.operation_id.clone(), operation);
469 }
470
471 pub async fn update_operation(
473 &self,
474 operation_id: &str,
475 update_fn: impl FnOnce(&mut Operation),
476 ) {
477 let mut operations = self.operations.write().await;
478 if let Some(op) = operations.get_mut(operation_id) {
479 update_fn(op);
480 }
481 }
482
483 pub async fn has_operation(&self, operation_id: &str) -> bool {
485 self.operations.read().await.contains_key(operation_id)
486 }
487
488 pub async fn get_operation(&self, operation_id: &str) -> Option<Operation> {
490 self.operations.read().await.get(operation_id).cloned()
491 }
492
493 pub async fn mark_parent_done(&self, parent_id: &str) {
495 let mut done_parents = self.parent_done_lock.lock().await;
496 done_parents.insert(parent_id.to_string());
497 }
498
499 pub async fn is_parent_done(&self, parent_id: &str) -> bool {
501 let done_parents = self.parent_done_lock.lock().await;
502 done_parents.contains(parent_id)
503 }
504
505 pub async fn is_orphaned(&self, parent_id: Option<&str>) -> bool {
507 match parent_id {
508 Some(pid) => self.is_parent_done(pid).await,
509 None => false,
510 }
511 }
512
513 pub async fn create_checkpoint(
515 &self,
516 operation: OperationUpdate,
517 is_sync: bool,
518 ) -> Result<(), DurableError> {
519 if let Some(ref parent_id) = operation.parent_id {
521 if self.is_parent_done(parent_id).await {
522 return Err(DurableError::OrphanedChild {
523 message: format!(
524 "Cannot checkpoint operation {} - parent {} has completed",
525 operation.operation_id, parent_id
526 ),
527 operation_id: operation.operation_id.clone(),
528 });
529 }
530 }
531
532 let effective_is_sync = match self.checkpointing_mode {
534 crate::config::CheckpointingMode::Eager => true,
535 crate::config::CheckpointingMode::Batched => is_sync,
536 crate::config::CheckpointingMode::Optimistic => is_sync,
537 };
538
539 if self.checkpointing_mode.is_eager() {
541 return self.checkpoint_direct(operation, effective_is_sync).await;
542 }
543
544 if let Some(ref sender) = self.checkpoint_sender {
546 let result = sender
547 .checkpoint(operation.clone(), effective_is_sync)
548 .await;
549 if result.is_ok() {
550 self.update_local_cache_from_update(&operation).await;
551 }
552 return result;
553 }
554
555 self.checkpoint_direct(operation, effective_is_sync).await
557 }
558
559 async fn checkpoint_direct(
561 &self,
562 operation: OperationUpdate,
563 _is_sync: bool,
564 ) -> Result<(), DurableError> {
565 let token = self.checkpoint_token.read().await.clone();
566 let response = self
567 .service_client
568 .checkpoint(&self.durable_execution_arn, &token, vec![operation.clone()])
569 .await?;
570
571 {
572 let mut token_guard = self.checkpoint_token.write().await;
573 *token_guard = response.checkpoint_token;
574 }
575
576 if let Some(ref new_state) = response.new_execution_state {
578 self.update_local_cache_from_response(new_state).await;
579 } else {
580 self.update_local_cache_from_update(&operation).await;
581 }
582 Ok(())
583 }
584
585 pub async fn create_checkpoint_with_response(
588 &self,
589 operation: OperationUpdate,
590 ) -> Result<crate::client::CheckpointResponse, DurableError> {
591 if let Some(ref parent_id) = operation.parent_id {
593 if self.is_parent_done(parent_id).await {
594 return Err(DurableError::OrphanedChild {
595 message: format!(
596 "Cannot checkpoint operation {} - parent {} has completed",
597 operation.operation_id, parent_id
598 ),
599 operation_id: operation.operation_id.clone(),
600 });
601 }
602 }
603
604 let token = self.checkpoint_token.read().await.clone();
605 let response = self
606 .service_client
607 .checkpoint(&self.durable_execution_arn, &token, vec![operation.clone()])
608 .await?;
609
610 tracing::debug!(
611 has_new_state = response.new_execution_state.is_some(),
612 num_operations = response
613 .new_execution_state
614 .as_ref()
615 .map(|s| s.operations.len())
616 .unwrap_or(0),
617 "Checkpoint response received"
618 );
619
620 {
621 let mut token_guard = self.checkpoint_token.write().await;
622 *token_guard = response.checkpoint_token.clone();
623 }
624
625 if let Some(ref new_state) = response.new_execution_state {
627 self.update_local_cache_from_response(new_state).await;
628 } else {
629 self.update_local_cache_from_update(&operation).await;
630 }
631
632 Ok(response)
633 }
634
635 async fn update_local_cache_from_response(&self, new_state: &crate::client::NewExecutionState) {
637 let mut operations = self.operations.write().await;
638 for op in &new_state.operations {
639 operations.insert(op.operation_id.clone(), op.clone());
640 }
641 }
642
643 pub async fn checkpoint_sync(&self, operation: OperationUpdate) -> Result<(), DurableError> {
645 self.create_checkpoint(operation, true).await
646 }
647
648 pub async fn checkpoint_async(&self, operation: OperationUpdate) -> Result<(), DurableError> {
650 self.create_checkpoint(operation, false).await
651 }
652
653 pub async fn checkpoint_optimal(
655 &self,
656 operation: OperationUpdate,
657 prefer_sync: bool,
658 ) -> Result<(), DurableError> {
659 let is_sync = match self.checkpointing_mode {
660 crate::config::CheckpointingMode::Eager => true,
661 crate::config::CheckpointingMode::Batched => prefer_sync,
662 crate::config::CheckpointingMode::Optimistic => prefer_sync,
663 };
664 self.create_checkpoint(operation, is_sync).await
665 }
666
667 pub fn should_use_async_checkpoint(&self) -> bool {
669 match self.checkpointing_mode {
670 crate::config::CheckpointingMode::Eager => false,
671 crate::config::CheckpointingMode::Batched => true,
672 crate::config::CheckpointingMode::Optimistic => true,
673 }
674 }
675
676 pub fn prioritizes_performance(&self) -> bool {
678 self.checkpointing_mode.is_optimistic()
679 }
680
681 pub fn prioritizes_durability(&self) -> bool {
683 self.checkpointing_mode.is_eager()
684 }
685
686 async fn update_local_cache_from_update(&self, update: &OperationUpdate) {
688 let mut operations = self.operations.write().await;
689
690 match update.action {
691 crate::operation::OperationAction::Start => {
692 let mut op = Operation::new(&update.operation_id, update.operation_type);
693 op.parent_id = update.parent_id.clone();
694 op.name = update.name.clone();
695 operations.insert(update.operation_id.clone(), op);
696 }
697 crate::operation::OperationAction::Succeed => {
698 if let Some(op) = operations.get_mut(&update.operation_id) {
699 op.status = OperationStatus::Succeeded;
700 op.result = update.result.clone();
701 } else {
702 let mut op = Operation::new(&update.operation_id, update.operation_type);
703 op.status = OperationStatus::Succeeded;
704 op.result = update.result.clone();
705 op.parent_id = update.parent_id.clone();
706 op.name = update.name.clone();
707 operations.insert(update.operation_id.clone(), op);
708 }
709 }
710 crate::operation::OperationAction::Fail => {
711 if let Some(op) = operations.get_mut(&update.operation_id) {
712 op.status = OperationStatus::Failed;
713 op.error = update.error.clone();
714 } else {
715 let mut op = Operation::new(&update.operation_id, update.operation_type);
716 op.status = OperationStatus::Failed;
717 op.error = update.error.clone();
718 op.parent_id = update.parent_id.clone();
719 op.name = update.name.clone();
720 operations.insert(update.operation_id.clone(), op);
721 }
722 }
723 crate::operation::OperationAction::Cancel => {
724 if let Some(op) = operations.get_mut(&update.operation_id) {
725 op.status = OperationStatus::Cancelled;
726 } else {
727 let mut op = Operation::new(&update.operation_id, update.operation_type);
728 op.status = OperationStatus::Cancelled;
729 op.parent_id = update.parent_id.clone();
730 op.name = update.name.clone();
731 operations.insert(update.operation_id.clone(), op);
732 }
733 }
734 crate::operation::OperationAction::Retry => {
735 if let Some(op) = operations.get_mut(&update.operation_id) {
736 op.status = OperationStatus::Pending;
737 if update.result.is_some() || update.step_options.is_some() {
738 let step_details =
739 op.step_details
740 .get_or_insert(crate::operation::StepDetails {
741 result: None,
742 attempt: None,
743 next_attempt_timestamp: None,
744 error: None,
745 payload: None,
746 });
747 if update.result.is_some() {
748 step_details.payload = update.result.clone();
749 }
750 step_details.attempt = Some(step_details.attempt.unwrap_or(0) + 1);
751 }
752 if update.error.is_some() {
753 op.error = update.error.clone();
754 }
755 } else {
756 let mut op = Operation::new(&update.operation_id, update.operation_type);
757 op.status = OperationStatus::Pending;
758 op.parent_id = update.parent_id.clone();
759 op.name = update.name.clone();
760 op.error = update.error.clone();
761 if update.result.is_some() || update.step_options.is_some() {
762 op.step_details = Some(crate::operation::StepDetails {
763 result: None,
764 attempt: Some(1),
765 next_attempt_timestamp: None,
766 error: None,
767 payload: update.result.clone(),
768 });
769 }
770 operations.insert(update.operation_id.clone(), op);
771 }
772 }
773 }
774 }
775
776 pub fn shared_checkpoint_token(&self) -> Arc<RwLock<String>> {
778 self.checkpoint_token.clone()
779 }
780
781 pub fn has_checkpoint_sender(&self) -> bool {
783 self.checkpoint_sender.is_some()
784 }
785
786 pub async fn load_child_operations(
788 &self,
789 parent_id: &str,
790 ) -> Result<Vec<Operation>, DurableError> {
791 let operations = self.operations.read().await;
792 let children: Vec<Operation> = operations
793 .values()
794 .filter(|op| op.parent_id.as_deref() == Some(parent_id))
795 .cloned()
796 .collect();
797
798 Ok(children)
799 }
800
801 pub async fn get_child_operations(&self, parent_id: &str) -> Vec<Operation> {
803 let operations = self.operations.read().await;
804 operations
805 .values()
806 .filter(|op| op.parent_id.as_deref() == Some(parent_id))
807 .cloned()
808 .collect()
809 }
810
811 pub async fn has_replay_children(&self, operation_id: &str) -> bool {
813 let operations = self.operations.read().await;
814 operations
815 .get(operation_id)
816 .filter(|op| op.operation_type == OperationType::Context)
817 .and_then(|op| op.context_details.as_ref())
818 .and_then(|details| details.replay_children)
819 .unwrap_or(false)
820 }
821}
822
823impl std::fmt::Debug for ExecutionState {
825 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826 f.debug_struct("ExecutionState")
827 .field("durable_execution_arn", &self.durable_execution_arn)
828 .field("replay_status", &self.replay_status())
829 .finish_non_exhaustive()
830 }
831}
832
833#[cfg(test)]
834mod tests {
835 use super::*;
836 use std::sync::Arc;
837
838 use crate::client::{
839 CheckpointResponse, GetOperationsResponse, MockDurableServiceClient,
840 SharedDurableServiceClient,
841 };
842 use crate::error::ErrorObject;
843 use crate::lambda::InitialExecutionState;
844 use crate::operation::{
845 ContextDetails, ExecutionDetails, Operation, OperationStatus, OperationType,
846 OperationUpdate,
847 };
848
849 fn create_mock_client() -> SharedDurableServiceClient {
850 Arc::new(MockDurableServiceClient::new())
851 }
852
853 fn create_test_operation(id: &str, status: OperationStatus) -> Operation {
854 let mut op = Operation::new(id, OperationType::Step);
855 op.status = status;
856 op
857 }
858
859 fn create_execution_operation(input_payload: Option<&str>) -> Operation {
860 let mut op = Operation::new("exec-123", OperationType::Execution);
861 op.status = OperationStatus::Started;
862 op.execution_details = Some(ExecutionDetails {
863 input_payload: input_payload.map(|s| s.to_string()),
864 });
865 op
866 }
867
868 #[tokio::test]
871 async fn test_execution_state_new_empty() {
872 let client = create_mock_client();
873 let state = ExecutionState::new(
874 "arn:test",
875 "token-123",
876 InitialExecutionState::new(),
877 client,
878 );
879
880 assert_eq!(state.durable_execution_arn(), "arn:test");
881 assert_eq!(state.checkpoint_token().await, "token-123");
882 assert!(state.is_new());
883 assert!(!state.is_replay());
884 assert_eq!(state.operation_count().await, 0);
885 assert!(!state.has_more_operations().await);
886 }
887
888 #[tokio::test]
889 async fn test_execution_state_new_with_operations() {
890 let client = create_mock_client();
891 let ops = vec![
892 create_test_operation("op-1", OperationStatus::Succeeded),
893 create_test_operation("op-2", OperationStatus::Succeeded),
894 ];
895 let initial_state = InitialExecutionState::with_operations(ops);
896
897 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
898
899 assert!(state.is_replay());
900 assert!(!state.is_new());
901 assert_eq!(state.operation_count().await, 2);
902 }
903
904 #[tokio::test]
905 async fn test_get_checkpoint_result_exists() {
906 let client = create_mock_client();
907 let mut op = create_test_operation("op-1", OperationStatus::Succeeded);
908 op.result = Some(r#"{"value": 42}"#.to_string());
909 let initial_state = InitialExecutionState::with_operations(vec![op]);
910
911 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
912
913 let result = state.get_checkpoint_result("op-1").await;
914 assert!(result.is_existent());
915 assert!(result.is_succeeded());
916 assert_eq!(result.result(), Some(r#"{"value": 42}"#));
917 }
918
919 #[tokio::test]
920 async fn test_get_checkpoint_result_not_exists() {
921 let client = create_mock_client();
922 let state = ExecutionState::new(
923 "arn:test",
924 "token-123",
925 InitialExecutionState::new(),
926 client,
927 );
928
929 let result = state.get_checkpoint_result("non-existent").await;
930 assert!(!result.is_existent());
931 }
932
933 #[tokio::test]
934 async fn test_track_replay_transitions_to_new() {
935 let client = create_mock_client();
936 let ops = vec![
937 create_test_operation("op-1", OperationStatus::Succeeded),
938 create_test_operation("op-2", OperationStatus::Succeeded),
939 ];
940 let initial_state = InitialExecutionState::with_operations(ops);
941
942 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
943
944 assert!(state.is_replay());
945 state.track_replay("op-1").await;
946 assert!(state.is_replay());
947 state.track_replay("op-2").await;
948 assert!(state.is_new());
949 }
950
951 #[tokio::test]
952 async fn test_track_replay_with_pagination() {
953 let client = Arc::new(
954 MockDurableServiceClient::new().with_get_operations_response(Ok(
955 GetOperationsResponse {
956 operations: vec![create_test_operation("op-3", OperationStatus::Succeeded)],
957 next_marker: None,
958 },
959 )),
960 );
961
962 let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
963 let mut initial_state = InitialExecutionState::with_operations(ops);
964 initial_state.next_marker = Some("marker-1".to_string());
965
966 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
967
968 state.track_replay("op-1").await;
969 assert!(state.is_replay());
970 }
971
972 #[tokio::test]
973 async fn test_set_checkpoint_token() {
974 let client = create_mock_client();
975 let state = ExecutionState::new(
976 "arn:test",
977 "token-123",
978 InitialExecutionState::new(),
979 client,
980 );
981
982 assert_eq!(state.checkpoint_token().await, "token-123");
983 state.set_checkpoint_token("token-456").await;
984 assert_eq!(state.checkpoint_token().await, "token-456");
985 }
986
987 #[tokio::test]
988 async fn test_add_operation() {
989 let client = create_mock_client();
990 let state = ExecutionState::new(
991 "arn:test",
992 "token-123",
993 InitialExecutionState::new(),
994 client,
995 );
996
997 assert!(!state.has_operation("op-1").await);
998
999 let op = create_test_operation("op-1", OperationStatus::Succeeded);
1000 state.add_operation(op).await;
1001
1002 assert!(state.has_operation("op-1").await);
1003 assert_eq!(state.operation_count().await, 1);
1004 }
1005
1006 #[tokio::test]
1007 async fn test_update_operation() {
1008 let client = create_mock_client();
1009 let ops = vec![create_test_operation("op-1", OperationStatus::Started)];
1010 let initial_state = InitialExecutionState::with_operations(ops);
1011
1012 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1013
1014 let op = state.get_operation("op-1").await.unwrap();
1015 assert_eq!(op.status, OperationStatus::Started);
1016
1017 state
1018 .update_operation("op-1", |op| {
1019 op.status = OperationStatus::Succeeded;
1020 op.result = Some("done".to_string());
1021 })
1022 .await;
1023
1024 let op = state.get_operation("op-1").await.unwrap();
1025 assert_eq!(op.status, OperationStatus::Succeeded);
1026 assert_eq!(op.result, Some("done".to_string()));
1027 }
1028
1029 #[tokio::test]
1030 async fn test_load_more_operations() {
1031 let client = Arc::new(
1032 MockDurableServiceClient::new().with_get_operations_response(Ok(
1033 GetOperationsResponse {
1034 operations: vec![
1035 create_test_operation("op-2", OperationStatus::Succeeded),
1036 create_test_operation("op-3", OperationStatus::Succeeded),
1037 ],
1038 next_marker: None,
1039 },
1040 )),
1041 );
1042
1043 let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1044 let mut initial_state = InitialExecutionState::with_operations(ops);
1045 initial_state.next_marker = Some("marker-1".to_string());
1046
1047 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1048
1049 assert_eq!(state.operation_count().await, 1);
1050 assert!(state.has_more_operations().await);
1051
1052 let loaded = state.load_more_operations().await.unwrap();
1053 assert!(loaded);
1054
1055 assert_eq!(state.operation_count().await, 3);
1056 assert!(!state.has_more_operations().await);
1057 }
1058
1059 #[tokio::test]
1060 async fn test_load_more_operations_no_more() {
1061 let client = create_mock_client();
1062 let state = ExecutionState::new(
1063 "arn:test",
1064 "token-123",
1065 InitialExecutionState::new(),
1066 client,
1067 );
1068
1069 let loaded = state.load_more_operations().await.unwrap();
1070 assert!(!loaded);
1071 }
1072
1073 #[tokio::test]
1074 async fn test_load_all_operations() {
1075 let client = Arc::new(
1076 MockDurableServiceClient::new()
1077 .with_get_operations_response(Ok(GetOperationsResponse {
1078 operations: vec![create_test_operation("op-2", OperationStatus::Succeeded)],
1079 next_marker: Some("marker-2".to_string()),
1080 }))
1081 .with_get_operations_response(Ok(GetOperationsResponse {
1082 operations: vec![create_test_operation("op-3", OperationStatus::Succeeded)],
1083 next_marker: None,
1084 })),
1085 );
1086
1087 let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1088 let mut initial_state = InitialExecutionState::with_operations(ops);
1089 initial_state.next_marker = Some("marker-1".to_string());
1090
1091 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1092
1093 assert_eq!(state.operation_count().await, 1);
1094 state.load_all_operations().await.unwrap();
1095
1096 assert_eq!(state.operation_count().await, 3);
1097 assert!(!state.has_more_operations().await);
1098 }
1099
1100 #[tokio::test]
1101 async fn test_mark_parent_done() {
1102 let client = create_mock_client();
1103 let state = ExecutionState::new(
1104 "arn:test",
1105 "token-123",
1106 InitialExecutionState::new(),
1107 client,
1108 );
1109
1110 assert!(!state.is_parent_done("parent-1").await);
1111 state.mark_parent_done("parent-1").await;
1112 assert!(state.is_parent_done("parent-1").await);
1113 assert!(!state.is_parent_done("parent-2").await);
1114 }
1115
1116 #[tokio::test]
1117 async fn test_is_orphaned() {
1118 let client = create_mock_client();
1119 let state = ExecutionState::new(
1120 "arn:test",
1121 "token-123",
1122 InitialExecutionState::new(),
1123 client,
1124 );
1125
1126 assert!(!state.is_orphaned(None).await);
1127 assert!(!state.is_orphaned(Some("parent-1")).await);
1128
1129 state.mark_parent_done("parent-1").await;
1130
1131 assert!(state.is_orphaned(Some("parent-1")).await);
1132 assert!(!state.is_orphaned(Some("parent-2")).await);
1133 }
1134
1135 #[tokio::test]
1136 async fn test_debug_impl() {
1137 let client = create_mock_client();
1138 let state = ExecutionState::new(
1139 "arn:test",
1140 "token-123",
1141 InitialExecutionState::new(),
1142 client,
1143 );
1144
1145 let debug_str = format!("{:?}", state);
1146 assert!(debug_str.contains("ExecutionState"));
1147 assert!(debug_str.contains("arn:test"));
1148 }
1149
1150 #[tokio::test]
1151 async fn test_create_checkpoint_direct() {
1152 let client = Arc::new(
1153 MockDurableServiceClient::new()
1154 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1155 );
1156
1157 let state = ExecutionState::new(
1158 "arn:test",
1159 "token-123",
1160 InitialExecutionState::new(),
1161 client,
1162 );
1163
1164 let update = OperationUpdate::start("op-1", OperationType::Step);
1165 let result = state.create_checkpoint(update, true).await;
1166
1167 assert!(result.is_ok());
1168 assert_eq!(state.checkpoint_token().await, "new-token");
1169 assert!(state.has_operation("op-1").await);
1170 }
1171
1172 #[tokio::test]
1173 async fn test_create_checkpoint_updates_local_cache_on_succeed() {
1174 let client = Arc::new(
1175 MockDurableServiceClient::new()
1176 .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
1177 .with_checkpoint_response(Ok(CheckpointResponse::new("token-2"))),
1178 );
1179
1180 let state = ExecutionState::new(
1181 "arn:test",
1182 "token-123",
1183 InitialExecutionState::new(),
1184 client,
1185 );
1186
1187 let update = OperationUpdate::start("op-1", OperationType::Step);
1188 state.create_checkpoint(update, true).await.unwrap();
1189
1190 let op = state.get_operation("op-1").await.unwrap();
1191 assert_eq!(op.status, OperationStatus::Started);
1192
1193 let update = OperationUpdate::succeed(
1194 "op-1",
1195 OperationType::Step,
1196 Some(r#"{"result": "ok"}"#.to_string()),
1197 );
1198 state.create_checkpoint(update, true).await.unwrap();
1199
1200 let op = state.get_operation("op-1").await.unwrap();
1201 assert_eq!(op.status, OperationStatus::Succeeded);
1202 assert_eq!(op.result, Some(r#"{"result": "ok"}"#.to_string()));
1203 }
1204
1205 #[tokio::test]
1206 async fn test_create_checkpoint_rejects_orphaned_child() {
1207 let client = create_mock_client();
1208 let state = ExecutionState::new(
1209 "arn:test",
1210 "token-123",
1211 InitialExecutionState::new(),
1212 client,
1213 );
1214
1215 state.mark_parent_done("parent-1").await;
1216
1217 let update =
1218 OperationUpdate::start("child-1", OperationType::Step).with_parent_id("parent-1");
1219 let result = state.create_checkpoint(update, true).await;
1220
1221 assert!(result.is_err());
1222 match result.unwrap_err() {
1223 crate::error::DurableError::OrphanedChild { operation_id, .. } => {
1224 assert_eq!(operation_id, "child-1");
1225 }
1226 _ => panic!("Expected OrphanedChild error"),
1227 }
1228 }
1229
1230 #[tokio::test]
1231 async fn test_with_batcher_creates_state_and_batcher() {
1232 let client = Arc::new(
1233 MockDurableServiceClient::new()
1234 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1235 );
1236
1237 let (state, mut batcher) = ExecutionState::with_batcher(
1238 "arn:test",
1239 "token-123",
1240 InitialExecutionState::new(),
1241 client,
1242 CheckpointBatcherConfig {
1243 max_batch_time_ms: 10,
1244 ..Default::default()
1245 },
1246 100,
1247 );
1248
1249 assert!(state.has_checkpoint_sender());
1250 assert_eq!(state.durable_execution_arn(), "arn:test");
1251
1252 let batcher_handle = tokio::spawn(async move {
1253 batcher.run().await;
1254 });
1255
1256 let update = OperationUpdate::start("op-1", OperationType::Step);
1257 let result = state.create_checkpoint(update, true).await;
1258
1259 drop(state);
1260 batcher_handle.await.unwrap();
1261
1262 assert!(result.is_ok());
1263 }
1264
1265 #[tokio::test]
1266 async fn test_checkpoint_sync_convenience_method() {
1267 let client = Arc::new(
1268 MockDurableServiceClient::new()
1269 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1270 );
1271
1272 let state = ExecutionState::new(
1273 "arn:test",
1274 "token-123",
1275 InitialExecutionState::new(),
1276 client,
1277 );
1278
1279 let update = OperationUpdate::start("op-1", OperationType::Step);
1280 let result = state.checkpoint_sync(update).await;
1281
1282 assert!(result.is_ok());
1283 }
1284
1285 #[tokio::test]
1286 async fn test_checkpoint_async_convenience_method() {
1287 let client = Arc::new(
1288 MockDurableServiceClient::new()
1289 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1290 );
1291
1292 let state = ExecutionState::new(
1293 "arn:test",
1294 "token-123",
1295 InitialExecutionState::new(),
1296 client,
1297 );
1298
1299 let update = OperationUpdate::start("op-1", OperationType::Step);
1300 let result = state.checkpoint_async(update).await;
1301
1302 assert!(result.is_ok());
1303 }
1304
1305 #[tokio::test]
1306 async fn test_shared_checkpoint_token() {
1307 let client = create_mock_client();
1308 let state = ExecutionState::new(
1309 "arn:test",
1310 "token-123",
1311 InitialExecutionState::new(),
1312 client,
1313 );
1314
1315 let shared_token = state.shared_checkpoint_token();
1316 assert_eq!(*shared_token.read().await, "token-123");
1317
1318 {
1319 let mut guard = shared_token.write().await;
1320 *guard = "modified-token".to_string();
1321 }
1322
1323 assert_eq!(state.checkpoint_token().await, "modified-token");
1324 }
1325
1326 #[tokio::test]
1329 async fn test_load_child_operations_returns_children() {
1330 let client = create_mock_client();
1331
1332 let mut parent_op = Operation::new("parent-ctx", OperationType::Context);
1333 parent_op.status = OperationStatus::Succeeded;
1334
1335 let mut child1 = create_test_operation("child-1", OperationStatus::Succeeded);
1336 child1.parent_id = Some("parent-ctx".to_string());
1337
1338 let mut child2 = create_test_operation("child-2", OperationStatus::Succeeded);
1339 child2.parent_id = Some("parent-ctx".to_string());
1340
1341 let mut other_child = create_test_operation("other-child", OperationStatus::Succeeded);
1342 other_child.parent_id = Some("other-parent".to_string());
1343
1344 let initial_state =
1345 InitialExecutionState::with_operations(vec![parent_op, child1, child2, other_child]);
1346
1347 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1348
1349 let children = state.load_child_operations("parent-ctx").await.unwrap();
1350
1351 assert_eq!(children.len(), 2);
1352 let child_ids: Vec<&str> = children.iter().map(|c| c.operation_id.as_str()).collect();
1353 assert!(child_ids.contains(&"child-1"));
1354 assert!(child_ids.contains(&"child-2"));
1355 }
1356
1357 #[tokio::test]
1358 async fn test_load_child_operations_no_children() {
1359 let client = create_mock_client();
1360
1361 let parent_op = Operation::new("parent-ctx", OperationType::Context);
1362 let initial_state = InitialExecutionState::with_operations(vec![parent_op]);
1363
1364 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1365
1366 let children = state.load_child_operations("parent-ctx").await.unwrap();
1367
1368 assert!(children.is_empty());
1369 }
1370
1371 #[tokio::test]
1372 async fn test_get_child_operations_returns_cached_children() {
1373 let client = create_mock_client();
1374
1375 let mut parent_op = Operation::new("parent-ctx", OperationType::Context);
1376 parent_op.status = OperationStatus::Succeeded;
1377
1378 let mut child1 = create_test_operation("child-1", OperationStatus::Succeeded);
1379 child1.parent_id = Some("parent-ctx".to_string());
1380
1381 let mut child2 = create_test_operation("child-2", OperationStatus::Succeeded);
1382 child2.parent_id = Some("parent-ctx".to_string());
1383
1384 let initial_state = InitialExecutionState::with_operations(vec![parent_op, child1, child2]);
1385
1386 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1387
1388 let children = state.get_child_operations("parent-ctx").await;
1389
1390 assert_eq!(children.len(), 2);
1391 }
1392
1393 #[tokio::test]
1394 async fn test_get_child_operations_empty_for_nonexistent_parent() {
1395 let client = create_mock_client();
1396 let state = ExecutionState::new(
1397 "arn:test",
1398 "token-123",
1399 InitialExecutionState::new(),
1400 client,
1401 );
1402
1403 let children = state.get_child_operations("nonexistent-parent").await;
1404 assert!(children.is_empty());
1405 }
1406
1407 #[tokio::test]
1408 async fn test_has_replay_children_true() {
1409 let client = create_mock_client();
1410
1411 let mut ctx_op = Operation::new("ctx-with-replay", OperationType::Context);
1412 ctx_op.status = OperationStatus::Succeeded;
1413 ctx_op.context_details = Some(ContextDetails {
1414 result: None,
1415 replay_children: Some(true),
1416 error: None,
1417 });
1418
1419 let initial_state = InitialExecutionState::with_operations(vec![ctx_op]);
1420 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1421
1422 assert!(state.has_replay_children("ctx-with-replay").await);
1423 }
1424
1425 #[tokio::test]
1426 async fn test_has_replay_children_false_when_not_set() {
1427 let client = create_mock_client();
1428
1429 let mut ctx_op = Operation::new("ctx-no-replay", OperationType::Context);
1430 ctx_op.status = OperationStatus::Succeeded;
1431 ctx_op.context_details = Some(ContextDetails {
1432 result: None,
1433 replay_children: None,
1434 error: None,
1435 });
1436
1437 let initial_state = InitialExecutionState::with_operations(vec![ctx_op]);
1438 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1439
1440 assert!(!state.has_replay_children("ctx-no-replay").await);
1441 }
1442
1443 #[tokio::test]
1446 async fn test_checkpointing_mode_default_is_batched() {
1447 let client = create_mock_client();
1448 let state = ExecutionState::new(
1449 "arn:test",
1450 "token-123",
1451 InitialExecutionState::new(),
1452 client,
1453 );
1454
1455 assert_eq!(
1456 state.checkpointing_mode(),
1457 crate::config::CheckpointingMode::Batched
1458 );
1459 assert!(state.is_batched_checkpointing());
1460 assert!(!state.is_eager_checkpointing());
1461 assert!(!state.is_optimistic_checkpointing());
1462 }
1463
1464 #[tokio::test]
1465 async fn test_checkpointing_mode_eager() {
1466 let client = create_mock_client();
1467 let state = ExecutionState::with_checkpointing_mode(
1468 "arn:test",
1469 "token-123",
1470 InitialExecutionState::new(),
1471 client,
1472 crate::config::CheckpointingMode::Eager,
1473 );
1474
1475 assert_eq!(
1476 state.checkpointing_mode(),
1477 crate::config::CheckpointingMode::Eager
1478 );
1479 assert!(state.is_eager_checkpointing());
1480 assert!(!state.is_batched_checkpointing());
1481 assert!(!state.is_optimistic_checkpointing());
1482 assert!(state.prioritizes_durability());
1483 assert!(!state.prioritizes_performance());
1484 assert!(!state.should_use_async_checkpoint());
1485 }
1486
1487 #[tokio::test]
1488 async fn test_checkpointing_mode_optimistic() {
1489 let client = create_mock_client();
1490 let state = ExecutionState::with_checkpointing_mode(
1491 "arn:test",
1492 "token-123",
1493 InitialExecutionState::new(),
1494 client,
1495 crate::config::CheckpointingMode::Optimistic,
1496 );
1497
1498 assert_eq!(
1499 state.checkpointing_mode(),
1500 crate::config::CheckpointingMode::Optimistic
1501 );
1502 assert!(state.is_optimistic_checkpointing());
1503 assert!(!state.is_eager_checkpointing());
1504 assert!(!state.is_batched_checkpointing());
1505 assert!(state.prioritizes_performance());
1506 assert!(!state.prioritizes_durability());
1507 assert!(state.should_use_async_checkpoint());
1508 }
1509
1510 #[tokio::test]
1511 async fn test_checkpointing_mode_batched_helpers() {
1512 let client = create_mock_client();
1513 let state = ExecutionState::with_checkpointing_mode(
1514 "arn:test",
1515 "token-123",
1516 InitialExecutionState::new(),
1517 client,
1518 crate::config::CheckpointingMode::Batched,
1519 );
1520
1521 assert!(!state.prioritizes_durability());
1522 assert!(!state.prioritizes_performance());
1523 assert!(state.should_use_async_checkpoint());
1524 }
1525
1526 #[tokio::test]
1527 async fn test_checkpoint_optimal_eager_mode() {
1528 let client = Arc::new(
1529 MockDurableServiceClient::new()
1530 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1531 );
1532
1533 let state = ExecutionState::with_checkpointing_mode(
1534 "arn:test",
1535 "token-123",
1536 InitialExecutionState::new(),
1537 client,
1538 crate::config::CheckpointingMode::Eager,
1539 );
1540
1541 let update = OperationUpdate::start("op-1", OperationType::Step);
1542 let result = state.checkpoint_optimal(update, false).await;
1543
1544 assert!(result.is_ok());
1545 }
1546
1547 #[tokio::test]
1548 async fn test_eager_mode_bypasses_batcher() {
1549 let client = Arc::new(
1550 MockDurableServiceClient::new()
1551 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1552 );
1553
1554 let (state, mut batcher) = ExecutionState::with_batcher_and_mode(
1555 "arn:test",
1556 "token-123",
1557 InitialExecutionState::new(),
1558 client,
1559 CheckpointBatcherConfig {
1560 max_batch_time_ms: 10,
1561 ..Default::default()
1562 },
1563 100,
1564 crate::config::CheckpointingMode::Eager,
1565 );
1566
1567 let batcher_handle = tokio::spawn(async move {
1568 batcher.run().await;
1569 });
1570
1571 let update = OperationUpdate::start("op-1", OperationType::Step);
1572 let result = state.create_checkpoint(update, false).await;
1573
1574 drop(state);
1575 batcher_handle.await.unwrap();
1576
1577 assert!(result.is_ok());
1578 }
1579
1580 #[tokio::test]
1583 async fn test_execution_operation_recognized() {
1584 let client = create_mock_client();
1585 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1586 let step_op = Operation::new("step-1", OperationType::Step);
1587
1588 let initial_state = InitialExecutionState::with_operations(vec![exec_op, step_op]);
1589 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1590
1591 assert!(state.execution_operation().is_some());
1592 let exec = state.execution_operation().unwrap();
1593 assert_eq!(exec.operation_type, OperationType::Execution);
1594 assert_eq!(exec.operation_id, "exec-123");
1595 }
1596
1597 #[tokio::test]
1598 async fn test_execution_operation_not_present() {
1599 let client = create_mock_client();
1600 let step_op = Operation::new("step-1", OperationType::Step);
1601
1602 let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1603 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1604
1605 assert!(state.execution_operation().is_none());
1606 }
1607
1608 #[tokio::test]
1609 async fn test_get_original_input_raw() {
1610 let client = create_mock_client();
1611 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1612
1613 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1614 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1615
1616 let input = state.get_original_input_raw();
1617 assert!(input.is_some());
1618 assert_eq!(input.unwrap(), r#"{"order_id": "123"}"#);
1619 }
1620
1621 #[tokio::test]
1622 async fn test_get_original_input_raw_no_payload() {
1623 let client = create_mock_client();
1624 let exec_op = create_execution_operation(None);
1625
1626 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1627 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1628
1629 let input = state.get_original_input_raw();
1630 assert!(input.is_none());
1631 }
1632
1633 #[tokio::test]
1634 async fn test_get_original_input_raw_no_execution_operation() {
1635 let client = create_mock_client();
1636 let step_op = Operation::new("step-1", OperationType::Step);
1637
1638 let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1639 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1640
1641 let input = state.get_original_input_raw();
1642 assert!(input.is_none());
1643 }
1644
1645 #[tokio::test]
1646 async fn test_execution_operation_id() {
1647 let client = create_mock_client();
1648 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1649
1650 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1651 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1652
1653 let id = state.execution_operation_id();
1654 assert!(id.is_some());
1655 assert_eq!(id.unwrap(), "exec-123");
1656 }
1657
1658 #[tokio::test]
1659 async fn test_complete_execution_success() {
1660 let client = Arc::new(
1661 MockDurableServiceClient::new()
1662 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1663 );
1664
1665 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1666 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1667 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1668
1669 let result = state
1670 .complete_execution_success(Some(r#"{"status": "completed"}"#.to_string()))
1671 .await;
1672 assert!(result.is_ok());
1673
1674 let op = state.get_operation("exec-123").await.unwrap();
1675 assert_eq!(op.status, OperationStatus::Succeeded);
1676 assert_eq!(op.result, Some(r#"{"status": "completed"}"#.to_string()));
1677 }
1678
1679 #[tokio::test]
1680 async fn test_complete_execution_success_no_execution_operation() {
1681 let client = create_mock_client();
1682 let step_op = Operation::new("step-1", OperationType::Step);
1683
1684 let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1685 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1686
1687 let result = state
1688 .complete_execution_success(Some("result".to_string()))
1689 .await;
1690 assert!(result.is_err());
1691 match result.unwrap_err() {
1692 crate::error::DurableError::Validation { message } => {
1693 assert!(message.contains("no EXECUTION operation"));
1694 }
1695 _ => panic!("Expected Validation error"),
1696 }
1697 }
1698
1699 #[tokio::test]
1700 async fn test_complete_execution_failure() {
1701 let client = Arc::new(
1702 MockDurableServiceClient::new()
1703 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1704 );
1705
1706 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1707 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1708 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1709
1710 let error = ErrorObject::new("ProcessingError", "Order processing failed");
1711 let result = state.complete_execution_failure(error).await;
1712 assert!(result.is_ok());
1713
1714 let op = state.get_operation("exec-123").await.unwrap();
1715 assert_eq!(op.status, OperationStatus::Failed);
1716 assert!(op.error.is_some());
1717 assert_eq!(op.error.as_ref().unwrap().error_type, "ProcessingError");
1718 }
1719
1720 #[tokio::test]
1721 async fn test_complete_execution_failure_no_execution_operation() {
1722 let client = create_mock_client();
1723 let step_op = Operation::new("step-1", OperationType::Step);
1724
1725 let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1726 let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1727
1728 let error = ErrorObject::new("TestError", "Test message");
1729 let result = state.complete_execution_failure(error).await;
1730 assert!(result.is_err());
1731 match result.unwrap_err() {
1732 crate::error::DurableError::Validation { message } => {
1733 assert!(message.contains("no EXECUTION operation"));
1734 }
1735 _ => panic!("Expected Validation error"),
1736 }
1737 }
1738
1739 #[tokio::test]
1740 async fn test_with_batcher_recognizes_execution_operation() {
1741 let client = Arc::new(MockDurableServiceClient::new());
1742 let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1743
1744 let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1745 let (state, _batcher) = ExecutionState::with_batcher(
1746 "arn:test",
1747 "token-123",
1748 initial_state,
1749 client,
1750 CheckpointBatcherConfig::default(),
1751 100,
1752 );
1753
1754 assert!(state.execution_operation().is_some());
1755 let exec = state.execution_operation().unwrap();
1756 assert_eq!(exec.operation_type, OperationType::Execution);
1757 assert_eq!(
1758 state.get_original_input_raw(),
1759 Some(r#"{"order_id": "123"}"#)
1760 );
1761 }
1762
1763 mod property_tests {
1766 use super::*;
1767 use proptest::prelude::*;
1768
1769 proptest! {
1770 #![proptest_config(ProptestConfig::with_cases(100))]
1771
1772 #[test]
1773 fn prop_orphaned_child_checkpoint_fails(
1774 parent_id in "[a-z]{5,10}",
1775 child_id in "[a-z]{5,10}",
1776 ) {
1777 let rt = tokio::runtime::Runtime::new().unwrap();
1778 let result: Result<(), TestCaseError> = rt.block_on(async {
1779 let client = Arc::new(
1780 MockDurableServiceClient::new()
1781 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1782 );
1783
1784 let state = ExecutionState::new(
1785 "arn:test",
1786 "token-123",
1787 InitialExecutionState::new(),
1788 client,
1789 );
1790
1791 state.mark_parent_done(&parent_id).await;
1792
1793 let update = OperationUpdate::start(&child_id, OperationType::Step)
1794 .with_parent_id(&parent_id);
1795 let checkpoint_result = state.create_checkpoint(update, true).await;
1796
1797 match checkpoint_result {
1798 Err(DurableError::OrphanedChild { operation_id, .. }) => {
1799 if operation_id != child_id {
1800 return Err(TestCaseError::fail(format!(
1801 "Expected operation_id '{}' in OrphanedChild error, got '{}'",
1802 child_id, operation_id
1803 )));
1804 }
1805 }
1806 Ok(_) => {
1807 return Err(TestCaseError::fail(
1808 "Expected OrphanedChild error, but checkpoint succeeded"
1809 ));
1810 }
1811 Err(other) => {
1812 return Err(TestCaseError::fail(format!(
1813 "Expected OrphanedChild error, got {:?}",
1814 other
1815 )));
1816 }
1817 }
1818
1819 Ok(())
1820 });
1821 result?;
1822 }
1823
1824 #[test]
1825 fn prop_non_orphaned_child_checkpoint_succeeds(
1826 parent_id in "[a-z]{5,10}",
1827 child_id in "[a-z]{5,10}",
1828 ) {
1829 let rt = tokio::runtime::Runtime::new().unwrap();
1830 let result: Result<(), TestCaseError> = rt.block_on(async {
1831 let client = Arc::new(
1832 MockDurableServiceClient::new()
1833 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1834 );
1835
1836 let state = ExecutionState::new(
1837 "arn:test",
1838 "token-123",
1839 InitialExecutionState::new(),
1840 client,
1841 );
1842
1843 let update = OperationUpdate::start(&child_id, OperationType::Step)
1844 .with_parent_id(&parent_id);
1845 let checkpoint_result = state.create_checkpoint(update, true).await;
1846
1847 if let Err(e) = checkpoint_result {
1848 return Err(TestCaseError::fail(format!(
1849 "Expected checkpoint to succeed for non-orphaned child, got error: {:?}",
1850 e
1851 )));
1852 }
1853
1854 Ok(())
1855 });
1856 result?;
1857 }
1858
1859 #[test]
1860 fn prop_root_operation_never_orphaned(
1861 operation_id in "[a-z]{5,10}",
1862 ) {
1863 let rt = tokio::runtime::Runtime::new().unwrap();
1864 let result: Result<(), TestCaseError> = rt.block_on(async {
1865 let client = Arc::new(
1866 MockDurableServiceClient::new()
1867 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1868 );
1869
1870 let state = ExecutionState::new(
1871 "arn:test",
1872 "token-123",
1873 InitialExecutionState::new(),
1874 client,
1875 );
1876
1877 let update = OperationUpdate::start(&operation_id, OperationType::Step);
1878 let checkpoint_result = state.create_checkpoint(update, true).await;
1879
1880 if let Err(e) = checkpoint_result {
1881 return Err(TestCaseError::fail(format!(
1882 "Expected checkpoint to succeed for root operation, got error: {:?}",
1883 e
1884 )));
1885 }
1886
1887 Ok(())
1888 });
1889 result?;
1890 }
1891
1892 #[test]
1893 fn prop_marking_parent_done_affects_future_checkpoints(
1894 parent_id in "[a-z]{5,10}",
1895 child_id_before in "[a-z]{5,10}",
1896 child_id_after in "[a-z]{5,10}",
1897 ) {
1898 let rt = tokio::runtime::Runtime::new().unwrap();
1899 let result: Result<(), TestCaseError> = rt.block_on(async {
1900 let client = Arc::new(
1901 MockDurableServiceClient::new()
1902 .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
1903 .with_checkpoint_response(Ok(CheckpointResponse::new("token-2")))
1904 );
1905
1906 let state = ExecutionState::new(
1907 "arn:test",
1908 "token-123",
1909 InitialExecutionState::new(),
1910 client,
1911 );
1912
1913 let update_before = OperationUpdate::start(&child_id_before, OperationType::Step)
1914 .with_parent_id(&parent_id);
1915 let result_before = state.create_checkpoint(update_before, true).await;
1916
1917 if let Err(e) = result_before {
1918 return Err(TestCaseError::fail(format!(
1919 "Expected first checkpoint to succeed, got error: {:?}",
1920 e
1921 )));
1922 }
1923
1924 state.mark_parent_done(&parent_id).await;
1925
1926 let update_after = OperationUpdate::start(&child_id_after, OperationType::Step)
1927 .with_parent_id(&parent_id);
1928 let result_after = state.create_checkpoint(update_after, true).await;
1929
1930 match result_after {
1931 Err(DurableError::OrphanedChild { .. }) => {
1932 }
1934 Ok(_) => {
1935 return Err(TestCaseError::fail(
1936 "Expected OrphanedChild error after marking parent done"
1937 ));
1938 }
1939 Err(other) => {
1940 return Err(TestCaseError::fail(format!(
1941 "Expected OrphanedChild error, got {:?}",
1942 other
1943 )));
1944 }
1945 }
1946
1947 Ok(())
1948 });
1949 result?;
1950 }
1951 }
1952 }
1953}