1use std::{
8 sync::{
9 Arc,
10 atomic::{AtomicBool, Ordering},
11 },
12 time::Duration,
13};
14
15use ::postgres::{Client, Config as PgClientConfig, NoTls};
16use log::{error, info, trace, warn};
17use myko::{
18 event::{MEvent, MEventType},
19 server::{HandlerRegistry, PersistError, PersistHealth, Persister},
20 store::StoreRegistry,
21};
22use postgres::fallible_iterator::FallibleIterator;
23use uuid::Uuid;
24
// How long to wait for the initial connection/startup handshake before failing.
const PG_CONNECT_TIMEOUT_SECS: u64 = 10;
// TCP keepalive tuning: start probing after 30s idle, probe every 10s,
// declare the connection dead after 3 failed probes.
const PG_KEEPALIVE_IDLE_SECS: u64 = 30;
const PG_KEEPALIVE_INTERVAL_SECS: u64 = 10;
const PG_KEEPALIVE_RETRIES: u32 = 3;
// Maximum number of events the producer thread inserts in one statement.
const PG_PRODUCER_MAX_BATCH: usize = 256;
30
/// Connection settings for the Postgres-backed event log.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Postgres DSN, e.g. `postgres://user:pass@host/db`.
    pub url: String,
    /// Table holding the append-only event rows (must pass `validate_ident`).
    pub table: String,
    /// LISTEN/NOTIFY channel used to wake consumers on insert
    /// (must pass `validate_ident`).
    pub channel: String,
}
41
/// One row read back from the event table.
#[derive(Debug, Clone)]
pub struct PersistedEvent {
    /// Monotonic BIGSERIAL id assigned by Postgres.
    pub id: i64,
    /// The `created_at` column rendered to text by the query (`::text`).
    pub created_at: String,
    /// The deserialized event payload from the `event` JSONB column.
    pub event: MEvent,
}
49
50impl PostgresConfig {
51 pub fn from_env() -> Option<Self> {
57 let url = std::env::var("MYKO_POSTGRES_URL").ok()?;
58 let table = std::env::var("MYKO_POSTGRES_TABLE").unwrap_or_else(|_| "myko_events".into());
59 let channel =
60 std::env::var("MYKO_POSTGRES_CHANNEL").unwrap_or_else(|_| "myko_events_notify".into());
61 Some(Self {
62 url,
63 table,
64 channel,
65 })
66 }
67}
68
/// Read-only access to the persisted event history.
pub struct PostgresHistoryStore {
    // Identifiers are validated in `new`, so the table name is safe to
    // interpolate (quoted) into the SQL built below.
    config: PostgresConfig,
}
73
74impl PostgresHistoryStore {
75 pub fn new(config: PostgresConfig) -> Result<Self, String> {
77 validate_ident(&config.table)?;
78 validate_ident(&config.channel)?;
79 Ok(Self { config })
80 }
81
82 pub fn load_after_id(&self, after_id: i64, limit: i64) -> Result<Vec<PersistedEvent>, String> {
84 let mut client = connect_pg_client(&self.config, "history(load_after_id)")?;
85 let sql = format!(
86 "SELECT id, created_at::text, event::text FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
87 qi(&self.config.table)
88 );
89 let rows = client
90 .query(&sql, &[&after_id, &limit])
91 .map_err(|e| format!("history query failed: {e}"))?;
92 rows.into_iter()
93 .map(row_to_persisted_event)
94 .collect::<Result<Vec<_>, _>>()
95 }
96
97 pub fn load_until(
99 &self,
100 after_id: i64,
101 until: &str,
102 limit: i64,
103 ) -> Result<Vec<PersistedEvent>, String> {
104 let mut client = connect_pg_client(&self.config, "history(load_until)")?;
105 if !until
107 .chars()
108 .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
109 {
110 return Err(format!("Invalid timestamp format: {}", until));
111 }
112 let sql = format!(
113 "SELECT id, created_at::text, event::text FROM {} WHERE id > {} AND created_at <= '{}'::timestamptz ORDER BY id ASC LIMIT {}",
114 qi(&self.config.table),
115 after_id,
116 until,
117 limit
118 );
119 let rows = client
120 .query(&sql, &[])
121 .map_err(|e| format!("history query failed: {e}"))?;
122 rows.into_iter()
123 .map(row_to_persisted_event)
124 .collect::<Result<Vec<_>, _>>()
125 }
126
127 pub fn load_between(
129 &self,
130 from_iso: &str,
131 to_iso: &str,
132 limit: i64,
133 ) -> Result<Vec<PersistedEvent>, String> {
134 let mut client = connect_pg_client(&self.config, "history(load_between)")?;
135 let sql = format!(
136 "SELECT id, created_at::text, event::text FROM {} WHERE created_at >= $1::timestamptz AND created_at <= $2::timestamptz ORDER BY id ASC LIMIT $3",
137 qi(&self.config.table)
138 );
139 let rows = client
140 .query(&sql, &[&from_iso, &to_iso, &limit])
141 .map_err(|e| format!("history query failed: {e}"))?;
142 rows.into_iter()
143 .map(row_to_persisted_event)
144 .collect::<Result<Vec<_>, _>>()
145 }
146}
147
/// `HistoryReplayProvider` backed by the Postgres event table.
pub struct PostgresHistoryReplayProvider {
    config: PostgresConfig,
}

impl PostgresHistoryReplayProvider {
    // NOTE(review): unlike the other constructors in this file, this one does
    // not run `validate_ident` on table/channel — confirm callers always pass
    // a config that has already been validated.
    pub fn new(config: PostgresConfig) -> Self {
        Self { config }
    }
}
158
159impl myko::server::HistoryReplayProvider for PostgresHistoryReplayProvider {
160 fn replay_to_store(
161 &self,
162 until: &str,
163 handler_registry: &HandlerRegistry,
164 ) -> Result<Arc<StoreRegistry>, String> {
165 eprintln!(
166 "[HistoryReplay] loading snapshot as of {} from {}",
167 until, self.config.url
168 );
169
170 if !until
172 .chars()
173 .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
174 {
175 return Err(format!("Invalid timestamp format: {}", until));
176 }
177
178 let mut client = connect_pg_client(&self.config, "history_replay")?;
179 let table = qi(&self.config.table);
180 let registry = StoreRegistry::new();
181
182 let sql = format!(
186 "
187 WITH latest AS (
188 SELECT DISTINCT ON (item_type, item_id)
189 id, change_type
190 FROM {table}
191 WHERE created_at <= '{until}'::timestamptz
192 ORDER BY item_type, item_id, id DESC
193 )
194 SELECT e.event::text
195 FROM latest
196 JOIN {table} e ON e.id = latest.id
197 WHERE latest.change_type = 'SET'
198 ORDER BY e.id ASC
199 "
200 );
201
202 let rows = client
203 .query(&sql, &[])
204 .map_err(|e| format!("history snapshot query failed: {e}"))?;
205
206 let mut count = 0usize;
207 for row in &rows {
208 let event_json: String = row.get(0);
209 match MEvent::from_str_trim(&event_json) {
210 Ok(event) => {
211 if let Some(parse) = handler_registry.get_item_parser(&event.item_type)
212 && let Ok(item) = parse(event.item.clone())
213 {
214 let store = registry.get_or_create(&event.item_type);
215 store.insert(item.id(), item);
216 count += 1;
217 }
218 }
219 Err(err) => {
220 eprintln!("[HistoryReplay] invalid event row: {}", err);
221 }
222 }
223 }
224
225 eprintln!(
226 "[HistoryReplay] loaded {} entities from {} rows as of {}",
227 count,
228 rows.len(),
229 until
230 );
231
232 Ok(Arc::new(registry))
233 }
234}
235
// Message type flowing from producer handles to the background writer thread.
type ProducerRequest = MEvent;

/// Cheap-to-clone handle that enqueues events for the background Postgres
/// writer thread spawned by `CellPostgresProducer`.
#[derive(Clone)]
pub struct PostgresProducerHandle {
    // Unbounded channel feeding `run_producer_loop`.
    sender: flume::Sender<ProducerRequest>,
    // This host's id; stamped onto events that lack a `source_id`.
    host_id: Uuid,
    // Kept so `startup_healthcheck` can re-validate the identifiers.
    config: PostgresConfig,
    // Shared enqueue/success/error counters, reported via `Persister::health`.
    health: Arc<PersistHealth>,
}
246
247impl PostgresProducerHandle {
248 pub fn produce(&self, mut event: MEvent) -> Result<(), PersistError> {
251 if event.source_id.is_none() {
252 event.source_id = Some(self.host_id.to_string());
253 }
254 let entity_type = event.item_type.clone();
255
256 match self.sender.send(event) {
257 Ok(()) => {
258 self.health.record_enqueue();
259 Ok(())
260 }
261 Err(_) => {
262 let msg = "Postgres producer thread not running".to_string();
263 self.health.record_dropped(msg.clone());
264 Err(PersistError {
265 entity_type,
266 message: msg,
267 })
268 }
269 }
270 }
271}
272
impl Persister for PostgresProducerHandle {
    // Delegates to the enqueue path above.
    fn persist(&self, event: MEvent) -> Result<(), PersistError> {
        self.produce(event)
    }

    fn health(&self) -> Arc<PersistHealth> {
        self.health.clone()
    }

    // Only re-checks identifier validity. NOTE(review): no connection is
    // attempted here — presumably deliberate so a down database does not
    // block startup; confirm that is the intended contract.
    fn startup_healthcheck(&self) -> Result<(), String> {
        validate_ident(&self.config.table)?;
        validate_ident(&self.config.channel)?;
        Ok(())
    }
}
288
/// Owner of the background writer thread; callers obtain cloneable
/// `PostgresProducerHandle`s via `handle()` to persist events.
pub struct CellPostgresProducer {
    handle: PostgresProducerHandle,
}
293
294impl CellPostgresProducer {
295 pub fn new(config: &PostgresConfig, host_id: Uuid) -> Result<Self, String> {
297 validate_ident(&config.table)?;
298 validate_ident(&config.channel)?;
299
300 let health = Arc::new(PersistHealth::default());
301 let (tx, rx) = flume::unbounded::<ProducerRequest>();
302 let cfg = config.clone();
303 let thread_health = health.clone();
304 std::thread::spawn(move || run_producer_loop(cfg, rx, thread_health));
305
306 Ok(Self {
307 handle: PostgresProducerHandle {
308 sender: tx,
309 host_id,
310 config: config.clone(),
311 health,
312 },
313 })
314 }
315
316 pub fn handle(&self) -> PostgresProducerHandle {
318 self.handle.clone()
319 }
320}
321
/// Background writer thread body: drains the channel in batches and inserts
/// them, reconnecting and retrying the failed batch on any error. Exits when
/// every sender (i.e. every `PostgresProducerHandle`) has been dropped.
fn run_producer_loop(
    config: PostgresConfig,
    rx: flume::Receiver<ProducerRequest>,
    health: Arc<PersistHealth>,
) {
    let mut client: Option<Client> = None;
    // Batch that failed to insert; retried (in order) before new events.
    let mut retry_batch: Vec<MEvent> = Vec::new();

    loop {
        let mut batch: Vec<MEvent> = Vec::new();
        // Prefer the retry batch to preserve ordering; otherwise block until
        // the next event arrives. A closed channel means all senders are gone.
        if !retry_batch.is_empty() {
            std::mem::swap(&mut batch, &mut retry_batch);
        } else {
            match rx.recv() {
                Ok(ev) => batch.push(ev),
                Err(_) => break,
            }
        }
        // Opportunistically top the batch up without blocking.
        while batch.len() < PG_PRODUCER_MAX_BATCH {
            match rx.try_recv() {
                Ok(ev) => batch.push(ev),
                Err(_) => break,
            }
        }

        // Connect lazily; on failure `client` stays None and the batch is
        // carried over to the next iteration via `retry_batch`.
        if client.is_none() {
            client = connect_producer_client(&config);
        }

        let batch_len = batch.len();
        let pending = rx.len();
        if pending > 1000 {
            trace!(
                "Postgres producer backlog: {} pending events in channel",
                pending
            );
        }
        if let Some(c) = client.as_mut() {
            match insert_event_batch(c, &config, &batch) {
                Ok(()) => {
                    health.record_success_batch(batch_len as u64);
                }
                Err(err) => {
                    let msg =
                        format_pg_error("insert_event_batch(producer)", Some(&config.url), &err);
                    error!("{}", msg);
                    health.record_error_no_dequeue(msg);
                    // Drop the (possibly broken) connection and retry the
                    // same batch after a short pause.
                    client = None;
                    retry_batch = batch;
                    std::thread::sleep(Duration::from_secs(1));
                }
            }
        } else {
            let msg = format!(
                "Postgres producer connection failed (url: {})",
                redact_pg_url(&config.url),
            );
            error!("{}", msg);
            health.record_error_no_dequeue(msg);
            retry_batch = batch;
            std::thread::sleep(Duration::from_secs(1));
        }
    }
}
389
390fn connect_producer_client(config: &PostgresConfig) -> Option<Client> {
391 match connect_pg_client(config, "producer") {
392 Ok(mut c) => match ensure_schema(&mut c, config) {
393 Ok(()) => Some(c),
394 Err(err) => {
395 error!(
396 "{}",
397 format_pg_error("ensure_schema(producer)", Some(&config.url), &err)
398 );
399 None
400 }
401 },
402 Err(err) => {
403 error!("{err}");
404 None
405 }
406 }
407}
408
409fn insert_event_batch(
410 client: &mut Client,
411 config: &PostgresConfig,
412 events: &[MEvent],
413) -> Result<(), ::postgres::Error> {
414 if events.is_empty() {
415 return Ok(());
416 }
417
418 let table = qi(&config.table);
419
420 if events.len() == 1 {
422 let event = &events[0];
423 let sql = format!(
424 "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES ($1, $2, $3, ($4::text)::timestamptz, $5, ($6::text), ($7::text)::jsonb)"
425 );
426 let item_id = event
427 .item
428 .get("id")
429 .and_then(|v| v.as_str())
430 .unwrap_or("unknown")
431 .to_string();
432 let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
433 let change_type = match event.change_type {
434 MEventType::SET => "SET",
435 MEventType::DEL => "DEL",
436 };
437 client.execute(
438 &sql,
439 &[
440 &event.item_type,
441 &item_id,
442 &change_type,
443 &event.created_at,
444 &event.tx,
445 &event.source_id,
446 &event_json,
447 ],
448 )?;
449 return Ok(());
450 }
451
452 let mut sql = format!(
454 "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES "
455 );
456 let mut params: Vec<Box<dyn postgres::types::ToSql + Sync>> =
457 Vec::with_capacity(events.len() * 7);
458 for (i, event) in events.iter().enumerate() {
459 if i > 0 {
460 sql.push_str(", ");
461 }
462 let base = i * 7;
463 sql.push_str(&format!(
464 "(${}, ${}, ${}, (${}::text)::timestamptz, ${}, (${}::text), (${}::text)::jsonb)",
465 base + 1,
466 base + 2,
467 base + 3,
468 base + 4,
469 base + 5,
470 base + 6,
471 base + 7
472 ));
473 let item_id = event
474 .item
475 .get("id")
476 .and_then(|v| v.as_str())
477 .unwrap_or("unknown")
478 .to_string();
479 let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
480 let change_type = match event.change_type {
481 MEventType::SET => "SET",
482 MEventType::DEL => "DEL",
483 };
484 params.push(Box::new(event.item_type.clone()));
485 params.push(Box::new(item_id));
486 params.push(Box::new(change_type.to_string()));
487 params.push(Box::new(event.created_at.clone()));
488 params.push(Box::new(event.tx.clone()));
489 params.push(Box::new(event.source_id.clone()));
490 params.push(Box::new(event_json));
491 }
492
493 let param_refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
494 params.iter().map(|p| p.as_ref()).collect();
495
496 let mut txn = client.transaction()?;
497 txn.execute(&sql, ¶m_refs)?;
498 txn.commit()?;
499
500 Ok(())
501}
502
/// Shared status flags set by the consumer thread once the initial catch-up
/// (snapshot + backlog drain) has completed or permanently failed.
#[derive(Debug)]
pub struct CatchUpStatus {
    // True once the consumer has drained the pre-existing backlog.
    caught_up: AtomicBool,
    // True after an unrecoverable consumer failure.
    failed: AtomicBool,
    // Human-readable failure reason; written before `failed` is raised so
    // readers that observe `failed == true` can read a populated reason.
    failure_reason: std::sync::RwLock<Option<String>>,
}
510
impl CatchUpStatus {
    fn new() -> Self {
        Self {
            caught_up: AtomicBool::new(false),
            failed: AtomicBool::new(false),
            failure_reason: std::sync::RwLock::new(None),
        }
    }

    /// True once the consumer has applied every pre-existing event.
    pub fn is_caught_up(&self) -> bool {
        self.caught_up.load(Ordering::SeqCst)
    }

    /// True if the consumer thread hit an unrecoverable error.
    pub fn is_failed(&self) -> bool {
        self.failed.load(Ordering::SeqCst)
    }

    /// Records a terminal failure. The order matters: the reason is stored
    /// before the `failed` flag is raised so that any reader observing
    /// `failed == true` finds the reason already populated.
    fn fail(&self, reason: impl Into<String>) {
        let reason = reason.into();
        *self.failure_reason.write().unwrap() = Some(reason.clone());
        self.failed.store(true, Ordering::SeqCst);
        self.caught_up.store(false, Ordering::SeqCst);
    }

    /// Polls every 50 ms until caught up, failed, or `timeout` elapses.
    ///
    /// # Errors
    /// Returns the recorded failure reason, or a timeout message.
    pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
        let start = std::time::Instant::now();
        while !self.is_caught_up() {
            if self.is_failed() {
                return Err(self
                    .failure_reason
                    .read()
                    .unwrap()
                    .clone()
                    .unwrap_or_else(|| "Postgres catch-up failed".to_string()));
            }
            if start.elapsed() >= timeout {
                return Err(format!(
                    "Postgres catch-up timed out after {}s",
                    timeout.as_secs()
                ));
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        Ok(())
    }
}
560
/// Background consumer that mirrors the Postgres event log into the local
/// `StoreRegistry`.
pub struct CellPostgresConsumer {
    catch_up_status: Arc<CatchUpStatus>,
    // Held so the spawned consumer thread's handle is owned; never joined.
    _handle: std::thread::JoinHandle<()>,
}
566
567impl CellPostgresConsumer {
568 pub fn start(
570 config: &PostgresConfig,
571 host_id: Uuid,
572 handler_registry: Arc<HandlerRegistry>,
573 registry: Arc<StoreRegistry>,
574 ) -> Result<Self, String> {
575 validate_ident(&config.table)?;
576 validate_ident(&config.channel)?;
577
578 let catch_up_status = Arc::new(CatchUpStatus::new());
579 let status = catch_up_status.clone();
580 let cfg = config.clone();
581 let host_id_string = host_id.to_string();
582
583 let handle = std::thread::spawn(move || {
584 if let Err(err) = run_consumer_loop(
585 &cfg,
586 &host_id_string,
587 handler_registry,
588 registry,
589 status.clone(),
590 ) {
591 let reason = format!("Postgres consumer failed: {err}");
592 error!("{reason}");
593 status.fail(reason);
594 }
595 });
596
597 Ok(Self {
598 catch_up_status,
599 _handle: handle,
600 })
601 }
602
603 pub fn is_caught_up(&self) -> bool {
605 self.catch_up_status.is_caught_up()
606 }
607
608 pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
610 self.catch_up_status.wait_until_caught_up(timeout)
611 }
612}
613
/// Consumer thread body: loads a snapshot of the latest per-entity state,
/// drains the backlog, flips `status.caught_up`, then tails new rows —
/// waking on LISTEN/NOTIFY or a 500 ms poll timeout.
fn run_consumer_loop(
    config: &PostgresConfig,
    host_id: &str,
    handler_registry: Arc<HandlerRegistry>,
    registry: Arc<StoreRegistry>,
    status: Arc<CatchUpStatus>,
) -> Result<(), String> {
    // Two connections: `reader` runs queries, `listener` sits in LISTEN.
    let mut reader = connect_consumer_client("reader", config)?;
    let mut listener = connect_listener_client(config)?;

    info!(
        "CellPostgresConsumer started (table={}, channel={})",
        config.table, config.channel
    );

    let table = qi(&config.table);
    // High-water mark: the snapshot covers ids <= this; tailing starts after.
    let high_water_sql = format!("SELECT COALESCE(MAX(id), 0) FROM {table}");
    let high_water_row = reader
        .query_one(&high_water_sql, &[])
        .map_err(|e| format_pg_error("query(high_water)", Some(&config.url), &e))?;
    let high_water: i64 = high_water_row.get(0);
    // Latest event per (item_type, item_id) up to the high-water mark. DEL
    // rows are included; apply_remote_event treats them as removals (a no-op
    // against the freshly created stores).
    let snapshot_sql = format!(
        "
        WITH latest AS (
            SELECT DISTINCT ON (item_type, item_id)
                id, change_type
            FROM {table}
            WHERE id <= $1
            ORDER BY item_type, item_id, id DESC
        )
        SELECT e.id, e.event::text
        FROM latest
        JOIN {table} e ON e.id = latest.id
        ORDER BY e.id ASC
        "
    );
    let snapshot_rows = reader
        .query(&snapshot_sql, &[&high_water])
        .map_err(|e| format_pg_error("query(snapshot latest events)", Some(&config.url), &e))?;
    let snapshot_count = snapshot_rows.len();
    for row in snapshot_rows {
        let id: i64 = row.get(0);
        let event_json: String = row.get(1);
        match MEvent::from_str_trim(&event_json) {
            Ok(event) => {
                apply_remote_event(event, host_id, &handler_registry, &registry);
            }
            Err(err) => {
                // Skip bad rows rather than aborting the whole snapshot.
                error!("Invalid postgres snapshot row id={id}: {err}");
            }
        }
    }
    info!(
        "Postgres snapshot loaded latest state rows={} high_water={}",
        snapshot_count, high_water
    );

    let fetch_sql =
        format!("SELECT id, event::text FROM {table} WHERE id > $1 ORDER BY id ASC LIMIT $2");
    let mut last_seen_id: i64 = high_water;
    let mut initial_done = false;

    loop {
        // Fetch the next page (up to 1000 rows) after the last applied id.
        let rows = match reader.query(&fetch_sql, &[&last_seen_id, &1000_i64]) {
            Ok(rows) => rows,
            Err(err) => {
                warn!(
                    "{}",
                    format_pg_error("query(fetch events)", Some(&config.url), &err)
                );
                std::thread::sleep(Duration::from_millis(500));
                reader = connect_consumer_client("reader", config)?;
                continue;
            }
        };
        if !rows.is_empty() {
            for row in rows {
                let id: i64 = row.get(0);
                let event_json: String = row.get(1);
                last_seen_id = id;

                match MEvent::from_str_trim(&event_json) {
                    Ok(event) => {
                        apply_remote_event(event, host_id, &handler_registry, &registry);
                    }
                    Err(err) => {
                        error!("Invalid postgres event row id={id}: {err}");
                    }
                }
            }
            // A non-empty page may have more behind it; fetch again at once.
            continue;
        }

        // First empty fetch after the snapshot means the backlog is drained.
        if !initial_done {
            initial_done = true;
            status.caught_up.store(true, Ordering::SeqCst);
            info!("Postgres consumer caught up at event_id={last_seen_id}");
        }

        // Wait up to 500 ms for a NOTIFY; the timeout doubles as a poll
        // interval in case a notification was missed.
        let mut notified = false;
        let mut reconnect_listener = false;
        {
            let mut notifications = listener.notifications();
            let mut iter = notifications.timeout_iter(Duration::from_millis(500));
            match iter.next() {
                Ok(Some(_n)) => {
                    notified = true;
                }
                Ok(None) => {}
                Err(err) => {
                    warn!("Postgres LISTEN error: {err}; reconnecting listener");
                    reconnect_listener = true;
                }
            }
        }
        if reconnect_listener {
            std::thread::sleep(Duration::from_millis(500));
            listener = connect_listener_client(config)?;
            continue;
        }

        if !notified {
            trace!("Postgres consumer poll tick (no notification)");
        }
    }
}
741
742fn connect_consumer_client(role: &str, config: &PostgresConfig) -> Result<Client, String> {
743 let mut backoff_ms = 250u64;
744 loop {
745 match connect_pg_client(config, role) {
746 Ok(mut client) => {
747 if let Err(err) = ensure_schema(&mut client, config) {
748 warn!(
749 "{}",
750 format_pg_error(&format!("ensure_schema({role})"), Some(&config.url), &err)
751 );
752 }
753 return Ok(client);
754 }
755 Err(err) => {
756 warn!("{err}");
757 std::thread::sleep(Duration::from_millis(backoff_ms));
758 backoff_ms = (backoff_ms * 2).min(5_000);
759 }
760 }
761 }
762}
763
764fn connect_listener_client(config: &PostgresConfig) -> Result<Client, String> {
765 let mut backoff_ms = 250u64;
766 loop {
767 let mut client = connect_consumer_client("listener", config)?;
768 match client.batch_execute(&format!("LISTEN {};", qi(&config.channel))) {
769 Ok(()) => return Ok(client),
770 Err(err) => {
771 warn!(
772 "{}",
773 format_pg_error("LISTEN(register)", Some(&config.url), &err)
774 );
775 std::thread::sleep(Duration::from_millis(backoff_ms));
776 backoff_ms = (backoff_ms * 2).min(5_000);
777 }
778 }
779 }
780}
781
782fn apply_remote_event(
783 event: MEvent,
784 host_id: &str,
785 handler_registry: &Arc<HandlerRegistry>,
786 registry: &Arc<StoreRegistry>,
787) {
788 let is_my_event = event.source_id.as_ref().is_some_and(|id| id == host_id);
789 if is_my_event {
790 return;
791 }
792
793 match event.change_type {
794 MEventType::SET => {
795 if let Some(parse) = handler_registry.get_item_parser(&event.item_type) {
796 match parse(event.item.clone()) {
797 Ok(item) => {
798 let store = registry.get_or_create(item.entity_type());
799 store.insert(item.id(), item);
800 }
801 Err(e) => {
802 let msg = e.to_string();
803 let short = msg
804 .find(", expected one of")
805 .map(|pos| msg[..pos].to_string())
806 .unwrap_or(msg);
807 error!("Failed to parse {}: {short}", event.item_type);
808 }
809 }
810 } else {
811 warn!("No parser for entity type: {}", event.item_type);
812 }
813 }
814 MEventType::DEL => {
815 if let Some(id) = event.item.get("id").and_then(|v| v.as_str()) {
816 let store = registry.get_or_create(&event.item_type);
817 store.remove(&id.into());
818 } else {
819 error!("DEL event missing id field: {:?}", event.item);
820 }
821 }
822 }
823}
824
/// Creates the append-only event table, its indexes, and an AFTER INSERT
/// trigger that publishes each new row id via `pg_notify` on the configured
/// channel. Idempotent: safe to run on every (re)connect — the trigger is
/// dropped and recreated.
fn ensure_schema(client: &mut Client, config: &PostgresConfig) -> Result<(), ::postgres::Error> {
    // All identifiers derive from names already checked by validate_ident,
    // so double-quoting via qi() cannot be broken out of.
    let table = qi(&config.table);
    let idx_tx = qi(&format!("{}_tx_idx", config.table));
    let idx_item = qi(&format!("{}_item_type_item_id_idx", config.table));
    let idx_item_latest = qi(&format!("{}_item_latest_idx", config.table));
    let idx_created = qi(&format!("{}_created_at_idx", config.table));
    let trigger_fn = qi(&format!("{}_notify_insert_fn", config.table));
    let trigger_name = qi(&format!("{}_notify_insert_trigger", config.table));

    // The channel name is interpolated into a SQL string literal below;
    // validate_ident guarantees it contains no quote characters.
    client.batch_execute(&format!(
        "
        CREATE TABLE IF NOT EXISTS {table} (
            id BIGSERIAL PRIMARY KEY,
            item_type TEXT NOT NULL,
            item_id TEXT NOT NULL,
            change_type TEXT NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            tx TEXT NOT NULL,
            source_id TEXT,
            event JSONB NOT NULL
        );
        CREATE INDEX IF NOT EXISTS {idx_tx} ON {table} (tx);
        CREATE INDEX IF NOT EXISTS {idx_item} ON {table} (item_type, item_id);
        CREATE INDEX IF NOT EXISTS {idx_item_latest} ON {table} (item_type, item_id, id DESC);
        CREATE INDEX IF NOT EXISTS {idx_created} ON {table} (created_at);
        CREATE OR REPLACE FUNCTION {trigger_fn}() RETURNS trigger AS $$
        BEGIN
            PERFORM pg_notify('{channel}', NEW.id::text);
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        DROP TRIGGER IF EXISTS {trigger_name} ON {table};
        CREATE TRIGGER {trigger_name}
            AFTER INSERT ON {table}
            FOR EACH ROW
            EXECUTE FUNCTION {trigger_fn}();
        ",
        channel = config.channel
    ))?;

    Ok(())
}
867
/// Validates that `name` is a safe SQL identifier: non-empty, starts with a
/// letter or underscore, and contains only ASCII letters, digits, and
/// underscores. This is what makes quoting via `qi` injection-proof.
///
/// # Errors
/// Returns a human-readable description of the first violation found.
fn validate_ident(name: &str) -> Result<(), String> {
    let mut chars = name.chars();
    // An empty string has no first char, which doubles as the empty check —
    // the original performed the same check twice with the same message.
    let first = chars
        .next()
        .ok_or_else(|| "identifier cannot be empty".to_string())?;
    if !(first == '_' || first.is_ascii_alphabetic()) {
        return Err(format!(
            "invalid identifier `{name}`: must start with letter or underscore"
        ));
    }
    if !chars.all(|c| c == '_' || c.is_ascii_alphanumeric()) {
        return Err(format!(
            "invalid identifier `{name}`: only letters, numbers, underscore are allowed"
        ));
    }
    Ok(())
}
888
/// Double-quotes a SQL identifier. Embedded `"` are doubled per SQL quoting
/// rules, so the result is always a single well-formed quoted identifier
/// even if a caller ever skips `validate_ident` (which rejects quotes, so
/// output is unchanged for all validated names).
fn qi(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
892
/// Converts a `(id, created_at::text, event::text)` row into a
/// `PersistedEvent`, failing if the JSON payload does not parse.
fn row_to_persisted_event(row: ::postgres::Row) -> Result<PersistedEvent, String> {
    let id: i64 = row.get(0);
    let created_at: String = row.get(1);
    let event_json: String = row.get(2);
    let event = MEvent::from_str_trim(&event_json)
        .map_err(|e| format!("invalid history event payload for id={id}: {e}"))?;
    Ok(PersistedEvent {
        id,
        created_at,
        event,
    })
}
905
/// Replaces the credentials portion of a DSN (everything between the scheme
/// and the last `@`) with `***` so URLs are safe to log. URLs without an
/// `@` are returned unchanged.
fn redact_pg_url(url: &str) -> String {
    let Some(at) = url.rfind('@') else {
        return url.to_string();
    };
    let scheme_end = url.find("://").map_or(0, |idx| idx + 3);
    format!("{}***{}", &url[..scheme_end], &url[at..])
}
915
/// Convenience wrapper for connection errors, which always have a DSN.
fn format_pg_connect_error(role: &str, url: &str, err: &postgres::Error) -> String {
    format_pg_error(role, Some(url), err)
}
919
920fn format_pg_error(role: &str, url: Option<&str>, err: &postgres::Error) -> String {
921 let mut msg = match url {
922 Some(url) => format!("{role} failed (dsn={}): {}", redact_pg_url(url), err),
923 None => format!("{role} failed: {err}"),
924 };
925 if let Some(db) = err.as_db_error() {
926 msg.push_str(&format!(
927 " [code={} severity={} message={}]",
928 db.code().code(),
929 db.severity(),
930 db.message()
931 ));
932 if let Some(detail) = db.detail() {
933 msg.push_str(&format!(" [detail={}]", detail));
934 }
935 if let Some(hint) = db.hint() {
936 msg.push_str(&format!(" [hint={}]", hint));
937 }
938 }
939 msg
940}
941
/// Opens a blocking client with a connect timeout and TCP keepalives so
/// half-open connections are detected instead of hanging forever.
///
/// NOTE(review): connections use `NoTls` — fine for localhost/private
/// networks; confirm TLS is not required for the DSNs used in production.
fn connect_pg_client(config: &PostgresConfig, role: &str) -> Result<Client, String> {
    let mut client_config = parse_pg_client_config(config, role)?;

    client_config.connect_timeout(Duration::from_secs(PG_CONNECT_TIMEOUT_SECS));
    client_config.keepalives(true);
    client_config.keepalives_idle(Duration::from_secs(PG_KEEPALIVE_IDLE_SECS));
    client_config.keepalives_interval(Duration::from_secs(PG_KEEPALIVE_INTERVAL_SECS));
    client_config.keepalives_retries(PG_KEEPALIVE_RETRIES);

    client_config
        .connect(NoTls)
        .map_err(|err| format_pg_connect_error(role, &config.url, &err))
}
956
957fn parse_pg_client_config(config: &PostgresConfig, role: &str) -> Result<PgClientConfig, String> {
958 config.url.parse::<PgClientConfig>().map_err(|err| {
959 format!(
960 "postgres config parse failed ({role}, dsn={}): {err}",
961 redact_pg_url(&config.url)
962 )
963 })
964}