1use std::{
8 sync::{
9 Arc,
10 atomic::{AtomicBool, Ordering},
11 },
12 time::Duration,
13};
14
15use ::postgres::{Client, Config as PgClientConfig, NoTls};
16use log::{debug, error, info, trace, warn};
17use myko::{
18 event::{MEvent, MEventType},
19 server::{HandlerRegistry, PersistError, PersistHealth, Persister},
20 store::StoreRegistry,
21};
22use postgres::fallible_iterator::FallibleIterator;
23use uuid::Uuid;
24
/// Connection timeout applied to every Postgres client, in seconds.
const PG_CONNECT_TIMEOUT_SECS: u64 = 10;
/// Value passed to the client's `keepalives_idle` setting, in seconds.
const PG_KEEPALIVE_IDLE_SECS: u64 = 30;
/// Value passed to the client's `keepalives_interval` setting, in seconds.
const PG_KEEPALIVE_INTERVAL_SECS: u64 = 10;
/// Value passed to the client's `keepalives_retries` setting.
const PG_KEEPALIVE_RETRIES: u32 = 3;
/// Upper bound on the number of events the producer thread groups into one batch.
const PG_PRODUCER_MAX_BATCH: usize = 256;
30
/// Connection settings for the Postgres-backed event log.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Connection string handed to the `postgres` client configuration parser.
    pub url: String,
    /// Name of the events table; interpolated into SQL as a quoted identifier.
    pub table: String,
    /// Name of the LISTEN/NOTIFY channel on which inserted row ids are announced.
    pub channel: String,
}
41
/// One row of the events table as returned by the history queries.
#[derive(Debug, Clone)]
pub struct PersistedEvent {
    /// Value of the row's `id` column.
    pub id: i64,
    /// The row's `created_at` column as rendered by Postgres (`created_at::text`).
    pub created_at: String,
    /// Event decoded from the row's `event` column.
    pub event: MEvent,
}
49
50impl PostgresConfig {
51 pub fn from_env() -> Option<Self> {
57 let url = std::env::var("MYKO_POSTGRES_URL").ok()?;
58 let table = std::env::var("MYKO_POSTGRES_TABLE").unwrap_or_else(|_| "myko_events".into());
59 let channel =
60 std::env::var("MYKO_POSTGRES_CHANNEL").unwrap_or_else(|_| "myko_events_notify".into());
61 Some(Self {
62 url,
63 table,
64 channel,
65 })
66 }
67}
68
69pub struct PostgresHistoryStore {
71 config: PostgresConfig,
72}
73
74impl PostgresHistoryStore {
75 pub fn new(config: PostgresConfig) -> Result<Self, String> {
77 validate_ident(&config.table)?;
78 validate_ident(&config.channel)?;
79 Ok(Self { config })
80 }
81
82 pub fn load_after_id(&self, after_id: i64, limit: i64) -> Result<Vec<PersistedEvent>, String> {
84 let mut client = connect_pg_client(&self.config, "history(load_after_id)")?;
85 let sql = format!(
86 "SELECT id, created_at::text, event::text FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
87 qi(&self.config.table)
88 );
89 let rows = client
90 .query(&sql, &[&after_id, &limit])
91 .map_err(|e| format!("history query failed: {e}"))?;
92 rows.into_iter()
93 .map(row_to_persisted_event)
94 .collect::<Result<Vec<_>, _>>()
95 }
96
97 pub fn load_until(
99 &self,
100 after_id: i64,
101 until: &str,
102 limit: i64,
103 ) -> Result<Vec<PersistedEvent>, String> {
104 let mut client = connect_pg_client(&self.config, "history(load_until)")?;
105 if !until
107 .chars()
108 .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
109 {
110 return Err(format!("Invalid timestamp format: {}", until));
111 }
112 let sql = format!(
113 "SELECT id, created_at::text, event::text FROM {} WHERE id > {} AND created_at <= '{}'::timestamptz ORDER BY id ASC LIMIT {}",
114 qi(&self.config.table),
115 after_id,
116 until,
117 limit
118 );
119 let rows = client
120 .query(&sql, &[])
121 .map_err(|e| format!("history query failed: {e}"))?;
122 rows.into_iter()
123 .map(row_to_persisted_event)
124 .collect::<Result<Vec<_>, _>>()
125 }
126
127 pub fn load_between(
129 &self,
130 from_iso: &str,
131 to_iso: &str,
132 limit: i64,
133 ) -> Result<Vec<PersistedEvent>, String> {
134 let mut client = connect_pg_client(&self.config, "history(load_between)")?;
135 let sql = format!(
136 "SELECT id, created_at::text, event::text FROM {} WHERE created_at >= $1::timestamptz AND created_at <= $2::timestamptz ORDER BY id ASC LIMIT $3",
137 qi(&self.config.table)
138 );
139 let rows = client
140 .query(&sql, &[&from_iso, &to_iso, &limit])
141 .map_err(|e| format!("history query failed: {e}"))?;
142 rows.into_iter()
143 .map(row_to_persisted_event)
144 .collect::<Result<Vec<_>, _>>()
145 }
146}
147
148pub struct PostgresHistoryReplayProvider {
150 config: PostgresConfig,
151}
152
153impl PostgresHistoryReplayProvider {
154 pub fn new(config: PostgresConfig) -> Self {
155 Self { config }
156 }
157}
158
159impl myko::server::HistoryReplayProvider for PostgresHistoryReplayProvider {
160 fn replay_to_store(
161 &self,
162 until: &str,
163 handler_registry: &HandlerRegistry,
164 ) -> Result<Arc<StoreRegistry>, String> {
165 eprintln!(
166 "[HistoryReplay] loading snapshot as of {} from {}",
167 until, self.config.url
168 );
169
170 if !until
172 .chars()
173 .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
174 {
175 return Err(format!("Invalid timestamp format: {}", until));
176 }
177
178 let mut client = connect_pg_client(&self.config, "history_replay")?;
179 let table = qi(&self.config.table);
180 let registry = StoreRegistry::new();
181
182 let sql = format!(
186 "
187 WITH latest AS (
188 SELECT DISTINCT ON (item_type, item_id)
189 id, change_type
190 FROM {table}
191 WHERE created_at <= '{until}'::timestamptz
192 ORDER BY item_type, item_id, id DESC
193 )
194 SELECT e.event::text
195 FROM latest
196 JOIN {table} e ON e.id = latest.id
197 WHERE latest.change_type = 'SET'
198 ORDER BY e.id ASC
199 "
200 );
201
202 let rows = client
203 .query(&sql, &[])
204 .map_err(|e| format!("history snapshot query failed: {e}"))?;
205
206 let mut count = 0usize;
207 for row in &rows {
208 let event_json: String = row.get(0);
209 match MEvent::from_str_trim(&event_json) {
210 Ok(event) => {
211 if let Some(parse) = handler_registry.get_item_parser(&event.item_type)
212 && let Ok(item) = parse(event.item.clone())
213 {
214 let store = registry.get_or_create(&event.item_type);
215 store.insert(item.id(), item);
216 count += 1;
217 }
218 }
219 Err(err) => {
220 eprintln!("[HistoryReplay] invalid event row: {}", err);
221 }
222 }
223 }
224
225 eprintln!(
226 "[HistoryReplay] loaded {} entities from {} rows as of {}",
227 count,
228 rows.len(),
229 until
230 );
231
232 Ok(Arc::new(registry))
233 }
234}
235
/// Message type carried on the channel between `PostgresProducerHandle` and the producer thread.
type ProducerRequest = MEvent;
237
238#[derive(Clone)]
240pub struct PostgresProducerHandle {
241 sender: flume::Sender<ProducerRequest>,
242 host_id: Uuid,
243 config: PostgresConfig,
244 health: Arc<PersistHealth>,
245}
246
247impl PostgresProducerHandle {
248 pub fn produce(&self, mut event: MEvent) -> Result<(), PersistError> {
251 if event.source_id.is_none() {
252 event.source_id = Some(self.host_id.to_string());
253 }
254 let entity_type = event.item_type.clone();
255
256 match self.sender.send(event) {
257 Ok(()) => {
258 self.health.record_enqueue();
259 Ok(())
260 }
261 Err(_) => {
262 let msg = "Postgres producer thread not running".to_string();
263 self.health.record_dropped(msg.clone());
264 Err(PersistError {
265 entity_type,
266 message: msg,
267 })
268 }
269 }
270 }
271}
272
273impl Persister for PostgresProducerHandle {
274 fn persist(&self, event: MEvent) -> Result<(), PersistError> {
275 self.produce(event)
276 }
277
278 fn health(&self) -> Arc<PersistHealth> {
279 self.health.clone()
280 }
281
282 fn startup_healthcheck(&self) -> Result<(), String> {
283 validate_ident(&self.config.table)?;
284 validate_ident(&self.config.channel)?;
285 Ok(())
286 }
287}
288
289pub struct CellPostgresProducer {
291 handle: PostgresProducerHandle,
292}
293
294impl CellPostgresProducer {
295 pub fn new(config: &PostgresConfig, host_id: Uuid) -> Result<Self, String> {
297 validate_ident(&config.table)?;
298 validate_ident(&config.channel)?;
299
300 let health = Arc::new(PersistHealth::default());
301 let (tx, rx) = flume::unbounded::<ProducerRequest>();
302 let cfg = config.clone();
303 let thread_health = health.clone();
304 std::thread::spawn(move || run_producer_loop(cfg, rx, thread_health));
305
306 Ok(Self {
307 handle: PostgresProducerHandle {
308 sender: tx,
309 host_id,
310 config: config.clone(),
311 health,
312 },
313 })
314 }
315
316 pub fn handle(&self) -> PostgresProducerHandle {
318 self.handle.clone()
319 }
320}
321
322fn run_producer_loop(
323 config: PostgresConfig,
324 rx: flume::Receiver<ProducerRequest>,
325 health: Arc<PersistHealth>,
326) {
327 let mut client: Option<Client> = None;
328 let mut retry_batch: Vec<MEvent> = Vec::new();
329
330 loop {
331 let mut batch: Vec<MEvent> = Vec::new();
333 if !retry_batch.is_empty() {
334 std::mem::swap(&mut batch, &mut retry_batch);
335 } else {
336 match rx.recv() {
337 Ok(ev) => batch.push(ev),
338 Err(_) => break, }
340 }
341 while batch.len() < PG_PRODUCER_MAX_BATCH {
343 match rx.try_recv() {
344 Ok(ev) => batch.push(ev),
345 Err(_) => break,
346 }
347 }
348
349 if client.is_none() {
350 client = connect_producer_client(&config);
351 }
352
353 let batch_len = batch.len();
354 let pending = rx.len();
355 if pending > 1000 {
356 trace!(
357 "Postgres producer backlog: {} pending events in channel",
358 pending
359 );
360 }
361 if log::log_enabled!(log::Level::Debug) {
362 let mut counts: std::collections::BTreeMap<(&str, &str), usize> =
363 std::collections::BTreeMap::new();
364 for ev in &batch {
365 let kind = match ev.change_type {
366 MEventType::SET => "SET",
367 MEventType::DEL => "DEL",
368 };
369 *counts.entry((ev.item_type.as_str(), kind)).or_insert(0) += 1;
370 }
371 let summary: Vec<String> = counts
372 .iter()
373 .map(|((t, k), n)| format!("{}:{}={}", t, k, n))
374 .collect();
375 debug!(
376 "[pg-producer] batch_len={} pending_after={} kinds=[{}]",
377 batch_len,
378 pending,
379 summary.join(", ")
380 );
381 }
382 if let Some(c) = client.as_mut() {
383 match insert_event_batch(c, &config, &batch) {
384 Ok(()) => {
385 health.record_success_batch(batch_len as u64);
386 }
387 Err(err) => {
388 let msg =
389 format_pg_error("insert_event_batch(producer)", Some(&config.url), &err);
390 error!("{}", msg);
391 health.record_error_no_dequeue(msg);
392 client = None;
393 retry_batch = batch;
394 std::thread::sleep(Duration::from_secs(1));
396 }
397 }
398 } else {
399 let msg = format!(
400 "Postgres producer connection failed (url: {})",
401 redact_pg_url(&config.url),
402 );
403 error!("{}", msg);
404 health.record_error_no_dequeue(msg);
405 retry_batch = batch;
406 std::thread::sleep(Duration::from_secs(1));
407 }
408 }
409}
410
411fn connect_producer_client(config: &PostgresConfig) -> Option<Client> {
412 match connect_pg_client(config, "producer") {
413 Ok(mut c) => match ensure_schema(&mut c, config) {
414 Ok(()) => Some(c),
415 Err(err) => {
416 error!(
417 "{}",
418 format_pg_error("ensure_schema(producer)", Some(&config.url), &err)
419 );
420 None
421 }
422 },
423 Err(err) => {
424 error!("{err}");
425 None
426 }
427 }
428}
429
430fn insert_event_batch(
431 client: &mut Client,
432 config: &PostgresConfig,
433 events: &[MEvent],
434) -> Result<(), ::postgres::Error> {
435 if events.is_empty() {
436 return Ok(());
437 }
438
439 let table = qi(&config.table);
440
441 if events.len() == 1 {
443 let event = &events[0];
444 let sql = format!(
445 "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES ($1, $2, $3, ($4::text)::timestamptz, $5, ($6::text), ($7::text)::jsonb)"
446 );
447 let item_id = event
448 .item
449 .get("id")
450 .and_then(|v| v.as_str())
451 .unwrap_or("unknown")
452 .to_string();
453 let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
454 let change_type = match event.change_type {
455 MEventType::SET => "SET",
456 MEventType::DEL => "DEL",
457 };
458 client.execute(
459 &sql,
460 &[
461 &event.item_type,
462 &item_id,
463 &change_type,
464 &event.created_at,
465 &event.tx,
466 &event.source_id,
467 &event_json,
468 ],
469 )?;
470 return Ok(());
471 }
472
473 let mut sql = format!(
475 "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES "
476 );
477 let mut params: Vec<Box<dyn postgres::types::ToSql + Sync>> =
478 Vec::with_capacity(events.len() * 7);
479 for (i, event) in events.iter().enumerate() {
480 if i > 0 {
481 sql.push_str(", ");
482 }
483 let base = i * 7;
484 sql.push_str(&format!(
485 "(${}, ${}, ${}, (${}::text)::timestamptz, ${}, (${}::text), (${}::text)::jsonb)",
486 base + 1,
487 base + 2,
488 base + 3,
489 base + 4,
490 base + 5,
491 base + 6,
492 base + 7
493 ));
494 let item_id = event
495 .item
496 .get("id")
497 .and_then(|v| v.as_str())
498 .unwrap_or("unknown")
499 .to_string();
500 let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
501 let change_type = match event.change_type {
502 MEventType::SET => "SET",
503 MEventType::DEL => "DEL",
504 };
505 params.push(Box::new(event.item_type.clone()));
506 params.push(Box::new(item_id));
507 params.push(Box::new(change_type.to_string()));
508 params.push(Box::new(event.created_at.clone()));
509 params.push(Box::new(event.tx.clone()));
510 params.push(Box::new(event.source_id.clone()));
511 params.push(Box::new(event_json));
512 }
513
514 let param_refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
515 params.iter().map(|p| p.as_ref()).collect();
516
517 let mut txn = client.transaction()?;
518 txn.execute(&sql, ¶m_refs)?;
519 txn.commit()?;
520
521 Ok(())
522}
523
/// Shared progress flag for the consumer's initial catch-up phase.
///
/// The consumer thread updates it; other threads poll it through
/// [`CatchUpStatus::wait_until_caught_up`].
#[derive(Debug)]
pub struct CatchUpStatus {
    caught_up: AtomicBool,
    failed: AtomicBool,
    failure_reason: std::sync::RwLock<Option<String>>,
}

impl CatchUpStatus {
    /// Creates a status that is neither caught up nor failed.
    fn new() -> Self {
        Self {
            caught_up: AtomicBool::new(false),
            failed: AtomicBool::new(false),
            failure_reason: std::sync::RwLock::new(None),
        }
    }

    /// Returns `true` once the caught-up flag has been set.
    pub fn is_caught_up(&self) -> bool {
        self.caught_up.load(Ordering::SeqCst)
    }

    /// Returns `true` once a failure has been recorded.
    pub fn is_failed(&self) -> bool {
        self.failed.load(Ordering::SeqCst)
    }

    /// Records a failure and clears the caught-up flag.
    fn fail(&self, reason: impl Into<String>) {
        // The slot only holds a message, so a poisoned lock is still safe to
        // use; recovering avoids turning a failure report into a panic.
        let mut slot = self
            .failure_reason
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *slot = Some(reason.into());
        drop(slot);
        // The reason is stored before the flag so a waiter that observes
        // `failed` finds the message already in place.
        self.failed.store(true, Ordering::SeqCst);
        self.caught_up.store(false, Ordering::SeqCst);
    }

    /// Blocks the calling thread until the caught-up flag is set.
    ///
    /// # Errors
    /// Returns the recorded failure reason when a failure is observed first,
    /// or a timeout message once `timeout` has elapsed.
    pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
        const POLL_INTERVAL: Duration = Duration::from_millis(50);

        let start = std::time::Instant::now();
        while !self.is_caught_up() {
            if self.is_failed() {
                let reason = self
                    .failure_reason
                    .read()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .clone();
                return Err(reason.unwrap_or_else(|| "Postgres catch-up failed".to_string()));
            }
            let elapsed = start.elapsed();
            if elapsed >= timeout {
                return Err(format!(
                    "Postgres catch-up timed out after {}s",
                    timeout.as_secs()
                ));
            }
            // Never sleep past the deadline.
            std::thread::sleep(POLL_INTERVAL.min(timeout - elapsed));
        }
        Ok(())
    }
}
581
582pub struct CellPostgresConsumer {
584 catch_up_status: Arc<CatchUpStatus>,
585 _handle: std::thread::JoinHandle<()>,
586}
587
588impl CellPostgresConsumer {
589 pub fn start(
591 config: &PostgresConfig,
592 host_id: Uuid,
593 handler_registry: Arc<HandlerRegistry>,
594 registry: Arc<StoreRegistry>,
595 ) -> Result<Self, String> {
596 validate_ident(&config.table)?;
597 validate_ident(&config.channel)?;
598
599 let catch_up_status = Arc::new(CatchUpStatus::new());
600 let status = catch_up_status.clone();
601 let cfg = config.clone();
602 let host_id_string = host_id.to_string();
603
604 let handle = std::thread::spawn(move || {
605 if let Err(err) = run_consumer_loop(
606 &cfg,
607 &host_id_string,
608 handler_registry,
609 registry,
610 status.clone(),
611 ) {
612 let reason = format!("Postgres consumer failed: {err}");
613 error!("{reason}");
614 status.fail(reason);
615 }
616 });
617
618 Ok(Self {
619 catch_up_status,
620 _handle: handle,
621 })
622 }
623
624 pub fn is_caught_up(&self) -> bool {
626 self.catch_up_status.is_caught_up()
627 }
628
629 pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
631 self.catch_up_status.wait_until_caught_up(timeout)
632 }
633}
634
635fn run_consumer_loop(
636 config: &PostgresConfig,
637 host_id: &str,
638 handler_registry: Arc<HandlerRegistry>,
639 registry: Arc<StoreRegistry>,
640 status: Arc<CatchUpStatus>,
641) -> Result<(), String> {
642 let mut reader = connect_consumer_client("reader", config)?;
643 let mut listener = connect_listener_client(config)?;
644
645 info!(
646 "CellPostgresConsumer started (table={}, channel={})",
647 config.table, config.channel
648 );
649
650 let table = qi(&config.table);
651 let high_water_sql = format!("SELECT COALESCE(MAX(id), 0) FROM {table}");
652 let high_water_row = reader
653 .query_one(&high_water_sql, &[])
654 .map_err(|e| format_pg_error("query(high_water)", Some(&config.url), &e))?;
655 let high_water: i64 = high_water_row.get(0);
656 let snapshot_sql = format!(
657 "
658 WITH latest AS (
659 SELECT DISTINCT ON (item_type, item_id)
660 id, change_type
661 FROM {table}
662 WHERE id <= $1
663 ORDER BY item_type, item_id, id DESC
664 )
665 SELECT e.id, e.event::text
666 FROM latest
667 JOIN {table} e ON e.id = latest.id
668 ORDER BY e.id ASC
669 "
670 );
671 let snapshot_rows = reader
672 .query(&snapshot_sql, &[&high_water])
673 .map_err(|e| format_pg_error("query(snapshot latest events)", Some(&config.url), &e))?;
674 let snapshot_count = snapshot_rows.len();
675 for row in snapshot_rows {
676 let id: i64 = row.get(0);
677 let event_json: String = row.get(1);
678 match MEvent::from_str_trim(&event_json) {
679 Ok(event) => {
680 apply_remote_event(event, host_id, &handler_registry, ®istry);
681 }
682 Err(err) => {
683 error!("Invalid postgres snapshot row id={id}: {err}");
684 }
685 }
686 }
687 info!(
688 "Postgres snapshot loaded latest state rows={} high_water={}",
689 snapshot_count, high_water
690 );
691
692 let fetch_sql =
693 format!("SELECT id, event::text FROM {table} WHERE id > $1 ORDER BY id ASC LIMIT $2");
694 let mut last_seen_id: i64 = high_water;
695 let mut initial_done = false;
696
697 loop {
698 let rows = match reader.query(&fetch_sql, &[&last_seen_id, &1000_i64]) {
699 Ok(rows) => rows,
700 Err(err) => {
701 warn!(
702 "{}",
703 format_pg_error("query(fetch events)", Some(&config.url), &err)
704 );
705 std::thread::sleep(Duration::from_millis(500));
706 reader = connect_consumer_client("reader", config)?;
707 continue;
708 }
709 };
710 if !rows.is_empty() {
711 for row in rows {
712 let id: i64 = row.get(0);
713 let event_json: String = row.get(1);
714 last_seen_id = id;
715
716 match MEvent::from_str_trim(&event_json) {
717 Ok(event) => {
718 apply_remote_event(event, host_id, &handler_registry, ®istry);
719 }
720 Err(err) => {
721 error!("Invalid postgres event row id={id}: {err}");
722 }
723 }
724 }
725 continue;
726 }
727
728 if !initial_done {
729 initial_done = true;
730 status.caught_up.store(true, Ordering::SeqCst);
731 info!("Postgres consumer caught up at event_id={last_seen_id}");
732 }
733
734 let mut notified = false;
735 let mut reconnect_listener = false;
736 {
737 let mut notifications = listener.notifications();
738 let mut iter = notifications.timeout_iter(Duration::from_millis(500));
739 match iter.next() {
740 Ok(Some(_n)) => {
741 notified = true;
742 }
743 Ok(None) => {}
744 Err(err) => {
745 warn!("Postgres LISTEN error: {err}; reconnecting listener");
746 reconnect_listener = true;
747 }
748 }
749 }
750 if reconnect_listener {
751 std::thread::sleep(Duration::from_millis(500));
752 listener = connect_listener_client(config)?;
753 continue;
755 }
756
757 if !notified {
758 trace!("Postgres consumer poll tick (no notification)");
759 }
760 }
761}
762
763fn connect_consumer_client(role: &str, config: &PostgresConfig) -> Result<Client, String> {
764 let mut backoff_ms = 250u64;
765 loop {
766 match connect_pg_client(config, role) {
767 Ok(mut client) => {
768 if let Err(err) = ensure_schema(&mut client, config) {
769 warn!(
770 "{}",
771 format_pg_error(&format!("ensure_schema({role})"), Some(&config.url), &err)
772 );
773 }
774 return Ok(client);
775 }
776 Err(err) => {
777 warn!("{err}");
778 std::thread::sleep(Duration::from_millis(backoff_ms));
779 backoff_ms = (backoff_ms * 2).min(5_000);
780 }
781 }
782 }
783}
784
785fn connect_listener_client(config: &PostgresConfig) -> Result<Client, String> {
786 let mut backoff_ms = 250u64;
787 loop {
788 let mut client = connect_consumer_client("listener", config)?;
789 match client.batch_execute(&format!("LISTEN {};", qi(&config.channel))) {
790 Ok(()) => return Ok(client),
791 Err(err) => {
792 warn!(
793 "{}",
794 format_pg_error("LISTEN(register)", Some(&config.url), &err)
795 );
796 std::thread::sleep(Duration::from_millis(backoff_ms));
797 backoff_ms = (backoff_ms * 2).min(5_000);
798 }
799 }
800 }
801}
802
803fn apply_remote_event(
804 event: MEvent,
805 host_id: &str,
806 handler_registry: &Arc<HandlerRegistry>,
807 registry: &Arc<StoreRegistry>,
808) {
809 let is_my_event = event.source_id.as_ref().is_some_and(|id| id == host_id);
810 if is_my_event {
811 return;
812 }
813
814 match event.change_type {
815 MEventType::SET => {
816 if let Some(parse) = handler_registry.get_item_parser(&event.item_type) {
817 match parse(event.item.clone()) {
818 Ok(item) => {
819 let store = registry.get_or_create(item.entity_type());
820 store.insert(item.id(), item);
821 }
822 Err(e) => {
823 let msg = e.to_string();
824 let short = msg
825 .find(", expected one of")
826 .map(|pos| msg[..pos].to_string())
827 .unwrap_or(msg);
828 error!("Failed to parse {}: {short}", event.item_type);
829 }
830 }
831 } else {
832 warn!("No parser for entity type: {}", event.item_type);
833 }
834 }
835 MEventType::DEL => {
836 if let Some(id) = event.item.get("id").and_then(|v| v.as_str()) {
837 let store = registry.get_or_create(&event.item_type);
838 store.remove(&id.into());
839 } else {
840 error!("DEL event missing id field: {:?}", event.item);
841 }
842 }
843 }
844}
845
/// Creates the events table, its indexes and the insert-notification trigger.
///
/// The table and indexes use `IF NOT EXISTS`; the trigger function is replaced
/// and the trigger is dropped and re-created on every call. The trigger calls
/// `pg_notify` with the configured channel and the new row's id for each
/// inserted row.
///
/// # Errors
/// Returns the Postgres error when the statement batch fails.
fn ensure_schema(client: &mut Client, config: &PostgresConfig) -> Result<(), ::postgres::Error> {
    // Every derived object name is prefixed with the table name.
    let table = qi(&config.table);
    let idx_tx = qi(&format!("{}_tx_idx", config.table));
    let idx_item = qi(&format!("{}_item_type_item_id_idx", config.table));
    let idx_item_latest = qi(&format!("{}_item_latest_idx", config.table));
    let idx_created = qi(&format!("{}_created_at_idx", config.table));
    let trigger_fn = qi(&format!("{}_notify_insert_fn", config.table));
    let trigger_name = qi(&format!("{}_notify_insert_trigger", config.table));

    // NOTE(review): the channel is spliced into a SQL string literal below; the
    // constructors visible in this file run `validate_ident` on it first —
    // confirm that holds for any other caller.
    client.batch_execute(&format!(
        "
        CREATE TABLE IF NOT EXISTS {table} (
            id BIGSERIAL PRIMARY KEY,
            item_type TEXT NOT NULL,
            item_id TEXT NOT NULL,
            change_type TEXT NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            tx TEXT NOT NULL,
            source_id TEXT,
            event JSONB NOT NULL
        );
        CREATE INDEX IF NOT EXISTS {idx_tx} ON {table} (tx);
        CREATE INDEX IF NOT EXISTS {idx_item} ON {table} (item_type, item_id);
        CREATE INDEX IF NOT EXISTS {idx_item_latest} ON {table} (item_type, item_id, id DESC);
        CREATE INDEX IF NOT EXISTS {idx_created} ON {table} (created_at);
        CREATE OR REPLACE FUNCTION {trigger_fn}() RETURNS trigger AS $$
        BEGIN
            PERFORM pg_notify('{channel}', NEW.id::text);
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        DROP TRIGGER IF EXISTS {trigger_name} ON {table};
        CREATE TRIGGER {trigger_name}
        AFTER INSERT ON {table}
        FOR EACH ROW
        EXECUTE FUNCTION {trigger_fn}();
        ",
        channel = config.channel
    ))?;

    Ok(())
}
888
/// Checks that `name` is a plain SQL identifier: an ASCII letter or
/// underscore followed by ASCII letters, digits or underscores.
///
/// # Errors
/// Returns a message describing the first rule that `name` breaks.
fn validate_ident(name: &str) -> Result<(), String> {
    let mut rest = name.chars();

    match rest.next() {
        None => return Err("identifier cannot be empty".to_string()),
        Some(first) if first != '_' && !first.is_ascii_alphabetic() => {
            return Err(format!(
                "invalid identifier `{name}`: must start with letter or underscore"
            ));
        }
        Some(_) => {}
    }

    if rest.any(|c| c != '_' && !c.is_ascii_alphanumeric()) {
        return Err(format!(
            "invalid identifier `{name}`: only letters, numbers, underscore are allowed"
        ));
    }
    Ok(())
}
909
/// Quotes `name` as a SQL identifier.
///
/// Embedded double quotes are doubled so the result is always a single,
/// well-formed quoted identifier, even for names that did not go through
/// `validate_ident`.
fn qi(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
913
914fn row_to_persisted_event(row: ::postgres::Row) -> Result<PersistedEvent, String> {
915 let id: i64 = row.get(0);
916 let created_at: String = row.get(1);
917 let event_json: String = row.get(2);
918 let event = MEvent::from_str_trim(&event_json)
919 .map_err(|e| format!("invalid history event payload for id={id}: {e}"))?;
920 Ok(PersistedEvent {
921 id,
922 created_at,
923 event,
924 })
925}
926
/// Returns `url` with credentials masked, for use in log and error messages.
///
/// Two things are hidden:
/// * everything between the scheme (or the start of the string) and the last
///   `@`, i.e. the `user:password` part of a connection URI;
/// * the value of any parameter whose name ends in `password`, both in the
///   query string of a URI (`?password=...`) and in keyword/value connection
///   strings (`host=... password=...`), including single-quoted values.
///
/// Strings that contain neither are returned unchanged.
fn redact_pg_url(url: &str) -> String {
    // Mask parameter values first so an `@` inside a password value cannot be
    // mistaken for the userinfo separator below.
    let masked = mask_password_values(url, url.contains("://"));
    match masked.rfind('@') {
        Some(at) => {
            let after_scheme = masked.find("://").map_or(0, |idx| idx + 3);
            format!("{}***{}", &masked[..after_scheme], &masked[at..])
        }
        None => masked,
    }
}

/// Replaces the value of every `...password=value` parameter in `dsn` with `***`.
///
/// For URIs only the part after the first `?` is searched, so text inside the
/// userinfo section is never treated as a parameter.
fn mask_password_values(dsn: &str, is_uri: bool) -> String {
    const KEY: &str = "password";
    let bytes = dsn.as_bytes();
    // Keyword/value strings allow spaces around `=`; URIs do not.
    let skip_spaces = |mut i: usize| -> usize {
        while !is_uri && i < bytes.len() && bytes[i].is_ascii_whitespace() {
            i += 1;
        }
        i
    };

    let mut out = String::with_capacity(dsn.len());
    let mut copied = 0usize;
    let mut search = if is_uri {
        dsn.find('?').unwrap_or(dsn.len())
    } else {
        0
    };

    while let Some(rel) = dsn[search..].find(KEY) {
        let key_end = search + rel + KEY.len();
        search = key_end;

        let eq = skip_spaces(key_end);
        if bytes.get(eq) != Some(&b'=') {
            continue;
        }
        let value_start = skip_spaces(eq + 1);
        let value_end = password_value_end(bytes, value_start, is_uri);

        out.push_str(&dsn[copied..value_start]);
        out.push_str("***");
        copied = value_end;
        search = value_end;
    }
    out.push_str(&dsn[copied..]);
    out
}

/// Returns the index just past the parameter value that starts at `start`.
///
/// URI values end at `&`. Keyword/value values end at whitespace, or at the
/// closing quote when single-quoted; a backslash escapes the next byte.
fn password_value_end(bytes: &[u8], start: usize, is_uri: bool) -> usize {
    if is_uri {
        return bytes[start..]
            .iter()
            .position(|&b| b == b'&')
            .map_or(bytes.len(), |off| start + off);
    }

    let quoted = bytes.get(start) == Some(&b'\'');
    let mut i = if quoted { start + 1 } else { start };
    while i < bytes.len() {
        match bytes[i] {
            b'\\' => i += 2,
            b'\'' if quoted => return i + 1,
            b if !quoted && b.is_ascii_whitespace() => return i,
            _ => i += 1,
        }
    }
    bytes.len()
}
936
/// Formats a connection failure for `role`; a thin wrapper around
/// `format_pg_error` that always supplies the connection URL.
fn format_pg_connect_error(role: &str, url: &str, err: &postgres::Error) -> String {
    format_pg_error(role, Some(url), err)
}
940
941fn format_pg_error(role: &str, url: Option<&str>, err: &postgres::Error) -> String {
942 let mut msg = match url {
943 Some(url) => format!("{role} failed (dsn={}): {}", redact_pg_url(url), err),
944 None => format!("{role} failed: {err}"),
945 };
946 if let Some(db) = err.as_db_error() {
947 msg.push_str(&format!(
948 " [code={} severity={} message={}]",
949 db.code().code(),
950 db.severity(),
951 db.message()
952 ));
953 if let Some(detail) = db.detail() {
954 msg.push_str(&format!(" [detail={}]", detail));
955 }
956 if let Some(hint) = db.hint() {
957 msg.push_str(&format!(" [hint={}]", hint));
958 }
959 }
960 msg
961}
962
963fn connect_pg_client(config: &PostgresConfig, role: &str) -> Result<Client, String> {
964 let mut client_config = parse_pg_client_config(config, role)?;
965
966 client_config.connect_timeout(Duration::from_secs(PG_CONNECT_TIMEOUT_SECS));
968 client_config.keepalives(true);
969 client_config.keepalives_idle(Duration::from_secs(PG_KEEPALIVE_IDLE_SECS));
970 client_config.keepalives_interval(Duration::from_secs(PG_KEEPALIVE_INTERVAL_SECS));
971 client_config.keepalives_retries(PG_KEEPALIVE_RETRIES);
972
973 client_config
974 .connect(NoTls)
975 .map_err(|err| format_pg_connect_error(role, &config.url, &err))
976}
977
978fn parse_pg_client_config(config: &PostgresConfig, role: &str) -> Result<PgClientConfig, String> {
979 config.url.parse::<PgClientConfig>().map_err(|err| {
980 format!(
981 "postgres config parse failed ({role}, dsn={}): {err}",
982 redact_pg_url(&config.url)
983 )
984 })
985}