1use std::time::Duration;
9
10use crate::error::TestError;
11use crate::test_result::HistoryEvent;
12use crate::types::{ExecutionStatus, TestResultError};
13use durable_execution_sdk::Operation;
14
15#[async_trait::async_trait]
21pub trait HistoryApiClient: Send + Sync {
22 async fn get_history(&self, arn: &str, marker: Option<&str>) -> Result<HistoryPage, TestError>;
34}
35
36#[derive(Debug, Clone)]
42pub struct HistoryPage {
43 pub events: Vec<HistoryEvent>,
45 pub operations: Vec<Operation>,
47 pub next_marker: Option<String>,
49 pub is_terminal: bool,
51 pub terminal_status: Option<ExecutionStatus>,
53 pub terminal_result: Option<String>,
55 pub terminal_error: Option<TestResultError>,
57}
58
59#[derive(Debug, Clone)]
65pub struct PollResult {
66 pub operations: Vec<Operation>,
68 pub events: Vec<HistoryEvent>,
70 pub terminal: Option<TerminalState>,
72}
73
74#[derive(Debug, Clone)]
78pub struct TerminalState {
79 pub status: ExecutionStatus,
81 pub result: Option<String>,
83 pub error: Option<TestResultError>,
85}
86
87pub struct HistoryPoller<C: HistoryApiClient> {
92 api_client: C,
93 durable_execution_arn: String,
94 poll_interval: Duration,
95 pub(crate) last_marker: Option<String>,
96 max_retries: usize,
97}
98
99impl<C: HistoryApiClient> HistoryPoller<C> {
100 pub fn new(api_client: C, arn: String, poll_interval: Duration) -> Self {
108 Self {
109 api_client,
110 durable_execution_arn: arn,
111 poll_interval,
112 last_marker: None,
113 max_retries: 3,
114 }
115 }
116
117 pub(crate) async fn call_with_retries(
134 &self,
135 marker: Option<&str>,
136 ) -> Result<HistoryPage, TestError> {
137 let mut attempts: u64 = 0;
138 loop {
139 match self
140 .api_client
141 .get_history(&self.durable_execution_arn, marker)
142 .await
143 {
144 Ok(page) => return Ok(page),
145 Err(_e) if attempts < self.max_retries as u64 => {
146 attempts += 1;
147 let delay_ms = (attempts.pow(2) - 1) * 150 + 1000;
148 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
149 }
150 Err(e) => return Err(e),
151 }
152 }
153 }
154
155 pub async fn poll_once(&mut self) -> Result<PollResult, TestError> {
169 let mut all_operations = Vec::new();
170 let mut all_events = Vec::new();
171 let mut terminal = None;
172
173 let mut current_marker = self.last_marker.clone();
175
176 loop {
177 let page = self.call_with_retries(current_marker.as_deref()).await?;
178
179 all_operations.extend(page.operations);
180 all_events.extend(page.events);
181
182 if page.is_terminal && terminal.is_none() {
184 terminal = Some(TerminalState {
185 status: page.terminal_status.unwrap_or(ExecutionStatus::Failed),
186 result: page.terminal_result,
187 error: page.terminal_error,
188 });
189 }
190
191 match page.next_marker {
192 Some(next) => {
193 current_marker = Some(next);
194 tokio::time::sleep(self.poll_interval).await;
196 }
197 None => {
198 self.last_marker = current_marker;
200 break;
201 }
202 }
203 }
204
205 Ok(PollResult {
206 operations: all_operations,
207 events: all_events,
208 terminal,
209 })
210 }
211}
212
213#[cfg(test)]
214pub(crate) mod tests {
215 use super::*;
216 use std::collections::VecDeque;
217 use std::sync::Mutex;
218
219 pub(crate) struct MockHistoryApiClient {
224 responses: Mutex<VecDeque<Result<HistoryPage, TestError>>>,
226 pub(crate) calls: Mutex<Vec<(String, Option<String>)>>,
228 }
229
230 impl MockHistoryApiClient {
231 pub(crate) fn new(responses: VecDeque<Result<HistoryPage, TestError>>) -> Self {
233 Self {
234 responses: Mutex::new(responses),
235 calls: Mutex::new(Vec::new()),
236 }
237 }
238 }
239
240 #[async_trait::async_trait]
241 impl HistoryApiClient for MockHistoryApiClient {
242 async fn get_history(
243 &self,
244 arn: &str,
245 marker: Option<&str>,
246 ) -> Result<HistoryPage, TestError> {
247 self.calls
249 .lock()
250 .unwrap()
251 .push((arn.to_string(), marker.map(|m| m.to_string())));
252
253 self.responses
255 .lock()
256 .unwrap()
257 .pop_front()
258 .expect("MockHistoryApiClient: no more responses configured")
259 }
260 }
261
262 #[test]
263 fn mock_returns_configured_responses() {
264 let page = HistoryPage {
265 events: vec![],
266 operations: vec![],
267 next_marker: None,
268 is_terminal: false,
269 terminal_status: None,
270 terminal_result: None,
271 terminal_error: None,
272 };
273
274 let mut responses = VecDeque::new();
275 responses.push_back(Ok(page));
276 responses.push_back(Err(TestError::aws_error("transient failure")));
277
278 let mock = MockHistoryApiClient::new(responses);
279
280 let rt = tokio::runtime::Builder::new_current_thread()
281 .enable_all()
282 .build()
283 .unwrap();
284
285 let result = rt.block_on(mock.get_history("arn:test", None));
287 assert!(result.is_ok());
288
289 let result = rt.block_on(mock.get_history("arn:test", Some("marker-1")));
291 assert!(result.is_err());
292
293 let calls = mock.calls.lock().unwrap();
295 assert_eq!(calls.len(), 2);
296 assert_eq!(calls[0], ("arn:test".to_string(), None));
297 assert_eq!(
298 calls[1],
299 ("arn:test".to_string(), Some("marker-1".to_string()))
300 );
301 }
302
303 #[test]
304 fn mock_records_all_call_arguments() {
305 let mut responses = VecDeque::new();
306 for _ in 0..3 {
307 responses.push_back(Ok(HistoryPage {
308 events: vec![],
309 operations: vec![],
310 next_marker: None,
311 is_terminal: false,
312 terminal_status: None,
313 terminal_result: None,
314 terminal_error: None,
315 }));
316 }
317
318 let mock = MockHistoryApiClient::new(responses);
319
320 let rt = tokio::runtime::Builder::new_current_thread()
321 .enable_all()
322 .build()
323 .unwrap();
324
325 rt.block_on(mock.get_history("arn:exec:1", None)).unwrap();
326 rt.block_on(mock.get_history("arn:exec:1", Some("m1")))
327 .unwrap();
328 rt.block_on(mock.get_history("arn:exec:2", Some("m2")))
329 .unwrap();
330
331 let calls = mock.calls.lock().unwrap();
332 assert_eq!(calls.len(), 3);
333 assert_eq!(calls[0].0, "arn:exec:1");
334 assert_eq!(calls[0].1, None);
335 assert_eq!(calls[1].1, Some("m1".to_string()));
336 assert_eq!(calls[2].0, "arn:exec:2");
337 assert_eq!(calls[2].1, Some("m2".to_string()));
338 }
339
340 #[test]
341 #[should_panic(expected = "no more responses configured")]
342 fn mock_panics_when_responses_exhausted() {
343 let mock = MockHistoryApiClient::new(VecDeque::new());
344
345 let rt = tokio::runtime::Builder::new_current_thread()
346 .enable_all()
347 .build()
348 .unwrap();
349
350 let _ = rt.block_on(mock.get_history("arn:test", None));
352 }
353
354 fn empty_page() -> HistoryPage {
357 HistoryPage {
358 events: vec![],
359 operations: vec![],
360 next_marker: None,
361 is_terminal: false,
362 terminal_status: None,
363 terminal_result: None,
364 terminal_error: None,
365 }
366 }
367
368 #[test]
369 fn new_sets_defaults() {
370 let mock = MockHistoryApiClient::new(VecDeque::new());
371 let poller =
372 HistoryPoller::new(mock, "arn:test:123".to_string(), Duration::from_millis(500));
373
374 assert_eq!(poller.durable_execution_arn, "arn:test:123");
375 assert_eq!(poller.poll_interval, Duration::from_millis(500));
376 assert!(poller.last_marker.is_none());
377 assert_eq!(poller.max_retries, 3);
378 }
379
380 #[tokio::test]
381 async fn call_with_retries_succeeds_on_first_try() {
382 let mut responses = VecDeque::new();
383 responses.push_back(Ok(empty_page()));
384
385 let mock = MockHistoryApiClient::new(responses);
386 let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
387
388 let result = poller.call_with_retries(None).await;
389 assert!(result.is_ok());
390
391 let calls = poller.api_client.calls.lock().unwrap();
392 assert_eq!(calls.len(), 1);
393 }
394
395 #[tokio::test]
396 async fn call_with_retries_succeeds_after_transient_errors() {
397 tokio::time::pause();
398
399 let mut responses = VecDeque::new();
400 responses.push_back(Err(TestError::aws_error("transient 1")));
402 responses.push_back(Err(TestError::aws_error("transient 2")));
403 responses.push_back(Ok(empty_page()));
404
405 let mock = MockHistoryApiClient::new(responses);
406 let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
407
408 let result = poller.call_with_retries(None).await;
409 assert!(result.is_ok());
410
411 let calls = poller.api_client.calls.lock().unwrap();
412 assert_eq!(calls.len(), 3);
413 }
414
415 #[tokio::test]
416 async fn call_with_retries_fails_after_max_retries() {
417 tokio::time::pause();
418
419 let mut responses = VecDeque::new();
420 responses.push_back(Err(TestError::aws_error("fail 1")));
422 responses.push_back(Err(TestError::aws_error("fail 2")));
423 responses.push_back(Err(TestError::aws_error("fail 3")));
424 responses.push_back(Err(TestError::aws_error("fail 4")));
425
426 let mock = MockHistoryApiClient::new(responses);
427 let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
428
429 let result = poller.call_with_retries(None).await;
430 assert!(result.is_err());
431 assert!(result.unwrap_err().to_string().contains("fail 4"));
432
433 let calls = poller.api_client.calls.lock().unwrap();
435 assert_eq!(calls.len(), 4);
436 }
437
438 #[tokio::test]
439 async fn call_with_retries_passes_marker() {
440 let mut responses = VecDeque::new();
441 responses.push_back(Ok(empty_page()));
442
443 let mock = MockHistoryApiClient::new(responses);
444 let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
445
446 let result = poller.call_with_retries(Some("my-marker")).await;
447 assert!(result.is_ok());
448
449 let calls = poller.api_client.calls.lock().unwrap();
450 assert_eq!(calls[0].1, Some("my-marker".to_string()));
451 }
452
453 #[tokio::test]
454 async fn call_with_retries_uses_correct_arn() {
455 let mut responses = VecDeque::new();
456 responses.push_back(Ok(empty_page()));
457
458 let mock = MockHistoryApiClient::new(responses);
459 let poller = HistoryPoller::new(
460 mock,
461 "arn:aws:lambda:us-east-1:123:exec-456".to_string(),
462 Duration::from_millis(100),
463 );
464
465 poller.call_with_retries(None).await.unwrap();
466
467 let calls = poller.api_client.calls.lock().unwrap();
468 assert_eq!(calls[0].0, "arn:aws:lambda:us-east-1:123:exec-456");
469 }
470
471 #[tokio::test]
472 async fn call_with_retries_backoff_timing() {
473 tokio::time::pause();
474
475 let mut responses = VecDeque::new();
476 responses.push_back(Err(TestError::aws_error("fail 1")));
478 responses.push_back(Err(TestError::aws_error("fail 2")));
479 responses.push_back(Err(TestError::aws_error("fail 3")));
480 responses.push_back(Ok(empty_page()));
481
482 let mock = MockHistoryApiClient::new(responses);
483 let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
484
485 let start = tokio::time::Instant::now();
486 let result = poller.call_with_retries(None).await;
487 let elapsed = start.elapsed();
488
489 assert!(result.is_ok());
490
491 let expected_total = Duration::from_millis(1000 + 1450 + 2200);
497 assert!(
498 elapsed >= expected_total,
499 "Expected at least {:?} elapsed, got {:?}",
500 expected_total,
501 elapsed
502 );
503 }
504
505 fn make_operation(id: &str) -> Operation {
508 Operation::new(id, durable_execution_sdk::OperationType::Step)
509 }
510
511 fn make_event(event_type: &str) -> crate::test_result::HistoryEvent {
512 crate::test_result::HistoryEvent::new(event_type)
513 }
514
515 #[tokio::test]
516 async fn poll_once_single_page_no_terminal() {
517 let op = make_operation("op-1");
518 let evt = make_event("StepStarted");
519
520 let mut responses = VecDeque::new();
521 responses.push_back(Ok(HistoryPage {
522 events: vec![evt],
523 operations: vec![op],
524 next_marker: None,
525 is_terminal: false,
526 terminal_status: None,
527 terminal_result: None,
528 terminal_error: None,
529 }));
530
531 let mock = MockHistoryApiClient::new(responses);
532 let mut poller =
533 HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
534
535 let result = poller.poll_once().await.unwrap();
536 assert_eq!(result.operations.len(), 1);
537 assert_eq!(result.operations[0].operation_id, "op-1");
538 assert_eq!(result.events.len(), 1);
539 assert!(result.terminal.is_none());
540 assert!(poller.last_marker.is_none());
542 }
543
544 #[tokio::test]
545 async fn poll_once_multi_page_pagination() {
546 tokio::time::pause();
547
548 let mut responses = VecDeque::new();
549 responses.push_back(Ok(HistoryPage {
551 events: vec![make_event("E1")],
552 operations: vec![make_operation("op-1")],
553 next_marker: Some("marker-A".to_string()),
554 is_terminal: false,
555 terminal_status: None,
556 terminal_result: None,
557 terminal_error: None,
558 }));
559 responses.push_back(Ok(HistoryPage {
561 events: vec![make_event("E2")],
562 operations: vec![make_operation("op-2")],
563 next_marker: Some("marker-B".to_string()),
564 is_terminal: false,
565 terminal_status: None,
566 terminal_result: None,
567 terminal_error: None,
568 }));
569 responses.push_back(Ok(HistoryPage {
571 events: vec![make_event("E3")],
572 operations: vec![make_operation("op-3")],
573 next_marker: None,
574 is_terminal: false,
575 terminal_status: None,
576 terminal_result: None,
577 terminal_error: None,
578 }));
579
580 let mock = MockHistoryApiClient::new(responses);
581 let mut poller =
582 HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(50));
583
584 let result = poller.poll_once().await.unwrap();
585
586 assert_eq!(result.operations.len(), 3);
588 assert_eq!(result.events.len(), 3);
589 assert!(result.terminal.is_none());
590
591 let calls = poller.api_client.calls.lock().unwrap();
593 assert_eq!(calls.len(), 3);
594 assert_eq!(calls[0].1, None); assert_eq!(calls[1].1, Some("marker-A".to_string()));
596 assert_eq!(calls[2].1, Some("marker-B".to_string()));
597
598 assert_eq!(poller.last_marker, Some("marker-B".to_string()));
601 }
602
603 #[tokio::test]
604 async fn poll_once_detects_terminal_state() {
605 let mut responses = VecDeque::new();
606 responses.push_back(Ok(HistoryPage {
607 events: vec![make_event("ExecutionSucceeded")],
608 operations: vec![make_operation("op-1")],
609 next_marker: None,
610 is_terminal: true,
611 terminal_status: Some(crate::types::ExecutionStatus::Succeeded),
612 terminal_result: Some(r#"{"value": 42}"#.to_string()),
613 terminal_error: None,
614 }));
615
616 let mock = MockHistoryApiClient::new(responses);
617 let mut poller =
618 HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
619
620 let result = poller.poll_once().await.unwrap();
621 assert!(result.terminal.is_some());
622 let terminal = result.terminal.unwrap();
623 assert_eq!(terminal.status, crate::types::ExecutionStatus::Succeeded);
624 assert_eq!(terminal.result, Some(r#"{"value": 42}"#.to_string()));
625 assert!(terminal.error.is_none());
626 }
627
628 #[tokio::test]
629 async fn poll_once_terminal_with_error() {
630 let mut responses = VecDeque::new();
631 responses.push_back(Ok(HistoryPage {
632 events: vec![],
633 operations: vec![],
634 next_marker: None,
635 is_terminal: true,
636 terminal_status: Some(crate::types::ExecutionStatus::Failed),
637 terminal_result: None,
638 terminal_error: Some(crate::types::TestResultError::new("RuntimeError", "boom")),
639 }));
640
641 let mock = MockHistoryApiClient::new(responses);
642 let mut poller =
643 HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
644
645 let result = poller.poll_once().await.unwrap();
646 let terminal = result.terminal.unwrap();
647 assert_eq!(terminal.status, crate::types::ExecutionStatus::Failed);
648 assert!(terminal.error.is_some());
649 assert_eq!(
650 terminal.error.unwrap().error_type,
651 Some("RuntimeError".to_string())
652 );
653 }
654
655 #[tokio::test]
656 async fn poll_once_marker_continuity_across_cycles() {
657 tokio::time::pause();
658
659 let mut responses = VecDeque::new();
660 responses.push_back(Ok(HistoryPage {
662 events: vec![],
663 operations: vec![make_operation("op-1")],
664 next_marker: Some("cycle1-marker".to_string()),
665 is_terminal: false,
666 terminal_status: None,
667 terminal_result: None,
668 terminal_error: None,
669 }));
670 responses.push_back(Ok(HistoryPage {
672 events: vec![],
673 operations: vec![],
674 next_marker: None,
675 is_terminal: false,
676 terminal_status: None,
677 terminal_result: None,
678 terminal_error: None,
679 }));
680 responses.push_back(Ok(HistoryPage {
682 events: vec![],
683 operations: vec![make_operation("op-2")],
684 next_marker: None,
685 is_terminal: true,
686 terminal_status: Some(crate::types::ExecutionStatus::Succeeded),
687 terminal_result: Some("\"done\"".to_string()),
688 terminal_error: None,
689 }));
690
691 let mock = MockHistoryApiClient::new(responses);
692 let mut poller =
693 HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
694
695 let _r1 = poller.poll_once().await.unwrap();
697 assert_eq!(poller.last_marker, Some("cycle1-marker".to_string()));
698
699 let _r2 = poller.poll_once().await.unwrap();
701
702 let calls = poller.api_client.calls.lock().unwrap();
703 assert_eq!(calls[2].1, Some("cycle1-marker".to_string()));
705 }
706
707 #[tokio::test]
708 async fn poll_once_propagates_api_error() {
709 tokio::time::pause();
710
711 let mut responses = VecDeque::new();
712 for _ in 0..4 {
714 responses.push_back(Err(TestError::aws_error("service unavailable")));
715 }
716
717 let mock = MockHistoryApiClient::new(responses);
718 let mut poller =
719 HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
720
721 let result = poller.poll_once().await;
722 assert!(result.is_err());
723 }
724
725 #[tokio::test]
726 async fn poll_once_empty_page() {
727 let mut responses = VecDeque::new();
728 responses.push_back(Ok(empty_page()));
729
730 let mock = MockHistoryApiClient::new(responses);
731 let mut poller =
732 HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
733
734 let result = poller.poll_once().await.unwrap();
735 assert!(result.operations.is_empty());
736 assert!(result.events.is_empty());
737 assert!(result.terminal.is_none());
738 }
739
740 #[tokio::test]
741 async fn poll_once_pagination_waits_between_pages() {
742 tokio::time::pause();
743
744 let mut responses = VecDeque::new();
745 responses.push_back(Ok(HistoryPage {
746 events: vec![],
747 operations: vec![],
748 next_marker: Some("m1".to_string()),
749 is_terminal: false,
750 terminal_status: None,
751 terminal_result: None,
752 terminal_error: None,
753 }));
754 responses.push_back(Ok(empty_page()));
755
756 let mock = MockHistoryApiClient::new(responses);
757 let poll_interval = Duration::from_millis(200);
758 let mut poller = HistoryPoller::new(mock, "arn:test".to_string(), poll_interval);
759
760 let start = tokio::time::Instant::now();
761 poller.poll_once().await.unwrap();
762 let elapsed = start.elapsed();
763
764 assert!(
766 elapsed >= poll_interval,
767 "Expected at least {:?} elapsed, got {:?}",
768 poll_interval,
769 elapsed
770 );
771 }
772}