Skip to main content

factstr_sqlite/
sqlite_store.rs

1use std::fs;
2use std::future::Future;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::{
6    Arc, Mutex,
7    mpsc::{self, Receiver, Sender},
8};
9use std::thread::{self, JoinHandle};
10
11use factstr::{
12    AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
13    HandleStream, NewEvent, QueryResult,
14};
15use serde_json::Value;
16use sqlx::sqlite::SqliteRow;
17use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
18use time::OffsetDateTime;
19use time::format_description::well_known::Rfc3339;
20use tokio::runtime::Builder;
21use tokio::runtime::Runtime;
22
23use crate::connection::open_pool;
24use crate::query_match::matches_query;
25use crate::schema::initialize_schema;
26use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
27
28#[derive(Clone, Debug)]
29struct CommittedAppend {
30    append_result: AppendResult,
31    event_records: Vec<EventRecord>,
32}
33
34enum DeliveryCommand {
35    Deliver(Vec<PendingDelivery>),
36    Shutdown,
37}
38
39pub struct SqliteStore {
40    database_path: PathBuf,
41    pool: SqlitePool,
42    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
43    delivery_sender: Sender<DeliveryCommand>,
44    delivery_thread: Mutex<Option<JoinHandle<()>>>,
45}
46
47#[derive(Clone, Debug)]
48struct ReplayBatch {
49    last_processed_sequence_number: u64,
50    delivered_batch: Vec<EventRecord>,
51}
52
/// Values captured while registering a durable subscription, used afterwards
/// to drive the replay of history.
#[derive(Debug, Clone)]
struct DurableReplayState {
    /// Identifier assigned by the subscription registry.
    subscription_id: u64,
    /// Cursor loaded (or created) for the subscriber when it registered.
    last_processed_sequence_number: u64,
    /// Highest sequence number present when the subscriber registered; replay
    /// is loaded up to this value.
    replay_until_sequence_number: u64,
}
59
60impl SqliteStore {
61    pub fn open(database_path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
62        let database_path = database_path.as_ref().to_path_buf();
63        ensure_parent_directory(&database_path).map_err(sqlx_io_error)?;
64
65        let (result_sender, result_receiver) = mpsc::sync_channel(1);
66        let bootstrap_path = database_path.clone();
67
68        let bootstrap_thread = thread::Builder::new()
69            .name("factstr-sqlite-bootstrap".to_owned())
70            .spawn(move || {
71                let runtime = Builder::new_current_thread()
72                    .enable_all()
73                    .build()
74                    .map_err(sqlx_io_error);
75
76                let result = match runtime {
77                    Ok(runtime) => runtime.block_on(async {
78                        let pool = open_pool(&bootstrap_path).await?;
79                        initialize_schema(&pool).await?;
80                        Ok::<_, sqlx::Error>(pool)
81                    }),
82                    Err(error) => Err(error),
83                };
84
85                let _ = result_sender.send(result);
86            })
87            .map_err(sqlx_io_error)?;
88
89        let pool = match result_receiver.recv() {
90            Ok(result) => result,
91            Err(error) => Err(sqlx_io_error(io::Error::other(format!(
92                "sqlite bootstrap thread did not return a result: {error}"
93            )))),
94        }?;
95
96        bootstrap_thread.join().map_err(|_| {
97            sqlx_io_error(io::Error::other(
98                "sqlite bootstrap thread panicked before open completed",
99            ))
100        })?;
101
102        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
103        let (delivery_sender, delivery_receiver) = mpsc::channel();
104        let delivery_pool = pool.clone();
105        let delivery_registry = Arc::clone(&subscription_registry);
106        let delivery_thread = thread::Builder::new()
107            .name("factstr-sqlite-delivery".to_owned())
108            .spawn(move || run_delivery_thread(delivery_pool, delivery_registry, delivery_receiver))
109            .map_err(sqlx_io_error)?;
110
111        Ok(Self {
112            database_path,
113            pool,
114            subscription_registry,
115            delivery_sender,
116            delivery_thread: Mutex::new(Some(delivery_thread)),
117        })
118    }
119
120    pub fn database_path(&self) -> &Path {
121        &self.database_path
122    }
123
124    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
125    where
126        T: Send + 'static,
127        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
128        F: FnOnce(SqlitePool) -> Fut + Send + 'static,
129    {
130        let pool = self.pool.clone();
131        let (result_sender, result_receiver) = mpsc::sync_channel(1);
132
133        let worker_thread = thread::Builder::new()
134            .name(format!("factstr-sqlite-{operation}"))
135            .spawn(move || {
136                let runtime = Builder::new_current_thread()
137                    .enable_all()
138                    .build()
139                    .map_err(sqlx_io_error)
140                    .map_err(sqlx_backend_failure);
141
142                let result = match runtime {
143                    Ok(runtime) => runtime.block_on(work(pool)),
144                    Err(error) => Err(error),
145                };
146
147                let _ = result_sender.send(result);
148            })
149            .map_err(sqlx_io_error)
150            .map_err(sqlx_backend_failure)?;
151
152        let result = result_receiver
153            .recv()
154            .map_err(|error| EventStoreError::BackendFailure {
155                message: format!("sqlite {operation} thread did not return a result: {error}"),
156            })?;
157
158        worker_thread
159            .join()
160            .map_err(|_| EventStoreError::BackendFailure {
161                message: format!("sqlite {operation} thread panicked before completion"),
162            })?;
163
164        result
165    }
166
167    fn pending_deliveries(&self, committed_batch: &[EventRecord]) -> Vec<PendingDelivery> {
168        match self.subscription_registry.lock() {
169            Ok(mut subscription_registry) => {
170                subscription_registry.pending_deliveries(committed_batch)
171            }
172            Err(poisoned) => poisoned.into_inner().pending_deliveries(committed_batch),
173        }
174    }
175
176    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
177        if pending_deliveries.is_empty() {
178            return;
179        }
180
181        if let Err(error) = self
182            .delivery_sender
183            .send(DeliveryCommand::Deliver(pending_deliveries))
184        {
185            eprintln!(
186                "factstr-sqlite delivery dispatcher stopped after commit: {}",
187                error
188            );
189        }
190    }
191
192    fn register_all_durable_stream(
193        &self,
194        subscriber_id: impl Into<String>,
195        handle: HandleStream,
196    ) -> Result<EventStream, EventStoreError> {
197        self.subscribe_durable(subscriber_id.into(), EventQuery::all(), handle)
198    }
199
200    fn register_durable_stream(
201        &self,
202        subscriber_id: impl Into<String>,
203        event_query: &EventQuery,
204        handle: HandleStream,
205    ) -> Result<EventStream, EventStoreError> {
206        self.subscribe_durable(subscriber_id.into(), event_query.clone(), handle)
207    }
208
209    fn subscribe_durable(
210        &self,
211        subscriber_id: String,
212        event_query: EventQuery,
213        handle: HandleStream,
214    ) -> Result<EventStream, EventStoreError> {
215        let normalized_event_query = normalized_durable_event_query(&event_query);
216        let event_query_json = serialize_event_query(&normalized_event_query)?;
217        let subscription_registry = Arc::clone(&self.subscription_registry);
218        let durable_subscriber_id = subscriber_id.clone();
219        let normalized_event_query_for_registry = normalized_event_query.clone();
220        let handle_for_registry = handle.clone();
221        let replay_subscriber_id = subscriber_id.clone();
222
223        let replay_state = self.run_async("subscribe_durable", move |pool| async move {
224            let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
225
226            sqlx::query("BEGIN IMMEDIATE")
227                .execute(connection.as_mut())
228                .await
229                .map_err(sqlx_backend_failure)?;
230
231            let last_processed_sequence_number = load_or_create_subscriber_cursor(
232                connection.as_mut(),
233                &durable_subscriber_id,
234                &event_query_json,
235            )
236            .await?;
237            let replay_until_sequence_number =
238                current_max_sequence_number(connection.as_mut()).await?;
239
240            let subscription_id = match subscription_registry.lock() {
241                Ok(mut subscription_registry) => {
242                    if normalized_event_query_for_registry.filters.is_none() {
243                        subscription_registry
244                            .subscribe_all_durable(
245                                durable_subscriber_id.clone(),
246                                replay_until_sequence_number,
247                                handle_for_registry.clone(),
248                            )
249                            .map_err(subscription_registry_backend_failure)?
250                    } else {
251                        subscription_registry
252                            .subscribe_to_durable(
253                                durable_subscriber_id.clone(),
254                                Some(normalized_event_query_for_registry.clone()),
255                                replay_until_sequence_number,
256                                handle_for_registry.clone(),
257                            )
258                            .map_err(subscription_registry_backend_failure)?
259                    }
260                }
261                Err(poisoned) => {
262                    let mut subscription_registry = poisoned.into_inner();
263                    if normalized_event_query_for_registry.filters.is_none() {
264                        subscription_registry
265                            .subscribe_all_durable(
266                                durable_subscriber_id.clone(),
267                                replay_until_sequence_number,
268                                handle_for_registry.clone(),
269                            )
270                            .map_err(subscription_registry_backend_failure)?
271                    } else {
272                        subscription_registry
273                            .subscribe_to_durable(
274                                durable_subscriber_id.clone(),
275                                Some(normalized_event_query_for_registry.clone()),
276                                replay_until_sequence_number,
277                                handle_for_registry.clone(),
278                            )
279                            .map_err(subscription_registry_backend_failure)?
280                    }
281                }
282            };
283
284            if let Err(error) = sqlx::query("COMMIT").execute(connection.as_mut()).await {
285                match subscription_registry.lock() {
286                    Ok(mut subscription_registry) => {
287                        subscription_registry.unsubscribe(subscription_id)
288                    }
289                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
290                }
291                return Err(sqlx_backend_failure(error));
292            }
293
294            Ok(DurableReplayState {
295                subscription_id,
296                last_processed_sequence_number,
297                replay_until_sequence_number,
298            })
299        })?;
300
301        let subscription = self.build_subscription_handle(replay_state.subscription_id);
302
303        let replay_batches = match self.run_async("durable_replay", {
304            let event_query = normalized_event_query.clone();
305            move |pool| async move {
306                ensure_replay_history_is_available_for_pool(
307                    &pool,
308                    replay_state.replay_until_sequence_number,
309                )
310                .await?;
311                load_replay_batches(
312                    &pool,
313                    &event_query,
314                    replay_state.last_processed_sequence_number,
315                    replay_state.replay_until_sequence_number,
316                )
317                .await
318            }
319        }) {
320            Ok(replay_batches) => replay_batches,
321            Err(error) => {
322                self.cleanup_durable_subscription(replay_state.subscription_id);
323                return Err(error);
324            }
325        };
326        let delivery_runtime = build_delivery_runtime("factstr-sqlite durable replay")?;
327
328        for replay_batch in replay_batches {
329            let pending_delivery = PendingDelivery {
330                subscription_id: replay_state.subscription_id,
331                durable_subscriber_id: Some(replay_subscriber_id.clone()),
332                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
333                delivered_batch: replay_batch.delivered_batch,
334                handle: handle.clone(),
335            };
336
337            match self.process_durable_delivery(&delivery_runtime, pending_delivery) {
338                Ok(true) => {}
339                Ok(false) => {
340                    self.cleanup_durable_subscription(replay_state.subscription_id);
341
342                    return Err(EventStoreError::BackendFailure {
343                        message: format!(
344                            "durable replay for subscriber {} did not complete successfully",
345                            replay_subscriber_id
346                        ),
347                    });
348                }
349                Err(error) => {
350                    self.cleanup_durable_subscription(replay_state.subscription_id);
351                    return Err(error);
352                }
353            }
354        }
355
356        self.finish_durable_replay(replay_state.subscription_id);
357        Ok(subscription)
358    }
359
360    fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
361        let subscription_registry = Arc::clone(&self.subscription_registry);
362
363        EventStream::new(
364            subscription_id,
365            Arc::new(move |subscription_id| match subscription_registry.lock() {
366                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
367                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
368            }),
369        )
370    }
371
372    fn process_durable_delivery(
373        &self,
374        delivery_runtime: &Runtime,
375        pending_delivery: PendingDelivery,
376    ) -> Result<bool, EventStoreError> {
377        match pending_delivery.deliver(delivery_runtime) {
378            DeliveryOutcome::Succeeded {
379                durable_subscriber_id,
380                last_processed_sequence_number,
381                ..
382            } => {
383                if let Some(durable_subscriber_id) = durable_subscriber_id {
384                    self.run_async("advance_durable_cursor", move |pool| async move {
385                        update_subscriber_cursor(
386                            &pool,
387                            &durable_subscriber_id,
388                            last_processed_sequence_number,
389                        )
390                        .await
391                    })?;
392                }
393
394                Ok(true)
395            }
396            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
397        }
398    }
399
400    fn finish_durable_replay(&self, subscription_id: u64) {
401        match self.subscription_registry.lock() {
402            Ok(mut subscription_registry) => {
403                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
404                self.enqueue_delivery(buffered_deliveries);
405            }
406            Err(poisoned) => {
407                let mut subscription_registry = poisoned.into_inner();
408                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
409                self.enqueue_delivery(buffered_deliveries);
410            }
411        }
412    }
413
414    fn cleanup_durable_subscription(&self, subscription_id: u64) {
415        match self.subscription_registry.lock() {
416            Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
417            Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
418        }
419    }
420}
421
422impl Drop for SqliteStore {
423    fn drop(&mut self) {
424        let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
425
426        if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
427            if let Some(delivery_thread) = delivery_thread.take() {
428                let _ = delivery_thread.join();
429            }
430        }
431    }
432}
433
434impl EventStore for SqliteStore {
435    fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
436        let matching_records = self.run_async("query", {
437            let event_query = event_query.clone();
438            move |pool| async move { load_matching_records(&pool, &event_query).await }
439        })?;
440
441        let current_context_version = matching_records
442            .last()
443            .map(|event_record| event_record.sequence_number);
444
445        let event_records = matching_records
446            .into_iter()
447            .filter(|event_record| {
448                event_query
449                    .min_sequence_number
450                    .is_none_or(|min_sequence_number| {
451                        event_record.sequence_number > min_sequence_number
452                    })
453            })
454            .collect::<Vec<_>>();
455
456        let last_returned_sequence_number = event_records
457            .last()
458            .map(|event_record| event_record.sequence_number);
459
460        Ok(QueryResult {
461            event_records,
462            last_returned_sequence_number,
463            current_context_version,
464        })
465    }
466
467    fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
468        if new_events.is_empty() {
469            return Err(EventStoreError::EmptyAppend);
470        }
471
472        let committed_append = self.run_async("append", move |pool| async move {
473            let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
474
475            sqlx::query("BEGIN IMMEDIATE")
476                .execute(connection.as_mut())
477                .await
478                .map_err(sqlx_backend_failure)?;
479
480            let append_result = append_batch(connection.as_mut(), new_events).await;
481
482            match append_result {
483                Ok(committed_append) => {
484                    sqlx::query("COMMIT")
485                        .execute(connection.as_mut())
486                        .await
487                        .map_err(sqlx_backend_failure)?;
488                    Ok(committed_append)
489                }
490                Err(error) => {
491                    let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
492                    Err(error)
493                }
494            }
495        })?;
496
497        let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
498        self.enqueue_delivery(pending_deliveries);
499        Ok(committed_append.append_result)
500    }
501
502    fn append_if(
503        &self,
504        new_events: Vec<NewEvent>,
505        context_query: &EventQuery,
506        expected_context_version: Option<u64>,
507    ) -> Result<AppendResult, EventStoreError> {
508        if new_events.is_empty() {
509            return Err(EventStoreError::EmptyAppend);
510        }
511
512        let committed_append = self.run_async("append_if", {
513            let context_query = context_query.clone();
514            move |pool| async move {
515                let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
516
517                sqlx::query("BEGIN IMMEDIATE")
518                    .execute(connection.as_mut())
519                    .await
520                    .map_err(sqlx_backend_failure)?;
521
522                let conflict_query = EventQuery {
523                    filters: context_query.filters.clone(),
524                    min_sequence_number: None,
525                };
526                let actual_context_version =
527                    load_matching_records_from_connection(connection.as_mut(), &conflict_query)
528                        .await?
529                        .last()
530                        .map(|event_record| event_record.sequence_number);
531
532                if actual_context_version != expected_context_version {
533                    sqlx::query("ROLLBACK")
534                        .execute(connection.as_mut())
535                        .await
536                        .map_err(sqlx_backend_failure)?;
537
538                    return Err(EventStoreError::ConditionalAppendConflict {
539                        expected: expected_context_version,
540                        actual: actual_context_version,
541                    });
542                }
543
544                let append_result = append_batch(connection.as_mut(), new_events).await;
545
546                match append_result {
547                    Ok(committed_append) => {
548                        sqlx::query("COMMIT")
549                            .execute(connection.as_mut())
550                            .await
551                            .map_err(sqlx_backend_failure)?;
552                        Ok(committed_append)
553                    }
554                    Err(error) => {
555                        let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
556                        Err(error)
557                    }
558                }
559            }
560        })?;
561
562        let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
563        self.enqueue_delivery(pending_deliveries);
564        Ok(committed_append.append_result)
565    }
566
567    fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
568        let subscription_registry = Arc::clone(&self.subscription_registry);
569        let id = match subscription_registry.lock() {
570            Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
571            Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
572        };
573
574        Ok(EventStream::new(
575            id,
576            Arc::new(move |subscription_id| match subscription_registry.lock() {
577                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
578                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
579            }),
580        ))
581    }
582
583    fn stream_to(
584        &self,
585        event_query: &EventQuery,
586        handle: HandleStream,
587    ) -> Result<EventStream, EventStoreError> {
588        let subscription_registry = Arc::clone(&self.subscription_registry);
589        let id = match subscription_registry.lock() {
590            Ok(mut subscription_registry) => {
591                subscription_registry.subscribe_to(Some(event_query.clone()), handle)
592            }
593            Err(poisoned) => poisoned
594                .into_inner()
595                .subscribe_to(Some(event_query.clone()), handle),
596        };
597
598        Ok(EventStream::new(
599            id,
600            Arc::new(move |subscription_id| match subscription_registry.lock() {
601                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
602                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
603            }),
604        ))
605    }
606
607    fn stream_all_durable(
608        &self,
609        durable_stream: &DurableStream,
610        handle: HandleStream,
611    ) -> Result<EventStream, EventStoreError> {
612        self.register_all_durable_stream(durable_stream.name(), handle)
613    }
614
615    fn stream_to_durable(
616        &self,
617        durable_stream: &DurableStream,
618        event_query: &EventQuery,
619        handle: HandleStream,
620    ) -> Result<EventStream, EventStoreError> {
621        self.register_durable_stream(durable_stream.name(), event_query, handle)
622    }
623}
624
625async fn append_batch(
626    connection: &mut sqlx::SqliteConnection,
627    new_events: Vec<NewEvent>,
628) -> Result<CommittedAppend, EventStoreError> {
629    let last_sequence_number =
630        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
631            .fetch_one(&mut *connection)
632            .await
633            .map_err(sqlx_backend_failure)?
634            .unwrap_or(0);
635
636    let first_sequence_number = (last_sequence_number as u64) + 1;
637    let committed_count = new_events.len() as u64;
638    let last_sequence_number = first_sequence_number + committed_count - 1;
639
640    let event_records = new_events
641        .into_iter()
642        .enumerate()
643        .map(|(offset, new_event)| EventRecord {
644            sequence_number: first_sequence_number + offset as u64,
645            occurred_at: OffsetDateTime::now_utc(),
646            event_type: new_event.event_type,
647            payload: new_event.payload,
648        })
649        .collect::<Vec<_>>();
650
651    for event_record in &event_records {
652        let payload = serde_json::to_string(&event_record.payload).map_err(json_backend_failure)?;
653        let occurred_at = event_record
654            .occurred_at
655            .format(&Rfc3339)
656            .expect("sqlite occurred_at should format as RFC3339");
657
658        sqlx::query(
659            "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
660             VALUES (?1, ?2, ?3, ?4)",
661        )
662        .bind(event_record.sequence_number as i64)
663        .bind(occurred_at)
664        .bind(&event_record.event_type)
665        .bind(payload)
666        .execute(&mut *connection)
667        .await
668        .map_err(sqlx_backend_failure)?;
669    }
670
671    sqlx::query(
672        "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
673         VALUES (?1, ?2)",
674    )
675    .bind(first_sequence_number as i64)
676    .bind(last_sequence_number as i64)
677    .execute(&mut *connection)
678    .await
679    .map_err(sqlx_backend_failure)?;
680
681    Ok(CommittedAppend {
682        append_result: AppendResult {
683            first_sequence_number,
684            last_sequence_number,
685            committed_count,
686        },
687        event_records,
688    })
689}
690
691async fn load_matching_records(
692    pool: &SqlitePool,
693    event_query: &EventQuery,
694) -> Result<Vec<EventRecord>, EventStoreError> {
695    let candidate_event_types = event_type_candidates(event_query);
696    let rows = fetch_candidate_rows(pool, candidate_event_types.as_deref()).await?;
697    matching_records_from_rows(rows, event_query)
698}
699
700async fn load_matching_records_from_connection(
701    connection: &mut sqlx::SqliteConnection,
702    event_query: &EventQuery,
703) -> Result<Vec<EventRecord>, EventStoreError> {
704    let candidate_event_types = event_type_candidates(event_query);
705    let rows =
706        fetch_candidate_rows_from_connection(connection, candidate_event_types.as_deref()).await?;
707    matching_records_from_rows(rows, event_query)
708}
709
710fn matching_records_from_rows(
711    rows: Vec<SqliteRow>,
712    event_query: &EventQuery,
713) -> Result<Vec<EventRecord>, EventStoreError> {
714    rows.into_iter()
715        .map(row_to_event_record)
716        .filter(|result| {
717            result
718                .as_ref()
719                .is_ok_and(|event_record| matches_query(event_query, event_record))
720        })
721        .collect()
722}
723
724async fn fetch_candidate_rows(
725    pool: &SqlitePool,
726    candidate_event_types: Option<&[String]>,
727) -> Result<Vec<SqliteRow>, EventStoreError> {
728    let mut query_builder = QueryBuilder::<Sqlite>::new(
729        "SELECT sequence_number, occurred_at, event_type, payload FROM events",
730    );
731
732    if let Some(candidate_event_types) = candidate_event_types {
733        if candidate_event_types.is_empty() {
734            query_builder.push(" WHERE FALSE");
735        } else {
736            query_builder.push(" WHERE event_type IN (");
737
738            let mut separated = query_builder.separated(", ");
739            for event_type in candidate_event_types {
740                separated.push_bind(event_type);
741            }
742            separated.push_unseparated(")");
743        }
744    }
745
746    query_builder.push(" ORDER BY sequence_number ASC");
747
748    query_builder
749        .build()
750        .fetch_all(pool)
751        .await
752        .map_err(sqlx_backend_failure)
753}
754
755async fn fetch_candidate_rows_from_connection(
756    connection: &mut sqlx::SqliteConnection,
757    candidate_event_types: Option<&[String]>,
758) -> Result<Vec<SqliteRow>, EventStoreError> {
759    let mut query_builder = QueryBuilder::<Sqlite>::new(
760        "SELECT sequence_number, occurred_at, event_type, payload FROM events",
761    );
762
763    if let Some(candidate_event_types) = candidate_event_types {
764        if candidate_event_types.is_empty() {
765            query_builder.push(" WHERE FALSE");
766        } else {
767            query_builder.push(" WHERE event_type IN (");
768
769            let mut separated = query_builder.separated(", ");
770            for event_type in candidate_event_types {
771                separated.push_bind(event_type);
772            }
773            separated.push_unseparated(")");
774        }
775    }
776
777    query_builder.push(" ORDER BY sequence_number ASC");
778
779    query_builder
780        .build()
781        .fetch_all(connection)
782        .await
783        .map_err(sqlx_backend_failure)
784}
785
786fn event_type_candidates(event_query: &EventQuery) -> Option<Vec<String>> {
787    let filters = match &event_query.filters {
788        None => return None,
789        Some(filters) if filters.is_empty() => return None,
790        Some(filters) => filters,
791    };
792
793    if filters
794        .iter()
795        .any(|event_filter| event_filter.event_types.is_none())
796    {
797        return None;
798    }
799
800    let mut candidate_event_types = Vec::new();
801
802    for event_filter in filters {
803        if let Some(event_types) = &event_filter.event_types {
804            for event_type in event_types {
805                if !candidate_event_types
806                    .iter()
807                    .any(|known| known == event_type)
808                {
809                    candidate_event_types.push(event_type.clone());
810                }
811            }
812        }
813    }
814
815    Some(candidate_event_types)
816}
817
818fn row_to_event_record(row: SqliteRow) -> Result<EventRecord, EventStoreError> {
819    let payload_text = row.get::<String, _>("payload");
820    let payload: Value = serde_json::from_str(&payload_text).map_err(json_backend_failure)?;
821
822    Ok(EventRecord {
823        sequence_number: row.get::<i64, _>("sequence_number") as u64,
824        occurred_at: OffsetDateTime::parse(&row.get::<String, _>("occurred_at"), &Rfc3339)
825            .map_err(time_backend_failure)?,
826        event_type: row.get::<String, _>("event_type"),
827        payload,
828    })
829}
830
/// Creates the directory that will contain `database_path`, including any
/// missing ancestors.
///
/// Nothing is created when the path has no parent or when the parent is the
/// empty path (a bare file name).
///
/// # Errors
///
/// Propagates the `io::Error` reported by `fs::create_dir_all`.
fn ensure_parent_directory(database_path: &Path) -> Result<(), io::Error> {
    match database_path.parent() {
        Some(parent_directory) if !parent_directory.as_os_str().is_empty() => {
            fs::create_dir_all(parent_directory)
        }
        _ => Ok(()),
    }
}
840
841fn sqlx_io_error(error: io::Error) -> sqlx::Error {
842    sqlx::Error::Io(error)
843}
844
845fn sqlx_backend_failure(error: sqlx::Error) -> EventStoreError {
846    EventStoreError::BackendFailure {
847        message: error.to_string(),
848    }
849}
850
851fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
852    EventStoreError::BackendFailure {
853        message: error.to_string(),
854    }
855}
856
857fn time_backend_failure(error: time::error::Parse) -> EventStoreError {
858    EventStoreError::BackendFailure {
859        message: error.to_string(),
860    }
861}
862
863fn subscription_registry_backend_failure(message: String) -> EventStoreError {
864    EventStoreError::BackendFailure { message }
865}
866
867fn build_delivery_runtime(operation: &'static str) -> Result<Runtime, EventStoreError> {
868    Builder::new_current_thread()
869        .enable_all()
870        .build()
871        .map_err(sqlx_io_error)
872        .map_err(sqlx_backend_failure)
873        .map_err(|error| match error {
874            EventStoreError::BackendFailure { message } => EventStoreError::BackendFailure {
875                message: format!("{operation}: {message}"),
876            },
877            other => other,
878        })
879}
880
881async fn load_or_create_subscriber_cursor(
882    connection: &mut sqlx::SqliteConnection,
883    subscriber_id: &str,
884    event_query_json: &str,
885) -> Result<u64, EventStoreError> {
886    let existing_cursor = sqlx::query(
887        "SELECT event_query, last_processed_sequence_number
888         FROM subscriber_cursors
889         WHERE subscriber_id = ?1",
890    )
891    .bind(subscriber_id)
892    .fetch_optional(&mut *connection)
893    .await
894    .map_err(sqlx_backend_failure)?;
895
896    match existing_cursor {
897        Some(row) => {
898            let stored_event_query = row.get::<String, _>("event_query");
899            if stored_event_query != event_query_json {
900                return Err(EventStoreError::BackendFailure {
901                    message: format!(
902                        "durable subscriber {subscriber_id} was resumed with a different query"
903                    ),
904                });
905            }
906
907            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
908        }
909        None => {
910            sqlx::query(
911                "INSERT INTO subscriber_cursors (subscriber_id, event_query, last_processed_sequence_number)
912                 VALUES (?1, ?2, 0)",
913            )
914            .bind(subscriber_id)
915            .bind(event_query_json)
916            .execute(&mut *connection)
917            .await
918            .map_err(sqlx_backend_failure)?;
919
920            Ok(0)
921        }
922    }
923}
924
925async fn current_max_sequence_number(
926    connection: &mut sqlx::SqliteConnection,
927) -> Result<u64, EventStoreError> {
928    Ok(
929        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
930            .fetch_one(&mut *connection)
931            .await
932            .map_err(sqlx_backend_failure)?
933            .unwrap_or(0) as u64,
934    )
935}
936
937async fn ensure_replay_history_is_available(
938    connection: &mut sqlx::SqliteConnection,
939    current_max_sequence_number: u64,
940) -> Result<(), EventStoreError> {
941    if current_max_sequence_number == 0 {
942        return Ok(());
943    }
944
945    let batch_rows = sqlx::query(
946        "SELECT first_sequence_number, last_sequence_number
947         FROM append_batches
948         ORDER BY first_sequence_number ASC",
949    )
950    .fetch_all(&mut *connection)
951    .await
952    .map_err(sqlx_backend_failure)?;
953
954    if batch_rows.is_empty() {
955        return Err(EventStoreError::BackendFailure {
956            message: "durable replay requires append_batches history for all persisted events"
957                .to_owned(),
958        });
959    }
960
961    let mut expected_first_sequence_number = 1_u64;
962
963    for batch_row in batch_rows {
964        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
965        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
966
967        if first_sequence_number != expected_first_sequence_number
968            || last_sequence_number < first_sequence_number
969        {
970            return Err(EventStoreError::BackendFailure {
971                message:
972                    "durable replay requires contiguous append_batches history for all persisted events"
973                        .to_owned(),
974            });
975        }
976
977        expected_first_sequence_number = last_sequence_number + 1;
978    }
979
980    if expected_first_sequence_number - 1 != current_max_sequence_number {
981        return Err(EventStoreError::BackendFailure {
982            message: "durable replay requires append_batches history for all persisted events"
983                .to_owned(),
984        });
985    }
986
987    Ok(())
988}
989
990async fn ensure_replay_history_is_available_for_pool(
991    pool: &SqlitePool,
992    current_max_sequence_number: u64,
993) -> Result<(), EventStoreError> {
994    let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
995    ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
996}
997
998async fn update_subscriber_cursor(
999    pool: &SqlitePool,
1000    subscriber_id: &str,
1001    last_processed_sequence_number: u64,
1002) -> Result<(), EventStoreError> {
1003    sqlx::query(
1004        "UPDATE subscriber_cursors
1005         SET last_processed_sequence_number = ?2
1006         WHERE subscriber_id = ?1",
1007    )
1008    .bind(subscriber_id)
1009    .bind(last_processed_sequence_number as i64)
1010    .execute(pool)
1011    .await
1012    .map_err(sqlx_backend_failure)?;
1013
1014    Ok(())
1015}
1016
1017async fn load_replay_batches(
1018    pool: &SqlitePool,
1019    event_query: &EventQuery,
1020    last_processed_sequence_number: u64,
1021    replay_until_sequence_number: u64,
1022) -> Result<Vec<ReplayBatch>, EventStoreError> {
1023    if replay_until_sequence_number <= last_processed_sequence_number {
1024        return Ok(Vec::new());
1025    }
1026
1027    let batch_rows = sqlx::query(
1028        "SELECT first_sequence_number, last_sequence_number
1029         FROM append_batches
1030         WHERE last_sequence_number > ?1
1031           AND first_sequence_number <= ?2
1032         ORDER BY first_sequence_number ASC",
1033    )
1034    .bind(last_processed_sequence_number as i64)
1035    .bind(replay_until_sequence_number as i64)
1036    .fetch_all(pool)
1037    .await
1038    .map_err(sqlx_backend_failure)?;
1039
1040    let mut replay_batches = Vec::new();
1041
1042    for batch_row in batch_rows {
1043        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1044        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1045        let event_rows = sqlx::query(
1046            "SELECT sequence_number, occurred_at, event_type, payload
1047             FROM events
1048             WHERE sequence_number >= ?1 AND sequence_number <= ?2
1049             ORDER BY sequence_number ASC",
1050        )
1051        .bind(first_sequence_number as i64)
1052        .bind(last_sequence_number as i64)
1053        .fetch_all(pool)
1054        .await
1055        .map_err(sqlx_backend_failure)?;
1056
1057        let delivered_batch = event_rows
1058            .into_iter()
1059            .map(row_to_event_record)
1060            .filter(|result| {
1061                result
1062                    .as_ref()
1063                    .is_ok_and(|event_record| matches_query(event_query, event_record))
1064            })
1065            .collect::<Result<Vec<_>, _>>()?;
1066
1067        replay_batches.push(ReplayBatch {
1068            last_processed_sequence_number: last_sequence_number,
1069            delivered_batch,
1070        });
1071    }
1072
1073    Ok(replay_batches)
1074}
1075
1076fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1077    EventQuery {
1078        filters: event_query.filters.clone(),
1079        min_sequence_number: None,
1080    }
1081}
1082
1083fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1084    let filters = event_query.filters.as_ref().map(|filters| {
1085        filters
1086            .iter()
1087            .map(|filter| {
1088                serde_json::json!({
1089                    "event_types": filter.event_types,
1090                    "payload_predicates": filter.payload_predicates,
1091                })
1092            })
1093            .collect::<Vec<_>>()
1094    });
1095
1096    serde_json::to_string(&serde_json::json!({
1097        "filters": filters,
1098        "min_sequence_number": event_query.min_sequence_number,
1099    }))
1100    .map_err(json_backend_failure)
1101}
1102
1103fn run_delivery_thread(
1104    pool: SqlitePool,
1105    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
1106    delivery_receiver: Receiver<DeliveryCommand>,
1107) {
1108    let runtime = match Builder::new_current_thread().enable_all().build() {
1109        Ok(runtime) => runtime,
1110        Err(error) => {
1111            eprintln!("factstr-sqlite delivery runtime could not start: {}", error);
1112            return;
1113        }
1114    };
1115
1116    while let Ok(delivery_command) = delivery_receiver.recv() {
1117        match delivery_command {
1118            DeliveryCommand::Deliver(pending_deliveries) => {
1119                for pending_delivery in pending_deliveries {
1120                    match pending_delivery.deliver(&runtime) {
1121                        DeliveryOutcome::Succeeded {
1122                            subscription_id,
1123                            durable_subscriber_id,
1124                            last_processed_sequence_number,
1125                        } => {
1126                            if let Some(durable_subscriber_id) = durable_subscriber_id {
1127                                if let Err(error) = runtime.block_on(update_subscriber_cursor(
1128                                    &pool,
1129                                    &durable_subscriber_id,
1130                                    last_processed_sequence_number,
1131                                )) {
1132                                    eprintln!(
1133                                        "factstr-sqlite durable cursor update failed after delivery for subscriber {}: {}",
1134                                        durable_subscriber_id, error
1135                                    );
1136                                    match subscription_registry.lock() {
1137                                        Ok(mut subscription_registry) => {
1138                                            subscription_registry.unsubscribe(subscription_id)
1139                                        }
1140                                        Err(poisoned) => {
1141                                            poisoned.into_inner().unsubscribe(subscription_id)
1142                                        }
1143                                    }
1144                                }
1145                            }
1146                        }
1147                        DeliveryOutcome::Failed {
1148                            subscription_id,
1149                            durable_subscriber_id,
1150                        }
1151                        | DeliveryOutcome::Panicked {
1152                            subscription_id,
1153                            durable_subscriber_id,
1154                        } => {
1155                            if durable_subscriber_id.is_some() {
1156                                match subscription_registry.lock() {
1157                                    Ok(mut subscription_registry) => {
1158                                        subscription_registry.unsubscribe(subscription_id)
1159                                    }
1160                                    Err(poisoned) => {
1161                                        poisoned.into_inner().unsubscribe(subscription_id)
1162                                    }
1163                                }
1164                            }
1165                        }
1166                    }
1167                }
1168            }
1169            DeliveryCommand::Shutdown => break,
1170        }
1171    }
1172}