Skip to main content

durable_execution_sdk_testing/
cloud_runner.rs

1//! Cloud test runner for testing deployed Lambda functions.
2//!
3//! This module provides the `CloudDurableTestRunner` for testing durable functions
4//! deployed to AWS Lambda, enabling integration testing against real AWS infrastructure.
5//!
6//! # Examples
7//!
8//! ```ignore
9//! use durable_execution_sdk_testing::{
10//!     CloudDurableTestRunner, CloudTestRunnerConfig, ExecutionStatus,
11//! };
12//!
13//! #[tokio::test]
14//! async fn test_deployed_workflow() {
//!     let mut runner = CloudDurableTestRunner::<String>::new("my-function-name")
16//!         .await
17//!         .unwrap();
18//!
19//!     let result = runner.run("input".to_string()).await.unwrap();
20//!     assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
21//! }
22//! ```
23
24use std::collections::HashMap;
25use std::marker::PhantomData;
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use aws_sdk_lambda::Client as LambdaClient;
30use serde::de::DeserializeOwned;
31use serde::Serialize;
32use tokio::sync::RwLock;
33
34use crate::error::TestError;
35use crate::history_poller::{HistoryApiClient, HistoryPage, HistoryPoller};
36use crate::operation::{CallbackSender, DurableOperation};
37use crate::operation_handle::{OperationHandle, OperationMatcher};
38use crate::test_result::TestResult;
39use crate::types::{ExecutionStatus, TestResultError};
40use durable_execution_sdk::{
41    DurableServiceClient, LambdaDurableServiceClient, Operation, OperationStatus, OperationType,
42};
43
/// Tuning knobs for the cloud test runner.
///
/// Governs how frequently a deployed Lambda execution is polled and how long
/// the runner is willing to wait before giving up.
///
/// # Examples
///
/// ```
/// use durable_execution_sdk_testing::CloudTestRunnerConfig;
/// use std::time::Duration;
///
/// let config = CloudTestRunnerConfig::new()
///     .with_poll_interval(Duration::from_millis(500))
///     .with_timeout(Duration::from_secs(60));
/// ```
#[derive(Debug, Clone)]
pub struct CloudTestRunnerConfig {
    /// Delay between consecutive status polls.
    ///
    /// Default: 1 second.
    pub poll_interval: Duration,

    /// Upper bound on how long to wait for the execution to finish.
    ///
    /// Default: 5 minutes.
    pub timeout: Duration,
}

impl Default for CloudTestRunnerConfig {
    fn default() -> Self {
        const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(1);
        const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);

        Self {
            poll_interval: DEFAULT_POLL_INTERVAL,
            timeout: DEFAULT_TIMEOUT,
        }
    }
}

impl CloudTestRunnerConfig {
    /// Returns a configuration populated with the default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns this configuration with `interval` as the delay between polls.
    pub fn with_poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }

    /// Returns this configuration with `timeout` as the maximum wait for completion.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }
}
107
108/// Internal storage for operations captured from cloud execution.
109#[derive(Debug, Default)]
110struct OperationStorage {
111    /// All operations in execution order
112    operations: Vec<Operation>,
113    /// Map from operation ID to index in operations vec
114    operations_by_id: HashMap<String, usize>,
115    /// Map from operation name to indices in operations vec
116    operations_by_name: HashMap<String, Vec<usize>>,
117}
118
119impl OperationStorage {
120    fn new() -> Self {
121        Self::default()
122    }
123
124    #[allow(dead_code)]
125    fn add_operation(&mut self, operation: Operation) {
126        let index = self.operations.len();
127        let id = operation.operation_id.clone();
128        let name = operation.name.clone();
129
130        self.operations.push(operation);
131        self.operations_by_id.insert(id, index);
132
133        if let Some(name) = name {
134            self.operations_by_name.entry(name).or_default().push(index);
135        }
136    }
137
138    /// If an operation with the same `operation_id` already exists, update it
139    /// in place; otherwise append it via `add_operation`.
140    #[allow(dead_code)]
141    pub(crate) fn add_or_update(&mut self, operation: Operation) {
142        if let Some(&idx) = self.operations_by_id.get(&operation.operation_id) {
143            self.operations[idx] = operation;
144        } else {
145            self.add_operation(operation);
146        }
147    }
148
149    fn get_by_id(&self, id: &str) -> Option<&Operation> {
150        self.operations_by_id
151            .get(id)
152            .and_then(|&idx| self.operations.get(idx))
153    }
154
155    fn get_by_name(&self, name: &str) -> Option<&Operation> {
156        self.operations_by_name
157            .get(name)
158            .and_then(|indices| indices.first())
159            .and_then(|&idx| self.operations.get(idx))
160    }
161
162    fn get_by_name_and_index(&self, name: &str, index: usize) -> Option<&Operation> {
163        self.operations_by_name
164            .get(name)
165            .and_then(|indices| indices.get(index))
166            .and_then(|&idx| self.operations.get(idx))
167    }
168
169    fn get_by_index(&self, index: usize) -> Option<&Operation> {
170        self.operations.get(index)
171    }
172
173    fn get_all(&self) -> &[Operation] {
174        &self.operations
175    }
176
177    fn clear(&mut self) {
178        self.operations.clear();
179        self.operations_by_id.clear();
180        self.operations_by_name.clear();
181    }
182}
183
184/// Real implementation of [`HistoryApiClient`] that calls the durable execution
185/// state API via a [`LambdaDurableServiceClient`].
186///
187/// Wraps a `LambdaClient` and creates an internal service client to make
188/// signed HTTP calls to the `GetDurableExecutionHistory` API endpoint.
189///
190/// # Requirements
191///
192/// - 1.1: Polls the GetDurableExecutionHistory API using the Durable_Execution_ARN
193/// - 1.3: Uses the ARN from the Lambda invocation response
194pub struct LambdaHistoryApiClient {
195    service_client: LambdaDurableServiceClient,
196}
197
198impl LambdaHistoryApiClient {
199    /// Creates a new `LambdaHistoryApiClient` from an AWS SDK config.
200    ///
201    /// The config is used to construct a `LambdaDurableServiceClient` that
202    /// makes signed HTTP calls to the durable execution state API.
203    ///
204    /// # Arguments
205    ///
206    /// * `aws_config` - The AWS SDK configuration (same one used to create the `LambdaClient`)
207    pub fn from_aws_config(aws_config: &aws_config::SdkConfig) -> Self {
208        Self {
209            service_client: LambdaDurableServiceClient::from_aws_config(aws_config),
210        }
211    }
212
213    /// Creates a new `LambdaHistoryApiClient` from an existing `LambdaDurableServiceClient`.
214    ///
215    /// Useful when a service client is already available.
216    pub fn from_service_client(service_client: LambdaDurableServiceClient) -> Self {
217        Self { service_client }
218    }
219
220    /// Maps an [`OperationStatus`] to an [`ExecutionStatus`] for terminal detection.
221    fn map_terminal_status(status: &OperationStatus) -> Option<ExecutionStatus> {
222        match status {
223            OperationStatus::Succeeded => Some(ExecutionStatus::Succeeded),
224            OperationStatus::Failed => Some(ExecutionStatus::Failed),
225            OperationStatus::Cancelled => Some(ExecutionStatus::Cancelled),
226            OperationStatus::TimedOut => Some(ExecutionStatus::TimedOut),
227            _ => None,
228        }
229    }
230}
231
232#[async_trait::async_trait]
233impl HistoryApiClient for LambdaHistoryApiClient {
234    /// Retrieves a single page of execution history by calling the durable execution state API.
235    ///
236    /// Detects terminal state by examining EXECUTION-type operations: when an execution
237    /// operation has a terminal status (Succeeded, Failed, Cancelled, TimedOut), the page
238    /// is marked as terminal with the corresponding status, result, and error.
239    async fn get_history(&self, arn: &str, marker: Option<&str>) -> Result<HistoryPage, TestError> {
240        let marker_str = marker.unwrap_or("");
241
242        let response = self
243            .service_client
244            .get_operations(arn, marker_str)
245            .await
246            .map_err(|e| {
247                TestError::aws_error(format!("GetDurableExecutionHistory failed: {}", e))
248            })?;
249
250        // Detect terminal state from EXECUTION-type operations
251        let mut is_terminal = false;
252        let mut terminal_status = None;
253        let mut terminal_result = None;
254        let mut terminal_error = None;
255
256        for op in &response.operations {
257            if op.operation_type == OperationType::Execution {
258                if let Some(exec_status) = Self::map_terminal_status(&op.status) {
259                    is_terminal = true;
260                    terminal_status = Some(exec_status);
261                    terminal_result = op.result.clone();
262                    if let Some(ref err) = op.error {
263                        terminal_error =
264                            Some(TestResultError::new(&err.error_type, &err.error_message));
265                    }
266                    break;
267                }
268            }
269        }
270
271        Ok(HistoryPage {
272            events: Vec::new(), // The state API returns operations, not raw history events
273            operations: response.operations,
274            next_marker: response.next_marker,
275            is_terminal,
276            terminal_status,
277            terminal_result,
278            terminal_error,
279        })
280    }
281}
282
283/// Sends callback signals (success, failure, heartbeat) to a durable execution
284/// via the AWS Lambda API.
285///
286/// This bridges the [`CallbackSender`] trait (used by [`OperationHandle`]) with
287/// the Lambda durable execution callback APIs, enabling handles to send callback
288/// responses during cloud test execution.
289///
290/// # Requirements
291///
292/// - 6.1: Allows sending a callback success signal via the Callback_Sender
293/// - 6.2: Allows sending a callback failure signal via the Callback_Sender
294/// - 6.3: Allows sending a callback heartbeat signal via the Callback_Sender
295/// - 6.4: THE Cloud_Runner SHALL configure Operation_Handle instances with a
296///   Callback_Sender that communicates with the durable execution service
297pub(crate) struct CloudCallbackSender {
298    /// The AWS Lambda client used to send callback API calls
299    client: LambdaClient,
300    /// The ARN of the durable execution to send callbacks to
301    _durable_execution_arn: String,
302}
303
304impl CloudCallbackSender {
305    /// Creates a new `CloudCallbackSender`.
306    ///
307    /// # Arguments
308    ///
309    /// * `client` - The AWS Lambda client
310    /// * `durable_execution_arn` - The ARN of the durable execution
311    pub fn new(client: LambdaClient, durable_execution_arn: String) -> Self {
312        Self {
313            client,
314            _durable_execution_arn: durable_execution_arn,
315        }
316    }
317}
318
319#[async_trait::async_trait]
320impl CallbackSender for CloudCallbackSender {
321    /// Sends a success response for a callback operation.
322    ///
323    /// Calls the `SendDurableExecutionCallbackSuccess` Lambda API with the
324    /// callback ID and result payload.
325    ///
326    /// # Arguments
327    ///
328    /// * `callback_id` - The unique callback identifier
329    /// * `result` - The success result payload (serialized as bytes)
330    async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError> {
331        self.client
332            .send_durable_execution_callback_success()
333            .callback_id(callback_id)
334            .result(aws_sdk_lambda::primitives::Blob::new(result.as_bytes()))
335            .send()
336            .await
337            .map_err(|e| {
338                TestError::aws_error(format!(
339                    "SendDurableExecutionCallbackSuccess failed for callback '{}': {}",
340                    callback_id, e
341                ))
342            })?;
343        Ok(())
344    }
345
346    /// Sends a failure response for a callback operation.
347    ///
348    /// Calls the `SendDurableExecutionCallbackFailure` Lambda API with the
349    /// callback ID and error details.
350    ///
351    /// # Arguments
352    ///
353    /// * `callback_id` - The unique callback identifier
354    /// * `error` - The error information to send
355    async fn send_failure(
356        &self,
357        callback_id: &str,
358        error: &TestResultError,
359    ) -> Result<(), TestError> {
360        let error_object = aws_sdk_lambda::types::ErrorObject::builder()
361            .set_error_type(error.error_type.clone())
362            .set_error_message(error.error_message.clone())
363            .build();
364
365        self.client
366            .send_durable_execution_callback_failure()
367            .callback_id(callback_id)
368            .error(error_object)
369            .send()
370            .await
371            .map_err(|e| {
372                TestError::aws_error(format!(
373                    "SendDurableExecutionCallbackFailure failed for callback '{}': {}",
374                    callback_id, e
375                ))
376            })?;
377        Ok(())
378    }
379
380    /// Sends a heartbeat for a callback operation.
381    ///
382    /// Calls the `SendDurableExecutionCallbackHeartbeat` Lambda API with the
383    /// callback ID to keep the callback active.
384    ///
385    /// # Arguments
386    ///
387    /// * `callback_id` - The unique callback identifier
388    async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError> {
389        self.client
390            .send_durable_execution_callback_heartbeat()
391            .callback_id(callback_id)
392            .send()
393            .await
394            .map_err(|e| {
395                TestError::aws_error(format!(
396                    "SendDurableExecutionCallbackHeartbeat failed for callback '{}': {}",
397                    callback_id, e
398                ))
399            })?;
400        Ok(())
401    }
402}
403
404/// Cloud test runner for testing deployed Lambda functions.
405///
406/// Invokes deployed Lambda functions and polls for execution completion,
407/// enabling integration testing against real AWS infrastructure.
408///
409/// # Type Parameters
410///
411/// * `O` - The output type (must be deserializable)
412///
413/// # Examples
414///
415/// ```ignore
416/// use durable_execution_sdk_testing::CloudDurableTestRunner;
417///
418/// // Create runner with default AWS config
419/// let runner = CloudDurableTestRunner::<String>::new("my-function")
420///     .await
421///     .unwrap();
422///
423/// // Run test
424/// let result = runner.run("input".to_string()).await.unwrap();
425/// println!("Status: {:?}", result.get_status());
426/// ```
427pub struct CloudDurableTestRunner<O>
428where
429    O: DeserializeOwned + Send,
430{
431    /// The Lambda function name or ARN
432    function_name: String,
433    /// The AWS Lambda client
434    lambda_client: LambdaClient,
435    /// The AWS SDK configuration (stored for creating service clients during run)
436    aws_config: Option<aws_config::SdkConfig>,
437    /// Configuration for polling and timeouts
438    config: CloudTestRunnerConfig,
439    /// Storage for captured operations
440    operation_storage: OperationStorage,
441    /// Pre-registered operation handles for lazy population during execution
442    handles: Vec<OperationHandle>,
443    /// Shared operations list for child operation enumeration across handles
444    all_operations: Arc<RwLock<Vec<Operation>>>,
445    /// Phantom data for the output type
446    _phantom: PhantomData<O>,
447}
448
449impl<O> std::fmt::Debug for CloudDurableTestRunner<O>
450where
451    O: DeserializeOwned + Send,
452{
453    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454        f.debug_struct("CloudDurableTestRunner")
455            .field("function_name", &self.function_name)
456            .field("config", &self.config)
457            .field("operation_count", &self.operation_storage.operations.len())
458            .field("handle_count", &self.handles.len())
459            .finish()
460    }
461}
462
463impl<O> CloudDurableTestRunner<O>
464where
465    O: DeserializeOwned + Send,
466{
467    /// Creates a new cloud test runner for the given Lambda function.
468    ///
469    /// This constructor uses the default AWS configuration, which loads
470    /// credentials from environment variables, AWS config files, or IAM roles.
471    ///
472    /// # Arguments
473    ///
474    /// * `function_name` - The Lambda function name or ARN
475    ///
476    /// # Returns
477    ///
478    /// A new `CloudDurableTestRunner` configured with default settings.
479    ///
480    /// # Requirements
481    ///
482    /// - 8.1: WHEN a developer creates a Cloud_Test_Runner with a function name,
483    ///   THE Cloud_Test_Runner SHALL configure the Lambda client for that function
484    ///
485    /// # Examples
486    ///
487    /// ```ignore
488    /// use durable_execution_sdk_testing::CloudDurableTestRunner;
489    ///
490    /// let runner = CloudDurableTestRunner::<String>::new("my-function")
491    ///     .await
492    ///     .unwrap();
493    /// ```
494    pub async fn new(function_name: impl Into<String>) -> Result<Self, TestError> {
495        let aws_cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
496        let lambda_client = LambdaClient::new(&aws_cfg);
497
498        Ok(Self {
499            function_name: function_name.into(),
500            lambda_client,
501            aws_config: Some(aws_cfg),
502            config: CloudTestRunnerConfig::default(),
503            operation_storage: OperationStorage::new(),
504            handles: Vec::new(),
505            all_operations: Arc::new(RwLock::new(Vec::new())),
506            _phantom: PhantomData,
507        })
508    }
509
510    /// Creates a new cloud test runner with a custom Lambda client.
511    ///
512    /// This constructor allows using a pre-configured Lambda client,
513    /// useful for testing with custom credentials or endpoints.
514    ///
515    /// # Arguments
516    ///
517    /// * `function_name` - The Lambda function name or ARN
518    /// * `client` - A pre-configured Lambda client
519    ///
520    /// # Returns
521    ///
522    /// A new `CloudDurableTestRunner` using the provided client.
523    ///
524    /// # Requirements
525    ///
526    /// - 8.4: WHEN a developer provides a custom Lambda client,
527    ///   THE Cloud_Test_Runner SHALL use that client for invocations
528    ///
529    /// # Examples
530    ///
531    /// ```ignore
532    /// use durable_execution_sdk_testing::CloudDurableTestRunner;
533    /// use aws_sdk_lambda::Client as LambdaClient;
534    ///
535    /// let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
536    /// let custom_client = LambdaClient::new(&config);
537    ///
538    /// let runner = CloudDurableTestRunner::<String>::with_client(
539    ///     "my-function",
540    ///     custom_client,
541    /// );
542    /// ```
543    pub fn with_client(function_name: impl Into<String>, client: LambdaClient) -> Self {
544        Self {
545            function_name: function_name.into(),
546            lambda_client: client,
547            aws_config: None,
548            config: CloudTestRunnerConfig::default(),
549            operation_storage: OperationStorage::new(),
550            handles: Vec::new(),
551            all_operations: Arc::new(RwLock::new(Vec::new())),
552            _phantom: PhantomData,
553        }
554    }
555
556    /// Configures the test runner with custom settings.
557    ///
558    /// # Arguments
559    ///
560    /// * `config` - The configuration to use
561    ///
562    /// # Returns
563    ///
564    /// The runner with updated configuration.
565    ///
566    /// # Requirements
567    ///
568    /// - 8.5: WHEN a developer configures poll_interval,
569    ///   THE Cloud_Test_Runner SHALL use that interval when polling for execution status
570    ///
571    /// # Examples
572    ///
573    /// ```ignore
574    /// use durable_execution_sdk_testing::{CloudDurableTestRunner, CloudTestRunnerConfig};
575    /// use std::time::Duration;
576    ///
577    /// let runner = CloudDurableTestRunner::<String>::new("my-function")
578    ///     .await
579    ///     .unwrap()
580    ///     .with_config(CloudTestRunnerConfig {
581    ///         poll_interval: Duration::from_millis(500),
582    ///         timeout: Duration::from_secs(60),
583    ///     });
584    /// ```
585    pub fn with_config(mut self, config: CloudTestRunnerConfig) -> Self {
586        self.config = config;
587        self
588    }
589
590    /// Returns the function name.
591    pub fn function_name(&self) -> &str {
592        &self.function_name
593    }
594
595    /// Returns the current configuration.
596    pub fn config(&self) -> &CloudTestRunnerConfig {
597        &self.config
598    }
599
600    /// Returns a reference to the Lambda client.
601    pub fn lambda_client(&self) -> &LambdaClient {
602        &self.lambda_client
603    }
604}
605
606impl<O> CloudDurableTestRunner<O>
607where
608    O: DeserializeOwned + Send,
609{
610    /// Runs the durable function and polls for execution completion.
611    ///
612    /// This method invokes the Lambda function, then polls the
613    /// `GetDurableExecutionHistory` API until the execution reaches a terminal
614    /// state or the configured timeout elapses. During polling, operations are
615    /// stored in `OperationStorage` and waiting `OperationHandle` instances are
616    /// notified.
617    ///
618    /// # Arguments
619    ///
620    /// * `payload` - The input payload to send to the Lambda function
621    ///
622    /// # Returns
623    ///
624    /// A `TestResult` reflecting the full execution outcome, including all
625    /// operations and history events collected during polling.
626    ///
627    /// # Requirements
628    ///
629    /// - 1.1: Begin polling after Lambda invocation returns an ARN
630    /// - 1.2: Continue polling until terminal state
631    /// - 1.3: Use the Durable_Execution_ARN from invocation
632    /// - 1.4: Stop polling and return TimedOut on timeout
633    /// - 1.5: Stop polling on terminal event
634    /// - 3.1: Populate OperationStorage with polled operations
635    /// - 3.3: TestResult contains all operations from storage
636    /// - 3.4: Clear storage at start of each run
637    /// - 7.1: Use configured poll_interval
638    /// - 7.2: Use configured timeout
639    /// - 8.1: Invoke Lambda, start poller, await completion
640    /// - 8.2: Parse result from terminal event on success
641    /// - 8.3: Parse error from terminal event on failure
642    /// - 8.4: Return error without polling if invocation fails
643    /// - 8.5: Stop poller in all exit paths
644    /// - 9.1: Collect all history events into TestResult
645    /// - 9.2: Preserve chronological order of events
646    /// - 9.3: Include events from all poll cycles
647    ///
648    /// # Examples
649    ///
650    /// ```ignore
651    /// use durable_execution_sdk_testing::CloudDurableTestRunner;
652    ///
653    /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
654    ///     .await
655    ///     .unwrap();
656    ///
657    /// let result = runner.run("input").await.unwrap();
658    /// println!("Status: {:?}", result.get_status());
659    /// ```
660    pub async fn run<I>(&mut self, payload: I) -> Result<TestResult<O>, TestError>
661    where
662        I: Serialize + Send,
663    {
664        // Requirement 3.4: Clear storage at start of each run
665        self.operation_storage.clear();
666
667        // Requirement 8.1, 8.4: Invoke Lambda and extract ARN
668        let arn = self.invoke_lambda(&payload).await?;
669
670        // Requirement 1.1: Create HistoryPoller with the ARN
671        let history_client = self.create_history_client();
672        let mut poller = HistoryPoller::new(history_client, arn.clone(), self.config.poll_interval);
673
674        // Requirement 6.4: Create CloudCallbackSender and configure handles
675        let callback_sender: Arc<dyn CallbackSender> = Arc::new(CloudCallbackSender::new(
676            self.lambda_client.clone(),
677            arn.clone(),
678        ));
679        for handle in &self.handles {
680            let mut sender = handle.callback_sender.write().await;
681            *sender = Some(callback_sender.clone());
682        }
683
684        // Requirement 7.2: Use configured timeout
685        let deadline = Instant::now() + self.config.timeout;
686        let mut all_events = Vec::new();
687
688        loop {
689            // Requirement 1.4: Check timeout
690            if Instant::now() >= deadline {
691                return Ok(TestResult::with_status(
692                    ExecutionStatus::TimedOut,
693                    self.operation_storage.get_all().to_vec(),
694                ));
695            }
696
697            // Requirement 7.1: Wait poll_interval between cycles
698            tokio::time::sleep(self.config.poll_interval).await;
699
700            let poll_result = poller.poll_once().await?;
701
702            // Requirement 3.1: Populate OperationStorage (deduplicated)
703            for op in &poll_result.operations {
704                self.operation_storage.add_or_update(op.clone());
705            }
706
707            // Requirement 5.5: Notify waiting OperationHandles
708            self.notify_handles().await;
709
710            // Requirement 9.1: Collect history events
711            all_events.extend(poll_result.events);
712
713            // Requirement 1.2, 1.5: Check terminal state
714            if let Some(terminal) = poll_result.terminal {
715                let mut result = match terminal.status {
716                    ExecutionStatus::Succeeded => {
717                        // Requirement 8.2: Parse result from terminal event
718                        let output: O =
719                            serde_json::from_str(terminal.result.as_deref().unwrap_or("null"))?;
720                        TestResult::success(output, self.operation_storage.get_all().to_vec())
721                    }
722                    status => {
723                        // Requirement 8.3: Parse error from terminal event
724                        let mut r = TestResult::with_status(
725                            status,
726                            self.operation_storage.get_all().to_vec(),
727                        );
728                        if let Some(err) = terminal.error {
729                            r.set_error(err);
730                        }
731                        r
732                    }
733                };
734                // Requirement 9.1, 9.2, 9.3: Include all history events
735                result.set_history_events(all_events);
736                return Ok(result);
737            }
738        }
739    }
740
741    /// Invokes the Lambda function and extracts the `DurableExecutionArn` from the response.
742    ///
743    /// # Requirements
744    ///
745    /// - 8.4: Return error without polling if invocation fails
746    async fn invoke_lambda<I: Serialize>(&self, payload: &I) -> Result<String, TestError> {
747        let payload_json = serde_json::to_vec(payload)?;
748
749        let invoke_result = self
750            .lambda_client
751            .invoke()
752            .function_name(&self.function_name)
753            .payload(aws_sdk_lambda::primitives::Blob::new(payload_json))
754            .send()
755            .await
756            .map_err(|e| TestError::aws_error(format!("Lambda invoke failed: {}", e)))?;
757
758        // Check for function error
759        if let Some(function_error) = invoke_result.function_error() {
760            let error_payload = invoke_result
761                .payload()
762                .map(|p| String::from_utf8_lossy(p.as_ref()).to_string())
763                .unwrap_or_else(|| "Unknown error".to_string());
764
765            return Err(TestError::aws_error(format!(
766                "Lambda function error ({}): {}",
767                function_error, error_payload
768            )));
769        }
770
771        // Parse the response to extract DurableExecutionArn
772        let response_payload = invoke_result
773            .payload()
774            .ok_or_else(|| TestError::aws_error("No response payload from Lambda"))?;
775
776        let response_str = String::from_utf8_lossy(response_payload.as_ref());
777
778        // Try to parse as JSON and extract the ARN
779        let response_json: serde_json::Value = serde_json::from_str(&response_str)
780            .map_err(|e| TestError::aws_error(format!("Failed to parse Lambda response: {}", e)))?;
781
782        let arn = response_json
783            .get("DurableExecutionArn")
784            .or_else(|| response_json.get("durableExecutionArn"))
785            .and_then(|v| v.as_str())
786            .ok_or_else(|| {
787                TestError::aws_error(format!(
788                    "Lambda response missing DurableExecutionArn: {}",
789                    response_str
790                ))
791            })?;
792
793        Ok(arn.to_string())
794    }
795
796    /// Creates a `LambdaHistoryApiClient` from the stored AWS config.
797    fn create_history_client(&self) -> LambdaHistoryApiClient {
798        match &self.aws_config {
799            Some(cfg) => LambdaHistoryApiClient::from_aws_config(cfg),
800            None => {
801                // Fallback: create a service client from a default config.
802                // This path is used when with_client() was called without an SdkConfig.
803                LambdaHistoryApiClient::from_service_client(
804                    LambdaDurableServiceClient::from_aws_config(
805                        &aws_config::SdkConfig::builder().build(),
806                    ),
807                )
808            }
809        }
810    }
811}
812
813impl<O> CloudDurableTestRunner<O>
814where
815    O: DeserializeOwned + Send,
816{
817    // =========================================================================
818    // Operation Handle Methods (Requirements 5.1, 5.2, 5.3, 5.4)
819    // =========================================================================
820
821    /// Returns a lazy `OperationHandle` that populates with the first operation
822    /// matching the given name during execution.
823    ///
824    /// # Arguments
825    ///
826    /// * `name` - The operation name to match against
827    ///
828    /// # Requirements
829    ///
830    /// - 5.1: THE Cloud_Runner SHALL provide a method to obtain an Operation_Handle
831    ///   by operation name before or during execution.
832    ///
833    /// # Examples
834    ///
835    /// ```ignore
836    /// let handle = runner.get_operation_handle("my-callback");
837    /// // handle is unpopulated until run() executes and produces a matching operation
838    /// ```
839    pub fn get_operation_handle(&mut self, name: &str) -> OperationHandle {
840        let handle = OperationHandle::new(
841            OperationMatcher::ByName(name.to_string()),
842            self.all_operations.clone(),
843        );
844        self.handles.push(handle.clone());
845        handle
846    }
847
848    /// Returns a lazy `OperationHandle` that populates with the operation
849    /// at the given execution order index.
850    ///
851    /// # Arguments
852    ///
853    /// * `index` - The zero-based execution order index
854    ///
855    /// # Requirements
856    ///
857    /// - 5.2: THE Cloud_Runner SHALL provide a method to obtain an Operation_Handle
858    ///   by operation index before or during execution.
859    ///
860    /// # Examples
861    ///
862    /// ```ignore
863    /// let handle = runner.get_operation_handle_by_index(0);
864    /// // handle populates with the first operation created during execution
865    /// ```
866    pub fn get_operation_handle_by_index(&mut self, index: usize) -> OperationHandle {
867        let handle = OperationHandle::new(
868            OperationMatcher::ByIndex(index),
869            self.all_operations.clone(),
870        );
871        self.handles.push(handle.clone());
872        handle
873    }
874
875    /// Returns a lazy `OperationHandle` that populates with the nth operation
876    /// matching the given name during execution.
877    ///
878    /// # Arguments
879    ///
880    /// * `name` - The operation name to match against
881    /// * `index` - The zero-based index among operations with that name
882    ///
883    /// # Requirements
884    ///
885    /// - 5.3: THE Cloud_Runner SHALL provide a method to obtain an Operation_Handle
886    ///   by operation name and index before or during execution.
887    ///
888    /// # Examples
889    ///
890    /// ```ignore
891    /// let handle = runner.get_operation_handle_by_name_and_index("process", 1);
892    /// // handle populates with the second "process" operation during execution
893    /// ```
894    pub fn get_operation_handle_by_name_and_index(
895        &mut self,
896        name: &str,
897        index: usize,
898    ) -> OperationHandle {
899        let handle = OperationHandle::new(
900            OperationMatcher::ByNameAndIndex(name.to_string(), index),
901            self.all_operations.clone(),
902        );
903        self.handles.push(handle.clone());
904        handle
905    }
906
907    /// Returns a lazy `OperationHandle` that populates with the operation
908    /// matching the given unique ID.
909    ///
910    /// # Arguments
911    ///
912    /// * `id` - The unique operation ID to match against
913    ///
914    /// # Requirements
915    ///
916    /// - 5.4: THE Cloud_Runner SHALL provide a method to obtain an Operation_Handle
917    ///   by operation id before or during execution.
918    ///
919    /// # Examples
920    ///
921    /// ```ignore
922    /// let handle = runner.get_operation_handle_by_id("op-abc-123");
923    /// // handle populates with the operation whose ID matches during execution
924    /// ```
925    pub fn get_operation_handle_by_id(&mut self, id: &str) -> OperationHandle {
926        let handle = OperationHandle::new(
927            OperationMatcher::ById(id.to_string()),
928            self.all_operations.clone(),
929        );
930        self.handles.push(handle.clone());
931        handle
932    }
933
934    /// Notifies all registered operation handles with matching operation data
935    /// from the operation storage, and updates the shared `all_operations` list.
936    ///
937    /// For each handle, the matcher is used to look up the corresponding operation
938    /// in storage. If found, the handle's inner data is populated and a status
939    /// notification is sent via the watch channel so that `wait_for_data` resolves.
940    ///
941    /// # Requirements
942    ///
943    /// - 5.5: WHEN the History_Poller populates new operation data,
944    ///   THE Cloud_Runner SHALL notify waiting Operation_Handle instances
945    ///   so that their `wait_for_data` calls resolve.
946    pub(crate) async fn notify_handles(&self) {
947        for handle in &self.handles {
948            let matched_op = match &handle.matcher {
949                OperationMatcher::ByName(name) => self.operation_storage.get_by_name(name).cloned(),
950                OperationMatcher::ByIndex(idx) => {
951                    self.operation_storage.get_by_index(*idx).cloned()
952                }
953                OperationMatcher::ById(id) => self.operation_storage.get_by_id(id).cloned(),
954                OperationMatcher::ByNameAndIndex(name, idx) => self
955                    .operation_storage
956                    .get_by_name_and_index(name, *idx)
957                    .cloned(),
958            };
959            if let Some(op) = matched_op {
960                let status = op.status;
961                let mut inner = handle.inner.write().await;
962                *inner = Some(op);
963                drop(inner);
964                let _ = handle.status_tx.send(Some(status));
965            }
966        }
967        // Update shared all_operations for child enumeration
968        let mut all_ops = self.all_operations.write().await;
969        *all_ops = self.operation_storage.get_all().to_vec();
970    }
971}
972
973impl<O> CloudDurableTestRunner<O>
974where
975    O: DeserializeOwned + Send,
976{
977    // =========================================================================
978    // Operation Lookup Methods (Requirements 4.1, 4.2, 4.3, 4.4)
979    // =========================================================================
980
981    /// Gets the first operation with the given name.
982    ///
983    /// # Arguments
984    ///
985    /// * `name` - The operation name to search for
986    ///
987    /// # Returns
988    ///
989    /// A `DurableOperation` wrapping the first operation with that name,
990    /// or `None` if no operation with that name exists.
991    ///
992    /// # Requirements
993    ///
994    /// - 4.1: WHEN a developer calls get_operation(name) on Test_Runner,
995    ///   THE Test_Runner SHALL return the first operation with that name
996    ///
997    /// # Examples
998    ///
999    /// ```ignore
1000    /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1001    ///
1002    /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1003    ///     .await
1004    ///     .unwrap();
1005    /// let _ = runner.run("input".to_string()).await.unwrap();
1006    ///
1007    /// if let Some(op) = runner.get_operation("process_data") {
1008    ///     println!("Found operation: {:?}", op.get_status());
1009    /// }
1010    /// ```
1011    pub fn get_operation(&self, name: &str) -> Option<DurableOperation> {
1012        let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1013        self.operation_storage
1014            .get_by_name(name)
1015            .cloned()
1016            .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1017    }
1018
1019    /// Gets an operation by its index in the execution order.
1020    ///
1021    /// # Arguments
1022    ///
1023    /// * `index` - The zero-based index of the operation
1024    ///
1025    /// # Returns
1026    ///
1027    /// A `DurableOperation` at that index, or `None` if the index is out of bounds.
1028    ///
1029    /// # Requirements
1030    ///
1031    /// - 4.2: WHEN a developer calls get_operation_by_index(index) on Test_Runner,
1032    ///   THE Test_Runner SHALL return the operation at that execution order index
1033    ///
1034    /// # Examples
1035    ///
1036    /// ```ignore
1037    /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1038    ///
1039    /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1040    ///     .await
1041    ///     .unwrap();
1042    /// let _ = runner.run("input".to_string()).await.unwrap();
1043    ///
1044    /// // Get the first operation
1045    /// if let Some(op) = runner.get_operation_by_index(0) {
1046    ///     println!("First operation: {:?}", op.get_type());
1047    /// }
1048    /// ```
1049    pub fn get_operation_by_index(&self, index: usize) -> Option<DurableOperation> {
1050        let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1051        self.operation_storage
1052            .get_by_index(index)
1053            .cloned()
1054            .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1055    }
1056
1057    /// Gets an operation by name and occurrence index.
1058    ///
1059    /// This is useful when multiple operations have the same name and you need
1060    /// to access a specific occurrence.
1061    ///
1062    /// # Arguments
1063    ///
1064    /// * `name` - The operation name to search for
1065    /// * `index` - The zero-based index among operations with that name
1066    ///
1067    /// # Returns
1068    ///
1069    /// A `DurableOperation` at that name/index combination, or `None` if not found.
1070    ///
1071    /// # Requirements
1072    ///
1073    /// - 4.3: WHEN a developer calls get_operation_by_name_and_index(name, index) on Test_Runner,
1074    ///   THE Test_Runner SHALL return the nth operation with that name
1075    ///
1076    /// # Examples
1077    ///
1078    /// ```ignore
1079    /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1080    ///
1081    /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1082    ///     .await
1083    ///     .unwrap();
1084    /// let _ = runner.run("input".to_string()).await.unwrap();
1085    ///
1086    /// // Get the second "process" operation
1087    /// if let Some(op) = runner.get_operation_by_name_and_index("process", 1) {
1088    ///     println!("Second process operation: {:?}", op.get_status());
1089    /// }
1090    /// ```
1091    pub fn get_operation_by_name_and_index(
1092        &self,
1093        name: &str,
1094        index: usize,
1095    ) -> Option<DurableOperation> {
1096        let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1097        self.operation_storage
1098            .get_by_name_and_index(name, index)
1099            .cloned()
1100            .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1101    }
1102
1103    /// Gets an operation by its unique ID.
1104    ///
1105    /// # Arguments
1106    ///
1107    /// * `id` - The unique operation ID
1108    ///
1109    /// # Returns
1110    ///
1111    /// A `DurableOperation` with that ID, or `None` if no operation with that ID exists.
1112    ///
1113    /// # Requirements
1114    ///
1115    /// - 4.4: WHEN a developer calls get_operation_by_id(id) on Test_Runner,
1116    ///   THE Test_Runner SHALL return the operation with that unique identifier
1117    ///
1118    /// # Examples
1119    ///
1120    /// ```ignore
1121    /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1122    ///
1123    /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1124    ///     .await
1125    ///     .unwrap();
1126    /// let _ = runner.run("input".to_string()).await.unwrap();
1127    ///
1128    /// if let Some(op) = runner.get_operation_by_id("op-123") {
1129    ///     println!("Found operation: {:?}", op.get_name());
1130    /// }
1131    /// ```
1132    pub fn get_operation_by_id(&self, id: &str) -> Option<DurableOperation> {
1133        let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1134        self.operation_storage
1135            .get_by_id(id)
1136            .cloned()
1137            .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1138    }
1139
1140    /// Gets all captured operations.
1141    ///
1142    /// # Returns
1143    ///
1144    /// A vector of all operations in execution order.
1145    ///
1146    /// # Examples
1147    ///
1148    /// ```ignore
1149    /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1150    ///
1151    /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1152    ///     .await
1153    ///     .unwrap();
1154    /// let _ = runner.run("input".to_string()).await.unwrap();
1155    ///
1156    /// let all_ops = runner.get_all_operations();
1157    /// println!("Total operations: {}", all_ops.len());
1158    /// ```
1159    pub fn get_all_operations(&self) -> Vec<DurableOperation> {
1160        let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1161        self.operation_storage
1162            .get_all()
1163            .iter()
1164            .cloned()
1165            .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1166            .collect()
1167    }
1168
1169    /// Returns the number of captured operations.
1170    pub fn operation_count(&self) -> usize {
1171        self.operation_storage.operations.len()
1172    }
1173
1174    /// Clears all captured operations.
1175    ///
1176    /// This is useful when reusing the runner for multiple test runs.
1177    pub fn clear_operations(&mut self) {
1178        self.operation_storage.clear();
1179    }
1180}
1181
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an operation with the given id and type, and assigns it `name`.
    fn named_operation(id: &str, op_type: OperationType, name: &str) -> Operation {
        let mut op = Operation::new(id, op_type);
        op.name = Some(name.to_string());
        op
    }

    #[test]
    fn test_config_default() {
        let config = CloudTestRunnerConfig::default();
        assert_eq!(config.poll_interval, Duration::from_millis(1000));
        assert_eq!(config.timeout, Duration::from_secs(300));
    }

    #[test]
    fn test_config_builder() {
        let config = CloudTestRunnerConfig::new()
            .with_poll_interval(Duration::from_millis(500))
            .with_timeout(Duration::from_secs(60));

        assert_eq!(config.poll_interval, Duration::from_millis(500));
        assert_eq!(config.timeout, Duration::from_secs(60));
    }

    #[test]
    fn test_operation_storage() {
        let mut storage = OperationStorage::new();

        // Add operations; op3 deliberately reuses op1's name.
        storage.add_operation(named_operation("op-001", OperationType::Step, "step1"));
        storage.add_operation(named_operation("op-002", OperationType::Wait, "wait1"));
        storage.add_operation(named_operation("op-003", OperationType::Step, "step1"));

        // Test get_by_id: hits return the matching operation, misses return None.
        assert!(storage.get_by_id("op-001").is_some());
        assert_eq!(
            storage.get_by_id("op-002").and_then(|op| op.name.as_deref()),
            Some("wait1")
        );
        assert!(storage.get_by_id("nonexistent").is_none());

        // Test get_by_name (returns first)
        let first_step = storage.get_by_name("step1").unwrap();
        assert_eq!(first_step.operation_id, "op-001");
        assert!(storage.get_by_name("missing").is_none());

        // Test get_by_name_and_index
        let second_step = storage.get_by_name_and_index("step1", 1).unwrap();
        assert_eq!(second_step.operation_id, "op-003");
        assert!(storage.get_by_name_and_index("step1", 2).is_none());
        assert!(storage.get_by_name_and_index("missing", 0).is_none());

        // Test get_by_index, including the last element and out-of-bounds.
        let first_op = storage.get_by_index(0).unwrap();
        assert_eq!(first_op.operation_id, "op-001");
        let last_op = storage.get_by_index(2).unwrap();
        assert_eq!(last_op.operation_id, "op-003");
        assert!(storage.get_by_index(3).is_none());

        // Test get_all
        assert_eq!(storage.get_all().len(), 3);

        // Test clear
        storage.clear();
        assert_eq!(storage.get_all().len(), 0);
        assert!(storage.get_by_id("op-001").is_none());
    }

    #[test]
    fn test_operation_storage_empty() {
        let storage = OperationStorage::new();

        assert!(storage.get_all().is_empty());
        assert!(storage.get_by_index(0).is_none());
        assert!(storage.get_by_name("step1").is_none());
        assert!(storage.get_by_name_and_index("step1", 0).is_none());
        assert!(storage.get_by_id("op-001").is_none());
    }
}