Skip to main content

aion_store_libsql/
store.rs

1//! `LibSqlStore` struct and `EventStore` implementation wiring.
2
3use std::path::PathBuf;
4
5use aion_store::{
6    Event, ReadableEventStore, RunSummary, StoreError, TimerEntry, TimerId, WorkflowFilter,
7    WorkflowId, WorkflowSummary, WritableEventStore, WriteToken,
8};
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11
12use crate::config::{LibSqlConfig, LibSqlMode};
13
14/// Durable `EventStore` backed by a shared libSQL connection.
15#[derive(Clone)]
16pub struct LibSqlStore {
17    conn: libsql::Connection,
18    db: std::sync::Arc<libsql::Database>,
19}
20
21impl LibSqlStore {
22    /// Open a store from operator-provided libSQL configuration.
23    ///
24    /// # Errors
25    ///
26    /// Returns `StoreError::Backend` when the connection cannot be opened or when the idempotent
27    /// schema DDL cannot be applied.
28    pub async fn connect(config: LibSqlConfig) -> Result<Self, StoreError> {
29        let opened = crate::connection::open_connection(&config).await?;
30        let conn = opened.connection;
31        crate::schema::ensure_schema(&conn).await?;
32
33        Ok(Self {
34            conn,
35            db: std::sync::Arc::new(opened.database),
36        })
37    }
38
39    /// Open an embedded local-file store at `path`.
40    ///
41    /// Operator tunables remain unset; this convenience constructor only selects embedded mode.
42    ///
43    /// # Errors
44    ///
45    /// Returns `StoreError::Backend` when the connection cannot be opened or when the idempotent
46    /// schema DDL cannot be applied.
47    pub async fn open(path: impl Into<PathBuf>) -> Result<Self, StoreError> {
48        Self::connect(LibSqlConfig {
49            mode: LibSqlMode::Embedded { path: path.into() },
50            journal_mode: None,
51            synchronous: None,
52            sync_interval_seconds: None,
53        })
54        .await
55    }
56
57    /// Validate stored event blobs against the current Aion event schema.
58    ///
59    /// # Errors
60    ///
61    /// Returns `StoreError::Serialization` when any stored event cannot be decoded by the current
62    /// event schema, or `StoreError::Backend` for libSQL scan failures.
63    pub async fn validate_event_compatibility(&self) -> Result<(), StoreError> {
64        crate::read::validate_all_events(self.connection()).await
65    }
66
67    /// Trigger and await a libSQL replica synchronization cycle.
68    ///
69    /// # Errors
70    ///
71    /// Returns `StoreError::Backend` when the current libSQL database mode does not support sync or
72    /// when the replica sync operation fails.
73    pub async fn sync(&self) -> Result<(), StoreError> {
74        self.db
75            .sync()
76            .await
77            .map(|_| ())
78            .map_err(|error| crate::error::libsql_error(&error))
79    }
80
81    /// Borrow the shared libSQL connection used by append, read, and timer modules.
82    pub(crate) fn connection(&self) -> &libsql::Connection {
83        &self.conn
84    }
85}
86
87#[async_trait]
88impl WritableEventStore for LibSqlStore {
89    async fn append(
90        &self,
91        _token: WriteToken,
92        workflow_id: &WorkflowId,
93        events: &[Event],
94        expected_seq: u64,
95    ) -> Result<(), StoreError> {
96        crate::append::append(self.connection(), workflow_id, events, expected_seq).await
97    }
98}
99
100#[async_trait]
101impl ReadableEventStore for LibSqlStore {
102    async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError> {
103        crate::read::read_history(self.connection(), workflow_id).await
104    }
105
106    async fn read_history_from(
107        &self,
108        workflow_id: &WorkflowId,
109        from_seq: u64,
110    ) -> Result<Vec<Event>, StoreError> {
111        crate::read::read_history_from(self.connection(), workflow_id, from_seq).await
112    }
113
114    async fn read_run_chain(
115        &self,
116        workflow_id: &WorkflowId,
117    ) -> Result<Vec<RunSummary>, StoreError> {
118        crate::read::read_run_chain(self.connection(), workflow_id).await
119    }
120
121    async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError> {
122        crate::read::list_workflow_ids(self.connection()).await
123    }
124
125    async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError> {
126        crate::read::list_active(self.connection()).await
127    }
128
129    async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError> {
130        crate::read::query(self.connection(), filter).await
131    }
132
133    async fn schedule_timer(
134        &self,
135        workflow_id: &WorkflowId,
136        timer_id: &TimerId,
137        fire_at: DateTime<Utc>,
138    ) -> Result<(), StoreError> {
139        crate::timer::schedule_timer(self.connection(), workflow_id, timer_id, fire_at).await
140    }
141
142    async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError> {
143        crate::timer::expired_timers(self.connection(), as_of).await
144    }
145}
146
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    use aion_store::{EventStore, StoreError};

    use super::LibSqlStore;

    /// Compile-time check: the store satisfies `Send + Sync + 'static`.
    #[test]
    fn libsql_store_is_send_sync_static() {
        fn require_bounds<T: Send + Sync + 'static>() {}

        require_bounds::<LibSqlStore>();
    }

    /// Opening a fresh store must leave the three expected tables in `sqlite_master`.
    #[tokio::test]
    async fn open_creates_schema() -> Result<(), StoreError> {
        let store = LibSqlStore::open(unique_temp_path("open-schema")).await?;

        for table in ["events", "timers", "visibility"] {
            assert_schema_object(store.connection(), "table", table).await?;
        }

        Ok(())
    }

    /// The concrete store can be coerced into `Arc<dyn EventStore>`.
    #[tokio::test]
    async fn store_can_be_used_as_event_store_trait_object() -> Result<(), StoreError> {
        let concrete = LibSqlStore::open(unique_temp_path("trait-object")).await?;
        let erased: Arc<dyn EventStore> = Arc::new(concrete);

        assert_eq!(Arc::strong_count(&erased), 1);
        Ok(())
    }

    /// A row written through one `connection()` call is visible through a second call.
    #[tokio::test]
    async fn connection_accessor_reuses_same_database_handle() -> Result<(), StoreError> {
        let store = LibSqlStore::open(unique_temp_path("shared-handle")).await?;

        let insert = store.connection().execute(
            "INSERT INTO timers (workflow_id, timer_id, fire_at) VALUES (?1, ?2, ?3)",
            ("workflow-a", "timer-a", "2026-06-03T00:00:00Z"),
        );
        insert
            .await
            .map_err(|error| crate::error::libsql_error(&error))?;

        // Deliberately go through the accessor again rather than reusing a binding.
        match timer_count(store.connection()).await? {
            1 => Ok(()),
            count => Err(StoreError::Backend(format!(
                "expected one timer through shared connection, found {count}"
            ))),
        }
    }

    /// Succeeds when `sqlite_master` lists an object of `object_type` called `name`.
    async fn assert_schema_object(
        conn: &libsql::Connection,
        object_type: &str,
        name: &str,
    ) -> Result<(), StoreError> {
        let mut rows = conn
            .query(
                "SELECT name FROM sqlite_master WHERE type = ?1 AND name = ?2",
                (object_type, name),
            )
            .await
            .map_err(|error| crate::error::libsql_error(&error))?;
        let exists = rows
            .next()
            .await
            .map_err(|error| crate::error::libsql_error(&error))?
            .is_some();

        if !exists {
            return Err(StoreError::Backend(format!(
                "schema object {object_type} {name} was not created"
            )));
        }
        Ok(())
    }

    /// Returns `COUNT(*)` over the `timers` table.
    async fn timer_count(conn: &libsql::Connection) -> Result<i64, StoreError> {
        let mut rows = conn
            .query("SELECT COUNT(*) FROM timers", ())
            .await
            .map_err(|error| crate::error::libsql_error(&error))?;
        let first = rows
            .next()
            .await
            .map_err(|error| crate::error::libsql_error(&error))?;

        match first {
            Some(row) => row
                .get(0)
                .map_err(|error| crate::error::libsql_error(&error)),
            None => Err(StoreError::Backend(String::from(
                "timer count query returned no row",
            ))),
        }
    }

    /// Builds a database path under the system temp dir from `name`, the process id, and the
    /// current time in nanoseconds since the Unix epoch (0 if the clock reads before the epoch).
    fn unique_temp_path(name: &str) -> PathBuf {
        let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_nanos(),
            Err(_) => 0,
        };
        let file_name = format!(
            "aion-store-libsql-store-{name}-{}-{nanos}.db",
            std::process::id()
        );
        std::env::temp_dir().join(file_name)
    }
}