Skip to main content

durable_execution_sdk/state/
batcher.rs

1//! Checkpoint batching for efficient API calls.
2//!
3//! This module provides the checkpoint batching system that collects checkpoint
4//! requests and sends them in batches to reduce API calls to the Lambda service.
5//!
6//! ## Checkpoint Token Management
7//!
8//! The batcher manages checkpoint tokens according to the following rules:
9//!
10//! 1. **First Checkpoint**: Uses the initial `CheckpointToken` from the Lambda invocation input
11//! 2. **Subsequent Checkpoints**: Uses the token returned from the previous checkpoint response
12//! 3. **Token Consumption**: Each token can only be used once; the batcher automatically
13//!    updates to the new token after each successful checkpoint
14//! 4. **Error Handling**: If a checkpoint fails with "Invalid checkpoint token", the error
15//!    is marked as retriable so Lambda can retry with a fresh token
16//!
17//! ## Requirements
18//!
19//! - 2.9: THE Checkpointing_System SHALL use the CheckpointToken from invocation input for the first checkpoint
20//! - 2.10: THE Checkpointing_System SHALL use the returned CheckpointToken from each checkpoint response for subsequent checkpoints
21//! - 2.11: THE Checkpointing_System SHALL handle InvalidParameterValueException for invalid tokens by allowing propagation for retry
22//! - 2.12: WHEN batching operations, THE Checkpointing_System SHALL checkpoint in execution order with EXECUTION completion last
23//! - 2.13: THE Checkpointing_System SHALL support including both START and completion actions for STEP/CONTEXT in the same batch
24
25use std::collections::{HashMap, HashSet};
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::Arc;
28use std::time::Duration as StdDuration;
29
30use tokio::sync::{mpsc, oneshot, RwLock};
31use tokio::time::{timeout, Instant};
32
33use crate::client::SharedDurableServiceClient;
34use crate::error::DurableError;
35use crate::operation::{OperationAction, OperationType, OperationUpdate};
36
37/// Configuration for the checkpoint batcher.
38#[derive(Debug, Clone)]
39pub struct CheckpointBatcherConfig {
40    /// Maximum size of a batch in bytes (default: 750KB)
41    pub max_batch_size_bytes: usize,
42    /// Maximum time to wait before sending a batch (default: 1 second)
43    pub max_batch_time_ms: u64,
44    /// Maximum number of operations per batch (default: unlimited/usize::MAX)
45    pub max_batch_operations: usize,
46}
47
48impl Default for CheckpointBatcherConfig {
49    fn default() -> Self {
50        Self {
51            max_batch_size_bytes: 750 * 1024, // 750KB
52            max_batch_time_ms: 1000,          // 1 second
53            max_batch_operations: usize::MAX, // unlimited
54        }
55    }
56}
57
58/// A request to checkpoint an operation.
59///
60/// This struct is sent through the checkpoint queue to the batcher.
61/// It includes an optional completion channel for synchronous checkpoints.
62#[derive(Debug)]
63pub struct CheckpointRequest {
64    /// The operation update to checkpoint
65    pub operation: OperationUpdate,
66    /// Optional channel to signal completion (for sync checkpoints)
67    pub completion: Option<oneshot::Sender<Result<(), DurableError>>>,
68}
69
70impl CheckpointRequest {
71    /// Creates a new synchronous checkpoint request.
72    ///
73    /// Returns the request and a receiver to wait for completion.
74    pub fn sync(operation: OperationUpdate) -> (Self, oneshot::Receiver<Result<(), DurableError>>) {
75        let (tx, rx) = oneshot::channel();
76        (
77            Self {
78                operation,
79                completion: Some(tx),
80            },
81            rx,
82        )
83    }
84
85    /// Creates a new asynchronous (fire-and-forget) checkpoint request.
86    pub fn async_request(operation: OperationUpdate) -> Self {
87        Self {
88            operation,
89            completion: None,
90        }
91    }
92
93    /// Returns true if this is a synchronous request.
94    pub fn is_sync(&self) -> bool {
95        self.completion.is_some()
96    }
97
98    /// Estimates the size of this request in bytes for batching.
99    pub fn estimated_size(&self) -> usize {
100        // Estimate based on JSON serialization
101        // operation_id + operation_type + action + result + error + parent_id + name
102        let base_size = 100; // Base overhead for JSON structure
103        let op = &self.operation;
104
105        base_size
106            + op.operation_id.len()
107            + op.result.as_ref().map(|r| r.len()).unwrap_or(0)
108            + op.error
109                .as_ref()
110                .map(|e| e.error_message.len() + e.error_type.len())
111                .unwrap_or(0)
112            + op.parent_id.as_ref().map(|p| p.len()).unwrap_or(0)
113            + op.name.as_ref().map(|n| n.len()).unwrap_or(0)
114    }
115}
116
117/// Result of processing a batch of checkpoints.
118#[derive(Debug)]
119pub struct BatchResult {
120    /// Whether the batch was successfully sent
121    pub success: bool,
122    /// The new checkpoint token if successful
123    pub new_token: Option<String>,
124    /// Error if the batch failed
125    pub error: Option<DurableError>,
126}
127
128/// The checkpoint batcher collects checkpoint requests and sends them in batches.
129///
130/// This improves efficiency by reducing the number of API calls to the Lambda service.
131/// The batcher sends a batch when any of these conditions are met:
132/// - The batch reaches the maximum size in bytes
133/// - The batch reaches the maximum number of operations
134/// - The maximum batch time has elapsed
135pub struct CheckpointBatcher {
136    /// Configuration for batching behavior
137    config: CheckpointBatcherConfig,
138    /// Receiver for checkpoint requests
139    queue_rx: mpsc::Receiver<CheckpointRequest>,
140    /// Service client for sending checkpoints
141    service_client: SharedDurableServiceClient,
142    /// Reference to the execution state for updating tokens
143    durable_execution_arn: String,
144    /// Current checkpoint token (shared with ExecutionState)
145    checkpoint_token: Arc<RwLock<String>>,
146    /// Tracks whether the initial token has been consumed
147    initial_token_consumed: AtomicBool,
148}
149
150impl CheckpointBatcher {
151    /// Creates a new CheckpointBatcher.
152    ///
153    /// # Arguments
154    ///
155    /// * `config` - Configuration for batching behavior
156    /// * `queue_rx` - Receiver for checkpoint requests
157    /// * `service_client` - Service client for sending checkpoints
158    /// * `durable_execution_arn` - The ARN of the durable execution
159    /// * `checkpoint_token` - The initial checkpoint token from Lambda invocation input
160    pub fn new(
161        config: CheckpointBatcherConfig,
162        queue_rx: mpsc::Receiver<CheckpointRequest>,
163        service_client: SharedDurableServiceClient,
164        durable_execution_arn: String,
165        checkpoint_token: Arc<RwLock<String>>,
166    ) -> Self {
167        Self {
168            config,
169            queue_rx,
170            service_client,
171            durable_execution_arn,
172            checkpoint_token,
173            initial_token_consumed: AtomicBool::new(false),
174        }
175    }
176
177    /// Runs the batcher loop, processing checkpoint requests.
178    ///
179    /// This method runs until the queue is closed (sender dropped).
180    pub async fn run(&mut self) {
181        loop {
182            let batch = self.collect_batch().await;
183            if batch.is_empty() {
184                // Queue closed, exit
185                break;
186            }
187            self.process_batch(batch).await;
188        }
189    }
190
191    /// Collects a batch of checkpoint requests.
192    ///
193    /// Returns when any batch limit is reached or the queue is closed.
194    async fn collect_batch(&mut self) -> Vec<CheckpointRequest> {
195        let mut batch = Vec::new();
196        let mut batch_size = 0usize;
197        let batch_deadline =
198            Instant::now() + StdDuration::from_millis(self.config.max_batch_time_ms);
199
200        // Wait for the first request (blocking)
201        match self.queue_rx.recv().await {
202            Some(request) => {
203                batch_size += request.estimated_size();
204                batch.push(request);
205            }
206            None => return batch, // Queue closed
207        }
208
209        // Collect more requests until limits are reached
210        loop {
211            // Check if we've hit operation count limit
212            if batch.len() >= self.config.max_batch_operations {
213                break;
214            }
215
216            // Calculate remaining time until deadline
217            let now = Instant::now();
218            if now >= batch_deadline {
219                break;
220            }
221            let remaining = batch_deadline - now;
222
223            // Try to receive more requests with timeout
224            match timeout(remaining, self.queue_rx.recv()).await {
225                Ok(Some(request)) => {
226                    let request_size = request.estimated_size();
227
228                    // Check if adding this request would exceed size limit
229                    if batch_size + request_size > self.config.max_batch_size_bytes
230                        && !batch.is_empty()
231                    {
232                        // Include this request and break - next batch will handle any overflow
233                        batch.push(request);
234                        break;
235                    }
236
237                    batch_size += request_size;
238                    batch.push(request);
239                }
240                Ok(None) => break, // Queue closed
241                Err(_) => break,   // Timeout reached
242            }
243        }
244
245        batch
246    }
247
248    /// Sorts checkpoint requests according to the ordering rules.
249    ///
250    /// The ordering rules are:
251    /// 1. Operations are checkpointed in execution order (preserving original order)
252    /// 2. EXECUTION completion (SUCCEED/FAIL on EXECUTION type) must be last in the batch
253    /// 3. Child operations must come after their parent CONTEXT starts
254    /// 4. START and completion (SUCCEED/FAIL) for the same operation can be in the same batch
255    pub fn sort_checkpoint_batch(batch: Vec<CheckpointRequest>) -> Vec<CheckpointRequest> {
256        if batch.len() <= 1 {
257            return batch;
258        }
259
260        // Step 1: Identify CONTEXT START operations and build parent-child relationships
261        let context_starts: HashSet<String> = batch
262            .iter()
263            .filter(|req| {
264                req.operation.operation_type == OperationType::Context
265                    && req.operation.action == OperationAction::Start
266            })
267            .map(|req| req.operation.operation_id.clone())
268            .collect();
269
270        // Build a map of operation_id -> parent_id for operations in this batch
271        let parent_map: HashMap<String, Option<String>> = batch
272            .iter()
273            .map(|req| {
274                (
275                    req.operation.operation_id.clone(),
276                    req.operation.parent_id.clone(),
277                )
278            })
279            .collect();
280
281        // Helper function to check if an operation is an ancestor of another
282        let is_ancestor = |operation_id: &str, ancestor_id: &str| -> bool {
283            let mut current = parent_map.get(operation_id).and_then(|p| p.as_ref());
284            while let Some(parent_id) = current {
285                if parent_id == ancestor_id {
286                    return true;
287                }
288                current = parent_map.get(parent_id).and_then(|p| p.as_ref());
289            }
290            false
291        };
292
293        // Step 2: Sort with custom comparator
294        let mut indexed_batch: Vec<(usize, CheckpointRequest)> =
295            batch.into_iter().enumerate().collect();
296
297        indexed_batch.sort_by(|(idx_a, a), (idx_b, b)| {
298            // Priority 1: EXECUTION completion must be last
299            let a_is_exec_completion = a.operation.operation_type == OperationType::Execution
300                && matches!(
301                    a.operation.action,
302                    OperationAction::Succeed | OperationAction::Fail
303                );
304            let b_is_exec_completion = b.operation.operation_type == OperationType::Execution
305                && matches!(
306                    b.operation.action,
307                    OperationAction::Succeed | OperationAction::Fail
308                );
309
310            if a_is_exec_completion && !b_is_exec_completion {
311                return std::cmp::Ordering::Greater;
312            }
313            if !a_is_exec_completion && b_is_exec_completion {
314                return std::cmp::Ordering::Less;
315            }
316
317            // Priority 2: Parent CONTEXT START must come before child operations
318            if b.operation.action == OperationAction::Start
319                && context_starts.contains(&b.operation.operation_id)
320            {
321                if let Some(ref parent_id) = a.operation.parent_id {
322                    if *parent_id == b.operation.operation_id
323                        || is_ancestor(&a.operation.operation_id, &b.operation.operation_id)
324                    {
325                        return std::cmp::Ordering::Greater;
326                    }
327                }
328            }
329            if a.operation.action == OperationAction::Start
330                && context_starts.contains(&a.operation.operation_id)
331            {
332                if let Some(ref parent_id) = b.operation.parent_id {
333                    if *parent_id == a.operation.operation_id
334                        || is_ancestor(&b.operation.operation_id, &a.operation.operation_id)
335                    {
336                        return std::cmp::Ordering::Less;
337                    }
338                }
339            }
340
341            // Priority 3: For same operation_id, START comes before completion
342            if a.operation.operation_id == b.operation.operation_id {
343                let a_is_start = a.operation.action == OperationAction::Start;
344                let b_is_start = b.operation.action == OperationAction::Start;
345                if a_is_start && !b_is_start {
346                    return std::cmp::Ordering::Less;
347                }
348                if !a_is_start && b_is_start {
349                    return std::cmp::Ordering::Greater;
350                }
351            }
352
353            // Priority 4: Preserve original order (stable sort)
354            idx_a.cmp(idx_b)
355        });
356
357        // Extract the sorted requests
358        indexed_batch.into_iter().map(|(_, req)| req).collect()
359    }
360
361    /// Processes a batch of checkpoint requests.
362    async fn process_batch(&self, batch: Vec<CheckpointRequest>) {
363        if batch.is_empty() {
364            return;
365        }
366
367        // Sort the batch according to checkpoint ordering rules
368        let sorted_batch = Self::sort_checkpoint_batch(batch);
369
370        // Extract operations and completion channels
371        let (operations, completions): (Vec<_>, Vec<_>) = sorted_batch
372            .into_iter()
373            .map(|req| (req.operation, req.completion))
374            .unzip();
375
376        // Get current checkpoint token
377        let token = self.checkpoint_token.read().await.clone();
378
379        // Send the batch to the service with retry for throttling
380        let result = self.checkpoint_with_retry(&token, operations).await;
381
382        // Handle the result
383        match result {
384            Ok(response) => {
385                // Mark that we've consumed the initial token.
386                // Release ordering ensures that the checkpoint_token write (via RwLock)
387                // is visible to any thread that subsequently reads initial_token_consumed.
388                // This is a one-way transition (false -> true) that never reverts.
389                //
390                // Requirements: 4.4, 4.6
391                self.initial_token_consumed.store(true, Ordering::Release);
392
393                // Update the checkpoint token with the new token from the response
394                {
395                    let mut token_guard = self.checkpoint_token.write().await;
396                    *token_guard = response.checkpoint_token;
397                }
398
399                // Signal success to all waiting callers
400                for completion in completions.into_iter().flatten() {
401                    let _ = completion.send(Ok(()));
402                }
403            }
404            Err(error) => {
405                // Check if this is an invalid checkpoint token error
406                let is_invalid_token = error.is_invalid_checkpoint_token();
407
408                // Signal failure to all waiting callers
409                for completion in completions.into_iter().flatten() {
410                    let error_msg = if is_invalid_token {
411                        format!(
412                            "Invalid checkpoint token - token may have been consumed. \
413                             Lambda will retry with a fresh token. Original error: {}",
414                            error
415                        )
416                    } else {
417                        error.to_string()
418                    };
419
420                    let _ = completion.send(Err(DurableError::Checkpoint {
421                        message: error_msg,
422                        is_retriable: error.is_retriable(),
423                        aws_error: None,
424                    }));
425                }
426            }
427        }
428    }
429
430    /// Sends a checkpoint request with retry for throttling errors.
431    ///
432    /// Since the `checkpoint` trait method takes `Vec<OperationUpdate>` by value,
433    /// a clone is required for each retry attempt. The clone is only performed
434    /// when actually retrying, not on the final (successful or non-throttled) attempt.
435    async fn checkpoint_with_retry(
436        &self,
437        token: &str,
438        operations: Vec<OperationUpdate>,
439    ) -> Result<crate::client::CheckpointResponse, DurableError> {
440        const MAX_RETRIES: u32 = 5;
441        const INITIAL_DELAY_MS: u64 = 100;
442        const MAX_DELAY_MS: u64 = 10_000;
443        const BACKOFF_MULTIPLIER: u64 = 2;
444
445        let mut attempt = 0;
446        let mut delay_ms = INITIAL_DELAY_MS;
447
448        loop {
449            let result = self
450                .service_client
451                .checkpoint(&self.durable_execution_arn, token, operations.clone())
452                .await;
453
454            match result {
455                Ok(response) => return Ok(response),
456                Err(error) if error.is_throttling() => {
457                    attempt += 1;
458                    if attempt > MAX_RETRIES {
459                        tracing::warn!(
460                            attempt = attempt,
461                            "Checkpoint throttling: max retries exceeded"
462                        );
463                        return Err(error);
464                    }
465
466                    let actual_delay = error.get_retry_after_ms().unwrap_or(delay_ms);
467
468                    tracing::debug!(
469                        attempt = attempt,
470                        delay_ms = actual_delay,
471                        "Checkpoint throttled, retrying with exponential backoff"
472                    );
473
474                    tokio::time::sleep(StdDuration::from_millis(actual_delay)).await;
475                    delay_ms = (delay_ms * BACKOFF_MULTIPLIER).min(MAX_DELAY_MS);
476                }
477                Err(error) => return Err(error),
478            }
479        }
480    }
481}
482
483/// Handle for sending checkpoint requests to the batcher.
484#[derive(Clone)]
485pub struct CheckpointSender {
486    /// Channel for sending requests to the batcher
487    pub tx: mpsc::Sender<CheckpointRequest>,
488}
489
490impl CheckpointSender {
491    /// Creates a new CheckpointSender.
492    pub fn new(tx: mpsc::Sender<CheckpointRequest>) -> Self {
493        Self { tx }
494    }
495
496    /// Sends a synchronous checkpoint request and waits for completion.
497    ///
498    /// This method blocks until the checkpoint is confirmed or fails.
499    pub async fn checkpoint_sync(&self, operation: OperationUpdate) -> Result<(), DurableError> {
500        let (request, rx) = CheckpointRequest::sync(operation);
501
502        self.tx
503            .send(request)
504            .await
505            .map_err(|_| DurableError::Checkpoint {
506                message: "Checkpoint queue closed".to_string(),
507                is_retriable: false,
508                aws_error: None,
509            })?;
510
511        rx.await.map_err(|_| DurableError::Checkpoint {
512            message: "Checkpoint completion channel closed".to_string(),
513            is_retriable: false,
514            aws_error: None,
515        })?
516    }
517
518    /// Sends an asynchronous checkpoint request (fire-and-forget).
519    ///
520    /// This method returns immediately without waiting for confirmation.
521    pub async fn checkpoint_async(&self, operation: OperationUpdate) -> Result<(), DurableError> {
522        let request = CheckpointRequest::async_request(operation);
523
524        self.tx
525            .send(request)
526            .await
527            .map_err(|_| DurableError::Checkpoint {
528                message: "Checkpoint queue closed".to_string(),
529                is_retriable: false,
530                aws_error: None,
531            })
532    }
533
534    /// Sends a checkpoint request with configurable sync/async behavior.
535    pub async fn checkpoint(
536        &self,
537        operation: OperationUpdate,
538        is_sync: bool,
539    ) -> Result<(), DurableError> {
540        if is_sync {
541            self.checkpoint_sync(operation).await
542        } else {
543            self.checkpoint_async(operation).await
544        }
545    }
546}
547
548/// Creates a checkpoint queue with the given buffer size.
549///
550/// Returns a sender for submitting checkpoint requests and a receiver
551/// for the batcher to process them.
552pub fn create_checkpoint_queue(
553    buffer_size: usize,
554) -> (CheckpointSender, mpsc::Receiver<CheckpointRequest>) {
555    let (tx, rx) = mpsc::channel(buffer_size);
556    (CheckpointSender::new(tx), rx)
557}
558
559#[cfg(test)]
560mod tests {
561    use super::*;
562    use crate::client::{CheckpointResponse, MockDurableServiceClient};
563    use crate::error::ErrorObject;
564
565    fn create_test_update(id: &str) -> OperationUpdate {
566        OperationUpdate::start(id, OperationType::Step)
567    }
568
569    fn create_start_update(id: &str, op_type: OperationType) -> OperationUpdate {
570        OperationUpdate::start(id, op_type)
571    }
572
573    fn create_succeed_update(id: &str, op_type: OperationType) -> OperationUpdate {
574        OperationUpdate::succeed(id, op_type, Some("result".to_string()))
575    }
576
577    fn create_fail_update(id: &str, op_type: OperationType) -> OperationUpdate {
578        OperationUpdate::fail(id, op_type, ErrorObject::new("Error", "message"))
579    }
580
581    fn create_request(update: OperationUpdate) -> CheckpointRequest {
582        CheckpointRequest::async_request(update)
583    }
584
585    // Checkpoint request tests
586    #[test]
587    fn test_checkpoint_request_sync() {
588        let update = create_test_update("op-1");
589        let (request, _rx) = CheckpointRequest::sync(update);
590
591        assert!(request.is_sync());
592        assert_eq!(request.operation.operation_id, "op-1");
593    }
594
595    #[test]
596    fn test_checkpoint_request_async() {
597        let update = create_test_update("op-1");
598        let request = CheckpointRequest::async_request(update);
599
600        assert!(!request.is_sync());
601        assert_eq!(request.operation.operation_id, "op-1");
602    }
603
604    #[test]
605    fn test_checkpoint_request_estimated_size() {
606        let update = create_test_update("op-1");
607        let request = CheckpointRequest::async_request(update);
608
609        let size = request.estimated_size();
610        assert!(size > 0);
611        assert!(size >= 100);
612    }
613
614    #[test]
615    fn test_checkpoint_request_estimated_size_with_result() {
616        let mut update = create_test_update("op-1");
617        update.result = Some("a".repeat(1000));
618        let request = CheckpointRequest::async_request(update);
619
620        let size = request.estimated_size();
621        assert!(size >= 1100);
622    }
623
624    #[test]
625    fn test_create_checkpoint_queue() {
626        let (sender, _rx) = create_checkpoint_queue(100);
627        drop(sender);
628    }
629
630    #[tokio::test]
631    async fn test_checkpoint_sender_sync() {
632        let (sender, mut rx) = create_checkpoint_queue(10);
633
634        let handle = tokio::spawn(async move {
635            if let Some(request) = rx.recv().await {
636                assert!(request.is_sync());
637                if let Some(completion) = request.completion {
638                    let _ = completion.send(Ok(()));
639                }
640            }
641        });
642
643        let update = create_test_update("op-1");
644        let result = sender.checkpoint_sync(update).await;
645        assert!(result.is_ok());
646
647        handle.await.unwrap();
648    }
649
650    #[tokio::test]
651    async fn test_checkpoint_sender_async() {
652        let (sender, mut rx) = create_checkpoint_queue(10);
653
654        let update = create_test_update("op-1");
655        let result = sender.checkpoint_async(update).await;
656        assert!(result.is_ok());
657
658        let request = rx.recv().await.unwrap();
659        assert!(!request.is_sync());
660        assert_eq!(request.operation.operation_id, "op-1");
661    }
662
663    #[tokio::test]
664    async fn test_checkpoint_sender_queue_closed() {
665        let (sender, rx) = create_checkpoint_queue(10);
666        drop(rx);
667
668        let update = create_test_update("op-1");
669        let result = sender.checkpoint_async(update).await;
670        assert!(result.is_err());
671    }
672
673    #[tokio::test]
674    async fn test_checkpoint_batcher_processes_batch() {
675        let client = Arc::new(
676            MockDurableServiceClient::new()
677                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
678        );
679
680        let (sender, rx) = create_checkpoint_queue(10);
681        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
682
683        let config = CheckpointBatcherConfig {
684            max_batch_time_ms: 10,
685            ..Default::default()
686        };
687
688        let mut batcher = CheckpointBatcher::new(
689            config,
690            rx,
691            client,
692            "arn:test".to_string(),
693            checkpoint_token.clone(),
694        );
695
696        let update = create_test_update("op-1");
697        let (request, completion_rx) = CheckpointRequest::sync(update);
698        sender.tx.send(request).await.unwrap();
699
700        drop(sender);
701        batcher.run().await;
702
703        let result = completion_rx.await.unwrap();
704        assert!(result.is_ok());
705
706        let token = checkpoint_token.read().await;
707        assert_eq!(*token, "new-token");
708    }
709
710    #[tokio::test]
711    async fn test_checkpoint_batcher_handles_error() {
712        let client = Arc::new(
713            MockDurableServiceClient::new()
714                .with_checkpoint_response(Err(DurableError::checkpoint_retriable("Test error"))),
715        );
716
717        let (sender, rx) = create_checkpoint_queue(10);
718        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
719
720        let config = CheckpointBatcherConfig {
721            max_batch_time_ms: 10,
722            ..Default::default()
723        };
724
725        let mut batcher = CheckpointBatcher::new(
726            config,
727            rx,
728            client,
729            "arn:test".to_string(),
730            checkpoint_token.clone(),
731        );
732
733        let update = create_test_update("op-1");
734        let (request, completion_rx) = CheckpointRequest::sync(update);
735        sender.tx.send(request).await.unwrap();
736
737        drop(sender);
738        batcher.run().await;
739
740        let result = completion_rx.await.unwrap();
741        assert!(result.is_err());
742
743        let token = checkpoint_token.read().await;
744        assert_eq!(*token, "initial-token");
745    }
746
747    #[tokio::test]
748    async fn test_checkpoint_batcher_batches_multiple_requests() {
749        let client = Arc::new(
750            MockDurableServiceClient::new()
751                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
752        );
753
754        let (sender, rx) = create_checkpoint_queue(10);
755        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
756
757        let config = CheckpointBatcherConfig {
758            max_batch_time_ms: 50,
759            max_batch_operations: 3,
760            ..Default::default()
761        };
762
763        let mut batcher = CheckpointBatcher::new(
764            config,
765            rx,
766            client,
767            "arn:test".to_string(),
768            checkpoint_token.clone(),
769        );
770
771        for i in 0..3 {
772            let update = create_test_update(&format!("op-{}", i));
773            sender
774                .tx
775                .send(CheckpointRequest::async_request(update))
776                .await
777                .unwrap();
778        }
779
780        drop(sender);
781        batcher.run().await;
782
783        let token = checkpoint_token.read().await;
784        assert_eq!(*token, "new-token");
785    }
786
787    // Batch ordering tests
788    #[test]
789    fn test_execution_completion_is_last() {
790        let batch = vec![
791            create_request(create_succeed_update("exec-1", OperationType::Execution)),
792            create_request(create_start_update("step-1", OperationType::Step)),
793            create_request(create_succeed_update("step-2", OperationType::Step)),
794        ];
795
796        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
797
798        assert_eq!(sorted.len(), 3);
799        assert_eq!(sorted[2].operation.operation_id, "exec-1");
800        assert_eq!(sorted[2].operation.action, OperationAction::Succeed);
801        assert_eq!(sorted[2].operation.operation_type, OperationType::Execution);
802    }
803
804    #[test]
805    fn test_execution_fail_is_last() {
806        let batch = vec![
807            create_request(create_fail_update("exec-1", OperationType::Execution)),
808            create_request(create_start_update("step-1", OperationType::Step)),
809        ];
810
811        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
812
813        assert_eq!(sorted.len(), 2);
814        assert_eq!(sorted[1].operation.operation_id, "exec-1");
815        assert_eq!(sorted[1].operation.action, OperationAction::Fail);
816    }
817
818    #[test]
819    fn test_child_after_parent_context_start() {
820        let mut child_update = create_start_update("child-1", OperationType::Step);
821        child_update.parent_id = Some("parent-ctx".to_string());
822
823        let batch = vec![
824            create_request(child_update),
825            create_request(create_start_update("parent-ctx", OperationType::Context)),
826        ];
827
828        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
829
830        assert_eq!(sorted.len(), 2);
831        assert_eq!(sorted[0].operation.operation_id, "parent-ctx");
832        assert_eq!(sorted[0].operation.action, OperationAction::Start);
833        assert_eq!(sorted[1].operation.operation_id, "child-1");
834    }
835
836    #[test]
837    fn test_start_and_completion_same_operation() {
838        let batch = vec![
839            create_request(create_succeed_update("step-1", OperationType::Step)),
840            create_request(create_start_update("step-1", OperationType::Step)),
841        ];
842
843        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
844
845        assert_eq!(sorted.len(), 2);
846        assert_eq!(sorted[0].operation.operation_id, "step-1");
847        assert_eq!(sorted[0].operation.action, OperationAction::Start);
848        assert_eq!(sorted[1].operation.operation_id, "step-1");
849        assert_eq!(sorted[1].operation.action, OperationAction::Succeed);
850    }
851
852    #[test]
853    fn test_context_start_and_completion_same_batch() {
854        let batch = vec![
855            create_request(create_succeed_update("ctx-1", OperationType::Context)),
856            create_request(create_start_update("ctx-1", OperationType::Context)),
857        ];
858
859        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
860
861        assert_eq!(sorted.len(), 2);
862        assert_eq!(sorted[0].operation.operation_id, "ctx-1");
863        assert_eq!(sorted[0].operation.action, OperationAction::Start);
864        assert_eq!(sorted[1].operation.operation_id, "ctx-1");
865        assert_eq!(sorted[1].operation.action, OperationAction::Succeed);
866    }
867
868    #[test]
869    fn test_step_start_and_fail_same_batch() {
870        let batch = vec![
871            create_request(create_fail_update("step-1", OperationType::Step)),
872            create_request(create_start_update("step-1", OperationType::Step)),
873        ];
874
875        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
876
877        assert_eq!(sorted.len(), 2);
878        assert_eq!(sorted[0].operation.operation_id, "step-1");
879        assert_eq!(sorted[0].operation.action, OperationAction::Start);
880        assert_eq!(sorted[1].operation.operation_id, "step-1");
881        assert_eq!(sorted[1].operation.action, OperationAction::Fail);
882    }
883
884    #[test]
885    fn test_complex_ordering_scenario() {
886        let mut child1 = create_start_update("child-1", OperationType::Step);
887        child1.parent_id = Some("parent-ctx".to_string());
888
889        let mut child2 = create_succeed_update("child-2", OperationType::Step);
890        child2.parent_id = Some("parent-ctx".to_string());
891
892        let batch = vec![
893            create_request(create_succeed_update("exec-1", OperationType::Execution)),
894            create_request(child1),
895            create_request(create_succeed_update("parent-ctx", OperationType::Context)),
896            create_request(create_start_update("parent-ctx", OperationType::Context)),
897            create_request(child2),
898        ];
899
900        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
901
902        assert_eq!(sorted.len(), 5);
903
904        let parent_start_pos = sorted
905            .iter()
906            .position(|r| {
907                r.operation.operation_id == "parent-ctx"
908                    && r.operation.action == OperationAction::Start
909            })
910            .unwrap();
911        let parent_succeed_pos = sorted
912            .iter()
913            .position(|r| {
914                r.operation.operation_id == "parent-ctx"
915                    && r.operation.action == OperationAction::Succeed
916            })
917            .unwrap();
918        let child1_pos = sorted
919            .iter()
920            .position(|r| r.operation.operation_id == "child-1")
921            .unwrap();
922        let child2_pos = sorted
923            .iter()
924            .position(|r| r.operation.operation_id == "child-2")
925            .unwrap();
926        let exec_pos = sorted
927            .iter()
928            .position(|r| r.operation.operation_id == "exec-1")
929            .unwrap();
930
931        assert!(parent_start_pos < parent_succeed_pos);
932        assert!(parent_start_pos < child1_pos);
933        assert!(parent_start_pos < child2_pos);
934        assert_eq!(exec_pos, 4);
935    }
936
937    #[test]
938    fn test_empty_batch() {
939        let batch: Vec<CheckpointRequest> = vec![];
940        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
941        assert!(sorted.is_empty());
942    }
943
944    #[test]
945    fn test_single_item_batch() {
946        let batch = vec![create_request(create_start_update(
947            "step-1",
948            OperationType::Step,
949        ))];
950        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
951        assert_eq!(sorted.len(), 1);
952        assert_eq!(sorted[0].operation.operation_id, "step-1");
953    }
954
955    #[test]
956    fn test_preserves_original_order() {
957        let batch = vec![
958            create_request(create_start_update("step-1", OperationType::Step)),
959            create_request(create_start_update("step-2", OperationType::Step)),
960            create_request(create_start_update("step-3", OperationType::Step)),
961        ];
962
963        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
964
965        assert_eq!(sorted.len(), 3);
966        assert_eq!(sorted[0].operation.operation_id, "step-1");
967        assert_eq!(sorted[1].operation.operation_id, "step-2");
968        assert_eq!(sorted[2].operation.operation_id, "step-3");
969    }
970
971    #[test]
972    fn test_multiple_children_same_parent() {
973        let mut child1 = create_start_update("child-1", OperationType::Step);
974        child1.parent_id = Some("parent-ctx".to_string());
975
976        let mut child2 = create_start_update("child-2", OperationType::Step);
977        child2.parent_id = Some("parent-ctx".to_string());
978
979        let mut child3 = create_start_update("child-3", OperationType::Step);
980        child3.parent_id = Some("parent-ctx".to_string());
981
982        let batch = vec![
983            create_request(child1),
984            create_request(child2),
985            create_request(create_start_update("parent-ctx", OperationType::Context)),
986            create_request(child3),
987        ];
988
989        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
990
991        assert_eq!(sorted[0].operation.operation_id, "parent-ctx");
992
993        for i in 1..4 {
994            assert!(sorted[i].operation.parent_id.as_deref() == Some("parent-ctx"));
995        }
996    }
997
998    #[test]
999    fn test_nested_contexts() {
1000        let mut parent_ctx = create_start_update("parent-ctx", OperationType::Context);
1001        parent_ctx.parent_id = Some("grandparent-ctx".to_string());
1002
1003        let mut child = create_start_update("child-1", OperationType::Step);
1004        child.parent_id = Some("parent-ctx".to_string());
1005
1006        let batch = vec![
1007            create_request(child),
1008            create_request(parent_ctx),
1009            create_request(create_start_update(
1010                "grandparent-ctx",
1011                OperationType::Context,
1012            )),
1013        ];
1014
1015        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1016
1017        let grandparent_pos = sorted
1018            .iter()
1019            .position(|r| r.operation.operation_id == "grandparent-ctx")
1020            .unwrap();
1021        let parent_pos = sorted
1022            .iter()
1023            .position(|r| r.operation.operation_id == "parent-ctx")
1024            .unwrap();
1025        let child_pos = sorted
1026            .iter()
1027            .position(|r| r.operation.operation_id == "child-1")
1028            .unwrap();
1029
1030        assert!(grandparent_pos < parent_pos);
1031        assert!(parent_pos < child_pos);
1032    }
1033
1034    #[test]
1035    fn test_execution_start_not_affected() {
1036        let batch = vec![
1037            create_request(create_start_update("step-1", OperationType::Step)),
1038            create_request(create_start_update("exec-1", OperationType::Execution)),
1039            create_request(create_start_update("step-2", OperationType::Step)),
1040        ];
1041
1042        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1043
1044        assert_eq!(sorted.len(), 3);
1045        assert_eq!(sorted[0].operation.operation_id, "step-1");
1046        assert_eq!(sorted[1].operation.operation_id, "exec-1");
1047        assert_eq!(sorted[2].operation.operation_id, "step-2");
1048    }
1049}
1050
1051#[cfg(test)]
1052mod property_tests {
1053    use super::*;
1054    use crate::client::{CheckpointResponse, DurableServiceClient, GetOperationsResponse};
1055    use async_trait::async_trait;
1056    use proptest::prelude::*;
1057    use std::collections::HashSet;
1058    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1059
1060    struct CountingMockClient {
1061        checkpoint_count: Arc<AtomicUsize>,
1062    }
1063
1064    impl CountingMockClient {
1065        fn new() -> (Self, Arc<AtomicUsize>) {
1066            let count = Arc::new(AtomicUsize::new(0));
1067            (
1068                Self {
1069                    checkpoint_count: count.clone(),
1070                },
1071                count,
1072            )
1073        }
1074    }
1075
1076    #[async_trait]
1077    impl DurableServiceClient for CountingMockClient {
1078        async fn checkpoint(
1079            &self,
1080            _durable_execution_arn: &str,
1081            _checkpoint_token: &str,
1082            _operations: Vec<OperationUpdate>,
1083        ) -> Result<CheckpointResponse, DurableError> {
1084            self.checkpoint_count.fetch_add(1, AtomicOrdering::SeqCst);
1085            Ok(CheckpointResponse::new(format!(
1086                "token-{}",
1087                self.checkpoint_count.load(AtomicOrdering::SeqCst)
1088            )))
1089        }
1090
1091        async fn get_operations(
1092            &self,
1093            _durable_execution_arn: &str,
1094            _next_marker: &str,
1095        ) -> Result<GetOperationsResponse, DurableError> {
1096            Ok(GetOperationsResponse {
1097                operations: vec![],
1098                next_marker: None,
1099            })
1100        }
1101    }
1102
1103    fn create_test_update_with_size(id: &str, result_size: usize) -> OperationUpdate {
1104        let mut update = OperationUpdate::start(id, OperationType::Step);
1105        if result_size > 0 {
1106            update.result = Some("x".repeat(result_size));
1107        }
1108        update
1109    }
1110
1111    proptest! {
1112        #![proptest_config(ProptestConfig::with_cases(100))]
1113
1114        #[test]
1115        fn prop_checkpoint_batching_respects_operation_count_limit(
1116            num_requests in 1usize..20,
1117            max_ops_per_batch in 1usize..10,
1118        ) {
1119            let rt = tokio::runtime::Runtime::new().unwrap();
1120            let result: Result<(), TestCaseError> = rt.block_on(async {
1121                let (client, call_count) = CountingMockClient::new();
1122                let client = Arc::new(client);
1123
1124                let (sender, rx) = create_checkpoint_queue(100);
1125                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1126
1127                let config = CheckpointBatcherConfig {
1128                    max_batch_time_ms: 10,
1129                    max_batch_operations: max_ops_per_batch,
1130                    max_batch_size_bytes: usize::MAX,
1131                };
1132
1133                let mut batcher = CheckpointBatcher::new(
1134                    config,
1135                    rx,
1136                    client,
1137                    "arn:test".to_string(),
1138                    checkpoint_token.clone(),
1139                );
1140
1141                for i in 0..num_requests {
1142                    let update = create_test_update_with_size(&format!("op-{}", i), 0);
1143                    sender.tx.send(CheckpointRequest::async_request(update)).await.unwrap();
1144                }
1145
1146                drop(sender);
1147                batcher.run().await;
1148
1149                let expected_max_calls = (num_requests + max_ops_per_batch - 1) / max_ops_per_batch;
1150                let actual_calls = call_count.load(AtomicOrdering::SeqCst);
1151
1152                if actual_calls > expected_max_calls {
1153                    return Err(TestCaseError::fail(format!(
1154                        "Expected at most {} API calls for {} requests with batch size {}, got {}",
1155                        expected_max_calls, num_requests, max_ops_per_batch, actual_calls
1156                    )));
1157                }
1158
1159                if actual_calls < 1 {
1160                    return Err(TestCaseError::fail(format!(
1161                        "Expected at least 1 API call for {} requests, got {}",
1162                        num_requests, actual_calls
1163                    )));
1164                }
1165
1166                Ok(())
1167            });
1168            result?;
1169        }
1170
1171        #[test]
1172        fn prop_checkpoint_batching_respects_size_limit(
1173            num_requests in 1usize..10,
1174            result_size in 100usize..500,
1175            max_batch_size in 500usize..2000,
1176        ) {
1177            let rt = tokio::runtime::Runtime::new().unwrap();
1178            let result: Result<(), TestCaseError> = rt.block_on(async {
1179                let (client, call_count) = CountingMockClient::new();
1180                let client = Arc::new(client);
1181
1182                let (sender, rx) = create_checkpoint_queue(100);
1183                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1184
1185                let config = CheckpointBatcherConfig {
1186                    max_batch_time_ms: 10,
1187                    max_batch_operations: usize::MAX,
1188                    max_batch_size_bytes: max_batch_size,
1189                };
1190
1191                let mut batcher = CheckpointBatcher::new(
1192                    config,
1193                    rx,
1194                    client,
1195                    "arn:test".to_string(),
1196                    checkpoint_token.clone(),
1197                );
1198
1199                for i in 0..num_requests {
1200                    let update = create_test_update_with_size(&format!("op-{}", i), result_size);
1201                    sender.tx.send(CheckpointRequest::async_request(update)).await.unwrap();
1202                }
1203
1204                drop(sender);
1205                batcher.run().await;
1206
1207                let estimated_request_size = 100 + result_size;
1208                let total_size = num_requests * estimated_request_size;
1209                let expected_max_calls = (total_size + max_batch_size - 1) / max_batch_size;
1210                let actual_calls = call_count.load(AtomicOrdering::SeqCst);
1211
1212                if actual_calls > expected_max_calls.max(1) * 2 {
1213                    return Err(TestCaseError::fail(format!(
1214                        "Expected at most ~{} API calls for {} requests of size {}, got {}",
1215                        expected_max_calls, num_requests, estimated_request_size, actual_calls
1216                    )));
1217                }
1218
1219                if actual_calls < 1 {
1220                    return Err(TestCaseError::fail(format!(
1221                        "Expected at least 1 API call, got {}",
1222                        actual_calls
1223                    )));
1224                }
1225
1226                Ok(())
1227            });
1228            result?;
1229        }
1230
1231        #[test]
1232        fn prop_all_requests_are_processed(
1233            num_requests in 1usize..20,
1234        ) {
1235            let rt = tokio::runtime::Runtime::new().unwrap();
1236            let result: Result<(), TestCaseError> = rt.block_on(async {
1237                let (client, _call_count) = CountingMockClient::new();
1238                let client = Arc::new(client);
1239
1240                let (sender, rx) = create_checkpoint_queue(100);
1241                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1242
1243                let config = CheckpointBatcherConfig {
1244                    max_batch_time_ms: 10,
1245                    max_batch_operations: 5,
1246                    ..Default::default()
1247                };
1248
1249                let mut batcher = CheckpointBatcher::new(
1250                    config,
1251                    rx,
1252                    client,
1253                    "arn:test".to_string(),
1254                    checkpoint_token.clone(),
1255                );
1256
1257                let mut receivers = Vec::new();
1258
1259                for i in 0..num_requests {
1260                    let update = create_test_update_with_size(&format!("op-{}", i), 0);
1261                    let (request, rx) = CheckpointRequest::sync(update);
1262                    sender.tx.send(request).await.unwrap();
1263                    receivers.push(rx);
1264                }
1265
1266                drop(sender);
1267                batcher.run().await;
1268
1269                let mut success_count = 0;
1270                for rx in receivers {
1271                    if let Ok(result) = rx.await {
1272                        if result.is_ok() {
1273                            success_count += 1;
1274                        }
1275                    }
1276                }
1277
1278                if success_count != num_requests {
1279                    return Err(TestCaseError::fail(format!(
1280                        "Expected all {} requests to succeed, got {}",
1281                        num_requests, success_count
1282                    )));
1283                }
1284
1285                Ok(())
1286            });
1287            result?;
1288        }
1289    }
1290
1291    // ============================================================================
1292    // Property Tests for Batch Ordering (Properties 13, 14, 15)
1293    // Feature: rust-sdk-test-suite
1294    // ============================================================================
1295
1296    /// Strategy for generating valid operation IDs
1297    fn operation_id_strategy() -> impl Strategy<Value = String> {
1298        "[a-z]{1,8}-[0-9]{1,4}".prop_map(|s| s)
1299    }
1300
1301    /// Strategy for generating operation types
1302    fn operation_type_strategy() -> impl Strategy<Value = OperationType> {
1303        prop_oneof![
1304            Just(OperationType::Execution),
1305            Just(OperationType::Step),
1306            Just(OperationType::Wait),
1307            Just(OperationType::Callback),
1308            Just(OperationType::Invoke),
1309            Just(OperationType::Context),
1310        ]
1311    }
1312
1313    /// Strategy for generating operation actions
1314    fn operation_action_strategy() -> impl Strategy<Value = OperationAction> {
1315        prop_oneof![
1316            Just(OperationAction::Start),
1317            Just(OperationAction::Succeed),
1318            Just(OperationAction::Fail),
1319        ]
1320    }
1321
1322    /// Creates a checkpoint request with the given parameters
1323    fn create_checkpoint_request(
1324        id: &str,
1325        op_type: OperationType,
1326        action: OperationAction,
1327        parent_id: Option<String>,
1328    ) -> CheckpointRequest {
1329        let mut update = match action {
1330            OperationAction::Start => OperationUpdate::start(id, op_type),
1331            OperationAction::Succeed => {
1332                OperationUpdate::succeed(id, op_type, Some("result".to_string()))
1333            }
1334            OperationAction::Fail => OperationUpdate::fail(
1335                id,
1336                op_type,
1337                crate::error::ErrorObject::new("Error", "message"),
1338            ),
1339            _ => OperationUpdate::start(id, op_type),
1340        };
1341        update.parent_id = parent_id;
1342        CheckpointRequest::async_request(update)
1343    }
1344
1345    /// Strategy for generating a batch with parent-child relationships
1346    /// Generates a CONTEXT START and some child operations that reference it
1347    fn parent_child_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1348        (
1349            operation_id_strategy(),                                // parent context id
1350            prop::collection::vec(operation_id_strategy(), 1..5),   // child ids
1351            prop::collection::vec(operation_type_strategy(), 1..5), // child types (will be filtered)
1352        )
1353            .prop_map(|(parent_id, child_ids, child_types)| {
1354                let mut batch = Vec::new();
1355
1356                // Add children first (in random order, they should be sorted after parent)
1357                for (child_id, child_type) in child_ids.iter().zip(child_types.iter()) {
1358                    // Skip if child_type is Execution (can't have parent)
1359                    if *child_type != OperationType::Execution {
1360                        batch.push(create_checkpoint_request(
1361                            child_id,
1362                            *child_type,
1363                            OperationAction::Start,
1364                            Some(parent_id.clone()),
1365                        ));
1366                    }
1367                }
1368
1369                // Add parent CONTEXT START (should be sorted to come first)
1370                batch.push(create_checkpoint_request(
1371                    &parent_id,
1372                    OperationType::Context,
1373                    OperationAction::Start,
1374                    None,
1375                ));
1376
1377                batch
1378            })
1379    }
1380
1381    /// Strategy for generating a batch with EXECUTION completion
1382    /// Generates some operations and an EXECUTION SUCCEED/FAIL that should be last
1383    fn execution_completion_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1384        (
1385            operation_id_strategy(), // execution id
1386            prop::collection::vec((operation_id_strategy(), operation_type_strategy()), 1..5), // other operations
1387            prop::bool::ANY, // succeed or fail
1388        )
1389            .prop_map(|(exec_id, other_ops, succeed)| {
1390                let mut batch = Vec::new();
1391
1392                // Add EXECUTION completion first (should be sorted to come last)
1393                let exec_action = if succeed {
1394                    OperationAction::Succeed
1395                } else {
1396                    OperationAction::Fail
1397                };
1398                batch.push(create_checkpoint_request(
1399                    &exec_id,
1400                    OperationType::Execution,
1401                    exec_action,
1402                    None,
1403                ));
1404
1405                // Add other operations
1406                for (op_id, op_type) in other_ops {
1407                    // Skip Execution type to avoid conflicts
1408                    if op_type != OperationType::Execution {
1409                        batch.push(create_checkpoint_request(
1410                            &op_id,
1411                            op_type,
1412                            OperationAction::Start,
1413                            None,
1414                        ));
1415                    }
1416                }
1417
1418                batch
1419            })
1420    }
1421
1422    /// Strategy for generating a batch with potential duplicate operation IDs
1423    /// (START and completion for same operation)
1424    fn same_operation_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1425        (
1426            operation_id_strategy(),
1427            prop_oneof![Just(OperationType::Step), Just(OperationType::Context),],
1428            prop::bool::ANY, // succeed or fail
1429        )
1430            .prop_map(|(op_id, op_type, succeed)| {
1431                let mut batch = Vec::new();
1432
1433                // Add completion first (should be sorted after START)
1434                let completion_action = if succeed {
1435                    OperationAction::Succeed
1436                } else {
1437                    OperationAction::Fail
1438                };
1439                batch.push(create_checkpoint_request(
1440                    &op_id,
1441                    op_type,
1442                    completion_action,
1443                    None,
1444                ));
1445
1446                // Add START (should be sorted to come first)
1447                batch.push(create_checkpoint_request(
1448                    &op_id,
1449                    op_type,
1450                    OperationAction::Start,
1451                    None,
1452                ));
1453
1454                batch
1455            })
1456    }
1457
1458    proptest! {
1459        #![proptest_config(ProptestConfig::with_cases(100))]
1460
1461        /// Property 13: Parent-before-child ordering
1462        /// For any batch of operation updates, child operations SHALL appear after their parent CONTEXT start
1463        /// Feature: rust-sdk-test-suite, Property 13: Parent-before-child ordering
1464        /// Validates: Requirements 8.1
1465        #[test]
1466        fn prop_parent_context_start_before_children(batch in parent_child_batch_strategy()) {
1467            // Skip empty batches
1468            if batch.is_empty() {
1469                return Ok(());
1470            }
1471
1472            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1473
1474            // Find the parent CONTEXT START position
1475            let parent_pos = sorted.iter().position(|r| {
1476                r.operation.operation_type == OperationType::Context
1477                    && r.operation.action == OperationAction::Start
1478                    && r.operation.parent_id.is_none()
1479            });
1480
1481            // If there's a parent CONTEXT START, all children must come after it
1482            if let Some(parent_idx) = parent_pos {
1483                let parent_id = &sorted[parent_idx].operation.operation_id;
1484
1485                for (idx, req) in sorted.iter().enumerate() {
1486                    if let Some(ref req_parent_id) = req.operation.parent_id {
1487                        if req_parent_id == parent_id {
1488                            prop_assert!(
1489                                idx > parent_idx,
1490                                "Child operation {} at index {} should come after parent {} at index {}",
1491                                req.operation.operation_id,
1492                                idx,
1493                                parent_id,
1494                                parent_idx
1495                            );
1496                        }
1497                    }
1498                }
1499            }
1500        }
1501
1502        /// Property 14: Execution-last ordering
1503        /// For any batch containing EXECUTION completion, the EXECUTION update SHALL be last
1504        /// Feature: rust-sdk-test-suite, Property 14: Execution-last ordering
1505        /// Validates: Requirements 8.2
1506        #[test]
1507        fn prop_execution_completion_is_last(batch in execution_completion_batch_strategy()) {
1508            // Skip empty batches
1509            if batch.is_empty() {
1510                return Ok(());
1511            }
1512
1513            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1514
1515            // Find EXECUTION completion (SUCCEED or FAIL)
1516            let exec_completion_pos = sorted.iter().position(|r| {
1517                r.operation.operation_type == OperationType::Execution
1518                    && matches!(r.operation.action, OperationAction::Succeed | OperationAction::Fail)
1519            });
1520
1521            // If there's an EXECUTION completion, it must be last
1522            if let Some(exec_idx) = exec_completion_pos {
1523                prop_assert_eq!(
1524                    exec_idx,
1525                    sorted.len() - 1,
1526                    "EXECUTION completion should be at index {} (last), but found at index {}",
1527                    sorted.len() - 1,
1528                    exec_idx
1529                );
1530            }
1531        }
1532
1533        /// Property 15: Operation ID uniqueness (with exception for START+completion)
1534        /// For any batch, each operation_id SHALL appear at most once, except for STEP/CONTEXT
1535        /// operations which may have both START and completion in the same batch
1536        /// Feature: rust-sdk-test-suite, Property 15: Operation ID uniqueness
1537        /// Validates: Requirements 8.3
1538        #[test]
1539        fn prop_operation_id_uniqueness_with_start_completion_exception(
1540            batch in same_operation_batch_strategy()
1541        ) {
1542            // Skip empty batches
1543            if batch.is_empty() {
1544                return Ok(());
1545            }
1546
1547            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1548
1549            // Track operation_id occurrences with their actions
1550            let mut seen: std::collections::HashMap<String, Vec<OperationAction>> = std::collections::HashMap::new();
1551
1552            for req in &sorted {
1553                seen.entry(req.operation.operation_id.clone())
1554                    .or_default()
1555                    .push(req.operation.action);
1556            }
1557
1558            // Verify uniqueness rules
1559            for (op_id, actions) in &seen {
1560                if actions.len() > 1 {
1561                    // Multiple occurrences are only allowed for START + completion
1562                    let has_start = actions.contains(&OperationAction::Start);
1563                    let has_completion = actions.iter().any(|a| {
1564                        matches!(a, OperationAction::Succeed | OperationAction::Fail | OperationAction::Retry)
1565                    });
1566
1567                    prop_assert!(
1568                        has_start && has_completion && actions.len() == 2,
1569                        "Operation {} appears {} times with actions {:?}. \
1570                         Multiple occurrences only allowed for START + completion pair.",
1571                        op_id,
1572                        actions.len(),
1573                        actions
1574                    );
1575                }
1576            }
1577        }
1578
1579        /// Property 15 (continued): For same operation, START must come before completion
1580        /// Feature: rust-sdk-test-suite, Property 15: START before completion ordering
1581        /// Validates: Requirements 8.3
1582        #[test]
1583        fn prop_start_before_completion_for_same_operation(
1584            batch in same_operation_batch_strategy()
1585        ) {
1586            // Skip empty batches
1587            if batch.is_empty() {
1588                return Ok(());
1589            }
1590
1591            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1592
1593            // Find START and completion positions for each operation
1594            let mut start_positions: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
1595            let mut completion_positions: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
1596
1597            for (idx, req) in sorted.iter().enumerate() {
1598                let op_id = &req.operation.operation_id;
1599                match req.operation.action {
1600                    OperationAction::Start => {
1601                        start_positions.insert(op_id.clone(), idx);
1602                    }
1603                    OperationAction::Succeed | OperationAction::Fail | OperationAction::Retry => {
1604                        completion_positions.insert(op_id.clone(), idx);
1605                    }
1606                    _ => {}
1607                }
1608            }
1609
1610            // Verify START comes before completion for each operation
1611            for (op_id, start_idx) in &start_positions {
1612                if let Some(completion_idx) = completion_positions.get(op_id) {
1613                    prop_assert!(
1614                        start_idx < completion_idx,
1615                        "For operation {}, START at index {} should come before completion at index {}",
1616                        op_id,
1617                        start_idx,
1618                        completion_idx
1619                    );
1620                }
1621            }
1622        }
1623    }
1624
1625    // ============================================================================
1626    // Property Test for Batch Round-Trip Preservation
1627    // Feature: rust-sdk-test-suite
1628    // ============================================================================
1629
1630    /// Strategy for generating a random batch of operations
1631    fn random_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1632        prop::collection::vec(
1633            (
1634                operation_id_strategy(),
1635                operation_type_strategy(),
1636                operation_action_strategy(),
1637            ),
1638            1..10,
1639        )
1640        .prop_map(|ops| {
1641            ops.into_iter()
1642                .filter(|(_, op_type, _)| *op_type != OperationType::Execution) // Avoid execution complications
1643                .map(|(id, op_type, action)| create_checkpoint_request(&id, op_type, action, None))
1644                .collect()
1645        })
1646    }
1647
1648    proptest! {
1649        #![proptest_config(ProptestConfig::with_cases(100))]
1650
1651        /// Property: Batch sorting preserves all operations
1652        /// For any valid operation update sequence, sorting SHALL preserve all operations
1653        /// Feature: rust-sdk-test-suite, Property: Batch preservation
1654        /// Validates: Requirements 8.4
1655        #[test]
1656        fn prop_batch_sorting_preserves_all_operations(batch in random_batch_strategy()) {
1657            // Skip empty batches
1658            if batch.is_empty() {
1659                return Ok(());
1660            }
1661
1662            // Collect original operation IDs and actions
1663            let original_ops: HashSet<(String, String)> = batch.iter()
1664                .map(|r| (r.operation.operation_id.clone(), format!("{:?}", r.operation.action)))
1665                .collect();
1666
1667            let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1668
1669            // Collect sorted operation IDs and actions
1670            let sorted_ops: HashSet<(String, String)> = sorted.iter()
1671                .map(|r| (r.operation.operation_id.clone(), format!("{:?}", r.operation.action)))
1672                .collect();
1673
1674            // Verify all operations are preserved
1675            prop_assert_eq!(
1676                original_ops.len(),
1677                sorted_ops.len(),
1678                "Sorting changed the number of operations: original {}, sorted {}",
1679                original_ops.len(),
1680                sorted_ops.len()
1681            );
1682
1683            prop_assert_eq!(
1684                original_ops,
1685                sorted_ops,
1686                "Sorting changed the set of operations"
1687            );
1688        }
1689
1690        /// Property: Sorting is idempotent
1691        /// Sorting an already sorted batch should produce the same result
1692        /// Feature: rust-sdk-test-suite, Property: Sorting idempotence
1693        #[test]
1694        fn prop_sorting_is_idempotent(batch in random_batch_strategy()) {
1695            // Skip empty batches
1696            if batch.is_empty() {
1697                return Ok(());
1698            }
1699
1700            let sorted_once = CheckpointBatcher::sort_checkpoint_batch(batch);
1701
1702            // Clone the sorted batch for second sort
1703            let sorted_once_clone: Vec<CheckpointRequest> = sorted_once.iter()
1704                .map(|r| CheckpointRequest::async_request(r.operation.clone()))
1705                .collect();
1706
1707            let sorted_twice = CheckpointBatcher::sort_checkpoint_batch(sorted_once_clone);
1708
1709            // Verify the order is the same
1710            prop_assert_eq!(
1711                sorted_once.len(),
1712                sorted_twice.len(),
1713                "Double sorting changed the number of operations"
1714            );
1715
1716            for (idx, (first, second)) in sorted_once.iter().zip(sorted_twice.iter()).enumerate() {
1717                prop_assert_eq!(
1718                    &first.operation.operation_id,
1719                    &second.operation.operation_id,
1720                    "Operation ID mismatch at index {}: {} vs {}",
1721                    idx,
1722                    &first.operation.operation_id,
1723                    &second.operation.operation_id
1724                );
1725                prop_assert_eq!(
1726                    first.operation.action,
1727                    second.operation.action,
1728                    "Action mismatch at index {} for operation {}: {:?} vs {:?}",
1729                    idx,
1730                    &first.operation.operation_id,
1731                    first.operation.action,
1732                    second.operation.action
1733                );
1734            }
1735        }
1736    }
1737}