Skip to main content

aion_store_libsql/
schema.rs

1//! Idempotent schema DDL for the libSQL event store.
2
3use aion_store::StoreError;
4
/// Append-only workflow event table.
///
/// Each row is identified by the composite primary key `(workflow_id, seq)`.
/// The serialized event is stored in the `event` BLOB column; `workflow_type`
/// and `child_workflow_id` are the only columns that may be `NULL`.
///
/// The statement uses `IF NOT EXISTS`, so it can be executed again on a
/// database where the table is already present.
pub const CREATE_EVENTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS events (
    workflow_id TEXT NOT NULL,
    seq INTEGER NOT NULL,
    event BLOB NOT NULL,
    recorded_at TEXT NOT NULL,
    event_kind TEXT NOT NULL,
    is_queryable_event INTEGER NOT NULL,
    workflow_type TEXT,
    child_workflow_id TEXT,
    PRIMARY KEY (workflow_id, seq)
)";
18
/// Event index supporting lifecycle projection scans and filter subqueries.
///
/// Creates the composite index `idx_events_queryable_filter` on `events`.
/// The leading column is `is_queryable_event`, followed by `workflow_id`,
/// `seq`, `event_kind`, `workflow_type`, `recorded_at` and
/// `child_workflow_id`. Guarded by `IF NOT EXISTS`.
pub const CREATE_EVENTS_PROJECTION_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_events_queryable_filter
ON events (is_queryable_event, workflow_id, seq, event_kind, workflow_type, recorded_at, child_workflow_id)";
23
/// Durable workflow timers table.
///
/// A timer is identified by the composite primary key
/// `(workflow_id, timer_id)`; its due time is kept in the `fire_at` TEXT
/// column. All columns are `NOT NULL`. Guarded by `IF NOT EXISTS`.
pub const CREATE_TIMERS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS timers (
    workflow_id TEXT NOT NULL,
    timer_id TEXT NOT NULL,
    fire_at TEXT NOT NULL,
    PRIMARY KEY (workflow_id, timer_id)
)";
32
/// Timer index supporting due-timer range scans.
///
/// Creates the single-column index `idx_timers_fire_at` on `timers (fire_at)`.
/// Guarded by `IF NOT EXISTS`.
///
/// NOTE(review): `fire_at` is a TEXT column, so range comparisons are textual;
/// presumably the stored timestamp format sorts chronologically — confirm
/// against the code that writes timers.
pub const CREATE_TIMERS_FIRE_AT_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_timers_fire_at
ON timers (fire_at)";
37
/// Workflow visibility projection table.
///
/// Holds at most one row per workflow, since `workflow_id` is the primary key.
/// `close_time` is the only nullable column. `search_attributes` is stored as
/// TEXT and a `CHECK` constraint rejects values for which `json_valid`
/// is false. Guarded by `IF NOT EXISTS`.
pub const CREATE_VISIBILITY_TABLE: &str = "
CREATE TABLE IF NOT EXISTS visibility (
    workflow_id TEXT PRIMARY KEY,
    run_id TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    status TEXT NOT NULL,
    start_time TEXT NOT NULL,
    close_time TEXT,
    search_attributes TEXT NOT NULL CHECK (json_valid(search_attributes))
)";
49
/// Visibility index supporting workflow-type equality filters.
///
/// Creates the single-column index `idx_visibility_workflow_type` on
/// `visibility (workflow_type)`. Guarded by `IF NOT EXISTS`.
pub const CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_workflow_type
ON visibility (workflow_type)";
54
/// Visibility index supporting status equality filters.
///
/// Creates the single-column index `idx_visibility_status` on
/// `visibility (status)`. Guarded by `IF NOT EXISTS`.
pub const CREATE_VISIBILITY_STATUS_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_status
ON visibility (status)";
59
/// Visibility index supporting start-time range filters and ordering.
///
/// Creates the single-column index `idx_visibility_start_time` on
/// `visibility (start_time)`. Guarded by `IF NOT EXISTS`.
pub const CREATE_VISIBILITY_START_TIME_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_start_time
ON visibility (start_time)";
64
/// Visibility index supporting close-time range filters.
///
/// Creates the single-column index `idx_visibility_close_time` on
/// `visibility (close_time)`. `close_time` is a nullable column.
/// Guarded by `IF NOT EXISTS`.
pub const CREATE_VISIBILITY_CLOSE_TIME_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_close_time
ON visibility (close_time)";
69
/// Every DDL statement executed by [`ensure_schema`], in execution order.
///
/// Each `CREATE TABLE` statement is listed before the `CREATE INDEX`
/// statements that reference that table. When adding a statement, append it
/// here and update the array length.
const DDL_STATEMENTS: [&str; 9] = [
    // events
    CREATE_EVENTS_TABLE,
    CREATE_EVENTS_PROJECTION_INDEX,
    // timers
    CREATE_TIMERS_TABLE,
    CREATE_TIMERS_FIRE_AT_INDEX,
    // visibility
    CREATE_VISIBILITY_TABLE,
    CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX,
    CREATE_VISIBILITY_STATUS_INDEX,
    CREATE_VISIBILITY_START_TIME_INDEX,
    CREATE_VISIBILITY_CLOSE_TIME_INDEX,
];
81
82/// Ensure the libSQL schema exists on a fresh or previously-created database.
83///
84/// # Errors
85///
86/// Returns `StoreError::Backend` when any idempotent DDL statement fails at the libSQL boundary.
87pub async fn ensure_schema(conn: &libsql::Connection) -> Result<(), StoreError> {
88    for statement in DDL_STATEMENTS {
89        conn.execute(statement, ())
90            .await
91            .map_err(|error| crate::error::libsql_error(&error))?;
92    }
93
94    Ok(())
95}
96
97#[cfg(test)]
98mod tests {
99    use std::path::PathBuf;
100    use std::time::{SystemTime, UNIX_EPOCH};
101
102    use aion_store::StoreError;
103
104    use super::ensure_schema;
105    use crate::config::{LibSqlConfig, LibSqlMode};
106    use crate::connection::open_connection;
107
108    #[tokio::test]
109    async fn ensure_schema_is_idempotent() -> Result<(), StoreError> {
110        let conn = open_test_connection("idempotent").await?;
111
112        ensure_schema(&conn).await?;
113        ensure_schema(&conn).await?;
114
115        Ok(())
116    }
117
118    #[tokio::test]
119    async fn ensure_schema_creates_tables_and_indexes() -> Result<(), StoreError> {
120        let conn = open_test_connection("objects").await?;
121
122        ensure_schema(&conn).await?;
123
124        assert_schema_object(&conn, "table", "events").await?;
125        assert_schema_object(&conn, "index", "sqlite_autoindex_events_1").await?;
126        assert_schema_object(&conn, "index", "idx_events_queryable_filter").await?;
127        assert_schema_object(&conn, "table", "timers").await?;
128        assert_schema_object(&conn, "index", "sqlite_autoindex_timers_1").await?;
129        assert_schema_object(&conn, "index", "idx_timers_fire_at").await?;
130        assert_schema_object(&conn, "table", "visibility").await?;
131        assert_schema_object(&conn, "index", "sqlite_autoindex_visibility_1").await?;
132        assert_schema_object(&conn, "index", "idx_visibility_workflow_type").await?;
133        assert_schema_object(&conn, "index", "idx_visibility_status").await?;
134        assert_schema_object(&conn, "index", "idx_visibility_start_time").await?;
135        assert_schema_object(&conn, "index", "idx_visibility_close_time").await?;
136
137        Ok(())
138    }
139
140    async fn open_test_connection(name: &str) -> Result<libsql::Connection, StoreError> {
141        let config = LibSqlConfig {
142            mode: LibSqlMode::Embedded {
143                path: unique_temp_path(name),
144            },
145            journal_mode: None,
146            synchronous: None,
147            sync_interval_seconds: None,
148        };
149
150        open_connection(&config)
151            .await
152            .map(|opened| opened.connection)
153    }
154
155    async fn assert_schema_object(
156        conn: &libsql::Connection,
157        object_type: &str,
158        name: &str,
159    ) -> Result<(), StoreError> {
160        let mut rows = conn
161            .query(
162                "SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
163                (object_type, name),
164            )
165            .await
166            .map_err(|error| crate::error::libsql_error(&error))?;
167        let found = rows
168            .next()
169            .await
170            .map_err(|error| crate::error::libsql_error(&error))?
171            .is_some();
172
173        if found {
174            Ok(())
175        } else {
176            Err(StoreError::Backend(format!(
177                "schema object {object_type} {name} was not created"
178            )))
179        }
180    }
181
182    fn unique_temp_path(name: &str) -> PathBuf {
183        let nanos = SystemTime::now()
184            .duration_since(UNIX_EPOCH)
185            .map_or(0, |duration| duration.as_nanos());
186        std::env::temp_dir().join(format!(
187            "aion-store-libsql-schema-{name}-{}-{nanos}.db",
188            std::process::id()
189        ))
190    }
191}