use std::future::Future;
use std::pin::Pin;
use a2a_protocol_types::artifact::Artifact;
use a2a_protocol_types::error::A2aResult;
use a2a_protocol_types::events::{StreamResponse, TaskArtifactUpdateEvent, TaskStatusUpdateEvent};
use a2a_protocol_types::message::Part;
use a2a_protocol_types::task::{ContextId, TaskState, TaskStatus};
use crate::request_context::RequestContext;
use crate::streaming::EventQueueWriter;
/// Moves `fut` onto the heap and returns it as a pinned, type-erased future.
///
/// The result yields the same output as `fut`, stays `Send`, and may borrow
/// data for the lifetime `'a`. This is the return shape used by trait methods
/// that hand back `Pin<Box<dyn Future<...> + Send + 'a>>`.
pub fn boxed_future<'a, T>(
    fut: impl Future<Output = T> + Send + 'a,
) -> Pin<Box<dyn Future<Output = T> + Send + 'a>> {
    // Erase the concrete future type first, then pin the allocation in place.
    let heap: Box<dyn Future<Output = T> + Send + 'a> = Box::new(fut);
    Box::into_pin(heap)
}
/// Generates an `impl $crate::executor::AgentExecutor` for a type from
/// closure-like async bodies.
///
/// Two forms are accepted (a trailing comma is optional in both):
///
/// ```ignore
/// // `execute` only; no `cancel` method is generated.
/// agent_executor!(MyAgent, |ctx, queue| async {
///     Ok(())
/// });
///
/// // Both `execute` and `cancel`.
/// agent_executor!(
///     MyAgent,
///     execute: |ctx, queue| async { Ok(()) },
///     cancel: |ctx, queue| async { Ok(()) },
/// );
/// ```
///
/// In each generated method the first identifier is bound to a
/// `&RequestContext` and the second to a `&dyn EventQueueWriter`. The body is
/// wrapped in `async move` and returned as a pinned, boxed, `Send` future
/// whose output is `A2aResult<()>`.
///
/// The expansion names `::a2a_protocol_types::error::A2aResult` by absolute
/// path, so the invoking crate must have `a2a_protocol_types` available as a
/// dependency.
#[macro_export]
macro_rules! agent_executor {
    // Form 1: implement `execute` only.
    ($ty:ty, |$ctx:ident, $queue:ident| async $body:block $(,)?) => {
        impl $crate::executor::AgentExecutor for $ty {
            fn execute<'a>(
                &'a self,
                $ctx: &'a $crate::request_context::RequestContext,
                $queue: &'a dyn $crate::streaming::EventQueueWriter,
            ) -> ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<
                            Output = ::a2a_protocol_types::error::A2aResult<()>,
                        > + ::std::marker::Send
                        + 'a,
                >,
            > {
                ::std::boxed::Box::pin(async move $body)
            }
        }
    };
    // Form 2: implement both `execute` and `cancel`.
    ($ty:ty,
        execute: |$ctx:ident, $queue:ident| async $exec_body:block,
        cancel: |$cctx:ident, $cqueue:ident| async $cancel_body:block $(,)?
    ) => {
        impl $crate::executor::AgentExecutor for $ty {
            fn execute<'a>(
                &'a self,
                $ctx: &'a $crate::request_context::RequestContext,
                $queue: &'a dyn $crate::streaming::EventQueueWriter,
            ) -> ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<
                            Output = ::a2a_protocol_types::error::A2aResult<()>,
                        > + ::std::marker::Send
                        + 'a,
                >,
            > {
                ::std::boxed::Box::pin(async move $exec_body)
            }

            fn cancel<'a>(
                &'a self,
                $cctx: &'a $crate::request_context::RequestContext,
                $cqueue: &'a dyn $crate::streaming::EventQueueWriter,
            ) -> ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<
                            Output = ::a2a_protocol_types::error::A2aResult<()>,
                        > + ::std::marker::Send
                        + 'a,
                >,
            > {
                ::std::boxed::Box::pin(async move $cancel_body)
            }
        }
    };
}
/// Pairs a request context with an event queue so that status and artifact
/// events for the current task can be emitted without repeating the task and
/// context identifiers at every call site.
pub struct EventEmitter<'a> {
/// Request context that supplies the task id, context id and cancellation
/// token used by the emitter's methods.
pub ctx: &'a RequestContext,
/// Queue writer that receives every event the emitter produces.
pub queue: &'a dyn EventQueueWriter,
}
impl<'a> EventEmitter<'a> {
    /// Creates an emitter that borrows the given request context and queue.
    #[must_use]
    pub fn new(ctx: &'a RequestContext, queue: &'a dyn EventQueueWriter) -> Self {
        Self { ctx, queue }
    }

    /// Emits a status-update event for the current task with the given `state`.
    ///
    /// The event carries the task id and context id taken from the request
    /// context and no metadata.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the queue's `write`.
    pub async fn status(&self, state: TaskState) -> A2aResult<()> {
        let update = TaskStatusUpdateEvent {
            task_id: self.ctx.task_id.clone(),
            context_id: ContextId::new(self.ctx.context_id.clone()),
            status: TaskStatus::new(state),
            metadata: None,
        };
        let event = StreamResponse::StatusUpdate(update);
        self.queue.write(event).await
    }

    /// Emits an artifact-update event for the current task.
    ///
    /// A new artifact is built from `id` and `parts`; `append` and
    /// `last_chunk` are forwarded to the event unchanged, and no metadata is
    /// attached.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the queue's `write`.
    pub async fn artifact(
        &self,
        id: &str,
        parts: Vec<Part>,
        append: Option<bool>,
        last_chunk: Option<bool>,
    ) -> A2aResult<()> {
        let update = TaskArtifactUpdateEvent {
            task_id: self.ctx.task_id.clone(),
            context_id: ContextId::new(self.ctx.context_id.clone()),
            artifact: Artifact::new(id, parts),
            append,
            last_chunk,
            metadata: None,
        };
        let event = StreamResponse::ArtifactUpdate(update);
        self.queue.write(event).await
    }

    /// Reports whether the request context's cancellation token has been
    /// cancelled.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.ctx.cancellation_token.is_cancelled()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use a2a_protocol_types::message::{Message, MessageId, MessageRole};
    use a2a_protocol_types::task::TaskId;
    use std::sync::Mutex;

    /// Builds a request context around an empty user message, using the fixed
    /// identifiers `test-msg`, `test-task` and `test-ctx`.
    fn make_request_context() -> RequestContext {
        let message = Message {
            id: MessageId::new("test-msg"),
            role: MessageRole::User,
            parts: vec![],
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        };
        RequestContext::new(message, TaskId::new("test-task"), "test-ctx".into())
    }

    /// Writer that accepts every event and discards it.
    struct DummyWriter;

    impl EventQueueWriter for DummyWriter {
        fn write<'a>(
            &'a self,
            _event: StreamResponse,
        ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
            Box::pin(async { Ok(()) })
        }

        fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
            Box::pin(async { Ok(()) })
        }
    }

    /// Writer that keeps every event it receives so tests can inspect what the
    /// emitter actually produced.
    #[derive(Default)]
    struct RecordingWriter {
        events: Mutex<Vec<StreamResponse>>,
    }

    impl RecordingWriter {
        /// Removes and returns all events recorded so far, in write order.
        fn take_events(&self) -> Vec<StreamResponse> {
            std::mem::take(&mut *self.events.lock().unwrap())
        }
    }

    impl EventQueueWriter for RecordingWriter {
        fn write<'a>(
            &'a self,
            event: StreamResponse,
        ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
            Box::pin(async move {
                self.events.lock().unwrap().push(event);
                Ok(())
            })
        }

        fn close<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
            Box::pin(async { Ok(()) })
        }
    }

    /// Extracts `(append, last_chunk)` from an artifact-update event and
    /// panics on any other variant.
    fn artifact_flags(event: &StreamResponse) -> (Option<bool>, Option<bool>) {
        match event {
            StreamResponse::ArtifactUpdate(update) => (update.append, update.last_chunk),
            _ => panic!("expected an artifact update event"),
        }
    }

    #[test]
    fn is_cancelled_returns_false_initially() {
        let ctx = make_request_context();
        let emit = EventEmitter::new(&ctx, &DummyWriter);
        assert!(!emit.is_cancelled());
    }

    #[test]
    fn is_cancelled_returns_true_after_cancel() {
        let ctx = make_request_context();
        let emit = EventEmitter::new(&ctx, &DummyWriter);
        ctx.cancellation_token.cancel();
        assert!(emit.is_cancelled());
    }

    #[tokio::test]
    async fn emit_status_writes_to_queue() {
        let ctx = make_request_context();
        let writer = RecordingWriter::default();
        let emit = EventEmitter::new(&ctx, &writer);
        emit.status(TaskState::Working).await.unwrap();
        emit.status(TaskState::Completed).await.unwrap();

        // Each call must produce exactly one status-update event.
        let events = writer.take_events();
        assert_eq!(events.len(), 2);
        assert!(events
            .iter()
            .all(|event| matches!(event, StreamResponse::StatusUpdate(_))));
    }

    #[tokio::test]
    async fn emit_artifact_writes_to_queue() {
        let ctx = make_request_context();
        let writer = RecordingWriter::default();
        let emit = EventEmitter::new(&ctx, &writer);
        emit.artifact("result-1", vec![Part::text("hello")], None, Some(true))
            .await
            .unwrap();

        let events = writer.take_events();
        assert_eq!(events.len(), 1);
        assert_eq!(artifact_flags(&events[0]), (None, Some(true)));
    }

    #[tokio::test]
    async fn emit_artifact_with_append() {
        let ctx = make_request_context();
        let writer = RecordingWriter::default();
        let emit = EventEmitter::new(&ctx, &writer);
        emit.artifact(
            "chunk-1",
            vec![Part::text("part1")],
            Some(false),
            Some(false),
        )
        .await
        .unwrap();
        emit.artifact("chunk-1", vec![Part::text("part2")], Some(true), Some(true))
            .await
            .unwrap();

        // The flags must be forwarded unchanged and in call order.
        let events = writer.take_events();
        assert_eq!(events.len(), 2);
        assert_eq!(artifact_flags(&events[0]), (Some(false), Some(false)));
        assert_eq!(artifact_flags(&events[1]), (Some(true), Some(true)));
    }

    #[test]
    fn boxed_future_wraps_async_block() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let result = rt.block_on(boxed_future(async { 42 }));
        assert_eq!(result, 42);
    }

    struct SimpleTestExecutor;
    agent_executor!(SimpleTestExecutor, |_ctx, _queue| async { Ok(()) });

    #[tokio::test]
    async fn macro_simple_form_compiles_and_runs() {
        use crate::executor::AgentExecutor;
        let executor = SimpleTestExecutor;
        let ctx = make_request_context();
        let writer = DummyWriter;
        executor.execute(&ctx, &writer).await.unwrap();
    }

    struct CancelableTestExecutor;
    agent_executor!(CancelableTestExecutor,
        execute: |_ctx, _queue| async { Ok(()) },
        cancel: |_ctx, _queue| async { Ok(()) }
    );

    #[tokio::test]
    async fn macro_cancel_form_compiles_and_runs() {
        use crate::executor::AgentExecutor;
        let executor = CancelableTestExecutor;
        let ctx = make_request_context();
        let writer = DummyWriter;
        executor.execute(&ctx, &writer).await.unwrap();
        executor.cancel(&ctx, &writer).await.unwrap();
    }
}