use async_trait::async_trait;
use turul_a2a::error::A2aError;
use turul_a2a::executor::AgentExecutor;
use turul_a2a::storage::{
A2aAtomicStore, A2aCancellationSupervisor, A2aEventStore, A2aPushNotificationStorage,
A2aStorageError, A2aTaskStorage, InMemoryA2aStorage,
};
use turul_a2a_types::{Message, Task};
use crate::LambdaA2aHandler;
struct DummyExecutor;
#[async_trait]
impl AgentExecutor for DummyExecutor {
async fn execute(
&self,
_task: &mut Task,
_msg: &Message,
_ctx: &turul_a2a::executor::ExecutionContext,
) -> Result<(), A2aError> {
Ok(())
}
fn agent_card(&self) -> turul_a2a_proto::AgentCard {
turul_a2a_proto::AgentCard::default()
}
}
#[derive(Clone, Default)]
struct FakeBackend;
#[async_trait]
impl A2aTaskStorage for FakeBackend {
fn backend_name(&self) -> &'static str {
"fake-backend"
}
async fn create_task(&self, _t: &str, _o: &str, task: Task) -> Result<Task, A2aStorageError> {
Ok(task)
}
async fn get_task(
&self,
_t: &str,
_tid: &str,
_o: &str,
_h: Option<i32>,
) -> Result<Option<Task>, A2aStorageError> {
Ok(None)
}
async fn update_task(&self, _t: &str, _o: &str, _task: Task) -> Result<(), A2aStorageError> {
Ok(())
}
async fn delete_task(&self, _t: &str, _tid: &str, _o: &str) -> Result<bool, A2aStorageError> {
Ok(false)
}
async fn list_tasks(
&self,
_f: turul_a2a::storage::TaskFilter,
) -> Result<turul_a2a::storage::TaskListPage, A2aStorageError> {
Ok(turul_a2a::storage::TaskListPage {
tasks: vec![],
next_page_token: String::new(),
page_size: 0,
total_size: 0,
})
}
async fn update_task_status(
&self,
_t: &str,
_tid: &str,
_o: &str,
_s: turul_a2a_types::TaskStatus,
) -> Result<Task, A2aStorageError> {
unimplemented!()
}
async fn append_message(
&self,
_t: &str,
_tid: &str,
_o: &str,
_m: Message,
) -> Result<(), A2aStorageError> {
Ok(())
}
async fn append_artifact(
&self,
_t: &str,
_tid: &str,
_o: &str,
_a: turul_a2a_types::Artifact,
_append: bool,
_last: bool,
) -> Result<(), A2aStorageError> {
Ok(())
}
async fn task_count(&self) -> Result<usize, A2aStorageError> {
Ok(0)
}
async fn maintenance(&self) -> Result<(), A2aStorageError> {
Ok(())
}
async fn set_cancel_requested(
&self,
_t: &str,
_tid: &str,
_o: &str,
) -> Result<(), A2aStorageError> {
Ok(())
}
}
#[async_trait]
impl A2aPushNotificationStorage for FakeBackend {
fn backend_name(&self) -> &'static str {
"fake-backend"
}
async fn create_config(
&self,
_t: &str,
c: turul_a2a_proto::TaskPushNotificationConfig,
) -> Result<turul_a2a_proto::TaskPushNotificationConfig, A2aStorageError> {
Ok(c)
}
async fn get_config(
&self,
_t: &str,
_tid: &str,
_c: &str,
) -> Result<Option<turul_a2a_proto::TaskPushNotificationConfig>, A2aStorageError> {
Ok(None)
}
async fn list_configs(
&self,
_t: &str,
_tid: &str,
_p: Option<&str>,
_ps: Option<i32>,
) -> Result<turul_a2a::storage::PushConfigListPage, A2aStorageError> {
Ok(turul_a2a::storage::PushConfigListPage {
configs: vec![],
next_page_token: String::new(),
})
}
async fn delete_config(&self, _t: &str, _tid: &str, _c: &str) -> Result<(), A2aStorageError> {
Ok(())
}
async fn list_configs_eligible_at_event(
&self,
_t: &str,
_tid: &str,
_seq: u64,
_p: Option<&str>,
_ps: Option<i32>,
) -> Result<turul_a2a::storage::PushConfigListPage, A2aStorageError> {
Ok(turul_a2a::storage::PushConfigListPage {
configs: vec![],
next_page_token: String::new(),
})
}
}
#[async_trait]
impl A2aEventStore for FakeBackend {
fn backend_name(&self) -> &'static str {
"fake-backend"
}
async fn append_event(
&self,
_t: &str,
_tid: &str,
_e: turul_a2a::streaming::StreamEvent,
) -> Result<u64, A2aStorageError> {
Ok(0)
}
async fn get_events_after(
&self,
_t: &str,
_tid: &str,
_s: u64,
) -> Result<Vec<(u64, turul_a2a::streaming::StreamEvent)>, A2aStorageError> {
Ok(vec![])
}
async fn latest_sequence(&self, _t: &str, _tid: &str) -> Result<u64, A2aStorageError> {
Ok(0)
}
async fn cleanup_expired(&self) -> Result<u64, A2aStorageError> {
Ok(0)
}
}
#[async_trait]
impl A2aAtomicStore for FakeBackend {
fn backend_name(&self) -> &'static str {
"fake-backend"
}
async fn create_task_with_events(
&self,
_t: &str,
_o: &str,
task: Task,
_e: Vec<turul_a2a::streaming::StreamEvent>,
) -> Result<(Task, Vec<u64>), A2aStorageError> {
Ok((task, vec![]))
}
async fn update_task_status_with_events(
&self,
_t: &str,
_tid: &str,
_o: &str,
_s: turul_a2a_types::TaskStatus,
_e: Vec<turul_a2a::streaming::StreamEvent>,
) -> Result<(Task, Vec<u64>), A2aStorageError> {
unimplemented!()
}
async fn update_task_with_events(
&self,
_t: &str,
_o: &str,
_task: Task,
_e: Vec<turul_a2a::streaming::StreamEvent>,
) -> Result<Vec<u64>, A2aStorageError> {
Ok(vec![])
}
}
#[async_trait]
impl A2aCancellationSupervisor for FakeBackend {
fn backend_name(&self) -> &'static str {
"fake-backend"
}
async fn supervisor_get_cancel_requested(
&self,
_t: &str,
_tid: &str,
) -> Result<bool, A2aStorageError> {
Ok(false)
}
async fn supervisor_list_cancel_requested(
&self,
_t: &str,
_tids: &[String],
) -> Result<Vec<String>, A2aStorageError> {
Ok(vec![])
}
}
#[test]
fn storage_bundle_requires_cancellation_supervisor_trait_bound() {
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(InMemoryA2aStorage::new())
.build();
assert!(result.is_ok(), "unified storage bundle should build");
}
#[test]
fn build_rejects_missing_cancellation_supervisor() {
let storage = InMemoryA2aStorage::new();
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.task_storage(storage.clone())
.push_storage(storage.clone())
.event_store(storage.clone())
.atomic_store(storage)
.build();
match result {
Err(A2aError::Internal(msg)) => {
assert!(
msg.contains("cancellation_supervisor"),
"error message should mention cancellation_supervisor: {msg}"
);
}
Ok(_) => {
panic!("expected Internal error about missing cancellation_supervisor, got Ok(handler)")
}
Err(other) => panic!("expected Internal error, got different error: {other}"),
}
}
#[test]
fn build_rejects_cancellation_supervisor_backend_mismatch() {
let storage = InMemoryA2aStorage::new();
let wrong_supervisor = FakeBackend;
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.task_storage(storage.clone())
.push_storage(storage.clone())
.event_store(storage.clone())
.atomic_store(storage)
.cancellation_supervisor(wrong_supervisor)
.build();
match result {
Err(A2aError::Internal(msg)) => {
assert!(
msg.contains("backend mismatch") || msg.contains("Storage backend mismatch"),
"error should mention backend mismatch: {msg}"
);
assert!(
msg.contains("cancellation_supervisor"),
"error should mention which field mismatched: {msg}"
);
}
Ok(_) => panic!("expected same-backend rejection, got Ok(handler)"),
Err(other) => panic!("expected Internal mismatch error, got different error: {other}"),
}
}
#[test]
fn build_succeeds_with_explicit_cancellation_supervisor_same_backend() {
let storage = InMemoryA2aStorage::new();
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.task_storage(storage.clone())
.push_storage(storage.clone())
.event_store(storage.clone())
.atomic_store(storage.clone())
.cancellation_supervisor(storage)
.build();
assert!(
result.is_ok(),
"same-backend individual setters including cancellation_supervisor should build: {:?}",
result.err()
);
}
#[test]
fn lambda_builder_rejects_push_consumer_without_dispatch_enabled() {
let storage = InMemoryA2aStorage::new(); let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.task_storage(storage.clone())
.push_storage(storage.clone())
.event_store(storage.clone())
.atomic_store(storage.clone())
.cancellation_supervisor(storage.clone())
.push_delivery_store(storage)
.build();
let err = match result {
Err(e) => e.to_string(),
Ok(_) => panic!("orphan delivery store must be rejected"),
};
assert!(
err.contains("push_delivery_store wired")
&& err.contains("push_dispatch_enabled")
&& err.contains("with_push_dispatch_enabled(true)"),
"lambda error must mirror main server wording: {err}"
);
}
#[test]
fn lambda_builder_rejects_push_dispatch_without_consumer() {
let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.task_storage(storage.clone())
.push_storage(storage.clone())
.event_store(storage.clone())
.atomic_store(storage.clone())
.cancellation_supervisor(storage)
.build();
let err = match result {
Err(e) => e.to_string(),
Ok(_) => panic!("orphan dispatch flag must be rejected"),
};
assert!(
err.contains("push_dispatch_enabled() is true") && err.contains("no consumer"),
"lambda error must cite the orphaned-marker rationale: {err}"
);
}
#[test]
fn lambda_builder_accepts_push_fully_wired() {
let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(storage.clone())
.push_delivery_store(storage)
.build();
assert!(
result.is_ok(),
"lambda push fully wired must build: {:?}",
result.err()
);
}
#[test]
fn lambda_builder_accepts_non_push_deployment() {
let storage = InMemoryA2aStorage::new(); let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.task_storage(storage.clone())
.push_storage(storage.clone())
.event_store(storage.clone())
.atomic_store(storage.clone())
.cancellation_supervisor(storage)
.build();
assert!(
result.is_ok(),
"lambda non-push deployment must build: {:?}",
result.err()
);
}
#[test]
fn lambda_builder_rejects_retry_horizon_violation() {
use std::time::Duration;
use turul_a2a::server::RuntimeConfig;
let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
let mut cfg = RuntimeConfig::default();
cfg.push_max_attempts = 10;
cfg.push_backoff_cap = Duration::from_secs(60);
cfg.push_claim_expiry = Duration::from_secs(600);
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(storage.clone())
.push_delivery_store(storage)
.runtime_config(cfg)
.build();
let err = match result {
Err(e) => e.to_string(),
Ok(_) => panic!("retry horizon violation must be rejected"),
};
assert!(
err.contains("retry horizon") || err.contains("push_claim_expiry"),
"lambda error must cite the retry horizon: {err}"
);
}
fn agent_card_request(path: &str) -> lambda_http::Request {
http::Request::builder()
.method("GET")
.uri(path)
.header("accept", "application/json")
.header("a2a-version", "1.0")
.body(lambda_http::Body::Empty)
.unwrap()
}
async fn response_status(
handler: &LambdaA2aHandler,
req: lambda_http::Request,
) -> http::StatusCode {
let resp = handler.handle(req).await.expect("handle() must not error");
resp.status()
}
#[tokio::test]
async fn strip_path_prefix_routes_agent_card_under_apigw_stage_prefix() {
let handler = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(InMemoryA2aStorage::new())
.strip_path_prefix("/stage/agent")
.build()
.expect("builder must accept a valid strip_path_prefix");
let req = agent_card_request("/stage/agent/.well-known/agent-card.json");
assert_eq!(
response_status(&handler, req).await,
http::StatusCode::OK,
"prefixed agent-card path must reach the router after strip"
);
}
#[tokio::test]
async fn without_strip_path_prefix_apigw_stage_prefixed_path_is_404() {
let handler = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(InMemoryA2aStorage::new())
.build()
.expect("builder must accept no-prefix config");
let req = agent_card_request("/stage/agent/.well-known/agent-card.json");
assert_eq!(
response_status(&handler, req).await,
http::StatusCode::NOT_FOUND,
"baseline — without strip, the prefixed path must 404"
);
}
#[tokio::test]
async fn strip_path_prefix_passes_through_root_routed_request() {
let handler = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(InMemoryA2aStorage::new())
.strip_path_prefix("/stage/agent")
.build()
.unwrap();
let req = agent_card_request("/.well-known/agent-card.json");
assert_eq!(
response_status(&handler, req).await,
http::StatusCode::OK,
"un-prefixed root request must still reach the router"
);
}
#[test]
fn strip_path_prefix_rejects_missing_leading_slash() {
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(InMemoryA2aStorage::new())
.strip_path_prefix("stage/agent")
.build();
let err = match result {
Err(e) => e.to_string(),
Ok(_) => panic!("must reject a prefix without a leading slash"),
};
assert!(
err.contains("strip_path_prefix"),
"error must name strip_path_prefix: {err}"
);
assert!(
err.contains("start with '/'"),
"error must explain the rule: {err}"
);
}
#[test]
fn strip_path_prefix_rejects_trailing_slash() {
let result = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(InMemoryA2aStorage::new())
.strip_path_prefix("/stage/agent/")
.build();
let err = match result {
Err(e) => e.to_string(),
Ok(_) => panic!("must reject a prefix ending with a slash"),
};
assert!(
err.contains("strip_path_prefix"),
"error must name strip_path_prefix: {err}"
);
assert!(
err.contains("not end with '/'"),
"error must explain the rule: {err}"
);
}
#[test]
fn strip_path_prefix_treats_slash_as_noop() {
let handler = LambdaA2aHandler::builder()
.executor(DummyExecutor)
.storage(InMemoryA2aStorage::new())
.strip_path_prefix("/")
.build();
assert!(
handler.is_ok(),
"single-slash prefix must build — treated as no-op"
);
}