use std::net::SocketAddr;
use std::sync::Arc;
use crate::error::A2aError;
use crate::executor::AgentExecutor;
use crate::middleware::{A2aMiddleware, MiddlewareStack, SecurityContribution};
use crate::router::{build_router, AppState};
use crate::storage::{A2aAtomicStore, A2aPushNotificationStorage, A2aTaskStorage, InMemoryA2aStorage};
use crate::streaming::TaskEventBroker;
/// Builder that collects the pieces of an [`A2aServer`]: the agent executor,
/// the four storage roles, the bind address, and the middleware stack.
///
/// Only the executor is mandatory; every storage role left unset is filled
/// with a shared `InMemoryA2aStorage` when `build` is called.
pub struct A2aServerBuilder {
/// Agent executor; `build` fails when this is still `None`.
executor: Option<Arc<dyn AgentExecutor>>,
/// Task storage; falls back to the in-memory default in `build`.
task_storage: Option<Arc<dyn A2aTaskStorage>>,
/// Push-notification storage; falls back to the in-memory default in `build`.
push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
/// Event store; falls back to the in-memory default in `build`.
event_store: Option<Arc<dyn crate::storage::A2aEventStore>>,
/// Atomic store; falls back to the in-memory default in `build`.
atomic_store: Option<Arc<dyn A2aAtomicStore>>,
/// Address used by `A2aServer::run`; `new` sets it to `0.0.0.0:3000`.
bind_addr: SocketAddr,
/// Middleware in registration order (each `middleware` call appends).
middleware: Vec<Arc<dyn A2aMiddleware>>,
}
impl A2aServerBuilder {
    /// Creates a builder with no executor, no storage, no middleware, and a
    /// bind address of `0.0.0.0:3000`.
    pub fn new() -> Self {
        Self {
            executor: None,
            task_storage: None,
            push_storage: None,
            event_store: None,
            atomic_store: None,
            bind_addr: ([0, 0, 0, 0], 3000).into(),
            middleware: vec![],
        }
    }

    /// Sets the agent executor. This is the only mandatory setting.
    pub fn executor(mut self, executor: impl AgentExecutor + 'static) -> Self {
        self.executor = Some(Arc::new(executor));
        self
    }

    /// Uses a single storage value for all four storage roles.
    ///
    /// The value is cloned for the task, push, and event roles and moved into
    /// the atomic-store role.
    pub fn storage<S>(mut self, storage: S) -> Self
    where
        S: A2aTaskStorage
            + A2aPushNotificationStorage
            + crate::storage::A2aEventStore
            + A2aAtomicStore
            + Clone
            + 'static,
    {
        self.task_storage = Some(Arc::new(storage.clone()));
        self.push_storage = Some(Arc::new(storage.clone()));
        self.event_store = Some(Arc::new(storage.clone()));
        self.atomic_store = Some(Arc::new(storage));
        self
    }

    /// Overrides only the task storage.
    pub fn task_storage(mut self, storage: impl A2aTaskStorage + 'static) -> Self {
        self.task_storage = Some(Arc::new(storage));
        self
    }

    /// Overrides only the push-notification storage.
    pub fn push_storage(mut self, storage: impl A2aPushNotificationStorage + 'static) -> Self {
        self.push_storage = Some(Arc::new(storage));
        self
    }

    /// Overrides only the event store.
    pub fn event_store(mut self, store: impl crate::storage::A2aEventStore + 'static) -> Self {
        self.event_store = Some(Arc::new(store));
        self
    }

    /// Overrides only the atomic store.
    pub fn atomic_store(mut self, store: impl A2aAtomicStore + 'static) -> Self {
        self.atomic_store = Some(Arc::new(store));
        self
    }

    /// Sets the address that `A2aServer::run` binds to.
    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
        self.bind_addr = addr.into();
        self
    }

    /// Appends a middleware to the stack; call order is preserved.
    pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
        self.middleware.push(mw);
        self
    }

    /// Validates the configuration and produces an [`A2aServer`].
    ///
    /// Storage roles that were not set share one fresh `InMemoryA2aStorage`.
    ///
    /// # Errors
    ///
    /// Returns `A2aError::Internal` when
    /// - no executor was provided,
    /// - the four storage roles do not all report the same `backend_name()`,
    /// - or the middleware security contributions cannot be merged (the error
    ///   from `merge_stacked_contributions` is propagated).
    pub fn build(self) -> Result<A2aServer, A2aError> {
        // `ok_or_else` keeps the error string from being allocated on the
        // success path.
        let executor = self
            .executor
            .ok_or_else(|| A2aError::Internal("executor is required".into()))?;

        // One default instance, cloned into every role that was left unset.
        let default_storage = InMemoryA2aStorage::new();
        let task_storage = self
            .task_storage
            .unwrap_or_else(|| Arc::new(default_storage.clone()));
        let push_storage = self
            .push_storage
            .unwrap_or_else(|| Arc::new(default_storage.clone()));
        let event_store: Arc<dyn crate::storage::A2aEventStore> = self
            .event_store
            .unwrap_or_else(|| Arc::new(default_storage.clone()));
        let atomic_store: Arc<dyn A2aAtomicStore> = self
            .atomic_store
            .unwrap_or_else(|| Arc::new(default_storage));

        // All roles must report the same backend name.
        let task_backend = task_storage.backend_name();
        let push_backend = push_storage.backend_name();
        let event_backend = event_store.backend_name();
        let atomic_backend = atomic_store.backend_name();
        if task_backend != push_backend
            || task_backend != event_backend
            || task_backend != atomic_backend
        {
            return Err(A2aError::Internal(format!(
                "Storage backend mismatch: task={task_backend}, push={push_backend}, \
                 event={event_backend}, atomic={atomic_backend}. \
                 ADR-009 requires all storage traits to share the same backend."
            )));
        }

        // Collect the contributions before `self.middleware` is moved into
        // the middleware stack below.
        let contributions: Vec<SecurityContribution> = self
            .middleware
            .iter()
            .map(|m| m.security_contribution())
            .collect();
        let merged = merge_stacked_contributions(&contributions)?;

        Ok(A2aServer {
            state: AppState {
                executor,
                task_storage,
                push_storage,
                event_store,
                atomic_store,
                event_broker: TaskEventBroker::new(),
                middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
            },
            merged_security: merged,
            bind_addr: self.bind_addr,
        })
    }
}
impl Default for A2aServerBuilder {
fn default() -> Self {
Self::new()
}
}
/// Merges the security contributions of all stacked middleware into one.
///
/// Schemes: the first definition seen for each scheme name is kept, in
/// first-seen order. A later definition under the same name is accepted only
/// if `schemes_equivalent` says it matches the first.
///
/// Requirements: contributions with an empty requirement list are ignored.
/// The remaining lists are combined as a cross-product, so every resulting
/// requirement is the union of one entry from each list. When two entries
/// name the same scheme, their scope lists are unioned, sorted and
/// de-duplicated.
/// NOTE(review): this appears to treat each contribution's list as
/// alternatives that must hold together with the other contributions' —
/// confirm against the middleware contract.
///
/// # Errors
///
/// Returns `A2aError::Internal` when two contributions define the same scheme
/// name with non-equivalent definitions.
fn merge_stacked_contributions(
    contributions: &[SecurityContribution],
) -> Result<SecurityContribution, A2aError> {
    use std::collections::hash_map::{Entry, HashMap};

    let mut merged = SecurityContribution::new();

    // Borrowed lookup table: no scheme is cloned just for collision checks,
    // and each name is hashed once per occurrence.
    let mut first_seen: HashMap<&str, &turul_a2a_proto::SecurityScheme> = HashMap::new();
    for contrib in contributions {
        for (name, scheme) in &contrib.schemes {
            match first_seen.entry(name.as_str()) {
                Entry::Occupied(previous) => {
                    if !schemes_equivalent(previous.get(), scheme) {
                        return Err(A2aError::Internal(format!(
                            "Security scheme collision: '{}' has conflicting definitions",
                            name
                        )));
                    }
                }
                Entry::Vacant(slot) => {
                    slot.insert(scheme);
                    merged.schemes.push((name.clone(), scheme.clone()));
                }
            }
        }
    }

    // Only contributions that actually declare requirements take part.
    let mut requirement_sets = contributions
        .iter()
        .map(|c| c.requirements.as_slice())
        .filter(|reqs| !reqs.is_empty());

    let mut combined: Vec<turul_a2a_proto::SecurityRequirement> = match requirement_sets.next() {
        Some(first) => first.to_vec(),
        None => return Ok(merged),
    };

    for alternatives in requirement_sets {
        let mut new_combined = Vec::new();
        for existing in &combined {
            for alt in alternatives {
                let mut merged_schemes = existing.schemes.clone();
                for (name, scopes) in &alt.schemes {
                    merged_schemes
                        .entry(name.clone())
                        .and_modify(|existing_scopes| {
                            // sort + dedup already yields the sorted union, so
                            // no per-element `contains` pre-check is needed.
                            existing_scopes.list.extend(scopes.list.iter().cloned());
                            existing_scopes.list.sort();
                            existing_scopes.list.dedup();
                        })
                        .or_insert_with(|| scopes.clone());
                }
                new_combined.push(turul_a2a_proto::SecurityRequirement {
                    schemes: merged_schemes,
                });
            }
        }
        combined = new_combined;
    }

    merged.requirements = combined;
    Ok(merged)
}
/// Returns `true` when the two scheme definitions compare equal with `==`.
fn schemes_equivalent(
    a: &turul_a2a_proto::SecurityScheme,
    b: &turul_a2a_proto::SecurityScheme,
) -> bool {
    *a == *b
}
/// A configured A2A server, produced by `A2aServerBuilder::build`.
///
/// Turn it into an `axum::Router` with `into_router`, or bind and serve with
/// `run`.
pub struct A2aServer {
/// Application state passed to `build_router`.
state: AppState,
/// Security schemes/requirements merged from all middleware at build time.
merged_security: SecurityContribution,
/// Address `run` binds its TCP listener to.
bind_addr: SocketAddr,
}
impl A2aServer {
    /// Starts a new [`A2aServerBuilder`].
    pub fn builder() -> A2aServerBuilder {
        A2aServerBuilder::new()
    }

    /// Consumes the server and returns its `axum::Router`.
    ///
    /// When the middleware contributed security schemes or requirements, the
    /// executor is wrapped in a `SecurityAugmentedExecutor` so that the agent
    /// cards it returns include them. Otherwise the state is used as is.
    pub fn into_router(self) -> axum::Router {
        let Self {
            state,
            merged_security,
            ..
        } = self;

        // Decide first, then build exactly one router; no state clone needed.
        if merged_security.is_empty() {
            return build_router(state);
        }

        let wrapped = SecurityAugmentedExecutor {
            inner: Arc::clone(&state.executor),
            security: merged_security,
        };
        build_router(AppState {
            executor: Arc::new(wrapped),
            ..state
        })
    }

    /// Binds a TCP listener on the configured address and serves the router
    /// until `axum::serve` returns.
    ///
    /// # Errors
    ///
    /// Returns `A2aError::Internal` when binding the listener fails or when
    /// `axum::serve` returns an error.
    pub async fn run(self) -> Result<(), A2aError> {
        // Copy the address out before `into_router` consumes `self`.
        let bind_addr = self.bind_addr;
        let app = self.into_router();
        let listener = tokio::net::TcpListener::bind(bind_addr)
            .await
            .map_err(|e| A2aError::Internal(format!("Failed to bind: {e}")))?;
        tracing::info!("A2A server listening on {}", bind_addr);
        axum::serve(listener, app)
            .await
            .map_err(|e| A2aError::Internal(format!("Server error: {e}")))?;
        Ok(())
    }
}
/// Executor wrapper that delegates to `inner` and adds the merged middleware
/// security information to the agent cards it returns.
struct SecurityAugmentedExecutor {
/// The wrapped executor that does the actual work.
inner: Arc<dyn AgentExecutor>,
/// Schemes and requirements added to each returned agent card.
security: SecurityContribution,
}
impl SecurityAugmentedExecutor {
    /// Adds the middleware security information to `card`.
    ///
    /// A scheme is inserted only when the card has no scheme under that name,
    /// so the card's own definitions win. Every requirement is appended.
    fn augment_card(&self, card: &mut turul_a2a_proto::AgentCard) {
        for (name, scheme) in &self.security.schemes {
            card.security_schemes
                .entry(name.clone())
                .or_insert_with(|| scheme.clone());
        }
        card.security_requirements
            .extend(self.security.requirements.iter().cloned());
    }
}

#[async_trait::async_trait]
impl AgentExecutor for SecurityAugmentedExecutor {
    /// Forwards execution to the wrapped executor unchanged.
    async fn execute(
        &self,
        task: &mut turul_a2a_types::Task,
        msg: &turul_a2a_types::Message,
        ctx: &crate::executor::ExecutionContext,
    ) -> Result<(), A2aError> {
        self.inner.execute(task, msg, ctx).await
    }

    /// Returns the wrapped executor's card with the security info added.
    fn agent_card(&self) -> turul_a2a_proto::AgentCard {
        let mut card = self.inner.agent_card();
        self.augment_card(&mut card);
        card
    }

    /// Returns the wrapped executor's extended card, if it provides one, with
    /// the security info added. Yields `None` when the inner executor does.
    fn extended_agent_card(
        &self,
        claims: Option<&serde_json::Value>,
    ) -> Option<turul_a2a_proto::AgentCard> {
        let mut card = self.inner.extended_agent_card(claims)?;
        self.augment_card(&mut card);
        Some(card)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::A2aError;
    use crate::executor::AgentExecutor;
    use crate::storage::A2aStorageError;
    use crate::streaming::StreamEvent;
    use turul_a2a_types::{Message, Task, TaskStatus};

    /// Executor that does nothing and returns a default agent card.
    struct DummyExecutor;

    #[async_trait::async_trait]
    impl AgentExecutor for DummyExecutor {
        async fn execute(
            &self,
            _task: &mut Task,
            _msg: &Message,
            _ctx: &crate::executor::ExecutionContext,
        ) -> Result<(), A2aError> {
            Ok(())
        }

        fn agent_card(&self) -> turul_a2a_proto::AgentCard {
            turul_a2a_proto::AgentCard::default()
        }
    }

    /// Asserts that `result` is a build error mentioning the backend mismatch;
    /// panics with `ok_message` when the build unexpectedly succeeded.
    fn assert_backend_mismatch(result: Result<A2aServer, A2aError>, ok_message: &str) {
        match result {
            Err(e) => {
                let msg = e.to_string();
                // "Storage backend mismatch" contains this substring too, so a
                // single check covers both spellings.
                assert!(
                    msg.contains("backend mismatch"),
                    "Error should mention backend mismatch: {msg}"
                );
            }
            Ok(_) => panic!("{}", ok_message),
        }
    }

    #[test]
    fn builder_requires_executor() {
        let result = A2aServer::builder().build();
        assert!(result.is_err());
    }

    #[test]
    fn builder_with_executor_defaults_storage() {
        let server = A2aServer::builder()
            .executor(DummyExecutor)
            .build()
            .unwrap();
        let _ = server.into_router();
    }

    #[test]
    fn builder_with_explicit_storage() {
        let storage = InMemoryA2aStorage::new();
        let server = A2aServer::builder()
            .executor(DummyExecutor)
            .storage(storage)
            .bind(([127, 0, 0, 1], 8080))
            .build()
            .unwrap();
        let _ = server.into_router();
    }

    /// Event store reporting a backend name other than the in-memory default.
    struct FakeEventStore;

    #[async_trait::async_trait]
    impl crate::storage::A2aEventStore for FakeEventStore {
        fn backend_name(&self) -> &'static str {
            "fake-backend"
        }

        async fn append_event(
            &self,
            _t: &str,
            _tid: &str,
            _e: StreamEvent,
        ) -> Result<u64, A2aStorageError> {
            Ok(0)
        }

        async fn get_events_after(
            &self,
            _t: &str,
            _tid: &str,
            _s: u64,
        ) -> Result<Vec<(u64, StreamEvent)>, A2aStorageError> {
            Ok(vec![])
        }

        async fn latest_sequence(&self, _t: &str, _tid: &str) -> Result<u64, A2aStorageError> {
            Ok(0)
        }

        async fn cleanup_expired(&self) -> Result<u64, A2aStorageError> {
            Ok(0)
        }
    }

    #[test]
    fn mixed_backend_rejected_at_build() {
        let result = A2aServer::builder()
            .executor(DummyExecutor)
            .event_store(FakeEventStore)
            .build();
        assert_backend_mismatch(result, "Mixed backends should be rejected");
    }

    /// Atomic store reporting a backend name other than the in-memory default.
    struct FakeAtomicStore;

    #[async_trait::async_trait]
    impl crate::storage::A2aAtomicStore for FakeAtomicStore {
        fn backend_name(&self) -> &'static str {
            "fake-atomic"
        }

        async fn create_task_with_events(
            &self,
            _t: &str,
            _o: &str,
            task: Task,
            _e: Vec<StreamEvent>,
        ) -> Result<(Task, Vec<u64>), A2aStorageError> {
            Ok((task, vec![]))
        }

        async fn update_task_status_with_events(
            &self,
            _t: &str,
            _tid: &str,
            _o: &str,
            _s: TaskStatus,
            _e: Vec<StreamEvent>,
        ) -> Result<(Task, Vec<u64>), A2aStorageError> {
            // Never reached: these tests fail at build time, before any call.
            unimplemented!()
        }

        async fn update_task_with_events(
            &self,
            _t: &str,
            _o: &str,
            _task: Task,
            _e: Vec<StreamEvent>,
        ) -> Result<Vec<u64>, A2aStorageError> {
            Ok(vec![])
        }
    }

    #[test]
    fn mixed_atomic_backend_rejected_at_build() {
        let result = A2aServer::builder()
            .executor(DummyExecutor)
            .atomic_store(FakeAtomicStore)
            .build();
        assert_backend_mismatch(result, "Mixed atomic backend should be rejected");
    }

    #[test]
    fn same_backend_accepted() {
        let storage = InMemoryA2aStorage::new();
        let result = A2aServer::builder()
            .executor(DummyExecutor)
            .task_storage(storage.clone())
            .push_storage(storage.clone())
            .event_store(storage.clone())
            .atomic_store(storage)
            .build();
        assert!(result.is_ok(), "Same backend should be accepted");
    }

    #[test]
    fn unified_storage_accepted() {
        let result = A2aServer::builder()
            .executor(DummyExecutor)
            .storage(InMemoryA2aStorage::new())
            .build();
        assert!(result.is_ok(), "Unified .storage() should be accepted");
    }
}