use std::sync::Arc;
use async_trait::async_trait;
use crate::{
application::{HasPushNotifier, HasStreaming, HasTaskLifecycle, TaskStatusBroadcast},
domain::{A2AError, ContextId, Message, Part, Role, Task, TaskId, TaskState},
port::{AsyncMessageHandler, AsyncPushNotifier, AsyncStreamingHandler, AsyncTaskLifecycle},
};
/// Strategy for producing an agent reply to an incoming message.
///
/// `ResponderMessageHandler::process_message` in this file calls `respond` once per
/// processed message and applies the returned message and state to the task.
#[async_trait]
pub trait Responder: Send + Sync {
    /// Builds the reply for `message`.
    ///
    /// In this file's handler, `task` is the task value returned by the update that
    /// recorded the incoming message.
    ///
    /// Returns the reply message paired with the state the caller should apply to
    /// the task.
    ///
    /// # Errors
    ///
    /// Returns an `A2AError` when the implementation fails to produce a reply.
    async fn respond(
        &self,
        message: &Message,
        task: &Task,
    ) -> Result<(Message, TaskState), A2AError>;
}
/// Stateless responder that echoes the text of the incoming message.
///
/// Its `Responder` implementation in this file replies with the message's text
/// parts prefixed by `Echo: ` and reports `TaskState::Working`.
#[derive(Clone, Debug, Default)]
pub struct EchoResponder;
#[async_trait]
impl Responder for EchoResponder {
    /// Replies with the text parts of `message`, joined by single spaces and
    /// prefixed with `Echo: `.
    ///
    /// Parts for which `get_text()` yields `None` are skipped. The reply is an
    /// agent-role message carrying a freshly generated UUID v4 as its id, the id of
    /// `task`, and the context id of the incoming message.
    ///
    /// This implementation never returns an error and always reports
    /// `TaskState::Working`.
    async fn respond(
        &self,
        message: &Message,
        task: &Task,
    ) -> Result<(Message, TaskState), A2AError> {
        // Gather only the textual parts, preserving their original order.
        let text_parts: Vec<_> = message
            .parts
            .iter()
            .filter_map(|part| part.get_text())
            .collect();
        let body = format!("Echo: {}", text_parts.join(" "));

        let reply = Message::builder()
            .role(Role::Agent)
            .parts(vec![Part::text(body)])
            .message_id(uuid::Uuid::new_v4().to_string())
            .task_id(task.id.clone())
            .context_id(message.context_id.clone())
            .build();

        Ok((reply, TaskState::Working))
    }
}
/// Message handler that delegates reply generation to an injected `Responder`.
///
/// Every collaborator is held behind an `Arc`, so cloning the handler shares the
/// same underlying instances rather than copying them.
#[derive(Clone)]
pub struct ResponderMessageHandler {
    /// Task lifecycle port; `process_message` uses it to check for and create tasks.
    task_lifecycle: Arc<dyn AsyncTaskLifecycle>,
    /// Streaming port, exposed through the `HasStreaming` impl.
    streaming: Arc<dyn AsyncStreamingHandler>,
    /// Push notification port, exposed through the `HasPushNotifier` impl.
    push_notifier: Arc<dyn AsyncPushNotifier>,
    /// Strategy that produces the reply message and the resulting task state.
    responder: Arc<dyn Responder>,
}
impl ResponderMessageHandler {
    /// Builds a handler from its three ports and a custom responder.
    ///
    /// Each argument is moved into its own `Arc` and stored as a trait object.
    pub fn new(
        task_lifecycle: impl AsyncTaskLifecycle + 'static,
        streaming: impl AsyncStreamingHandler + 'static,
        push_notifier: impl AsyncPushNotifier + 'static,
        responder: impl Responder + 'static,
    ) -> Self {
        // Shadow each concrete argument with its shared, type-erased handle.
        let task_lifecycle: Arc<dyn AsyncTaskLifecycle> = Arc::new(task_lifecycle);
        let streaming: Arc<dyn AsyncStreamingHandler> = Arc::new(streaming);
        let push_notifier: Arc<dyn AsyncPushNotifier> = Arc::new(push_notifier);
        let responder: Arc<dyn Responder> = Arc::new(responder);

        Self {
            task_lifecycle,
            streaming,
            push_notifier,
            responder,
        }
    }

    /// Builds a handler that uses `EchoResponder` as its responder.
    ///
    /// Equivalent to calling `new` with `EchoResponder` as the last argument.
    pub fn echo(
        task_lifecycle: impl AsyncTaskLifecycle + 'static,
        streaming: impl AsyncStreamingHandler + 'static,
        push_notifier: impl AsyncPushNotifier + 'static,
    ) -> Self {
        let responder = EchoResponder;
        Self::new(task_lifecycle, streaming, push_notifier, responder)
    }
}
impl HasTaskLifecycle for ResponderMessageHandler {
    /// Borrows the task lifecycle port held by this handler.
    fn lifecycle(&self) -> &dyn AsyncTaskLifecycle {
        &*self.task_lifecycle
    }
}
impl HasStreaming for ResponderMessageHandler {
    /// Borrows the streaming port held by this handler.
    fn streaming(&self) -> &dyn AsyncStreamingHandler {
        &*self.streaming
    }
}
impl HasPushNotifier for ResponderMessageHandler {
    /// Borrows the push notification port held by this handler.
    fn push_notifier(&self) -> &dyn AsyncPushNotifier {
        &*self.push_notifier
    }
}
#[async_trait]
impl AsyncMessageHandler for ResponderMessageHandler {
    /// Records an incoming message on a task, asks the responder for a reply, and
    /// applies that reply to the task.
    ///
    /// Steps, in order:
    /// 1. Parse `task_id` into a `TaskId`.
    /// 2. If the lifecycle port reports that the task does not exist, create it with
    ///    a context id parsed from `session_id`, or from `"default"` when
    ///    `session_id` is `None`.
    /// 3. Call `update_and_broadcast` with `TaskState::Working` and a clone of
    ///    `message`.
    /// 4. Call the responder with `message` and the task returned by step 3.
    /// 5. Call `update_and_broadcast` again with the responder's state and reply,
    ///    and return the resulting task.
    ///
    /// NOTE(review): `update_and_broadcast` is not defined in this file; presumably
    /// it comes from the imported `TaskStatusBroadcast` trait — confirm.
    ///
    /// # Errors
    ///
    /// Propagates, via `?`, any failure from parsing the ids, from the lifecycle
    /// port, from either update, or from the responder.
    async fn process_message(
        &self,
        task_id: &str,
        message: &Message,
        session_id: Option<&str>,
    ) -> Result<Task, A2AError> {
        let id = task_id.parse::<TaskId>()?;

        // Lazily create the task the first time its id is seen.
        let already_exists = self.task_lifecycle.exists(&id).await?;
        if !already_exists {
            let raw_context = session_id.unwrap_or("default");
            let context_id = raw_context.parse::<ContextId>()?;
            self.task_lifecycle.create(&id, &context_id).await?;
        }

        // Record the incoming message before the responder runs.
        let incoming = Some(message.clone());
        let working_task = self
            .update_and_broadcast(&id, TaskState::Working, incoming)
            .await?;

        let (reply, next_state) = self.responder.respond(message, &working_task).await?;

        let updated_task = self
            .update_and_broadcast(&id, next_state, Some(reply))
            .await?;
        Ok(updated_task)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::storage::InMemoryTaskStorage;
    use crate::adapter::streaming::InMemoryStreamingHandler;

    /// Test double that always replies with the text `done` and reports
    /// `TaskState::Completed`.
    struct FixedResponder;

    #[async_trait]
    impl Responder for FixedResponder {
        async fn respond(
            &self,
            _message: &Message,
            task: &Task,
        ) -> Result<(Message, TaskState), A2AError> {
            let canned = Message::builder()
                .role(Role::Agent)
                .parts(vec![Part::text("done".to_string())])
                .message_id("fixed-1".to_string())
                .task_id(task.id.clone())
                .build();

            Ok((canned, TaskState::Completed))
        }
    }

    /// The injected responder decides both the final task state and the reply
    /// that ends up in the task history.
    #[tokio::test]
    async fn injected_responder_controls_reply_and_state() {
        let storage = InMemoryTaskStorage::new();
        let streaming = InMemoryStreamingHandler::new();
        // Obtain the notifier before `storage` is moved into the handler.
        let push = storage.push_notifier();
        let handler = ResponderMessageHandler::new(storage, streaming, push, FixedResponder);

        let incoming = Message::user_text("anything".to_string(), "m1".to_string());
        let task = handler
            .process_message("t1", &incoming, None)
            .await
            .unwrap();

        assert_eq!(task.status.state, TaskState::Completed);

        // Scan every part of every history entry for the canned reply text.
        let history_has_reply = task
            .history
            .iter()
            .flat_map(|entry| entry.parts.iter())
            .filter_map(|part| part.get_text())
            .any(|text| text == "done");
        assert!(history_has_reply, "responder reply should be in task history");
    }
}