/// Boxes a comma-separated list of futures and passes them to `$ctx.all(..)`.
///
/// Every `$fut` expression is wrapped with `Box::pin` and stored in a
/// `Vec<Pin<Box<dyn Future<Output = DurableResult<_>> + Send>>>`, so futures of
/// different concrete types can share one vector as long as they resolve to
/// the same `DurableResult<T>`. At least one future is required and a
/// trailing comma is accepted.
///
/// The macro evaluates to whatever `$ctx.all(..)` returns; it does not
/// `.await` that value itself.
#[macro_export]
macro_rules! all {
    ($ctx:expr, $($fut:expr),+ $(,)?) => {{
        // The explicit element type coerces each pinned box to one trait object type.
        let boxed: ::std::vec::Vec<
            ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<Output = $crate::error::DurableResult<_>>
                        + ::std::marker::Send
                >
            >
        > = ::std::vec![$(::std::boxed::Box::pin($fut)),+];
        $ctx.all(boxed)
    }};
}
/// Boxes a comma-separated list of futures and passes them to `$ctx.any(..)`.
///
/// Every `$fut` expression is wrapped with `Box::pin` and stored in a
/// `Vec<Pin<Box<dyn Future<Output = DurableResult<_>> + Send>>>`. At least one
/// future is required and a trailing comma is accepted.
///
/// The macro evaluates to whatever `$ctx.any(..)` returns; it does not
/// `.await` that value itself.
#[macro_export]
macro_rules! any {
    ($ctx:expr, $($fut:expr),+ $(,)?) => {{
        // The explicit element type coerces each pinned box to one trait object type.
        let boxed: ::std::vec::Vec<
            ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<Output = $crate::error::DurableResult<_>>
                        + ::std::marker::Send
                >
            >
        > = ::std::vec![$(::std::boxed::Box::pin($fut)),+];
        $ctx.any(boxed)
    }};
}
/// Boxes a comma-separated list of futures and passes them to `$ctx.race(..)`.
///
/// Every `$fut` expression is wrapped with `Box::pin` and stored in a
/// `Vec<Pin<Box<dyn Future<Output = DurableResult<_>> + Send>>>`. At least one
/// future is required and a trailing comma is accepted.
///
/// The macro evaluates to whatever `$ctx.race(..)` returns; it does not
/// `.await` that value itself.
#[macro_export]
macro_rules! race {
    ($ctx:expr, $($fut:expr),+ $(,)?) => {{
        // The explicit element type coerces each pinned box to one trait object type.
        let boxed: ::std::vec::Vec<
            ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<Output = $crate::error::DurableResult<_>>
                        + ::std::marker::Send
                >
            >
        > = ::std::vec![$(::std::boxed::Box::pin($fut)),+];
        $ctx.race(boxed)
    }};
}
/// Boxes a comma-separated list of futures and passes them to
/// `$ctx.all_settled(..)`.
///
/// Every `$fut` expression is wrapped with `Box::pin` and stored in a
/// `Vec<Pin<Box<dyn Future<Output = DurableResult<_>> + Send>>>`. At least one
/// future is required and a trailing comma is accepted.
///
/// The macro evaluates to whatever `$ctx.all_settled(..)` returns; it does not
/// `.await` that value itself.
#[macro_export]
macro_rules! all_settled {
    ($ctx:expr, $($fut:expr),+ $(,)?) => {{
        // The explicit element type coerces each pinned box to one trait object type.
        let boxed: ::std::vec::Vec<
            ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<Output = $crate::error::DurableResult<_>>
                        + ::std::marker::Send
                >
            >
        > = ::std::vec![$(::std::boxed::Box::pin($fut)),+];
        $ctx.all_settled(boxed)
    }};
}
#[cfg(test)]
mod tests {
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::client::{CheckpointResponse, MockDurableServiceClient, SharedDurableServiceClient};
use crate::context::TracingLogger;
use crate::error::DurableError;
use crate::lambda::InitialExecutionState;
use crate::state::ExecutionState;
/// Pinned, boxed, `Send` future resolving to `Result<T, DurableError>`; the
/// tests below use it when they build a future list by hand instead of through a macro.
type DurableFuture<T> = Pin<Box<dyn Future<Output = Result<T, DurableError>> + Send>>;
/// Builds a shared mock service client preloaded with five successful
/// checkpoint responses (`token-1` through `token-5`).
fn create_mock_client() -> SharedDurableServiceClient {
    let mock = MockDurableServiceClient::new()
        .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
        .with_checkpoint_response(Ok(CheckpointResponse::new("token-2")))
        .with_checkpoint_response(Ok(CheckpointResponse::new("token-3")))
        .with_checkpoint_response(Ok(CheckpointResponse::new("token-4")))
        .with_checkpoint_response(Ok(CheckpointResponse::new("token-5")));
    Arc::new(mock)
}
/// Wraps a new `ExecutionState` in an `Arc`, using a fixed test ARN, the
/// checkpoint token `initial-token`, an empty initial state and `client`.
fn create_test_state(client: SharedDurableServiceClient) -> Arc<ExecutionState> {
    let execution = ExecutionState::new(
        "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
        "initial-token",
        InitialExecutionState::new(),
        client,
    );
    Arc::new(execution)
}
/// Test-only context exposing `all`, `any`, `race` and `all_settled` methods
/// so the macros can be exercised against the promise handlers directly.
struct MockContext {
/// Execution state passed by reference to every promise handler call.
state: Arc<ExecutionState>,
/// Logger passed by reference to every promise handler call.
logger: Arc<dyn crate::context::Logger>,
/// Counter supplying the numeric suffix of generated operation ids.
id_counter: std::sync::atomic::AtomicU64,
}
impl MockContext {
    /// Creates a context around `state` with a `TracingLogger` and an
    /// operation counter starting at zero.
    fn new(state: Arc<ExecutionState>) -> Self {
        let logger: Arc<dyn crate::context::Logger> = Arc::new(TracingLogger);
        let id_counter = std::sync::atomic::AtomicU64::new(0);
        Self {
            state,
            logger,
            id_counter,
        }
    }

    /// Returns an identifier named `test-op-<n>`, where `<n>` is the counter
    /// value before it is incremented.
    fn next_op_id(&self) -> crate::context::OperationIdentifier {
        use std::sync::atomic::Ordering;

        let sequence = self.id_counter.fetch_add(1, Ordering::Relaxed);
        let name = format!("test-op-{}", sequence);
        crate::context::OperationIdentifier::new(name, None, None)
    }

    /// Allocates the next operation id and forwards `futures` to `all_handler`.
    pub async fn all<T, Fut>(&self, futures: Vec<Fut>) -> Result<Vec<T>, DurableError>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
    {
        let operation = self.next_op_id();
        let Self { state, logger, .. } = self;
        crate::handlers::promise::all_handler(futures, state, &operation, logger).await
    }

    /// Allocates the next operation id and forwards `futures` to `any_handler`.
    pub async fn any<T, Fut>(&self, futures: Vec<Fut>) -> Result<T, DurableError>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
    {
        let operation = self.next_op_id();
        let Self { state, logger, .. } = self;
        crate::handlers::promise::any_handler(futures, state, &operation, logger).await
    }

    /// Allocates the next operation id and forwards `futures` to `race_handler`.
    pub async fn race<T, Fut>(&self, futures: Vec<Fut>) -> Result<T, DurableError>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
    {
        let operation = self.next_op_id();
        let Self { state, logger, .. } = self;
        crate::handlers::promise::race_handler(futures, state, &operation, logger).await
    }

    /// Allocates the next operation id and forwards `futures` to
    /// `all_settled_handler`.
    pub async fn all_settled<T, Fut>(
        &self,
        futures: Vec<Fut>,
    ) -> Result<crate::concurrency::BatchResult<T>, DurableError>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
    {
        let operation = self.next_op_id();
        let Self { state, logger, .. } = self;
        crate::handlers::promise::all_settled_handler(futures, state, &operation, logger).await
    }
}
/// `all!` over three successful futures resolves to their values in argument order.
#[tokio::test]
async fn test_all_macro_success_returns_vec_in_order() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
        Box::pin(async { Ok(3) }),
    )
    .await;

    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), vec![1, 2, 3]);
}
/// `all!` accepts a single future and resolves to a one-element vector.
#[tokio::test]
async fn test_all_macro_single_future() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(42) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
    )
    .await;

    assert!(outcome.is_ok());
    let values = outcome.unwrap();
    assert_eq!(values, vec![42]);
}
/// `all!` over two `String` futures resolves to both strings in order.
#[tokio::test]
async fn test_all_macro_two_futures() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>("hello".to_string()) })
            as Pin<Box<dyn Future<Output = Result<String, DurableError>> + Send>>,
        Box::pin(async { Ok("world".to_string()) }),
    )
    .await;

    assert!(outcome.is_ok());
    let expected = vec![String::from("hello"), String::from("world")];
    assert_eq!(outcome.unwrap(), expected);
}
/// `all!` fails when one future fails, and the error text mentions
/// "failed" or "error".
#[tokio::test]
async fn test_all_macro_failure_returns_first_error() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Err(DurableError::execution("test error")) }),
        Box::pin(async { Ok(3) }),
    )
    .await;

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(["failed", "error"]
        .iter()
        .any(|needle| message.contains(*needle)));
}
/// `all!` fails when the first of two futures fails.
#[tokio::test]
async fn test_all_macro_first_future_fails() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all!(
        ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("first error")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
    )
    .await;

    assert!(outcome.is_err());
}
/// `all!` accepts a trailing comma after the last future.
#[tokio::test]
async fn test_all_macro_trailing_comma() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
    )
    .await;

    assert!(outcome.is_ok());
    let values = outcome.unwrap();
    assert_eq!(values, vec![1, 2]);
}
/// `all!` works with `String` outputs and keeps them in argument order.
#[tokio::test]
async fn test_all_macro_string_type() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>("a".to_string()) })
            as Pin<Box<dyn Future<Output = Result<String, DurableError>> + Send>>,
        Box::pin(async { Ok("b".to_string()) }),
        Box::pin(async { Ok("c".to_string()) }),
    )
    .await;

    assert!(outcome.is_ok());
    let expected = vec![String::from("a"), String::from("b"), String::from("c")];
    assert_eq!(outcome.unwrap(), expected);
}
/// `all!` produces the same successful result as calling `all` with a
/// hand-built vector of boxed futures.
#[tokio::test]
async fn test_all_macro_equivalence_with_manual_boxing() {
    // Macro path.
    let macro_ctx = MockContext::new(create_test_state(create_mock_client()));
    let via_macro = crate::all!(
        macro_ctx,
        Box::pin(async { Ok::<_, DurableError>(10) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(20) }),
        Box::pin(async { Ok(30) }),
    )
    .await;

    // Manual path on a separate context.
    let manual_ctx = MockContext::new(create_test_state(create_mock_client()));
    let boxed: Vec<DurableFuture<i32>> = vec![
        Box::pin(async { Ok::<_, DurableError>(10) }),
        Box::pin(async { Ok(20) }),
        Box::pin(async { Ok(30) }),
    ];
    let via_manual = manual_ctx.all(boxed).await;

    assert!(via_macro.is_ok());
    assert!(via_manual.is_ok());
    assert_eq!(via_macro.unwrap(), via_manual.unwrap());
}
/// `all!` and a hand-built `all` call both fail when one future fails.
#[tokio::test]
async fn test_all_macro_equivalence_with_manual_boxing_failure() {
    // Macro path.
    let macro_ctx = MockContext::new(create_test_state(create_mock_client()));
    let via_macro = crate::all!(
        macro_ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Err(DurableError::execution("test error")) }),
    )
    .await;

    // Manual path on a separate context.
    let manual_ctx = MockContext::new(create_test_state(create_mock_client()));
    let boxed: Vec<DurableFuture<i32>> = vec![
        Box::pin(async { Ok::<_, DurableError>(1) }),
        Box::pin(async { Err(DurableError::execution("test error")) }),
    ];
    let via_manual = manual_ctx.all(boxed).await;

    assert!(via_macro.is_err());
    assert!(via_manual.is_err());
}
/// `any!` over three successful futures resolves to one of their values.
#[tokio::test]
async fn test_any_macro_first_success_returned() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::any!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
        Box::pin(async { Ok(3) }),
    )
    .await;

    assert!(outcome.is_ok());
    let winner = outcome.unwrap();
    assert!(matches!(winner, 1 | 2 | 3));
}
/// `any!` resolves to the only successful value when the other futures fail.
#[tokio::test]
async fn test_any_macro_success_among_failures() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::any!(
        ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("error 1")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(42) }),
        Box::pin(async { Err(DurableError::execution("error 2")) }),
    )
    .await;

    assert!(outcome.is_ok());
    let winner = outcome.unwrap();
    assert_eq!(winner, 42);
}
/// `any!` fails when every future fails, and the error text mentions
/// "All" or "failed".
#[tokio::test]
async fn test_any_macro_all_failures_returns_combined_error() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::any!(
        ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("error 1")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Err(DurableError::execution("error 2")) }),
        Box::pin(async { Err(DurableError::execution("error 3")) }),
    )
    .await;

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(["All", "failed"]
        .iter()
        .any(|needle| message.contains(*needle)));
}
/// `any!` with one successful future resolves to that future's value.
#[tokio::test]
async fn test_any_macro_single_future_success() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::any!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(99) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
    )
    .await;

    assert!(outcome.is_ok());
    let winner = outcome.unwrap();
    assert_eq!(winner, 99);
}
/// `any!` with one failing future resolves to an error.
#[tokio::test]
async fn test_any_macro_single_future_failure() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::any!(
        ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("single error")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
    )
    .await;

    assert!(outcome.is_err());
}
/// `any!` accepts a trailing comma after the last future.
#[tokio::test]
async fn test_any_macro_trailing_comma() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::any!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
    )
    .await;

    assert!(outcome.is_ok());
}
/// `any!` works with `String` outputs and resolves to one of the inputs.
#[tokio::test]
async fn test_any_macro_string_type() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::any!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>("hello".to_string()) })
            as Pin<Box<dyn Future<Output = Result<String, DurableError>> + Send>>,
        Box::pin(async { Ok("world".to_string()) }),
    )
    .await;

    assert!(outcome.is_ok());
    let winner = outcome.unwrap();
    assert!(matches!(winner.as_str(), "hello" | "world"));
}
/// `any!` produces the same successful result as calling `any` with a
/// hand-built vector of boxed futures.
#[tokio::test]
async fn test_any_macro_equivalence_with_manual_boxing() {
    // Macro path.
    let macro_ctx = MockContext::new(create_test_state(create_mock_client()));
    let via_macro = crate::any!(
        macro_ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("fail")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(42) }),
    )
    .await;

    // Manual path on a separate context.
    let manual_ctx = MockContext::new(create_test_state(create_mock_client()));
    let boxed: Vec<DurableFuture<i32>> = vec![
        Box::pin(async { Err::<i32, _>(DurableError::execution("fail")) }),
        Box::pin(async { Ok(42) }),
    ];
    let via_manual = manual_ctx.any(boxed).await;

    assert!(via_macro.is_ok());
    assert!(via_manual.is_ok());
    assert_eq!(via_macro.unwrap(), via_manual.unwrap());
}
/// `race!` resolves to the immediately-ready success rather than the future
/// that sleeps for 100 ms first.
#[tokio::test]
async fn test_race_macro_first_success_settles() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::race!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            Ok(2)
        }),
    )
    .await;

    assert!(outcome.is_ok());
    let winner = outcome.unwrap();
    assert_eq!(winner, 1);
}
/// `race!` resolves to an error when the immediately-ready future fails and
/// the other future sleeps for 100 ms first.
#[tokio::test]
async fn test_race_macro_first_failure_settles() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::race!(
        ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("fast error")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            Ok(2)
        }),
    )
    .await;

    assert!(outcome.is_err());
}
/// `race!` with one successful future resolves to that future's value.
#[tokio::test]
async fn test_race_macro_single_future_success() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::race!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(42) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
    )
    .await;

    assert!(outcome.is_ok());
    let winner = outcome.unwrap();
    assert_eq!(winner, 42);
}
/// `race!` with one failing future resolves to an error.
#[tokio::test]
async fn test_race_macro_single_future_failure() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::race!(
        ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("single error")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
    )
    .await;

    assert!(outcome.is_err());
}
/// `race!` accepts a trailing comma after the last future.
#[tokio::test]
async fn test_race_macro_trailing_comma() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::race!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
    )
    .await;

    assert!(outcome.is_ok());
}
/// `race!` works with `String` outputs; the immediately-ready future wins
/// over the one that sleeps for 100 ms.
#[tokio::test]
async fn test_race_macro_string_type() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::race!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>("hello".to_string()) })
            as Pin<Box<dyn Future<Output = Result<String, DurableError>> + Send>>,
        Box::pin(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            Ok("world".to_string())
        }),
    )
    .await;

    assert!(outcome.is_ok());
    let winner = outcome.unwrap();
    assert_eq!(winner, "hello");
}
/// `race!` over three immediately-ready successes resolves to one of them.
#[tokio::test]
async fn test_race_macro_multiple_futures_success_first() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::race!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
        Box::pin(async { Ok(3) }),
    )
    .await;

    assert!(outcome.is_ok());
    let winner = outcome.unwrap();
    assert!(matches!(winner, 1 | 2 | 3));
}
/// `race!` produces the same successful result as calling `race` with a
/// hand-built vector of boxed futures.
#[tokio::test]
async fn test_race_macro_equivalence_with_manual_boxing() {
    // Macro path.
    let macro_ctx = MockContext::new(create_test_state(create_mock_client()));
    let via_macro = crate::race!(
        macro_ctx,
        Box::pin(async { Ok::<_, DurableError>(10) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            Ok(20)
        }),
    )
    .await;

    // Manual path on a separate context.
    let manual_ctx = MockContext::new(create_test_state(create_mock_client()));
    let boxed: Vec<DurableFuture<i32>> = vec![
        Box::pin(async { Ok::<_, DurableError>(10) }),
        Box::pin(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            Ok(20)
        }),
    ];
    let via_manual = manual_ctx.race(boxed).await;

    assert!(via_macro.is_ok());
    assert!(via_manual.is_ok());
    assert_eq!(via_macro.unwrap(), via_manual.unwrap());
}
/// `race!` and a hand-built `race` call both fail when the immediately-ready
/// future fails.
#[tokio::test]
async fn test_race_macro_equivalence_with_manual_boxing_failure() {
    // Macro path.
    let macro_ctx = MockContext::new(create_test_state(create_mock_client()));
    let via_macro = crate::race!(
        macro_ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("fast error")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            Ok(20)
        }),
    )
    .await;

    // Manual path on a separate context.
    let manual_ctx = MockContext::new(create_test_state(create_mock_client()));
    let boxed: Vec<DurableFuture<i32>> = vec![
        Box::pin(async { Err::<i32, _>(DurableError::execution("fast error")) }),
        Box::pin(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            Ok(20)
        }),
    ];
    let via_manual = manual_ctx.race(boxed).await;

    assert!(via_macro.is_err());
    assert!(via_manual.is_err());
}
/// `all_settled!` over three successes yields a batch with three items, all
/// counted as successful.
#[tokio::test]
async fn test_all_settled_macro_returns_batch_result() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all_settled!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
        Box::pin(async { Ok(3) }),
    )
    .await;

    assert!(outcome.is_ok());
    let settled = outcome.unwrap();
    assert_eq!(settled.items.len(), 3);
    assert_eq!(settled.success_count(), 3);
    assert_eq!(settled.failure_count(), 0);
    assert!(settled.all_succeeded());
}
/// `all_settled!` keeps items in argument order: each item's `index` and
/// result match its position in the invocation.
#[tokio::test]
async fn test_all_settled_macro_preserves_order() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all_settled!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(10) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(20) }),
        Box::pin(async { Ok(30) }),
    )
    .await;

    assert!(outcome.is_ok());
    let settled = outcome.unwrap();
    assert_eq!(settled.items.len(), 3);

    let first = &settled.items[0];
    assert_eq!(first.index, 0);
    assert_eq!(first.get_result(), Some(&10));

    let second = &settled.items[1];
    assert_eq!(second.index, 1);
    assert_eq!(second.get_result(), Some(&20));

    let third = &settled.items[2];
    assert_eq!(third.index, 2);
    assert_eq!(third.get_result(), Some(&30));
}
/// `all_settled!` with two successes and one failure still resolves to `Ok`,
/// and the batch counts reflect the mix.
#[tokio::test]
async fn test_all_settled_macro_mixed_results() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all_settled!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Err(DurableError::execution("test error")) }),
        Box::pin(async { Ok(3) }),
    )
    .await;

    assert!(outcome.is_ok());
    let settled = outcome.unwrap();
    assert_eq!(settled.items.len(), 3);
    assert_eq!(settled.success_count(), 2);
    assert_eq!(settled.failure_count(), 1);
    assert!(!settled.all_succeeded());
    assert!(settled.has_failures());
}
/// `all_settled!` with a single success yields a one-item batch holding that value.
#[tokio::test]
async fn test_all_settled_macro_single_future() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all_settled!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(42) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
    )
    .await;

    assert!(outcome.is_ok());
    let settled = outcome.unwrap();
    assert_eq!(settled.items.len(), 1);
    assert_eq!(settled.success_count(), 1);
    let only = &settled.items[0];
    assert_eq!(only.get_result(), Some(&42));
}
/// `all_settled!` with a single failure resolves to `Ok` with one failed item.
#[tokio::test]
async fn test_all_settled_macro_single_failure() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all_settled!(
        ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("single error")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
    )
    .await;

    assert!(outcome.is_ok());
    let settled = outcome.unwrap();
    assert_eq!(settled.items.len(), 1);
    assert_eq!(settled.failure_count(), 1);
    let only = &settled.items[0];
    assert!(only.is_failed());
}
/// `all_settled!` accepts a trailing comma after the last future.
#[tokio::test]
async fn test_all_settled_macro_trailing_comma() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all_settled!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>(1) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Ok(2) }),
    )
    .await;

    assert!(outcome.is_ok());
    let settled = outcome.unwrap();
    assert_eq!(settled.items.len(), 2);
}
/// `all_settled!` works with `String` outputs and keeps them in argument order.
#[tokio::test]
async fn test_all_settled_macro_string_type() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all_settled!(
        ctx,
        Box::pin(async { Ok::<_, DurableError>("a".to_string()) })
            as Pin<Box<dyn Future<Output = Result<String, DurableError>> + Send>>,
        Box::pin(async { Ok("b".to_string()) }),
        Box::pin(async { Ok("c".to_string()) }),
    )
    .await;

    assert!(outcome.is_ok());
    let settled = outcome.unwrap();
    assert_eq!(settled.items.len(), 3);
    assert_eq!(settled.items[0].get_result(), Some(&String::from("a")));
    assert_eq!(settled.items[1].get_result(), Some(&String::from("b")));
    assert_eq!(settled.items[2].get_result(), Some(&String::from("c")));
}
/// `all_settled!` and a hand-built `all_settled` call report the same item,
/// success and failure counts.
#[tokio::test]
async fn test_all_settled_macro_equivalence_with_manual_boxing() {
    // Macro path.
    let macro_ctx = MockContext::new(create_test_state(create_mock_client()));
    let via_macro = crate::all_settled!(
        macro_ctx,
        Box::pin(async { Ok::<_, DurableError>(10) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Err(DurableError::execution("test error")) }),
        Box::pin(async { Ok(30) }),
    )
    .await;

    // Manual path on a separate context.
    let manual_ctx = MockContext::new(create_test_state(create_mock_client()));
    let boxed: Vec<DurableFuture<i32>> = vec![
        Box::pin(async { Ok::<_, DurableError>(10) }),
        Box::pin(async { Err(DurableError::execution("test error")) }),
        Box::pin(async { Ok(30) }),
    ];
    let via_manual = manual_ctx.all_settled(boxed).await;

    assert!(via_macro.is_ok());
    assert!(via_manual.is_ok());

    let from_macro = via_macro.unwrap();
    let from_manual = via_manual.unwrap();
    assert_eq!(from_macro.items.len(), from_manual.items.len());
    assert_eq!(from_macro.success_count(), from_manual.success_count());
    assert_eq!(from_macro.failure_count(), from_manual.failure_count());
}
/// `all_settled!` resolves to `Ok` even when every future fails; the batch
/// then reports three failures and no successes.
#[tokio::test]
async fn test_all_settled_macro_all_failures() {
    let ctx = MockContext::new(create_test_state(create_mock_client()));

    let outcome = crate::all_settled!(
        ctx,
        Box::pin(async { Err::<i32, _>(DurableError::execution("error 1")) })
            as Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>>,
        Box::pin(async { Err(DurableError::execution("error 2")) }),
        Box::pin(async { Err(DurableError::execution("error 3")) }),
    )
    .await;

    assert!(outcome.is_ok());
    let settled = outcome.unwrap();
    assert_eq!(settled.items.len(), 3);
    assert_eq!(settled.success_count(), 0);
    assert_eq!(settled.failure_count(), 3);
    assert!(!settled.all_succeeded());
}
}