1use std::path::PathBuf;
4
5use aion_store::{
6 Event, ReadableEventStore, RunSummary, StoreError, TimerEntry, TimerId, WorkflowFilter,
7 WorkflowId, WorkflowSummary, WritableEventStore, WriteToken,
8};
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11
12use crate::config::{LibSqlConfig, LibSqlMode};
13
14#[derive(Clone)]
16pub struct LibSqlStore {
17 conn: libsql::Connection,
18 db: std::sync::Arc<libsql::Database>,
19}
20
21impl LibSqlStore {
22 pub async fn connect(config: LibSqlConfig) -> Result<Self, StoreError> {
29 let opened = crate::connection::open_connection(&config).await?;
30 let conn = opened.connection;
31 crate::schema::ensure_schema(&conn).await?;
32
33 Ok(Self {
34 conn,
35 db: std::sync::Arc::new(opened.database),
36 })
37 }
38
39 pub async fn open(path: impl Into<PathBuf>) -> Result<Self, StoreError> {
48 Self::connect(LibSqlConfig {
49 mode: LibSqlMode::Embedded { path: path.into() },
50 journal_mode: None,
51 synchronous: None,
52 sync_interval_seconds: None,
53 })
54 .await
55 }
56
57 pub async fn validate_event_compatibility(&self) -> Result<(), StoreError> {
64 crate::read::validate_all_events(self.connection()).await
65 }
66
67 pub async fn sync(&self) -> Result<(), StoreError> {
74 self.db
75 .sync()
76 .await
77 .map(|_| ())
78 .map_err(|error| crate::error::libsql_error(&error))
79 }
80
81 pub(crate) fn connection(&self) -> &libsql::Connection {
83 &self.conn
84 }
85}
86
87#[async_trait]
88impl WritableEventStore for LibSqlStore {
89 async fn append(
90 &self,
91 _token: WriteToken,
92 workflow_id: &WorkflowId,
93 events: &[Event],
94 expected_seq: u64,
95 ) -> Result<(), StoreError> {
96 crate::append::append(self.connection(), workflow_id, events, expected_seq).await
97 }
98}
99
100#[async_trait]
101impl ReadableEventStore for LibSqlStore {
102 async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError> {
103 crate::read::read_history(self.connection(), workflow_id).await
104 }
105
106 async fn read_history_from(
107 &self,
108 workflow_id: &WorkflowId,
109 from_seq: u64,
110 ) -> Result<Vec<Event>, StoreError> {
111 crate::read::read_history_from(self.connection(), workflow_id, from_seq).await
112 }
113
114 async fn read_run_chain(
115 &self,
116 workflow_id: &WorkflowId,
117 ) -> Result<Vec<RunSummary>, StoreError> {
118 crate::read::read_run_chain(self.connection(), workflow_id).await
119 }
120
121 async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError> {
122 crate::read::list_workflow_ids(self.connection()).await
123 }
124
125 async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError> {
126 crate::read::list_active(self.connection()).await
127 }
128
129 async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError> {
130 crate::read::query(self.connection(), filter).await
131 }
132
133 async fn schedule_timer(
134 &self,
135 workflow_id: &WorkflowId,
136 timer_id: &TimerId,
137 fire_at: DateTime<Utc>,
138 ) -> Result<(), StoreError> {
139 crate::timer::schedule_timer(self.connection(), workflow_id, timer_id, fire_at).await
140 }
141
142 async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError> {
143 crate::timer::expired_timers(self.connection(), as_of).await
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use std::path::PathBuf;
150 use std::sync::Arc;
151 use std::time::{SystemTime, UNIX_EPOCH};
152
153 use aion_store::{EventStore, StoreError};
154
155 use super::LibSqlStore;
156
157 #[test]
158 fn libsql_store_is_send_sync_static() {
159 fn assert_send_sync_static<T: Send + Sync + 'static>() {}
160
161 assert_send_sync_static::<LibSqlStore>();
162 }
163
164 #[tokio::test]
165 async fn open_creates_schema() -> Result<(), StoreError> {
166 let store = LibSqlStore::open(unique_temp_path("open-schema")).await?;
167
168 assert_schema_object(store.connection(), "table", "events").await?;
169 assert_schema_object(store.connection(), "table", "timers").await?;
170 assert_schema_object(store.connection(), "table", "visibility").await?;
171
172 Ok(())
173 }
174
175 #[tokio::test]
176 async fn store_can_be_used_as_event_store_trait_object() -> Result<(), StoreError> {
177 let store = LibSqlStore::open(unique_temp_path("trait-object")).await?;
178 let store: Arc<dyn EventStore> = Arc::new(store);
179
180 assert_eq!(Arc::strong_count(&store), 1);
181 Ok(())
182 }
183
184 #[tokio::test]
185 async fn connection_accessor_reuses_same_database_handle() -> Result<(), StoreError> {
186 let store = LibSqlStore::open(unique_temp_path("shared-handle")).await?;
187
188 store
189 .connection()
190 .execute(
191 "INSERT INTO timers (workflow_id, timer_id, fire_at) VALUES (?1, ?2, ?3)",
192 ("workflow-a", "timer-a", "2026-06-03T00:00:00Z"),
193 )
194 .await
195 .map_err(|error| crate::error::libsql_error(&error))?;
196
197 let count = timer_count(store.connection()).await?;
198 if count == 1 {
199 Ok(())
200 } else {
201 Err(StoreError::Backend(format!(
202 "expected one timer through shared connection, found {count}"
203 )))
204 }
205 }
206
207 async fn assert_schema_object(
208 conn: &libsql::Connection,
209 object_type: &str,
210 name: &str,
211 ) -> Result<(), StoreError> {
212 let mut rows = conn
213 .query(
214 "SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
215 (object_type, name),
216 )
217 .await
218 .map_err(|error| crate::error::libsql_error(&error))?;
219 let found = rows
220 .next()
221 .await
222 .map_err(|error| crate::error::libsql_error(&error))?
223 .is_some();
224
225 if found {
226 Ok(())
227 } else {
228 Err(StoreError::Backend(format!(
229 "schema object {object_type} {name} was not created"
230 )))
231 }
232 }
233
234 async fn timer_count(conn: &libsql::Connection) -> Result<i64, StoreError> {
235 let mut rows = conn
236 .query("SELECT COUNT(*) FROM timers", ())
237 .await
238 .map_err(|error| crate::error::libsql_error(&error))?;
239 let row = rows
240 .next()
241 .await
242 .map_err(|error| crate::error::libsql_error(&error))?
243 .ok_or_else(|| {
244 StoreError::Backend(String::from("timer count query returned no row"))
245 })?;
246
247 row.get(0)
248 .map_err(|error| crate::error::libsql_error(&error))
249 }
250
251 fn unique_temp_path(name: &str) -> PathBuf {
252 let nanos = SystemTime::now()
253 .duration_since(UNIX_EPOCH)
254 .map_or(0, |duration| duration.as_nanos());
255 std::env::temp_dir().join(format!(
256 "aion-store-libsql-store-{name}-{}-{nanos}.db",
257 std::process::id()
258 ))
259 }
260}