use a2a_protocol_types::artifact::Artifact;
use a2a_protocol_types::events::{StreamResponse, TaskArtifactUpdateEvent, TaskStatusUpdateEvent};
use a2a_protocol_types::message::Part;
use a2a_protocol_types::task::{TaskId, TaskState, TaskStatus};
use a2a_protocol_server::streaming::sse::{write_event, write_keep_alive};
use a2a_protocol_server::streaming::{EventQueueManager, EventQueueReader, EventQueueWriter};
#[tokio::test]
async fn queue_write_and_read() {
let manager = EventQueueManager::new();
let task_id = TaskId::new("task-1");
let (writer, reader) = manager.get_or_create(&task_id).await;
let mut reader = reader.expect("should get reader on first call");
let event = StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: task_id.clone(),
context_id: "ctx-1".into(),
status: TaskStatus::new(TaskState::Working),
metadata: None,
});
writer.write(event).await.expect("write");
drop(writer);
manager.destroy(&task_id).await;
let received = reader.read().await.expect("read");
let update = received.expect("should be ok");
assert!(
matches!(update, StreamResponse::StatusUpdate(u) if u.status.state == TaskState::Working)
);
assert!(reader.read().await.is_none());
}
#[tokio::test]
async fn queue_multiple_events() {
let manager = EventQueueManager::new();
let task_id = TaskId::new("task-2");
let (writer, reader) = manager.get_or_create(&task_id).await;
let mut reader = reader.expect("reader");
writer
.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: task_id.clone(),
context_id: "ctx".into(),
status: TaskStatus::new(TaskState::Working),
metadata: None,
}))
.await
.unwrap();
writer
.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
task_id: task_id.clone(),
context_id: "ctx".into(),
artifact: Artifact::new("a1", vec![Part::text("data")]),
append: None,
last_chunk: Some(true),
metadata: None,
}))
.await
.unwrap();
drop(writer);
manager.destroy(&task_id).await;
let mut events = vec![];
while let Some(event) = reader.read().await {
events.push(event.unwrap());
}
assert_eq!(events.len(), 2);
assert!(matches!(&events[0], StreamResponse::StatusUpdate(_)));
assert!(matches!(&events[1], StreamResponse::ArtifactUpdate(_)));
}
#[tokio::test]
async fn queue_get_or_create_reuses_writer() {
let manager = EventQueueManager::new();
let task_id = TaskId::new("task-3");
let (_writer1, reader1) = manager.get_or_create(&task_id).await;
assert!(reader1.is_some());
let (_writer2, reader2) = manager.get_or_create(&task_id).await;
assert!(reader2.is_none());
}
#[tokio::test]
async fn queue_destroy_allows_recreation() {
let manager = EventQueueManager::new();
let task_id = TaskId::new("task-4");
let (_writer, _reader) = manager.get_or_create(&task_id).await;
manager.destroy(&task_id).await;
let (_writer2, reader2) = manager.get_or_create(&task_id).await;
assert!(reader2.is_some());
}
#[test]
fn sse_write_event_format() {
let frame = write_event("message", r#"{"status":"ok"}"#);
let text = String::from_utf8(frame.to_vec()).expect("utf8");
assert!(text.starts_with("event: message\n"));
assert!(text.contains("data: {\"status\":\"ok\"}\n"));
assert!(text.ends_with("\n\n"));
}
#[test]
fn sse_write_event_multiline() {
let frame = write_event("message", "line1\nline2");
let text = String::from_utf8(frame.to_vec()).expect("utf8");
assert!(text.contains("data: line1\n"));
assert!(text.contains("data: line2\n"));
}
#[test]
fn sse_write_keep_alive_format() {
let frame = write_keep_alive();
let text = String::from_utf8(frame.to_vec()).expect("utf8");
assert_eq!(text, ": keep-alive\n\n");
}