use super::*;
#[tokio::test]
async fn streaming_mode_background_processor_updates_store() {
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;
struct WaitingExecutor {
proceed: Arc<Notify>,
started: Arc<AtomicBool>,
}
impl AgentExecutor for WaitingExecutor {
fn execute<'a>(
&'a self,
ctx: &'a RequestContext,
queue: &'a dyn EventQueueWriter,
) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
Box::pin(async move {
self.started.store(true, Ordering::SeqCst);
self.proceed.notified().await;
queue
.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
status: TaskStatus::new(TaskState::Working),
metadata: None,
}))
.await?;
queue
.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
artifact: Artifact::new("art-bg", vec![Part::text("background")]),
append: None,
last_chunk: Some(true),
metadata: None,
}))
.await?;
queue
.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
status: TaskStatus::new(TaskState::Completed),
metadata: None,
}))
.await?;
Ok(())
})
}
}
let proceed = Arc::new(Notify::new());
let started = Arc::new(AtomicBool::new(false));
let handler = Arc::new(
RequestHandlerBuilder::new(WaitingExecutor {
proceed: proceed.clone(),
started: started.clone(),
})
.build()
.expect("build handler"),
);
let handler_clone = handler.clone();
let send_handle = tokio::spawn(async move {
let result = handler_clone
.on_send_message(make_send_params(), true, None)
.await
.expect("send message");
match result {
SendMessageResult::Stream(mut reader) => {
while let Some(_event) = reader.read().await {}
}
_ => panic!("expected Stream"),
}
});
while !started.load(Ordering::SeqCst) {
tokio::task::yield_now().await;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
proceed.notify_one();
send_handle.await.expect("send handle");
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let list = handler
.on_list_tasks(default_list_params(), None)
.await
.expect("list tasks");
assert_eq!(
list.tasks.len(),
1,
"should have exactly 1 task in the store"
);
let task = &list.tasks[0];
assert_eq!(
task.status.state,
TaskState::Completed,
"task should be in Completed state"
);
assert!(task.artifacts.is_some(), "task should have artifacts");
assert_eq!(
task.artifacts.as_ref().unwrap().len(),
1,
"task should have exactly 1 artifact"
);
}
#[tokio::test]
async fn streaming_mode_push_delivery_with_cooperative_executor() {
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;
struct WaitingExecutor {
proceed: Arc<Notify>,
started: Arc<AtomicBool>,
}
impl AgentExecutor for WaitingExecutor {
fn execute<'a>(
&'a self,
ctx: &'a RequestContext,
queue: &'a dyn EventQueueWriter,
) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
Box::pin(async move {
self.started.store(true, Ordering::SeqCst);
self.proceed.notified().await;
queue
.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
status: TaskStatus::new(TaskState::Working),
metadata: None,
}))
.await?;
queue
.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
artifact: Artifact::new("art-coop", vec![Part::text("coop")]),
append: None,
last_chunk: None,
metadata: None,
}))
.await?;
queue
.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
status: TaskStatus::new(TaskState::Completed),
metadata: None,
}))
.await?;
Ok(())
})
}
}
let calls: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let proceed = Arc::new(Notify::new());
let started = Arc::new(AtomicBool::new(false));
let handler = Arc::new(
RequestHandlerBuilder::new(WaitingExecutor {
proceed: proceed.clone(),
started: started.clone(),
})
.with_push_sender(SharedRecordingPushSender {
calls: calls.clone(),
})
.build()
.expect("build handler"),
);
let handler_clone = handler.clone();
let send_handle = tokio::spawn(async move {
let result = handler_clone
.on_send_message(make_send_params(), true, None)
.await
.expect("send message");
match result {
SendMessageResult::Stream(mut reader) => {
while let Some(_event) = reader.read().await {}
}
_ => panic!("expected Stream"),
}
});
while !started.load(Ordering::SeqCst) {
tokio::task::yield_now().await;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let list = handler
.on_list_tasks(default_list_params(), None)
.await
.expect("list tasks");
assert!(
!list.tasks.is_empty(),
"task should exist in store before proceeding"
);
let task_id = list.tasks[0].id.0.clone();
let config = TaskPushNotificationConfig::new(&task_id, "https://example.com/push-test");
handler
.on_set_push_config(config, None)
.await
.expect("set push config");
proceed.notify_one();
send_handle.await.expect("send handle");
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let push_calls = calls.lock().unwrap().clone();
let count = push_calls.len();
assert!(
count >= 2,
"expected at least 2 push calls (status + artifact), got {count}"
);
assert!(
push_calls
.iter()
.all(|u| u == "https://example.com/push-test"),
"all push calls should target the configured URL, got: {push_calls:?}"
);
}
#[tokio::test]
async fn streaming_mode_background_drains_after_executor_done() {
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;
struct WaitingArtifactExecutor {
proceed: Arc<Notify>,
started: Arc<AtomicBool>,
}
impl AgentExecutor for WaitingArtifactExecutor {
fn execute<'a>(
&'a self,
ctx: &'a RequestContext,
queue: &'a dyn EventQueueWriter,
) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
Box::pin(async move {
self.started.store(true, Ordering::SeqCst);
self.proceed.notified().await;
queue
.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
status: TaskStatus::new(TaskState::Working),
metadata: None,
}))
.await?;
queue
.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
artifact: Artifact::new("art-1", vec![Part::text("artifact content")]),
append: None,
last_chunk: Some(true),
metadata: None,
}))
.await?;
queue
.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
status: TaskStatus::new(TaskState::Completed),
metadata: None,
}))
.await?;
Ok(())
})
}
}
let proceed = Arc::new(Notify::new());
let started = Arc::new(AtomicBool::new(false));
let handler = Arc::new(
RequestHandlerBuilder::new(WaitingArtifactExecutor {
proceed: proceed.clone(),
started: started.clone(),
})
.build()
.expect("build handler"),
);
let handler_clone = handler.clone();
let send_handle = tokio::spawn(async move {
let result = handler_clone
.on_send_message(make_send_params(), true, None)
.await
.expect("send message");
match result {
SendMessageResult::Stream(mut reader) => {
let mut event_count = 0;
while let Some(event) = reader.read().await {
if event.is_ok() {
event_count += 1;
}
}
assert_eq!(
event_count, 3,
"should receive exactly 3 events (Working + Artifact + Completed), got {event_count}"
);
}
_ => panic!("expected Stream"),
}
});
while !started.load(Ordering::SeqCst) {
tokio::task::yield_now().await;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
proceed.notify_one();
send_handle.await.expect("send handle");
let mut found = false;
for _ in 0..40 {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let list = handler
.on_list_tasks(default_list_params(), None)
.await
.expect("list tasks");
if list
.tasks
.iter()
.any(|t| t.status.state == TaskState::Completed && t.artifacts.is_some())
{
found = true;
break;
}
}
assert!(found, "background processor should have drained all events");
}