use aion_store::StoreError;
pub const CREATE_EVENTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS events (
workflow_id TEXT NOT NULL,
seq INTEGER NOT NULL,
event BLOB NOT NULL,
recorded_at TEXT NOT NULL,
event_kind TEXT NOT NULL,
is_queryable_event INTEGER NOT NULL,
workflow_type TEXT,
child_workflow_id TEXT,
PRIMARY KEY (workflow_id, seq)
)";
pub const CREATE_EVENTS_PROJECTION_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_events_queryable_filter
ON events (is_queryable_event, workflow_id, seq, event_kind, workflow_type, recorded_at, child_workflow_id)";
pub const CREATE_TIMERS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS timers (
workflow_id TEXT NOT NULL,
timer_id TEXT NOT NULL,
fire_at TEXT NOT NULL,
PRIMARY KEY (workflow_id, timer_id)
)";
pub const CREATE_TIMERS_FIRE_AT_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_timers_fire_at
ON timers (fire_at)";
pub const CREATE_PACKAGES_TABLE: &str = "
CREATE TABLE IF NOT EXISTS packages (
workflow_type TEXT NOT NULL,
content_hash TEXT NOT NULL,
archive BLOB NOT NULL,
deployed_at TEXT NOT NULL,
PRIMARY KEY (workflow_type, content_hash)
)";
pub const CREATE_PACKAGE_ROUTES_TABLE: &str = "
CREATE TABLE IF NOT EXISTS package_routes (
workflow_type TEXT PRIMARY KEY,
content_hash TEXT NOT NULL
)";
pub const CREATE_VISIBILITY_TABLE: &str = "
CREATE TABLE IF NOT EXISTS visibility (
workflow_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
workflow_type TEXT NOT NULL,
status TEXT NOT NULL,
start_time TEXT NOT NULL,
close_time TEXT,
search_attributes TEXT NOT NULL CHECK (json_valid(search_attributes))
)";
pub const CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_workflow_type
ON visibility (workflow_type)";
pub const CREATE_VISIBILITY_STATUS_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_status
ON visibility (status)";
pub const CREATE_VISIBILITY_START_TIME_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_start_time
ON visibility (start_time)";
pub const CREATE_VISIBILITY_CLOSE_TIME_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_close_time
ON visibility (close_time)";
const DDL_STATEMENTS: [&str; 11] = [
CREATE_EVENTS_TABLE,
CREATE_EVENTS_PROJECTION_INDEX,
CREATE_TIMERS_TABLE,
CREATE_TIMERS_FIRE_AT_INDEX,
CREATE_PACKAGES_TABLE,
CREATE_PACKAGE_ROUTES_TABLE,
CREATE_VISIBILITY_TABLE,
CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX,
CREATE_VISIBILITY_STATUS_INDEX,
CREATE_VISIBILITY_START_TIME_INDEX,
CREATE_VISIBILITY_CLOSE_TIME_INDEX,
];
pub async fn ensure_schema(conn: &libsql::Connection) -> Result<(), StoreError> {
for statement in DDL_STATEMENTS {
conn.execute(statement, ())
.await
.map_err(|error| crate::error::libsql_error(&error))?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use aion_store::StoreError;
use super::ensure_schema;
use crate::config::{LibSqlConfig, LibSqlMode};
use crate::connection::open_connection;
#[tokio::test]
async fn ensure_schema_is_idempotent() -> Result<(), StoreError> {
let conn = open_test_connection("idempotent").await?;
ensure_schema(&conn).await?;
ensure_schema(&conn).await?;
Ok(())
}
#[tokio::test]
async fn ensure_schema_creates_tables_and_indexes() -> Result<(), StoreError> {
let conn = open_test_connection("objects").await?;
ensure_schema(&conn).await?;
assert_schema_object(&conn, "table", "events").await?;
assert_schema_object(&conn, "index", "sqlite_autoindex_events_1").await?;
assert_schema_object(&conn, "index", "idx_events_queryable_filter").await?;
assert_schema_object(&conn, "table", "timers").await?;
assert_schema_object(&conn, "index", "sqlite_autoindex_timers_1").await?;
assert_schema_object(&conn, "index", "idx_timers_fire_at").await?;
assert_schema_object(&conn, "table", "packages").await?;
assert_schema_object(&conn, "index", "sqlite_autoindex_packages_1").await?;
assert_schema_object(&conn, "table", "package_routes").await?;
assert_schema_object(&conn, "index", "sqlite_autoindex_package_routes_1").await?;
assert_schema_object(&conn, "table", "visibility").await?;
assert_schema_object(&conn, "index", "sqlite_autoindex_visibility_1").await?;
assert_schema_object(&conn, "index", "idx_visibility_workflow_type").await?;
assert_schema_object(&conn, "index", "idx_visibility_status").await?;
assert_schema_object(&conn, "index", "idx_visibility_start_time").await?;
assert_schema_object(&conn, "index", "idx_visibility_close_time").await?;
Ok(())
}
async fn open_test_connection(name: &str) -> Result<libsql::Connection, StoreError> {
let config = LibSqlConfig {
mode: LibSqlMode::Embedded {
path: unique_temp_path(name),
},
journal_mode: None,
synchronous: None,
sync_interval_seconds: None,
};
open_connection(&config)
.await
.map(|opened| opened.connection)
}
async fn assert_schema_object(
conn: &libsql::Connection,
object_type: &str,
name: &str,
) -> Result<(), StoreError> {
let mut rows = conn
.query(
"SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
(object_type, name),
)
.await
.map_err(|error| crate::error::libsql_error(&error))?;
let found = rows
.next()
.await
.map_err(|error| crate::error::libsql_error(&error))?
.is_some();
if found {
Ok(())
} else {
Err(StoreError::Backend(format!(
"schema object {object_type} {name} was not created"
)))
}
}
fn unique_temp_path(name: &str) -> PathBuf {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_nanos());
std::env::temp_dir().join(format!(
"aion-store-libsql-schema-{name}-{}-{nanos}.db",
std::process::id()
))
}
}