#![cfg(feature = "sqlite")]
use a2a_protocol_server::store::tenant::TenantContext;
use a2a_protocol_server::store::TaskStore;
use a2a_protocol_server::store::TenantAwareSqliteTaskStore;
use a2a_protocol_types::params::ListTasksParams;
use a2a_protocol_types::task::{ContextId, Task, TaskId, TaskState, TaskStatus};
/// Builds a [`Task`] from an id, a context id and a state.
///
/// The optional `history`, `artifacts` and `metadata` fields are left as `None`.
fn make_task(id: &str, ctx: &str, state: TaskState) -> Task {
    let id = TaskId::new(id);
    let context_id = ContextId::new(ctx);
    let status = TaskStatus::new(state);
    Task {
        id,
        context_id,
        status,
        history: None,
        artifacts: None,
        metadata: None,
    }
}
/// Creates a store against the `sqlite::memory:` URL.
///
/// Panics with a descriptive message if the store cannot be created.
async fn new_store() -> TenantAwareSqliteTaskStore {
    let created = TenantAwareSqliteTaskStore::new("sqlite::memory:").await;
    created.expect("failed to create in-memory store")
}
/// Smoke test: building the store must complete without panicking.
#[tokio::test]
async fn new_creates_store() {
    // The store itself is not inspected; only successful construction matters.
    drop(new_store().await);
}
/// A task saved under a tenant can be fetched by id within the same tenant
/// scope, with its id, context id and state intact.
#[tokio::test]
async fn save_and_get_roundtrip() {
    let store = new_store().await;
    let task = make_task("t1", "ctx1", TaskState::Submitted);
    TenantContext::scope("acme", async {
        // `task` is not used again after this call, so it is moved into the
        // store instead of being cloned.
        store.save(task).await.unwrap();
        let fetched = store
            .get(&TaskId::new("t1"))
            .await
            .unwrap()
            .expect("saved task should be retrievable");
        assert_eq!(fetched.id.0, "t1");
        assert_eq!(fetched.context_id.0, "ctx1");
        assert_eq!(fetched.status.state, TaskState::Submitted);
    })
    .await;
}
/// Looking up an id that was never saved yields `Ok(None)` rather than an error.
#[tokio::test]
async fn get_nonexistent_returns_none() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        let missing_id = TaskId::new("does-not-exist");
        let lookup = store.get(&missing_id).await.unwrap();
        assert!(lookup.is_none());
    })
    .await;
}
/// Saving a second task with the same id replaces the first: the state read
/// back changes and the count stays at one.
#[tokio::test]
async fn save_upserts_existing() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        let id = TaskId::new("t1");

        // Initial write.
        let original = make_task("t1", "ctx1", TaskState::Submitted);
        store.save(original).await.unwrap();
        let before = store.get(&id).await.unwrap().unwrap();
        assert_eq!(before.status.state, TaskState::Submitted);

        // Second write with the same id but a different state.
        let replacement = make_task("t1", "ctx1", TaskState::Working);
        store.save(replacement).await.unwrap();
        let after = store.get(&id).await.unwrap().unwrap();
        assert_eq!(after.status.state, TaskState::Working);

        let total = store.count().await.unwrap();
        assert_eq!(total, 1);
    })
    .await;
}
/// `insert_if_absent` reports `true` for a new id and `false` when the same
/// task is offered again; the task remains retrievable afterwards.
#[tokio::test]
async fn insert_if_absent_returns_true_then_false() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        let candidate = make_task("t1", "ctx1", TaskState::Submitted);

        let first_attempt = store.insert_if_absent(candidate.clone()).await.unwrap();
        assert!(first_attempt, "first insert should return true");

        let second_attempt = store.insert_if_absent(candidate).await.unwrap();
        assert!(!second_attempt, "duplicate insert should return false");

        let stored = store.get(&TaskId::new("t1")).await.unwrap();
        assert!(stored.is_some());
    })
    .await;
}
/// After `delete`, a previously saved task can no longer be fetched.
#[tokio::test]
async fn delete_removes_task() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        let id = TaskId::new("t1");
        let task = make_task("t1", "ctx1", TaskState::Submitted);
        store.save(task).await.unwrap();

        let before = store.get(&id).await.unwrap();
        assert!(before.is_some());

        store.delete(&id).await.unwrap();

        let after = store.get(&id).await.unwrap();
        assert!(after.is_none());
    })
    .await;
}
/// `count` tracks the number of stored tasks: three after three saves, two
/// after one of them is deleted.
#[tokio::test]
async fn count_reflects_stored_tasks() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        for task_id in ["t1", "t2", "t3"] {
            let task = make_task(task_id, "ctx1", TaskState::Submitted);
            store.save(task).await.unwrap();
        }
        let after_saves = store.count().await.unwrap();
        assert_eq!(after_saves, 3);

        store.delete(&TaskId::new("t2")).await.unwrap();
        let after_delete = store.count().await.unwrap();
        assert_eq!(after_delete, 2);
    })
    .await;
}
/// Listing with default parameters returns all three saved tasks in a single
/// page, with an empty next-page token.
#[tokio::test]
async fn list_returns_all_tasks() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        store
            .save(make_task("t1", "ctx1", TaskState::Submitted))
            .await
            .unwrap();
        store
            .save(make_task("t2", "ctx2", TaskState::Working))
            .await
            .unwrap();
        store
            .save(make_task("t3", "ctx1", TaskState::Completed))
            .await
            .unwrap();
        let params = ListTasksParams::default();
        // `list` takes the parameters by reference.
        let resp = store.list(&params).await.unwrap();
        assert_eq!(resp.tasks.len(), 3);
        assert!(resp.next_page_token.is_empty());
    })
    .await;
}
/// Listing with `context_id` set returns only the tasks saved with that
/// context id (two of the three here).
#[tokio::test]
async fn list_filters_by_context_id() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        store
            .save(make_task("t1", "ctx-a", TaskState::Submitted))
            .await
            .unwrap();
        store
            .save(make_task("t2", "ctx-b", TaskState::Submitted))
            .await
            .unwrap();
        store
            .save(make_task("t3", "ctx-a", TaskState::Working))
            .await
            .unwrap();
        let params = ListTasksParams {
            context_id: Some("ctx-a".to_string()),
            ..Default::default()
        };
        let resp = store.list(&params).await.unwrap();
        assert_eq!(resp.tasks.len(), 2);
        // Every returned task must carry the requested context id.
        for task in &resp.tasks {
            assert_eq!(task.context_id.0, "ctx-a");
        }
    })
    .await;
}
/// Listing with `status` set returns only the tasks in that state (the two
/// `Working` tasks here).
#[tokio::test]
async fn list_filters_by_status() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        store
            .save(make_task("t1", "ctx1", TaskState::Submitted))
            .await
            .unwrap();
        store
            .save(make_task("t2", "ctx1", TaskState::Working))
            .await
            .unwrap();
        store
            .save(make_task("t3", "ctx1", TaskState::Working))
            .await
            .unwrap();
        let params = ListTasksParams {
            status: Some(TaskState::Working),
            ..Default::default()
        };
        let resp = store.list(&params).await.unwrap();
        assert_eq!(resp.tasks.len(), 2);
        // Every returned task must be in the requested state.
        for task in &resp.tasks {
            assert_eq!(task.status.state, TaskState::Working);
        }
    })
    .await;
}
/// With five tasks stored, a page size of 2 yields exactly two tasks and a
/// non-empty next-page token.
#[tokio::test]
async fn list_paginates_with_page_size() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        for i in 1..=5 {
            store
                .save(make_task(&format!("t{i}"), "ctx1", TaskState::Submitted))
                .await
                .unwrap();
        }
        let params = ListTasksParams {
            page_size: Some(2),
            ..Default::default()
        };
        let resp = store.list(&params).await.unwrap();
        assert_eq!(resp.tasks.len(), 2);
        assert!(
            !resp.next_page_token.is_empty(),
            "expected a next page token"
        );
    })
    .await;
}
/// Walks five tasks in pages of two by feeding each response's
/// `next_page_token` into the next request.
///
/// Expects pages of 2, 2 and 1 tasks, no id shared between the first two
/// pages, and an empty token on the last page.
#[tokio::test]
async fn list_paginates_with_page_token() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        for i in 1..=5 {
            store
                .save(make_task(&format!("t{i}"), "ctx1", TaskState::Submitted))
                .await
                .unwrap();
        }

        // Page 1: no token supplied.
        let params = ListTasksParams {
            page_size: Some(2),
            ..Default::default()
        };
        let resp1 = store.list(&params).await.unwrap();
        assert_eq!(resp1.tasks.len(), 2);
        let token = resp1.next_page_token.clone();
        assert!(!token.is_empty(), "expected a next page token");

        // Page 2: continue from the token returned with page 1.
        let params = ListTasksParams {
            page_size: Some(2),
            page_token: Some(token),
            ..Default::default()
        };
        let resp2 = store.list(&params).await.unwrap();
        assert_eq!(resp2.tasks.len(), 2);
        assert!(!resp2.next_page_token.is_empty());

        // The first two pages must not share any task id.
        let page1_ids: Vec<&str> = resp1.tasks.iter().map(|t| t.id.0.as_str()).collect();
        let page2_ids: Vec<&str> = resp2.tasks.iter().map(|t| t.id.0.as_str()).collect();
        for id in &page2_ids {
            assert!(!page1_ids.contains(id), "page overlap detected for {id}");
        }

        // Page 3: the single remaining task, and no further token.
        let params = ListTasksParams {
            page_size: Some(2),
            page_token: Some(resp2.next_page_token.clone()),
            ..Default::default()
        };
        let resp3 = store.list(&params).await.unwrap();
        assert_eq!(resp3.tasks.len(), 1);
        assert!(
            resp3.next_page_token.is_empty(),
            "last page should have no token"
        );
    })
    .await;
}
/// A `page_size` of 0 is not treated as "return nothing": with three tasks
/// stored, all three come back in one page with an empty next-page token.
#[tokio::test]
async fn list_page_size_zero_uses_default() {
    let store = new_store().await;
    TenantContext::scope("acme", async {
        for i in 1..=3 {
            store
                .save(make_task(&format!("t{i}"), "ctx1", TaskState::Submitted))
                .await
                .unwrap();
        }
        let params = ListTasksParams {
            page_size: Some(0),
            ..Default::default()
        };
        let resp = store.list(&params).await.unwrap();
        assert_eq!(resp.tasks.len(), 3);
        assert!(resp.next_page_token.is_empty());
    })
    .await;
}
/// A task saved under `tenant-a` is visible to `tenant-a` but not to
/// `tenant-b` when fetched by the same id.
#[tokio::test]
async fn tenant_isolation_save_and_get() {
    let store = new_store().await;
    let task = make_task("t1", "ctx1", TaskState::Submitted);
    let id = TaskId::new("t1");

    // Write as tenant-a.
    TenantContext::scope("tenant-a", async {
        store.save(task).await.unwrap();
    })
    .await;

    // The owning tenant reads it back.
    TenantContext::scope("tenant-a", async {
        let own_view = store.get(&id).await.unwrap();
        assert!(own_view.is_some());
        assert_eq!(own_view.unwrap().id.0, "t1");
    })
    .await;

    // A different tenant sees nothing under that id.
    TenantContext::scope("tenant-b", async {
        let foreign_view = store.get(&id).await.unwrap();
        assert!(foreign_view.is_none());
    })
    .await;
}
/// `list` only returns tasks saved under the current tenant scope:
/// `tenant-a` sees `a1` and `a2`, `tenant-b` sees only `b1`.
#[tokio::test]
async fn tenant_isolation_list() {
    let store = new_store().await;

    // Seed two tasks for tenant-a.
    TenantContext::scope("tenant-a", async {
        let first = make_task("a1", "ctx1", TaskState::Submitted);
        store.save(first).await.unwrap();
        let second = make_task("a2", "ctx1", TaskState::Working);
        store.save(second).await.unwrap();
    })
    .await;

    // Seed one task for tenant-b.
    TenantContext::scope("tenant-b", async {
        let only = make_task("b1", "ctx1", TaskState::Submitted);
        store.save(only).await.unwrap();
    })
    .await;

    TenantContext::scope("tenant-a", async {
        let params = ListTasksParams::default();
        let listing = store.list(&params).await.unwrap();
        assert_eq!(listing.tasks.len(), 2);
        let seen: Vec<&str> = listing
            .tasks
            .iter()
            .map(|task| task.id.0.as_str())
            .collect();
        assert!(seen.contains(&"a1"));
        assert!(seen.contains(&"a2"));
    })
    .await;

    TenantContext::scope("tenant-b", async {
        let params = ListTasksParams::default();
        let listing = store.list(&params).await.unwrap();
        assert_eq!(listing.tasks.len(), 1);
        assert_eq!(listing.tasks[0].id.0, "b1");
    })
    .await;
}
/// `count` is per tenant: three for `tenant-a`, one for `tenant-b`, and
/// `tenant-a` still reads three after `tenant-b` has written.
#[tokio::test]
async fn tenant_isolation_count() {
    let store = new_store().await;

    TenantContext::scope("tenant-a", async {
        for task_id in ["a1", "a2", "a3"] {
            let task = make_task(task_id, "ctx1", TaskState::Submitted);
            store.save(task).await.unwrap();
        }
        let total = store.count().await.unwrap();
        assert_eq!(total, 3);
    })
    .await;

    TenantContext::scope("tenant-b", async {
        let task = make_task("b1", "ctx1", TaskState::Submitted);
        store.save(task).await.unwrap();
        let total = store.count().await.unwrap();
        assert_eq!(total, 1);
    })
    .await;

    // Re-check tenant-a after tenant-b's write.
    TenantContext::scope("tenant-a", async {
        let total = store.count().await.unwrap();
        assert_eq!(total, 3);
    })
    .await;
}
/// Deleting an id under `tenant-b` must leave `tenant-a`'s task with the same
/// id in place.
#[tokio::test]
async fn tenant_isolation_delete() {
    let store = new_store().await;
    let id = TaskId::new("t1");

    TenantContext::scope("tenant-a", async {
        let task = make_task("t1", "ctx1", TaskState::Submitted);
        store.save(task).await.unwrap();
    })
    .await;

    // The delete is issued from the other tenant's scope.
    TenantContext::scope("tenant-b", async {
        store.delete(&id).await.unwrap();
    })
    .await;

    TenantContext::scope("tenant-a", async {
        let survivor = store.get(&id).await.unwrap();
        assert!(
            survivor.is_some(),
            "tenant-b delete must not affect tenant-a"
        );
    })
    .await;
}
/// `insert_if_absent` is evaluated per tenant: the same task id inserts
/// successfully under both tenants, and each tenant reads back its own state.
#[tokio::test]
async fn tenant_isolation_insert_if_absent() {
    let store = new_store().await;
    let id = TaskId::new("t1");

    let inserted_a = TenantContext::scope("tenant-a", async {
        let submitted = make_task("t1", "ctx1", TaskState::Submitted);
        store.insert_if_absent(submitted).await.unwrap()
    })
    .await;
    assert!(inserted_a, "tenant-a first insert should return true");

    let inserted_b = TenantContext::scope("tenant-b", async {
        let working = make_task("t1", "ctx1", TaskState::Working);
        store.insert_if_absent(working).await.unwrap()
    })
    .await;
    assert!(
        inserted_b,
        "tenant-b insert of same task id should return true (different tenant)"
    );

    // Each tenant must see the state it inserted.
    TenantContext::scope("tenant-a", async {
        let seen_by_a = store.get(&id).await.unwrap().unwrap();
        assert_eq!(seen_by_a.status.state, TaskState::Submitted);
    })
    .await;
    TenantContext::scope("tenant-b", async {
        let seen_by_b = store.get(&id).await.unwrap().unwrap();
        assert_eq!(seen_by_b.status.state, TaskState::Working);
    })
    .await;
}