// aion_store_libsql/schema.rs
use aion_store::StoreError;
4
/// DDL for the `events` table.
///
/// Each row is identified by the composite primary key `(workflow_id, seq)` and stores
/// the event itself in the `event` BLOB column, next to the metadata columns
/// (`recorded_at`, `event_kind`, `is_queryable_event`, `workflow_type`,
/// `child_workflow_id`). Only `workflow_type` and `child_workflow_id` are nullable.
///
/// The statement uses `IF NOT EXISTS`, so re-running it on an existing database is a no-op.
pub const CREATE_EVENTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS events (
 workflow_id TEXT NOT NULL,
 seq INTEGER NOT NULL,
 event BLOB NOT NULL,
 recorded_at TEXT NOT NULL,
 event_kind TEXT NOT NULL,
 is_queryable_event INTEGER NOT NULL,
 workflow_type TEXT,
 child_workflow_id TEXT,
 PRIMARY KEY (workflow_id, seq)
)";
18
/// DDL for `idx_events_queryable_filter`, a composite index on `events`.
///
/// The key starts with `is_queryable_event`, then `workflow_id` and `seq`, and also
/// carries `event_kind`, `workflow_type`, `recorded_at` and `child_workflow_id`.
/// Uses `IF NOT EXISTS`, so it is safe to re-run.
pub const CREATE_EVENTS_PROJECTION_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_events_queryable_filter
ON events (is_queryable_event, workflow_id, seq, event_kind, workflow_type, recorded_at, child_workflow_id)";
23
/// DDL for the `timers` table.
///
/// Each row is identified by the composite primary key `(workflow_id, timer_id)` and
/// records its `fire_at` value as TEXT. Uses `IF NOT EXISTS`, so it is safe to re-run.
pub const CREATE_TIMERS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS timers (
 workflow_id TEXT NOT NULL,
 timer_id TEXT NOT NULL,
 fire_at TEXT NOT NULL,
 PRIMARY KEY (workflow_id, timer_id)
)";
32
/// DDL for `idx_timers_fire_at`, a single-column index on `timers (fire_at)`.
///
/// Uses `IF NOT EXISTS`, so it is safe to re-run.
pub const CREATE_TIMERS_FIRE_AT_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_timers_fire_at
ON timers (fire_at)";
37
/// DDL for the `visibility` table, keyed by `workflow_id`.
///
/// Besides the key it stores `run_id`, `workflow_type`, `status`, `start_time`, an
/// optional `close_time`, and `search_attributes`. The `CHECK (json_valid(...))`
/// constraint makes the database reject rows whose `search_attributes` text is not
/// valid JSON. Uses `IF NOT EXISTS`, so it is safe to re-run.
pub const CREATE_VISIBILITY_TABLE: &str = "
CREATE TABLE IF NOT EXISTS visibility (
 workflow_id TEXT PRIMARY KEY,
 run_id TEXT NOT NULL,
 workflow_type TEXT NOT NULL,
 status TEXT NOT NULL,
 start_time TEXT NOT NULL,
 close_time TEXT,
 search_attributes TEXT NOT NULL CHECK (json_valid(search_attributes))
)";
49
/// DDL for `idx_visibility_workflow_type`, a single-column index on
/// `visibility (workflow_type)`.
///
/// Uses `IF NOT EXISTS`, so it is safe to re-run.
pub const CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_workflow_type
ON visibility (workflow_type)";
54
/// DDL for `idx_visibility_status`, a single-column index on `visibility (status)`.
///
/// Uses `IF NOT EXISTS`, so it is safe to re-run.
pub const CREATE_VISIBILITY_STATUS_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_status
ON visibility (status)";
59
/// DDL for `idx_visibility_start_time`, a single-column index on
/// `visibility (start_time)`.
///
/// Uses `IF NOT EXISTS`, so it is safe to re-run.
pub const CREATE_VISIBILITY_START_TIME_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_start_time
ON visibility (start_time)";
64
/// DDL for `idx_visibility_close_time`, a single-column index on
/// `visibility (close_time)`.
///
/// Uses `IF NOT EXISTS`, so it is safe to re-run.
pub const CREATE_VISIBILITY_CLOSE_TIME_INDEX: &str = "
CREATE INDEX IF NOT EXISTS idx_visibility_close_time
ON visibility (close_time)";
69
70const DDL_STATEMENTS: [&str; 9] = [
71 CREATE_EVENTS_TABLE,
72 CREATE_EVENTS_PROJECTION_INDEX,
73 CREATE_TIMERS_TABLE,
74 CREATE_TIMERS_FIRE_AT_INDEX,
75 CREATE_VISIBILITY_TABLE,
76 CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX,
77 CREATE_VISIBILITY_STATUS_INDEX,
78 CREATE_VISIBILITY_START_TIME_INDEX,
79 CREATE_VISIBILITY_CLOSE_TIME_INDEX,
80];
81
82pub async fn ensure_schema(conn: &libsql::Connection) -> Result<(), StoreError> {
88 for statement in DDL_STATEMENTS {
89 conn.execute(statement, ())
90 .await
91 .map_err(|error| crate::error::libsql_error(&error))?;
92 }
93
94 Ok(())
95}
96
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use aion_store::StoreError;

    use super::ensure_schema;
    use crate::config::{LibSqlConfig, LibSqlMode};
    use crate::connection::open_connection;

    /// Running `ensure_schema` twice on the same connection must succeed both times.
    #[tokio::test]
    async fn ensure_schema_is_idempotent() -> Result<(), StoreError> {
        let conn = open_test_connection("idempotent").await?;

        ensure_schema(&conn).await?;
        ensure_schema(&conn).await?;

        Ok(())
    }

    /// After `ensure_schema`, every expected table and index is present in `sqlite_master`.
    #[tokio::test]
    async fn ensure_schema_creates_tables_and_indexes() -> Result<(), StoreError> {
        let conn = open_test_connection("objects").await?;

        ensure_schema(&conn).await?;

        // The `sqlite_autoindex_<table>_1` entries are the implicit indexes SQLite
        // creates to back each table's PRIMARY KEY; the `idx_*` entries are the
        // explicit indexes from this module's DDL.
        assert_schema_object(&conn, "table", "events").await?;
        assert_schema_object(&conn, "index", "sqlite_autoindex_events_1").await?;
        assert_schema_object(&conn, "index", "idx_events_queryable_filter").await?;
        assert_schema_object(&conn, "table", "timers").await?;
        assert_schema_object(&conn, "index", "sqlite_autoindex_timers_1").await?;
        assert_schema_object(&conn, "index", "idx_timers_fire_at").await?;
        assert_schema_object(&conn, "table", "visibility").await?;
        assert_schema_object(&conn, "index", "sqlite_autoindex_visibility_1").await?;
        assert_schema_object(&conn, "index", "idx_visibility_workflow_type").await?;
        assert_schema_object(&conn, "index", "idx_visibility_status").await?;
        assert_schema_object(&conn, "index", "idx_visibility_start_time").await?;
        assert_schema_object(&conn, "index", "idx_visibility_close_time").await?;

        Ok(())
    }

    /// Opens an embedded-mode connection to a fresh database file under the system
    /// temp directory; `name` is folded into the file name to keep tests apart.
    async fn open_test_connection(name: &str) -> Result<libsql::Connection, StoreError> {
        let config = LibSqlConfig {
            mode: LibSqlMode::Embedded {
                path: unique_temp_path(name),
            },
            journal_mode: None,
            synchronous: None,
            sync_interval_seconds: None,
        };

        open_connection(&config)
            .await
            .map(|opened| opened.connection)
    }

    /// Succeeds when `sqlite_master` has a row with the given `type` and `name`;
    /// otherwise returns `StoreError::Backend` naming the missing object.
    async fn assert_schema_object(
        conn: &libsql::Connection,
        object_type: &str,
        name: &str,
    ) -> Result<(), StoreError> {
        let mut rows = conn
            .query(
                "SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
                (object_type, name),
            )
            .await
            .map_err(|error| crate::error::libsql_error(&error))?;
        // A single row is enough: only existence matters here.
        let found = rows
            .next()
            .await
            .map_err(|error| crate::error::libsql_error(&error))?
            .is_some();

        if found {
            Ok(())
        } else {
            Err(StoreError::Backend(format!(
                "schema object {object_type} {name} was not created"
            )))
        }
    }

    /// Builds a database path in the system temp directory from `name`, the current
    /// process id and the current time in nanoseconds since the Unix epoch.
    ///
    /// If the clock reads earlier than the epoch, the nanosecond component falls back
    /// to `0`.
    fn unique_temp_path(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |duration| duration.as_nanos());
        std::env::temp_dir().join(format!(
            "aion-store-libsql-schema-{name}-{}-{nanos}.db",
            std::process::id()
        ))
    }
}