1use std::fs;
2use std::future::Future;
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::{
6 Arc, Mutex,
7 mpsc::{self, Receiver, Sender},
8};
9use std::thread::{self, JoinHandle};
10
11use factstr::{
12 AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
13 HandleStream, NewEvent, QueryResult,
14};
15use serde_json::Value;
16use sqlx::sqlite::SqliteRow;
17use sqlx::{QueryBuilder, Row, Sqlite, SqlitePool};
18use time::OffsetDateTime;
19use time::format_description::well_known::Rfc3339;
20use tokio::runtime::Builder;
21use tokio::runtime::Runtime;
22
23use crate::connection::open_pool;
24use crate::query_match::matches_query;
25use crate::schema::initialize_schema;
26use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
27
28#[derive(Clone, Debug)]
29struct CommittedAppend {
30 append_result: AppendResult,
31 event_records: Vec<EventRecord>,
32}
33
34enum DeliveryCommand {
35 Deliver(Vec<PendingDelivery>),
36 Shutdown,
37}
38
39pub struct SqliteStore {
40 database_path: PathBuf,
41 pool: SqlitePool,
42 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
43 delivery_sender: Sender<DeliveryCommand>,
44 delivery_thread: Mutex<Option<JoinHandle<()>>>,
45}
46
47#[derive(Clone, Debug)]
48struct ReplayBatch {
49 last_processed_sequence_number: u64,
50 delivered_batch: Vec<EventRecord>,
51}
52
53#[derive(Clone, Debug)]
54struct DurableReplayState {
55 subscription_id: u64,
56 last_processed_sequence_number: u64,
57 replay_until_sequence_number: u64,
58}
59
60impl SqliteStore {
61 pub fn open(database_path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
62 let database_path = database_path.as_ref().to_path_buf();
63 ensure_parent_directory(&database_path).map_err(sqlx_io_error)?;
64
65 let (result_sender, result_receiver) = mpsc::sync_channel(1);
66 let bootstrap_path = database_path.clone();
67
68 let bootstrap_thread = thread::Builder::new()
69 .name("factstr-sqlite-bootstrap".to_owned())
70 .spawn(move || {
71 let runtime = Builder::new_current_thread()
72 .enable_all()
73 .build()
74 .map_err(sqlx_io_error);
75
76 let result = match runtime {
77 Ok(runtime) => runtime.block_on(async {
78 let pool = open_pool(&bootstrap_path).await?;
79 initialize_schema(&pool).await?;
80 Ok::<_, sqlx::Error>(pool)
81 }),
82 Err(error) => Err(error),
83 };
84
85 let _ = result_sender.send(result);
86 })
87 .map_err(sqlx_io_error)?;
88
89 let pool = match result_receiver.recv() {
90 Ok(result) => result,
91 Err(error) => Err(sqlx_io_error(io::Error::other(format!(
92 "sqlite bootstrap thread did not return a result: {error}"
93 )))),
94 }?;
95
96 bootstrap_thread.join().map_err(|_| {
97 sqlx_io_error(io::Error::other(
98 "sqlite bootstrap thread panicked before open completed",
99 ))
100 })?;
101
102 let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
103 let (delivery_sender, delivery_receiver) = mpsc::channel();
104 let delivery_pool = pool.clone();
105 let delivery_registry = Arc::clone(&subscription_registry);
106 let delivery_thread = thread::Builder::new()
107 .name("factstr-sqlite-delivery".to_owned())
108 .spawn(move || run_delivery_thread(delivery_pool, delivery_registry, delivery_receiver))
109 .map_err(sqlx_io_error)?;
110
111 Ok(Self {
112 database_path,
113 pool,
114 subscription_registry,
115 delivery_sender,
116 delivery_thread: Mutex::new(Some(delivery_thread)),
117 })
118 }
119
120 pub fn database_path(&self) -> &Path {
121 &self.database_path
122 }
123
124 fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
125 where
126 T: Send + 'static,
127 Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
128 F: FnOnce(SqlitePool) -> Fut + Send + 'static,
129 {
130 let pool = self.pool.clone();
131 let (result_sender, result_receiver) = mpsc::sync_channel(1);
132
133 let worker_thread = thread::Builder::new()
134 .name(format!("factstr-sqlite-{operation}"))
135 .spawn(move || {
136 let runtime = Builder::new_current_thread()
137 .enable_all()
138 .build()
139 .map_err(sqlx_io_error)
140 .map_err(sqlx_backend_failure);
141
142 let result = match runtime {
143 Ok(runtime) => runtime.block_on(work(pool)),
144 Err(error) => Err(error),
145 };
146
147 let _ = result_sender.send(result);
148 })
149 .map_err(sqlx_io_error)
150 .map_err(sqlx_backend_failure)?;
151
152 let result = result_receiver
153 .recv()
154 .map_err(|error| EventStoreError::BackendFailure {
155 message: format!("sqlite {operation} thread did not return a result: {error}"),
156 })?;
157
158 worker_thread
159 .join()
160 .map_err(|_| EventStoreError::BackendFailure {
161 message: format!("sqlite {operation} thread panicked before completion"),
162 })?;
163
164 result
165 }
166
167 fn pending_deliveries(&self, committed_batch: &[EventRecord]) -> Vec<PendingDelivery> {
168 match self.subscription_registry.lock() {
169 Ok(mut subscription_registry) => {
170 subscription_registry.pending_deliveries(committed_batch)
171 }
172 Err(poisoned) => poisoned.into_inner().pending_deliveries(committed_batch),
173 }
174 }
175
176 fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
177 if pending_deliveries.is_empty() {
178 return;
179 }
180
181 if let Err(error) = self
182 .delivery_sender
183 .send(DeliveryCommand::Deliver(pending_deliveries))
184 {
185 eprintln!(
186 "factstr-sqlite delivery dispatcher stopped after commit: {}",
187 error
188 );
189 }
190 }
191
192 fn register_all_durable_stream(
193 &self,
194 subscriber_id: impl Into<String>,
195 handle: HandleStream,
196 ) -> Result<EventStream, EventStoreError> {
197 self.subscribe_durable(subscriber_id.into(), EventQuery::all(), handle)
198 }
199
200 fn register_durable_stream(
201 &self,
202 subscriber_id: impl Into<String>,
203 event_query: &EventQuery,
204 handle: HandleStream,
205 ) -> Result<EventStream, EventStoreError> {
206 self.subscribe_durable(subscriber_id.into(), event_query.clone(), handle)
207 }
208
209 fn subscribe_durable(
210 &self,
211 subscriber_id: String,
212 event_query: EventQuery,
213 handle: HandleStream,
214 ) -> Result<EventStream, EventStoreError> {
215 let normalized_event_query = normalized_durable_event_query(&event_query);
216 let event_query_json = serialize_event_query(&normalized_event_query)?;
217 let subscription_registry = Arc::clone(&self.subscription_registry);
218 let durable_subscriber_id = subscriber_id.clone();
219 let normalized_event_query_for_registry = normalized_event_query.clone();
220 let handle_for_registry = handle.clone();
221 let replay_subscriber_id = subscriber_id.clone();
222
223 let replay_state = self.run_async("subscribe_durable", move |pool| async move {
224 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
225
226 sqlx::query("BEGIN IMMEDIATE")
227 .execute(connection.as_mut())
228 .await
229 .map_err(sqlx_backend_failure)?;
230
231 let last_processed_sequence_number = load_or_create_subscriber_cursor(
232 connection.as_mut(),
233 &durable_subscriber_id,
234 &event_query_json,
235 )
236 .await?;
237 let replay_until_sequence_number =
238 current_max_sequence_number(connection.as_mut()).await?;
239
240 let subscription_id = match subscription_registry.lock() {
241 Ok(mut subscription_registry) => {
242 if normalized_event_query_for_registry.filters.is_none() {
243 subscription_registry
244 .subscribe_all_durable(
245 durable_subscriber_id.clone(),
246 replay_until_sequence_number,
247 handle_for_registry.clone(),
248 )
249 .map_err(subscription_registry_backend_failure)?
250 } else {
251 subscription_registry
252 .subscribe_to_durable(
253 durable_subscriber_id.clone(),
254 Some(normalized_event_query_for_registry.clone()),
255 replay_until_sequence_number,
256 handle_for_registry.clone(),
257 )
258 .map_err(subscription_registry_backend_failure)?
259 }
260 }
261 Err(poisoned) => {
262 let mut subscription_registry = poisoned.into_inner();
263 if normalized_event_query_for_registry.filters.is_none() {
264 subscription_registry
265 .subscribe_all_durable(
266 durable_subscriber_id.clone(),
267 replay_until_sequence_number,
268 handle_for_registry.clone(),
269 )
270 .map_err(subscription_registry_backend_failure)?
271 } else {
272 subscription_registry
273 .subscribe_to_durable(
274 durable_subscriber_id.clone(),
275 Some(normalized_event_query_for_registry.clone()),
276 replay_until_sequence_number,
277 handle_for_registry.clone(),
278 )
279 .map_err(subscription_registry_backend_failure)?
280 }
281 }
282 };
283
284 if let Err(error) = sqlx::query("COMMIT").execute(connection.as_mut()).await {
285 match subscription_registry.lock() {
286 Ok(mut subscription_registry) => {
287 subscription_registry.unsubscribe(subscription_id)
288 }
289 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
290 }
291 return Err(sqlx_backend_failure(error));
292 }
293
294 Ok(DurableReplayState {
295 subscription_id,
296 last_processed_sequence_number,
297 replay_until_sequence_number,
298 })
299 })?;
300
301 let subscription = self.build_subscription_handle(replay_state.subscription_id);
302
303 let replay_batches = match self.run_async("durable_replay", {
304 let event_query = normalized_event_query.clone();
305 move |pool| async move {
306 ensure_replay_history_is_available_for_pool(
307 &pool,
308 replay_state.replay_until_sequence_number,
309 )
310 .await?;
311 load_replay_batches(
312 &pool,
313 &event_query,
314 replay_state.last_processed_sequence_number,
315 replay_state.replay_until_sequence_number,
316 )
317 .await
318 }
319 }) {
320 Ok(replay_batches) => replay_batches,
321 Err(error) => {
322 self.cleanup_durable_subscription(replay_state.subscription_id);
323 return Err(error);
324 }
325 };
326 let delivery_runtime = build_delivery_runtime("factstr-sqlite durable replay")?;
327
328 for replay_batch in replay_batches {
329 let pending_delivery = PendingDelivery {
330 subscription_id: replay_state.subscription_id,
331 durable_subscriber_id: Some(replay_subscriber_id.clone()),
332 last_processed_sequence_number: replay_batch.last_processed_sequence_number,
333 delivered_batch: replay_batch.delivered_batch,
334 handle: handle.clone(),
335 };
336
337 match self.process_durable_delivery(&delivery_runtime, pending_delivery) {
338 Ok(true) => {}
339 Ok(false) => {
340 self.cleanup_durable_subscription(replay_state.subscription_id);
341
342 return Err(EventStoreError::BackendFailure {
343 message: format!(
344 "durable replay for subscriber {} did not complete successfully",
345 replay_subscriber_id
346 ),
347 });
348 }
349 Err(error) => {
350 self.cleanup_durable_subscription(replay_state.subscription_id);
351 return Err(error);
352 }
353 }
354 }
355
356 self.finish_durable_replay(replay_state.subscription_id);
357 Ok(subscription)
358 }
359
360 fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
361 let subscription_registry = Arc::clone(&self.subscription_registry);
362
363 EventStream::new(
364 subscription_id,
365 Arc::new(move |subscription_id| match subscription_registry.lock() {
366 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
367 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
368 }),
369 )
370 }
371
372 fn process_durable_delivery(
373 &self,
374 delivery_runtime: &Runtime,
375 pending_delivery: PendingDelivery,
376 ) -> Result<bool, EventStoreError> {
377 match pending_delivery.deliver(delivery_runtime) {
378 DeliveryOutcome::Succeeded {
379 durable_subscriber_id,
380 last_processed_sequence_number,
381 ..
382 } => {
383 if let Some(durable_subscriber_id) = durable_subscriber_id {
384 self.run_async("advance_durable_cursor", move |pool| async move {
385 update_subscriber_cursor(
386 &pool,
387 &durable_subscriber_id,
388 last_processed_sequence_number,
389 )
390 .await
391 })?;
392 }
393
394 Ok(true)
395 }
396 DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
397 }
398 }
399
400 fn finish_durable_replay(&self, subscription_id: u64) {
401 match self.subscription_registry.lock() {
402 Ok(mut subscription_registry) => {
403 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
404 self.enqueue_delivery(buffered_deliveries);
405 }
406 Err(poisoned) => {
407 let mut subscription_registry = poisoned.into_inner();
408 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
409 self.enqueue_delivery(buffered_deliveries);
410 }
411 }
412 }
413
414 fn cleanup_durable_subscription(&self, subscription_id: u64) {
415 match self.subscription_registry.lock() {
416 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
417 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
418 }
419 }
420}
421
422impl Drop for SqliteStore {
423 fn drop(&mut self) {
424 let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
425
426 if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
427 if let Some(delivery_thread) = delivery_thread.take() {
428 let _ = delivery_thread.join();
429 }
430 }
431 }
432}
433
434impl EventStore for SqliteStore {
435 fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
436 let matching_records = self.run_async("query", {
437 let event_query = event_query.clone();
438 move |pool| async move { load_matching_records(&pool, &event_query).await }
439 })?;
440
441 let current_context_version = matching_records
442 .last()
443 .map(|event_record| event_record.sequence_number);
444
445 let event_records = matching_records
446 .into_iter()
447 .filter(|event_record| {
448 event_query
449 .min_sequence_number
450 .is_none_or(|min_sequence_number| {
451 event_record.sequence_number > min_sequence_number
452 })
453 })
454 .collect::<Vec<_>>();
455
456 let last_returned_sequence_number = event_records
457 .last()
458 .map(|event_record| event_record.sequence_number);
459
460 Ok(QueryResult {
461 event_records,
462 last_returned_sequence_number,
463 current_context_version,
464 })
465 }
466
467 fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
468 if new_events.is_empty() {
469 return Err(EventStoreError::EmptyAppend);
470 }
471
472 let committed_append = self.run_async("append", move |pool| async move {
473 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
474
475 sqlx::query("BEGIN IMMEDIATE")
476 .execute(connection.as_mut())
477 .await
478 .map_err(sqlx_backend_failure)?;
479
480 let append_result = append_batch(connection.as_mut(), new_events).await;
481
482 match append_result {
483 Ok(committed_append) => {
484 sqlx::query("COMMIT")
485 .execute(connection.as_mut())
486 .await
487 .map_err(sqlx_backend_failure)?;
488 Ok(committed_append)
489 }
490 Err(error) => {
491 let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
492 Err(error)
493 }
494 }
495 })?;
496
497 let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
498 self.enqueue_delivery(pending_deliveries);
499 Ok(committed_append.append_result)
500 }
501
502 fn append_if(
503 &self,
504 new_events: Vec<NewEvent>,
505 context_query: &EventQuery,
506 expected_context_version: Option<u64>,
507 ) -> Result<AppendResult, EventStoreError> {
508 if new_events.is_empty() {
509 return Err(EventStoreError::EmptyAppend);
510 }
511
512 let committed_append = self.run_async("append_if", {
513 let context_query = context_query.clone();
514 move |pool| async move {
515 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
516
517 sqlx::query("BEGIN IMMEDIATE")
518 .execute(connection.as_mut())
519 .await
520 .map_err(sqlx_backend_failure)?;
521
522 let conflict_query = EventQuery {
523 filters: context_query.filters.clone(),
524 min_sequence_number: None,
525 };
526 let actual_context_version =
527 load_matching_records_from_connection(connection.as_mut(), &conflict_query)
528 .await?
529 .last()
530 .map(|event_record| event_record.sequence_number);
531
532 if actual_context_version != expected_context_version {
533 sqlx::query("ROLLBACK")
534 .execute(connection.as_mut())
535 .await
536 .map_err(sqlx_backend_failure)?;
537
538 return Err(EventStoreError::ConditionalAppendConflict {
539 expected: expected_context_version,
540 actual: actual_context_version,
541 });
542 }
543
544 let append_result = append_batch(connection.as_mut(), new_events).await;
545
546 match append_result {
547 Ok(committed_append) => {
548 sqlx::query("COMMIT")
549 .execute(connection.as_mut())
550 .await
551 .map_err(sqlx_backend_failure)?;
552 Ok(committed_append)
553 }
554 Err(error) => {
555 let _ = sqlx::query("ROLLBACK").execute(connection.as_mut()).await;
556 Err(error)
557 }
558 }
559 }
560 })?;
561
562 let pending_deliveries = self.pending_deliveries(&committed_append.event_records);
563 self.enqueue_delivery(pending_deliveries);
564 Ok(committed_append.append_result)
565 }
566
567 fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
568 let subscription_registry = Arc::clone(&self.subscription_registry);
569 let id = match subscription_registry.lock() {
570 Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
571 Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
572 };
573
574 Ok(EventStream::new(
575 id,
576 Arc::new(move |subscription_id| match subscription_registry.lock() {
577 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
578 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
579 }),
580 ))
581 }
582
583 fn stream_to(
584 &self,
585 event_query: &EventQuery,
586 handle: HandleStream,
587 ) -> Result<EventStream, EventStoreError> {
588 let subscription_registry = Arc::clone(&self.subscription_registry);
589 let id = match subscription_registry.lock() {
590 Ok(mut subscription_registry) => {
591 subscription_registry.subscribe_to(Some(event_query.clone()), handle)
592 }
593 Err(poisoned) => poisoned
594 .into_inner()
595 .subscribe_to(Some(event_query.clone()), handle),
596 };
597
598 Ok(EventStream::new(
599 id,
600 Arc::new(move |subscription_id| match subscription_registry.lock() {
601 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
602 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
603 }),
604 ))
605 }
606
607 fn stream_all_durable(
608 &self,
609 durable_stream: &DurableStream,
610 handle: HandleStream,
611 ) -> Result<EventStream, EventStoreError> {
612 self.register_all_durable_stream(durable_stream.name(), handle)
613 }
614
615 fn stream_to_durable(
616 &self,
617 durable_stream: &DurableStream,
618 event_query: &EventQuery,
619 handle: HandleStream,
620 ) -> Result<EventStream, EventStoreError> {
621 self.register_durable_stream(durable_stream.name(), event_query, handle)
622 }
623}
624
625async fn append_batch(
626 connection: &mut sqlx::SqliteConnection,
627 new_events: Vec<NewEvent>,
628) -> Result<CommittedAppend, EventStoreError> {
629 let last_sequence_number =
630 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
631 .fetch_one(&mut *connection)
632 .await
633 .map_err(sqlx_backend_failure)?
634 .unwrap_or(0);
635
636 let first_sequence_number = (last_sequence_number as u64) + 1;
637 let committed_count = new_events.len() as u64;
638 let last_sequence_number = first_sequence_number + committed_count - 1;
639
640 let event_records = new_events
641 .into_iter()
642 .enumerate()
643 .map(|(offset, new_event)| EventRecord {
644 sequence_number: first_sequence_number + offset as u64,
645 occurred_at: OffsetDateTime::now_utc(),
646 event_type: new_event.event_type,
647 payload: new_event.payload,
648 })
649 .collect::<Vec<_>>();
650
651 for event_record in &event_records {
652 let payload = serde_json::to_string(&event_record.payload).map_err(json_backend_failure)?;
653 let occurred_at = event_record
654 .occurred_at
655 .format(&Rfc3339)
656 .expect("sqlite occurred_at should format as RFC3339");
657
658 sqlx::query(
659 "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
660 VALUES (?1, ?2, ?3, ?4)",
661 )
662 .bind(event_record.sequence_number as i64)
663 .bind(occurred_at)
664 .bind(&event_record.event_type)
665 .bind(payload)
666 .execute(&mut *connection)
667 .await
668 .map_err(sqlx_backend_failure)?;
669 }
670
671 sqlx::query(
672 "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
673 VALUES (?1, ?2)",
674 )
675 .bind(first_sequence_number as i64)
676 .bind(last_sequence_number as i64)
677 .execute(&mut *connection)
678 .await
679 .map_err(sqlx_backend_failure)?;
680
681 Ok(CommittedAppend {
682 append_result: AppendResult {
683 first_sequence_number,
684 last_sequence_number,
685 committed_count,
686 },
687 event_records,
688 })
689}
690
691async fn load_matching_records(
692 pool: &SqlitePool,
693 event_query: &EventQuery,
694) -> Result<Vec<EventRecord>, EventStoreError> {
695 let candidate_event_types = event_type_candidates(event_query);
696 let rows = fetch_candidate_rows(pool, candidate_event_types.as_deref()).await?;
697 matching_records_from_rows(rows, event_query)
698}
699
700async fn load_matching_records_from_connection(
701 connection: &mut sqlx::SqliteConnection,
702 event_query: &EventQuery,
703) -> Result<Vec<EventRecord>, EventStoreError> {
704 let candidate_event_types = event_type_candidates(event_query);
705 let rows =
706 fetch_candidate_rows_from_connection(connection, candidate_event_types.as_deref()).await?;
707 matching_records_from_rows(rows, event_query)
708}
709
710fn matching_records_from_rows(
711 rows: Vec<SqliteRow>,
712 event_query: &EventQuery,
713) -> Result<Vec<EventRecord>, EventStoreError> {
714 rows.into_iter()
715 .map(row_to_event_record)
716 .filter(|result| {
717 result
718 .as_ref()
719 .is_ok_and(|event_record| matches_query(event_query, event_record))
720 })
721 .collect()
722}
723
724async fn fetch_candidate_rows(
725 pool: &SqlitePool,
726 candidate_event_types: Option<&[String]>,
727) -> Result<Vec<SqliteRow>, EventStoreError> {
728 let mut query_builder = QueryBuilder::<Sqlite>::new(
729 "SELECT sequence_number, occurred_at, event_type, payload FROM events",
730 );
731
732 if let Some(candidate_event_types) = candidate_event_types {
733 if candidate_event_types.is_empty() {
734 query_builder.push(" WHERE FALSE");
735 } else {
736 query_builder.push(" WHERE event_type IN (");
737
738 let mut separated = query_builder.separated(", ");
739 for event_type in candidate_event_types {
740 separated.push_bind(event_type);
741 }
742 separated.push_unseparated(")");
743 }
744 }
745
746 query_builder.push(" ORDER BY sequence_number ASC");
747
748 query_builder
749 .build()
750 .fetch_all(pool)
751 .await
752 .map_err(sqlx_backend_failure)
753}
754
755async fn fetch_candidate_rows_from_connection(
756 connection: &mut sqlx::SqliteConnection,
757 candidate_event_types: Option<&[String]>,
758) -> Result<Vec<SqliteRow>, EventStoreError> {
759 let mut query_builder = QueryBuilder::<Sqlite>::new(
760 "SELECT sequence_number, occurred_at, event_type, payload FROM events",
761 );
762
763 if let Some(candidate_event_types) = candidate_event_types {
764 if candidate_event_types.is_empty() {
765 query_builder.push(" WHERE FALSE");
766 } else {
767 query_builder.push(" WHERE event_type IN (");
768
769 let mut separated = query_builder.separated(", ");
770 for event_type in candidate_event_types {
771 separated.push_bind(event_type);
772 }
773 separated.push_unseparated(")");
774 }
775 }
776
777 query_builder.push(" ORDER BY sequence_number ASC");
778
779 query_builder
780 .build()
781 .fetch_all(connection)
782 .await
783 .map_err(sqlx_backend_failure)
784}
785
786fn event_type_candidates(event_query: &EventQuery) -> Option<Vec<String>> {
787 let filters = match &event_query.filters {
788 None => return None,
789 Some(filters) if filters.is_empty() => return None,
790 Some(filters) => filters,
791 };
792
793 if filters
794 .iter()
795 .any(|event_filter| event_filter.event_types.is_none())
796 {
797 return None;
798 }
799
800 let mut candidate_event_types = Vec::new();
801
802 for event_filter in filters {
803 if let Some(event_types) = &event_filter.event_types {
804 for event_type in event_types {
805 if !candidate_event_types
806 .iter()
807 .any(|known| known == event_type)
808 {
809 candidate_event_types.push(event_type.clone());
810 }
811 }
812 }
813 }
814
815 Some(candidate_event_types)
816}
817
818fn row_to_event_record(row: SqliteRow) -> Result<EventRecord, EventStoreError> {
819 let payload_text = row.get::<String, _>("payload");
820 let payload: Value = serde_json::from_str(&payload_text).map_err(json_backend_failure)?;
821
822 Ok(EventRecord {
823 sequence_number: row.get::<i64, _>("sequence_number") as u64,
824 occurred_at: OffsetDateTime::parse(&row.get::<String, _>("occurred_at"), &Rfc3339)
825 .map_err(time_backend_failure)?,
826 event_type: row.get::<String, _>("event_type"),
827 payload,
828 })
829}
830
831fn ensure_parent_directory(database_path: &Path) -> Result<(), io::Error> {
832 if let Some(parent_directory) = database_path.parent() {
833 if !parent_directory.as_os_str().is_empty() {
834 fs::create_dir_all(parent_directory)?;
835 }
836 }
837
838 Ok(())
839}
840
841fn sqlx_io_error(error: io::Error) -> sqlx::Error {
842 sqlx::Error::Io(error)
843}
844
845fn sqlx_backend_failure(error: sqlx::Error) -> EventStoreError {
846 EventStoreError::BackendFailure {
847 message: error.to_string(),
848 }
849}
850
851fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
852 EventStoreError::BackendFailure {
853 message: error.to_string(),
854 }
855}
856
857fn time_backend_failure(error: time::error::Parse) -> EventStoreError {
858 EventStoreError::BackendFailure {
859 message: error.to_string(),
860 }
861}
862
863fn subscription_registry_backend_failure(message: String) -> EventStoreError {
864 EventStoreError::BackendFailure { message }
865}
866
867fn build_delivery_runtime(operation: &'static str) -> Result<Runtime, EventStoreError> {
868 Builder::new_current_thread()
869 .enable_all()
870 .build()
871 .map_err(sqlx_io_error)
872 .map_err(sqlx_backend_failure)
873 .map_err(|error| match error {
874 EventStoreError::BackendFailure { message } => EventStoreError::BackendFailure {
875 message: format!("{operation}: {message}"),
876 },
877 other => other,
878 })
879}
880
881async fn load_or_create_subscriber_cursor(
882 connection: &mut sqlx::SqliteConnection,
883 subscriber_id: &str,
884 event_query_json: &str,
885) -> Result<u64, EventStoreError> {
886 let existing_cursor = sqlx::query(
887 "SELECT event_query, last_processed_sequence_number
888 FROM subscriber_cursors
889 WHERE subscriber_id = ?1",
890 )
891 .bind(subscriber_id)
892 .fetch_optional(&mut *connection)
893 .await
894 .map_err(sqlx_backend_failure)?;
895
896 match existing_cursor {
897 Some(row) => {
898 let stored_event_query = row.get::<String, _>("event_query");
899 if stored_event_query != event_query_json {
900 return Err(EventStoreError::BackendFailure {
901 message: format!(
902 "durable subscriber {subscriber_id} was resumed with a different query"
903 ),
904 });
905 }
906
907 Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
908 }
909 None => {
910 sqlx::query(
911 "INSERT INTO subscriber_cursors (subscriber_id, event_query, last_processed_sequence_number)
912 VALUES (?1, ?2, 0)",
913 )
914 .bind(subscriber_id)
915 .bind(event_query_json)
916 .execute(&mut *connection)
917 .await
918 .map_err(sqlx_backend_failure)?;
919
920 Ok(0)
921 }
922 }
923}
924
925async fn current_max_sequence_number(
926 connection: &mut sqlx::SqliteConnection,
927) -> Result<u64, EventStoreError> {
928 Ok(
929 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
930 .fetch_one(&mut *connection)
931 .await
932 .map_err(sqlx_backend_failure)?
933 .unwrap_or(0) as u64,
934 )
935}
936
937async fn ensure_replay_history_is_available(
938 connection: &mut sqlx::SqliteConnection,
939 current_max_sequence_number: u64,
940) -> Result<(), EventStoreError> {
941 if current_max_sequence_number == 0 {
942 return Ok(());
943 }
944
945 let batch_rows = sqlx::query(
946 "SELECT first_sequence_number, last_sequence_number
947 FROM append_batches
948 ORDER BY first_sequence_number ASC",
949 )
950 .fetch_all(&mut *connection)
951 .await
952 .map_err(sqlx_backend_failure)?;
953
954 if batch_rows.is_empty() {
955 return Err(EventStoreError::BackendFailure {
956 message: "durable replay requires append_batches history for all persisted events"
957 .to_owned(),
958 });
959 }
960
961 let mut expected_first_sequence_number = 1_u64;
962
963 for batch_row in batch_rows {
964 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
965 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
966
967 if first_sequence_number != expected_first_sequence_number
968 || last_sequence_number < first_sequence_number
969 {
970 return Err(EventStoreError::BackendFailure {
971 message:
972 "durable replay requires contiguous append_batches history for all persisted events"
973 .to_owned(),
974 });
975 }
976
977 expected_first_sequence_number = last_sequence_number + 1;
978 }
979
980 if expected_first_sequence_number - 1 != current_max_sequence_number {
981 return Err(EventStoreError::BackendFailure {
982 message: "durable replay requires append_batches history for all persisted events"
983 .to_owned(),
984 });
985 }
986
987 Ok(())
988}
989
990async fn ensure_replay_history_is_available_for_pool(
991 pool: &SqlitePool,
992 current_max_sequence_number: u64,
993) -> Result<(), EventStoreError> {
994 let mut connection = pool.acquire().await.map_err(sqlx_backend_failure)?;
995 ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
996}
997
998async fn update_subscriber_cursor(
999 pool: &SqlitePool,
1000 subscriber_id: &str,
1001 last_processed_sequence_number: u64,
1002) -> Result<(), EventStoreError> {
1003 sqlx::query(
1004 "UPDATE subscriber_cursors
1005 SET last_processed_sequence_number = ?2
1006 WHERE subscriber_id = ?1",
1007 )
1008 .bind(subscriber_id)
1009 .bind(last_processed_sequence_number as i64)
1010 .execute(pool)
1011 .await
1012 .map_err(sqlx_backend_failure)?;
1013
1014 Ok(())
1015}
1016
1017async fn load_replay_batches(
1018 pool: &SqlitePool,
1019 event_query: &EventQuery,
1020 last_processed_sequence_number: u64,
1021 replay_until_sequence_number: u64,
1022) -> Result<Vec<ReplayBatch>, EventStoreError> {
1023 if replay_until_sequence_number <= last_processed_sequence_number {
1024 return Ok(Vec::new());
1025 }
1026
1027 let batch_rows = sqlx::query(
1028 "SELECT first_sequence_number, last_sequence_number
1029 FROM append_batches
1030 WHERE last_sequence_number > ?1
1031 AND first_sequence_number <= ?2
1032 ORDER BY first_sequence_number ASC",
1033 )
1034 .bind(last_processed_sequence_number as i64)
1035 .bind(replay_until_sequence_number as i64)
1036 .fetch_all(pool)
1037 .await
1038 .map_err(sqlx_backend_failure)?;
1039
1040 let mut replay_batches = Vec::new();
1041
1042 for batch_row in batch_rows {
1043 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1044 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1045 let event_rows = sqlx::query(
1046 "SELECT sequence_number, occurred_at, event_type, payload
1047 FROM events
1048 WHERE sequence_number >= ?1 AND sequence_number <= ?2
1049 ORDER BY sequence_number ASC",
1050 )
1051 .bind(first_sequence_number as i64)
1052 .bind(last_sequence_number as i64)
1053 .fetch_all(pool)
1054 .await
1055 .map_err(sqlx_backend_failure)?;
1056
1057 let delivered_batch = event_rows
1058 .into_iter()
1059 .map(row_to_event_record)
1060 .filter(|result| {
1061 result
1062 .as_ref()
1063 .is_ok_and(|event_record| matches_query(event_query, event_record))
1064 })
1065 .collect::<Result<Vec<_>, _>>()?;
1066
1067 replay_batches.push(ReplayBatch {
1068 last_processed_sequence_number: last_sequence_number,
1069 delivered_batch,
1070 });
1071 }
1072
1073 Ok(replay_batches)
1074}
1075
1076fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1077 EventQuery {
1078 filters: event_query.filters.clone(),
1079 min_sequence_number: None,
1080 }
1081}
1082
1083fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1084 let filters = event_query.filters.as_ref().map(|filters| {
1085 filters
1086 .iter()
1087 .map(|filter| {
1088 serde_json::json!({
1089 "event_types": filter.event_types,
1090 "payload_predicates": filter.payload_predicates,
1091 })
1092 })
1093 .collect::<Vec<_>>()
1094 });
1095
1096 serde_json::to_string(&serde_json::json!({
1097 "filters": filters,
1098 "min_sequence_number": event_query.min_sequence_number,
1099 }))
1100 .map_err(json_backend_failure)
1101}
1102
1103fn run_delivery_thread(
1104 pool: SqlitePool,
1105 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
1106 delivery_receiver: Receiver<DeliveryCommand>,
1107) {
1108 let runtime = match Builder::new_current_thread().enable_all().build() {
1109 Ok(runtime) => runtime,
1110 Err(error) => {
1111 eprintln!("factstr-sqlite delivery runtime could not start: {}", error);
1112 return;
1113 }
1114 };
1115
1116 while let Ok(delivery_command) = delivery_receiver.recv() {
1117 match delivery_command {
1118 DeliveryCommand::Deliver(pending_deliveries) => {
1119 for pending_delivery in pending_deliveries {
1120 match pending_delivery.deliver(&runtime) {
1121 DeliveryOutcome::Succeeded {
1122 subscription_id,
1123 durable_subscriber_id,
1124 last_processed_sequence_number,
1125 } => {
1126 if let Some(durable_subscriber_id) = durable_subscriber_id {
1127 if let Err(error) = runtime.block_on(update_subscriber_cursor(
1128 &pool,
1129 &durable_subscriber_id,
1130 last_processed_sequence_number,
1131 )) {
1132 eprintln!(
1133 "factstr-sqlite durable cursor update failed after delivery for subscriber {}: {}",
1134 durable_subscriber_id, error
1135 );
1136 match subscription_registry.lock() {
1137 Ok(mut subscription_registry) => {
1138 subscription_registry.unsubscribe(subscription_id)
1139 }
1140 Err(poisoned) => {
1141 poisoned.into_inner().unsubscribe(subscription_id)
1142 }
1143 }
1144 }
1145 }
1146 }
1147 DeliveryOutcome::Failed {
1148 subscription_id,
1149 durable_subscriber_id,
1150 }
1151 | DeliveryOutcome::Panicked {
1152 subscription_id,
1153 durable_subscriber_id,
1154 } => {
1155 if durable_subscriber_id.is_some() {
1156 match subscription_registry.lock() {
1157 Ok(mut subscription_registry) => {
1158 subscription_registry.unsubscribe(subscription_id)
1159 }
1160 Err(poisoned) => {
1161 poisoned.into_inner().unsubscribe(subscription_id)
1162 }
1163 }
1164 }
1165 }
1166 }
1167 }
1168 }
1169 DeliveryCommand::Shutdown => break,
1170 }
1171 }
1172}