#![cfg(feature = "postgres")]
use a2a_protocol_server::push::{
PostgresPushConfigStore, PushConfigStore, TenantAwarePostgresPushConfigStore,
};
use a2a_protocol_server::store::tenant::TenantContext;
use a2a_protocol_server::store::{
PgMigrationRunner, PostgresTaskStore, TaskStore, TenantAwarePostgresTaskStore,
};
use a2a_protocol_types::error::A2aResult;
use a2a_protocol_types::params::ListTasksParams;
use a2a_protocol_types::push::TaskPushNotificationConfig;
use a2a_protocol_types::task::{ContextId, Task, TaskId, TaskState, TaskStatus};
/// Name of the environment variable holding the admin connection URL of the
/// PostgreSQL server these tests run against. `TestDb::create` panics when it
/// is not set.
const URL_ENV: &str = "A2A_TEST_POSTGRES_URL";
/// Handle to a per-test scratch database created by `TestDb::create` and
/// removed again by `TestDb::drop_db`.
struct TestDb {
    /// Admin connection URL read from the environment; used to issue the
    /// `CREATE DATABASE` / `DROP DATABASE` statements.
    admin_url: String,
    /// Generated name of the scratch database.
    name: String,
    /// Connection URL of the scratch database itself; handed to the stores
    /// under test.
    url: String,
}
impl TestDb {
    /// Creates a freshly named scratch database on the server addressed by
    /// the `A2A_TEST_POSTGRES_URL` environment variable.
    ///
    /// The database is named `a2a_test_{prefix}_{nanos}`, where `nanos` is the
    /// current time in nanoseconds since the Unix epoch.
    ///
    /// # Panics
    ///
    /// Panics if the environment variable is unset, if the admin URL does not
    /// end in a database name, if the system clock is before the Unix epoch,
    /// or if connecting to the server / creating the database fails.
    async fn create(prefix: &str) -> Self {
        let admin_url = std::env::var(URL_ENV).unwrap_or_else(|_| {
            panic!(
                "{URL_ENV} must point at a live PostgreSQL server \
                 (e.g. postgres://postgres:postgres@localhost:5432/postgres) \
                 to run the ignored postgres integration tests"
            )
        });
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("clock before unix epoch")
            .as_nanos();
        let name = format!("a2a_test_{prefix}_{nanos}");
        // Derive (and thereby validate) the scratch URL before touching the
        // server, so a malformed admin URL fails without side effects.
        let url = Self::scratch_url(&admin_url, &name);

        let admin = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect(&admin_url)
            .await
            .expect("connect to admin database");
        sqlx::query(&format!("CREATE DATABASE \"{name}\""))
            .execute(&admin)
            .await
            .expect("create scratch database");
        admin.close().await;

        Self {
            admin_url,
            name,
            url,
        }
    }

    /// Rewrites `admin_url` so that it addresses database `name` instead of
    /// the admin database, keeping any `?query` parameters intact.
    ///
    /// # Panics
    ///
    /// Panics if `admin_url` does not contain a database path after the
    /// `scheme://authority` part.
    fn scratch_url(admin_url: &str, name: &str) -> String {
        // Split the query string off first: parameter values such as
        // `sslrootcert=/path/ca.pem` may contain '/', which would otherwise
        // be mistaken for the database-path separator, and the parameters
        // must also apply to the scratch connection.
        let (location, query) = match admin_url.split_once('?') {
            Some((location, query)) => (location, Some(query)),
            None => (admin_url, None),
        };
        let (base, admin_db) = location
            .rsplit_once('/')
            .expect("admin URL must include a database path, e.g. .../postgres");
        // `base` must still contain the scheme separator; if it does not, the
        // split happened inside `://` and the URL has no database path at all
        // (e.g. `postgres://localhost:5432`).
        assert!(
            base.contains("://") && !admin_db.is_empty() && !admin_db.contains('@'),
            "admin URL must end in a database name, e.g. .../postgres"
        );
        match query {
            Some(query) => format!("{base}/{name}?{query}"),
            None => format!("{base}/{name}"),
        }
    }

    /// Drops the scratch database, consuming the handle.
    ///
    /// Uses `DROP DATABASE IF EXISTS ... WITH (FORCE)`, so the statement is
    /// issued even while the store under test may still hold connections.
    ///
    /// # Panics
    ///
    /// Panics if connecting to the admin database or dropping the scratch
    /// database fails.
    async fn drop_db(self) {
        let admin = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect(&self.admin_url)
            .await
            .expect("connect to admin database");
        sqlx::query(&format!(
            "DROP DATABASE IF EXISTS \"{}\" WITH (FORCE)",
            self.name
        ))
        .execute(&admin)
        .await
        .expect("drop scratch database");
        admin.close().await;
    }
}
/// Builds a bare [`Task`] fixture with the given ids, a `Submitted` status,
/// and no artifacts, history, or metadata.
fn make_task(id: &str, context_id: &str) -> Task {
    let task_id = TaskId(id.to_owned());
    let ctx = ContextId(context_id.to_owned());
    let status = TaskStatus::new(TaskState::Submitted);
    Task {
        id: task_id,
        context_id: ctx,
        status,
        artifacts: None,
        history: None,
        metadata: None,
    }
}
/// Builds a push-notification config fixture for `task_id` with a fixed
/// example URL and token; `id`, `tenant`, and `authentication` are left unset.
fn make_push_config(task_id: &str) -> TaskPushNotificationConfig {
    let endpoint = String::from("https://example.com/push");
    let token = Some(String::from("tok"));
    TaskPushNotificationConfig {
        task_id: task_id.to_owned(),
        id: None,
        tenant: None,
        url: endpoint,
        token,
        authentication: None,
    }
}
/// A saved task can be read back with the same id, context id, and state.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn task_save_and_get() -> A2aResult<()> {
    let db = TestDb::create("save_get").await;
    // Run the fallible body in its own async block so that a store error
    // propagated by `?` cannot return before the scratch database is dropped.
    // (A failed assertion still panics past the cleanup.)
    let outcome: A2aResult<()> = async {
        let store = PostgresTaskStore::with_migrations(&db.url)
            .await
            .expect("open postgres store");
        let task = make_task("t1", "ctx1");
        store.save(&task).await?;
        let got = store
            .get(&TaskId("t1".into()))
            .await?
            .expect("saved task should be retrievable");
        assert_eq!(got.id.0, "t1");
        assert_eq!(got.context_id.0, "ctx1");
        assert_eq!(got.status.state, TaskState::Submitted);
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// Looking up an id that was never saved yields `None`.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn task_get_missing() -> A2aResult<()> {
    let db = TestDb::create("get_missing").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresTaskStore::with_migrations(&db.url)
            .await
            .expect("open postgres store");
        assert!(store.get(&TaskId("nope".into())).await?.is_none());
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// Saving a task twice under the same id overwrites the stored state.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn task_save_upsert() -> A2aResult<()> {
    let db = TestDb::create("upsert").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresTaskStore::with_migrations(&db.url)
            .await
            .expect("open postgres store");
        let mut task = make_task("t1", "ctx1");
        store.save(&task).await?;
        task.status = TaskStatus::new(TaskState::Working);
        store.save(&task).await?;
        let got = store
            .get(&TaskId("t1".into()))
            .await?
            .expect("task saved above should exist");
        assert_eq!(got.status.state, TaskState::Working);
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// `insert_if_absent` reports `true` for the first insert of a task and
/// `false` when the same task is inserted again.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn task_insert_if_absent() -> A2aResult<()> {
    let db = TestDb::create("insert_absent").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresTaskStore::with_migrations(&db.url)
            .await
            .expect("open postgres store");
        let task = make_task("t1", "ctx1");
        assert!(store.insert_if_absent(&task).await?);
        assert!(!store.insert_if_absent(&task).await?);
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// A deleted task can no longer be fetched.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn task_delete() -> A2aResult<()> {
    let db = TestDb::create("delete").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresTaskStore::with_migrations(&db.url)
            .await
            .expect("open postgres store");
        let id = TaskId("t1".into());
        store.save(&make_task("t1", "ctx1")).await?;
        store.delete(&id).await?;
        assert!(store.get(&id).await?.is_none());
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// `count` is 0 on a fresh store and reflects the number of saved tasks.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn task_count() -> A2aResult<()> {
    let db = TestDb::create("count").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresTaskStore::with_migrations(&db.url)
            .await
            .expect("open postgres store");
        assert_eq!(store.count().await?, 0);
        store.save(&make_task("t1", "ctx1")).await?;
        store.save(&make_task("t2", "ctx1")).await?;
        assert_eq!(store.count().await?, 2);
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// `list` returns every task by default and only the matching tasks when a
/// `context_id` filter is supplied.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn task_list_basic() -> A2aResult<()> {
    let db = TestDb::create("list_basic").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresTaskStore::with_migrations(&db.url)
            .await
            .expect("open postgres store");
        store.save(&make_task("a", "ctx1")).await?;
        store.save(&make_task("b", "ctx1")).await?;
        store.save(&make_task("c", "ctx2")).await?;

        let all = store.list(&ListTasksParams::default()).await?;
        assert_eq!(all.tasks.len(), 3);

        let filtered = store
            .list(&ListTasksParams {
                context_id: Some("ctx1".into()),
                ..Default::default()
            })
            .await?;
        assert_eq!(filtered.tasks.len(), 2);
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// With five tasks stored and a page size of two, the first page holds two
/// tasks plus a non-empty continuation token, and following that token yields
/// another two tasks.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn task_list_pagination() -> A2aResult<()> {
    let db = TestDb::create("pagination").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresTaskStore::with_migrations(&db.url)
            .await
            .expect("open postgres store");
        for i in 0..5 {
            store.save(&make_task(&format!("t{i:02}"), "ctx")).await?;
        }

        let page1 = store
            .list(&ListTasksParams {
                page_size: Some(2),
                ..Default::default()
            })
            .await?;
        assert_eq!(page1.tasks.len(), 2);
        assert!(!page1.next_page_token.is_empty());

        let page2 = store
            .list(&ListTasksParams {
                page_size: Some(2),
                page_token: Some(page1.next_page_token),
                ..Default::default()
            })
            .await?;
        assert_eq!(page2.tasks.len(), 2);
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// Runs the migration runner against an empty database: it starts at version
/// 0 with two pending migrations, applies them as `[1, 2]`, applies nothing on
/// a second run, and leaves a schema the task store can save to and read from.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn migrations_apply_in_order_and_are_idempotent() {
    let db = TestDb::create("migrations").await;
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(2)
        .connect(&db.url)
        .await
        .expect("connect to scratch database");
    let runner = PgMigrationRunner::new(pool.clone());

    // Before anything runs: version 0, everything pending.
    let initial_version = runner.current_version().await.expect("current_version");
    assert_eq!(initial_version, 0, "fresh database starts at version 0");
    let pending = runner
        .pending_migrations()
        .await
        .expect("pending_migrations");
    assert_eq!(
        pending.len(),
        2,
        "both built-in migrations should be pending"
    );

    // First run applies both migrations, in order.
    let first_run = runner.run_pending().await.expect("run_pending");
    assert_eq!(first_run, vec![1, 2], "migrations apply in version order");
    let version_after = runner.current_version().await.expect("current_version");
    assert_eq!(version_after, 2);

    // Second run has nothing left to do.
    let second_run = runner.run_pending().await.expect("run_pending again");
    assert!(second_run.is_empty(), "second run applies nothing");

    // The migrated schema is usable by the task store.
    let store = PostgresTaskStore::from_pool(pool)
        .await
        .expect("store from migrated pool");
    let task = make_task("t1", "ctx1");
    store.save(&task).await.expect("save on migrated schema");
    let fetched = store
        .get(&TaskId("t1".into()))
        .await
        .expect("get on migrated schema");
    assert!(fetched.is_some());

    db.drop_db().await;
}
/// Exercises the push-config store end to end: `set` assigns an id, `get`
/// finds the config by task id and config id, `list` is scoped per task, and
/// `delete` removes the config.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn push_set_get_list_delete() -> A2aResult<()> {
    let db = TestDb::create("push_crud").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresPushConfigStore::new(&db.url)
            .await
            .expect("open postgres push store");

        let config = store.set(make_push_config("t1")).await?;
        let id = config.id.expect("id auto-generated");

        let got = store
            .get("t1", &id)
            .await?
            .expect("stored push config should be retrievable");
        assert_eq!(got.task_id, "t1");
        assert!(store.get("t1", "nope").await?.is_none());

        store.set(make_push_config("t1")).await?;
        store.set(make_push_config("t2")).await?;
        assert_eq!(store.list("t1").await?.len(), 2);
        assert_eq!(store.list("t2").await?.len(), 1);

        store.delete("t1", &id).await?;
        assert!(store.get("t1", &id).await?.is_none());
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// Setting a config twice with the same explicit id replaces it: the task
/// ends up with one config carrying the second URL.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn push_upsert() -> A2aResult<()> {
    let db = TestDb::create("push_upsert").await;
    // The body runs in an async block so that an error propagated by `?`
    // still reaches the cleanup below instead of leaking the scratch database.
    let outcome: A2aResult<()> = async {
        let store = PostgresPushConfigStore::new(&db.url)
            .await
            .expect("open postgres push store");
        let mut config = make_push_config("t1");
        config.id = Some("fixed-id".into());
        store.set(config.clone()).await?;
        config.url = "https://example.com/v2".to_string();
        store.set(config).await?;

        let configs = store.list("t1").await?;
        assert_eq!(configs.len(), 1);
        assert_eq!(configs[0].url, "https://example.com/v2");
        Ok(())
    }
    .await;
    db.drop_db().await;
    outcome
}
/// A task saved while scoped to tenant `acme` is visible to `acme` but is
/// neither fetchable nor listed while scoped to tenant `globex`.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn tenant_task_store_isolates_tenants() {
    let db = TestDb::create("tenant_tasks").await;
    let store = TenantAwarePostgresTaskStore::new(&db.url)
        .await
        .expect("open tenant postgres store");
    let task_id = TaskId("t1".into());

    // Tenant "acme" writes the task and can read it back.
    let as_acme = async {
        let task = make_task("t1", "ctx1");
        store.save(&task).await.expect("save under acme");
        let found = store.get(&task_id).await.expect("get under acme");
        assert!(found.is_some());
    };
    TenantContext::scope("acme", as_acme).await;

    // Tenant "globex" must see nothing of it.
    let as_globex = async {
        let found = store.get(&task_id).await.expect("get under globex");
        assert!(found.is_none(), "tenant globex must not see acme's task");
        let listing = store
            .list(&ListTasksParams::default())
            .await
            .expect("list under globex");
        assert!(listing.tasks.is_empty(), "tenant globex must list no tasks");
    };
    TenantContext::scope("globex", as_globex).await;

    db.drop_db().await;
}
/// A push config set while scoped to tenant `acme` is visible to `acme` but
/// is neither fetchable nor listed while scoped to tenant `globex`.
#[tokio::test]
#[ignore = "requires a live PostgreSQL server (set A2A_TEST_POSTGRES_URL)"]
async fn tenant_push_store_isolates_tenants() {
    let db = TestDb::create("tenant_push").await;
    let store = TenantAwarePostgresPushConfigStore::new(&db.url)
        .await
        .expect("open tenant postgres push store");

    // Tenant "acme" stores a config and hands back its generated id.
    let as_acme = async {
        let saved = store
            .set(make_push_config("task-1"))
            .await
            .expect("set under acme");
        let config_id = saved.id.expect("id auto-generated");
        let found = store
            .get("task-1", &config_id)
            .await
            .expect("get under acme");
        assert!(found.is_some());
        config_id
    };
    let id = TenantContext::scope("acme", as_acme).await;

    // Tenant "globex" must not be able to reach it by id or by listing.
    let as_globex = async {
        let found = store.get("task-1", &id).await.expect("get under globex");
        assert!(
            found.is_none(),
            "tenant globex must not see acme's push config"
        );
        let listed = store.list("task-1").await.expect("list under globex");
        assert!(listed.is_empty(), "tenant globex must list no push configs");
    };
    TenantContext::scope("globex", as_globex).await;

    db.drop_db().await;
}