1use std::future::Future;
2use std::io;
3use std::sync::{
4 Arc, Mutex,
5 mpsc::{self, Receiver, Sender},
6};
7use std::thread::{self, JoinHandle};
8
9use factstr::{
10 AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
11 HandleStream, NewEvent, QueryResult,
12};
13use sqlx::{
14 PgPool, Postgres, QueryBuilder, Row, Transaction,
15 postgres::{PgPoolOptions, PgRow},
16};
17use time::OffsetDateTime;
18use tokio::runtime::Builder;
19
20use crate::query_match::matches_query;
21use crate::query_sql::push_query_conditions;
22use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
23
/// An append that has been committed to Postgres, pairing the result metadata
/// returned to the caller with the concrete records needed for subscriber
/// delivery fan-out.
#[derive(Clone, Debug)]
struct CommittedAppend {
    // Sequence-number range and count reported back to the appending caller.
    append_result: AppendResult,
    // The full committed records, handed to the subscription registry to
    // compute pending deliveries.
    event_records: Vec<EventRecord>,
}
29
/// One batch of historical events replayed to a durable stream, together with
/// the cursor position to persist once the batch has been handled.
#[derive(Clone, Debug)]
struct ReplayBatch {
    // Cursor value to store after this batch is successfully delivered.
    last_processed_sequence_number: u64,
    // The events handed to the subscriber for this batch.
    delivered_batch: Vec<EventRecord>,
}
35
/// Snapshot taken while registering a durable stream: where replay starts,
/// where it must stop, and the id of the freshly registered subscription.
#[derive(Clone, Debug)]
struct DurableReplayState {
    // Registry id of the subscription created for this durable stream.
    subscription_id: u64,
    // Cursor loaded from (or initialized in) durable_stream_cursors.
    last_processed_sequence_number: u64,
    // MAX(sequence_number) observed in the registration transaction; replay
    // covers (last_processed, replay_until] and live delivery takes over after.
    replay_until_sequence_number: u64,
}
42
/// Requests serviced by the dedicated worker thread. Each request carries a
/// one-shot `reply` channel the caller blocks on.
enum WorkerCommand {
    /// Run a read query and reply with the matching records.
    Query {
        event_query: EventQuery,
        reply: Sender<Result<QueryResult, EventStoreError>>,
    },
    /// Append events unconditionally.
    Append {
        new_events: Vec<NewEvent>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Append events only if the context query still matches the expected
    /// version (optimistic-concurrency append).
    AppendIf {
        new_events: Vec<NewEvent>,
        context_query: EventQuery,
        expected_context_version: Option<u64>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Stop the worker loop.
    Shutdown,
}
60
/// Requests serviced by the delivery thread, which pushes committed events to
/// subscribers off the worker's critical path.
enum DeliveryCommand {
    /// Deliver the given batches to their subscriptions.
    Deliver(Vec<PendingDelivery>),
    /// Stop the delivery loop.
    Shutdown,
}
65
/// Postgres-backed [`EventStore`] implementation.
///
/// All database work runs on two background threads: a worker thread serving
/// query/append commands, and a delivery thread pushing committed events to
/// subscribers. The `Mutex`-wrapped fields make the store usable from multiple
/// threads while keeping `JoinHandle`s takeable in `Drop`.
pub struct PostgresStore {
    // Kept so ad-hoc operations (durable registration/replay) can open their
    // own short-lived pools; see `run_async`.
    connection_string: String,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    worker_sender: Mutex<Sender<WorkerCommand>>,
    worker_thread: Mutex<Option<JoinHandle<()>>>,
    delivery_sender: Sender<DeliveryCommand>,
    delivery_thread: Mutex<Option<JoinHandle<()>>>,
}
74
impl PostgresStore {
    /// Connects to Postgres, bootstraps the schema, and starts the worker and
    /// delivery threads.
    ///
    /// # Errors
    /// Returns any `sqlx::Error` from schema bootstrap or from the worker
    /// thread's runtime/pool startup.
    pub fn connect(connection_string: &str) -> Result<Self, sqlx::Error> {
        let connection_string = connection_string.to_owned();
        // Create tables/indexes on a short-lived connection so the long-lived
        // threads can assume the schema exists.
        bootstrap_connection(&connection_string)?;

        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
        let (delivery_sender, delivery_receiver) = mpsc::channel();
        let delivery_thread = thread::Builder::new()
            .name("factstr-postgres-delivery".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                move || {
                    run_delivery_thread(connection_string, subscription_registry, delivery_receiver)
                }
            })
            .map_err(sqlx_io_error)?;

        let (worker_sender, worker_receiver) = mpsc::channel();
        // Bounded(1): the worker sends exactly one readiness message.
        let (ready_sender, ready_receiver) = mpsc::sync_channel(1);
        let worker_thread = thread::Builder::new()
            .name("factstr-postgres-worker".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                let delivery_sender = delivery_sender.clone();
                move || {
                    run_worker_thread(
                        connection_string,
                        worker_receiver,
                        ready_sender,
                        subscription_registry,
                        delivery_sender,
                    )
                }
            })
            .map_err(sqlx_io_error)?;

        // Block until the worker reports whether its runtime + pool came up.
        // On failure, tear both threads down before surfacing the error; the
        // worker has already returned in these branches, so join() is prompt.
        match ready_receiver.recv() {
            Ok(Ok(())) => {}
            Ok(Err(error)) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(error);
            }
            Err(error) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(sqlx_io_error(io::Error::other(format!(
                    "postgres worker startup channel failed: {error}"
                ))));
            }
        }

        Ok(Self {
            connection_string,
            subscription_registry,
            worker_sender: Mutex::new(worker_sender),
            worker_thread: Mutex::new(Some(worker_thread)),
            delivery_sender,
            delivery_thread: Mutex::new(Some(delivery_thread)),
        })
    }

    /// Converts a low-level sqlx error into the store's backend-failure error.
    fn backend_failure(error: sqlx::Error) -> EventStoreError {
        EventStoreError::BackendFailure {
            message: error.to_string(),
        }
    }

    /// Builds a backend-failure error from a plain message (channel/thread
    /// problems rather than database problems).
    fn worker_failure(message: impl Into<String>) -> EventStoreError {
        EventStoreError::BackendFailure {
            message: message.into(),
        }
    }

    /// Sends one command to the background worker thread.
    fn send_command(&self, worker_command: WorkerCommand) -> Result<(), EventStoreError> {
        let worker_sender = self
            .worker_sender
            .lock()
            .map_err(|_| Self::worker_failure("postgres worker sender lock poisoned"))?;

        worker_sender
            .send(worker_command)
            .map_err(|error| Self::worker_failure(format!("postgres worker stopped: {error}")))
    }

    /// Round-trips a query through the worker thread and blocks on its reply.
    fn run_query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::Query {
            event_query: event_query.clone(),
            reply: reply_sender,
        })?;

        // recv fails only if the worker dropped the reply sender (it stopped).
        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!("postgres worker query reply failed: {error}"))
        })?
    }

    /// Round-trips an unconditional append through the worker thread.
    fn run_append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::Append {
            new_events,
            reply: reply_sender,
        })?;

        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!("postgres worker append reply failed: {error}"))
        })?
    }

    /// Round-trips a conditional append through the worker thread.
    fn run_append_if(
        &self,
        new_events: Vec<NewEvent>,
        context_query: &EventQuery,
        expected_context_version: Option<u64>,
    ) -> Result<AppendResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::AppendIf {
            new_events,
            context_query: context_query.clone(),
            expected_context_version,
            reply: reply_sender,
        })?;

        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!(
                "postgres worker conditional append reply failed: {error}"
            ))
        })?
    }

    /// Runs `work` against a fresh one-connection pool on a dedicated thread
    /// with its own current-thread Tokio runtime, blocking until it finishes.
    ///
    /// Used for operations that must not occupy the command worker (durable
    /// registration, replay, cursor updates). Spawning a runtime + pool per
    /// call is deliberate simplicity over reuse.
    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
    where
        T: Send + 'static,
        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
        F: FnOnce(PgPool) -> Fut + Send + 'static,
    {
        let connection_string = self.connection_string.clone();
        let (result_sender, result_receiver) = mpsc::sync_channel(1);

        let worker_thread = thread::Builder::new()
            .name(format!("factstr-postgres-{operation}"))
            .spawn(move || {
                let runtime = Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(sqlx_io_error)
                    .map_err(Self::backend_failure);

                let result = match runtime {
                    Ok(runtime) => runtime.block_on(async {
                        let pool = PgPoolOptions::new()
                            .max_connections(1)
                            .connect(&connection_string)
                            .await
                            .map_err(Self::backend_failure)?;
                        work(pool).await
                    }),
                    Err(error) => Err(error),
                };

                let _ = result_sender.send(result);
            })
            .map_err(sqlx_io_error)
            .map_err(Self::backend_failure)?;

        let result = result_receiver
            .recv()
            .map_err(|error| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread did not return a result: {error}"),
            })?;

        // Join after recv so a panic in the spawned thread surfaces here.
        worker_thread
            .join()
            .map_err(|_| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread panicked before completion"),
            })?;

        result
    }

    /// Queues pending deliveries on the delivery thread.
    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
        Self::enqueue_delivery_with_sender(&self.delivery_sender, pending_deliveries);
    }

    /// Queues pending deliveries via an explicit sender (usable from the
    /// worker thread, which has no `&self`). A dead dispatcher is logged, not
    /// propagated: the append itself has already committed.
    fn enqueue_delivery_with_sender(
        delivery_sender: &Sender<DeliveryCommand>,
        pending_deliveries: Vec<PendingDelivery>,
    ) {
        if pending_deliveries.is_empty() {
            return;
        }

        if let Err(error) = delivery_sender.send(DeliveryCommand::Deliver(pending_deliveries)) {
            eprintln!(
                "factstr-postgres delivery dispatcher stopped after commit: {}",
                error
            );
        }
    }

    /// Registers a durable stream that receives every event.
    fn register_all_durable_stream(
        &self,
        durable_stream_id: impl Into<String>,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.stream_durable(durable_stream_id.into(), EventQuery::all(), handle)
    }

    /// Registers a durable stream restricted to `event_query`.
    fn register_durable_stream(
        &self,
        durable_stream_id: impl Into<String>,
        event_query: &EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.stream_durable(durable_stream_id.into(), event_query.clone(), handle)
    }

    /// Registers a durable subscription and replays history up to the point of
    /// registration.
    ///
    /// Flow: within one transaction, load/create the stream's cursor and read
    /// the current MAX(sequence_number), and register the live subscription at
    /// that cutoff before committing — so events after the cutoff are buffered
    /// by the registry while replay is in progress. Then replay batches in
    /// (cursor, cutoff], persisting the cursor after each delivered batch, and
    /// finally flush whatever the registry buffered during replay. Any failure
    /// along the way unsubscribes the half-registered stream.
    fn stream_durable(
        &self,
        durable_stream_id: String,
        event_query: EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        let normalized_event_query = normalized_durable_event_query(&event_query);
        let event_query_json = serialize_event_query(&normalized_event_query)?;
        let subscription_registry = Arc::clone(&self.subscription_registry);
        let durable_stream_id_for_registry = durable_stream_id.clone();
        let normalized_event_query_for_registry = normalized_event_query.clone();
        let handle_for_registry = handle.clone();
        let replay_stream_id = durable_stream_id.clone();

        let replay_state = self.run_async("stream_durable", move |pool| async move {
            let mut transaction = pool.begin().await.map_err(Self::backend_failure)?;

            let last_processed_sequence_number = load_or_create_durable_stream_cursor(
                &mut transaction,
                &durable_stream_id_for_registry,
                &event_query_json,
            )
            .await?;
            let replay_until_sequence_number =
                current_max_sequence_number_in_transaction(&mut transaction).await?;

            // Register while the transaction is still open; recover the
            // registry from a poisoned lock (both arms are identical except
            // for how the guard is obtained).
            let subscription_id = match subscription_registry.lock() {
                Ok(mut subscription_registry) => {
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
                Err(poisoned) => {
                    let mut subscription_registry = poisoned.into_inner();
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
            };

            // Commit failed: roll the registry back too, so no live
            // subscription exists without a persisted cursor row.
            if let Err(error) = transaction.commit().await {
                match subscription_registry.lock() {
                    Ok(mut subscription_registry) => {
                        subscription_registry.unsubscribe(subscription_id)
                    }
                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
                }
                return Err(Self::backend_failure(error));
            }

            Ok(DurableReplayState {
                subscription_id,
                last_processed_sequence_number,
                replay_until_sequence_number,
            })
        })?;

        let subscription = self.build_subscription_handle(replay_state.subscription_id);

        let replay_batches = match self.run_async("durable_replay", {
            let event_query = normalized_event_query.clone();
            move |pool| async move {
                ensure_replay_history_is_available_for_pool(
                    &pool,
                    replay_state.replay_until_sequence_number,
                )
                .await?;
                load_replay_batches(
                    &pool,
                    &event_query,
                    replay_state.last_processed_sequence_number,
                    replay_state.replay_until_sequence_number,
                )
                .await
            }
        }) {
            Ok(replay_batches) => replay_batches,
            Err(error) => {
                self.cleanup_durable_subscription(replay_state.subscription_id);
                return Err(error);
            }
        };

        for replay_batch in replay_batches {
            let pending_delivery = PendingDelivery {
                subscription_id: replay_state.subscription_id,
                durable_stream_id: Some(replay_stream_id.clone()),
                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
                delivered_batch: replay_batch.delivered_batch,
                handle: handle.clone(),
            };

            match self.process_durable_delivery(pending_delivery) {
                Ok(true) => {}
                Ok(false) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(EventStoreError::BackendFailure {
                        message: format!(
                            "durable replay for stream {} did not complete successfully",
                            replay_stream_id
                        ),
                    });
                }
                Err(error) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(error);
                }
            }
        }

        // Flushes deliveries the registry buffered while replay was running.
        self.finish_durable_replay(replay_state.subscription_id);
        Ok(subscription)
    }

    /// Builds the caller-facing stream handle whose drop/close path removes
    /// the subscription from the registry.
    fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
        let subscription_registry = Arc::clone(&self.subscription_registry);

        EventStream::new(
            subscription_id,
            Arc::new(move |subscription_id| match subscription_registry.lock() {
                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
            }),
        )
    }

    /// Delivers one replay batch and, on success, persists the durable cursor.
    ///
    /// Returns `Ok(true)` on success, `Ok(false)` if the subscriber failed or
    /// panicked, `Err(_)` if the cursor update hit the backend.
    fn process_durable_delivery(
        &self,
        pending_delivery: PendingDelivery,
    ) -> Result<bool, EventStoreError> {
        match pending_delivery.deliver() {
            DeliveryOutcome::Succeeded {
                durable_stream_id,
                last_processed_sequence_number,
                ..
            } => {
                if let Some(durable_stream_id) = durable_stream_id {
                    self.run_async("advance_durable_cursor", move |pool| async move {
                        update_durable_stream_cursor(
                            &pool,
                            &durable_stream_id,
                            last_processed_sequence_number,
                        )
                        .await
                    })?;
                }

                Ok(true)
            }
            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
        }
    }

    /// Ends replay mode for a subscription and dispatches everything the
    /// registry buffered in the meantime.
    fn finish_durable_replay(&self, subscription_id: u64) {
        match self.subscription_registry.lock() {
            Ok(mut subscription_registry) => {
                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
                self.enqueue_delivery(buffered_deliveries);
            }
            Err(poisoned) => {
                let mut subscription_registry = poisoned.into_inner();
                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
                self.enqueue_delivery(buffered_deliveries);
            }
        }
    }

    /// Removes a half-initialized durable subscription after a setup failure.
    fn cleanup_durable_subscription(&self, subscription_id: u64) {
        match self.subscription_registry.lock() {
            Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
            Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
        }
    }
}
500
501impl Drop for PostgresStore {
502 fn drop(&mut self) {
503 if let Ok(worker_sender) = self.worker_sender.lock() {
504 let _ = worker_sender.send(WorkerCommand::Shutdown);
505 }
506
507 if let Ok(mut worker_thread) = self.worker_thread.lock() {
508 if let Some(worker_thread) = worker_thread.take() {
509 let _ = worker_thread.join();
510 }
511 }
512
513 let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
514
515 if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
516 if let Some(delivery_thread) = delivery_thread.take() {
517 let _ = delivery_thread.join();
518 }
519 }
520 }
521}
522
523impl EventStore for PostgresStore {
524 fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
525 self.run_query(event_query)
526 }
527
528 fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
529 if new_events.is_empty() {
530 return Err(EventStoreError::EmptyAppend);
531 }
532
533 self.run_append(new_events)
534 }
535
536 fn append_if(
537 &self,
538 new_events: Vec<NewEvent>,
539 context_query: &EventQuery,
540 expected_context_version: Option<u64>,
541 ) -> Result<AppendResult, EventStoreError> {
542 if new_events.is_empty() {
543 return Err(EventStoreError::EmptyAppend);
544 }
545
546 self.run_append_if(new_events, context_query, expected_context_version)
547 }
548
549 fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
550 let subscription_registry = Arc::clone(&self.subscription_registry);
551 let id = match subscription_registry.lock() {
552 Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
553 Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
554 };
555
556 Ok(EventStream::new(
557 id,
558 Arc::new(move |subscription_id| match subscription_registry.lock() {
559 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
560 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
561 }),
562 ))
563 }
564
565 fn stream_to(
566 &self,
567 event_query: &EventQuery,
568 handle: HandleStream,
569 ) -> Result<EventStream, EventStoreError> {
570 let subscription_registry = Arc::clone(&self.subscription_registry);
571 let id = match subscription_registry.lock() {
572 Ok(mut subscription_registry) => {
573 subscription_registry.subscribe_to(Some(event_query.clone()), handle)
574 }
575 Err(poisoned) => poisoned
576 .into_inner()
577 .subscribe_to(Some(event_query.clone()), handle),
578 };
579
580 Ok(EventStream::new(
581 id,
582 Arc::new(move |subscription_id| match subscription_registry.lock() {
583 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
584 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
585 }),
586 ))
587 }
588
589 fn stream_all_durable(
590 &self,
591 durable_stream: &DurableStream,
592 handle: HandleStream,
593 ) -> Result<EventStream, EventStoreError> {
594 self.register_all_durable_stream(durable_stream.name(), handle)
595 }
596
597 fn stream_to_durable(
598 &self,
599 durable_stream: &DurableStream,
600 event_query: &EventQuery,
601 handle: HandleStream,
602 ) -> Result<EventStream, EventStoreError> {
603 self.register_durable_stream(durable_stream.name(), event_query, handle)
604 }
605}
606
607fn run_worker_thread(
608 connection_string: String,
609 worker_receiver: Receiver<WorkerCommand>,
610 ready_sender: mpsc::SyncSender<Result<(), sqlx::Error>>,
611 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
612 delivery_sender: Sender<DeliveryCommand>,
613) {
614 let runtime = match Builder::new_current_thread().enable_all().build() {
615 Ok(runtime) => runtime,
616 Err(error) => {
617 let _ = ready_sender.send(Err(sqlx_io_error(io::Error::other(format!(
618 "tokio runtime should build: {error}"
619 )))));
620 return;
621 }
622 };
623
624 let pool = match runtime.block_on(async {
625 PgPoolOptions::new()
626 .max_connections(1)
627 .connect(&connection_string)
628 .await
629 }) {
630 Ok(pool) => pool,
631 Err(error) => {
632 let _ = ready_sender.send(Err(error));
633 return;
634 }
635 };
636
637 if ready_sender.send(Ok(())).is_err() {
638 return;
639 }
640
641 while let Ok(worker_command) = worker_receiver.recv() {
642 match worker_command {
643 WorkerCommand::Query { event_query, reply } => {
644 let result = runtime
645 .block_on(query_with_pool(&pool, &event_query))
646 .map_err(PostgresStore::backend_failure);
647 let _ = reply.send(result);
648 }
649 WorkerCommand::Append { new_events, reply } => {
650 let result = runtime
651 .block_on(append_with_pool(&pool, new_events))
652 .map_err(PostgresStore::backend_failure);
653 if let Ok(committed_append) = &result {
654 let pending_deliveries = match subscription_registry.lock() {
655 Ok(mut subscription_registry) => subscription_registry
656 .pending_deliveries(&committed_append.event_records),
657 Err(poisoned) => poisoned
658 .into_inner()
659 .pending_deliveries(&committed_append.event_records),
660 };
661 PostgresStore::enqueue_delivery_with_sender(
662 &delivery_sender,
663 pending_deliveries,
664 );
665 }
666 let result = result.map(|committed_append| committed_append.append_result);
667 let _ = reply.send(result);
668 }
669 WorkerCommand::AppendIf {
670 new_events,
671 context_query,
672 expected_context_version,
673 reply,
674 } => {
675 let result = runtime
676 .block_on(append_if_with_pool(
677 &pool,
678 new_events,
679 &context_query,
680 expected_context_version,
681 ))
682 .map_err(PostgresStore::backend_failure)
683 .and_then(|result| result);
684 if let Ok(committed_append) = &result {
685 let pending_deliveries = match subscription_registry.lock() {
686 Ok(mut subscription_registry) => subscription_registry
687 .pending_deliveries(&committed_append.event_records),
688 Err(poisoned) => poisoned
689 .into_inner()
690 .pending_deliveries(&committed_append.event_records),
691 };
692 PostgresStore::enqueue_delivery_with_sender(
693 &delivery_sender,
694 pending_deliveries,
695 );
696 }
697 let result = result.map(|committed_append| committed_append.append_result);
698 let _ = reply.send(result);
699 }
700 WorkerCommand::Shutdown => break,
701 }
702 }
703}
704
705fn run_delivery_thread(
706 connection_string: String,
707 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
708 delivery_receiver: Receiver<DeliveryCommand>,
709) {
710 let runtime = match Builder::new_current_thread().enable_all().build() {
711 Ok(runtime) => runtime,
712 Err(error) => {
713 eprintln!(
714 "factstr-postgres delivery runtime could not start: {}",
715 error
716 );
717 return;
718 }
719 };
720
721 let pool = match runtime.block_on(async {
722 PgPoolOptions::new()
723 .max_connections(1)
724 .connect(&connection_string)
725 .await
726 }) {
727 Ok(pool) => pool,
728 Err(error) => {
729 eprintln!(
730 "factstr-postgres delivery pool could not connect: {}",
731 error
732 );
733 return;
734 }
735 };
736
737 while let Ok(delivery_command) = delivery_receiver.recv() {
738 match delivery_command {
739 DeliveryCommand::Deliver(pending_deliveries) => {
740 for pending_delivery in pending_deliveries {
741 match pending_delivery.deliver() {
742 DeliveryOutcome::Succeeded {
743 subscription_id,
744 durable_stream_id,
745 last_processed_sequence_number,
746 } => {
747 if let Some(durable_stream_id) = durable_stream_id {
748 if let Err(error) = runtime.block_on(update_durable_stream_cursor(
749 &pool,
750 &durable_stream_id,
751 last_processed_sequence_number,
752 )) {
753 eprintln!(
754 "factstr-postgres durable cursor update failed after delivery for stream {}: {}",
755 durable_stream_id, error
756 );
757 match subscription_registry.lock() {
758 Ok(mut subscription_registry) => {
759 subscription_registry.unsubscribe(subscription_id)
760 }
761 Err(poisoned) => {
762 poisoned.into_inner().unsubscribe(subscription_id)
763 }
764 }
765 }
766 }
767 }
768 DeliveryOutcome::Failed {
769 subscription_id,
770 durable_stream_id,
771 }
772 | DeliveryOutcome::Panicked {
773 subscription_id,
774 durable_stream_id,
775 } => {
776 if durable_stream_id.is_some() {
777 match subscription_registry.lock() {
778 Ok(mut subscription_registry) => {
779 subscription_registry.unsubscribe(subscription_id)
780 }
781 Err(poisoned) => {
782 poisoned.into_inner().unsubscribe(subscription_id)
783 }
784 }
785 }
786 }
787 }
788 }
789 }
790 DeliveryCommand::Shutdown => break,
791 }
792 }
793}
794
/// Wraps a raw I/O failure in `sqlx::Error` so thread/channel problems can be
/// reported through the same error type as database failures.
fn sqlx_io_error(error: io::Error) -> sqlx::Error {
    sqlx::Error::Io(error)
}
798
/// Synchronously connects once and initializes the schema, using a throwaway
/// thread with its own current-thread Tokio runtime.
///
/// The `recv` happens before `join` on purpose: the result arrives as soon as
/// the work finishes, and the join afterwards surfaces a thread panic as an
/// error instead of losing it.
fn bootstrap_connection(connection_string: &str) -> Result<(), sqlx::Error> {
    let connection_string = connection_string.to_owned();
    // Bounded(1): the bootstrap thread sends exactly one result.
    let (result_sender, result_receiver) = mpsc::sync_channel(1);

    let bootstrap_thread = thread::Builder::new()
        .name("factstr-postgres-bootstrap".to_owned())
        .spawn(move || {
            let runtime = Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(sqlx_io_error);

            let result = match runtime {
                Ok(runtime) => runtime.block_on(async {
                    let pool = PgPoolOptions::new()
                        .max_connections(1)
                        .connect(&connection_string)
                        .await?;
                    initialize_schema(&pool).await?;
                    Ok::<_, sqlx::Error>(())
                }),
                Err(error) => Err(error),
            };

            let _ = result_sender.send(result);
        })
        .map_err(sqlx_io_error)?;

    let result = match result_receiver.recv() {
        Ok(result) => result,
        Err(error) => Err(sqlx_io_error(io::Error::other(format!(
            "postgres bootstrap thread did not return a result: {error}"
        )))),
    };

    bootstrap_thread.join().map_err(|_| {
        sqlx_io_error(io::Error::other(
            "postgres bootstrap thread panicked before connect completed",
        ))
    })?;

    result
}
842
843async fn query_with_pool(
844 pool: &PgPool,
845 event_query: &EventQuery,
846) -> Result<QueryResult, sqlx::Error> {
847 let current_context_version = current_context_version(pool, event_query).await?;
848
849 let mut query_builder: QueryBuilder<'_, Postgres> =
850 QueryBuilder::new("SELECT sequence_number, occurred_at, event_type, payload FROM events");
851 push_query_conditions(&mut query_builder, event_query, true);
852 query_builder.push(" ORDER BY sequence_number ASC");
853
854 let event_records = query_builder
855 .build()
856 .fetch_all(pool)
857 .await?
858 .into_iter()
859 .map(event_record_from_row)
860 .collect::<Vec<_>>();
861
862 let last_returned_sequence_number = event_records
863 .last()
864 .map(|event_record| event_record.sequence_number);
865
866 Ok(QueryResult {
867 event_records,
868 last_returned_sequence_number,
869 current_context_version,
870 })
871}
872
873async fn append_with_pool(
874 pool: &PgPool,
875 new_events: Vec<NewEvent>,
876) -> Result<CommittedAppend, sqlx::Error> {
877 let mut transaction = pool.begin().await?;
878 lock_events_tables(&mut transaction).await?;
879 let committed_append = append_records(&mut transaction, new_events).await?;
880 transaction.commit().await?;
881 Ok(committed_append)
882}
883
884async fn append_if_with_pool(
885 pool: &PgPool,
886 new_events: Vec<NewEvent>,
887 context_query: &EventQuery,
888 expected_context_version: Option<u64>,
889) -> Result<Result<CommittedAppend, EventStoreError>, sqlx::Error> {
890 let mut transaction = pool.begin().await?;
891 lock_events_tables(&mut transaction).await?;
892
893 let actual_context_version =
894 current_context_version_in_transaction(&mut transaction, context_query).await?;
895
896 if actual_context_version != expected_context_version {
897 transaction.rollback().await?;
898 return Ok(Err(EventStoreError::ConditionalAppendConflict {
899 expected: expected_context_version,
900 actual: actual_context_version,
901 }));
902 }
903
904 let committed_append = append_records(&mut transaction, new_events).await?;
905 transaction.commit().await?;
906
907 Ok(Ok(committed_append))
908}
909
910async fn initialize_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
911 sqlx::query(
912 "CREATE TABLE IF NOT EXISTS events (
913 sequence_number BIGINT PRIMARY KEY,
914 occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
915 event_type TEXT NOT NULL,
916 payload JSONB NOT NULL
917 )",
918 )
919 .execute(pool)
920 .await?;
921
922 sqlx::query(
923 "CREATE TABLE IF NOT EXISTS append_batches (
924 first_sequence_number BIGINT PRIMARY KEY,
925 last_sequence_number BIGINT NOT NULL
926 )",
927 )
928 .execute(pool)
929 .await?;
930
931 sqlx::query(
932 "CREATE TABLE IF NOT EXISTS durable_stream_cursors (
933 durable_stream_id TEXT PRIMARY KEY,
934 event_query TEXT NOT NULL,
935 last_processed_sequence_number BIGINT NOT NULL
936 )",
937 )
938 .execute(pool)
939 .await?;
940
941 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
942 .execute(pool)
943 .await?;
944
945 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
946 .execute(pool)
947 .await?;
948
949 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_payload_gin ON events USING gin(payload)")
950 .execute(pool)
951 .await?;
952
953 Ok(())
954}
955
956async fn lock_events_tables(
957 transaction: &mut Transaction<'_, Postgres>,
958) -> Result<(), sqlx::Error> {
959 sqlx::query("LOCK TABLE events, append_batches IN EXCLUSIVE MODE")
960 .execute(transaction.as_mut())
961 .await?;
962
963 Ok(())
964}
965
/// Inserts `new_events` into `events` and records the batch range in
/// `append_batches`, allocating contiguous sequence numbers from
/// MAX(sequence_number) + 1.
///
/// Invariants (upheld by callers, not checked here):
/// - `new_events` is non-empty — the public `append`/`append_if` paths reject
///   empty batches; with an empty batch `first + count - 1` would underflow.
/// - The caller holds the lock taken by `lock_events_tables` in the same
///   transaction, otherwise the MAX-based allocation can race.
async fn append_records(
    transaction: &mut Transaction<'_, Postgres>,
    new_events: Vec<NewEvent>,
) -> Result<CommittedAppend, sqlx::Error> {
    let committed_count = new_events.len() as u64;
    // Next free sequence number; COALESCE makes the very first append start at 1.
    let first_sequence_number =
        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(sequence_number), 0) + 1 FROM events")
            .fetch_one(transaction.as_mut())
            .await? as u64;
    let last_sequence_number = first_sequence_number + committed_count - 1;
    // Timestamps are assigned client-side at commit preparation time.
    let committed_event_records = new_events
        .into_iter()
        .enumerate()
        .map(|(offset, new_event)| EventRecord {
            sequence_number: first_sequence_number + offset as u64,
            occurred_at: OffsetDateTime::now_utc(),
            event_type: new_event.event_type,
            payload: new_event.payload,
        })
        .collect::<Vec<_>>();

    for event_record in &committed_event_records {
        sqlx::query(
            "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
            VALUES ($1, $2, $3, $4)",
        )
        .bind(event_record.sequence_number as i64)
        .bind(event_record.occurred_at)
        .bind(&event_record.event_type)
        .bind(&event_record.payload)
        .execute(transaction.as_mut())
        .await?;
    }

    // Record the batch boundaries; presumably used by replay-history checks
    // (see ensure_replay_history_is_available) — confirm against that code.
    sqlx::query(
        "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
        VALUES ($1, $2)",
    )
    .bind(first_sequence_number as i64)
    .bind(last_sequence_number as i64)
    .execute(transaction.as_mut())
    .await?;

    Ok(CommittedAppend {
        append_result: AppendResult {
            first_sequence_number,
            last_sequence_number,
            committed_count,
        },
        event_records: committed_event_records,
    })
}
1018
1019async fn current_context_version(
1020 pool: &PgPool,
1021 event_query: &EventQuery,
1022) -> Result<Option<u64>, sqlx::Error> {
1023 let mut query_builder: QueryBuilder<'_, Postgres> =
1024 QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1025 push_query_conditions(&mut query_builder, event_query, false);
1026
1027 Ok(query_builder
1028 .build_query_scalar::<Option<i64>>()
1029 .fetch_one(pool)
1030 .await?
1031 .map(|sequence_number| sequence_number as u64))
1032}
1033
1034async fn current_context_version_in_transaction(
1035 transaction: &mut Transaction<'_, Postgres>,
1036 event_query: &EventQuery,
1037) -> Result<Option<u64>, sqlx::Error> {
1038 let mut query_builder: QueryBuilder<'_, Postgres> =
1039 QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1040 push_query_conditions(&mut query_builder, event_query, false);
1041
1042 Ok(query_builder
1043 .build_query_scalar::<Option<i64>>()
1044 .fetch_one(transaction.as_mut())
1045 .await?
1046 .map(|sequence_number| sequence_number as u64))
1047}
1048
1049fn event_record_from_row(row: PgRow) -> EventRecord {
1050 EventRecord {
1051 sequence_number: row.get::<i64, _>("sequence_number") as u64,
1052 occurred_at: row.get("occurred_at"),
1053 event_type: row.get("event_type"),
1054 payload: row.get("payload"),
1055 }
1056}
1057
1058fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
1059 EventStoreError::BackendFailure {
1060 message: error.to_string(),
1061 }
1062}
1063
/// Surfaces a subscription-registry error message as a backend failure.
fn subscription_registry_backend_failure(message: String) -> EventStoreError {
    EventStoreError::BackendFailure { message }
}
1067
1068async fn load_or_create_durable_stream_cursor(
1069 transaction: &mut Transaction<'_, Postgres>,
1070 durable_stream_id: &str,
1071 event_query_json: &str,
1072) -> Result<u64, EventStoreError> {
1073 let existing_cursor = sqlx::query(
1074 "SELECT event_query, last_processed_sequence_number
1075 FROM durable_stream_cursors
1076 WHERE durable_stream_id = $1",
1077 )
1078 .bind(durable_stream_id)
1079 .fetch_optional(transaction.as_mut())
1080 .await
1081 .map_err(PostgresStore::backend_failure)?;
1082
1083 match existing_cursor {
1084 Some(row) => {
1085 let stored_event_query = row.get::<String, _>("event_query");
1086 if stored_event_query != event_query_json {
1087 return Err(EventStoreError::BackendFailure {
1088 message: format!(
1089 "durable stream {durable_stream_id} was resumed with a different query"
1090 ),
1091 });
1092 }
1093
1094 Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
1095 }
1096 None => {
1097 sqlx::query(
1098 "INSERT INTO durable_stream_cursors (
1099 durable_stream_id,
1100 event_query,
1101 last_processed_sequence_number
1102 ) VALUES ($1, $2, 0)",
1103 )
1104 .bind(durable_stream_id)
1105 .bind(event_query_json)
1106 .execute(transaction.as_mut())
1107 .await
1108 .map_err(PostgresStore::backend_failure)?;
1109
1110 Ok(0)
1111 }
1112 }
1113}
1114
1115async fn current_max_sequence_number_in_transaction(
1116 transaction: &mut Transaction<'_, Postgres>,
1117) -> Result<u64, EventStoreError> {
1118 Ok(
1119 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
1120 .fetch_one(transaction.as_mut())
1121 .await
1122 .map_err(PostgresStore::backend_failure)?
1123 .unwrap_or(0) as u64,
1124 )
1125}
1126
1127async fn ensure_replay_history_is_available(
1128 connection: &mut sqlx::PgConnection,
1129 current_max_sequence_number: u64,
1130) -> Result<(), EventStoreError> {
1131 if current_max_sequence_number == 0 {
1132 return Ok(());
1133 }
1134
1135 let batch_rows = sqlx::query(
1136 "SELECT first_sequence_number, last_sequence_number
1137 FROM append_batches
1138 ORDER BY first_sequence_number ASC",
1139 )
1140 .fetch_all(&mut *connection)
1141 .await
1142 .map_err(PostgresStore::backend_failure)?;
1143
1144 if batch_rows.is_empty() {
1145 return Err(EventStoreError::BackendFailure {
1146 message: "durable replay requires append_batches history for all persisted events"
1147 .to_owned(),
1148 });
1149 }
1150
1151 let mut expected_first_sequence_number = 1_u64;
1152
1153 for batch_row in batch_rows {
1154 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1155 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1156
1157 if first_sequence_number != expected_first_sequence_number
1158 || last_sequence_number < first_sequence_number
1159 {
1160 return Err(EventStoreError::BackendFailure {
1161 message:
1162 "durable replay requires contiguous append_batches history for all persisted events"
1163 .to_owned(),
1164 });
1165 }
1166
1167 expected_first_sequence_number = last_sequence_number + 1;
1168 }
1169
1170 if expected_first_sequence_number - 1 != current_max_sequence_number {
1171 return Err(EventStoreError::BackendFailure {
1172 message: "durable replay requires append_batches history for all persisted events"
1173 .to_owned(),
1174 });
1175 }
1176
1177 Ok(())
1178}
1179
1180async fn ensure_replay_history_is_available_for_pool(
1181 pool: &PgPool,
1182 current_max_sequence_number: u64,
1183) -> Result<(), EventStoreError> {
1184 let mut connection = pool
1185 .acquire()
1186 .await
1187 .map_err(PostgresStore::backend_failure)?;
1188 ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
1189}
1190
1191async fn update_durable_stream_cursor(
1192 pool: &PgPool,
1193 durable_stream_id: &str,
1194 last_processed_sequence_number: u64,
1195) -> Result<(), EventStoreError> {
1196 sqlx::query(
1197 "UPDATE durable_stream_cursors
1198 SET last_processed_sequence_number = $2
1199 WHERE durable_stream_id = $1",
1200 )
1201 .bind(durable_stream_id)
1202 .bind(last_processed_sequence_number as i64)
1203 .execute(pool)
1204 .await
1205 .map_err(PostgresStore::backend_failure)?;
1206
1207 Ok(())
1208}
1209
1210async fn load_replay_batches(
1211 pool: &PgPool,
1212 event_query: &EventQuery,
1213 last_processed_sequence_number: u64,
1214 replay_until_sequence_number: u64,
1215) -> Result<Vec<ReplayBatch>, EventStoreError> {
1216 if replay_until_sequence_number <= last_processed_sequence_number {
1217 return Ok(Vec::new());
1218 }
1219
1220 let batch_rows = sqlx::query(
1221 "SELECT first_sequence_number, last_sequence_number
1222 FROM append_batches
1223 WHERE last_sequence_number > $1
1224 AND first_sequence_number <= $2
1225 ORDER BY first_sequence_number ASC",
1226 )
1227 .bind(last_processed_sequence_number as i64)
1228 .bind(replay_until_sequence_number as i64)
1229 .fetch_all(pool)
1230 .await
1231 .map_err(PostgresStore::backend_failure)?;
1232
1233 let mut replay_batches = Vec::new();
1234
1235 for batch_row in batch_rows {
1236 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1237 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1238 let event_rows = sqlx::query(
1239 "SELECT sequence_number, event_type, payload
1240 FROM events
1241 WHERE sequence_number >= $1 AND sequence_number <= $2
1242 ORDER BY sequence_number ASC",
1243 )
1244 .bind(first_sequence_number as i64)
1245 .bind(last_sequence_number as i64)
1246 .fetch_all(pool)
1247 .await
1248 .map_err(PostgresStore::backend_failure)?;
1249
1250 let delivered_batch = event_rows
1251 .into_iter()
1252 .map(event_record_from_row)
1253 .filter(|event_record| matches_query(event_query, event_record))
1254 .collect::<Vec<_>>();
1255
1256 replay_batches.push(ReplayBatch {
1257 last_processed_sequence_number: last_sequence_number,
1258 delivered_batch,
1259 });
1260 }
1261
1262 Ok(replay_batches)
1263}
1264
1265fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1266 EventQuery {
1267 filters: event_query.filters.clone(),
1268 min_sequence_number: None,
1269 }
1270}
1271
1272fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1273 let filters = event_query.filters.as_ref().map(|filters| {
1274 filters
1275 .iter()
1276 .map(|filter| {
1277 serde_json::json!({
1278 "event_types": filter.event_types,
1279 "payload_predicates": filter.payload_predicates,
1280 })
1281 })
1282 .collect::<Vec<_>>()
1283 });
1284
1285 serde_json::to_string(&serde_json::json!({
1286 "filters": filters,
1287 "min_sequence_number": event_query.min_sequence_number,
1288 }))
1289 .map_err(json_backend_failure)
1290}