Skip to main content

aion_store_libsql/
store.rs

1//! `LibSqlStore` struct and `EventStore` implementation wiring.
2
3use std::path::PathBuf;
4
5use aion_store::{
6    Event, PackageRecord, PackageRouteRecord, PackageStore, ReadableEventStore, RunSummary,
7    StoreError, TimerEntry, TimerId, WorkflowFilter, WorkflowId, WorkflowSummary,
8    WritableEventStore, WriteToken,
9};
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12
13use crate::config::{LibSqlConfig, LibSqlMode};
14
15/// Durable `EventStore` backed by a shared libSQL connection.
16#[derive(Clone)]
17pub struct LibSqlStore {
18    conn: libsql::Connection,
19    db: std::sync::Arc<libsql::Database>,
20}
21
22impl LibSqlStore {
23    /// Open a store from operator-provided libSQL configuration.
24    ///
25    /// # Errors
26    ///
27    /// Returns `StoreError::Backend` when the connection cannot be opened or when the idempotent
28    /// schema DDL cannot be applied.
29    pub async fn connect(config: LibSqlConfig) -> Result<Self, StoreError> {
30        let opened = crate::connection::open_connection(&config).await?;
31        let conn = opened.connection;
32        crate::schema::ensure_schema(&conn).await?;
33
34        Ok(Self {
35            conn,
36            db: std::sync::Arc::new(opened.database),
37        })
38    }
39
40    /// Open an embedded local-file store at `path`.
41    ///
42    /// Operator tunables remain unset; this convenience constructor only selects embedded mode.
43    ///
44    /// # Errors
45    ///
46    /// Returns `StoreError::Backend` when the connection cannot be opened or when the idempotent
47    /// schema DDL cannot be applied.
48    pub async fn open(path: impl Into<PathBuf>) -> Result<Self, StoreError> {
49        Self::connect(LibSqlConfig {
50            mode: LibSqlMode::Embedded { path: path.into() },
51            journal_mode: None,
52            synchronous: None,
53            sync_interval_seconds: None,
54        })
55        .await
56    }
57
58    /// Validate stored event blobs against the current Aion event schema.
59    ///
60    /// # Errors
61    ///
62    /// Returns `StoreError::Serialization` when any stored event cannot be decoded by the current
63    /// event schema, or `StoreError::Backend` for libSQL scan failures.
64    pub async fn validate_event_compatibility(&self) -> Result<(), StoreError> {
65        crate::read::validate_all_events(self.connection()).await
66    }
67
68    /// Trigger and await a libSQL replica synchronization cycle.
69    ///
70    /// # Errors
71    ///
72    /// Returns `StoreError::Backend` when the current libSQL database mode does not support sync or
73    /// when the replica sync operation fails.
74    pub async fn sync(&self) -> Result<(), StoreError> {
75        self.db
76            .sync()
77            .await
78            .map(|_| ())
79            .map_err(|error| crate::error::libsql_error(&error))
80    }
81
82    /// Borrow the shared libSQL connection used by append, read, and timer modules.
83    pub(crate) fn connection(&self) -> &libsql::Connection {
84        &self.conn
85    }
86}
87
88#[async_trait]
89impl PackageStore for LibSqlStore {
90    async fn put_package(&self, record: PackageRecord) -> Result<(), StoreError> {
91        crate::package::put_package(self.connection(), record).await
92    }
93
94    async fn list_packages(&self) -> Result<Vec<PackageRecord>, StoreError> {
95        crate::package::list_packages(self.connection()).await
96    }
97
98    async fn delete_package(
99        &self,
100        workflow_type: &str,
101        content_hash: &str,
102    ) -> Result<(), StoreError> {
103        crate::package::delete_package(self.connection(), workflow_type, content_hash).await
104    }
105
106    async fn put_package_route(
107        &self,
108        workflow_type: &str,
109        content_hash: &str,
110    ) -> Result<(), StoreError> {
111        crate::package::put_package_route(self.connection(), workflow_type, content_hash).await
112    }
113
114    async fn list_package_routes(&self) -> Result<Vec<PackageRouteRecord>, StoreError> {
115        crate::package::list_package_routes(self.connection()).await
116    }
117}
118
119#[async_trait]
120impl WritableEventStore for LibSqlStore {
121    async fn append(
122        &self,
123        _token: WriteToken,
124        workflow_id: &WorkflowId,
125        events: &[Event],
126        expected_seq: u64,
127    ) -> Result<(), StoreError> {
128        crate::append::append(self.connection(), workflow_id, events, expected_seq).await
129    }
130}
131
132#[async_trait]
133impl ReadableEventStore for LibSqlStore {
134    async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError> {
135        crate::read::read_history(self.connection(), workflow_id).await
136    }
137
138    async fn read_history_from(
139        &self,
140        workflow_id: &WorkflowId,
141        from_seq: u64,
142    ) -> Result<Vec<Event>, StoreError> {
143        crate::read::read_history_from(self.connection(), workflow_id, from_seq).await
144    }
145
146    async fn read_run_chain(
147        &self,
148        workflow_id: &WorkflowId,
149    ) -> Result<Vec<RunSummary>, StoreError> {
150        crate::read::read_run_chain(self.connection(), workflow_id).await
151    }
152
153    async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError> {
154        crate::read::list_workflow_ids(self.connection()).await
155    }
156
157    async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError> {
158        crate::read::list_active(self.connection()).await
159    }
160
161    async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError> {
162        crate::read::query(self.connection(), filter).await
163    }
164
165    async fn schedule_timer(
166        &self,
167        workflow_id: &WorkflowId,
168        timer_id: &TimerId,
169        fire_at: DateTime<Utc>,
170    ) -> Result<(), StoreError> {
171        crate::timer::schedule_timer(self.connection(), workflow_id, timer_id, fire_at).await
172    }
173
174    async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError> {
175        crate::timer::expired_timers(self.connection(), as_of).await
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use std::path::PathBuf;
182    use std::sync::Arc;
183    use std::time::{SystemTime, UNIX_EPOCH};
184
185    use aion_store::{EventStore, StoreError};
186
187    use super::LibSqlStore;
188
189    #[test]
190    fn libsql_store_is_send_sync_static() {
191        fn assert_send_sync_static<T: Send + Sync + 'static>() {}
192
193        assert_send_sync_static::<LibSqlStore>();
194    }
195
196    #[tokio::test]
197    async fn open_creates_schema() -> Result<(), StoreError> {
198        let store = LibSqlStore::open(unique_temp_path("open-schema")).await?;
199
200        assert_schema_object(store.connection(), "table", "events").await?;
201        assert_schema_object(store.connection(), "table", "timers").await?;
202        assert_schema_object(store.connection(), "table", "visibility").await?;
203
204        Ok(())
205    }
206
207    #[tokio::test]
208    async fn store_can_be_used_as_event_store_trait_object() -> Result<(), StoreError> {
209        let store = LibSqlStore::open(unique_temp_path("trait-object")).await?;
210        let store: Arc<dyn EventStore> = Arc::new(store);
211
212        assert_eq!(Arc::strong_count(&store), 1);
213        Ok(())
214    }
215
216    #[tokio::test]
217    async fn connection_accessor_reuses_same_database_handle() -> Result<(), StoreError> {
218        let store = LibSqlStore::open(unique_temp_path("shared-handle")).await?;
219
220        store
221            .connection()
222            .execute(
223                "INSERT INTO timers (workflow_id, timer_id, fire_at) VALUES (?1, ?2, ?3)",
224                ("workflow-a", "timer-a", "2026-06-03T00:00:00Z"),
225            )
226            .await
227            .map_err(|error| crate::error::libsql_error(&error))?;
228
229        let count = timer_count(store.connection()).await?;
230        if count == 1 {
231            Ok(())
232        } else {
233            Err(StoreError::Backend(format!(
234                "expected one timer through shared connection, found {count}"
235            )))
236        }
237    }
238
239    async fn assert_schema_object(
240        conn: &libsql::Connection,
241        object_type: &str,
242        name: &str,
243    ) -> Result<(), StoreError> {
244        let mut rows = conn
245            .query(
246                "SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
247                (object_type, name),
248            )
249            .await
250            .map_err(|error| crate::error::libsql_error(&error))?;
251        let found = rows
252            .next()
253            .await
254            .map_err(|error| crate::error::libsql_error(&error))?
255            .is_some();
256
257        if found {
258            Ok(())
259        } else {
260            Err(StoreError::Backend(format!(
261                "schema object {object_type} {name} was not created"
262            )))
263        }
264    }
265
266    async fn timer_count(conn: &libsql::Connection) -> Result<i64, StoreError> {
267        let mut rows = conn
268            .query("SELECT COUNT(*) FROM timers", ())
269            .await
270            .map_err(|error| crate::error::libsql_error(&error))?;
271        let row = rows
272            .next()
273            .await
274            .map_err(|error| crate::error::libsql_error(&error))?
275            .ok_or_else(|| {
276                StoreError::Backend(String::from("timer count query returned no row"))
277            })?;
278
279        row.get(0)
280            .map_err(|error| crate::error::libsql_error(&error))
281    }
282
283    fn unique_temp_path(name: &str) -> PathBuf {
284        let nanos = SystemTime::now()
285            .duration_since(UNIX_EPOCH)
286            .map_or(0, |duration| duration.as_nanos());
287        std::env::temp_dir().join(format!(
288            "aion-store-libsql-store-{name}-{}-{nanos}.db",
289            std::process::id()
290        ))
291    }
292}