use criterion::{black_box, criterion_group, criterion_main, Criterion};
use a2a_protocol_server::store::{InMemoryTaskStore, TaskStore};
use a2a_protocol_server::streaming::{EventQueueManager, EventQueueReader, EventQueueWriter};
use a2a_protocol_types::events::{StreamResponse, TaskStatusUpdateEvent};
use a2a_protocol_types::message::{Message, MessageId, MessageRole, Part};
use a2a_protocol_types::params::ListTasksParams;
use a2a_protocol_types::task::{ContextId, Task, TaskId, TaskState, TaskStatus};
fn sample_task(i: usize) -> Task {
Task {
id: TaskId::new(format!("task-bench-{i:04}")),
context_id: ContextId::new("ctx-bench-001"),
status: TaskStatus::new(TaskState::Completed),
history: Some(vec![Message {
id: MessageId::new(format!("msg-{i}")),
role: MessageRole::User,
parts: vec![Part::text("Hello, agent!")],
task_id: None,
context_id: None,
reference_task_ids: None,
extensions: None,
metadata: None,
}]),
artifacts: None,
metadata: None,
}
}
fn sample_status_event(i: usize) -> StreamResponse {
StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: TaskId::new(format!("task-{i}")),
context_id: ContextId::new("ctx-1"),
status: TaskStatus::new(TaskState::Working),
metadata: None,
})
}
fn bench_task_store_save(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
c.bench_function("task_store_save", |b| {
let store = InMemoryTaskStore::new();
let task = sample_task(0);
b.iter(|| {
rt.block_on(store.save(black_box(&task))).unwrap();
});
});
}
fn bench_task_store_get(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let store = InMemoryTaskStore::new();
for i in 0..100 {
rt.block_on(store.save(&sample_task(i))).unwrap();
}
let target_id = TaskId::new("task-bench-0050");
c.bench_function("task_store_get", |b| {
b.iter(|| {
rt.block_on(store.get(black_box(&target_id))).unwrap();
});
});
}
fn bench_task_store_list(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let store = InMemoryTaskStore::new();
for i in 0..200 {
let mut task = sample_task(i);
if i % 2 == 0 {
task.context_id = ContextId::new("ctx-even");
} else {
task.context_id = ContextId::new("ctx-odd");
}
rt.block_on(store.save(&task)).unwrap();
}
let params = ListTasksParams {
tenant: None,
context_id: Some("ctx-even".into()),
status: None,
page_size: Some(50),
page_token: None,
status_timestamp_after: None,
include_artifacts: None,
history_length: None,
};
c.bench_function("task_store_list_filtered", |b| {
b.iter(|| {
rt.block_on(store.list(black_box(¶ms))).unwrap();
});
});
}
fn bench_queue_manager_create_destroy(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
c.bench_function("queue_manager_create_destroy", |b| {
let manager = EventQueueManager::new();
let task_id = TaskId::new("task-bench-fixed");
b.iter(|| {
rt.block_on(async {
let _ = manager.get_or_create(black_box(&task_id)).await;
manager.destroy(black_box(&task_id)).await;
});
});
});
}
fn bench_queue_write_read(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
c.bench_function("queue_write_read_50_events", |b| {
let manager = EventQueueManager::new();
b.iter(|| {
rt.block_on(async {
let task_id = TaskId::new("task-throughput");
let (writer, reader) = manager.get_or_create(&task_id).await;
let mut reader = reader.expect("new queue should return reader");
for i in 0..50 {
writer
.write(black_box(sample_status_event(i)))
.await
.unwrap();
}
drop(writer);
manager.destroy(&task_id).await;
let mut count = 0;
while reader.read().await.is_some() {
count += 1;
}
debug_assert_eq!(count, 50);
});
});
});
}
criterion_group!(
benches,
bench_task_store_save,
bench_task_store_get,
bench_task_store_list,
bench_queue_manager_create_destroy,
bench_queue_write_read,
);
criterion_main!(benches);