Skip to main content

durable_execution_sdk_testing/
history_poller.rs

1//! History polling types and traits for cloud test execution.
2//!
3//! This module defines the `HistoryApiClient` trait for abstracting history API calls,
4//! along with the data types used to represent history pages, poll results, and terminal states.
5//! These are the building blocks for the `HistoryPoller` which periodically retrieves
6//! operation history during cloud durable execution testing.
7
8use std::time::Duration;
9
10use crate::error::TestError;
11use crate::test_result::HistoryEvent;
12use crate::types::{ExecutionStatus, TestResultError};
13use durable_execution_sdk::Operation;
14
15/// Trait abstracting the `GetDurableExecutionHistory` API call for testability.
16///
17/// Implementations of this trait provide the ability to retrieve paginated history
18/// for a durable execution identified by its ARN. The trait is object-safe and
19/// async, enabling both real API clients and mock implementations for testing.
20#[async_trait::async_trait]
21pub trait HistoryApiClient: Send + Sync {
22    /// Retrieves a single page of execution history.
23    ///
24    /// # Arguments
25    ///
26    /// * `arn` - The Durable Execution ARN identifying the execution
27    /// * `marker` - Optional pagination marker from a previous response
28    ///
29    /// # Returns
30    ///
31    /// A `HistoryPage` containing events, operations, and pagination/terminal info,
32    /// or a `TestError` if the API call fails.
33    async fn get_history(&self, arn: &str, marker: Option<&str>) -> Result<HistoryPage, TestError>;
34}
35
36/// A single page of results from the `GetDurableExecutionHistory` API.
37///
38/// Each page may contain history events, operations, and pagination metadata.
39/// When the execution reaches a terminal state, the page indicates this along
40/// with the terminal status and result/error information.
41#[derive(Debug, Clone)]
42pub struct HistoryPage {
43    /// History events from this page.
44    pub events: Vec<HistoryEvent>,
45    /// Operations discovered in this page.
46    pub operations: Vec<Operation>,
47    /// Pagination marker for the next page, if more results are available.
48    pub next_marker: Option<String>,
49    /// Whether this page indicates the execution has reached a terminal state.
50    pub is_terminal: bool,
51    /// The terminal execution status, if `is_terminal` is true.
52    pub terminal_status: Option<ExecutionStatus>,
53    /// The serialized result payload, if the execution succeeded.
54    pub terminal_result: Option<String>,
55    /// The error details, if the execution failed.
56    pub terminal_error: Option<TestResultError>,
57}
58
59/// Aggregated result of a single poll cycle, which may span multiple pages.
60///
61/// A poll cycle exhausts all available pages (following `next_marker` pagination)
62/// and collects all operations and events into a single result. If any page
63/// indicates a terminal state, it is captured here.
64#[derive(Debug, Clone)]
65pub struct PollResult {
66    /// All operations collected across all pages in this poll cycle.
67    pub operations: Vec<Operation>,
68    /// All history events collected across all pages in this poll cycle.
69    pub events: Vec<HistoryEvent>,
70    /// Terminal state information, if the execution completed during this cycle.
71    pub terminal: Option<TerminalState>,
72}
73
74/// Terminal execution outcome extracted from a history page.
75///
76/// Captures the final status and result/error of a completed durable execution.
77#[derive(Debug, Clone)]
78pub struct TerminalState {
79    /// The terminal execution status (Succeeded, Failed, Cancelled, or TimedOut).
80    pub status: ExecutionStatus,
81    /// The serialized result payload, if the execution succeeded.
82    pub result: Option<String>,
83    /// The error details, if the execution failed.
84    pub error: Option<TestResultError>,
85}
86
87/// Polls `GetDurableExecutionHistory` and feeds results into `OperationStorage`.
88///
89/// The `HistoryPoller` periodically calls the history API, handles pagination
90/// within each poll cycle, and retries transient errors with exponential backoff.
91pub struct HistoryPoller<C: HistoryApiClient> {
92    api_client: C,
93    durable_execution_arn: String,
94    poll_interval: Duration,
95    pub(crate) last_marker: Option<String>,
96    max_retries: usize,
97}
98
99impl<C: HistoryApiClient> HistoryPoller<C> {
100    /// Creates a new `HistoryPoller`.
101    ///
102    /// # Arguments
103    ///
104    /// * `api_client` - The client used to call the history API
105    /// * `arn` - The Durable Execution ARN identifying the execution to poll
106    /// * `poll_interval` - Duration to wait between page requests within a poll cycle
107    pub fn new(api_client: C, arn: String, poll_interval: Duration) -> Self {
108        Self {
109            api_client,
110            durable_execution_arn: arn,
111            poll_interval,
112            last_marker: None,
113            max_retries: 3,
114        }
115    }
116
117    /// Makes a single API call with retry logic and exponential backoff.
118    ///
119    /// Retries transient errors up to `max_retries` times using the formula:
120    /// `(attempt² - 1) * 150 + 1000` milliseconds.
121    ///
122    /// For attempt 1: (1 - 1) * 150 + 1000 = 1000ms
123    /// For attempt 2: (4 - 1) * 150 + 1000 = 1450ms
124    /// For attempt 3: (9 - 1) * 150 + 1000 = 2200ms
125    ///
126    /// # Arguments
127    ///
128    /// * `marker` - Optional pagination marker for the API call
129    ///
130    /// # Returns
131    ///
132    /// The `HistoryPage` on success, or a `TestError` if all retries are exhausted.
133    pub(crate) async fn call_with_retries(
134        &self,
135        marker: Option<&str>,
136    ) -> Result<HistoryPage, TestError> {
137        let mut attempts: u64 = 0;
138        loop {
139            match self
140                .api_client
141                .get_history(&self.durable_execution_arn, marker)
142                .await
143            {
144                Ok(page) => return Ok(page),
145                Err(_e) if attempts < self.max_retries as u64 => {
146                    attempts += 1;
147                    let delay_ms = (attempts.pow(2) - 1) * 150 + 1000;
148                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
149                }
150                Err(e) => return Err(e),
151            }
152        }
153    }
154
155    /// Executes one poll cycle, exhausting all pages via pagination.
156    ///
157    /// Starting from `self.last_marker`, this method calls the history API
158    /// and follows `next_marker` pagination tokens until no more pages remain.
159    /// Between each page request, it waits `poll_interval` to avoid throttling.
160    ///
161    /// After the cycle completes, `self.last_marker` is updated to the last
162    /// page's `next_marker` so the next poll cycle starts where this one left off.
163    ///
164    /// # Returns
165    ///
166    /// A `PollResult` containing all operations and events aggregated across
167    /// all pages, plus any terminal state detected during the cycle.
168    pub async fn poll_once(&mut self) -> Result<PollResult, TestError> {
169        let mut all_operations = Vec::new();
170        let mut all_events = Vec::new();
171        let mut terminal = None;
172
173        // First page uses the marker carried over from the previous cycle
174        let mut current_marker = self.last_marker.clone();
175
176        loop {
177            let page = self.call_with_retries(current_marker.as_deref()).await?;
178
179            all_operations.extend(page.operations);
180            all_events.extend(page.events);
181
182            // Capture terminal state from any page (first one wins)
183            if page.is_terminal && terminal.is_none() {
184                terminal = Some(TerminalState {
185                    status: page.terminal_status.unwrap_or(ExecutionStatus::Failed),
186                    result: page.terminal_result,
187                    error: page.terminal_error,
188                });
189            }
190
191            match page.next_marker {
192                Some(next) => {
193                    current_marker = Some(next);
194                    // Wait between pages within a cycle to avoid throttling (Req 2.3)
195                    tokio::time::sleep(self.poll_interval).await;
196                }
197                None => {
198                    // Update last_marker for cross-cycle continuity (Req 2.4)
199                    self.last_marker = current_marker;
200                    break;
201                }
202            }
203        }
204
205        Ok(PollResult {
206            operations: all_operations,
207            events: all_events,
208            terminal,
209        })
210    }
211}
212
213#[cfg(test)]
214pub(crate) mod tests {
215    use super::*;
216    use std::collections::VecDeque;
217    use std::sync::Mutex;
218
219    /// A mock implementation of `HistoryApiClient` for testing.
220    ///
221    /// Records every `(arn, marker)` call and returns pre-configured responses
222    /// from a `VecDeque`. Supports injecting transient errors for retry testing.
223    pub(crate) struct MockHistoryApiClient {
224        /// Pre-configured responses returned in FIFO order.
225        responses: Mutex<VecDeque<Result<HistoryPage, TestError>>>,
226        /// Recorded `(arn, marker)` pairs from each `get_history` call.
227        pub(crate) calls: Mutex<Vec<(String, Option<String>)>>,
228    }
229
230    impl MockHistoryApiClient {
231        /// Creates a new mock with the given sequence of responses.
232        pub(crate) fn new(responses: VecDeque<Result<HistoryPage, TestError>>) -> Self {
233            Self {
234                responses: Mutex::new(responses),
235                calls: Mutex::new(Vec::new()),
236            }
237        }
238    }
239
240    #[async_trait::async_trait]
241    impl HistoryApiClient for MockHistoryApiClient {
242        async fn get_history(
243            &self,
244            arn: &str,
245            marker: Option<&str>,
246        ) -> Result<HistoryPage, TestError> {
247            // Record the call
248            self.calls
249                .lock()
250                .unwrap()
251                .push((arn.to_string(), marker.map(|m| m.to_string())));
252
253            // Pop the next response, or panic if none left
254            self.responses
255                .lock()
256                .unwrap()
257                .pop_front()
258                .expect("MockHistoryApiClient: no more responses configured")
259        }
260    }
261
262    #[test]
263    fn mock_returns_configured_responses() {
264        let page = HistoryPage {
265            events: vec![],
266            operations: vec![],
267            next_marker: None,
268            is_terminal: false,
269            terminal_status: None,
270            terminal_result: None,
271            terminal_error: None,
272        };
273
274        let mut responses = VecDeque::new();
275        responses.push_back(Ok(page));
276        responses.push_back(Err(TestError::aws_error("transient failure")));
277
278        let mock = MockHistoryApiClient::new(responses);
279
280        let rt = tokio::runtime::Builder::new_current_thread()
281            .enable_all()
282            .build()
283            .unwrap();
284
285        // First call returns Ok
286        let result = rt.block_on(mock.get_history("arn:test", None));
287        assert!(result.is_ok());
288
289        // Second call returns Err
290        let result = rt.block_on(mock.get_history("arn:test", Some("marker-1")));
291        assert!(result.is_err());
292
293        // Verify recorded calls
294        let calls = mock.calls.lock().unwrap();
295        assert_eq!(calls.len(), 2);
296        assert_eq!(calls[0], ("arn:test".to_string(), None));
297        assert_eq!(
298            calls[1],
299            ("arn:test".to_string(), Some("marker-1".to_string()))
300        );
301    }
302
303    #[test]
304    fn mock_records_all_call_arguments() {
305        let mut responses = VecDeque::new();
306        for _ in 0..3 {
307            responses.push_back(Ok(HistoryPage {
308                events: vec![],
309                operations: vec![],
310                next_marker: None,
311                is_terminal: false,
312                terminal_status: None,
313                terminal_result: None,
314                terminal_error: None,
315            }));
316        }
317
318        let mock = MockHistoryApiClient::new(responses);
319
320        let rt = tokio::runtime::Builder::new_current_thread()
321            .enable_all()
322            .build()
323            .unwrap();
324
325        rt.block_on(mock.get_history("arn:exec:1", None)).unwrap();
326        rt.block_on(mock.get_history("arn:exec:1", Some("m1")))
327            .unwrap();
328        rt.block_on(mock.get_history("arn:exec:2", Some("m2")))
329            .unwrap();
330
331        let calls = mock.calls.lock().unwrap();
332        assert_eq!(calls.len(), 3);
333        assert_eq!(calls[0].0, "arn:exec:1");
334        assert_eq!(calls[0].1, None);
335        assert_eq!(calls[1].1, Some("m1".to_string()));
336        assert_eq!(calls[2].0, "arn:exec:2");
337        assert_eq!(calls[2].1, Some("m2".to_string()));
338    }
339
340    #[test]
341    #[should_panic(expected = "no more responses configured")]
342    fn mock_panics_when_responses_exhausted() {
343        let mock = MockHistoryApiClient::new(VecDeque::new());
344
345        let rt = tokio::runtime::Builder::new_current_thread()
346            .enable_all()
347            .build()
348            .unwrap();
349
350        // Should panic — no responses configured
351        let _ = rt.block_on(mock.get_history("arn:test", None));
352    }
353
354    // --- HistoryPoller tests ---
355
356    fn empty_page() -> HistoryPage {
357        HistoryPage {
358            events: vec![],
359            operations: vec![],
360            next_marker: None,
361            is_terminal: false,
362            terminal_status: None,
363            terminal_result: None,
364            terminal_error: None,
365        }
366    }
367
368    #[test]
369    fn new_sets_defaults() {
370        let mock = MockHistoryApiClient::new(VecDeque::new());
371        let poller =
372            HistoryPoller::new(mock, "arn:test:123".to_string(), Duration::from_millis(500));
373
374        assert_eq!(poller.durable_execution_arn, "arn:test:123");
375        assert_eq!(poller.poll_interval, Duration::from_millis(500));
376        assert!(poller.last_marker.is_none());
377        assert_eq!(poller.max_retries, 3);
378    }
379
380    #[tokio::test]
381    async fn call_with_retries_succeeds_on_first_try() {
382        let mut responses = VecDeque::new();
383        responses.push_back(Ok(empty_page()));
384
385        let mock = MockHistoryApiClient::new(responses);
386        let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
387
388        let result = poller.call_with_retries(None).await;
389        assert!(result.is_ok());
390
391        let calls = poller.api_client.calls.lock().unwrap();
392        assert_eq!(calls.len(), 1);
393    }
394
395    #[tokio::test]
396    async fn call_with_retries_succeeds_after_transient_errors() {
397        tokio::time::pause();
398
399        let mut responses = VecDeque::new();
400        // 2 failures then success
401        responses.push_back(Err(TestError::aws_error("transient 1")));
402        responses.push_back(Err(TestError::aws_error("transient 2")));
403        responses.push_back(Ok(empty_page()));
404
405        let mock = MockHistoryApiClient::new(responses);
406        let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
407
408        let result = poller.call_with_retries(None).await;
409        assert!(result.is_ok());
410
411        let calls = poller.api_client.calls.lock().unwrap();
412        assert_eq!(calls.len(), 3);
413    }
414
415    #[tokio::test]
416    async fn call_with_retries_fails_after_max_retries() {
417        tokio::time::pause();
418
419        let mut responses = VecDeque::new();
420        // 4 failures (exceeds max_retries of 3)
421        responses.push_back(Err(TestError::aws_error("fail 1")));
422        responses.push_back(Err(TestError::aws_error("fail 2")));
423        responses.push_back(Err(TestError::aws_error("fail 3")));
424        responses.push_back(Err(TestError::aws_error("fail 4")));
425
426        let mock = MockHistoryApiClient::new(responses);
427        let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
428
429        let result = poller.call_with_retries(None).await;
430        assert!(result.is_err());
431        assert!(result.unwrap_err().to_string().contains("fail 4"));
432
433        // Should have made 4 calls: initial + 3 retries
434        let calls = poller.api_client.calls.lock().unwrap();
435        assert_eq!(calls.len(), 4);
436    }
437
438    #[tokio::test]
439    async fn call_with_retries_passes_marker() {
440        let mut responses = VecDeque::new();
441        responses.push_back(Ok(empty_page()));
442
443        let mock = MockHistoryApiClient::new(responses);
444        let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
445
446        let result = poller.call_with_retries(Some("my-marker")).await;
447        assert!(result.is_ok());
448
449        let calls = poller.api_client.calls.lock().unwrap();
450        assert_eq!(calls[0].1, Some("my-marker".to_string()));
451    }
452
453    #[tokio::test]
454    async fn call_with_retries_uses_correct_arn() {
455        let mut responses = VecDeque::new();
456        responses.push_back(Ok(empty_page()));
457
458        let mock = MockHistoryApiClient::new(responses);
459        let poller = HistoryPoller::new(
460            mock,
461            "arn:aws:lambda:us-east-1:123:exec-456".to_string(),
462            Duration::from_millis(100),
463        );
464
465        poller.call_with_retries(None).await.unwrap();
466
467        let calls = poller.api_client.calls.lock().unwrap();
468        assert_eq!(calls[0].0, "arn:aws:lambda:us-east-1:123:exec-456");
469    }
470
471    #[tokio::test]
472    async fn call_with_retries_backoff_timing() {
473        tokio::time::pause();
474
475        let mut responses = VecDeque::new();
476        // 3 failures then success
477        responses.push_back(Err(TestError::aws_error("fail 1")));
478        responses.push_back(Err(TestError::aws_error("fail 2")));
479        responses.push_back(Err(TestError::aws_error("fail 3")));
480        responses.push_back(Ok(empty_page()));
481
482        let mock = MockHistoryApiClient::new(responses);
483        let poller = HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(100));
484
485        let start = tokio::time::Instant::now();
486        let result = poller.call_with_retries(None).await;
487        let elapsed = start.elapsed();
488
489        assert!(result.is_ok());
490
491        // Expected delays:
492        // attempt 1: (1² - 1) * 150 + 1000 = 1000ms
493        // attempt 2: (2² - 1) * 150 + 1000 = 1450ms
494        // attempt 3: (3² - 1) * 150 + 1000 = 2200ms
495        // Total: 4650ms
496        let expected_total = Duration::from_millis(1000 + 1450 + 2200);
497        assert!(
498            elapsed >= expected_total,
499            "Expected at least {:?} elapsed, got {:?}",
500            expected_total,
501            elapsed
502        );
503    }
504
505    // --- poll_once tests ---
506
507    fn make_operation(id: &str) -> Operation {
508        Operation::new(id, durable_execution_sdk::OperationType::Step)
509    }
510
511    fn make_event(event_type: &str) -> crate::test_result::HistoryEvent {
512        crate::test_result::HistoryEvent::new(event_type)
513    }
514
515    #[tokio::test]
516    async fn poll_once_single_page_no_terminal() {
517        let op = make_operation("op-1");
518        let evt = make_event("StepStarted");
519
520        let mut responses = VecDeque::new();
521        responses.push_back(Ok(HistoryPage {
522            events: vec![evt],
523            operations: vec![op],
524            next_marker: None,
525            is_terminal: false,
526            terminal_status: None,
527            terminal_result: None,
528            terminal_error: None,
529        }));
530
531        let mock = MockHistoryApiClient::new(responses);
532        let mut poller =
533            HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
534
535        let result = poller.poll_once().await.unwrap();
536        assert_eq!(result.operations.len(), 1);
537        assert_eq!(result.operations[0].operation_id, "op-1");
538        assert_eq!(result.events.len(), 1);
539        assert!(result.terminal.is_none());
540        // last_marker should be None (no next_marker on the page)
541        assert!(poller.last_marker.is_none());
542    }
543
544    #[tokio::test]
545    async fn poll_once_multi_page_pagination() {
546        tokio::time::pause();
547
548        let mut responses = VecDeque::new();
549        // Page 1: has next_marker
550        responses.push_back(Ok(HistoryPage {
551            events: vec![make_event("E1")],
552            operations: vec![make_operation("op-1")],
553            next_marker: Some("marker-A".to_string()),
554            is_terminal: false,
555            terminal_status: None,
556            terminal_result: None,
557            terminal_error: None,
558        }));
559        // Page 2: has next_marker
560        responses.push_back(Ok(HistoryPage {
561            events: vec![make_event("E2")],
562            operations: vec![make_operation("op-2")],
563            next_marker: Some("marker-B".to_string()),
564            is_terminal: false,
565            terminal_status: None,
566            terminal_result: None,
567            terminal_error: None,
568        }));
569        // Page 3: no next_marker (end of pagination)
570        responses.push_back(Ok(HistoryPage {
571            events: vec![make_event("E3")],
572            operations: vec![make_operation("op-3")],
573            next_marker: None,
574            is_terminal: false,
575            terminal_status: None,
576            terminal_result: None,
577            terminal_error: None,
578        }));
579
580        let mock = MockHistoryApiClient::new(responses);
581        let mut poller =
582            HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(50));
583
584        let result = poller.poll_once().await.unwrap();
585
586        // All 3 pages aggregated
587        assert_eq!(result.operations.len(), 3);
588        assert_eq!(result.events.len(), 3);
589        assert!(result.terminal.is_none());
590
591        // Verify API calls used correct markers
592        let calls = poller.api_client.calls.lock().unwrap();
593        assert_eq!(calls.len(), 3);
594        assert_eq!(calls[0].1, None); // first call: no marker
595        assert_eq!(calls[1].1, Some("marker-A".to_string()));
596        assert_eq!(calls[2].1, Some("marker-B".to_string()));
597
598        // last_marker should be the last page's current_marker (marker-B)
599        // since the last page had no next_marker, current_marker stays as marker-B
600        assert_eq!(poller.last_marker, Some("marker-B".to_string()));
601    }
602
603    #[tokio::test]
604    async fn poll_once_detects_terminal_state() {
605        let mut responses = VecDeque::new();
606        responses.push_back(Ok(HistoryPage {
607            events: vec![make_event("ExecutionSucceeded")],
608            operations: vec![make_operation("op-1")],
609            next_marker: None,
610            is_terminal: true,
611            terminal_status: Some(crate::types::ExecutionStatus::Succeeded),
612            terminal_result: Some(r#"{"value": 42}"#.to_string()),
613            terminal_error: None,
614        }));
615
616        let mock = MockHistoryApiClient::new(responses);
617        let mut poller =
618            HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
619
620        let result = poller.poll_once().await.unwrap();
621        assert!(result.terminal.is_some());
622        let terminal = result.terminal.unwrap();
623        assert_eq!(terminal.status, crate::types::ExecutionStatus::Succeeded);
624        assert_eq!(terminal.result, Some(r#"{"value": 42}"#.to_string()));
625        assert!(terminal.error.is_none());
626    }
627
628    #[tokio::test]
629    async fn poll_once_terminal_with_error() {
630        let mut responses = VecDeque::new();
631        responses.push_back(Ok(HistoryPage {
632            events: vec![],
633            operations: vec![],
634            next_marker: None,
635            is_terminal: true,
636            terminal_status: Some(crate::types::ExecutionStatus::Failed),
637            terminal_result: None,
638            terminal_error: Some(crate::types::TestResultError::new("RuntimeError", "boom")),
639        }));
640
641        let mock = MockHistoryApiClient::new(responses);
642        let mut poller =
643            HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
644
645        let result = poller.poll_once().await.unwrap();
646        let terminal = result.terminal.unwrap();
647        assert_eq!(terminal.status, crate::types::ExecutionStatus::Failed);
648        assert!(terminal.error.is_some());
649        assert_eq!(
650            terminal.error.unwrap().error_type,
651            Some("RuntimeError".to_string())
652        );
653    }
654
655    #[tokio::test]
656    async fn poll_once_marker_continuity_across_cycles() {
657        tokio::time::pause();
658
659        let mut responses = VecDeque::new();
660        // Cycle 1: single page with next_marker
661        responses.push_back(Ok(HistoryPage {
662            events: vec![],
663            operations: vec![make_operation("op-1")],
664            next_marker: Some("cycle1-marker".to_string()),
665            is_terminal: false,
666            terminal_status: None,
667            terminal_result: None,
668            terminal_error: None,
669        }));
670        // Cycle 1 page 2: no next_marker
671        responses.push_back(Ok(HistoryPage {
672            events: vec![],
673            operations: vec![],
674            next_marker: None,
675            is_terminal: false,
676            terminal_status: None,
677            terminal_result: None,
678            terminal_error: None,
679        }));
680        // Cycle 2: should start with cycle1-marker
681        responses.push_back(Ok(HistoryPage {
682            events: vec![],
683            operations: vec![make_operation("op-2")],
684            next_marker: None,
685            is_terminal: true,
686            terminal_status: Some(crate::types::ExecutionStatus::Succeeded),
687            terminal_result: Some("\"done\"".to_string()),
688            terminal_error: None,
689        }));
690
691        let mock = MockHistoryApiClient::new(responses);
692        let mut poller =
693            HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
694
695        // Cycle 1
696        let _r1 = poller.poll_once().await.unwrap();
697        assert_eq!(poller.last_marker, Some("cycle1-marker".to_string()));
698
699        // Cycle 2: should use cycle1-marker as starting marker
700        let _r2 = poller.poll_once().await.unwrap();
701
702        let calls = poller.api_client.calls.lock().unwrap();
703        // Cycle 2's first call should use the marker from cycle 1
704        assert_eq!(calls[2].1, Some("cycle1-marker".to_string()));
705    }
706
707    #[tokio::test]
708    async fn poll_once_propagates_api_error() {
709        tokio::time::pause();
710
711        let mut responses = VecDeque::new();
712        // All retries fail
713        for _ in 0..4 {
714            responses.push_back(Err(TestError::aws_error("service unavailable")));
715        }
716
717        let mock = MockHistoryApiClient::new(responses);
718        let mut poller =
719            HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
720
721        let result = poller.poll_once().await;
722        assert!(result.is_err());
723    }
724
725    #[tokio::test]
726    async fn poll_once_empty_page() {
727        let mut responses = VecDeque::new();
728        responses.push_back(Ok(empty_page()));
729
730        let mock = MockHistoryApiClient::new(responses);
731        let mut poller =
732            HistoryPoller::new(mock, "arn:test".to_string(), Duration::from_millis(10));
733
734        let result = poller.poll_once().await.unwrap();
735        assert!(result.operations.is_empty());
736        assert!(result.events.is_empty());
737        assert!(result.terminal.is_none());
738    }
739
740    #[tokio::test]
741    async fn poll_once_pagination_waits_between_pages() {
742        tokio::time::pause();
743
744        let mut responses = VecDeque::new();
745        responses.push_back(Ok(HistoryPage {
746            events: vec![],
747            operations: vec![],
748            next_marker: Some("m1".to_string()),
749            is_terminal: false,
750            terminal_status: None,
751            terminal_result: None,
752            terminal_error: None,
753        }));
754        responses.push_back(Ok(empty_page()));
755
756        let mock = MockHistoryApiClient::new(responses);
757        let poll_interval = Duration::from_millis(200);
758        let mut poller = HistoryPoller::new(mock, "arn:test".to_string(), poll_interval);
759
760        let start = tokio::time::Instant::now();
761        poller.poll_once().await.unwrap();
762        let elapsed = start.elapsed();
763
764        // Should have waited at least poll_interval between the two pages
765        assert!(
766            elapsed >= poll_interval,
767            "Expected at least {:?} elapsed, got {:?}",
768            poll_interval,
769            elapsed
770        );
771    }
772}