1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use eventcore_types::{
6 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
7 EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
8 StreamWriteEntry, StreamWrites,
9};
10use rusqlite::OptionalExtension;
11use rusqlite::params;
12use thiserror::Error;
13use tokio::sync::Mutex;
14use tracing::{error, info, instrument, warn};
15use uuid::Uuid;
16
17#[derive(Debug, Error)]
22pub enum SqliteEventStoreError {
23 #[error("failed to open SQLite connection: {0}")]
24 ConnectionFailed(#[source] rusqlite::Error),
25
26 #[error("migration failed: {0}")]
27 MigrationFailed(#[source] rusqlite::Error),
28
29 #[error("internal task failed: {0}")]
30 TaskFailed(String),
31}
32
33#[derive(Debug, Error)]
34pub enum SqliteCheckpointError {
35 #[error("database operation failed: {0}")]
36 DatabaseError(#[source] rusqlite::Error),
37
38 #[error("corrupted checkpoint: invalid position UUID '{position}': {source}")]
39 CorruptedCheckpoint {
40 position: String,
41 #[source]
42 source: uuid::Error,
43 },
44
45 #[error("internal task failed: {0}")]
46 TaskFailed(String),
47}
48
49#[derive(Debug, Error)]
50pub enum SqliteCoordinationError {
51 #[error("leadership not acquired: another instance holds the lock")]
52 LeadershipNotAcquired { subscription_name: String },
53}
54
55#[derive(Clone)]
64pub struct SqliteConfig {
65 pub path: PathBuf,
66 pub encryption_key: Option<String>,
67}
68
69impl std::fmt::Debug for SqliteConfig {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("SqliteConfig")
72 .field("path", &self.path)
73 .field(
74 "encryption_key",
75 &self.encryption_key.as_ref().map(|_| "[REDACTED]"),
76 )
77 .finish()
78 }
79}
80
81fn open_connection(path: &PathBuf) -> Result<rusqlite::Connection, SqliteEventStoreError> {
89 let conn = rusqlite::Connection::open(path).map_err(SqliteEventStoreError::ConnectionFailed)?;
90 conn.pragma_update(None, "journal_mode", "WAL")
92 .map_err(SqliteEventStoreError::ConnectionFailed)?;
93 Ok(conn)
94}
95
96fn open_in_memory_connection() -> Result<rusqlite::Connection, SqliteEventStoreError> {
97 let conn =
98 rusqlite::Connection::open_in_memory().map_err(SqliteEventStoreError::ConnectionFailed)?;
99 conn.pragma_update(None, "journal_mode", "WAL")
101 .map_err(SqliteEventStoreError::ConnectionFailed)?;
102 Ok(conn)
103}
104
105fn apply_encryption_key(
106 conn: &rusqlite::Connection,
107 key: &str,
108) -> Result<(), SqliteEventStoreError> {
109 conn.pragma_update(None, "key", key)
110 .map_err(SqliteEventStoreError::ConnectionFailed)
111}
112
113fn map_join_error(e: tokio::task::JoinError, operation: Operation) -> EventStoreError {
115 error!(error = %e, ?operation, "[sqlite] spawn_blocking task failed");
116 EventStoreError::StoreFailure { operation }
117}
118
119fn map_join_error_migration(e: tokio::task::JoinError) -> SqliteEventStoreError {
121 SqliteEventStoreError::TaskFailed(e.to_string())
122}
123
124fn map_join_error_checkpoint(e: tokio::task::JoinError) -> SqliteCheckpointError {
126 SqliteCheckpointError::TaskFailed(e.to_string())
127}
128
129fn checkpoint_load(
134 conn: &rusqlite::Connection,
135 name: &str,
136) -> Result<Option<StreamPosition>, SqliteCheckpointError> {
137 let mut stmt = conn
138 .prepare(
139 "SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = ?1",
140 )
141 .map_err(SqliteCheckpointError::DatabaseError)?;
142
143 let result: Option<String> = stmt
144 .query_row(params![name], |row| row.get(0))
145 .optional()
146 .map_err(SqliteCheckpointError::DatabaseError)?;
147
148 match result {
149 Some(pos_str) => {
150 let uuid = Uuid::parse_str(&pos_str).map_err(|e| {
151 SqliteCheckpointError::CorruptedCheckpoint {
152 position: pos_str,
153 source: e,
154 }
155 })?;
156 Ok(Some(StreamPosition::new(uuid)))
157 }
158 None => Ok(None),
159 }
160}
161
162fn checkpoint_save(
163 conn: &rusqlite::Connection,
164 name: &str,
165 position_str: &str,
166) -> Result<(), SqliteCheckpointError> {
167 conn.execute(
168 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
169 VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
170 ON CONFLICT (subscription_name) DO UPDATE SET last_position = ?2, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
171 params![name, position_str],
172 )
173 .map_err(SqliteCheckpointError::DatabaseError)?;
174 Ok(())
175}
176
177fn try_acquire_lock(
182 locks: &std::sync::RwLock<HashSet<String>>,
183 subscription_name: &str,
184) -> Result<(), SqliteCoordinationError> {
185 let mut guard = locks.write().expect("coordination lock poisoned");
186 if guard.contains(subscription_name) {
187 return Err(SqliteCoordinationError::LeadershipNotAcquired {
188 subscription_name: subscription_name.to_string(),
189 });
190 }
191 guard.insert(subscription_name.to_string());
192 Ok(())
193}
194
195pub struct SqliteEventStore {
200 conn: Arc<Mutex<rusqlite::Connection>>,
201 locks: Arc<std::sync::RwLock<HashSet<String>>>,
202}
203
204impl std::fmt::Debug for SqliteEventStore {
205 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 f.debug_struct("SqliteEventStore").finish_non_exhaustive()
207 }
208}
209
210impl SqliteEventStore {
211 pub fn new(config: SqliteConfig) -> Result<Self, SqliteEventStoreError> {
212 let conn = open_connection(&config.path)?;
213 if let Some(ref key) = config.encryption_key {
214 apply_encryption_key(&conn, key)?;
215 }
216 Ok(Self {
217 conn: Arc::new(Mutex::new(conn)),
218 locks: Arc::new(std::sync::RwLock::new(HashSet::new())),
219 })
220 }
221
222 pub fn in_memory() -> Result<Self, SqliteEventStoreError> {
223 let conn = open_in_memory_connection()?;
224 Ok(Self {
225 conn: Arc::new(Mutex::new(conn)),
226 locks: Arc::new(std::sync::RwLock::new(HashSet::new())),
227 })
228 }
229
230 pub async fn migrate(&self) -> Result<(), SqliteEventStoreError> {
231 let conn = self.conn.clone();
232 tokio::task::spawn_blocking(move || {
233 let conn = conn.blocking_lock();
234 conn.execute_batch(
235 "CREATE TABLE IF NOT EXISTS eventcore_events (
236 event_id TEXT PRIMARY KEY,
237 stream_id TEXT NOT NULL,
238 stream_version INTEGER NOT NULL,
239 event_type TEXT NOT NULL,
240 event_data TEXT NOT NULL,
241 metadata TEXT NOT NULL DEFAULT '{}',
242 created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
243 );
244 CREATE UNIQUE INDEX IF NOT EXISTS idx_eventcore_events_stream_version
245 ON eventcore_events (stream_id, stream_version);
246 CREATE INDEX IF NOT EXISTS idx_eventcore_events_stream_id
247 ON eventcore_events (stream_id);
248 CREATE TABLE IF NOT EXISTS eventcore_subscription_versions (
249 subscription_name TEXT PRIMARY KEY,
250 last_position TEXT NOT NULL,
251 updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
252 );",
253 )
254 .map_err(SqliteEventStoreError::MigrationFailed)?;
255 Ok(())
256 })
257 .await
258 .map_err(map_join_error_migration)?
259 }
260}
261
262impl EventStore for SqliteEventStore {
263 #[instrument(name = "sqlite.read_stream", skip(self))]
264 async fn read_stream<E: Event>(
265 &self,
266 stream_id: StreamId,
267 ) -> Result<EventStreamReader<E>, EventStoreError> {
268 info!(
269 stream = %stream_id,
270 "[sqlite.read_stream] reading events from sqlite"
271 );
272
273 let conn = self.conn.clone();
274 let sid = stream_id.clone();
275 let rows = tokio::task::spawn_blocking(move || {
276 let conn = conn.blocking_lock();
277 let mut stmt = conn
278 .prepare(
279 "SELECT event_data FROM eventcore_events WHERE stream_id = ?1 ORDER BY stream_version ASC",
280 )
281 .map_err(|e| {
282 error!(error = %e, "[sqlite.read_stream] prepare failed");
283 EventStoreError::StoreFailure {
284 operation: Operation::ReadStream,
285 }
286 })?;
287 let rows: Vec<String> = stmt
288 .query_map(params![sid.as_ref()], |row| row.get(0))
289 .map_err(|e| {
290 error!(error = %e, "[sqlite.read_stream] query failed");
291 EventStoreError::StoreFailure {
292 operation: Operation::ReadStream,
293 }
294 })?
295 .collect::<Result<Vec<String>, _>>()
296 .map_err(|e| {
297 error!(error = %e, "[sqlite.read_stream] row extraction failed");
298 EventStoreError::StoreFailure {
299 operation: Operation::ReadStream,
300 }
301 })?;
302 Ok::<Vec<String>, EventStoreError>(rows)
303 })
304 .await
305 .map_err(|e| map_join_error(e, Operation::ReadStream))??;
306
307 let mut events = Vec::with_capacity(rows.len());
308 for json_str in rows {
309 let event: E = serde_json::from_str(&json_str).map_err(|e| {
310 EventStoreError::DeserializationFailed {
311 stream_id: stream_id.clone(),
312 detail: e.to_string(),
313 }
314 })?;
315 events.push(event);
316 }
317
318 Ok(EventStreamReader::new(events))
319 }
320
321 #[instrument(name = "sqlite.append_events", skip(self, writes))]
322 async fn append_events(
323 &self,
324 writes: StreamWrites,
325 ) -> Result<EventStreamSlice, EventStoreError> {
326 let expected_versions = writes.expected_versions().clone();
327 let entries = writes.into_entries();
328
329 if entries.is_empty() {
330 return Ok(EventStreamSlice);
331 }
332
333 info!(
334 stream_count = expected_versions.len(),
335 event_count = entries.len(),
336 "[sqlite.append_events] appending events to sqlite"
337 );
338
339 let conn = self.conn.clone();
340 tokio::task::spawn_blocking(move || {
341 let conn = conn.blocking_lock();
342 let tx = conn.unchecked_transaction().map_err(|e| {
343 error!(error = %e, "[sqlite.append_events] begin transaction failed");
344 EventStoreError::StoreFailure {
345 operation: Operation::BeginTransaction,
346 }
347 })?;
348
349 for (stream_id, expected_version) in &expected_versions {
350 let current: usize = tx
351 .query_row(
352 "SELECT COALESCE(MAX(stream_version), 0) FROM eventcore_events WHERE stream_id = ?1",
353 params![stream_id.as_ref()],
354 |row| row.get(0),
355 )
356 .map_err(|e| {
357 error!(error = %e, "[sqlite.append_events] version check failed");
358 EventStoreError::StoreFailure {
359 operation: Operation::AppendEvents,
360 }
361 })?;
362
363 if current != expected_version.into_inner() {
364 warn!(
365 stream = %stream_id,
366 expected = expected_version.into_inner(),
367 actual = current,
368 "[sqlite.version_conflict] optimistic concurrency check failed"
369 );
370 return Err(EventStoreError::VersionConflict);
371 }
372 }
373
374 let mut current_versions: HashMap<&StreamId, usize> = expected_versions
377 .iter()
378 .map(|(sid, v)| (sid, v.into_inner()))
379 .collect();
380
381 for entry in &entries {
382 let StreamWriteEntry {
383 stream_id,
384 event_type,
385 event_data,
386 ..
387 } = entry;
388
389 let event_id = Uuid::now_v7().to_string();
390 let version_counter = current_versions
391 .get_mut(stream_id)
392 .expect("stream must be registered");
393 *version_counter += 1;
394 let version = *version_counter;
395
396 let event_json = serde_json::to_string(event_data).map_err(|e| {
397 error!(error = %e, "[sqlite.append_events] serialization failed");
398 EventStoreError::StoreFailure {
399 operation: Operation::AppendEvents,
400 }
401 })?;
402
403 tx.execute(
404 "INSERT INTO eventcore_events (event_id, stream_id, stream_version, event_type, event_data, metadata)
405 VALUES (?1, ?2, ?3, ?4, ?5, '{}')",
406 params![
407 event_id,
408 stream_id.as_ref(),
409 version,
410 event_type,
411 event_json,
412 ],
413 )
414 .map_err(|e| {
415 error!(error = %e, "[sqlite.append_events] insert failed");
416 EventStoreError::StoreFailure {
417 operation: Operation::AppendEvents,
418 }
419 })?;
420 }
421
422 tx.commit().map_err(|e| {
423 error!(error = %e, "[sqlite.append_events] commit failed");
424 EventStoreError::StoreFailure {
425 operation: Operation::CommitTransaction,
426 }
427 })?;
428
429 Ok(EventStreamSlice)
430 })
431 .await
432 .map_err(|e| map_join_error(e, Operation::AppendEvents))?
433 }
434}
435
436impl EventReader for SqliteEventStore {
437 type Error = EventStoreError;
438
439 #[instrument(name = "sqlite.read_events", skip(self))]
440 async fn read_events<E: Event>(
441 &self,
442 filter: EventFilter,
443 page: EventPage,
444 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
445 let conn = self.conn.clone();
446 let after_event_id: Option<String> =
447 page.after_position().map(|p| p.into_inner().to_string());
448 let limit = page.limit().into_inner() as i64;
449 let prefix = filter.stream_prefix().map(|p| p.as_ref().to_string());
450
451 let rows = tokio::task::spawn_blocking(move || {
452 let conn = conn.blocking_lock();
453
454 let (sql, param_values): (String, Vec<Box<dyn rusqlite::types::ToSql>>) =
455 match (&prefix, &after_event_id) {
456 (Some(pfx), Some(after_id)) => (
460 "SELECT event_id, event_data FROM eventcore_events WHERE event_id > ?1 AND stream_id LIKE ?2 ORDER BY event_id LIMIT ?3"
461 .to_string(),
462 vec![
463 Box::new(after_id.clone()) as Box<dyn rusqlite::types::ToSql>,
464 Box::new(format!("{}%", pfx)),
465 Box::new(limit),
466 ],
467 ),
468 (Some(pfx), None) => (
469 "SELECT event_id, event_data FROM eventcore_events WHERE stream_id LIKE ?1 ORDER BY event_id LIMIT ?2"
470 .to_string(),
471 vec![
472 Box::new(format!("{}%", pfx)) as Box<dyn rusqlite::types::ToSql>,
473 Box::new(limit),
474 ],
475 ),
476 (None, Some(after_id)) => (
477 "SELECT event_id, event_data FROM eventcore_events WHERE event_id > ?1 ORDER BY event_id LIMIT ?2"
478 .to_string(),
479 vec![
480 Box::new(after_id.clone()) as Box<dyn rusqlite::types::ToSql>,
481 Box::new(limit),
482 ],
483 ),
484 (None, None) => (
485 "SELECT event_id, event_data FROM eventcore_events ORDER BY event_id LIMIT ?1"
486 .to_string(),
487 vec![Box::new(limit) as Box<dyn rusqlite::types::ToSql>],
488 ),
489 };
490
491 let params_refs: Vec<&dyn rusqlite::types::ToSql> =
492 param_values.iter().map(|p| p.as_ref()).collect();
493
494 let mut stmt = conn.prepare(&sql).map_err(|e| {
495 error!(error = %e, "[sqlite.read_events] prepare failed");
496 EventStoreError::StoreFailure {
497 operation: Operation::ReadStream,
498 }
499 })?;
500
501 let rows: Vec<(String, String)> = stmt
502 .query_map(params_refs.as_slice(), |row| {
503 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
504 })
505 .map_err(|e| {
506 error!(error = %e, "[sqlite.read_events] query failed");
507 EventStoreError::StoreFailure {
508 operation: Operation::ReadStream,
509 }
510 })?
511 .collect::<Result<Vec<_>, _>>()
512 .map_err(|e| {
513 error!(error = %e, "[sqlite.read_events] row extraction failed");
514 EventStoreError::StoreFailure {
515 operation: Operation::ReadStream,
516 }
517 })?;
518
519 Ok::<Vec<(String, String)>, EventStoreError>(rows)
520 })
521 .await
522 .map_err(|e| map_join_error(e, Operation::ReadStream))??;
523
524 let events: Vec<(E, StreamPosition)> = rows
529 .into_iter()
530 .filter_map(|(event_id_str, event_data_str)| {
531 let uuid = Uuid::parse_str(&event_id_str).ok()?;
532 let event: E = serde_json::from_str(&event_data_str).ok()?;
533 Some((event, StreamPosition::new(uuid)))
534 })
535 .collect();
536
537 Ok(events)
538 }
539}
540
541impl CheckpointStore for SqliteEventStore {
542 type Error = SqliteCheckpointError;
543
544 #[instrument(name = "sqlite.checkpoint.load", skip(self))]
545 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
546 let conn = self.conn.clone();
547 let name = name.to_string();
548 tokio::task::spawn_blocking(move || {
549 let conn = conn.blocking_lock();
550 checkpoint_load(&conn, &name)
551 })
552 .await
553 .map_err(map_join_error_checkpoint)?
554 }
555
556 #[instrument(name = "sqlite.checkpoint.save", skip(self))]
557 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
558 let conn = self.conn.clone();
559 let name = name.to_string();
560 let position_str = position.into_inner().to_string();
561 tokio::task::spawn_blocking(move || {
562 let conn = conn.blocking_lock();
563 checkpoint_save(&conn, &name, &position_str)
564 })
565 .await
566 .map_err(map_join_error_checkpoint)?
567 }
568}
569
570impl ProjectorCoordinator for SqliteEventStore {
571 type Error = SqliteCoordinationError;
572 type Guard = SqliteCoordinationGuard;
573
574 #[instrument(name = "sqlite.coordinator.try_acquire", skip(self))]
575 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
576 try_acquire_lock(&self.locks, subscription_name)?;
577 Ok(SqliteCoordinationGuard {
578 subscription_name: subscription_name.to_string(),
579 locks: Arc::clone(&self.locks),
580 })
581 }
582}
583
584#[derive(Debug)]
593pub struct SqliteCoordinationGuard {
594 subscription_name: String,
595 locks: Arc<std::sync::RwLock<HashSet<String>>>,
596}
597
598impl Drop for SqliteCoordinationGuard {
599 fn drop(&mut self) {
600 if let Ok(mut guard) = self.locks.write() {
601 guard.remove(&self.subscription_name);
602 } else {
603 error!(
604 subscription = %self.subscription_name,
605 "[sqlite.coordination_guard] lock poisoned, cannot release leadership"
606 );
607 }
608 }
609}
610
611pub struct SqliteCheckpointStore {
616 conn: Arc<Mutex<rusqlite::Connection>>,
617}
618
619impl std::fmt::Debug for SqliteCheckpointStore {
620 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
621 f.debug_struct("SqliteCheckpointStore")
622 .finish_non_exhaustive()
623 }
624}
625
626impl SqliteCheckpointStore {
627 pub fn new(config: SqliteConfig) -> Result<Self, SqliteEventStoreError> {
628 let conn = open_connection(&config.path)?;
629 if let Some(ref key) = config.encryption_key {
630 apply_encryption_key(&conn, key)?;
631 }
632 Ok(Self {
633 conn: Arc::new(Mutex::new(conn)),
634 })
635 }
636
637 pub fn in_memory() -> Result<Self, SqliteEventStoreError> {
638 let conn = open_in_memory_connection()?;
639 Ok(Self {
640 conn: Arc::new(Mutex::new(conn)),
641 })
642 }
643
644 pub async fn migrate(&self) -> Result<(), SqliteEventStoreError> {
645 let conn = self.conn.clone();
646 tokio::task::spawn_blocking(move || {
647 let conn = conn.blocking_lock();
648 conn.execute_batch(
649 "CREATE TABLE IF NOT EXISTS eventcore_subscription_versions (
650 subscription_name TEXT PRIMARY KEY,
651 last_position TEXT NOT NULL,
652 updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
653 );",
654 )
655 .map_err(SqliteEventStoreError::MigrationFailed)?;
656 Ok(())
657 })
658 .await
659 .map_err(map_join_error_migration)?
660 }
661}
662
663impl CheckpointStore for SqliteCheckpointStore {
664 type Error = SqliteCheckpointError;
665
666 #[instrument(name = "sqlite.checkpoint.load", skip(self))]
667 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
668 let conn = self.conn.clone();
669 let name = name.to_string();
670 tokio::task::spawn_blocking(move || {
671 let conn = conn.blocking_lock();
672 checkpoint_load(&conn, &name)
673 })
674 .await
675 .map_err(map_join_error_checkpoint)?
676 }
677
678 #[instrument(name = "sqlite.checkpoint.save", skip(self))]
679 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
680 let conn = self.conn.clone();
681 let name = name.to_string();
682 let position_str = position.into_inner().to_string();
683 tokio::task::spawn_blocking(move || {
684 let conn = conn.blocking_lock();
685 checkpoint_save(&conn, &name, &position_str)
686 })
687 .await
688 .map_err(map_join_error_checkpoint)?
689 }
690}
691
692#[derive(Debug, Clone, Default)]
697pub struct SqliteProjectorCoordinator {
698 locks: Arc<std::sync::RwLock<HashSet<String>>>,
699}
700
701impl SqliteProjectorCoordinator {
702 pub fn new() -> Self {
703 Self::default()
704 }
705}
706
707impl ProjectorCoordinator for SqliteProjectorCoordinator {
708 type Error = SqliteCoordinationError;
709 type Guard = SqliteCoordinationGuard;
710
711 #[instrument(name = "sqlite.coordinator.try_acquire", skip(self))]
712 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
713 try_acquire_lock(&self.locks, subscription_name)?;
714 Ok(SqliteCoordinationGuard {
715 subscription_name: subscription_name.to_string(),
716 locks: Arc::clone(&self.locks),
717 })
718 }
719}