Skip to main content

factstr_sqlite/
sqlite_store.rs

1use std::fs;
2use std::future::Future;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::{
6    Arc, Mutex,
7    mpsc::{self, Receiver, Sender},
8};
9use std::thread::{self, JoinHandle};
10
11use factstr::{
12    AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
13    HandleStream, NewEvent, QueryResult,
14};
15use serde_json::Value;
16use sqlx::sqlite::SqliteRow;
17use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
18use time::OffsetDateTime;
19use time::format_description::well_known::Rfc3339;
20use tokio::runtime::Builder;
21use tokio::runtime::Runtime;
22
23use crate::connection::open_pool;
24use crate::query_match::matches_query;
25use crate::schema::{
26    APPEND_BATCH_BOUNDARY_FORMAT_KEY, APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1, initialize_schema,
27};
28use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
29
30#[derive(Clone, Debug)]
31struct CommittedAppend {
32    append_result: AppendResult,
33    event_records: Vec<EventRecord>,
34}
35
36enum DeliveryCommand {
37    Deliver(Vec<PendingDelivery>),
38    Shutdown,
39}
40
41pub struct SqliteStore {
42    database_path: PathBuf,
43    pool: SqlitePool,
44    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
45    delivery_sender: Sender<DeliveryCommand>,
46    delivery_thread: Mutex<Option<JoinHandle<()>>>,
47}
48
49#[derive(Clone, Debug)]
50struct ReplayBatch {
51    last_processed_sequence_number: u64,
52    delivered_batch: Vec<EventRecord>,
53}
54
/// Inclusive sequence-number range of one append batch.
///
/// NOTE(review): presumably mirrors a row of the `append_batches` table; confirm against the
/// code that constructs it.
#[derive(Clone, Debug)]
struct AppendBatchBoundary {
    /// Sequence number of the first record in the batch.
    first_sequence_number: u64,
    /// Sequence number of the last record in the batch.
    last_sequence_number: u64,
}
60
/// Values captured while registering a durable subscription, needed to drive its replay.
#[derive(Clone, Debug)]
struct DurableReplayState {
    /// Identifier assigned by the subscription registry.
    subscription_id: u64,
    /// Cursor loaded (or created) for the durable subscriber.
    last_processed_sequence_number: u64,
    /// Maximum sequence number in the store at registration time.
    replay_until_sequence_number: u64,
}
67
68impl SqliteStore {
69    pub fn open(database_path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
70        let database_path = database_path.as_ref().to_path_buf();
71        ensure_parent_directory(&database_path).map_err(sqlx_io_error)?;
72
73        let (result_sender, result_receiver) = mpsc::sync_channel(1);
74        let bootstrap_path = database_path.clone();
75
76        let bootstrap_thread = thread::Builder::new()
77            .name("factstr-sqlite-bootstrap".to_owned())
78            .spawn(move || {
79                let runtime = Builder::new_current_thread()
80                    .enable_all()
81                    .build()
82                    .map_err(sqlx_io_error);
83
84                let result = match runtime {
85                    Ok(runtime) => runtime.block_on(async {
86                        let pool = open_pool(&bootstrap_path).await?;
87                        initialize_schema(&pool).await?;
88                        Ok::<_, sqlx::Error>(pool)
89                    }),
90                    Err(error) => Err(error),
91                };
92
93                let _ = result_sender.send(result);
94            })
95            .map_err(sqlx_io_error)?;
96
97        let pool = match result_receiver.recv() {
98            Ok(result) => result,
99            Err(error) => Err(sqlx_io_error(io::Error::other(format!(
100                "sqlite bootstrap thread did not return a result: {error}"
101            )))),
102        }?;
103
104        bootstrap_thread.join().map_err(|_| {
105            sqlx_io_error(io::Error::other(
106                "sqlite bootstrap thread panicked before open completed",
107            ))
108        })?;
109
110        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
111        let (delivery_sender, delivery_receiver) = mpsc::channel();
112        let delivery_pool = pool.clone();
113        let delivery_registry = Arc::clone(&subscription_registry);
114        let delivery_thread = thread::Builder::new()
115            .name("factstr-sqlite-delivery".to_owned())
116            .spawn(move || run_delivery_thread(delivery_pool, delivery_registry, delivery_receiver))
117            .map_err(sqlx_io_error)?;
118
119        Ok(Self {
120            database_path,
121            pool,
122            subscription_registry,
123            delivery_sender,
124            delivery_thread: Mutex::new(Some(delivery_thread)),
125        })
126    }
127
128    pub fn database_path(&self) -> &Path {
129        &self.database_path
130    }
131
132    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
133    where
134        T: Send + 'static,
135        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
136        F: FnOnce(SqlitePool) -> Fut + Send + 'static,
137    {
138        let pool = self.pool.clone();
139        let (result_sender, result_receiver) = mpsc::sync_channel(1);
140
141        let worker_thread = thread::Builder::new()
142            .name(format!("factstr-sqlite-{operation}"))
143            .spawn(move || {
144                let runtime = Builder::new_current_thread()
145                    .enable_all()
146                    .build()
147                    .map_err(sqlx_io_error)
148                    .map_err(sqlx_backend_failure);
149
150                let result = match runtime {
151                    Ok(runtime) => runtime.block_on(work(pool)),
152                    Err(error) => Err(error),
153                };
154
155                let _ = result_sender.send(result);
156            })
157            .map_err(sqlx_io_error)
158            .map_err(sqlx_backend_failure)?;
159
160        let result = result_receiver
161            .recv()
162            .map_err(|error| EventStoreError::BackendFailure {
163                message: format!("sqlite {operation} thread did not return a result: {error}"),
164            })?;
165
166        worker_thread
167            .join()
168            .map_err(|_| EventStoreError::BackendFailure {
169                message: format!("sqlite {operation} thread panicked before completion"),
170            })?;
171
172        result
173    }
174
175    fn pending_deliveries(&self, committed_batch: &[EventRecord]) -> Vec<PendingDelivery> {
176        match self.subscription_registry.lock() {
177            Ok(mut subscription_registry) => {
178                subscription_registry.pending_deliveries(committed_batch)
179            }
180            Err(poisoned) => poisoned.into_inner().pending_deliveries(committed_batch),
181        }
182    }
183
184    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
185        if pending_deliveries.is_empty() {
186            return;
187        }
188
189        if let Err(error) = self
190            .delivery_sender
191            .send(DeliveryCommand::Deliver(pending_deliveries))
192        {
193            eprintln!(
194                "factstr-sqlite delivery dispatcher stopped after commit: {}",
195                error
196            );
197        }
198    }
199
200    fn register_all_durable_stream(
201        &self,
202        subscriber_id: impl Into<String>,
203        handle: HandleStream,
204    ) -> Result<EventStream, EventStoreError> {
205        self.subscribe_durable(subscriber_id.into(), EventQuery::all(), handle)
206    }
207
208    fn register_durable_stream(
209        &self,
210        subscriber_id: impl Into<String>,
211        event_query: &EventQuery,
212        handle: HandleStream,
213    ) -> Result<EventStream, EventStoreError> {
214        self.subscribe_durable(subscriber_id.into(), event_query.clone(), handle)
215    }
216
217    fn subscribe_durable(
218        &self,
219        subscriber_id: String,
220        event_query: EventQuery,
221        handle: HandleStream,
222    ) -> Result<EventStream, EventStoreError> {
223        let normalized_event_query = normalized_durable_event_query(&event_query);
224        let event_query_json = serialize_event_query(&normalized_event_query)?;
225        let subscription_registry = Arc::clone(&self.subscription_registry);
226        let durable_subscriber_id = subscriber_id.clone();
227        let normalized_event_query_for_registry = normalized_event_query.clone();
228        let handle_for_registry = handle.clone();
229        let replay_subscriber_id = subscriber_id.clone();
230
231        let replay_state = self.run_async("subscribe_durable", move |pool| async move {
232            let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
233
234            sqlx::query("BEGIN IMMEDIATE")
235                .execute(connection.as_mut())
236                .await
237                .map_err(sqlx_backend_failure)?;
238
239            let last_processed_sequence_number = load_or_create_subscriber_cursor(
240                connection.as_mut(),
241                &durable_subscriber_id,
242                &event_query_json,
243            )
244            .await?;
245            let replay_until_sequence_number =
246                current_max_sequence_number(connection.as_mut()).await?;
247
248            let subscription_id = match subscription_registry.lock() {
249                Ok(mut subscription_registry) => {
250                    if normalized_event_query_for_registry.filters.is_none() {
251                        subscription_registry
252                            .subscribe_all_durable(
253                                durable_subscriber_id.clone(),
254                                replay_until_sequence_number,
255                                handle_for_registry.clone(),
256                            )
257                            .map_err(subscription_registry_backend_failure)?
258                    } else {
259                        subscription_registry
260                            .subscribe_to_durable(
261                                durable_subscriber_id.clone(),
262                                Some(normalized_event_query_for_registry.clone()),
263                                replay_until_sequence_number,
264                                handle_for_registry.clone(),
265                            )
266                            .map_err(subscription_registry_backend_failure)?
267                    }
268                }
269                Err(poisoned) => {
270                    let mut subscription_registry = poisoned.into_inner();
271                    if normalized_event_query_for_registry.filters.is_none() {
272                        subscription_registry
273                            .subscribe_all_durable(
274                                durable_subscriber_id.clone(),
275                                replay_until_sequence_number,
276                                handle_for_registry.clone(),
277                            )
278                            .map_err(subscription_registry_backend_failure)?
279                    } else {
280                        subscription_registry
281                            .subscribe_to_durable(
282                                durable_subscriber_id.clone(),
283                                Some(normalized_event_query_for_registry.clone()),
284                                replay_until_sequence_number,
285                                handle_for_registry.clone(),
286                            )
287                            .map_err(subscription_registry_backend_failure)?
288                    }
289                }
290            };
291
292            if let Err(error) = sqlx::query("COMMIT").execute(connection.as_mut()).await {
293                match subscription_registry.lock() {
294                    Ok(mut subscription_registry) => {
295                        subscription_registry.unsubscribe(subscription_id)
296                    }
297                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
298                }
299                return Err(sqlx_backend_failure(error));
300            }
301
302            Ok(DurableReplayState {
303                subscription_id,
304                last_processed_sequence_number,
305                replay_until_sequence_number,
306            })
307        })?;
308
309        let subscription = self.build_subscription_handle(replay_state.subscription_id);
310
311        let replay_batches = match self.run_async("durable_replay", {
312            let event_query = normalized_event_query.clone();
313            move |pool| async move {
314                ensure_replay_history_is_available_for_pool(
315                    &pool,
316                    replay_state.replay_until_sequence_number,
317                )
318                .await?;
319                load_replay_batches(
320                    &pool,
321                    &event_query,
322                    replay_state.last_processed_sequence_number,
323                    replay_state.replay_until_sequence_number,
324                )
325                .await
326            }
327        }) {
328            Ok(replay_batches) => replay_batches,
329            Err(error) => {
330                self.cleanup_durable_subscription(replay_state.subscription_id);
331                return Err(error);
332            }
333        };
334        let delivery_runtime = build_delivery_runtime("factstr-sqlite durable replay")?;
335
336        for replay_batch in replay_batches {
337            let pending_delivery = PendingDelivery {
338                subscription_id: replay_state.subscription_id,
339                durable_subscriber_id: Some(replay_subscriber_id.clone()),
340                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
341                delivered_batch: replay_batch.delivered_batch,
342                handle: handle.clone(),
343            };
344
345            match self.process_durable_delivery(&delivery_runtime, pending_delivery) {
346                Ok(true) => {}
347                Ok(false) => {
348                    self.cleanup_durable_subscription(replay_state.subscription_id);
349
350                    return Err(EventStoreError::BackendFailure {
351                        message: format!(
352                            "durable replay for subscriber {} did not complete successfully",
353                            replay_subscriber_id
354                        ),
355                    });
356                }
357                Err(error) => {
358                    self.cleanup_durable_subscription(replay_state.subscription_id);
359                    return Err(error);
360                }
361            }
362        }
363
364        self.finish_durable_replay(replay_state.subscription_id);
365        Ok(subscription)
366    }
367
368    fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
369        let subscription_registry = Arc::clone(&self.subscription_registry);
370
371        EventStream::new(
372            subscription_id,
373            Arc::new(move |subscription_id| match subscription_registry.lock() {
374                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
375                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
376            }),
377        )
378    }
379
380    fn process_durable_delivery(
381        &self,
382        delivery_runtime: &Runtime,
383        pending_delivery: PendingDelivery,
384    ) -> Result<bool, EventStoreError> {
385        match pending_delivery.deliver(delivery_runtime) {
386            DeliveryOutcome::Succeeded {
387                durable_subscriber_id,
388                last_processed_sequence_number,
389                ..
390            } => {
391                if let Some(durable_subscriber_id) = durable_subscriber_id {
392                    self.run_async("advance_durable_cursor", move |pool| async move {
393                        update_subscriber_cursor(
394                            &pool,
395                            &durable_subscriber_id,
396                            last_processed_sequence_number,
397                        )
398                        .await
399                    })?;
400                }
401
402                Ok(true)
403            }
404            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
405        }
406    }
407
408    fn finish_durable_replay(&self, subscription_id: u64) {
409        match self.subscription_registry.lock() {
410            Ok(mut subscription_registry) => {
411                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
412                self.enqueue_delivery(buffered_deliveries);
413            }
414            Err(poisoned) => {
415                let mut subscription_registry = poisoned.into_inner();
416                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
417                self.enqueue_delivery(buffered_deliveries);
418            }
419        }
420    }
421
422    fn cleanup_durable_subscription(&self, subscription_id: u64) {
423        match self.subscription_registry.lock() {
424            Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
425            Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
426        }
427    }
428}
429
430impl Drop for SqliteStore {
431    fn drop(&mut self) {
432        let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
433
434        if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
435            if let Some(delivery_thread) = delivery_thread.take() {
436                let _ = delivery_thread.join();
437            }
438        }
439    }
440}
441
442impl EventStore for SqliteStore {
443    fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
444        let matching_records = self.run_async("query", {
445            let event_query = event_query.clone();
446            move |pool| async move { load_matching_records(&pool, &event_query).await }
447        })?;
448
449        let current_context_version = matching_records
450            .last()
451            .map(|event_record| event_record.sequence_number);
452
453        let event_records = matching_records
454            .into_iter()
455            .filter(|event_record| {
456                event_query
457                    .min_sequence_number
458                    .is_none_or(|min_sequence_number| {
459                        event_record.sequence_number > min_sequence_number
460                    })
461            })
462            .collect::<Vec<_>>();
463
464        let last_returned_sequence_number = event_records
465            .last()
466            .map(|event_record| event_record.sequence_number);
467
468        Ok(QueryResult {
469            event_records,
470            last_returned_sequence_number,
471            current_context_version,
472        })
473    }
474
475    fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
476        if new_events.is_empty() {
477            return Err(EventStoreError::EmptyAppend);
478        }
479
480        let committed_append = self.run_async("append", move |pool| async move {
481            let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
482
483            sqlx::query("BEGIN IMMEDIATE")
484                .execute(connection.as_mut())
485                .await
486                .map_err(sqlx_backend_failure)?;
487
488            let append_result = append_batch(connection.as_mut(), new_events).await;
489
490            match append_result {
491                Ok(committed_append) => {
492                    sqlx::query("COMMIT")
493                        .execute(connection.as_mut())
494                        .await
495                        .map_err(sqlx_backend_failure)?;
496                    Ok(committed_append)
497                }
498                Err(error) => {
499                    let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
500                    Err(error)
501                }
502            }
503        })?;
504
505        let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
506        self.enqueue_delivery(pending_deliveries);
507        Ok(committed_append.append_result)
508    }
509
510    fn append_if(
511        &self,
512        new_events: Vec<NewEvent>,
513        context_query: &EventQuery,
514        expected_context_version: Option<u64>,
515    ) -> Result<AppendResult, EventStoreError> {
516        if new_events.is_empty() {
517            return Err(EventStoreError::EmptyAppend);
518        }
519
520        let committed_append = self.run_async("append_if", {
521            let context_query = context_query.clone();
522            move |pool| async move {
523                let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
524
525                sqlx::query("BEGIN IMMEDIATE")
526                    .execute(connection.as_mut())
527                    .await
528                    .map_err(sqlx_backend_failure)?;
529
530                let conflict_query = EventQuery {
531                    filters: context_query.filters.clone(),
532                    min_sequence_number: None,
533                };
534                let actual_context_version =
535                    load_matching_records_from_connection(connection.as_mut(), &conflict_query)
536                        .await?
537                        .last()
538                        .map(|event_record| event_record.sequence_number);
539
540                if actual_context_version != expected_context_version {
541                    sqlx::query("ROLLBACK")
542                        .execute(connection.as_mut())
543                        .await
544                        .map_err(sqlx_backend_failure)?;
545
546                    return Err(EventStoreError::ConditionalAppendConflict {
547                        expected: expected_context_version,
548                        actual: actual_context_version,
549                    });
550                }
551
552                let append_result = append_batch(connection.as_mut(), new_events).await;
553
554                match append_result {
555                    Ok(committed_append) => {
556                        sqlx::query("COMMIT")
557                            .execute(connection.as_mut())
558                            .await
559                            .map_err(sqlx_backend_failure)?;
560                        Ok(committed_append)
561                    }
562                    Err(error) => {
563                        let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
564                        Err(error)
565                    }
566                }
567            }
568        })?;
569
570        let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
571        self.enqueue_delivery(pending_deliveries);
572        Ok(committed_append.append_result)
573    }
574
575    fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
576        let subscription_registry = Arc::clone(&self.subscription_registry);
577        let id = match subscription_registry.lock() {
578            Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
579            Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
580        };
581
582        Ok(EventStream::new(
583            id,
584            Arc::new(move |subscription_id| match subscription_registry.lock() {
585                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
586                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
587            }),
588        ))
589    }
590
591    fn stream_to(
592        &self,
593        event_query: &EventQuery,
594        handle: HandleStream,
595    ) -> Result<EventStream, EventStoreError> {
596        let subscription_registry = Arc::clone(&self.subscription_registry);
597        let id = match subscription_registry.lock() {
598            Ok(mut subscription_registry) => {
599                subscription_registry.subscribe_to(Some(event_query.clone()), handle)
600            }
601            Err(poisoned) => poisoned
602                .into_inner()
603                .subscribe_to(Some(event_query.clone()), handle),
604        };
605
606        Ok(EventStream::new(
607            id,
608            Arc::new(move |subscription_id| match subscription_registry.lock() {
609                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
610                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
611            }),
612        ))
613    }
614
615    fn stream_all_durable(
616        &self,
617        durable_stream: &DurableStream,
618        handle: HandleStream,
619    ) -> Result<EventStream, EventStoreError> {
620        self.register_all_durable_stream(durable_stream.name(), handle)
621    }
622
623    fn stream_to_durable(
624        &self,
625        durable_stream: &DurableStream,
626        event_query: &EventQuery,
627        handle: HandleStream,
628    ) -> Result<EventStream, EventStoreError> {
629        self.register_durable_stream(durable_stream.name(), event_query, handle)
630    }
631}
632
633async fn append_batch(
634    connection: &mut sqlx::SqliteConnection,
635    new_events: Vec<NewEvent>,
636) -> Result<CommittedAppend, EventStoreError> {
637    let last_sequence_number =
638        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
639            .fetch_one(&mut *connection)
640            .await
641            .map_err(sqlx_backend_failure)?
642            .unwrap_or(0);
643
644    let first_sequence_number = (last_sequence_number as u64) + 1;
645    let committed_count = new_events.len() as u64;
646    let last_sequence_number = first_sequence_number + committed_count - 1;
647
648    let event_records = new_events
649        .into_iter()
650        .enumerate()
651        .map(|(offset, new_event)| EventRecord {
652            sequence_number: first_sequence_number + offset as u64,
653            occurred_at: OffsetDateTime::now_utc(),
654            event_type: new_event.event_type,
655            payload: new_event.payload,
656        })
657        .collect::<Vec<_>>();
658
659    for event_record in &event_records {
660        let payload = serde_json::to_string(&event_record.payload).map_err(json_backend_failure)?;
661        let occurred_at = event_record
662            .occurred_at
663            .format(&Rfc3339)
664            .expect("sqlite occurred_at should format as RFC3339");
665
666        sqlx::query(
667            "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
668             VALUES (?1, ?2, ?3, ?4)",
669        )
670        .bind(event_record.sequence_number as i64)
671        .bind(occurred_at)
672        .bind(&event_record.event_type)
673        .bind(payload)
674        .execute(&mut *connection)
675        .await
676        .map_err(sqlx_backend_failure)?;
677    }
678
679    if first_sequence_number != last_sequence_number {
680        sqlx::query(
681            "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
682             VALUES (?1, ?2)",
683        )
684        .bind(first_sequence_number as i64)
685        .bind(last_sequence_number as i64)
686        .execute(&mut *connection)
687        .await
688        .map_err(sqlx_backend_failure)?;
689    }
690
691    Ok(CommittedAppend {
692        append_result: AppendResult {
693            first_sequence_number,
694            last_sequence_number,
695            committed_count,
696        },
697        event_records,
698    })
699}
700
701async fn load_matching_records(
702    pool: &SqlitePool,
703    event_query: &EventQuery,
704) -> Result<Vec<EventRecord>, EventStoreError> {
705    let candidate_event_types = event_type_candidates(event_query);
706    let rows = fetch_candidate_rows(pool, candidate_event_types.as_deref()).await?;
707    matching_records_from_rows(rows, event_query)
708}
709
710async fn load_matching_records_from_connection(
711    connection: &mut sqlx::SqliteConnection,
712    event_query: &EventQuery,
713) -> Result<Vec<EventRecord>, EventStoreError> {
714    let candidate_event_types = event_type_candidates(event_query);
715    let rows =
716        fetch_candidate_rows_from_connection(connection, candidate_event_types.as_deref()).await?;
717    matching_records_from_rows(rows, event_query)
718}
719
720fn matching_records_from_rows(
721    rows: Vec<SqliteRow>,
722    event_query: &EventQuery,
723) -> Result<Vec<EventRecord>, EventStoreError> {
724    rows.into_iter()
725        .map(row_to_event_record)
726        .filter(|result| {
727            result
728                .as_ref()
729                .is_ok_and(|event_record| matches_query(event_query, event_record))
730        })
731        .collect()
732}
733
734async fn fetch_candidate_rows(
735    pool: &SqlitePool,
736    candidate_event_types: Option<&[String]>,
737) -> Result<Vec<SqliteRow>, EventStoreError> {
738    let mut query_builder = QueryBuilder::<Sqlite>::new(
739        "SELECT sequence_number, occurred_at, event_type, payload FROM events",
740    );
741
742    if let Some(candidate_event_types) = candidate_event_types {
743        if candidate_event_types.is_empty() {
744            query_builder.push(" WHERE FALSE");
745        } else {
746            query_builder.push(" WHERE event_type IN (");
747
748            let mut separated = query_builder.separated(", ");
749            for event_type in candidate_event_types {
750                separated.push_bind(event_type);
751            }
752            separated.push_unseparated(")");
753        }
754    }
755
756    query_builder.push(" ORDER BY sequence_number ASC");
757
758    query_builder
759        .build()
760        .fetch_all(pool)
761        .await
762        .map_err(sqlx_backend_failure)
763}
764
765async fn fetch_candidate_rows_from_connection(
766    connection: &mut sqlx::SqliteConnection,
767    candidate_event_types: Option<&[String]>,
768) -> Result<Vec<SqliteRow>, EventStoreError> {
769    let mut query_builder = QueryBuilder::<Sqlite>::new(
770        "SELECT sequence_number, occurred_at, event_type, payload FROM events",
771    );
772
773    if let Some(candidate_event_types) = candidate_event_types {
774        if candidate_event_types.is_empty() {
775            query_builder.push(" WHERE FALSE");
776        } else {
777            query_builder.push(" WHERE event_type IN (");
778
779            let mut separated = query_builder.separated(", ");
780            for event_type in candidate_event_types {
781                separated.push_bind(event_type);
782            }
783            separated.push_unseparated(")");
784        }
785    }
786
787    query_builder.push(" ORDER BY sequence_number ASC");
788
789    query_builder
790        .build()
791        .fetch_all(connection)
792        .await
793        .map_err(sqlx_backend_failure)
794}
795
796fn event_type_candidates(event_query: &EventQuery) -> Option<Vec<String>> {
797    let filters = match &event_query.filters {
798        None => return None,
799        Some(filters) if filters.is_empty() => return None,
800        Some(filters) => filters,
801    };
802
803    if filters
804        .iter()
805        .any(|event_filter| event_filter.event_types.is_none())
806    {
807        return None;
808    }
809
810    let mut candidate_event_types = Vec::new();
811
812    for event_filter in filters {
813        if let Some(event_types) = &event_filter.event_types {
814            for event_type in event_types {
815                if !candidate_event_types
816                    .iter()
817                    .any(|known| known == event_type)
818                {
819                    candidate_event_types.push(event_type.clone());
820                }
821            }
822        }
823    }
824
825    Some(candidate_event_types)
826}
827
828fn row_to_event_record(row: SqliteRow) -> Result<EventRecord, EventStoreError> {
829    let payload_text = row.get::<String, _>("payload");
830    let payload: Value = serde_json::from_str(&payload_text).map_err(json_backend_failure)?;
831
832    Ok(EventRecord {
833        sequence_number: row.get::<i64, _>("sequence_number") as u64,
834        occurred_at: OffsetDateTime::parse(&row.get::<String, _>("occurred_at"), &Rfc3339)
835            .map_err(time_backend_failure)?,
836        event_type: row.get::<String, _>("event_type"),
837        payload,
838    })
839}
840
/// Creates the directory that will contain `database_path`, including any
/// missing ancestors.
///
/// Does nothing when the path has no parent or when the parent is the empty
/// path (a bare file name).
///
/// # Errors
///
/// Returns the `io::Error` reported by `fs::create_dir_all`.
fn ensure_parent_directory(database_path: &Path) -> Result<(), io::Error> {
    match database_path.parent() {
        Some(parent_directory) if !parent_directory.as_os_str().is_empty() => {
            fs::create_dir_all(parent_directory)
        }
        _ => Ok(()),
    }
}
850
851fn sqlx_io_error(error: io::Error) -> sqlx::Error {
852    sqlx::Error::Io(error)
853}
854
855fn sqlx_backend_failure(error: sqlx::Error) -> EventStoreError {
856    EventStoreError::BackendFailure {
857        message: error.to_string(),
858    }
859}
860
861fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
862    EventStoreError::BackendFailure {
863        message: error.to_string(),
864    }
865}
866
867fn time_backend_failure(error: time::error::Parse) -> EventStoreError {
868    EventStoreError::BackendFailure {
869        message: error.to_string(),
870    }
871}
872
/// Wraps an already formatted message in `EventStoreError::BackendFailure`.
///
/// The message is stored unchanged.
fn subscription_registry_backend_failure(message: String) -> EventStoreError {
    EventStoreError::BackendFailure { message }
}
876
877fn build_delivery_runtime(operation: &'static str) -> Result<Runtime, EventStoreError> {
878    Builder::new_current_thread()
879        .enable_all()
880        .build()
881        .map_err(sqlx_io_error)
882        .map_err(sqlx_backend_failure)
883        .map_err(|error| match error {
884            EventStoreError::BackendFailure { message } => EventStoreError::BackendFailure {
885                message: format!("{operation}: {message}"),
886            },
887            other => other,
888        })
889}
890
891async fn load_or_create_subscriber_cursor(
892    connection: &mut sqlx::SqliteConnection,
893    subscriber_id: &str,
894    event_query_json: &str,
895) -> Result<u64, EventStoreError> {
896    let existing_cursor = sqlx::query(
897        "SELECT event_query, last_processed_sequence_number
898         FROM subscriber_cursors
899         WHERE subscriber_id = ?1",
900    )
901    .bind(subscriber_id)
902    .fetch_optional(&mut *connection)
903    .await
904    .map_err(sqlx_backend_failure)?;
905
906    match existing_cursor {
907        Some(row) => {
908            let stored_event_query = row.get::<String, _>("event_query");
909            if stored_event_query != event_query_json {
910                return Err(EventStoreError::BackendFailure {
911                    message: format!(
912                        "durable subscriber {subscriber_id} was resumed with a different query"
913                    ),
914                });
915            }
916
917            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
918        }
919        None => {
920            sqlx::query(
921                "INSERT INTO subscriber_cursors (subscriber_id, event_query, last_processed_sequence_number)
922                 VALUES (?1, ?2, 0)",
923            )
924            .bind(subscriber_id)
925            .bind(event_query_json)
926            .execute(&mut *connection)
927            .await
928            .map_err(sqlx_backend_failure)?;
929
930            Ok(0)
931        }
932    }
933}
934
935async fn current_max_sequence_number(
936    connection: &mut sqlx::SqliteConnection,
937) -> Result<u64, EventStoreError> {
938    Ok(
939        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
940            .fetch_one(&mut *connection)
941            .await
942            .map_err(sqlx_backend_failure)?
943            .unwrap_or(0) as u64,
944    )
945}
946
947async fn ensure_replay_history_is_available(
948    connection: &mut sqlx::SqliteConnection,
949    _current_max_sequence_number: u64,
950) -> Result<(), EventStoreError> {
951    let boundary_format =
952        sqlx::query_scalar::<_, Option<String>>("SELECT value FROM store_metadata WHERE key = ?1")
953            .bind(APPEND_BATCH_BOUNDARY_FORMAT_KEY)
954            .fetch_optional(&mut *connection)
955            .await
956            .map_err(sqlx_backend_failure)?;
957
958    if boundary_format.flatten().as_deref() != Some(APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1) {
959        return Err(EventStoreError::BackendFailure {
960            message: format!(
961                "durable replay requires store_metadata {}={}",
962                APPEND_BATCH_BOUNDARY_FORMAT_KEY, APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1
963            ),
964        });
965    }
966
967    Ok(())
968}
969
970async fn ensure_replay_history_is_available_for_pool(
971    pool: &SqlitePool,
972    current_max_sequence_number: u64,
973) -> Result<(), EventStoreError> {
974    let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
975    ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
976}
977
978async fn update_subscriber_cursor(
979    pool: &SqlitePool,
980    subscriber_id: &str,
981    last_processed_sequence_number: u64,
982) -> Result<(), EventStoreError> {
983    sqlx::query(
984        "UPDATE subscriber_cursors
985         SET last_processed_sequence_number = ?2
986         WHERE subscriber_id = ?1",
987    )
988    .bind(subscriber_id)
989    .bind(last_processed_sequence_number as i64)
990    .execute(pool)
991    .await
992    .map_err(sqlx_backend_failure)?;
993
994    Ok(())
995}
996
997async fn load_replay_batches(
998    pool: &SqlitePool,
999    event_query: &EventQuery,
1000    last_processed_sequence_number: u64,
1001    replay_until_sequence_number: u64,
1002) -> Result<Vec<ReplayBatch>, EventStoreError> {
1003    if replay_until_sequence_number <= last_processed_sequence_number {
1004        return Ok(Vec::new());
1005    }
1006
1007    let batch_rows = sqlx::query(
1008        "SELECT first_sequence_number, last_sequence_number
1009         FROM append_batches
1010         WHERE last_sequence_number > ?1
1011           AND first_sequence_number <= ?2
1012         ORDER BY first_sequence_number ASC",
1013    )
1014    .bind(last_processed_sequence_number as i64)
1015    .bind(replay_until_sequence_number as i64)
1016    .fetch_all(pool)
1017    .await
1018    .map_err(sqlx_backend_failure)?;
1019
1020    let append_batch_boundaries = batch_rows
1021        .into_iter()
1022        .map(|batch_row| AppendBatchBoundary {
1023            first_sequence_number: batch_row.get::<i64, _>("first_sequence_number") as u64,
1024            last_sequence_number: batch_row.get::<i64, _>("last_sequence_number") as u64,
1025        })
1026        .collect::<Vec<_>>();
1027    let event_records = sqlx::query(
1028        "SELECT sequence_number, occurred_at, event_type, payload
1029         FROM events
1030         WHERE sequence_number > ?1
1031           AND sequence_number <= ?2
1032         ORDER BY sequence_number ASC",
1033    )
1034    .bind(last_processed_sequence_number as i64)
1035    .bind(replay_until_sequence_number as i64)
1036    .fetch_all(pool)
1037    .await
1038    .map_err(sqlx_backend_failure)?
1039    .into_iter()
1040    .map(row_to_event_record)
1041    .collect::<Result<Vec<_>, _>>()?;
1042
1043    Ok(replay_batches_from_records(
1044        event_query,
1045        event_records,
1046        &append_batch_boundaries,
1047    ))
1048}
1049
1050fn replay_batches_from_records(
1051    event_query: &EventQuery,
1052    event_records: Vec<EventRecord>,
1053    append_batch_boundaries: &[AppendBatchBoundary],
1054) -> Vec<ReplayBatch> {
1055    let mut replay_batches = Vec::new();
1056    let mut event_index = 0;
1057    let mut boundary_index = 0;
1058
1059    while event_index < event_records.len() {
1060        let next_sequence_number = event_records[event_index].sequence_number;
1061
1062        while boundary_index < append_batch_boundaries.len()
1063            && append_batch_boundaries[boundary_index].last_sequence_number < next_sequence_number
1064        {
1065            boundary_index += 1;
1066        }
1067
1068        if boundary_index < append_batch_boundaries.len() {
1069            let boundary = &append_batch_boundaries[boundary_index];
1070            if boundary.first_sequence_number <= next_sequence_number
1071                && next_sequence_number <= boundary.last_sequence_number
1072            {
1073                let batch_start = event_index;
1074                while event_index < event_records.len()
1075                    && event_records[event_index].sequence_number <= boundary.last_sequence_number
1076                {
1077                    event_index += 1;
1078                }
1079
1080                let delivered_batch = event_records[batch_start..event_index]
1081                    .iter()
1082                    .filter(|event_record| matches_query(event_query, event_record))
1083                    .cloned()
1084                    .collect();
1085
1086                replay_batches.push(ReplayBatch {
1087                    last_processed_sequence_number: boundary.last_sequence_number,
1088                    delivered_batch,
1089                });
1090                continue;
1091            }
1092        }
1093
1094        let event_record = event_records[event_index].clone();
1095        event_index += 1;
1096
1097        let delivered_batch = if matches_query(event_query, &event_record) {
1098            vec![event_record.clone()]
1099        } else {
1100            Vec::new()
1101        };
1102
1103        replay_batches.push(ReplayBatch {
1104            last_processed_sequence_number: event_record.sequence_number,
1105            delivered_batch,
1106        });
1107    }
1108
1109    replay_batches
1110}
1111
1112fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1113    EventQuery {
1114        filters: event_query.filters.clone(),
1115        min_sequence_number: None,
1116    }
1117}
1118
1119fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1120    let filters = event_query.filters.as_ref().map(|filters| {
1121        filters
1122            .iter()
1123            .map(|filter| {
1124                serde_json::json!({
1125                    "event_types": filter.event_types,
1126                    "payload_predicates": filter.payload_predicates,
1127                })
1128            })
1129            .collect::<Vec<_>>()
1130    });
1131
1132    serde_json::to_string(&serde_json::json!({
1133        "filters": filters,
1134        "min_sequence_number": event_query.min_sequence_number,
1135    }))
1136    .map_err(json_backend_failure)
1137}
1138
1139fn run_delivery_thread(
1140    pool: SqlitePool,
1141    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
1142    delivery_receiver: Receiver<DeliveryCommand>,
1143) {
1144    let runtime = match Builder::new_current_thread().enable_all().build() {
1145        Ok(runtime) => runtime,
1146        Err(error) => {
1147            eprintln!("factstr-sqlite delivery runtime could not start: {}", error);
1148            return;
1149        }
1150    };
1151
1152    while let Ok(delivery_command) = delivery_receiver.recv() {
1153        match delivery_command {
1154            DeliveryCommand::Deliver(pending_deliveries) => {
1155                for pending_delivery in pending_deliveries {
1156                    match pending_delivery.deliver(&runtime) {
1157                        DeliveryOutcome::Succeeded {
1158                            subscription_id,
1159                            durable_subscriber_id,
1160                            last_processed_sequence_number,
1161                        } => {
1162                            if let Some(durable_subscriber_id) = durable_subscriber_id {
1163                                if let Err(error) = runtime.block_on(update_subscriber_cursor(
1164                                    &pool,
1165                                    &durable_subscriber_id,
1166                                    last_processed_sequence_number,
1167                                )) {
1168                                    eprintln!(
1169                                        "factstr-sqlite durable cursor update failed after delivery for subscriber {}: {}",
1170                                        durable_subscriber_id, error
1171                                    );
1172                                    match subscription_registry.lock() {
1173                                        Ok(mut subscription_registry) => {
1174                                            subscription_registry.unsubscribe(subscription_id)
1175                                        }
1176                                        Err(poisoned) => {
1177                                            poisoned.into_inner().unsubscribe(subscription_id)
1178                                        }
1179                                    }
1180                                }
1181                            }
1182                        }
1183                        DeliveryOutcome::Failed {
1184                            subscription_id,
1185                            durable_subscriber_id,
1186                        }
1187                        | DeliveryOutcome::Panicked {
1188                            subscription_id,
1189                            durable_subscriber_id,
1190                        } => {
1191                            if durable_subscriber_id.is_some() {
1192                                match subscription_registry.lock() {
1193                                    Ok(mut subscription_registry) => {
1194                                        subscription_registry.unsubscribe(subscription_id)
1195                                    }
1196                                    Err(poisoned) => {
1197                                        poisoned.into_inner().unsubscribe(subscription_id)
1198                                    }
1199                                }
1200                            }
1201                        }
1202                    }
1203                }
1204            }
1205            DeliveryCommand::Shutdown => break,
1206        }
1207    }
1208}