1use std::collections::{HashMap, HashSet};
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::Arc;
28use std::time::Duration as StdDuration;
29
30use tokio::sync::{mpsc, oneshot, RwLock};
31use tokio::time::{timeout, Instant};
32
33use crate::client::SharedDurableServiceClient;
34use crate::error::DurableError;
35use crate::operation::{OperationAction, OperationType, OperationUpdate};
36
37#[derive(Debug, Clone)]
39pub struct CheckpointBatcherConfig {
40 pub max_batch_size_bytes: usize,
42 pub max_batch_time_ms: u64,
44 pub max_batch_operations: usize,
46}
47
48impl Default for CheckpointBatcherConfig {
49 fn default() -> Self {
50 Self {
51 max_batch_size_bytes: 750 * 1024, max_batch_time_ms: 1000, max_batch_operations: usize::MAX, }
55 }
56}
57
58#[derive(Debug)]
63pub struct CheckpointRequest {
64 pub operation: OperationUpdate,
66 pub completion: Option<oneshot::Sender<Result<(), DurableError>>>,
68}
69
70impl CheckpointRequest {
71 pub fn sync(operation: OperationUpdate) -> (Self, oneshot::Receiver<Result<(), DurableError>>) {
75 let (tx, rx) = oneshot::channel();
76 (
77 Self {
78 operation,
79 completion: Some(tx),
80 },
81 rx,
82 )
83 }
84
85 pub fn async_request(operation: OperationUpdate) -> Self {
87 Self {
88 operation,
89 completion: None,
90 }
91 }
92
93 pub fn is_sync(&self) -> bool {
95 self.completion.is_some()
96 }
97
98 pub fn estimated_size(&self) -> usize {
100 let base_size = 100; let op = &self.operation;
104
105 base_size
106 + op.operation_id.len()
107 + op.result.as_ref().map(|r| r.len()).unwrap_or(0)
108 + op.error
109 .as_ref()
110 .map(|e| e.error_message.len() + e.error_type.len())
111 .unwrap_or(0)
112 + op.parent_id.as_ref().map(|p| p.len()).unwrap_or(0)
113 + op.name.as_ref().map(|n| n.len()).unwrap_or(0)
114 }
115}
116
117#[derive(Debug)]
119pub struct BatchResult {
120 pub success: bool,
122 pub new_token: Option<String>,
124 pub error: Option<DurableError>,
126}
127
128pub struct CheckpointBatcher {
136 config: CheckpointBatcherConfig,
138 queue_rx: mpsc::Receiver<CheckpointRequest>,
140 service_client: SharedDurableServiceClient,
142 durable_execution_arn: String,
144 checkpoint_token: Arc<RwLock<String>>,
146 initial_token_consumed: AtomicBool,
148}
149
150impl CheckpointBatcher {
151 pub fn new(
165 config: CheckpointBatcherConfig,
166 queue_rx: mpsc::Receiver<CheckpointRequest>,
167 service_client: SharedDurableServiceClient,
168 durable_execution_arn: String,
169 checkpoint_token: Arc<RwLock<String>>,
170 ) -> Self {
171 Self {
172 config,
173 queue_rx,
174 service_client,
175 durable_execution_arn,
176 checkpoint_token,
177 initial_token_consumed: AtomicBool::new(false),
178 }
179 }
180
181 pub async fn run(&mut self) {
185 loop {
186 let batch = self.collect_batch().await;
187 if batch.is_empty() {
188 break;
190 }
191 self.process_batch(batch).await;
192 }
193 }
194
195 async fn collect_batch(&mut self) -> Vec<CheckpointRequest> {
199 let mut batch = Vec::new();
200 let mut batch_size = 0usize;
201 let batch_deadline =
202 Instant::now() + StdDuration::from_millis(self.config.max_batch_time_ms);
203
204 match self.queue_rx.recv().await {
206 Some(request) => {
207 batch_size += request.estimated_size();
208 batch.push(request);
209 }
210 None => return batch, }
212
213 loop {
215 if batch.len() >= self.config.max_batch_operations {
217 break;
218 }
219
220 let now = Instant::now();
222 if now >= batch_deadline {
223 break;
224 }
225 let remaining = batch_deadline - now;
226
227 match timeout(remaining, self.queue_rx.recv()).await {
229 Ok(Some(request)) => {
230 let request_size = request.estimated_size();
231
232 if batch_size + request_size > self.config.max_batch_size_bytes
234 && !batch.is_empty()
235 {
236 batch.push(request);
238 break;
239 }
240
241 batch_size += request_size;
242 batch.push(request);
243 }
244 Ok(None) => break, Err(_) => break, }
247 }
248
249 batch
250 }
251
252 pub fn sort_checkpoint_batch(batch: Vec<CheckpointRequest>) -> Vec<CheckpointRequest> {
265 if batch.len() <= 1 {
266 return batch;
267 }
268
269 let context_starts: HashSet<String> = batch
271 .iter()
272 .filter(|req| {
273 req.operation.operation_type == OperationType::Context
274 && req.operation.action == OperationAction::Start
275 })
276 .map(|req| req.operation.operation_id.clone())
277 .collect();
278
279 let parent_map: HashMap<String, Option<String>> = batch
281 .iter()
282 .map(|req| {
283 (
284 req.operation.operation_id.clone(),
285 req.operation.parent_id.clone(),
286 )
287 })
288 .collect();
289
290 let is_ancestor = |operation_id: &str, ancestor_id: &str| -> bool {
292 let mut current = parent_map.get(operation_id).and_then(|p| p.as_ref());
293 while let Some(parent_id) = current {
294 if parent_id == ancestor_id {
295 return true;
296 }
297 current = parent_map.get(parent_id).and_then(|p| p.as_ref());
298 }
299 false
300 };
301
302 let mut indexed_batch: Vec<(usize, CheckpointRequest)> =
304 batch.into_iter().enumerate().collect();
305
306 indexed_batch.sort_by(|(idx_a, a), (idx_b, b)| {
307 let a_is_exec_completion = a.operation.operation_type == OperationType::Execution
309 && matches!(
310 a.operation.action,
311 OperationAction::Succeed | OperationAction::Fail
312 );
313 let b_is_exec_completion = b.operation.operation_type == OperationType::Execution
314 && matches!(
315 b.operation.action,
316 OperationAction::Succeed | OperationAction::Fail
317 );
318
319 if a_is_exec_completion && !b_is_exec_completion {
320 return std::cmp::Ordering::Greater;
321 }
322 if !a_is_exec_completion && b_is_exec_completion {
323 return std::cmp::Ordering::Less;
324 }
325
326 if b.operation.action == OperationAction::Start
328 && context_starts.contains(&b.operation.operation_id)
329 {
330 if let Some(ref parent_id) = a.operation.parent_id {
331 if *parent_id == b.operation.operation_id
332 || is_ancestor(&a.operation.operation_id, &b.operation.operation_id)
333 {
334 return std::cmp::Ordering::Greater;
335 }
336 }
337 }
338 if a.operation.action == OperationAction::Start
339 && context_starts.contains(&a.operation.operation_id)
340 {
341 if let Some(ref parent_id) = b.operation.parent_id {
342 if *parent_id == a.operation.operation_id
343 || is_ancestor(&b.operation.operation_id, &a.operation.operation_id)
344 {
345 return std::cmp::Ordering::Less;
346 }
347 }
348 }
349
350 if a.operation.operation_id == b.operation.operation_id {
352 let a_is_start = a.operation.action == OperationAction::Start;
353 let b_is_start = b.operation.action == OperationAction::Start;
354 if a_is_start && !b_is_start {
355 return std::cmp::Ordering::Less;
356 }
357 if !a_is_start && b_is_start {
358 return std::cmp::Ordering::Greater;
359 }
360 }
361
362 idx_a.cmp(idx_b)
364 });
365
366 indexed_batch.into_iter().map(|(_, req)| req).collect()
368 }
369
370 async fn process_batch(&self, batch: Vec<CheckpointRequest>) {
381 if batch.is_empty() {
382 return;
383 }
384
385 let sorted_batch = Self::sort_checkpoint_batch(batch);
387
388 let (operations, completions): (Vec<_>, Vec<_>) = sorted_batch
390 .into_iter()
391 .map(|req| (req.operation, req.completion))
392 .unzip();
393
394 let token = self.checkpoint_token.read().await.clone();
396
397 let result = self.checkpoint_with_retry(&token, operations).await;
399
400 match result {
402 Ok(response) => {
403 self.initial_token_consumed.store(true, Ordering::Release);
410
411 {
413 let mut token_guard = self.checkpoint_token.write().await;
414 *token_guard = response.checkpoint_token;
415 }
416
417 for completion in completions.into_iter().flatten() {
419 let _ = completion.send(Ok(()));
420 }
421 }
422 Err(error) => {
423 let is_invalid_token = error.is_invalid_checkpoint_token();
425
426 for completion in completions.into_iter().flatten() {
428 let error_msg = if is_invalid_token {
429 format!(
430 "Invalid checkpoint token - token may have been consumed. \
431 Lambda will retry with a fresh token. Original error: {}",
432 error
433 )
434 } else {
435 error.to_string()
436 };
437
438 let _ = completion.send(Err(DurableError::Checkpoint {
439 message: error_msg,
440 is_retriable: error.is_retriable(),
441 aws_error: None,
442 }));
443 }
444 }
445 }
446 }
447
448 async fn checkpoint_with_retry(
454 &self,
455 token: &str,
456 operations: Vec<OperationUpdate>,
457 ) -> Result<crate::client::CheckpointResponse, DurableError> {
458 const MAX_RETRIES: u32 = 5;
459 const INITIAL_DELAY_MS: u64 = 100;
460 const MAX_DELAY_MS: u64 = 10_000;
461 const BACKOFF_MULTIPLIER: u64 = 2;
462
463 let mut attempt = 0;
464 let mut delay_ms = INITIAL_DELAY_MS;
465
466 loop {
467 let result = self
468 .service_client
469 .checkpoint(&self.durable_execution_arn, token, operations.clone())
470 .await;
471
472 match result {
473 Ok(response) => return Ok(response),
474 Err(error) if error.is_throttling() => {
475 attempt += 1;
476 if attempt > MAX_RETRIES {
477 tracing::warn!(
478 attempt = attempt,
479 "Checkpoint throttling: max retries exceeded"
480 );
481 return Err(error);
482 }
483
484 let actual_delay = error.get_retry_after_ms().unwrap_or(delay_ms);
485
486 tracing::debug!(
487 attempt = attempt,
488 delay_ms = actual_delay,
489 "Checkpoint throttled, retrying with exponential backoff"
490 );
491
492 tokio::time::sleep(StdDuration::from_millis(actual_delay)).await;
493 delay_ms = (delay_ms * BACKOFF_MULTIPLIER).min(MAX_DELAY_MS);
494 }
495 Err(error) => return Err(error),
496 }
497 }
498 }
499}
500
501#[derive(Clone)]
503pub struct CheckpointSender {
504 pub tx: mpsc::Sender<CheckpointRequest>,
506}
507
508impl CheckpointSender {
509 pub fn new(tx: mpsc::Sender<CheckpointRequest>) -> Self {
511 Self { tx }
512 }
513
514 pub async fn checkpoint_sync(&self, operation: OperationUpdate) -> Result<(), DurableError> {
518 let (request, rx) = CheckpointRequest::sync(operation);
519
520 self.tx
521 .send(request)
522 .await
523 .map_err(|_| DurableError::Checkpoint {
524 message: "Checkpoint queue closed".to_string(),
525 is_retriable: false,
526 aws_error: None,
527 })?;
528
529 rx.await.map_err(|_| DurableError::Checkpoint {
530 message: "Checkpoint completion channel closed".to_string(),
531 is_retriable: false,
532 aws_error: None,
533 })?
534 }
535
536 pub async fn checkpoint_async(&self, operation: OperationUpdate) -> Result<(), DurableError> {
540 let request = CheckpointRequest::async_request(operation);
541
542 self.tx
543 .send(request)
544 .await
545 .map_err(|_| DurableError::Checkpoint {
546 message: "Checkpoint queue closed".to_string(),
547 is_retriable: false,
548 aws_error: None,
549 })
550 }
551
552 pub async fn checkpoint(
554 &self,
555 operation: OperationUpdate,
556 is_sync: bool,
557 ) -> Result<(), DurableError> {
558 if is_sync {
559 self.checkpoint_sync(operation).await
560 } else {
561 self.checkpoint_async(operation).await
562 }
563 }
564}
565
566pub fn create_checkpoint_queue(
571 buffer_size: usize,
572) -> (CheckpointSender, mpsc::Receiver<CheckpointRequest>) {
573 let (tx, rx) = mpsc::channel(buffer_size);
574 (CheckpointSender::new(tx), rx)
575}
576
577#[cfg(test)]
578mod tests {
579 use super::*;
580 use crate::client::{CheckpointResponse, MockDurableServiceClient};
581 use crate::error::ErrorObject;
582
583 fn create_test_update(id: &str) -> OperationUpdate {
584 OperationUpdate::start(id, OperationType::Step)
585 }
586
587 fn create_start_update(id: &str, op_type: OperationType) -> OperationUpdate {
588 OperationUpdate::start(id, op_type)
589 }
590
591 fn create_succeed_update(id: &str, op_type: OperationType) -> OperationUpdate {
592 OperationUpdate::succeed(id, op_type, Some("result".to_string()))
593 }
594
595 fn create_fail_update(id: &str, op_type: OperationType) -> OperationUpdate {
596 OperationUpdate::fail(id, op_type, ErrorObject::new("Error", "message"))
597 }
598
599 fn create_request(update: OperationUpdate) -> CheckpointRequest {
600 CheckpointRequest::async_request(update)
601 }
602
603 #[test]
605 fn test_checkpoint_request_sync() {
606 let update = create_test_update("op-1");
607 let (request, _rx) = CheckpointRequest::sync(update);
608
609 assert!(request.is_sync());
610 assert_eq!(request.operation.operation_id, "op-1");
611 }
612
613 #[test]
614 fn test_checkpoint_request_async() {
615 let update = create_test_update("op-1");
616 let request = CheckpointRequest::async_request(update);
617
618 assert!(!request.is_sync());
619 assert_eq!(request.operation.operation_id, "op-1");
620 }
621
622 #[test]
623 fn test_checkpoint_request_estimated_size() {
624 let update = create_test_update("op-1");
625 let request = CheckpointRequest::async_request(update);
626
627 let size = request.estimated_size();
628 assert!(size > 0);
629 assert!(size >= 100);
630 }
631
632 #[test]
633 fn test_checkpoint_request_estimated_size_with_result() {
634 let mut update = create_test_update("op-1");
635 update.result = Some("a".repeat(1000));
636 let request = CheckpointRequest::async_request(update);
637
638 let size = request.estimated_size();
639 assert!(size >= 1100);
640 }
641
642 #[test]
643 fn test_create_checkpoint_queue() {
644 let (sender, _rx) = create_checkpoint_queue(100);
645 drop(sender);
646 }
647
648 #[tokio::test]
649 async fn test_checkpoint_sender_sync() {
650 let (sender, mut rx) = create_checkpoint_queue(10);
651
652 let handle = tokio::spawn(async move {
653 if let Some(request) = rx.recv().await {
654 assert!(request.is_sync());
655 if let Some(completion) = request.completion {
656 let _ = completion.send(Ok(()));
657 }
658 }
659 });
660
661 let update = create_test_update("op-1");
662 let result = sender.checkpoint_sync(update).await;
663 assert!(result.is_ok());
664
665 handle.await.unwrap();
666 }
667
668 #[tokio::test]
669 async fn test_checkpoint_sender_async() {
670 let (sender, mut rx) = create_checkpoint_queue(10);
671
672 let update = create_test_update("op-1");
673 let result = sender.checkpoint_async(update).await;
674 assert!(result.is_ok());
675
676 let request = rx.recv().await.unwrap();
677 assert!(!request.is_sync());
678 assert_eq!(request.operation.operation_id, "op-1");
679 }
680
681 #[tokio::test]
682 async fn test_checkpoint_sender_queue_closed() {
683 let (sender, rx) = create_checkpoint_queue(10);
684 drop(rx);
685
686 let update = create_test_update("op-1");
687 let result = sender.checkpoint_async(update).await;
688 assert!(result.is_err());
689 }
690
691 #[tokio::test]
692 async fn test_checkpoint_batcher_processes_batch() {
693 let client = Arc::new(
694 MockDurableServiceClient::new()
695 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
696 );
697
698 let (sender, rx) = create_checkpoint_queue(10);
699 let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
700
701 let config = CheckpointBatcherConfig {
702 max_batch_time_ms: 10,
703 ..Default::default()
704 };
705
706 let mut batcher = CheckpointBatcher::new(
707 config,
708 rx,
709 client,
710 "arn:test".to_string(),
711 checkpoint_token.clone(),
712 );
713
714 let update = create_test_update("op-1");
715 let (request, completion_rx) = CheckpointRequest::sync(update);
716 sender.tx.send(request).await.unwrap();
717
718 drop(sender);
719 batcher.run().await;
720
721 let result = completion_rx.await.unwrap();
722 assert!(result.is_ok());
723
724 let token = checkpoint_token.read().await;
725 assert_eq!(*token, "new-token");
726 }
727
728 #[tokio::test]
729 async fn test_checkpoint_batcher_handles_error() {
730 let client = Arc::new(
731 MockDurableServiceClient::new()
732 .with_checkpoint_response(Err(DurableError::checkpoint_retriable("Test error"))),
733 );
734
735 let (sender, rx) = create_checkpoint_queue(10);
736 let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
737
738 let config = CheckpointBatcherConfig {
739 max_batch_time_ms: 10,
740 ..Default::default()
741 };
742
743 let mut batcher = CheckpointBatcher::new(
744 config,
745 rx,
746 client,
747 "arn:test".to_string(),
748 checkpoint_token.clone(),
749 );
750
751 let update = create_test_update("op-1");
752 let (request, completion_rx) = CheckpointRequest::sync(update);
753 sender.tx.send(request).await.unwrap();
754
755 drop(sender);
756 batcher.run().await;
757
758 let result = completion_rx.await.unwrap();
759 assert!(result.is_err());
760
761 let token = checkpoint_token.read().await;
762 assert_eq!(*token, "initial-token");
763 }
764
765 #[tokio::test]
766 async fn test_checkpoint_batcher_batches_multiple_requests() {
767 let client = Arc::new(
768 MockDurableServiceClient::new()
769 .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
770 );
771
772 let (sender, rx) = create_checkpoint_queue(10);
773 let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
774
775 let config = CheckpointBatcherConfig {
776 max_batch_time_ms: 50,
777 max_batch_operations: 3,
778 ..Default::default()
779 };
780
781 let mut batcher = CheckpointBatcher::new(
782 config,
783 rx,
784 client,
785 "arn:test".to_string(),
786 checkpoint_token.clone(),
787 );
788
789 for i in 0..3 {
790 let update = create_test_update(&format!("op-{}", i));
791 sender
792 .tx
793 .send(CheckpointRequest::async_request(update))
794 .await
795 .unwrap();
796 }
797
798 drop(sender);
799 batcher.run().await;
800
801 let token = checkpoint_token.read().await;
802 assert_eq!(*token, "new-token");
803 }
804
805 #[test]
807 fn test_execution_completion_is_last() {
808 let batch = vec![
809 create_request(create_succeed_update("exec-1", OperationType::Execution)),
810 create_request(create_start_update("step-1", OperationType::Step)),
811 create_request(create_succeed_update("step-2", OperationType::Step)),
812 ];
813
814 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
815
816 assert_eq!(sorted.len(), 3);
817 assert_eq!(sorted[2].operation.operation_id, "exec-1");
818 assert_eq!(sorted[2].operation.action, OperationAction::Succeed);
819 assert_eq!(sorted[2].operation.operation_type, OperationType::Execution);
820 }
821
822 #[test]
823 fn test_execution_fail_is_last() {
824 let batch = vec![
825 create_request(create_fail_update("exec-1", OperationType::Execution)),
826 create_request(create_start_update("step-1", OperationType::Step)),
827 ];
828
829 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
830
831 assert_eq!(sorted.len(), 2);
832 assert_eq!(sorted[1].operation.operation_id, "exec-1");
833 assert_eq!(sorted[1].operation.action, OperationAction::Fail);
834 }
835
836 #[test]
837 fn test_child_after_parent_context_start() {
838 let mut child_update = create_start_update("child-1", OperationType::Step);
839 child_update.parent_id = Some("parent-ctx".to_string());
840
841 let batch = vec![
842 create_request(child_update),
843 create_request(create_start_update("parent-ctx", OperationType::Context)),
844 ];
845
846 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
847
848 assert_eq!(sorted.len(), 2);
849 assert_eq!(sorted[0].operation.operation_id, "parent-ctx");
850 assert_eq!(sorted[0].operation.action, OperationAction::Start);
851 assert_eq!(sorted[1].operation.operation_id, "child-1");
852 }
853
854 #[test]
855 fn test_start_and_completion_same_operation() {
856 let batch = vec![
857 create_request(create_succeed_update("step-1", OperationType::Step)),
858 create_request(create_start_update("step-1", OperationType::Step)),
859 ];
860
861 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
862
863 assert_eq!(sorted.len(), 2);
864 assert_eq!(sorted[0].operation.operation_id, "step-1");
865 assert_eq!(sorted[0].operation.action, OperationAction::Start);
866 assert_eq!(sorted[1].operation.operation_id, "step-1");
867 assert_eq!(sorted[1].operation.action, OperationAction::Succeed);
868 }
869
870 #[test]
871 fn test_context_start_and_completion_same_batch() {
872 let batch = vec![
873 create_request(create_succeed_update("ctx-1", OperationType::Context)),
874 create_request(create_start_update("ctx-1", OperationType::Context)),
875 ];
876
877 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
878
879 assert_eq!(sorted.len(), 2);
880 assert_eq!(sorted[0].operation.operation_id, "ctx-1");
881 assert_eq!(sorted[0].operation.action, OperationAction::Start);
882 assert_eq!(sorted[1].operation.operation_id, "ctx-1");
883 assert_eq!(sorted[1].operation.action, OperationAction::Succeed);
884 }
885
886 #[test]
887 fn test_step_start_and_fail_same_batch() {
888 let batch = vec![
889 create_request(create_fail_update("step-1", OperationType::Step)),
890 create_request(create_start_update("step-1", OperationType::Step)),
891 ];
892
893 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
894
895 assert_eq!(sorted.len(), 2);
896 assert_eq!(sorted[0].operation.operation_id, "step-1");
897 assert_eq!(sorted[0].operation.action, OperationAction::Start);
898 assert_eq!(sorted[1].operation.operation_id, "step-1");
899 assert_eq!(sorted[1].operation.action, OperationAction::Fail);
900 }
901
902 #[test]
903 fn test_complex_ordering_scenario() {
904 let mut child1 = create_start_update("child-1", OperationType::Step);
905 child1.parent_id = Some("parent-ctx".to_string());
906
907 let mut child2 = create_succeed_update("child-2", OperationType::Step);
908 child2.parent_id = Some("parent-ctx".to_string());
909
910 let batch = vec![
911 create_request(create_succeed_update("exec-1", OperationType::Execution)),
912 create_request(child1),
913 create_request(create_succeed_update("parent-ctx", OperationType::Context)),
914 create_request(create_start_update("parent-ctx", OperationType::Context)),
915 create_request(child2),
916 ];
917
918 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
919
920 assert_eq!(sorted.len(), 5);
921
922 let parent_start_pos = sorted
923 .iter()
924 .position(|r| {
925 r.operation.operation_id == "parent-ctx"
926 && r.operation.action == OperationAction::Start
927 })
928 .unwrap();
929 let parent_succeed_pos = sorted
930 .iter()
931 .position(|r| {
932 r.operation.operation_id == "parent-ctx"
933 && r.operation.action == OperationAction::Succeed
934 })
935 .unwrap();
936 let child1_pos = sorted
937 .iter()
938 .position(|r| r.operation.operation_id == "child-1")
939 .unwrap();
940 let child2_pos = sorted
941 .iter()
942 .position(|r| r.operation.operation_id == "child-2")
943 .unwrap();
944 let exec_pos = sorted
945 .iter()
946 .position(|r| r.operation.operation_id == "exec-1")
947 .unwrap();
948
949 assert!(parent_start_pos < parent_succeed_pos);
950 assert!(parent_start_pos < child1_pos);
951 assert!(parent_start_pos < child2_pos);
952 assert_eq!(exec_pos, 4);
953 }
954
955 #[test]
956 fn test_empty_batch() {
957 let batch: Vec<CheckpointRequest> = vec![];
958 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
959 assert!(sorted.is_empty());
960 }
961
962 #[test]
963 fn test_single_item_batch() {
964 let batch = vec![create_request(create_start_update(
965 "step-1",
966 OperationType::Step,
967 ))];
968 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
969 assert_eq!(sorted.len(), 1);
970 assert_eq!(sorted[0].operation.operation_id, "step-1");
971 }
972
973 #[test]
974 fn test_preserves_original_order() {
975 let batch = vec![
976 create_request(create_start_update("step-1", OperationType::Step)),
977 create_request(create_start_update("step-2", OperationType::Step)),
978 create_request(create_start_update("step-3", OperationType::Step)),
979 ];
980
981 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
982
983 assert_eq!(sorted.len(), 3);
984 assert_eq!(sorted[0].operation.operation_id, "step-1");
985 assert_eq!(sorted[1].operation.operation_id, "step-2");
986 assert_eq!(sorted[2].operation.operation_id, "step-3");
987 }
988
989 #[test]
990 fn test_multiple_children_same_parent() {
991 let mut child1 = create_start_update("child-1", OperationType::Step);
992 child1.parent_id = Some("parent-ctx".to_string());
993
994 let mut child2 = create_start_update("child-2", OperationType::Step);
995 child2.parent_id = Some("parent-ctx".to_string());
996
997 let mut child3 = create_start_update("child-3", OperationType::Step);
998 child3.parent_id = Some("parent-ctx".to_string());
999
1000 let batch = vec![
1001 create_request(child1),
1002 create_request(child2),
1003 create_request(create_start_update("parent-ctx", OperationType::Context)),
1004 create_request(child3),
1005 ];
1006
1007 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1008
1009 assert_eq!(sorted[0].operation.operation_id, "parent-ctx");
1010
1011 for i in 1..4 {
1012 assert!(sorted[i].operation.parent_id.as_deref() == Some("parent-ctx"));
1013 }
1014 }
1015
1016 #[test]
1017 fn test_nested_contexts() {
1018 let mut parent_ctx = create_start_update("parent-ctx", OperationType::Context);
1019 parent_ctx.parent_id = Some("grandparent-ctx".to_string());
1020
1021 let mut child = create_start_update("child-1", OperationType::Step);
1022 child.parent_id = Some("parent-ctx".to_string());
1023
1024 let batch = vec![
1025 create_request(child),
1026 create_request(parent_ctx),
1027 create_request(create_start_update(
1028 "grandparent-ctx",
1029 OperationType::Context,
1030 )),
1031 ];
1032
1033 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1034
1035 let grandparent_pos = sorted
1036 .iter()
1037 .position(|r| r.operation.operation_id == "grandparent-ctx")
1038 .unwrap();
1039 let parent_pos = sorted
1040 .iter()
1041 .position(|r| r.operation.operation_id == "parent-ctx")
1042 .unwrap();
1043 let child_pos = sorted
1044 .iter()
1045 .position(|r| r.operation.operation_id == "child-1")
1046 .unwrap();
1047
1048 assert!(grandparent_pos < parent_pos);
1049 assert!(parent_pos < child_pos);
1050 }
1051
1052 #[test]
1053 fn test_execution_start_not_affected() {
1054 let batch = vec![
1055 create_request(create_start_update("step-1", OperationType::Step)),
1056 create_request(create_start_update("exec-1", OperationType::Execution)),
1057 create_request(create_start_update("step-2", OperationType::Step)),
1058 ];
1059
1060 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1061
1062 assert_eq!(sorted.len(), 3);
1063 assert_eq!(sorted[0].operation.operation_id, "step-1");
1064 assert_eq!(sorted[1].operation.operation_id, "exec-1");
1065 assert_eq!(sorted[2].operation.operation_id, "step-2");
1066 }
1067}
1068
1069#[cfg(test)]
1070mod property_tests {
1071 use super::*;
1072 use crate::client::{CheckpointResponse, DurableServiceClient, GetOperationsResponse};
1073 use async_trait::async_trait;
1074 use proptest::prelude::*;
1075 use std::collections::HashSet;
1076 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1077
1078 struct CountingMockClient {
1079 checkpoint_count: Arc<AtomicUsize>,
1080 }
1081
1082 impl CountingMockClient {
1083 fn new() -> (Self, Arc<AtomicUsize>) {
1084 let count = Arc::new(AtomicUsize::new(0));
1085 (
1086 Self {
1087 checkpoint_count: count.clone(),
1088 },
1089 count,
1090 )
1091 }
1092 }
1093
1094 #[async_trait]
1095 impl DurableServiceClient for CountingMockClient {
1096 async fn checkpoint(
1097 &self,
1098 _durable_execution_arn: &str,
1099 _checkpoint_token: &str,
1100 _operations: Vec<OperationUpdate>,
1101 ) -> Result<CheckpointResponse, DurableError> {
1102 self.checkpoint_count.fetch_add(1, AtomicOrdering::SeqCst);
1103 Ok(CheckpointResponse::new(format!(
1104 "token-{}",
1105 self.checkpoint_count.load(AtomicOrdering::SeqCst)
1106 )))
1107 }
1108
1109 async fn get_operations(
1110 &self,
1111 _durable_execution_arn: &str,
1112 _next_marker: &str,
1113 ) -> Result<GetOperationsResponse, DurableError> {
1114 Ok(GetOperationsResponse {
1115 operations: vec![],
1116 next_marker: None,
1117 })
1118 }
1119 }
1120
1121 fn create_test_update_with_size(id: &str, result_size: usize) -> OperationUpdate {
1122 let mut update = OperationUpdate::start(id, OperationType::Step);
1123 if result_size > 0 {
1124 update.result = Some("x".repeat(result_size));
1125 }
1126 update
1127 }
1128
1129 proptest! {
1130 #![proptest_config(ProptestConfig::with_cases(100))]
1131
1132 #[test]
1133 fn prop_checkpoint_batching_respects_operation_count_limit(
1134 num_requests in 1usize..20,
1135 max_ops_per_batch in 1usize..10,
1136 ) {
1137 let rt = tokio::runtime::Runtime::new().unwrap();
1138 let result: Result<(), TestCaseError> = rt.block_on(async {
1139 let (client, call_count) = CountingMockClient::new();
1140 let client = Arc::new(client);
1141
1142 let (sender, rx) = create_checkpoint_queue(100);
1143 let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1144
1145 let config = CheckpointBatcherConfig {
1146 max_batch_time_ms: 10,
1147 max_batch_operations: max_ops_per_batch,
1148 max_batch_size_bytes: usize::MAX,
1149 };
1150
1151 let mut batcher = CheckpointBatcher::new(
1152 config,
1153 rx,
1154 client,
1155 "arn:test".to_string(),
1156 checkpoint_token.clone(),
1157 );
1158
1159 for i in 0..num_requests {
1160 let update = create_test_update_with_size(&format!("op-{}", i), 0);
1161 sender.tx.send(CheckpointRequest::async_request(update)).await.unwrap();
1162 }
1163
1164 drop(sender);
1165 batcher.run().await;
1166
1167 let expected_max_calls = (num_requests + max_ops_per_batch - 1) / max_ops_per_batch;
1168 let actual_calls = call_count.load(AtomicOrdering::SeqCst);
1169
1170 if actual_calls > expected_max_calls {
1171 return Err(TestCaseError::fail(format!(
1172 "Expected at most {} API calls for {} requests with batch size {}, got {}",
1173 expected_max_calls, num_requests, max_ops_per_batch, actual_calls
1174 )));
1175 }
1176
1177 if actual_calls < 1 {
1178 return Err(TestCaseError::fail(format!(
1179 "Expected at least 1 API call for {} requests, got {}",
1180 num_requests, actual_calls
1181 )));
1182 }
1183
1184 Ok(())
1185 });
1186 result?;
1187 }
1188
1189 #[test]
1190 fn prop_checkpoint_batching_respects_size_limit(
1191 num_requests in 1usize..10,
1192 result_size in 100usize..500,
1193 max_batch_size in 500usize..2000,
1194 ) {
1195 let rt = tokio::runtime::Runtime::new().unwrap();
1196 let result: Result<(), TestCaseError> = rt.block_on(async {
1197 let (client, call_count) = CountingMockClient::new();
1198 let client = Arc::new(client);
1199
1200 let (sender, rx) = create_checkpoint_queue(100);
1201 let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1202
1203 let config = CheckpointBatcherConfig {
1204 max_batch_time_ms: 10,
1205 max_batch_operations: usize::MAX,
1206 max_batch_size_bytes: max_batch_size,
1207 };
1208
1209 let mut batcher = CheckpointBatcher::new(
1210 config,
1211 rx,
1212 client,
1213 "arn:test".to_string(),
1214 checkpoint_token.clone(),
1215 );
1216
1217 for i in 0..num_requests {
1218 let update = create_test_update_with_size(&format!("op-{}", i), result_size);
1219 sender.tx.send(CheckpointRequest::async_request(update)).await.unwrap();
1220 }
1221
1222 drop(sender);
1223 batcher.run().await;
1224
1225 let estimated_request_size = 100 + result_size;
1226 let total_size = num_requests * estimated_request_size;
1227 let expected_max_calls = (total_size + max_batch_size - 1) / max_batch_size;
1228 let actual_calls = call_count.load(AtomicOrdering::SeqCst);
1229
1230 if actual_calls > expected_max_calls.max(1) * 2 {
1231 return Err(TestCaseError::fail(format!(
1232 "Expected at most ~{} API calls for {} requests of size {}, got {}",
1233 expected_max_calls, num_requests, estimated_request_size, actual_calls
1234 )));
1235 }
1236
1237 if actual_calls < 1 {
1238 return Err(TestCaseError::fail(format!(
1239 "Expected at least 1 API call, got {}",
1240 actual_calls
1241 )));
1242 }
1243
1244 Ok(())
1245 });
1246 result?;
1247 }
1248
1249 #[test]
1250 fn prop_all_requests_are_processed(
1251 num_requests in 1usize..20,
1252 ) {
1253 let rt = tokio::runtime::Runtime::new().unwrap();
1254 let result: Result<(), TestCaseError> = rt.block_on(async {
1255 let (client, _call_count) = CountingMockClient::new();
1256 let client = Arc::new(client);
1257
1258 let (sender, rx) = create_checkpoint_queue(100);
1259 let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));
1260
1261 let config = CheckpointBatcherConfig {
1262 max_batch_time_ms: 10,
1263 max_batch_operations: 5,
1264 ..Default::default()
1265 };
1266
1267 let mut batcher = CheckpointBatcher::new(
1268 config,
1269 rx,
1270 client,
1271 "arn:test".to_string(),
1272 checkpoint_token.clone(),
1273 );
1274
1275 let mut receivers = Vec::new();
1276
1277 for i in 0..num_requests {
1278 let update = create_test_update_with_size(&format!("op-{}", i), 0);
1279 let (request, rx) = CheckpointRequest::sync(update);
1280 sender.tx.send(request).await.unwrap();
1281 receivers.push(rx);
1282 }
1283
1284 drop(sender);
1285 batcher.run().await;
1286
1287 let mut success_count = 0;
1288 for rx in receivers {
1289 if let Ok(result) = rx.await {
1290 if result.is_ok() {
1291 success_count += 1;
1292 }
1293 }
1294 }
1295
1296 if success_count != num_requests {
1297 return Err(TestCaseError::fail(format!(
1298 "Expected all {} requests to succeed, got {}",
1299 num_requests, success_count
1300 )));
1301 }
1302
1303 Ok(())
1304 });
1305 result?;
1306 }
1307 }
1308
1309 fn operation_id_strategy() -> impl Strategy<Value = String> {
1316 "[a-z]{1,8}-[0-9]{1,4}".prop_map(|s| s)
1317 }
1318
1319 fn operation_type_strategy() -> impl Strategy<Value = OperationType> {
1321 prop_oneof![
1322 Just(OperationType::Execution),
1323 Just(OperationType::Step),
1324 Just(OperationType::Wait),
1325 Just(OperationType::Callback),
1326 Just(OperationType::Invoke),
1327 Just(OperationType::Context),
1328 ]
1329 }
1330
1331 fn operation_action_strategy() -> impl Strategy<Value = OperationAction> {
1333 prop_oneof![
1334 Just(OperationAction::Start),
1335 Just(OperationAction::Succeed),
1336 Just(OperationAction::Fail),
1337 ]
1338 }
1339
1340 fn create_checkpoint_request(
1342 id: &str,
1343 op_type: OperationType,
1344 action: OperationAction,
1345 parent_id: Option<String>,
1346 ) -> CheckpointRequest {
1347 let mut update = match action {
1348 OperationAction::Start => OperationUpdate::start(id, op_type),
1349 OperationAction::Succeed => {
1350 OperationUpdate::succeed(id, op_type, Some("result".to_string()))
1351 }
1352 OperationAction::Fail => OperationUpdate::fail(
1353 id,
1354 op_type,
1355 crate::error::ErrorObject::new("Error", "message"),
1356 ),
1357 _ => OperationUpdate::start(id, op_type),
1358 };
1359 update.parent_id = parent_id;
1360 CheckpointRequest::async_request(update)
1361 }
1362
1363 fn parent_child_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1366 (
1367 operation_id_strategy(), prop::collection::vec(operation_id_strategy(), 1..5), prop::collection::vec(operation_type_strategy(), 1..5), )
1371 .prop_map(|(parent_id, child_ids, child_types)| {
1372 let mut batch = Vec::new();
1373
1374 for (child_id, child_type) in child_ids.iter().zip(child_types.iter()) {
1376 if *child_type != OperationType::Execution {
1378 batch.push(create_checkpoint_request(
1379 child_id,
1380 *child_type,
1381 OperationAction::Start,
1382 Some(parent_id.clone()),
1383 ));
1384 }
1385 }
1386
1387 batch.push(create_checkpoint_request(
1389 &parent_id,
1390 OperationType::Context,
1391 OperationAction::Start,
1392 None,
1393 ));
1394
1395 batch
1396 })
1397 }
1398
1399 fn execution_completion_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1402 (
1403 operation_id_strategy(), prop::collection::vec((operation_id_strategy(), operation_type_strategy()), 1..5), prop::bool::ANY, )
1407 .prop_map(|(exec_id, other_ops, succeed)| {
1408 let mut batch = Vec::new();
1409
1410 let exec_action = if succeed {
1412 OperationAction::Succeed
1413 } else {
1414 OperationAction::Fail
1415 };
1416 batch.push(create_checkpoint_request(
1417 &exec_id,
1418 OperationType::Execution,
1419 exec_action,
1420 None,
1421 ));
1422
1423 for (op_id, op_type) in other_ops {
1425 if op_type != OperationType::Execution {
1427 batch.push(create_checkpoint_request(
1428 &op_id,
1429 op_type,
1430 OperationAction::Start,
1431 None,
1432 ));
1433 }
1434 }
1435
1436 batch
1437 })
1438 }
1439
1440 fn same_operation_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1443 (
1444 operation_id_strategy(),
1445 prop_oneof![Just(OperationType::Step), Just(OperationType::Context),],
1446 prop::bool::ANY, )
1448 .prop_map(|(op_id, op_type, succeed)| {
1449 let mut batch = Vec::new();
1450
1451 let completion_action = if succeed {
1453 OperationAction::Succeed
1454 } else {
1455 OperationAction::Fail
1456 };
1457 batch.push(create_checkpoint_request(
1458 &op_id,
1459 op_type,
1460 completion_action,
1461 None,
1462 ));
1463
1464 batch.push(create_checkpoint_request(
1466 &op_id,
1467 op_type,
1468 OperationAction::Start,
1469 None,
1470 ));
1471
1472 batch
1473 })
1474 }
1475
1476 proptest! {
1477 #![proptest_config(ProptestConfig::with_cases(100))]
1478
1479 #[test]
1484 fn prop_parent_context_start_before_children(batch in parent_child_batch_strategy()) {
1485 if batch.is_empty() {
1487 return Ok(());
1488 }
1489
1490 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1491
1492 let parent_pos = sorted.iter().position(|r| {
1494 r.operation.operation_type == OperationType::Context
1495 && r.operation.action == OperationAction::Start
1496 && r.operation.parent_id.is_none()
1497 });
1498
1499 if let Some(parent_idx) = parent_pos {
1501 let parent_id = &sorted[parent_idx].operation.operation_id;
1502
1503 for (idx, req) in sorted.iter().enumerate() {
1504 if let Some(ref req_parent_id) = req.operation.parent_id {
1505 if req_parent_id == parent_id {
1506 prop_assert!(
1507 idx > parent_idx,
1508 "Child operation {} at index {} should come after parent {} at index {}",
1509 req.operation.operation_id,
1510 idx,
1511 parent_id,
1512 parent_idx
1513 );
1514 }
1515 }
1516 }
1517 }
1518 }
1519
1520 #[test]
1525 fn prop_execution_completion_is_last(batch in execution_completion_batch_strategy()) {
1526 if batch.is_empty() {
1528 return Ok(());
1529 }
1530
1531 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1532
1533 let exec_completion_pos = sorted.iter().position(|r| {
1535 r.operation.operation_type == OperationType::Execution
1536 && matches!(r.operation.action, OperationAction::Succeed | OperationAction::Fail)
1537 });
1538
1539 if let Some(exec_idx) = exec_completion_pos {
1541 prop_assert_eq!(
1542 exec_idx,
1543 sorted.len() - 1,
1544 "EXECUTION completion should be at index {} (last), but found at index {}",
1545 sorted.len() - 1,
1546 exec_idx
1547 );
1548 }
1549 }
1550
1551 #[test]
1557 fn prop_operation_id_uniqueness_with_start_completion_exception(
1558 batch in same_operation_batch_strategy()
1559 ) {
1560 if batch.is_empty() {
1562 return Ok(());
1563 }
1564
1565 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1566
1567 let mut seen: std::collections::HashMap<String, Vec<OperationAction>> = std::collections::HashMap::new();
1569
1570 for req in &sorted {
1571 seen.entry(req.operation.operation_id.clone())
1572 .or_default()
1573 .push(req.operation.action);
1574 }
1575
1576 for (op_id, actions) in &seen {
1578 if actions.len() > 1 {
1579 let has_start = actions.contains(&OperationAction::Start);
1581 let has_completion = actions.iter().any(|a| {
1582 matches!(a, OperationAction::Succeed | OperationAction::Fail | OperationAction::Retry)
1583 });
1584
1585 prop_assert!(
1586 has_start && has_completion && actions.len() == 2,
1587 "Operation {} appears {} times with actions {:?}. \
1588 Multiple occurrences only allowed for START + completion pair.",
1589 op_id,
1590 actions.len(),
1591 actions
1592 );
1593 }
1594 }
1595 }
1596
1597 #[test]
1601 fn prop_start_before_completion_for_same_operation(
1602 batch in same_operation_batch_strategy()
1603 ) {
1604 if batch.is_empty() {
1606 return Ok(());
1607 }
1608
1609 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1610
1611 let mut start_positions: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
1613 let mut completion_positions: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
1614
1615 for (idx, req) in sorted.iter().enumerate() {
1616 let op_id = &req.operation.operation_id;
1617 match req.operation.action {
1618 OperationAction::Start => {
1619 start_positions.insert(op_id.clone(), idx);
1620 }
1621 OperationAction::Succeed | OperationAction::Fail | OperationAction::Retry => {
1622 completion_positions.insert(op_id.clone(), idx);
1623 }
1624 _ => {}
1625 }
1626 }
1627
1628 for (op_id, start_idx) in &start_positions {
1630 if let Some(completion_idx) = completion_positions.get(op_id) {
1631 prop_assert!(
1632 start_idx < completion_idx,
1633 "For operation {}, START at index {} should come before completion at index {}",
1634 op_id,
1635 start_idx,
1636 completion_idx
1637 );
1638 }
1639 }
1640 }
1641 }
1642
1643 fn random_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1650 prop::collection::vec(
1651 (
1652 operation_id_strategy(),
1653 operation_type_strategy(),
1654 operation_action_strategy(),
1655 ),
1656 1..10,
1657 )
1658 .prop_map(|ops| {
1659 ops.into_iter()
1660 .filter(|(_, op_type, _)| *op_type != OperationType::Execution) .map(|(id, op_type, action)| create_checkpoint_request(&id, op_type, action, None))
1662 .collect()
1663 })
1664 }
1665
1666 proptest! {
1667 #![proptest_config(ProptestConfig::with_cases(100))]
1668
1669 #[test]
1674 fn prop_batch_sorting_preserves_all_operations(batch in random_batch_strategy()) {
1675 if batch.is_empty() {
1677 return Ok(());
1678 }
1679
1680 let original_ops: HashSet<(String, String)> = batch.iter()
1682 .map(|r| (r.operation.operation_id.clone(), format!("{:?}", r.operation.action)))
1683 .collect();
1684
1685 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1686
1687 let sorted_ops: HashSet<(String, String)> = sorted.iter()
1689 .map(|r| (r.operation.operation_id.clone(), format!("{:?}", r.operation.action)))
1690 .collect();
1691
1692 prop_assert_eq!(
1694 original_ops.len(),
1695 sorted_ops.len(),
1696 "Sorting changed the number of operations: original {}, sorted {}",
1697 original_ops.len(),
1698 sorted_ops.len()
1699 );
1700
1701 prop_assert_eq!(
1702 original_ops,
1703 sorted_ops,
1704 "Sorting changed the set of operations"
1705 );
1706 }
1707
1708 #[test]
1712 fn prop_sorting_is_idempotent(batch in random_batch_strategy()) {
1713 if batch.is_empty() {
1715 return Ok(());
1716 }
1717
1718 let sorted_once = CheckpointBatcher::sort_checkpoint_batch(batch);
1719
1720 let sorted_once_clone: Vec<CheckpointRequest> = sorted_once.iter()
1722 .map(|r| CheckpointRequest::async_request(r.operation.clone()))
1723 .collect();
1724
1725 let sorted_twice = CheckpointBatcher::sort_checkpoint_batch(sorted_once_clone);
1726
1727 prop_assert_eq!(
1729 sorted_once.len(),
1730 sorted_twice.len(),
1731 "Double sorting changed the number of operations"
1732 );
1733
1734 for (idx, (first, second)) in sorted_once.iter().zip(sorted_twice.iter()).enumerate() {
1735 prop_assert_eq!(
1736 &first.operation.operation_id,
1737 &second.operation.operation_id,
1738 "Operation ID mismatch at index {}: {} vs {}",
1739 idx,
1740 &first.operation.operation_id,
1741 &second.operation.operation_id
1742 );
1743 prop_assert_eq!(
1744 first.operation.action,
1745 second.operation.action,
1746 "Action mismatch at index {} for operation {}: {:?} vs {:?}",
1747 idx,
1748 &first.operation.operation_id,
1749 first.operation.action,
1750 second.operation.action
1751 );
1752 }
1753 }
1754 }
1755}