1use aion_store::StoreError;
4
5pub const CREATE_EVENTS_TABLE: &str = "
7CREATE TABLE IF NOT EXISTS events (
8 workflow_id TEXT NOT NULL,
9 seq INTEGER NOT NULL,
10 event BLOB NOT NULL,
11 recorded_at TEXT NOT NULL,
12 event_kind TEXT NOT NULL,
13 is_queryable_event INTEGER NOT NULL,
14 workflow_type TEXT,
15 child_workflow_id TEXT,
16 PRIMARY KEY (workflow_id, seq)
17)";
18
19pub const CREATE_EVENTS_PROJECTION_INDEX: &str = "
21CREATE INDEX IF NOT EXISTS idx_events_queryable_filter
22ON events (is_queryable_event, workflow_id, seq, event_kind, workflow_type, recorded_at, child_workflow_id)";
23
24pub const CREATE_TIMERS_TABLE: &str = "
26CREATE TABLE IF NOT EXISTS timers (
27 workflow_id TEXT NOT NULL,
28 timer_id TEXT NOT NULL,
29 fire_at TEXT NOT NULL,
30 PRIMARY KEY (workflow_id, timer_id)
31)";
32
33pub const CREATE_TIMERS_FIRE_AT_INDEX: &str = "
35CREATE INDEX IF NOT EXISTS idx_timers_fire_at
36ON timers (fire_at)";
37
38pub const CREATE_PACKAGES_TABLE: &str = "
40CREATE TABLE IF NOT EXISTS packages (
41 workflow_type TEXT NOT NULL,
42 content_hash TEXT NOT NULL,
43 archive BLOB NOT NULL,
44 deployed_at TEXT NOT NULL,
45 PRIMARY KEY (workflow_type, content_hash)
46)";
47
48pub const CREATE_PACKAGE_ROUTES_TABLE: &str = "
50CREATE TABLE IF NOT EXISTS package_routes (
51 workflow_type TEXT PRIMARY KEY,
52 content_hash TEXT NOT NULL
53)";
54
55pub const CREATE_VISIBILITY_TABLE: &str = "
57CREATE TABLE IF NOT EXISTS visibility (
58 workflow_id TEXT PRIMARY KEY,
59 run_id TEXT NOT NULL,
60 workflow_type TEXT NOT NULL,
61 status TEXT NOT NULL,
62 start_time TEXT NOT NULL,
63 close_time TEXT,
64 search_attributes TEXT NOT NULL CHECK (json_valid(search_attributes))
65)";
66
67pub const CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX: &str = "
69CREATE INDEX IF NOT EXISTS idx_visibility_workflow_type
70ON visibility (workflow_type)";
71
72pub const CREATE_VISIBILITY_STATUS_INDEX: &str = "
74CREATE INDEX IF NOT EXISTS idx_visibility_status
75ON visibility (status)";
76
77pub const CREATE_VISIBILITY_START_TIME_INDEX: &str = "
79CREATE INDEX IF NOT EXISTS idx_visibility_start_time
80ON visibility (start_time)";
81
82pub const CREATE_VISIBILITY_CLOSE_TIME_INDEX: &str = "
84CREATE INDEX IF NOT EXISTS idx_visibility_close_time
85ON visibility (close_time)";
86
87const DDL_STATEMENTS: [&str; 11] = [
88 CREATE_EVENTS_TABLE,
89 CREATE_EVENTS_PROJECTION_INDEX,
90 CREATE_TIMERS_TABLE,
91 CREATE_TIMERS_FIRE_AT_INDEX,
92 CREATE_PACKAGES_TABLE,
93 CREATE_PACKAGE_ROUTES_TABLE,
94 CREATE_VISIBILITY_TABLE,
95 CREATE_VISIBILITY_WORKFLOW_TYPE_INDEX,
96 CREATE_VISIBILITY_STATUS_INDEX,
97 CREATE_VISIBILITY_START_TIME_INDEX,
98 CREATE_VISIBILITY_CLOSE_TIME_INDEX,
99];
100
101pub async fn ensure_schema(conn: &libsql::Connection) -> Result<(), StoreError> {
107 for statement in DDL_STATEMENTS {
108 conn.execute(statement, ())
109 .await
110 .map_err(|error| crate::error::libsql_error(&error))?;
111 }
112
113 Ok(())
114}
115
116#[cfg(test)]
117mod tests {
118 use std::path::PathBuf;
119 use std::time::{SystemTime, UNIX_EPOCH};
120
121 use aion_store::StoreError;
122
123 use super::ensure_schema;
124 use crate::config::{LibSqlConfig, LibSqlMode};
125 use crate::connection::open_connection;
126
127 #[tokio::test]
128 async fn ensure_schema_is_idempotent() -> Result<(), StoreError> {
129 let conn = open_test_connection("idempotent").await?;
130
131 ensure_schema(&conn).await?;
132 ensure_schema(&conn).await?;
133
134 Ok(())
135 }
136
137 #[tokio::test]
138 async fn ensure_schema_creates_tables_and_indexes() -> Result<(), StoreError> {
139 let conn = open_test_connection("objects").await?;
140
141 ensure_schema(&conn).await?;
142
143 assert_schema_object(&conn, "table", "events").await?;
144 assert_schema_object(&conn, "index", "sqlite_autoindex_events_1").await?;
145 assert_schema_object(&conn, "index", "idx_events_queryable_filter").await?;
146 assert_schema_object(&conn, "table", "timers").await?;
147 assert_schema_object(&conn, "index", "sqlite_autoindex_timers_1").await?;
148 assert_schema_object(&conn, "index", "idx_timers_fire_at").await?;
149 assert_schema_object(&conn, "table", "packages").await?;
150 assert_schema_object(&conn, "index", "sqlite_autoindex_packages_1").await?;
151 assert_schema_object(&conn, "table", "package_routes").await?;
152 assert_schema_object(&conn, "index", "sqlite_autoindex_package_routes_1").await?;
153 assert_schema_object(&conn, "table", "visibility").await?;
154 assert_schema_object(&conn, "index", "sqlite_autoindex_visibility_1").await?;
155 assert_schema_object(&conn, "index", "idx_visibility_workflow_type").await?;
156 assert_schema_object(&conn, "index", "idx_visibility_status").await?;
157 assert_schema_object(&conn, "index", "idx_visibility_start_time").await?;
158 assert_schema_object(&conn, "index", "idx_visibility_close_time").await?;
159
160 Ok(())
161 }
162
163 async fn open_test_connection(name: &str) -> Result<libsql::Connection, StoreError> {
164 let config = LibSqlConfig {
165 mode: LibSqlMode::Embedded {
166 path: unique_temp_path(name),
167 },
168 journal_mode: None,
169 synchronous: None,
170 sync_interval_seconds: None,
171 };
172
173 open_connection(&config)
174 .await
175 .map(|opened| opened.connection)
176 }
177
178 async fn assert_schema_object(
179 conn: &libsql::Connection,
180 object_type: &str,
181 name: &str,
182 ) -> Result<(), StoreError> {
183 let mut rows = conn
184 .query(
185 "SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
186 (object_type, name),
187 )
188 .await
189 .map_err(|error| crate::error::libsql_error(&error))?;
190 let found = rows
191 .next()
192 .await
193 .map_err(|error| crate::error::libsql_error(&error))?
194 .is_some();
195
196 if found {
197 Ok(())
198 } else {
199 Err(StoreError::Backend(format!(
200 "schema object {object_type} {name} was not created"
201 )))
202 }
203 }
204
205 fn unique_temp_path(name: &str) -> PathBuf {
206 let nanos = SystemTime::now()
207 .duration_since(UNIX_EPOCH)
208 .map_or(0, |duration| duration.as_nanos());
209 std::env::temp_dir().join(format!(
210 "aion-store-libsql-schema-{name}-{}-{nanos}.db",
211 std::process::id()
212 ))
213 }
214}