1use std::future::Future;
2use std::io;
3use std::sync::{
4 Arc, Mutex,
5 mpsc::{self, Receiver, Sender},
6};
7use std::thread::{self, JoinHandle};
8
9use factstr::{
10 AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
11 HandleStream, NewEvent, QueryResult,
12};
13use sqlx::{
14 PgPool, Postgres, QueryBuilder, Row, Transaction,
15 postgres::{PgPoolOptions, PgRow},
16};
17use time::OffsetDateTime;
18use tokio::runtime::Builder;
19use tokio::runtime::Runtime;
20use url::Url;
21
22use crate::query_match::matches_query;
23use crate::query_sql::push_query_conditions;
24use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
25
/// Outcome of a successfully committed append: pairs the caller-facing
/// `AppendResult` with the full records needed to fan out deliveries.
#[derive(Clone, Debug)]
struct CommittedAppend {
    // Returned to the appending caller.
    append_result: AppendResult,
    // Passed to the subscription registry to compute pending deliveries.
    event_records: Vec<EventRecord>,
}
31
/// One batch of historical events replayed to a durable subscriber, plus the
/// cursor position that becomes current once the batch is processed.
#[derive(Clone, Debug)]
struct ReplayBatch {
    // Durable cursor value to persist after this batch is delivered.
    last_processed_sequence_number: u64,
    // Events handed to the subscriber's handle, in sequence order.
    delivered_batch: Vec<EventRecord>,
}
37
/// State captured while registering a durable stream: the new subscription id
/// and the sequence-number window that still has to be replayed
/// (`last_processed_sequence_number` exclusive up to
/// `replay_until_sequence_number`).
#[derive(Clone, Debug)]
struct DurableReplayState {
    subscription_id: u64,
    last_processed_sequence_number: u64,
    replay_until_sequence_number: u64,
}
44
/// Requests served by the single SQL worker thread. Each data-carrying
/// variant includes a `reply` channel the caller blocks on for the outcome.
enum WorkerCommand {
    /// Read events matching `event_query`.
    Query {
        event_query: EventQuery,
        reply: Sender<Result<QueryResult, EventStoreError>>,
    },
    /// Unconditionally append a batch of events.
    Append {
        new_events: Vec<NewEvent>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Append only if the context query's current version matches the
    /// caller's expectation (optimistic concurrency check).
    AppendIf {
        new_events: Vec<NewEvent>,
        context_query: EventQuery,
        expected_context_version: Option<u64>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Ask the worker loop to exit.
    Shutdown,
}
62
/// Messages for the delivery dispatcher thread: either a batch of pending
/// deliveries to perform, or a request to exit the loop.
enum DeliveryCommand {
    Deliver(Vec<PendingDelivery>),
    Shutdown,
}
67
/// Options for [`PostgresStore::bootstrap`]: which server to connect to and
/// which database to create (if missing) and then use.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PostgresBootstrapOptions {
    /// Server-level connection URL (without the target database's path).
    pub server_url: String,
    /// Database to ensure exists; must match [A-Za-z_][A-Za-z0-9_]*.
    pub database_name: String,
}
73
/// Postgres-backed event store.
///
/// All SQL for queries/appends is funneled through one dedicated worker
/// thread; subscriber notification runs on a separate delivery thread. Both
/// threads are joined in `Drop`.
pub struct PostgresStore {
    // Kept so one-shot operations (`run_async`) can open their own pools.
    connection_string: String,
    // Shared with both background threads.
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    worker_sender: Mutex<Sender<WorkerCommand>>,
    // `Option` so Drop can take and join the handle.
    worker_thread: Mutex<Option<JoinHandle<()>>>,
    delivery_sender: Sender<DeliveryCommand>,
    delivery_thread: Mutex<Option<JoinHandle<()>>>,
}
82
83impl PostgresStore {
    /// Connects to an existing database, initializes the schema, and spawns
    /// the two long-lived background threads (SQL worker and delivery
    /// dispatcher) that serve this store.
    ///
    /// Blocks until the worker reports that its runtime and pool are ready.
    /// On startup failure both threads are shut down and joined before the
    /// error is returned, so a failed construction leaves no live threads.
    pub fn connect(connection_string: &str) -> Result<Self, sqlx::Error> {
        let connection_string = connection_string.to_owned();
        // Create/verify the schema synchronously before any thread starts.
        bootstrap_connection(&connection_string)?;

        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
        let (delivery_sender, delivery_receiver) = mpsc::channel();
        let delivery_thread = thread::Builder::new()
            .name("factstr-postgres-delivery".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                move || {
                    run_delivery_thread(connection_string, subscription_registry, delivery_receiver)
                }
            })
            .map_err(sqlx_io_error)?;

        let (worker_sender, worker_receiver) = mpsc::channel();
        // Capacity 1: the worker sends exactly one readiness report.
        let (ready_sender, ready_receiver) = mpsc::sync_channel(1);
        let worker_thread = thread::Builder::new()
            .name("factstr-postgres-worker".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                let delivery_sender = delivery_sender.clone();
                move || {
                    run_worker_thread(
                        connection_string,
                        worker_receiver,
                        ready_sender,
                        subscription_registry,
                        delivery_sender,
                    )
                }
            })
            .map_err(sqlx_io_error)?;

        match ready_receiver.recv() {
            Ok(Ok(())) => {}
            // Worker reported a startup error and has already exited.
            Ok(Err(error)) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(error);
            }
            // Worker dropped the ready channel without reporting (e.g. panic).
            Err(error) => {
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(sqlx_io_error(io::Error::other(format!(
                    "postgres worker startup channel failed: {error}"
                ))));
            }
        }

        Ok(Self {
            connection_string,
            subscription_registry,
            worker_sender: Mutex::new(worker_sender),
            worker_thread: Mutex::new(Some(worker_thread)),
            delivery_sender,
            delivery_thread: Mutex::new(Some(delivery_thread)),
        })
    }
148
149 pub fn bootstrap(options: PostgresBootstrapOptions) -> Result<Self, EventStoreError> {
150 bootstrap_database(&options)?;
151 let database_url = database_url_with_name(&options.server_url, &options.database_name)?;
152 Self::connect(&database_url).map_err(Self::backend_failure)
153 }
154
155 fn backend_failure(error: sqlx::Error) -> EventStoreError {
156 EventStoreError::BackendFailure {
157 message: error.to_string(),
158 }
159 }
160
161 fn worker_failure(message: impl Into<String>) -> EventStoreError {
162 EventStoreError::BackendFailure {
163 message: message.into(),
164 }
165 }
166
167 fn send_command(&self, worker_command: WorkerCommand) -> Result<(), EventStoreError> {
168 let worker_sender = self
169 .worker_sender
170 .lock()
171 .map_err(|_| Self::worker_failure("postgres worker sender lock poisoned"))?;
172
173 worker_sender
174 .send(worker_command)
175 .map_err(|error| Self::worker_failure(format!("postgres worker stopped: {error}")))
176 }
177
178 fn run_query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
179 let (reply_sender, reply_receiver) = mpsc::channel();
180 self.send_command(WorkerCommand::Query {
181 event_query: event_query.clone(),
182 reply: reply_sender,
183 })?;
184
185 reply_receiver.recv().map_err(|error| {
186 Self::worker_failure(format!("postgres worker query reply failed: {error}"))
187 })?
188 }
189
190 fn run_append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
191 let (reply_sender, reply_receiver) = mpsc::channel();
192 self.send_command(WorkerCommand::Append {
193 new_events,
194 reply: reply_sender,
195 })?;
196
197 reply_receiver.recv().map_err(|error| {
198 Self::worker_failure(format!("postgres worker append reply failed: {error}"))
199 })?
200 }
201
202 fn run_append_if(
203 &self,
204 new_events: Vec<NewEvent>,
205 context_query: &EventQuery,
206 expected_context_version: Option<u64>,
207 ) -> Result<AppendResult, EventStoreError> {
208 let (reply_sender, reply_receiver) = mpsc::channel();
209 self.send_command(WorkerCommand::AppendIf {
210 new_events,
211 context_query: context_query.clone(),
212 expected_context_version,
213 reply: reply_sender,
214 })?;
215
216 reply_receiver.recv().map_err(|error| {
217 Self::worker_failure(format!(
218 "postgres worker conditional append reply failed: {error}"
219 ))
220 })?
221 }
222
    /// Runs a one-shot async `work` closure against a fresh single-connection
    /// pool, on a dedicated thread with its own current-thread Tokio runtime,
    /// blocking the caller until the closure finishes.
    ///
    /// The spawned thread sends its result over a channel and then exits; the
    /// thread is joined afterwards so a panic between send and exit is still
    /// surfaced as a backend failure.
    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
    where
        T: Send + 'static,
        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
        F: FnOnce(PgPool) -> Fut + Send + 'static,
    {
        let connection_string = self.connection_string.clone();
        // Capacity 1: the thread sends exactly one result.
        let (result_sender, result_receiver) = mpsc::sync_channel(1);

        let worker_thread = thread::Builder::new()
            .name(format!("factstr-postgres-{operation}"))
            .spawn(move || {
                // Build the runtime inside the thread so a build failure is
                // reported through the result channel like any other error.
                let runtime = Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(sqlx_io_error)
                    .map_err(Self::backend_failure);

                let result = match runtime {
                    Ok(runtime) => runtime.block_on(async {
                        let pool = PgPoolOptions::new()
                            .max_connections(1)
                            .connect(&connection_string)
                            .await
                            .map_err(Self::backend_failure)?;
                        work(pool).await
                    }),
                    Err(error) => Err(error),
                };

                let _ = result_sender.send(result);
            })
            .map_err(sqlx_io_error)
            .map_err(Self::backend_failure)?;

        // `recv` fails only if the thread died before sending (panic).
        let result = result_receiver
            .recv()
            .map_err(|error| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread did not return a result: {error}"),
            })?;

        // Join does not block long: the thread exits right after sending.
        worker_thread
            .join()
            .map_err(|_| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread panicked before completion"),
            })?;

        result
    }
272
273 fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
274 Self::enqueue_delivery_with_sender(&self.delivery_sender, pending_deliveries);
275 }
276
277 fn enqueue_delivery_with_sender(
278 delivery_sender: &Sender<DeliveryCommand>,
279 pending_deliveries: Vec<PendingDelivery>,
280 ) {
281 if pending_deliveries.is_empty() {
282 return;
283 }
284
285 if let Err(error) = delivery_sender.send(DeliveryCommand::Deliver(pending_deliveries)) {
286 eprintln!(
287 "factstr-postgres delivery dispatcher stopped after commit: {}",
288 error
289 );
290 }
291 }
292
293 fn register_all_durable_stream(
294 &self,
295 durable_stream_id: impl Into<String>,
296 handle: HandleStream,
297 ) -> Result<EventStream, EventStoreError> {
298 self.stream_durable(durable_stream_id.into(), EventQuery::all(), handle)
299 }
300
301 fn register_durable_stream(
302 &self,
303 durable_stream_id: impl Into<String>,
304 event_query: &EventQuery,
305 handle: HandleStream,
306 ) -> Result<EventStream, EventStoreError> {
307 self.stream_durable(durable_stream_id.into(), event_query.clone(), handle)
308 }
309
    /// Registers a durable subscription and replays unprocessed history
    /// before the stream goes live.
    ///
    /// Phases:
    /// 1. In a single transaction: load or create the durable cursor row,
    ///    snapshot the current max sequence number as the replay upper bound,
    ///    and register the in-memory subscription (events newer than the
    ///    bound are buffered by the registry until replay finishes).
    /// 2. Replay historical batches up to the bound, persisting the cursor
    ///    after each successfully delivered batch.
    /// 3. Flush the deliveries buffered during replay and switch to live.
    ///
    /// Failures after registration unsubscribe before returning the error.
    fn stream_durable(
        &self,
        durable_stream_id: String,
        event_query: EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        let normalized_event_query = normalized_durable_event_query(&event_query);
        let event_query_json = serialize_event_query(&normalized_event_query)?;
        let subscription_registry = Arc::clone(&self.subscription_registry);
        let durable_stream_id_for_registry = durable_stream_id.clone();
        let normalized_event_query_for_registry = normalized_event_query.clone();
        let handle_for_registry = handle.clone();
        let replay_stream_id = durable_stream_id.clone();

        let replay_state = self.run_async("stream_durable", move |pool| async move {
            let mut transaction = pool.begin().await.map_err(Self::backend_failure)?;

            let last_processed_sequence_number = load_or_create_durable_stream_cursor(
                &mut transaction,
                &durable_stream_id_for_registry,
                &event_query_json,
            )
            .await?;
            // Upper bound of the replay window; newer events are buffered by
            // the registry until `finish_replay`.
            let replay_until_sequence_number =
                current_max_sequence_number_in_transaction(&mut transaction).await?;

            // No filters -> subscribe-all; otherwise a filtered subscription.
            // Both arms are duplicated for the poisoned-lock case, which
            // recovers the guard via `into_inner`.
            let subscription_id = match subscription_registry.lock() {
                Ok(mut subscription_registry) => {
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
                Err(poisoned) => {
                    let mut subscription_registry = poisoned.into_inner();
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
            };

            // If the cursor transaction cannot commit, undo the in-memory
            // registration so no orphan subscription survives.
            if let Err(error) = transaction.commit().await {
                match subscription_registry.lock() {
                    Ok(mut subscription_registry) => {
                        subscription_registry.unsubscribe(subscription_id)
                    }
                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
                }
                return Err(Self::backend_failure(error));
            }

            Ok(DurableReplayState {
                subscription_id,
                last_processed_sequence_number,
                replay_until_sequence_number,
            })
        })?;

        let subscription = self.build_subscription_handle(replay_state.subscription_id);

        // Load the history batches for the replay window; clean the
        // subscription up if the window is unavailable or loading fails.
        let replay_batches = match self.run_async("durable_replay", {
            let event_query = normalized_event_query.clone();
            move |pool| async move {
                ensure_replay_history_is_available_for_pool(
                    &pool,
                    replay_state.replay_until_sequence_number,
                )
                .await?;
                load_replay_batches(
                    &pool,
                    &event_query,
                    replay_state.last_processed_sequence_number,
                    replay_state.replay_until_sequence_number,
                )
                .await
            }
        }) {
            Ok(replay_batches) => replay_batches,
            Err(error) => {
                self.cleanup_durable_subscription(replay_state.subscription_id);
                return Err(error);
            }
        };
        // NOTE(review): if this `?` fails, cleanup relies on dropping
        // `subscription` to unsubscribe — confirm `EventStream`'s drop does so.
        let delivery_runtime = build_delivery_runtime("factstr-postgres durable replay")?;

        // Deliver batches in order, aborting replay on the first failure.
        for replay_batch in replay_batches {
            let pending_delivery = PendingDelivery {
                subscription_id: replay_state.subscription_id,
                durable_stream_id: Some(replay_stream_id.clone()),
                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
                delivered_batch: replay_batch.delivered_batch,
                handle: handle.clone(),
            };

            match self.process_durable_delivery(&delivery_runtime, pending_delivery) {
                Ok(true) => {}
                Ok(false) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(EventStoreError::BackendFailure {
                        message: format!(
                            "durable replay for stream {} did not complete successfully",
                            replay_stream_id
                        ),
                    });
                }
                Err(error) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(error);
                }
            }
        }

        // Replay finished: flush buffered live events and go live.
        self.finish_durable_replay(replay_state.subscription_id);
        Ok(subscription)
    }
454
455 fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
456 let subscription_registry = Arc::clone(&self.subscription_registry);
457
458 EventStream::new(
459 subscription_id,
460 Arc::new(move |subscription_id| match subscription_registry.lock() {
461 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
462 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
463 }),
464 )
465 }
466
467 fn process_durable_delivery(
468 &self,
469 delivery_runtime: &Runtime,
470 pending_delivery: PendingDelivery,
471 ) -> Result<bool, EventStoreError> {
472 match pending_delivery.deliver(delivery_runtime) {
473 DeliveryOutcome::Succeeded {
474 durable_stream_id,
475 last_processed_sequence_number,
476 ..
477 } => {
478 if let Some(durable_stream_id) = durable_stream_id {
479 self.run_async("advance_durable_cursor", move |pool| async move {
480 update_durable_stream_cursor(
481 &pool,
482 &durable_stream_id,
483 last_processed_sequence_number,
484 )
485 .await
486 })?;
487 }
488
489 Ok(true)
490 }
491 DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
492 }
493 }
494
495 fn finish_durable_replay(&self, subscription_id: u64) {
496 match self.subscription_registry.lock() {
497 Ok(mut subscription_registry) => {
498 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
499 self.enqueue_delivery(buffered_deliveries);
500 }
501 Err(poisoned) => {
502 let mut subscription_registry = poisoned.into_inner();
503 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
504 self.enqueue_delivery(buffered_deliveries);
505 }
506 }
507 }
508
509 fn cleanup_durable_subscription(&self, subscription_id: u64) {
510 match self.subscription_registry.lock() {
511 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
512 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
513 }
514 }
515}
516
517impl Drop for PostgresStore {
518 fn drop(&mut self) {
519 if let Ok(worker_sender) = self.worker_sender.lock() {
520 let _ = worker_sender.send(WorkerCommand::Shutdown);
521 }
522
523 if let Ok(mut worker_thread) = self.worker_thread.lock() {
524 if let Some(worker_thread) = worker_thread.take() {
525 let _ = worker_thread.join();
526 }
527 }
528
529 let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);
530
531 if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
532 if let Some(delivery_thread) = delivery_thread.take() {
533 let _ = delivery_thread.join();
534 }
535 }
536 }
537}
538
539impl EventStore for PostgresStore {
540 fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
541 self.run_query(event_query)
542 }
543
544 fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
545 if new_events.is_empty() {
546 return Err(EventStoreError::EmptyAppend);
547 }
548
549 self.run_append(new_events)
550 }
551
552 fn append_if(
553 &self,
554 new_events: Vec<NewEvent>,
555 context_query: &EventQuery,
556 expected_context_version: Option<u64>,
557 ) -> Result<AppendResult, EventStoreError> {
558 if new_events.is_empty() {
559 return Err(EventStoreError::EmptyAppend);
560 }
561
562 self.run_append_if(new_events, context_query, expected_context_version)
563 }
564
565 fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
566 let subscription_registry = Arc::clone(&self.subscription_registry);
567 let id = match subscription_registry.lock() {
568 Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
569 Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
570 };
571
572 Ok(EventStream::new(
573 id,
574 Arc::new(move |subscription_id| match subscription_registry.lock() {
575 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
576 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
577 }),
578 ))
579 }
580
581 fn stream_to(
582 &self,
583 event_query: &EventQuery,
584 handle: HandleStream,
585 ) -> Result<EventStream, EventStoreError> {
586 let subscription_registry = Arc::clone(&self.subscription_registry);
587 let id = match subscription_registry.lock() {
588 Ok(mut subscription_registry) => {
589 subscription_registry.subscribe_to(Some(event_query.clone()), handle)
590 }
591 Err(poisoned) => poisoned
592 .into_inner()
593 .subscribe_to(Some(event_query.clone()), handle),
594 };
595
596 Ok(EventStream::new(
597 id,
598 Arc::new(move |subscription_id| match subscription_registry.lock() {
599 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
600 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
601 }),
602 ))
603 }
604
605 fn stream_all_durable(
606 &self,
607 durable_stream: &DurableStream,
608 handle: HandleStream,
609 ) -> Result<EventStream, EventStoreError> {
610 self.register_all_durable_stream(durable_stream.name(), handle)
611 }
612
613 fn stream_to_durable(
614 &self,
615 durable_stream: &DurableStream,
616 event_query: &EventQuery,
617 handle: HandleStream,
618 ) -> Result<EventStream, EventStoreError> {
619 self.register_durable_stream(durable_stream.name(), event_query, handle)
620 }
621}
622
623fn run_worker_thread(
624 connection_string: String,
625 worker_receiver: Receiver<WorkerCommand>,
626 ready_sender: mpsc::SyncSender<Result<(), sqlx::Error>>,
627 subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
628 delivery_sender: Sender<DeliveryCommand>,
629) {
630 let runtime = match Builder::new_current_thread().enable_all().build() {
631 Ok(runtime) => runtime,
632 Err(error) => {
633 let _ = ready_sender.send(Err(sqlx_io_error(io::Error::other(format!(
634 "tokio runtime should build: {error}"
635 )))));
636 return;
637 }
638 };
639
640 let pool = match runtime.block_on(async {
641 PgPoolOptions::new()
642 .max_connections(1)
643 .connect(&connection_string)
644 .await
645 }) {
646 Ok(pool) => pool,
647 Err(error) => {
648 let _ = ready_sender.send(Err(error));
649 return;
650 }
651 };
652
653 if ready_sender.send(Ok(())).is_err() {
654 return;
655 }
656
657 while let Ok(worker_command) = worker_receiver.recv() {
658 match worker_command {
659 WorkerCommand::Query { event_query, reply } => {
660 let result = runtime
661 .block_on(query_with_pool(&pool, &event_query))
662 .map_err(PostgresStore::backend_failure);
663 let _ = reply.send(result);
664 }
665 WorkerCommand::Append { new_events, reply } => {
666 let result = runtime
667 .block_on(append_with_pool(&pool, new_events))
668 .map_err(PostgresStore::backend_failure);
669 if let Ok(committed_append) = &result {
670 let pending_deliveries = match subscription_registry.lock() {
671 Ok(mut subscription_registry) => subscription_registry
672 .pending_deliveries(&committed_append.event_records),
673 Err(poisoned) => poisoned
674 .into_inner()
675 .pending_deliveries(&committed_append.event_records),
676 };
677 PostgresStore::enqueue_delivery_with_sender(
678 &delivery_sender,
679 pending_deliveries,
680 );
681 }
682 let result = result.map(|committed_append| committed_append.append_result);
683 let _ = reply.send(result);
684 }
685 WorkerCommand::AppendIf {
686 new_events,
687 context_query,
688 expected_context_version,
689 reply,
690 } => {
691 let result = runtime
692 .block_on(append_if_with_pool(
693 &pool,
694 new_events,
695 &context_query,
696 expected_context_version,
697 ))
698 .map_err(PostgresStore::backend_failure)
699 .and_then(|result| result);
700 if let Ok(committed_append) = &result {
701 let pending_deliveries = match subscription_registry.lock() {
702 Ok(mut subscription_registry) => subscription_registry
703 .pending_deliveries(&committed_append.event_records),
704 Err(poisoned) => poisoned
705 .into_inner()
706 .pending_deliveries(&committed_append.event_records),
707 };
708 PostgresStore::enqueue_delivery_with_sender(
709 &delivery_sender,
710 pending_deliveries,
711 );
712 }
713 let result = result.map(|committed_append| committed_append.append_result);
714 let _ = reply.send(result);
715 }
716 WorkerCommand::Shutdown => break,
717 }
718 }
719}
720
/// Delivery dispatcher loop: owns its own runtime and pool, performs each
/// pending delivery, and persists durable cursors after successful durable
/// deliveries.
///
/// Exits on `Shutdown` or when the command channel closes. This thread has
/// no reply channel, so startup failures are logged to stderr and the thread
/// simply exits. Durable subscriptions are unsubscribed when a delivery
/// fails/panics or when their cursor cannot be persisted; non-durable
/// subscriptions are left registered on failure.
fn run_delivery_thread(
    connection_string: String,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_receiver: Receiver<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery runtime could not start: {}",
                error
            );
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery pool could not connect: {}",
                error
            );
            return;
        }
    };

    while let Ok(delivery_command) = delivery_receiver.recv() {
        match delivery_command {
            DeliveryCommand::Deliver(pending_deliveries) => {
                for pending_delivery in pending_deliveries {
                    match pending_delivery.deliver(&runtime) {
                        DeliveryOutcome::Succeeded {
                            subscription_id,
                            durable_stream_id,
                            last_processed_sequence_number,
                        } => {
                            // Durable subscriptions must persist their cursor;
                            // if that fails, the subscription is cancelled.
                            if let Some(durable_stream_id) = durable_stream_id {
                                if let Err(error) = runtime.block_on(update_durable_stream_cursor(
                                    &pool,
                                    &durable_stream_id,
                                    last_processed_sequence_number,
                                )) {
                                    eprintln!(
                                        "factstr-postgres durable cursor update failed after delivery for stream {}: {}",
                                        durable_stream_id, error
                                    );
                                    // Recover a poisoned lock via `into_inner`.
                                    match subscription_registry.lock() {
                                        Ok(mut subscription_registry) => {
                                            subscription_registry.unsubscribe(subscription_id)
                                        }
                                        Err(poisoned) => {
                                            poisoned.into_inner().unsubscribe(subscription_id)
                                        }
                                    }
                                }
                            }
                        }
                        DeliveryOutcome::Failed {
                            subscription_id,
                            durable_stream_id,
                        }
                        | DeliveryOutcome::Panicked {
                            subscription_id,
                            durable_stream_id,
                        } => {
                            // Only durable subscriptions are torn down on a
                            // failed or panicked delivery.
                            if durable_stream_id.is_some() {
                                match subscription_registry.lock() {
                                    Ok(mut subscription_registry) => {
                                        subscription_registry.unsubscribe(subscription_id)
                                    }
                                    Err(poisoned) => {
                                        poisoned.into_inner().unsubscribe(subscription_id)
                                    }
                                }
                            }
                        }
                    }
                }
            }
            DeliveryCommand::Shutdown => break,
        }
    }
}
810
/// Wraps an `io::Error` in the sqlx error type so bootstrap/thread plumbing
/// can report everything through a single error channel.
fn sqlx_io_error(error: io::Error) -> sqlx::Error {
    sqlx::Error::Io(error)
}
814
/// Builds an `EventStoreError::BackendFailure` from any message-like value
/// (free-function counterpart of `PostgresStore::worker_failure`).
fn backend_failure_message(message: impl Into<String>) -> EventStoreError {
    EventStoreError::BackendFailure {
        message: message.into(),
    }
}
820
821fn build_delivery_runtime(operation: &'static str) -> Result<Runtime, EventStoreError> {
822 Builder::new_current_thread()
823 .enable_all()
824 .build()
825 .map_err(sqlx_io_error)
826 .map_err(PostgresStore::backend_failure)
827 .map_err(|error| match error {
828 EventStoreError::BackendFailure { message } => EventStoreError::BackendFailure {
829 message: format!("{operation}: {message}"),
830 },
831 other => other,
832 })
833}
834
/// Ensures the target database exists, running the async work on a throwaway
/// thread with its own runtime and blocking until it completes.
fn bootstrap_database(options: &PostgresBootstrapOptions) -> Result<(), EventStoreError> {
    let options = options.clone();
    // Capacity 1: the thread sends exactly one result.
    let (result_sender, result_receiver) = mpsc::sync_channel(1);

    let bootstrap_thread = thread::Builder::new()
        .name("factstr-postgres-database-bootstrap".to_owned())
        .spawn(move || {
            // Build the runtime inside the thread so failures flow through
            // the result channel like any other error.
            let runtime = Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(sqlx_io_error)
                .map_err(PostgresStore::backend_failure);

            let result = match runtime {
                Ok(runtime) => runtime.block_on(async move {
                    ensure_database_exists(&options.server_url, &options.database_name).await
                }),
                Err(error) => Err(error),
            };

            let _ = result_sender.send(result);
        })
        .map_err(sqlx_io_error)
        .map_err(PostgresStore::backend_failure)?;

    // `recv` fails only if the thread died before sending (panic).
    let result = result_receiver.recv().map_err(|error| {
        backend_failure_message(format!(
            "postgres bootstrap database thread did not return a result: {error}"
        ))
    })?;

    // Join after recv; the thread exits right after sending its result.
    bootstrap_thread.join().map_err(|_| {
        backend_failure_message("postgres bootstrap database thread panicked before completion")
    })?;

    result
}
872
/// Creates `database_name` on the server at `server_url` if it does not
/// already exist.
///
/// The name is validated against a strict identifier pattern and then quoted,
/// so the `CREATE DATABASE` statement (which cannot be parameterized) is safe
/// to build by string formatting.
async fn ensure_database_exists(
    server_url: &str,
    database_name: &str,
) -> Result<(), EventStoreError> {
    validate_bootstrap_database_name(database_name)?;
    // Fail early if the server URL itself cannot host this database name.
    let _ = database_url_with_name(server_url, database_name)?;

    let quoted_database_name = quote_postgres_identifier(database_name)?;
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .connect(server_url)
        .await
        .map_err(|error| {
            backend_failure_message(format!(
                "postgres bootstrap could not connect to server_url {server_url:?}: {error}"
            ))
        })?;

    // Existence check uses a bind parameter (raw name, not the quoted form).
    let database_exists = sqlx::query_scalar::<_, bool>(
        "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)",
    )
    .bind(database_name)
    .fetch_one(&pool)
    .await
    .map_err(|error| {
        backend_failure_message(format!(
            "postgres bootstrap could not inspect database {database_name:?}: {error}"
        ))
    })?;

    if database_exists {
        return Ok(());
    }

    let create_database_sql = format!("CREATE DATABASE {quoted_database_name}");
    match sqlx::query(&create_database_sql).execute(&pool).await {
        Ok(_) => Ok(()),
        // 42P04 = duplicate_database: another process created it between our
        // existence check and the CREATE; treat the race as success.
        Err(sqlx::Error::Database(database_error))
            if database_error.code().as_deref() == Some("42P04") =>
        {
            Ok(())
        }
        Err(error) => Err(backend_failure_message(format!(
            "postgres bootstrap could not create database {database_name:?}: {error}"
        ))),
    }
}
920
921fn quote_postgres_identifier(identifier: &str) -> Result<String, EventStoreError> {
922 if identifier.is_empty() {
923 return Err(backend_failure_message(
924 "postgres bootstrap database_name must not be empty",
925 ));
926 }
927
928 if identifier.contains('\0') {
929 return Err(backend_failure_message(
930 "postgres bootstrap database_name must not contain null bytes",
931 ));
932 }
933
934 Ok(format!("\"{}\"", identifier.replace('"', "\"\"")))
935}
936
937fn validate_bootstrap_database_name(database_name: &str) -> Result<(), EventStoreError> {
938 if database_name.is_empty() {
939 return Err(backend_failure_message(
940 "postgres bootstrap database_name must not be empty",
941 ));
942 }
943
944 if database_name.contains('\0') {
945 return Err(backend_failure_message(
946 "postgres bootstrap database_name must not contain null bytes",
947 ));
948 }
949
950 let mut characters = database_name.chars();
951 let Some(first_character) = characters.next() else {
952 return Err(backend_failure_message(
953 "postgres bootstrap database_name must not be empty",
954 ));
955 };
956
957 if !(first_character.is_ascii_alphabetic() || first_character == '_') {
958 return Err(backend_failure_message(
959 "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
960 ));
961 }
962
963 if !characters.all(|character| character.is_ascii_alphanumeric() || character == '_') {
964 return Err(backend_failure_message(
965 "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
966 ));
967 }
968
969 Ok(())
970}
971
972fn database_url_with_name(
973 server_url: &str,
974 database_name: &str,
975) -> Result<String, EventStoreError> {
976 let mut url = Url::parse(server_url).map_err(|error| {
977 backend_failure_message(format!(
978 "invalid postgres server_url {server_url:?}: {error}"
979 ))
980 })?;
981 url.set_path(&format!("/{database_name}"));
982 Ok(url.into())
983}
984
/// Connects once to the target database and initializes the schema, running
/// the async work on a throwaway thread with its own runtime and blocking
/// until it completes.
fn bootstrap_connection(connection_string: &str) -> Result<(), sqlx::Error> {
    let connection_string = connection_string.to_owned();
    // Capacity 1: the thread sends exactly one result.
    let (result_sender, result_receiver) = mpsc::sync_channel(1);

    let bootstrap_thread = thread::Builder::new()
        .name("factstr-postgres-bootstrap".to_owned())
        .spawn(move || {
            // Build the runtime inside the thread so failures flow through
            // the result channel like any other error.
            let runtime = Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(sqlx_io_error);

            let result = match runtime {
                Ok(runtime) => runtime.block_on(async {
                    let pool = PgPoolOptions::new()
                        .max_connections(1)
                        .connect(&connection_string)
                        .await?;
                    initialize_schema(&pool).await?;
                    Ok::<_, sqlx::Error>(())
                }),
                Err(error) => Err(error),
            };

            let _ = result_sender.send(result);
        })
        .map_err(sqlx_io_error)?;

    // `recv` fails only if the thread died before sending (panic).
    let result = match result_receiver.recv() {
        Ok(result) => result,
        Err(error) => Err(sqlx_io_error(io::Error::other(format!(
            "postgres bootstrap thread did not return a result: {error}"
        )))),
    };

    // Join after recv; a panic is surfaced as its own error.
    bootstrap_thread.join().map_err(|_| {
        sqlx_io_error(io::Error::other(
            "postgres bootstrap thread panicked before connect completed",
        ))
    })?;

    result
}
1028
1029async fn query_with_pool(
1030 pool: &PgPool,
1031 event_query: &EventQuery,
1032) -> Result<QueryResult, sqlx::Error> {
1033 let current_context_version = current_context_version(pool, event_query).await?;
1034
1035 let mut query_builder: QueryBuilder<'_, Postgres> =
1036 QueryBuilder::new("SELECT sequence_number, occurred_at, event_type, payload FROM events");
1037 push_query_conditions(&mut query_builder, event_query, true);
1038 query_builder.push(" ORDER BY sequence_number ASC");
1039
1040 let event_records = query_builder
1041 .build()
1042 .fetch_all(pool)
1043 .await?
1044 .into_iter()
1045 .map(event_record_from_row)
1046 .collect::<Result<Vec<_>, _>>()?;
1047
1048 let last_returned_sequence_number = event_records
1049 .last()
1050 .map(|event_record| event_record.sequence_number);
1051
1052 Ok(QueryResult {
1053 event_records,
1054 last_returned_sequence_number,
1055 current_context_version,
1056 })
1057}
1058
1059async fn append_with_pool(
1060 pool: &PgPool,
1061 new_events: Vec<NewEvent>,
1062) -> Result<CommittedAppend, sqlx::Error> {
1063 let mut transaction = pool.begin().await?;
1064 lock_events_tables(&mut transaction).await?;
1065 let committed_append = append_records(&mut transaction, new_events).await?;
1066 transaction.commit().await?;
1067 Ok(committed_append)
1068}
1069
1070async fn append_if_with_pool(
1071 pool: &PgPool,
1072 new_events: Vec<NewEvent>,
1073 context_query: &EventQuery,
1074 expected_context_version: Option<u64>,
1075) -> Result<Result<CommittedAppend, EventStoreError>, sqlx::Error> {
1076 let mut transaction = pool.begin().await?;
1077 lock_events_tables(&mut transaction).await?;
1078
1079 let actual_context_version =
1080 current_context_version_in_transaction(&mut transaction, context_query).await?;
1081
1082 if actual_context_version != expected_context_version {
1083 transaction.rollback().await?;
1084 return Ok(Err(EventStoreError::ConditionalAppendConflict {
1085 expected: expected_context_version,
1086 actual: actual_context_version,
1087 }));
1088 }
1089
1090 let committed_append = append_records(&mut transaction, new_events).await?;
1091 transaction.commit().await?;
1092
1093 Ok(Ok(committed_append))
1094}
1095
1096async fn initialize_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
1097 sqlx::query(
1098 "CREATE TABLE IF NOT EXISTS events (
1099 sequence_number BIGINT PRIMARY KEY,
1100 occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1101 event_type TEXT NOT NULL,
1102 payload JSONB NOT NULL
1103 )",
1104 )
1105 .execute(pool)
1106 .await?;
1107
1108 sqlx::query(
1109 "CREATE TABLE IF NOT EXISTS append_batches (
1110 first_sequence_number BIGINT PRIMARY KEY,
1111 last_sequence_number BIGINT NOT NULL
1112 )",
1113 )
1114 .execute(pool)
1115 .await?;
1116
1117 sqlx::query(
1118 "CREATE TABLE IF NOT EXISTS durable_stream_cursors (
1119 durable_stream_id TEXT PRIMARY KEY,
1120 event_query TEXT NOT NULL,
1121 last_processed_sequence_number BIGINT NOT NULL
1122 )",
1123 )
1124 .execute(pool)
1125 .await?;
1126
1127 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
1128 .execute(pool)
1129 .await?;
1130
1131 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
1132 .execute(pool)
1133 .await?;
1134
1135 sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_payload_gin ON events USING gin(payload)")
1136 .execute(pool)
1137 .await?;
1138
1139 Ok(())
1140}
1141
1142async fn lock_events_tables(
1143 transaction: &mut Transaction<'_, Postgres>,
1144) -> Result<(), sqlx::Error> {
1145 sqlx::query("LOCK TABLE events, append_batches IN EXCLUSIVE MODE")
1146 .execute(transaction.as_mut())
1147 .await?;
1148
1149 Ok(())
1150}
1151
1152async fn append_records(
1153 transaction: &mut Transaction<'_, Postgres>,
1154 new_events: Vec<NewEvent>,
1155) -> Result<CommittedAppend, sqlx::Error> {
1156 let committed_count = new_events.len() as u64;
1157 let first_sequence_number =
1158 sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(sequence_number), 0) + 1 FROM events")
1159 .fetch_one(transaction.as_mut())
1160 .await? as u64;
1161 let last_sequence_number = first_sequence_number + committed_count - 1;
1162 let committed_event_records = new_events
1163 .into_iter()
1164 .enumerate()
1165 .map(|(offset, new_event)| EventRecord {
1166 sequence_number: first_sequence_number + offset as u64,
1167 occurred_at: OffsetDateTime::now_utc(),
1168 event_type: new_event.event_type,
1169 payload: new_event.payload,
1170 })
1171 .collect::<Vec<_>>();
1172
1173 for event_record in &committed_event_records {
1174 sqlx::query(
1175 "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
1176 VALUES ($1, $2, $3, $4)",
1177 )
1178 .bind(event_record.sequence_number as i64)
1179 .bind(event_record.occurred_at)
1180 .bind(&event_record.event_type)
1181 .bind(&event_record.payload)
1182 .execute(transaction.as_mut())
1183 .await?;
1184 }
1185
1186 sqlx::query(
1187 "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
1188 VALUES ($1, $2)",
1189 )
1190 .bind(first_sequence_number as i64)
1191 .bind(last_sequence_number as i64)
1192 .execute(transaction.as_mut())
1193 .await?;
1194
1195 Ok(CommittedAppend {
1196 append_result: AppendResult {
1197 first_sequence_number,
1198 last_sequence_number,
1199 committed_count,
1200 },
1201 event_records: committed_event_records,
1202 })
1203}
1204
1205async fn current_context_version(
1206 pool: &PgPool,
1207 event_query: &EventQuery,
1208) -> Result<Option<u64>, sqlx::Error> {
1209 let mut query_builder: QueryBuilder<'_, Postgres> =
1210 QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1211 push_query_conditions(&mut query_builder, event_query, false);
1212
1213 Ok(query_builder
1214 .build_query_scalar::<Option<i64>>()
1215 .fetch_one(pool)
1216 .await?
1217 .map(|sequence_number| sequence_number as u64))
1218}
1219
1220async fn current_context_version_in_transaction(
1221 transaction: &mut Transaction<'_, Postgres>,
1222 event_query: &EventQuery,
1223) -> Result<Option<u64>, sqlx::Error> {
1224 let mut query_builder: QueryBuilder<'_, Postgres> =
1225 QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1226 push_query_conditions(&mut query_builder, event_query, false);
1227
1228 Ok(query_builder
1229 .build_query_scalar::<Option<i64>>()
1230 .fetch_one(transaction.as_mut())
1231 .await?
1232 .map(|sequence_number| sequence_number as u64))
1233}
1234
1235fn event_record_from_row(row: PgRow) -> Result<EventRecord, sqlx::Error> {
1236 Ok(EventRecord {
1237 sequence_number: row.try_get::<i64, _>("sequence_number")? as u64,
1238 occurred_at: row.try_get("occurred_at")?,
1239 event_type: row.try_get("event_type")?,
1240 payload: row.try_get("payload")?,
1241 })
1242}
1243
1244fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
1245 EventStoreError::BackendFailure {
1246 message: error.to_string(),
1247 }
1248}
1249
/// Wrap a subscription-registry error message in the store's generic
/// backend error variant.
fn subscription_registry_backend_failure(message: String) -> EventStoreError {
    EventStoreError::BackendFailure { message }
}
1253
/// Load the persisted cursor for `durable_stream_id`, creating one at
/// position 0 when it does not exist yet.
///
/// Returns the last processed sequence number for the stream. Fails when a
/// cursor already exists but was registered with a different serialized
/// query: resuming a durable stream with a changed query would silently skip
/// or re-deliver events.
///
/// NOTE(review): the query comparison is an exact string match on the
/// serialized form, so two semantically equal queries serialized differently
/// would be rejected — presumably `serialize_event_query` is canonical
/// enough; confirm.
///
/// NOTE(review): SELECT-then-INSERT is racy if two connections create the
/// same cursor concurrently; the loser's INSERT would surface as a
/// BackendFailure via the unique constraint. Verify callers serialize
/// cursor creation (e.g. under the worker thread).
async fn load_or_create_durable_stream_cursor(
    transaction: &mut Transaction<'_, Postgres>,
    durable_stream_id: &str,
    event_query_json: &str,
) -> Result<u64, EventStoreError> {
    let existing_cursor = sqlx::query(
        "SELECT event_query, last_processed_sequence_number
        FROM durable_stream_cursors
        WHERE durable_stream_id = $1",
    )
    .bind(durable_stream_id)
    .fetch_optional(transaction.as_mut())
    .await
    .map_err(PostgresStore::backend_failure)?;

    match existing_cursor {
        Some(row) => {
            // `get` (not `try_get`) panics on a missing/mistyped column;
            // both columns are selected above, so this cannot fire.
            let stored_event_query = row.get::<String, _>("event_query");
            if stored_event_query != event_query_json {
                return Err(EventStoreError::BackendFailure {
                    message: format!(
                        "durable stream {durable_stream_id} was resumed with a different query"
                    ),
                });
            }

            Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
        }
        None => {
            // First time this stream id is seen: start the cursor at 0 so
            // replay begins from the first event.
            sqlx::query(
                "INSERT INTO durable_stream_cursors (
                    durable_stream_id,
                    event_query,
                    last_processed_sequence_number
                ) VALUES ($1, $2, 0)",
            )
            .bind(durable_stream_id)
            .bind(event_query_json)
            .execute(transaction.as_mut())
            .await
            .map_err(PostgresStore::backend_failure)?;

            Ok(0)
        }
    }
}
1300
1301async fn current_max_sequence_number_in_transaction(
1302 transaction: &mut Transaction<'_, Postgres>,
1303) -> Result<u64, EventStoreError> {
1304 Ok(
1305 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
1306 .fetch_one(transaction.as_mut())
1307 .await
1308 .map_err(PostgresStore::backend_failure)?
1309 .unwrap_or(0) as u64,
1310 )
1311}
1312
1313async fn ensure_replay_history_is_available(
1314 connection: &mut sqlx::PgConnection,
1315 current_max_sequence_number: u64,
1316) -> Result<(), EventStoreError> {
1317 if current_max_sequence_number == 0 {
1318 return Ok(());
1319 }
1320
1321 let batch_rows = sqlx::query(
1322 "SELECT first_sequence_number, last_sequence_number
1323 FROM append_batches
1324 ORDER BY first_sequence_number ASC",
1325 )
1326 .fetch_all(&mut *connection)
1327 .await
1328 .map_err(PostgresStore::backend_failure)?;
1329
1330 if batch_rows.is_empty() {
1331 return Err(EventStoreError::BackendFailure {
1332 message: "durable replay requires append_batches history for all persisted events"
1333 .to_owned(),
1334 });
1335 }
1336
1337 let mut expected_first_sequence_number = 1_u64;
1338
1339 for batch_row in batch_rows {
1340 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1341 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1342
1343 if first_sequence_number != expected_first_sequence_number
1344 || last_sequence_number < first_sequence_number
1345 {
1346 return Err(EventStoreError::BackendFailure {
1347 message:
1348 "durable replay requires contiguous append_batches history for all persisted events"
1349 .to_owned(),
1350 });
1351 }
1352
1353 expected_first_sequence_number = last_sequence_number + 1;
1354 }
1355
1356 if expected_first_sequence_number - 1 != current_max_sequence_number {
1357 return Err(EventStoreError::BackendFailure {
1358 message: "durable replay requires append_batches history for all persisted events"
1359 .to_owned(),
1360 });
1361 }
1362
1363 Ok(())
1364}
1365
1366async fn ensure_replay_history_is_available_for_pool(
1367 pool: &PgPool,
1368 current_max_sequence_number: u64,
1369) -> Result<(), EventStoreError> {
1370 let mut connection = pool
1371 .acquire()
1372 .await
1373 .map_err(PostgresStore::backend_failure)?;
1374 ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
1375}
1376
1377async fn update_durable_stream_cursor(
1378 pool: &PgPool,
1379 durable_stream_id: &str,
1380 last_processed_sequence_number: u64,
1381) -> Result<(), EventStoreError> {
1382 sqlx::query(
1383 "UPDATE durable_stream_cursors
1384 SET last_processed_sequence_number = $2
1385 WHERE durable_stream_id = $1",
1386 )
1387 .bind(durable_stream_id)
1388 .bind(last_processed_sequence_number as i64)
1389 .execute(pool)
1390 .await
1391 .map_err(PostgresStore::backend_failure)?;
1392
1393 Ok(())
1394}
1395
1396async fn load_replay_batches(
1397 pool: &PgPool,
1398 event_query: &EventQuery,
1399 last_processed_sequence_number: u64,
1400 replay_until_sequence_number: u64,
1401) -> Result<Vec<ReplayBatch>, EventStoreError> {
1402 if replay_until_sequence_number <= last_processed_sequence_number {
1403 return Ok(Vec::new());
1404 }
1405
1406 let batch_rows = sqlx::query(
1407 "SELECT first_sequence_number, last_sequence_number
1408 FROM append_batches
1409 WHERE last_sequence_number > $1
1410 AND first_sequence_number <= $2
1411 ORDER BY first_sequence_number ASC",
1412 )
1413 .bind(last_processed_sequence_number as i64)
1414 .bind(replay_until_sequence_number as i64)
1415 .fetch_all(pool)
1416 .await
1417 .map_err(PostgresStore::backend_failure)?;
1418
1419 let mut replay_batches = Vec::new();
1420
1421 for batch_row in batch_rows {
1422 let first_sequence_number = batch_row.get::<i64, _>("first_sequence_number") as u64;
1423 let last_sequence_number = batch_row.get::<i64, _>("last_sequence_number") as u64;
1424 let event_rows = sqlx::query(
1425 "SELECT sequence_number, occurred_at, event_type, payload
1426 FROM events
1427 WHERE sequence_number >= $1 AND sequence_number <= $2
1428 ORDER BY sequence_number ASC",
1429 )
1430 .bind(first_sequence_number as i64)
1431 .bind(last_sequence_number as i64)
1432 .fetch_all(pool)
1433 .await
1434 .map_err(PostgresStore::backend_failure)?;
1435
1436 let delivered_batch = event_rows
1437 .into_iter()
1438 .map(event_record_from_row)
1439 .collect::<Result<Vec<_>, _>>()
1440 .map_err(PostgresStore::backend_failure)?
1441 .into_iter()
1442 .filter(|event_record| matches_query(event_query, event_record))
1443 .collect::<Vec<_>>();
1444
1445 replay_batches.push(ReplayBatch {
1446 last_processed_sequence_number: last_sequence_number,
1447 delivered_batch,
1448 });
1449 }
1450
1451 Ok(replay_batches)
1452}
1453
1454fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1455 EventQuery {
1456 filters: event_query.filters.clone(),
1457 min_sequence_number: None,
1458 }
1459}
1460
1461fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1462 let filters = event_query.filters.as_ref().map(|filters| {
1463 filters
1464 .iter()
1465 .map(|filter| {
1466 serde_json::json!({
1467 "event_types": filter.event_types,
1468 "payload_predicates": filter.payload_predicates,
1469 })
1470 })
1471 .collect::<Vec<_>>()
1472 });
1473
1474 serde_json::to_string(&serde_json::json!({
1475 "filters": filters,
1476 "min_sequence_number": event_query.min_sequence_number,
1477 }))
1478 .map_err(json_backend_failure)
1479}