Skip to main content

factstr_sqlite/
sqlite_store.rs

1use std::fs;
2use std::future::Future;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::{
6    Arc, Mutex,
7    mpsc::{self, Receiver, Sender},
8};
9use std::thread::{self, JoinHandle};
10
11use factstr::{
12    AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
13    HandleStream, NewEvent, QueryResult,
14};
15use serde_json::Value;
16use sqlx::sqlite::SqliteRow;
17use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
18use time::OffsetDateTime;
19use time::format_description::well_known::Rfc3339;
20use tokio::runtime::Builder;
21
22use crate::connection::open_pool;
23use crate::query_match::matches_query;
24use crate::schema::initialize_schema;
25use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
26
27#[derive(Clone, Debug)]
28struct CommittedAppend {
29    append_result: AppendResult,
30    event_records: Vec<EventRecord>,
31}
32
33enum DeliveryCommand {
34    Deliver(Vec<PendingDelivery>),
35    Shutdown,
36}
37
38pub struct SqliteStore {
39    database_path: PathBuf,
40    pool: SqlitePool,
41    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
42    delivery_sender: Sender<DeliveryCommand>,
43    delivery_thread: Mutex<Option<JoinHandle<()>>>,
44}
45
46#[derive(Clone, Debug)]
47struct ReplayBatch {
48    last_processed_sequence_number: u64,
49    delivered_batch: Vec<EventRecord>,
50}
51
52#[derive(Clone, Debug)]
53struct DurableReplayState {
54    subscription_id: u64,
55    last_processed_sequence_number: u64,
56    replay_until_sequence_number: u64,
57}
58
59impl SqliteStore {
60    pub fn open(database_path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
61        let database_path = database_path.as_ref().to_path_buf();
62        ensure_parent_directory(&database_path).map_err(sqlx_io_error)?;
63
64        let (result_sender, result_receiver) = mpsc::sync_channel(1);
65        let bootstrap_path = database_path.clone();
66
67        let bootstrap_thread = thread::Builder::new()
68            .name("factstr-sqlite-bootstrap".to_owned())
69            .spawn(move || {
70                let runtime = Builder::new_current_thread()
71                    .enable_all()
72                    .build()
73                    .map_err(sqlx_io_error);
74
75                let result = match runtime {
76                    Ok(runtime) => runtime.block_on(async {
77                        let pool = open_pool(&bootstrap_path).await?;
78                        initialize_schema(&pool).await?;
79                        Ok::<_, sqlx::Error>(pool)
80                    }),
81                    Err(error) => Err(error),
82                };
83
84                let _ = result_sender.send(result);
85            })
86            .map_err(sqlx_io_error)?;
87
88        let pool = match result_receiver.recv() {
89            Ok(result) => result,
90            Err(error) => Err(sqlx_io_error(io::Error::other(format!(
91                "sqlite bootstrap thread did not return a result: {error}"
92            )))),
93        }?;
94
95        bootstrap_thread.join().map_err(|_| {
96            sqlx_io_error(io::Error::other(
97                "sqlite bootstrap thread panicked before open completed",
98            ))
99        })?;
100
101        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
102        let (delivery_sender, delivery_receiver) = mpsc::channel();
103        let delivery_pool = pool.clone();
104        let delivery_registry = Arc::clone(&subscription_registry);
105        let delivery_thread = thread::Builder::new()
106            .name("factstr-sqlite-delivery".to_owned())
107            .spawn(move || run_delivery_thread(delivery_pool, delivery_registry, delivery_receiver))
108            .map_err(sqlx_io_error)?;
109
110        Ok(Self {
111            database_path,
112            pool,
113            subscription_registry,
114            delivery_sender,
115            delivery_thread: Mutex::new(Some(delivery_thread)),
116        })
117    }
118
119    pub fn database_path(&self) -> &Path {
120        &self.database_path
121    }
122
123    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
124    where
125        T: Send + 'static,
126        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
127        F: FnOnce(SqlitePool) -> Fut + Send + 'static,
128    {
129        let pool = self.pool.clone();
130        let (result_sender, result_receiver) = mpsc::sync_channel(1);
131
132        let worker_thread = thread::Builder::new()
133            .name(format!("factstr-sqlite-{operation}"))
134            .spawn(move || {
135                let runtime = Builder::new_current_thread()
136                    .enable_all()
137                    .build()
138                    .map_err(sqlx_io_error)
139                    .map_err(sqlx_backend_failure);
140
141                let result = match runtime {
142                    Ok(runtime) => runtime.block_on(work(pool)),
143                    Err(error) => Err(error),
144                };
145
146                let _ = result_sender.send(result);
147            })
148            .map_err(sqlx_io_error)
149            .map_err(sqlx_backend_failure)?;
150
151        let result = result_receiver
152            .recv()
153            .map_err(|error| EventStoreError::BackendFailure {
154                message: format!("sqlite {operation} thread did not return a result: {error}"),
155            })?;
156
157        worker_thread
158            .join()
159            .map_err(|_| EventStoreError::BackendFailure {
160                message: format!("sqlite {operation} thread panicked before completion"),
161            })?;
162
163        result
164    }
165
166    fn pending_deliveries(&self, committed_batch: &[EventRecord]) -> Vec<PendingDelivery> {
167        match self.subscription_registry.lock() {
168            Ok(mut subscription_registry) => {
169                subscription_registry.pending_deliveries(committed_batch)
170            }
171            Err(poisoned) => poisoned.into_inner().pending_deliveries(committed_batch),
172        }
173    }
174
175    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
176        if pending_deliveries.is_empty() {
177            return;
178        }
179
180        if let Err(error) = self
181            .delivery_sender
182            .send(DeliveryCommand::Deliver(pending_deliveries))
183        {
184            eprintln!(
185                "factstr-sqlite delivery dispatcher stopped after commit: {}",
186                error
187            );
188        }
189    }
190
191    fn register_all_durable_stream(
192        &self,
193        subscriber_id: impl Into<String>,
194        handle: HandleStream,
195    ) -> Result<EventStream, EventStoreError> {
196        self.subscribe_durable(subscriber_id.into(), EventQuery::all(), handle)
197    }
198
199    fn register_durable_stream(
200        &self,
201        subscriber_id: impl Into<String>,
202        event_query: &EventQuery,
203        handle: HandleStream,
204    ) -> Result<EventStream, EventStoreError> {
205        self.subscribe_durable(subscriber_id.into(), event_query.clone(), handle)
206    }
207
208    fn subscribe_durable(
209        &self,
210        subscriber_id: String,
211        event_query: EventQuery,
212        handle: HandleStream,
213    ) -> Result<EventStream, EventStoreError> {
214        let normalized_event_query = normalized_durable_event_query(&event_query);
215        let event_query_json = serialize_event_query(&normalized_event_query)?;
216        let subscription_registry = Arc::clone(&self.subscription_registry);
217        let durable_subscriber_id = subscriber_id.clone();
218        let normalized_event_query_for_registry = normalized_event_query.clone();
219        let handle_for_registry = handle.clone();
220        let replay_subscriber_id = subscriber_id.clone();
221
222        let replay_state = self.run_async("subscribe_durable", move |pool| async move {
223            let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
224
225            sqlx::query("BEGIN IMMEDIATE")
226                .execute(connection.as_mut())
227                .await
228                .map_err(sqlx_backend_failure)?;
229
230            let last_processed_sequence_number = load_or_create_subscriber_cursor(
231                connection.as_mut(),
232                &durable_subscriber_id,
233                &event_query_json,
234            )
235            .await?;
236            let replay_until_sequence_number =
237                current_max_sequence_number(connection.as_mut()).await?;
238
239            let subscription_id = match subscription_registry.lock() {
240                Ok(mut subscription_registry) => {
241                    if normalized_event_query_for_registry.filters.is_none() {
242                        subscription_registry
243                            .subscribe_all_durable(
244                                durable_subscriber_id.clone(),
245                                replay_until_sequence_number,
246                                handle_for_registry.clone(),
247                            )
248                            .map_err(subscription_registry_backend_failure)?
249                    } else {
250                        subscription_registry
251                            .subscribe_to_durable(
252                                durable_subscriber_id.clone(),
253                                Some(normalized_event_query_for_registry.clone()),
254                                replay_until_sequence_number,
255                                handle_for_registry.clone(),
256                            )
257                            .map_err(subscription_registry_backend_failure)?
258                    }
259                }
260                Err(poisoned) => {
261                    let mut subscription_registry = poisoned.into_inner();
262                    if normalized_event_query_for_registry.filters.is_none() {
263                        subscription_registry
264                            .subscribe_all_durable(
265                                durable_subscriber_id.clone(),
266                                replay_until_sequence_number,
267                                handle_for_registry.clone(),
268                            )
269                            .map_err(subscription_registry_backend_failure)?
270                    } else {
271                        subscription_registry
272                            .subscribe_to_durable(
273                                durable_subscriber_id.clone(),
274                                Some(normalized_event_query_for_registry.clone()),
275                                replay_until_sequence_number,
276                                handle_for_registry.clone(),
277                            )
278                            .map_err(subscription_registry_backend_failure)?
279                    }
280                }
281            };
282
283            if let Err(error) = sqlx::query("COMMIT").execute(connection.as_mut()).await {
284                match subscription_registry.lock() {
285                    Ok(mut subscription_registry) => {
286                        subscription_registry.unsubscribe(subscription_id)
287                    }
288                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
289                }
290                return Err(sqlx_backend_failure(error));
291            }
292
293            Ok(DurableReplayState {
294                subscription_id,
295                last_processed_sequence_number,
296                replay_until_sequence_number,
297            })
298        })?;
299
300        let subscription = self.build_subscription_handle(replay_state.subscription_id);
301
302        let replay_batches = match self.run_async("durable_replay", {
303            let event_query = normalized_event_query.clone();
304            move |pool| async move {
305                ensure_replay_history_is_available_for_pool(
306                    &pool,
307                    replay_state.replay_until_sequence_number,
308                )
309                .await?;
310                load_replay_batches(
311                    &pool,
312                    &event_query,
313                    replay_state.last_processed_sequence_number,
314                    replay_state.replay_until_sequence_number,
315                )
316                .await
317            }
318        }) {
319            Ok(replay_batches) => replay_batches,
320            Err(error) => {
321                self.cleanup_durable_subscription(replay_state.subscription_id);
322                return Err(error);
323            }
324        };
325
326        for replay_batch in replay_batches {
327            let pending_delivery = PendingDelivery {
328                subscription_id: replay_state.subscription_id,
329                durable_subscriber_id: Some(replay_subscriber_id.clone()),
330                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
331                delivered_batch: replay_batch.delivered_batch,
332                handle: handle.clone(),
333            };
334
335            match self.process_durable_delivery(pending_delivery) {
336                Ok(true) => {}
337                Ok(false) => {
338                    self.cleanup_durable_subscription(replay_state.subscription_id);
339
340                    return Err(EventStoreError::BackendFailure {
341                        message: format!(
342                            "durable replay for subscriber {} did not complete successfully",
343                            replay_subscriber_id
344                        ),
345                    });
346                }
347                Err(error) => {
348                    self.cleanup_durable_subscription(replay_state.subscription_id);
349                    return Err(error);
350                }
351            }
352        }
353
354        self.finish_durable_replay(replay_state.subscription_id);
355        Ok(subscription)
356    }
357
358    fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
359        let subscription_registry = Arc::clone(&self.subscription_registry);
360
361        EventStream::new(
362            subscription_id,
363            Arc::new(move |subscription_id| match subscription_registry.lock() {
364                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
365                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
366            }),
367        )
368    }
369
370    fn process_durable_delivery(
371        &self,
372        pending_delivery: PendingDelivery,
373    ) -> Result<bool, EventStoreError> {
374        match pending_delivery.deliver() {
375            DeliveryOutcome::Succeeded {
376                durable_subscriber_id,
377                last_processed_sequence_number,
378                ..
379            } => {
380                if let Some(durable_subscriber_id) = durable_subscriber_id {
381                    self.run_async("advance_durable_cursor", move |pool| async move {
382                        update_subscriber_cursor(
383                            &pool,
384                            &durable_subscriber_id,
385                            last_processed_sequence_number,
386                        )
387                        .await
388                    })?;
389                }
390
391                Ok(true)
392            }
393            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
394        }
395    }
396
397    fn finish_durable_replay(&self, subscription_id: u64) {
398        match self.subscription_registry.lock() {
399            Ok(mut subscription_registry) => {
400                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
401                self.enqueue_delivery(buffered_deliveries);
402            }
403            Err(poisoned) => {
404                let mut subscription_registry = poisoned.into_inner();
405                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
406                self.enqueue_delivery(buffered_deliveries);
407            }
408        }
409    }
410
411    fn cleanup_durable_subscription(&self, subscription_id: u64) {
412        match self.subscription_registry.lock() {
413            Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
414            Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
415        }
416    }
417}
418
419impl Drop for SqliteStore {
420    fn drop(&mut self) {
421        let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
422
423        if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
424            if let Some(delivery_thread) = delivery_thread.take() {
425                let _ = delivery_thread.join();
426            }
427        }
428    }
429}
430
431impl EventStore for SqliteStore {
432    fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
433        let matching_records = self.run_async("query", {
434            let event_query = event_query.clone();
435            move |pool| async move { load_matching_records(&pool, &event_query).await }
436        })?;
437
438        let current_context_version = matching_records
439            .last()
440            .map(|event_record| event_record.sequence_number);
441
442        let event_records = matching_records
443            .into_iter()
444            .filter(|event_record| {
445                event_query
446                    .min_sequence_number
447                    .is_none_or(|min_sequence_number| {
448                        event_record.sequence_number > min_sequence_number
449                    })
450            })
451            .collect::<Vec<_>>();
452
453        let last_returned_sequence_number = event_records
454            .last()
455            .map(|event_record| event_record.sequence_number);
456
457        Ok(QueryResult {
458            event_records,
459            last_returned_sequence_number,
460            current_context_version,
461        })
462    }
463
464    fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
465        if new_events.is_empty() {
466            return Err(EventStoreError::EmptyAppend);
467        }
468
469        let committed_append = self.run_async("append", move |pool| async move {
470            let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
471
472            sqlx::query("BEGIN IMMEDIATE")
473                .execute(connection.as_mut())
474                .await
475                .map_err(sqlx_backend_failure)?;
476
477            let append_result = append_batch(connection.as_mut(), new_events).await;
478
479            match append_result {
480                Ok(committed_append) => {
481                    sqlx::query("COMMIT")
482                        .execute(connection.as_mut())
483                        .await
484                        .map_err(sqlx_backend_failure)?;
485                    Ok(committed_append)
486                }
487                Err(error) => {
488                    let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
489                    Err(error)
490                }
491            }
492        })?;
493
494        let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
495        self.enqueue_delivery(pending_deliveries);
496        Ok(committed_append.append_result)
497    }
498
499    fn append_if(
500        &self,
501        new_events: Vec<NewEvent>,
502        context_query: &EventQuery,
503        expected_context_version: Option<u64>,
504    ) -> Result<AppendResult, EventStoreError> {
505        if new_events.is_empty() {
506            return Err(EventStoreError::EmptyAppend);
507        }
508
509        let committed_append = self.run_async("append_if", {
510            let context_query = context_query.clone();
511            move |pool| async move {
512                let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
513
514                sqlx::query("BEGIN IMMEDIATE")
515                    .execute(connection.as_mut())
516                    .await
517                    .map_err(sqlx_backend_failure)?;
518
519                let conflict_query = EventQuery {
520                    filters: context_query.filters.clone(),
521                    min_sequence_number: None,
522                };
523                let actual_context_version =
524                    load_matching_records_from_connection(connection.as_mut(), &conflict_query)
525                        .await?
526                        .last()
527                        .map(|event_record| event_record.sequence_number);
528
529                if actual_context_version != expected_context_version {
530                    sqlx::query("ROLLBACK")
531                        .execute(connection.as_mut())
532                        .await
533                        .map_err(sqlx_backend_failure)?;
534
535                    return Err(EventStoreError::ConditionalAppendConflict {
536                        expected: expected_context_version,
537                        actual: actual_context_version,
538                    });
539                }
540
541                let append_result = append_batch(connection.as_mut(), new_events).await;
542
543                match append_result {
544                    Ok(committed_append) => {
545                        sqlx::query("COMMIT")
546                            .execute(connection.as_mut())
547                            .await
548                            .map_err(sqlx_backend_failure)?;
549                        Ok(committed_append)
550                    }
551                    Err(error) => {
552                        let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
553                        Err(error)
554                    }
555                }
556            }
557        })?;
558
559        let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
560        self.enqueue_delivery(pending_deliveries);
561        Ok(committed_append.append_result)
562    }
563
564    fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
565        let subscription_registry = Arc::clone(&self.subscription_registry);
566        let id = match subscription_registry.lock() {
567            Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
568            Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
569        };
570
571        Ok(EventStream::new(
572            id,
573            Arc::new(move |subscription_id| match subscription_registry.lock() {
574                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
575                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
576            }),
577        ))
578    }
579
580    fn stream_to(
581        &self,
582        event_query: &EventQuery,
583        handle: HandleStream,
584    ) -> Result<EventStream, EventStoreError> {
585        let subscription_registry = Arc::clone(&self.subscription_registry);
586        let id = match subscription_registry.lock() {
587            Ok(mut subscription_registry) => {
588                subscription_registry.subscribe_to(Some(event_query.clone()), handle)
589            }
590            Err(poisoned) => poisoned
591                .into_inner()
592                .subscribe_to(Some(event_query.clone()), handle),
593        };
594
595        Ok(EventStream::new(
596            id,
597            Arc::new(move |subscription_id| match subscription_registry.lock() {
598                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
599                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
600            }),
601        ))
602    }
603
604    fn stream_all_durable(
605        &self,
606        durable_stream: &DurableStream,
607        handle: HandleStream,
608    ) -> Result<EventStream, EventStoreError> {
609        self.register_all_durable_stream(durable_stream.name(), handle)
610    }
611
612    fn stream_to_durable(
613        &self,
614        durable_stream: &DurableStream,
615        event_query: &EventQuery,
616        handle: HandleStream,
617    ) -> Result<EventStream, EventStoreError> {
618        self.register_durable_stream(durable_stream.name(), event_query, handle)
619    }
620}
621
622async fn append_batch(
623    connection: &mut sqlx::SqliteConnection,
624    new_events: Vec<NewEvent>,
625) -> Result<CommittedAppend, EventStoreError> {
626    let last_sequence_number =
627        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
628            .fetch_one(&mut *connection)
629            .await
630            .map_err(sqlx_backend_failure)?
631            .unwrap_or(0);
632
633    let first_sequence_number = (last_sequence_number as u64) + 1;
634    let committed_count = new_events.len() as u64;
635    let last_sequence_number = first_sequence_number + committed_count - 1;
636
637    let event_records = new_events
638        .into_iter()
639        .enumerate()
640        .map(|(offset, new_event)| EventRecord {
641            sequence_number: first_sequence_number + offset as u64,
642            occurred_at: OffsetDateTime::now_utc(),
643            event_type: new_event.event_type,
644            payload: new_event.payload,
645        })
646        .collect::<Vec<_>>();
647
648    for event_record in &event_records {
649        let payload = serde_json::to_string(&event_record.payload).map_err(json_backend_failure)?;
650        let occurred_at = event_record
651            .occurred_at
652            .format(&Rfc3339)
653            .expect("sqlite occurred_at should format as RFC3339");
654
655        sqlx::query(
656            "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
657             VALUES (?1, ?2, ?3, ?4)",
658        )
659        .bind(event_record.sequence_number as i64)
660        .bind(occurred_at)
661        .bind(&event_record.event_type)
662        .bind(payload)
663        .execute(&mut *connection)
664        .await
665        .map_err(sqlx_backend_failure)?;
666    }
667
668    sqlx::query(
669        "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
670         VALUES (?1, ?2)",
671    )
672    .bind(first_sequence_number as i64)
673    .bind(last_sequence_number as i64)
674    .execute(&mut *connection)
675    .await
676    .map_err(sqlx_backend_failure)?;
677
678    Ok(CommittedAppend {
679        append_result: AppendResult {
680            first_sequence_number,
681            last_sequence_number,
682            committed_count,
683        },
684        event_records,
685    })
686}
687
688async fn load_matching_records(
689    pool: &SqlitePool,
690    event_query: &EventQuery,
691) -> Result<Vec<EventRecord>, EventStoreError> {
692    let candidate_event_types = event_type_candidates(event_query);
693    let rows = fetch_candidate_rows(pool, candidate_event_types.as_deref()).await?;
694    matching_records_from_rows(rows, event_query)
695}
696
697async fn load_matching_records_from_connection(
698    connection: &mut sqlx::SqliteConnection,
699    event_query: &EventQuery,
700) -> Result<Vec<EventRecord>, EventStoreError> {
701    let candidate_event_types = event_type_candidates(event_query);
702    let rows =
703        fetch_candidate_rows_from_connection(connection, candidate_event_types.as_deref()).await?;
704    matching_records_from_rows(rows, event_query)
705}
706
707fn matching_records_from_rows(
708    rows: Vec<SqliteRow>,
709    event_query: &EventQuery,
710) -> Result<Vec<EventRecord>, EventStoreError> {
711    rows.into_iter()
712        .map(row_to_event_record)
713        .filter(|result| {
714            result
715                .as_ref()
716                .is_ok_and(|event_record| matches_query(event_query, event_record))
717        })
718        .collect()
719}
720
721async fn fetch_candidate_rows(
722    pool: &SqlitePool,
723    candidate_event_types: Option<&[String]>,
724) -> Result<Vec<SqliteRow>, EventStoreError> {
725    let mut query_builder = QueryBuilder::<Sqlite>::new(
726        "SELECT sequence_number, occurred_at, event_type, payload FROM events",
727    );
728
729    if let Some(candidate_event_types) = candidate_event_types {
730        if candidate_event_types.is_empty() {
731            query_builder.push(" WHERE FALSE");
732        } else {
733            query_builder.push(" WHERE event_type IN (");
734
735            let mut separated = query_builder.separated(", ");
736            for event_type in candidate_event_types {
737                separated.push_bind(event_type);
738            }
739            separated.push_unseparated(")");
740        }
741    }
742
743    query_builder.push(" ORDER BY sequence_number ASC");
744
745    query_builder
746        .build()
747        .fetch_all(pool)
748        .await
749        .map_err(sqlx_backend_failure)
750}
751
752async fn fetch_candidate_rows_from_connection(
753    connection: &mut sqlx::SqliteConnection,
754    candidate_event_types: Option<&[String]>,
755) -> Result<Vec<SqliteRow>, EventStoreError> {
756    let mut query_builder = QueryBuilder::<Sqlite>::new(
757        "SELECT sequence_number, occurred_at, event_type, payload FROM events",
758    );
759
760    if let Some(candidate_event_types) = candidate_event_types {
761        if candidate_event_types.is_empty() {
762            query_builder.push(" WHERE FALSE");
763        } else {
764            query_builder.push(" WHERE event_type IN (");
765
766            let mut separated = query_builder.separated(", ");
767            for event_type in candidate_event_types {
768                separated.push_bind(event_type);
769            }
770            separated.push_unseparated(")");
771        }
772    }
773
774    query_builder.push(" ORDER BY sequence_number ASC");
775
776    query_builder
777        .build()
778        .fetch_all(connection)
779        .await
780        .map_err(sqlx_backend_failure)
781}
782
783fn event_type_candidates(event_query: &EventQuery) -> Option<Vec<String>> {
784    let filters = match &event_query.filters {
785        None => return None,
786        Some(filters) if filters.is_empty() => return None,
787        Some(filters) => filters,
788    };
789
790    if filters
791        .iter()
792        .any(|event_filter| event_filter.event_types.is_none())
793    {
794        return None;
795    }
796
797    let mut candidate_event_types = Vec::new();
798
799    for event_filter in filters {
800        if let Some(event_types) = &event_filter.event_types {
801            for event_type in event_types {
802                if !candidate_event_types
803                    .iter()
804                    .any(|known| known == event_type)
805                {
806                    candidate_event_types.push(event_type.clone());
807                }
808            }
809        }
810    }
811
812    Some(candidate_event_types)
813}
814
815fn row_to_event_record(row: SqliteRow) -> Result<EventRecord, EventStoreError> {
816    let payload_text = row.get::<String, _>("payload");
817    let payload: Value = serde_json::from_str(&payload_text).map_err(json_backend_failure)?;
818
819    Ok(EventRecord {
820        sequence_number: row.get::<i64, _>("sequence_number") as u64,
821        occurred_at: OffsetDateTime::parse(&row.get::<String, _>("occurred_at"), &Rfc3339)
822            .map_err(time_backend_failure)?,
823        event_type: row.get::<String, _>("event_type"),
824        payload,
825    })
826}
827
828fn ensure_parent_directory(database_path: &Path) -> Result<(), io::Error> {
829    if let Some(parent_directory) = database_path.parent() {
830        if !parent_directory.as_os_str().is_empty() {
831            fs::create_dir_all(parent_directory)?;
832        }
833    }
834
835    Ok(())
836}
837
838fn sqlx_io_error(error: io::Error) -> sqlx::Error {
839    sqlx::Error::Io(error)
840}
841
842fn sqlx_backend_failure(error: sqlx::Error) -> EventStoreError {
843    EventStoreError::BackendFailure {
844        message: error.to_string(),
845    }
846}
847
848fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
849    EventStoreError::BackendFailure {
850        message: error.to_string(),
851    }
852}
853
854fn time_backend_failure(error: time::error::Parse) -> EventStoreError {
855    EventStoreError::BackendFailure {
856        message: error.to_string(),
857    }
858}
859
860fn subscription_registry_backend_failure(message: String) -> EventStoreError {
861    EventStoreError::BackendFailure { message }
862}
863
864async fn load_or_create_subscriber_cursor(
865    connection: &mut sqlx::SqliteConnection,
866    subscriber_id: &str,
867    event_query_json: &str,
868) -> Result<u64, EventStoreError> {
869    let existing_cursor = sqlx::query(
870        "SELECT event_query, last_processed_sequence_number
871         FROM subscriber_cursors
872         WHERE subscriber_id = ?1",
873    )
874    .bind(subscriber_id)
875    .fetch_optional(&mut *connection)
876    .await
877    .map_err(sqlx_backend_failure)?;
878
879    match existing_cursor {
880        Some(row) => {
881            let stored_event_query = row.get::<String, _>("event_query");
882            if stored_event_query != event_query_json {
883                return Err(EventStoreError::BackendFailure {
884                    message: format!(
885                        "durable subscriber {subscriber_id} was resumed with a different query"
886                    ),
887                });
888            }
889
890            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
891        }
892        None => {
893            sqlx::query(
894                "INSERT INTO subscriber_cursors (subscriber_id, event_query, last_processed_sequence_number)
895                 VALUES (?1, ?2, 0)",
896            )
897            .bind(subscriber_id)
898            .bind(event_query_json)
899            .execute(&mut *connection)
900            .await
901            .map_err(sqlx_backend_failure)?;
902
903            Ok(0)
904        }
905    }
906}
907
908async fn current_max_sequence_number(
909    connection: &mut sqlx::SqliteConnection,
910) -> Result<u64, EventStoreError> {
911    Ok(
912        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
913            .fetch_one(&mut *connection)
914            .await
915            .map_err(sqlx_backend_failure)?
916            .unwrap_or(0) as u64,
917    )
918}
919
920async fn ensure_replay_history_is_available(
921    connection: &mut sqlx::SqliteConnection,
922    current_max_sequence_number: u64,
923) -> Result<(), EventStoreError> {
924    if current_max_sequence_number == 0 {
925        return Ok(());
926    }
927
928    let batch_rows = sqlx::query(
929        "SELECT first_sequence_number, last_sequence_number
930         FROM append_batches
931         ORDER BY first_sequence_number ASC",
932    )
933    .fetch_all(&mut *connection)
934    .await
935    .map_err(sqlx_backend_failure)?;
936
937    if batch_rows.is_empty() {
938        return Err(EventStoreError::BackendFailure {
939            message: "durable replay requires append_batches history for all persisted events"
940                .to_owned(),
941        });
942    }
943
944    let mut expected_first_sequence_number = 1_u64;
945
946    for batch_row in batch_rows {
947        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
948        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
949
950        if first_sequence_number != expected_first_sequence_number
951            || last_sequence_number < first_sequence_number
952        {
953            return Err(EventStoreError::BackendFailure {
954                message:
955                    "durable replay requires contiguous append_batches history for all persisted events"
956                        .to_owned(),
957            });
958        }
959
960        expected_first_sequence_number = last_sequence_number + 1;
961    }
962
963    if expected_first_sequence_number - 1 != current_max_sequence_number {
964        return Err(EventStoreError::BackendFailure {
965            message: "durable replay requires append_batches history for all persisted events"
966                .to_owned(),
967        });
968    }
969
970    Ok(())
971}
972
973async fn ensure_replay_history_is_available_for_pool(
974    pool: &SqlitePool,
975    current_max_sequence_number: u64,
976) -> Result<(), EventStoreError> {
977    let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
978    ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
979}
980
981async fn update_subscriber_cursor(
982    pool: &SqlitePool,
983    subscriber_id: &str,
984    last_processed_sequence_number: u64,
985) -> Result<(), EventStoreError> {
986    sqlx::query(
987        "UPDATE subscriber_cursors
988         SET last_processed_sequence_number = ?2
989         WHERE subscriber_id = ?1",
990    )
991    .bind(subscriber_id)
992    .bind(last_processed_sequence_number as i64)
993    .execute(pool)
994    .await
995    .map_err(sqlx_backend_failure)?;
996
997    Ok(())
998}
999
1000async fn load_replay_batches(
1001    pool: &SqlitePool,
1002    event_query: &EventQuery,
1003    last_processed_sequence_number: u64,
1004    replay_until_sequence_number: u64,
1005) -> Result<Vec<ReplayBatch>, EventStoreError> {
1006    if replay_until_sequence_number <= last_processed_sequence_number {
1007        return Ok(Vec::new());
1008    }
1009
1010    let batch_rows = sqlx::query(
1011        "SELECT first_sequence_number, last_sequence_number
1012         FROM append_batches
1013         WHERE last_sequence_number > ?1
1014           AND first_sequence_number <= ?2
1015         ORDER BY first_sequence_number ASC",
1016    )
1017    .bind(last_processed_sequence_number as i64)
1018    .bind(replay_until_sequence_number as i64)
1019    .fetch_all(pool)
1020    .await
1021    .map_err(sqlx_backend_failure)?;
1022
1023    let mut replay_batches = Vec::new();
1024
1025    for batch_row in batch_rows {
1026        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1027        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1028        let event_rows = sqlx::query(
1029            "SELECT sequence_number, occurred_at, event_type, payload
1030             FROM events
1031             WHERE sequence_number >= ?1 AND sequence_number <= ?2
1032             ORDER BY sequence_number ASC",
1033        )
1034        .bind(first_sequence_number as i64)
1035        .bind(last_sequence_number as i64)
1036        .fetch_all(pool)
1037        .await
1038        .map_err(sqlx_backend_failure)?;
1039
1040        let delivered_batch = event_rows
1041            .into_iter()
1042            .map(row_to_event_record)
1043            .filter(|result| {
1044                result
1045                    .as_ref()
1046                    .is_ok_and(|event_record| matches_query(event_query, event_record))
1047            })
1048            .collect::<Result<Vec<_>, _>>()?;
1049
1050        replay_batches.push(ReplayBatch {
1051            last_processed_sequence_number: last_sequence_number,
1052            delivered_batch,
1053        });
1054    }
1055
1056    Ok(replay_batches)
1057}
1058
1059fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1060    EventQuery {
1061        filters: event_query.filters.clone(),
1062        min_sequence_number: None,
1063    }
1064}
1065
1066fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1067    let filters = event_query.filters.as_ref().map(|filters| {
1068        filters
1069            .iter()
1070            .map(|filter| {
1071                serde_json::json!({
1072                    "event_types": filter.event_types,
1073                    "payload_predicates": filter.payload_predicates,
1074                })
1075            })
1076            .collect::<Vec<_>>()
1077    });
1078
1079    serde_json::to_string(&serde_json::json!({
1080        "filters": filters,
1081        "min_sequence_number": event_query.min_sequence_number,
1082    }))
1083    .map_err(json_backend_failure)
1084}
1085
1086fn run_delivery_thread(
1087    pool: SqlitePool,
1088    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
1089    delivery_receiver: Receiver<DeliveryCommand>,
1090) {
1091    let runtime = match Builder::new_current_thread().enable_all().build() {
1092        Ok(runtime) => runtime,
1093        Err(error) => {
1094            eprintln!("factstr-sqlite delivery runtime could not start: {}", error);
1095            return;
1096        }
1097    };
1098
1099    while let Ok(delivery_command) = delivery_receiver.recv() {
1100        match delivery_command {
1101            DeliveryCommand::Deliver(pending_deliveries) => {
1102                for pending_delivery in pending_deliveries {
1103                    match pending_delivery.deliver() {
1104                        DeliveryOutcome::Succeeded {
1105                            subscription_id,
1106                            durable_subscriber_id,
1107                            last_processed_sequence_number,
1108                        } => {
1109                            if let Some(durable_subscriber_id) = durable_subscriber_id {
1110                                if let Err(error) = runtime.block_on(update_subscriber_cursor(
1111                                    &pool,
1112                                    &durable_subscriber_id,
1113                                    last_processed_sequence_number,
1114                                )) {
1115                                    eprintln!(
1116                                        "factstr-sqlite durable cursor update failed after delivery for subscriber {}: {}",
1117                                        durable_subscriber_id, error
1118                                    );
1119                                    match subscription_registry.lock() {
1120                                        Ok(mut subscription_registry) => {
1121                                            subscription_registry.unsubscribe(subscription_id)
1122                                        }
1123                                        Err(poisoned) => {
1124                                            poisoned.into_inner().unsubscribe(subscription_id)
1125                                        }
1126                                    }
1127                                }
1128                            }
1129                        }
1130                        DeliveryOutcome::Failed {
1131                            subscription_id,
1132                            durable_subscriber_id,
1133                        }
1134                        | DeliveryOutcome::Panicked {
1135                            subscription_id,
1136                            durable_subscriber_id,
1137                        } => {
1138                            if durable_subscriber_id.is_some() {
1139                                match subscription_registry.lock() {
1140                                    Ok(mut subscription_registry) => {
1141                                        subscription_registry.unsubscribe(subscription_id)
1142                                    }
1143                                    Err(poisoned) => {
1144                                        poisoned.into_inner().unsubscribe(subscription_id)
1145                                    }
1146                                }
1147                            }
1148                        }
1149                    }
1150                }
1151            }
1152            DeliveryCommand::Shutdown => break,
1153        }
1154    }
1155}