1use std::future::Future;
2use std::io;
3use std::sync::{
4 Arc, Mutex,
5 mpsc::{self, Receiver, Sender},
6};
7use std::thread::{self, JoinHandle};
8
9use factstr::{
10 AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
11 HandleStream, NewEvent, QueryResult,
12};
13use sqlx::{
14 PgPool, Postgres, QueryBuilder, Row, Transaction,
15 postgres::{PgPoolOptions, PgRow},
16};
17use time::OffsetDateTime;
18use tokio::runtime::Builder;
19use url::Url;
20
21use crate::query_match::matches_query;
22use crate::query_sql::push_query_conditions;
23use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
24
/// Result of a successfully committed append: the `AppendResult` handed back
/// to the caller plus the concrete records written, which the worker thread
/// re-uses to compute pending deliveries for subscribers.
#[derive(Clone, Debug)]
struct CommittedAppend {
    // Outcome reported to the appending caller.
    append_result: AppendResult,
    // Records persisted by this append; fed to `pending_deliveries(..)`.
    event_records: Vec<EventRecord>,
}
30
/// One chunk of historical events replayed to a durable subscriber, together
/// with the cursor position to persist once the chunk is delivered.
#[derive(Clone, Debug)]
struct ReplayBatch {
    // Sequence number the durable cursor should advance to after delivery.
    last_processed_sequence_number: u64,
    // Events in this chunk, in sequence order.
    delivered_batch: Vec<EventRecord>,
}
36
/// State captured when a durable subscription is registered: identifies the
/// registry entry and delimits the historical range that must be replayed
/// before live delivery takes over.
#[derive(Clone, Debug)]
struct DurableReplayState {
    // Registry id of the freshly created subscription.
    subscription_id: u64,
    // Cursor loaded (or created) from `durable_stream_cursors`.
    last_processed_sequence_number: u64,
    // Max sequence number at subscribe time; replay stops here.
    replay_until_sequence_number: u64,
}
43
/// Requests sent from the public API to the single worker thread. Each data
/// variant carries a reply channel through which the worker returns the
/// outcome of the operation.
enum WorkerCommand {
    /// Run a read-only event query.
    Query {
        event_query: EventQuery,
        reply: Sender<Result<QueryResult, EventStoreError>>,
    },
    /// Append events unconditionally.
    Append {
        new_events: Vec<NewEvent>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Append events only if the context query's version matches.
    AppendIf {
        new_events: Vec<NewEvent>,
        context_query: EventQuery,
        expected_context_version: Option<u64>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Stop the worker loop (sent on `Drop`).
    Shutdown,
}
61
/// Messages for the delivery dispatcher thread: either a batch of pending
/// deliveries to hand to subscriber callbacks, or a request to stop.
enum DeliveryCommand {
    Deliver(Vec<PendingDelivery>),
    Shutdown,
}
66
/// Options for `PostgresStore::bootstrap`: a server-level connection URL plus
/// the database to create (if missing) and connect to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PostgresBootstrapOptions {
    // Server URL without (or ignoring) a database path component.
    pub server_url: String,
    // Database name; must match [A-Za-z_][A-Za-z0-9_]* (validated at bootstrap).
    pub database_name: String,
}
72
/// Postgres-backed `EventStore`. Owns two background threads: a worker that
/// serializes queries/appends over a single connection, and a delivery
/// dispatcher that invokes subscriber callbacks after commits.
pub struct PostgresStore {
    // Kept so ad-hoc operations (`run_async`) can open fresh pools.
    connection_string: String,
    // Shared with both background threads.
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    // Mutex-wrapped so `&self` methods can send commands concurrently.
    worker_sender: Mutex<Sender<WorkerCommand>>,
    // Taken and joined in `Drop`.
    worker_thread: Mutex<Option<JoinHandle<()>>>,
    delivery_sender: Sender<DeliveryCommand>,
    // Taken and joined in `Drop`.
    delivery_thread: Mutex<Option<JoinHandle<()>>>,
}
81
82impl PostgresStore {
    /// Connects to an existing database, initializes the schema, and spins up
    /// the delivery and worker threads.
    ///
    /// Blocks until the worker signals readiness. If worker startup fails,
    /// both threads are shut down and joined before the error is returned.
    pub fn connect(connection_string: &str) -> Result<Self, sqlx::Error> {
        let connection_string = connection_string.to_owned();
        // Create tables/indexes synchronously before any thread starts.
        bootstrap_connection(&connection_string)?;

        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
        // Delivery thread: fans out committed events to subscriber callbacks.
        let (delivery_sender, delivery_receiver) = mpsc::channel();
        let delivery_thread = thread::Builder::new()
            .name("factstr-postgres-delivery".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                move || {
                    run_delivery_thread(connection_string, subscription_registry, delivery_receiver)
                }
            })
            .map_err(sqlx_io_error)?;

        // Worker thread: owns the single-connection pool for queries/appends.
        let (worker_sender, worker_receiver) = mpsc::channel();
        // Rendezvous channel so worker startup failures surface here.
        let (ready_sender, ready_receiver) = mpsc::sync_channel(1);
        let worker_thread = thread::Builder::new()
            .name("factstr-postgres-worker".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                let delivery_sender = delivery_sender.clone();
                move || {
                    run_worker_thread(
                        connection_string,
                        worker_receiver,
                        ready_sender,
                        subscription_registry,
                        delivery_sender,
                    )
                }
            })
            .map_err(sqlx_io_error)?;

        match ready_receiver.recv() {
            Ok(Ok(())) => {}
            // Worker reported a startup error: tear down both threads.
            Ok(Err(error)) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(error);
            }
            // Worker died (sender dropped) before signalling readiness.
            Err(error) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(sqlx_io_error(io::Error::other(format!(
                    "postgres worker startup channel failed: {error}"
                ))));
            }
        }

        Ok(Self {
            connection_string,
            subscription_registry,
            worker_sender: Mutex::new(worker_sender),
            worker_thread: Mutex::new(Some(worker_thread)),
            delivery_sender,
            delivery_thread: Mutex::new(Some(delivery_thread)),
        })
    }
147
148 pub fn bootstrap(options: PostgresBootstrapOptions) -> Result<Self, EventStoreError> {
149 bootstrap_database(&options)?;
150 let database_url = database_url_with_name(&options.server_url, &options.database_name)?;
151 Self::connect(&database_url).map_err(Self::backend_failure)
152 }
153
154 fn backend_failure(error: sqlx::Error) -> EventStoreError {
155 EventStoreError::BackendFailure {
156 message: error.to_string(),
157 }
158 }
159
160 fn worker_failure(message: impl Into<String>) -> EventStoreError {
161 EventStoreError::BackendFailure {
162 message: message.into(),
163 }
164 }
165
166 fn send_command(&self, worker_command: WorkerCommand) -> Result<(), EventStoreError> {
167 let worker_sender = self
168 .worker_sender
169 .lock()
170 .map_err(|_| Self::worker_failure("postgres worker sender lock poisoned"))?;
171
172 worker_sender
173 .send(worker_command)
174 .map_err(|error| Self::worker_failure(format!("postgres worker stopped: {error}")))
175 }
176
177 fn run_query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
178 let (reply_sender, reply_receiver) = mpsc::channel();
179 self.send_command(WorkerCommand::Query {
180 event_query: event_query.clone(),
181 reply: reply_sender,
182 })?;
183
184 reply_receiver.recv().map_err(|error| {
185 Self::worker_failure(format!("postgres worker query reply failed: {error}"))
186 })?
187 }
188
189 fn run_append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
190 let (reply_sender, reply_receiver) = mpsc::channel();
191 self.send_command(WorkerCommand::Append {
192 new_events,
193 reply: reply_sender,
194 })?;
195
196 reply_receiver.recv().map_err(|error| {
197 Self::worker_failure(format!("postgres worker append reply failed: {error}"))
198 })?
199 }
200
201 fn run_append_if(
202 &self,
203 new_events: Vec<NewEvent>,
204 context_query: &EventQuery,
205 expected_context_version: Option<u64>,
206 ) -> Result<AppendResult, EventStoreError> {
207 let (reply_sender, reply_receiver) = mpsc::channel();
208 self.send_command(WorkerCommand::AppendIf {
209 new_events,
210 context_query: context_query.clone(),
211 expected_context_version,
212 reply: reply_sender,
213 })?;
214
215 reply_receiver.recv().map_err(|error| {
216 Self::worker_failure(format!(
217 "postgres worker conditional append reply failed: {error}"
218 ))
219 })?
220 }
221
    /// Runs an async closure on a fresh current-thread Tokio runtime in a
    /// dedicated short-lived thread, handing it a one-connection pool built
    /// from this store's connection string. Blocks until the work finishes.
    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
    where
        T: Send + 'static,
        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
        F: FnOnce(PgPool) -> Fut + Send + 'static,
    {
        let connection_string = self.connection_string.clone();
        // Bounded rendezvous: the spawned thread sends exactly one result.
        let (result_sender, result_receiver) = mpsc::sync_channel(1);

        let worker_thread = thread::Builder::new()
            .name(format!("factstr-postgres-{operation}"))
            .spawn(move || {
                let runtime = Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(sqlx_io_error)
                    .map_err(Self::backend_failure);

                let result = match runtime {
                    Ok(runtime) => runtime.block_on(async {
                        let pool = PgPoolOptions::new()
                            .max_connections(1)
                            .connect(&connection_string)
                            .await
                            .map_err(Self::backend_failure)?;
                        work(pool).await
                    }),
                    Err(error) => Err(error),
                };

                // Receiver may already be gone; a failed send is harmless.
                let _ = result_sender.send(result);
            })
            .map_err(sqlx_io_error)
            .map_err(Self::backend_failure)?;

        // A recv error means the thread died (panicked) before sending.
        let result = result_receiver
            .recv()
            .map_err(|error| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread did not return a result: {error}"),
            })?;

        worker_thread
            .join()
            .map_err(|_| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread panicked before completion"),
            })?;

        result
    }
271
    /// Queues pending deliveries on this store's delivery dispatcher thread.
    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
        Self::enqueue_delivery_with_sender(&self.delivery_sender, pending_deliveries);
    }
275
276 fn enqueue_delivery_with_sender(
277 delivery_sender: &Sender<DeliveryCommand>,
278 pending_deliveries: Vec<PendingDelivery>,
279 ) {
280 if pending_deliveries.is_empty() {
281 return;
282 }
283
284 if let Err(error) = delivery_sender.send(DeliveryCommand::Deliver(pending_deliveries)) {
285 eprintln!(
286 "factstr-postgres delivery dispatcher stopped after commit: {}",
287 error
288 );
289 }
290 }
291
292 fn register_all_durable_stream(
293 &self,
294 durable_stream_id: impl Into<String>,
295 handle: HandleStream,
296 ) -> Result<EventStream, EventStoreError> {
297 self.stream_durable(durable_stream_id.into(), EventQuery::all(), handle)
298 }
299
300 fn register_durable_stream(
301 &self,
302 durable_stream_id: impl Into<String>,
303 event_query: &EventQuery,
304 handle: HandleStream,
305 ) -> Result<EventStream, EventStoreError> {
306 self.stream_durable(durable_stream_id.into(), event_query.clone(), handle)
307 }
308
    /// Creates (or resumes) a durable subscription.
    ///
    /// Steps, in order:
    /// 1. In one transaction: load/create the durable cursor and snapshot the
    ///    current max sequence number (`replay_until`), then register the
    ///    subscription in the registry before committing so no committed
    ///    event can slip between snapshot and registration.
    /// 2. Replay historical batches between the cursor and `replay_until`,
    ///    advancing the stored cursor after each delivered batch.
    /// 3. Flush deliveries the registry buffered during replay and switch
    ///    the subscription to live mode.
    ///
    /// Any failure after registration unsubscribes before returning.
    fn stream_durable(
        &self,
        durable_stream_id: String,
        event_query: EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        let normalized_event_query = normalized_durable_event_query(&event_query);
        let event_query_json = serialize_event_query(&normalized_event_query)?;
        let subscription_registry = Arc::clone(&self.subscription_registry);
        // Clones moved into the async registration closure below.
        let durable_stream_id_for_registry = durable_stream_id.clone();
        let normalized_event_query_for_registry = normalized_event_query.clone();
        let handle_for_registry = handle.clone();
        let replay_stream_id = durable_stream_id.clone();

        let replay_state = self.run_async("stream_durable", move |pool| async move {
            let mut transaction = pool.begin().await.map_err(Self::backend_failure)?;

            let last_processed_sequence_number = load_or_create_durable_stream_cursor(
                &mut transaction,
                &durable_stream_id_for_registry,
                &event_query_json,
            )
            .await?;
            let replay_until_sequence_number =
                current_max_sequence_number_in_transaction(&mut transaction).await?;

            // Register before commit; a filter-less query uses the cheaper
            // subscribe-all path. Poisoned locks are recovered, not fatal.
            let subscription_id = match subscription_registry.lock() {
                Ok(mut subscription_registry) => {
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
                Err(poisoned) => {
                    let mut subscription_registry = poisoned.into_inner();
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
            };

            // Commit failed: roll back the in-memory registration too.
            if let Err(error) = transaction.commit().await {
                match subscription_registry.lock() {
                    Ok(mut subscription_registry) => {
                        subscription_registry.unsubscribe(subscription_id)
                    }
                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
                }
                return Err(Self::backend_failure(error));
            }

            Ok(DurableReplayState {
                subscription_id,
                last_processed_sequence_number,
                replay_until_sequence_number,
            })
        })?;

        let subscription = self.build_subscription_handle(replay_state.subscription_id);

        let replay_batches = match self.run_async("durable_replay", {
            let event_query = normalized_event_query.clone();
            move |pool| async move {
                ensure_replay_history_is_available_for_pool(
                    &pool,
                    replay_state.replay_until_sequence_number,
                )
                .await?;
                load_replay_batches(
                    &pool,
                    &event_query,
                    replay_state.last_processed_sequence_number,
                    replay_state.replay_until_sequence_number,
                )
                .await
            }
        }) {
            Ok(replay_batches) => replay_batches,
            Err(error) => {
                self.cleanup_durable_subscription(replay_state.subscription_id);
                return Err(error);
            }
        };

        // Deliver each historical batch; any failure tears the subscription down.
        for replay_batch in replay_batches {
            let pending_delivery = PendingDelivery {
                subscription_id: replay_state.subscription_id,
                durable_stream_id: Some(replay_stream_id.clone()),
                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
                delivered_batch: replay_batch.delivered_batch,
                handle: handle.clone(),
            };

            match self.process_durable_delivery(pending_delivery) {
                Ok(true) => {}
                Ok(false) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(EventStoreError::BackendFailure {
                        message: format!(
                            "durable replay for stream {} did not complete successfully",
                            replay_stream_id
                        ),
                    });
                }
                Err(error) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(error);
                }
            }
        }

        // Flush buffered live events and mark the subscription live.
        self.finish_durable_replay(replay_state.subscription_id);
        Ok(subscription)
    }
452
453 fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
454 let subscription_registry = Arc::clone(&self.subscription_registry);
455
456 EventStream::new(
457 subscription_id,
458 Arc::new(move |subscription_id| match subscription_registry.lock() {
459 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
460 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
461 }),
462 )
463 }
464
465 fn process_durable_delivery(
466 &self,
467 pending_delivery: PendingDelivery,
468 ) -> Result<bool, EventStoreError> {
469 match pending_delivery.deliver() {
470 DeliveryOutcome::Succeeded {
471 durable_stream_id,
472 last_processed_sequence_number,
473 ..
474 } => {
475 if let Some(durable_stream_id) = durable_stream_id {
476 self.run_async("advance_durable_cursor", move |pool| async move {
477 update_durable_stream_cursor(
478 &pool,
479 &durable_stream_id,
480 last_processed_sequence_number,
481 )
482 .await
483 })?;
484 }
485
486 Ok(true)
487 }
488 DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
489 }
490 }
491
492 fn finish_durable_replay(&self, subscription_id: u64) {
493 match self.subscription_registry.lock() {
494 Ok(mut subscription_registry) => {
495 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
496 self.enqueue_delivery(buffered_deliveries);
497 }
498 Err(poisoned) => {
499 let mut subscription_registry = poisoned.into_inner();
500 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
501 self.enqueue_delivery(buffered_deliveries);
502 }
503 }
504 }
505
506 fn cleanup_durable_subscription(&self, subscription_id: u64) {
507 match self.subscription_registry.lock() {
508 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
509 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
510 }
511 }
512}
513
514impl Drop for PostgresStore {
515 fn drop(&mut self) {
516 if let Ok(worker_sender) = self.worker_sender.lock() {
517 let _ = worker_sender.send(WorkerCommand::Shutdown);
518 }
519
520 if let Ok(mut worker_thread) = self.worker_thread.lock() {
521 if let Some(worker_thread) = worker_thread.take() {
522 let _ = worker_thread.join();
523 }
524 }
525
526 let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
527
528 if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
529 if let Some(delivery_thread) = delivery_thread.take() {
530 let _ = delivery_thread.join();
531 }
532 }
533 }
534}
535
536impl EventStore for PostgresStore {
537 fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
538 self.run_query(event_query)
539 }
540
541 fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
542 if new_events.is_empty() {
543 return Err(EventStoreError::EmptyAppend);
544 }
545
546 self.run_append(new_events)
547 }
548
549 fn append_if(
550 &self,
551 new_events: Vec<NewEvent>,
552 context_query: &EventQuery,
553 expected_context_version: Option<u64>,
554 ) -> Result<AppendResult, EventStoreError> {
555 if new_events.is_empty() {
556 return Err(EventStoreError::EmptyAppend);
557 }
558
559 self.run_append_if(new_events, context_query, expected_context_version)
560 }
561
562 fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
563 let subscription_registry = Arc::clone(&self.subscription_registry);
564 let id = match subscription_registry.lock() {
565 Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
566 Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
567 };
568
569 Ok(EventStream::new(
570 id,
571 Arc::new(move |subscription_id| match subscription_registry.lock() {
572 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
573 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
574 }),
575 ))
576 }
577
578 fn stream_to(
579 &self,
580 event_query: &EventQuery,
581 handle: HandleStream,
582 ) -> Result<EventStream, EventStoreError> {
583 let subscription_registry = Arc::clone(&self.subscription_registry);
584 let id = match subscription_registry.lock() {
585 Ok(mut subscription_registry) => {
586 subscription_registry.subscribe_to(Some(event_query.clone()), handle)
587 }
588 Err(poisoned) => poisoned
589 .into_inner()
590 .subscribe_to(Some(event_query.clone()), handle),
591 };
592
593 Ok(EventStream::new(
594 id,
595 Arc::new(move |subscription_id| match subscription_registry.lock() {
596 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
597 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
598 }),
599 ))
600 }
601
602 fn stream_all_durable(
603 &self,
604 durable_stream: &DurableStream,
605 handle: HandleStream,
606 ) -> Result<EventStream, EventStoreError> {
607 self.register_all_durable_stream(durable_stream.name(), handle)
608 }
609
610 fn stream_to_durable(
611 &self,
612 durable_stream: &DurableStream,
613 event_query: &EventQuery,
614 handle: HandleStream,
615 ) -> Result<EventStream, EventStoreError> {
616 self.register_durable_stream(durable_stream.name(), event_query, handle)
617 }
618}
619
/// Body of the dedicated worker thread: builds a current-thread Tokio runtime
/// and a one-connection pool, reports readiness (or the startup error) over
/// `ready_sender`, then serves commands until `Shutdown` arrives or every
/// command sender is dropped.
fn run_worker_thread(
    connection_string: String,
    worker_receiver: Receiver<WorkerCommand>,
    ready_sender: mpsc::SyncSender<Result<(), sqlx::Error>>,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_sender: Sender<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            let _ = ready_sender.send(Err(sqlx_io_error(io::Error::other(format!(
                "tokio runtime should build: {error}"
            )))));
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            let _ = ready_sender.send(Err(error));
            return;
        }
    };

    // If `connect` already gave up waiting, there is no one to serve.
    if ready_sender.send(Ok(())).is_err() {
        return;
    }

    while let Ok(worker_command) = worker_receiver.recv() {
        match worker_command {
            WorkerCommand::Query { event_query, reply } => {
                let result = runtime
                    .block_on(query_with_pool(&pool, &event_query))
                    .map_err(PostgresStore::backend_failure);
                // Caller may have stopped waiting; a failed reply is ignored.
                let _ = reply.send(result);
            }
            WorkerCommand::Append { new_events, reply } => {
                let result = runtime
                    .block_on(append_with_pool(&pool, new_events))
                    .map_err(PostgresStore::backend_failure);
                // Fan deliveries out only after a successful commit, and
                // before replying so delivery order follows commit order.
                if let Ok(committed_append) = &result {
                    let pending_deliveries = match subscription_registry.lock() {
                        Ok(mut subscription_registry) => subscription_registry
                            .pending_deliveries(&committed_append.event_records),
                        Err(poisoned) => poisoned
                            .into_inner()
                            .pending_deliveries(&committed_append.event_records),
                    };
                    PostgresStore::enqueue_delivery_with_sender(
                        &delivery_sender,
                        pending_deliveries,
                    );
                }
                let result = result.map(|committed_append| committed_append.append_result);
                let _ = reply.send(result);
            }
            WorkerCommand::AppendIf {
                new_events,
                context_query,
                expected_context_version,
                reply,
            } => {
                // `append_if_with_pool` returns Result<Result<..>>; `and_then`
                // flattens the infrastructure and conflict layers into one.
                let result = runtime
                    .block_on(append_if_with_pool(
                        &pool,
                        new_events,
                        &context_query,
                        expected_context_version,
                    ))
                    .map_err(PostgresStore::backend_failure)
                    .and_then(|result| result);
                if let Ok(committed_append) = &result {
                    let pending_deliveries = match subscription_registry.lock() {
                        Ok(mut subscription_registry) => subscription_registry
                            .pending_deliveries(&committed_append.event_records),
                        Err(poisoned) => poisoned
                            .into_inner()
                            .pending_deliveries(&committed_append.event_records),
                    };
                    PostgresStore::enqueue_delivery_with_sender(
                        &delivery_sender,
                        pending_deliveries,
                    );
                }
                let result = result.map(|committed_append| committed_append.append_result);
                let _ = reply.send(result);
            }
            WorkerCommand::Shutdown => break,
        }
    }
}
717
/// Body of the delivery dispatcher thread: builds its own runtime and
/// one-connection pool (used to persist durable cursors), then processes
/// delivery batches until `Shutdown` or until all senders are dropped.
///
/// Durable subscriptions are unsubscribed when their handler fails/panics,
/// or when their cursor cannot be persisted after a successful delivery —
/// leaving the stored cursor behind so a later resume replays the batch.
fn run_delivery_thread(
    connection_string: String,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_receiver: Receiver<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery runtime could not start: {}",
                error
            );
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery pool could not connect: {}",
                error
            );
            return;
        }
    };

    while let Ok(delivery_command) = delivery_receiver.recv() {
        match delivery_command {
            DeliveryCommand::Deliver(pending_deliveries) => {
                for pending_delivery in pending_deliveries {
                    match pending_delivery.deliver() {
                        DeliveryOutcome::Succeeded {
                            subscription_id,
                            durable_stream_id,
                            last_processed_sequence_number,
                        } => {
                            // Only durable subscriptions have a cursor to advance.
                            if let Some(durable_stream_id) = durable_stream_id {
                                if let Err(error) = runtime.block_on(update_durable_stream_cursor(
                                    &pool,
                                    &durable_stream_id,
                                    last_processed_sequence_number,
                                )) {
                                    eprintln!(
                                        "factstr-postgres durable cursor update failed after delivery for stream {}: {}",
                                        durable_stream_id, error
                                    );
                                    // Cursor persist failed: drop the live
                                    // subscription rather than risk losing events.
                                    match subscription_registry.lock() {
                                        Ok(mut subscription_registry) => {
                                            subscription_registry.unsubscribe(subscription_id)
                                        }
                                        Err(poisoned) => {
                                            poisoned.into_inner().unsubscribe(subscription_id)
                                        }
                                    }
                                }
                            }
                        }
                        DeliveryOutcome::Failed {
                            subscription_id,
                            durable_stream_id,
                        }
                        | DeliveryOutcome::Panicked {
                            subscription_id,
                            durable_stream_id,
                        } => {
                            // Failed durable handlers are unsubscribed; ephemeral
                            // subscriptions are left untouched here.
                            if durable_stream_id.is_some() {
                                match subscription_registry.lock() {
                                    Ok(mut subscription_registry) => {
                                        subscription_registry.unsubscribe(subscription_id)
                                    }
                                    Err(poisoned) => {
                                        poisoned.into_inner().unsubscribe(subscription_id)
                                    }
                                }
                            }
                        }
                    }
                }
            }
            DeliveryCommand::Shutdown => break,
        }
    }
}
807
/// Wraps a std I/O error in the `sqlx` error type used by the
/// connection/bootstrap plumbing.
fn sqlx_io_error(error: io::Error) -> sqlx::Error {
    sqlx::Error::Io(error)
}
811
812fn backend_failure_message(message: impl Into<String>) -> EventStoreError {
813 EventStoreError::BackendFailure {
814 message: message.into(),
815 }
816}
817
/// Synchronously ensures the target database exists by running
/// `ensure_database_exists` on a fresh runtime in a short-lived thread
/// (so callers need no async context of their own).
fn bootstrap_database(options: &PostgresBootstrapOptions) -> Result<(), EventStoreError> {
    let options = options.clone();
    // Bounded rendezvous: the spawned thread sends exactly one result.
    let (result_sender, result_receiver) = mpsc::sync_channel(1);

    let bootstrap_thread = thread::Builder::new()
        .name("factstr-postgres-database-bootstrap".to_owned())
        .spawn(move || {
            let runtime = Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(sqlx_io_error)
                .map_err(PostgresStore::backend_failure);

            let result = match runtime {
                Ok(runtime) => runtime.block_on(async move {
                    ensure_database_exists(&options.server_url, &options.database_name).await
                }),
                Err(error) => Err(error),
            };

            // Receiver may already be gone; a failed send is harmless.
            let _ = result_sender.send(result);
        })
        .map_err(sqlx_io_error)
        .map_err(PostgresStore::backend_failure)?;

    // A recv error means the thread died (panicked) before sending.
    let result = result_receiver.recv().map_err(|error| {
        backend_failure_message(format!(
            "postgres bootstrap database thread did not return a result: {error}"
        ))
    })?;

    bootstrap_thread.join().map_err(|_| {
        backend_failure_message("postgres bootstrap database thread panicked before completion")
    })?;

    result
}
855
/// Creates `database_name` on the server at `server_url` if it does not
/// already exist. The name is validated against a strict identifier pattern
/// and additionally quoted, because `CREATE DATABASE` cannot take a bind
/// parameter and must be assembled as SQL text.
async fn ensure_database_exists(
    server_url: &str,
    database_name: &str,
) -> Result<(), EventStoreError> {
    validate_bootstrap_database_name(database_name)?;
    // Fail fast if the combination would not form a valid connection URL.
    let _ = database_url_with_name(server_url, database_name)?;

    let quoted_database_name = quote_postgres_identifier(database_name)?;
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .connect(server_url)
        .await
        .map_err(|error| {
            backend_failure_message(format!(
                "postgres bootstrap could not connect to server_url {server_url:?}: {error}"
            ))
        })?;

    // Existence check uses a bind parameter, so no quoting is needed here.
    let database_exists = sqlx::query_scalar::<_, bool>(
        "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)",
    )
    .bind(database_name)
    .fetch_one(&pool)
    .await
    .map_err(|error| {
        backend_failure_message(format!(
            "postgres bootstrap could not inspect database {database_name:?}: {error}"
        ))
    })?;

    if database_exists {
        return Ok(());
    }

    let create_database_sql = format!("CREATE DATABASE {quoted_database_name}");
    match sqlx::query(&create_database_sql).execute(&pool).await {
        Ok(_) => Ok(()),
        // SQLSTATE 42P04 (duplicate_database): someone created it between
        // our existence check and the CREATE — treat as success.
        Err(sqlx::Error::Database(database_error))
            if database_error.code().as_deref() == Some("42P04") =>
        {
            Ok(())
        }
        Err(error) => Err(backend_failure_message(format!(
            "postgres bootstrap could not create database {database_name:?}: {error}"
        ))),
    }
}
903
904fn quote_postgres_identifier(identifier: &str) -> Result<String, EventStoreError> {
905 if identifier.is_empty() {
906 return Err(backend_failure_message(
907 "postgres bootstrap database_name must not be empty",
908 ));
909 }
910
911 if identifier.contains('\0') {
912 return Err(backend_failure_message(
913 "postgres bootstrap database_name must not contain null bytes",
914 ));
915 }
916
917 Ok(format!("\"{}\"", identifier.replace('"', "\"\"")))
918}
919
920fn validate_bootstrap_database_name(database_name: &str) -> Result<(), EventStoreError> {
921 if database_name.is_empty() {
922 return Err(backend_failure_message(
923 "postgres bootstrap database_name must not be empty",
924 ));
925 }
926
927 if database_name.contains('\0') {
928 return Err(backend_failure_message(
929 "postgres bootstrap database_name must not contain null bytes",
930 ));
931 }
932
933 let mut characters = database_name.chars();
934 let Some(first_character) = characters.next() else {
935 return Err(backend_failure_message(
936 "postgres bootstrap database_name must not be empty",
937 ));
938 };
939
940 if !(first_character.is_ascii_alphabetic() || first_character == '_') {
941 return Err(backend_failure_message(
942 "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
943 ));
944 }
945
946 if !characters.all(|character| character.is_ascii_alphanumeric() || character == '_') {
947 return Err(backend_failure_message(
948 "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
949 ));
950 }
951
952 Ok(())
953}
954
955fn database_url_with_name(
956 server_url: &str,
957 database_name: &str,
958) -> Result<String, EventStoreError> {
959 let mut url = Url::parse(server_url).map_err(|error| {
960 backend_failure_message(format!(
961 "invalid postgres server_url {server_url:?}: {error}"
962 ))
963 })?;
964 url.set_path(&format!("/{database_name}"));
965 Ok(url.into())
966}
967
/// Synchronously connects to the database and creates the schema, by running
/// `initialize_schema` on a fresh runtime in a short-lived thread (so the
/// caller needs no async context).
fn bootstrap_connection(connection_string: &str) -> Result<(), sqlx::Error> {
    let connection_string = connection_string.to_owned();
    // Bounded rendezvous: the spawned thread sends exactly one result.
    let (result_sender, result_receiver) = mpsc::sync_channel(1);

    let bootstrap_thread = thread::Builder::new()
        .name("factstr-postgres-bootstrap".to_owned())
        .spawn(move || {
            let runtime = Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(sqlx_io_error);

            let result = match runtime {
                Ok(runtime) => runtime.block_on(async {
                    let pool = PgPoolOptions::new()
                        .max_connections(1)
                        .connect(&connection_string)
                        .await?;
                    initialize_schema(&pool).await?;
                    Ok::<_, sqlx::Error>(())
                }),
                Err(error) => Err(error),
            };

            // Receiver may already be gone; a failed send is harmless.
            let _ = result_sender.send(result);
        })
        .map_err(sqlx_io_error)?;

    // A recv error means the thread died (panicked) before sending.
    let result = match result_receiver.recv() {
        Ok(result) => result,
        Err(error) => Err(sqlx_io_error(io::Error::other(format!(
            "postgres bootstrap thread did not return a result: {error}"
        )))),
    };

    bootstrap_thread.join().map_err(|_| {
        sqlx_io_error(io::Error::other(
            "postgres bootstrap thread panicked before connect completed",
        ))
    })?;

    result
}
1011
1012async fn query_with_pool(
1013 pool: &PgPool,
1014 event_query: &EventQuery,
1015) -> Result<QueryResult, sqlx::Error> {
1016 let current_context_version = current_context_version(pool, event_query).await?;
1017
1018 let mut query_builder: QueryBuilder<'_, Postgres> =
1019 QueryBuilder::new("SELECT sequence_number, occurred_at, event_type, payload FROM events");
1020 push_query_conditions(&mut query_builder, event_query, true);
1021 query_builder.push(" ORDER BY sequence_number ASC");
1022
1023 let event_records = query_builder
1024 .build()
1025 .fetch_all(pool)
1026 .await?
1027 .into_iter()
1028 .map(event_record_from_row)
1029 .collect::<Result<Vec<_>, _>>()?;
1030
1031 let last_returned_sequence_number = event_records
1032 .last()
1033 .map(|event_record| event_record.sequence_number);
1034
1035 Ok(QueryResult {
1036 event_records,
1037 last_returned_sequence_number,
1038 current_context_version,
1039 })
1040}
1041
1042async fn append_with_pool(
1043 pool: &PgPool,
1044 new_events: Vec<NewEvent>,
1045) -> Result<CommittedAppend, sqlx::Error> {
1046 let mut transaction = pool.begin().await?;
1047 lock_events_tables(&mut transaction).await?;
1048 let committed_append = append_records(&mut transaction, new_events).await?;
1049 transaction.commit().await?;
1050 Ok(committed_append)
1051}
1052
1053async fn append_if_with_pool(
1054 pool: &PgPool,
1055 new_events: Vec<NewEvent>,
1056 context_query: &EventQuery,
1057 expected_context_version: Option<u64>,
1058) -> Result<Result<CommittedAppend, EventStoreError>, sqlx::Error> {
1059 let mut transaction = pool.begin().await?;
1060 lock_events_tables(&mut transaction).await?;
1061
1062 let actual_context_version =
1063 current_context_version_in_transaction(&mut transaction, context_query).await?;
1064
1065 if actual_context_version != expected_context_version {
1066 transaction.rollback().await?;
1067 return Ok(Err(EventStoreError::ConditionalAppendConflict {
1068 expected: expected_context_version,
1069 actual: actual_context_version,
1070 }));
1071 }
1072
1073 let committed_append = append_records(&mut transaction, new_events).await?;
1074 transaction.commit().await?;
1075
1076 Ok(Ok(committed_append))
1077}
1078
1079async fn initialize_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
1080 sqlx::query(
1081 "CREATE TABLE IF NOT EXISTS events (
1082 sequence_number BIGINT PRIMARY KEY,
1083 occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1084 event_type TEXT NOT NULL,
1085 payload JSONB NOT NULL
1086 )",
1087 )
1088 .execute(pool)
1089 .await?;
1090
1091 sqlx::query(
1092 "CREATE TABLE IF NOT EXISTS append_batches (
1093 first_sequence_number BIGINT PRIMARY KEY,
1094 last_sequence_number BIGINT NOT NULL
1095 )",
1096 )
1097 .execute(pool)
1098 .await?;
1099
1100 sqlx::query(
1101 "CREATE TABLE IF NOT EXISTS durable_stream_cursors (
1102 durable_stream_id TEXT PRIMARY KEY,
1103 event_query TEXT NOT NULL,
1104 last_processed_sequence_number BIGINT NOT NULL
1105 )",
1106 )
1107 .execute(pool)
1108 .await?;
1109
1110 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
1111 .execute(pool)
1112 .await?;
1113
1114 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
1115 .execute(pool)
1116 .await?;
1117
1118 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_payload_gin ON events USING gin(payload)")
1119 .execute(pool)
1120 .await?;
1121
1122 Ok(())
1123}
1124
1125async fn lock_events_tables(
1126 transaction: &mut Transaction<'_, Postgres>,
1127) -> Result<(), sqlx::Error> {
1128 sqlx::query("LOCK TABLE events, append_batches IN EXCLUSIVE MODE")
1129 .execute(transaction.as_mut())
1130 .await?;
1131
1132 Ok(())
1133}
1134
1135async fn append_records(
1136 transaction: &mut Transaction<'_, Postgres>,
1137 new_events: Vec<NewEvent>,
1138) -> Result<CommittedAppend, sqlx::Error> {
1139 let committed_count = new_events.len() as u64;
1140 let first_sequence_number =
1141 sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(sequence_number), 0) + 1 FROM events")
1142 .fetch_one(transaction.as_mut())
1143 .await? as u64;
1144 let last_sequence_number = first_sequence_number + committed_count - 1;
1145 let committed_event_records = new_events
1146 .into_iter()
1147 .enumerate()
1148 .map(|(offset, new_event)| EventRecord {
1149 sequence_number: first_sequence_number + offset as u64,
1150 occurred_at: OffsetDateTime::now_utc(),
1151 event_type: new_event.event_type,
1152 payload: new_event.payload,
1153 })
1154 .collect::<Vec<_>>();
1155
1156 for event_record in &committed_event_records {
1157 sqlx::query(
1158 "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
1159 VALUES ($1, $2, $3, $4)",
1160 )
1161 .bind(event_record.sequence_number as i64)
1162 .bind(event_record.occurred_at)
1163 .bind(&event_record.event_type)
1164 .bind(&event_record.payload)
1165 .execute(transaction.as_mut())
1166 .await?;
1167 }
1168
1169 sqlx::query(
1170 "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
1171 VALUES ($1, $2)",
1172 )
1173 .bind(first_sequence_number as i64)
1174 .bind(last_sequence_number as i64)
1175 .execute(transaction.as_mut())
1176 .await?;
1177
1178 Ok(CommittedAppend {
1179 append_result: AppendResult {
1180 first_sequence_number,
1181 last_sequence_number,
1182 committed_count,
1183 },
1184 event_records: committed_event_records,
1185 })
1186}
1187
1188async fn current_context_version(
1189 pool: &PgPool,
1190 event_query: &EventQuery,
1191) -> Result<Option<u64>, sqlx::Error> {
1192 let mut query_builder: QueryBuilder<'_, Postgres> =
1193 QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1194 push_query_conditions(&mut query_builder, event_query, false);
1195
1196 Ok(query_builder
1197 .build_query_scalar::<Option<i64>>()
1198 .fetch_one(pool)
1199 .await?
1200 .map(|sequence_number| sequence_number as u64))
1201}
1202
1203async fn current_context_version_in_transaction(
1204 transaction: &mut Transaction<'_, Postgres>,
1205 event_query: &EventQuery,
1206) -> Result<Option<u64>, sqlx::Error> {
1207 let mut query_builder: QueryBuilder<'_, Postgres> =
1208 QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1209 push_query_conditions(&mut query_builder, event_query, false);
1210
1211 Ok(query_builder
1212 .build_query_scalar::<Option<i64>>()
1213 .fetch_one(transaction.as_mut())
1214 .await?
1215 .map(|sequence_number| sequence_number as u64))
1216}
1217
1218fn event_record_from_row(row: PgRow) -> Result<EventRecord, sqlx::Error> {
1219 Ok(EventRecord {
1220 sequence_number: row.try_get::<i64, _>("sequence_number")? as u64,
1221 occurred_at: row.try_get("occurred_at")?,
1222 event_type: row.try_get("event_type")?,
1223 payload: row.try_get("payload")?,
1224 })
1225}
1226
1227fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
1228 EventStoreError::BackendFailure {
1229 message: error.to_string(),
1230 }
1231}
1232
/// Wraps a subscription-registry error message as a store backend failure.
fn subscription_registry_backend_failure(message: String) -> EventStoreError {
    EventStoreError::BackendFailure { message }
}
1236
/// Loads the replay cursor for `durable_stream_id`, creating it at sequence
/// number 0 when it does not exist yet, and returns the last processed
/// sequence number.
///
/// Fails when the stream already exists but was registered with a different
/// serialized query: the stored JSON is compared byte-for-byte against
/// `event_query_json`, so replaying a different query over the same cursor
/// is rejected rather than silently skipping or duplicating events.
///
/// NOTE(review): this is a SELECT-then-INSERT with no ON CONFLICT clause, so
/// two concurrent first-time registrations of the same stream id could race
/// to a unique-key violation — presumably the caller serializes durable
/// stream creation (e.g. via the transaction it passes in); confirm before
/// relying on concurrent registration.
async fn load_or_create_durable_stream_cursor(
    transaction: &mut Transaction<'_, Postgres>,
    durable_stream_id: &str,
    event_query_json: &str,
) -> Result<u64, EventStoreError> {
    // Look for an existing cursor row for this stream id.
    let existing_cursor = sqlx::query(
        "SELECT event_query, last_processed_sequence_number
         FROM durable_stream_cursors
         WHERE durable_stream_id = $1",
    )
    .bind(durable_stream_id)
    .fetch_optional(transaction.as_mut())
    .await
    .map_err(PostgresStore::backend_failure)?;

    match existing_cursor {
        Some(row) => {
            // Resuming: the serialized query must match exactly.
            let stored_event_query = row.get::<String, _>("event_query");
            if stored_event_query != event_query_json {
                return Err(EventStoreError::BackendFailure {
                    message: format!(
                        "durable stream {durable_stream_id} was resumed with a different query"
                    ),
                });
            }

            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
        }
        None => {
            // First registration: start the cursor at 0 (nothing processed).
            sqlx::query(
                "INSERT INTO durable_stream_cursors (
                    durable_stream_id,
                    event_query,
                    last_processed_sequence_number
                ) VALUES ($1, $2, 0)",
            )
            .bind(durable_stream_id)
            .bind(event_query_json)
            .execute(transaction.as_mut())
            .await
            .map_err(PostgresStore::backend_failure)?;

            Ok(0)
        }
    }
}
1283
1284async fn current_max_sequence_number_in_transaction(
1285 transaction: &mut Transaction<'_, Postgres>,
1286) -> Result<u64, EventStoreError> {
1287 Ok(
1288 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
1289 .fetch_one(transaction.as_mut())
1290 .await
1291 .map_err(PostgresStore::backend_failure)?
1292 .unwrap_or(0) as u64,
1293 )
1294}
1295
1296async fn ensure_replay_history_is_available(
1297 connection: &mut sqlx::PgConnection,
1298 current_max_sequence_number: u64,
1299) -> Result<(), EventStoreError> {
1300 if current_max_sequence_number == 0 {
1301 return Ok(());
1302 }
1303
1304 let batch_rows = sqlx::query(
1305 "SELECT first_sequence_number, last_sequence_number
1306 FROM append_batches
1307 ORDER BY first_sequence_number ASC",
1308 )
1309 .fetch_all(&mut *connection)
1310 .await
1311 .map_err(PostgresStore::backend_failure)?;
1312
1313 if batch_rows.is_empty() {
1314 return Err(EventStoreError::BackendFailure {
1315 message: "durable replay requires append_batches history for all persisted events"
1316 .to_owned(),
1317 });
1318 }
1319
1320 let mut expected_first_sequence_number = 1_u64;
1321
1322 for batch_row in batch_rows {
1323 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1324 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1325
1326 if first_sequence_number != expected_first_sequence_number
1327 || last_sequence_number < first_sequence_number
1328 {
1329 return Err(EventStoreError::BackendFailure {
1330 message:
1331 "durable replay requires contiguous append_batches history for all persisted events"
1332 .to_owned(),
1333 });
1334 }
1335
1336 expected_first_sequence_number = last_sequence_number + 1;
1337 }
1338
1339 if expected_first_sequence_number - 1 != current_max_sequence_number {
1340 return Err(EventStoreError::BackendFailure {
1341 message: "durable replay requires append_batches history for all persisted events"
1342 .to_owned(),
1343 });
1344 }
1345
1346 Ok(())
1347}
1348
1349async fn ensure_replay_history_is_available_for_pool(
1350 pool: &PgPool,
1351 current_max_sequence_number: u64,
1352) -> Result<(), EventStoreError> {
1353 let mut connection = pool
1354 .acquire()
1355 .await
1356 .map_err(PostgresStore::backend_failure)?;
1357 ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
1358}
1359
/// Persists `last_processed_sequence_number` for an existing durable stream
/// cursor row.
///
/// NOTE(review): an UPDATE for a missing `durable_stream_id` is a silent
/// no-op here — presumably the row is guaranteed to exist because
/// `load_or_create_durable_stream_cursor` ran first; confirm callers uphold
/// that, otherwise replay progress would be dropped without an error.
async fn update_durable_stream_cursor(
    pool: &PgPool,
    durable_stream_id: &str,
    last_processed_sequence_number: u64,
) -> Result<(), EventStoreError> {
    sqlx::query(
        "UPDATE durable_stream_cursors
         SET last_processed_sequence_number = $2
         WHERE durable_stream_id = $1",
    )
    .bind(durable_stream_id)
    // Cursor positions are stored as BIGINT, matching the schema.
    .bind(last_processed_sequence_number as i64)
    .execute(pool)
    .await
    .map_err(PostgresStore::backend_failure)?;

    Ok(())
}
1378
/// Loads the events to replay in the window after
/// `last_processed_sequence_number` (exclusive) up to
/// `replay_until_sequence_number` (inclusive on batch start), grouped by the
/// original append batches so the caller can advance the cursor one batch at
/// a time.
///
/// Each `ReplayBatch` carries the batch's last sequence number even when the
/// in-memory filter drops every event in it, so cursor progress still covers
/// fully-filtered batches.
///
/// NOTE(review): this issues one `events` query per batch row (an N+1
/// pattern) — presumably acceptable at current batch counts; confirm before
/// replaying very large histories.
async fn load_replay_batches(
    pool: &PgPool,
    event_query: &EventQuery,
    last_processed_sequence_number: u64,
    replay_until_sequence_number: u64,
) -> Result<Vec<ReplayBatch>, EventStoreError> {
    // Empty window: nothing new to replay.
    if replay_until_sequence_number <= last_processed_sequence_number {
        return Ok(Vec::new());
    }

    // Batches overlapping the window, in append order.
    let batch_rows = sqlx::query(
        "SELECT first_sequence_number, last_sequence_number
         FROM append_batches
         WHERE last_sequence_number > $1
         AND first_sequence_number <= $2
         ORDER BY first_sequence_number ASC",
    )
    .bind(last_processed_sequence_number as i64)
    .bind(replay_until_sequence_number as i64)
    .fetch_all(pool)
    .await
    .map_err(PostgresStore::backend_failure)?;

    let mut replay_batches = Vec::new();

    for batch_row in batch_rows {
        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
        // All events in the batch's range; query filtering happens in
        // memory below via matches_query, not in SQL.
        let event_rows = sqlx::query(
            "SELECT sequence_number, occurred_at, event_type, payload
             FROM events
             WHERE sequence_number >= $1 AND sequence_number <= $2
             ORDER BY sequence_number ASC",
        )
        .bind(first_sequence_number as i64)
        .bind(last_sequence_number as i64)
        .fetch_all(pool)
        .await
        .map_err(PostgresStore::backend_failure)?;

        let delivered_batch = event_rows
            .into_iter()
            .map(event_record_from_row)
            .collect::<Result<Vec<_>, _>>()
            .map_err(PostgresStore::backend_failure)?
            .into_iter()
            .filter(|event_record| matches_query(event_query, event_record))
            .collect::<Vec<_>>();

        // Record batch progress even when delivered_batch is empty, so the
        // cursor can still advance past fully-filtered batches.
        replay_batches.push(ReplayBatch {
            last_processed_sequence_number: last_sequence_number,
            delivered_batch,
        });
    }

    Ok(replay_batches)
}
1436
1437fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1438 EventQuery {
1439 filters: event_query.filters.clone(),
1440 min_sequence_number: None,
1441 }
1442}
1443
/// Serializes `event_query` into the JSON string stored in
/// `durable_stream_cursors.event_query`.
///
/// The output is compared byte-for-byte against the stored value in
/// `load_or_create_durable_stream_cursor`, so the shape (and the key order
/// serde_json produces) must stay stable across versions.
///
/// NOTE(review): the JSON is assembled by hand rather than via a derived
/// Serialize impl — presumably `EventQuery` does not implement Serialize;
/// confirm before replacing this with serde derivation.
fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
    // Project each filter into a plain JSON object.
    let filters = event_query.filters.as_ref().map(|filters| {
        filters
            .iter()
            .map(|filter| {
                serde_json::json!({
                    "event_types": filter.event_types,
                    "payload_predicates": filter.payload_predicates,
                })
            })
            .collect::<Vec<_>>()
    });

    serde_json::to_string(&serde_json::json!({
        "filters": filters,
        "min_sequence_number": event_query.min_sequence_number,
    }))
    .map_err(json_backend_failure)
}