Skip to main content

factstr_postgres/
postgres_store.rs

1use std::future::Future;
2use std::io;
3use std::sync::{
4    Arc, Mutex,
5    mpsc::{self, Receiver, Sender},
6};
7use std::thread::{self, JoinHandle};
8
9use factstr::{
10    AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
11    HandleStream, NewEvent, QueryResult,
12};
13use sqlx::{
14    PgPool, Postgres, QueryBuilder, Row, Transaction,
15    postgres::{PgPoolOptions, PgRow},
16};
17use time::OffsetDateTime;
18use tokio::runtime::Builder;
19use tokio::runtime::Runtime;
20use url::Url;
21
22use crate::query_match::matches_query;
23use crate::query_sql::push_query_conditions;
24use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
25
26#[derive(Clone, Debug)]
27struct CommittedAppend {
28    append_result: AppendResult,
29    event_records: Vec<EventRecord>,
30}
31
32#[derive(Clone, Debug)]
33struct ReplayBatch {
34    last_processed_sequence_number: u64,
35    delivered_batch: Vec<EventRecord>,
36}
37
38#[derive(Clone, Debug)]
39struct DurableReplayState {
40    subscription_id: u64,
41    last_processed_sequence_number: u64,
42    replay_until_sequence_number: u64,
43}
44
45enum WorkerCommand {
46    Query {
47        event_query: EventQuery,
48        reply: Sender<Result<QueryResult, EventStoreError>>,
49    },
50    Append {
51        new_events: Vec<NewEvent>,
52        reply: Sender<Result<AppendResult, EventStoreError>>,
53    },
54    AppendIf {
55        new_events: Vec<NewEvent>,
56        context_query: EventQuery,
57        expected_context_version: Option<u64>,
58        reply: Sender<Result<AppendResult, EventStoreError>>,
59    },
60    Shutdown,
61}
62
63enum DeliveryCommand {
64    Deliver(Vec<PendingDelivery>),
65    Shutdown,
66}
67
68#[derive(Clone, Debug, Eq, PartialEq)]
69pub struct PostgresBootstrapOptions {
70    pub server_url: String,
71    pub database_name: String,
72}
73
74pub struct PostgresStore {
75    connection_string: String,
76    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
77    worker_sender: Mutex<Sender<WorkerCommand>>,
78    worker_thread: Mutex<Option<JoinHandle<()>>>,
79    delivery_sender: Sender<DeliveryCommand>,
80    delivery_thread: Mutex<Option<JoinHandle<()>>>,
81}
82
83impl PostgresStore {
84    pub fn connect(connection_string: &str) -> Result<Self, sqlx::Error> {
85        let connection_string = connection_string.to_owned();
86        bootstrap_connection(&connection_string)?;
87
88        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
89        let (delivery_sender, delivery_receiver) = mpsc::channel();
90        let delivery_thread = thread::Builder::new()
91            .name("factstr-postgres-delivery".to_owned())
92            .spawn({
93                let connection_string = connection_string.clone();
94                let subscription_registry = Arc::clone(&subscription_registry);
95                move || {
96                    run_delivery_thread(connection_string, subscription_registry, delivery_receiver)
97                }
98            })
99            .map_err(sqlx_io_error)?;
100
101        let (worker_sender, worker_receiver) = mpsc::channel();
102        let (ready_sender, ready_receiver) = mpsc::sync_channel(1);
103        let worker_thread = thread::Builder::new()
104            .name("factstr-postgres-worker".to_owned())
105            .spawn({
106                let connection_string = connection_string.clone();
107                let subscription_registry = Arc::clone(&subscription_registry);
108                let delivery_sender = delivery_sender.clone();
109                move || {
110                    run_worker_thread(
111                        connection_string,
112                        worker_receiver,
113                        ready_sender,
114                        subscription_registry,
115                        delivery_sender,
116                    )
117                }
118            })
119            .map_err(sqlx_io_error)?;
120
121        match ready_receiver.recv() {
122            Ok(Ok(())) => {}
123            Ok(Err(error)) => {
124                let _ = worker_thread.join();
125                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
126                let _ = delivery_thread.join();
127                return Err(error);
128            }
129            Err(error) => {
130                let _ = worker_thread.join();
131                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
132                let _ = delivery_thread.join();
133                return Err(sqlx_io_error(io::Error::other(format!(
134                    "postgres worker startup channel failed: {error}"
135                ))));
136            }
137        }
138
139        Ok(Self {
140            connection_string,
141            subscription_registry,
142            worker_sender: Mutex::new(worker_sender),
143            worker_thread: Mutex::new(Some(worker_thread)),
144            delivery_sender,
145            delivery_thread: Mutex::new(Some(delivery_thread)),
146        })
147    }
148
149    pub fn bootstrap(options: PostgresBootstrapOptions) -> Result<Self, EventStoreError> {
150        bootstrap_database(&options)?;
151        let database_url = database_url_with_name(&options.server_url, &options.database_name)?;
152        Self::connect(&database_url).map_err(Self::backend_failure)
153    }
154
155    fn backend_failure(error: sqlx::Error) -> EventStoreError {
156        EventStoreError::BackendFailure {
157            message: error.to_string(),
158        }
159    }
160
161    fn worker_failure(message: impl Into<String>) -> EventStoreError {
162        EventStoreError::BackendFailure {
163            message: message.into(),
164        }
165    }
166
167    fn send_command(&self, worker_command: WorkerCommand) -> Result<(), EventStoreError> {
168        let worker_sender = self
169            .worker_sender
170            .lock()
171            .map_err(|_| Self::worker_failure("postgres worker sender lock poisoned"))?;
172
173        worker_sender
174            .send(worker_command)
175            .map_err(|error| Self::worker_failure(format!("postgres worker stopped: {error}")))
176    }
177
178    fn run_query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
179        let (reply_sender, reply_receiver) = mpsc::channel();
180        self.send_command(WorkerCommand::Query {
181            event_query: event_query.clone(),
182            reply: reply_sender,
183        })?;
184
185        reply_receiver.recv().map_err(|error| {
186            Self::worker_failure(format!("postgres worker query reply failed: {error}"))
187        })?
188    }
189
190    fn run_append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
191        let (reply_sender, reply_receiver) = mpsc::channel();
192        self.send_command(WorkerCommand::Append {
193            new_events,
194            reply: reply_sender,
195        })?;
196
197        reply_receiver.recv().map_err(|error| {
198            Self::worker_failure(format!("postgres worker append reply failed: {error}"))
199        })?
200    }
201
202    fn run_append_if(
203        &self,
204        new_events: Vec<NewEvent>,
205        context_query: &EventQuery,
206        expected_context_version: Option<u64>,
207    ) -> Result<AppendResult, EventStoreError> {
208        let (reply_sender, reply_receiver) = mpsc::channel();
209        self.send_command(WorkerCommand::AppendIf {
210            new_events,
211            context_query: context_query.clone(),
212            expected_context_version,
213            reply: reply_sender,
214        })?;
215
216        reply_receiver.recv().map_err(|error| {
217            Self::worker_failure(format!(
218                "postgres worker conditional append reply failed: {error}"
219            ))
220        })?
221    }
222
223    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
224    where
225        T: Send + 'static,
226        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
227        F: FnOnce(PgPool) -> Fut + Send + 'static,
228    {
229        let connection_string = self.connection_string.clone();
230        let (result_sender, result_receiver) = mpsc::sync_channel(1);
231
232        let worker_thread = thread::Builder::new()
233            .name(format!("factstr-postgres-{operation}"))
234            .spawn(move || {
235                let runtime = Builder::new_current_thread()
236                    .enable_all()
237                    .build()
238                    .map_err(sqlx_io_error)
239                    .map_err(Self::backend_failure);
240
241                let result = match runtime {
242                    Ok(runtime) => runtime.block_on(async {
243                        let pool = PgPoolOptions::new()
244                            .max_connections(1)
245                            .connect(&connection_string)
246                            .await
247                            .map_err(Self::backend_failure)?;
248                        work(pool).await
249                    }),
250                    Err(error) => Err(error),
251                };
252
253                let _ = result_sender.send(result);
254            })
255            .map_err(sqlx_io_error)
256            .map_err(Self::backend_failure)?;
257
258        let result = result_receiver
259            .recv()
260            .map_err(|error| EventStoreError::BackendFailure {
261                message: format!("postgres {operation} thread did not return a result: {error}"),
262            })?;
263
264        worker_thread
265            .join()
266            .map_err(|_| EventStoreError::BackendFailure {
267                message: format!("postgres {operation} thread panicked before completion"),
268            })?;
269
270        result
271    }
272
273    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
274        Self::enqueue_delivery_with_sender(&self.delivery_sender, pending_deliveries);
275    }
276
277    fn enqueue_delivery_with_sender(
278        delivery_sender: &Sender<DeliveryCommand>,
279        pending_deliveries: Vec<PendingDelivery>,
280    ) {
281        if pending_deliveries.is_empty() {
282            return;
283        }
284
285        if let Err(error) = delivery_sender.send(DeliveryCommand::Deliver(pending_deliveries)) {
286            eprintln!(
287                "factstr-postgres delivery dispatcher stopped after commit: {}",
288                error
289            );
290        }
291    }
292
293    fn register_all_durable_stream(
294        &self,
295        durable_stream_id: impl Into<String>,
296        handle: HandleStream,
297    ) -> Result<EventStream, EventStoreError> {
298        self.stream_durable(durable_stream_id.into(), EventQuery::all(), handle)
299    }
300
301    fn register_durable_stream(
302        &self,
303        durable_stream_id: impl Into<String>,
304        event_query: &EventQuery,
305        handle: HandleStream,
306    ) -> Result<EventStream, EventStoreError> {
307        self.stream_durable(durable_stream_id.into(), event_query.clone(), handle)
308    }
309
310    fn stream_durable(
311        &self,
312        durable_stream_id: String,
313        event_query: EventQuery,
314        handle: HandleStream,
315    ) -> Result<EventStream, EventStoreError> {
316        let normalized_event_query = normalized_durable_event_query(&event_query);
317        let event_query_json = serialize_event_query(&normalized_event_query)?;
318        let subscription_registry = Arc::clone(&self.subscription_registry);
319        let durable_stream_id_for_registry = durable_stream_id.clone();
320        let normalized_event_query_for_registry = normalized_event_query.clone();
321        let handle_for_registry = handle.clone();
322        let replay_stream_id = durable_stream_id.clone();
323
324        let replay_state = self.run_async("stream_durable", move |pool| async move {
325            let mut transaction = pool.begin().await.map_err(Self::backend_failure)?;
326
327            let last_processed_sequence_number = load_or_create_durable_stream_cursor(
328                &mut transaction,
329                &durable_stream_id_for_registry,
330                &event_query_json,
331            )
332            .await?;
333            let replay_until_sequence_number =
334                current_max_sequence_number_in_transaction(&mut transaction).await?;
335
336            let subscription_id = match subscription_registry.lock() {
337                Ok(mut subscription_registry) => {
338                    if normalized_event_query_for_registry.filters.is_none() {
339                        subscription_registry
340                            .subscribe_all_durable(
341                                durable_stream_id_for_registry.clone(),
342                                replay_until_sequence_number,
343                                handle_for_registry.clone(),
344                            )
345                            .map_err(subscription_registry_backend_failure)?
346                    } else {
347                        subscription_registry
348                            .subscribe_to_durable(
349                                durable_stream_id_for_registry.clone(),
350                                Some(normalized_event_query_for_registry.clone()),
351                                replay_until_sequence_number,
352                                handle_for_registry.clone(),
353                            )
354                            .map_err(subscription_registry_backend_failure)?
355                    }
356                }
357                Err(poisoned) => {
358                    let mut subscription_registry = poisoned.into_inner();
359                    if normalized_event_query_for_registry.filters.is_none() {
360                        subscription_registry
361                            .subscribe_all_durable(
362                                durable_stream_id_for_registry.clone(),
363                                replay_until_sequence_number,
364                                handle_for_registry.clone(),
365                            )
366                            .map_err(subscription_registry_backend_failure)?
367                    } else {
368                        subscription_registry
369                            .subscribe_to_durable(
370                                durable_stream_id_for_registry.clone(),
371                                Some(normalized_event_query_for_registry.clone()),
372                                replay_until_sequence_number,
373                                handle_for_registry.clone(),
374                            )
375                            .map_err(subscription_registry_backend_failure)?
376                    }
377                }
378            };
379
380            if let Err(error) = transaction.commit().await {
381                match subscription_registry.lock() {
382                    Ok(mut subscription_registry) => {
383                        subscription_registry.unsubscribe(subscription_id)
384                    }
385                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
386                }
387                return Err(Self::backend_failure(error));
388            }
389
390            Ok(DurableReplayState {
391                subscription_id,
392                last_processed_sequence_number,
393                replay_until_sequence_number,
394            })
395        })?;
396
397        let subscription = self.build_subscription_handle(replay_state.subscription_id);
398
399        let replay_batches = match self.run_async("durable_replay", {
400            let event_query = normalized_event_query.clone();
401            move |pool| async move {
402                ensure_replay_history_is_available_for_pool(
403                    &pool,
404                    replay_state.replay_until_sequence_number,
405                )
406                .await?;
407                load_replay_batches(
408                    &pool,
409                    &event_query,
410                    replay_state.last_processed_sequence_number,
411                    replay_state.replay_until_sequence_number,
412                )
413                .await
414            }
415        }) {
416            Ok(replay_batches) => replay_batches,
417            Err(error) => {
418                self.cleanup_durable_subscription(replay_state.subscription_id);
419                return Err(error);
420            }
421        };
422        let delivery_runtime = build_delivery_runtime("factstr-postgres durable replay")?;
423
424        for replay_batch in replay_batches {
425            let pending_delivery = PendingDelivery {
426                subscription_id: replay_state.subscription_id,
427                durable_stream_id: Some(replay_stream_id.clone()),
428                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
429                delivered_batch: replay_batch.delivered_batch,
430                handle: handle.clone(),
431            };
432
433            match self.process_durable_delivery(&delivery_runtime, pending_delivery) {
434                Ok(true) => {}
435                Ok(false) => {
436                    self.cleanup_durable_subscription(replay_state.subscription_id);
437                    return Err(EventStoreError::BackendFailure {
438                        message: format!(
439                            "durable replay for stream {} did not complete successfully",
440                            replay_stream_id
441                        ),
442                    });
443                }
444                Err(error) => {
445                    self.cleanup_durable_subscription(replay_state.subscription_id);
446                    return Err(error);
447                }
448            }
449        }
450
451        self.finish_durable_replay(replay_state.subscription_id);
452        Ok(subscription)
453    }
454
455    fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
456        let subscription_registry = Arc::clone(&self.subscription_registry);
457
458        EventStream::new(
459            subscription_id,
460            Arc::new(move |subscription_id| match subscription_registry.lock() {
461                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
462                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
463            }),
464        )
465    }
466
467    fn process_durable_delivery(
468        &self,
469        delivery_runtime: &Runtime,
470        pending_delivery: PendingDelivery,
471    ) -> Result<bool, EventStoreError> {
472        match pending_delivery.deliver(delivery_runtime) {
473            DeliveryOutcome::Succeeded {
474                durable_stream_id,
475                last_processed_sequence_number,
476                ..
477            } => {
478                if let Some(durable_stream_id) = durable_stream_id {
479                    self.run_async("advance_durable_cursor", move |pool| async move {
480                        update_durable_stream_cursor(
481                            &pool,
482                            &durable_stream_id,
483                            last_processed_sequence_number,
484                        )
485                        .await
486                    })?;
487                }
488
489                Ok(true)
490            }
491            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
492        }
493    }
494
495    fn finish_durable_replay(&self, subscription_id: u64) {
496        match self.subscription_registry.lock() {
497            Ok(mut subscription_registry) => {
498                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
499                self.enqueue_delivery(buffered_deliveries);
500            }
501            Err(poisoned) => {
502                let mut subscription_registry = poisoned.into_inner();
503                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
504                self.enqueue_delivery(buffered_deliveries);
505            }
506        }
507    }
508
509    fn cleanup_durable_subscription(&self, subscription_id: u64) {
510        match self.subscription_registry.lock() {
511            Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
512            Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
513        }
514    }
515}
516
517impl Drop for PostgresStore {
518    fn drop(&mut self) {
519        if let Ok(worker_sender) = self.worker_sender.lock() {
520            let _ = worker_sender.send(WorkerCommand::Shutdown);
521        }
522
523        if let Ok(mut worker_thread) = self.worker_thread.lock() {
524            if let Some(worker_thread) = worker_thread.take() {
525                let _ = worker_thread.join();
526            }
527        }
528
529        let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
530
531        if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
532            if let Some(delivery_thread) = delivery_thread.take() {
533                let _ = delivery_thread.join();
534            }
535        }
536    }
537}
538
539impl EventStore for PostgresStore {
540    fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
541        self.run_query(event_query)
542    }
543
544    fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
545        if new_events.is_empty() {
546            return Err(EventStoreError::EmptyAppend);
547        }
548
549        self.run_append(new_events)
550    }
551
552    fn append_if(
553        &self,
554        new_events: Vec<NewEvent>,
555        context_query: &EventQuery,
556        expected_context_version: Option<u64>,
557    ) -> Result<AppendResult, EventStoreError> {
558        if new_events.is_empty() {
559            return Err(EventStoreError::EmptyAppend);
560        }
561
562        self.run_append_if(new_events, context_query, expected_context_version)
563    }
564
565    fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
566        let subscription_registry = Arc::clone(&self.subscription_registry);
567        let id = match subscription_registry.lock() {
568            Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
569            Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
570        };
571
572        Ok(EventStream::new(
573            id,
574            Arc::new(move |subscription_id| match subscription_registry.lock() {
575                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
576                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
577            }),
578        ))
579    }
580
581    fn stream_to(
582        &self,
583        event_query: &EventQuery,
584        handle: HandleStream,
585    ) -> Result<EventStream, EventStoreError> {
586        let subscription_registry = Arc::clone(&self.subscription_registry);
587        let id = match subscription_registry.lock() {
588            Ok(mut subscription_registry) => {
589                subscription_registry.subscribe_to(Some(event_query.clone()), handle)
590            }
591            Err(poisoned) => poisoned
592                .into_inner()
593                .subscribe_to(Some(event_query.clone()), handle),
594        };
595
596        Ok(EventStream::new(
597            id,
598            Arc::new(move |subscription_id| match subscription_registry.lock() {
599                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
600                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
601            }),
602        ))
603    }
604
605    fn stream_all_durable(
606        &self,
607        durable_stream: &DurableStream,
608        handle: HandleStream,
609    ) -> Result<EventStream, EventStoreError> {
610        self.register_all_durable_stream(durable_stream.name(), handle)
611    }
612
613    fn stream_to_durable(
614        &self,
615        durable_stream: &DurableStream,
616        event_query: &EventQuery,
617        handle: HandleStream,
618    ) -> Result<EventStream, EventStoreError> {
619        self.register_durable_stream(durable_stream.name(), event_query, handle)
620    }
621}
622
623fn run_worker_thread(
624    connection_string: String,
625    worker_receiver: Receiver<WorkerCommand>,
626    ready_sender: mpsc::SyncSender<Result<(), sqlx::Error>>,
627    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
628    delivery_sender: Sender<DeliveryCommand>,
629) {
630    let runtime = match Builder::new_current_thread().enable_all().build() {
631        Ok(runtime) => runtime,
632        Err(error) => {
633            let _ = ready_sender.send(Err(sqlx_io_error(io::Error::other(format!(
634                "tokio runtime should build: {error}"
635            )))));
636            return;
637        }
638    };
639
640    let pool = match runtime.block_on(async {
641        PgPoolOptions::new()
642            .max_connections(1)
643            .connect(&connection_string)
644            .await
645    }) {
646        Ok(pool) => pool,
647        Err(error) => {
648            let _ = ready_sender.send(Err(error));
649            return;
650        }
651    };
652
653    if ready_sender.send(Ok(())).is_err() {
654        return;
655    }
656
657    while let Ok(worker_command) = worker_receiver.recv() {
658        match worker_command {
659            WorkerCommand::Query { event_query, reply } => {
660                let result = runtime
661                    .block_on(query_with_pool(&pool, &event_query))
662                    .map_err(PostgresStore::backend_failure);
663                let _ = reply.send(result);
664            }
665            WorkerCommand::Append { new_events, reply } => {
666                let result = runtime
667                    .block_on(append_with_pool(&pool, new_events))
668                    .map_err(PostgresStore::backend_failure);
669                if let Ok(committed_append) = &result {
670                    let pending_deliveries = match subscription_registry.lock() {
671                        Ok(mut subscription_registry) => subscription_registry
672                            .pending_deliveries(&committed_append.event_records),
673                        Err(poisoned) => poisoned
674                            .into_inner()
675                            .pending_deliveries(&committed_append.event_records),
676                    };
677                    PostgresStore::enqueue_delivery_with_sender(
678                        &delivery_sender,
679                        pending_deliveries,
680                    );
681                }
682                let result = result.map(|committed_append| committed_append.append_result);
683                let _ = reply.send(result);
684            }
685            WorkerCommand::AppendIf {
686                new_events,
687                context_query,
688                expected_context_version,
689                reply,
690            } => {
691                let result = runtime
692                    .block_on(append_if_with_pool(
693                        &pool,
694                        new_events,
695                        &context_query,
696                        expected_context_version,
697                    ))
698                    .map_err(PostgresStore::backend_failure)
699                    .and_then(|result| result);
700                if let Ok(committed_append) = &result {
701                    let pending_deliveries = match subscription_registry.lock() {
702                        Ok(mut subscription_registry) => subscription_registry
703                            .pending_deliveries(&committed_append.event_records),
704                        Err(poisoned) => poisoned
705                            .into_inner()
706                            .pending_deliveries(&committed_append.event_records),
707                    };
708                    PostgresStore::enqueue_delivery_with_sender(
709                        &delivery_sender,
710                        pending_deliveries,
711                    );
712                }
713                let result = result.map(|committed_append| committed_append.append_result);
714                let _ = reply.send(result);
715            }
716            WorkerCommand::Shutdown => break,
717        }
718    }
719}
720
721fn run_delivery_thread(
722    connection_string: String,
723    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
724    delivery_receiver: Receiver<DeliveryCommand>,
725) {
726    let runtime = match Builder::new_current_thread().enable_all().build() {
727        Ok(runtime) => runtime,
728        Err(error) => {
729            eprintln!(
730                "factstr-postgres delivery runtime could not start: {}",
731                error
732            );
733            return;
734        }
735    };
736
737    let pool = match runtime.block_on(async {
738        PgPoolOptions::new()
739            .max_connections(1)
740            .connect(&connection_string)
741            .await
742    }) {
743        Ok(pool) => pool,
744        Err(error) => {
745            eprintln!(
746                "factstr-postgres delivery pool could not connect: {}",
747                error
748            );
749            return;
750        }
751    };
752
753    while let Ok(delivery_command) = delivery_receiver.recv() {
754        match delivery_command {
755            DeliveryCommand::Deliver(pending_deliveries) => {
756                for pending_delivery in pending_deliveries {
757                    match pending_delivery.deliver(&runtime) {
758                        DeliveryOutcome::Succeeded {
759                            subscription_id,
760                            durable_stream_id,
761                            last_processed_sequence_number,
762                        } => {
763                            if let Some(durable_stream_id) = durable_stream_id {
764                                if let Err(error) = runtime.block_on(update_durable_stream_cursor(
765                                    &pool,
766                                    &durable_stream_id,
767                                    last_processed_sequence_number,
768                                )) {
769                                    eprintln!(
770                                        "factstr-postgres durable cursor update failed after delivery for stream {}: {}",
771                                        durable_stream_id, error
772                                    );
773                                    match subscription_registry.lock() {
774                                        Ok(mut subscription_registry) => {
775                                            subscription_registry.unsubscribe(subscription_id)
776                                        }
777                                        Err(poisoned) => {
778                                            poisoned.into_inner().unsubscribe(subscription_id)
779                                        }
780                                    }
781                                }
782                            }
783                        }
784                        DeliveryOutcome::Failed {
785                            subscription_id,
786                            durable_stream_id,
787                        }
788                        | DeliveryOutcome::Panicked {
789                            subscription_id,
790                            durable_stream_id,
791                        } => {
792                            if durable_stream_id.is_some() {
793                                match subscription_registry.lock() {
794                                    Ok(mut subscription_registry) => {
795                                        subscription_registry.unsubscribe(subscription_id)
796                                    }
797                                    Err(poisoned) => {
798                                        poisoned.into_inner().unsubscribe(subscription_id)
799                                    }
800                                }
801                            }
802                        }
803                    }
804                }
805            }
806            DeliveryCommand::Shutdown => break,
807        }
808    }
809}
810
811fn sqlx_io_error(error: io::Error) -> sqlx::Error {
812    sqlx::Error::Io(error)
813}
814
815fn backend_failure_message(message: impl Into<String>) -> EventStoreError {
816    EventStoreError::BackendFailure {
817        message: message.into(),
818    }
819}
820
821fn build_delivery_runtime(operation: &'static str) -> Result<Runtime, EventStoreError> {
822    Builder::new_current_thread()
823        .enable_all()
824        .build()
825        .map_err(sqlx_io_error)
826        .map_err(PostgresStore::backend_failure)
827        .map_err(|error| match error {
828            EventStoreError::BackendFailure { message } => EventStoreError::BackendFailure {
829                message: format!("{operation}: {message}"),
830            },
831            other => other,
832        })
833}
834
835fn bootstrap_database(options: &PostgresBootstrapOptions) -> Result<(), EventStoreError> {
836    let options = options.clone();
837    let (result_sender, result_receiver) = mpsc::sync_channel(1);
838
839    let bootstrap_thread = thread::Builder::new()
840        .name("factstr-postgres-database-bootstrap".to_owned())
841        .spawn(move || {
842            let runtime = Builder::new_current_thread()
843                .enable_all()
844                .build()
845                .map_err(sqlx_io_error)
846                .map_err(PostgresStore::backend_failure);
847
848            let result = match runtime {
849                Ok(runtime) => runtime.block_on(async move {
850                    ensure_database_exists(&options.server_url, &options.database_name).await
851                }),
852                Err(error) => Err(error),
853            };
854
855            let _ = result_sender.send(result);
856        })
857        .map_err(sqlx_io_error)
858        .map_err(PostgresStore::backend_failure)?;
859
860    let result = result_receiver.recv().map_err(|error| {
861        backend_failure_message(format!(
862            "postgres bootstrap database thread did not return a result: {error}"
863        ))
864    })?;
865
866    bootstrap_thread.join().map_err(|_| {
867        backend_failure_message("postgres bootstrap database thread panicked before completion")
868    })?;
869
870    result
871}
872
873async fn ensure_database_exists(
874    server_url: &str,
875    database_name: &str,
876) -> Result<(), EventStoreError> {
877    validate_bootstrap_database_name(database_name)?;
878    let _ = database_url_with_name(server_url, database_name)?;
879
880    let quoted_database_name = quote_postgres_identifier(database_name)?;
881    let pool = PgPoolOptions::new()
882        .max_connections(1)
883        .connect(server_url)
884        .await
885        .map_err(|error| {
886            backend_failure_message(format!(
887                "postgres bootstrap could not connect to server_url {server_url:?}: {error}"
888            ))
889        })?;
890
891    let database_exists = sqlx::query_scalar::<_, bool>(
892        "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)",
893    )
894    .bind(database_name)
895    .fetch_one(&pool)
896    .await
897    .map_err(|error| {
898        backend_failure_message(format!(
899            "postgres bootstrap could not inspect database {database_name:?}: {error}"
900        ))
901    })?;
902
903    if database_exists {
904        return Ok(());
905    }
906
907    let create_database_sql = format!("CREATE DATABASE {quoted_database_name}");
908    match sqlx::query(&create_database_sql).execute(&pool).await {
909        Ok(_) => Ok(()),
910        Err(sqlx::Error::Database(database_error))
911            if database_error.code().as_deref() == Some("42P04") =>
912        {
913            Ok(())
914        }
915        Err(error) => Err(backend_failure_message(format!(
916            "postgres bootstrap could not create database {database_name:?}: {error}"
917        ))),
918    }
919}
920
921fn quote_postgres_identifier(identifier: &str) -> Result<String, EventStoreError> {
922    if identifier.is_empty() {
923        return Err(backend_failure_message(
924            "postgres bootstrap database_name must not be empty",
925        ));
926    }
927
928    if identifier.contains('\0') {
929        return Err(backend_failure_message(
930            "postgres bootstrap database_name must not contain null bytes",
931        ));
932    }
933
934    Ok(format!("\"{}\"", identifier.replace('"', "\"\"")))
935}
936
937fn validate_bootstrap_database_name(database_name: &str) -> Result<(), EventStoreError> {
938    if database_name.is_empty() {
939        return Err(backend_failure_message(
940            "postgres bootstrap database_name must not be empty",
941        ));
942    }
943
944    if database_name.contains('\0') {
945        return Err(backend_failure_message(
946            "postgres bootstrap database_name must not contain null bytes",
947        ));
948    }
949
950    let mut characters = database_name.chars();
951    let Some(first_character) = characters.next() else {
952        return Err(backend_failure_message(
953            "postgres bootstrap database_name must not be empty",
954        ));
955    };
956
957    if !(first_character.is_ascii_alphabetic() || first_character == '_') {
958        return Err(backend_failure_message(
959            "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
960        ));
961    }
962
963    if !characters.all(|character| character.is_ascii_alphanumeric() || character == '_') {
964        return Err(backend_failure_message(
965            "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
966        ));
967    }
968
969    Ok(())
970}
971
972fn database_url_with_name(
973    server_url: &str,
974    database_name: &str,
975) -> Result<String, EventStoreError> {
976    let mut url = Url::parse(server_url).map_err(|error| {
977        backend_failure_message(format!(
978            "invalid postgres server_url {server_url:?}: {error}"
979        ))
980    })?;
981    url.set_path(&format!("/{database_name}"));
982    Ok(url.into())
983}
984
985fn bootstrap_connection(connection_string: &str) -> Result<(), sqlx::Error> {
986    let connection_string = connection_string.to_owned();
987    let (result_sender, result_receiver) = mpsc::sync_channel(1);
988
989    let bootstrap_thread = thread::Builder::new()
990        .name("factstr-postgres-bootstrap".to_owned())
991        .spawn(move || {
992            let runtime = Builder::new_current_thread()
993                .enable_all()
994                .build()
995                .map_err(sqlx_io_error);
996
997            let result = match runtime {
998                Ok(runtime) => runtime.block_on(async {
999                    let pool = PgPoolOptions::new()
1000                        .max_connections(1)
1001                        .connect(&connection_string)
1002                        .await?;
1003                    initialize_schema(&pool).await?;
1004                    Ok::<_, sqlx::Error>(())
1005                }),
1006                Err(error) => Err(error),
1007            };
1008
1009            let _ = result_sender.send(result);
1010        })
1011        .map_err(sqlx_io_error)?;
1012
1013    let result = match result_receiver.recv() {
1014        Ok(result) => result,
1015        Err(error) => Err(sqlx_io_error(io::Error::other(format!(
1016            "postgres bootstrap thread did not return a result: {error}"
1017        )))),
1018    };
1019
1020    bootstrap_thread.join().map_err(|_| {
1021        sqlx_io_error(io::Error::other(
1022            "postgres bootstrap thread panicked before connect completed",
1023        ))
1024    })?;
1025
1026    result
1027}
1028
1029async fn query_with_pool(
1030    pool: &PgPool,
1031    event_query: &EventQuery,
1032) -> Result<QueryResult, sqlx::Error> {
1033    let current_context_version = current_context_version(pool, event_query).await?;
1034
1035    let mut query_builder: QueryBuilder<'_, Postgres> =
1036        QueryBuilder::new("SELECT sequence_number, occurred_at, event_type, payload FROM events");
1037    push_query_conditions(&mut query_builder, event_query, true);
1038    query_builder.push(" ORDER BY sequence_number ASC");
1039
1040    let event_records = query_builder
1041        .build()
1042        .fetch_all(pool)
1043        .await?
1044        .into_iter()
1045        .map(event_record_from_row)
1046        .collect::<Result<Vec<_>, _>>()?;
1047
1048    let last_returned_sequence_number = event_records
1049        .last()
1050        .map(|event_record| event_record.sequence_number);
1051
1052    Ok(QueryResult {
1053        event_records,
1054        last_returned_sequence_number,
1055        current_context_version,
1056    })
1057}
1058
1059async fn append_with_pool(
1060    pool: &PgPool,
1061    new_events: Vec<NewEvent>,
1062) -> Result<CommittedAppend, sqlx::Error> {
1063    let mut transaction = pool.begin().await?;
1064    lock_events_tables(&mut transaction).await?;
1065    let committed_append = append_records(&mut transaction, new_events).await?;
1066    transaction.commit().await?;
1067    Ok(committed_append)
1068}
1069
1070async fn append_if_with_pool(
1071    pool: &PgPool,
1072    new_events: Vec<NewEvent>,
1073    context_query: &EventQuery,
1074    expected_context_version: Option<u64>,
1075) -> Result<Result<CommittedAppend, EventStoreError>, sqlx::Error> {
1076    let mut transaction = pool.begin().await?;
1077    lock_events_tables(&mut transaction).await?;
1078
1079    let actual_context_version =
1080        current_context_version_in_transaction(&mut transaction, context_query).await?;
1081
1082    if actual_context_version != expected_context_version {
1083        transaction.rollback().await?;
1084        return Ok(Err(EventStoreError::ConditionalAppendConflict {
1085            expected: expected_context_version,
1086            actual: actual_context_version,
1087        }));
1088    }
1089
1090    let committed_append = append_records(&mut transaction, new_events).await?;
1091    transaction.commit().await?;
1092
1093    Ok(Ok(committed_append))
1094}
1095
1096async fn initialize_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
1097    sqlx::query(
1098        "CREATE TABLE IF NOT EXISTS events (
1099            sequence_number BIGINT PRIMARY KEY,
1100            occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1101            event_type TEXT NOT NULL,
1102            payload JSONB NOT NULL
1103        )",
1104    )
1105    .execute(pool)
1106    .await?;
1107
1108    sqlx::query(
1109        "CREATE TABLE IF NOT EXISTS append_batches (
1110            first_sequence_number BIGINT PRIMARY KEY,
1111            last_sequence_number BIGINT NOT NULL
1112        )",
1113    )
1114    .execute(pool)
1115    .await?;
1116
1117    sqlx::query(
1118        "CREATE TABLE IF NOT EXISTS durable_stream_cursors (
1119            durable_stream_id TEXT PRIMARY KEY,
1120            event_query TEXT NOT NULL,
1121            last_processed_sequence_number BIGINT NOT NULL
1122        )",
1123    )
1124    .execute(pool)
1125    .await?;
1126
1127    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
1128        .execute(pool)
1129        .await?;
1130
1131    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
1132        .execute(pool)
1133        .await?;
1134
1135    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_payload_gin ON events USING gin(payload)")
1136        .execute(pool)
1137        .await?;
1138
1139    Ok(())
1140}
1141
1142async fn lock_events_tables(
1143    transaction: &mut Transaction<'_, Postgres>,
1144) -> Result<(), sqlx::Error> {
1145    sqlx::query("LOCK TABLE events, append_batches IN EXCLUSIVE MODE")
1146        .execute(transaction.as_mut())
1147        .await?;
1148
1149    Ok(())
1150}
1151
1152async fn append_records(
1153    transaction: &mut Transaction<'_, Postgres>,
1154    new_events: Vec<NewEvent>,
1155) -> Result<CommittedAppend, sqlx::Error> {
1156    let committed_count = new_events.len() as u64;
1157    let first_sequence_number =
1158        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(sequence_number), 0) + 1 FROM events")
1159            .fetch_one(transaction.as_mut())
1160            .await? as u64;
1161    let last_sequence_number = first_sequence_number + committed_count - 1;
1162    let committed_event_records = new_events
1163        .into_iter()
1164        .enumerate()
1165        .map(|(offset, new_event)| EventRecord {
1166            sequence_number: first_sequence_number + offset as u64,
1167            occurred_at: OffsetDateTime::now_utc(),
1168            event_type: new_event.event_type,
1169            payload: new_event.payload,
1170        })
1171        .collect::<Vec<_>>();
1172
1173    for event_record in &committed_event_records {
1174        sqlx::query(
1175            "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
1176             VALUES ($1, $2, $3, $4)",
1177        )
1178        .bind(event_record.sequence_number as i64)
1179        .bind(event_record.occurred_at)
1180        .bind(&event_record.event_type)
1181        .bind(&event_record.payload)
1182        .execute(transaction.as_mut())
1183        .await?;
1184    }
1185
1186    sqlx::query(
1187        "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
1188         VALUES ($1, $2)",
1189    )
1190    .bind(first_sequence_number as i64)
1191    .bind(last_sequence_number as i64)
1192    .execute(transaction.as_mut())
1193    .await?;
1194
1195    Ok(CommittedAppend {
1196        append_result: AppendResult {
1197            first_sequence_number,
1198            last_sequence_number,
1199            committed_count,
1200        },
1201        event_records: committed_event_records,
1202    })
1203}
1204
1205async fn current_context_version(
1206    pool: &PgPool,
1207    event_query: &EventQuery,
1208) -> Result<Option<u64>, sqlx::Error> {
1209    let mut query_builder: QueryBuilder<'_, Postgres> =
1210        QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1211    push_query_conditions(&mut query_builder, event_query, false);
1212
1213    Ok(query_builder
1214        .build_query_scalar::<Option<i64>>()
1215        .fetch_one(pool)
1216        .await?
1217        .map(|sequence_number| sequence_number as u64))
1218}
1219
1220async fn current_context_version_in_transaction(
1221    transaction: &mut Transaction<'_, Postgres>,
1222    event_query: &EventQuery,
1223) -> Result<Option<u64>, sqlx::Error> {
1224    let mut query_builder: QueryBuilder<'_, Postgres> =
1225        QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1226    push_query_conditions(&mut query_builder, event_query, false);
1227
1228    Ok(query_builder
1229        .build_query_scalar::<Option<i64>>()
1230        .fetch_one(transaction.as_mut())
1231        .await?
1232        .map(|sequence_number| sequence_number as u64))
1233}
1234
1235fn event_record_from_row(row: PgRow) -> Result<EventRecord, sqlx::Error> {
1236    Ok(EventRecord {
1237        sequence_number: row.try_get::<i64, _>("sequence_number")? as u64,
1238        occurred_at: row.try_get("occurred_at")?,
1239        event_type: row.try_get("event_type")?,
1240        payload: row.try_get("payload")?,
1241    })
1242}
1243
1244fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
1245    EventStoreError::BackendFailure {
1246        message: error.to_string(),
1247    }
1248}
1249
1250fn subscription_registry_backend_failure(message: String) -> EventStoreError {
1251    EventStoreError::BackendFailure { message }
1252}
1253
1254async fn load_or_create_durable_stream_cursor(
1255    transaction: &mut Transaction<'_, Postgres>,
1256    durable_stream_id: &str,
1257    event_query_json: &str,
1258) -> Result<u64, EventStoreError> {
1259    let existing_cursor = sqlx::query(
1260        "SELECT event_query, last_processed_sequence_number
1261         FROM durable_stream_cursors
1262         WHERE durable_stream_id = $1",
1263    )
1264    .bind(durable_stream_id)
1265    .fetch_optional(transaction.as_mut())
1266    .await
1267    .map_err(PostgresStore::backend_failure)?;
1268
1269    match existing_cursor {
1270        Some(row) => {
1271            let stored_event_query = row.get::<String, _>("event_query");
1272            if stored_event_query != event_query_json {
1273                return Err(EventStoreError::BackendFailure {
1274                    message: format!(
1275                        "durable stream {durable_stream_id} was resumed with a different query"
1276                    ),
1277                });
1278            }
1279
1280            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
1281        }
1282        None => {
1283            sqlx::query(
1284                "INSERT INTO durable_stream_cursors (
1285                    durable_stream_id,
1286                    event_query,
1287                    last_processed_sequence_number
1288                 ) VALUES ($1, $2, 0)",
1289            )
1290            .bind(durable_stream_id)
1291            .bind(event_query_json)
1292            .execute(transaction.as_mut())
1293            .await
1294            .map_err(PostgresStore::backend_failure)?;
1295
1296            Ok(0)
1297        }
1298    }
1299}
1300
1301async fn current_max_sequence_number_in_transaction(
1302    transaction: &mut Transaction<'_, Postgres>,
1303) -> Result<u64, EventStoreError> {
1304    Ok(
1305        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
1306            .fetch_one(transaction.as_mut())
1307            .await
1308            .map_err(PostgresStore::backend_failure)?
1309            .unwrap_or(0) as u64,
1310    )
1311}
1312
1313async fn ensure_replay_history_is_available(
1314    connection: &mut sqlx::PgConnection,
1315    current_max_sequence_number: u64,
1316) -> Result<(), EventStoreError> {
1317    if current_max_sequence_number == 0 {
1318        return Ok(());
1319    }
1320
1321    let batch_rows = sqlx::query(
1322        "SELECT first_sequence_number, last_sequence_number
1323         FROM append_batches
1324         ORDER BY first_sequence_number ASC",
1325    )
1326    .fetch_all(&mut *connection)
1327    .await
1328    .map_err(PostgresStore::backend_failure)?;
1329
1330    if batch_rows.is_empty() {
1331        return Err(EventStoreError::BackendFailure {
1332            message: "durable replay requires append_batches history for all persisted events"
1333                .to_owned(),
1334        });
1335    }
1336
1337    let mut expected_first_sequence_number = 1_u64;
1338
1339    for batch_row in batch_rows {
1340        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1341        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1342
1343        if first_sequence_number != expected_first_sequence_number
1344            || last_sequence_number < first_sequence_number
1345        {
1346            return Err(EventStoreError::BackendFailure {
1347                message:
1348                    "durable replay requires contiguous append_batches history for all persisted events"
1349                        .to_owned(),
1350            });
1351        }
1352
1353        expected_first_sequence_number = last_sequence_number + 1;
1354    }
1355
1356    if expected_first_sequence_number - 1 != current_max_sequence_number {
1357        return Err(EventStoreError::BackendFailure {
1358            message: "durable replay requires append_batches history for all persisted events"
1359                .to_owned(),
1360        });
1361    }
1362
1363    Ok(())
1364}
1365
1366async fn ensure_replay_history_is_available_for_pool(
1367    pool: &PgPool,
1368    current_max_sequence_number: u64,
1369) -> Result<(), EventStoreError> {
1370    let mut connection = pool
1371        .acquire()
1372        .await
1373        .map_err(PostgresStore::backend_failure)?;
1374    ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
1375}
1376
1377async fn update_durable_stream_cursor(
1378    pool: &PgPool,
1379    durable_stream_id: &str,
1380    last_processed_sequence_number: u64,
1381) -> Result<(), EventStoreError> {
1382    sqlx::query(
1383        "UPDATE durable_stream_cursors
1384         SET last_processed_sequence_number = $2
1385         WHERE durable_stream_id = $1",
1386    )
1387    .bind(durable_stream_id)
1388    .bind(last_processed_sequence_number as i64)
1389    .execute(pool)
1390    .await
1391    .map_err(PostgresStore::backend_failure)?;
1392
1393    Ok(())
1394}
1395
1396async fn load_replay_batches(
1397    pool: &PgPool,
1398    event_query: &EventQuery,
1399    last_processed_sequence_number: u64,
1400    replay_until_sequence_number: u64,
1401) -> Result<Vec<ReplayBatch>, EventStoreError> {
1402    if replay_until_sequence_number <= last_processed_sequence_number {
1403        return Ok(Vec::new());
1404    }
1405
1406    let batch_rows = sqlx::query(
1407        "SELECT first_sequence_number, last_sequence_number
1408         FROM append_batches
1409         WHERE last_sequence_number > $1
1410           AND first_sequence_number <= $2
1411         ORDER BY first_sequence_number ASC",
1412    )
1413    .bind(last_processed_sequence_number as i64)
1414    .bind(replay_until_sequence_number as i64)
1415    .fetch_all(pool)
1416    .await
1417    .map_err(PostgresStore::backend_failure)?;
1418
1419    let mut replay_batches = Vec::new();
1420
1421    for batch_row in batch_rows {
1422        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1423        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1424        let event_rows = sqlx::query(
1425            "SELECT sequence_number, occurred_at, event_type, payload
1426             FROM events
1427             WHERE sequence_number >= $1 AND sequence_number <= $2
1428             ORDER BY sequence_number ASC",
1429        )
1430        .bind(first_sequence_number as i64)
1431        .bind(last_sequence_number as i64)
1432        .fetch_all(pool)
1433        .await
1434        .map_err(PostgresStore::backend_failure)?;
1435
1436        let delivered_batch = event_rows
1437            .into_iter()
1438            .map(event_record_from_row)
1439            .collect::<Result<Vec<_>, _>>()
1440            .map_err(PostgresStore::backend_failure)?
1441            .into_iter()
1442            .filter(|event_record| matches_query(event_query, event_record))
1443            .collect::<Vec<_>>();
1444
1445        replay_batches.push(ReplayBatch {
1446            last_processed_sequence_number: last_sequence_number,
1447            delivered_batch,
1448        });
1449    }
1450
1451    Ok(replay_batches)
1452}
1453
1454fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1455    EventQuery {
1456        filters: event_query.filters.clone(),
1457        min_sequence_number: None,
1458    }
1459}
1460
1461fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1462    let filters = event_query.filters.as_ref().map(|filters| {
1463        filters
1464            .iter()
1465            .map(|filter| {
1466                serde_json::json!({
1467                    "event_types": filter.event_types,
1468                    "payload_predicates": filter.payload_predicates,
1469                })
1470            })
1471            .collect::<Vec<_>>()
1472    });
1473
1474    serde_json::to_string(&serde_json::json!({
1475        "filters": filters,
1476        "min_sequence_number": event_query.min_sequence_number,
1477    }))
1478    .map_err(json_backend_failure)
1479}