1use std::{
8 sync::{
9 Arc,
10 atomic::{AtomicBool, Ordering},
11 },
12 time::Duration,
13};
14
15use ::postgres::{Client, Config as PgClientConfig, NoTls};
16use log::{error, info, trace, warn};
17use myko::{
18 event::{MEvent, MEventType},
19 server::{HandlerRegistry, PersistError, PersistHealth, Persister},
20 store::StoreRegistry,
21};
22use postgres::fallible_iterator::FallibleIterator;
23use uuid::Uuid;
24
25const PG_CONNECT_TIMEOUT_SECS: u64 = 10;
26const PG_KEEPALIVE_IDLE_SECS: u64 = 30;
27const PG_KEEPALIVE_INTERVAL_SECS: u64 = 10;
28const PG_KEEPALIVE_RETRIES: u32 = 3;
29const PG_PRODUCER_MAX_BATCH: usize = 256;
30
31#[derive(Debug, Clone)]
33pub struct PostgresConfig {
34 pub url: String,
36 pub table: String,
38 pub channel: String,
40}
41
42#[derive(Debug, Clone)]
44pub struct PersistedEvent {
45 pub id: i64,
46 pub created_at: String,
47 pub event: MEvent,
48}
49
50impl PostgresConfig {
51 pub fn from_env() -> Option<Self> {
57 let url = std::env::var("MYKO_POSTGRES_URL").ok()?;
58 let table = std::env::var("MYKO_POSTGRES_TABLE").unwrap_or_else(|_| "myko_events".into());
59 let channel =
60 std::env::var("MYKO_POSTGRES_CHANNEL").unwrap_or_else(|_| "myko_events_notify".into());
61 Some(Self {
62 url,
63 table,
64 channel,
65 })
66 }
67}
68
69pub struct PostgresHistoryStore {
71 config: PostgresConfig,
72}
73
74impl PostgresHistoryStore {
75 pub fn new(config: PostgresConfig) -> Result<Self, String> {
77 validate_ident(&config.table)?;
78 validate_ident(&config.channel)?;
79 Ok(Self { config })
80 }
81
82 pub fn load_after_id(&self, after_id: i64, limit: i64) -> Result<Vec<PersistedEvent>, String> {
84 let mut client = connect_pg_client(&self.config, "history(load_after_id)")?;
85 let sql = format!(
86 "SELECT id, created_at::text, event::text FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
87 qi(&self.config.table)
88 );
89 let rows = client
90 .query(&sql, &[&after_id, &limit])
91 .map_err(|e| format!("history query failed: {e}"))?;
92 rows.into_iter()
93 .map(row_to_persisted_event)
94 .collect::<Result<Vec<_>, _>>()
95 }
96
97 pub fn load_until(
99 &self,
100 after_id: i64,
101 until: &str,
102 limit: i64,
103 ) -> Result<Vec<PersistedEvent>, String> {
104 let mut client = connect_pg_client(&self.config, "history(load_until)")?;
105 if !until
107 .chars()
108 .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
109 {
110 return Err(format!("Invalid timestamp format: {}", until));
111 }
112 let sql = format!(
113 "SELECT id, created_at::text, event::text FROM {} WHERE id > {} AND created_at <= '{}'::timestamptz ORDER BY id ASC LIMIT {}",
114 qi(&self.config.table),
115 after_id,
116 until,
117 limit
118 );
119 let rows = client
120 .query(&sql, &[])
121 .map_err(|e| format!("history query failed: {e}"))?;
122 rows.into_iter()
123 .map(row_to_persisted_event)
124 .collect::<Result<Vec<_>, _>>()
125 }
126
127 pub fn load_between(
129 &self,
130 from_iso: &str,
131 to_iso: &str,
132 limit: i64,
133 ) -> Result<Vec<PersistedEvent>, String> {
134 let mut client = connect_pg_client(&self.config, "history(load_between)")?;
135 let sql = format!(
136 "SELECT id, created_at::text, event::text FROM {} WHERE created_at >= $1::timestamptz AND created_at <= $2::timestamptz ORDER BY id ASC LIMIT $3",
137 qi(&self.config.table)
138 );
139 let rows = client
140 .query(&sql, &[&from_iso, &to_iso, &limit])
141 .map_err(|e| format!("history query failed: {e}"))?;
142 rows.into_iter()
143 .map(row_to_persisted_event)
144 .collect::<Result<Vec<_>, _>>()
145 }
146}
147
148pub struct PostgresHistoryReplayProvider {
150 config: PostgresConfig,
151}
152
153impl PostgresHistoryReplayProvider {
154 pub fn new(config: PostgresConfig) -> Self {
155 Self { config }
156 }
157}
158
159impl myko::server::HistoryReplayProvider for PostgresHistoryReplayProvider {
160 fn replay_to_store(
161 &self,
162 until: &str,
163 handler_registry: &HandlerRegistry,
164 ) -> Result<Arc<StoreRegistry>, String> {
165 eprintln!(
166 "[HistoryReplay] loading snapshot as of {} from {}",
167 until, self.config.url
168 );
169
170 if !until
172 .chars()
173 .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
174 {
175 return Err(format!("Invalid timestamp format: {}", until));
176 }
177
178 let mut client = connect_pg_client(&self.config, "history_replay")?;
179 let table = qi(&self.config.table);
180 let registry = StoreRegistry::new();
181
182 let sql = format!(
186 "
187 WITH latest AS (
188 SELECT DISTINCT ON (item_type, item_id)
189 id, change_type
190 FROM {table}
191 WHERE created_at <= '{until}'::timestamptz
192 ORDER BY item_type, item_id, id DESC
193 )
194 SELECT e.event::text
195 FROM latest
196 JOIN {table} e ON e.id = latest.id
197 WHERE latest.change_type = 'SET'
198 ORDER BY e.id ASC
199 "
200 );
201
202 let rows = client
203 .query(&sql, &[])
204 .map_err(|e| format!("history snapshot query failed: {e}"))?;
205
206 let mut count = 0usize;
207 for row in &rows {
208 let event_json: String = row.get(0);
209 match MEvent::from_str_trim(&event_json) {
210 Ok(event) => {
211 if let Some(parse) = handler_registry.get_item_parser(&event.item_type)
212 && let Ok(item) = parse(event.item.clone())
213 {
214 let store = registry.get_or_create(&event.item_type);
215 store.insert(item.id(), item);
216 count += 1;
217 }
218 }
219 Err(err) => {
220 eprintln!("[HistoryReplay] invalid event row: {}", err);
221 }
222 }
223 }
224
225 eprintln!(
226 "[HistoryReplay] loaded {} entities from {} rows as of {}",
227 count,
228 rows.len(),
229 until
230 );
231
232 Ok(Arc::new(registry))
233 }
234}
235
236type ProducerRequest = MEvent;
237
238#[derive(Clone)]
240pub struct PostgresProducerHandle {
241 sender: flume::Sender<ProducerRequest>,
242 host_id: Uuid,
243 config: PostgresConfig,
244 health: Arc<PersistHealth>,
245}
246
247impl PostgresProducerHandle {
248 pub fn produce(&self, mut event: MEvent) -> Result<(), PersistError> {
251 if event.source_id.is_none() {
252 event.source_id = Some(self.host_id.to_string());
253 }
254 let entity_type = event.item_type.clone();
255
256 match self.sender.send(event) {
257 Ok(()) => {
258 self.health.record_enqueue();
259 Ok(())
260 }
261 Err(_) => {
262 let msg = "Postgres producer thread not running".to_string();
263 self.health.record_dropped(msg.clone());
264 Err(PersistError {
265 entity_type,
266 message: msg,
267 })
268 }
269 }
270 }
271}
272
273impl Persister for PostgresProducerHandle {
274 fn persist(&self, event: MEvent) -> Result<(), PersistError> {
275 self.produce(event)
276 }
277
278 fn health(&self) -> Arc<PersistHealth> {
279 self.health.clone()
280 }
281
282 fn startup_healthcheck(&self) -> Result<(), String> {
283 validate_ident(&self.config.table)?;
284 validate_ident(&self.config.channel)?;
285 Ok(())
286 }
287}
288
289pub struct CellPostgresProducer {
291 handle: PostgresProducerHandle,
292}
293
294impl CellPostgresProducer {
295 pub fn new(config: &PostgresConfig, host_id: Uuid) -> Result<Self, String> {
297 validate_ident(&config.table)?;
298 validate_ident(&config.channel)?;
299
300 let health = Arc::new(PersistHealth::default());
301 let (tx, rx) = flume::unbounded::<ProducerRequest>();
302 let cfg = config.clone();
303 let thread_health = health.clone();
304 std::thread::spawn(move || run_producer_loop(cfg, rx, thread_health));
305
306 Ok(Self {
307 handle: PostgresProducerHandle {
308 sender: tx,
309 host_id,
310 config: config.clone(),
311 health,
312 },
313 })
314 }
315
316 pub fn handle(&self) -> PostgresProducerHandle {
318 self.handle.clone()
319 }
320}
321
322fn run_producer_loop(
323 config: PostgresConfig,
324 rx: flume::Receiver<ProducerRequest>,
325 health: Arc<PersistHealth>,
326) {
327 let mut client: Option<Client> = None;
328 let mut retry_batch: Vec<MEvent> = Vec::new();
329
330 loop {
331 let mut batch: Vec<MEvent> = Vec::new();
333 if !retry_batch.is_empty() {
334 std::mem::swap(&mut batch, &mut retry_batch);
335 } else {
336 match rx.recv() {
337 Ok(ev) => batch.push(ev),
338 Err(_) => break, }
340 }
341 while batch.len() < PG_PRODUCER_MAX_BATCH {
343 match rx.try_recv() {
344 Ok(ev) => batch.push(ev),
345 Err(_) => break,
346 }
347 }
348
349 if client.is_none() {
350 client = connect_producer_client(&config);
351 }
352
353 let batch_len = batch.len();
354 if let Some(c) = client.as_mut() {
355 match insert_event_batch(c, &config, &batch) {
356 Ok(()) => {
357 health.record_success_batch(batch_len as u64);
358 }
359 Err(err) => {
360 let msg =
361 format_pg_error("insert_event_batch(producer)", Some(&config.url), &err);
362 error!("{}", msg);
363 health.record_error_no_dequeue(msg);
364 client = None;
365 retry_batch = batch;
366 std::thread::sleep(Duration::from_secs(1));
368 }
369 }
370 } else {
371 let msg = format!(
372 "Postgres producer connection failed (url: {})",
373 redact_pg_url(&config.url),
374 );
375 error!("{}", msg);
376 health.record_error_no_dequeue(msg);
377 retry_batch = batch;
378 std::thread::sleep(Duration::from_secs(1));
379 }
380 }
381}
382
383fn connect_producer_client(config: &PostgresConfig) -> Option<Client> {
384 match connect_pg_client(config, "producer") {
385 Ok(mut c) => match ensure_schema(&mut c, config) {
386 Ok(()) => Some(c),
387 Err(err) => {
388 error!(
389 "{}",
390 format_pg_error("ensure_schema(producer)", Some(&config.url), &err)
391 );
392 None
393 }
394 },
395 Err(err) => {
396 error!("{err}");
397 None
398 }
399 }
400}
401
402fn insert_event_batch(
403 client: &mut Client,
404 config: &PostgresConfig,
405 events: &[MEvent],
406) -> Result<(), ::postgres::Error> {
407 if events.is_empty() {
408 return Ok(());
409 }
410
411 let table = qi(&config.table);
412
413 if events.len() == 1 {
415 let event = &events[0];
416 let sql = format!(
417 "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES ($1, $2, $3, ($4::text)::timestamptz, $5, ($6::text), ($7::text)::jsonb)"
418 );
419 let item_id = event
420 .item
421 .get("id")
422 .and_then(|v| v.as_str())
423 .unwrap_or("unknown")
424 .to_string();
425 let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
426 let change_type = match event.change_type {
427 MEventType::SET => "SET",
428 MEventType::DEL => "DEL",
429 };
430 client.execute(
431 &sql,
432 &[
433 &event.item_type,
434 &item_id,
435 &change_type,
436 &event.created_at,
437 &event.tx,
438 &event.source_id,
439 &event_json,
440 ],
441 )?;
442 return Ok(());
443 }
444
445 let mut sql = format!(
447 "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES "
448 );
449 let mut params: Vec<Box<dyn postgres::types::ToSql + Sync>> =
450 Vec::with_capacity(events.len() * 7);
451 for (i, event) in events.iter().enumerate() {
452 if i > 0 {
453 sql.push_str(", ");
454 }
455 let base = i * 7;
456 sql.push_str(&format!(
457 "(${}, ${}, ${}, (${}::text)::timestamptz, ${}, (${}::text), (${}::text)::jsonb)",
458 base + 1,
459 base + 2,
460 base + 3,
461 base + 4,
462 base + 5,
463 base + 6,
464 base + 7
465 ));
466 let item_id = event
467 .item
468 .get("id")
469 .and_then(|v| v.as_str())
470 .unwrap_or("unknown")
471 .to_string();
472 let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
473 let change_type = match event.change_type {
474 MEventType::SET => "SET",
475 MEventType::DEL => "DEL",
476 };
477 params.push(Box::new(event.item_type.clone()));
478 params.push(Box::new(item_id));
479 params.push(Box::new(change_type.to_string()));
480 params.push(Box::new(event.created_at.clone()));
481 params.push(Box::new(event.tx.clone()));
482 params.push(Box::new(event.source_id.clone()));
483 params.push(Box::new(event_json));
484 }
485
486 let param_refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
487 params.iter().map(|p| p.as_ref()).collect();
488
489 let mut txn = client.transaction()?;
490 txn.execute(&sql, ¶m_refs)?;
491 txn.commit()?;
492
493 Ok(())
494}
495
496#[derive(Debug)]
498pub struct CatchUpStatus {
499 caught_up: AtomicBool,
500 failed: AtomicBool,
501 failure_reason: std::sync::RwLock<Option<String>>,
502}
503
504impl CatchUpStatus {
505 fn new() -> Self {
506 Self {
507 caught_up: AtomicBool::new(false),
508 failed: AtomicBool::new(false),
509 failure_reason: std::sync::RwLock::new(None),
510 }
511 }
512
513 pub fn is_caught_up(&self) -> bool {
515 self.caught_up.load(Ordering::SeqCst)
516 }
517
518 pub fn is_failed(&self) -> bool {
520 self.failed.load(Ordering::SeqCst)
521 }
522
523 fn fail(&self, reason: impl Into<String>) {
524 let reason = reason.into();
525 *self.failure_reason.write().unwrap() = Some(reason.clone());
526 self.failed.store(true, Ordering::SeqCst);
527 self.caught_up.store(false, Ordering::SeqCst);
528 }
529
530 pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
532 let start = std::time::Instant::now();
533 while !self.is_caught_up() {
534 if self.is_failed() {
535 return Err(self
536 .failure_reason
537 .read()
538 .unwrap()
539 .clone()
540 .unwrap_or_else(|| "Postgres catch-up failed".to_string()));
541 }
542 if start.elapsed() >= timeout {
543 return Err(format!(
544 "Postgres catch-up timed out after {}s",
545 timeout.as_secs()
546 ));
547 }
548 std::thread::sleep(Duration::from_millis(50));
549 }
550 Ok(())
551 }
552}
553
554pub struct CellPostgresConsumer {
556 catch_up_status: Arc<CatchUpStatus>,
557 _handle: std::thread::JoinHandle<()>,
558}
559
560impl CellPostgresConsumer {
561 pub fn start(
563 config: &PostgresConfig,
564 host_id: Uuid,
565 handler_registry: Arc<HandlerRegistry>,
566 registry: Arc<StoreRegistry>,
567 ) -> Result<Self, String> {
568 validate_ident(&config.table)?;
569 validate_ident(&config.channel)?;
570
571 let catch_up_status = Arc::new(CatchUpStatus::new());
572 let status = catch_up_status.clone();
573 let cfg = config.clone();
574 let host_id_string = host_id.to_string();
575
576 let handle = std::thread::spawn(move || {
577 if let Err(err) = run_consumer_loop(
578 &cfg,
579 &host_id_string,
580 handler_registry,
581 registry,
582 status.clone(),
583 ) {
584 let reason = format!("Postgres consumer failed: {err}");
585 error!("{reason}");
586 status.fail(reason);
587 }
588 });
589
590 Ok(Self {
591 catch_up_status,
592 _handle: handle,
593 })
594 }
595
596 pub fn is_caught_up(&self) -> bool {
598 self.catch_up_status.is_caught_up()
599 }
600
601 pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
603 self.catch_up_status.wait_until_caught_up(timeout)
604 }
605}
606
607fn run_consumer_loop(
608 config: &PostgresConfig,
609 host_id: &str,
610 handler_registry: Arc<HandlerRegistry>,
611 registry: Arc<StoreRegistry>,
612 status: Arc<CatchUpStatus>,
613) -> Result<(), String> {
614 let mut reader = connect_consumer_client("reader", config)?;
615 let mut listener = connect_listener_client(config)?;
616
617 info!(
618 "CellPostgresConsumer started (table={}, channel={})",
619 config.table, config.channel
620 );
621
622 let table = qi(&config.table);
623 let high_water_sql = format!("SELECT COALESCE(MAX(id), 0) FROM {table}");
624 let high_water_row = reader
625 .query_one(&high_water_sql, &[])
626 .map_err(|e| format_pg_error("query(high_water)", Some(&config.url), &e))?;
627 let high_water: i64 = high_water_row.get(0);
628 let snapshot_sql = format!(
629 "
630 WITH latest AS (
631 SELECT DISTINCT ON (item_type, item_id)
632 id, change_type
633 FROM {table}
634 WHERE id <= $1
635 ORDER BY item_type, item_id, id DESC
636 )
637 SELECT e.id, e.event::text
638 FROM latest
639 JOIN {table} e ON e.id = latest.id
640 ORDER BY e.id ASC
641 "
642 );
643 let snapshot_rows = reader
644 .query(&snapshot_sql, &[&high_water])
645 .map_err(|e| format_pg_error("query(snapshot latest events)", Some(&config.url), &e))?;
646 let snapshot_count = snapshot_rows.len();
647 for row in snapshot_rows {
648 let id: i64 = row.get(0);
649 let event_json: String = row.get(1);
650 match MEvent::from_str_trim(&event_json) {
651 Ok(event) => {
652 apply_remote_event(event, host_id, &handler_registry, ®istry);
653 }
654 Err(err) => {
655 error!("Invalid postgres snapshot row id={id}: {err}");
656 }
657 }
658 }
659 info!(
660 "Postgres snapshot loaded latest state rows={} high_water={}",
661 snapshot_count, high_water
662 );
663
664 let fetch_sql =
665 format!("SELECT id, event::text FROM {table} WHERE id > $1 ORDER BY id ASC LIMIT $2");
666 let mut last_seen_id: i64 = high_water;
667 let mut initial_done = false;
668
669 loop {
670 let rows = match reader.query(&fetch_sql, &[&last_seen_id, &1000_i64]) {
671 Ok(rows) => rows,
672 Err(err) => {
673 warn!(
674 "{}",
675 format_pg_error("query(fetch events)", Some(&config.url), &err)
676 );
677 std::thread::sleep(Duration::from_millis(500));
678 reader = connect_consumer_client("reader", config)?;
679 continue;
680 }
681 };
682 if !rows.is_empty() {
683 for row in rows {
684 let id: i64 = row.get(0);
685 let event_json: String = row.get(1);
686 last_seen_id = id;
687
688 match MEvent::from_str_trim(&event_json) {
689 Ok(event) => {
690 apply_remote_event(event, host_id, &handler_registry, ®istry);
691 }
692 Err(err) => {
693 error!("Invalid postgres event row id={id}: {err}");
694 }
695 }
696 }
697 continue;
698 }
699
700 if !initial_done {
701 initial_done = true;
702 status.caught_up.store(true, Ordering::SeqCst);
703 info!("Postgres consumer caught up at event_id={last_seen_id}");
704 }
705
706 let mut notified = false;
707 let mut reconnect_listener = false;
708 {
709 let mut notifications = listener.notifications();
710 let mut iter = notifications.timeout_iter(Duration::from_millis(500));
711 match iter.next() {
712 Ok(Some(_n)) => {
713 notified = true;
714 }
715 Ok(None) => {}
716 Err(err) => {
717 warn!("Postgres LISTEN error: {err}; reconnecting listener");
718 reconnect_listener = true;
719 }
720 }
721 }
722 if reconnect_listener {
723 std::thread::sleep(Duration::from_millis(500));
724 listener = connect_listener_client(config)?;
725 continue;
727 }
728
729 if !notified {
730 trace!("Postgres consumer poll tick (no notification)");
731 }
732 }
733}
734
735fn connect_consumer_client(role: &str, config: &PostgresConfig) -> Result<Client, String> {
736 let mut backoff_ms = 250u64;
737 loop {
738 match connect_pg_client(config, role) {
739 Ok(mut client) => {
740 if let Err(err) = ensure_schema(&mut client, config) {
741 warn!(
742 "{}",
743 format_pg_error(&format!("ensure_schema({role})"), Some(&config.url), &err)
744 );
745 }
746 return Ok(client);
747 }
748 Err(err) => {
749 warn!("{err}");
750 std::thread::sleep(Duration::from_millis(backoff_ms));
751 backoff_ms = (backoff_ms * 2).min(5_000);
752 }
753 }
754 }
755}
756
757fn connect_listener_client(config: &PostgresConfig) -> Result<Client, String> {
758 let mut backoff_ms = 250u64;
759 loop {
760 let mut client = connect_consumer_client("listener", config)?;
761 match client.batch_execute(&format!("LISTEN {};", qi(&config.channel))) {
762 Ok(()) => return Ok(client),
763 Err(err) => {
764 warn!(
765 "{}",
766 format_pg_error("LISTEN(register)", Some(&config.url), &err)
767 );
768 std::thread::sleep(Duration::from_millis(backoff_ms));
769 backoff_ms = (backoff_ms * 2).min(5_000);
770 }
771 }
772 }
773}
774
775fn apply_remote_event(
776 event: MEvent,
777 host_id: &str,
778 handler_registry: &Arc<HandlerRegistry>,
779 registry: &Arc<StoreRegistry>,
780) {
781 let is_my_event = event.source_id.as_ref().is_some_and(|id| id == host_id);
782 if is_my_event {
783 return;
784 }
785
786 match event.change_type {
787 MEventType::SET => {
788 if let Some(parse) = handler_registry.get_item_parser(&event.item_type) {
789 match parse(event.item.clone()) {
790 Ok(item) => {
791 let store = registry.get_or_create(item.entity_type());
792 store.insert(item.id(), item);
793 }
794 Err(e) => {
795 let msg = e.to_string();
796 let short = msg
797 .find(", expected one of")
798 .map(|pos| msg[..pos].to_string())
799 .unwrap_or(msg);
800 error!("Failed to parse {}: {short}", event.item_type);
801 }
802 }
803 } else {
804 warn!("No parser for entity type: {}", event.item_type);
805 }
806 }
807 MEventType::DEL => {
808 if let Some(id) = event.item.get("id").and_then(|v| v.as_str()) {
809 let store = registry.get_or_create(&event.item_type);
810 store.remove(&id.into());
811 } else {
812 error!("DEL event missing id field: {:?}", event.item);
813 }
814 }
815 }
816}
817
818fn ensure_schema(client: &mut Client, config: &PostgresConfig) -> Result<(), ::postgres::Error> {
819 let table = qi(&config.table);
820 let idx_tx = qi(&format!("{}_tx_idx", config.table));
821 let idx_item = qi(&format!("{}_item_type_item_id_idx", config.table));
822 let idx_item_latest = qi(&format!("{}_item_latest_idx", config.table));
823 let idx_created = qi(&format!("{}_created_at_idx", config.table));
824 let trigger_fn = qi(&format!("{}_notify_insert_fn", config.table));
825 let trigger_name = qi(&format!("{}_notify_insert_trigger", config.table));
826
827 client.batch_execute(&format!(
828 "
829 CREATE TABLE IF NOT EXISTS {table} (
830 id BIGSERIAL PRIMARY KEY,
831 item_type TEXT NOT NULL,
832 item_id TEXT NOT NULL,
833 change_type TEXT NOT NULL,
834 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
835 tx TEXT NOT NULL,
836 source_id TEXT,
837 event JSONB NOT NULL
838 );
839 CREATE INDEX IF NOT EXISTS {idx_tx} ON {table} (tx);
840 CREATE INDEX IF NOT EXISTS {idx_item} ON {table} (item_type, item_id);
841 CREATE INDEX IF NOT EXISTS {idx_item_latest} ON {table} (item_type, item_id, id DESC);
842 CREATE INDEX IF NOT EXISTS {idx_created} ON {table} (created_at);
843 CREATE OR REPLACE FUNCTION {trigger_fn}() RETURNS trigger AS $$
844 BEGIN
845 PERFORM pg_notify('{channel}', NEW.id::text);
846 RETURN NEW;
847 END;
848 $$ LANGUAGE plpgsql;
849 DROP TRIGGER IF EXISTS {trigger_name} ON {table};
850 CREATE TRIGGER {trigger_name}
851 AFTER INSERT ON {table}
852 FOR EACH ROW
853 EXECUTE FUNCTION {trigger_fn}();
854 ",
855 channel = config.channel
856 ))?;
857
858 Ok(())
859}
860
861fn validate_ident(name: &str) -> Result<(), String> {
862 if name.is_empty() {
863 return Err("identifier cannot be empty".to_string());
864 }
865 let mut chars = name.chars();
866 let first = chars
867 .next()
868 .ok_or_else(|| "identifier cannot be empty".to_string())?;
869 if !(first == '_' || first.is_ascii_alphabetic()) {
870 return Err(format!(
871 "invalid identifier `{name}`: must start with letter or underscore"
872 ));
873 }
874 if !chars.all(|c| c == '_' || c.is_ascii_alphanumeric()) {
875 return Err(format!(
876 "invalid identifier `{name}`: only letters, numbers, underscore are allowed"
877 ));
878 }
879 Ok(())
880}
881
882fn qi(name: &str) -> String {
883 format!("\"{name}\"")
884}
885
886fn row_to_persisted_event(row: ::postgres::Row) -> Result<PersistedEvent, String> {
887 let id: i64 = row.get(0);
888 let created_at: String = row.get(1);
889 let event_json: String = row.get(2);
890 let event = MEvent::from_str_trim(&event_json)
891 .map_err(|e| format!("invalid history event payload for id={id}: {e}"))?;
892 Ok(PersistedEvent {
893 id,
894 created_at,
895 event,
896 })
897}
898
899fn redact_pg_url(url: &str) -> String {
900 match url.rfind('@') {
901 Some(at) => {
902 let after_scheme = url.find("://").map(|idx| idx + 3).unwrap_or(0);
903 format!("{}***{}", &url[..after_scheme], &url[at..])
904 }
905 None => url.to_string(),
906 }
907}
908
909fn format_pg_connect_error(role: &str, url: &str, err: &postgres::Error) -> String {
910 format_pg_error(role, Some(url), err)
911}
912
913fn format_pg_error(role: &str, url: Option<&str>, err: &postgres::Error) -> String {
914 let mut msg = match url {
915 Some(url) => format!("{role} failed (dsn={}): {}", redact_pg_url(url), err),
916 None => format!("{role} failed: {err}"),
917 };
918 if let Some(db) = err.as_db_error() {
919 msg.push_str(&format!(
920 " [code={} severity={} message={}]",
921 db.code().code(),
922 db.severity(),
923 db.message()
924 ));
925 if let Some(detail) = db.detail() {
926 msg.push_str(&format!(" [detail={}]", detail));
927 }
928 if let Some(hint) = db.hint() {
929 msg.push_str(&format!(" [hint={}]", hint));
930 }
931 }
932 msg
933}
934
935fn connect_pg_client(config: &PostgresConfig, role: &str) -> Result<Client, String> {
936 let mut client_config = parse_pg_client_config(config, role)?;
937
938 client_config.connect_timeout(Duration::from_secs(PG_CONNECT_TIMEOUT_SECS));
940 client_config.keepalives(true);
941 client_config.keepalives_idle(Duration::from_secs(PG_KEEPALIVE_IDLE_SECS));
942 client_config.keepalives_interval(Duration::from_secs(PG_KEEPALIVE_INTERVAL_SECS));
943 client_config.keepalives_retries(PG_KEEPALIVE_RETRIES);
944
945 client_config
946 .connect(NoTls)
947 .map_err(|err| format_pg_connect_error(role, &config.url, &err))
948}
949
950fn parse_pg_client_config(config: &PostgresConfig, role: &str) -> Result<PgClientConfig, String> {
951 config.url.parse::<PgClientConfig>().map_err(|err| {
952 format!(
953 "postgres config parse failed ({role}, dsn={}): {err}",
954 redact_pg_url(&config.url)
955 )
956 })
957}