1use std::fs;
2use std::future::Future;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::{
6 Arc, Mutex,
7 mpsc::{self, Receiver, Sender},
8};
9use std::thread::{self, JoinHandle};
10
11use factstr::{
12 AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
13 HandleStream, NewEvent, QueryResult,
14};
15use serde_json::Value;
16use sqlx::sqlite::SqliteRow;
17use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
18use time::OffsetDateTime;
19use time::format_description::well_known::Rfc3339;
20use tokio::runtime::Builder;
21use tokio::runtime::Runtime;
22
23use crate::connection::open_pool;
24use crate::query_match::matches_query;
25use crate::schema::{
26 APPEND_BATCH_BOUNDARY_FORMAT_KEY, APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1, initialize_schema,
27};
28use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
29
30#[derive(Clone, Debug)]
31struct CommittedAppend {
32 append_result: AppendResult,
33 event_records: Vec<EventRecord>,
34}
35
36enum DeliveryCommand {
37 Deliver(Vec<PendingDelivery>),
38 Shutdown,
39}
40
41pub struct SqliteStore {
42 database_path: PathBuf,
43 pool: SqlitePool,
44 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
45 delivery_sender: Sender<DeliveryCommand>,
46 delivery_thread: Mutex<Option<JoinHandle<()>>>,
47}
48
49#[derive(Clone, Debug)]
50struct ReplayBatch {
51 last_processed_sequence_number: u64,
52 delivered_batch: Vec<EventRecord>,
53}
54
55#[derive(Clone, Debug)]
56struct AppendBatchBoundary {
57 first_sequence_number: u64,
58 last_sequence_number: u64,
59}
60
61#[derive(Clone, Debug)]
62struct DurableReplayState {
63 subscription_id: u64,
64 last_processed_sequence_number: u64,
65 replay_until_sequence_number: u64,
66}
67
68impl SqliteStore {
69 pub fn open(database_path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
70 let database_path = database_path.as_ref().to_path_buf();
71 ensure_parent_directory(&database_path).map_err(sqlx_io_error)?;
72
73 let (result_sender, result_receiver) = mpsc::sync_channel(1);
74 let bootstrap_path = database_path.clone();
75
76 let bootstrap_thread = thread::Builder::new()
77 .name("factstr-sqlite-bootstrap".to_owned())
78 .spawn(move || {
79 let runtime = Builder::new_current_thread()
80 .enable_all()
81 .build()
82 .map_err(sqlx_io_error);
83
84 let result = match runtime {
85 Ok(runtime) => runtime.block_on(async {
86 let pool = open_pool(&bootstrap_path).await?;
87 initialize_schema(&pool).await?;
88 Ok::<_, sqlx::Error>(pool)
89 }),
90 Err(error) => Err(error),
91 };
92
93 let _ = result_sender.send(result);
94 })
95 .map_err(sqlx_io_error)?;
96
97 let pool = match result_receiver.recv() {
98 Ok(result) => result,
99 Err(error) => Err(sqlx_io_error(io::Error::other(format!(
100 "sqlite bootstrap thread did not return a result: {error}"
101 )))),
102 }?;
103
104 bootstrap_thread.join().map_err(|_| {
105 sqlx_io_error(io::Error::other(
106 "sqlite bootstrap thread panicked before open completed",
107 ))
108 })?;
109
110 let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
111 let (delivery_sender, delivery_receiver) = mpsc::channel();
112 let delivery_pool = pool.clone();
113 let delivery_registry = Arc::clone(&subscription_registry);
114 let delivery_thread = thread::Builder::new()
115 .name("factstr-sqlite-delivery".to_owned())
116 .spawn(move || run_delivery_thread(delivery_pool, delivery_registry, delivery_receiver))
117 .map_err(sqlx_io_error)?;
118
119 Ok(Self {
120 database_path,
121 pool,
122 subscription_registry,
123 delivery_sender,
124 delivery_thread: Mutex::new(Some(delivery_thread)),
125 })
126 }
127
128 pub fn database_path(&self) -> &Path {
129 &self.database_path
130 }
131
132 fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
133 where
134 T: Send + 'static,
135 Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
136 F: FnOnce(SqlitePool) -> Fut + Send + 'static,
137 {
138 let pool = self.pool.clone();
139 let (result_sender, result_receiver) = mpsc::sync_channel(1);
140
141 let worker_thread = thread::Builder::new()
142 .name(format!("factstr-sqlite-{operation}"))
143 .spawn(move || {
144 let runtime = Builder::new_current_thread()
145 .enable_all()
146 .build()
147 .map_err(sqlx_io_error)
148 .map_err(sqlx_backend_failure);
149
150 let result = match runtime {
151 Ok(runtime) => runtime.block_on(work(pool)),
152 Err(error) => Err(error),
153 };
154
155 let _ = result_sender.send(result);
156 })
157 .map_err(sqlx_io_error)
158 .map_err(sqlx_backend_failure)?;
159
160 let result = result_receiver
161 .recv()
162 .map_err(|error| EventStoreError::BackendFailure {
163 message: format!("sqlite {operation} thread did not return a result: {error}"),
164 })?;
165
166 worker_thread
167 .join()
168 .map_err(|_| EventStoreError::BackendFailure {
169 message: format!("sqlite {operation} thread panicked before completion"),
170 })?;
171
172 result
173 }
174
175 fn pending_deliveries(&self, committed_batch: &[EventRecord]) -> Vec<PendingDelivery> {
176 match self.subscription_registry.lock() {
177 Ok(mut subscription_registry) => {
178 subscription_registry.pending_deliveries(committed_batch)
179 }
180 Err(poisoned) => poisoned.into_inner().pending_deliveries(committed_batch),
181 }
182 }
183
184 fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
185 if pending_deliveries.is_empty() {
186 return;
187 }
188
189 if let Err(error) = self
190 .delivery_sender
191 .send(DeliveryCommand::Deliver(pending_deliveries))
192 {
193 eprintln!(
194 "factstr-sqlite delivery dispatcher stopped after commit: {}",
195 error
196 );
197 }
198 }
199
200 fn register_all_durable_stream(
201 &self,
202 subscriber_id: impl Into<String>,
203 handle: HandleStream,
204 ) -> Result<EventStream, EventStoreError> {
205 self.subscribe_durable(subscriber_id.into(), EventQuery::all(), handle)
206 }
207
208 fn register_durable_stream(
209 &self,
210 subscriber_id: impl Into<String>,
211 event_query: &EventQuery,
212 handle: HandleStream,
213 ) -> Result<EventStream, EventStoreError> {
214 self.subscribe_durable(subscriber_id.into(), event_query.clone(), handle)
215 }
216
217 fn subscribe_durable(
218 &self,
219 subscriber_id: String,
220 event_query: EventQuery,
221 handle: HandleStream,
222 ) -> Result<EventStream, EventStoreError> {
223 let normalized_event_query = normalized_durable_event_query(&event_query);
224 let event_query_json = serialize_event_query(&normalized_event_query)?;
225 let subscription_registry = Arc::clone(&self.subscription_registry);
226 let durable_subscriber_id = subscriber_id.clone();
227 let normalized_event_query_for_registry = normalized_event_query.clone();
228 let handle_for_registry = handle.clone();
229 let replay_subscriber_id = subscriber_id.clone();
230
231 let replay_state = self.run_async("subscribe_durable", move |pool| async move {
232 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
233
234 sqlx::query("BEGIN IMMEDIATE")
235 .execute(connection.as_mut())
236 .await
237 .map_err(sqlx_backend_failure)?;
238
239 let last_processed_sequence_number = load_or_create_subscriber_cursor(
240 connection.as_mut(),
241 &durable_subscriber_id,
242 &event_query_json,
243 )
244 .await?;
245 let replay_until_sequence_number =
246 current_max_sequence_number(connection.as_mut()).await?;
247
248 let subscription_id = match subscription_registry.lock() {
249 Ok(mut subscription_registry) => {
250 if normalized_event_query_for_registry.filters.is_none() {
251 subscription_registry
252 .subscribe_all_durable(
253 durable_subscriber_id.clone(),
254 replay_until_sequence_number,
255 handle_for_registry.clone(),
256 )
257 .map_err(subscription_registry_backend_failure)?
258 } else {
259 subscription_registry
260 .subscribe_to_durable(
261 durable_subscriber_id.clone(),
262 Some(normalized_event_query_for_registry.clone()),
263 replay_until_sequence_number,
264 handle_for_registry.clone(),
265 )
266 .map_err(subscription_registry_backend_failure)?
267 }
268 }
269 Err(poisoned) => {
270 let mut subscription_registry = poisoned.into_inner();
271 if normalized_event_query_for_registry.filters.is_none() {
272 subscription_registry
273 .subscribe_all_durable(
274 durable_subscriber_id.clone(),
275 replay_until_sequence_number,
276 handle_for_registry.clone(),
277 )
278 .map_err(subscription_registry_backend_failure)?
279 } else {
280 subscription_registry
281 .subscribe_to_durable(
282 durable_subscriber_id.clone(),
283 Some(normalized_event_query_for_registry.clone()),
284 replay_until_sequence_number,
285 handle_for_registry.clone(),
286 )
287 .map_err(subscription_registry_backend_failure)?
288 }
289 }
290 };
291
292 if let Err(error) = sqlx::query("COMMIT").execute(connection.as_mut()).await {
293 match subscription_registry.lock() {
294 Ok(mut subscription_registry) => {
295 subscription_registry.unsubscribe(subscription_id)
296 }
297 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
298 }
299 return Err(sqlx_backend_failure(error));
300 }
301
302 Ok(DurableReplayState {
303 subscription_id,
304 last_processed_sequence_number,
305 replay_until_sequence_number,
306 })
307 })?;
308
309 let subscription = self.build_subscription_handle(replay_state.subscription_id);
310
311 let replay_batches = match self.run_async("durable_replay", {
312 let event_query = normalized_event_query.clone();
313 move |pool| async move {
314 ensure_replay_history_is_available_for_pool(
315 &pool,
316 replay_state.replay_until_sequence_number,
317 )
318 .await?;
319 load_replay_batches(
320 &pool,
321 &event_query,
322 replay_state.last_processed_sequence_number,
323 replay_state.replay_until_sequence_number,
324 )
325 .await
326 }
327 }) {
328 Ok(replay_batches) => replay_batches,
329 Err(error) => {
330 self.cleanup_durable_subscription(replay_state.subscription_id);
331 return Err(error);
332 }
333 };
334 let delivery_runtime = build_delivery_runtime("factstr-sqlite durable replay")?;
335
336 for replay_batch in replay_batches {
337 let pending_delivery = PendingDelivery {
338 subscription_id: replay_state.subscription_id,
339 durable_subscriber_id: Some(replay_subscriber_id.clone()),
340 last_processed_sequence_number: replay_batch.last_processed_sequence_number,
341 delivered_batch: replay_batch.delivered_batch,
342 handle: handle.clone(),
343 };
344
345 match self.process_durable_delivery(&delivery_runtime, pending_delivery) {
346 Ok(true) => {}
347 Ok(false) => {
348 self.cleanup_durable_subscription(replay_state.subscription_id);
349
350 return Err(EventStoreError::BackendFailure {
351 message: format!(
352 "durable replay for subscriber {} did not complete successfully",
353 replay_subscriber_id
354 ),
355 });
356 }
357 Err(error) => {
358 self.cleanup_durable_subscription(replay_state.subscription_id);
359 return Err(error);
360 }
361 }
362 }
363
364 self.finish_durable_replay(replay_state.subscription_id);
365 Ok(subscription)
366 }
367
368 fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
369 let subscription_registry = Arc::clone(&self.subscription_registry);
370
371 EventStream::new(
372 subscription_id,
373 Arc::new(move |subscription_id| match subscription_registry.lock() {
374 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
375 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
376 }),
377 )
378 }
379
380 fn process_durable_delivery(
381 &self,
382 delivery_runtime: &Runtime,
383 pending_delivery: PendingDelivery,
384 ) -> Result<bool, EventStoreError> {
385 match pending_delivery.deliver(delivery_runtime) {
386 DeliveryOutcome::Succeeded {
387 durable_subscriber_id,
388 last_processed_sequence_number,
389 ..
390 } => {
391 if let Some(durable_subscriber_id) = durable_subscriber_id {
392 self.run_async("advance_durable_cursor", move |pool| async move {
393 update_subscriber_cursor(
394 &pool,
395 &durable_subscriber_id,
396 last_processed_sequence_number,
397 )
398 .await
399 })?;
400 }
401
402 Ok(true)
403 }
404 DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
405 }
406 }
407
408 fn finish_durable_replay(&self, subscription_id: u64) {
409 match self.subscription_registry.lock() {
410 Ok(mut subscription_registry) => {
411 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
412 self.enqueue_delivery(buffered_deliveries);
413 }
414 Err(poisoned) => {
415 let mut subscription_registry = poisoned.into_inner();
416 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
417 self.enqueue_delivery(buffered_deliveries);
418 }
419 }
420 }
421
422 fn cleanup_durable_subscription(&self, subscription_id: u64) {
423 match self.subscription_registry.lock() {
424 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
425 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
426 }
427 }
428}
429
430impl Drop for SqliteStore {
431 fn drop(&mut self) {
432 let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
433
434 if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
435 if let Some(delivery_thread) = delivery_thread.take() {
436 let _ = delivery_thread.join();
437 }
438 }
439 }
440}
441
442impl EventStore for SqliteStore {
443 fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
444 let matching_records = self.run_async("query", {
445 let event_query = event_query.clone();
446 move |pool| async move { load_matching_records(&pool, &event_query).await }
447 })?;
448
449 let current_context_version = matching_records
450 .last()
451 .map(|event_record| event_record.sequence_number);
452
453 let event_records = matching_records
454 .into_iter()
455 .filter(|event_record| {
456 event_query
457 .min_sequence_number
458 .is_none_or(|min_sequence_number| {
459 event_record.sequence_number > min_sequence_number
460 })
461 })
462 .collect::<Vec<_>>();
463
464 let last_returned_sequence_number = event_records
465 .last()
466 .map(|event_record| event_record.sequence_number);
467
468 Ok(QueryResult {
469 event_records,
470 last_returned_sequence_number,
471 current_context_version,
472 })
473 }
474
475 fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
476 if new_events.is_empty() {
477 return Err(EventStoreError::EmptyAppend);
478 }
479
480 let committed_append = self.run_async("append", move |pool| async move {
481 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
482
483 sqlx::query("BEGIN IMMEDIATE")
484 .execute(connection.as_mut())
485 .await
486 .map_err(sqlx_backend_failure)?;
487
488 let append_result = append_batch(connection.as_mut(), new_events).await;
489
490 match append_result {
491 Ok(committed_append) => {
492 sqlx::query("COMMIT")
493 .execute(connection.as_mut())
494 .await
495 .map_err(sqlx_backend_failure)?;
496 Ok(committed_append)
497 }
498 Err(error) => {
499 let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
500 Err(error)
501 }
502 }
503 })?;
504
505 let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
506 self.enqueue_delivery(pending_deliveries);
507 Ok(committed_append.append_result)
508 }
509
510 fn append_if(
511 &self,
512 new_events: Vec<NewEvent>,
513 context_query: &EventQuery,
514 expected_context_version: Option<u64>,
515 ) -> Result<AppendResult, EventStoreError> {
516 if new_events.is_empty() {
517 return Err(EventStoreError::EmptyAppend);
518 }
519
520 let committed_append = self.run_async("append_if", {
521 let context_query = context_query.clone();
522 move |pool| async move {
523 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
524
525 sqlx::query("BEGIN IMMEDIATE")
526 .execute(connection.as_mut())
527 .await
528 .map_err(sqlx_backend_failure)?;
529
530 let conflict_query = EventQuery {
531 filters: context_query.filters.clone(),
532 min_sequence_number: None,
533 };
534 let actual_context_version =
535 load_matching_records_from_connection(connection.as_mut(), &conflict_query)
536 .await?
537 .last()
538 .map(|event_record| event_record.sequence_number);
539
540 if actual_context_version != expected_context_version {
541 sqlx::query("ROLLBACK")
542 .execute(connection.as_mut())
543 .await
544 .map_err(sqlx_backend_failure)?;
545
546 return Err(EventStoreError::ConditionalAppendConflict {
547 expected: expected_context_version,
548 actual: actual_context_version,
549 });
550 }
551
552 let append_result = append_batch(connection.as_mut(), new_events).await;
553
554 match append_result {
555 Ok(committed_append) => {
556 sqlx::query("COMMIT")
557 .execute(connection.as_mut())
558 .await
559 .map_err(sqlx_backend_failure)?;
560 Ok(committed_append)
561 }
562 Err(error) => {
563 let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
564 Err(error)
565 }
566 }
567 }
568 })?;
569
570 let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
571 self.enqueue_delivery(pending_deliveries);
572 Ok(committed_append.append_result)
573 }
574
575 fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
576 let subscription_registry = Arc::clone(&self.subscription_registry);
577 let id = match subscription_registry.lock() {
578 Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
579 Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
580 };
581
582 Ok(EventStream::new(
583 id,
584 Arc::new(move |subscription_id| match subscription_registry.lock() {
585 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
586 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
587 }),
588 ))
589 }
590
591 fn stream_to(
592 &self,
593 event_query: &EventQuery,
594 handle: HandleStream,
595 ) -> Result<EventStream, EventStoreError> {
596 let subscription_registry = Arc::clone(&self.subscription_registry);
597 let id = match subscription_registry.lock() {
598 Ok(mut subscription_registry) => {
599 subscription_registry.subscribe_to(Some(event_query.clone()), handle)
600 }
601 Err(poisoned) => poisoned
602 .into_inner()
603 .subscribe_to(Some(event_query.clone()), handle),
604 };
605
606 Ok(EventStream::new(
607 id,
608 Arc::new(move |subscription_id| match subscription_registry.lock() {
609 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
610 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
611 }),
612 ))
613 }
614
615 fn stream_all_durable(
616 &self,
617 durable_stream: &DurableStream,
618 handle: HandleStream,
619 ) -> Result<EventStream, EventStoreError> {
620 self.register_all_durable_stream(durable_stream.name(), handle)
621 }
622
623 fn stream_to_durable(
624 &self,
625 durable_stream: &DurableStream,
626 event_query: &EventQuery,
627 handle: HandleStream,
628 ) -> Result<EventStream, EventStoreError> {
629 self.register_durable_stream(durable_stream.name(), event_query, handle)
630 }
631}
632
633async fn append_batch(
634 connection: &mut sqlx::SqliteConnection,
635 new_events: Vec<NewEvent>,
636) -> Result<CommittedAppend, EventStoreError> {
637 let last_sequence_number =
638 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
639 .fetch_one(&mut *connection)
640 .await
641 .map_err(sqlx_backend_failure)?
642 .unwrap_or(0);
643
644 let first_sequence_number = (last_sequence_number as u64) + 1;
645 let committed_count = new_events.len() as u64;
646 let last_sequence_number = first_sequence_number + committed_count - 1;
647
648 let event_records = new_events
649 .into_iter()
650 .enumerate()
651 .map(|(offset, new_event)| EventRecord {
652 sequence_number: first_sequence_number + offset as u64,
653 occurred_at: OffsetDateTime::now_utc(),
654 event_type: new_event.event_type,
655 payload: new_event.payload,
656 })
657 .collect::<Vec<_>>();
658
659 for event_record in &event_records {
660 let payload = serde_json::to_string(&event_record.payload).map_err(json_backend_failure)?;
661 let occurred_at = event_record
662 .occurred_at
663 .format(&Rfc3339)
664 .expect("sqlite occurred_at should format as RFC3339");
665
666 sqlx::query(
667 "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
668 VALUES (?1, ?2, ?3, ?4)",
669 )
670 .bind(event_record.sequence_number as i64)
671 .bind(occurred_at)
672 .bind(&event_record.event_type)
673 .bind(payload)
674 .execute(&mut *connection)
675 .await
676 .map_err(sqlx_backend_failure)?;
677 }
678
679 if first_sequence_number != last_sequence_number {
680 sqlx::query(
681 "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
682 VALUES (?1, ?2)",
683 )
684 .bind(first_sequence_number as i64)
685 .bind(last_sequence_number as i64)
686 .execute(&mut *connection)
687 .await
688 .map_err(sqlx_backend_failure)?;
689 }
690
691 Ok(CommittedAppend {
692 append_result: AppendResult {
693 first_sequence_number,
694 last_sequence_number,
695 committed_count,
696 },
697 event_records,
698 })
699}
700
701async fn load_matching_records(
702 pool: &SqlitePool,
703 event_query: &EventQuery,
704) -> Result<Vec<EventRecord>, EventStoreError> {
705 let candidate_event_types = event_type_candidates(event_query);
706 let rows = fetch_candidate_rows(pool, candidate_event_types.as_deref()).await?;
707 matching_records_from_rows(rows, event_query)
708}
709
710async fn load_matching_records_from_connection(
711 connection: &mut sqlx::SqliteConnection,
712 event_query: &EventQuery,
713) -> Result<Vec<EventRecord>, EventStoreError> {
714 let candidate_event_types = event_type_candidates(event_query);
715 let rows =
716 fetch_candidate_rows_from_connection(connection, candidate_event_types.as_deref()).await?;
717 matching_records_from_rows(rows, event_query)
718}
719
720fn matching_records_from_rows(
721 rows: Vec<SqliteRow>,
722 event_query: &EventQuery,
723) -> Result<Vec<EventRecord>, EventStoreError> {
724 rows.into_iter()
725 .map(row_to_event_record)
726 .filter(|result| {
727 result
728 .as_ref()
729 .is_ok_and(|event_record| matches_query(event_query, event_record))
730 })
731 .collect()
732}
733
734async fn fetch_candidate_rows(
735 pool: &SqlitePool,
736 candidate_event_types: Option<&[String]>,
737) -> Result<Vec<SqliteRow>, EventStoreError> {
738 let mut query_builder = QueryBuilder::<Sqlite>::new(
739 "SELECT sequence_number, occurred_at, event_type, payload FROM events",
740 );
741
742 if let Some(candidate_event_types) = candidate_event_types {
743 if candidate_event_types.is_empty() {
744 query_builder.push(" WHERE FALSE");
745 } else {
746 query_builder.push(" WHERE event_type IN (");
747
748 let mut separated = query_builder.separated(", ");
749 for event_type in candidate_event_types {
750 separated.push_bind(event_type);
751 }
752 separated.push_unseparated(")");
753 }
754 }
755
756 query_builder.push(" ORDER BY sequence_number ASC");
757
758 query_builder
759 .build()
760 .fetch_all(pool)
761 .await
762 .map_err(sqlx_backend_failure)
763}
764
765async fn fetch_candidate_rows_from_connection(
766 connection: &mut sqlx::SqliteConnection,
767 candidate_event_types: Option<&[String]>,
768) -> Result<Vec<SqliteRow>, EventStoreError> {
769 let mut query_builder = QueryBuilder::<Sqlite>::new(
770 "SELECT sequence_number, occurred_at, event_type, payload FROM events",
771 );
772
773 if let Some(candidate_event_types) = candidate_event_types {
774 if candidate_event_types.is_empty() {
775 query_builder.push(" WHERE FALSE");
776 } else {
777 query_builder.push(" WHERE event_type IN (");
778
779 let mut separated = query_builder.separated(", ");
780 for event_type in candidate_event_types {
781 separated.push_bind(event_type);
782 }
783 separated.push_unseparated(")");
784 }
785 }
786
787 query_builder.push(" ORDER BY sequence_number ASC");
788
789 query_builder
790 .build()
791 .fetch_all(connection)
792 .await
793 .map_err(sqlx_backend_failure)
794}
795
796fn event_type_candidates(event_query: &EventQuery) -> Option<Vec<String>> {
797 let filters = match &event_query.filters {
798 None => return None,
799 Some(filters) if filters.is_empty() => return None,
800 Some(filters) => filters,
801 };
802
803 if filters
804 .iter()
805 .any(|event_filter| event_filter.event_types.is_none())
806 {
807 return None;
808 }
809
810 let mut candidate_event_types = Vec::new();
811
812 for event_filter in filters {
813 if let Some(event_types) = &event_filter.event_types {
814 for event_type in event_types {
815 if !candidate_event_types
816 .iter()
817 .any(|known| known == event_type)
818 {
819 candidate_event_types.push(event_type.clone());
820 }
821 }
822 }
823 }
824
825 Some(candidate_event_types)
826}
827
828fn row_to_event_record(row: SqliteRow) -> Result<EventRecord, EventStoreError> {
829 let payload_text = row.get::<String, _>("payload");
830 let payload: Value = serde_json::from_str(&payload_text).map_err(json_backend_failure)?;
831
832 Ok(EventRecord {
833 sequence_number: row.get::<i64, _>("sequence_number") as u64,
834 occurred_at: OffsetDateTime::parse(&row.get::<String, _>("occurred_at"), &Rfc3339)
835 .map_err(time_backend_failure)?,
836 event_type: row.get::<String, _>("event_type"),
837 payload,
838 })
839}
840
841fn ensure_parent_directory(database_path: &Path) -> Result<(), io::Error> {
842 if let Some(parent_directory) = database_path.parent() {
843 if !parent_directory.as_os_str().is_empty() {
844 fs::create_dir_all(parent_directory)?;
845 }
846 }
847
848 Ok(())
849}
850
851fn sqlx_io_error(error: io::Error) -> sqlx::Error {
852 sqlx::Error::Io(error)
853}
854
855fn sqlx_backend_failure(error: sqlx::Error) -> EventStoreError {
856 EventStoreError::BackendFailure {
857 message: error.to_string(),
858 }
859}
860
861fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
862 EventStoreError::BackendFailure {
863 message: error.to_string(),
864 }
865}
866
867fn time_backend_failure(error: time::error::Parse) -> EventStoreError {
868 EventStoreError::BackendFailure {
869 message: error.to_string(),
870 }
871}
872
873fn subscription_registry_backend_failure(message: String) -> EventStoreError {
874 EventStoreError::BackendFailure { message }
875}
876
877fn build_delivery_runtime(operation: &'static str) -> Result<Runtime, EventStoreError> {
878 Builder::new_current_thread()
879 .enable_all()
880 .build()
881 .map_err(sqlx_io_error)
882 .map_err(sqlx_backend_failure)
883 .map_err(|error| match error {
884 EventStoreError::BackendFailure { message } => EventStoreError::BackendFailure {
885 message: format!("{operation}: {message}"),
886 },
887 other => other,
888 })
889}
890
891async fn load_or_create_subscriber_cursor(
892 connection: &mut sqlx::SqliteConnection,
893 subscriber_id: &str,
894 event_query_json: &str,
895) -> Result<u64, EventStoreError> {
896 let existing_cursor = sqlx::query(
897 "SELECT event_query, last_processed_sequence_number
898 FROM subscriber_cursors
899 WHERE subscriber_id = ?1",
900 )
901 .bind(subscriber_id)
902 .fetch_optional(&mut *connection)
903 .await
904 .map_err(sqlx_backend_failure)?;
905
906 match existing_cursor {
907 Some(row) => {
908 let stored_event_query = row.get::<String, _>("event_query");
909 if stored_event_query != event_query_json {
910 return Err(EventStoreError::BackendFailure {
911 message: format!(
912 "durable subscriber {subscriber_id} was resumed with a different query"
913 ),
914 });
915 }
916
917 Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
918 }
919 None => {
920 sqlx::query(
921 "INSERT INTO subscriber_cursors (subscriber_id, event_query, last_processed_sequence_number)
922 VALUES (?1, ?2, 0)",
923 )
924 .bind(subscriber_id)
925 .bind(event_query_json)
926 .execute(&mut *connection)
927 .await
928 .map_err(sqlx_backend_failure)?;
929
930 Ok(0)
931 }
932 }
933}
934
935async fn current_max_sequence_number(
936 connection: &mut sqlx::SqliteConnection,
937) -> Result<u64, EventStoreError> {
938 Ok(
939 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
940 .fetch_one(&mut *connection)
941 .await
942 .map_err(sqlx_backend_failure)?
943 .unwrap_or(0) as u64,
944 )
945}
946
947async fn ensure_replay_history_is_available(
948 connection: &mut sqlx::SqliteConnection,
949 _current_max_sequence_number: u64,
950) -> Result<(), EventStoreError> {
951 let boundary_format =
952 sqlx::query_scalar::<_, Option<String>>("SELECT value FROM store_metadata WHERE key = ?1")
953 .bind(APPEND_BATCH_BOUNDARY_FORMAT_KEY)
954 .fetch_optional(&mut *connection)
955 .await
956 .map_err(sqlx_backend_failure)?;
957
958 if boundary_format.flatten().as_deref() != Some(APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1) {
959 return Err(EventStoreError::BackendFailure {
960 message: format!(
961 "durable replay requires store_metadata {}={}",
962 APPEND_BATCH_BOUNDARY_FORMAT_KEY, APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1
963 ),
964 });
965 }
966
967 Ok(())
968}
969
970async fn ensure_replay_history_is_available_for_pool(
971 pool: &SqlitePool,
972 current_max_sequence_number: u64,
973) -> Result<(), EventStoreError> {
974 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
975 ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
976}
977
978async fn update_subscriber_cursor(
979 pool: &SqlitePool,
980 subscriber_id: &str,
981 last_processed_sequence_number: u64,
982) -> Result<(), EventStoreError> {
983 sqlx::query(
984 "UPDATE subscriber_cursors
985 SET last_processed_sequence_number = ?2
986 WHERE subscriber_id = ?1",
987 )
988 .bind(subscriber_id)
989 .bind(last_processed_sequence_number as i64)
990 .execute(pool)
991 .await
992 .map_err(sqlx_backend_failure)?;
993
994 Ok(())
995}
996
997async fn load_replay_batches(
998 pool: &SqlitePool,
999 event_query: &EventQuery,
1000 last_processed_sequence_number: u64,
1001 replay_until_sequence_number: u64,
1002) -> Result<Vec<ReplayBatch>, EventStoreError> {
1003 if replay_until_sequence_number <= last_processed_sequence_number {
1004 return Ok(Vec::new());
1005 }
1006
1007 let batch_rows = sqlx::query(
1008 "SELECT first_sequence_number, last_sequence_number
1009 FROM append_batches
1010 WHERE last_sequence_number > ?1
1011 AND first_sequence_number <= ?2
1012 ORDER BY first_sequence_number ASC",
1013 )
1014 .bind(last_processed_sequence_number as i64)
1015 .bind(replay_until_sequence_number as i64)
1016 .fetch_all(pool)
1017 .await
1018 .map_err(sqlx_backend_failure)?;
1019
1020 let append_batch_boundaries = batch_rows
1021 .into_iter()
1022 .map(|batch_row| AppendBatchBoundary {
1023 first_sequence_number: batch_row.get::<i64, _>("first_sequence_number") as u64,
1024 last_sequence_number: batch_row.get::<i64, _>("last_sequence_number") as u64,
1025 })
1026 .collect::<Vec<_>>();
1027 let event_records = sqlx::query(
1028 "SELECT sequence_number, occurred_at, event_type, payload
1029 FROM events
1030 WHERE sequence_number > ?1
1031 AND sequence_number <= ?2
1032 ORDER BY sequence_number ASC",
1033 )
1034 .bind(last_processed_sequence_number as i64)
1035 .bind(replay_until_sequence_number as i64)
1036 .fetch_all(pool)
1037 .await
1038 .map_err(sqlx_backend_failure)?
1039 .into_iter()
1040 .map(row_to_event_record)
1041 .collect::<Result<Vec<_>, _>>()?;
1042
1043 Ok(replay_batches_from_records(
1044 event_query,
1045 event_records,
1046 &append_batch_boundaries,
1047 ))
1048}
1049
1050fn replay_batches_from_records(
1051 event_query: &EventQuery,
1052 event_records: Vec<EventRecord>,
1053 append_batch_boundaries: &[AppendBatchBoundary],
1054) -> Vec<ReplayBatch> {
1055 let mut replay_batches = Vec::new();
1056 let mut event_index = 0;
1057 let mut boundary_index = 0;
1058
1059 while event_index < event_records.len() {
1060 let next_sequence_number = event_records[event_index].sequence_number;
1061
1062 while boundary_index < append_batch_boundaries.len()
1063 && append_batch_boundaries[boundary_index].last_sequence_number < next_sequence_number
1064 {
1065 boundary_index += 1;
1066 }
1067
1068 if boundary_index < append_batch_boundaries.len() {
1069 let boundary = &append_batch_boundaries[boundary_index];
1070 if boundary.first_sequence_number <= next_sequence_number
1071 && next_sequence_number <= boundary.last_sequence_number
1072 {
1073 let batch_start = event_index;
1074 while event_index < event_records.len()
1075 && event_records[event_index].sequence_number <= boundary.last_sequence_number
1076 {
1077 event_index += 1;
1078 }
1079
1080 let delivered_batch = event_records[batch_start..event_index]
1081 .iter()
1082 .filter(|event_record| matches_query(event_query, event_record))
1083 .cloned()
1084 .collect();
1085
1086 replay_batches.push(ReplayBatch {
1087 last_processed_sequence_number: boundary.last_sequence_number,
1088 delivered_batch,
1089 });
1090 continue;
1091 }
1092 }
1093
1094 let event_record = event_records[event_index].clone();
1095 event_index += 1;
1096
1097 let delivered_batch = if matches_query(event_query, &event_record) {
1098 vec![event_record.clone()]
1099 } else {
1100 Vec::new()
1101 };
1102
1103 replay_batches.push(ReplayBatch {
1104 last_processed_sequence_number: event_record.sequence_number,
1105 delivered_batch,
1106 });
1107 }
1108
1109 replay_batches
1110}
1111
1112fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1113 EventQuery {
1114 filters: event_query.filters.clone(),
1115 min_sequence_number: None,
1116 }
1117}
1118
1119fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1120 let filters = event_query.filters.as_ref().map(|filters| {
1121 filters
1122 .iter()
1123 .map(|filter| {
1124 serde_json::json!({
1125 "event_types": filter.event_types,
1126 "payload_predicates": filter.payload_predicates,
1127 })
1128 })
1129 .collect::<Vec<_>>()
1130 });
1131
1132 serde_json::to_string(&serde_json::json!({
1133 "filters": filters,
1134 "min_sequence_number": event_query.min_sequence_number,
1135 }))
1136 .map_err(json_backend_failure)
1137}
1138
1139fn run_delivery_thread(
1140 pool: SqlitePool,
1141 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
1142 delivery_receiver: Receiver<DeliveryCommand>,
1143) {
1144 let runtime = match Builder::new_current_thread().enable_all().build() {
1145 Ok(runtime) => runtime,
1146 Err(error) => {
1147 eprintln!("factstr-sqlite delivery runtime could not start: {}", error);
1148 return;
1149 }
1150 };
1151
1152 while let Ok(delivery_command) = delivery_receiver.recv() {
1153 match delivery_command {
1154 DeliveryCommand::Deliver(pending_deliveries) => {
1155 for pending_delivery in pending_deliveries {
1156 match pending_delivery.deliver(&runtime) {
1157 DeliveryOutcome::Succeeded {
1158 subscription_id,
1159 durable_subscriber_id,
1160 last_processed_sequence_number,
1161 } => {
1162 if let Some(durable_subscriber_id) = durable_subscriber_id {
1163 if let Err(error) = runtime.block_on(update_subscriber_cursor(
1164 &pool,
1165 &durable_subscriber_id,
1166 last_processed_sequence_number,
1167 )) {
1168 eprintln!(
1169 "factstr-sqlite durable cursor update failed after delivery for subscriber {}: {}",
1170 durable_subscriber_id, error
1171 );
1172 match subscription_registry.lock() {
1173 Ok(mut subscription_registry) => {
1174 subscription_registry.unsubscribe(subscription_id)
1175 }
1176 Err(poisoned) => {
1177 poisoned.into_inner().unsubscribe(subscription_id)
1178 }
1179 }
1180 }
1181 }
1182 }
1183 DeliveryOutcome::Failed {
1184 subscription_id,
1185 durable_subscriber_id,
1186 }
1187 | DeliveryOutcome::Panicked {
1188 subscription_id,
1189 durable_subscriber_id,
1190 } => {
1191 if durable_subscriber_id.is_some() {
1192 match subscription_registry.lock() {
1193 Ok(mut subscription_registry) => {
1194 subscription_registry.unsubscribe(subscription_id)
1195 }
1196 Err(poisoned) => {
1197 poisoned.into_inner().unsubscribe(subscription_id)
1198 }
1199 }
1200 }
1201 }
1202 }
1203 }
1204 }
1205 DeliveryCommand::Shutdown => break,
1206 }
1207 }
1208}