durable_execution_sdk_testing/cloud_runner.rs
1//! Cloud test runner for testing deployed Lambda functions.
2//!
3//! This module provides the `CloudDurableTestRunner` for testing durable functions
4//! deployed to AWS Lambda, enabling integration testing against real AWS infrastructure.
5//!
6//! # Examples
7//!
8//! ```ignore
9//! use durable_execution_sdk_testing::{
10//! CloudDurableTestRunner, CloudTestRunnerConfig, ExecutionStatus,
11//! };
12//!
13//! #[tokio::test]
14//! async fn test_deployed_workflow() {
//!     let mut runner = CloudDurableTestRunner::<String>::new("my-function-name")
16//! .await
17//! .unwrap();
18//!
19//! let result = runner.run("input".to_string()).await.unwrap();
20//! assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
21//! }
22//! ```
23
24use std::collections::HashMap;
25use std::marker::PhantomData;
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use aws_sdk_lambda::Client as LambdaClient;
30use serde::de::DeserializeOwned;
31use serde::Serialize;
32use tokio::sync::RwLock;
33
34use crate::error::TestError;
35use crate::history_poller::{HistoryApiClient, HistoryPage, HistoryPoller};
36use crate::operation::{CallbackSender, DurableOperation};
37use crate::operation_handle::{OperationHandle, OperationMatcher};
38use crate::test_result::TestResult;
39use crate::types::{ExecutionStatus, TestResultError};
40use durable_execution_sdk::{
41 DurableServiceClient, LambdaDurableServiceClient, Operation, OperationStatus, OperationType,
42};
43
/// Polling and timeout settings for the cloud test runner.
///
/// These values decide how frequently a deployed execution is polled and how
/// long the runner is willing to wait for it to reach completion.
///
/// # Examples
///
/// ```
/// use durable_execution_sdk_testing::CloudTestRunnerConfig;
/// use std::time::Duration;
///
/// let config = CloudTestRunnerConfig {
///     poll_interval: Duration::from_millis(500),
///     timeout: Duration::from_secs(60),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct CloudTestRunnerConfig {
    /// Delay between two consecutive completion polls.
    ///
    /// Default: 1000ms (1 second)
    pub poll_interval: Duration,

    /// Upper bound on how long to wait for the execution to complete.
    ///
    /// Default: 300 seconds (5 minutes)
    pub timeout: Duration,
}

impl Default for CloudTestRunnerConfig {
    /// Builds the default configuration: poll every second, give up after five minutes.
    fn default() -> Self {
        let poll_interval = Duration::from_millis(1000);
        let timeout = Duration::from_secs(300);
        Self {
            poll_interval,
            timeout,
        }
    }
}

impl CloudTestRunnerConfig {
    /// Returns a configuration populated with the default values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the configuration with its polling interval replaced.
    ///
    /// # Arguments
    ///
    /// * `interval` - The interval between status polls
    pub fn with_poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }

    /// Returns the configuration with its timeout replaced.
    ///
    /// # Arguments
    ///
    /// * `timeout` - The maximum time to wait for execution completion
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }
}
107
/// Internal storage for operations captured from cloud execution.
///
/// Operations live in a single vector; the two maps hold positions into that
/// vector so an operation can be found by ID or by name without scanning.
#[derive(Debug, Default)]
struct OperationStorage {
    /// All operations, in the order they were first added.
    operations: Vec<Operation>,
    /// Map from operation ID to the operation's index in `operations`.
    operations_by_id: HashMap<String, usize>,
    /// Map from operation name to the indices in `operations` of every
    /// operation carrying that name. Unnamed operations are not listed here.
    operations_by_name: HashMap<String, Vec<usize>>,
}
118
119impl OperationStorage {
120 fn new() -> Self {
121 Self::default()
122 }
123
124 #[allow(dead_code)]
125 fn add_operation(&mut self, operation: Operation) {
126 let index = self.operations.len();
127 let id = operation.operation_id.clone();
128 let name = operation.name.clone();
129
130 self.operations.push(operation);
131 self.operations_by_id.insert(id, index);
132
133 if let Some(name) = name {
134 self.operations_by_name.entry(name).or_default().push(index);
135 }
136 }
137
138 /// If an operation with the same `operation_id` already exists, update it
139 /// in place; otherwise append it via `add_operation`.
140 #[allow(dead_code)]
141 pub(crate) fn add_or_update(&mut self, operation: Operation) {
142 if let Some(&idx) = self.operations_by_id.get(&operation.operation_id) {
143 self.operations[idx] = operation;
144 } else {
145 self.add_operation(operation);
146 }
147 }
148
149 fn get_by_id(&self, id: &str) -> Option<&Operation> {
150 self.operations_by_id
151 .get(id)
152 .and_then(|&idx| self.operations.get(idx))
153 }
154
155 fn get_by_name(&self, name: &str) -> Option<&Operation> {
156 self.operations_by_name
157 .get(name)
158 .and_then(|indices| indices.first())
159 .and_then(|&idx| self.operations.get(idx))
160 }
161
162 fn get_by_name_and_index(&self, name: &str, index: usize) -> Option<&Operation> {
163 self.operations_by_name
164 .get(name)
165 .and_then(|indices| indices.get(index))
166 .and_then(|&idx| self.operations.get(idx))
167 }
168
169 fn get_by_index(&self, index: usize) -> Option<&Operation> {
170 self.operations.get(index)
171 }
172
173 fn get_all(&self) -> &[Operation] {
174 &self.operations
175 }
176
177 fn clear(&mut self) {
178 self.operations.clear();
179 self.operations_by_id.clear();
180 self.operations_by_name.clear();
181 }
182}
183
/// Real implementation of [`HistoryApiClient`] that calls the durable execution
/// state API via a [`LambdaDurableServiceClient`].
///
/// Holds the service client that is used to fetch operations for a durable
/// execution ARN; see the [`HistoryApiClient`] implementation for how a page
/// is assembled from the response.
///
/// # Requirements
///
/// - 1.1: Polls the GetDurableExecutionHistory API using the Durable_Execution_ARN
/// - 1.3: Uses the ARN from the Lambda invocation response
pub struct LambdaHistoryApiClient {
    /// Service client every history request is delegated to.
    service_client: LambdaDurableServiceClient,
}
197
198impl LambdaHistoryApiClient {
199 /// Creates a new `LambdaHistoryApiClient` from an AWS SDK config.
200 ///
201 /// The config is used to construct a `LambdaDurableServiceClient` that
202 /// makes signed HTTP calls to the durable execution state API.
203 ///
204 /// # Arguments
205 ///
206 /// * `aws_config` - The AWS SDK configuration (same one used to create the `LambdaClient`)
207 pub fn from_aws_config(aws_config: &aws_config::SdkConfig) -> Self {
208 Self {
209 service_client: LambdaDurableServiceClient::from_aws_config(aws_config),
210 }
211 }
212
213 /// Creates a new `LambdaHistoryApiClient` from an existing `LambdaDurableServiceClient`.
214 ///
215 /// Useful when a service client is already available.
216 pub fn from_service_client(service_client: LambdaDurableServiceClient) -> Self {
217 Self { service_client }
218 }
219
220 /// Maps an [`OperationStatus`] to an [`ExecutionStatus`] for terminal detection.
221 fn map_terminal_status(status: &OperationStatus) -> Option<ExecutionStatus> {
222 match status {
223 OperationStatus::Succeeded => Some(ExecutionStatus::Succeeded),
224 OperationStatus::Failed => Some(ExecutionStatus::Failed),
225 OperationStatus::Cancelled => Some(ExecutionStatus::Cancelled),
226 OperationStatus::TimedOut => Some(ExecutionStatus::TimedOut),
227 _ => None,
228 }
229 }
230}
231
232#[async_trait::async_trait]
233impl HistoryApiClient for LambdaHistoryApiClient {
234 /// Retrieves a single page of execution history by calling the durable execution state API.
235 ///
236 /// Detects terminal state by examining EXECUTION-type operations: when an execution
237 /// operation has a terminal status (Succeeded, Failed, Cancelled, TimedOut), the page
238 /// is marked as terminal with the corresponding status, result, and error.
239 async fn get_history(&self, arn: &str, marker: Option<&str>) -> Result<HistoryPage, TestError> {
240 let marker_str = marker.unwrap_or("");
241
242 let response = self
243 .service_client
244 .get_operations(arn, marker_str)
245 .await
246 .map_err(|e| {
247 TestError::aws_error(format!("GetDurableExecutionHistory failed: {}", e))
248 })?;
249
250 // Detect terminal state from EXECUTION-type operations
251 let mut is_terminal = false;
252 let mut terminal_status = None;
253 let mut terminal_result = None;
254 let mut terminal_error = None;
255
256 for op in &response.operations {
257 if op.operation_type == OperationType::Execution {
258 if let Some(exec_status) = Self::map_terminal_status(&op.status) {
259 is_terminal = true;
260 terminal_status = Some(exec_status);
261 terminal_result = op.result.clone();
262 if let Some(ref err) = op.error {
263 terminal_error =
264 Some(TestResultError::new(&err.error_type, &err.error_message));
265 }
266 break;
267 }
268 }
269 }
270
271 Ok(HistoryPage {
272 events: Vec::new(), // The state API returns operations, not raw history events
273 operations: response.operations,
274 next_marker: response.next_marker,
275 is_terminal,
276 terminal_status,
277 terminal_result,
278 terminal_error,
279 })
280 }
281}
282
/// Sends callback signals (success, failure, heartbeat) to a durable execution
/// via the AWS Lambda API.
///
/// This bridges the [`CallbackSender`] trait (used by [`OperationHandle`]) with
/// the Lambda durable execution callback APIs, enabling handles to send callback
/// responses during cloud test execution.
///
/// # Requirements
///
/// - 6.1: Allows sending a callback success signal via the Callback_Sender
/// - 6.2: Allows sending a callback failure signal via the Callback_Sender
/// - 6.3: Allows sending a callback heartbeat signal via the Callback_Sender
/// - 6.4: THE Cloud_Runner SHALL configure Operation_Handle instances with a
///   Callback_Sender that communicates with the durable execution service
pub(crate) struct CloudCallbackSender {
    /// The AWS Lambda client used to send callback API calls
    client: LambdaClient,
    /// The ARN of the durable execution to send callbacks to.
    ///
    /// Stored at construction but not read by any of the callback calls in
    /// this file, which address callbacks by callback ID only (hence the
    /// leading underscore).
    _durable_execution_arn: String,
}
303
304impl CloudCallbackSender {
305 /// Creates a new `CloudCallbackSender`.
306 ///
307 /// # Arguments
308 ///
309 /// * `client` - The AWS Lambda client
310 /// * `durable_execution_arn` - The ARN of the durable execution
311 pub fn new(client: LambdaClient, durable_execution_arn: String) -> Self {
312 Self {
313 client,
314 _durable_execution_arn: durable_execution_arn,
315 }
316 }
317}
318
319#[async_trait::async_trait]
320impl CallbackSender for CloudCallbackSender {
321 /// Sends a success response for a callback operation.
322 ///
323 /// Calls the `SendDurableExecutionCallbackSuccess` Lambda API with the
324 /// callback ID and result payload.
325 ///
326 /// # Arguments
327 ///
328 /// * `callback_id` - The unique callback identifier
329 /// * `result` - The success result payload (serialized as bytes)
330 async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError> {
331 self.client
332 .send_durable_execution_callback_success()
333 .callback_id(callback_id)
334 .result(aws_sdk_lambda::primitives::Blob::new(result.as_bytes()))
335 .send()
336 .await
337 .map_err(|e| {
338 TestError::aws_error(format!(
339 "SendDurableExecutionCallbackSuccess failed for callback '{}': {}",
340 callback_id, e
341 ))
342 })?;
343 Ok(())
344 }
345
346 /// Sends a failure response for a callback operation.
347 ///
348 /// Calls the `SendDurableExecutionCallbackFailure` Lambda API with the
349 /// callback ID and error details.
350 ///
351 /// # Arguments
352 ///
353 /// * `callback_id` - The unique callback identifier
354 /// * `error` - The error information to send
355 async fn send_failure(
356 &self,
357 callback_id: &str,
358 error: &TestResultError,
359 ) -> Result<(), TestError> {
360 let error_object = aws_sdk_lambda::types::ErrorObject::builder()
361 .set_error_type(error.error_type.clone())
362 .set_error_message(error.error_message.clone())
363 .build();
364
365 self.client
366 .send_durable_execution_callback_failure()
367 .callback_id(callback_id)
368 .error(error_object)
369 .send()
370 .await
371 .map_err(|e| {
372 TestError::aws_error(format!(
373 "SendDurableExecutionCallbackFailure failed for callback '{}': {}",
374 callback_id, e
375 ))
376 })?;
377 Ok(())
378 }
379
380 /// Sends a heartbeat for a callback operation.
381 ///
382 /// Calls the `SendDurableExecutionCallbackHeartbeat` Lambda API with the
383 /// callback ID to keep the callback active.
384 ///
385 /// # Arguments
386 ///
387 /// * `callback_id` - The unique callback identifier
388 async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError> {
389 self.client
390 .send_durable_execution_callback_heartbeat()
391 .callback_id(callback_id)
392 .send()
393 .await
394 .map_err(|e| {
395 TestError::aws_error(format!(
396 "SendDurableExecutionCallbackHeartbeat failed for callback '{}': {}",
397 callback_id, e
398 ))
399 })?;
400 Ok(())
401 }
402}
403
/// Cloud test runner for testing deployed Lambda functions.
///
/// Invokes deployed Lambda functions and polls for execution completion,
/// enabling integration testing against real AWS infrastructure.
///
/// # Type Parameters
///
/// * `O` - The output type (must be deserializable)
///
/// # Examples
///
/// ```ignore
/// use durable_execution_sdk_testing::CloudDurableTestRunner;
///
/// // Create runner with default AWS config.
/// // `run` takes `&mut self`, so the binding must be mutable.
/// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
///     .await
///     .unwrap();
///
/// // Run test
/// let result = runner.run("input".to_string()).await.unwrap();
/// println!("Status: {:?}", result.get_status());
/// ```
pub struct CloudDurableTestRunner<O>
where
    O: DeserializeOwned + Send,
{
    /// The Lambda function name or ARN
    function_name: String,
    /// The AWS Lambda client
    lambda_client: LambdaClient,
    /// The AWS SDK configuration (stored for creating service clients during run).
    /// `None` when the runner was built from a caller-supplied client.
    aws_config: Option<aws_config::SdkConfig>,
    /// Configuration for polling and timeouts
    config: CloudTestRunnerConfig,
    /// Storage for captured operations; cleared at the start of every run
    operation_storage: OperationStorage,
    /// Pre-registered operation handles for lazy population during execution
    handles: Vec<OperationHandle>,
    /// Shared operations list for child operation enumeration across handles
    all_operations: Arc<RwLock<Vec<Operation>>>,
    /// Phantom data for the output type
    _phantom: PhantomData<O>,
}
448
449impl<O> std::fmt::Debug for CloudDurableTestRunner<O>
450where
451 O: DeserializeOwned + Send,
452{
453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454 f.debug_struct("CloudDurableTestRunner")
455 .field("function_name", &self.function_name)
456 .field("config", &self.config)
457 .field("operation_count", &self.operation_storage.operations.len())
458 .field("handle_count", &self.handles.len())
459 .finish()
460 }
461}
462
463impl<O> CloudDurableTestRunner<O>
464where
465 O: DeserializeOwned + Send,
466{
467 /// Creates a new cloud test runner for the given Lambda function.
468 ///
469 /// This constructor uses the default AWS configuration, which loads
470 /// credentials from environment variables, AWS config files, or IAM roles.
471 ///
472 /// # Arguments
473 ///
474 /// * `function_name` - The Lambda function name or ARN
475 ///
476 /// # Returns
477 ///
478 /// A new `CloudDurableTestRunner` configured with default settings.
479 ///
480 /// # Requirements
481 ///
482 /// - 8.1: WHEN a developer creates a Cloud_Test_Runner with a function name,
483 /// THE Cloud_Test_Runner SHALL configure the Lambda client for that function
484 ///
485 /// # Examples
486 ///
487 /// ```ignore
488 /// use durable_execution_sdk_testing::CloudDurableTestRunner;
489 ///
490 /// let runner = CloudDurableTestRunner::<String>::new("my-function")
491 /// .await
492 /// .unwrap();
493 /// ```
494 pub async fn new(function_name: impl Into<String>) -> Result<Self, TestError> {
495 let aws_cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
496 let lambda_client = LambdaClient::new(&aws_cfg);
497
498 Ok(Self {
499 function_name: function_name.into(),
500 lambda_client,
501 aws_config: Some(aws_cfg),
502 config: CloudTestRunnerConfig::default(),
503 operation_storage: OperationStorage::new(),
504 handles: Vec::new(),
505 all_operations: Arc::new(RwLock::new(Vec::new())),
506 _phantom: PhantomData,
507 })
508 }
509
510 /// Creates a new cloud test runner with a custom Lambda client.
511 ///
512 /// This constructor allows using a pre-configured Lambda client,
513 /// useful for testing with custom credentials or endpoints.
514 ///
515 /// # Arguments
516 ///
517 /// * `function_name` - The Lambda function name or ARN
518 /// * `client` - A pre-configured Lambda client
519 ///
520 /// # Returns
521 ///
522 /// A new `CloudDurableTestRunner` using the provided client.
523 ///
524 /// # Requirements
525 ///
526 /// - 8.4: WHEN a developer provides a custom Lambda client,
527 /// THE Cloud_Test_Runner SHALL use that client for invocations
528 ///
529 /// # Examples
530 ///
531 /// ```ignore
532 /// use durable_execution_sdk_testing::CloudDurableTestRunner;
533 /// use aws_sdk_lambda::Client as LambdaClient;
534 ///
535 /// let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
536 /// let custom_client = LambdaClient::new(&config);
537 ///
538 /// let runner = CloudDurableTestRunner::<String>::with_client(
539 /// "my-function",
540 /// custom_client,
541 /// );
542 /// ```
543 pub fn with_client(function_name: impl Into<String>, client: LambdaClient) -> Self {
544 Self {
545 function_name: function_name.into(),
546 lambda_client: client,
547 aws_config: None,
548 config: CloudTestRunnerConfig::default(),
549 operation_storage: OperationStorage::new(),
550 handles: Vec::new(),
551 all_operations: Arc::new(RwLock::new(Vec::new())),
552 _phantom: PhantomData,
553 }
554 }
555
556 /// Configures the test runner with custom settings.
557 ///
558 /// # Arguments
559 ///
560 /// * `config` - The configuration to use
561 ///
562 /// # Returns
563 ///
564 /// The runner with updated configuration.
565 ///
566 /// # Requirements
567 ///
568 /// - 8.5: WHEN a developer configures poll_interval,
569 /// THE Cloud_Test_Runner SHALL use that interval when polling for execution status
570 ///
571 /// # Examples
572 ///
573 /// ```ignore
574 /// use durable_execution_sdk_testing::{CloudDurableTestRunner, CloudTestRunnerConfig};
575 /// use std::time::Duration;
576 ///
577 /// let runner = CloudDurableTestRunner::<String>::new("my-function")
578 /// .await
579 /// .unwrap()
580 /// .with_config(CloudTestRunnerConfig {
581 /// poll_interval: Duration::from_millis(500),
582 /// timeout: Duration::from_secs(60),
583 /// });
584 /// ```
585 pub fn with_config(mut self, config: CloudTestRunnerConfig) -> Self {
586 self.config = config;
587 self
588 }
589
590 /// Returns the function name.
591 pub fn function_name(&self) -> &str {
592 &self.function_name
593 }
594
595 /// Returns the current configuration.
596 pub fn config(&self) -> &CloudTestRunnerConfig {
597 &self.config
598 }
599
600 /// Returns a reference to the Lambda client.
601 pub fn lambda_client(&self) -> &LambdaClient {
602 &self.lambda_client
603 }
604}
605
606impl<O> CloudDurableTestRunner<O>
607where
608 O: DeserializeOwned + Send,
609{
610 /// Runs the durable function and polls for execution completion.
611 ///
612 /// This method invokes the Lambda function, then polls the
613 /// `GetDurableExecutionHistory` API until the execution reaches a terminal
614 /// state or the configured timeout elapses. During polling, operations are
615 /// stored in `OperationStorage` and waiting `OperationHandle` instances are
616 /// notified.
617 ///
618 /// # Arguments
619 ///
620 /// * `payload` - The input payload to send to the Lambda function
621 ///
622 /// # Returns
623 ///
624 /// A `TestResult` reflecting the full execution outcome, including all
625 /// operations and history events collected during polling.
626 ///
627 /// # Requirements
628 ///
629 /// - 1.1: Begin polling after Lambda invocation returns an ARN
630 /// - 1.2: Continue polling until terminal state
631 /// - 1.3: Use the Durable_Execution_ARN from invocation
632 /// - 1.4: Stop polling and return TimedOut on timeout
633 /// - 1.5: Stop polling on terminal event
634 /// - 3.1: Populate OperationStorage with polled operations
635 /// - 3.3: TestResult contains all operations from storage
636 /// - 3.4: Clear storage at start of each run
637 /// - 7.1: Use configured poll_interval
638 /// - 7.2: Use configured timeout
639 /// - 8.1: Invoke Lambda, start poller, await completion
640 /// - 8.2: Parse result from terminal event on success
641 /// - 8.3: Parse error from terminal event on failure
642 /// - 8.4: Return error without polling if invocation fails
643 /// - 8.5: Stop poller in all exit paths
644 /// - 9.1: Collect all history events into TestResult
645 /// - 9.2: Preserve chronological order of events
646 /// - 9.3: Include events from all poll cycles
647 ///
648 /// # Examples
649 ///
650 /// ```ignore
651 /// use durable_execution_sdk_testing::CloudDurableTestRunner;
652 ///
653 /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
654 /// .await
655 /// .unwrap();
656 ///
657 /// let result = runner.run("input").await.unwrap();
658 /// println!("Status: {:?}", result.get_status());
659 /// ```
660 pub async fn run<I>(&mut self, payload: I) -> Result<TestResult<O>, TestError>
661 where
662 I: Serialize + Send,
663 {
664 // Requirement 3.4: Clear storage at start of each run
665 self.operation_storage.clear();
666
667 // Requirement 8.1, 8.4: Invoke Lambda and extract ARN
668 let arn = self.invoke_lambda(&payload).await?;
669
670 // Requirement 1.1: Create HistoryPoller with the ARN
671 let history_client = self.create_history_client();
672 let mut poller = HistoryPoller::new(history_client, arn.clone(), self.config.poll_interval);
673
674 // Requirement 6.4: Create CloudCallbackSender and configure handles
675 let callback_sender: Arc<dyn CallbackSender> = Arc::new(CloudCallbackSender::new(
676 self.lambda_client.clone(),
677 arn.clone(),
678 ));
679 for handle in &self.handles {
680 let mut sender = handle.callback_sender.write().await;
681 *sender = Some(callback_sender.clone());
682 }
683
684 // Requirement 7.2: Use configured timeout
685 let deadline = Instant::now() + self.config.timeout;
686 let mut all_events = Vec::new();
687
688 loop {
689 // Requirement 1.4: Check timeout
690 if Instant::now() >= deadline {
691 return Ok(TestResult::with_status(
692 ExecutionStatus::TimedOut,
693 self.operation_storage.get_all().to_vec(),
694 ));
695 }
696
697 // Requirement 7.1: Wait poll_interval between cycles
698 tokio::time::sleep(self.config.poll_interval).await;
699
700 let poll_result = poller.poll_once().await?;
701
702 // Requirement 3.1: Populate OperationStorage (deduplicated)
703 for op in &poll_result.operations {
704 self.operation_storage.add_or_update(op.clone());
705 }
706
707 // Requirement 5.5: Notify waiting OperationHandles
708 self.notify_handles().await;
709
710 // Requirement 9.1: Collect history events
711 all_events.extend(poll_result.events);
712
713 // Requirement 1.2, 1.5: Check terminal state
714 if let Some(terminal) = poll_result.terminal {
715 let mut result = match terminal.status {
716 ExecutionStatus::Succeeded => {
717 // Requirement 8.2: Parse result from terminal event
718 let output: O =
719 serde_json::from_str(terminal.result.as_deref().unwrap_or("null"))?;
720 TestResult::success(output, self.operation_storage.get_all().to_vec())
721 }
722 status => {
723 // Requirement 8.3: Parse error from terminal event
724 let mut r = TestResult::with_status(
725 status,
726 self.operation_storage.get_all().to_vec(),
727 );
728 if let Some(err) = terminal.error {
729 r.set_error(err);
730 }
731 r
732 }
733 };
734 // Requirement 9.1, 9.2, 9.3: Include all history events
735 result.set_history_events(all_events);
736 return Ok(result);
737 }
738 }
739 }
740
741 /// Invokes the Lambda function and extracts the `DurableExecutionArn` from the response.
742 ///
743 /// # Requirements
744 ///
745 /// - 8.4: Return error without polling if invocation fails
746 async fn invoke_lambda<I: Serialize>(&self, payload: &I) -> Result<String, TestError> {
747 let payload_json = serde_json::to_vec(payload)?;
748
749 let invoke_result = self
750 .lambda_client
751 .invoke()
752 .function_name(&self.function_name)
753 .payload(aws_sdk_lambda::primitives::Blob::new(payload_json))
754 .send()
755 .await
756 .map_err(|e| TestError::aws_error(format!("Lambda invoke failed: {}", e)))?;
757
758 // Check for function error
759 if let Some(function_error) = invoke_result.function_error() {
760 let error_payload = invoke_result
761 .payload()
762 .map(|p| String::from_utf8_lossy(p.as_ref()).to_string())
763 .unwrap_or_else(|| "Unknown error".to_string());
764
765 return Err(TestError::aws_error(format!(
766 "Lambda function error ({}): {}",
767 function_error, error_payload
768 )));
769 }
770
771 // Parse the response to extract DurableExecutionArn
772 let response_payload = invoke_result
773 .payload()
774 .ok_or_else(|| TestError::aws_error("No response payload from Lambda"))?;
775
776 let response_str = String::from_utf8_lossy(response_payload.as_ref());
777
778 // Try to parse as JSON and extract the ARN
779 let response_json: serde_json::Value = serde_json::from_str(&response_str)
780 .map_err(|e| TestError::aws_error(format!("Failed to parse Lambda response: {}", e)))?;
781
782 let arn = response_json
783 .get("DurableExecutionArn")
784 .or_else(|| response_json.get("durableExecutionArn"))
785 .and_then(|v| v.as_str())
786 .ok_or_else(|| {
787 TestError::aws_error(format!(
788 "Lambda response missing DurableExecutionArn: {}",
789 response_str
790 ))
791 })?;
792
793 Ok(arn.to_string())
794 }
795
796 /// Creates a `LambdaHistoryApiClient` from the stored AWS config.
797 fn create_history_client(&self) -> LambdaHistoryApiClient {
798 match &self.aws_config {
799 Some(cfg) => LambdaHistoryApiClient::from_aws_config(cfg),
800 None => {
801 // Fallback: create a service client from a default config.
802 // This path is used when with_client() was called without an SdkConfig.
803 LambdaHistoryApiClient::from_service_client(
804 LambdaDurableServiceClient::from_aws_config(
805 &aws_config::SdkConfig::builder().build(),
806 ),
807 )
808 }
809 }
810 }
811}
812
813impl<O> CloudDurableTestRunner<O>
814where
815 O: DeserializeOwned + Send,
816{
817 // =========================================================================
818 // Operation Handle Methods (Requirements 5.1, 5.2, 5.3, 5.4)
819 // =========================================================================
820
821 /// Returns a lazy `OperationHandle` that populates with the first operation
822 /// matching the given name during execution.
823 ///
824 /// # Arguments
825 ///
826 /// * `name` - The operation name to match against
827 ///
828 /// # Requirements
829 ///
830 /// - 5.1: THE Cloud_Runner SHALL provide a method to obtain an Operation_Handle
831 /// by operation name before or during execution.
832 ///
833 /// # Examples
834 ///
835 /// ```ignore
836 /// let handle = runner.get_operation_handle("my-callback");
837 /// // handle is unpopulated until run() executes and produces a matching operation
838 /// ```
839 pub fn get_operation_handle(&mut self, name: &str) -> OperationHandle {
840 let handle = OperationHandle::new(
841 OperationMatcher::ByName(name.to_string()),
842 self.all_operations.clone(),
843 );
844 self.handles.push(handle.clone());
845 handle
846 }
847
848 /// Returns a lazy `OperationHandle` that populates with the operation
849 /// at the given execution order index.
850 ///
851 /// # Arguments
852 ///
853 /// * `index` - The zero-based execution order index
854 ///
855 /// # Requirements
856 ///
857 /// - 5.2: THE Cloud_Runner SHALL provide a method to obtain an Operation_Handle
858 /// by operation index before or during execution.
859 ///
860 /// # Examples
861 ///
862 /// ```ignore
863 /// let handle = runner.get_operation_handle_by_index(0);
864 /// // handle populates with the first operation created during execution
865 /// ```
866 pub fn get_operation_handle_by_index(&mut self, index: usize) -> OperationHandle {
867 let handle = OperationHandle::new(
868 OperationMatcher::ByIndex(index),
869 self.all_operations.clone(),
870 );
871 self.handles.push(handle.clone());
872 handle
873 }
874
875 /// Returns a lazy `OperationHandle` that populates with the nth operation
876 /// matching the given name during execution.
877 ///
878 /// # Arguments
879 ///
880 /// * `name` - The operation name to match against
881 /// * `index` - The zero-based index among operations with that name
882 ///
883 /// # Requirements
884 ///
885 /// - 5.3: THE Cloud_Runner SHALL provide a method to obtain an Operation_Handle
886 /// by operation name and index before or during execution.
887 ///
888 /// # Examples
889 ///
890 /// ```ignore
891 /// let handle = runner.get_operation_handle_by_name_and_index("process", 1);
892 /// // handle populates with the second "process" operation during execution
893 /// ```
894 pub fn get_operation_handle_by_name_and_index(
895 &mut self,
896 name: &str,
897 index: usize,
898 ) -> OperationHandle {
899 let handle = OperationHandle::new(
900 OperationMatcher::ByNameAndIndex(name.to_string(), index),
901 self.all_operations.clone(),
902 );
903 self.handles.push(handle.clone());
904 handle
905 }
906
907 /// Returns a lazy `OperationHandle` that populates with the operation
908 /// matching the given unique ID.
909 ///
910 /// # Arguments
911 ///
912 /// * `id` - The unique operation ID to match against
913 ///
914 /// # Requirements
915 ///
916 /// - 5.4: THE Cloud_Runner SHALL provide a method to obtain an Operation_Handle
917 /// by operation id before or during execution.
918 ///
919 /// # Examples
920 ///
921 /// ```ignore
922 /// let handle = runner.get_operation_handle_by_id("op-abc-123");
923 /// // handle populates with the operation whose ID matches during execution
924 /// ```
925 pub fn get_operation_handle_by_id(&mut self, id: &str) -> OperationHandle {
926 let handle = OperationHandle::new(
927 OperationMatcher::ById(id.to_string()),
928 self.all_operations.clone(),
929 );
930 self.handles.push(handle.clone());
931 handle
932 }
933
934 /// Notifies all registered operation handles with matching operation data
935 /// from the operation storage, and updates the shared `all_operations` list.
936 ///
937 /// For each handle, the matcher is used to look up the corresponding operation
938 /// in storage. If found, the handle's inner data is populated and a status
939 /// notification is sent via the watch channel so that `wait_for_data` resolves.
940 ///
941 /// # Requirements
942 ///
943 /// - 5.5: WHEN the History_Poller populates new operation data,
944 /// THE Cloud_Runner SHALL notify waiting Operation_Handle instances
945 /// so that their `wait_for_data` calls resolve.
946 pub(crate) async fn notify_handles(&self) {
947 for handle in &self.handles {
948 let matched_op = match &handle.matcher {
949 OperationMatcher::ByName(name) => self.operation_storage.get_by_name(name).cloned(),
950 OperationMatcher::ByIndex(idx) => {
951 self.operation_storage.get_by_index(*idx).cloned()
952 }
953 OperationMatcher::ById(id) => self.operation_storage.get_by_id(id).cloned(),
954 OperationMatcher::ByNameAndIndex(name, idx) => self
955 .operation_storage
956 .get_by_name_and_index(name, *idx)
957 .cloned(),
958 };
959 if let Some(op) = matched_op {
960 let status = op.status;
961 let mut inner = handle.inner.write().await;
962 *inner = Some(op);
963 drop(inner);
964 let _ = handle.status_tx.send(Some(status));
965 }
966 }
967 // Update shared all_operations for child enumeration
968 let mut all_ops = self.all_operations.write().await;
969 *all_ops = self.operation_storage.get_all().to_vec();
970 }
971}
972
973impl<O> CloudDurableTestRunner<O>
974where
975 O: DeserializeOwned + Send,
976{
977 // =========================================================================
978 // Operation Lookup Methods (Requirements 4.1, 4.2, 4.3, 4.4)
979 // =========================================================================
980
981 /// Gets the first operation with the given name.
982 ///
983 /// # Arguments
984 ///
985 /// * `name` - The operation name to search for
986 ///
987 /// # Returns
988 ///
989 /// A `DurableOperation` wrapping the first operation with that name,
990 /// or `None` if no operation with that name exists.
991 ///
992 /// # Requirements
993 ///
994 /// - 4.1: WHEN a developer calls get_operation(name) on Test_Runner,
995 /// THE Test_Runner SHALL return the first operation with that name
996 ///
997 /// # Examples
998 ///
999 /// ```ignore
1000 /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1001 ///
1002 /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1003 /// .await
1004 /// .unwrap();
1005 /// let _ = runner.run("input".to_string()).await.unwrap();
1006 ///
1007 /// if let Some(op) = runner.get_operation("process_data") {
1008 /// println!("Found operation: {:?}", op.get_status());
1009 /// }
1010 /// ```
1011 pub fn get_operation(&self, name: &str) -> Option<DurableOperation> {
1012 let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1013 self.operation_storage
1014 .get_by_name(name)
1015 .cloned()
1016 .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1017 }
1018
1019 /// Gets an operation by its index in the execution order.
1020 ///
1021 /// # Arguments
1022 ///
1023 /// * `index` - The zero-based index of the operation
1024 ///
1025 /// # Returns
1026 ///
1027 /// A `DurableOperation` at that index, or `None` if the index is out of bounds.
1028 ///
1029 /// # Requirements
1030 ///
1031 /// - 4.2: WHEN a developer calls get_operation_by_index(index) on Test_Runner,
1032 /// THE Test_Runner SHALL return the operation at that execution order index
1033 ///
1034 /// # Examples
1035 ///
1036 /// ```ignore
1037 /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1038 ///
1039 /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1040 /// .await
1041 /// .unwrap();
1042 /// let _ = runner.run("input".to_string()).await.unwrap();
1043 ///
1044 /// // Get the first operation
1045 /// if let Some(op) = runner.get_operation_by_index(0) {
1046 /// println!("First operation: {:?}", op.get_type());
1047 /// }
1048 /// ```
1049 pub fn get_operation_by_index(&self, index: usize) -> Option<DurableOperation> {
1050 let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1051 self.operation_storage
1052 .get_by_index(index)
1053 .cloned()
1054 .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1055 }
1056
1057 /// Gets an operation by name and occurrence index.
1058 ///
1059 /// This is useful when multiple operations have the same name and you need
1060 /// to access a specific occurrence.
1061 ///
1062 /// # Arguments
1063 ///
1064 /// * `name` - The operation name to search for
1065 /// * `index` - The zero-based index among operations with that name
1066 ///
1067 /// # Returns
1068 ///
1069 /// A `DurableOperation` at that name/index combination, or `None` if not found.
1070 ///
1071 /// # Requirements
1072 ///
1073 /// - 4.3: WHEN a developer calls get_operation_by_name_and_index(name, index) on Test_Runner,
1074 /// THE Test_Runner SHALL return the nth operation with that name
1075 ///
1076 /// # Examples
1077 ///
1078 /// ```ignore
1079 /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1080 ///
1081 /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1082 /// .await
1083 /// .unwrap();
1084 /// let _ = runner.run("input".to_string()).await.unwrap();
1085 ///
1086 /// // Get the second "process" operation
1087 /// if let Some(op) = runner.get_operation_by_name_and_index("process", 1) {
1088 /// println!("Second process operation: {:?}", op.get_status());
1089 /// }
1090 /// ```
1091 pub fn get_operation_by_name_and_index(
1092 &self,
1093 name: &str,
1094 index: usize,
1095 ) -> Option<DurableOperation> {
1096 let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1097 self.operation_storage
1098 .get_by_name_and_index(name, index)
1099 .cloned()
1100 .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1101 }
1102
1103 /// Gets an operation by its unique ID.
1104 ///
1105 /// # Arguments
1106 ///
1107 /// * `id` - The unique operation ID
1108 ///
1109 /// # Returns
1110 ///
1111 /// A `DurableOperation` with that ID, or `None` if no operation with that ID exists.
1112 ///
1113 /// # Requirements
1114 ///
1115 /// - 4.4: WHEN a developer calls get_operation_by_id(id) on Test_Runner,
1116 /// THE Test_Runner SHALL return the operation with that unique identifier
1117 ///
1118 /// # Examples
1119 ///
1120 /// ```ignore
1121 /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1122 ///
1123 /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1124 /// .await
1125 /// .unwrap();
1126 /// let _ = runner.run("input".to_string()).await.unwrap();
1127 ///
1128 /// if let Some(op) = runner.get_operation_by_id("op-123") {
1129 /// println!("Found operation: {:?}", op.get_name());
1130 /// }
1131 /// ```
1132 pub fn get_operation_by_id(&self, id: &str) -> Option<DurableOperation> {
1133 let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1134 self.operation_storage
1135 .get_by_id(id)
1136 .cloned()
1137 .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1138 }
1139
1140 /// Gets all captured operations.
1141 ///
1142 /// # Returns
1143 ///
1144 /// A vector of all operations in execution order.
1145 ///
1146 /// # Examples
1147 ///
1148 /// ```ignore
1149 /// use durable_execution_sdk_testing::CloudDurableTestRunner;
1150 ///
1151 /// let mut runner = CloudDurableTestRunner::<String>::new("my-function")
1152 /// .await
1153 /// .unwrap();
1154 /// let _ = runner.run("input".to_string()).await.unwrap();
1155 ///
1156 /// let all_ops = runner.get_all_operations();
1157 /// println!("Total operations: {}", all_ops.len());
1158 /// ```
1159 pub fn get_all_operations(&self) -> Vec<DurableOperation> {
1160 let all_ops = Arc::new(self.operation_storage.get_all().to_vec());
1161 self.operation_storage
1162 .get_all()
1163 .iter()
1164 .cloned()
1165 .map(|op| DurableOperation::new(op).with_operations(Arc::clone(&all_ops)))
1166 .collect()
1167 }
1168
1169 /// Returns the number of captured operations.
1170 pub fn operation_count(&self) -> usize {
1171 self.operation_storage.operations.len()
1172 }
1173
1174 /// Clears all captured operations.
1175 ///
1176 /// This is useful when reusing the runner for multiple test runs.
1177 pub fn clear_operations(&mut self) {
1178 self.operation_storage.clear();
1179 }
1180}
1181
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an operation with the given ID, type, and name for the storage test.
    fn named_operation(
        id: &str,
        kind: durable_execution_sdk::OperationType,
        name: &str,
    ) -> Operation {
        let mut op = Operation::new(id, kind);
        op.name = Some(name.to_string());
        op
    }

    /// The default configuration polls every second and times out after five minutes.
    #[test]
    fn test_config_default() {
        let defaults = CloudTestRunnerConfig::default();
        assert_eq!(
            (defaults.poll_interval, defaults.timeout),
            (Duration::from_millis(1000), Duration::from_secs(300))
        );
    }

    /// Builder-style setters override both configuration fields.
    #[test]
    fn test_config_builder() {
        let poll_interval = Duration::from_millis(500);
        let timeout = Duration::from_secs(60);

        let built = CloudTestRunnerConfig::new()
            .with_poll_interval(poll_interval)
            .with_timeout(timeout);

        assert_eq!(built.poll_interval, poll_interval);
        assert_eq!(built.timeout, timeout);
    }

    /// Exercises every lookup path of `OperationStorage`, then `clear`.
    #[test]
    fn test_operation_storage() {
        let mut storage = OperationStorage::new();

        // Three operations in insertion order; the first and third share a name.
        storage.add_operation(named_operation(
            "op-001",
            durable_execution_sdk::OperationType::Step,
            "step1",
        ));
        storage.add_operation(named_operation(
            "op-002",
            durable_execution_sdk::OperationType::Wait,
            "wait1",
        ));
        storage.add_operation(named_operation(
            "op-003",
            durable_execution_sdk::OperationType::Step,
            "step1",
        ));

        // Lookup by ID: known IDs hit, unknown IDs miss.
        assert!(storage.get_by_id("op-001").is_some());
        assert!(storage.get_by_id("op-002").is_some());
        assert!(storage.get_by_id("nonexistent").is_none());

        // Lookup by name returns the first operation carrying that name.
        let by_name = storage.get_by_name("step1").unwrap();
        assert_eq!(by_name.operation_id, "op-001");

        // Name plus occurrence index selects among operations sharing a name.
        let second_occurrence = storage.get_by_name_and_index("step1", 1).unwrap();
        assert_eq!(second_occurrence.operation_id, "op-003");

        // Index 0 is the first operation added.
        let by_index = storage.get_by_index(0).unwrap();
        assert_eq!(by_index.operation_id, "op-001");

        // All three operations are present...
        assert_eq!(storage.get_all().len(), 3);

        // ...and none remain after clearing.
        storage.clear();
        assert!(storage.get_all().is_empty());
    }
}