Skip to main content

aion_store_libsql/
schema.rs

1//! Idempotent schema DDL for the libSQL event store.
2
3use aion_store::StoreError;
4
/// Append-only workflow event table.
///
/// Each row is identified by the composite primary key `(workflow_id, seq)`.
/// The serialized event lives in the `event` BLOB; the remaining columns are
/// scalar metadata stored alongside it. `workflow_type` and
/// `child_workflow_id` are the only nullable columns.
///
/// Append-only is a usage convention: nothing in this DDL prevents updates or
/// deletes. `IF NOT EXISTS` makes the statement safe to run repeatedly.
pub const CREATE_EVENTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS events (
    workflow_id TEXT NOT NULL,
    seq INTEGER NOT NULL,
    event BLOB NOT NULL,
    recorded_at TEXT NOT NULL,
    event_kind TEXT NOT NULL,
    is_queryable_event INTEGER NOT NULL,
    workflow_type TEXT,
    child_workflow_id TEXT,
    PRIMARY KEY (workflow_id, seq)
)";
18
/// Event index supporting lifecycle projection scans and filter subqueries.
///
/// The key leads with `is_queryable_event`, then `(workflow_id, seq)`, then
/// the remaining scalar metadata columns. Every column of `events` except the
/// `event` BLOB is part of the key.
// NOTE(review): presumably this lets filter queries be answered from the index
// without reading the BLOB payload — confirm against the query planner output.
pub const CREATE_EVENTS_PROJECTION_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_events_queryable_filter
ON events (is_queryable_event, workflow_id, seq, event_kind, workflow_type, recorded_at, child_workflow_id)";
23
/// Durable workflow timers table.
///
/// One row per `(workflow_id, timer_id)` pair (the composite primary key),
/// carrying the timer's `fire_at` value as TEXT.
// NOTE(review): `fire_at` is TEXT, so comparisons on it are string comparisons;
// presumably writers use a lexicographically sortable timestamp format —
// confirm against the code that inserts timers.
pub const CREATE_TIMERS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS timers (
    workflow_id TEXT NOT NULL,
    timer_id TEXT NOT NULL,
    fire_at TEXT NOT NULL,
    PRIMARY KEY (workflow_id, timer_id)
)";
32
/// Timer index supporting due-timer range scans.
///
/// Single-column index on `timers.fire_at`; `IF NOT EXISTS` makes the
/// statement safe to run repeatedly.
pub const CREATE_TIMERS_FIRE_AT_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_timers_fire_at
ON timers (fire_at)";
37
/// Runtime-deployed package archives keyed by `(workflow_type, content_hash)`.
///
/// The archive bytes are stored inline in the `archive` BLOB and every column
/// is `NOT NULL`. Because the primary key includes `content_hash`, one
/// workflow type can hold several archives at once.
pub const CREATE_PACKAGES_TABLE: &str = "
CREATE TABLE IF NOT EXISTS packages (
    workflow_type TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    archive BLOB NOT NULL,
    deployed_at TEXT NOT NULL,
    PRIMARY KEY (workflow_type, content_hash)
)";
47
/// Per-workflow-type route pointer for new workflow starts.
///
/// `workflow_type` is the primary key, so each workflow type has at most one
/// route row, and that row names a single `content_hash`.
// NOTE(review): no foreign key to `packages` is declared here, so nothing in
// this DDL guarantees the referenced `(workflow_type, content_hash)` exists.
pub const CREATE_PACKAGE_ROUTES_TABLE: &str = "
CREATE TABLE IF NOT EXISTS package_routes (
    workflow_type TEXT PRIMARY KEY,
    content_hash TEXT NOT NULL
)";
54
/// Workflow visibility projection table.
///
/// One row per `workflow_id` (the primary key). `close_time` is the only
/// nullable column. `search_attributes` is TEXT guarded by a
/// `CHECK (json_valid(...))` constraint, so writes carrying text that is not
/// valid JSON are rejected by the database.
pub const CREATE_VISIBILITY_TABLE: &str = "
CREATE TABLE IF NOT EXISTS visibility (
    workflow_id TEXT PRIMARY KEY,
    run_id TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    status TEXT NOT NULL,
    start_time TEXT NOT NULL,
    close_time TEXT,
    search_attributes TEXT NOT NULL CHECK (json_valid(search_attributes))
)";
66
/// Visibility index supporting workflow-type equality filters.
///
/// Single-column index on `visibility.workflow_type`; `IF NOT EXISTS` makes
/// the statement safe to run repeatedly.
pub const CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_workflow_type
ON visibility (workflow_type)";
71
/// Visibility index supporting status equality filters.
///
/// Single-column index on `visibility.status`; `IF NOT EXISTS` makes the
/// statement safe to run repeatedly.
pub const CREATE_VISIBILITY_STATUS_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_status
ON visibility (status)";
76
/// Visibility index supporting start-time range filters and ordering.
///
/// Single-column index on `visibility.start_time`; `IF NOT EXISTS` makes the
/// statement safe to run repeatedly.
// NOTE(review): `start_time` is TEXT, so range filters and ordering compare
// strings; presumably a sortable timestamp format is used — confirm.
pub const CREATE_VISIBILITY_START_TIME_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_start_time
ON visibility (start_time)";
81
/// Visibility index supporting close-time range filters.
///
/// Single-column index on `visibility.close_time`, the table's only nullable
/// column; `IF NOT EXISTS` makes the statement safe to run repeatedly.
pub const CREATE_VISIBILITY_CLOSE_TIME_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_close_time
ON visibility (close_time)";
86
/// Every DDL statement `ensure_schema` executes, in execution order.
///
/// Each `CREATE TABLE` is listed before the `CREATE INDEX` statements that
/// target it, because an index can only be created on a table that already
/// exists. The array length is spelled out, so adding or removing an entry
/// without updating the count is a compile error.
const DDL_STATEMENTS: [&str; 11] = [
    CREATE_EVENTS_TABLE,
    CREATE_EVENTS_PROJECTION_INDEX,
    CREATE_TIMERS_TABLE,
    CREATE_TIMERS_FIRE_AT_INDEX,
    CREATE_PACKAGES_TABLE,
    CREATE_PACKAGE_ROUTES_TABLE,
    CREATE_VISIBILITY_TABLE,
    CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX,
    CREATE_VISIBILITY_STATUS_INDEX,
    CREATE_VISIBILITY_START_TIME_INDEX,
    CREATE_VISIBILITY_CLOSE_TIME_INDEX,
];
100
101/// Ensure the libSQL schema exists on a fresh or previously-created database.
102///
103/// # Errors
104///
105/// Returns `StoreError::Backend` when any idempotent DDL statement fails at the libSQL boundary.
106pub async fn ensure_schema(conn: &libsql::Connection) -> Result<(), StoreError> {
107    for statement in DDL_STATEMENTS {
108        conn.execute(statement, ())
109            .await
110            .map_err(|error| crate::error::libsql_error(&error))?;
111    }
112
113    Ok(())
114}
115
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use aion_store::StoreError;

    use super::ensure_schema;
    use crate::config::{LibSqlConfig, LibSqlMode};
    use crate::connection::open_connection;

    /// `(type, name)` pairs expected in `sqlite_master` once the schema exists.
    ///
    /// The `sqlite_autoindex_*` names are the indexes the database generates
    /// itself for each table's `PRIMARY KEY`; the `idx_*` names come from the
    /// explicit `CREATE INDEX` statements.
    const EXPECTED_SCHEMA_OBJECTS: [(&str, &str); 16] = [
        ("table", "events"),
        ("index", "sqlite_autoindex_events_1"),
        ("index", "idx_events_queryable_filter"),
        ("table", "timers"),
        ("index", "sqlite_autoindex_timers_1"),
        ("index", "idx_timers_fire_at"),
        ("table", "packages"),
        ("index", "sqlite_autoindex_packages_1"),
        ("table", "package_routes"),
        ("index", "sqlite_autoindex_package_routes_1"),
        ("table", "visibility"),
        ("index", "sqlite_autoindex_visibility_1"),
        ("index", "idx_visibility_workflow_type"),
        ("index", "idx_visibility_status"),
        ("index", "idx_visibility_start_time"),
        ("index", "idx_visibility_close_time"),
    ];

    /// Running the DDL twice on the same connection must succeed both times.
    #[tokio::test]
    async fn ensure_schema_is_idempotent() -> Result<(), StoreError> {
        let conn = open_test_connection("idempotent").await?;

        for _ in 0..2 {
            ensure_schema(&conn).await?;
        }

        Ok(())
    }

    /// After one run, every expected table and index is listed in `sqlite_master`.
    #[tokio::test]
    async fn ensure_schema_creates_tables_and_indexes() -> Result<(), StoreError> {
        let conn = open_test_connection("objects").await?;

        ensure_schema(&conn).await?;

        // Checked in declaration order; the first missing object fails the test.
        for (object_type, name) in EXPECTED_SCHEMA_OBJECTS {
            assert_schema_object(&conn, object_type, name).await?;
        }

        Ok(())
    }

    /// Open an embedded database at a fresh temp path tagged with `name`.
    async fn open_test_connection(name: &str) -> Result<libsql::Connection, StoreError> {
        let path = unique_temp_path(name);
        let config = LibSqlConfig {
            mode: LibSqlMode::Embedded { path },
            journal_mode: None,
            synchronous: None,
            sync_interval_seconds: None,
        };

        let opened = open_connection(&config).await?;
        Ok(opened.connection)
    }

    /// Succeed only if `sqlite_master` holds a row with the given type and name.
    ///
    /// A missing object is reported as `StoreError::Backend` rather than a panic,
    /// so callers can propagate it with `?`.
    async fn assert_schema_object(
        conn: &libsql::Connection,
        object_type: &str,
        name: &str,
    ) -> Result<(), StoreError> {
        let mut matches = conn
            .query(
                "SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
                (object_type, name),
            )
            .await
            .map_err(|source| crate::error::libsql_error(&source))?;

        let first_match = matches
            .next()
            .await
            .map_err(|source| crate::error::libsql_error(&source))?;

        if first_match.is_none() {
            return Err(StoreError::Backend(format!(
                "schema object {object_type} {name} was not created"
            )));
        }

        Ok(())
    }

    /// Build a database path in the system temp directory that embeds `name`,
    /// the process id, and the current time in nanoseconds since the Unix epoch.
    ///
    /// If the system clock reads earlier than the epoch, the timestamp part is `0`.
    /// Nothing in this module deletes the file afterwards.
    fn unique_temp_path(name: &str) -> PathBuf {
        let mut path = std::env::temp_dir();

        let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_nanos(),
            Err(_) => 0,
        };

        path.push(format!(
            "aion-store-libsql-schema-{name}-{}-{nanos}.db",
            std::process::id()
        ));
        path
    }
}