use turul_a2a_types::{Artifact, Message, Part, Role, Task, TaskState, TaskStatus};
use crate::streaming::StreamEvent;
use super::atomic::A2aAtomicStore;
use super::event_store::A2aEventStore;
use super::filter::TaskFilter;
use super::traits::{A2aPushNotificationStorage, A2aTaskStorage};
/// Builds a task in the `Submitted` state with the given id and context id.
fn make_task(task_id: &str, context_id: &str) -> Task {
    let initial_status = TaskStatus::new(TaskState::Submitted);
    Task::new(task_id, initial_status).with_context_id(context_id)
}
/// Builds a user-role message whose only part is the given text.
fn make_message(id: &str, text: &str) -> Message {
    let parts = vec![Part::text(text)];
    Message::new(id, Role::User, parts)
}
/// Builds an artifact whose only part is the given text.
fn make_artifact(id: &str, text: &str) -> Artifact {
    let parts = vec![Part::text(text)];
    Artifact::new(id, parts)
}
/// Parity check for basic CRUD: create a task, read it back, look up an id
/// that was never created, then delete the task twice.
///
/// The first delete must report `true`; the repeated delete must report `false`.
pub async fn test_create_and_retrieve(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-a";
    const TASK_ID: &str = "parity-crud-1";

    let created = storage
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx-1"))
        .await
        .unwrap();
    assert_eq!(created.id(), TASK_ID);
    assert_eq!(created.context_id(), "ctx-1");

    // Round-trip: the stored task is visible to its tenant/owner.
    let lookup = storage
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap();
    assert!(lookup.is_some());
    let found = lookup.unwrap();
    assert_eq!(found.id(), TASK_ID);

    // An unknown id yields `None` rather than an error.
    let absent = storage
        .get_task(TENANT, "nonexistent", OWNER, None)
        .await
        .unwrap();
    assert!(absent.is_none());

    let first_delete = storage.delete_task(TENANT, TASK_ID, OWNER).await.unwrap();
    assert!(first_delete);
    let second_delete = storage.delete_task(TENANT, TASK_ID, OWNER).await.unwrap();
    assert!(!second_delete);
}
/// Parity check that the transition chain
/// Submitted -> Working -> InputRequired -> Working -> Completed is accepted.
pub async fn test_state_machine_enforcement(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-a";
    const TASK_ID: &str = "sm-1";

    storage
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx-sm"))
        .await
        .unwrap();

    // The first transition also verifies that the returned task reflects the new state.
    let after_first = storage
        .update_task_status(TENANT, TASK_ID, OWNER, TaskStatus::new(TaskState::Working))
        .await
        .unwrap();
    assert_eq!(after_first.status().unwrap().state().unwrap(), TaskState::Working);

    // The remaining transitions only need to succeed.
    for next_state in [
        TaskState::InputRequired,
        TaskState::Working,
        TaskState::Completed,
    ] {
        storage
            .update_task_status(TENANT, TASK_ID, OWNER, TaskStatus::new(next_state))
            .await
            .unwrap();
    }
}
/// Parity check that a task in a terminal state (`Completed`, `Failed`,
/// `Canceled`, `Rejected`) refuses a further transition to `Working`.
///
/// `Rejected` is entered from a freshly created (`Submitted`) task; the other
/// terminal states are entered from `Working`.
pub async fn test_terminal_state_rejection(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-a";

    let terminal_states = [
        TaskState::Completed,
        TaskState::Failed,
        TaskState::Canceled,
        TaskState::Rejected,
    ];

    for (i, &terminal) in terminal_states.iter().enumerate() {
        // Every iteration creates `term-{i}` and moves it to Working.
        let working_id = format!("term-{i}");
        storage
            .create_task(TENANT, OWNER, make_task(&working_id, "ctx-term"))
            .await
            .unwrap();
        storage
            .update_task_status(TENANT, &working_id, OWNER, TaskStatus::new(TaskState::Working))
            .await
            .unwrap();

        // NOTE(review): for Rejected, the `term-{i}` task above is left in
        // Working and a second, still-Submitted task is used instead.
        let subject_id = if terminal == TaskState::Rejected {
            let submitted_id = format!("term-rej-{i}");
            storage
                .create_task(TENANT, OWNER, make_task(&submitted_id, "ctx-term"))
                .await
                .unwrap();
            submitted_id
        } else {
            working_id
        };

        storage
            .update_task_status(TENANT, &subject_id, OWNER, TaskStatus::new(terminal))
            .await
            .unwrap();

        let result = storage
            .update_task_status(TENANT, &subject_id, OWNER, TaskStatus::new(TaskState::Working))
            .await;
        assert!(result.is_err(), "Terminal {terminal:?} should reject transitions");
    }
}
/// Parity check that tasks are scoped per tenant: reads, listings and deletes
/// issued under one tenant must not reach another tenant's task.
pub async fn test_tenant_isolation(storage: &dyn A2aTaskStorage) {
    const OWNER: &str = "owner";

    for (tenant, task_id) in [("tenant-a", "ti-1"), ("tenant-b", "ti-2")] {
        storage
            .create_task(tenant, OWNER, make_task(task_id, "ctx"))
            .await
            .unwrap();
    }

    let a_sees_b = storage.get_task("tenant-a", "ti-2", OWNER, None).await.unwrap();
    assert!(a_sees_b.is_none(), "Tenant A should not see Tenant B's task");
    let b_sees_a = storage.get_task("tenant-b", "ti-1", OWNER, None).await.unwrap();
    assert!(b_sees_a.is_none(), "Tenant B should not see Tenant A's task");

    // Listing under tenant-a returns only tenant-a's task.
    let filter = TaskFilter {
        tenant: Some(String::from("tenant-a")),
        owner: Some(String::from(OWNER)),
        ..Default::default()
    };
    let page_a = storage.list_tasks(filter).await.unwrap();
    assert_eq!(page_a.total_size, 1);
    assert_eq!(page_a.tasks[0].id(), "ti-1");

    // Deleting tenant-b's task through tenant-a reports "nothing deleted".
    let cross_delete = storage.delete_task("tenant-a", "ti-2", OWNER).await.unwrap();
    assert!(!cross_delete);
}
/// Parity check that one owner cannot read, delete, or update the status of
/// another owner's task within the same tenant.
pub async fn test_owner_isolation(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";

    for (owner, task_id) in [("alice", "oi-1"), ("bob", "oi-2")] {
        storage
            .create_task(TENANT, owner, make_task(task_id, "ctx"))
            .await
            .unwrap();
    }

    // Alice tries to touch Bob's task `oi-2`.
    let read = storage.get_task(TENANT, "oi-2", "alice", None).await.unwrap();
    assert!(read.is_none());

    let deleted = storage.delete_task(TENANT, "oi-2", "alice").await.unwrap();
    assert!(!deleted);

    let update = storage
        .update_task_status(TENANT, "oi-2", "alice", TaskStatus::new(TaskState::Working))
        .await;
    assert!(update.is_err());
}
/// Parity check for the `history_length` argument of `get_task`:
/// `Some(0)` yields no history, `None` yields all of it, and `Some(n)` yields
/// the last `n` messages in their original order.
pub async fn test_history_length(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner";
    const TASK_ID: &str = "hl-1";

    storage
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx"))
        .await
        .unwrap();

    // Seed five messages: m-0 .. m-4.
    for i in 0..5 {
        let message = make_message(&format!("m-{i}"), &format!("msg {i}"));
        storage
            .append_message(TENANT, TASK_ID, OWNER, message)
            .await
            .unwrap();
    }

    let without_history = storage
        .get_task(TENANT, TASK_ID, OWNER, Some(0))
        .await
        .unwrap()
        .unwrap();
    assert!(without_history.history().is_empty(), "history_length=0 should return empty history");

    let full_history = storage
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(full_history.history().len(), 5, "history_length=None should return all");

    let last_two = storage
        .get_task(TENANT, TASK_ID, OWNER, Some(2))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(last_two.history().len(), 2, "history_length=2 should return 2");
    // The truncated view keeps the most recent messages.
    assert_eq!(last_two.history()[0].message_id, "m-3");
    assert_eq!(last_two.history()[1].message_id, "m-4");
}
/// Parity check for `list_tasks` pagination: seven tasks listed with a page
/// size of three must be returned exactly once each across the pages, and
/// every page must report `total_size == 7`.
///
/// The paging loop is bounded so that a backend which never returns an empty
/// `next_page_token` fails the test instead of hanging it.
pub async fn test_list_pagination(storage: &dyn A2aTaskStorage) {
    // Seven tasks need at most seven non-empty pages plus one trailing empty
    // page; anything beyond that means the token is not making progress.
    const MAX_PAGES: usize = 8;

    for i in 0..7 {
        storage
            .create_task("default", "owner", make_task(&format!("pg-{i}"), "ctx-pg"))
            .await
            .unwrap();
    }

    let mut all_ids = Vec::new();
    let mut page_token = None;
    let mut pages_fetched = 0usize;
    loop {
        pages_fetched += 1;
        assert!(
            pages_fetched <= MAX_PAGES,
            "pagination did not terminate within {MAX_PAGES} pages"
        );

        let page = storage
            .list_tasks(TaskFilter {
                tenant: Some("default".to_string()),
                owner: Some("owner".to_string()),
                context_id: Some("ctx-pg".to_string()),
                page_size: Some(3),
                page_token: page_token.clone(),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(page.total_size, 7, "total_size should be 7 on every page");
        assert!(page.tasks.len() <= 3, "page should have at most 3 tasks");
        all_ids.extend(page.tasks.iter().map(|t| t.id().to_string()));

        // An empty token marks the last page.
        if page.next_page_token.is_empty() {
            break;
        }
        page_token = Some(page.next_page_token);
    }

    assert_eq!(all_ids.len(), 7, "should collect all 7 tasks across pages");
    // No task may appear on more than one page.
    let unique: std::collections::HashSet<_> = all_ids.iter().collect();
    assert_eq!(unique.len(), 7);
}
/// Parity check that `list_tasks` honours the `status` filter: of three tasks,
/// the two moved to `Working` are the only ones counted.
pub async fn test_list_filter_by_status(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner";

    for i in 0..3 {
        let task_id = format!("fs-{i}");
        storage
            .create_task(TENANT, OWNER, make_task(&task_id, "ctx-fs"))
            .await
            .unwrap();
    }

    // fs-1 stays in Submitted.
    for task_id in ["fs-0", "fs-2"] {
        storage
            .update_task_status(TENANT, task_id, OWNER, TaskStatus::new(TaskState::Working))
            .await
            .unwrap();
    }

    let working_only = TaskFilter {
        tenant: Some(String::from(TENANT)),
        owner: Some(String::from(OWNER)),
        status: Some(TaskState::Working),
        ..Default::default()
    };
    let page = storage.list_tasks(working_only).await.unwrap();
    assert_eq!(page.total_size, 2);
}
/// Parity check that `list_tasks` honours the `context_id` filter: two of the
/// three seeded tasks share `ctx-alpha` and only those are counted.
pub async fn test_list_filter_by_context_id(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner";

    let seeds = [
        ("fc-1", "ctx-alpha"),
        ("fc-2", "ctx-beta"),
        ("fc-3", "ctx-alpha"),
    ];
    for (task_id, context_id) in seeds {
        storage
            .create_task(TENANT, OWNER, make_task(task_id, context_id))
            .await
            .unwrap();
    }

    let alpha_only = TaskFilter {
        tenant: Some(String::from(TENANT)),
        owner: Some(String::from(OWNER)),
        context_id: Some(String::from("ctx-alpha")),
        ..Default::default()
    };
    let page = storage.list_tasks(alpha_only).await.unwrap();
    assert_eq!(page.total_size, 2);
}
/// Parity check that appended messages are stored in the task history in
/// append order.
pub async fn test_append_message(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner";
    const TASK_ID: &str = "am-1";

    storage
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx"))
        .await
        .unwrap();

    for (message_id, text) in [("m-1", "first"), ("m-2", "second")] {
        storage
            .append_message(TENANT, TASK_ID, OWNER, make_message(message_id, text))
            .await
            .unwrap();
    }

    let stored = storage
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(stored.history().len(), 2);
    assert_eq!(stored.history()[0].message_id, "m-1");
    assert_eq!(stored.history()[1].message_id, "m-2");
}
/// Parity check that a single appended artifact shows up on the stored task.
pub async fn test_append_artifact(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner";
    const TASK_ID: &str = "aa-1";

    storage
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx"))
        .await
        .unwrap();

    let artifact = make_artifact("art-1", "chunk1");
    storage
        .append_artifact(TENANT, TASK_ID, OWNER, artifact, false, false)
        .await
        .unwrap();

    let stored = storage
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(stored.artifacts().len(), 1);
    assert_eq!(stored.artifacts()[0].artifact_id, "art-1");
}
/// Parity check that message/artifact appends are rejected for a non-owner
/// and accepted for the owning user.
pub async fn test_owner_isolation_mutations(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const TASK_ID: &str = "oim-1";

    storage
        .create_task(TENANT, "alice", make_task(TASK_ID, "ctx"))
        .await
        .unwrap();

    // Bob does not own the task: both mutations must fail.
    let foreign_message = storage
        .append_message(TENANT, TASK_ID, "bob", make_message("m-bad", "nope"))
        .await;
    assert!(foreign_message.is_err(), "Bob should not append message to Alice's task");

    let foreign_artifact = storage
        .append_artifact(TENANT, TASK_ID, "bob", make_artifact("a-bad", "nope"), false, false)
        .await;
    assert!(foreign_artifact.is_err(), "Bob should not append artifact to Alice's task");

    // Alice owns the task: both mutations must succeed.
    storage
        .append_message(TENANT, TASK_ID, "alice", make_message("m-ok", "yes"))
        .await
        .unwrap();
    storage
        .append_artifact(TENANT, TASK_ID, "alice", make_artifact("a-ok", "yes"), false, false)
        .await
        .unwrap();
}
/// Parity check for chunked artifact appends.
///
/// Re-sending the same artifact id with the first flag set to `true` must
/// extend the existing artifact's parts; a different artifact id must create a
/// second artifact.
// NOTE(review): the two boolean flags are presumably `append` and
// `last_chunk` — confirm against the `A2aTaskStorage::append_artifact` signature.
pub async fn test_artifact_chunk_semantics(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner";
    const TASK_ID: &str = "acs-1";

    storage
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx"))
        .await
        .unwrap();

    // First chunk creates the artifact with one part.
    storage
        .append_artifact(TENANT, TASK_ID, OWNER, make_artifact("art-1", "chunk-1"), false, false)
        .await
        .unwrap();
    let snapshot = storage
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(snapshot.artifacts().len(), 1);
    assert_eq!(snapshot.artifacts()[0].parts.len(), 1);

    // Second chunk, first flag = true: same artifact, one more part.
    storage
        .append_artifact(TENANT, TASK_ID, OWNER, make_artifact("art-1", "chunk-2"), true, false)
        .await
        .unwrap();
    let snapshot = storage
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(snapshot.artifacts().len(), 1, "should still be 1 artifact");
    assert_eq!(snapshot.artifacts()[0].parts.len(), 2, "should have 2 parts after append");

    // Third chunk, both flags = true: parts still accumulate.
    storage
        .append_artifact(TENANT, TASK_ID, OWNER, make_artifact("art-1", "chunk-3"), true, true)
        .await
        .unwrap();
    let snapshot = storage
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(snapshot.artifacts()[0].parts.len(), 3, "should have 3 parts total");

    // A new artifact id is stored alongside the first one.
    storage
        .append_artifact(TENANT, TASK_ID, OWNER, make_artifact("art-2", "separate"), false, true)
        .await
        .unwrap();
    let snapshot = storage
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(snapshot.artifacts().len(), 2, "should have 2 distinct artifacts");
}
/// Parity check that `task_count` tracks creations and deletions, measured
/// relative to the count observed when the test starts.
pub async fn test_task_count(storage: &dyn A2aTaskStorage) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner";

    let baseline = storage.task_count().await.unwrap();

    for task_id in ["tc-1", "tc-2"] {
        storage
            .create_task(TENANT, OWNER, make_task(task_id, "ctx"))
            .await
            .unwrap();
    }
    let after_create = storage.task_count().await.unwrap();
    assert_eq!(after_create, baseline + 2);

    storage.delete_task(TENANT, "tc-1", OWNER).await.unwrap();
    let after_delete = storage.task_count().await.unwrap();
    assert_eq!(after_delete, baseline + 1);
}
/// Parity check for push-notification configs: creating a config with an
/// empty id must yield a generated id, the config must be retrievable by that
/// id, and an unknown id must yield `None`.
pub async fn test_push_config_crud(storage: &dyn A2aPushNotificationStorage) {
    const TENANT: &str = "default";
    const TASK_ID: &str = "task-1";

    let request = turul_a2a_proto::TaskPushNotificationConfig {
        tenant: String::new(),
        // Left empty: the assertion below expects the store to fill it in.
        id: String::new(),
        task_id: String::from(TASK_ID),
        url: String::from("https://example.com/webhook"),
        token: String::from("tok-123"),
        authentication: None,
    };

    let created = storage.create_config(TENANT, request).await.unwrap();
    assert!(!created.id.is_empty(), "server should generate config id");
    assert_eq!(created.task_id, TASK_ID);

    let found = storage
        .get_config(TENANT, TASK_ID, &created.id)
        .await
        .unwrap();
    assert!(found.is_some());

    let absent = storage.get_config(TENANT, TASK_ID, "nope").await.unwrap();
    assert!(absent.is_none());
}
/// Parity check that deleting a push config twice succeeds both times and
/// that the config is gone afterwards.
pub async fn test_push_config_idempotent_delete(storage: &dyn A2aPushNotificationStorage) {
    const TENANT: &str = "default";
    const TASK_ID: &str = "task-del";

    let request = turul_a2a_proto::TaskPushNotificationConfig {
        tenant: String::new(),
        id: String::new(),
        task_id: String::from(TASK_ID),
        url: String::from("https://example.com/hook"),
        token: String::new(),
        authentication: None,
    };
    let created = storage.create_config(TENANT, request).await.unwrap();

    // The second delete targets an already-removed config and must not error.
    for _ in 0..2 {
        storage
            .delete_config(TENANT, TASK_ID, &created.id)
            .await
            .unwrap();
    }

    let remaining = storage
        .get_config(TENANT, TASK_ID, &created.id)
        .await
        .unwrap();
    assert!(remaining.is_none());
}
/// Parity check for `list_configs` pagination: five configs listed with a
/// page size of two must each be returned exactly once across the pages.
///
/// The paging loop is bounded so that a backend which never returns an empty
/// `next_page_token` fails the test instead of hanging it.
pub async fn test_push_config_list_pagination(storage: &dyn A2aPushNotificationStorage) {
    // Five configs need at most five non-empty pages plus one trailing empty
    // page; anything beyond that means the token is not making progress.
    const MAX_PAGES: usize = 6;

    for i in 0..5 {
        let config = turul_a2a_proto::TaskPushNotificationConfig {
            tenant: String::new(),
            id: String::new(),
            task_id: "task-pg".to_string(),
            url: format!("https://example.com/hook-{i}"),
            token: String::new(),
            authentication: None,
        };
        storage.create_config("default", config).await.unwrap();
    }

    let mut all_ids = Vec::new();
    let mut page_token = None;
    let mut pages_fetched = 0usize;
    loop {
        pages_fetched += 1;
        assert!(
            pages_fetched <= MAX_PAGES,
            "pagination did not terminate within {MAX_PAGES} pages"
        );

        let page = storage
            .list_configs("default", "task-pg", page_token.as_deref(), Some(2))
            .await
            .unwrap();
        assert!(page.configs.len() <= 2);
        all_ids.extend(page.configs.iter().map(|c| c.id.clone()));

        // An empty token marks the last page.
        if page.next_page_token.is_empty() {
            break;
        }
        page_token = Some(page.next_page_token);
    }

    assert_eq!(all_ids.len(), 5, "should collect all 5 configs across pages");
    let unique: std::collections::HashSet<_> = all_ids.iter().collect();
    assert_eq!(unique.len(), 5, "no duplicate configs");
}
/// Parity check that a push config created under one tenant is not visible
/// when looked up under another tenant.
pub async fn test_push_config_tenant_isolation(storage: &dyn A2aPushNotificationStorage) {
    const TASK_ID: &str = "task-iso";

    let request = turul_a2a_proto::TaskPushNotificationConfig {
        tenant: String::new(),
        id: String::new(),
        task_id: String::from(TASK_ID),
        url: String::from("https://example.com/hook"),
        token: String::new(),
        authentication: None,
    };
    let created = storage.create_config("tenant-a", request).await.unwrap();

    let seen_by_other_tenant = storage
        .get_config("tenant-b", TASK_ID, &created.id)
        .await
        .unwrap();
    assert!(seen_by_other_tenant.is_none());
}
/// Builds a status-update stream event with empty task/context ids and a
/// `{"state": <state>}` JSON status.
fn make_status_event(state: &str) -> StreamEvent {
    let payload = crate::streaming::StatusUpdatePayload {
        task_id: String::new(),
        context_id: String::new(),
        status: serde_json::json!({ "state": state }),
    };
    StreamEvent::StatusUpdate {
        status_update: payload,
    }
}
/// Parity check that appended events receive sequence numbers 1, 2, ... and
/// that `get_events_after` returns only events with a sequence strictly
/// greater than the given cursor.
pub async fn test_event_append_and_retrieve(storage: &dyn A2aEventStore) {
    const TENANT: &str = "default";
    const TASK_ID: &str = "evt-1";

    let first_seq = storage
        .append_event(TENANT, TASK_ID, make_status_event("WORKING"))
        .await
        .unwrap();
    assert_eq!(first_seq, 1);

    let second_seq = storage
        .append_event(TENANT, TASK_ID, make_status_event("COMPLETED"))
        .await
        .unwrap();
    assert_eq!(second_seq, 2);

    // Cursor 0: both events.
    let from_start = storage.get_events_after(TENANT, TASK_ID, 0).await.unwrap();
    assert_eq!(from_start.len(), 2);
    assert_eq!(from_start[0].0, 1);
    assert_eq!(from_start[1].0, 2);

    // Cursor 1: only the second event.
    let after_first = storage.get_events_after(TENANT, TASK_ID, 1).await.unwrap();
    assert_eq!(after_first.len(), 1);
    assert_eq!(after_first[0].0, 2);

    // Cursor at the latest sequence: nothing left.
    let after_last = storage.get_events_after(TENANT, TASK_ID, 2).await.unwrap();
    assert!(after_last.is_empty());
}
/// Parity check that five consecutive appends to one task are assigned
/// sequences 1 through 5 and are read back in that order.
///
/// The number of events read back is asserted explicitly, so a store that
/// returns nothing cannot pass the ordering loop vacuously.
pub async fn test_event_monotonic_ordering(storage: &dyn A2aEventStore) {
    for expected_seq in 1..=5u64 {
        let seq = storage
            .append_event(
                "default",
                "ord-1",
                make_status_event(&format!("state-{expected_seq}")),
            )
            .await
            .unwrap();
        assert_eq!(seq, expected_seq);
    }

    let events = storage
        .get_events_after("default", "ord-1", 0)
        .await
        .unwrap();
    assert_eq!(events.len(), 5, "all 5 appended events should be returned");

    // Stored order must match append order: 1, 2, 3, 4, 5.
    for (expected_seq, (seq, _)) in (1u64..).zip(events.iter()) {
        assert_eq!(*seq, expected_seq);
    }
}
/// Parity check that event streams are kept per task: each task sees only its
/// own events and its own sequence numbering starts at 1.
pub async fn test_event_per_task_isolation(storage: &dyn A2aEventStore) {
    const TENANT: &str = "default";

    for (task_id, state) in [("iso-a", "A1"), ("iso-a", "A2"), ("iso-b", "B1")] {
        storage
            .append_event(TENANT, task_id, make_status_event(state))
            .await
            .unwrap();
    }

    let events_a = storage.get_events_after(TENANT, "iso-a", 0).await.unwrap();
    assert_eq!(events_a.len(), 2);
    let events_b = storage.get_events_after(TENANT, "iso-b", 0).await.unwrap();
    assert_eq!(events_b.len(), 1);

    // Each task's first event carries sequence 1.
    assert_eq!(events_a[0].0, 1);
    assert_eq!(events_b[0].0, 1);
}
/// Parity check that the same task id under two tenants yields two separate
/// event streams, each holding exactly its own single event.
pub async fn test_event_tenant_isolation(storage: &dyn A2aEventStore) {
    const TASK_ID: &str = "iso-t";

    for (tenant, state) in [("tenant-x", "X"), ("tenant-y", "Y")] {
        storage
            .append_event(tenant, TASK_ID, make_status_event(state))
            .await
            .unwrap();
    }

    let events_x = storage.get_events_after("tenant-x", TASK_ID, 0).await.unwrap();
    assert_eq!(events_x.len(), 1);
    let events_y = storage.get_events_after("tenant-y", TASK_ID, 0).await.unwrap();
    assert_eq!(events_y.len(), 1);
}
/// Parity check that `latest_sequence` is 0 before any event is appended and
/// tracks the number of appended events afterwards.
pub async fn test_event_latest_sequence(storage: &dyn A2aEventStore) {
    const TENANT: &str = "default";
    const TASK_ID: &str = "seq-t";

    let before_any = storage.latest_sequence(TENANT, TASK_ID).await.unwrap();
    assert_eq!(before_any, 0);

    storage
        .append_event(TENANT, TASK_ID, make_status_event("S1"))
        .await
        .unwrap();
    let after_one = storage.latest_sequence(TENANT, TASK_ID).await.unwrap();
    assert_eq!(after_one, 1);

    for state in ["S2", "S3"] {
        storage
            .append_event(TENANT, TASK_ID, make_status_event(state))
            .await
            .unwrap();
    }
    let after_three = storage.latest_sequence(TENANT, TASK_ID).await.unwrap();
    assert_eq!(after_three, 3);
}
/// Parity check that a task with no recorded events yields an empty event
/// list and a latest sequence of 0.
pub async fn test_event_empty_task(storage: &dyn A2aEventStore) {
    const TENANT: &str = "default";
    const TASK_ID: &str = "nonexistent";

    let recorded = storage.get_events_after(TENANT, TASK_ID, 0).await.unwrap();
    assert!(recorded.is_empty());

    let latest = storage.latest_sequence(TENANT, TASK_ID).await.unwrap();
    assert_eq!(latest, 0);
}
/// Builds a status-update stream event for the given task and context ids
/// with a `{"state": <state>}` JSON status.
fn make_status_event_for(task_id: &str, context_id: &str, state: &str) -> StreamEvent {
    let payload = crate::streaming::StatusUpdatePayload {
        task_id: String::from(task_id),
        context_id: String::from(context_id),
        status: serde_json::json!({ "state": state }),
    };
    StreamEvent::StatusUpdate {
        status_update: payload,
    }
}
/// Parity check that `create_task_with_events` persists both the task and its
/// initial event, with the event receiving sequence 1.
pub async fn test_atomic_create_task_with_events(
    atomic: &dyn A2aAtomicStore,
    tasks: &dyn A2aTaskStorage,
    events: &dyn A2aEventStore,
) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-1";
    const TASK_ID: &str = "at-create-1";

    let initial_events = vec![make_status_event_for(TASK_ID, "ctx-1", "TASK_STATE_SUBMITTED")];
    let (created, seqs) = atomic
        .create_task_with_events(TENANT, OWNER, make_task(TASK_ID, "ctx-1"), initial_events)
        .await
        .unwrap();
    assert_eq!(created.id(), TASK_ID);
    assert_eq!(seqs.len(), 1);
    assert_eq!(seqs[0], 1);

    // The task is visible through the plain task store ...
    let stored_task = tasks
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .expect("Task should exist after atomic create");
    assert_eq!(stored_task.id(), TASK_ID);

    // ... and the event through the plain event store.
    let stored_events = events.get_events_after(TENANT, TASK_ID, 0).await.unwrap();
    assert_eq!(stored_events.len(), 1);
    assert_eq!(stored_events[0].0, 1);
}
/// Parity check that `update_task_status_with_events` applies the status
/// change and records the accompanying event.
pub async fn test_atomic_update_status_with_events(
    atomic: &dyn A2aAtomicStore,
    tasks: &dyn A2aTaskStorage,
    events: &dyn A2aEventStore,
) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-1";
    const TASK_ID: &str = "at-status-1";

    tasks
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx-1"))
        .await
        .unwrap();

    let status_events = vec![make_status_event_for(TASK_ID, "ctx-1", "TASK_STATE_WORKING")];
    let (updated, seqs) = atomic
        .update_task_status_with_events(
            TENANT,
            TASK_ID,
            OWNER,
            TaskStatus::new(TaskState::Working),
            status_events,
        )
        .await
        .unwrap();
    assert_eq!(seqs.len(), 1);
    assert_eq!(updated.status().unwrap().state().unwrap(), TaskState::Working);

    // The new state is also what the task store reports.
    let stored_task = tasks
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(stored_task.status().unwrap().state().unwrap(), TaskState::Working);

    let stored_events = events.get_events_after(TENANT, TASK_ID, 0).await.unwrap();
    assert_eq!(stored_events.len(), 1);
}
/// Parity check that an atomic status update out of a terminal state fails
/// and leaves both the task status and the event log untouched.
pub async fn test_atomic_status_rejects_invalid_transition(
    atomic: &dyn A2aAtomicStore,
    tasks: &dyn A2aTaskStorage,
    events: &dyn A2aEventStore,
) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-1";
    const TASK_ID: &str = "at-invalid-1";

    tasks
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx-1"))
        .await
        .unwrap();

    // Drive the task to Completed through the non-atomic API.
    for state in [TaskState::Working, TaskState::Completed] {
        tasks
            .update_task_status(TENANT, TASK_ID, OWNER, TaskStatus::new(state))
            .await
            .unwrap();
    }

    // Completed -> Working must be refused.
    let rejected_events = vec![make_status_event_for(TASK_ID, "ctx-1", "TASK_STATE_WORKING")];
    let result = atomic
        .update_task_status_with_events(
            TENANT,
            TASK_ID,
            OWNER,
            TaskStatus::new(TaskState::Working),
            rejected_events,
        )
        .await;
    assert!(result.is_err(), "Invalid transition should fail");

    let stored_task = tasks
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(stored_task.status().unwrap().state().unwrap(), TaskState::Completed);

    let stored_events = events.get_events_after(TENANT, TASK_ID, 0).await.unwrap();
    assert!(stored_events.is_empty(), "No events should be written on failed atomic op");
}
/// Parity check that `update_task_with_events` stores a fully replaced task
/// (status and artifacts) together with two events numbered 1 and 2.
pub async fn test_atomic_update_task_with_events(
    atomic: &dyn A2aAtomicStore,
    tasks: &dyn A2aTaskStorage,
    events: &dyn A2aEventStore,
) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-1";
    const TASK_ID: &str = "at-update-1";

    tasks
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx-1"))
        .await
        .unwrap();

    // Mutate a local copy of the stored task before writing it back.
    let mut working_copy = tasks
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    working_copy.set_status(TaskStatus::new(TaskState::Working));
    working_copy.push_text_artifact("art-1", "Result", "some output");
    working_copy.complete();

    let transition_events = vec![
        make_status_event_for(TASK_ID, "ctx-1", "TASK_STATE_WORKING"),
        make_status_event_for(TASK_ID, "ctx-1", "TASK_STATE_COMPLETED"),
    ];
    let seqs = atomic
        .update_task_with_events(TENANT, OWNER, working_copy, transition_events)
        .await
        .unwrap();
    assert_eq!(seqs.len(), 2);
    assert_eq!(seqs[0], 1);
    assert_eq!(seqs[1], 2);

    let stored_task = tasks
        .get_task(TENANT, TASK_ID, OWNER, None)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(stored_task.status().unwrap().state().unwrap(), TaskState::Completed);
    assert!(!stored_task.artifacts().is_empty());

    let stored_events = events.get_events_after(TENANT, TASK_ID, 0).await.unwrap();
    assert_eq!(stored_events.len(), 2);
}
/// Parity check that both atomic update entry points refuse to modify a task
/// when called with an owner other than the one that created it.
pub async fn test_atomic_owner_isolation(
    atomic: &dyn A2aAtomicStore,
    tasks: &dyn A2aTaskStorage,
) {
    const TENANT: &str = "default";
    const TASK_ID: &str = "at-owner-1";

    tasks
        .create_task(TENANT, "alice", make_task(TASK_ID, "ctx-1"))
        .await
        .unwrap();

    // Bob attempts a status update on Alice's task.
    let status_attempt = atomic
        .update_task_status_with_events(
            TENANT,
            TASK_ID,
            "bob",
            TaskStatus::new(TaskState::Working),
            vec![make_status_event_for(TASK_ID, "ctx-1", "TASK_STATE_WORKING")],
        )
        .await;
    assert!(status_attempt.is_err(), "Wrong owner should fail");

    // Bob attempts to overwrite Alice's task with one he built himself.
    let mut forged = make_task(TASK_ID, "ctx-1");
    forged.set_status(TaskStatus::new(TaskState::Working));
    let replace_attempt = atomic
        .update_task_with_events(
            TENANT,
            "bob",
            forged,
            vec![make_status_event_for(TASK_ID, "ctx-1", "TASK_STATE_WORKING")],
        )
        .await;
    assert!(replace_attempt.is_err(), "Wrong owner should fail for update_task_with_events");
}
/// Parity check that the same task id can be created atomically under two
/// tenants, each with its own task record and its own single-event stream,
/// and that a third tenant sees no events for that id.
pub async fn test_atomic_tenant_isolation(
    atomic: &dyn A2aAtomicStore,
    tasks: &dyn A2aTaskStorage,
    events: &dyn A2aEventStore,
) {
    const OWNER: &str = "owner-1";
    const TASK_ID: &str = "at-tenant-1";

    for (tenant, state) in [("tenant-a", "SUBMITTED_A"), ("tenant-b", "SUBMITTED_B")] {
        atomic
            .create_task_with_events(
                tenant,
                OWNER,
                make_task(TASK_ID, "ctx-1"),
                vec![make_status_event_for(TASK_ID, "ctx-1", state)],
            )
            .await
            .unwrap();
    }

    let events_a = events.get_events_after("tenant-a", TASK_ID, 0).await.unwrap();
    let events_b = events.get_events_after("tenant-b", TASK_ID, 0).await.unwrap();
    assert_eq!(events_a.len(), 1);
    assert_eq!(events_b.len(), 1);

    let task_a = tasks.get_task("tenant-a", TASK_ID, OWNER, None).await.unwrap();
    let task_b = tasks.get_task("tenant-b", TASK_ID, OWNER, None).await.unwrap();
    assert!(task_a.is_some());
    assert!(task_b.is_some());

    // NOTE(review): this repeats the tenant-a lookup above, so it adds no
    // cross-tenant coverage — confirm what it was meant to check.
    let cross = tasks.get_task("tenant-a", TASK_ID, OWNER, None).await.unwrap();
    assert!(cross.is_some());

    // A tenant that never created the task has no events for it.
    let unrelated_tenant_events = events.get_events_after("tenant-c", TASK_ID, 0).await.unwrap();
    assert!(unrelated_tenant_events.is_empty());
}
/// Parity check that `create_task_with_events` with an empty event list
/// still creates the task, returns no sequences, and stores no events.
pub async fn test_atomic_create_with_empty_events(
    atomic: &dyn A2aAtomicStore,
    tasks: &dyn A2aTaskStorage,
    events: &dyn A2aEventStore,
) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-1";
    const TASK_ID: &str = "at-empty-1";

    let (created, seqs) = atomic
        .create_task_with_events(TENANT, OWNER, make_task(TASK_ID, "ctx-1"), vec![])
        .await
        .unwrap();
    assert_eq!(created.id(), TASK_ID);
    assert!(seqs.is_empty());

    let stored_task = tasks.get_task(TENANT, TASK_ID, OWNER, None).await.unwrap();
    assert!(stored_task.is_some());

    let stored_events = events.get_events_after(TENANT, TASK_ID, 0).await.unwrap();
    assert!(stored_events.is_empty());
}
/// Parity check that atomic writes and plain `append_event` calls share one
/// sequence counter per task: atomic create -> 1, plain append -> 2, atomic
/// update -> 3.
pub async fn test_atomic_sequence_continuity(
    atomic: &dyn A2aAtomicStore,
    events: &dyn A2aEventStore,
) {
    const TENANT: &str = "default";
    const OWNER: &str = "owner-1";
    const TASK_ID: &str = "at-seq-1";

    // Step 1: atomic create with one event.
    let (_, create_seqs) = atomic
        .create_task_with_events(
            TENANT,
            OWNER,
            make_task(TASK_ID, "ctx-1"),
            vec![make_status_event_for(TASK_ID, "ctx-1", "SUBMITTED")],
        )
        .await
        .unwrap();
    assert_eq!(create_seqs, vec![1]);

    // Step 2: plain append through the event store.
    let appended_seq = events
        .append_event(TENANT, TASK_ID, make_status_event("WORKING"))
        .await
        .unwrap();
    assert_eq!(appended_seq, 2);

    // Step 3: atomic whole-task update with one event.
    let mut finished = make_task(TASK_ID, "ctx-1");
    finished.set_status(TaskStatus::new(TaskState::Working));
    finished.complete();
    let update_seqs = atomic
        .update_task_with_events(
            TENANT,
            OWNER,
            finished,
            vec![make_status_event_for(TASK_ID, "ctx-1", "COMPLETED")],
        )
        .await
        .unwrap();
    assert_eq!(update_seqs, vec![3]);

    let log = events.get_events_after(TENANT, TASK_ID, 0).await.unwrap();
    assert_eq!(log.len(), 3);
    assert_eq!(log[0].0, 1);
    assert_eq!(log[1].0, 2);
    assert_eq!(log[2].0, 3);
}
/// Parity check that concurrent writers never receive the same sequence number.
///
/// Spawns ten tasks that each read the stored task and write it back through
/// `update_task_with_events` (one event each), plus ten tasks that each call
/// `append_event` directly, all against the same task id. Afterwards every
/// returned sequence must be distinct, there must be twenty of them, and the
/// stored log must hold twenty events in strictly increasing sequence order.
pub async fn test_atomic_concurrent_sequence_integrity<S>(storage: std::sync::Arc<S>)
where
    S: A2aAtomicStore + A2aEventStore + A2aTaskStorage + Send + Sync + 'static,
{
    const TENANT: &str = "default";
    const OWNER: &str = "owner-1";
    const TASK_ID: &str = "at-conc-1";

    storage
        .create_task(TENANT, OWNER, make_task(TASK_ID, "ctx-1"))
        .await
        .unwrap();

    let total_atomic = 10usize;
    let total_non_atomic = 10usize;
    let expected_total = total_atomic + total_non_atomic;

    let mut workers = Vec::with_capacity(expected_total);

    // Atomic writers: read-modify-write of the whole task plus one event.
    workers.extend((0..total_atomic).map(|_| {
        let store = std::sync::Arc::clone(&storage);
        tokio::spawn(async move {
            let current = store
                .get_task(TENANT, TASK_ID, OWNER, None)
                .await
                .unwrap()
                .unwrap();
            let batch = vec![make_status_event("atomic")];
            store
                .update_task_with_events(TENANT, OWNER, current, batch)
                .await
                .unwrap()
        })
    }));

    // Plain writers: a single event appended straight to the event store.
    workers.extend((0..total_non_atomic).map(|_| {
        let store = std::sync::Arc::clone(&storage);
        tokio::spawn(async move {
            let seq = store
                .append_event(TENANT, TASK_ID, make_status_event("non-atomic"))
                .await
                .unwrap();
            vec![seq]
        })
    }));

    let mut all_seqs = Vec::new();
    for worker in workers {
        all_seqs.extend(worker.await.unwrap());
    }

    // Distinct count equals total count exactly when no sequence repeats.
    let distinct: std::collections::HashSet<_> = all_seqs.iter().collect();
    assert_eq!(
        distinct.len(),
        all_seqs.len(),
        "All sequences must be unique — found duplicates"
    );
    assert_eq!(
        all_seqs.len(),
        expected_total,
        "Expected {} events, got {}",
        expected_total,
        all_seqs.len()
    );

    let stored = storage
        .get_events_after(TENANT, TASK_ID, 0)
        .await
        .unwrap();
    assert_eq!(stored.len(), expected_total);
    for pair in stored.windows(2) {
        let (earlier, later) = (pair[0].0, pair[1].0);
        assert!(
            earlier < later,
            "Events must be monotonically ordered in store: {} >= {}",
            earlier,
            later,
        );
    }
}