1use std::fs;
2use std::future::Future;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::{
6 Arc, Mutex,
7 mpsc::{self, Receiver, Sender},
8};
9use std::thread::{self, JoinHandle};
10
11use factstr::{
12 AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
13 HandleStream, NewEvent, QueryResult,
14};
15use serde_json::Value;
16use sqlx::sqlite::SqliteRow;
17use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
18use time::OffsetDateTime;
19use time::format_description::well_known::Rfc3339;
20use tokio::runtime::Builder;
21
22use crate::connection::open_pool;
23use crate::query_match::matches_query;
24use crate::schema::initialize_schema;
25use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
26
27#[derive(Clone, Debug)]
28struct CommittedAppend {
29 append_result: AppendResult,
30 event_records: Vec<EventRecord>,
31}
32
33enum DeliveryCommand {
34 Deliver(Vec<PendingDelivery>),
35 Shutdown,
36}
37
38pub struct SqliteStore {
39 database_path: PathBuf,
40 pool: SqlitePool,
41 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
42 delivery_sender: Sender<DeliveryCommand>,
43 delivery_thread: Mutex<Option<JoinHandle<()>>>,
44}
45
46#[derive(Clone, Debug)]
47struct ReplayBatch {
48 last_processed_sequence_number: u64,
49 delivered_batch: Vec<EventRecord>,
50}
51
52#[derive(Clone, Debug)]
53struct DurableReplayState {
54 subscription_id: u64,
55 last_processed_sequence_number: u64,
56 replay_until_sequence_number: u64,
57}
58
59impl SqliteStore {
60 pub fn open(database_path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
61 let database_path = database_path.as_ref().to_path_buf();
62 ensure_parent_directory(&database_path).map_err(sqlx_io_error)?;
63
64 let (result_sender, result_receiver) = mpsc::sync_channel(1);
65 let bootstrap_path = database_path.clone();
66
67 let bootstrap_thread = thread::Builder::new()
68 .name("factstr-sqlite-bootstrap".to_owned())
69 .spawn(move || {
70 let runtime = Builder::new_current_thread()
71 .enable_all()
72 .build()
73 .map_err(sqlx_io_error);
74
75 let result = match runtime {
76 Ok(runtime) => runtime.block_on(async {
77 let pool = open_pool(&bootstrap_path).await?;
78 initialize_schema(&pool).await?;
79 Ok::<_, sqlx::Error>(pool)
80 }),
81 Err(error) => Err(error),
82 };
83
84 let _ = result_sender.send(result);
85 })
86 .map_err(sqlx_io_error)?;
87
88 let pool = match result_receiver.recv() {
89 Ok(result) => result,
90 Err(error) => Err(sqlx_io_error(io::Error::other(format!(
91 "sqlite bootstrap thread did not return a result: {error}"
92 )))),
93 }?;
94
95 bootstrap_thread.join().map_err(|_| {
96 sqlx_io_error(io::Error::other(
97 "sqlite bootstrap thread panicked before open completed",
98 ))
99 })?;
100
101 let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
102 let (delivery_sender, delivery_receiver) = mpsc::channel();
103 let delivery_pool = pool.clone();
104 let delivery_registry = Arc::clone(&subscription_registry);
105 let delivery_thread = thread::Builder::new()
106 .name("factstr-sqlite-delivery".to_owned())
107 .spawn(move || run_delivery_thread(delivery_pool, delivery_registry, delivery_receiver))
108 .map_err(sqlx_io_error)?;
109
110 Ok(Self {
111 database_path,
112 pool,
113 subscription_registry,
114 delivery_sender,
115 delivery_thread: Mutex::new(Some(delivery_thread)),
116 })
117 }
118
119 pub fn database_path(&self) -> &Path {
120 &self.database_path
121 }
122
123 fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
124 where
125 T: Send + 'static,
126 Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
127 F: FnOnce(SqlitePool) -> Fut + Send + 'static,
128 {
129 let pool = self.pool.clone();
130 let (result_sender, result_receiver) = mpsc::sync_channel(1);
131
132 let worker_thread = thread::Builder::new()
133 .name(format!("factstr-sqlite-{operation}"))
134 .spawn(move || {
135 let runtime = Builder::new_current_thread()
136 .enable_all()
137 .build()
138 .map_err(sqlx_io_error)
139 .map_err(sqlx_backend_failure);
140
141 let result = match runtime {
142 Ok(runtime) => runtime.block_on(work(pool)),
143 Err(error) => Err(error),
144 };
145
146 let _ = result_sender.send(result);
147 })
148 .map_err(sqlx_io_error)
149 .map_err(sqlx_backend_failure)?;
150
151 let result = result_receiver
152 .recv()
153 .map_err(|error| EventStoreError::BackendFailure {
154 message: format!("sqlite {operation} thread did not return a result: {error}"),
155 })?;
156
157 worker_thread
158 .join()
159 .map_err(|_| EventStoreError::BackendFailure {
160 message: format!("sqlite {operation} thread panicked before completion"),
161 })?;
162
163 result
164 }
165
166 fn pending_deliveries(&self, committed_batch: &[EventRecord]) -> Vec<PendingDelivery> {
167 match self.subscription_registry.lock() {
168 Ok(mut subscription_registry) => {
169 subscription_registry.pending_deliveries(committed_batch)
170 }
171 Err(poisoned) => poisoned.into_inner().pending_deliveries(committed_batch),
172 }
173 }
174
175 fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
176 if pending_deliveries.is_empty() {
177 return;
178 }
179
180 if let Err(error) = self
181 .delivery_sender
182 .send(DeliveryCommand::Deliver(pending_deliveries))
183 {
184 eprintln!(
185 "factstr-sqlite delivery dispatcher stopped after commit: {}",
186 error
187 );
188 }
189 }
190
191 fn register_all_durable_stream(
192 &self,
193 subscriber_id: impl Into<String>,
194 handle: HandleStream,
195 ) -> Result<EventStream, EventStoreError> {
196 self.subscribe_durable(subscriber_id.into(), EventQuery::all(), handle)
197 }
198
199 fn register_durable_stream(
200 &self,
201 subscriber_id: impl Into<String>,
202 event_query: &EventQuery,
203 handle: HandleStream,
204 ) -> Result<EventStream, EventStoreError> {
205 self.subscribe_durable(subscriber_id.into(), event_query.clone(), handle)
206 }
207
208 fn subscribe_durable(
209 &self,
210 subscriber_id: String,
211 event_query: EventQuery,
212 handle: HandleStream,
213 ) -> Result<EventStream, EventStoreError> {
214 let normalized_event_query = normalized_durable_event_query(&event_query);
215 let event_query_json = serialize_event_query(&normalized_event_query)?;
216 let subscription_registry = Arc::clone(&self.subscription_registry);
217 let durable_subscriber_id = subscriber_id.clone();
218 let normalized_event_query_for_registry = normalized_event_query.clone();
219 let handle_for_registry = handle.clone();
220 let replay_subscriber_id = subscriber_id.clone();
221
222 let replay_state = self.run_async("subscribe_durable", move |pool| async move {
223 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
224
225 sqlx::query("BEGIN IMMEDIATE")
226 .execute(connection.as_mut())
227 .await
228 .map_err(sqlx_backend_failure)?;
229
230 let last_processed_sequence_number = load_or_create_subscriber_cursor(
231 connection.as_mut(),
232 &durable_subscriber_id,
233 &event_query_json,
234 )
235 .await?;
236 let replay_until_sequence_number =
237 current_max_sequence_number(connection.as_mut()).await?;
238
239 let subscription_id = match subscription_registry.lock() {
240 Ok(mut subscription_registry) => {
241 if normalized_event_query_for_registry.filters.is_none() {
242 subscription_registry
243 .subscribe_all_durable(
244 durable_subscriber_id.clone(),
245 replay_until_sequence_number,
246 handle_for_registry.clone(),
247 )
248 .map_err(subscription_registry_backend_failure)?
249 } else {
250 subscription_registry
251 .subscribe_to_durable(
252 durable_subscriber_id.clone(),
253 Some(normalized_event_query_for_registry.clone()),
254 replay_until_sequence_number,
255 handle_for_registry.clone(),
256 )
257 .map_err(subscription_registry_backend_failure)?
258 }
259 }
260 Err(poisoned) => {
261 let mut subscription_registry = poisoned.into_inner();
262 if normalized_event_query_for_registry.filters.is_none() {
263 subscription_registry
264 .subscribe_all_durable(
265 durable_subscriber_id.clone(),
266 replay_until_sequence_number,
267 handle_for_registry.clone(),
268 )
269 .map_err(subscription_registry_backend_failure)?
270 } else {
271 subscription_registry
272 .subscribe_to_durable(
273 durable_subscriber_id.clone(),
274 Some(normalized_event_query_for_registry.clone()),
275 replay_until_sequence_number,
276 handle_for_registry.clone(),
277 )
278 .map_err(subscription_registry_backend_failure)?
279 }
280 }
281 };
282
283 if let Err(error) = sqlx::query("COMMIT").execute(connection.as_mut()).await {
284 match subscription_registry.lock() {
285 Ok(mut subscription_registry) => {
286 subscription_registry.unsubscribe(subscription_id)
287 }
288 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
289 }
290 return Err(sqlx_backend_failure(error));
291 }
292
293 Ok(DurableReplayState {
294 subscription_id,
295 last_processed_sequence_number,
296 replay_until_sequence_number,
297 })
298 })?;
299
300 let subscription = self.build_subscription_handle(replay_state.subscription_id);
301
302 let replay_batches = match self.run_async("durable_replay", {
303 let event_query = normalized_event_query.clone();
304 move |pool| async move {
305 ensure_replay_history_is_available_for_pool(
306 &pool,
307 replay_state.replay_until_sequence_number,
308 )
309 .await?;
310 load_replay_batches(
311 &pool,
312 &event_query,
313 replay_state.last_processed_sequence_number,
314 replay_state.replay_until_sequence_number,
315 )
316 .await
317 }
318 }) {
319 Ok(replay_batches) => replay_batches,
320 Err(error) => {
321 self.cleanup_durable_subscription(replay_state.subscription_id);
322 return Err(error);
323 }
324 };
325
326 for replay_batch in replay_batches {
327 let pending_delivery = PendingDelivery {
328 subscription_id: replay_state.subscription_id,
329 durable_subscriber_id: Some(replay_subscriber_id.clone()),
330 last_processed_sequence_number: replay_batch.last_processed_sequence_number,
331 delivered_batch: replay_batch.delivered_batch,
332 handle: handle.clone(),
333 };
334
335 match self.process_durable_delivery(pending_delivery) {
336 Ok(true) => {}
337 Ok(false) => {
338 self.cleanup_durable_subscription(replay_state.subscription_id);
339
340 return Err(EventStoreError::BackendFailure {
341 message: format!(
342 "durable replay for subscriber {} did not complete successfully",
343 replay_subscriber_id
344 ),
345 });
346 }
347 Err(error) => {
348 self.cleanup_durable_subscription(replay_state.subscription_id);
349 return Err(error);
350 }
351 }
352 }
353
354 self.finish_durable_replay(replay_state.subscription_id);
355 Ok(subscription)
356 }
357
358 fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
359 let subscription_registry = Arc::clone(&self.subscription_registry);
360
361 EventStream::new(
362 subscription_id,
363 Arc::new(move |subscription_id| match subscription_registry.lock() {
364 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
365 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
366 }),
367 )
368 }
369
370 fn process_durable_delivery(
371 &self,
372 pending_delivery: PendingDelivery,
373 ) -> Result<bool, EventStoreError> {
374 match pending_delivery.deliver() {
375 DeliveryOutcome::Succeeded {
376 durable_subscriber_id,
377 last_processed_sequence_number,
378 ..
379 } => {
380 if let Some(durable_subscriber_id) = durable_subscriber_id {
381 self.run_async("advance_durable_cursor", move |pool| async move {
382 update_subscriber_cursor(
383 &pool,
384 &durable_subscriber_id,
385 last_processed_sequence_number,
386 )
387 .await
388 })?;
389 }
390
391 Ok(true)
392 }
393 DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
394 }
395 }
396
397 fn finish_durable_replay(&self, subscription_id: u64) {
398 match self.subscription_registry.lock() {
399 Ok(mut subscription_registry) => {
400 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
401 self.enqueue_delivery(buffered_deliveries);
402 }
403 Err(poisoned) => {
404 let mut subscription_registry = poisoned.into_inner();
405 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
406 self.enqueue_delivery(buffered_deliveries);
407 }
408 }
409 }
410
411 fn cleanup_durable_subscription(&self, subscription_id: u64) {
412 match self.subscription_registry.lock() {
413 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
414 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
415 }
416 }
417}
418
419impl Drop for SqliteStore {
420 fn drop(&mut self) {
421 let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
422
423 if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
424 if let Some(delivery_thread) = delivery_thread.take() {
425 let _ = delivery_thread.join();
426 }
427 }
428 }
429}
430
431impl EventStore for SqliteStore {
432 fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
433 let matching_records = self.run_async("query", {
434 let event_query = event_query.clone();
435 move |pool| async move { load_matching_records(&pool, &event_query).await }
436 })?;
437
438 let current_context_version = matching_records
439 .last()
440 .map(|event_record| event_record.sequence_number);
441
442 let event_records = matching_records
443 .into_iter()
444 .filter(|event_record| {
445 event_query
446 .min_sequence_number
447 .is_none_or(|min_sequence_number| {
448 event_record.sequence_number > min_sequence_number
449 })
450 })
451 .collect::<Vec<_>>();
452
453 let last_returned_sequence_number = event_records
454 .last()
455 .map(|event_record| event_record.sequence_number);
456
457 Ok(QueryResult {
458 event_records,
459 last_returned_sequence_number,
460 current_context_version,
461 })
462 }
463
464 fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
465 if new_events.is_empty() {
466 return Err(EventStoreError::EmptyAppend);
467 }
468
469 let committed_append = self.run_async("append", move |pool| async move {
470 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
471
472 sqlx::query("BEGIN IMMEDIATE")
473 .execute(connection.as_mut())
474 .await
475 .map_err(sqlx_backend_failure)?;
476
477 let append_result = append_batch(connection.as_mut(), new_events).await;
478
479 match append_result {
480 Ok(committed_append) => {
481 sqlx::query("COMMIT")
482 .execute(connection.as_mut())
483 .await
484 .map_err(sqlx_backend_failure)?;
485 Ok(committed_append)
486 }
487 Err(error) => {
488 let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
489 Err(error)
490 }
491 }
492 })?;
493
494 let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
495 self.enqueue_delivery(pending_deliveries);
496 Ok(committed_append.append_result)
497 }
498
499 fn append_if(
500 &self,
501 new_events: Vec<NewEvent>,
502 context_query: &EventQuery,
503 expected_context_version: Option<u64>,
504 ) -> Result<AppendResult, EventStoreError> {
505 if new_events.is_empty() {
506 return Err(EventStoreError::EmptyAppend);
507 }
508
509 let committed_append = self.run_async("append_if", {
510 let context_query = context_query.clone();
511 move |pool| async move {
512 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
513
514 sqlx::query("BEGIN IMMEDIATE")
515 .execute(connection.as_mut())
516 .await
517 .map_err(sqlx_backend_failure)?;
518
519 let conflict_query = EventQuery {
520 filters: context_query.filters.clone(),
521 min_sequence_number: None,
522 };
523 let actual_context_version =
524 load_matching_records_from_connection(connection.as_mut(), &conflict_query)
525 .await?
526 .last()
527 .map(|event_record| event_record.sequence_number);
528
529 if actual_context_version != expected_context_version {
530 sqlx::query("ROLLBACK")
531 .execute(connection.as_mut())
532 .await
533 .map_err(sqlx_backend_failure)?;
534
535 return Err(EventStoreError::ConditionalAppendConflict {
536 expected: expected_context_version,
537 actual: actual_context_version,
538 });
539 }
540
541 let append_result = append_batch(connection.as_mut(), new_events).await;
542
543 match append_result {
544 Ok(committed_append) => {
545 sqlx::query("COMMIT")
546 .execute(connection.as_mut())
547 .await
548 .map_err(sqlx_backend_failure)?;
549 Ok(committed_append)
550 }
551 Err(error) => {
552 let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
553 Err(error)
554 }
555 }
556 }
557 })?;
558
559 let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
560 self.enqueue_delivery(pending_deliveries);
561 Ok(committed_append.append_result)
562 }
563
564 fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
565 let subscription_registry = Arc::clone(&self.subscription_registry);
566 let id = match subscription_registry.lock() {
567 Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
568 Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
569 };
570
571 Ok(EventStream::new(
572 id,
573 Arc::new(move |subscription_id| match subscription_registry.lock() {
574 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
575 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
576 }),
577 ))
578 }
579
580 fn stream_to(
581 &self,
582 event_query: &EventQuery,
583 handle: HandleStream,
584 ) -> Result<EventStream, EventStoreError> {
585 let subscription_registry = Arc::clone(&self.subscription_registry);
586 let id = match subscription_registry.lock() {
587 Ok(mut subscription_registry) => {
588 subscription_registry.subscribe_to(Some(event_query.clone()), handle)
589 }
590 Err(poisoned) => poisoned
591 .into_inner()
592 .subscribe_to(Some(event_query.clone()), handle),
593 };
594
595 Ok(EventStream::new(
596 id,
597 Arc::new(move |subscription_id| match subscription_registry.lock() {
598 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
599 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
600 }),
601 ))
602 }
603
604 fn stream_all_durable(
605 &self,
606 durable_stream: &DurableStream,
607 handle: HandleStream,
608 ) -> Result<EventStream, EventStoreError> {
609 self.register_all_durable_stream(durable_stream.name(), handle)
610 }
611
612 fn stream_to_durable(
613 &self,
614 durable_stream: &DurableStream,
615 event_query: &EventQuery,
616 handle: HandleStream,
617 ) -> Result<EventStream, EventStoreError> {
618 self.register_durable_stream(durable_stream.name(), event_query, handle)
619 }
620}
621
622async fn append_batch(
623 connection: &mut sqlx::SqliteConnection,
624 new_events: Vec<NewEvent>,
625) -> Result<CommittedAppend, EventStoreError> {
626 let last_sequence_number =
627 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
628 .fetch_one(&mut *connection)
629 .await
630 .map_err(sqlx_backend_failure)?
631 .unwrap_or(0);
632
633 let first_sequence_number = (last_sequence_number as u64) + 1;
634 let committed_count = new_events.len() as u64;
635 let last_sequence_number = first_sequence_number + committed_count - 1;
636
637 let event_records = new_events
638 .into_iter()
639 .enumerate()
640 .map(|(offset, new_event)| EventRecord {
641 sequence_number: first_sequence_number + offset as u64,
642 occurred_at: OffsetDateTime::now_utc(),
643 event_type: new_event.event_type,
644 payload: new_event.payload,
645 })
646 .collect::<Vec<_>>();
647
648 for event_record in &event_records {
649 let payload = serde_json::to_string(&event_record.payload).map_err(json_backend_failure)?;
650 let occurred_at = event_record
651 .occurred_at
652 .format(&Rfc3339)
653 .expect("sqlite occurred_at should format as RFC3339");
654
655 sqlx::query(
656 "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
657 VALUES (?1, ?2, ?3, ?4)",
658 )
659 .bind(event_record.sequence_number as i64)
660 .bind(occurred_at)
661 .bind(&event_record.event_type)
662 .bind(payload)
663 .execute(&mut *connection)
664 .await
665 .map_err(sqlx_backend_failure)?;
666 }
667
668 sqlx::query(
669 "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
670 VALUES (?1, ?2)",
671 )
672 .bind(first_sequence_number as i64)
673 .bind(last_sequence_number as i64)
674 .execute(&mut *connection)
675 .await
676 .map_err(sqlx_backend_failure)?;
677
678 Ok(CommittedAppend {
679 append_result: AppendResult {
680 first_sequence_number,
681 last_sequence_number,
682 committed_count,
683 },
684 event_records,
685 })
686}
687
688async fn load_matching_records(
689 pool: &SqlitePool,
690 event_query: &EventQuery,
691) -> Result<Vec<EventRecord>, EventStoreError> {
692 let candidate_event_types = event_type_candidates(event_query);
693 let rows = fetch_candidate_rows(pool, candidate_event_types.as_deref()).await?;
694 matching_records_from_rows(rows, event_query)
695}
696
697async fn load_matching_records_from_connection(
698 connection: &mut sqlx::SqliteConnection,
699 event_query: &EventQuery,
700) -> Result<Vec<EventRecord>, EventStoreError> {
701 let candidate_event_types = event_type_candidates(event_query);
702 let rows =
703 fetch_candidate_rows_from_connection(connection, candidate_event_types.as_deref()).await?;
704 matching_records_from_rows(rows, event_query)
705}
706
707fn matching_records_from_rows(
708 rows: Vec<SqliteRow>,
709 event_query: &EventQuery,
710) -> Result<Vec<EventRecord>, EventStoreError> {
711 rows.into_iter()
712 .map(row_to_event_record)
713 .filter(|result| {
714 result
715 .as_ref()
716 .is_ok_and(|event_record| matches_query(event_query, event_record))
717 })
718 .collect()
719}
720
721async fn fetch_candidate_rows(
722 pool: &SqlitePool,
723 candidate_event_types: Option<&[String]>,
724) -> Result<Vec<SqliteRow>, EventStoreError> {
725 let mut query_builder = QueryBuilder::<Sqlite>::new(
726 "SELECT sequence_number, occurred_at, event_type, payload FROM events",
727 );
728
729 if let Some(candidate_event_types) = candidate_event_types {
730 if candidate_event_types.is_empty() {
731 query_builder.push(" WHERE FALSE");
732 } else {
733 query_builder.push(" WHERE event_type IN (");
734
735 let mut separated = query_builder.separated(", ");
736 for event_type in candidate_event_types {
737 separated.push_bind(event_type);
738 }
739 separated.push_unseparated(")");
740 }
741 }
742
743 query_builder.push(" ORDER BY sequence_number ASC");
744
745 query_builder
746 .build()
747 .fetch_all(pool)
748 .await
749 .map_err(sqlx_backend_failure)
750}
751
752async fn fetch_candidate_rows_from_connection(
753 connection: &mut sqlx::SqliteConnection,
754 candidate_event_types: Option<&[String]>,
755) -> Result<Vec<SqliteRow>, EventStoreError> {
756 let mut query_builder = QueryBuilder::<Sqlite>::new(
757 "SELECT sequence_number, occurred_at, event_type, payload FROM events",
758 );
759
760 if let Some(candidate_event_types) = candidate_event_types {
761 if candidate_event_types.is_empty() {
762 query_builder.push(" WHERE FALSE");
763 } else {
764 query_builder.push(" WHERE event_type IN (");
765
766 let mut separated = query_builder.separated(", ");
767 for event_type in candidate_event_types {
768 separated.push_bind(event_type);
769 }
770 separated.push_unseparated(")");
771 }
772 }
773
774 query_builder.push(" ORDER BY sequence_number ASC");
775
776 query_builder
777 .build()
778 .fetch_all(connection)
779 .await
780 .map_err(sqlx_backend_failure)
781}
782
783fn event_type_candidates(event_query: &EventQuery) -> Option<Vec<String>> {
784 let filters = match &event_query.filters {
785 None => return None,
786 Some(filters) if filters.is_empty() => return None,
787 Some(filters) => filters,
788 };
789
790 if filters
791 .iter()
792 .any(|event_filter| event_filter.event_types.is_none())
793 {
794 return None;
795 }
796
797 let mut candidate_event_types = Vec::new();
798
799 for event_filter in filters {
800 if let Some(event_types) = &event_filter.event_types {
801 for event_type in event_types {
802 if !candidate_event_types
803 .iter()
804 .any(|known| known == event_type)
805 {
806 candidate_event_types.push(event_type.clone());
807 }
808 }
809 }
810 }
811
812 Some(candidate_event_types)
813}
814
815fn row_to_event_record(row: SqliteRow) -> Result<EventRecord, EventStoreError> {
816 let payload_text = row.get::<String, _>("payload");
817 let payload: Value = serde_json::from_str(&payload_text).map_err(json_backend_failure)?;
818
819 Ok(EventRecord {
820 sequence_number: row.get::<i64, _>("sequence_number") as u64,
821 occurred_at: OffsetDateTime::parse(&row.get::<String, _>("occurred_at"), &Rfc3339)
822 .map_err(time_backend_failure)?,
823 event_type: row.get::<String, _>("event_type"),
824 payload,
825 })
826}
827
828fn ensure_parent_directory(database_path: &Path) -> Result<(), io::Error> {
829 if let Some(parent_directory) = database_path.parent() {
830 if !parent_directory.as_os_str().is_empty() {
831 fs::create_dir_all(parent_directory)?;
832 }
833 }
834
835 Ok(())
836}
837
838fn sqlx_io_error(error: io::Error) -> sqlx::Error {
839 sqlx::Error::Io(error)
840}
841
842fn sqlx_backend_failure(error: sqlx::Error) -> EventStoreError {
843 EventStoreError::BackendFailure {
844 message: error.to_string(),
845 }
846}
847
848fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
849 EventStoreError::BackendFailure {
850 message: error.to_string(),
851 }
852}
853
854fn time_backend_failure(error: time::error::Parse) -> EventStoreError {
855 EventStoreError::BackendFailure {
856 message: error.to_string(),
857 }
858}
859
860fn subscription_registry_backend_failure(message: String) -> EventStoreError {
861 EventStoreError::BackendFailure { message }
862}
863
864async fn load_or_create_subscriber_cursor(
865 connection: &mut sqlx::SqliteConnection,
866 subscriber_id: &str,
867 event_query_json: &str,
868) -> Result<u64, EventStoreError> {
869 let existing_cursor = sqlx::query(
870 "SELECT event_query, last_processed_sequence_number
871 FROM subscriber_cursors
872 WHERE subscriber_id = ?1",
873 )
874 .bind(subscriber_id)
875 .fetch_optional(&mut *connection)
876 .await
877 .map_err(sqlx_backend_failure)?;
878
879 match existing_cursor {
880 Some(row) => {
881 let stored_event_query = row.get::<String, _>("event_query");
882 if stored_event_query != event_query_json {
883 return Err(EventStoreError::BackendFailure {
884 message: format!(
885 "durable subscriber {subscriber_id} was resumed with a different query"
886 ),
887 });
888 }
889
890 Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
891 }
892 None => {
893 sqlx::query(
894 "INSERT INTO subscriber_cursors (subscriber_id, event_query, last_processed_sequence_number)
895 VALUES (?1, ?2, 0)",
896 )
897 .bind(subscriber_id)
898 .bind(event_query_json)
899 .execute(&mut *connection)
900 .await
901 .map_err(sqlx_backend_failure)?;
902
903 Ok(0)
904 }
905 }
906}
907
908async fn current_max_sequence_number(
909 connection: &mut sqlx::SqliteConnection,
910) -> Result<u64, EventStoreError> {
911 Ok(
912 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
913 .fetch_one(&mut *connection)
914 .await
915 .map_err(sqlx_backend_failure)?
916 .unwrap_or(0) as u64,
917 )
918}
919
920async fn ensure_replay_history_is_available(
921 connection: &mut sqlx::SqliteConnection,
922 current_max_sequence_number: u64,
923) -> Result<(), EventStoreError> {
924 if current_max_sequence_number == 0 {
925 return Ok(());
926 }
927
928 let batch_rows = sqlx::query(
929 "SELECT first_sequence_number, last_sequence_number
930 FROM append_batches
931 ORDER BY first_sequence_number ASC",
932 )
933 .fetch_all(&mut *connection)
934 .await
935 .map_err(sqlx_backend_failure)?;
936
937 if batch_rows.is_empty() {
938 return Err(EventStoreError::BackendFailure {
939 message: "durable replay requires append_batches history for all persisted events"
940 .to_owned(),
941 });
942 }
943
944 let mut expected_first_sequence_number = 1_u64;
945
946 for batch_row in batch_rows {
947 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
948 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
949
950 if first_sequence_number != expected_first_sequence_number
951 || last_sequence_number < first_sequence_number
952 {
953 return Err(EventStoreError::BackendFailure {
954 message:
955 "durable replay requires contiguous append_batches history for all persisted events"
956 .to_owned(),
957 });
958 }
959
960 expected_first_sequence_number = last_sequence_number + 1;
961 }
962
963 if expected_first_sequence_number - 1 != current_max_sequence_number {
964 return Err(EventStoreError::BackendFailure {
965 message: "durable replay requires append_batches history for all persisted events"
966 .to_owned(),
967 });
968 }
969
970 Ok(())
971}
972
973async fn ensure_replay_history_is_available_for_pool(
974 pool: &SqlitePool,
975 current_max_sequence_number: u64,
976) -> Result<(), EventStoreError> {
977 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
978 ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
979}
980
981async fn update_subscriber_cursor(
982 pool: &SqlitePool,
983 subscriber_id: &str,
984 last_processed_sequence_number: u64,
985) -> Result<(), EventStoreError> {
986 sqlx::query(
987 "UPDATE subscriber_cursors
988 SET last_processed_sequence_number = ?2
989 WHERE subscriber_id = ?1",
990 )
991 .bind(subscriber_id)
992 .bind(last_processed_sequence_number as i64)
993 .execute(pool)
994 .await
995 .map_err(sqlx_backend_failure)?;
996
997 Ok(())
998}
999
1000async fn load_replay_batches(
1001 pool: &SqlitePool,
1002 event_query: &EventQuery,
1003 last_processed_sequence_number: u64,
1004 replay_until_sequence_number: u64,
1005) -> Result<Vec<ReplayBatch>, EventStoreError> {
1006 if replay_until_sequence_number <= last_processed_sequence_number {
1007 return Ok(Vec::new());
1008 }
1009
1010 let batch_rows = sqlx::query(
1011 "SELECT first_sequence_number, last_sequence_number
1012 FROM append_batches
1013 WHERE last_sequence_number > ?1
1014 AND first_sequence_number <= ?2
1015 ORDER BY first_sequence_number ASC",
1016 )
1017 .bind(last_processed_sequence_number as i64)
1018 .bind(replay_until_sequence_number as i64)
1019 .fetch_all(pool)
1020 .await
1021 .map_err(sqlx_backend_failure)?;
1022
1023 let mut replay_batches = Vec::new();
1024
1025 for batch_row in batch_rows {
1026 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1027 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1028 let event_rows = sqlx::query(
1029 "SELECT sequence_number, occurred_at, event_type, payload
1030 FROM events
1031 WHERE sequence_number >= ?1 AND sequence_number <= ?2
1032 ORDER BY sequence_number ASC",
1033 )
1034 .bind(first_sequence_number as i64)
1035 .bind(last_sequence_number as i64)
1036 .fetch_all(pool)
1037 .await
1038 .map_err(sqlx_backend_failure)?;
1039
1040 let delivered_batch = event_rows
1041 .into_iter()
1042 .map(row_to_event_record)
1043 .filter(|result| {
1044 result
1045 .as_ref()
1046 .is_ok_and(|event_record| matches_query(event_query, event_record))
1047 })
1048 .collect::<Result<Vec<_>, _>>()?;
1049
1050 replay_batches.push(ReplayBatch {
1051 last_processed_sequence_number: last_sequence_number,
1052 delivered_batch,
1053 });
1054 }
1055
1056 Ok(replay_batches)
1057}
1058
1059fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1060 EventQuery {
1061 filters: event_query.filters.clone(),
1062 min_sequence_number: None,
1063 }
1064}
1065
1066fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1067 let filters = event_query.filters.as_ref().map(|filters| {
1068 filters
1069 .iter()
1070 .map(|filter| {
1071 serde_json::json!({
1072 "event_types": filter.event_types,
1073 "payload_predicates": filter.payload_predicates,
1074 })
1075 })
1076 .collect::<Vec<_>>()
1077 });
1078
1079 serde_json::to_string(&serde_json::json!({
1080 "filters": filters,
1081 "min_sequence_number": event_query.min_sequence_number,
1082 }))
1083 .map_err(json_backend_failure)
1084}
1085
1086fn run_delivery_thread(
1087 pool: SqlitePool,
1088 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
1089 delivery_receiver: Receiver<DeliveryCommand>,
1090) {
1091 let runtime = match Builder::new_current_thread().enable_all().build() {
1092 Ok(runtime) => runtime,
1093 Err(error) => {
1094 eprintln!("factstr-sqlite delivery runtime could not start: {}", error);
1095 return;
1096 }
1097 };
1098
1099 while let Ok(delivery_command) = delivery_receiver.recv() {
1100 match delivery_command {
1101 DeliveryCommand::Deliver(pending_deliveries) => {
1102 for pending_delivery in pending_deliveries {
1103 match pending_delivery.deliver() {
1104 DeliveryOutcome::Succeeded {
1105 subscription_id,
1106 durable_subscriber_id,
1107 last_processed_sequence_number,
1108 } => {
1109 if let Some(durable_subscriber_id) = durable_subscriber_id {
1110 if let Err(error) = runtime.block_on(update_subscriber_cursor(
1111 &pool,
1112 &durable_subscriber_id,
1113 last_processed_sequence_number,
1114 )) {
1115 eprintln!(
1116 "factstr-sqlite durable cursor update failed after delivery for subscriber {}: {}",
1117 durable_subscriber_id, error
1118 );
1119 match subscription_registry.lock() {
1120 Ok(mut subscription_registry) => {
1121 subscription_registry.unsubscribe(subscription_id)
1122 }
1123 Err(poisoned) => {
1124 poisoned.into_inner().unsubscribe(subscription_id)
1125 }
1126 }
1127 }
1128 }
1129 }
1130 DeliveryOutcome::Failed {
1131 subscription_id,
1132 durable_subscriber_id,
1133 }
1134 | DeliveryOutcome::Panicked {
1135 subscription_id,
1136 durable_subscriber_id,
1137 } => {
1138 if durable_subscriber_id.is_some() {
1139 match subscription_registry.lock() {
1140 Ok(mut subscription_registry) => {
1141 subscription_registry.unsubscribe(subscription_id)
1142 }
1143 Err(poisoned) => {
1144 poisoned.into_inner().unsubscribe(subscription_id)
1145 }
1146 }
1147 }
1148 }
1149 }
1150 }
1151 }
1152 DeliveryCommand::Shutdown => break,
1153 }
1154 }
1155}