axess_core/session/storage/
sqlite.rs1use crate::session::storage::session_codec::{SessionCodec, SqlStoreError, expires_at};
14use crate::session::storage::sql_helpers::log_cleanup_outcome;
15use crate::session::{data::SessionData, id::SessionId, store::SessionStore};
16use axess_clock::{Clock, SystemClock};
17use sqlx::SqlitePool;
18use std::sync::Arc;
19use std::time::Duration;
20
21pub type SqliteStoreError = SqlStoreError;
23
24#[derive(Clone)]
54pub struct SqliteSessionStore {
55 pool: SqlitePool,
56 codec: SessionCodec,
57 clock: Arc<dyn Clock>,
58}
59
60impl SqliteSessionStore {
61 pub fn new(pool: SqlitePool, crypto: crate::session::crypto::SessionCrypto) -> Self {
63 Self {
64 pool,
65 codec: SessionCodec::encrypted(crypto),
66 clock: Arc::new(SystemClock),
67 }
68 }
69
70 pub fn plaintext(pool: SqlitePool) -> Self {
72 tracing::warn!(
73 "SqliteSessionStore created without encryption; \
74 do not use in production"
75 );
76 Self {
77 pool,
78 codec: SessionCodec::plaintext(),
79 clock: Arc::new(SystemClock),
80 }
81 }
82
83 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
85 self.clock = clock;
86 self
87 }
88
89 pub async fn init_schema(&self) -> Result<(), sqlx::Error> {
91 sqlx::query(
92 r#"
93 CREATE TABLE IF NOT EXISTS sessions (
94 id TEXT PRIMARY KEY,
95 data TEXT NOT NULL,
96 expires_at INTEGER NOT NULL
97 )
98 "#,
99 )
100 .execute(&self.pool)
101 .await?;
102
103 sqlx::query("CREATE INDEX IF NOT EXISTS idx_sessions_expires_at ON sessions (expires_at)")
104 .execute(&self.pool)
105 .await?;
106
107 Ok(())
108 }
109
110 pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
112 let now = self.clock.now().timestamp();
113 let result = sqlx::query("DELETE FROM sessions WHERE expires_at < ?1")
114 .bind(now)
115 .execute(&self.pool)
116 .await?;
117 Ok(result.rows_affected())
118 }
119
120 pub fn spawn_cleanup_task(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
139 let store = self.clone();
140 tokio::spawn(async move {
141 let mut ticker = tokio::time::interval(interval);
142 ticker.tick().await;
146 loop {
147 ticker.tick().await;
148 log_cleanup_outcome("sqlite", store.cleanup_expired().await);
149 }
150 })
151 }
152}
153
154impl SessionStore for SqliteSessionStore {
155 type Error = SqlStoreError;
156
157 async fn load(&self, id: &SessionId) -> Result<Option<SessionData>, Self::Error> {
158 let id_str = id.to_string();
159 let now = self.clock.now().timestamp();
160
161 let row: Option<(String,)> =
162 sqlx::query_as("SELECT data FROM sessions WHERE id = ?1 AND expires_at > ?2")
163 .bind(&id_str)
164 .bind(now)
165 .fetch_optional(&self.pool)
166 .await?;
167
168 match row {
169 Some((stored,)) => Ok(Some(self.codec.decode(&stored)?)),
170 None => Ok(None),
171 }
172 }
173
174 async fn save(
175 &self,
176 id: &SessionId,
177 data: &SessionData,
178 ttl: Duration,
179 ) -> Result<(), Self::Error> {
180 let id_str = id.to_string();
181 let encoded = self.codec.encode(data)?;
182 let exp = expires_at(&*self.clock, ttl);
183
184 sqlx::query(
185 r#"
186 INSERT INTO sessions (id, data, expires_at)
187 VALUES (?1, ?2, ?3)
188 ON CONFLICT(id) DO UPDATE SET data = excluded.data, expires_at = excluded.expires_at
189 "#,
190 )
191 .bind(&id_str)
192 .bind(&encoded)
193 .bind(exp)
194 .execute(&self.pool)
195 .await?;
196
197 Ok(())
198 }
199
200 async fn delete(&self, id: &SessionId) -> Result<(), Self::Error> {
201 let id_str = id.to_string();
202 sqlx::query("DELETE FROM sessions WHERE id = ?1")
203 .bind(&id_str)
204 .execute(&self.pool)
205 .await?;
206 Ok(())
207 }
208
209 async fn cycle(
210 &self,
211 old_id: &SessionId,
212 new_id: &SessionId,
213 data: &SessionData,
214 ttl: Duration,
215 ) -> Result<(), Self::Error> {
216 let encoded = self.codec.encode(data)?;
217 let exp = expires_at(&*self.clock, ttl);
218 let old_str = old_id.to_string();
219 let new_str = new_id.to_string();
220
221 let mut tx = self.pool.begin().await?;
222
223 sqlx::query("DELETE FROM sessions WHERE id = ?1")
224 .bind(&old_str)
225 .execute(&mut *tx)
226 .await?;
227
228 sqlx::query("INSERT INTO sessions (id, data, expires_at) VALUES (?1, ?2, ?3)")
229 .bind(&new_str)
230 .bind(&encoded)
231 .bind(exp)
232 .execute(&mut *tx)
233 .await?;
234
235 tx.commit().await?;
236 Ok(())
237 }
238
239 async fn prune_expired(&self) -> Result<u64, Self::Error> {
240 Ok(self.cleanup_expired().await?)
246 }
247}
248
249use crate::health::{HealthCheck, HealthStatus};
252use crate::session::storage::sql_helpers::sql_health_probe;
253
254impl HealthCheck for SqliteSessionStore {
255 fn check(
256 &self,
257 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = HealthStatus> + Send + '_>> {
258 Box::pin(sql_health_probe(
259 "sqlite",
260 sqlx::query_scalar::<_, i32>("SELECT 1").fetch_one(&self.pool),
261 ))
262 }
263}
264
265impl crate::store::Store<SessionId, SessionData> for SqliteSessionStore {
276 type Error = SqlStoreError;
277
278 fn get(
279 &self,
280 key: &SessionId,
281 ) -> impl std::future::Future<Output = Result<Option<SessionData>, Self::Error>> + Send {
282 <Self as SessionStore>::load(self, key)
283 }
284
285 fn put(
286 &self,
287 key: &SessionId,
288 value: &SessionData,
289 ttl: Duration,
290 ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send {
291 <Self as SessionStore>::save(self, key, value, ttl)
292 }
293
294 fn delete(
295 &self,
296 key: &SessionId,
297 ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send {
298 <Self as SessionStore>::delete(self, key)
299 }
300
301 fn prune_expired(&self) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send {
302 <Self as SessionStore>::prune_expired(self)
303 }
304}
305
#[cfg(test)]
mod sqlite_tests {
    use super::*;
    use crate::session::data::SessionData;
    use crate::session::id::SessionId;
    use axess_rng::SystemRng;
    use sqlx::sqlite::SqlitePoolOptions;

    /// Opens an in-memory SQLite pool capped at a single connection, so every
    /// query in a test goes through the same connection.
    async fn memory_pool() -> SqlitePool {
        SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .expect("in-memory sqlite must connect")
    }

    /// Builds a plaintext store on a fresh in-memory database with the schema
    /// already created.
    async fn store() -> SqliteSessionStore {
        let sessions = SqliteSessionStore::plaintext(memory_pool().await);
        sessions.init_schema().await.expect("init_schema");
        sessions
    }

    /// Generates a new session id from the system RNG.
    fn sample_id() -> SessionId {
        SessionId::new(&SystemRng)
    }

    /// Session payload that differs from `SessionData::default()` in its
    /// `custom` field, so a default-returning implementation is detectable.
    fn payload_with_custom() -> SessionData {
        SessionData {
            custom: serde_json::json!({"k": "v"}),
            ..SessionData::default()
        }
    }

    /// Writes a row with `expires_at = 0` straight into the table, bypassing
    /// `save`, so the row is already expired for the sweep under test.
    async fn insert_expired_row(sessions: &SqliteSessionStore, id: &SessionId) {
        sqlx::query("INSERT INTO sessions (id, data, expires_at) VALUES (?1, '{}', 0)")
            .bind(id.to_string())
            .execute(&sessions.pool)
            .await
            .expect("manual insert");
    }

    #[tokio::test]
    async fn init_schema_creates_sessions_table() {
        let sessions = store().await;
        let id = sample_id();
        let data = SessionData::default();

        let result = sessions.save(&id, &data, Duration::from_secs(60)).await;

        assert!(
            result.is_ok(),
            "save after init_schema must succeed: {result:?}"
        );
    }

    #[tokio::test]
    async fn save_then_load_returns_persisted_payload() {
        let sessions = store().await;
        let id = sample_id();
        let data = payload_with_custom();

        sessions
            .save(&id, &data, Duration::from_secs(60))
            .await
            .expect("save");
        let loaded = sessions.load(&id).await.expect("load").expect("Some");

        // Compare through the JSON rendering of both values.
        assert_eq!(
            serde_json::to_string(&data).unwrap(),
            serde_json::to_string(&loaded).unwrap(),
            "load must return what save persisted; kills Ok(None) AND Ok(Some(Default))"
        );
    }

    #[tokio::test]
    async fn load_of_absent_key_returns_none() {
        let sessions = store().await;

        let loaded = sessions.load(&sample_id()).await.expect("load");

        assert!(
            loaded.is_none(),
            "absent key must yield None, not Some(Default)"
        );
    }

    #[tokio::test]
    async fn delete_actually_removes_row() {
        let sessions = store().await;
        let id = sample_id();
        let data = payload_with_custom();
        sessions
            .save(&id, &data, Duration::from_secs(60))
            .await
            .expect("save");
        let before = sessions.load(&id).await.expect("load before delete");
        assert!(before.is_some());

        sessions.delete(&id).await.expect("delete");

        let after = sessions.load(&id).await.expect("load after delete");
        assert!(
            after.is_none(),
            "delete must remove the row; mutated body would leave it"
        );
    }

    #[tokio::test]
    async fn cycle_atomically_swaps_session_ids() {
        let sessions = store().await;
        let old = sample_id();
        let new = sample_id();
        let data = payload_with_custom();
        sessions
            .save(&old, &data, Duration::from_secs(60))
            .await
            .expect("save");

        sessions
            .cycle(&old, &new, &data, Duration::from_secs(60))
            .await
            .expect("cycle");

        let old_row = sessions.load(&old).await.expect("load old");
        assert!(old_row.is_none(), "old id must be gone after cycle");
        let new_row = sessions.load(&new).await.expect("load new");
        assert!(
            new_row.is_some(),
            "new id must exist after cycle; kills cycle->Ok(()) mutant"
        );
    }

    #[tokio::test]
    async fn cleanup_expired_returns_real_row_count() {
        let sessions = store().await;
        let first = sample_id();
        let second = sample_id();
        insert_expired_row(&sessions, &first).await;
        insert_expired_row(&sessions, &second).await;

        let removed = sessions.cleanup_expired().await.expect("cleanup_expired");

        assert_eq!(
            removed, 2,
            "cleanup_expired must report the real row-count; kills Ok(0) and Ok(1)"
        );
    }

    #[tokio::test]
    async fn prune_expired_trait_surface_matches_inherent() {
        let sessions = store().await;
        for _ in 0..3 {
            insert_expired_row(&sessions, &sample_id()).await;
        }

        let removed = sessions.prune_expired().await.expect("prune_expired");

        assert_eq!(removed, 3);
    }
}