Skip to main content

durable_execution_sdk/state/
batcher.rs

1//! Checkpoint batching for efficient API calls.
2//!
3//! This module provides the checkpoint batching system that collects checkpoint
4//! requests and sends them in batches to reduce API calls to the Lambda service.
5//!
6//! ## Checkpoint Token Management
7//!
8//! The batcher manages checkpoint tokens according to the following rules:
9//!
10//! 1. **First Checkpoint**: Uses the initial `CheckpointToken` from the Lambda invocation input
11//! 2. **Subsequent Checkpoints**: Uses the token returned from the previous checkpoint response
12//! 3. **Token Consumption**: Each token can only be used once; the batcher automatically
13//!    updates to the new token after each successful checkpoint
14//! 4. **Error Handling**: If a checkpoint fails with "Invalid checkpoint token", the error
15//!    is marked as retriable so Lambda can retry with a fresh token
16//!
17//! ## Requirements
18//!
19//! - 2.9: THE Checkpointing_System SHALL use the CheckpointToken from invocation input for the first checkpoint
20//! - 2.10: THE Checkpointing_System SHALL use the returned CheckpointToken from each checkpoint response for subsequent checkpoints
21//! - 2.11: THE Checkpointing_System SHALL handle InvalidParameterValueException for invalid tokens by allowing propagation for retry
22//! - 2.12: WHEN batching operations, THE Checkpointing_System SHALL checkpoint in execution order with EXECUTION completion last
23//! - 2.13: THE Checkpointing_System SHALL support including both START and completion actions for STEP/CONTEXT in the same batch
24
25use std::collections::{HashMap, HashSet};
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::Arc;
28use std::time::Duration as StdDuration;
29
30use tokio::sync::{mpsc, oneshot, RwLock};
31use tokio::time::{timeout, Instant};
32
33use crate::client::SharedDurableServiceClient;
34use crate::error::DurableError;
35use crate::operation::{OperationAction, OperationType, OperationUpdate};
36
37/// Configuration for the checkpoint batcher.
38#[derive(Debug, Clone)]
39pub struct CheckpointBatcherConfig {
40    /// Maximum size of a batch in bytes (default: 750KB)
41    pub max_batch_size_bytes: usize,
42    /// Maximum time to wait before sending a batch (default: 1 second)
43    pub max_batch_time_ms: u64,
44    /// Maximum number of operations per batch (default: unlimited/usize::MAX)
45    pub max_batch_operations: usize,
46}
47
48impl Default for CheckpointBatcherConfig {
49    fn default() -> Self {
50        Self {
51            max_batch_size_bytes: 750 * 1024, // 750KB
52            max_batch_time_ms: 1000,          // 1 second
53            max_batch_operations: usize::MAX, // unlimited
54        }
55    }
56}
57
58/// A request to checkpoint an operation.
59///
60/// This struct is sent through the checkpoint queue to the batcher.
61/// It includes an optional completion channel for synchronous checkpoints.
62#[derive(Debug)]
63pub struct CheckpointRequest {
64    /// The operation update to checkpoint
65    pub operation: OperationUpdate,
66    /// Optional channel to signal completion (for sync checkpoints)
67    pub completion: Option<oneshot::Sender<Result<(), DurableError>>>,
68}
69
70impl CheckpointRequest {
71    /// Creates a new synchronous checkpoint request.
72    ///
73    /// Returns the request and a receiver to wait for completion.
74    pub fn sync(operation: OperationUpdate) -> (Self, oneshot::Receiver<Result<(), DurableError>>) {
75        let (tx, rx) = oneshot::channel();
76        (
77            Self {
78                operation,
79                completion: Some(tx),
80            },
81            rx,
82        )
83    }
84
85    /// Creates a new asynchronous (fire-and-forget) checkpoint request.
86    pub fn async_request(operation: OperationUpdate) -> Self {
87        Self {
88            operation,
89            completion: None,
90        }
91    }
92
93    /// Returns true if this is a synchronous request.
94    pub fn is_sync(&self) -> bool {
95        self.completion.is_some()
96    }
97
98    /// Estimates the size of this request in bytes for batching.
99    pub fn estimated_size(&self) -> usize {
100        // Estimate based on JSON serialization
101        // operation_id + operation_type + action + result + error + parent_id + name
102        let base_size = 100; // Base overhead for JSON structure
103        let op = &self.operation;
104
105        base_size
106            + op.operation_id.len()
107            + op.result.as_ref().map(|r| r.len()).unwrap_or(0)
108            + op.error
109                .as_ref()
110                .map(|e| e.error_message.len() + e.error_type.len())
111                .unwrap_or(0)
112            + op.parent_id.as_ref().map(|p| p.len()).unwrap_or(0)
113            + op.name.as_ref().map(|n| n.len()).unwrap_or(0)
114    }
115}
116
117/// Result of processing a batch of checkpoints.
118#[derive(Debug)]
119pub struct BatchResult {
120    /// Whether the batch was successfully sent
121    pub success: bool,
122    /// The new checkpoint token if successful
123    pub new_token: Option<String>,
124    /// Error if the batch failed
125    pub error: Option<DurableError>,
126}
127
128/// The checkpoint batcher collects checkpoint requests and sends them in batches.
129///
130/// This improves efficiency by reducing the number of API calls to the Lambda service.
131/// The batcher sends a batch when any of these conditions are met:
132/// - The batch reaches the maximum size in bytes
133/// - The batch reaches the maximum number of operations
134/// - The maximum batch time has elapsed
135pub struct CheckpointBatcher {
136    /// Configuration for batching behavior
137    config: CheckpointBatcherConfig,
138    /// Receiver for checkpoint requests
139    queue_rx: mpsc::Receiver<CheckpointRequest>,
140    /// Service client for sending checkpoints
141    service_client: SharedDurableServiceClient,
142    /// Reference to the execution state for updating tokens
143    durable_execution_arn: String,
144    /// Current checkpoint token (shared with ExecutionState)
145    checkpoint_token: Arc<RwLock<String>>,
146    /// Tracks whether the initial token has been consumed
147    initial_token_consumed: AtomicBool,
148}
149
150impl CheckpointBatcher {
151    /// Creates a new CheckpointBatcher.
152    ///
153    /// # Arguments
154    ///
155    /// * `config` - Configuration for batching behavior
156    /// * `queue_rx` - Receiver for checkpoint requests
157    /// * `service_client` - Service client for sending checkpoints
158    /// * `durable_execution_arn` - The ARN of the durable execution
159    /// * `checkpoint_token` - The initial checkpoint token from Lambda invocation input
160    ///
161    /// # Requirements
162    ///
163    /// - 2.9: THE Checkpointing_System SHALL use the CheckpointToken from invocation input for the first checkpoint
164    pub fn new(
165        config: CheckpointBatcherConfig,
166        queue_rx: mpsc::Receiver<CheckpointRequest>,
167        service_client: SharedDurableServiceClient,
168        durable_execution_arn: String,
169        checkpoint_token: Arc<RwLock<String>>,
170    ) -> Self {
171        Self {
172            config,
173            queue_rx,
174            service_client,
175            durable_execution_arn,
176            checkpoint_token,
177            initial_token_consumed: AtomicBool::new(false),
178        }
179    }
180
181    /// Runs the batcher loop, processing checkpoint requests.
182    ///
183    /// This method runs until the queue is closed (sender dropped).
184    pub async fn run(&mut self) {
185        loop {
186            let batch = self.collect_batch().await;
187            if batch.is_empty() {
188                // Queue closed, exit
189                break;
190            }
191            self.process_batch(batch).await;
192        }
193    }
194
195    /// Collects a batch of checkpoint requests.
196    ///
197    /// Returns when any batch limit is reached or the queue is closed.
198    async fn collect_batch(&mut self) -> Vec<CheckpointRequest> {
199        let mut batch = Vec::new();
200        let mut batch_size = 0usize;
201        let batch_deadline =
202            Instant::now() + StdDuration::from_millis(self.config.max_batch_time_ms);
203
204        // Wait for the first request (blocking)
205        match self.queue_rx.recv().await {
206            Some(request) => {
207                batch_size += request.estimated_size();
208                batch.push(request);
209            }
210            None => return batch, // Queue closed
211        }
212
213        // Collect more requests until limits are reached
214        loop {
215            // Check if we've hit operation count limit
216            if batch.len() >= self.config.max_batch_operations {
217                break;
218            }
219
220            // Calculate remaining time until deadline
221            let now = Instant::now();
222            if now >= batch_deadline {
223                break;
224            }
225            let remaining = batch_deadline - now;
226
227            // Try to receive more requests with timeout
228            match timeout(remaining, self.queue_rx.recv()).await {
229                Ok(Some(request)) => {
230                    let request_size = request.estimated_size();
231
232                    // Check if adding this request would exceed size limit
233                    if batch_size + request_size > self.config.max_batch_size_bytes
234                        && !batch.is_empty()
235                    {
236                        // Include this request and break - next batch will handle any overflow
237                        batch.push(request);
238                        break;
239                    }
240
241                    batch_size += request_size;
242                    batch.push(request);
243                }
244                Ok(None) => break, // Queue closed
245                Err(_) => break,   // Timeout reached
246            }
247        }
248
249        batch
250    }
251
252    /// Sorts checkpoint requests according to the ordering rules.
253    ///
254    /// The ordering rules are:
255    /// 1. Operations are checkpointed in execution order (preserving original order)
256    /// 2. EXECUTION completion (SUCCEED/FAIL on EXECUTION type) must be last in the batch
257    /// 3. Child operations must come after their parent CONTEXT starts
258    /// 4. START and completion (SUCCEED/FAIL) for the same operation can be in the same batch
259    ///
260    /// # Requirements
261    ///
262    /// - 2.12: WHEN batching operations, THE Checkpointing_System SHALL checkpoint in execution order with EXECUTION completion last
263    /// - 2.13: THE Checkpointing_System SHALL support including both START and completion actions for STEP/CONTEXT in the same batch
264    pub fn sort_checkpoint_batch(batch: Vec<CheckpointRequest>) -> Vec<CheckpointRequest> {
265        if batch.len() <= 1 {
266            return batch;
267        }
268
269        // Step 1: Identify CONTEXT START operations and build parent-child relationships
270        let context_starts: HashSet<String> = batch
271            .iter()
272            .filter(|req| {
273                req.operation.operation_type == OperationType::Context
274                    && req.operation.action == OperationAction::Start
275            })
276            .map(|req| req.operation.operation_id.clone())
277            .collect();
278
279        // Build a map of operation_id -> parent_id for operations in this batch
280        let parent_map: HashMap<String, Option<String>> = batch
281            .iter()
282            .map(|req| {
283                (
284                    req.operation.operation_id.clone(),
285                    req.operation.parent_id.clone(),
286                )
287            })
288            .collect();
289
290        // Helper function to check if an operation is an ancestor of another
291        let is_ancestor = |operation_id: &str, ancestor_id: &str| -> bool {
292            let mut current = parent_map.get(operation_id).and_then(|p| p.as_ref());
293            while let Some(parent_id) = current {
294                if parent_id == ancestor_id {
295                    return true;
296                }
297                current = parent_map.get(parent_id).and_then(|p| p.as_ref());
298            }
299            false
300        };
301
302        // Step 2: Sort with custom comparator
303        let mut indexed_batch: Vec<(usize, CheckpointRequest)> =
304            batch.into_iter().enumerate().collect();
305
306        indexed_batch.sort_by(|(idx_a, a), (idx_b, b)| {
307            // Priority 1: EXECUTION completion must be last
308            let a_is_exec_completion = a.operation.operation_type == OperationType::Execution
309                && matches!(
310                    a.operation.action,
311                    OperationAction::Succeed | OperationAction::Fail
312                );
313            let b_is_exec_completion = b.operation.operation_type == OperationType::Execution
314                && matches!(
315                    b.operation.action,
316                    OperationAction::Succeed | OperationAction::Fail
317                );
318
319            if a_is_exec_completion && !b_is_exec_completion {
320                return std::cmp::Ordering::Greater;
321            }
322            if !a_is_exec_completion && b_is_exec_completion {
323                return std::cmp::Ordering::Less;
324            }
325
326            // Priority 2: Parent CONTEXT START must come before child operations
327            if b.operation.action == OperationAction::Start
328                && context_starts.contains(&b.operation.operation_id)
329            {
330                if let Some(ref parent_id) = a.operation.parent_id {
331                    if *parent_id == b.operation.operation_id
332                        || is_ancestor(&a.operation.operation_id, &b.operation.operation_id)
333                    {
334                        return std::cmp::Ordering::Greater;
335                    }
336                }
337            }
338            if a.operation.action == OperationAction::Start
339                && context_starts.contains(&a.operation.operation_id)
340            {
341                if let Some(ref parent_id) = b.operation.parent_id {
342                    if *parent_id == a.operation.operation_id
343                        || is_ancestor(&b.operation.operation_id, &a.operation.operation_id)
344                    {
345                        return std::cmp::Ordering::Less;
346                    }
347                }
348            }
349
350            // Priority 3: For same operation_id, START comes before completion
351            if a.operation.operation_id == b.operation.operation_id {
352                let a_is_start = a.operation.action == OperationAction::Start;
353                let b_is_start = b.operation.action == OperationAction::Start;
354                if a_is_start && !b_is_start {
355                    return std::cmp::Ordering::Less;
356                }
357                if !a_is_start && b_is_start {
358                    return std::cmp::Ordering::Greater;
359                }
360            }
361
362            // Priority 4: Preserve original order (stable sort)
363            idx_a.cmp(idx_b)
364        });
365
366        // Extract the sorted requests
367        indexed_batch.into_iter().map(|(_, req)| req).collect()
368    }
369
370    /// Processes a batch of checkpoint requests.
371    ///
372    /// # Requirements
373    ///
374    /// - 2.9: THE Checkpointing_System SHALL use the CheckpointToken from invocation input for the first checkpoint
375    /// - 2.10: THE Checkpointing_System SHALL use the returned CheckpointToken from each checkpoint response for subsequent checkpoints
376    /// - 2.11: THE Checkpointing_System SHALL handle InvalidParameterValueException for invalid tokens by allowing propagation for retry
377    /// - 2.12: WHEN batching operations, THE Checkpointing_System SHALL checkpoint in execution order with EXECUTION completion last
378    /// - 2.13: THE Checkpointing_System SHALL support including both START and completion actions for STEP/CONTEXT in the same batch
379    /// - 18.5: THE AWS_Integration SHALL handle ThrottlingException with appropriate retry behavior
380    async fn process_batch(&self, batch: Vec<CheckpointRequest>) {
381        if batch.is_empty() {
382            return;
383        }
384
385        // Sort the batch according to checkpoint ordering rules
386        let sorted_batch = Self::sort_checkpoint_batch(batch);
387
388        // Extract operations and completion channels
389        let (operations, completions): (Vec<_>, Vec<_>) = sorted_batch
390            .into_iter()
391            .map(|req| (req.operation, req.completion))
392            .unzip();
393
394        // Get current checkpoint token
395        let token = self.checkpoint_token.read().await.clone();
396
397        // Send the batch to the service with retry for throttling
398        let result = self.checkpoint_with_retry(&token, operations).await;
399
400        // Handle the result
401        match result {
402            Ok(response) => {
403                // Mark that we've consumed the initial token.
404                // Release ordering ensures that the checkpoint_token write (via RwLock)
405                // is visible to any thread that subsequently reads initial_token_consumed.
406                // This is a one-way transition (false -> true) that never reverts.
407                //
408                // Requirements: 4.4, 4.6
409                self.initial_token_consumed.store(true, Ordering::Release);
410
411                // Update the checkpoint token with the new token from the response
412                {
413                    let mut token_guard = self.checkpoint_token.write().await;
414                    *token_guard = response.checkpoint_token;
415                }
416
417                // Signal success to all waiting callers
418                for completion in completions.into_iter().flatten() {
419                    let _ = completion.send(Ok(()));
420                }
421            }
422            Err(error) => {
423                // Check if this is an invalid checkpoint token error
424                let is_invalid_token = error.is_invalid_checkpoint_token();
425
426                // Signal failure to all waiting callers
427                for completion in completions.into_iter().flatten() {
428                    let error_msg = if is_invalid_token {
429                        format!(
430                            "Invalid checkpoint token - token may have been consumed. \
431                             Lambda will retry with a fresh token. Original error: {}",
432                            error
433                        )
434                    } else {
435                        error.to_string()
436                    };
437
438                    let _ = completion.send(Err(DurableError::Checkpoint {
439                        message: error_msg,
440                        is_retriable: error.is_retriable(),
441                        aws_error: None,
442                    }));
443                }
444            }
445        }
446    }
447
448    /// Sends a checkpoint request with retry for throttling errors.
449    ///
450    /// # Requirements
451    ///
452    /// - 18.5: THE AWS_Integration SHALL handle ThrottlingException with appropriate retry behavior
453    async fn checkpoint_with_retry(
454        &self,
455        token: &str,
456        operations: Vec<OperationUpdate>,
457    ) -> Result<crate::client::CheckpointResponse, DurableError> {
458        const MAX_RETRIES: u32 = 5;
459        const INITIAL_DELAY_MS: u64 = 100;
460        const MAX_DELAY_MS: u64 = 10_000;
461        const BACKOFF_MULTIPLIER: u64 = 2;
462
463        let mut attempt = 0;
464        let mut delay_ms = INITIAL_DELAY_MS;
465
466        loop {
467            let result = self
468                .service_client
469                .checkpoint(&self.durable_execution_arn, token, operations.clone())
470                .await;
471
472            match result {
473                Ok(response) => return Ok(response),
474                Err(error) if error.is_throttling() => {
475                    attempt += 1;
476                    if attempt > MAX_RETRIES {
477                        tracing::warn!(
478                            attempt = attempt,
479                            "Checkpoint throttling: max retries exceeded"
480                        );
481                        return Err(error);
482                    }
483
484                    let actual_delay = error.get_retry_after_ms().unwrap_or(delay_ms);
485
486                    tracing::debug!(
487                        attempt = attempt,
488                        delay_ms = actual_delay,
489                        "Checkpoint throttled, retrying with exponential backoff"
490                    );
491
492                    tokio::time::sleep(StdDuration::from_millis(actual_delay)).await;
493                    delay_ms = (delay_ms * BACKOFF_MULTIPLIER).min(MAX_DELAY_MS);
494                }
495                Err(error) => return Err(error),
496            }
497        }
498    }
499}
500
501/// Handle for sending checkpoint requests to the batcher.
502#[derive(Clone)]
503pub struct CheckpointSender {
504    /// Channel for sending requests to the batcher
505    pub tx: mpsc::Sender<CheckpointRequest>,
506}
507
508impl CheckpointSender {
509    /// Creates a new CheckpointSender.
510    pub fn new(tx: mpsc::Sender<CheckpointRequest>) -> Self {
511        Self { tx }
512    }
513
514    /// Sends a synchronous checkpoint request and waits for completion.
515    ///
516    /// This method blocks until the checkpoint is confirmed or fails.
517    pub async fn checkpoint_sync(&self, operation: OperationUpdate) -> Result<(), DurableError> {
518        let (request, rx) = CheckpointRequest::sync(operation);
519
520        self.tx
521            .send(request)
522            .await
523            .map_err(|_| DurableError::Checkpoint {
524                message: "Checkpoint queue closed".to_string(),
525                is_retriable: false,
526                aws_error: None,
527            })?;
528
529        rx.await.map_err(|_| DurableError::Checkpoint {
530            message: "Checkpoint completion channel closed".to_string(),
531            is_retriable: false,
532            aws_error: None,
533        })?
534    }
535
536    /// Sends an asynchronous checkpoint request (fire-and-forget).
537    ///
538    /// This method returns immediately without waiting for confirmation.
539    pub async fn checkpoint_async(&self, operation: OperationUpdate) -> Result<(), DurableError> {
540        let request = CheckpointRequest::async_request(operation);
541
542        self.tx
543            .send(request)
544            .await
545            .map_err(|_| DurableError::Checkpoint {
546                message: "Checkpoint queue closed".to_string(),
547                is_retriable: false,
548                aws_error: None,
549            })
550    }
551
552    /// Sends a checkpoint request with configurable sync/async behavior.
553    pub async fn checkpoint(
554        &self,
555        operation: OperationUpdate,
556        is_sync: bool,
557    ) -> Result<(), DurableError> {
558        if is_sync {
559            self.checkpoint_sync(operation).await
560        } else {
561            self.checkpoint_async(operation).await
562        }
563    }
564}
565
566/// Creates a checkpoint queue with the given buffer size.
567///
568/// Returns a sender for submitting checkpoint requests and a receiver
569/// for the batcher to process them.
570pub fn create_checkpoint_queue(
571    buffer_size: usize,
572) -> (CheckpointSender, mpsc::Receiver<CheckpointRequest>) {
573    let (tx, rx) = mpsc::channel(buffer_size);
574    (CheckpointSender::new(tx), rx)
575}
576
577#[cfg(test)]
578mod tests {
579    use super::*;
580    use crate::client::{CheckpointResponse, MockDurableServiceClient};
581    use crate::error::ErrorObject;
582
583    fn create_test_update(id: &str) -> OperationUpdate {
584        OperationUpdate::start(id, OperationType::Step)
585    }
586
587    fn create_start_update(id: &str, op_type: OperationType) -> OperationUpdate {
588        OperationUpdate::start(id, op_type)
589    }
590
591    fn create_succeed_update(id: &str, op_type: OperationType) -> OperationUpdate {
592        OperationUpdate::succeed(id, op_type, Some("result".to_string()))
593    }
594
595    fn create_fail_update(id: &str, op_type: OperationType) -> OperationUpdate {
596        OperationUpdate::fail(id, op_type, ErrorObject::new("Error", "message"))
597    }
598
599    fn create_request(update: OperationUpdate) -> CheckpointRequest {
600        CheckpointRequest::async_request(update)
601    }
602
603    // Checkpoint request tests
604    #[test]
605    fn test_checkpoint_request_sync() {
606        let update = create_test_update("op-1");
607        let (request, _rx) = CheckpointRequest::sync(update);
608
609        assert!(request.is_sync());
610        assert_eq!(request.operation.operation_id, "op-1");
611    }
612
613    #[test]
614    fn test_checkpoint_request_async() {
615        let update = create_test_update("op-1");
616        let request = CheckpointRequest::async_request(update);
617
618        assert!(!request.is_sync());
619        assert_eq!(request.operation.operation_id, "op-1");
620    }
621
622    #[test]
623    fn test_checkpoint_request_estimated_size() {
624        let update = create_test_update("op-1");
625        let request = CheckpointRequest::async_request(update);
626
627        let size = request.estimated_size();
628        assert!(size > 0);
629        assert!(size >= 100);
630    }
631
632    #[test]
633    fn test_checkpoint_request_estimated_size_with_result() {
634        let mut update = create_test_update("op-1");
635        update.result = Some("a".repeat(1000));
636        let request = CheckpointRequest::async_request(update);
637
638        let size = request.estimated_size();
639        assert!(size >= 1100);
640    }
641
642    #[test]
643    fn test_create_checkpoint_queue() {
644        let (sender, _rx) = create_checkpoint_queue(100);
645        drop(sender);
646    }
647
648    #[tokio::test]
649    async fn test_checkpoint_sender_sync() {
650        let (sender, mut rx) = create_checkpoint_queue(10);
651
652        let handle = tokio::spawn(async move {
653            if let Some(request) = rx.recv().await {
654                assert!(request.is_sync());
655                if let Some(completion) = request.completion {
656                    let _ = completion.send(Ok(()));
657                }
658            }
659        });
660
661        let update = create_test_update("op-1");
662        let result = sender.checkpoint_sync(update).await;
663        assert!(result.is_ok());
664
665        handle.await.unwrap();
666    }
667
668    #[tokio::test]
669    async fn test_checkpoint_sender_async() {
670        let (sender, mut rx) = create_checkpoint_queue(10);
671
672        let update = create_test_update("op-1");
673        let result = sender.checkpoint_async(update).await;
674        assert!(result.is_ok());
675
676        let request = rx.recv().await.unwrap();
677        assert!(!request.is_sync());
678        assert_eq!(request.operation.operation_id, "op-1");
679    }
680
681    #[tokio::test]
682    async fn test_checkpoint_sender_queue_closed() {
683        let (sender, rx) = create_checkpoint_queue(10);
684        drop(rx);
685
686        let update = create_test_update("op-1");
687        let result = sender.checkpoint_async(update).await;
688        assert!(result.is_err());
689    }
690
691    #[tokio::test]
692    async fn test_checkpoint_batcher_processes_batch() {
693        let client = Arc::new(
694            MockDurableServiceClient::new()
695                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
696        );
697
698        let (sender, rx) = create_checkpoint_queue(10);
699        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
700
701        let config = CheckpointBatcherConfig {
702            max_batch_time_ms: 10,
703            ..Default::default()
704        };
705
706        let mut batcher = CheckpointBatcher::new(
707            config,
708            rx,
709            client,
710            "arn:test".to_string(),
711            checkpoint_token.clone(),
712        );
713
714        let update = create_test_update("op-1");
715        let (request, completion_rx) = CheckpointRequest::sync(update);
716        sender.tx.send(request).await.unwrap();
717
718        drop(sender);
719        batcher.run().await;
720
721        let result = completion_rx.await.unwrap();
722        assert!(result.is_ok());
723
724        let token = checkpoint_token.read().await;
725        assert_eq!(*token, "new-token");
726    }
727
728    #[tokio::test]
729    async fn test_checkpoint_batcher_handles_error() {
730        let client = Arc::new(
731            MockDurableServiceClient::new()
732                .with_checkpoint_response(Err(DurableError::checkpoint_retriable("Test error"))),
733        );
734
735        let (sender, rx) = create_checkpoint_queue(10);
736        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
737
738        let config = CheckpointBatcherConfig {
739            max_batch_time_ms: 10,
740            ..Default::default()
741        };
742
743        let mut batcher = CheckpointBatcher::new(
744            config,
745            rx,
746            client,
747            "arn:test".to_string(),
748            checkpoint_token.clone(),
749        );
750
751        let update = create_test_update("op-1");
752        let (request, completion_rx) = CheckpointRequest::sync(update);
753        sender.tx.send(request).await.unwrap();
754
755        drop(sender);
756        batcher.run().await;
757
758        let result = completion_rx.await.unwrap();
759        assert!(result.is_err());
760
761        let token = checkpoint_token.read().await;
762        assert_eq!(*token, "initial-token");
763    }
764
765    #[tokio::test]
766    async fn test_checkpoint_batcher_batches_multiple_requests() {
767        let client = Arc::new(
768            MockDurableServiceClient::new()
769                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
770        );
771
772        let (sender, rx) = create_checkpoint_queue(10);
773        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
774
775        let config = CheckpointBatcherConfig {
776            max_batch_time_ms: 50,
777            max_batch_operations: 3,
778            ..Default::default()
779        };
780
781        let mut batcher = CheckpointBatcher::new(
782            config,
783            rx,
784            client,
785            "arn:test".to_string(),
786            checkpoint_token.clone(),
787        );
788
789        for i in 0..3 {
790            let update = create_test_update(&format!("op-{}", i));
791            sender
792                .tx
793                .send(CheckpointRequest::async_request(update))
794                .await
795                .unwrap();
796        }
797
798        drop(sender);
799        batcher.run().await;
800
801        let token = checkpoint_token.read().await;
802        assert_eq!(*token, "new-token");
803    }
804
805    // Batch ordering tests
806    #[test]
807    fn test_execution_completion_is_last() {
808        let batch = vec![
809            create_request(create_succeed_update("exec-1", OperationType::Execution)),
810            create_request(create_start_update("step-1", OperationType::Step)),
811            create_request(create_succeed_update("step-2", OperationType::Step)),
812        ];
813
814        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
815
816        assert_eq!(sorted.len(), 3);
817        assert_eq!(sorted[2].operation.operation_id, "exec-1");
818        assert_eq!(sorted[2].operation.action, OperationAction::Succeed);
819        assert_eq!(sorted[2].operation.operation_type, OperationType::Execution);
820    }
821
822    #[test]
823    fn test_execution_fail_is_last() {
824        let batch = vec![
825            create_request(create_fail_update("exec-1", OperationType::Execution)),
826            create_request(create_start_update("step-1", OperationType::Step)),
827        ];
828
829        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
830
831        assert_eq!(sorted.len(), 2);
832        assert_eq!(sorted[1].operation.operation_id, "exec-1");
833        assert_eq!(sorted[1].operation.action, OperationAction::Fail);
834    }
835
836    #[test]
837    fn test_child_after_parent_context_start() {
838        let mut child_update = create_start_update("child-1", OperationType::Step);
839        child_update.parent_id = Some("parent-ctx".to_string());
840
841        let batch = vec![
842            create_request(child_update),
843            create_request(create_start_update("parent-ctx", OperationType::Context)),
844        ];
845
846        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
847
848        assert_eq!(sorted.len(), 2);
849        assert_eq!(sorted[0].operation.operation_id, "parent-ctx");
850        assert_eq!(sorted[0].operation.action, OperationAction::Start);
851        assert_eq!(sorted[1].operation.operation_id, "child-1");
852    }
853
854    #[test]
855    fn test_start_and_completion_same_operation() {
856        let batch = vec![
857            create_request(create_succeed_update("step-1", OperationType::Step)),
858            create_request(create_start_update("step-1", OperationType::Step)),
859        ];
860
861        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
862
863        assert_eq!(sorted.len(), 2);
864        assert_eq!(sorted[0].operation.operation_id, "step-1");
865        assert_eq!(sorted[0].operation.action, OperationAction::Start);
866        assert_eq!(sorted[1].operation.operation_id, "step-1");
867        assert_eq!(sorted[1].operation.action, OperationAction::Succeed);
868    }
869
870    #[test]
871    fn test_context_start_and_completion_same_batch() {
872        let batch = vec![
873            create_request(create_succeed_update("ctx-1", OperationType::Context)),
874            create_request(create_start_update("ctx-1", OperationType::Context)),
875        ];
876
877        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
878
879        assert_eq!(sorted.len(), 2);
880        assert_eq!(sorted[0].operation.operation_id, "ctx-1");
881        assert_eq!(sorted[0].operation.action, OperationAction::Start);
882        assert_eq!(sorted[1].operation.operation_id, "ctx-1");
883        assert_eq!(sorted[1].operation.action, OperationAction::Succeed);
884    }
885
886    #[test]
887    fn test_step_start_and_fail_same_batch() {
888        let batch = vec![
889            create_request(create_fail_update("step-1", OperationType::Step)),
890            create_request(create_start_update("step-1", OperationType::Step)),
891        ];
892
893        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
894
895        assert_eq!(sorted.len(), 2);
896        assert_eq!(sorted[0].operation.operation_id, "step-1");
897        assert_eq!(sorted[0].operation.action, OperationAction::Start);
898        assert_eq!(sorted[1].operation.operation_id, "step-1");
899        assert_eq!(sorted[1].operation.action, OperationAction::Fail);
900    }
901
902    #[test]
903    fn test_complex_ordering_scenario() {
904        let mut child1 = create_start_update("child-1", OperationType::Step);
905        child1.parent_id = Some("parent-ctx".to_string());
906
907        let mut child2 = create_succeed_update("child-2", OperationType::Step);
908        child2.parent_id = Some("parent-ctx".to_string());
909
910        let batch = vec![
911            create_request(create_succeed_update("exec-1", OperationType::Execution)),
912            create_request(child1),
913            create_request(create_succeed_update("parent-ctx", OperationType::Context)),
914            create_request(create_start_update("parent-ctx", OperationType::Context)),
915            create_request(child2),
916        ];
917
918        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
919
920        assert_eq!(sorted.len(), 5);
921
922        let parent_start_pos = sorted
923            .iter()
924            .position(|r| {
925                r.operation.operation_id == "parent-ctx"
926                    && r.operation.action == OperationAction::Start
927            })
928            .unwrap();
929        let parent_succeed_pos = sorted
930            .iter()
931            .position(|r| {
932                r.operation.operation_id == "parent-ctx"
933                    && r.operation.action == OperationAction::Succeed
934            })
935            .unwrap();
936        let child1_pos = sorted
937            .iter()
938            .position(|r| r.operation.operation_id == "child-1")
939            .unwrap();
940        let child2_pos = sorted
941            .iter()
942            .position(|r| r.operation.operation_id == "child-2")
943            .unwrap();
944        let exec_pos = sorted
945            .iter()
946            .position(|r| r.operation.operation_id == "exec-1")
947            .unwrap();
948
949        assert!(parent_start_pos < parent_succeed_pos);
950        assert!(parent_start_pos < child1_pos);
951        assert!(parent_start_pos < child2_pos);
952        assert_eq!(exec_pos, 4);
953    }
954
955    #[test]
956    fn test_empty_batch() {
957        let batch: Vec<CheckpointRequest> = vec![];
958        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
959        assert!(sorted.is_empty());
960    }
961
962    #[test]
963    fn test_single_item_batch() {
964        let batch = vec![create_request(create_start_update(
965            "step-1",
966            OperationType::Step,
967        ))];
968        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
969        assert_eq!(sorted.len(), 1);
970        assert_eq!(sorted[0].operation.operation_id, "step-1");
971    }
972
973    #[test]
974    fn test_preserves_original_order() {
975        let batch = vec![
976            create_request(create_start_update("step-1", OperationType::Step)),
977            create_request(create_start_update("step-2", OperationType::Step)),
978            create_request(create_start_update("step-3", OperationType::Step)),
979        ];
980
981        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
982
983        assert_eq!(sorted.len(), 3);
984        assert_eq!(sorted[0].operation.operation_id, "step-1");
985        assert_eq!(sorted[1].operation.operation_id, "step-2");
986        assert_eq!(sorted[2].operation.operation_id, "step-3");
987    }
988
989    #[test]
990    fn test_multiple_children_same_parent() {
991        let mut child1 = create_start_update("child-1", OperationType::Step);
992        child1.parent_id = Some("parent-ctx".to_string());
993
994        let mut child2 = create_start_update("child-2", OperationType::Step);
995        child2.parent_id = Some("parent-ctx".to_string());
996
997        let mut child3 = create_start_update("child-3", OperationType::Step);
998        child3.parent_id = Some("parent-ctx".to_string());
999
1000        let batch = vec![
1001            create_request(child1),
1002            create_request(child2),
1003            create_request(create_start_update("parent-ctx", OperationType::Context)),
1004            create_request(child3),
1005        ];
1006
1007        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1008
1009        assert_eq!(sorted[0].operation.operation_id, "parent-ctx");
1010
1011        for i in 1..4 {
1012            assert!(sorted[i].operation.parent_id.as_deref() == Some("parent-ctx"));
1013        }
1014    }
1015
1016    #[test]
1017    fn test_nested_contexts() {
1018        let mut parent_ctx = create_start_update("parent-ctx", OperationType::Context);
1019        parent_ctx.parent_id = Some("grandparent-ctx".to_string());
1020
1021        let mut child = create_start_update("child-1", OperationType::Step);
1022        child.parent_id = Some("parent-ctx".to_string());
1023
1024        let batch = vec![
1025            create_request(child),
1026            create_request(parent_ctx),
1027            create_request(create_start_update(
1028                "grandparent-ctx",
1029                OperationType::Context,
1030            )),
1031        ];
1032
1033        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1034
1035        let grandparent_pos = sorted
1036            .iter()
1037            .position(|r| r.operation.operation_id == "grandparent-ctx")
1038            .unwrap();
1039        let parent_pos = sorted
1040            .iter()
1041            .position(|r| r.operation.operation_id == "parent-ctx")
1042            .unwrap();
1043        let child_pos = sorted
1044            .iter()
1045            .position(|r| r.operation.operation_id == "child-1")
1046            .unwrap();
1047
1048        assert!(grandparent_pos < parent_pos);
1049        assert!(parent_pos < child_pos);
1050    }
1051
1052    #[test]
1053    fn test_execution_start_not_affected() {
1054        let batch = vec![
1055            create_request(create_start_update("step-1", OperationType::Step)),
1056            create_request(create_start_update("exec-1", OperationType::Execution)),
1057            create_request(create_start_update("step-2", OperationType::Step)),
1058        ];
1059
1060        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1061
1062        assert_eq!(sorted.len(), 3);
1063        assert_eq!(sorted[0].operation.operation_id, "step-1");
1064        assert_eq!(sorted[1].operation.operation_id, "exec-1");
1065        assert_eq!(sorted[2].operation.operation_id, "step-2");
1066    }
1067}
1068
1069#[cfg(test)]
1070mod property_tests {
1071    use super::*;
1072    use crate::client::{CheckpointResponse, DurableServiceClient, GetOperationsResponse};
1073    use async_trait::async_trait;
1074    use proptest::prelude::*;
1075    use std::collections::HashSet;
1076    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1077
1078    struct CountingMockClient {
1079        checkpoint_count: Arc<AtomicUsize>,
1080    }
1081
1082    impl CountingMockClient {
1083        fn new() -> (Self, Arc<AtomicUsize>) {
1084            let count = Arc::new(AtomicUsize::new(0));
1085            (
1086                Self {
1087                    checkpoint_count: count.clone(),
1088                },
1089                count,
1090            )
1091        }
1092    }
1093
1094    #[async_trait]
1095    impl DurableServiceClient for CountingMockClient {
1096        async fn checkpoint(
1097            &self,
1098            _durable_execution_arn: &str,
1099            _checkpoint_token: &str,
1100            _operations: Vec<OperationUpdate>,
1101        ) -> Result<CheckpointResponse, DurableError> {
1102            self.checkpoint_count.fetch_add(1, AtomicOrdering::SeqCst);
1103            Ok(CheckpointResponse::new(format!(
1104                "token-{}",
1105                self.checkpoint_count.load(AtomicOrdering::SeqCst)
1106            )))
1107        }
1108
1109        async fn get_operations(
1110            &self,
1111            _durable_execution_arn: &str,
1112            _next_marker: &str,
1113        ) -> Result<GetOperationsResponse, DurableError> {
1114            Ok(GetOperationsResponse {
1115                operations: vec![],
1116                next_marker: None,
1117            })
1118        }
1119    }
1120
1121    fn create_test_update_with_size(id: &str, result_size: usize) -> OperationUpdate {
1122        let mut update = OperationUpdate::start(id, OperationType::Step);
1123        if result_size > 0 {
1124            update.result = Some("x".repeat(result_size));
1125        }
1126        update
1127    }
1128
1129    proptest! {
1130        #![proptest_config(ProptestConfig::with_cases(100))]
1131
1132        #[test]
1133        fn prop_checkpoint_batching_respects_operation_count_limit(
1134            num_requests in 1usize..20,
1135            max_ops_per_batch in 1usize..10,
1136        ) {
1137            let rt = tokio::runtime::Runtime::new().unwrap();
1138            let result: Result<(), TestCaseError> = rt.block_on(async {
1139                let (client, call_count) = CountingMockClient::new();
1140                let client = Arc::new(client);
1141
1142                let (sender, rx) = create_checkpoint_queue(100);
1143                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1144
1145                let config = CheckpointBatcherConfig {
1146                    max_batch_time_ms: 10,
1147                    max_batch_operations: max_ops_per_batch,
1148                    max_batch_size_bytes: usize::MAX,
1149                };
1150
1151                let mut batcher = CheckpointBatcher::new(
1152                    config,
1153                    rx,
1154                    client,
1155                    "arn:test".to_string(),
1156                    checkpoint_token.clone(),
1157                );
1158
1159                for i in 0..num_requests {
1160                    let update = create_test_update_with_size(&format!("op-{}", i), 0);
1161                    sender.tx.send(CheckpointRequest::async_request(update)).await.unwrap();
1162                }
1163
1164                drop(sender);
1165                batcher.run().await;
1166
1167                let expected_max_calls = (num_requests + max_ops_per_batch - 1) / max_ops_per_batch;
1168                let actual_calls = call_count.load(AtomicOrdering::SeqCst);
1169
1170                if actual_calls > expected_max_calls {
1171                    return Err(TestCaseError::fail(format!(
1172                        "Expected at most {} API calls for {} requests with batch size {}, got {}",
1173                        expected_max_calls, num_requests, max_ops_per_batch, actual_calls
1174                    )));
1175                }
1176
1177                if actual_calls < 1 {
1178                    return Err(TestCaseError::fail(format!(
1179                        "Expected at least 1 API call for {} requests, got {}",
1180                        num_requests, actual_calls
1181                    )));
1182                }
1183
1184                Ok(())
1185            });
1186            result?;
1187        }
1188
1189        #[test]
1190        fn prop_checkpoint_batching_respects_size_limit(
1191            num_requests in 1usize..10,
1192            result_size in 100usize..500,
1193            max_batch_size in 500usize..2000,
1194        ) {
1195            let rt = tokio::runtime::Runtime::new().unwrap();
1196            let result: Result<(), TestCaseError> = rt.block_on(async {
1197                let (client, call_count) = CountingMockClient::new();
1198                let client = Arc::new(client);
1199
1200                let (sender, rx) = create_checkpoint_queue(100);
1201                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1202
1203                let config = CheckpointBatcherConfig {
1204                    max_batch_time_ms: 10,
1205                    max_batch_operations: usize::MAX,
1206                    max_batch_size_bytes: max_batch_size,
1207                };
1208
1209                let mut batcher = CheckpointBatcher::new(
1210                    config,
1211                    rx,
1212                    client,
1213                    "arn:test".to_string(),
1214                    checkpoint_token.clone(),
1215                );
1216
1217                for i in 0..num_requests {
1218                    let update = create_test_update_with_size(&format!("op-{}", i), result_size);
1219                    sender.tx.send(CheckpointRequest::async_request(update)).await.unwrap();
1220                }
1221
1222                drop(sender);
1223                batcher.run().await;
1224
1225                let estimated_request_size = 100 + result_size;
1226                let total_size = num_requests * estimated_request_size;
1227                let expected_max_calls = (total_size + max_batch_size - 1) / max_batch_size;
1228                let actual_calls = call_count.load(AtomicOrdering::SeqCst);
1229
1230                if actual_calls > expected_max_calls.max(1) * 2 {
1231                    return Err(TestCaseError::fail(format!(
1232                        "Expected at most ~{} API calls for {} requests of size {}, got {}",
1233                        expected_max_calls, num_requests, estimated_request_size, actual_calls
1234                    )));
1235                }
1236
1237                if actual_calls < 1 {
1238                    return Err(TestCaseError::fail(format!(
1239                        "Expected at least 1 API call, got {}",
1240                        actual_calls
1241                    )));
1242                }
1243
1244                Ok(())
1245            });
1246            result?;
1247        }
1248
1249        #[test]
1250        fn prop_all_requests_are_processed(
1251            num_requests in 1usize..20,
1252        ) {
1253            let rt = tokio::runtime::Runtime::new().unwrap();
1254            let result: Result<(), TestCaseError> = rt.block_on(async {
1255                let (client, _call_count) = CountingMockClient::new();
1256                let client = Arc::new(client);
1257
1258                let (sender, rx) = create_checkpoint_queue(100);
1259                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1260
1261                let config = CheckpointBatcherConfig {
1262                    max_batch_time_ms: 10,
1263                    max_batch_operations: 5,
1264                    ..Default::default()
1265                };
1266
1267                let mut batcher = CheckpointBatcher::new(
1268                    config,
1269                    rx,
1270                    client,
1271                    "arn:test".to_string(),
1272                    checkpoint_token.clone(),
1273                );
1274
1275                let mut receivers = Vec::new();
1276
1277                for i in 0..num_requests {
1278                    let update = create_test_update_with_size(&format!("op-{}", i), 0);
1279                    let (request, rx) = CheckpointRequest::sync(update);
1280                    sender.tx.send(request).await.unwrap();
1281                    receivers.push(rx);
1282                }
1283
1284                drop(sender);
1285                batcher.run().await;
1286
1287                let mut success_count = 0;
1288                for rx in receivers {
1289                    if let Ok(result) = rx.await {
1290                        if result.is_ok() {
1291                            success_count += 1;
1292                        }
1293                    }
1294                }
1295
1296                if success_count != num_requests {
1297                    return Err(TestCaseError::fail(format!(
1298                        "Expected all {} requests to succeed, got {}",
1299                        num_requests, success_count
1300                    )));
1301                }
1302
1303                Ok(())
1304            });
1305            result?;
1306        }
1307    }
1308
1309    // ============================================================================
1310    // Property Tests for Batch Ordering (Properties 13, 14, 15)
1311    // Feature: rust-sdk-test-suite
1312    // ============================================================================
1313
1314    /// Strategy for generating valid operation IDs
1315    fn operation_id_strategy() -> impl Strategy<Value = String> {
1316        "[a-z]{1,8}-[0-9]{1,4}".prop_map(|s| s)
1317    }
1318
1319    /// Strategy for generating operation types
1320    fn operation_type_strategy() -> impl Strategy<Value = OperationType> {
1321        prop_oneof![
1322            Just(OperationType::Execution),
1323            Just(OperationType::Step),
1324            Just(OperationType::Wait),
1325            Just(OperationType::Callback),
1326            Just(OperationType::Invoke),
1327            Just(OperationType::Context),
1328        ]
1329    }
1330
1331    /// Strategy for generating operation actions
1332    fn operation_action_strategy() -> impl Strategy<Value = OperationAction> {
1333        prop_oneof![
1334            Just(OperationAction::Start),
1335            Just(OperationAction::Succeed),
1336            Just(OperationAction::Fail),
1337        ]
1338    }
1339
1340    /// Creates a checkpoint request with the given parameters
1341    fn create_checkpoint_request(
1342        id: &str,
1343        op_type: OperationType,
1344        action: OperationAction,
1345        parent_id: Option<String>,
1346    ) -> CheckpointRequest {
1347        let mut update = match action {
1348            OperationAction::Start => OperationUpdate::start(id, op_type),
1349            OperationAction::Succeed => {
1350                OperationUpdate::succeed(id, op_type, Some("result".to_string()))
1351            }
1352            OperationAction::Fail => OperationUpdate::fail(
1353                id,
1354                op_type,
1355                crate::error::ErrorObject::new("Error", "message"),
1356            ),
1357            _ => OperationUpdate::start(id, op_type),
1358        };
1359        update.parent_id = parent_id;
1360        CheckpointRequest::async_request(update)
1361    }
1362
1363    /// Strategy for generating a batch with parent-child relationships
1364    /// Generates a CONTEXT START and some child operations that reference it
1365    fn parent_child_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1366        (
1367            operation_id_strategy(),                                // parent context id
1368            prop::collection::vec(operation_id_strategy(), 1..5),   // child ids
1369            prop::collection::vec(operation_type_strategy(), 1..5), // child types (will be filtered)
1370        )
1371            .prop_map(|(parent_id, child_ids, child_types)| {
1372                let mut batch = Vec::new();
1373
1374                // Add children first (in random order, they should be sorted after parent)
1375                for (child_id, child_type) in child_ids.iter().zip(child_types.iter()) {
1376                    // Skip if child_type is Execution (can't have parent)
1377                    if *child_type != OperationType::Execution {
1378                        batch.push(create_checkpoint_request(
1379                            child_id,
1380                            *child_type,
1381                            OperationAction::Start,
1382                            Some(parent_id.clone()),
1383                        ));
1384                    }
1385                }
1386
1387                // Add parent CONTEXT START (should be sorted to come first)
1388                batch.push(create_checkpoint_request(
1389                    &parent_id,
1390                    OperationType::Context,
1391                    OperationAction::Start,
1392                    None,
1393                ));
1394
1395                batch
1396            })
1397    }
1398
1399    /// Strategy for generating a batch with EXECUTION completion
1400    /// Generates some operations and an EXECUTION SUCCEED/FAIL that should be last
1401    fn execution_completion_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1402        (
1403            operation_id_strategy(), // execution id
1404            prop::collection::vec((operation_id_strategy(), operation_type_strategy()), 1..5), // other operations
1405            prop::bool::ANY, // succeed or fail
1406        )
1407            .prop_map(|(exec_id, other_ops, succeed)| {
1408                let mut batch = Vec::new();
1409
1410                // Add EXECUTION completion first (should be sorted to come last)
1411                let exec_action = if succeed {
1412                    OperationAction::Succeed
1413                } else {
1414                    OperationAction::Fail
1415                };
1416                batch.push(create_checkpoint_request(
1417                    &exec_id,
1418                    OperationType::Execution,
1419                    exec_action,
1420                    None,
1421                ));
1422
1423                // Add other operations
1424                for (op_id, op_type) in other_ops {
1425                    // Skip Execution type to avoid conflicts
1426                    if op_type != OperationType::Execution {
1427                        batch.push(create_checkpoint_request(
1428                            &op_id,
1429                            op_type,
1430                            OperationAction::Start,
1431                            None,
1432                        ));
1433                    }
1434                }
1435
1436                batch
1437            })
1438    }
1439
1440    /// Strategy for generating a batch with potential duplicate operation IDs
1441    /// (START and completion for same operation)
1442    fn same_operation_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1443        (
1444            operation_id_strategy(),
1445            prop_oneof![Just(OperationType::Step), Just(OperationType::Context),],
1446            prop::bool::ANY, // succeed or fail
1447        )
1448            .prop_map(|(op_id, op_type, succeed)| {
1449                let mut batch = Vec::new();
1450
1451                // Add completion first (should be sorted after START)
1452                let completion_action = if succeed {
1453                    OperationAction::Succeed
1454                } else {
1455                    OperationAction::Fail
1456                };
1457                batch.push(create_checkpoint_request(
1458                    &op_id,
1459                    op_type,
1460                    completion_action,
1461                    None,
1462                ));
1463
1464                // Add START (should be sorted to come first)
1465                batch.push(create_checkpoint_request(
1466                    &op_id,
1467                    op_type,
1468                    OperationAction::Start,
1469                    None,
1470                ));
1471
1472                batch
1473            })
1474    }
1475
1476    proptest! {
1477        #![proptest_config(ProptestConfig::with_cases(100))]
1478
1479        /// Property 13: Parent-before-child ordering
1480        /// For any batch of operation updates, child operations SHALL appear after their parent CONTEXT start
1481        /// Feature: rust-sdk-test-suite, Property 13: Parent-before-child ordering
1482        /// Validates: Requirements 8.1
1483        #[test]
1484        fn prop_parent_context_start_before_children(batch in parent_child_batch_strategy()) {
1485            // Skip empty batches
1486            if batch.is_empty() {
1487                return Ok(());
1488            }
1489
1490            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1491
1492            // Find the parent CONTEXT START position
1493            let parent_pos = sorted.iter().position(|r| {
1494                r.operation.operation_type == OperationType::Context
1495                    && r.operation.action == OperationAction::Start
1496                    && r.operation.parent_id.is_none()
1497            });
1498
1499            // If there's a parent CONTEXT START, all children must come after it
1500            if let Some(parent_idx) = parent_pos {
1501                let parent_id = &sorted[parent_idx].operation.operation_id;
1502
1503                for (idx, req) in sorted.iter().enumerate() {
1504                    if let Some(ref req_parent_id) = req.operation.parent_id {
1505                        if req_parent_id == parent_id {
1506                            prop_assert!(
1507                                idx > parent_idx,
1508                                "Child operation {} at index {} should come after parent {} at index {}",
1509                                req.operation.operation_id,
1510                                idx,
1511                                parent_id,
1512                                parent_idx
1513                            );
1514                        }
1515                    }
1516                }
1517            }
1518        }
1519
1520        /// Property 14: Execution-last ordering
1521        /// For any batch containing EXECUTION completion, the EXECUTION update SHALL be last
1522        /// Feature: rust-sdk-test-suite, Property 14: Execution-last ordering
1523        /// Validates: Requirements 8.2
1524        #[test]
1525        fn prop_execution_completion_is_last(batch in execution_completion_batch_strategy()) {
1526            // Skip empty batches
1527            if batch.is_empty() {
1528                return Ok(());
1529            }
1530
1531            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1532
1533            // Find EXECUTION completion (SUCCEED or FAIL)
1534            let exec_completion_pos = sorted.iter().position(|r| {
1535                r.operation.operation_type == OperationType::Execution
1536                    && matches!(r.operation.action, OperationAction::Succeed | OperationAction::Fail)
1537            });
1538
1539            // If there's an EXECUTION completion, it must be last
1540            if let Some(exec_idx) = exec_completion_pos {
1541                prop_assert_eq!(
1542                    exec_idx,
1543                    sorted.len() - 1,
1544                    "EXECUTION completion should be at index {} (last), but found at index {}",
1545                    sorted.len() - 1,
1546                    exec_idx
1547                );
1548            }
1549        }
1550
1551        /// Property 15: Operation ID uniqueness (with exception for START+completion)
1552        /// For any batch, each operation_id SHALL appear at most once, except for STEP/CONTEXT
1553        /// operations which may have both START and completion in the same batch
1554        /// Feature: rust-sdk-test-suite, Property 15: Operation ID uniqueness
1555        /// Validates: Requirements 8.3
1556        #[test]
1557        fn prop_operation_id_uniqueness_with_start_completion_exception(
1558            batch in same_operation_batch_strategy()
1559        ) {
1560            // Skip empty batches
1561            if batch.is_empty() {
1562                return Ok(());
1563            }
1564
1565            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1566
1567            // Track operation_id occurrences with their actions
1568            let mut seen: std::collections::HashMap<String, Vec<OperationAction>> = std::collections::HashMap::new();
1569
1570            for req in &sorted {
1571                seen.entry(req.operation.operation_id.clone())
1572                    .or_default()
1573                    .push(req.operation.action);
1574            }
1575
1576            // Verify uniqueness rules
1577            for (op_id, actions) in &seen {
1578                if actions.len() > 1 {
1579                    // Multiple occurrences are only allowed for START + completion
1580                    let has_start = actions.contains(&OperationAction::Start);
1581                    let has_completion = actions.iter().any(|a| {
1582                        matches!(a, OperationAction::Succeed | OperationAction::Fail | OperationAction::Retry)
1583                    });
1584
1585                    prop_assert!(
1586                        has_start && has_completion && actions.len() == 2,
1587                        "Operation {} appears {} times with actions {:?}. \
1588                         Multiple occurrences only allowed for START + completion pair.",
1589                        op_id,
1590                        actions.len(),
1591                        actions
1592                    );
1593                }
1594            }
1595        }
1596
1597        /// Property 15 (continued): For same operation, START must come before completion
1598        /// Feature: rust-sdk-test-suite, Property 15: START before completion ordering
1599        /// Validates: Requirements 8.3
1600        #[test]
1601        fn prop_start_before_completion_for_same_operation(
1602            batch in same_operation_batch_strategy()
1603        ) {
1604            // Skip empty batches
1605            if batch.is_empty() {
1606                return Ok(());
1607            }
1608
1609            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1610
1611            // Find START and completion positions for each operation
1612            let mut start_positions: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
1613            let mut completion_positions: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
1614
1615            for (idx, req) in sorted.iter().enumerate() {
1616                let op_id = &req.operation.operation_id;
1617                match req.operation.action {
1618                    OperationAction::Start => {
1619                        start_positions.insert(op_id.clone(), idx);
1620                    }
1621                    OperationAction::Succeed | OperationAction::Fail | OperationAction::Retry => {
1622                        completion_positions.insert(op_id.clone(), idx);
1623                    }
1624                    _ => {}
1625                }
1626            }
1627
1628            // Verify START comes before completion for each operation
1629            for (op_id, start_idx) in &start_positions {
1630                if let Some(completion_idx) = completion_positions.get(op_id) {
1631                    prop_assert!(
1632                        start_idx < completion_idx,
1633                        "For operation {}, START at index {} should come before completion at index {}",
1634                        op_id,
1635                        start_idx,
1636                        completion_idx
1637                    );
1638                }
1639            }
1640        }
1641    }
1642
1643    // ============================================================================
1644    // Property Test for Batch Round-Trip Preservation
1645    // Feature: rust-sdk-test-suite
1646    // ============================================================================
1647
1648    /// Strategy for generating a random batch of operations
1649    fn random_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1650        prop::collection::vec(
1651            (
1652                operation_id_strategy(),
1653                operation_type_strategy(),
1654                operation_action_strategy(),
1655            ),
1656            1..10,
1657        )
1658        .prop_map(|ops| {
1659            ops.into_iter()
1660                .filter(|(_, op_type, _)| *op_type != OperationType::Execution) // Avoid execution complications
1661                .map(|(id, op_type, action)| create_checkpoint_request(&id, op_type, action, None))
1662                .collect()
1663        })
1664    }
1665
1666    proptest! {
1667        #![proptest_config(ProptestConfig::with_cases(100))]
1668
1669        /// Property: Batch sorting preserves all operations
1670        /// For any valid operation update sequence, sorting SHALL preserve all operations
1671        /// Feature: rust-sdk-test-suite, Property: Batch preservation
1672        /// Validates: Requirements 8.4
1673        #[test]
1674        fn prop_batch_sorting_preserves_all_operations(batch in random_batch_strategy()) {
1675            // Skip empty batches
1676            if batch.is_empty() {
1677                return Ok(());
1678            }
1679
1680            // Collect original operation IDs and actions
1681            let original_ops: HashSet<(String, String)> = batch.iter()
1682                .map(|r| (r.operation.operation_id.clone(), format!("{:?}", r.operation.action)))
1683                .collect();
1684
1685            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1686
1687            // Collect sorted operation IDs and actions
1688            let sorted_ops: HashSet<(String, String)> = sorted.iter()
1689                .map(|r| (r.operation.operation_id.clone(), format!("{:?}", r.operation.action)))
1690                .collect();
1691
1692            // Verify all operations are preserved
1693            prop_assert_eq!(
1694                original_ops.len(),
1695                sorted_ops.len(),
1696                "Sorting changed the number of operations: original {}, sorted {}",
1697                original_ops.len(),
1698                sorted_ops.len()
1699            );
1700
1701            prop_assert_eq!(
1702                original_ops,
1703                sorted_ops,
1704                "Sorting changed the set of operations"
1705            );
1706        }
1707
1708        /// Property: Sorting is idempotent
1709        /// Sorting an already sorted batch should produce the same result
1710        /// Feature: rust-sdk-test-suite, Property: Sorting idempotence
1711        #[test]
1712        fn prop_sorting_is_idempotent(batch in random_batch_strategy()) {
1713            // Skip empty batches
1714            if batch.is_empty() {
1715                return Ok(());
1716            }
1717
1718            let sorted_once = CheckpointBatcher::sort_checkpoint_batch(batch);
1719
1720            // Clone the sorted batch for second sort
1721            let sorted_once_clone: Vec<CheckpointRequest> = sorted_once.iter()
1722                .map(|r| CheckpointRequest::async_request(r.operation.clone()))
1723                .collect();
1724
1725            let sorted_twice = CheckpointBatcher::sort_checkpoint_batch(sorted_once_clone);
1726
1727            // Verify the order is the same
1728            prop_assert_eq!(
1729                sorted_once.len(),
1730                sorted_twice.len(),
1731                "Double sorting changed the number of operations"
1732            );
1733
1734            for (idx, (first, second)) in sorted_once.iter().zip(sorted_twice.iter()).enumerate() {
1735                prop_assert_eq!(
1736                    &first.operation.operation_id,
1737                    &second.operation.operation_id,
1738                    "Operation ID mismatch at index {}: {} vs {}",
1739                    idx,
1740                    &first.operation.operation_id,
1741                    &second.operation.operation_id
1742                );
1743                prop_assert_eq!(
1744                    first.operation.action,
1745                    second.operation.action,
1746                    "Action mismatch at index {} for operation {}: {:?} vs {:?}",
1747                    idx,
1748                    &first.operation.operation_id,
1749                    first.operation.action,
1750                    second.operation.action
1751                );
1752            }
1753        }
1754    }
1755}