1use std::collections::{HashMap, HashSet};
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::Arc;
28use std::time::Duration as StdDuration;
29
30use tokio::sync::{mpsc, oneshot, RwLock};
31use tokio::time::{timeout, Instant};
32
33use crate::client::SharedDurableServiceClient;
34use crate::error::DurableError;
35use crate::operation::{OperationAction, OperationType, OperationUpdate};
36
/// Tuning knobs controlling when the [`CheckpointBatcher`] flushes a
/// batch of queued checkpoint requests into a single service call.
#[derive(Debug, Clone)]
pub struct CheckpointBatcherConfig {
    /// Size budget (in estimated bytes, see `CheckpointRequest::estimated_size`)
    /// for a single batch. NOTE(review): the budget is soft — the request
    /// that crosses it is still included in the flushed batch.
    pub max_batch_size_bytes: usize,
    /// Maximum time to keep accumulating requests before flushing.
    pub max_batch_time_ms: u64,
    /// Maximum number of operations in one batch.
    pub max_batch_operations: usize,
}
47
48impl Default for CheckpointBatcherConfig {
49 fn default() -> Self {
50 Self {
51 max_batch_size_bytes: 750 * 1024, max_batch_time_ms: 1000, max_batch_operations: usize::MAX, }
55 }
56}
57
/// One operation update queued for checkpointing, optionally carrying a
/// completion channel so a caller can wait for the flush outcome.
#[derive(Debug)]
pub struct CheckpointRequest {
    /// The operation state change to persist.
    pub operation: OperationUpdate,
    /// `Some` for synchronous requests: the batcher reports the batch's
    /// checkpoint result here. `None` for fire-and-forget requests.
    pub completion: Option<oneshot::Sender<Result<(), DurableError>>>,
}
69
70impl CheckpointRequest {
71 pub fn sync(operation: OperationUpdate) -> (Self, oneshot::Receiver<Result<(), DurableError>>) {
75 let (tx, rx) = oneshot::channel();
76 (
77 Self {
78 operation,
79 completion: Some(tx),
80 },
81 rx,
82 )
83 }
84
85 pub fn async_request(operation: OperationUpdate) -> Self {
87 Self {
88 operation,
89 completion: None,
90 }
91 }
92
93 pub fn is_sync(&self) -> bool {
95 self.completion.is_some()
96 }
97
98 pub fn estimated_size(&self) -> usize {
100 let base_size = 100; let op = &self.operation;
104
105 base_size
106 + op.operation_id.len()
107 + op.result.as_ref().map(|r| r.len()).unwrap_or(0)
108 + op.error
109 .as_ref()
110 .map(|e| e.error_message.len() + e.error_type.len())
111 .unwrap_or(0)
112 + op.parent_id.as_ref().map(|p| p.len()).unwrap_or(0)
113 + op.name.as_ref().map(|n| n.len()).unwrap_or(0)
114 }
115}
116
/// Outcome of flushing one batch.
///
/// NOTE(review): this type is not constructed or consumed anywhere in
/// this file — presumably part of the public API or used elsewhere in
/// the crate; confirm before removing.
#[derive(Debug)]
pub struct BatchResult {
    /// Whether the checkpoint call succeeded.
    pub success: bool,
    /// Fresh checkpoint token returned by the service on success.
    pub new_token: Option<String>,
    /// The failure, when `success` is false.
    pub error: Option<DurableError>,
}
127
/// Background worker that drains [`CheckpointRequest`]s from a queue,
/// groups them into size/time/count-bounded batches, and persists each
/// batch with a single checkpoint service call.
pub struct CheckpointBatcher {
    /// Limits controlling when a batch is flushed.
    config: CheckpointBatcherConfig,
    /// Receiving half of the request queue; `run` exits once it closes.
    queue_rx: mpsc::Receiver<CheckpointRequest>,
    /// Client used for the checkpoint API calls.
    service_client: SharedDurableServiceClient,
    /// ARN of the durable execution being checkpointed.
    durable_execution_arn: String,
    /// Shared token slot: read before each checkpoint call, overwritten
    /// with the token returned on success.
    checkpoint_token: Arc<RwLock<String>>,
    /// Set after the first successful checkpoint. NOTE(review): written
    /// in `process_batch` but never read in this file — presumably
    /// observed elsewhere; confirm.
    initial_token_consumed: AtomicBool,
}
149
150impl CheckpointBatcher {
    /// Construct a batcher.
    ///
    /// * `config` - batching limits (size budget, flush window, op count).
    /// * `queue_rx` - receiving half of the checkpoint request queue.
    /// * `service_client` - client used to make checkpoint calls.
    /// * `durable_execution_arn` - execution this batcher checkpoints.
    /// * `checkpoint_token` - shared token slot; read before each call
    ///   and replaced with the token returned on success.
    pub fn new(
        config: CheckpointBatcherConfig,
        queue_rx: mpsc::Receiver<CheckpointRequest>,
        service_client: SharedDurableServiceClient,
        durable_execution_arn: String,
        checkpoint_token: Arc<RwLock<String>>,
    ) -> Self {
        Self {
            config,
            queue_rx,
            service_client,
            durable_execution_arn,
            checkpoint_token,
            // The initial token has not been used until the first
            // successful checkpoint.
            initial_token_consumed: AtomicBool::new(false),
        }
    }
176
177 pub async fn run(&mut self) {
181 loop {
182 let batch = self.collect_batch().await;
183 if batch.is_empty() {
184 break;
186 }
187 self.process_batch(batch).await;
188 }
189 }
190
    /// Collect the next batch of requests.
    ///
    /// Blocks until at least one request arrives (or the queue closes, in
    /// which case an empty batch is returned and `run` exits). Further
    /// requests are gathered until the operation-count limit, the size
    /// budget, or the batch deadline is reached.
    ///
    /// NOTE(review): the deadline is armed before the first (unbounded)
    /// `recv`, so if the first request takes longer than
    /// `max_batch_time_ms` to arrive, the batch flushes immediately with
    /// that single item — confirm this is intended.
    async fn collect_batch(&mut self) -> Vec<CheckpointRequest> {
        let mut batch = Vec::new();
        let mut batch_size = 0usize;
        let batch_deadline =
            Instant::now() + StdDuration::from_millis(self.config.max_batch_time_ms);

        // Wait without a timeout for the first request of the batch.
        match self.queue_rx.recv().await {
            Some(request) => {
                batch_size += request.estimated_size();
                batch.push(request);
            }
            // Channel closed and drained: signal shutdown via empty batch.
            None => return batch,
        }

        loop {
            if batch.len() >= self.config.max_batch_operations {
                break;
            }

            let now = Instant::now();
            if now >= batch_deadline {
                break;
            }
            let remaining = batch_deadline - now;

            match timeout(remaining, self.queue_rx.recv()).await {
                Ok(Some(request)) => {
                    let request_size = request.estimated_size();

                    // NOTE(review): the request that crosses the size
                    // budget is still included before flushing (there is
                    // no carry-over slot for the next batch), so a batch
                    // may exceed `max_batch_size_bytes` by one request's
                    // size — confirm the service tolerates the overshoot.
                    if batch_size + request_size > self.config.max_batch_size_bytes
                        && !batch.is_empty()
                    {
                        batch.push(request);
                        break;
                    }

                    batch_size += request_size;
                    batch.push(request);
                }
                // Queue closed: flush what we have; `run` will observe the
                // closure on the next `collect_batch` call.
                Ok(None) => break,
                // Deadline elapsed while waiting for another request.
                Err(_) => break,
            }
        }

        batch
    }
247
    /// Order a batch so dependencies are satisfied before it is sent:
    ///
    /// 1. Execution `Succeed`/`Fail` updates sort last (nothing may
    ///    follow the execution's completion).
    /// 2. A context `Start` sorts before updates of its (transitive)
    ///    descendants, so children never precede their ancestor contexts.
    /// 3. For the same operation id, `Start` sorts before its completion.
    /// 4. Otherwise the original queue order is kept (ties fall back to
    ///    the pre-sort index).
    ///
    /// NOTE(review): these pairwise rules are not provably a strict total
    /// order (rule 2 relations vs. the index fallback can conflict via
    /// transitivity), and `sort_by` requires one — pathological batches
    /// could be ordered inconsistently; confirm the rule set is
    /// consistent for the batch shapes the service accepts. `is_ancestor`
    /// would also loop forever on a parent-id cycle; assumed impossible
    /// upstream — confirm.
    pub fn sort_checkpoint_batch(batch: Vec<CheckpointRequest>) -> Vec<CheckpointRequest> {
        if batch.len() <= 1 {
            return batch;
        }

        // Ids of contexts being started within this batch.
        let context_starts: HashSet<String> = batch
            .iter()
            .filter(|req| {
                req.operation.operation_type == OperationType::Context
                    && req.operation.action == OperationAction::Start
            })
            .map(|req| req.operation.operation_id.clone())
            .collect();

        // operation_id -> parent_id, for walking ancestor chains.
        let parent_map: HashMap<String, Option<String>> = batch
            .iter()
            .map(|req| {
                (
                    req.operation.operation_id.clone(),
                    req.operation.parent_id.clone(),
                )
            })
            .collect();

        // True when `ancestor_id` appears anywhere on `operation_id`'s
        // parent chain (as known within this batch).
        let is_ancestor = |operation_id: &str, ancestor_id: &str| -> bool {
            let mut current = parent_map.get(operation_id).and_then(|p| p.as_ref());
            while let Some(parent_id) = current {
                if parent_id == ancestor_id {
                    return true;
                }
                current = parent_map.get(parent_id).and_then(|p| p.as_ref());
            }
            false
        };

        // Pair each request with its original index so rule 4 can keep
        // the incoming order for otherwise-unrelated requests.
        let mut indexed_batch: Vec<(usize, CheckpointRequest)> =
            batch.into_iter().enumerate().collect();

        indexed_batch.sort_by(|(idx_a, a), (idx_b, b)| {
            // Rule 1: execution completions go last.
            let a_is_exec_completion = a.operation.operation_type == OperationType::Execution
                && matches!(
                    a.operation.action,
                    OperationAction::Succeed | OperationAction::Fail
                );
            let b_is_exec_completion = b.operation.operation_type == OperationType::Execution
                && matches!(
                    b.operation.action,
                    OperationAction::Succeed | OperationAction::Fail
                );

            if a_is_exec_completion && !b_is_exec_completion {
                return std::cmp::Ordering::Greater;
            }
            if !a_is_exec_completion && b_is_exec_completion {
                return std::cmp::Ordering::Less;
            }

            // Rule 2: a context Start precedes its descendants' updates.
            if b.operation.action == OperationAction::Start
                && context_starts.contains(&b.operation.operation_id)
            {
                if let Some(ref parent_id) = a.operation.parent_id {
                    if *parent_id == b.operation.operation_id
                        || is_ancestor(&a.operation.operation_id, &b.operation.operation_id)
                    {
                        return std::cmp::Ordering::Greater;
                    }
                }
            }
            if a.operation.action == OperationAction::Start
                && context_starts.contains(&a.operation.operation_id)
            {
                if let Some(ref parent_id) = b.operation.parent_id {
                    if *parent_id == a.operation.operation_id
                        || is_ancestor(&b.operation.operation_id, &a.operation.operation_id)
                    {
                        return std::cmp::Ordering::Less;
                    }
                }
            }

            // Rule 3: Start before completion for the same operation.
            if a.operation.operation_id == b.operation.operation_id {
                let a_is_start = a.operation.action == OperationAction::Start;
                let b_is_start = b.operation.action == OperationAction::Start;
                if a_is_start && !b_is_start {
                    return std::cmp::Ordering::Less;
                }
                if !a_is_start && b_is_start {
                    return std::cmp::Ordering::Greater;
                }
            }

            // Rule 4: stable fallback to the original queue order.
            idx_a.cmp(idx_b)
        });

        indexed_batch.into_iter().map(|(_, req)| req).collect()
    }
360
    /// Flush one batch: sort it, send it in a single checkpoint call
    /// (with throttling retries), then notify all synchronous waiters.
    ///
    /// On success the shared token is replaced with the token returned by
    /// the service; on failure the token is left untouched.
    async fn process_batch(&self, batch: Vec<CheckpointRequest>) {
        if batch.is_empty() {
            return;
        }

        let sorted_batch = Self::sort_checkpoint_batch(batch);

        // Split into the updates to send and the completion channels to
        // resolve afterwards (`None` entries are async requests).
        let (operations, completions): (Vec<_>, Vec<_>) = sorted_batch
            .into_iter()
            .map(|req| (req.operation, req.completion))
            .unzip();

        // Clone the token out so the read guard is not held across the
        // service call.
        let token = self.checkpoint_token.read().await.clone();

        let result = self.checkpoint_with_retry(&token, operations).await;

        match result {
            Ok(response) => {
                // Mark the initial token as used. NOTE(review): this flag
                // is never read in this file — presumably consumed
                // elsewhere; confirm.
                self.initial_token_consumed.store(true, Ordering::Release);

                // Scope the write guard so it drops before waiters are
                // notified.
                {
                    let mut token_guard = self.checkpoint_token.write().await;
                    *token_guard = response.checkpoint_token;
                }

                // Receivers may already be dropped; ignore send failures.
                for completion in completions.into_iter().flatten() {
                    let _ = completion.send(Ok(()));
                }
            }
            Err(error) => {
                let is_invalid_token = error.is_invalid_checkpoint_token();

                // Report the failure to every synchronous waiter.
                for completion in completions.into_iter().flatten() {
                    let error_msg = if is_invalid_token {
                        format!(
                            "Invalid checkpoint token - token may have been consumed. \
                            Lambda will retry with a fresh token. Original error: {}",
                            error
                        )
                    } else {
                        error.to_string()
                    };

                    let _ = completion.send(Err(DurableError::Checkpoint {
                        message: error_msg,
                        is_retriable: error.is_retriable(),
                        aws_error: None,
                    }));
                }
            }
        }
    }
429
430 async fn checkpoint_with_retry(
436 &self,
437 token: &str,
438 operations: Vec<OperationUpdate>,
439 ) -> Result<crate::client::CheckpointResponse, DurableError> {
440 const MAX_RETRIES: u32 = 5;
441 const INITIAL_DELAY_MS: u64 = 100;
442 const MAX_DELAY_MS: u64 = 10_000;
443 const BACKOFF_MULTIPLIER: u64 = 2;
444
445 let mut attempt = 0;
446 let mut delay_ms = INITIAL_DELAY_MS;
447
448 loop {
449 let result = self
450 .service_client
451 .checkpoint(&self.durable_execution_arn, token, operations.clone())
452 .await;
453
454 match result {
455 Ok(response) => return Ok(response),
456 Err(error) if error.is_throttling() => {
457 attempt += 1;
458 if attempt > MAX_RETRIES {
459 tracing::warn!(
460 attempt = attempt,
461 "Checkpoint throttling: max retries exceeded"
462 );
463 return Err(error);
464 }
465
466 let actual_delay = error.get_retry_after_ms().unwrap_or(delay_ms);
467
468 tracing::debug!(
469 attempt = attempt,
470 delay_ms = actual_delay,
471 "Checkpoint throttled, retrying with exponential backoff"
472 );
473
474 tokio::time::sleep(StdDuration::from_millis(actual_delay)).await;
475 delay_ms = (delay_ms * BACKOFF_MULTIPLIER).min(MAX_DELAY_MS);
476 }
477 Err(error) => return Err(error),
478 }
479 }
480 }
481}
482
/// Cheaply-cloneable handle for enqueueing checkpoint requests onto the
/// batcher's queue.
#[derive(Clone)]
pub struct CheckpointSender {
    /// Sending half of the checkpoint queue.
    pub tx: mpsc::Sender<CheckpointRequest>,
}
489
490impl CheckpointSender {
491 pub fn new(tx: mpsc::Sender<CheckpointRequest>) -> Self {
493 Self { tx }
494 }
495
496 pub async fn checkpoint_sync(&self, operation: OperationUpdate) -> Result<(), DurableError> {
500 let (request, rx) = CheckpointRequest::sync(operation);
501
502 self.tx
503 .send(request)
504 .await
505 .map_err(|_| DurableError::Checkpoint {
506 message: "Checkpoint queue closed".to_string(),
507 is_retriable: false,
508 aws_error: None,
509 })?;
510
511 rx.await.map_err(|_| DurableError::Checkpoint {
512 message: "Checkpoint completion channel closed".to_string(),
513 is_retriable: false,
514 aws_error: None,
515 })?
516 }
517
518 pub async fn checkpoint_async(&self, operation: OperationUpdate) -> Result<(), DurableError> {
522 let request = CheckpointRequest::async_request(operation);
523
524 self.tx
525 .send(request)
526 .await
527 .map_err(|_| DurableError::Checkpoint {
528 message: "Checkpoint queue closed".to_string(),
529 is_retriable: false,
530 aws_error: None,
531 })
532 }
533
534 pub async fn checkpoint(
536 &self,
537 operation: OperationUpdate,
538 is_sync: bool,
539 ) -> Result<(), DurableError> {
540 if is_sync {
541 self.checkpoint_sync(operation).await
542 } else {
543 self.checkpoint_async(operation).await
544 }
545 }
546}
547
548pub fn create_checkpoint_queue(
553 buffer_size: usize,
554) -> (CheckpointSender, mpsc::Receiver<CheckpointRequest>) {
555 let (tx, rx) = mpsc::channel(buffer_size);
556 (CheckpointSender::new(tx), rx)
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::{CheckpointResponse, MockDurableServiceClient};
    use crate::error::ErrorObject;

    // ---- fixture helpers -------------------------------------------------

    // Step `start` update with the given id.
    fn create_test_update(id: &str) -> OperationUpdate {
        OperationUpdate::start(id, OperationType::Step)
    }

    // `start` update for an arbitrary operation type.
    fn create_start_update(id: &str, op_type: OperationType) -> OperationUpdate {
        OperationUpdate::start(id, op_type)
    }

    // `succeed` update carrying a small fixed result payload.
    fn create_succeed_update(id: &str, op_type: OperationType) -> OperationUpdate {
        OperationUpdate::succeed(id, op_type, Some("result".to_string()))
    }

    // `fail` update carrying a fixed error object.
    fn create_fail_update(id: &str, op_type: OperationType) -> OperationUpdate {
        OperationUpdate::fail(id, op_type, ErrorObject::new("Error", "message"))
    }

    // Wrap an update in a fire-and-forget request.
    fn create_request(update: OperationUpdate) -> CheckpointRequest {
        CheckpointRequest::async_request(update)
    }

    // ---- CheckpointRequest -----------------------------------------------

    // A sync request carries a completion sender.
    #[test]
    fn test_checkpoint_request_sync() {
        let update = create_test_update("op-1");
        let (request, _rx) = CheckpointRequest::sync(update);

        assert!(request.is_sync());
        assert_eq!(request.operation.operation_id, "op-1");
    }

    // An async request carries no completion sender.
    #[test]
    fn test_checkpoint_request_async() {
        let update = create_test_update("op-1");
        let request = CheckpointRequest::async_request(update);

        assert!(!request.is_sync());
        assert_eq!(request.operation.operation_id, "op-1");
    }

    // Size estimate includes at least the fixed 100-byte overhead.
    #[test]
    fn test_checkpoint_request_estimated_size() {
        let update = create_test_update("op-1");
        let request = CheckpointRequest::async_request(update);

        let size = request.estimated_size();
        assert!(size > 0);
        assert!(size >= 100);
    }

    // The result payload length contributes to the size estimate.
    #[test]
    fn test_checkpoint_request_estimated_size_with_result() {
        let mut update = create_test_update("op-1");
        update.result = Some("a".repeat(1000));
        let request = CheckpointRequest::async_request(update);

        let size = request.estimated_size();
        assert!(size >= 1100);
    }

    // ---- queue / sender --------------------------------------------------

    // Queue construction succeeds; dropping the sender closes it.
    #[test]
    fn test_create_checkpoint_queue() {
        let (sender, _rx) = create_checkpoint_queue(100);
        drop(sender);
    }

    // checkpoint_sync resolves once the receiver acks the completion.
    #[tokio::test]
    async fn test_checkpoint_sender_sync() {
        let (sender, mut rx) = create_checkpoint_queue(10);

        let handle = tokio::spawn(async move {
            if let Some(request) = rx.recv().await {
                assert!(request.is_sync());
                if let Some(completion) = request.completion {
                    let _ = completion.send(Ok(()));
                }
            }
        });

        let update = create_test_update("op-1");
        let result = sender.checkpoint_sync(update).await;
        assert!(result.is_ok());

        handle.await.unwrap();
    }

    // checkpoint_async enqueues without waiting for a completion.
    #[tokio::test]
    async fn test_checkpoint_sender_async() {
        let (sender, mut rx) = create_checkpoint_queue(10);

        let update = create_test_update("op-1");
        let result = sender.checkpoint_async(update).await;
        assert!(result.is_ok());

        let request = rx.recv().await.unwrap();
        assert!(!request.is_sync());
        assert_eq!(request.operation.operation_id, "op-1");
    }

    // Sending after the receiver is dropped reports an error.
    #[tokio::test]
    async fn test_checkpoint_sender_queue_closed() {
        let (sender, rx) = create_checkpoint_queue(10);
        drop(rx);

        let update = create_test_update("op-1");
        let result = sender.checkpoint_async(update).await;
        assert!(result.is_err());
    }

    // ---- batcher ---------------------------------------------------------

    // A successful flush acks the sync waiter and rotates the token.
    #[tokio::test]
    async fn test_checkpoint_batcher_processes_batch() {
        let client = Arc::new(
            MockDurableServiceClient::new()
                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
        );

        let (sender, rx) = create_checkpoint_queue(10);
        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));

        let config = CheckpointBatcherConfig {
            max_batch_time_ms: 10,
            ..Default::default()
        };

        let mut batcher = CheckpointBatcher::new(
            config,
            rx,
            client,
            "arn:test".to_string(),
            checkpoint_token.clone(),
        );

        let update = create_test_update("op-1");
        let (request, completion_rx) = CheckpointRequest::sync(update);
        sender.tx.send(request).await.unwrap();

        // Closing the queue lets `run` terminate after the flush.
        drop(sender);
        batcher.run().await;

        let result = completion_rx.await.unwrap();
        assert!(result.is_ok());

        let token = checkpoint_token.read().await;
        assert_eq!(*token, "new-token");
    }

    // A failed flush propagates the error and leaves the token unchanged.
    #[tokio::test]
    async fn test_checkpoint_batcher_handles_error() {
        let client = Arc::new(
            MockDurableServiceClient::new()
                .with_checkpoint_response(Err(DurableError::checkpoint_retriable("Test error"))),
        );

        let (sender, rx) = create_checkpoint_queue(10);
        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));

        let config = CheckpointBatcherConfig {
            max_batch_time_ms: 10,
            ..Default::default()
        };

        let mut batcher = CheckpointBatcher::new(
            config,
            rx,
            client,
            "arn:test".to_string(),
            checkpoint_token.clone(),
        );

        let update = create_test_update("op-1");
        let (request, completion_rx) = CheckpointRequest::sync(update);
        sender.tx.send(request).await.unwrap();

        drop(sender);
        batcher.run().await;

        let result = completion_rx.await.unwrap();
        assert!(result.is_err());

        let token = checkpoint_token.read().await;
        assert_eq!(*token, "initial-token");
    }

    // Multiple queued requests are coalesced and flushed together.
    #[tokio::test]
    async fn test_checkpoint_batcher_batches_multiple_requests() {
        let client = Arc::new(
            MockDurableServiceClient::new()
                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
        );

        let (sender, rx) = create_checkpoint_queue(10);
        let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));

        let config = CheckpointBatcherConfig {
            max_batch_time_ms: 50,
            max_batch_operations: 3,
            ..Default::default()
        };

        let mut batcher = CheckpointBatcher::new(
            config,
            rx,
            client,
            "arn:test".to_string(),
            checkpoint_token.clone(),
        );

        for i in 0..3 {
            let update = create_test_update(&format!("op-{}", i));
            sender
                .tx
                .send(CheckpointRequest::async_request(update))
                .await
                .unwrap();
        }

        drop(sender);
        batcher.run().await;

        let token = checkpoint_token.read().await;
        assert_eq!(*token, "new-token");
    }

    // ---- sort_checkpoint_batch -------------------------------------------

    // Rule 1: an execution Succeed sorts after all other updates.
    #[test]
    fn test_execution_completion_is_last() {
        let batch = vec![
            create_request(create_succeed_update("exec-1", OperationType::Execution)),
            create_request(create_start_update("step-1", OperationType::Step)),
            create_request(create_succeed_update("step-2", OperationType::Step)),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 3);
        assert_eq!(sorted[2].operation.operation_id, "exec-1");
        assert_eq!(sorted[2].operation.action, OperationAction::Succeed);
        assert_eq!(sorted[2].operation.operation_type, OperationType::Execution);
    }

    // Rule 1 also applies to an execution Fail.
    #[test]
    fn test_execution_fail_is_last() {
        let batch = vec![
            create_request(create_fail_update("exec-1", OperationType::Execution)),
            create_request(create_start_update("step-1", OperationType::Step)),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 2);
        assert_eq!(sorted[1].operation.operation_id, "exec-1");
        assert_eq!(sorted[1].operation.action, OperationAction::Fail);
    }

    // Rule 2: a child moves after its parent context's Start.
    #[test]
    fn test_child_after_parent_context_start() {
        let mut child_update = create_start_update("child-1", OperationType::Step);
        child_update.parent_id = Some("parent-ctx".to_string());

        let batch = vec![
            create_request(child_update),
            create_request(create_start_update("parent-ctx", OperationType::Context)),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 2);
        assert_eq!(sorted[0].operation.operation_id, "parent-ctx");
        assert_eq!(sorted[0].operation.action, OperationAction::Start);
        assert_eq!(sorted[1].operation.operation_id, "child-1");
    }

    // Rule 3: Start precedes Succeed for the same operation id.
    #[test]
    fn test_start_and_completion_same_operation() {
        let batch = vec![
            create_request(create_succeed_update("step-1", OperationType::Step)),
            create_request(create_start_update("step-1", OperationType::Step)),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 2);
        assert_eq!(sorted[0].operation.operation_id, "step-1");
        assert_eq!(sorted[0].operation.action, OperationAction::Start);
        assert_eq!(sorted[1].operation.operation_id, "step-1");
        assert_eq!(sorted[1].operation.action, OperationAction::Succeed);
    }

    // Rule 3 holds for context operations too.
    #[test]
    fn test_context_start_and_completion_same_batch() {
        let batch = vec![
            create_request(create_succeed_update("ctx-1", OperationType::Context)),
            create_request(create_start_update("ctx-1", OperationType::Context)),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 2);
        assert_eq!(sorted[0].operation.operation_id, "ctx-1");
        assert_eq!(sorted[0].operation.action, OperationAction::Start);
        assert_eq!(sorted[1].operation.operation_id, "ctx-1");
        assert_eq!(sorted[1].operation.action, OperationAction::Succeed);
    }

    // Rule 3 holds for Start/Fail pairs.
    #[test]
    fn test_step_start_and_fail_same_batch() {
        let batch = vec![
            create_request(create_fail_update("step-1", OperationType::Step)),
            create_request(create_start_update("step-1", OperationType::Step)),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 2);
        assert_eq!(sorted[0].operation.operation_id, "step-1");
        assert_eq!(sorted[0].operation.action, OperationAction::Start);
        assert_eq!(sorted[1].operation.operation_id, "step-1");
        assert_eq!(sorted[1].operation.action, OperationAction::Fail);
    }

    // All rules combined: parent Start first, execution completion last.
    #[test]
    fn test_complex_ordering_scenario() {
        let mut child1 = create_start_update("child-1", OperationType::Step);
        child1.parent_id = Some("parent-ctx".to_string());

        let mut child2 = create_succeed_update("child-2", OperationType::Step);
        child2.parent_id = Some("parent-ctx".to_string());

        let batch = vec![
            create_request(create_succeed_update("exec-1", OperationType::Execution)),
            create_request(child1),
            create_request(create_succeed_update("parent-ctx", OperationType::Context)),
            create_request(create_start_update("parent-ctx", OperationType::Context)),
            create_request(child2),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 5);

        let parent_start_pos = sorted
            .iter()
            .position(|r| {
                r.operation.operation_id == "parent-ctx"
                    && r.operation.action == OperationAction::Start
            })
            .unwrap();
        let parent_succeed_pos = sorted
            .iter()
            .position(|r| {
                r.operation.operation_id == "parent-ctx"
                    && r.operation.action == OperationAction::Succeed
            })
            .unwrap();
        let child1_pos = sorted
            .iter()
            .position(|r| r.operation.operation_id == "child-1")
            .unwrap();
        let child2_pos = sorted
            .iter()
            .position(|r| r.operation.operation_id == "child-2")
            .unwrap();
        let exec_pos = sorted
            .iter()
            .position(|r| r.operation.operation_id == "exec-1")
            .unwrap();

        assert!(parent_start_pos < parent_succeed_pos);
        assert!(parent_start_pos < child1_pos);
        assert!(parent_start_pos < child2_pos);
        assert_eq!(exec_pos, 4);
    }

    // Degenerate inputs pass through unchanged.
    #[test]
    fn test_empty_batch() {
        let batch: Vec<CheckpointRequest> = vec![];
        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
        assert!(sorted.is_empty());
    }

    #[test]
    fn test_single_item_batch() {
        let batch = vec![create_request(create_start_update(
            "step-1",
            OperationType::Step,
        ))];
        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
        assert_eq!(sorted.len(), 1);
        assert_eq!(sorted[0].operation.operation_id, "step-1");
    }

    // Rule 4: unrelated requests keep their original queue order.
    #[test]
    fn test_preserves_original_order() {
        let batch = vec![
            create_request(create_start_update("step-1", OperationType::Step)),
            create_request(create_start_update("step-2", OperationType::Step)),
            create_request(create_start_update("step-3", OperationType::Step)),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 3);
        assert_eq!(sorted[0].operation.operation_id, "step-1");
        assert_eq!(sorted[1].operation.operation_id, "step-2");
        assert_eq!(sorted[2].operation.operation_id, "step-3");
    }

    // Every sibling child follows the shared parent context Start.
    #[test]
    fn test_multiple_children_same_parent() {
        let mut child1 = create_start_update("child-1", OperationType::Step);
        child1.parent_id = Some("parent-ctx".to_string());

        let mut child2 = create_start_update("child-2", OperationType::Step);
        child2.parent_id = Some("parent-ctx".to_string());

        let mut child3 = create_start_update("child-3", OperationType::Step);
        child3.parent_id = Some("parent-ctx".to_string());

        let batch = vec![
            create_request(child1),
            create_request(child2),
            create_request(create_start_update("parent-ctx", OperationType::Context)),
            create_request(child3),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted[0].operation.operation_id, "parent-ctx");

        for i in 1..4 {
            assert!(sorted[i].operation.parent_id.as_deref() == Some("parent-ctx"));
        }
    }

    // Ancestor chains order grandparent < parent < child.
    #[test]
    fn test_nested_contexts() {
        let mut parent_ctx = create_start_update("parent-ctx", OperationType::Context);
        parent_ctx.parent_id = Some("grandparent-ctx".to_string());

        let mut child = create_start_update("child-1", OperationType::Step);
        child.parent_id = Some("parent-ctx".to_string());

        let batch = vec![
            create_request(child),
            create_request(parent_ctx),
            create_request(create_start_update(
                "grandparent-ctx",
                OperationType::Context,
            )),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        let grandparent_pos = sorted
            .iter()
            .position(|r| r.operation.operation_id == "grandparent-ctx")
            .unwrap();
        let parent_pos = sorted
            .iter()
            .position(|r| r.operation.operation_id == "parent-ctx")
            .unwrap();
        let child_pos = sorted
            .iter()
            .position(|r| r.operation.operation_id == "child-1")
            .unwrap();

        assert!(grandparent_pos < parent_pos);
        assert!(parent_pos < child_pos);
    }

    // Rule 1 targets only execution completions, not execution Starts.
    #[test]
    fn test_execution_start_not_affected() {
        let batch = vec![
            create_request(create_start_update("step-1", OperationType::Step)),
            create_request(create_start_update("exec-1", OperationType::Execution)),
            create_request(create_start_update("step-2", OperationType::Step)),
        ];

        let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);

        assert_eq!(sorted.len(), 3);
        assert_eq!(sorted[0].operation.operation_id, "step-1");
        assert_eq!(sorted[1].operation.operation_id, "exec-1");
        assert_eq!(sorted[2].operation.operation_id, "step-2");
    }
}
1050
1051#[cfg(test)]
1052mod property_tests {
1053 use super::*;
1054 use crate::client::{CheckpointResponse, DurableServiceClient, GetOperationsResponse};
1055 use async_trait::async_trait;
1056 use proptest::prelude::*;
1057 use std::collections::HashSet;
1058 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1059
    /// Mock service client that counts checkpoint calls and hands out a
    /// fresh token per call.
    struct CountingMockClient {
        // Number of `checkpoint` invocations, shared with the test body.
        checkpoint_count: Arc<AtomicUsize>,
    }

    impl CountingMockClient {
        /// Returns the client plus a handle to its call counter.
        fn new() -> (Self, Arc<AtomicUsize>) {
            let count = Arc::new(AtomicUsize::new(0));
            (
                Self {
                    checkpoint_count: count.clone(),
                },
                count,
            )
        }
    }
1075
1076 #[async_trait]
1077 impl DurableServiceClient for CountingMockClient {
1078 async fn checkpoint(
1079 &self,
1080 _durable_execution_arn: &str,
1081 _checkpoint_token: &str,
1082 _operations: Vec<OperationUpdate>,
1083 ) -> Result<CheckpointResponse, DurableError> {
1084 self.checkpoint_count.fetch_add(1, AtomicOrdering::SeqCst);
1085 Ok(CheckpointResponse::new(format!(
1086 "token-{}",
1087 self.checkpoint_count.load(AtomicOrdering::SeqCst)
1088 )))
1089 }
1090
1091 async fn get_operations(
1092 &self,
1093 _durable_execution_arn: &str,
1094 _next_marker: &str,
1095 ) -> Result<GetOperationsResponse, DurableError> {
1096 Ok(GetOperationsResponse {
1097 operations: vec![],
1098 next_marker: None,
1099 })
1100 }
1101 }
1102
1103 fn create_test_update_with_size(id: &str, result_size: usize) -> OperationUpdate {
1104 let mut update = OperationUpdate::start(id, OperationType::Step);
1105 if result_size > 0 {
1106 update.result = Some("x".repeat(result_size));
1107 }
1108 update
1109 }
1110
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100))]

        // No batch may exceed the configured operation count, so the
        // number of API calls is bounded by ceil(requests / batch size).
        // Each case builds its own runtime since proptest bodies are sync.
        #[test]
        fn prop_checkpoint_batching_respects_operation_count_limit(
            num_requests in 1usize..20,
            max_ops_per_batch in 1usize..10,
        ) {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let result: Result<(), TestCaseError> = rt.block_on(async {
                let (client, call_count) = CountingMockClient::new();
                let client = Arc::new(client);

                let (sender, rx) = create_checkpoint_queue(100);
                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));

                let config = CheckpointBatcherConfig {
                    max_batch_time_ms: 10,
                    max_batch_operations: max_ops_per_batch,
                    max_batch_size_bytes: usize::MAX,
                };

                let mut batcher = CheckpointBatcher::new(
                    config,
                    rx,
                    client,
                    "arn:test".to_string(),
                    checkpoint_token.clone(),
                );

                for i in 0..num_requests {
                    let update = create_test_update_with_size(&format!("op-{}", i), 0);
                    sender.tx.send(CheckpointRequest::async_request(update)).await.unwrap();
                }

                drop(sender);
                batcher.run().await;

                // Ceiling division: minimum number of full batches.
                let expected_max_calls = (num_requests + max_ops_per_batch - 1) / max_ops_per_batch;
                let actual_calls = call_count.load(AtomicOrdering::SeqCst);

                if actual_calls > expected_max_calls {
                    return Err(TestCaseError::fail(format!(
                        "Expected at most {} API calls for {} requests with batch size {}, got {}",
                        expected_max_calls, num_requests, max_ops_per_batch, actual_calls
                    )));
                }

                if actual_calls < 1 {
                    return Err(TestCaseError::fail(format!(
                        "Expected at least 1 API call for {} requests, got {}",
                        num_requests, actual_calls
                    )));
                }

                Ok(())
            });
            result?;
        }

        // The size budget bounds call count; a 2x slack factor absorbs the
        // batcher's allowed one-request overshoot per batch.
        #[test]
        fn prop_checkpoint_batching_respects_size_limit(
            num_requests in 1usize..10,
            result_size in 100usize..500,
            max_batch_size in 500usize..2000,
        ) {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let result: Result<(), TestCaseError> = rt.block_on(async {
                let (client, call_count) = CountingMockClient::new();
                let client = Arc::new(client);

                let (sender, rx) = create_checkpoint_queue(100);
                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));

                let config = CheckpointBatcherConfig {
                    max_batch_time_ms: 10,
                    max_batch_operations: usize::MAX,
                    max_batch_size_bytes: max_batch_size,
                };

                let mut batcher = CheckpointBatcher::new(
                    config,
                    rx,
                    client,
                    "arn:test".to_string(),
                    checkpoint_token.clone(),
                );

                for i in 0..num_requests {
                    let update = create_test_update_with_size(&format!("op-{}", i), result_size);
                    sender.tx.send(CheckpointRequest::async_request(update)).await.unwrap();
                }

                drop(sender);
                batcher.run().await;

                // Mirrors CheckpointRequest::estimated_size's 100-byte
                // base overhead plus the padded result.
                let estimated_request_size = 100 + result_size;
                let total_size = num_requests * estimated_request_size;
                let expected_max_calls = (total_size + max_batch_size - 1) / max_batch_size;
                let actual_calls = call_count.load(AtomicOrdering::SeqCst);

                if actual_calls > expected_max_calls.max(1) * 2 {
                    return Err(TestCaseError::fail(format!(
                        "Expected at most ~{} API calls for {} requests of size {}, got {}",
                        expected_max_calls, num_requests, estimated_request_size, actual_calls
                    )));
                }

                if actual_calls < 1 {
                    return Err(TestCaseError::fail(format!(
                        "Expected at least 1 API call, got {}",
                        actual_calls
                    )));
                }

                Ok(())
            });
            result?;
        }

        // Every synchronous request must receive a success ack before the
        // batcher shuts down — nothing is dropped or left hanging.
        #[test]
        fn prop_all_requests_are_processed(
            num_requests in 1usize..20,
        ) {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let result: Result<(), TestCaseError> = rt.block_on(async {
                let (client, _call_count) = CountingMockClient::new();
                let client = Arc::new(client);

                let (sender, rx) = create_checkpoint_queue(100);
                let checkpoint_token = Arc::new(RwLock::new("initial-token".to_string()));

                let config = CheckpointBatcherConfig {
                    max_batch_time_ms: 10,
                    max_batch_operations: 5,
                    ..Default::default()
                };

                let mut batcher = CheckpointBatcher::new(
                    config,
                    rx,
                    client,
                    "arn:test".to_string(),
                    checkpoint_token.clone(),
                );

                let mut receivers = Vec::new();

                for i in 0..num_requests {
                    let update = create_test_update_with_size(&format!("op-{}", i), 0);
                    let (request, rx) = CheckpointRequest::sync(update);
                    sender.tx.send(request).await.unwrap();
                    receivers.push(rx);
                }

                drop(sender);
                batcher.run().await;

                let mut success_count = 0;
                for rx in receivers {
                    if let Ok(result) = rx.await {
                        if result.is_ok() {
                            success_count += 1;
                        }
                    }
                }

                if success_count != num_requests {
                    return Err(TestCaseError::fail(format!(
                        "Expected all {} requests to succeed, got {}",
                        num_requests, success_count
                    )));
                }

                Ok(())
            });
            result?;
        }
    }
1290
1291 fn operation_id_strategy() -> impl Strategy<Value = String> {
1298 "[a-z]{1,8}-[0-9]{1,4}".prop_map(|s| s)
1299 }
1300
    /// Strategy yielding any of the six `OperationType` variants; each
    /// `prop_oneof!` arm carries the default weight, so variants are chosen
    /// with equal probability.
    fn operation_type_strategy() -> impl Strategy<Value = OperationType> {
        prop_oneof![
            Just(OperationType::Execution),
            Just(OperationType::Step),
            Just(OperationType::Wait),
            Just(OperationType::Callback),
            Just(OperationType::Invoke),
            Just(OperationType::Context),
        ]
    }
1312
    /// Strategy yielding a Start, Succeed, or Fail action with equal
    /// probability.
    ///
    /// NOTE(review): `OperationAction::Retry` (handled elsewhere in these
    /// tests) is not generated here — confirm that is intentional.
    fn operation_action_strategy() -> impl Strategy<Value = OperationAction> {
        prop_oneof![
            Just(OperationAction::Start),
            Just(OperationAction::Succeed),
            Just(OperationAction::Fail),
        ]
    }
1321
1322 fn create_checkpoint_request(
1324 id: &str,
1325 op_type: OperationType,
1326 action: OperationAction,
1327 parent_id: Option<String>,
1328 ) -> CheckpointRequest {
1329 let mut update = match action {
1330 OperationAction::Start => OperationUpdate::start(id, op_type),
1331 OperationAction::Succeed => {
1332 OperationUpdate::succeed(id, op_type, Some("result".to_string()))
1333 }
1334 OperationAction::Fail => OperationUpdate::fail(
1335 id,
1336 op_type,
1337 crate::error::ErrorObject::new("Error", "message"),
1338 ),
1339 _ => OperationUpdate::start(id, op_type),
1340 };
1341 update.parent_id = parent_id;
1342 CheckpointRequest::async_request(update)
1343 }
1344
1345 fn parent_child_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1348 (
1349 operation_id_strategy(), prop::collection::vec(operation_id_strategy(), 1..5), prop::collection::vec(operation_type_strategy(), 1..5), )
1353 .prop_map(|(parent_id, child_ids, child_types)| {
1354 let mut batch = Vec::new();
1355
1356 for (child_id, child_type) in child_ids.iter().zip(child_types.iter()) {
1358 if *child_type != OperationType::Execution {
1360 batch.push(create_checkpoint_request(
1361 child_id,
1362 *child_type,
1363 OperationAction::Start,
1364 Some(parent_id.clone()),
1365 ));
1366 }
1367 }
1368
1369 batch.push(create_checkpoint_request(
1371 &parent_id,
1372 OperationType::Context,
1373 OperationAction::Start,
1374 None,
1375 ));
1376
1377 batch
1378 })
1379 }
1380
1381 fn execution_completion_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1384 (
1385 operation_id_strategy(), prop::collection::vec((operation_id_strategy(), operation_type_strategy()), 1..5), prop::bool::ANY, )
1389 .prop_map(|(exec_id, other_ops, succeed)| {
1390 let mut batch = Vec::new();
1391
1392 let exec_action = if succeed {
1394 OperationAction::Succeed
1395 } else {
1396 OperationAction::Fail
1397 };
1398 batch.push(create_checkpoint_request(
1399 &exec_id,
1400 OperationType::Execution,
1401 exec_action,
1402 None,
1403 ));
1404
1405 for (op_id, op_type) in other_ops {
1407 if op_type != OperationType::Execution {
1409 batch.push(create_checkpoint_request(
1410 &op_id,
1411 op_type,
1412 OperationAction::Start,
1413 None,
1414 ));
1415 }
1416 }
1417
1418 batch
1419 })
1420 }
1421
1422 fn same_operation_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1425 (
1426 operation_id_strategy(),
1427 prop_oneof![Just(OperationType::Step), Just(OperationType::Context),],
1428 prop::bool::ANY, )
1430 .prop_map(|(op_id, op_type, succeed)| {
1431 let mut batch = Vec::new();
1432
1433 let completion_action = if succeed {
1435 OperationAction::Succeed
1436 } else {
1437 OperationAction::Fail
1438 };
1439 batch.push(create_checkpoint_request(
1440 &op_id,
1441 op_type,
1442 completion_action,
1443 None,
1444 ));
1445
1446 batch.push(create_checkpoint_request(
1448 &op_id,
1449 op_type,
1450 OperationAction::Start,
1451 None,
1452 ));
1453
1454 batch
1455 })
1456 }
1457
1458 proptest! {
1459 #![proptest_config(ProptestConfig::with_cases(100))]
1460
1461 #[test]
1466 fn prop_parent_context_start_before_children(batch in parent_child_batch_strategy()) {
1467 if batch.is_empty() {
1469 return Ok(());
1470 }
1471
1472 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1473
1474 let parent_pos = sorted.iter().position(|r| {
1476 r.operation.operation_type == OperationType::Context
1477 && r.operation.action == OperationAction::Start
1478 && r.operation.parent_id.is_none()
1479 });
1480
1481 if let Some(parent_idx) = parent_pos {
1483 let parent_id = &sorted[parent_idx].operation.operation_id;
1484
1485 for (idx, req) in sorted.iter().enumerate() {
1486 if let Some(ref req_parent_id) = req.operation.parent_id {
1487 if req_parent_id == parent_id {
1488 prop_assert!(
1489 idx > parent_idx,
1490 "Child operation {} at index {} should come after parent {} at index {}",
1491 req.operation.operation_id,
1492 idx,
1493 parent_id,
1494 parent_idx
1495 );
1496 }
1497 }
1498 }
1499 }
1500 }
1501
1502 #[test]
1507 fn prop_execution_completion_is_last(batch in execution_completion_batch_strategy()) {
1508 if batch.is_empty() {
1510 return Ok(());
1511 }
1512
1513 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1514
1515 let exec_completion_pos = sorted.iter().position(|r| {
1517 r.operation.operation_type == OperationType::Execution
1518 && matches!(r.operation.action, OperationAction::Succeed | OperationAction::Fail)
1519 });
1520
1521 if let Some(exec_idx) = exec_completion_pos {
1523 prop_assert_eq!(
1524 exec_idx,
1525 sorted.len() - 1,
1526 "EXECUTION completion should be at index {} (last), but found at index {}",
1527 sorted.len() - 1,
1528 exec_idx
1529 );
1530 }
1531 }
1532
1533 #[test]
1539 fn prop_operation_id_uniqueness_with_start_completion_exception(
1540 batch in same_operation_batch_strategy()
1541 ) {
1542 if batch.is_empty() {
1544 return Ok(());
1545 }
1546
1547 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1548
1549 let mut seen: std::collections::HashMap<String, Vec<OperationAction>> = std::collections::HashMap::new();
1551
1552 for req in &sorted {
1553 seen.entry(req.operation.operation_id.clone())
1554 .or_default()
1555 .push(req.operation.action);
1556 }
1557
1558 for (op_id, actions) in &seen {
1560 if actions.len() > 1 {
1561 let has_start = actions.contains(&OperationAction::Start);
1563 let has_completion = actions.iter().any(|a| {
1564 matches!(a, OperationAction::Succeed | OperationAction::Fail | OperationAction::Retry)
1565 });
1566
1567 prop_assert!(
1568 has_start && has_completion && actions.len() == 2,
1569 "Operation {} appears {} times with actions {:?}. \
1570 Multiple occurrences only allowed for START + completion pair.",
1571 op_id,
1572 actions.len(),
1573 actions
1574 );
1575 }
1576 }
1577 }
1578
1579 #[test]
1583 fn prop_start_before_completion_for_same_operation(
1584 batch in same_operation_batch_strategy()
1585 ) {
1586 if batch.is_empty() {
1588 return Ok(());
1589 }
1590
1591 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1592
1593 let mut start_positions: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
1595 let mut completion_positions: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
1596
1597 for (idx, req) in sorted.iter().enumerate() {
1598 let op_id = &req.operation.operation_id;
1599 match req.operation.action {
1600 OperationAction::Start => {
1601 start_positions.insert(op_id.clone(), idx);
1602 }
1603 OperationAction::Succeed | OperationAction::Fail | OperationAction::Retry => {
1604 completion_positions.insert(op_id.clone(), idx);
1605 }
1606 _ => {}
1607 }
1608 }
1609
1610 for (op_id, start_idx) in &start_positions {
1612 if let Some(completion_idx) = completion_positions.get(op_id) {
1613 prop_assert!(
1614 start_idx < completion_idx,
1615 "For operation {}, START at index {} should come before completion at index {}",
1616 op_id,
1617 start_idx,
1618 completion_idx
1619 );
1620 }
1621 }
1622 }
1623 }
1624
1625 fn random_batch_strategy() -> impl Strategy<Value = Vec<CheckpointRequest>> {
1632 prop::collection::vec(
1633 (
1634 operation_id_strategy(),
1635 operation_type_strategy(),
1636 operation_action_strategy(),
1637 ),
1638 1..10,
1639 )
1640 .prop_map(|ops| {
1641 ops.into_iter()
1642 .filter(|(_, op_type, _)| *op_type != OperationType::Execution) .map(|(id, op_type, action)| create_checkpoint_request(&id, op_type, action, None))
1644 .collect()
1645 })
1646 }
1647
1648 proptest! {
1649 #![proptest_config(ProptestConfig::with_cases(100))]
1650
1651 #[test]
1656 fn prop_batch_sorting_preserves_all_operations(batch in random_batch_strategy()) {
1657 if batch.is_empty() {
1659 return Ok(());
1660 }
1661
1662 let original_ops: HashSet<(String, String)> = batch.iter()
1664 .map(|r| (r.operation.operation_id.clone(), format!("{:?}", r.operation.action)))
1665 .collect();
1666
1667 let sorted = CheckpointBatcher::sort_checkpoint_batch(batch);
1668
1669 let sorted_ops: HashSet<(String, String)> = sorted.iter()
1671 .map(|r| (r.operation.operation_id.clone(), format!("{:?}", r.operation.action)))
1672 .collect();
1673
1674 prop_assert_eq!(
1676 original_ops.len(),
1677 sorted_ops.len(),
1678 "Sorting changed the number of operations: original {}, sorted {}",
1679 original_ops.len(),
1680 sorted_ops.len()
1681 );
1682
1683 prop_assert_eq!(
1684 original_ops,
1685 sorted_ops,
1686 "Sorting changed the set of operations"
1687 );
1688 }
1689
1690 #[test]
1694 fn prop_sorting_is_idempotent(batch in random_batch_strategy()) {
1695 if batch.is_empty() {
1697 return Ok(());
1698 }
1699
1700 let sorted_once = CheckpointBatcher::sort_checkpoint_batch(batch);
1701
1702 let sorted_once_clone: Vec<CheckpointRequest> = sorted_once.iter()
1704 .map(|r| CheckpointRequest::async_request(r.operation.clone()))
1705 .collect();
1706
1707 let sorted_twice = CheckpointBatcher::sort_checkpoint_batch(sorted_once_clone);
1708
1709 prop_assert_eq!(
1711 sorted_once.len(),
1712 sorted_twice.len(),
1713 "Double sorting changed the number of operations"
1714 );
1715
1716 for (idx, (first, second)) in sorted_once.iter().zip(sorted_twice.iter()).enumerate() {
1717 prop_assert_eq!(
1718 &first.operation.operation_id,
1719 &second.operation.operation_id,
1720 "Operation ID mismatch at index {}: {} vs {}",
1721 idx,
1722 &first.operation.operation_id,
1723 &second.operation.operation_id
1724 );
1725 prop_assert_eq!(
1726 first.operation.action,
1727 second.operation.action,
1728 "Action mismatch at index {} for operation {}: {:?} vs {:?}",
1729 idx,
1730 &first.operation.operation_id,
1731 first.operation.action,
1732 second.operation.action
1733 );
1734 }
1735 }
1736 }
1737}