#![cfg(all(feature = "http-client", feature = "http-server"))]
mod common;
use a2a_rs::{
adapter::{
DefaultRequestProcessor, HttpClient, HttpServer, InMemoryTaskStorage, SimpleAgentInfo,
},
domain::{ListTasksParams, Message, TaskState},
port::AsyncTaskManager,
services::AsyncA2AClient,
};
use common::TestBusinessHandler;
use std::time::Duration;
use tokio::sync::oneshot;
async fn setup_server_with_tasks(port: u16) -> (oneshot::Sender<()>, InMemoryTaskStorage) {
let storage = InMemoryTaskStorage::new();
let storage_clone = storage.clone();
let handler = TestBusinessHandler::with_storage(storage);
let test_agent_info = SimpleAgentInfo::new(
"test-agent".to_string(),
format!("http://localhost:{}", port),
);
let processor = DefaultRequestProcessor::with_handler(handler, test_agent_info);
let agent_info = SimpleAgentInfo::new(
"Task List Test Agent".to_string(),
format!("http://localhost:{}", port),
)
.with_state_transition_history();
let server = HttpServer::new(processor, agent_info, format!("127.0.0.1:{}", port));
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
tokio::select! {
_ = server.start() => {},
_ = shutdown_rx => {}
}
});
tokio::time::sleep(Duration::from_millis(100)).await;
(shutdown_tx, storage_clone)
}
#[tokio::test]
async fn test_task_list_basic() {
let port = 9001;
let (shutdown_tx, _storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
let task_ids = vec![
format!("task-{}", uuid::Uuid::new_v4()),
format!("task-{}", uuid::Uuid::new_v4()),
format!("task-{}", uuid::Uuid::new_v4()),
];
for task_id in &task_ids {
let message = Message::user_text(
format!("Test message for {}", task_id),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message(task_id, &message, None, None)
.await
.expect("Failed to create task");
}
tokio::time::sleep(Duration::from_millis(100)).await;
let params = ListTasksParams::default();
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert!(result.total_size >= 3, "Should have at least 3 tasks");
assert!(result.tasks.len() >= 3, "Should return at least 3 tasks");
assert_eq!(result.page_size, 50, "Default page size should be 50");
shutdown_tx.send(()).ok();
}
#[tokio::test]
async fn test_task_list_filter_by_context() {
let port = 9002;
let (shutdown_tx, _storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
let context_a = "context-a";
let context_b = "context-b";
for i in 0..3 {
let task_id = format!("task-a-{}", i);
let message = Message::user_text(
format!("Message in context A - {}", i),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message(&task_id, &message, Some(context_a), None)
.await
.expect("Failed to create task in context A");
}
for i in 0..2 {
let task_id = format!("task-b-{}", i);
let message = Message::user_text(
format!("Message in context B - {}", i),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message(&task_id, &message, Some(context_b), None)
.await
.expect("Failed to create task in context B");
}
tokio::time::sleep(Duration::from_millis(100)).await;
let params = ListTasksParams {
context_id: Some(context_a.to_string()),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert_eq!(result.total_size, 3, "Should have 3 tasks in context A");
for task in &result.tasks {
assert_eq!(task.context_id, context_a, "Task should be in context A");
}
let params = ListTasksParams {
context_id: Some(context_b.to_string()),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert_eq!(result.total_size, 2, "Should have 2 tasks in context B");
for task in &result.tasks {
assert_eq!(task.context_id, context_b, "Task should be in context B");
}
shutdown_tx.send(()).ok();
}
#[tokio::test]
async fn test_task_list_filter_by_status() {
let port = 9003;
let (shutdown_tx, storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
let task_id_1 = format!("task-{}", uuid::Uuid::new_v4());
let task_id_2 = format!("task-{}", uuid::Uuid::new_v4());
let task_id_3 = format!("task-{}", uuid::Uuid::new_v4());
for task_id in &[&task_id_1, &task_id_2, &task_id_3] {
let message = Message::user_text(
"Test message".to_string(),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message(task_id, &message, None, None)
.await
.expect("Failed to create task");
}
tokio::time::sleep(Duration::from_millis(50)).await;
storage
.update_task_status(&task_id_1, TaskState::Working, None)
.await
.expect("Failed to update task 1");
storage
.update_task_status(&task_id_2, TaskState::Completed, None)
.await
.expect("Failed to update task 2");
tokio::time::sleep(Duration::from_millis(50)).await;
let params = ListTasksParams {
status: Some(TaskState::Working),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert!(
result.total_size >= 1,
"Should have at least 1 working task"
);
for task in &result.tasks {
assert_eq!(
task.status.state,
TaskState::Working,
"Task should be in Working state"
);
}
let params = ListTasksParams {
status: Some(TaskState::Completed),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert!(
result.total_size >= 1,
"Should have at least 1 completed task"
);
for task in &result.tasks {
assert_eq!(
task.status.state,
TaskState::Completed,
"Task should be in Completed state"
);
}
shutdown_tx.send(()).ok();
}
#[tokio::test]
async fn test_task_list_pagination() {
let port = 9004;
let (shutdown_tx, _storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
for i in 0..10 {
let task_id = format!("task-{}", i);
let message = Message::user_text(
format!("Message {}", i),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message(&task_id, &message, None, None)
.await
.expect("Failed to create task");
}
tokio::time::sleep(Duration::from_millis(100)).await;
let params = ListTasksParams {
page_size: Some(3),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert_eq!(result.page_size, 3, "Page size should be 3");
assert_eq!(result.tasks.len(), 3, "Should return exactly 3 tasks");
assert!(result.total_size >= 10, "Total size should be at least 10");
assert!(
!result.next_page_token.is_empty(),
"Should have next page token"
);
let params = ListTasksParams {
page_size: Some(3),
page_token: Some(result.next_page_token.clone()),
..Default::default()
};
let result_page2 = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks page 2");
assert_eq!(result_page2.page_size, 3, "Page size should be 3");
assert_eq!(result_page2.tasks.len(), 3, "Should return exactly 3 tasks");
let page1_ids: Vec<_> = result.tasks.iter().map(|t| &t.id).collect();
let page2_ids: Vec<_> = result_page2.tasks.iter().map(|t| &t.id).collect();
for id in &page2_ids {
assert!(
!page1_ids.contains(id),
"Page 2 should have different tasks than page 1"
);
}
shutdown_tx.send(()).ok();
}
#[tokio::test]
async fn test_task_list_page_size_clamping() {
let port = 9005;
let (shutdown_tx, _storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
let message = Message::user_text(
"Test message".to_string(),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message("task-1", &message, None, None)
.await
.expect("Failed to create task");
tokio::time::sleep(Duration::from_millis(50)).await;
let params = ListTasksParams {
page_size: Some(150), ..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert_eq!(result.page_size, 100, "Page size should be clamped to 100");
shutdown_tx.send(()).ok();
}
#[tokio::test]
async fn test_task_list_history_length() {
let port = 9006;
let (shutdown_tx, _storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
let task_id = format!("task-{}", uuid::Uuid::new_v4());
for i in 0..5 {
let message = Message::user_text(
format!("Message {}", i),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message(&task_id, &message, None, None)
.await
.expect("Failed to send message");
}
tokio::time::sleep(Duration::from_millis(100)).await;
let params = ListTasksParams {
history_length: Some(2),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
let task = result
.tasks
.iter()
.find(|t| t.id == task_id)
.expect("Task not found");
if !task.history.is_empty() {
assert_eq!(
task.history.len(),
2,
"History should be limited to 2 messages"
);
}
let params = ListTasksParams {
history_length: Some(0),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
let task = result
.tasks
.iter()
.find(|t| t.id == task_id)
.expect("Task not found");
assert!(
task.history.is_empty(),
"History should be empty when history_length is 0"
);
shutdown_tx.send(()).ok();
}
#[tokio::test]
async fn test_task_list_include_artifacts() {
let port = 9007;
let (shutdown_tx, storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
let task_id = format!("task-{}", uuid::Uuid::new_v4());
let message = Message::user_text(
"Test message".to_string(),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message(&task_id, &message, None, None)
.await
.expect("Failed to create task");
let mut task = storage
.get_task(&task_id, None)
.await
.expect("Failed to get task");
task.add_artifact(a2a_rs::domain::Artifact {
artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
name: "Test Artifact".to_string(),
description: String::new(),
parts: vec![a2a_rs::domain::Part::text("Artifact content".to_string())],
metadata: None.into(),
extensions: Vec::new(),
..Default::default()
});
let task_state = match task.status.state {
::buffa::EnumValue::Known(s) => s,
_ => TaskState::Unknown,
};
storage
.update_task_status(&task_id, task_state, None)
.await
.ok();
tokio::time::sleep(Duration::from_millis(50)).await;
let params = ListTasksParams {
include_artifacts: Some(false),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
let listed_task = result
.tasks
.iter()
.find(|t| t.id == task_id)
.expect("Task not found");
assert!(
listed_task.artifacts.is_empty(),
"Artifacts should be excluded when include_artifacts is false"
);
let params = ListTasksParams {
include_artifacts: Some(true),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
let _listed_task = result
.tasks
.iter()
.find(|t| t.id == task_id)
.expect("Task not found");
shutdown_tx.send(()).ok();
}
#[tokio::test]
async fn test_task_list_combined_filters() {
let port = 9008;
let (shutdown_tx, storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
let context_id = "test-context-combined";
let task_ids: Vec<String> = (0..5)
.map(|_i| format!("combined-task-{}", uuid::Uuid::new_v4()))
.collect();
for task_id in &task_ids {
let message = Message::user_text(
format!("Message for {}", task_id),
format!("msg-{}", uuid::Uuid::new_v4()),
);
client
.send_task_message(task_id, &message, Some(context_id), None)
.await
.expect("Failed to create task");
}
tokio::time::sleep(Duration::from_millis(100)).await;
storage
.update_task_status(&task_ids[2], TaskState::Completed, None)
.await
.ok();
storage
.update_task_status(&task_ids[3], TaskState::Completed, None)
.await
.ok();
storage
.update_task_status(&task_ids[4], TaskState::Failed, None)
.await
.ok();
tokio::time::sleep(Duration::from_millis(50)).await;
let params = ListTasksParams {
context_id: Some(context_id.to_string()),
status: Some(TaskState::Working),
page_size: Some(10),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert_eq!(
result.total_size, 2,
"Should have 2 working tasks in context"
);
for task in &result.tasks {
assert_eq!(task.context_id, context_id);
assert_eq!(task.status.state, TaskState::Working);
}
shutdown_tx.send(()).ok();
}
#[tokio::test]
async fn test_task_list_empty_results() {
let port = 9009;
let (shutdown_tx, _storage) = setup_server_with_tasks(port).await;
let client = HttpClient::new(format!("http://localhost:{}", port));
let params = ListTasksParams {
context_id: Some("non-existent-context".to_string()),
..Default::default()
};
let result = client
.list_tasks(¶ms)
.await
.expect("Failed to list tasks");
assert_eq!(result.total_size, 0, "Should have no tasks");
assert_eq!(result.tasks.len(), 0, "Should return empty array");
assert!(
result.next_page_token.is_empty(),
"Should have empty next page token"
);
shutdown_tx.send(()).ok();
}