Skip to main content

eventcore_sqlite/
lib.rs

1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use eventcore_types::{
6    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
7    EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
8    StreamWriteEntry, StreamWrites,
9};
10use rusqlite::OptionalExtension;
11use rusqlite::params;
12use thiserror::Error;
13use tokio::sync::Mutex;
14use tracing::{error, info, instrument, warn};
15use uuid::Uuid;
16
17// ---------------------------------------------------------------------------
18// Error types
19// ---------------------------------------------------------------------------
20
/// Errors produced while opening, configuring, or migrating a SQLite-backed
/// store.
#[derive(Debug, Error)]
pub enum SqliteEventStoreError {
    /// Opening the connection, or applying a connection-level PRAGMA
    /// (journal mode, encryption key), failed.
    #[error("failed to open SQLite connection: {0}")]
    ConnectionFailed(#[source] rusqlite::Error),

    /// The schema-creation batch run by `migrate` was rejected by SQLite.
    #[error("migration failed: {0}")]
    MigrationFailed(#[source] rusqlite::Error),

    /// The `spawn_blocking` task did not complete; the payload is the
    /// stringified `JoinError`.
    #[error("internal task failed: {0}")]
    TaskFailed(String),
}
32
/// Errors produced while loading or saving subscription checkpoints.
#[derive(Debug, Error)]
pub enum SqliteCheckpointError {
    /// A statement against the checkpoint table failed.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] rusqlite::Error),

    /// The stored `last_position` text could not be parsed as a UUID.
    #[error("corrupted checkpoint: invalid position UUID '{position}': {source}")]
    CorruptedCheckpoint {
        /// The raw text read from the database.
        position: String,
        /// The UUID parse error explaining why `position` was rejected.
        #[source]
        source: uuid::Error,
    },

    /// The `spawn_blocking` task did not complete; the payload is the
    /// stringified `JoinError`.
    #[error("internal task failed: {0}")]
    TaskFailed(String),
}
48
49#[derive(Debug, Error)]
50pub enum SqliteCoordinationError {
51    #[error("leadership not acquired: another instance holds the lock")]
52    LeadershipNotAcquired { subscription_name: String },
53}
54
55// ---------------------------------------------------------------------------
56// Config
57// ---------------------------------------------------------------------------
58
/// Connection settings for a SQLite-backed store.
///
/// `Debug` is written by hand so the value of `encryption_key` never reaches
/// log output: a configured key prints as `Some("[REDACTED]")`, a missing
/// one as `None`.
#[derive(Clone)]
pub struct SqliteConfig {
    /// Location of the database file.
    pub path: PathBuf,
    /// Optional key applied to the connection through `PRAGMA key`.
    pub encryption_key: Option<String>,
}

impl std::fmt::Debug for SqliteConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reveal only whether a key is configured, never its contents.
        let masked_key: Option<&str> = self.encryption_key.is_some().then_some("[REDACTED]");

        let mut out = f.debug_struct("SqliteConfig");
        out.field("path", &self.path);
        out.field("encryption_key", &masked_key);
        out.finish()
    }
}
80
81// ---------------------------------------------------------------------------
82// Shared connection helper
83// ---------------------------------------------------------------------------
84
85/// Internal helper for opening and configuring a SQLite connection.
86/// Shared by `SqliteEventStore` and `SqliteCheckpointStore` to avoid
87/// duplicating connection setup logic.
88fn open_connection(path: &PathBuf) -> Result<rusqlite::Connection, SqliteEventStoreError> {
89    let conn = rusqlite::Connection::open(path).map_err(SqliteEventStoreError::ConnectionFailed)?;
90    // WAL mode is a no-op for in-memory databases but kept for code consistency.
91    conn.pragma_update(None, "journal_mode", "WAL")
92        .map_err(SqliteEventStoreError::ConnectionFailed)?;
93    Ok(conn)
94}
95
96fn open_in_memory_connection() -> Result<rusqlite::Connection, SqliteEventStoreError> {
97    let conn =
98        rusqlite::Connection::open_in_memory().map_err(SqliteEventStoreError::ConnectionFailed)?;
99    // WAL mode is a no-op for in-memory databases but kept for code consistency.
100    conn.pragma_update(None, "journal_mode", "WAL")
101        .map_err(SqliteEventStoreError::ConnectionFailed)?;
102    Ok(conn)
103}
104
105fn apply_encryption_key(
106    conn: &rusqlite::Connection,
107    key: &str,
108) -> Result<(), SqliteEventStoreError> {
109    conn.pragma_update(None, "key", key)
110        .map_err(SqliteEventStoreError::ConnectionFailed)
111}
112
113/// Map a `JoinError` from `spawn_blocking` into an `EventStoreError`.
114fn map_join_error(e: tokio::task::JoinError, operation: Operation) -> EventStoreError {
115    error!(error = %e, ?operation, "[sqlite] spawn_blocking task failed");
116    EventStoreError::StoreFailure { operation }
117}
118
119/// Map a `JoinError` from `spawn_blocking` into a `SqliteEventStoreError`.
120fn map_join_error_migration(e: tokio::task::JoinError) -> SqliteEventStoreError {
121    SqliteEventStoreError::TaskFailed(e.to_string())
122}
123
124/// Map a `JoinError` from `spawn_blocking` into a `SqliteCheckpointError`.
125fn map_join_error_checkpoint(e: tokio::task::JoinError) -> SqliteCheckpointError {
126    SqliteCheckpointError::TaskFailed(e.to_string())
127}
128
129// ---------------------------------------------------------------------------
130// Shared checkpoint helpers
131// ---------------------------------------------------------------------------
132
133fn checkpoint_load(
134    conn: &rusqlite::Connection,
135    name: &str,
136) -> Result<Option<StreamPosition>, SqliteCheckpointError> {
137    let mut stmt = conn
138        .prepare(
139            "SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = ?1",
140        )
141        .map_err(SqliteCheckpointError::DatabaseError)?;
142
143    let result: Option<String> = stmt
144        .query_row(params![name], |row| row.get(0))
145        .optional()
146        .map_err(SqliteCheckpointError::DatabaseError)?;
147
148    match result {
149        Some(pos_str) => {
150            let uuid = Uuid::parse_str(&pos_str).map_err(|e| {
151                SqliteCheckpointError::CorruptedCheckpoint {
152                    position: pos_str,
153                    source: e,
154                }
155            })?;
156            Ok(Some(StreamPosition::new(uuid)))
157        }
158        None => Ok(None),
159    }
160}
161
162fn checkpoint_save(
163    conn: &rusqlite::Connection,
164    name: &str,
165    position_str: &str,
166) -> Result<(), SqliteCheckpointError> {
167    conn.execute(
168        "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
169         VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
170         ON CONFLICT (subscription_name) DO UPDATE SET last_position = ?2, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
171        params![name, position_str],
172    )
173    .map_err(SqliteCheckpointError::DatabaseError)?;
174    Ok(())
175}
176
177// ---------------------------------------------------------------------------
178// Shared coordination helpers
179// ---------------------------------------------------------------------------
180
181fn try_acquire_lock(
182    locks: &std::sync::RwLock<HashSet<String>>,
183    subscription_name: &str,
184) -> Result<(), SqliteCoordinationError> {
185    let mut guard = locks.write().expect("coordination lock poisoned");
186    if guard.contains(subscription_name) {
187        return Err(SqliteCoordinationError::LeadershipNotAcquired {
188            subscription_name: subscription_name.to_string(),
189        });
190    }
191    guard.insert(subscription_name.to_string());
192    Ok(())
193}
194
195// ---------------------------------------------------------------------------
196// SqliteEventStore
197// ---------------------------------------------------------------------------
198
199pub struct SqliteEventStore {
200    conn: Arc<Mutex<rusqlite::Connection>>,
201    locks: Arc<std::sync::RwLock<HashSet<String>>>,
202}
203
204impl std::fmt::Debug for SqliteEventStore {
205    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206        f.debug_struct("SqliteEventStore").finish_non_exhaustive()
207    }
208}
209
210impl SqliteEventStore {
211    pub fn new(config: SqliteConfig) -> Result<Self, SqliteEventStoreError> {
212        let conn = open_connection(&config.path)?;
213        if let Some(ref key) = config.encryption_key {
214            apply_encryption_key(&conn, key)?;
215        }
216        Ok(Self {
217            conn: Arc::new(Mutex::new(conn)),
218            locks: Arc::new(std::sync::RwLock::new(HashSet::new())),
219        })
220    }
221
222    pub fn in_memory() -> Result<Self, SqliteEventStoreError> {
223        let conn = open_in_memory_connection()?;
224        Ok(Self {
225            conn: Arc::new(Mutex::new(conn)),
226            locks: Arc::new(std::sync::RwLock::new(HashSet::new())),
227        })
228    }
229
230    pub async fn migrate(&self) -> Result<(), SqliteEventStoreError> {
231        let conn = self.conn.clone();
232        tokio::task::spawn_blocking(move || {
233            let conn = conn.blocking_lock();
234            conn.execute_batch(
235                "CREATE TABLE IF NOT EXISTS eventcore_events (
236                    event_id TEXT PRIMARY KEY,
237                    stream_id TEXT NOT NULL,
238                    stream_version INTEGER NOT NULL,
239                    event_type TEXT NOT NULL,
240                    event_data TEXT NOT NULL,
241                    metadata TEXT NOT NULL DEFAULT '{}',
242                    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
243                );
244                CREATE UNIQUE INDEX IF NOT EXISTS idx_eventcore_events_stream_version
245                    ON eventcore_events (stream_id, stream_version);
246                CREATE INDEX IF NOT EXISTS idx_eventcore_events_stream_id
247                    ON eventcore_events (stream_id);
248                CREATE TABLE IF NOT EXISTS eventcore_subscription_versions (
249                    subscription_name TEXT PRIMARY KEY,
250                    last_position TEXT NOT NULL,
251                    updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
252                );",
253            )
254            .map_err(SqliteEventStoreError::MigrationFailed)?;
255            Ok(())
256        })
257        .await
258        .map_err(map_join_error_migration)?
259    }
260}
261
262impl EventStore for SqliteEventStore {
263    #[instrument(name = "sqlite.read_stream", skip(self))]
264    async fn read_stream<E: Event>(
265        &self,
266        stream_id: StreamId,
267    ) -> Result<EventStreamReader<E>, EventStoreError> {
268        info!(
269            stream = %stream_id,
270            "[sqlite.read_stream] reading events from sqlite"
271        );
272
273        let conn = self.conn.clone();
274        let sid = stream_id.clone();
275        let rows = tokio::task::spawn_blocking(move || {
276            let conn = conn.blocking_lock();
277            let mut stmt = conn
278                .prepare(
279                    "SELECT event_data FROM eventcore_events WHERE stream_id = ?1 ORDER BY stream_version ASC",
280                )
281                .map_err(|e| {
282                    error!(error = %e, "[sqlite.read_stream] prepare failed");
283                    EventStoreError::StoreFailure {
284                        operation: Operation::ReadStream,
285                    }
286                })?;
287            let rows: Vec<String> = stmt
288                .query_map(params![sid.as_ref()], |row| row.get(0))
289                .map_err(|e| {
290                    error!(error = %e, "[sqlite.read_stream] query failed");
291                    EventStoreError::StoreFailure {
292                        operation: Operation::ReadStream,
293                    }
294                })?
295                .collect::<Result<Vec<String>, _>>()
296                .map_err(|e| {
297                    error!(error = %e, "[sqlite.read_stream] row extraction failed");
298                    EventStoreError::StoreFailure {
299                        operation: Operation::ReadStream,
300                    }
301                })?;
302            Ok::<Vec<String>, EventStoreError>(rows)
303        })
304        .await
305        .map_err(|e| map_join_error(e, Operation::ReadStream))??;
306
307        let mut events = Vec::with_capacity(rows.len());
308        for json_str in rows {
309            let event: E = serde_json::from_str(&json_str).map_err(|e| {
310                EventStoreError::DeserializationFailed {
311                    stream_id: stream_id.clone(),
312                    detail: e.to_string(),
313                }
314            })?;
315            events.push(event);
316        }
317
318        Ok(EventStreamReader::new(events))
319    }
320
321    #[instrument(name = "sqlite.append_events", skip(self, writes))]
322    async fn append_events(
323        &self,
324        writes: StreamWrites,
325    ) -> Result<EventStreamSlice, EventStoreError> {
326        let expected_versions = writes.expected_versions().clone();
327        let entries = writes.into_entries();
328
329        if entries.is_empty() {
330            return Ok(EventStreamSlice);
331        }
332
333        info!(
334            stream_count = expected_versions.len(),
335            event_count = entries.len(),
336            "[sqlite.append_events] appending events to sqlite"
337        );
338
339        let conn = self.conn.clone();
340        tokio::task::spawn_blocking(move || {
341            let conn = conn.blocking_lock();
342            let tx = conn.unchecked_transaction().map_err(|e| {
343                error!(error = %e, "[sqlite.append_events] begin transaction failed");
344                EventStoreError::StoreFailure {
345                    operation: Operation::BeginTransaction,
346                }
347            })?;
348
349            for (stream_id, expected_version) in &expected_versions {
350                let current: usize = tx
351                    .query_row(
352                        "SELECT COALESCE(MAX(stream_version), 0) FROM eventcore_events WHERE stream_id = ?1",
353                        params![stream_id.as_ref()],
354                        |row| row.get(0),
355                    )
356                    .map_err(|e| {
357                        error!(error = %e, "[sqlite.append_events] version check failed");
358                        EventStoreError::StoreFailure {
359                            operation: Operation::AppendEvents,
360                        }
361                    })?;
362
363                if current != expected_version.into_inner() {
364                    warn!(
365                        stream = %stream_id,
366                        expected = expected_version.into_inner(),
367                        actual = current,
368                        "[sqlite.version_conflict] optimistic concurrency check failed"
369                    );
370                    return Err(EventStoreError::VersionConflict);
371                }
372            }
373
374            // Initialize per-stream version counters from expected versions;
375            // each will be incremented before assignment.
376            let mut current_versions: HashMap<&StreamId, usize> = expected_versions
377                .iter()
378                .map(|(sid, v)| (sid, v.into_inner()))
379                .collect();
380
381            for entry in &entries {
382                let StreamWriteEntry {
383                    stream_id,
384                    event_type,
385                    event_data,
386                    ..
387                } = entry;
388
389                let event_id = Uuid::now_v7().to_string();
390                let version_counter = current_versions
391                    .get_mut(stream_id)
392                    .expect("stream must be registered");
393                *version_counter += 1;
394                let version = *version_counter;
395
396                let event_json = serde_json::to_string(event_data).map_err(|e| {
397                    error!(error = %e, "[sqlite.append_events] serialization failed");
398                    EventStoreError::StoreFailure {
399                        operation: Operation::AppendEvents,
400                    }
401                })?;
402
403                tx.execute(
404                    "INSERT INTO eventcore_events (event_id, stream_id, stream_version, event_type, event_data, metadata)
405                     VALUES (?1, ?2, ?3, ?4, ?5, '{}')",
406                    params![
407                        event_id,
408                        stream_id.as_ref(),
409                        version,
410                        event_type,
411                        event_json,
412                    ],
413                )
414                .map_err(|e| {
415                    error!(error = %e, "[sqlite.append_events] insert failed");
416                    EventStoreError::StoreFailure {
417                        operation: Operation::AppendEvents,
418                    }
419                })?;
420            }
421
422            tx.commit().map_err(|e| {
423                error!(error = %e, "[sqlite.append_events] commit failed");
424                EventStoreError::StoreFailure {
425                    operation: Operation::CommitTransaction,
426                }
427            })?;
428
429            Ok(EventStreamSlice)
430        })
431        .await
432        .map_err(|e| map_join_error(e, Operation::AppendEvents))?
433    }
434}
435
436impl EventReader for SqliteEventStore {
437    type Error = EventStoreError;
438
439    #[instrument(name = "sqlite.read_events", skip(self))]
440    async fn read_events<E: Event>(
441        &self,
442        filter: EventFilter,
443        page: EventPage,
444    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
445        let conn = self.conn.clone();
446        let after_event_id: Option<String> =
447            page.after_position().map(|p| p.into_inner().to_string());
448        let limit = page.limit().into_inner() as i64;
449        let prefix = filter.stream_prefix().map(|p| p.as_ref().to_string());
450
451        let rows = tokio::task::spawn_blocking(move || {
452            let conn = conn.blocking_lock();
453
454            let (sql, param_values): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
455                match (&prefix, &after_event_id) {
456                    // UUIDv7 event IDs sort lexicographically in chronological order,
457                    // so text comparison (`event_id > ?1`) preserves insertion order
458                    // for cursor-based pagination.
459                    (Some(pfx), Some(after_id)) => (
460                        "SELECT event_id, event_data FROM eventcore_events WHERE event_id > ?1 AND stream_id LIKE ?2 ORDER BY event_id LIMIT ?3"
461                            .to_string(),
462                        vec![
463                            Box::new(after_id.clone()) as Box<dyn rusqlite::types::ToSql>,
464                            Box::new(format!("{}%", pfx)),
465                            Box::new(limit),
466                        ],
467                    ),
468                    (Some(pfx), None) => (
469                        "SELECT event_id, event_data FROM eventcore_events WHERE stream_id LIKE ?1 ORDER BY event_id LIMIT ?2"
470                            .to_string(),
471                        vec![
472                            Box::new(format!("{}%", pfx)) as Box<dyn rusqlite::types::ToSql>,
473                            Box::new(limit),
474                        ],
475                    ),
476                    (None, Some(after_id)) => (
477                        "SELECT event_id, event_data FROM eventcore_events WHERE event_id > ?1 ORDER BY event_id LIMIT ?2"
478                            .to_string(),
479                        vec![
480                            Box::new(after_id.clone()) as Box<dyn rusqlite::types::ToSql>,
481                            Box::new(limit),
482                        ],
483                    ),
484                    (None, None) => (
485                        "SELECT event_id, event_data FROM eventcore_events ORDER BY event_id LIMIT ?1"
486                            .to_string(),
487                        vec![Box::new(limit) as Box<dyn rusqlite::types::ToSql>],
488                    ),
489                };
490
491            let params_refs: Vec<&dyn rusqlite::types::ToSql> =
492                param_values.iter().map(|p| p.as_ref()).collect();
493
494            let mut stmt = conn.prepare(&sql).map_err(|e| {
495                error!(error = %e, "[sqlite.read_events] prepare failed");
496                EventStoreError::StoreFailure {
497                    operation: Operation::ReadStream,
498                }
499            })?;
500
501            let rows: Vec<(String, String)> = stmt
502                .query_map(params_refs.as_slice(), |row| {
503                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
504                })
505                .map_err(|e| {
506                    error!(error = %e, "[sqlite.read_events] query failed");
507                    EventStoreError::StoreFailure {
508                        operation: Operation::ReadStream,
509                    }
510                })?
511                .collect::<Result<Vec<_>, _>>()
512                .map_err(|e| {
513                    error!(error = %e, "[sqlite.read_events] row extraction failed");
514                    EventStoreError::StoreFailure {
515                        operation: Operation::ReadStream,
516                    }
517                })?;
518
519            Ok::<Vec<(String, String)>, EventStoreError>(rows)
520        })
521        .await
522        .map_err(|e| map_join_error(e, Operation::ReadStream))??;
523
524        // Silently skip events that cannot be deserialized into the requested
525        // type E. This is intentional: EventReader serves polymorphic consumers
526        // that may only understand a subset of stored event types. This matches
527        // the behavior of the postgres and in-memory backends.
528        let events: Vec<(E, StreamPosition)> = rows
529            .into_iter()
530            .filter_map(|(event_id_str, event_data_str)| {
531                let uuid = Uuid::parse_str(&event_id_str).ok()?;
532                let event: E = serde_json::from_str(&event_data_str).ok()?;
533                Some((event, StreamPosition::new(uuid)))
534            })
535            .collect();
536
537        Ok(events)
538    }
539}
540
541impl CheckpointStore for SqliteEventStore {
542    type Error = SqliteCheckpointError;
543
544    #[instrument(name = "sqlite.checkpoint.load", skip(self))]
545    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
546        let conn = self.conn.clone();
547        let name = name.to_string();
548        tokio::task::spawn_blocking(move || {
549            let conn = conn.blocking_lock();
550            checkpoint_load(&conn, &name)
551        })
552        .await
553        .map_err(map_join_error_checkpoint)?
554    }
555
556    #[instrument(name = "sqlite.checkpoint.save", skip(self))]
557    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
558        let conn = self.conn.clone();
559        let name = name.to_string();
560        let position_str = position.into_inner().to_string();
561        tokio::task::spawn_blocking(move || {
562            let conn = conn.blocking_lock();
563            checkpoint_save(&conn, &name, &position_str)
564        })
565        .await
566        .map_err(map_join_error_checkpoint)?
567    }
568}
569
570impl ProjectorCoordinator for SqliteEventStore {
571    type Error = SqliteCoordinationError;
572    type Guard = SqliteCoordinationGuard;
573
574    #[instrument(name = "sqlite.coordinator.try_acquire", skip(self))]
575    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
576        try_acquire_lock(&self.locks, subscription_name)?;
577        Ok(SqliteCoordinationGuard {
578            subscription_name: subscription_name.to_string(),
579            locks: Arc::clone(&self.locks),
580        })
581    }
582}
583
584// ---------------------------------------------------------------------------
585// SqliteCoordinationGuard
586// ---------------------------------------------------------------------------
587
/// Guard that releases leadership when dropped.
///
/// Uses `std::sync::RwLock` (not `tokio::sync::RwLock`) so that cleanup
/// works reliably in `Drop` without requiring an async runtime.
///
/// Dropping the guard immediately gives leadership back, so keep it alive
/// for as long as the projector should remain leader.
#[derive(Debug)]
pub struct SqliteCoordinationGuard {
    subscription_name: String,
    locks: Arc<std::sync::RwLock<HashSet<String>>>,
}

impl Drop for SqliteCoordinationGuard {
    /// Removes this guard's subscription name from the shared set.
    ///
    /// A poisoned lock is recovered rather than skipped: the set is only
    /// ever changed by a single `insert` or `remove`, so it is still usable,
    /// and skipping the removal would leave the subscription held forever.
    fn drop(&mut self) {
        let mut held = self
            .locks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        held.remove(&self.subscription_name);
    }
}
610
611// ---------------------------------------------------------------------------
612// SqliteCheckpointStore (standalone)
613// ---------------------------------------------------------------------------
614
615pub struct SqliteCheckpointStore {
616    conn: Arc<Mutex<rusqlite::Connection>>,
617}
618
619impl std::fmt::Debug for SqliteCheckpointStore {
620    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
621        f.debug_struct("SqliteCheckpointStore")
622            .finish_non_exhaustive()
623    }
624}
625
626impl SqliteCheckpointStore {
627    pub fn new(config: SqliteConfig) -> Result<Self, SqliteEventStoreError> {
628        let conn = open_connection(&config.path)?;
629        if let Some(ref key) = config.encryption_key {
630            apply_encryption_key(&conn, key)?;
631        }
632        Ok(Self {
633            conn: Arc::new(Mutex::new(conn)),
634        })
635    }
636
637    pub fn in_memory() -> Result<Self, SqliteEventStoreError> {
638        let conn = open_in_memory_connection()?;
639        Ok(Self {
640            conn: Arc::new(Mutex::new(conn)),
641        })
642    }
643
644    pub async fn migrate(&self) -> Result<(), SqliteEventStoreError> {
645        let conn = self.conn.clone();
646        tokio::task::spawn_blocking(move || {
647            let conn = conn.blocking_lock();
648            conn.execute_batch(
649                "CREATE TABLE IF NOT EXISTS eventcore_subscription_versions (
650                    subscription_name TEXT PRIMARY KEY,
651                    last_position TEXT NOT NULL,
652                    updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
653                );",
654            )
655            .map_err(SqliteEventStoreError::MigrationFailed)?;
656            Ok(())
657        })
658        .await
659        .map_err(map_join_error_migration)?
660    }
661}
662
663impl CheckpointStore for SqliteCheckpointStore {
664    type Error = SqliteCheckpointError;
665
666    #[instrument(name = "sqlite.checkpoint.load", skip(self))]
667    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
668        let conn = self.conn.clone();
669        let name = name.to_string();
670        tokio::task::spawn_blocking(move || {
671            let conn = conn.blocking_lock();
672            checkpoint_load(&conn, &name)
673        })
674        .await
675        .map_err(map_join_error_checkpoint)?
676    }
677
678    #[instrument(name = "sqlite.checkpoint.save", skip(self))]
679    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
680        let conn = self.conn.clone();
681        let name = name.to_string();
682        let position_str = position.into_inner().to_string();
683        tokio::task::spawn_blocking(move || {
684            let conn = conn.blocking_lock();
685            checkpoint_save(&conn, &name, &position_str)
686        })
687        .await
688        .map_err(map_join_error_checkpoint)?
689    }
690}
691
692// ---------------------------------------------------------------------------
693// SqliteProjectorCoordinator (standalone)
694// ---------------------------------------------------------------------------
695
/// Standalone, in-process projector coordinator.
///
/// Clones share the same lock set, so leadership acquired through one clone
/// is visible to every other clone.
#[derive(Debug, Clone, Default)]
pub struct SqliteProjectorCoordinator {
    locks: Arc<std::sync::RwLock<HashSet<String>>>,
}

impl SqliteProjectorCoordinator {
    /// Creates a coordinator with no subscriptions held.
    pub fn new() -> Self {
        Self {
            locks: Arc::new(std::sync::RwLock::new(HashSet::new())),
        }
    }
}
706
707impl ProjectorCoordinator for SqliteProjectorCoordinator {
708    type Error = SqliteCoordinationError;
709    type Guard = SqliteCoordinationGuard;
710
711    #[instrument(name = "sqlite.coordinator.try_acquire", skip(self))]
712    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
713        try_acquire_lock(&self.locks, subscription_name)?;
714        Ok(SqliteCoordinationGuard {
715            subscription_name: subscription_name.to_string(),
716            locks: Arc::clone(&self.locks),
717        })
718    }
719}