1use std::path::PathBuf;
4
5use aion_store::{
6 Event, PackageRecord, PackageRouteRecord, PackageStore, ReadableEventStore, RunSummary,
7 StoreError, TimerEntry, TimerId, WorkflowFilter, WorkflowId, WorkflowSummary,
8 WritableEventStore, WriteToken,
9};
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12
13use crate::config::{LibSqlConfig, LibSqlMode};
14
15#[derive(Clone)]
17pub struct LibSqlStore {
18 conn: libsql::Connection,
19 db: std::sync::Arc<libsql::Database>,
20}
21
22impl LibSqlStore {
23 pub async fn connect(config: LibSqlConfig) -> Result<Self, StoreError> {
30 let opened = crate::connection::open_connection(&config).await?;
31 let conn = opened.connection;
32 crate::schema::ensure_schema(&conn).await?;
33
34 Ok(Self {
35 conn,
36 db: std::sync::Arc::new(opened.database),
37 })
38 }
39
40 pub async fn open(path: impl Into<PathBuf>) -> Result<Self, StoreError> {
49 Self::connect(LibSqlConfig {
50 mode: LibSqlMode::Embedded { path: path.into() },
51 journal_mode: None,
52 synchronous: None,
53 sync_interval_seconds: None,
54 })
55 .await
56 }
57
58 pub async fn validate_event_compatibility(&self) -> Result<(), StoreError> {
65 crate::read::validate_all_events(self.connection()).await
66 }
67
68 pub async fn sync(&self) -> Result<(), StoreError> {
75 self.db
76 .sync()
77 .await
78 .map(|_| ())
79 .map_err(|error| crate::error::libsql_error(&error))
80 }
81
82 pub(crate) fn connection(&self) -> &libsql::Connection {
84 &self.conn
85 }
86}
87
88#[async_trait]
89impl PackageStore for LibSqlStore {
90 async fn put_package(&self, record: PackageRecord) -> Result<(), StoreError> {
91 crate::package::put_package(self.connection(), record).await
92 }
93
94 async fn list_packages(&self) -> Result<Vec<PackageRecord>, StoreError> {
95 crate::package::list_packages(self.connection()).await
96 }
97
98 async fn delete_package(
99 &self,
100 workflow_type: &str,
101 content_hash: &str,
102 ) -> Result<(), StoreError> {
103 crate::package::delete_package(self.connection(), workflow_type, content_hash).await
104 }
105
106 async fn put_package_route(
107 &self,
108 workflow_type: &str,
109 content_hash: &str,
110 ) -> Result<(), StoreError> {
111 crate::package::put_package_route(self.connection(), workflow_type, content_hash).await
112 }
113
114 async fn list_package_routes(&self) -> Result<Vec<PackageRouteRecord>, StoreError> {
115 crate::package::list_package_routes(self.connection()).await
116 }
117}
118
119#[async_trait]
120impl WritableEventStore for LibSqlStore {
121 async fn append(
122 &self,
123 _token: WriteToken,
124 workflow_id: &WorkflowId,
125 events: &[Event],
126 expected_seq: u64,
127 ) -> Result<(), StoreError> {
128 crate::append::append(self.connection(), workflow_id, events, expected_seq).await
129 }
130}
131
132#[async_trait]
133impl ReadableEventStore for LibSqlStore {
134 async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError> {
135 crate::read::read_history(self.connection(), workflow_id).await
136 }
137
138 async fn read_history_from(
139 &self,
140 workflow_id: &WorkflowId,
141 from_seq: u64,
142 ) -> Result<Vec<Event>, StoreError> {
143 crate::read::read_history_from(self.connection(), workflow_id, from_seq).await
144 }
145
146 async fn read_run_chain(
147 &self,
148 workflow_id: &WorkflowId,
149 ) -> Result<Vec<RunSummary>, StoreError> {
150 crate::read::read_run_chain(self.connection(), workflow_id).await
151 }
152
153 async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError> {
154 crate::read::list_workflow_ids(self.connection()).await
155 }
156
157 async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError> {
158 crate::read::list_active(self.connection()).await
159 }
160
161 async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError> {
162 crate::read::query(self.connection(), filter).await
163 }
164
165 async fn schedule_timer(
166 &self,
167 workflow_id: &WorkflowId,
168 timer_id: &TimerId,
169 fire_at: DateTime<Utc>,
170 ) -> Result<(), StoreError> {
171 crate::timer::schedule_timer(self.connection(), workflow_id, timer_id, fire_at).await
172 }
173
174 async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError> {
175 crate::timer::expired_timers(self.connection(), as_of).await
176 }
177}
178
179#[cfg(test)]
180mod tests {
181 use std::path::PathBuf;
182 use std::sync::Arc;
183 use std::time::{SystemTime, UNIX_EPOCH};
184
185 use aion_store::{EventStore, StoreError};
186
187 use super::LibSqlStore;
188
189 #[test]
190 fn libsql_store_is_send_sync_static() {
191 fn assert_send_sync_static<T: Send + Sync + 'static>() {}
192
193 assert_send_sync_static::<LibSqlStore>();
194 }
195
196 #[tokio::test]
197 async fn open_creates_schema() -> Result<(), StoreError> {
198 let store = LibSqlStore::open(unique_temp_path("open-schema")).await?;
199
200 assert_schema_object(store.connection(), "table", "events").await?;
201 assert_schema_object(store.connection(), "table", "timers").await?;
202 assert_schema_object(store.connection(), "table", "visibility").await?;
203
204 Ok(())
205 }
206
207 #[tokio::test]
208 async fn store_can_be_used_as_event_store_trait_object() -> Result<(), StoreError> {
209 let store = LibSqlStore::open(unique_temp_path("trait-object")).await?;
210 let store: Arc<dyn EventStore> = Arc::new(store);
211
212 assert_eq!(Arc::strong_count(&store), 1);
213 Ok(())
214 }
215
216 #[tokio::test]
217 async fn connection_accessor_reuses_same_database_handle() -> Result<(), StoreError> {
218 let store = LibSqlStore::open(unique_temp_path("shared-handle")).await?;
219
220 store
221 .connection()
222 .execute(
223 "INSERT INTO timers (workflow_id, timer_id, fire_at) VALUES (?1, ?2, ?3)",
224 ("workflow-a", "timer-a", "2026-06-03T00:00:00Z"),
225 )
226 .await
227 .map_err(|error| crate::error::libsql_error(&error))?;
228
229 let count = timer_count(store.connection()).await?;
230 if count == 1 {
231 Ok(())
232 } else {
233 Err(StoreError::Backend(format!(
234 "expected one timer through shared connection, found {count}"
235 )))
236 }
237 }
238
239 async fn assert_schema_object(
240 conn: &libsql::Connection,
241 object_type: &str,
242 name: &str,
243 ) -> Result<(), StoreError> {
244 let mut rows = conn
245 .query(
246 "SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
247 (object_type, name),
248 )
249 .await
250 .map_err(|error| crate::error::libsql_error(&error))?;
251 let found = rows
252 .next()
253 .await
254 .map_err(|error| crate::error::libsql_error(&error))?
255 .is_some();
256
257 if found {
258 Ok(())
259 } else {
260 Err(StoreError::Backend(format!(
261 "schema object {object_type} {name} was not created"
262 )))
263 }
264 }
265
266 async fn timer_count(conn: &libsql::Connection) -> Result<i64, StoreError> {
267 let mut rows = conn
268 .query("SELECT COUNT(*) FROM timers", ())
269 .await
270 .map_err(|error| crate::error::libsql_error(&error))?;
271 let row = rows
272 .next()
273 .await
274 .map_err(|error| crate::error::libsql_error(&error))?
275 .ok_or_else(|| {
276 StoreError::Backend(String::from("timer count query returned no row"))
277 })?;
278
279 row.get(0)
280 .map_err(|error| crate::error::libsql_error(&error))
281 }
282
283 fn unique_temp_path(name: &str) -> PathBuf {
284 let nanos = SystemTime::now()
285 .duration_since(UNIX_EPOCH)
286 .map_or(0, |duration| duration.as_nanos());
287 std::env::temp_dir().join(format!(
288 "aion-store-libsql-store-{name}-{}-{nanos}.db",
289 std::process::id()
290 ))
291 }
292}