//! factstr_postgres/postgres_store.rs — Postgres-backed `EventStore` implementation.

1use std::future::Future;
2use std::io;
3use std::sync::{
4    Arc, Mutex,
5    mpsc::{self, Receiver, Sender},
6};
7use std::thread::{self, JoinHandle};
8
9use factstr::{
10    AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
11    HandleStream, NewEvent, QueryResult,
12};
13use sqlx::{
14    PgPool, Postgres, QueryBuilder, Row, Transaction,
15    postgres::{PgPoolOptions, PgRow},
16};
17use time::OffsetDateTime;
18use tokio::runtime::Builder;
19use tokio::runtime::Runtime;
20use url::Url;
21
22use crate::query_match::matches_query;
23use crate::query_sql::push_query_conditions;
24use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
25
// NOTE(review): none of these constants are referenced in this chunk;
// presumably they are written/checked by the schema-bootstrap code elsewhere
// in the crate — confirm against `bootstrap_database`/migration code.
/// Version tag for the persisted store format.
const STORE_FORMAT_VERSION: &str = "1";
/// Metadata key naming the encoding used for append-batch boundary markers.
const APPEND_BATCH_BOUNDARY_FORMAT_KEY: &str = "append_batch_boundary_format";
/// Current boundary encoding identifier (sparse markers, version 1).
const APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1: &str = "sparse_v1";
29
/// Outcome of a successful append as seen inside the worker thread: the
/// `AppendResult` handed back to the caller plus the full event records used
/// to fan out deliveries to live subscribers (see `run_worker_thread`).
#[derive(Clone, Debug)]
struct CommittedAppend {
    // Returned to the caller over the reply channel.
    append_result: AppendResult,
    // Fed to `SubscriptionRegistry::pending_deliveries` for fan-out.
    event_records: Vec<EventRecord>,
}
35
/// One batch of historical events loaded during durable-stream replay,
/// together with the cursor position to persist once the batch is delivered.
#[derive(Clone, Debug)]
struct ReplayBatch {
    // Sequence number recorded as processed after this batch is delivered.
    last_processed_sequence_number: u64,
    // Events handed to the subscriber's handler for this batch.
    delivered_batch: Vec<EventRecord>,
}
41
/// Sequence-number range covered by a single append batch.
/// NOTE(review): not referenced in this chunk — presumably used by the replay
/// batching code (`load_replay_batches`) elsewhere in the file; confirm.
#[derive(Clone, Debug)]
struct AppendBatchBoundary {
    first_sequence_number: u64,
    last_sequence_number: u64,
}
47
/// Snapshot taken while registering a durable stream: the new subscription
/// id, where the stored cursor left off, and the highest sequence number that
/// existed at registration time. Replay covers the gap between the last two;
/// events after the snapshot arrive through the live subscription.
#[derive(Clone, Debug)]
struct DurableReplayState {
    subscription_id: u64,
    last_processed_sequence_number: u64,
    replay_until_sequence_number: u64,
}
54
/// Requests processed sequentially by the dedicated worker thread that owns
/// the store's single Postgres connection (see `run_worker_thread`). Each
/// request carries a `reply` channel for its result.
enum WorkerCommand {
    /// Run a read query and send back the result.
    Query {
        event_query: EventQuery,
        reply: Sender<Result<QueryResult, EventStoreError>>,
    },
    /// Append a batch of events unconditionally.
    Append {
        new_events: Vec<NewEvent>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Append only if `context_query` still resolves to
    /// `expected_context_version` (optimistic-concurrency append).
    AppendIf {
        new_events: Vec<NewEvent>,
        context_query: EventQuery,
        expected_context_version: Option<u64>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Stop the worker loop.
    Shutdown,
}
72
/// Requests handled by the delivery dispatcher thread
/// (see `run_delivery_thread`).
enum DeliveryCommand {
    /// Push the given pending deliveries to their subscribers.
    Deliver(Vec<PendingDelivery>),
    /// Stop the dispatcher loop.
    Shutdown,
}
77
/// Options for [`PostgresStore::bootstrap`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PostgresBootstrapOptions {
    /// Base server URL, without the database path segment.
    pub server_url: String,
    /// Database name appended to `server_url` when connecting
    /// (see `database_url_with_name`); handed to `bootstrap_database` first.
    pub database_name: String,
}
83
/// Postgres-backed event store.
///
/// All SQL for queries/appends runs on a dedicated worker thread owning a
/// single-connection pool; subscriber callbacks run on a separate delivery
/// thread so slow handlers cannot block appends. Both threads are shut down
/// and joined in `Drop`.
pub struct PostgresStore {
    // Reused by per-operation helper threads spawned in `run_async`.
    connection_string: String,
    // Shared registry of live / replaying subscriptions.
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    // Command queue into the worker thread; mutex so `&self` methods can send.
    worker_sender: Mutex<Sender<WorkerCommand>>,
    // Worker join handle; taken (Option) in `Drop`.
    worker_thread: Mutex<Option<JoinHandle<()>>>,
    // Queue into the delivery dispatcher thread.
    delivery_sender: Sender<DeliveryCommand>,
    // Dispatcher join handle; taken in `Drop`.
    delivery_thread: Mutex<Option<JoinHandle<()>>>,
}
92
93impl PostgresStore {
    /// Connects to an existing database, spawning the delivery dispatcher and
    /// the single-connection worker thread.
    ///
    /// Blocks until the worker reports that its runtime and pool started; on
    /// startup failure both threads are shut down and joined before the error
    /// is returned, so no detached threads leak.
    pub fn connect(connection_string: &str) -> Result<Self, sqlx::Error> {
        let connection_string = connection_string.to_owned();
        bootstrap_connection(&connection_string)?;

        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
        // Delivery thread is spawned first: the worker needs its sender to
        // fan out committed appends.
        let (delivery_sender, delivery_receiver) = mpsc::channel();
        let delivery_thread = thread::Builder::new()
            .name("factstr-postgres-delivery".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                move || {
                    run_delivery_thread(connection_string, subscription_registry, delivery_receiver)
                }
            })
            .map_err(sqlx_io_error)?;

        let (worker_sender, worker_receiver) = mpsc::channel();
        // Rendezvous channel: the worker reports runtime/pool startup success
        // or failure exactly once before entering its command loop.
        let (ready_sender, ready_receiver) = mpsc::sync_channel(1);
        let worker_thread = thread::Builder::new()
            .name("factstr-postgres-worker".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                let delivery_sender = delivery_sender.clone();
                move || {
                    run_worker_thread(
                        connection_string,
                        worker_receiver,
                        ready_sender,
                        subscription_registry,
                        delivery_sender,
                    )
                }
            })
            .map_err(sqlx_io_error)?;

        match ready_receiver.recv() {
            Ok(Ok(())) => {}
            // Worker reported a startup error and has already exited, so the
            // join cannot hang; tear down the delivery thread too.
            Ok(Err(error)) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(error);
            }
            // Worker died before reporting (e.g. panicked): same teardown,
            // with a synthetic error describing the broken channel.
            Err(error) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(sqlx_io_error(io::Error::other(format!(
                    "postgres worker startup channel failed: {error}"
                ))));
            }
        }

        Ok(Self {
            connection_string,
            subscription_registry,
            worker_sender: Mutex::new(worker_sender),
            worker_thread: Mutex::new(Some(worker_thread)),
            delivery_sender,
            delivery_thread: Mutex::new(Some(delivery_thread)),
        })
    }
158
159    pub fn bootstrap(options: PostgresBootstrapOptions) -> Result<Self, EventStoreError> {
160        bootstrap_database(&options)?;
161        let database_url = database_url_with_name(&options.server_url, &options.database_name)?;
162        Self::connect(&database_url).map_err(Self::backend_failure)
163    }
164
165    fn backend_failure(error: sqlx::Error) -> EventStoreError {
166        EventStoreError::BackendFailure {
167            message: error.to_string(),
168        }
169    }
170
171    fn worker_failure(message: impl Into<String>) -> EventStoreError {
172        EventStoreError::BackendFailure {
173            message: message.into(),
174        }
175    }
176
177    fn send_command(&self, worker_command: WorkerCommand) -> Result<(), EventStoreError> {
178        let worker_sender = self
179            .worker_sender
180            .lock()
181            .map_err(|_| Self::worker_failure("postgres worker sender lock poisoned"))?;
182
183        worker_sender
184            .send(worker_command)
185            .map_err(|error| Self::worker_failure(format!("postgres worker stopped: {error}")))
186    }
187
188    fn run_query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
189        let (reply_sender, reply_receiver) = mpsc::channel();
190        self.send_command(WorkerCommand::Query {
191            event_query: event_query.clone(),
192            reply: reply_sender,
193        })?;
194
195        reply_receiver.recv().map_err(|error| {
196            Self::worker_failure(format!("postgres worker query reply failed: {error}"))
197        })?
198    }
199
200    fn run_append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
201        let (reply_sender, reply_receiver) = mpsc::channel();
202        self.send_command(WorkerCommand::Append {
203            new_events,
204            reply: reply_sender,
205        })?;
206
207        reply_receiver.recv().map_err(|error| {
208            Self::worker_failure(format!("postgres worker append reply failed: {error}"))
209        })?
210    }
211
212    fn run_append_if(
213        &self,
214        new_events: Vec<NewEvent>,
215        context_query: &EventQuery,
216        expected_context_version: Option<u64>,
217    ) -> Result<AppendResult, EventStoreError> {
218        let (reply_sender, reply_receiver) = mpsc::channel();
219        self.send_command(WorkerCommand::AppendIf {
220            new_events,
221            context_query: context_query.clone(),
222            expected_context_version,
223            reply: reply_sender,
224        })?;
225
226        reply_receiver.recv().map_err(|error| {
227            Self::worker_failure(format!(
228                "postgres worker conditional append reply failed: {error}"
229            ))
230        })?
231    }
232
    /// Runs an async closure to completion on a throwaway thread with its own
    /// current-thread Tokio runtime and a fresh single-connection pool.
    ///
    /// Used for operations (durable registration, replay, cursor advances)
    /// that must not queue behind the main worker thread. The thread is
    /// always joined before returning, so a panic surfaces as an error rather
    /// than leaving a detached thread.
    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
    where
        T: Send + 'static,
        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
        F: FnOnce(PgPool) -> Fut + Send + 'static,
    {
        let connection_string = self.connection_string.clone();
        let (result_sender, result_receiver) = mpsc::sync_channel(1);

        let worker_thread = thread::Builder::new()
            .name(format!("factstr-postgres-{operation}"))
            .spawn(move || {
                // Build the runtime inside the thread; a build failure is
                // reported through the result channel like any other error.
                let runtime = Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(sqlx_io_error)
                    .map_err(Self::backend_failure);

                let result = match runtime {
                    Ok(runtime) => runtime.block_on(async {
                        let pool = PgPoolOptions::new()
                            .max_connections(1)
                            .connect(&connection_string)
                            .await
                            .map_err(Self::backend_failure)?;
                        work(pool).await
                    }),
                    Err(error) => Err(error),
                };

                let _ = result_sender.send(result);
            })
            .map_err(sqlx_io_error)
            .map_err(Self::backend_failure)?;

        // recv only fails if the thread died before sending, i.e. it panicked.
        let result = result_receiver
            .recv()
            .map_err(|error| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread did not return a result: {error}"),
            })?;

        worker_thread
            .join()
            .map_err(|_| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread panicked before completion"),
            })?;

        result
    }
282
283    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
284        Self::enqueue_delivery_with_sender(&self.delivery_sender, pending_deliveries);
285    }
286
287    fn enqueue_delivery_with_sender(
288        delivery_sender: &Sender<DeliveryCommand>,
289        pending_deliveries: Vec<PendingDelivery>,
290    ) {
291        if pending_deliveries.is_empty() {
292            return;
293        }
294
295        if let Err(error) = delivery_sender.send(DeliveryCommand::Deliver(pending_deliveries)) {
296            eprintln!(
297                "factstr-postgres delivery dispatcher stopped after commit: {}",
298                error
299            );
300        }
301    }
302
303    fn register_all_durable_stream(
304        &self,
305        durable_stream_id: impl Into<String>,
306        handle: HandleStream,
307    ) -> Result<EventStream, EventStoreError> {
308        self.stream_durable(durable_stream_id.into(), EventQuery::all(), handle)
309    }
310
311    fn register_durable_stream(
312        &self,
313        durable_stream_id: impl Into<String>,
314        event_query: &EventQuery,
315        handle: HandleStream,
316    ) -> Result<EventStream, EventStoreError> {
317        self.stream_durable(durable_stream_id.into(), event_query.clone(), handle)
318    }
319
320    fn stream_durable(
321        &self,
322        durable_stream_id: String,
323        event_query: EventQuery,
324        handle: HandleStream,
325    ) -> Result<EventStream, EventStoreError> {
326        let normalized_event_query = normalized_durable_event_query(&event_query);
327        let event_query_json = serialize_event_query(&normalized_event_query)?;
328        let subscription_registry = Arc::clone(&self.subscription_registry);
329        let durable_stream_id_for_registry = durable_stream_id.clone();
330        let normalized_event_query_for_registry = normalized_event_query.clone();
331        let handle_for_registry = handle.clone();
332        let replay_stream_id = durable_stream_id.clone();
333
334        let replay_state = self.run_async("stream_durable", move |pool| async move {
335            let mut transaction = pool.begin().await.map_err(Self::backend_failure)?;
336
337            let last_processed_sequence_number = load_or_create_durable_stream_cursor(
338                &mut transaction,
339                &durable_stream_id_for_registry,
340                &event_query_json,
341            )
342            .await?;
343            let replay_until_sequence_number =
344                current_max_sequence_number_in_transaction(&mut transaction).await?;
345
346            let subscription_id = match subscription_registry.lock() {
347                Ok(mut subscription_registry) => {
348                    if normalized_event_query_for_registry.filters.is_none() {
349                        subscription_registry
350                            .subscribe_all_durable(
351                                durable_stream_id_for_registry.clone(),
352                                replay_until_sequence_number,
353                                handle_for_registry.clone(),
354                            )
355                            .map_err(subscription_registry_backend_failure)?
356                    } else {
357                        subscription_registry
358                            .subscribe_to_durable(
359                                durable_stream_id_for_registry.clone(),
360                                Some(normalized_event_query_for_registry.clone()),
361                                replay_until_sequence_number,
362                                handle_for_registry.clone(),
363                            )
364                            .map_err(subscription_registry_backend_failure)?
365                    }
366                }
367                Err(poisoned) => {
368                    let mut subscription_registry = poisoned.into_inner();
369                    if normalized_event_query_for_registry.filters.is_none() {
370                        subscription_registry
371                            .subscribe_all_durable(
372                                durable_stream_id_for_registry.clone(),
373                                replay_until_sequence_number,
374                                handle_for_registry.clone(),
375                            )
376                            .map_err(subscription_registry_backend_failure)?
377                    } else {
378                        subscription_registry
379                            .subscribe_to_durable(
380                                durable_stream_id_for_registry.clone(),
381                                Some(normalized_event_query_for_registry.clone()),
382                                replay_until_sequence_number,
383                                handle_for_registry.clone(),
384                            )
385                            .map_err(subscription_registry_backend_failure)?
386                    }
387                }
388            };
389
390            if let Err(error) = transaction.commit().await {
391                match subscription_registry.lock() {
392                    Ok(mut subscription_registry) => {
393                        subscription_registry.unsubscribe(subscription_id)
394                    }
395                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
396                }
397                return Err(Self::backend_failure(error));
398            }
399
400            Ok(DurableReplayState {
401                subscription_id,
402                last_processed_sequence_number,
403                replay_until_sequence_number,
404            })
405        })?;
406
407        let subscription = self.build_subscription_handle(replay_state.subscription_id);
408
409        let replay_batches = match self.run_async("durable_replay", {
410            let event_query = normalized_event_query.clone();
411            move |pool| async move {
412                ensure_replay_history_is_available_for_pool(
413                    &pool,
414                    replay_state.replay_until_sequence_number,
415                )
416                .await?;
417                load_replay_batches(
418                    &pool,
419                    &event_query,
420                    replay_state.last_processed_sequence_number,
421                    replay_state.replay_until_sequence_number,
422                )
423                .await
424            }
425        }) {
426            Ok(replay_batches) => replay_batches,
427            Err(error) => {
428                self.cleanup_durable_subscription(replay_state.subscription_id);
429                return Err(error);
430            }
431        };
432        let delivery_runtime = build_delivery_runtime("factstr-postgres durable replay")?;
433
434        for replay_batch in replay_batches {
435            let pending_delivery = PendingDelivery {
436                subscription_id: replay_state.subscription_id,
437                durable_stream_id: Some(replay_stream_id.clone()),
438                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
439                delivered_batch: replay_batch.delivered_batch,
440                handle: handle.clone(),
441            };
442
443            match self.process_durable_delivery(&delivery_runtime, pending_delivery) {
444                Ok(true) => {}
445                Ok(false) => {
446                    self.cleanup_durable_subscription(replay_state.subscription_id);
447                    return Err(EventStoreError::BackendFailure {
448                        message: format!(
449                            "durable replay for stream {} did not complete successfully",
450                            replay_stream_id
451                        ),
452                    });
453                }
454                Err(error) => {
455                    self.cleanup_durable_subscription(replay_state.subscription_id);
456                    return Err(error);
457                }
458            }
459        }
460
461        self.finish_durable_replay(replay_state.subscription_id);
462        Ok(subscription)
463    }
464
465    fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
466        let subscription_registry = Arc::clone(&self.subscription_registry);
467
468        EventStream::new(
469            subscription_id,
470            Arc::new(move |subscription_id| match subscription_registry.lock() {
471                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
472                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
473            }),
474        )
475    }
476
    /// Delivers one replay batch and, on success, persists the advanced
    /// durable cursor.
    ///
    /// Returns `Ok(true)` when the handler accepted the batch, `Ok(false)`
    /// when the handler failed or panicked (the caller then unsubscribes),
    /// and `Err` when the cursor update itself failed.
    fn process_durable_delivery(
        &self,
        delivery_runtime: &Runtime,
        pending_delivery: PendingDelivery,
    ) -> Result<bool, EventStoreError> {
        match pending_delivery.deliver(delivery_runtime) {
            DeliveryOutcome::Succeeded {
                durable_stream_id,
                last_processed_sequence_number,
                ..
            } => {
                // Only durable streams carry a persisted cursor to advance.
                if let Some(durable_stream_id) = durable_stream_id {
                    self.run_async("advance_durable_cursor", move |pool| async move {
                        update_durable_stream_cursor(
                            &pool,
                            &durable_stream_id,
                            last_processed_sequence_number,
                        )
                        .await
                    })?;
                }

                Ok(true)
            }
            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
        }
    }
504
505    fn finish_durable_replay(&self, subscription_id: u64) {
506        match self.subscription_registry.lock() {
507            Ok(mut subscription_registry) => {
508                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
509                self.enqueue_delivery(buffered_deliveries);
510            }
511            Err(poisoned) => {
512                let mut subscription_registry = poisoned.into_inner();
513                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
514                self.enqueue_delivery(buffered_deliveries);
515            }
516        }
517    }
518
519    fn cleanup_durable_subscription(&self, subscription_id: u64) {
520        match self.subscription_registry.lock() {
521            Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
522            Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
523        }
524    }
525}
526
impl Drop for PostgresStore {
    /// Shuts down both background threads in order: worker first (so no new
    /// deliveries are produced), then the delivery dispatcher. All sends and
    /// joins are best-effort (`let _ =`) because drop must not panic.
    fn drop(&mut self) {
        if let Ok(worker_sender) = self.worker_sender.lock() {
            let _ = worker_sender.send(WorkerCommand::Shutdown);
        }

        if let Ok(mut worker_thread) = self.worker_thread.lock() {
            if let Some(worker_thread) = worker_thread.take() {
                let _ = worker_thread.join();
            }
        }

        let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);

        if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
            if let Some(delivery_thread) = delivery_thread.take() {
                let _ = delivery_thread.join();
            }
        }
    }
}
548
549impl EventStore for PostgresStore {
550    fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
551        self.run_query(event_query)
552    }
553
554    fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
555        if new_events.is_empty() {
556            return Err(EventStoreError::EmptyAppend);
557        }
558
559        self.run_append(new_events)
560    }
561
562    fn append_if(
563        &self,
564        new_events: Vec<NewEvent>,
565        context_query: &EventQuery,
566        expected_context_version: Option<u64>,
567    ) -> Result<AppendResult, EventStoreError> {
568        if new_events.is_empty() {
569            return Err(EventStoreError::EmptyAppend);
570        }
571
572        self.run_append_if(new_events, context_query, expected_context_version)
573    }
574
575    fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
576        let subscription_registry = Arc::clone(&self.subscription_registry);
577        let id = match subscription_registry.lock() {
578            Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
579            Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
580        };
581
582        Ok(EventStream::new(
583            id,
584            Arc::new(move |subscription_id| match subscription_registry.lock() {
585                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
586                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
587            }),
588        ))
589    }
590
591    fn stream_to(
592        &self,
593        event_query: &EventQuery,
594        handle: HandleStream,
595    ) -> Result<EventStream, EventStoreError> {
596        let subscription_registry = Arc::clone(&self.subscription_registry);
597        let id = match subscription_registry.lock() {
598            Ok(mut subscription_registry) => {
599                subscription_registry.subscribe_to(Some(event_query.clone()), handle)
600            }
601            Err(poisoned) => poisoned
602                .into_inner()
603                .subscribe_to(Some(event_query.clone()), handle),
604        };
605
606        Ok(EventStream::new(
607            id,
608            Arc::new(move |subscription_id| match subscription_registry.lock() {
609                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
610                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
611            }),
612        ))
613    }
614
615    fn stream_all_durable(
616        &self,
617        durable_stream: &DurableStream,
618        handle: HandleStream,
619    ) -> Result<EventStream, EventStoreError> {
620        self.register_all_durable_stream(durable_stream.name(), handle)
621    }
622
623    fn stream_to_durable(
624        &self,
625        durable_stream: &DurableStream,
626        event_query: &EventQuery,
627        handle: HandleStream,
628    ) -> Result<EventStream, EventStoreError> {
629        self.register_durable_stream(durable_stream.name(), event_query, handle)
630    }
631}
632
633fn run_worker_thread(
634    connection_string: String,
635    worker_receiver: Receiver<WorkerCommand>,
636    ready_sender: mpsc::SyncSender<Result<(), sqlx::Error>>,
637    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
638    delivery_sender: Sender<DeliveryCommand>,
639) {
640    let runtime = match Builder::new_current_thread().enable_all().build() {
641        Ok(runtime) => runtime,
642        Err(error) => {
643            let _ = ready_sender.send(Err(sqlx_io_error(io::Error::other(format!(
644                "tokio runtime should build: {error}"
645            )))));
646            return;
647        }
648    };
649
650    let pool = match runtime.block_on(async {
651        PgPoolOptions::new()
652            .max_connections(1)
653            .connect(&connection_string)
654            .await
655    }) {
656        Ok(pool) => pool,
657        Err(error) => {
658            let _ = ready_sender.send(Err(error));
659            return;
660        }
661    };
662
663    if ready_sender.send(Ok(())).is_err() {
664        return;
665    }
666
667    while let Ok(worker_command) = worker_receiver.recv() {
668        match worker_command {
669            WorkerCommand::Query { event_query, reply } => {
670                let result = runtime
671                    .block_on(query_with_pool(&pool, &event_query))
672                    .map_err(PostgresStore::backend_failure);
673                let _ = reply.send(result);
674            }
675            WorkerCommand::Append { new_events, reply } => {
676                let result = runtime
677                    .block_on(append_with_pool(&pool, new_events))
678                    .map_err(PostgresStore::backend_failure);
679                if let Ok(committed_append) = &result {
680                    let pending_deliveries = match subscription_registry.lock() {
681                        Ok(mut subscription_registry) => subscription_registry
682                            .pending_deliveries(&committed_append.event_records),
683                        Err(poisoned) => poisoned
684                            .into_inner()
685                            .pending_deliveries(&committed_append.event_records),
686                    };
687                    PostgresStore::enqueue_delivery_with_sender(
688                        &delivery_sender,
689                        pending_deliveries,
690                    );
691                }
692                let result = result.map(|committed_append| committed_append.append_result);
693                let _ = reply.send(result);
694            }
695            WorkerCommand::AppendIf {
696                new_events,
697                context_query,
698                expected_context_version,
699                reply,
700            } => {
701                let result = runtime
702                    .block_on(append_if_with_pool(
703                        &pool,
704                        new_events,
705                        &context_query,
706                        expected_context_version,
707                    ))
708                    .map_err(PostgresStore::backend_failure)
709                    .and_then(|result| result);
710                if let Ok(committed_append) = &result {
711                    let pending_deliveries = match subscription_registry.lock() {
712                        Ok(mut subscription_registry) => subscription_registry
713                            .pending_deliveries(&committed_append.event_records),
714                        Err(poisoned) => poisoned
715                            .into_inner()
716                            .pending_deliveries(&committed_append.event_records),
717                    };
718                    PostgresStore::enqueue_delivery_with_sender(
719                        &delivery_sender,
720                        pending_deliveries,
721                    );
722                }
723                let result = result.map(|committed_append| committed_append.append_result);
724                let _ = reply.send(result);
725            }
726            WorkerCommand::Shutdown => break,
727        }
728    }
729}
730
731fn run_delivery_thread(
732    connection_string: String,
733    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
734    delivery_receiver: Receiver<DeliveryCommand>,
735) {
736    let runtime = match Builder::new_current_thread().enable_all().build() {
737        Ok(runtime) => runtime,
738        Err(error) => {
739            eprintln!(
740                "factstr-postgres delivery runtime could not start: {}",
741                error
742            );
743            return;
744        }
745    };
746
747    let pool = match runtime.block_on(async {
748        PgPoolOptions::new()
749            .max_connections(1)
750            .connect(&connection_string)
751            .await
752    }) {
753        Ok(pool) => pool,
754        Err(error) => {
755            eprintln!(
756                "factstr-postgres delivery pool could not connect: {}",
757                error
758            );
759            return;
760        }
761    };
762
763    while let Ok(delivery_command) = delivery_receiver.recv() {
764        match delivery_command {
765            DeliveryCommand::Deliver(pending_deliveries) => {
766                for pending_delivery in pending_deliveries {
767                    match pending_delivery.deliver(&runtime) {
768                        DeliveryOutcome::Succeeded {
769                            subscription_id,
770                            durable_stream_id,
771                            last_processed_sequence_number,
772                        } => {
773                            if let Some(durable_stream_id) = durable_stream_id {
774                                if let Err(error) = runtime.block_on(update_durable_stream_cursor(
775                                    &pool,
776                                    &durable_stream_id,
777                                    last_processed_sequence_number,
778                                )) {
779                                    eprintln!(
780                                        "factstr-postgres durable cursor update failed after delivery for stream {}: {}",
781                                        durable_stream_id, error
782                                    );
783                                    match subscription_registry.lock() {
784                                        Ok(mut subscription_registry) => {
785                                            subscription_registry.unsubscribe(subscription_id)
786                                        }
787                                        Err(poisoned) => {
788                                            poisoned.into_inner().unsubscribe(subscription_id)
789                                        }
790                                    }
791                                }
792                            }
793                        }
794                        DeliveryOutcome::Failed {
795                            subscription_id,
796                            durable_stream_id,
797                        }
798                        | DeliveryOutcome::Panicked {
799                            subscription_id,
800                            durable_stream_id,
801                        } => {
802                            if durable_stream_id.is_some() {
803                                match subscription_registry.lock() {
804                                    Ok(mut subscription_registry) => {
805                                        subscription_registry.unsubscribe(subscription_id)
806                                    }
807                                    Err(poisoned) => {
808                                        poisoned.into_inner().unsubscribe(subscription_id)
809                                    }
810                                }
811                            }
812                        }
813                    }
814                }
815            }
816            DeliveryCommand::Shutdown => break,
817        }
818    }
819}
820
821fn sqlx_io_error(error: io::Error) -> sqlx::Error {
822    sqlx::Error::Io(error)
823}
824
825fn backend_failure_message(message: impl Into<String>) -> EventStoreError {
826    EventStoreError::BackendFailure {
827        message: message.into(),
828    }
829}
830
831fn build_delivery_runtime(operation: &'static str) -> Result<Runtime, EventStoreError> {
832    Builder::new_current_thread()
833        .enable_all()
834        .build()
835        .map_err(sqlx_io_error)
836        .map_err(PostgresStore::backend_failure)
837        .map_err(|error| match error {
838            EventStoreError::BackendFailure { message } => EventStoreError::BackendFailure {
839                message: format!("{operation}: {message}"),
840            },
841            other => other,
842        })
843}
844
845fn bootstrap_database(options: &PostgresBootstrapOptions) -> Result<(), EventStoreError> {
846    let options = options.clone();
847    let (result_sender, result_receiver) = mpsc::sync_channel(1);
848
849    let bootstrap_thread = thread::Builder::new()
850        .name("factstr-postgres-database-bootstrap".to_owned())
851        .spawn(move || {
852            let runtime = Builder::new_current_thread()
853                .enable_all()
854                .build()
855                .map_err(sqlx_io_error)
856                .map_err(PostgresStore::backend_failure);
857
858            let result = match runtime {
859                Ok(runtime) => runtime.block_on(async move {
860                    ensure_database_exists(&options.server_url, &options.database_name).await
861                }),
862                Err(error) => Err(error),
863            };
864
865            let _ = result_sender.send(result);
866        })
867        .map_err(sqlx_io_error)
868        .map_err(PostgresStore::backend_failure)?;
869
870    let result = result_receiver.recv().map_err(|error| {
871        backend_failure_message(format!(
872            "postgres bootstrap database thread did not return a result: {error}"
873        ))
874    })?;
875
876    bootstrap_thread.join().map_err(|_| {
877        backend_failure_message("postgres bootstrap database thread panicked before completion")
878    })?;
879
880    result
881}
882
/// Connects to the Postgres server (not a specific database) and creates
/// `database_name` if it does not already exist.
///
/// Returns `Ok(())` when the database already exists, is created here, or is
/// created concurrently by another client.
async fn ensure_database_exists(
    server_url: &str,
    database_name: &str,
) -> Result<(), EventStoreError> {
    // Reject unsafe names up front and confirm the server URL itself parses.
    validate_bootstrap_database_name(database_name)?;
    let _ = database_url_with_name(server_url, database_name)?;

    // Quote for interpolation into CREATE DATABASE; DDL cannot take bind
    // parameters.
    let quoted_database_name = quote_postgres_identifier(database_name)?;
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .connect(server_url)
        .await
        .map_err(|error| {
            backend_failure_message(format!(
                "postgres bootstrap could not connect to server_url {server_url:?}: {error}"
            ))
        })?;

    let database_exists = sqlx::query_scalar::<_, bool>(
        "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)",
    )
    .bind(database_name)
    .fetch_one(&pool)
    .await
    .map_err(|error| {
        backend_failure_message(format!(
            "postgres bootstrap could not inspect database {database_name:?}: {error}"
        ))
    })?;

    if database_exists {
        return Ok(());
    }

    let create_database_sql = format!("CREATE DATABASE {quoted_database_name}");
    match sqlx::query(&create_database_sql).execute(&pool).await {
        Ok(_) => Ok(()),
        // 42P04 = duplicate_database: another client created it between our
        // existence check and the CREATE; that still counts as success.
        Err(sqlx::Error::Database(database_error))
            if database_error.code().as_deref() == Some("42P04") =>
        {
            Ok(())
        }
        Err(error) => Err(backend_failure_message(format!(
            "postgres bootstrap could not create database {database_name:?}: {error}"
        ))),
    }
}
930
931fn quote_postgres_identifier(identifier: &str) -> Result<String, EventStoreError> {
932    if identifier.is_empty() {
933        return Err(backend_failure_message(
934            "postgres bootstrap database_name must not be empty",
935        ));
936    }
937
938    if identifier.contains('\0') {
939        return Err(backend_failure_message(
940            "postgres bootstrap database_name must not contain null bytes",
941        ));
942    }
943
944    Ok(format!("\"{}\"", identifier.replace('"', "\"\"")))
945}
946
947fn validate_bootstrap_database_name(database_name: &str) -> Result<(), EventStoreError> {
948    if database_name.is_empty() {
949        return Err(backend_failure_message(
950            "postgres bootstrap database_name must not be empty",
951        ));
952    }
953
954    if database_name.contains('\0') {
955        return Err(backend_failure_message(
956            "postgres bootstrap database_name must not contain null bytes",
957        ));
958    }
959
960    let mut characters = database_name.chars();
961    let Some(first_character) = characters.next() else {
962        return Err(backend_failure_message(
963            "postgres bootstrap database_name must not be empty",
964        ));
965    };
966
967    if !(first_character.is_ascii_alphabetic() || first_character == '_') {
968        return Err(backend_failure_message(
969            "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
970        ));
971    }
972
973    if !characters.all(|character| character.is_ascii_alphanumeric() || character == '_') {
974        return Err(backend_failure_message(
975            "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
976        ));
977    }
978
979    Ok(())
980}
981
982fn database_url_with_name(
983    server_url: &str,
984    database_name: &str,
985) -> Result<String, EventStoreError> {
986    let mut url = Url::parse(server_url).map_err(|error| {
987        backend_failure_message(format!(
988            "invalid postgres server_url {server_url:?}: {error}"
989        ))
990    })?;
991    url.set_path(&format!("/{database_name}"));
992    Ok(url.into())
993}
994
995fn bootstrap_connection(connection_string: &str) -> Result<(), sqlx::Error> {
996    let connection_string = connection_string.to_owned();
997    let (result_sender, result_receiver) = mpsc::sync_channel(1);
998
999    let bootstrap_thread = thread::Builder::new()
1000        .name("factstr-postgres-bootstrap".to_owned())
1001        .spawn(move || {
1002            let runtime = Builder::new_current_thread()
1003                .enable_all()
1004                .build()
1005                .map_err(sqlx_io_error);
1006
1007            let result = match runtime {
1008                Ok(runtime) => runtime.block_on(async {
1009                    let pool = PgPoolOptions::new()
1010                        .max_connections(1)
1011                        .connect(&connection_string)
1012                        .await?;
1013                    initialize_schema(&pool).await?;
1014                    Ok::<_, sqlx::Error>(())
1015                }),
1016                Err(error) => Err(error),
1017            };
1018
1019            let _ = result_sender.send(result);
1020        })
1021        .map_err(sqlx_io_error)?;
1022
1023    let result = match result_receiver.recv() {
1024        Ok(result) => result,
1025        Err(error) => Err(sqlx_io_error(io::Error::other(format!(
1026            "postgres bootstrap thread did not return a result: {error}"
1027        )))),
1028    };
1029
1030    bootstrap_thread.join().map_err(|_| {
1031        sqlx_io_error(io::Error::other(
1032            "postgres bootstrap thread panicked before connect completed",
1033        ))
1034    })?;
1035
1036    result
1037}
1038
1039async fn query_with_pool(
1040    pool: &PgPool,
1041    event_query: &EventQuery,
1042) -> Result<QueryResult, sqlx::Error> {
1043    let current_context_version = current_context_version(pool, event_query).await?;
1044
1045    let mut query_builder: QueryBuilder<'_, Postgres> =
1046        QueryBuilder::new("SELECT sequence_number, occurred_at, event_type, payload FROM events");
1047    push_query_conditions(&mut query_builder, event_query, true);
1048    query_builder.push(" ORDER BY sequence_number ASC");
1049
1050    let event_records = query_builder
1051        .build()
1052        .fetch_all(pool)
1053        .await?
1054        .into_iter()
1055        .map(event_record_from_row)
1056        .collect::<Result<Vec<_>, _>>()?;
1057
1058    let last_returned_sequence_number = event_records
1059        .last()
1060        .map(|event_record| event_record.sequence_number);
1061
1062    Ok(QueryResult {
1063        event_records,
1064        last_returned_sequence_number,
1065        current_context_version,
1066    })
1067}
1068
1069async fn append_with_pool(
1070    pool: &PgPool,
1071    new_events: Vec<NewEvent>,
1072) -> Result<CommittedAppend, sqlx::Error> {
1073    let mut transaction = pool.begin().await?;
1074    lock_events_tables(&mut transaction).await?;
1075    let committed_append = append_records(&mut transaction, new_events).await?;
1076    transaction.commit().await?;
1077    Ok(committed_append)
1078}
1079
1080async fn append_if_with_pool(
1081    pool: &PgPool,
1082    new_events: Vec<NewEvent>,
1083    context_query: &EventQuery,
1084    expected_context_version: Option<u64>,
1085) -> Result<Result<CommittedAppend, EventStoreError>, sqlx::Error> {
1086    let mut transaction = pool.begin().await?;
1087    lock_events_tables(&mut transaction).await?;
1088
1089    let actual_context_version =
1090        current_context_version_in_transaction(&mut transaction, context_query).await?;
1091
1092    if actual_context_version != expected_context_version {
1093        transaction.rollback().await?;
1094        return Ok(Err(EventStoreError::ConditionalAppendConflict {
1095            expected: expected_context_version,
1096            actual: actual_context_version,
1097        }));
1098    }
1099
1100    let committed_append = append_records(&mut transaction, new_events).await?;
1101    transaction.commit().await?;
1102
1103    Ok(Ok(committed_append))
1104}
1105
/// Creates the event-store tables, indexes, and metadata rows if they do not
/// already exist. Safe to run repeatedly against the same database.
async fn initialize_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
    // to_regclass returns NULL when the events table does not yet exist in
    // the current schema, which tells us this is a brand-new store.
    let is_new_schema = sqlx::query_scalar::<_, Option<String>>(
        "SELECT to_regclass(current_schema() || '.events')::text",
    )
    .fetch_one(pool)
    .await?
    .is_none();

    // Main event log: one row per committed event, keyed by sequence number.
    sqlx::query(
        "CREATE TABLE IF NOT EXISTS events (
            sequence_number BIGINT PRIMARY KEY,
            occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            event_type TEXT NOT NULL,
            payload JSONB NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    // Boundaries of multi-event appends, used by durable replay to deliver
    // whole batches atomically.
    sqlx::query(
        "CREATE TABLE IF NOT EXISTS append_batches (
            first_sequence_number BIGINT PRIMARY KEY,
            last_sequence_number BIGINT NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    // Free-form key/value metadata about the store itself.
    sqlx::query(
        "CREATE TABLE IF NOT EXISTS store_metadata (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    // Persisted delivery cursors for durable streams.
    sqlx::query(
        "CREATE TABLE IF NOT EXISTS durable_stream_cursors (
            durable_stream_id TEXT PRIMARY KEY,
            event_query TEXT NOT NULL,
            last_processed_sequence_number BIGINT NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
        .execute(pool)
        .await?;

    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
        .execute(pool)
        .await?;

    // GIN index supports payload-predicate queries over the JSONB column.
    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_payload_gin ON events USING gin(payload)")
        .execute(pool)
        .await?;

    // Record the store format version once; an existing value is preserved.
    sqlx::query(
        "INSERT INTO store_metadata (key, value)
         VALUES ('store_format_version', $1)
         ON CONFLICT(key) DO NOTHING",
    )
    .bind(STORE_FORMAT_VERSION)
    .execute(pool)
    .await?;

    // Only a schema created by this code is known to use sparse_v1 batch
    // boundaries; pre-existing schemas keep whatever marker they have (or
    // none), which durable replay checks for later.
    if is_new_schema {
        sqlx::query(
            "INSERT INTO store_metadata (key, value)
             VALUES ($1, $2)",
        )
        .bind(APPEND_BATCH_BOUNDARY_FORMAT_KEY)
        .bind(APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1)
        .execute(pool)
        .await?;
    }

    Ok(())
}
1187
1188async fn lock_events_tables(
1189    transaction: &mut Transaction<'_, Postgres>,
1190) -> Result<(), sqlx::Error> {
1191    sqlx::query("LOCK TABLE events, append_batches IN EXCLUSIVE MODE")
1192        .execute(transaction.as_mut())
1193        .await?;
1194
1195    Ok(())
1196}
1197
1198async fn append_records(
1199    transaction: &mut Transaction<'_, Postgres>,
1200    new_events: Vec<NewEvent>,
1201) -> Result<CommittedAppend, sqlx::Error> {
1202    let committed_count = new_events.len() as u64;
1203    let first_sequence_number =
1204        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(sequence_number), 0) + 1 FROM events")
1205            .fetch_one(transaction.as_mut())
1206            .await? as u64;
1207    let last_sequence_number = first_sequence_number + committed_count - 1;
1208    let committed_event_records = new_events
1209        .into_iter()
1210        .enumerate()
1211        .map(|(offset, new_event)| EventRecord {
1212            sequence_number: first_sequence_number + offset as u64,
1213            occurred_at: OffsetDateTime::now_utc(),
1214            event_type: new_event.event_type,
1215            payload: new_event.payload,
1216        })
1217        .collect::<Vec<_>>();
1218
1219    for event_record in &committed_event_records {
1220        sqlx::query(
1221            "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
1222             VALUES ($1, $2, $3, $4)",
1223        )
1224        .bind(event_record.sequence_number as i64)
1225        .bind(event_record.occurred_at)
1226        .bind(&event_record.event_type)
1227        .bind(&event_record.payload)
1228        .execute(transaction.as_mut())
1229        .await?;
1230    }
1231
1232    if first_sequence_number != last_sequence_number {
1233        sqlx::query(
1234            "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
1235             VALUES ($1, $2)",
1236        )
1237        .bind(first_sequence_number as i64)
1238        .bind(last_sequence_number as i64)
1239        .execute(transaction.as_mut())
1240        .await?;
1241    }
1242
1243    Ok(CommittedAppend {
1244        append_result: AppendResult {
1245            first_sequence_number,
1246            last_sequence_number,
1247            committed_count,
1248        },
1249        event_records: committed_event_records,
1250    })
1251}
1252
1253async fn current_context_version(
1254    pool: &PgPool,
1255    event_query: &EventQuery,
1256) -> Result<Option<u64>, sqlx::Error> {
1257    let mut query_builder: QueryBuilder<'_, Postgres> =
1258        QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1259    push_query_conditions(&mut query_builder, event_query, false);
1260
1261    Ok(query_builder
1262        .build_query_scalar::<Option<i64>>()
1263        .fetch_one(pool)
1264        .await?
1265        .map(|sequence_number| sequence_number as u64))
1266}
1267
1268async fn current_context_version_in_transaction(
1269    transaction: &mut Transaction<'_, Postgres>,
1270    event_query: &EventQuery,
1271) -> Result<Option<u64>, sqlx::Error> {
1272    let mut query_builder: QueryBuilder<'_, Postgres> =
1273        QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1274    push_query_conditions(&mut query_builder, event_query, false);
1275
1276    Ok(query_builder
1277        .build_query_scalar::<Option<i64>>()
1278        .fetch_one(transaction.as_mut())
1279        .await?
1280        .map(|sequence_number| sequence_number as u64))
1281}
1282
1283fn event_record_from_row(row: PgRow) -> Result<EventRecord, sqlx::Error> {
1284    Ok(EventRecord {
1285        sequence_number: row.try_get::<i64, _>("sequence_number")? as u64,
1286        occurred_at: row.try_get("occurred_at")?,
1287        event_type: row.try_get("event_type")?,
1288        payload: row.try_get("payload")?,
1289    })
1290}
1291
1292fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
1293    EventStoreError::BackendFailure {
1294        message: error.to_string(),
1295    }
1296}
1297
1298fn subscription_registry_backend_failure(message: String) -> EventStoreError {
1299    EventStoreError::BackendFailure { message }
1300}
1301
/// Loads the persisted cursor for `durable_stream_id`, creating it at
/// position 0 on first use.
///
/// Returns the last processed sequence number. Fails if the stream is
/// resumed with a serialized query different from the one it was created
/// with, since the stored cursor position would then be meaningless.
async fn load_or_create_durable_stream_cursor(
    transaction: &mut Transaction<'_, Postgres>,
    durable_stream_id: &str,
    event_query_json: &str,
) -> Result<u64, EventStoreError> {
    let existing_cursor = sqlx::query(
        "SELECT event_query, last_processed_sequence_number
         FROM durable_stream_cursors
         WHERE durable_stream_id = $1",
    )
    .bind(durable_stream_id)
    .fetch_optional(transaction.as_mut())
    .await
    .map_err(PostgresStore::backend_failure)?;

    match existing_cursor {
        Some(row) => {
            // Stored query must match the new one byte-for-byte.
            let stored_event_query = row.get::<String, _>("event_query");
            if stored_event_query != event_query_json {
                return Err(EventStoreError::BackendFailure {
                    message: format!(
                        "durable stream {durable_stream_id} was resumed with a different query"
                    ),
                });
            }

            // NOTE(review): `row.get` panics on a decode failure; `try_get`
            // would surface it as a BackendFailure instead — confirm intent.
            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
        }
        None => {
            // NOTE(review): SELECT-then-INSERT is racy if two transactions
            // create the same stream concurrently (unique-violation on the
            // primary key) — presumably serialized by the caller; confirm.
            sqlx::query(
                "INSERT INTO durable_stream_cursors (
                    durable_stream_id,
                    event_query,
                    last_processed_sequence_number
                 ) VALUES ($1, $2, 0)",
            )
            .bind(durable_stream_id)
            .bind(event_query_json)
            .execute(transaction.as_mut())
            .await
            .map_err(PostgresStore::backend_failure)?;

            // A new stream starts before the first event.
            Ok(0)
        }
    }
}
1348
1349async fn current_max_sequence_number_in_transaction(
1350    transaction: &mut Transaction<'_, Postgres>,
1351) -> Result<u64, EventStoreError> {
1352    Ok(
1353        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
1354            .fetch_one(transaction.as_mut())
1355            .await
1356            .map_err(PostgresStore::backend_failure)?
1357            .unwrap_or(0) as u64,
1358    )
1359}
1360
1361async fn ensure_replay_history_is_available(
1362    connection: &mut sqlx::PgConnection,
1363    _current_max_sequence_number: u64,
1364) -> Result<(), EventStoreError> {
1365    let boundary_format =
1366        sqlx::query_scalar::<_, Option<String>>("SELECT value FROM store_metadata WHERE key = $1")
1367            .bind(APPEND_BATCH_BOUNDARY_FORMAT_KEY)
1368            .fetch_optional(&mut *connection)
1369            .await
1370            .map_err(PostgresStore::backend_failure)?;
1371
1372    if boundary_format.flatten().as_deref() != Some(APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1) {
1373        return Err(EventStoreError::BackendFailure {
1374            message: format!(
1375                "durable replay requires store_metadata {}={}",
1376                APPEND_BATCH_BOUNDARY_FORMAT_KEY, APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1
1377            ),
1378        });
1379    }
1380
1381    Ok(())
1382}
1383
1384async fn ensure_replay_history_is_available_for_pool(
1385    pool: &PgPool,
1386    current_max_sequence_number: u64,
1387) -> Result<(), EventStoreError> {
1388    let mut connection = pool
1389        .acquire()
1390        .await
1391        .map_err(PostgresStore::backend_failure)?;
1392    ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
1393}
1394
/// Persists delivery progress for a durable stream after a successful
/// handler invocation.
///
/// NOTE(review): if no cursor row exists (e.g. it was removed), the UPDATE
/// matches zero rows and this still returns Ok — confirm that silent no-op
/// is intended.
async fn update_durable_stream_cursor(
    pool: &PgPool,
    durable_stream_id: &str,
    last_processed_sequence_number: u64,
) -> Result<(), EventStoreError> {
    sqlx::query(
        "UPDATE durable_stream_cursors
         SET last_processed_sequence_number = $2
         WHERE durable_stream_id = $1",
    )
    .bind(durable_stream_id)
    .bind(last_processed_sequence_number as i64)
    .execute(pool)
    .await
    .map_err(PostgresStore::backend_failure)?;

    Ok(())
}
1413
/// Loads every event in the half-open window
/// (`last_processed_sequence_number`, `replay_until_sequence_number`] and
/// groups it into replay batches using the persisted append-batch
/// boundaries.
async fn load_replay_batches(
    pool: &PgPool,
    event_query: &EventQuery,
    last_processed_sequence_number: u64,
    replay_until_sequence_number: u64,
) -> Result<Vec<ReplayBatch>, EventStoreError> {
    // Nothing to replay when the cursor has already reached the target.
    if replay_until_sequence_number <= last_processed_sequence_number {
        return Ok(Vec::new());
    }

    // Boundaries that overlap the replay window, in ascending order — the
    // same window the events query below uses.
    let batch_rows = sqlx::query(
        "SELECT first_sequence_number, last_sequence_number
         FROM append_batches
         WHERE last_sequence_number > $1
           AND first_sequence_number <= $2
         ORDER BY first_sequence_number ASC",
    )
    .bind(last_processed_sequence_number as i64)
    .bind(replay_until_sequence_number as i64)
    .fetch_all(pool)
    .await
    .map_err(PostgresStore::backend_failure)?;

    let append_batch_boundaries = batch_rows
        .into_iter()
        .map(|batch_row| AppendBatchBoundary {
            first_sequence_number: batch_row.get::<i64, _>("first_sequence_number") as u64,
            last_sequence_number: batch_row.get::<i64, _>("last_sequence_number") as u64,
        })
        .collect::<Vec<_>>();
    // All events in the window, unfiltered: query filtering happens later in
    // replay_batches_from_records so that cursor advancement still covers
    // non-matching events.
    let event_records = sqlx::query(
        "SELECT sequence_number, occurred_at, event_type, payload
         FROM events
         WHERE sequence_number > $1
           AND sequence_number <= $2
         ORDER BY sequence_number ASC",
    )
    .bind(last_processed_sequence_number as i64)
    .bind(replay_until_sequence_number as i64)
    .fetch_all(pool)
    .await
    .map_err(PostgresStore::backend_failure)?
    .into_iter()
    .map(event_record_from_row)
    .collect::<Result<Vec<_>, _>>()
    .map_err(PostgresStore::backend_failure)?;

    Ok(replay_batches_from_records(
        event_query,
        event_records,
        &append_batch_boundaries,
    ))
}
1467
/// Groups raw event records into per-batch `ReplayBatch` values.
///
/// Events whose sequence numbers fall inside a recorded append-batch
/// boundary are replayed together as one batch, with the cursor advancing to
/// the boundary's last sequence number; events with no covering boundary are
/// replayed one at a time. Records that do not match `event_query` are
/// filtered out of the delivered payload but still advance the cursor.
fn replay_batches_from_records(
    event_query: &EventQuery,
    event_records: Vec<EventRecord>,
    append_batch_boundaries: &[AppendBatchBoundary],
) -> Vec<ReplayBatch> {
    let mut replay_batches = Vec::new();
    let mut event_index = 0;
    let mut boundary_index = 0;

    // Two-pointer merge: both event_records and append_batch_boundaries are
    // in ascending sequence order, so each list is scanned exactly once.
    while event_index < event_records.len() {
        let next_sequence_number = event_records[event_index].sequence_number;

        // Skip boundaries that end before the next event.
        while boundary_index < append_batch_boundaries.len()
            && append_batch_boundaries[boundary_index].last_sequence_number < next_sequence_number
        {
            boundary_index += 1;
        }

        if boundary_index < append_batch_boundaries.len() {
            let boundary = &append_batch_boundaries[boundary_index];
            if boundary.first_sequence_number <= next_sequence_number
                && next_sequence_number <= boundary.last_sequence_number
            {
                // The event sits inside a recorded batch: consume every event
                // up to the boundary's end and emit them as one batch.
                let batch_start = event_index;
                while event_index < event_records.len()
                    && event_records[event_index].sequence_number <= boundary.last_sequence_number
                {
                    event_index += 1;
                }

                let delivered_batch = event_records[batch_start..event_index]
                    .iter()
                    .filter(|event_record| matches_query(event_query, event_record))
                    .cloned()
                    .collect();

                // The cursor advances to the boundary's end even when the
                // filter removed every record in the batch.
                replay_batches.push(ReplayBatch {
                    last_processed_sequence_number: boundary.last_sequence_number,
                    delivered_batch,
                });
                continue;
            }
        }

        // No covering boundary: the event came from a single-event append
        // and is replayed on its own.
        let event_record = event_records[event_index].clone();
        event_index += 1;

        let delivered_batch = if matches_query(event_query, &event_record) {
            vec![event_record.clone()]
        } else {
            Vec::new()
        };

        replay_batches.push(ReplayBatch {
            last_processed_sequence_number: event_record.sequence_number,
            delivered_batch,
        });
    }

    replay_batches
}
1529
1530fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1531    EventQuery {
1532        filters: event_query.filters.clone(),
1533        min_sequence_number: None,
1534    }
1535}
1536
1537fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1538    let filters = event_query.filters.as_ref().map(|filters| {
1539        filters
1540            .iter()
1541            .map(|filter| {
1542                serde_json::json!({
1543                    "event_types": filter.event_types,
1544                    "payload_predicates": filter.payload_predicates,
1545                })
1546            })
1547            .collect::<Vec<_>>()
1548    });
1549
1550    serde_json::to_string(&serde_json::json!({
1551        "filters": filters,
1552        "min_sequence_number": event_query.min_sequence_number,
1553    }))
1554    .map_err(json_backend_failure)
1555}