Skip to main content

factstr_postgres/
postgres_store.rs

1use std::future::Future;
2use std::io;
3use std::sync::{
4    Arc, Mutex,
5    mpsc::{self, Receiver, Sender},
6};
7use std::thread::{self, JoinHandle};
8
9use factstr::{
10    AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
11    HandleStream, NewEvent, QueryResult,
12};
13use sqlx::{
14    PgPool, Postgres, QueryBuilder, Row, Transaction,
15    postgres::{PgPoolOptions, PgRow},
16};
17use time::OffsetDateTime;
18use tokio::runtime::Builder;
19use url::Url;
20
21use crate::query_match::matches_query;
22use crate::query_sql::push_query_conditions;
23use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
24
/// Result of a successfully committed append: pairs the caller-facing
/// `AppendResult` with the freshly written records so subscribers can be
/// notified without re-reading the events from the database.
#[derive(Clone, Debug)]
struct CommittedAppend {
    /// Value handed back to the `append` / `append_if` caller.
    append_result: AppendResult,
    /// Records just committed; fed to the subscription registry for fan-out.
    event_records: Vec<EventRecord>,
}
30
/// One batch of historical events delivered during durable-stream replay,
/// together with the highest sequence number the batch covers so the durable
/// cursor can be advanced once the batch is handled successfully.
#[derive(Clone, Debug)]
struct ReplayBatch {
    /// Sequence number the cursor should move to after delivering this batch.
    last_processed_sequence_number: u64,
    /// Events to hand to the subscriber's handler.
    delivered_batch: Vec<EventRecord>,
}
36
/// Snapshot taken while registering a durable stream: the registry
/// subscription id plus the replay window bounds (exclusive start = stored
/// cursor, inclusive end = max sequence number at registration time).
#[derive(Clone, Debug)]
struct DurableReplayState {
    /// Id returned by the subscription registry; used for cleanup on error.
    subscription_id: u64,
    /// Cursor loaded (or created) for this durable stream.
    last_processed_sequence_number: u64,
    /// Upper bound of the replay window captured inside the registration txn.
    replay_until_sequence_number: u64,
}
43
/// Commands sent from caller threads to the single database worker thread.
/// Each request variant carries a one-shot-style `Sender` on which the worker
/// posts its reply.
enum WorkerCommand {
    /// Run a read query and reply with the result.
    Query {
        event_query: EventQuery,
        reply: Sender<Result<QueryResult, EventStoreError>>,
    },
    /// Unconditionally append a batch of events.
    Append {
        new_events: Vec<NewEvent>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Append only if the context query still matches the expected version
    /// (optimistic concurrency check).
    AppendIf {
        new_events: Vec<NewEvent>,
        context_query: EventQuery,
        expected_context_version: Option<u64>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Ask the worker loop to exit (sent from `Drop`).
    Shutdown,
}
61
/// Commands for the delivery dispatcher thread, which pushes committed
/// events out to subscribers off the worker thread's critical path.
enum DeliveryCommand {
    /// Deliver a batch of pending subscriber notifications.
    Deliver(Vec<PendingDelivery>),
    /// Ask the dispatcher loop to exit (sent from `Drop`).
    Shutdown,
}
66
/// Options for `PostgresStore::bootstrap`: where the server lives and which
/// database to use (created via `ensure_database_exists` if missing).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PostgresBootstrapOptions {
    /// Server-level connection URL (without the target database name).
    pub server_url: String,
    /// Name of the database to create/connect to.
    pub database_name: String,
}
72
/// Event store backed by PostgreSQL. All database work is funneled through a
/// dedicated worker thread (commands over `worker_sender`); subscriber
/// notification runs on a separate delivery thread so slow handlers do not
/// block appends.
pub struct PostgresStore {
    /// Connection string reused by ad-hoc helper threads (`run_async`).
    connection_string: String,
    /// Shared registry of live / durable subscriptions.
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    /// Channel into the worker thread; Mutex allows `&self` access.
    worker_sender: Mutex<Sender<WorkerCommand>>,
    /// Join handle for the worker thread, taken in `Drop`.
    worker_thread: Mutex<Option<JoinHandle<()>>>,
    /// Channel into the delivery dispatcher thread.
    delivery_sender: Sender<DeliveryCommand>,
    /// Join handle for the delivery thread, taken in `Drop`.
    delivery_thread: Mutex<Option<JoinHandle<()>>>,
}
81
82impl PostgresStore {
    /// Connects to an existing database, runs schema bootstrap
    /// (`bootstrap_connection`), and spins up the delivery and worker
    /// threads. Waits for the worker's readiness handshake so a connection
    /// failure surfaces here rather than on the first command.
    pub fn connect(connection_string: &str) -> Result<Self, sqlx::Error> {
        let connection_string = connection_string.to_owned();
        // Ensure schema/objects exist before any thread starts using them.
        bootstrap_connection(&connection_string)?;

        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
        // Delivery thread is started first because the worker thread needs
        // its sender to hand off post-commit notifications.
        let (delivery_sender, delivery_receiver) = mpsc::channel();
        let delivery_thread = thread::Builder::new()
            .name("factstr-postgres-delivery".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                move || {
                    run_delivery_thread(connection_string, subscription_registry, delivery_receiver)
                }
            })
            .map_err(sqlx_io_error)?;

        let (worker_sender, worker_receiver) = mpsc::channel();
        // Bounded channel used once: the worker reports startup success/failure.
        let (ready_sender, ready_receiver) = mpsc::sync_channel(1);
        let worker_thread = thread::Builder::new()
            .name("factstr-postgres-worker".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                let delivery_sender = delivery_sender.clone();
                move || {
                    run_worker_thread(
                        connection_string,
                        worker_receiver,
                        ready_sender,
                        subscription_registry,
                        delivery_sender,
                    )
                }
            })
            .map_err(sqlx_io_error)?;

        // Block until the worker reports its runtime+pool are ready. On any
        // failure, tear both threads down before returning; the worker has
        // already exited after sending its error, so joining cannot hang.
        match ready_receiver.recv() {
            Ok(Ok(())) => {}
            Ok(Err(error)) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(error);
            }
            Err(error) => {
                // Channel closed without a message: worker died mid-startup.
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(sqlx_io_error(io::Error::other(format!(
                    "postgres worker startup channel failed: {error}"
                ))));
            }
        }

        Ok(Self {
            connection_string,
            subscription_registry,
            worker_sender: Mutex::new(worker_sender),
            worker_thread: Mutex::new(Some(worker_thread)),
            delivery_sender,
            delivery_thread: Mutex::new(Some(delivery_thread)),
        })
    }
147
    /// Creates the target database if needed (`bootstrap_database`), builds
    /// its full URL, then connects as in [`Self::connect`]. Connection errors
    /// are mapped into the store's `BackendFailure` error.
    pub fn bootstrap(options: PostgresBootstrapOptions) -> Result<Self, EventStoreError> {
        bootstrap_database(&options)?;
        let database_url = database_url_with_name(&options.server_url, &options.database_name)?;
        Self::connect(&database_url).map_err(Self::backend_failure)
    }
153
154    fn backend_failure(error: sqlx::Error) -> EventStoreError {
155        EventStoreError::BackendFailure {
156            message: error.to_string(),
157        }
158    }
159
160    fn worker_failure(message: impl Into<String>) -> EventStoreError {
161        EventStoreError::BackendFailure {
162            message: message.into(),
163        }
164    }
165
    /// Sends one command to the worker thread. Fails if the sender mutex is
    /// poisoned or if the worker has already stopped (receiver dropped).
    fn send_command(&self, worker_command: WorkerCommand) -> Result<(), EventStoreError> {
        let worker_sender = self
            .worker_sender
            .lock()
            .map_err(|_| Self::worker_failure("postgres worker sender lock poisoned"))?;

        worker_sender
            .send(worker_command)
            .map_err(|error| Self::worker_failure(format!("postgres worker stopped: {error}")))
    }
176
    /// Runs a query on the worker thread and blocks for its reply.
    ///
    /// The trailing `?` unwraps only the channel-level `recv` failure; the
    /// worker's own `Result` is then returned as-is.
    fn run_query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::Query {
            event_query: event_query.clone(),
            reply: reply_sender,
        })?;

        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!("postgres worker query reply failed: {error}"))
        })?
    }
188
    /// Runs an unconditional append on the worker thread and blocks for the
    /// reply. Channel failure (worker gone) is reported as `BackendFailure`;
    /// the worker's own result passes through unchanged.
    fn run_append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::Append {
            new_events,
            reply: reply_sender,
        })?;

        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!("postgres worker append reply failed: {error}"))
        })?
    }
200
    /// Runs a conditional append (optimistic-concurrency check against
    /// `context_query` / `expected_context_version`) on the worker thread
    /// and blocks for the reply.
    fn run_append_if(
        &self,
        new_events: Vec<NewEvent>,
        context_query: &EventQuery,
        expected_context_version: Option<u64>,
    ) -> Result<AppendResult, EventStoreError> {
        let (reply_sender, reply_receiver) = mpsc::channel();
        self.send_command(WorkerCommand::AppendIf {
            new_events,
            context_query: context_query.clone(),
            expected_context_version,
            reply: reply_sender,
        })?;

        reply_receiver.recv().map_err(|error| {
            Self::worker_failure(format!(
                "postgres worker conditional append reply failed: {error}"
            ))
        })?
    }
221
    /// Runs one async closure against a fresh single-connection pool on a
    /// dedicated, short-lived OS thread with its own current-thread Tokio
    /// runtime, blocking until it finishes.
    ///
    /// NOTE(review): this spawns a thread, builds a runtime, and opens a new
    /// pool per call — heavyweight if invoked per-delivery (see
    /// `process_durable_delivery`). Worth confirming that call frequency is
    /// low enough for this to be acceptable.
    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
    where
        T: Send + 'static,
        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
        F: FnOnce(PgPool) -> Fut + Send + 'static,
    {
        let connection_string = self.connection_string.clone();
        let (result_sender, result_receiver) = mpsc::sync_channel(1);

        let worker_thread = thread::Builder::new()
            .name(format!("factstr-postgres-{operation}"))
            .spawn(move || {
                // Runtime construction can itself fail; carry that error
                // through the same result channel as the work's outcome.
                let runtime = Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(sqlx_io_error)
                    .map_err(Self::backend_failure);

                let result = match runtime {
                    Ok(runtime) => runtime.block_on(async {
                        let pool = PgPoolOptions::new()
                            .max_connections(1)
                            .connect(&connection_string)
                            .await
                            .map_err(Self::backend_failure)?;
                        work(pool).await
                    }),
                    Err(error) => Err(error),
                };

                let _ = result_sender.send(result);
            })
            .map_err(sqlx_io_error)
            .map_err(Self::backend_failure)?;

        // recv fails only if the thread died before sending (e.g. panic).
        let result = result_receiver
            .recv()
            .map_err(|error| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread did not return a result: {error}"),
            })?;

        // Join to reap the thread; a panic after sending is still reported.
        worker_thread
            .join()
            .map_err(|_| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread panicked before completion"),
            })?;

        result
    }
271
    /// Convenience wrapper: enqueue deliveries on this store's own
    /// dispatcher channel.
    fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
        Self::enqueue_delivery_with_sender(&self.delivery_sender, pending_deliveries);
    }
275
276    fn enqueue_delivery_with_sender(
277        delivery_sender: &Sender<DeliveryCommand>,
278        pending_deliveries: Vec<PendingDelivery>,
279    ) {
280        if pending_deliveries.is_empty() {
281            return;
282        }
283
284        if let Err(error) = delivery_sender.send(DeliveryCommand::Deliver(pending_deliveries)) {
285            eprintln!(
286                "factstr-postgres delivery dispatcher stopped after commit: {}",
287                error
288            );
289        }
290    }
291
    /// Registers a durable stream that matches every event
    /// (`EventQuery::all()`), delegating to [`Self::stream_durable`].
    fn register_all_durable_stream(
        &self,
        durable_stream_id: impl Into<String>,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.stream_durable(durable_stream_id.into(), EventQuery::all(), handle)
    }
299
    /// Registers a durable stream limited to events matching `event_query`,
    /// delegating to [`Self::stream_durable`].
    fn register_durable_stream(
        &self,
        durable_stream_id: impl Into<String>,
        event_query: &EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.stream_durable(durable_stream_id.into(), event_query.clone(), handle)
    }
308
    /// Creates (or resumes) a durable subscription in three phases:
    ///
    /// 1. Inside one transaction: load/create the stream's cursor, snapshot
    ///    the current max sequence number, and register the subscription in
    ///    the in-memory registry (so events committed after the snapshot are
    ///    buffered rather than lost).
    /// 2. Replay history between the cursor and the snapshot to the handler.
    /// 3. Mark replay finished and flush any deliveries buffered meanwhile.
    ///
    /// Any failure after registration unsubscribes before returning so the
    /// registry does not leak a half-initialized subscription.
    fn stream_durable(
        &self,
        durable_stream_id: String,
        event_query: EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        let normalized_event_query = normalized_durable_event_query(&event_query);
        let event_query_json = serialize_event_query(&normalized_event_query)?;
        let subscription_registry = Arc::clone(&self.subscription_registry);
        // Clones moved into the async closure below; the originals stay
        // available for the replay loop afterwards.
        let durable_stream_id_for_registry = durable_stream_id.clone();
        let normalized_event_query_for_registry = normalized_event_query.clone();
        let handle_for_registry = handle.clone();
        let replay_stream_id = durable_stream_id.clone();

        // Phase 1: cursor + snapshot + registry registration, atomically
        // with respect to the database transaction.
        let replay_state = self.run_async("stream_durable", move |pool| async move {
            let mut transaction = pool.begin().await.map_err(Self::backend_failure)?;

            let last_processed_sequence_number = load_or_create_durable_stream_cursor(
                &mut transaction,
                &durable_stream_id_for_registry,
                &event_query_json,
            )
            .await?;
            let replay_until_sequence_number =
                current_max_sequence_number_in_transaction(&mut transaction).await?;

            // Register before commit so no committed event can slip between
            // the snapshot and the live subscription. A poisoned lock is
            // recovered via `into_inner` (same logic in both arms).
            let subscription_id = match subscription_registry.lock() {
                Ok(mut subscription_registry) => {
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
                Err(poisoned) => {
                    let mut subscription_registry = poisoned.into_inner();
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
            };

            // If the cursor row cannot be committed, undo the registration.
            if let Err(error) = transaction.commit().await {
                match subscription_registry.lock() {
                    Ok(mut subscription_registry) => {
                        subscription_registry.unsubscribe(subscription_id)
                    }
                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
                }
                return Err(Self::backend_failure(error));
            }

            Ok(DurableReplayState {
                subscription_id,
                last_processed_sequence_number,
                replay_until_sequence_number,
            })
        })?;

        let subscription = self.build_subscription_handle(replay_state.subscription_id);

        // Phase 2: load historical batches in the replay window.
        let replay_batches = match self.run_async("durable_replay", {
            let event_query = normalized_event_query.clone();
            move |pool| async move {
                ensure_replay_history_is_available_for_pool(
                    &pool,
                    replay_state.replay_until_sequence_number,
                )
                .await?;
                load_replay_batches(
                    &pool,
                    &event_query,
                    replay_state.last_processed_sequence_number,
                    replay_state.replay_until_sequence_number,
                )
                .await
            }
        }) {
            Ok(replay_batches) => replay_batches,
            Err(error) => {
                self.cleanup_durable_subscription(replay_state.subscription_id);
                return Err(error);
            }
        };

        // Deliver each batch synchronously, advancing the durable cursor
        // after each success; any failure tears the subscription down.
        for replay_batch in replay_batches {
            let pending_delivery = PendingDelivery {
                subscription_id: replay_state.subscription_id,
                durable_stream_id: Some(replay_stream_id.clone()),
                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
                delivered_batch: replay_batch.delivered_batch,
                handle: handle.clone(),
            };

            match self.process_durable_delivery(pending_delivery) {
                Ok(true) => {}
                Ok(false) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(EventStoreError::BackendFailure {
                        message: format!(
                            "durable replay for stream {} did not complete successfully",
                            replay_stream_id
                        ),
                    });
                }
                Err(error) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(error);
                }
            }
        }

        // Phase 3: switch to live mode and flush buffered deliveries.
        self.finish_durable_replay(replay_state.subscription_id);
        Ok(subscription)
    }
452
    /// Wraps a registry subscription id in an `EventStream` whose drop/close
    /// callback unsubscribes from the shared registry (recovering from a
    /// poisoned lock if necessary).
    fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
        let subscription_registry = Arc::clone(&self.subscription_registry);

        EventStream::new(
            subscription_id,
            Arc::new(move |subscription_id| match subscription_registry.lock() {
                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
            }),
        )
    }
464
    /// Delivers one replay batch to its handler. On success for a durable
    /// stream, persists the advanced cursor (via a `run_async` round-trip).
    /// Returns `Ok(false)` when the handler failed or panicked, letting the
    /// caller abort the replay; `Err` only for backend/cursor failures.
    fn process_durable_delivery(
        &self,
        pending_delivery: PendingDelivery,
    ) -> Result<bool, EventStoreError> {
        match pending_delivery.deliver() {
            DeliveryOutcome::Succeeded {
                durable_stream_id,
                last_processed_sequence_number,
                ..
            } => {
                if let Some(durable_stream_id) = durable_stream_id {
                    // NOTE(review): this opens a fresh thread/runtime/pool per
                    // batch (see `run_async`); fine for small replays, costly
                    // for large ones — confirm batch counts stay modest.
                    self.run_async("advance_durable_cursor", move |pool| async move {
                        update_durable_stream_cursor(
                            &pool,
                            &durable_stream_id,
                            last_processed_sequence_number,
                        )
                        .await
                    })?;
                }

                Ok(true)
            }
            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
        }
    }
491
492    fn finish_durable_replay(&self, subscription_id: u64) {
493        match self.subscription_registry.lock() {
494            Ok(mut subscription_registry) => {
495                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
496                self.enqueue_delivery(buffered_deliveries);
497            }
498            Err(poisoned) => {
499                let mut subscription_registry = poisoned.into_inner();
500                let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
501                self.enqueue_delivery(buffered_deliveries);
502            }
503        }
504    }
505
506    fn cleanup_durable_subscription(&self, subscription_id: u64) {
507        match self.subscription_registry.lock() {
508            Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
509            Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
510        }
511    }
512}
513
impl Drop for PostgresStore {
    /// Orderly shutdown: stop and join the worker first (so no new
    /// deliveries are produced), then stop and join the delivery thread.
    /// All failures are ignored — there is nothing useful to do in Drop.
    fn drop(&mut self) {
        if let Ok(worker_sender) = self.worker_sender.lock() {
            let _ = worker_sender.send(WorkerCommand::Shutdown);
        }
        // NOTE(review): if `worker_sender` is poisoned the Shutdown is never
        // sent; the subsequent join relies on the worker having already
        // exited in that case — confirm this cannot hang.

        if let Ok(mut worker_thread) = self.worker_thread.lock() {
            if let Some(worker_thread) = worker_thread.take() {
                let _ = worker_thread.join();
            }
        }

        let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);

        if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
            if let Some(delivery_thread) = delivery_thread.take() {
                let _ = delivery_thread.join();
            }
        }
    }
}
535
536impl EventStore for PostgresStore {
    /// Executes a read query on the dedicated worker thread.
    fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
        self.run_query(event_query)
    }
540
541    fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
542        if new_events.is_empty() {
543            return Err(EventStoreError::EmptyAppend);
544        }
545
546        self.run_append(new_events)
547    }
548
549    fn append_if(
550        &self,
551        new_events: Vec<NewEvent>,
552        context_query: &EventQuery,
553        expected_context_version: Option<u64>,
554    ) -> Result<AppendResult, EventStoreError> {
555        if new_events.is_empty() {
556            return Err(EventStoreError::EmptyAppend);
557        }
558
559        self.run_append_if(new_events, context_query, expected_context_version)
560    }
561
562    fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
563        let subscription_registry = Arc::clone(&self.subscription_registry);
564        let id = match subscription_registry.lock() {
565            Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
566            Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
567        };
568
569        Ok(EventStream::new(
570            id,
571            Arc::new(move |subscription_id| match subscription_registry.lock() {
572                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
573                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
574            }),
575        ))
576    }
577
578    fn stream_to(
579        &self,
580        event_query: &EventQuery,
581        handle: HandleStream,
582    ) -> Result<EventStream, EventStoreError> {
583        let subscription_registry = Arc::clone(&self.subscription_registry);
584        let id = match subscription_registry.lock() {
585            Ok(mut subscription_registry) => {
586                subscription_registry.subscribe_to(Some(event_query.clone()), handle)
587            }
588            Err(poisoned) => poisoned
589                .into_inner()
590                .subscribe_to(Some(event_query.clone()), handle),
591        };
592
593        Ok(EventStream::new(
594            id,
595            Arc::new(move |subscription_id| match subscription_registry.lock() {
596                Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
597                Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
598            }),
599        ))
600    }
601
    /// Durable subscription covering every event, keyed by the durable
    /// stream's name; delegates to `register_all_durable_stream`.
    fn stream_all_durable(
        &self,
        durable_stream: &DurableStream,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.register_all_durable_stream(durable_stream.name(), handle)
    }
609
    /// Durable subscription filtered by `event_query`, keyed by the durable
    /// stream's name; delegates to `register_durable_stream`.
    fn stream_to_durable(
        &self,
        durable_stream: &DurableStream,
        event_query: &EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.register_durable_stream(durable_stream.name(), event_query, handle)
    }
618}
619
/// Body of the dedicated database worker thread.
///
/// Startup: builds a current-thread Tokio runtime and a single-connection
/// pool, reporting success/failure once over `ready_sender` (the `connect`
/// handshake). Then loops over commands until `Shutdown` or until every
/// sender is dropped. After each committed append, matching subscriber
/// notifications are computed under the registry lock and handed off to the
/// delivery thread BEFORE the caller's reply is sent.
fn run_worker_thread(
    connection_string: String,
    worker_receiver: Receiver<WorkerCommand>,
    ready_sender: mpsc::SyncSender<Result<(), sqlx::Error>>,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_sender: Sender<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            let _ = ready_sender.send(Err(sqlx_io_error(io::Error::other(format!(
                "tokio runtime should build: {error}"
            )))));
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            let _ = ready_sender.send(Err(error));
            return;
        }
    };

    // If `connect` already gave up waiting, there is no one to serve.
    if ready_sender.send(Ok(())).is_err() {
        return;
    }

    while let Ok(worker_command) = worker_receiver.recv() {
        match worker_command {
            WorkerCommand::Query { event_query, reply } => {
                let result = runtime
                    .block_on(query_with_pool(&pool, &event_query))
                    .map_err(PostgresStore::backend_failure);
                // Caller may have stopped waiting; ignore a dead reply channel.
                let _ = reply.send(result);
            }
            WorkerCommand::Append { new_events, reply } => {
                let result = runtime
                    .block_on(append_with_pool(&pool, new_events))
                    .map_err(PostgresStore::backend_failure);
                // Fan out to subscribers only after a successful commit.
                if let Ok(committed_append) = &result {
                    let pending_deliveries = match subscription_registry.lock() {
                        Ok(mut subscription_registry) => subscription_registry
                            .pending_deliveries(&committed_append.event_records),
                        Err(poisoned) => poisoned
                            .into_inner()
                            .pending_deliveries(&committed_append.event_records),
                    };
                    PostgresStore::enqueue_delivery_with_sender(
                        &delivery_sender,
                        pending_deliveries,
                    );
                }
                // Strip the event records; callers only get the AppendResult.
                let result = result.map(|committed_append| committed_append.append_result);
                let _ = reply.send(result);
            }
            WorkerCommand::AppendIf {
                new_events,
                context_query,
                expected_context_version,
                reply,
            } => {
                // `append_if_with_pool` returns a nested Result (backend error
                // vs. domain-level condition failure); `and_then` flattens it.
                let result = runtime
                    .block_on(append_if_with_pool(
                        &pool,
                        new_events,
                        &context_query,
                        expected_context_version,
                    ))
                    .map_err(PostgresStore::backend_failure)
                    .and_then(|result| result);
                if let Ok(committed_append) = &result {
                    let pending_deliveries = match subscription_registry.lock() {
                        Ok(mut subscription_registry) => subscription_registry
                            .pending_deliveries(&committed_append.event_records),
                        Err(poisoned) => poisoned
                            .into_inner()
                            .pending_deliveries(&committed_append.event_records),
                    };
                    PostgresStore::enqueue_delivery_with_sender(
                        &delivery_sender,
                        pending_deliveries,
                    );
                }
                let result = result.map(|committed_append| committed_append.append_result);
                let _ = reply.send(result);
            }
            WorkerCommand::Shutdown => break,
        }
    }
}
717
/// Body of the delivery dispatcher thread.
///
/// Maintains its own runtime and single-connection pool (used only to
/// persist durable-stream cursors). Processes `Deliver` batches in order:
/// on a successful durable delivery the cursor is advanced; if the cursor
/// update fails, or the handler fails/panics, the durable subscription is
/// unsubscribed so it can be re-established later from the stored cursor.
/// Non-durable failures are dropped silently (nothing to clean up).
fn run_delivery_thread(
    connection_string: String,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_receiver: Receiver<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery runtime could not start: {}",
                error
            );
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery pool could not connect: {}",
                error
            );
            return;
        }
    };

    while let Ok(delivery_command) = delivery_receiver.recv() {
        match delivery_command {
            DeliveryCommand::Deliver(pending_deliveries) => {
                for pending_delivery in pending_deliveries {
                    match pending_delivery.deliver() {
                        DeliveryOutcome::Succeeded {
                            subscription_id,
                            durable_stream_id,
                            last_processed_sequence_number,
                        } => {
                            // Durable streams must persist progress; if that
                            // fails, drop the live subscription rather than
                            // risk the cursor falling behind deliveries.
                            if let Some(durable_stream_id) = durable_stream_id {
                                if let Err(error) = runtime.block_on(update_durable_stream_cursor(
                                    &pool,
                                    &durable_stream_id,
                                    last_processed_sequence_number,
                                )) {
                                    eprintln!(
                                        "factstr-postgres durable cursor update failed after delivery for stream {}: {}",
                                        durable_stream_id, error
                                    );
                                    match subscription_registry.lock() {
                                        Ok(mut subscription_registry) => {
                                            subscription_registry.unsubscribe(subscription_id)
                                        }
                                        Err(poisoned) => {
                                            poisoned.into_inner().unsubscribe(subscription_id)
                                        }
                                    }
                                }
                            }
                        }
                        DeliveryOutcome::Failed {
                            subscription_id,
                            durable_stream_id,
                        }
                        | DeliveryOutcome::Panicked {
                            subscription_id,
                            durable_stream_id,
                        } => {
                            // Only durable subscriptions are torn down on a
                            // failed/panicked handler.
                            if durable_stream_id.is_some() {
                                match subscription_registry.lock() {
                                    Ok(mut subscription_registry) => {
                                        subscription_registry.unsubscribe(subscription_id)
                                    }
                                    Err(poisoned) => {
                                        poisoned.into_inner().unsubscribe(subscription_id)
                                    }
                                }
                            }
                        }
                    }
                }
            }
            DeliveryCommand::Shutdown => break,
        }
    }
}
807
/// Wraps a `std::io::Error` in `sqlx::Error::Io` so runtime-construction and
/// thread-spawn failures can flow through the sqlx error channel used by the
/// bootstrap helpers below.
fn sqlx_io_error(error: io::Error) -> sqlx::Error {
    sqlx::Error::Io(error)
}
811
/// Builds an `EventStoreError::BackendFailure` from any string-like message.
fn backend_failure_message(message: impl Into<String>) -> EventStoreError {
    EventStoreError::BackendFailure {
        message: message.into(),
    }
}
817
818fn bootstrap_database(options: &PostgresBootstrapOptions) -> Result<(), EventStoreError> {
819    let options = options.clone();
820    let (result_sender, result_receiver) = mpsc::sync_channel(1);
821
822    let bootstrap_thread = thread::Builder::new()
823        .name("factstr-postgres-database-bootstrap".to_owned())
824        .spawn(move || {
825            let runtime = Builder::new_current_thread()
826                .enable_all()
827                .build()
828                .map_err(sqlx_io_error)
829                .map_err(PostgresStore::backend_failure);
830
831            let result = match runtime {
832                Ok(runtime) => runtime.block_on(async move {
833                    ensure_database_exists(&options.server_url, &options.database_name).await
834                }),
835                Err(error) => Err(error),
836            };
837
838            let _ = result_sender.send(result);
839        })
840        .map_err(sqlx_io_error)
841        .map_err(PostgresStore::backend_failure)?;
842
843    let result = result_receiver.recv().map_err(|error| {
844        backend_failure_message(format!(
845            "postgres bootstrap database thread did not return a result: {error}"
846        ))
847    })?;
848
849    bootstrap_thread.join().map_err(|_| {
850        backend_failure_message("postgres bootstrap database thread panicked before completion")
851    })?;
852
853    result
854}
855
856async fn ensure_database_exists(
857    server_url: &str,
858    database_name: &str,
859) -> Result<(), EventStoreError> {
860    validate_bootstrap_database_name(database_name)?;
861    let _ = database_url_with_name(server_url, database_name)?;
862
863    let quoted_database_name = quote_postgres_identifier(database_name)?;
864    let pool = PgPoolOptions::new()
865        .max_connections(1)
866        .connect(server_url)
867        .await
868        .map_err(|error| {
869            backend_failure_message(format!(
870                "postgres bootstrap could not connect to server_url {server_url:?}: {error}"
871            ))
872        })?;
873
874    let database_exists = sqlx::query_scalar::<_, bool>(
875        "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)",
876    )
877    .bind(database_name)
878    .fetch_one(&pool)
879    .await
880    .map_err(|error| {
881        backend_failure_message(format!(
882            "postgres bootstrap could not inspect database {database_name:?}: {error}"
883        ))
884    })?;
885
886    if database_exists {
887        return Ok(());
888    }
889
890    let create_database_sql = format!("CREATE DATABASE {quoted_database_name}");
891    match sqlx::query(&create_database_sql).execute(&pool).await {
892        Ok(_) => Ok(()),
893        Err(sqlx::Error::Database(database_error))
894            if database_error.code().as_deref() == Some("42P04") =>
895        {
896            Ok(())
897        }
898        Err(error) => Err(backend_failure_message(format!(
899            "postgres bootstrap could not create database {database_name:?}: {error}"
900        ))),
901    }
902}
903
904fn quote_postgres_identifier(identifier: &str) -> Result<String, EventStoreError> {
905    if identifier.is_empty() {
906        return Err(backend_failure_message(
907            "postgres bootstrap database_name must not be empty",
908        ));
909    }
910
911    if identifier.contains('\0') {
912        return Err(backend_failure_message(
913            "postgres bootstrap database_name must not contain null bytes",
914        ));
915    }
916
917    Ok(format!("\"{}\"", identifier.replace('"', "\"\"")))
918}
919
920fn validate_bootstrap_database_name(database_name: &str) -> Result<(), EventStoreError> {
921    if database_name.is_empty() {
922        return Err(backend_failure_message(
923            "postgres bootstrap database_name must not be empty",
924        ));
925    }
926
927    if database_name.contains('\0') {
928        return Err(backend_failure_message(
929            "postgres bootstrap database_name must not contain null bytes",
930        ));
931    }
932
933    let mut characters = database_name.chars();
934    let Some(first_character) = characters.next() else {
935        return Err(backend_failure_message(
936            "postgres bootstrap database_name must not be empty",
937        ));
938    };
939
940    if !(first_character.is_ascii_alphabetic() || first_character == '_') {
941        return Err(backend_failure_message(
942            "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
943        ));
944    }
945
946    if !characters.all(|character| character.is_ascii_alphanumeric() || character == '_') {
947        return Err(backend_failure_message(
948            "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
949        ));
950    }
951
952    Ok(())
953}
954
955fn database_url_with_name(
956    server_url: &str,
957    database_name: &str,
958) -> Result<String, EventStoreError> {
959    let mut url = Url::parse(server_url).map_err(|error| {
960        backend_failure_message(format!(
961            "invalid postgres server_url {server_url:?}: {error}"
962        ))
963    })?;
964    url.set_path(&format!("/{database_name}"));
965    Ok(url.into())
966}
967
968fn bootstrap_connection(connection_string: &str) -> Result<(), sqlx::Error> {
969    let connection_string = connection_string.to_owned();
970    let (result_sender, result_receiver) = mpsc::sync_channel(1);
971
972    let bootstrap_thread = thread::Builder::new()
973        .name("factstr-postgres-bootstrap".to_owned())
974        .spawn(move || {
975            let runtime = Builder::new_current_thread()
976                .enable_all()
977                .build()
978                .map_err(sqlx_io_error);
979
980            let result = match runtime {
981                Ok(runtime) => runtime.block_on(async {
982                    let pool = PgPoolOptions::new()
983                        .max_connections(1)
984                        .connect(&connection_string)
985                        .await?;
986                    initialize_schema(&pool).await?;
987                    Ok::<_, sqlx::Error>(())
988                }),
989                Err(error) => Err(error),
990            };
991
992            let _ = result_sender.send(result);
993        })
994        .map_err(sqlx_io_error)?;
995
996    let result = match result_receiver.recv() {
997        Ok(result) => result,
998        Err(error) => Err(sqlx_io_error(io::Error::other(format!(
999            "postgres bootstrap thread did not return a result: {error}"
1000        )))),
1001    };
1002
1003    bootstrap_thread.join().map_err(|_| {
1004        sqlx_io_error(io::Error::other(
1005            "postgres bootstrap thread panicked before connect completed",
1006        ))
1007    })?;
1008
1009    result
1010}
1011
1012async fn query_with_pool(
1013    pool: &PgPool,
1014    event_query: &EventQuery,
1015) -> Result<QueryResult, sqlx::Error> {
1016    let current_context_version = current_context_version(pool, event_query).await?;
1017
1018    let mut query_builder: QueryBuilder<'_, Postgres> =
1019        QueryBuilder::new("SELECT sequence_number, occurred_at, event_type, payload FROM events");
1020    push_query_conditions(&mut query_builder, event_query, true);
1021    query_builder.push(" ORDER BY sequence_number ASC");
1022
1023    let event_records = query_builder
1024        .build()
1025        .fetch_all(pool)
1026        .await?
1027        .into_iter()
1028        .map(event_record_from_row)
1029        .collect::<Result<Vec<_>, _>>()?;
1030
1031    let last_returned_sequence_number = event_records
1032        .last()
1033        .map(|event_record| event_record.sequence_number);
1034
1035    Ok(QueryResult {
1036        event_records,
1037        last_returned_sequence_number,
1038        current_context_version,
1039    })
1040}
1041
1042async fn append_with_pool(
1043    pool: &PgPool,
1044    new_events: Vec<NewEvent>,
1045) -> Result<CommittedAppend, sqlx::Error> {
1046    let mut transaction = pool.begin().await?;
1047    lock_events_tables(&mut transaction).await?;
1048    let committed_append = append_records(&mut transaction, new_events).await?;
1049    transaction.commit().await?;
1050    Ok(committed_append)
1051}
1052
1053async fn append_if_with_pool(
1054    pool: &PgPool,
1055    new_events: Vec<NewEvent>,
1056    context_query: &EventQuery,
1057    expected_context_version: Option<u64>,
1058) -> Result<Result<CommittedAppend, EventStoreError>, sqlx::Error> {
1059    let mut transaction = pool.begin().await?;
1060    lock_events_tables(&mut transaction).await?;
1061
1062    let actual_context_version =
1063        current_context_version_in_transaction(&mut transaction, context_query).await?;
1064
1065    if actual_context_version != expected_context_version {
1066        transaction.rollback().await?;
1067        return Ok(Err(EventStoreError::ConditionalAppendConflict {
1068            expected: expected_context_version,
1069            actual: actual_context_version,
1070        }));
1071    }
1072
1073    let committed_append = append_records(&mut transaction, new_events).await?;
1074    transaction.commit().await?;
1075
1076    Ok(Ok(committed_append))
1077}
1078
1079async fn initialize_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
1080    sqlx::query(
1081        "CREATE TABLE IF NOT EXISTS events (
1082            sequence_number BIGINT PRIMARY KEY,
1083            occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1084            event_type TEXT NOT NULL,
1085            payload JSONB NOT NULL
1086        )",
1087    )
1088    .execute(pool)
1089    .await?;
1090
1091    sqlx::query(
1092        "CREATE TABLE IF NOT EXISTS append_batches (
1093            first_sequence_number BIGINT PRIMARY KEY,
1094            last_sequence_number BIGINT NOT NULL
1095        )",
1096    )
1097    .execute(pool)
1098    .await?;
1099
1100    sqlx::query(
1101        "CREATE TABLE IF NOT EXISTS durable_stream_cursors (
1102            durable_stream_id TEXT PRIMARY KEY,
1103            event_query TEXT NOT NULL,
1104            last_processed_sequence_number BIGINT NOT NULL
1105        )",
1106    )
1107    .execute(pool)
1108    .await?;
1109
1110    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
1111        .execute(pool)
1112        .await?;
1113
1114    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
1115        .execute(pool)
1116        .await?;
1117
1118    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_payload_gin ON events USING gin(payload)")
1119        .execute(pool)
1120        .await?;
1121
1122    Ok(())
1123}
1124
1125async fn lock_events_tables(
1126    transaction: &mut Transaction<'_, Postgres>,
1127) -> Result<(), sqlx::Error> {
1128    sqlx::query("LOCK TABLE events, append_batches IN EXCLUSIVE MODE")
1129        .execute(transaction.as_mut())
1130        .await?;
1131
1132    Ok(())
1133}
1134
1135async fn append_records(
1136    transaction: &mut Transaction<'_, Postgres>,
1137    new_events: Vec<NewEvent>,
1138) -> Result<CommittedAppend, sqlx::Error> {
1139    let committed_count = new_events.len() as u64;
1140    let first_sequence_number =
1141        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(sequence_number), 0) + 1 FROM events")
1142            .fetch_one(transaction.as_mut())
1143            .await? as u64;
1144    let last_sequence_number = first_sequence_number + committed_count - 1;
1145    let committed_event_records = new_events
1146        .into_iter()
1147        .enumerate()
1148        .map(|(offset, new_event)| EventRecord {
1149            sequence_number: first_sequence_number + offset as u64,
1150            occurred_at: OffsetDateTime::now_utc(),
1151            event_type: new_event.event_type,
1152            payload: new_event.payload,
1153        })
1154        .collect::<Vec<_>>();
1155
1156    for event_record in &committed_event_records {
1157        sqlx::query(
1158            "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
1159             VALUES ($1, $2, $3, $4)",
1160        )
1161        .bind(event_record.sequence_number as i64)
1162        .bind(event_record.occurred_at)
1163        .bind(&event_record.event_type)
1164        .bind(&event_record.payload)
1165        .execute(transaction.as_mut())
1166        .await?;
1167    }
1168
1169    sqlx::query(
1170        "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
1171         VALUES ($1, $2)",
1172    )
1173    .bind(first_sequence_number as i64)
1174    .bind(last_sequence_number as i64)
1175    .execute(transaction.as_mut())
1176    .await?;
1177
1178    Ok(CommittedAppend {
1179        append_result: AppendResult {
1180            first_sequence_number,
1181            last_sequence_number,
1182            committed_count,
1183        },
1184        event_records: committed_event_records,
1185    })
1186}
1187
1188async fn current_context_version(
1189    pool: &PgPool,
1190    event_query: &EventQuery,
1191) -> Result<Option<u64>, sqlx::Error> {
1192    let mut query_builder: QueryBuilder<'_, Postgres> =
1193        QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1194    push_query_conditions(&mut query_builder, event_query, false);
1195
1196    Ok(query_builder
1197        .build_query_scalar::<Option<i64>>()
1198        .fetch_one(pool)
1199        .await?
1200        .map(|sequence_number| sequence_number as u64))
1201}
1202
1203async fn current_context_version_in_transaction(
1204    transaction: &mut Transaction<'_, Postgres>,
1205    event_query: &EventQuery,
1206) -> Result<Option<u64>, sqlx::Error> {
1207    let mut query_builder: QueryBuilder<'_, Postgres> =
1208        QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1209    push_query_conditions(&mut query_builder, event_query, false);
1210
1211    Ok(query_builder
1212        .build_query_scalar::<Option<i64>>()
1213        .fetch_one(transaction.as_mut())
1214        .await?
1215        .map(|sequence_number| sequence_number as u64))
1216}
1217
1218fn event_record_from_row(row: PgRow) -> Result<EventRecord, sqlx::Error> {
1219    Ok(EventRecord {
1220        sequence_number: row.try_get::<i64, _>("sequence_number")? as u64,
1221        occurred_at: row.try_get("occurred_at")?,
1222        event_type: row.try_get("event_type")?,
1223        payload: row.try_get("payload")?,
1224    })
1225}
1226
/// Converts a serde_json error into an `EventStoreError::BackendFailure`,
/// preserving the serializer's message text.
fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
    EventStoreError::BackendFailure {
        message: error.to_string(),
    }
}
1232
/// Wraps a subscription-registry error message in `BackendFailure`.
fn subscription_registry_backend_failure(message: String) -> EventStoreError {
    EventStoreError::BackendFailure { message }
}
1236
/// Loads the persisted cursor for `durable_stream_id`, creating a fresh cursor
/// at sequence number 0 when none exists yet.
///
/// Returns the last processed sequence number recorded for the stream.
///
/// # Errors
///
/// Fails with `BackendFailure` when the stored `event_query` does not match
/// `event_query_json` (the stream was resumed with a different query) or when
/// any database call fails.
async fn load_or_create_durable_stream_cursor(
    transaction: &mut Transaction<'_, Postgres>,
    durable_stream_id: &str,
    event_query_json: &str,
) -> Result<u64, EventStoreError> {
    let existing_cursor = sqlx::query(
        "SELECT event_query, last_processed_sequence_number
         FROM durable_stream_cursors
         WHERE durable_stream_id = $1",
    )
    .bind(durable_stream_id)
    .fetch_optional(transaction.as_mut())
    .await
    .map_err(PostgresStore::backend_failure)?;

    match existing_cursor {
        Some(row) => {
            // The stored query is compared as a raw JSON string, so it must be
            // byte-identical to the serialized form of the caller's query.
            let stored_event_query = row.get::<String, _>("event_query");
            if stored_event_query != event_query_json {
                return Err(EventStoreError::BackendFailure {
                    message: format!(
                        "durable stream {durable_stream_id} was resumed with a different query"
                    ),
                });
            }

            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
        }
        None => {
            // First subscription for this stream id: cursor starts at 0.
            // NOTE(review): this SELECT-then-INSERT is not atomic across
            // transactions; presumably the caller serializes resumes per
            // stream id — confirm, else concurrent resumes could hit a
            // primary-key conflict here.
            sqlx::query(
                "INSERT INTO durable_stream_cursors (
                    durable_stream_id,
                    event_query,
                    last_processed_sequence_number
                 ) VALUES ($1, $2, 0)",
            )
            .bind(durable_stream_id)
            .bind(event_query_json)
            .execute(transaction.as_mut())
            .await
            .map_err(PostgresStore::backend_failure)?;

            Ok(0)
        }
    }
}
1283
1284async fn current_max_sequence_number_in_transaction(
1285    transaction: &mut Transaction<'_, Postgres>,
1286) -> Result<u64, EventStoreError> {
1287    Ok(
1288        sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
1289            .fetch_one(transaction.as_mut())
1290            .await
1291            .map_err(PostgresStore::backend_failure)?
1292            .unwrap_or(0) as u64,
1293    )
1294}
1295
1296async fn ensure_replay_history_is_available(
1297    connection: &mut sqlx::PgConnection,
1298    current_max_sequence_number: u64,
1299) -> Result<(), EventStoreError> {
1300    if current_max_sequence_number == 0 {
1301        return Ok(());
1302    }
1303
1304    let batch_rows = sqlx::query(
1305        "SELECT first_sequence_number, last_sequence_number
1306         FROM append_batches
1307         ORDER BY first_sequence_number ASC",
1308    )
1309    .fetch_all(&mut *connection)
1310    .await
1311    .map_err(PostgresStore::backend_failure)?;
1312
1313    if batch_rows.is_empty() {
1314        return Err(EventStoreError::BackendFailure {
1315            message: "durable replay requires append_batches history for all persisted events"
1316                .to_owned(),
1317        });
1318    }
1319
1320    let mut expected_first_sequence_number = 1_u64;
1321
1322    for batch_row in batch_rows {
1323        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1324        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1325
1326        if first_sequence_number != expected_first_sequence_number
1327            || last_sequence_number < first_sequence_number
1328        {
1329            return Err(EventStoreError::BackendFailure {
1330                message:
1331                    "durable replay requires contiguous append_batches history for all persisted events"
1332                        .to_owned(),
1333            });
1334        }
1335
1336        expected_first_sequence_number = last_sequence_number + 1;
1337    }
1338
1339    if expected_first_sequence_number - 1 != current_max_sequence_number {
1340        return Err(EventStoreError::BackendFailure {
1341            message: "durable replay requires append_batches history for all persisted events"
1342                .to_owned(),
1343        });
1344    }
1345
1346    Ok(())
1347}
1348
1349async fn ensure_replay_history_is_available_for_pool(
1350    pool: &PgPool,
1351    current_max_sequence_number: u64,
1352) -> Result<(), EventStoreError> {
1353    let mut connection = pool
1354        .acquire()
1355        .await
1356        .map_err(PostgresStore::backend_failure)?;
1357    ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
1358}
1359
1360async fn update_durable_stream_cursor(
1361    pool: &PgPool,
1362    durable_stream_id: &str,
1363    last_processed_sequence_number: u64,
1364) -> Result<(), EventStoreError> {
1365    sqlx::query(
1366        "UPDATE durable_stream_cursors
1367         SET last_processed_sequence_number = $2
1368         WHERE durable_stream_id = $1",
1369    )
1370    .bind(durable_stream_id)
1371    .bind(last_processed_sequence_number as i64)
1372    .execute(pool)
1373    .await
1374    .map_err(PostgresStore::backend_failure)?;
1375
1376    Ok(())
1377}
1378
1379async fn load_replay_batches(
1380    pool: &PgPool,
1381    event_query: &EventQuery,
1382    last_processed_sequence_number: u64,
1383    replay_until_sequence_number: u64,
1384) -> Result<Vec<ReplayBatch>, EventStoreError> {
1385    if replay_until_sequence_number <= last_processed_sequence_number {
1386        return Ok(Vec::new());
1387    }
1388
1389    let batch_rows = sqlx::query(
1390        "SELECT first_sequence_number, last_sequence_number
1391         FROM append_batches
1392         WHERE last_sequence_number > $1
1393           AND first_sequence_number <= $2
1394         ORDER BY first_sequence_number ASC",
1395    )
1396    .bind(last_processed_sequence_number as i64)
1397    .bind(replay_until_sequence_number as i64)
1398    .fetch_all(pool)
1399    .await
1400    .map_err(PostgresStore::backend_failure)?;
1401
1402    let mut replay_batches = Vec::new();
1403
1404    for batch_row in batch_rows {
1405        let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1406        let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1407        let event_rows = sqlx::query(
1408            "SELECT sequence_number, occurred_at, event_type, payload
1409             FROM events
1410             WHERE sequence_number >= $1 AND sequence_number <= $2
1411             ORDER BY sequence_number ASC",
1412        )
1413        .bind(first_sequence_number as i64)
1414        .bind(last_sequence_number as i64)
1415        .fetch_all(pool)
1416        .await
1417        .map_err(PostgresStore::backend_failure)?;
1418
1419        let delivered_batch = event_rows
1420            .into_iter()
1421            .map(event_record_from_row)
1422            .collect::<Result<Vec<_>, _>>()
1423            .map_err(PostgresStore::backend_failure)?
1424            .into_iter()
1425            .filter(|event_record| matches_query(event_query, event_record))
1426            .collect::<Vec<_>>();
1427
1428        replay_batches.push(ReplayBatch {
1429            last_processed_sequence_number: last_sequence_number,
1430            delivered_batch,
1431        });
1432    }
1433
1434    Ok(replay_batches)
1435}
1436
/// Returns a copy of `event_query` with `min_sequence_number` cleared: durable
/// streams track their position via the persisted cursor, so only the filters
/// participate in the stored query identity that resumes are checked against.
fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
    EventQuery {
        filters: event_query.filters.clone(),
        min_sequence_number: None,
    }
}
1443
1444fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1445    let filters = event_query.filters.as_ref().map(|filters| {
1446        filters
1447            .iter()
1448            .map(|filter| {
1449                serde_json::json!({
1450                    "event_types": filter.event_types,
1451                    "payload_predicates": filter.payload_predicates,
1452                })
1453            })
1454            .collect::<Vec<_>>()
1455    });
1456
1457    serde_json::to_string(&serde_json::json!({
1458        "filters": filters,
1459        "min_sequence_number": event_query.min_sequence_number,
1460    }))
1461    .map_err(json_backend_failure)
1462}