// factstr_postgres/postgres_store.rs
1use std::future::Future;
2use std::io;
3use std::sync::{
4    Arc, Mutex,
5    mpsc::{self, Receiver, Sender},
6};
7use std::thread::{self, JoinHandle};
8
9use factstr::{
10    AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
11    HandleStream, NewEvent, QueryResult,
12};
13use sqlx::{
14    PgPool, Postgres, QueryBuilder, Row, Transaction,
15    postgres::{PgPoolOptions, PgRow},
16};
17use time::OffsetDateTime;
18use tokio::runtime::Builder;
19
20use crate::query_match::matches_query;
21use crate::query_sql::push_query_conditions;
22use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
23
/// Outcome of an append that has been committed to Postgres: the result that
/// is reported back to the caller plus the freshly persisted records, which
/// the worker still needs in order to compute pending deliveries for live
/// subscriptions.
#[derive(Clone, Debug)]
struct CommittedAppend {
    // Value handed back to the `append` / `append_if` caller.
    append_result: AppendResult,
    // Records inserted by this append; fed to `pending_deliveries` for fan-out.
    event_records: Vec<EventRecord>,
}
29
/// One batch of historical events replayed to a newly registered durable
/// stream, together with the cursor position reached once the batch has been
/// handled.
#[derive(Clone, Debug)]
struct ReplayBatch {
    // Sequence number the durable cursor advances to after this batch
    // is delivered successfully.
    last_processed_sequence_number: u64,
    // Events passed to the subscriber's handler for this batch.
    delivered_batch: Vec<EventRecord>,
}
35
/// Snapshot captured while registering a durable stream inside a single
/// transaction: the subscription that was created and the replay window
/// bounds (events after `last_processed_sequence_number` up to and including
/// `replay_until_sequence_number` are replayed).
#[derive(Clone, Debug)]
struct DurableReplayState {
    // Registry id of the subscription created for this durable stream.
    subscription_id: u64,
    // Cursor loaded (or initialized) from the durable-stream table.
    last_processed_sequence_number: u64,
    // Max sequence number observed at registration time; live events beyond
    // this point are buffered by the registry until replay finishes.
    replay_until_sequence_number: u64,
}
42
/// Requests handled by the single background worker thread. Each request
/// carries a `reply` sender through which the worker returns its result to
/// the blocked caller.
enum WorkerCommand {
    /// Execute a read query and reply with its result.
    Query {
        event_query: EventQuery,
        reply: Sender<Result<QueryResult, EventStoreError>>,
    },
    /// Append a batch of new events unconditionally.
    Append {
        new_events: Vec<NewEvent>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Conditional append guarded by `context_query` and
    /// `expected_context_version` (see `append_if_with_pool`).
    AppendIf {
        new_events: Vec<NewEvent>,
        context_query: EventQuery,
        expected_context_version: Option<u64>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Terminate the worker loop.
    Shutdown,
}
60
/// Commands handled by the delivery dispatcher thread.
enum DeliveryCommand {
    /// Fan a batch of pending deliveries out to their subscribers.
    Deliver(Vec<PendingDelivery>),
    /// Terminate the delivery loop.
    Shutdown,
}
65
/// Postgres-backed implementation of the `EventStore` trait.
///
/// All database work runs on dedicated OS threads, each owning its own
/// current-thread Tokio runtime and single-connection pool: a worker thread
/// serving queries/appends and a delivery thread pushing committed events to
/// subscribers. Ad-hoc operations additionally spawn short-lived threads via
/// `run_async`.
pub struct PostgresStore {
    // Kept so `run_async` can open short-lived pools of its own.
    connection_string: String,
    // Live subscriptions; shared with both background threads.
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    // Sender into the worker command loop. Wrapped in a Mutex because
    // `mpsc::Sender` is not `Sync`, which keeps the store shareable.
    worker_sender: Mutex<Sender<WorkerCommand>>,
    // Join handle, taken (`Option::take`) in `Drop`.
    worker_thread: Mutex<Option<JoinHandle<()>>>,
    // Sender into the delivery dispatcher loop.
    delivery_sender: Sender<DeliveryCommand>,
    // Join handle, taken in `Drop`.
    delivery_thread: Mutex<Option<JoinHandle<()>>>,
}
74
impl PostgresStore {
    /// Connects to Postgres, initializes the schema, and spins up the two
    /// background threads (delivery dispatcher first, then the worker).
    ///
    /// Startup is synchronous: the worker reports connect success/failure over
    /// a rendezvous channel before `connect` returns, so a bad connection
    /// string fails here rather than on the first command.
    pub fn connect(connection_string: &str) -> Result<Self, sqlx::Error> {
        let connection_string = connection_string.to_owned();
        // Verify connectivity and run schema setup before any thread starts.
        bootstrap_connection(&connection_string)?;

        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
        let (delivery_sender, delivery_receiver) = mpsc::channel();
        let delivery_thread = thread::Builder::new()
            .name("factstr-postgres-delivery".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                move || {
                    run_delivery_thread(connection_string, subscription_registry, delivery_receiver)
                }
            })
            .map_err(sqlx_io_error)?;

        let (worker_sender, worker_receiver) = mpsc::channel();
        // sync_channel(1): the worker's single readiness message never blocks.
        let (ready_sender, ready_receiver) = mpsc::sync_channel(1);
        let worker_thread = thread::Builder::new()
            .name("factstr-postgres-worker".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                let delivery_sender = delivery_sender.clone();
                move || {
                    run_worker_thread(
                        connection_string,
                        worker_receiver,
                        ready_sender,
                        subscription_registry,
                        delivery_sender,
                    )
                }
            })
            .map_err(sqlx_io_error)?;

        // Wait for the worker's startup handshake; on failure, tear both
        // threads back down before surfacing the error.
        match ready_receiver.recv() {
            Ok(Ok(())) => {}
            Ok(Err(error)) => {
                // Worker reported a startup error and has already exited.
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(error);
            }
            Err(error) => {
                // Channel closed without a readiness message (worker died).
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(sqlx_io_error(io::Error::other(format!(
                    "postgres worker startup channel failed: {error}"
                ))));
            }
        }

        Ok(Self {
            connection_string,
            subscription_registry,
            worker_sender: Mutex::new(worker_sender),
            worker_thread: Mutex::new(Some(worker_thread)),
            delivery_sender,
            delivery_thread: Mutex::new(Some(delivery_thread)),
        })
    }

    /// Maps a low-level sqlx error onto the store's backend-failure error.
    fn backend_failure(error: sqlx::Error) -> EventStoreError {
        EventStoreError::BackendFailure {
            message: error.to_string(),
        }
    }

    /// Builds a backend-failure error for worker/channel problems.
    fn worker_failure(message: impl Into<String>) -> EventStoreError {
        EventStoreError::BackendFailure {
            message: message.into(),
        }
    }

    /// Sends a command to the worker thread, translating lock poisoning and
    /// a hung-up channel into backend failures.
    fn send_command(&self, worker_command: WorkerCommand) -> Result<(), EventStoreError> {
        let worker_sender = self
            .worker_sender
            .lock()
            .map_err(|_| Self::worker_failure("postgres worker sender lock poisoned"))?;

        worker_sender
            .send(worker_command)
            .map_err(|error| Self::worker_failure(format!("postgres worker stopped: {error}")))
    }

    /// Runs a query on the worker thread and blocks for the reply.
    fn run_query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::Query {
            event_query: event_query.clone(),
            reply: reply_sender,
        })?;

        // Outer `?` unwraps the channel recv; the inner Result is the value.
        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!("postgres worker query reply failed: {error}"))
        })?
    }

    /// Runs an unconditional append on the worker thread and blocks for the
    /// reply.
    fn run_append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::Append {
            new_events,
            reply: reply_sender,
        })?;

        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!("postgres worker append reply failed: {error}"))
        })?
    }

    /// Runs a conditional append on the worker thread and blocks for the
    /// reply.
    fn run_append_if(
        &self,
        new_events: Vec<NewEvent>,
        context_query: &EventQuery,
        expected_context_version: Option<u64>,
    ) -> Result<AppendResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::AppendIf {
            new_events,
            context_query: context_query.clone(),
            expected_context_version,
            reply: reply_sender,
        })?;

        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!(
                "postgres worker conditional append reply failed: {error}"
            ))
        })?
    }

    /// Runs one async closure to completion on a fresh, short-lived thread
    /// with its own current-thread runtime and single-connection pool.
    ///
    /// Used for operations that must not go through the long-lived worker
    /// (e.g. durable-stream registration and cursor updates). The spawned
    /// thread is always joined, so panics surface as backend failures rather
    /// than being silently detached.
    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
    where
        T: Send + 'static,
        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
        F: FnOnce(PgPool) -> Fut + Send + 'static,
    {
        let connection_string = self.connection_string.clone();
        let (result_sender, result_receiver) = mpsc::sync_channel(1);

        let worker_thread = thread::Builder::new()
            .name(format!("factstr-postgres-{operation}"))
            .spawn(move || {
                let runtime = Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(sqlx_io_error)
                    .map_err(Self::backend_failure);

                let result = match runtime {
                    Ok(runtime) => runtime.block_on(async {
                        let pool = PgPoolOptions::new()
                            .max_connections(1)
                            .connect(&connection_string)
                            .await
                            .map_err(Self::backend_failure)?;
                        work(pool).await
                    }),
                    Err(error) => Err(error),
                };

                let _ = result_sender.send(result);
            })
            .map_err(sqlx_io_error)
            .map_err(Self::backend_failure)?;

        let result = result_receiver
            .recv()
            .map_err(|error| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread did not return a result: {error}"),
            })?;

        worker_thread
            .join()
            .map_err(|_| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread panicked before completion"),
            })?;

        result
    }

    /// Queues pending deliveries on the dispatcher thread (no-op when empty).
    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
        Self::enqueue_delivery_with_sender(&self.delivery_sender, pending_deliveries);
    }

    /// Sender-based variant usable from the worker thread, which holds a
    /// clone of the delivery sender rather than `&self`. A dead dispatcher is
    /// only logged: the append itself has already committed.
    fn enqueue_delivery_with_sender(
        delivery_sender: &Sender<DeliveryCommand>,
        pending_deliveries: Vec<PendingDelivery>,
    ) {
        if pending_deliveries.is_empty() {
            return;
        }

        if let Err(error) = delivery_sender.send(DeliveryCommand::Deliver(pending_deliveries)) {
            eprintln!(
                "factstr-postgres delivery dispatcher stopped after commit: {}",
                error
            );
        }
    }

    /// Registers a durable stream that matches every event.
    fn register_all_durable_stream(
        &self,
        durable_stream_id: impl Into<String>,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.stream_durable(durable_stream_id.into(), EventQuery::all(), handle)
    }

    /// Registers a durable stream restricted to `event_query`.
    fn register_durable_stream(
        &self,
        durable_stream_id: impl Into<String>,
        event_query: &EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.stream_durable(durable_stream_id.into(), event_query.clone(), handle)
    }

    /// Core durable-stream registration.
    ///
    /// Phase 1 (single transaction): load/create the stream cursor, snapshot
    /// the current max sequence number, and register the subscription so the
    /// registry can buffer live events arriving during replay. If the commit
    /// fails, the subscription is rolled back via unsubscribe.
    /// Phase 2: replay historical events between the cursor and the snapshot
    /// in batches, advancing the durable cursor after each batch.
    /// Phase 3: flush the events the registry buffered while replaying.
    fn stream_durable(
        &self,
        durable_stream_id: String,
        event_query: EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        let normalized_event_query = normalized_durable_event_query(&event_query);
        let event_query_json = serialize_event_query(&normalized_event_query)?;
        let subscription_registry = Arc::clone(&self.subscription_registry);
        let durable_stream_id_for_registry = durable_stream_id.clone();
        let normalized_event_query_for_registry = normalized_event_query.clone();
        let handle_for_registry = handle.clone();
        let replay_stream_id = durable_stream_id.clone();

        let replay_state = self.run_async("stream_durable", move |pool| async move {
            let mut transaction = pool.begin().await.map_err(Self::backend_failure)?;

            let last_processed_sequence_number = load_or_create_durable_stream_cursor(
                &mut transaction,
                &durable_stream_id_for_registry,
                &event_query_json,
            )
            .await?;
            let replay_until_sequence_number =
                current_max_sequence_number_in_transaction(&mut transaction).await?;

            // Register before committing so no live event can slip between
            // the snapshot and the subscription. The Ok/poisoned arms are
            // intentionally identical: a poisoned registry is still usable.
            let subscription_id = match subscription_registry.lock() {
                Ok(mut subscription_registry) => {
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
                Err(poisoned) => {
                    let mut subscription_registry = poisoned.into_inner();
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
            };

            // Commit failed: undo the registration so no half-created durable
            // subscription lingers in the registry.
            if let Err(error) = transaction.commit().await {
                match subscription_registry.lock() {
                    Ok(mut subscription_registry) => {
                        subscription_registry.unsubscribe(subscription_id)
                    }
                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
                }
                return Err(Self::backend_failure(error));
            }

            Ok(DurableReplayState {
                subscription_id,
                last_processed_sequence_number,
                replay_until_sequence_number,
            })
        })?;

        let subscription = self.build_subscription_handle(replay_state.subscription_id);

        let replay_batches = match self.run_async("durable_replay", {
            let event_query = normalized_event_query.clone();
            move |pool| async move {
                ensure_replay_history_is_available_for_pool(
                    &pool,
                    replay_state.replay_until_sequence_number,
                )
                .await?;
                load_replay_batches(
                    &pool,
                    &event_query,
                    replay_state.last_processed_sequence_number,
                    replay_state.replay_until_sequence_number,
                )
                .await
            }
        }) {
            Ok(replay_batches) => replay_batches,
            Err(error) => {
                // Replay could not even be loaded; drop the subscription.
                self.cleanup_durable_subscription(replay_state.subscription_id);
                return Err(error);
            }
        };

        for replay_batch in replay_batches {
            let pending_delivery = PendingDelivery {
                subscription_id: replay_state.subscription_id,
                durable_stream_id: Some(replay_stream_id.clone()),
                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
                delivered_batch: replay_batch.delivered_batch,
                handle: handle.clone(),
            };

            match self.process_durable_delivery(pending_delivery) {
                Ok(true) => {}
                Ok(false) => {
                    // Handler failed or panicked mid-replay.
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(EventStoreError::BackendFailure {
                        message: format!(
                            "durable replay for stream {} did not complete successfully",
                            replay_stream_id
                        ),
                    });
                }
                Err(error) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(error);
                }
            }
        }

        // Switch the subscription from buffering to live delivery.
        self.finish_durable_replay(replay_state.subscription_id);
        Ok(subscription)
    }

    /// Creates the caller-facing stream handle whose drop/cancel path
    /// unsubscribes from the registry (tolerating poisoning).
    fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
        let subscription_registry = Arc::clone(&self.subscription_registry);

        EventStream::new(
            subscription_id,
            Arc::new(move |subscription_id| match subscription_registry.lock() {
                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
            }),
        )
    }

    /// Delivers one replay batch; on success, advances the durable cursor in
    /// the database. Returns Ok(false) when the handler failed or panicked.
    fn process_durable_delivery(
        &self,
        pending_delivery: PendingDelivery,
    ) -> Result<bool, EventStoreError> {
        match pending_delivery.deliver() {
            DeliveryOutcome::Succeeded {
                durable_stream_id,
                last_processed_sequence_number,
                ..
            } => {
                if let Some(durable_stream_id) = durable_stream_id {
                    self.run_async("advance_durable_cursor", move |pool| async move {
                        update_durable_stream_cursor(
                            &pool,
                            &durable_stream_id,
                            last_processed_sequence_number,
                        )
                        .await
                    })?;
                }

                Ok(true)
            }
            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
        }
    }

    /// Ends the replay phase and flushes any deliveries the registry buffered
    /// while replay was in progress.
    fn finish_durable_replay(&self, subscription_id: u64) {
        match self.subscription_registry.lock() {
            Ok(mut subscription_registry) => {
                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
                self.enqueue_delivery(buffered_deliveries);
            }
            Err(poisoned) => {
                let mut subscription_registry = poisoned.into_inner();
                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
                self.enqueue_delivery(buffered_deliveries);
            }
        }
    }

    /// Removes a subscription after a failed registration/replay.
    fn cleanup_durable_subscription(&self, subscription_id: u64) {
        match self.subscription_registry.lock() {
            Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
            Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
        }
    }
}
500
501impl Drop for PostgresStore {
502    fn drop(&mut self) {
503        if let Ok(worker_sender) = self.worker_sender.lock() {
504            let _ = worker_sender.send(WorkerCommand::Shutdown);
505        }
506
507        if let Ok(mut worker_thread) = self.worker_thread.lock() {
508            if let Some(worker_thread) = worker_thread.take() {
509                let _ = worker_thread.join();
510            }
511        }
512
513        let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
514
515        if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
516            if let Some(delivery_thread) = delivery_thread.take() {
517                let _ = delivery_thread.join();
518            }
519        }
520    }
521}
522
523impl EventStore for PostgresStore {
524    fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
525        self.run_query(event_query)
526    }
527
528    fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
529        if new_events.is_empty() {
530            return Err(EventStoreError::EmptyAppend);
531        }
532
533        self.run_append(new_events)
534    }
535
536    fn append_if(
537        &self,
538        new_events: Vec<NewEvent>,
539        context_query: &EventQuery,
540        expected_context_version: Option<u64>,
541    ) -> Result<AppendResult, EventStoreError> {
542        if new_events.is_empty() {
543            return Err(EventStoreError::EmptyAppend);
544        }
545
546        self.run_append_if(new_events, context_query, expected_context_version)
547    }
548
549    fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
550        let subscription_registry = Arc::clone(&self.subscription_registry);
551        let id = match subscription_registry.lock() {
552            Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
553            Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
554        };
555
556        Ok(EventStream::new(
557            id,
558            Arc::new(move |subscription_id| match subscription_registry.lock() {
559                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
560                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
561            }),
562        ))
563    }
564
565    fn stream_to(
566        &self,
567        event_query: &EventQuery,
568        handle: HandleStream,
569    ) -> Result<EventStream, EventStoreError> {
570        let subscription_registry = Arc::clone(&self.subscription_registry);
571        let id = match subscription_registry.lock() {
572            Ok(mut subscription_registry) => {
573                subscription_registry.subscribe_to(Some(event_query.clone()), handle)
574            }
575            Err(poisoned) => poisoned
576                .into_inner()
577                .subscribe_to(Some(event_query.clone()), handle),
578        };
579
580        Ok(EventStream::new(
581            id,
582            Arc::new(move |subscription_id| match subscription_registry.lock() {
583                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
584                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
585            }),
586        ))
587    }
588
589    fn stream_all_durable(
590        &self,
591        durable_stream: &DurableStream,
592        handle: HandleStream,
593    ) -> Result<EventStream, EventStoreError> {
594        self.register_all_durable_stream(durable_stream.name(), handle)
595    }
596
597    fn stream_to_durable(
598        &self,
599        durable_stream: &DurableStream,
600        event_query: &EventQuery,
601        handle: HandleStream,
602    ) -> Result<EventStream, EventStoreError> {
603        self.register_durable_stream(durable_stream.name(), event_query, handle)
604    }
605}
606
/// Body of the long-lived worker thread.
///
/// Builds a current-thread Tokio runtime and a single-connection pool,
/// reports startup success or failure over `ready_sender` (the rendezvous
/// `connect` waits on), then serves commands until `Shutdown` is received or
/// every command sender is dropped.
fn run_worker_thread(
    connection_string: String,
    worker_receiver: Receiver<WorkerCommand>,
    ready_sender: mpsc::SyncSender<Result<(), sqlx::Error>>,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_sender: Sender<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            // Report the failure through the readiness handshake and bail.
            let _ = ready_sender.send(Err(sqlx_io_error(io::Error::other(format!(
                "tokio runtime should build: {error}"
            )))));
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            let _ = ready_sender.send(Err(error));
            return;
        }
    };

    // If `connect` already gave up waiting, there is no caller left to serve.
    if ready_sender.send(Ok(())).is_err() {
        return;
    }

    while let Ok(worker_command) = worker_receiver.recv() {
        match worker_command {
            WorkerCommand::Query { event_query, reply } => {
                let result = runtime
                    .block_on(query_with_pool(&pool, &event_query))
                    .map_err(PostgresStore::backend_failure);
                let _ = reply.send(result);
            }
            WorkerCommand::Append { new_events, reply } => {
                let result = runtime
                    .block_on(append_with_pool(&pool, new_events))
                    .map_err(PostgresStore::backend_failure);
                // Fan out to subscribers before replying, so deliveries are
                // enqueued in commit order. A poisoned registry is still used
                // via `into_inner`.
                if let Ok(committed_append) = &result {
                    let pending_deliveries = match subscription_registry.lock() {
                        Ok(mut subscription_registry) => subscription_registry
                            .pending_deliveries(&committed_append.event_records),
                        Err(poisoned) => poisoned
                            .into_inner()
                            .pending_deliveries(&committed_append.event_records),
                    };
                    PostgresStore::enqueue_delivery_with_sender(
                        &delivery_sender,
                        pending_deliveries,
                    );
                }
                // The caller only sees the append result, not the records.
                let result = result.map(|committed_append| committed_append.append_result);
                let _ = reply.send(result);
            }
            WorkerCommand::AppendIf {
                new_events,
                context_query,
                expected_context_version,
                reply,
            } => {
                // `append_if_with_pool` yields a nested Result: the outer is
                // a sqlx backend error, the inner already an EventStoreError
                // (e.g. a failed condition); `and_then` flattens them.
                let result = runtime
                    .block_on(append_if_with_pool(
                        &pool,
                        new_events,
                        &context_query,
                        expected_context_version,
                    ))
                    .map_err(PostgresStore::backend_failure)
                    .and_then(|result| result);
                if let Ok(committed_append) = &result {
                    let pending_deliveries = match subscription_registry.lock() {
                        Ok(mut subscription_registry) => subscription_registry
                            .pending_deliveries(&committed_append.event_records),
                        Err(poisoned) => poisoned
                            .into_inner()
                            .pending_deliveries(&committed_append.event_records),
                    };
                    PostgresStore::enqueue_delivery_with_sender(
                        &delivery_sender,
                        pending_deliveries,
                    );
                }
                let result = result.map(|committed_append| committed_append.append_result);
                let _ = reply.send(result);
            }
            WorkerCommand::Shutdown => break,
        }
    }
}
704
/// Body of the delivery dispatcher thread.
///
/// Owns its own runtime and single-connection pool (used solely to advance
/// durable-stream cursors) and delivers pending batches to subscriber
/// handlers until `Shutdown` is received or all senders are dropped. There is
/// no caller to report errors to, so failures are logged to stderr; a durable
/// subscription whose delivery or cursor update fails is unsubscribed so it
/// can later resume from its persisted cursor.
fn run_delivery_thread(
    connection_string: String,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_receiver: Receiver<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery runtime could not start: {}",
                error
            );
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery pool could not connect: {}",
                error
            );
            return;
        }
    };

    while let Ok(delivery_command) = delivery_receiver.recv() {
        match delivery_command {
            DeliveryCommand::Deliver(pending_deliveries) => {
                for pending_delivery in pending_deliveries {
                    match pending_delivery.deliver() {
                        DeliveryOutcome::Succeeded {
                            subscription_id,
                            durable_stream_id,
                            last_processed_sequence_number,
                        } => {
                            // Only durable subscriptions have a cursor to
                            // persist; ephemeral ones are done at this point.
                            if let Some(durable_stream_id) = durable_stream_id {
                                if let Err(error) = runtime.block_on(update_durable_stream_cursor(
                                    &pool,
                                    &durable_stream_id,
                                    last_processed_sequence_number,
                                )) {
                                    eprintln!(
                                        "factstr-postgres durable cursor update failed after delivery for stream {}: {}",
                                        durable_stream_id, error
                                    );
                                    // Cursor write failed: drop the live
                                    // subscription rather than risk skipping
                                    // events past an unpersisted cursor.
                                    match subscription_registry.lock() {
                                        Ok(mut subscription_registry) => {
                                            subscription_registry.unsubscribe(subscription_id)
                                        }
                                        Err(poisoned) => {
                                            poisoned.into_inner().unsubscribe(subscription_id)
                                        }
                                    }
                                }
                            }
                        }
                        DeliveryOutcome::Failed {
                            subscription_id,
                            durable_stream_id,
                        }
                        | DeliveryOutcome::Panicked {
                            subscription_id,
                            durable_stream_id,
                        } => {
                            // Durable handlers that fail are unsubscribed;
                            // ephemeral failures are ignored here.
                            if durable_stream_id.is_some() {
                                match subscription_registry.lock() {
                                    Ok(mut subscription_registry) => {
                                        subscription_registry.unsubscribe(subscription_id)
                                    }
                                    Err(poisoned) => {
                                        poisoned.into_inner().unsubscribe(subscription_id)
                                    }
                                }
                            }
                        }
                    }
                }
            }
            DeliveryCommand::Shutdown => break,
        }
    }
}
794
/// Adapts a standard [`io::Error`] into a [`sqlx::Error`] so thread-spawn and
/// runtime-construction failures can flow through the same error channel as
/// database failures.
fn sqlx_io_error(error: io::Error) -> sqlx::Error {
    sqlx::Error::Io(error)
}
798
/// Synchronously connects to Postgres once and initializes the schema,
/// reporting the outcome before the store proceeds.
///
/// The work runs on a dedicated thread with its own single-threaded Tokio
/// runtime — presumably so this can be called from both sync and async
/// callers without nesting runtimes (TODO confirm against call sites). The
/// result travels back over a bounded channel of capacity 1.
fn bootstrap_connection(connection_string: &str) -> Result<(), sqlx::Error> {
    let connection_string = connection_string.to_owned();
    let (result_sender, result_receiver) = mpsc::sync_channel(1);

    let bootstrap_thread = thread::Builder::new()
        .name("factstr-postgres-bootstrap".to_owned())
        .spawn(move || {
            // Runtime construction can itself fail; fold that into the same
            // Result shape as the connect/initialize path below.
            let runtime = Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(sqlx_io_error);

            let result = match runtime {
                Ok(runtime) => runtime.block_on(async {
                    // One connection is enough for a one-shot bootstrap.
                    let pool = PgPoolOptions::new()
                        .max_connections(1)
                        .connect(&connection_string)
                        .await?;
                    initialize_schema(&pool).await?;
                    Ok::<_, sqlx::Error>(())
                }),
                Err(error) => Err(error),
            };

            // Ignore send failure: the receiver only disappears if this
            // function has already given up.
            let _ = result_sender.send(result);
        })
        .map_err(sqlx_io_error)?;

    // recv() returns Err when the thread died without sending (e.g. panic);
    // capture that as an error instead of blocking on join first.
    let result = match result_receiver.recv() {
        Ok(result) => result,
        Err(error) => Err(sqlx_io_error(io::Error::other(format!(
            "postgres bootstrap thread did not return a result: {error}"
        )))),
    };

    // Reap the thread; a panic takes precedence over the received result
    // because of the `?`.
    bootstrap_thread.join().map_err(|_| {
        sqlx_io_error(io::Error::other(
            "postgres bootstrap thread panicked before connect completed",
        ))
    })?;

    result
}
842
843async fn query_with_pool(
844    pool: &PgPool,
845    event_query: &EventQuery,
846) -> Result<QueryResult, sqlx::Error> {
847    let current_context_version = current_context_version(pool, event_query).await?;
848
849    let mut query_builder: QueryBuilder<'_, Postgres> =
850        QueryBuilder::new("SELECT sequence_number, occurred_at, event_type, payload FROM events");
851    push_query_conditions(&mut query_builder, event_query, true);
852    query_builder.push(" ORDER BY sequence_number ASC");
853
854    let event_records = query_builder
855        .build()
856        .fetch_all(pool)
857        .await?
858        .into_iter()
859        .map(event_record_from_row)
860        .collect::<Vec<_>>();
861
862    let last_returned_sequence_number = event_records
863        .last()
864        .map(|event_record| event_record.sequence_number);
865
866    Ok(QueryResult {
867        event_records,
868        last_returned_sequence_number,
869        current_context_version,
870    })
871}
872
873async fn append_with_pool(
874    pool: &PgPool,
875    new_events: Vec<NewEvent>,
876) -> Result<CommittedAppend, sqlx::Error> {
877    let mut transaction = pool.begin().await?;
878    lock_events_tables(&mut transaction).await?;
879    let committed_append = append_records(&mut transaction, new_events).await?;
880    transaction.commit().await?;
881    Ok(committed_append)
882}
883
884async fn append_if_with_pool(
885    pool: &PgPool,
886    new_events: Vec<NewEvent>,
887    context_query: &EventQuery,
888    expected_context_version: Option<u64>,
889) -> Result<Result<CommittedAppend, EventStoreError>, sqlx::Error> {
890    let mut transaction = pool.begin().await?;
891    lock_events_tables(&mut transaction).await?;
892
893    let actual_context_version =
894        current_context_version_in_transaction(&mut transaction, context_query).await?;
895
896    if actual_context_version != expected_context_version {
897        transaction.rollback().await?;
898        return Ok(Err(EventStoreError::ConditionalAppendConflict {
899            expected: expected_context_version,
900            actual: actual_context_version,
901        }));
902    }
903
904    let committed_append = append_records(&mut transaction, new_events).await?;
905    transaction.commit().await?;
906
907    Ok(Ok(committed_append))
908}
909
910async fn initialize_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
911    sqlx::query(
912        "CREATE TABLE IF NOT EXISTS events (
913            sequence_number BIGINT PRIMARY KEY,
914            occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
915            event_type TEXT NOT NULL,
916            payload JSONB NOT NULL
917        )",
918    )
919    .execute(pool)
920    .await?;
921
922    sqlx::query(
923        "CREATE TABLE IF NOT EXISTS append_batches (
924            first_sequence_number BIGINT PRIMARY KEY,
925            last_sequence_number BIGINT NOT NULL
926        )",
927    )
928    .execute(pool)
929    .await?;
930
931    sqlx::query(
932        "CREATE TABLE IF NOT EXISTS durable_stream_cursors (
933            durable_stream_id TEXT PRIMARY KEY,
934            event_query TEXT NOT NULL,
935            last_processed_sequence_number BIGINT NOT NULL
936        )",
937    )
938    .execute(pool)
939    .await?;
940
941    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
942        .execute(pool)
943        .await?;
944
945    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
946        .execute(pool)
947        .await?;
948
949    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_payload_gin ON events USING gin(payload)")
950        .execute(pool)
951        .await?;
952
953    Ok(())
954}
955
956async fn lock_events_tables(
957    transaction: &mut Transaction<'_, Postgres>,
958) -> Result<(), sqlx::Error> {
959    sqlx::query("LOCK TABLE events, append_batches IN EXCLUSIVE MODE")
960        .execute(transaction.as_mut())
961        .await?;
962
963    Ok(())
964}
965
966async fn append_records(
967    transaction: &mut Transaction<'_, Postgres>,
968    new_events: Vec<NewEvent>,
969) -> Result<CommittedAppend, sqlx::Error> {
970    let committed_count = new_events.len() as u64;
971    let first_sequence_number =
972        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(sequence_number), 0) + 1 FROM events")
973            .fetch_one(transaction.as_mut())
974            .await? as u64;
975    let last_sequence_number = first_sequence_number + committed_count - 1;
976    let committed_event_records = new_events
977        .into_iter()
978        .enumerate()
979        .map(|(offset, new_event)| EventRecord {
980            sequence_number: first_sequence_number + offset as u64,
981            occurred_at: OffsetDateTime::now_utc(),
982            event_type: new_event.event_type,
983            payload: new_event.payload,
984        })
985        .collect::<Vec<_>>();
986
987    for event_record in &committed_event_records {
988        sqlx::query(
989            "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
990             VALUES ($1, $2, $3, $4)",
991        )
992        .bind(event_record.sequence_number as i64)
993        .bind(event_record.occurred_at)
994        .bind(&event_record.event_type)
995        .bind(&event_record.payload)
996        .execute(transaction.as_mut())
997        .await?;
998    }
999
1000    sqlx::query(
1001        "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
1002         VALUES ($1, $2)",
1003    )
1004    .bind(first_sequence_number as i64)
1005    .bind(last_sequence_number as i64)
1006    .execute(transaction.as_mut())
1007    .await?;
1008
1009    Ok(CommittedAppend {
1010        append_result: AppendResult {
1011            first_sequence_number,
1012            last_sequence_number,
1013            committed_count,
1014        },
1015        event_records: committed_event_records,
1016    })
1017}
1018
1019async fn current_context_version(
1020    pool: &PgPool,
1021    event_query: &EventQuery,
1022) -> Result<Option<u64>, sqlx::Error> {
1023    let mut query_builder: QueryBuilder<'_, Postgres> =
1024        QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1025    push_query_conditions(&mut query_builder, event_query, false);
1026
1027    Ok(query_builder
1028        .build_query_scalar::<Option<i64>>()
1029        .fetch_one(pool)
1030        .await?
1031        .map(|sequence_number| sequence_number as u64))
1032}
1033
1034async fn current_context_version_in_transaction(
1035    transaction: &mut Transaction<'_, Postgres>,
1036    event_query: &EventQuery,
1037) -> Result<Option<u64>, sqlx::Error> {
1038    let mut query_builder: QueryBuilder<'_, Postgres> =
1039        QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1040    push_query_conditions(&mut query_builder, event_query, false);
1041
1042    Ok(query_builder
1043        .build_query_scalar::<Option<i64>>()
1044        .fetch_one(transaction.as_mut())
1045        .await?
1046        .map(|sequence_number| sequence_number as u64))
1047}
1048
1049fn event_record_from_row(row: PgRow) -> EventRecord {
1050    EventRecord {
1051        sequence_number: row.get::<i64, _>("sequence_number") as u64,
1052        occurred_at: row.get("occurred_at"),
1053        event_type: row.get("event_type"),
1054        payload: row.get("payload"),
1055    }
1056}
1057
/// Maps a JSON (de)serialization failure onto the store's generic
/// `BackendFailure` error, keeping only the message text.
fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
    EventStoreError::BackendFailure {
        message: error.to_string(),
    }
}
1063
/// Wraps a subscription-registry error message in the store's generic
/// `BackendFailure` variant.
fn subscription_registry_backend_failure(message: String) -> EventStoreError {
    EventStoreError::BackendFailure { message }
}
1067
1068async fn load_or_create_durable_stream_cursor(
1069    transaction: &mut Transaction<'_, Postgres>,
1070    durable_stream_id: &str,
1071    event_query_json: &str,
1072) -> Result<u64, EventStoreError> {
1073    let existing_cursor = sqlx::query(
1074        "SELECT event_query, last_processed_sequence_number
1075         FROM durable_stream_cursors
1076         WHERE durable_stream_id = $1",
1077    )
1078    .bind(durable_stream_id)
1079    .fetch_optional(transaction.as_mut())
1080    .await
1081    .map_err(PostgresStore::backend_failure)?;
1082
1083    match existing_cursor {
1084        Some(row) => {
1085            let stored_event_query = row.get::<String, _>("event_query");
1086            if stored_event_query != event_query_json {
1087                return Err(EventStoreError::BackendFailure {
1088                    message: format!(
1089                        "durable stream {durable_stream_id} was resumed with a different query"
1090                    ),
1091                });
1092            }
1093
1094            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
1095        }
1096        None => {
1097            sqlx::query(
1098                "INSERT INTO durable_stream_cursors (
1099                    durable_stream_id,
1100                    event_query,
1101                    last_processed_sequence_number
1102                 ) VALUES ($1, $2, 0)",
1103            )
1104            .bind(durable_stream_id)
1105            .bind(event_query_json)
1106            .execute(transaction.as_mut())
1107            .await
1108            .map_err(PostgresStore::backend_failure)?;
1109
1110            Ok(0)
1111        }
1112    }
1113}
1114
1115async fn current_max_sequence_number_in_transaction(
1116    transaction: &mut Transaction<'_, Postgres>,
1117) -> Result<u64, EventStoreError> {
1118    Ok(
1119        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
1120            .fetch_one(transaction.as_mut())
1121            .await
1122            .map_err(PostgresStore::backend_failure)?
1123            .unwrap_or(0) as u64,
1124    )
1125}
1126
1127async fn ensure_replay_history_is_available(
1128    connection: &mut sqlx::PgConnection,
1129    current_max_sequence_number: u64,
1130) -> Result<(), EventStoreError> {
1131    if current_max_sequence_number == 0 {
1132        return Ok(());
1133    }
1134
1135    let batch_rows = sqlx::query(
1136        "SELECT first_sequence_number, last_sequence_number
1137         FROM append_batches
1138         ORDER BY first_sequence_number ASC",
1139    )
1140    .fetch_all(&mut *connection)
1141    .await
1142    .map_err(PostgresStore::backend_failure)?;
1143
1144    if batch_rows.is_empty() {
1145        return Err(EventStoreError::BackendFailure {
1146            message: "durable replay requires append_batches history for all persisted events"
1147                .to_owned(),
1148        });
1149    }
1150
1151    let mut expected_first_sequence_number = 1_u64;
1152
1153    for batch_row in batch_rows {
1154        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1155        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1156
1157        if first_sequence_number != expected_first_sequence_number
1158            || last_sequence_number < first_sequence_number
1159        {
1160            return Err(EventStoreError::BackendFailure {
1161                message:
1162                    "durable replay requires contiguous append_batches history for all persisted events"
1163                        .to_owned(),
1164            });
1165        }
1166
1167        expected_first_sequence_number = last_sequence_number + 1;
1168    }
1169
1170    if expected_first_sequence_number - 1 != current_max_sequence_number {
1171        return Err(EventStoreError::BackendFailure {
1172            message: "durable replay requires append_batches history for all persisted events"
1173                .to_owned(),
1174        });
1175    }
1176
1177    Ok(())
1178}
1179
1180async fn ensure_replay_history_is_available_for_pool(
1181    pool: &PgPool,
1182    current_max_sequence_number: u64,
1183) -> Result<(), EventStoreError> {
1184    let mut connection = pool
1185        .acquire()
1186        .await
1187        .map_err(PostgresStore::backend_failure)?;
1188    ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
1189}
1190
1191async fn update_durable_stream_cursor(
1192    pool: &PgPool,
1193    durable_stream_id: &str,
1194    last_processed_sequence_number: u64,
1195) -> Result<(), EventStoreError> {
1196    sqlx::query(
1197        "UPDATE durable_stream_cursors
1198         SET last_processed_sequence_number = $2
1199         WHERE durable_stream_id = $1",
1200    )
1201    .bind(durable_stream_id)
1202    .bind(last_processed_sequence_number as i64)
1203    .execute(pool)
1204    .await
1205    .map_err(PostgresStore::backend_failure)?;
1206
1207    Ok(())
1208}
1209
1210async fn load_replay_batches(
1211    pool: &PgPool,
1212    event_query: &EventQuery,
1213    last_processed_sequence_number: u64,
1214    replay_until_sequence_number: u64,
1215) -> Result<Vec<ReplayBatch>, EventStoreError> {
1216    if replay_until_sequence_number <= last_processed_sequence_number {
1217        return Ok(Vec::new());
1218    }
1219
1220    let batch_rows = sqlx::query(
1221        "SELECT first_sequence_number, last_sequence_number
1222         FROM append_batches
1223         WHERE last_sequence_number > $1
1224           AND first_sequence_number <= $2
1225         ORDER BY first_sequence_number ASC",
1226    )
1227    .bind(last_processed_sequence_number as i64)
1228    .bind(replay_until_sequence_number as i64)
1229    .fetch_all(pool)
1230    .await
1231    .map_err(PostgresStore::backend_failure)?;
1232
1233    let mut replay_batches = Vec::new();
1234
1235    for batch_row in batch_rows {
1236        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1237        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1238        let event_rows = sqlx::query(
1239            "SELECT sequence_number, event_type, payload
1240             FROM events
1241             WHERE sequence_number >= $1 AND sequence_number <= $2
1242             ORDER BY sequence_number ASC",
1243        )
1244        .bind(first_sequence_number as i64)
1245        .bind(last_sequence_number as i64)
1246        .fetch_all(pool)
1247        .await
1248        .map_err(PostgresStore::backend_failure)?;
1249
1250        let delivered_batch = event_rows
1251            .into_iter()
1252            .map(event_record_from_row)
1253            .filter(|event_record| matches_query(event_query, event_record))
1254            .collect::<Vec<_>>();
1255
1256        replay_batches.push(ReplayBatch {
1257            last_processed_sequence_number: last_sequence_number,
1258            delivered_batch,
1259        });
1260    }
1261
1262    Ok(replay_batches)
1263}
1264
1265fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1266    EventQuery {
1267        filters: event_query.filters.clone(),
1268        min_sequence_number: None,
1269    }
1270}
1271
1272fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1273    let filters = event_query.filters.as_ref().map(|filters| {
1274        filters
1275            .iter()
1276            .map(|filter| {
1277                serde_json::json!({
1278                    "event_types": filter.event_types,
1279                    "payload_predicates": filter.payload_predicates,
1280                })
1281            })
1282            .collect::<Vec<_>>()
1283    });
1284
1285    serde_json::to_string(&serde_json::json!({
1286        "filters": filters,
1287        "min_sequence_number": event_query.min_sequence_number,
1288    }))
1289    .map_err(json_backend_failure)
1290}