Skip to main content

axess_core/session/storage/
sqlite.rs

1//! SQLite-backed session store using sqlx.
2//!
3//! # Schema
4//!
5//! ```sql
6//! CREATE TABLE IF NOT EXISTS sessions (
7//!   id TEXT PRIMARY KEY,           -- UUID as hyphenated string
8//!   data TEXT NOT NULL,            -- JSON-encoded SessionData
9//!   expires_at INTEGER NOT NULL    -- Unix timestamp seconds
10//! );
11//! ```
12
13use crate::session::storage::session_codec::{SessionCodec, SqlStoreError, expires_at};
14use crate::session::storage::sql_helpers::log_cleanup_outcome;
15use crate::session::{data::SessionData, id::SessionId, store::SessionStore};
16use axess_clock::{Clock, SystemClock};
17use sqlx::SqlitePool;
18use std::sync::Arc;
19use std::time::Duration;
20
21/// Backward-compatible type alias. Use [`SqlStoreError`] directly in new code.
22pub type SqliteStoreError = SqlStoreError;
23
24/// SQLite-backed session store with AES-256-GCM encryption at rest.
25///
26/// Wrap an existing [`SqlitePool`] and call [`init_schema`](Self::init_schema) once at startup.
27/// **Production deployments must also schedule cleanup** of expired session
28/// rows: either by calling [`spawn_cleanup_task`](Self::spawn_cleanup_task)
29/// at startup or by running an external job that invokes
30/// [`cleanup_expired`](Self::cleanup_expired). Without one of these, the
31/// `sessions` table grows unbounded.
32///
33/// # Encryption
34///
35/// The primary constructor [`new`](SqliteSessionStore::new) requires a
36/// [`SessionCrypto`](crate::session::crypto::SessionCrypto) key; session data
37/// is encrypted before storage and decrypted on load.
38///
39/// For local development or testing where encryption is not needed, use
40/// [`plaintext`](SqliteSessionStore::plaintext) instead. This is an
41/// explicit opt-out so that production code never accidentally stores
42/// sessions unencrypted.
43///
44/// ```rust,ignore
45/// use axess::session::SessionCrypto;
46///
47/// // Production: encrypted (required).
48/// let store = SqliteSessionStore::new(pool, SessionCrypto::new(key));
49///
50/// // Development only: plaintext (explicit opt-out).
51/// let store = SqliteSessionStore::plaintext(pool);
52/// ```
53#[derive(Clone)]
54pub struct SqliteSessionStore {
55    pool: SqlitePool,
56    codec: SessionCodec,
57    clock: Arc<dyn Clock>,
58}
59
60impl SqliteSessionStore {
61    /// Create an **encrypted** store (recommended for production).
62    pub fn new(pool: SqlitePool, crypto: crate::session::crypto::SessionCrypto) -> Self {
63        Self {
64            pool,
65            codec: SessionCodec::encrypted(crypto),
66            clock: Arc::new(SystemClock),
67        }
68    }
69
70    /// Create a **plaintext** store (development/testing only).
71    pub fn plaintext(pool: SqlitePool) -> Self {
72        tracing::warn!(
73            "SqliteSessionStore created without encryption; \
74             do not use in production"
75        );
76        Self {
77            pool,
78            codec: SessionCodec::plaintext(),
79            clock: Arc::new(SystemClock),
80        }
81    }
82
83    /// Inject a [`Clock`] for deterministic-simulation testing.
84    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
85        self.clock = clock;
86        self
87    }
88
89    /// Create the `sessions` table if it doesn't already exist.
90    pub async fn init_schema(&self) -> Result<(), sqlx::Error> {
91        sqlx::query(
92            r#"
93            CREATE TABLE IF NOT EXISTS sessions (
94                id TEXT PRIMARY KEY,
95                data TEXT NOT NULL,
96                expires_at INTEGER NOT NULL
97            )
98            "#,
99        )
100        .execute(&self.pool)
101        .await?;
102
103        sqlx::query("CREATE INDEX IF NOT EXISTS idx_sessions_expires_at ON sessions (expires_at)")
104            .execute(&self.pool)
105            .await?;
106
107        Ok(())
108    }
109
110    /// Delete all sessions whose `expires_at` is in the past.
111    pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
112        let now = self.clock.now().timestamp();
113        let result = sqlx::query("DELETE FROM sessions WHERE expires_at < ?1")
114            .bind(now)
115            .execute(&self.pool)
116            .await?;
117        Ok(result.rows_affected())
118    }
119
120    /// Spawn a background task that calls [`cleanup_expired`](Self::cleanup_expired) on a fixed interval.
121    ///
122    /// SQL stores accumulate expired session rows forever unless something
123    /// removes them. Production deployments **must** either call this helper
124    /// once at startup, run an external scheduled job, or accept unbounded
125    /// table growth. The returned [`tokio::task::JoinHandle`] aborts the
126    /// loop when dropped, so store it for the lifetime of the application
127    /// (typically alongside your shutdown signal).
128    ///
129    /// Errors from `cleanup_expired` are logged at `warn` and swallowed;
130    /// the loop keeps running so a single transient DB blip does not
131    /// silently halt cleanup forever.
132    ///
133    /// ```rust,ignore
134    /// let store = SqliteSessionStore::new(pool, crypto);
135    /// store.init_schema().await?;
136    /// let _cleanup = store.spawn_cleanup_task(std::time::Duration::from_secs(3600));
137    /// ```
138    pub fn spawn_cleanup_task(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
139        let store = self.clone();
140        tokio::spawn(async move {
141            let mut ticker = tokio::time::interval(interval);
142            // Skip the immediate first tick; `tokio::time::interval` fires
143            // once at t=0 by default, which would race with `init_schema`
144            // and any seeded test data.
145            ticker.tick().await;
146            loop {
147                ticker.tick().await;
148                log_cleanup_outcome("sqlite", store.cleanup_expired().await);
149            }
150        })
151    }
152}
153
154impl SessionStore for SqliteSessionStore {
155    type Error = SqlStoreError;
156
157    async fn load(&self, id: &SessionId) -> Result<Option<SessionData>, Self::Error> {
158        let id_str = id.to_string();
159        let now = self.clock.now().timestamp();
160
161        let row: Option<(String,)> =
162            sqlx::query_as("SELECT data FROM sessions WHERE id = ?1 AND expires_at > ?2")
163                .bind(&id_str)
164                .bind(now)
165                .fetch_optional(&self.pool)
166                .await?;
167
168        match row {
169            Some((stored,)) => Ok(Some(self.codec.decode(&stored)?)),
170            None => Ok(None),
171        }
172    }
173
174    async fn save(
175        &self,
176        id: &SessionId,
177        data: &SessionData,
178        ttl: Duration,
179    ) -> Result<(), Self::Error> {
180        let id_str = id.to_string();
181        let encoded = self.codec.encode(data)?;
182        let exp = expires_at(&*self.clock, ttl);
183
184        sqlx::query(
185            r#"
186            INSERT INTO sessions (id, data, expires_at)
187            VALUES (?1, ?2, ?3)
188            ON CONFLICT(id) DO UPDATE SET data = excluded.data, expires_at = excluded.expires_at
189            "#,
190        )
191        .bind(&id_str)
192        .bind(&encoded)
193        .bind(exp)
194        .execute(&self.pool)
195        .await?;
196
197        Ok(())
198    }
199
200    async fn delete(&self, id: &SessionId) -> Result<(), Self::Error> {
201        let id_str = id.to_string();
202        sqlx::query("DELETE FROM sessions WHERE id = ?1")
203            .bind(&id_str)
204            .execute(&self.pool)
205            .await?;
206        Ok(())
207    }
208
209    async fn cycle(
210        &self,
211        old_id: &SessionId,
212        new_id: &SessionId,
213        data: &SessionData,
214        ttl: Duration,
215    ) -> Result<(), Self::Error> {
216        let encoded = self.codec.encode(data)?;
217        let exp = expires_at(&*self.clock, ttl);
218        let old_str = old_id.to_string();
219        let new_str = new_id.to_string();
220
221        let mut tx = self.pool.begin().await?;
222
223        sqlx::query("DELETE FROM sessions WHERE id = ?1")
224            .bind(&old_str)
225            .execute(&mut *tx)
226            .await?;
227
228        sqlx::query("INSERT INTO sessions (id, data, expires_at) VALUES (?1, ?2, ?3)")
229            .bind(&new_str)
230            .bind(&encoded)
231            .bind(exp)
232            .execute(&mut *tx)
233            .await?;
234
235        tx.commit().await?;
236        Ok(())
237    }
238
239    async fn prune_expired(&self) -> Result<u64, Self::Error> {
240        // Trait surface over the existing inherent
241        // `cleanup_expired`. Backends that already provide this as a
242        // typed inherent method delegate so callers can drive the sweep
243        // generically (e.g. on application startup, or from a generic
244        // operations endpoint).
245        Ok(self.cleanup_expired().await?)
246    }
247}
248
249// ── HealthCheck ──────────────────────────────────────────────────────────────
250
251use crate::health::{HealthCheck, HealthStatus};
252use crate::session::storage::sql_helpers::sql_health_probe;
253
254impl HealthCheck for SqliteSessionStore {
255    fn check(
256        &self,
257    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = HealthStatus> + Send + '_>> {
258        Box::pin(sql_health_probe(
259            "sqlite",
260            sqlx::query_scalar::<_, i32>("SELECT 1").fetch_one(&self.pool),
261        ))
262    }
263}
264
265// ── Store<SessionId, SessionData> ────────────────────────────────────────────
266//
267// surface; adopters that hold an `Arc<dyn Store<SessionId, SessionData>>`
268// or a generic `S: Store<SessionId, SessionData>` can program against any
269// backend uniformly. The `SessionStore` trait stays the primary surface
270// (it carries the `cycle` + `find_sessions_for_user` session-domain
271// primitives `Store` doesn't); this impl is the bridge to the generic
272// world. The session-domain `prune_expired` and the `Store::prune_expired`
273// share the same body since both delegate to `cleanup_expired`.
274
275impl crate::store::Store<SessionId, SessionData> for SqliteSessionStore {
276    type Error = SqlStoreError;
277
278    fn get(
279        &self,
280        key: &SessionId,
281    ) -> impl std::future::Future<Output = Result<Option<SessionData>, Self::Error>> + Send {
282        <Self as SessionStore>::load(self, key)
283    }
284
285    fn put(
286        &self,
287        key: &SessionId,
288        value: &SessionData,
289        ttl: Duration,
290    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send {
291        <Self as SessionStore>::save(self, key, value, ttl)
292    }
293
294    fn delete(
295        &self,
296        key: &SessionId,
297    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send {
298        <Self as SessionStore>::delete(self, key)
299    }
300
301    fn prune_expired(&self) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send {
302        <Self as SessionStore>::prune_expired(self)
303    }
304}
305
306#[cfg(test)]
307mod sqlite_tests {
308    //! Pin every inherent and `SessionStore` body for
309    //! `SqliteSessionStore`. Uses a real in-memory SQLite pool so we
310    //! observe round-trip behaviour, then assert that a save → load
311    //! cycle returns the same payload (and an absent key returns None).
312    //! Body-replacement mutations on `load`, `save`, `delete`, `cycle`,
313    //! `prune_expired`, `cleanup_expired`, and `init_schema` are caught
314    //! because the mutated body either returns a wrong value or fails
315    //! to apply the real side effect (so a follow-up observation
316    //! disagrees with the original).
317    use super::*;
318    use crate::session::data::SessionData;
319    use crate::session::id::SessionId;
320    use axess_rng::SystemRng;
321    use sqlx::sqlite::SqlitePoolOptions;
322
323    async fn memory_pool() -> SqlitePool {
324        SqlitePoolOptions::new()
325            .max_connections(1)
326            .connect("sqlite::memory:")
327            .await
328            .expect("in-memory sqlite must connect")
329    }
330
331    async fn store() -> SqliteSessionStore {
332        let pool = memory_pool().await;
333        let store = SqliteSessionStore::plaintext(pool);
334        store.init_schema().await.expect("init_schema");
335        store
336    }
337
338    fn sample_id() -> SessionId {
339        SessionId::new(&SystemRng)
340    }
341
342    fn payload_with_custom() -> SessionData {
343        SessionData {
344            custom: serde_json::json!({"k": "v"}),
345            ..SessionData::default()
346        }
347    }
348
349    #[tokio::test]
350    async fn init_schema_creates_sessions_table() {
351        // Pins line 90 body against `Ok(())`. After init_schema, an
352        // INSERT into `sessions` succeeds; if the body was replaced
353        // by `Ok(())`, the table would not exist and the save below
354        // would fail.
355        let store = store().await;
356        let result = store
357            .save(
358                &sample_id(),
359                &SessionData::default(),
360                Duration::from_secs(60),
361            )
362            .await;
363        assert!(
364            result.is_ok(),
365            "save after init_schema must succeed: {result:?}"
366        );
367    }
368
369    #[tokio::test]
370    async fn save_then_load_returns_persisted_payload() {
371        // Pins lines 165 (load body), 187 (save body) against
372        // `Ok(None)`, `Ok(Some(Default::default()))`, and `Ok(())`.
373        // The mutations would either drop the persisted row or
374        // return a default; the round-trip catches both.
375        let store = store().await;
376        let id = sample_id();
377        let data = payload_with_custom();
378
379        store
380            .save(&id, &data, Duration::from_secs(60))
381            .await
382            .expect("save");
383
384        let loaded = store.load(&id).await.expect("load").expect("Some");
385        assert_eq!(
386            serde_json::to_string(&data).unwrap(),
387            serde_json::to_string(&loaded).unwrap(),
388            "load must return what save persisted; kills Ok(None) AND Ok(Some(Default))"
389        );
390    }
391
392    #[tokio::test]
393    async fn load_of_absent_key_returns_none() {
394        // Kill `Ok(Some(Default::default()))` mutation on load;
395        // a row that was never saved must come back as None, not
396        // a default-constructed SessionData.
397        let store = store().await;
398        let loaded = store.load(&sample_id()).await.expect("load");
399        assert!(
400            loaded.is_none(),
401            "absent key must yield None, not Some(Default)"
402        );
403    }
404
405    #[tokio::test]
406    async fn delete_actually_removes_row() {
407        // Pins line 208 body against `Ok(())`. With the mutation,
408        // delete still returns Ok(()), but the row remains;
409        // load() afterwards would still find it.
410        let store = store().await;
411        let id = sample_id();
412        let data = payload_with_custom();
413        store
414            .save(&id, &data, Duration::from_secs(60))
415            .await
416            .expect("save");
417        assert!(store.load(&id).await.expect("load before delete").is_some());
418
419        store.delete(&id).await.expect("delete");
420        let after = store.load(&id).await.expect("load after delete");
421        assert!(
422            after.is_none(),
423            "delete must remove the row; mutated body would leave it"
424        );
425    }
426
427    #[tokio::test]
428    async fn cycle_atomically_swaps_session_ids() {
429        // Pins line 223 body against `Ok(())`. Real cycle replaces
430        // old_id row with new_id row; mutated cycle returns Ok(())
431        // without doing the swap; so old row stays and new row
432        // never appears.
433        let store = store().await;
434        let old = sample_id();
435        let new = sample_id();
436        let data = payload_with_custom();
437        store
438            .save(&old, &data, Duration::from_secs(60))
439            .await
440            .expect("save");
441
442        store
443            .cycle(&old, &new, &data, Duration::from_secs(60))
444            .await
445            .expect("cycle");
446
447        assert!(
448            store.load(&old).await.expect("load old").is_none(),
449            "old id must be gone after cycle"
450        );
451        assert!(
452            store.load(&new).await.expect("load new").is_some(),
453            "new id must exist after cycle; kills cycle->Ok(()) mutant"
454        );
455    }
456
457    #[tokio::test]
458    async fn cleanup_expired_returns_real_row_count() {
459        // Pins line 111 against `Ok(0)` and `Ok(1)` mutations.
460        // Save two sessions with negative TTL (already expired);
461        // cleanup_expired must return 2, not 0 or 1.
462        let store = store().await;
463        let id1 = sample_id();
464        let id2 = sample_id();
465
466        // Insert directly with an expires_at in the past; the
467        // `save` helper would use the configured clock, so reach
468        // into the SQL layer with a hand-rolled INSERT instead.
469        for id in [&id1, &id2] {
470            sqlx::query("INSERT INTO sessions (id, data, expires_at) VALUES (?1, '{}', 0)")
471                .bind(id.to_string())
472                .execute(&store.pool)
473                .await
474                .expect("manual insert");
475        }
476
477        let removed = store.cleanup_expired().await.expect("cleanup_expired");
478        assert_eq!(
479            removed, 2,
480            "cleanup_expired must report the real row-count; kills Ok(0) and Ok(1)"
481        );
482    }
483
484    #[tokio::test]
485    async fn prune_expired_trait_surface_matches_inherent() {
486        // Pins line 252 body against `Ok(0)` and `Ok(1)`. After
487        // inserting three expired rows, prune_expired must
488        // delegate to cleanup_expired and return 3.
489        let store = store().await;
490        for _ in 0..3 {
491            sqlx::query("INSERT INTO sessions (id, data, expires_at) VALUES (?1, '{}', 0)")
492                .bind(SessionId::new(&SystemRng).to_string())
493                .execute(&store.pool)
494                .await
495                .expect("manual insert");
496        }
497        let removed = store.prune_expired().await.expect("prune_expired");
498        assert_eq!(removed, 3);
499    }
500
501    // log_cleanup_outcome is now shared in `sql_helpers` and pinned by tests
502    // there; the per-backend duplicates were dropped to keep one source of
503    // truth. The cleanup-pipeline shape (label propagation, debug-only-on-
504    // positive, warn-on-err) is unchanged.
505}