use std::sync::Arc;
use lash_core::testing::conformance::{
ReopenableHostEventStore, ReopenableLashlangArtifactStore, ReopenableProcessRegistry,
ReopenableRuntimePersistence,
};
use lash_core::{DurabilityTier, HostEventStore, ProcessRegistry, RuntimePersistence};
use lash_postgres_store::PostgresStorage;
fn database_url() -> Option<String> {
std::env::var("LASH_POSTGRES_DATABASE_URL").ok()
}
fn sync_await<T: Send + 'static>(
future: impl std::future::Future<Output = T> + Send + 'static,
) -> T {
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(future))
}
async fn storage() -> Option<PostgresStorage> {
let url = database_url()?;
Some(
PostgresStorage::connect(&url)
.await
.expect("connect postgres"),
)
}
async fn reset(storage: &PostgresStorage) {
let pool = storage.pool();
sqlx::query(
r#"
TRUNCATE
lash_host_event_deliveries,
lash_host_event_occurrences,
lash_host_event_trigger_subscriptions,
lash_process_wake_acks,
lash_process_handle_grants,
lash_process_leases,
lash_process_events,
lash_processes,
lash_queued_work_items,
lash_queued_work_batches,
lash_runtime_turn_commits,
lash_session_meta,
lash_usage_deltas,
lash_graph_nodes,
lash_sessions,
lash_attachment_manifest,
lash_lashlang_artifacts,
lash_blobs
RESTART IDENTITY CASCADE
"#,
)
.execute(pool)
.await
.expect("reset postgres conformance tables");
sqlx::query("ALTER SEQUENCE lash_host_event_subscription_seq RESTART WITH 1")
.execute(pool)
.await
.expect("reset postgres host-event subscription sequence");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn postgres_runtime_persistence_satisfies_conformance_when_configured() {
let Some(storage) = storage().await else {
eprintln!("skipping Postgres conformance: LASH_POSTGRES_DATABASE_URL is not set");
return;
};
let storage = Arc::new(storage);
lash_core::testing::conformance::runtime_persistence_reopenable(|| {
let storage = Arc::clone(&storage);
sync_await(async move {
reset(&storage).await;
let open = Arc::new(storage.unbound_session_store()) as Arc<dyn RuntimePersistence>;
let reopen = Arc::new(storage.unbound_session_store()) as Arc<dyn RuntimePersistence>;
ReopenableRuntimePersistence { open, reopen }
})
})
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn postgres_process_registry_satisfies_conformance_when_configured() {
let Some(storage) = storage().await else {
eprintln!("skipping Postgres process conformance: LASH_POSTGRES_DATABASE_URL is not set");
return;
};
let storage = Arc::new(storage);
lash_core::testing::conformance::process_registry_reopenable(|| {
let storage = Arc::clone(&storage);
sync_await(async move {
reset(&storage).await;
let open = Arc::new(storage.process_registry()) as Arc<dyn ProcessRegistry>;
let reopen = Arc::new(storage.process_registry()) as Arc<dyn ProcessRegistry>;
ReopenableProcessRegistry { open, reopen }
})
})
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn postgres_host_event_store_satisfies_conformance_when_configured() {
let Some(storage) = storage().await else {
eprintln!(
"skipping Postgres host-event conformance: LASH_POSTGRES_DATABASE_URL is not set"
);
return;
};
let storage = Arc::new(storage);
lash_core::testing::conformance::host_event_store_reopenable(
|| {
let storage = Arc::clone(&storage);
sync_await(async move {
reset(&storage).await;
let open = Arc::new(storage.host_event_store()) as Arc<dyn HostEventStore>;
let reopen = Arc::new(storage.host_event_store()) as Arc<dyn HostEventStore>;
ReopenableHostEventStore { open, reopen }
})
},
DurabilityTier::Durable,
)
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn postgres_lashlang_artifact_store_satisfies_conformance_when_configured() {
let Some(storage) = storage().await else {
eprintln!("skipping Postgres artifact conformance: LASH_POSTGRES_DATABASE_URL is not set");
return;
};
let storage = Arc::new(storage);
lash_core::testing::conformance::lashlang_artifact_store_reopenable(
|| {
let storage = Arc::clone(&storage);
sync_await(async move {
reset(&storage).await;
let open = Arc::new(storage.lashlang_artifact_store())
as Arc<dyn lashlang::LashlangArtifactStore>;
let reopen = Arc::new(storage.lashlang_artifact_store())
as Arc<dyn lashlang::LashlangArtifactStore>;
ReopenableLashlangArtifactStore { open, reopen }
})
},
DurabilityTier::Durable,
)
.await;
}