1use std::future::Future;
2use std::io;
3use std::sync::{
4 Arc, Mutex,
5 mpsc::{self, Receiver, Sender},
6};
7use std::thread::{self, JoinHandle};
8
9use factstr::{
10 AppendResult, DurableStream, EventQuery, EventRecord, EventStore, EventStoreError, EventStream,
11 HandleStream, NewEvent, QueryResult,
12};
13use sqlx::{
14 PgPool, Postgres, QueryBuilder, Row, Transaction,
15 postgres::{PgPoolOptions, PgRow},
16};
17use time::OffsetDateTime;
18use tokio::runtime::Builder;
19use tokio::runtime::Runtime;
20use url::Url;
21
22use crate::query_match::matches_query;
23use crate::query_sql::push_query_conditions;
24use crate::stream_registry::{DeliveryOutcome, PendingDelivery, SubscriptionRegistry};
25
// Format-version tag for the persisted store layout.
// NOTE(review): these constants are referenced by code outside this chunk
// (presumably the schema/metadata handling further down the file) — confirm.
const STORE_FORMAT_VERSION: &str = "1";
// Metadata key naming the encoding used for append-batch boundary rows.
const APPEND_BATCH_BOUNDARY_FORMAT_KEY: &str = "append_batch_boundary_format";
// Current boundary encoding identifier.
const APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1: &str = "sparse_v1";
29
/// Outcome of a successfully committed append transaction: the public
/// `AppendResult` returned to the caller plus the full records that were
/// written, which the worker uses to fan deliveries out to subscribers.
#[derive(Clone, Debug)]
struct CommittedAppend {
    append_result: AppendResult,
    event_records: Vec<EventRecord>,
}
35
/// One batch of historical events handed to a durable subscriber during
/// catch-up replay (see `PostgresStore::stream_durable`).
#[derive(Clone, Debug)]
struct ReplayBatch {
    // Cursor value to persist once this batch is delivered successfully.
    last_processed_sequence_number: u64,
    delivered_batch: Vec<EventRecord>,
}
41
/// First/last sequence numbers of one committed append batch.
/// NOTE(review): not referenced within this chunk; presumably used by the
/// `append_batches` table handling further down the file — confirm.
#[derive(Clone, Debug)]
struct AppendBatchBoundary {
    first_sequence_number: u64,
    last_sequence_number: u64,
}
47
/// State captured while registering a durable subscription, used to drive
/// the catch-up replay that follows registration.
#[derive(Clone, Debug)]
struct DurableReplayState {
    subscription_id: u64,
    // Persisted cursor: last sequence number this stream already handled.
    last_processed_sequence_number: u64,
    // Upper bound of the replay window: max sequence number observed at
    // registration time (live events beyond it are buffered by the registry).
    replay_until_sequence_number: u64,
}
54
/// Commands processed serially by the worker thread (`run_worker_thread`).
/// Each request carries a one-shot `reply` channel for its result.
enum WorkerCommand {
    /// Run a read-only event query.
    Query {
        event_query: EventQuery,
        reply: Sender<Result<QueryResult, EventStoreError>>,
    },
    /// Append a batch of events unconditionally.
    Append {
        new_events: Vec<NewEvent>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Append only if the context query's version still matches.
    AppendIf {
        new_events: Vec<NewEvent>,
        context_query: EventQuery,
        expected_context_version: Option<u64>,
        reply: Sender<Result<AppendResult, EventStoreError>>,
    },
    /// Stop the worker loop (sent from `Drop`).
    Shutdown,
}
72
/// Commands processed by the delivery thread (`run_delivery_thread`).
enum DeliveryCommand {
    /// Push the given committed-event deliveries to their subscribers.
    Deliver(Vec<PendingDelivery>),
    /// Stop the delivery loop (sent from `Drop` and error paths).
    Shutdown,
}
77
/// Options for `PostgresStore::bootstrap`: where the server is and which
/// database to create (if absent) and connect to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PostgresBootstrapOptions {
    /// Server-level connection URL (no database path required).
    pub server_url: String,
    /// Database to create/use; must match `[A-Za-z_][A-Za-z0-9_]*`.
    pub database_name: String,
}
83
/// Event store backed by PostgreSQL.
///
/// All database work runs on dedicated OS threads, each with its own
/// single-threaded tokio runtime: a worker thread that serializes
/// queries/appends and a delivery thread that pushes committed events to
/// subscribers. Ad-hoc operations spawn short-lived helper threads
/// (see `run_async`).
pub struct PostgresStore {
    // Kept so helper threads can open their own pools.
    connection_string: String,
    // Shared with the worker and delivery threads.
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    // Mutex-wrapped so `&self` methods can send commands.
    worker_sender: Mutex<Sender<WorkerCommand>>,
    // Taken and joined in `Drop`.
    worker_thread: Mutex<Option<JoinHandle<()>>>,
    delivery_sender: Sender<DeliveryCommand>,
    // Taken and joined in `Drop`, after the worker thread.
    delivery_thread: Mutex<Option<JoinHandle<()>>>,
}
92
93impl PostgresStore {
    /// Connects to an existing database, initializes the schema, and spawns
    /// the delivery and worker threads.
    ///
    /// Blocks until the worker reports that its runtime and pool are ready;
    /// on startup failure both threads are shut down and joined before the
    /// error is returned, so no threads leak.
    pub fn connect(connection_string: &str) -> Result<Self, sqlx::Error> {
        let connection_string = connection_string.to_owned();
        // Create/verify the schema synchronously before spawning anything.
        bootstrap_connection(&connection_string)?;

        let subscription_registry = Arc::new(Mutex::new(SubscriptionRegistry::default()));
        let (delivery_sender, delivery_receiver) = mpsc::channel();
        let delivery_thread = thread::Builder::new()
            .name("factstr-postgres-delivery".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                move || {
                    run_delivery_thread(connection_string, subscription_registry, delivery_receiver)
                }
            })
            .map_err(sqlx_io_error)?;

        let (worker_sender, worker_receiver) = mpsc::channel();
        // Capacity 1: the worker sends exactly one readiness message.
        let (ready_sender, ready_receiver) = mpsc::sync_channel(1);
        let worker_thread = thread::Builder::new()
            .name("factstr-postgres-worker".to_owned())
            .spawn({
                let connection_string = connection_string.clone();
                let subscription_registry = Arc::clone(&subscription_registry);
                let delivery_sender = delivery_sender.clone();
                move || {
                    run_worker_thread(
                        connection_string,
                        worker_receiver,
                        ready_sender,
                        subscription_registry,
                        delivery_sender,
                    )
                }
            })
            .map_err(sqlx_io_error)?;

        // Wait for the worker's runtime/pool setup to succeed or fail.
        match ready_receiver.recv() {
            Ok(Ok(())) => {}
            Ok(Err(error)) => {
                // Worker reported a startup error and has already exited.
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(error);
            }
            Err(error) => {
                // Worker dropped the ready channel without reporting
                // (most likely a panic during startup).
                let _ = worker_thread.join();
                let _ = delivery_sender.send(DeliveryCommand::Shutdown);
                let _ = delivery_thread.join();
                return Err(sqlx_io_error(io::Error::other(format!(
                    "postgres worker startup channel failed: {error}"
                ))));
            }
        }

        Ok(Self {
            connection_string,
            subscription_registry,
            worker_sender: Mutex::new(worker_sender),
            worker_thread: Mutex::new(Some(worker_thread)),
            delivery_sender,
            delivery_thread: Mutex::new(Some(delivery_thread)),
        })
    }
158
159 pub fn bootstrap(options: PostgresBootstrapOptions) -> Result<Self, EventStoreError> {
160 bootstrap_database(&options)?;
161 let database_url = database_url_with_name(&options.server_url, &options.database_name)?;
162 Self::connect(&database_url).map_err(Self::backend_failure)
163 }
164
165 fn backend_failure(error: sqlx::Error) -> EventStoreError {
166 EventStoreError::BackendFailure {
167 message: error.to_string(),
168 }
169 }
170
171 fn worker_failure(message: impl Into<String>) -> EventStoreError {
172 EventStoreError::BackendFailure {
173 message: message.into(),
174 }
175 }
176
177 fn send_command(&self, worker_command: WorkerCommand) -> Result<(), EventStoreError> {
178 let worker_sender = self
179 .worker_sender
180 .lock()
181 .map_err(|_| Self::worker_failure("postgres worker sender lock poisoned"))?;
182
183 worker_sender
184 .send(worker_command)
185 .map_err(|error| Self::worker_failure(format!("postgres worker stopped: {error}")))
186 }
187
188 fn run_query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
189 let (reply_sender, reply_receiver) = mpsc::channel();
190 self.send_command(WorkerCommand::Query {
191 event_query: event_query.clone(),
192 reply: reply_sender,
193 })?;
194
195 reply_receiver.recv().map_err(|error| {
196 Self::worker_failure(format!("postgres worker query reply failed: {error}"))
197 })?
198 }
199
200 fn run_append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
201 let (reply_sender, reply_receiver) = mpsc::channel();
202 self.send_command(WorkerCommand::Append {
203 new_events,
204 reply: reply_sender,
205 })?;
206
207 reply_receiver.recv().map_err(|error| {
208 Self::worker_failure(format!("postgres worker append reply failed: {error}"))
209 })?
210 }
211
212 fn run_append_if(
213 &self,
214 new_events: Vec<NewEvent>,
215 context_query: &EventQuery,
216 expected_context_version: Option<u64>,
217 ) -> Result<AppendResult, EventStoreError> {
218 let (reply_sender, reply_receiver) = mpsc::channel();
219 self.send_command(WorkerCommand::AppendIf {
220 new_events,
221 context_query: context_query.clone(),
222 expected_context_version,
223 reply: reply_sender,
224 })?;
225
226 reply_receiver.recv().map_err(|error| {
227 Self::worker_failure(format!(
228 "postgres worker conditional append reply failed: {error}"
229 ))
230 })?
231 }
232
    /// Runs `work` to completion on a freshly spawned helper thread that owns
    /// its own single-threaded tokio runtime and one-connection pool.
    ///
    /// Used for one-off database operations (durable-stream registration,
    /// replay, cursor advances) that must not tie up the shared worker
    /// thread. Blocks the caller until the result arrives, then joins the
    /// helper thread so a panic surfaces as a backend failure.
    fn run_async<T, Fut, F>(&self, operation: &'static str, work: F) -> Result<T, EventStoreError>
    where
        T: Send + 'static,
        Fut: Future<Output = Result<T, EventStoreError>> + Send + 'static,
        F: FnOnce(PgPool) -> Fut + Send + 'static,
    {
        let connection_string = self.connection_string.clone();
        let (result_sender, result_receiver) = mpsc::sync_channel(1);

        let worker_thread = thread::Builder::new()
            .name(format!("factstr-postgres-{operation}"))
            .spawn(move || {
                // Build the runtime inside the thread; failures are reported
                // through the result channel instead of panicking.
                let runtime = Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .map_err(sqlx_io_error)
                    .map_err(Self::backend_failure);

                let result = match runtime {
                    Ok(runtime) => runtime.block_on(async {
                        let pool = PgPoolOptions::new()
                            .max_connections(1)
                            .connect(&connection_string)
                            .await
                            .map_err(Self::backend_failure)?;
                        work(pool).await
                    }),
                    Err(error) => Err(error),
                };

                let _ = result_sender.send(result);
            })
            .map_err(sqlx_io_error)
            .map_err(Self::backend_failure)?;

        // recv only fails if the thread died without sending (i.e. panicked);
        // note that in that case `?` returns before the join below runs.
        let result = result_receiver
            .recv()
            .map_err(|error| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread did not return a result: {error}"),
            })?;

        worker_thread
            .join()
            .map_err(|_| EventStoreError::BackendFailure {
                message: format!("postgres {operation} thread panicked before completion"),
            })?;

        result
    }
282
283 fn enqueue_delivery(&self, pending_deliveries: Vec<PendingDelivery>) {
284 Self::enqueue_delivery_with_sender(&self.delivery_sender, pending_deliveries);
285 }
286
287 fn enqueue_delivery_with_sender(
288 delivery_sender: &Sender<DeliveryCommand>,
289 pending_deliveries: Vec<PendingDelivery>,
290 ) {
291 if pending_deliveries.is_empty() {
292 return;
293 }
294
295 if let Err(error) = delivery_sender.send(DeliveryCommand::Deliver(pending_deliveries)) {
296 eprintln!(
297 "factstr-postgres delivery dispatcher stopped after commit: {}",
298 error
299 );
300 }
301 }
302
303 fn register_all_durable_stream(
304 &self,
305 durable_stream_id: impl Into<String>,
306 handle: HandleStream,
307 ) -> Result<EventStream, EventStoreError> {
308 self.stream_durable(durable_stream_id.into(), EventQuery::all(), handle)
309 }
310
311 fn register_durable_stream(
312 &self,
313 durable_stream_id: impl Into<String>,
314 event_query: &EventQuery,
315 handle: HandleStream,
316 ) -> Result<EventStream, EventStoreError> {
317 self.stream_durable(durable_stream_id.into(), event_query.clone(), handle)
318 }
319
    /// Registers a durable subscription and replays the history it missed.
    ///
    /// Steps:
    /// 1. In one transaction: load (or create) the stream's persisted
    ///    cursor, snapshot the current max sequence number, and register the
    ///    subscription so live events past the snapshot are buffered.
    /// 2. Replay events between the cursor and the snapshot in batches,
    ///    persisting the cursor after each delivered batch.
    /// 3. Flush any live events buffered during replay and return the
    ///    subscription handle.
    ///
    /// Any failure unregisters the subscription before returning.
    fn stream_durable(
        &self,
        durable_stream_id: String,
        event_query: EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        let normalized_event_query = normalized_durable_event_query(&event_query);
        let event_query_json = serialize_event_query(&normalized_event_query)?;
        let subscription_registry = Arc::clone(&self.subscription_registry);
        let durable_stream_id_for_registry = durable_stream_id.clone();
        let normalized_event_query_for_registry = normalized_event_query.clone();
        let handle_for_registry = handle.clone();
        let replay_stream_id = durable_stream_id.clone();

        let replay_state = self.run_async("stream_durable", move |pool| async move {
            let mut transaction = pool.begin().await.map_err(Self::backend_failure)?;

            let last_processed_sequence_number = load_or_create_durable_stream_cursor(
                &mut transaction,
                &durable_stream_id_for_registry,
                &event_query_json,
            )
            .await?;
            let replay_until_sequence_number =
                current_max_sequence_number_in_transaction(&mut transaction).await?;

            // Register while the transaction is still open so no event can
            // slip between the snapshot and the subscription. The Ok and
            // poisoned arms are identical apart from lock recovery.
            let subscription_id = match subscription_registry.lock() {
                Ok(mut subscription_registry) => {
                    // No filters: subscribe to everything.
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
                Err(poisoned) => {
                    let mut subscription_registry = poisoned.into_inner();
                    if normalized_event_query_for_registry.filters.is_none() {
                        subscription_registry
                            .subscribe_all_durable(
                                durable_stream_id_for_registry.clone(),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    } else {
                        subscription_registry
                            .subscribe_to_durable(
                                durable_stream_id_for_registry.clone(),
                                Some(normalized_event_query_for_registry.clone()),
                                replay_until_sequence_number,
                                handle_for_registry.clone(),
                            )
                            .map_err(subscription_registry_backend_failure)?
                    }
                }
            };

            // If the commit fails, roll back the in-memory registration too.
            if let Err(error) = transaction.commit().await {
                match subscription_registry.lock() {
                    Ok(mut subscription_registry) => {
                        subscription_registry.unsubscribe(subscription_id)
                    }
                    Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
                }
                return Err(Self::backend_failure(error));
            }

            Ok(DurableReplayState {
                subscription_id,
                last_processed_sequence_number,
                replay_until_sequence_number,
            })
        })?;

        let subscription = self.build_subscription_handle(replay_state.subscription_id);

        // Load the historical batches between the cursor and the snapshot.
        let replay_batches = match self.run_async("durable_replay", {
            let event_query = normalized_event_query.clone();
            move |pool| async move {
                ensure_replay_history_is_available_for_pool(
                    &pool,
                    replay_state.replay_until_sequence_number,
                )
                .await?;
                load_replay_batches(
                    &pool,
                    &event_query,
                    replay_state.last_processed_sequence_number,
                    replay_state.replay_until_sequence_number,
                )
                .await
            }
        }) {
            Ok(replay_batches) => replay_batches,
            Err(error) => {
                self.cleanup_durable_subscription(replay_state.subscription_id);
                return Err(error);
            }
        };
        let delivery_runtime = build_delivery_runtime("factstr-postgres durable replay")?;

        // Deliver replay batches synchronously on the caller's thread,
        // advancing the durable cursor after each successful one.
        for replay_batch in replay_batches {
            let pending_delivery = PendingDelivery {
                subscription_id: replay_state.subscription_id,
                durable_stream_id: Some(replay_stream_id.clone()),
                last_processed_sequence_number: replay_batch.last_processed_sequence_number,
                delivered_batch: replay_batch.delivered_batch,
                handle: handle.clone(),
            };

            match self.process_durable_delivery(&delivery_runtime, pending_delivery) {
                Ok(true) => {}
                Ok(false) => {
                    // The subscriber's handler failed or panicked.
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(EventStoreError::BackendFailure {
                        message: format!(
                            "durable replay for stream {} did not complete successfully",
                            replay_stream_id
                        ),
                    });
                }
                Err(error) => {
                    self.cleanup_durable_subscription(replay_state.subscription_id);
                    return Err(error);
                }
            }
        }

        // Switch from replay mode to live delivery, flushing anything that
        // was buffered while replaying.
        self.finish_durable_replay(replay_state.subscription_id);
        Ok(subscription)
    }
464
465 fn build_subscription_handle(&self, subscription_id: u64) -> EventStream {
466 let subscription_registry = Arc::clone(&self.subscription_registry);
467
468 EventStream::new(
469 subscription_id,
470 Arc::new(move |subscription_id| match subscription_registry.lock() {
471 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
472 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
473 }),
474 )
475 }
476
    /// Delivers one replay batch synchronously on the provided runtime.
    ///
    /// On success for a durable stream the advanced cursor is persisted
    /// (note: `run_async` spins up a fresh thread/runtime/pool per batch).
    /// Returns `Ok(false)` when the subscriber's handler failed or panicked,
    /// leaving the decision to unsubscribe to the caller.
    fn process_durable_delivery(
        &self,
        delivery_runtime: &Runtime,
        pending_delivery: PendingDelivery,
    ) -> Result<bool, EventStoreError> {
        match pending_delivery.deliver(delivery_runtime) {
            DeliveryOutcome::Succeeded {
                durable_stream_id,
                last_processed_sequence_number,
                ..
            } => {
                if let Some(durable_stream_id) = durable_stream_id {
                    self.run_async("advance_durable_cursor", move |pool| async move {
                        update_durable_stream_cursor(
                            &pool,
                            &durable_stream_id,
                            last_processed_sequence_number,
                        )
                        .await
                    })?;
                }

                Ok(true)
            }
            DeliveryOutcome::Failed { .. } | DeliveryOutcome::Panicked { .. } => Ok(false),
        }
    }
504
505 fn finish_durable_replay(&self, subscription_id: u64) {
506 match self.subscription_registry.lock() {
507 Ok(mut subscription_registry) => {
508 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
509 self.enqueue_delivery(buffered_deliveries);
510 }
511 Err(poisoned) => {
512 let mut subscription_registry = poisoned.into_inner();
513 let buffered_deliveries = subscription_registry.finish_replay(subscription_id);
514 self.enqueue_delivery(buffered_deliveries);
515 }
516 }
517 }
518
519 fn cleanup_durable_subscription(&self, subscription_id: u64) {
520 match self.subscription_registry.lock() {
521 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
522 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
523 }
524 }
525}
526
impl Drop for PostgresStore {
    /// Shuts down the worker thread first (so no new appends can enqueue
    /// deliveries), then the delivery thread. Send/join failures are
    /// deliberately ignored: the threads may already be gone.
    fn drop(&mut self) {
        if let Ok(worker_sender) = self.worker_sender.lock() {
            let _ = worker_sender.send(WorkerCommand::Shutdown);
        }

        if let Ok(mut worker_thread) = self.worker_thread.lock() {
            if let Some(worker_thread) = worker_thread.take() {
                let _ = worker_thread.join();
            }
        }

        let _ = self.delivery_sender.send(DeliveryCommand::Shutdown);

        if let Ok(mut delivery_thread) = self.delivery_thread.lock() {
            if let Some(delivery_thread) = delivery_thread.take() {
                let _ = delivery_thread.join();
            }
        }
    }
}
548
549impl EventStore for PostgresStore {
    /// Runs the query on the serialized worker thread.
    fn query(&self, event_query: &EventQuery) -> Result<QueryResult, EventStoreError> {
        self.run_query(event_query)
    }
553
554 fn append(&self, new_events: Vec<NewEvent>) -> Result<AppendResult, EventStoreError> {
555 if new_events.is_empty() {
556 return Err(EventStoreError::EmptyAppend);
557 }
558
559 self.run_append(new_events)
560 }
561
562 fn append_if(
563 &self,
564 new_events: Vec<NewEvent>,
565 context_query: &EventQuery,
566 expected_context_version: Option<u64>,
567 ) -> Result<AppendResult, EventStoreError> {
568 if new_events.is_empty() {
569 return Err(EventStoreError::EmptyAppend);
570 }
571
572 self.run_append_if(new_events, context_query, expected_context_version)
573 }
574
575 fn stream_all(&self, handle: HandleStream) -> Result<EventStream, EventStoreError> {
576 let subscription_registry = Arc::clone(&self.subscription_registry);
577 let id = match subscription_registry.lock() {
578 Ok(mut subscription_registry) => subscription_registry.subscribe_all(handle),
579 Err(poisoned) => poisoned.into_inner().subscribe_all(handle),
580 };
581
582 Ok(EventStream::new(
583 id,
584 Arc::new(move |subscription_id| match subscription_registry.lock() {
585 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
586 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
587 }),
588 ))
589 }
590
591 fn stream_to(
592 &self,
593 event_query: &EventQuery,
594 handle: HandleStream,
595 ) -> Result<EventStream, EventStoreError> {
596 let subscription_registry = Arc::clone(&self.subscription_registry);
597 let id = match subscription_registry.lock() {
598 Ok(mut subscription_registry) => {
599 subscription_registry.subscribe_to(Some(event_query.clone()), handle)
600 }
601 Err(poisoned) => poisoned
602 .into_inner()
603 .subscribe_to(Some(event_query.clone()), handle),
604 };
605
606 Ok(EventStream::new(
607 id,
608 Arc::new(move |subscription_id| match subscription_registry.lock() {
609 Ok(mut subscription_registry) => subscription_registry.unsubscribe(subscription_id),
610 Err(poisoned) => poisoned.into_inner().unsubscribe(subscription_id),
611 }),
612 ))
613 }
614
    /// Registers a durable subscription (named by `durable_stream`) that
    /// matches every event, replaying missed history first.
    fn stream_all_durable(
        &self,
        durable_stream: &DurableStream,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.register_all_durable_stream(durable_stream.name(), handle)
    }
622
    /// Registers a durable subscription restricted to `event_query`,
    /// replaying missed history first.
    fn stream_to_durable(
        &self,
        durable_stream: &DurableStream,
        event_query: &EventQuery,
        handle: HandleStream,
    ) -> Result<EventStream, EventStoreError> {
        self.register_durable_stream(durable_stream.name(), event_query, handle)
    }
631}
632
/// Body of the dedicated worker thread: owns a single-threaded tokio runtime
/// and a one-connection pool, and serializes all query/append commands.
///
/// Exactly one message goes out on `ready_sender`: `Ok(())` once the pool is
/// connected, or the startup error. After each successful append, matching
/// subscriptions are collected and handed to the delivery thread before the
/// caller's reply is sent.
fn run_worker_thread(
    connection_string: String,
    worker_receiver: Receiver<WorkerCommand>,
    ready_sender: mpsc::SyncSender<Result<(), sqlx::Error>>,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_sender: Sender<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            let _ = ready_sender.send(Err(sqlx_io_error(io::Error::other(format!(
                "tokio runtime should build: {error}"
            )))));
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            let _ = ready_sender.send(Err(error));
            return;
        }
    };

    // If the creator stopped listening, there is no owner left; exit quietly.
    if ready_sender.send(Ok(())).is_err() {
        return;
    }

    // Command loop: ends on Shutdown or when every sender is dropped.
    while let Ok(worker_command) = worker_receiver.recv() {
        match worker_command {
            WorkerCommand::Query { event_query, reply } => {
                let result = runtime
                    .block_on(query_with_pool(&pool, &event_query))
                    .map_err(PostgresStore::backend_failure);
                let _ = reply.send(result);
            }
            WorkerCommand::Append { new_events, reply } => {
                let result = runtime
                    .block_on(append_with_pool(&pool, new_events))
                    .map_err(PostgresStore::backend_failure);
                // Fan out to subscribers before replying to the appender.
                if let Ok(committed_append) = &result {
                    let pending_deliveries = match subscription_registry.lock() {
                        Ok(mut subscription_registry) => subscription_registry
                            .pending_deliveries(&committed_append.event_records),
                        Err(poisoned) => poisoned
                            .into_inner()
                            .pending_deliveries(&committed_append.event_records),
                    };
                    PostgresStore::enqueue_delivery_with_sender(
                        &delivery_sender,
                        pending_deliveries,
                    );
                }
                // Callers only see the public append result.
                let result = result.map(|committed_append| committed_append.append_result);
                let _ = reply.send(result);
            }
            WorkerCommand::AppendIf {
                new_events,
                context_query,
                expected_context_version,
                reply,
            } => {
                // Outer Err = sqlx failure, inner Err = conditional-append
                // conflict; `and_then` flattens the nested Result.
                let result = runtime
                    .block_on(append_if_with_pool(
                        &pool,
                        new_events,
                        &context_query,
                        expected_context_version,
                    ))
                    .map_err(PostgresStore::backend_failure)
                    .and_then(|result| result);
                if let Ok(committed_append) = &result {
                    let pending_deliveries = match subscription_registry.lock() {
                        Ok(mut subscription_registry) => subscription_registry
                            .pending_deliveries(&committed_append.event_records),
                        Err(poisoned) => poisoned
                            .into_inner()
                            .pending_deliveries(&committed_append.event_records),
                    };
                    PostgresStore::enqueue_delivery_with_sender(
                        &delivery_sender,
                        pending_deliveries,
                    );
                }
                let result = result.map(|committed_append| committed_append.append_result);
                let _ = reply.send(result);
            }
            WorkerCommand::Shutdown => break,
        }
    }
}
730
/// Body of the delivery thread: pushes committed events to subscriber
/// handlers and persists durable cursors after successful deliveries.
///
/// Failures here cannot be reported to any caller (the append already
/// committed), so they are logged to stderr; for durable streams the broken
/// subscription is removed so events are not silently skipped past.
fn run_delivery_thread(
    connection_string: String,
    subscription_registry: Arc<Mutex<SubscriptionRegistry>>,
    delivery_receiver: Receiver<DeliveryCommand>,
) {
    let runtime = match Builder::new_current_thread().enable_all().build() {
        Ok(runtime) => runtime,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery runtime could not start: {}",
                error
            );
            return;
        }
    };

    let pool = match runtime.block_on(async {
        PgPoolOptions::new()
            .max_connections(1)
            .connect(&connection_string)
            .await
    }) {
        Ok(pool) => pool,
        Err(error) => {
            eprintln!(
                "factstr-postgres delivery pool could not connect: {}",
                error
            );
            return;
        }
    };

    // Loop ends on Shutdown or when every sender is dropped.
    while let Ok(delivery_command) = delivery_receiver.recv() {
        match delivery_command {
            DeliveryCommand::Deliver(pending_deliveries) => {
                for pending_delivery in pending_deliveries {
                    match pending_delivery.deliver(&runtime) {
                        DeliveryOutcome::Succeeded {
                            subscription_id,
                            durable_stream_id,
                            last_processed_sequence_number,
                        } => {
                            // Durable streams persist their cursor after each
                            // delivery; a failed update unsubscribes rather
                            // than risking an inconsistent cursor.
                            if let Some(durable_stream_id) = durable_stream_id {
                                if let Err(error) = runtime.block_on(update_durable_stream_cursor(
                                    &pool,
                                    &durable_stream_id,
                                    last_processed_sequence_number,
                                )) {
                                    eprintln!(
                                        "factstr-postgres durable cursor update failed after delivery for stream {}: {}",
                                        durable_stream_id, error
                                    );
                                    match subscription_registry.lock() {
                                        Ok(mut subscription_registry) => {
                                            subscription_registry.unsubscribe(subscription_id)
                                        }
                                        Err(poisoned) => {
                                            poisoned.into_inner().unsubscribe(subscription_id)
                                        }
                                    }
                                }
                            }
                        }
                        DeliveryOutcome::Failed {
                            subscription_id,
                            durable_stream_id,
                        }
                        | DeliveryOutcome::Panicked {
                            subscription_id,
                            durable_stream_id,
                        } => {
                            // Only durable subscriptions are torn down on a
                            // failed handler; ephemeral ones stay registered.
                            if durable_stream_id.is_some() {
                                match subscription_registry.lock() {
                                    Ok(mut subscription_registry) => {
                                        subscription_registry.unsubscribe(subscription_id)
                                    }
                                    Err(poisoned) => {
                                        poisoned.into_inner().unsubscribe(subscription_id)
                                    }
                                }
                            }
                        }
                    }
                }
            }
            DeliveryCommand::Shutdown => break,
        }
    }
}
820
/// Wraps an `io::Error` in sqlx's `Io` variant so thread/runtime failures
/// can flow through sqlx-typed APIs.
fn sqlx_io_error(error: io::Error) -> sqlx::Error {
    sqlx::Error::Io(error)
}
824
825fn backend_failure_message(message: impl Into<String>) -> EventStoreError {
826 EventStoreError::BackendFailure {
827 message: message.into(),
828 }
829}
830
831fn build_delivery_runtime(operation: &'static str) -> Result<Runtime, EventStoreError> {
832 Builder::new_current_thread()
833 .enable_all()
834 .build()
835 .map_err(sqlx_io_error)
836 .map_err(PostgresStore::backend_failure)
837 .map_err(|error| match error {
838 EventStoreError::BackendFailure { message } => EventStoreError::BackendFailure {
839 message: format!("{operation}: {message}"),
840 },
841 other => other,
842 })
843}
844
/// Creates the configured database if it does not exist yet.
///
/// Runs on a throwaway thread with its own runtime so the (possibly async)
/// caller's context is never blocked on a nested runtime.
fn bootstrap_database(options: &PostgresBootstrapOptions) -> Result<(), EventStoreError> {
    let options = options.clone();
    let (result_sender, result_receiver) = mpsc::sync_channel(1);

    let bootstrap_thread = thread::Builder::new()
        .name("factstr-postgres-database-bootstrap".to_owned())
        .spawn(move || {
            let runtime = Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(sqlx_io_error)
                .map_err(PostgresStore::backend_failure);

            let result = match runtime {
                Ok(runtime) => runtime.block_on(async move {
                    ensure_database_exists(&options.server_url, &options.database_name).await
                }),
                Err(error) => Err(error),
            };

            let _ = result_sender.send(result);
        })
        .map_err(sqlx_io_error)
        .map_err(PostgresStore::backend_failure)?;

    // recv only fails when the thread died without sending (panic); note
    // that in that case `?` returns before the join below runs.
    let result = result_receiver.recv().map_err(|error| {
        backend_failure_message(format!(
            "postgres bootstrap database thread did not return a result: {error}"
        ))
    })?;

    bootstrap_thread.join().map_err(|_| {
        backend_failure_message("postgres bootstrap database thread panicked before completion")
    })?;

    result
}
882
/// Creates `database_name` on the server at `server_url` if it is missing.
///
/// The name is validated against a strict identifier pattern and quoted, so
/// the interpolated `CREATE DATABASE` statement (DDL cannot take bind
/// parameters) is safe. A concurrent creator racing us is tolerated via the
/// `42P04` (duplicate_database) error code.
async fn ensure_database_exists(
    server_url: &str,
    database_name: &str,
) -> Result<(), EventStoreError> {
    validate_bootstrap_database_name(database_name)?;
    // Validate the URL early so connect errors are not misattributed.
    let _ = database_url_with_name(server_url, database_name)?;

    let quoted_database_name = quote_postgres_identifier(database_name)?;
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .connect(server_url)
        .await
        .map_err(|error| {
            backend_failure_message(format!(
                "postgres bootstrap could not connect to server_url {server_url:?}: {error}"
            ))
        })?;

    let database_exists = sqlx::query_scalar::<_, bool>(
        "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)",
    )
    .bind(database_name)
    .fetch_one(&pool)
    .await
    .map_err(|error| {
        backend_failure_message(format!(
            "postgres bootstrap could not inspect database {database_name:?}: {error}"
        ))
    })?;

    if database_exists {
        return Ok(());
    }

    let create_database_sql = format!("CREATE DATABASE {quoted_database_name}");
    match sqlx::query(&create_database_sql).execute(&pool).await {
        Ok(_) => Ok(()),
        // 42P04 = duplicate_database: someone else created it concurrently.
        Err(sqlx::Error::Database(database_error))
            if database_error.code().as_deref() == Some("42P04") =>
        {
            Ok(())
        }
        Err(error) => Err(backend_failure_message(format!(
            "postgres bootstrap could not create database {database_name:?}: {error}"
        ))),
    }
}
930
931fn quote_postgres_identifier(identifier: &str) -> Result<String, EventStoreError> {
932 if identifier.is_empty() {
933 return Err(backend_failure_message(
934 "postgres bootstrap database_name must not be empty",
935 ));
936 }
937
938 if identifier.contains('\0') {
939 return Err(backend_failure_message(
940 "postgres bootstrap database_name must not contain null bytes",
941 ));
942 }
943
944 Ok(format!("\"{}\"", identifier.replace('"', "\"\"")))
945}
946
947fn validate_bootstrap_database_name(database_name: &str) -> Result<(), EventStoreError> {
948 if database_name.is_empty() {
949 return Err(backend_failure_message(
950 "postgres bootstrap database_name must not be empty",
951 ));
952 }
953
954 if database_name.contains('\0') {
955 return Err(backend_failure_message(
956 "postgres bootstrap database_name must not contain null bytes",
957 ));
958 }
959
960 let mut characters = database_name.chars();
961 let Some(first_character) = characters.next() else {
962 return Err(backend_failure_message(
963 "postgres bootstrap database_name must not be empty",
964 ));
965 };
966
967 if !(first_character.is_ascii_alphabetic() || first_character == '_') {
968 return Err(backend_failure_message(
969 "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
970 ));
971 }
972
973 if !characters.all(|character| character.is_ascii_alphanumeric() || character == '_') {
974 return Err(backend_failure_message(
975 "postgres bootstrap database_name must match [A-Za-z_][A-Za-z0-9_]*",
976 ));
977 }
978
979 Ok(())
980}
981
982fn database_url_with_name(
983 server_url: &str,
984 database_name: &str,
985) -> Result<String, EventStoreError> {
986 let mut url = Url::parse(server_url).map_err(|error| {
987 backend_failure_message(format!(
988 "invalid postgres server_url {server_url:?}: {error}"
989 ))
990 })?;
991 url.set_path(&format!("/{database_name}"));
992 Ok(url.into())
993}
994
/// Connects once and initializes the schema, on a throwaway thread with its
/// own runtime (so a caller already inside a tokio runtime cannot deadlock
/// on a nested `block_on`).
fn bootstrap_connection(connection_string: &str) -> Result<(), sqlx::Error> {
    let connection_string = connection_string.to_owned();
    let (result_sender, result_receiver) = mpsc::sync_channel(1);

    let bootstrap_thread = thread::Builder::new()
        .name("factstr-postgres-bootstrap".to_owned())
        .spawn(move || {
            let runtime = Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(sqlx_io_error);

            let result = match runtime {
                Ok(runtime) => runtime.block_on(async {
                    let pool = PgPoolOptions::new()
                        .max_connections(1)
                        .connect(&connection_string)
                        .await?;
                    initialize_schema(&pool).await?;
                    Ok::<_, sqlx::Error>(())
                }),
                Err(error) => Err(error),
            };

            let _ = result_sender.send(result);
        })
        .map_err(sqlx_io_error)?;

    // A recv failure means the thread died without sending (panic); keep the
    // error and still join below so the thread is always reaped.
    let result = match result_receiver.recv() {
        Ok(result) => result,
        Err(error) => Err(sqlx_io_error(io::Error::other(format!(
            "postgres bootstrap thread did not return a result: {error}"
        )))),
    };

    bootstrap_thread.join().map_err(|_| {
        sqlx_io_error(io::Error::other(
            "postgres bootstrap thread panicked before connect completed",
        ))
    })?;

    result
}
1038
/// Executes an event query: captures the current context version, then
/// fetches matching events ordered by sequence number.
///
/// NOTE(review): the version read and the row fetch are two separate
/// statements with no shared transaction — presumably acceptable for this
/// store's read semantics; confirm.
async fn query_with_pool(
    pool: &PgPool,
    event_query: &EventQuery,
) -> Result<QueryResult, sqlx::Error> {
    let current_context_version = current_context_version(pool, event_query).await?;

    let mut query_builder: QueryBuilder<'_, Postgres> =
        QueryBuilder::new("SELECT sequence_number, occurred_at, event_type, payload FROM events");
    // Appends WHERE clauses derived from the query's filters.
    push_query_conditions(&mut query_builder, event_query, true);
    query_builder.push(" ORDER BY sequence_number ASC");

    let event_records = query_builder
        .build()
        .fetch_all(pool)
        .await?
        .into_iter()
        .map(event_record_from_row)
        .collect::<Result<Vec<_>, _>>()?;

    let last_returned_sequence_number = event_records
        .last()
        .map(|event_record| event_record.sequence_number);

    Ok(QueryResult {
        event_records,
        last_returned_sequence_number,
        current_context_version,
    })
}
1068
1069async fn append_with_pool(
1070 pool: &PgPool,
1071 new_events: Vec<NewEvent>,
1072) -> Result<CommittedAppend, sqlx::Error> {
1073 let mut transaction = pool.begin().await?;
1074 lock_events_tables(&mut transaction).await?;
1075 let committed_append = append_records(&mut transaction, new_events).await?;
1076 transaction.commit().await?;
1077 Ok(committed_append)
1078}
1079
1080async fn append_if_with_pool(
1081 pool: &PgPool,
1082 new_events: Vec<NewEvent>,
1083 context_query: &EventQuery,
1084 expected_context_version: Option<u64>,
1085) -> Result<Result<CommittedAppend, EventStoreError>, sqlx::Error> {
1086 let mut transaction = pool.begin().await?;
1087 lock_events_tables(&mut transaction).await?;
1088
1089 let actual_context_version =
1090 current_context_version_in_transaction(&mut transaction, context_query).await?;
1091
1092 if actual_context_version != expected_context_version {
1093 transaction.rollback().await?;
1094 return Ok(Err(EventStoreError::ConditionalAppendConflict {
1095 expected: expected_context_version,
1096 actual: actual_context_version,
1097 }));
1098 }
1099
1100 let committed_append = append_records(&mut transaction, new_events).await?;
1101 transaction.commit().await?;
1102
1103 Ok(Ok(committed_append))
1104}
1105
/// Idempotently creates the store's tables, indexes, and metadata rows.
/// Safe to run on every connect: all DDL uses IF NOT EXISTS and the
/// store-format insert uses ON CONFLICT DO NOTHING.
async fn initialize_schema(pool: &PgPool) -> Result<(), sqlx::Error> {
    // to_regclass(...) is NULL when the events table is absent, so a None
    // scalar means we are bootstrapping a brand-new schema. This is checked
    // before any DDL runs so the flag reflects the pre-existing state.
    let is_new_schema = sqlx::query_scalar::<_, Option<String>>(
        "SELECT to_regclass(current_schema() || '.events')::text",
    )
    .fetch_one(pool)
    .await?
    .is_none();

    // Core append-only event log; sequence numbers are assigned by
    // append_records.
    sqlx::query(
        "CREATE TABLE IF NOT EXISTS events (
            sequence_number BIGINT PRIMARY KEY,
            occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            event_type TEXT NOT NULL,
            payload JSONB NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    // Sparse record of multi-event append boundaries, consumed by durable
    // replay to re-deliver events with their original batching.
    sqlx::query(
        "CREATE TABLE IF NOT EXISTS append_batches (
            first_sequence_number BIGINT PRIMARY KEY,
            last_sequence_number BIGINT NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    // Key/value store for format-version markers.
    sqlx::query(
        "CREATE TABLE IF NOT EXISTS store_metadata (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    // Persistent positions for durable streams (see
    // load_or_create_durable_stream_cursor / update_durable_stream_cursor).
    sqlx::query(
        "CREATE TABLE IF NOT EXISTS durable_stream_cursors (
            durable_stream_id TEXT PRIMARY KEY,
            event_query TEXT NOT NULL,
            last_processed_sequence_number BIGINT NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
        .execute(pool)
        .await?;

    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
        .execute(pool)
        .await?;

    // GIN index supports payload-predicate filtering over the JSONB column.
    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_payload_gin ON events USING gin(payload)")
        .execute(pool)
        .await?;

    // Record the store format version once; an existing store keeps whatever
    // version it was created with.
    sqlx::query(
        "INSERT INTO store_metadata (key, value)
         VALUES ('store_format_version', $1)
         ON CONFLICT(key) DO NOTHING",
    )
    .bind(STORE_FORMAT_VERSION)
    .execute(pool)
    .await?;

    if is_new_schema {
        // Only brand-new stores are tagged with the sparse boundary format:
        // a pre-existing store may lack complete append_batches history, and
        // ensure_replay_history_is_available rejects it for durable replay.
        sqlx::query(
            "INSERT INTO store_metadata (key, value)
             VALUES ($1, $2)",
        )
        .bind(APPEND_BATCH_BOUNDARY_FORMAT_KEY)
        .bind(APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1)
        .execute(pool)
        .await?;
    }

    Ok(())
}
1187
1188async fn lock_events_tables(
1189 transaction: &mut Transaction<'_, Postgres>,
1190) -> Result<(), sqlx::Error> {
1191 sqlx::query("LOCK TABLE events, append_batches IN EXCLUSIVE MODE")
1192 .execute(transaction.as_mut())
1193 .await?;
1194
1195 Ok(())
1196}
1197
1198async fn append_records(
1199 transaction: &mut Transaction<'_, Postgres>,
1200 new_events: Vec<NewEvent>,
1201) -> Result<CommittedAppend, sqlx::Error> {
1202 let committed_count = new_events.len() as u64;
1203 let first_sequence_number =
1204 sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(sequence_number), 0) + 1 FROM events")
1205 .fetch_one(transaction.as_mut())
1206 .await? as u64;
1207 let last_sequence_number = first_sequence_number + committed_count - 1;
1208 let committed_event_records = new_events
1209 .into_iter()
1210 .enumerate()
1211 .map(|(offset, new_event)| EventRecord {
1212 sequence_number: first_sequence_number + offset as u64,
1213 occurred_at: OffsetDateTime::now_utc(),
1214 event_type: new_event.event_type,
1215 payload: new_event.payload,
1216 })
1217 .collect::<Vec<_>>();
1218
1219 for event_record in &committed_event_records {
1220 sqlx::query(
1221 "INSERT INTO events (sequence_number, occurred_at, event_type, payload)
1222 VALUES ($1, $2, $3, $4)",
1223 )
1224 .bind(event_record.sequence_number as i64)
1225 .bind(event_record.occurred_at)
1226 .bind(&event_record.event_type)
1227 .bind(&event_record.payload)
1228 .execute(transaction.as_mut())
1229 .await?;
1230 }
1231
1232 if first_sequence_number != last_sequence_number {
1233 sqlx::query(
1234 "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
1235 VALUES ($1, $2)",
1236 )
1237 .bind(first_sequence_number as i64)
1238 .bind(last_sequence_number as i64)
1239 .execute(transaction.as_mut())
1240 .await?;
1241 }
1242
1243 Ok(CommittedAppend {
1244 append_result: AppendResult {
1245 first_sequence_number,
1246 last_sequence_number,
1247 committed_count,
1248 },
1249 event_records: committed_event_records,
1250 })
1251}
1252
1253async fn current_context_version(
1254 pool: &PgPool,
1255 event_query: &EventQuery,
1256) -> Result<Option<u64>, sqlx::Error> {
1257 let mut query_builder: QueryBuilder<'_, Postgres> =
1258 QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1259 push_query_conditions(&mut query_builder, event_query, false);
1260
1261 Ok(query_builder
1262 .build_query_scalar::<Option<i64>>()
1263 .fetch_one(pool)
1264 .await?
1265 .map(|sequence_number| sequence_number as u64))
1266}
1267
1268async fn current_context_version_in_transaction(
1269 transaction: &mut Transaction<'_, Postgres>,
1270 event_query: &EventQuery,
1271) -> Result<Option<u64>, sqlx::Error> {
1272 let mut query_builder: QueryBuilder<'_, Postgres> =
1273 QueryBuilder::new("SELECT MAX(sequence_number) FROM events");
1274 push_query_conditions(&mut query_builder, event_query, false);
1275
1276 Ok(query_builder
1277 .build_query_scalar::<Option<i64>>()
1278 .fetch_one(transaction.as_mut())
1279 .await?
1280 .map(|sequence_number| sequence_number as u64))
1281}
1282
1283fn event_record_from_row(row: PgRow) -> Result<EventRecord, sqlx::Error> {
1284 Ok(EventRecord {
1285 sequence_number: row.try_get::<i64, _>("sequence_number")? as u64,
1286 occurred_at: row.try_get("occurred_at")?,
1287 event_type: row.try_get("event_type")?,
1288 payload: row.try_get("payload")?,
1289 })
1290}
1291
1292fn json_backend_failure(error: serde_json::Error) -> EventStoreError {
1293 EventStoreError::BackendFailure {
1294 message: error.to_string(),
1295 }
1296}
1297
/// Wraps a subscription-registry failure message in the store's generic
/// backend error.
fn subscription_registry_backend_failure(message: String) -> EventStoreError {
    EventStoreError::BackendFailure { message }
}
1301
1302async fn load_or_create_durable_stream_cursor(
1303 transaction: &mut Transaction<'_, Postgres>,
1304 durable_stream_id: &str,
1305 event_query_json: &str,
1306) -> Result<u64, EventStoreError> {
1307 let existing_cursor = sqlx::query(
1308 "SELECT event_query, last_processed_sequence_number
1309 FROM durable_stream_cursors
1310 WHERE durable_stream_id = $1",
1311 )
1312 .bind(durable_stream_id)
1313 .fetch_optional(transaction.as_mut())
1314 .await
1315 .map_err(PostgresStore::backend_failure)?;
1316
1317 match existing_cursor {
1318 Some(row) => {
1319 let stored_event_query = row.get::<String, _>("event_query");
1320 if stored_event_query != event_query_json {
1321 return Err(EventStoreError::BackendFailure {
1322 message: format!(
1323 "durable stream {durable_stream_id} was resumed with a different query"
1324 ),
1325 });
1326 }
1327
1328 Ok(row.get::<i64, _>("last_processed_sequence_number") as u64)
1329 }
1330 None => {
1331 sqlx::query(
1332 "INSERT INTO durable_stream_cursors (
1333 durable_stream_id,
1334 event_query,
1335 last_processed_sequence_number
1336 ) VALUES ($1, $2, 0)",
1337 )
1338 .bind(durable_stream_id)
1339 .bind(event_query_json)
1340 .execute(transaction.as_mut())
1341 .await
1342 .map_err(PostgresStore::backend_failure)?;
1343
1344 Ok(0)
1345 }
1346 }
1347}
1348
1349async fn current_max_sequence_number_in_transaction(
1350 transaction: &mut Transaction<'_, Postgres>,
1351) -> Result<u64, EventStoreError> {
1352 Ok(
1353 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(sequence_number) FROM events")
1354 .fetch_one(transaction.as_mut())
1355 .await
1356 .map_err(PostgresStore::backend_failure)?
1357 .unwrap_or(0) as u64,
1358 )
1359}
1360
1361async fn ensure_replay_history_is_available(
1362 connection: &mut sqlx::PgConnection,
1363 _current_max_sequence_number: u64,
1364) -> Result<(), EventStoreError> {
1365 let boundary_format =
1366 sqlx::query_scalar::<_, Option<String>>("SELECT value FROM store_metadata WHERE key = $1")
1367 .bind(APPEND_BATCH_BOUNDARY_FORMAT_KEY)
1368 .fetch_optional(&mut *connection)
1369 .await
1370 .map_err(PostgresStore::backend_failure)?;
1371
1372 if boundary_format.flatten().as_deref() != Some(APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1) {
1373 return Err(EventStoreError::BackendFailure {
1374 message: format!(
1375 "durable replay requires store_metadata {}={}",
1376 APPEND_BATCH_BOUNDARY_FORMAT_KEY, APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1
1377 ),
1378 });
1379 }
1380
1381 Ok(())
1382}
1383
1384async fn ensure_replay_history_is_available_for_pool(
1385 pool: &PgPool,
1386 current_max_sequence_number: u64,
1387) -> Result<(), EventStoreError> {
1388 let mut connection = pool
1389 .acquire()
1390 .await
1391 .map_err(PostgresStore::backend_failure)?;
1392 ensure_replay_history_is_available(connection.as_mut(), current_max_sequence_number).await
1393}
1394
1395async fn update_durable_stream_cursor(
1396 pool: &PgPool,
1397 durable_stream_id: &str,
1398 last_processed_sequence_number: u64,
1399) -> Result<(), EventStoreError> {
1400 sqlx::query(
1401 "UPDATE durable_stream_cursors
1402 SET last_processed_sequence_number = $2
1403 WHERE durable_stream_id = $1",
1404 )
1405 .bind(durable_stream_id)
1406 .bind(last_processed_sequence_number as i64)
1407 .execute(pool)
1408 .await
1409 .map_err(PostgresStore::backend_failure)?;
1410
1411 Ok(())
1412}
1413
1414async fn load_replay_batches(
1415 pool: &PgPool,
1416 event_query: &EventQuery,
1417 last_processed_sequence_number: u64,
1418 replay_until_sequence_number: u64,
1419) -> Result<Vec<ReplayBatch>, EventStoreError> {
1420 if replay_until_sequence_number <= last_processed_sequence_number {
1421 return Ok(Vec::new());
1422 }
1423
1424 let batch_rows = sqlx::query(
1425 "SELECT first_sequence_number, last_sequence_number
1426 FROM append_batches
1427 WHERE last_sequence_number > $1
1428 AND first_sequence_number <= $2
1429 ORDER BY first_sequence_number ASC",
1430 )
1431 .bind(last_processed_sequence_number as i64)
1432 .bind(replay_until_sequence_number as i64)
1433 .fetch_all(pool)
1434 .await
1435 .map_err(PostgresStore::backend_failure)?;
1436
1437 let append_batch_boundaries = batch_rows
1438 .into_iter()
1439 .map(|batch_row| AppendBatchBoundary {
1440 first_sequence_number: batch_row.get::<i64, _>("first_sequence_number") as u64,
1441 last_sequence_number: batch_row.get::<i64, _>("last_sequence_number") as u64,
1442 })
1443 .collect::<Vec<_>>();
1444 let event_records = sqlx::query(
1445 "SELECT sequence_number, occurred_at, event_type, payload
1446 FROM events
1447 WHERE sequence_number > $1
1448 AND sequence_number <= $2
1449 ORDER BY sequence_number ASC",
1450 )
1451 .bind(last_processed_sequence_number as i64)
1452 .bind(replay_until_sequence_number as i64)
1453 .fetch_all(pool)
1454 .await
1455 .map_err(PostgresStore::backend_failure)?
1456 .into_iter()
1457 .map(event_record_from_row)
1458 .collect::<Result<Vec<_>, _>>()
1459 .map_err(PostgresStore::backend_failure)?;
1460
1461 Ok(replay_batches_from_records(
1462 event_query,
1463 event_records,
1464 &append_batch_boundaries,
1465 ))
1466}
1467
/// Groups `event_records` into replay batches, reusing the recorded append
/// batch boundaries so that events appended together are re-delivered
/// together.
///
/// Both `event_records` and `append_batch_boundaries` must be sorted by
/// ascending sequence number (the SQL in `load_replay_batches` guarantees
/// this). Records that do not match `event_query` still advance the cursor:
/// they yield a batch whose `delivered_batch` is empty but whose
/// `last_processed_sequence_number` is real.
fn replay_batches_from_records(
    event_query: &EventQuery,
    event_records: Vec<EventRecord>,
    append_batch_boundaries: &[AppendBatchBoundary],
) -> Vec<ReplayBatch> {
    let mut replay_batches = Vec::new();
    let mut event_index = 0;
    let mut boundary_index = 0;

    while event_index < event_records.len() {
        let next_sequence_number = event_records[event_index].sequence_number;

        // Discard boundaries that end before the next undelivered event.
        // Both lists are sorted, so this pointer only ever moves forward.
        while boundary_index < append_batch_boundaries.len()
            && append_batch_boundaries[boundary_index].last_sequence_number < next_sequence_number
        {
            boundary_index += 1;
        }

        if boundary_index < append_batch_boundaries.len() {
            let boundary = &append_batch_boundaries[boundary_index];
            if boundary.first_sequence_number <= next_sequence_number
                && next_sequence_number <= boundary.last_sequence_number
            {
                // The next event falls inside a recorded multi-event append:
                // consume every record up to the boundary's end as one batch.
                let batch_start = event_index;
                while event_index < event_records.len()
                    && event_records[event_index].sequence_number <= boundary.last_sequence_number
                {
                    event_index += 1;
                }

                // Deliver only matching records, but advance the cursor over
                // the whole boundary regardless.
                let delivered_batch = event_records[batch_start..event_index]
                    .iter()
                    .filter(|event_record| matches_query(event_query, event_record))
                    .cloned()
                    .collect();

                replay_batches.push(ReplayBatch {
                    last_processed_sequence_number: boundary.last_sequence_number,
                    delivered_batch,
                });
                continue;
            }
        }

        // No boundary covers this event — in the sparse format that means it
        // was a single-event append, so it becomes its own batch.
        let event_record = event_records[event_index].clone();
        event_index += 1;

        let delivered_batch = if matches_query(event_query, &event_record) {
            vec![event_record.clone()]
        } else {
            Vec::new()
        };

        replay_batches.push(ReplayBatch {
            last_processed_sequence_number: event_record.sequence_number,
            delivered_batch,
        });
    }

    replay_batches
}
1529
1530fn normalized_durable_event_query(event_query: &EventQuery) -> EventQuery {
1531 EventQuery {
1532 filters: event_query.filters.clone(),
1533 min_sequence_number: None,
1534 }
1535}
1536
1537fn serialize_event_query(event_query: &EventQuery) -> Result<String, EventStoreError> {
1538 let filters = event_query.filters.as_ref().map(|filters| {
1539 filters
1540 .iter()
1541 .map(|filter| {
1542 serde_json::json!({
1543 "event_types": filter.event_types,
1544 "payload_predicates": filter.payload_predicates,
1545 })
1546 })
1547 .collect::<Vec<_>>()
1548 });
1549
1550 serde_json::to_string(&serde_json::json!({
1551 "filters": filters,
1552 "min_sequence_number": event_query.min_sequence_number,
1553 }))
1554 .map_err(json_backend_failure)
1555}