Skip to main content

myko_server/
postgres.rs

1//! PostgreSQL producer and consumer for the cell-based server.
2//!
3//! Architecture:
4//! - Producer persists `MEvent` rows into a durable table.
5//! - Consumer replays table rows (catch-up), then follows new inserts via LISTEN/NOTIFY.
6
7use std::{
8    sync::{
9        Arc,
10        atomic::{AtomicBool, Ordering},
11    },
12    time::Duration,
13};
14
15use ::postgres::{Client, Config as PgClientConfig, NoTls};
16use log::{debug, error, info, trace, warn};
17use myko::{
18    event::{MEvent, MEventType},
19    server::{HandlerRegistry, PersistError, PersistHealth, Persister},
20    store::StoreRegistry,
21};
22use postgres::fallible_iterator::FallibleIterator;
23use uuid::Uuid;
24
/// Connect timeout, in seconds, applied to every client built by `connect_pg_client`.
const PG_CONNECT_TIMEOUT_SECS: u64 = 10;
/// Seconds of idleness passed to `keepalives_idle` on every client.
const PG_KEEPALIVE_IDLE_SECS: u64 = 30;
/// Seconds passed to `keepalives_interval` on every client.
const PG_KEEPALIVE_INTERVAL_SECS: u64 = 10;
/// Probe count passed to `keepalives_retries` on every client.
const PG_KEEPALIVE_RETRIES: u32 = 3;
/// Upper bound on the number of events the producer thread writes in one batch.
const PG_PRODUCER_MAX_BATCH: usize = 256;
30
31/// PostgreSQL configuration.
32#[derive(Debug, Clone)]
33pub struct PostgresConfig {
34    /// PostgreSQL connection URL.
35    pub url: String,
36    /// Events table name.
37    pub table: String,
38    /// LISTEN/NOTIFY channel name.
39    pub channel: String,
40}
41
42/// Persisted event row fetched from Postgres.
43#[derive(Debug, Clone)]
44pub struct PersistedEvent {
45    pub id: i64,
46    pub created_at: String,
47    pub event: MEvent,
48}
49
50impl PostgresConfig {
51    /// Create config from environment.
52    ///
53    /// - `MYKO_POSTGRES_URL` (required)
54    /// - `MYKO_POSTGRES_TABLE` (optional, default `myko_events`)
55    /// - `MYKO_POSTGRES_CHANNEL` (optional, default `myko_events_notify`)
56    pub fn from_env() -> Option<Self> {
57        let url = std::env::var("MYKO_POSTGRES_URL").ok()?;
58        let table = std::env::var("MYKO_POSTGRES_TABLE").unwrap_or_else(|_| "myko_events".into());
59        let channel =
60            std::env::var("MYKO_POSTGRES_CHANNEL").unwrap_or_else(|_| "myko_events_notify".into());
61        Some(Self {
62            url,
63            table,
64            channel,
65        })
66    }
67}
68
69/// Read API for durable event history (windback/replay providers).
70pub struct PostgresHistoryStore {
71    config: PostgresConfig,
72}
73
74impl PostgresHistoryStore {
75    /// Create a history store and ensure schema exists.
76    pub fn new(config: PostgresConfig) -> Result<Self, String> {
77        validate_ident(&config.table)?;
78        validate_ident(&config.channel)?;
79        Ok(Self { config })
80    }
81
82    /// Read events with `id > after_id`, ascending.
83    pub fn load_after_id(&self, after_id: i64, limit: i64) -> Result<Vec<PersistedEvent>, String> {
84        let mut client = connect_pg_client(&self.config, "history(load_after_id)")?;
85        let sql = format!(
86            "SELECT id, created_at::text, event::text FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
87            qi(&self.config.table)
88        );
89        let rows = client
90            .query(&sql, &[&after_id, &limit])
91            .map_err(|e| format!("history query failed: {e}"))?;
92        rows.into_iter()
93            .map(row_to_persisted_event)
94            .collect::<Result<Vec<_>, _>>()
95    }
96
97    /// Read events with `id > after_id` and `created_at <= until`, ascending.
98    pub fn load_until(
99        &self,
100        after_id: i64,
101        until: &str,
102        limit: i64,
103    ) -> Result<Vec<PersistedEvent>, String> {
104        let mut client = connect_pg_client(&self.config, "history(load_until)")?;
105        // NOTE(ts): Validate timestamp format to prevent SQL injection
106        if !until
107            .chars()
108            .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
109        {
110            return Err(format!("Invalid timestamp format: {}", until));
111        }
112        let sql = format!(
113            "SELECT id, created_at::text, event::text FROM {} WHERE id > {} AND created_at <= '{}'::timestamptz ORDER BY id ASC LIMIT {}",
114            qi(&self.config.table),
115            after_id,
116            until,
117            limit
118        );
119        let rows = client
120            .query(&sql, &[])
121            .map_err(|e| format!("history query failed: {e}"))?;
122        rows.into_iter()
123            .map(row_to_persisted_event)
124            .collect::<Result<Vec<_>, _>>()
125    }
126
127    /// Read events in a time window, ascending.
128    pub fn load_between(
129        &self,
130        from_iso: &str,
131        to_iso: &str,
132        limit: i64,
133    ) -> Result<Vec<PersistedEvent>, String> {
134        let mut client = connect_pg_client(&self.config, "history(load_between)")?;
135        let sql = format!(
136            "SELECT id, created_at::text, event::text FROM {} WHERE created_at >= $1::timestamptz AND created_at <= $2::timestamptz ORDER BY id ASC LIMIT $3",
137            qi(&self.config.table)
138        );
139        let rows = client
140            .query(&sql, &[&from_iso, &to_iso, &limit])
141            .map_err(|e| format!("history query failed: {e}"))?;
142        rows.into_iter()
143            .map(row_to_persisted_event)
144            .collect::<Result<Vec<_>, _>>()
145    }
146}
147
148/// HistoryReplayProvider backed by PostgresHistoryStore.
149pub struct PostgresHistoryReplayProvider {
150    config: PostgresConfig,
151}
152
153impl PostgresHistoryReplayProvider {
154    pub fn new(config: PostgresConfig) -> Self {
155        Self { config }
156    }
157}
158
159impl myko::server::HistoryReplayProvider for PostgresHistoryReplayProvider {
160    fn replay_to_store(
161        &self,
162        until: &str,
163        handler_registry: &HandlerRegistry,
164    ) -> Result<Arc<StoreRegistry>, String> {
165        eprintln!(
166            "[HistoryReplay] loading snapshot as of {} from {}",
167            until, self.config.url
168        );
169
170        // NOTE(ts): Validate timestamp format to prevent SQL injection
171        if !until
172            .chars()
173            .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
174        {
175            return Err(format!("Invalid timestamp format: {}", until));
176        }
177
178        let mut client = connect_pg_client(&self.config, "history_replay")?;
179        let table = qi(&self.config.table);
180        let registry = StoreRegistry::new();
181
182        // NOTE(ts): Use DISTINCT ON to get the latest event per entity as of the
183        // timestamp, same approach as the bootstrap snapshot but time-bounded.
184        // Only include entities whose latest event is a SET (not deleted).
185        let sql = format!(
186            "
187            WITH latest AS (
188                SELECT DISTINCT ON (item_type, item_id)
189                    id, change_type
190                FROM {table}
191                WHERE created_at <= '{until}'::timestamptz
192                ORDER BY item_type, item_id, id DESC
193            )
194            SELECT e.event::text
195            FROM latest
196            JOIN {table} e ON e.id = latest.id
197            WHERE latest.change_type = 'SET'
198            ORDER BY e.id ASC
199            "
200        );
201
202        let rows = client
203            .query(&sql, &[])
204            .map_err(|e| format!("history snapshot query failed: {e}"))?;
205
206        let mut count = 0usize;
207        for row in &rows {
208            let event_json: String = row.get(0);
209            match MEvent::from_str_trim(&event_json) {
210                Ok(event) => {
211                    if let Some(parse) = handler_registry.get_item_parser(&event.item_type)
212                        && let Ok(item) = parse(event.item.clone())
213                    {
214                        let store = registry.get_or_create(&event.item_type);
215                        store.insert(item.id(), item);
216                        count += 1;
217                    }
218                }
219                Err(err) => {
220                    eprintln!("[HistoryReplay] invalid event row: {}", err);
221                }
222            }
223        }
224
225        eprintln!(
226            "[HistoryReplay] loaded {} entities from {} rows as of {}",
227            count,
228            rows.len(),
229            until
230        );
231
232        Ok(Arc::new(registry))
233    }
234}
235
236type ProducerRequest = MEvent;
237
238/// Handle to the PostgreSQL producer.
239#[derive(Clone)]
240pub struct PostgresProducerHandle {
241    sender: flume::Sender<ProducerRequest>,
242    host_id: Uuid,
243    config: PostgresConfig,
244    health: Arc<PersistHealth>,
245}
246
247impl PostgresProducerHandle {
248    /// Persist an event. Enqueues to the background producer thread and
249    /// returns immediately. Returns Err only if the channel is full (backpressure).
250    pub fn produce(&self, mut event: MEvent) -> Result<(), PersistError> {
251        if event.source_id.is_none() {
252            event.source_id = Some(self.host_id.to_string());
253        }
254        let entity_type = event.item_type.clone();
255
256        match self.sender.send(event) {
257            Ok(()) => {
258                self.health.record_enqueue();
259                Ok(())
260            }
261            Err(_) => {
262                let msg = "Postgres producer thread not running".to_string();
263                self.health.record_dropped(msg.clone());
264                Err(PersistError {
265                    entity_type,
266                    message: msg,
267                })
268            }
269        }
270    }
271}
272
273impl Persister for PostgresProducerHandle {
274    fn persist(&self, event: MEvent) -> Result<(), PersistError> {
275        self.produce(event)
276    }
277
278    fn health(&self) -> Arc<PersistHealth> {
279        self.health.clone()
280    }
281
282    fn startup_healthcheck(&self) -> Result<(), String> {
283        validate_ident(&self.config.table)?;
284        validate_ident(&self.config.channel)?;
285        Ok(())
286    }
287}
288
289/// PostgreSQL producer — background thread with fail-fast error propagation.
290pub struct CellPostgresProducer {
291    handle: PostgresProducerHandle,
292}
293
294impl CellPostgresProducer {
295    /// Create a new PostgreSQL producer.
296    pub fn new(config: &PostgresConfig, host_id: Uuid) -> Result<Self, String> {
297        validate_ident(&config.table)?;
298        validate_ident(&config.channel)?;
299
300        let health = Arc::new(PersistHealth::default());
301        let (tx, rx) = flume::unbounded::<ProducerRequest>();
302        let cfg = config.clone();
303        let thread_health = health.clone();
304        std::thread::spawn(move || run_producer_loop(cfg, rx, thread_health));
305
306        Ok(Self {
307            handle: PostgresProducerHandle {
308                sender: tx,
309                host_id,
310                config: config.clone(),
311                health,
312            },
313        })
314    }
315
316    /// Get a shareable persister handle.
317    pub fn handle(&self) -> PostgresProducerHandle {
318        self.handle.clone()
319    }
320}
321
322fn run_producer_loop(
323    config: PostgresConfig,
324    rx: flume::Receiver<ProducerRequest>,
325    health: Arc<PersistHealth>,
326) {
327    let mut client: Option<Client> = None;
328    let mut retry_batch: Vec<MEvent> = Vec::new();
329
330    loop {
331        // NOTE(ts): Collect a batch — start with any retry events, then drain the channel.
332        let mut batch: Vec<MEvent> = Vec::new();
333        if !retry_batch.is_empty() {
334            std::mem::swap(&mut batch, &mut retry_batch);
335        } else {
336            match rx.recv() {
337                Ok(ev) => batch.push(ev),
338                Err(_) => break, // channel closed
339            }
340        }
341        // NOTE(ts): Drain additional ready events up to the batch limit.
342        while batch.len() < PG_PRODUCER_MAX_BATCH {
343            match rx.try_recv() {
344                Ok(ev) => batch.push(ev),
345                Err(_) => break,
346            }
347        }
348
349        if client.is_none() {
350            client = connect_producer_client(&config);
351        }
352
353        let batch_len = batch.len();
354        let pending = rx.len();
355        if pending > 1000 {
356            trace!(
357                "Postgres producer backlog: {} pending events in channel",
358                pending
359            );
360        }
361        if log::log_enabled!(log::Level::Debug) {
362            let mut counts: std::collections::BTreeMap<(&str, &str), usize> =
363                std::collections::BTreeMap::new();
364            for ev in &batch {
365                let kind = match ev.change_type {
366                    MEventType::SET => "SET",
367                    MEventType::DEL => "DEL",
368                };
369                *counts.entry((ev.item_type.as_str(), kind)).or_insert(0) += 1;
370            }
371            let summary: Vec<String> = counts
372                .iter()
373                .map(|((t, k), n)| format!("{}:{}={}", t, k, n))
374                .collect();
375            debug!(
376                "[pg-producer] batch_len={} pending_after={} kinds=[{}]",
377                batch_len,
378                pending,
379                summary.join(", ")
380            );
381        }
382        if let Some(c) = client.as_mut() {
383            match insert_event_batch(c, &config, &batch) {
384                Ok(()) => {
385                    health.record_success_batch(batch_len as u64);
386                }
387                Err(err) => {
388                    let msg =
389                        format_pg_error("insert_event_batch(producer)", Some(&config.url), &err);
390                    error!("{}", msg);
391                    health.record_error_no_dequeue(msg);
392                    client = None;
393                    retry_batch = batch;
394                    // NOTE(ts): Back off before retrying to avoid tight-looping on persistent failures.
395                    std::thread::sleep(Duration::from_secs(1));
396                }
397            }
398        } else {
399            let msg = format!(
400                "Postgres producer connection failed (url: {})",
401                redact_pg_url(&config.url),
402            );
403            error!("{}", msg);
404            health.record_error_no_dequeue(msg);
405            retry_batch = batch;
406            std::thread::sleep(Duration::from_secs(1));
407        }
408    }
409}
410
411fn connect_producer_client(config: &PostgresConfig) -> Option<Client> {
412    match connect_pg_client(config, "producer") {
413        Ok(mut c) => match ensure_schema(&mut c, config) {
414            Ok(()) => Some(c),
415            Err(err) => {
416                error!(
417                    "{}",
418                    format_pg_error("ensure_schema(producer)", Some(&config.url), &err)
419                );
420                None
421            }
422        },
423        Err(err) => {
424            error!("{err}");
425            None
426        }
427    }
428}
429
430fn insert_event_batch(
431    client: &mut Client,
432    config: &PostgresConfig,
433    events: &[MEvent],
434) -> Result<(), ::postgres::Error> {
435    if events.is_empty() {
436        return Ok(());
437    }
438
439    let table = qi(&config.table);
440
441    // NOTE(ts): Single event — skip transaction overhead.
442    if events.len() == 1 {
443        let event = &events[0];
444        let sql = format!(
445            "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES ($1, $2, $3, ($4::text)::timestamptz, $5, ($6::text), ($7::text)::jsonb)"
446        );
447        let item_id = event
448            .item
449            .get("id")
450            .and_then(|v| v.as_str())
451            .unwrap_or("unknown")
452            .to_string();
453        let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
454        let change_type = match event.change_type {
455            MEventType::SET => "SET",
456            MEventType::DEL => "DEL",
457        };
458        client.execute(
459            &sql,
460            &[
461                &event.item_type,
462                &item_id,
463                &change_type,
464                &event.created_at,
465                &event.tx,
466                &event.source_id,
467                &event_json,
468            ],
469        )?;
470        return Ok(());
471    }
472
473    // NOTE(ts): Build a multi-row INSERT for the batch within a single transaction.
474    let mut sql = format!(
475        "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES "
476    );
477    let mut params: Vec<Box<dyn postgres::types::ToSql + Sync>> =
478        Vec::with_capacity(events.len() * 7);
479    for (i, event) in events.iter().enumerate() {
480        if i > 0 {
481            sql.push_str(", ");
482        }
483        let base = i * 7;
484        sql.push_str(&format!(
485            "(${}, ${}, ${}, (${}::text)::timestamptz, ${}, (${}::text), (${}::text)::jsonb)",
486            base + 1,
487            base + 2,
488            base + 3,
489            base + 4,
490            base + 5,
491            base + 6,
492            base + 7
493        ));
494        let item_id = event
495            .item
496            .get("id")
497            .and_then(|v| v.as_str())
498            .unwrap_or("unknown")
499            .to_string();
500        let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
501        let change_type = match event.change_type {
502            MEventType::SET => "SET",
503            MEventType::DEL => "DEL",
504        };
505        params.push(Box::new(event.item_type.clone()));
506        params.push(Box::new(item_id));
507        params.push(Box::new(change_type.to_string()));
508        params.push(Box::new(event.created_at.clone()));
509        params.push(Box::new(event.tx.clone()));
510        params.push(Box::new(event.source_id.clone()));
511        params.push(Box::new(event_json));
512    }
513
514    let param_refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
515        params.iter().map(|p| p.as_ref()).collect();
516
517    let mut txn = client.transaction()?;
518    txn.execute(&sql, &param_refs)?;
519    txn.commit()?;
520
521    Ok(())
522}
523
/// Startup catch-up state shared between the consumer thread and its owner.
#[derive(Debug)]
pub struct CatchUpStatus {
    caught_up: AtomicBool,
    failed: AtomicBool,
    failure_reason: std::sync::RwLock<Option<String>>,
}

impl CatchUpStatus {
    /// Fresh status: not caught up, not failed, no reason recorded.
    fn new() -> Self {
        let caught_up = AtomicBool::new(false);
        let failed = AtomicBool::new(false);
        let failure_reason = std::sync::RwLock::new(None);
        Self {
            caught_up,
            failed,
            failure_reason,
        }
    }

    /// Whether startup catch-up has completed.
    pub fn is_caught_up(&self) -> bool {
        self.caught_up.load(Ordering::SeqCst)
    }

    /// Whether a failure has been recorded.
    pub fn is_failed(&self) -> bool {
        self.failed.load(Ordering::SeqCst)
    }

    /// Record a failure: store the reason, raise `failed`, and clear `caught_up`.
    fn fail(&self, reason: impl Into<String>) {
        let mut slot = self.failure_reason.write().unwrap();
        *slot = Some(reason.into());
        drop(slot);
        self.failed.store(true, Ordering::SeqCst);
        self.caught_up.store(false, Ordering::SeqCst);
    }

    /// Poll every 50 ms until caught up, failed, or `timeout` has elapsed.
    ///
    /// # Errors
    /// Returns the recorded failure reason (or a generic message when none was
    /// stored), or a timeout message once `timeout` has passed.
    pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
        let started = std::time::Instant::now();
        loop {
            if self.is_caught_up() {
                return Ok(());
            }
            if self.is_failed() {
                let recorded = self.failure_reason.read().unwrap().clone();
                return Err(match recorded {
                    Some(reason) => reason,
                    None => "Postgres catch-up failed".to_string(),
                });
            }
            if started.elapsed() >= timeout {
                return Err(format!(
                    "Postgres catch-up timed out after {}s",
                    timeout.as_secs()
                ));
            }
            std::thread::sleep(Duration::from_millis(50));
        }
    }
}
581
582/// PostgreSQL consumer that replays + tails events.
583pub struct CellPostgresConsumer {
584    catch_up_status: Arc<CatchUpStatus>,
585    _handle: std::thread::JoinHandle<()>,
586}
587
588impl CellPostgresConsumer {
589    /// Start a PostgreSQL event consumer thread.
590    pub fn start(
591        config: &PostgresConfig,
592        host_id: Uuid,
593        handler_registry: Arc<HandlerRegistry>,
594        registry: Arc<StoreRegistry>,
595    ) -> Result<Self, String> {
596        validate_ident(&config.table)?;
597        validate_ident(&config.channel)?;
598
599        let catch_up_status = Arc::new(CatchUpStatus::new());
600        let status = catch_up_status.clone();
601        let cfg = config.clone();
602        let host_id_string = host_id.to_string();
603
604        let handle = std::thread::spawn(move || {
605            if let Err(err) = run_consumer_loop(
606                &cfg,
607                &host_id_string,
608                handler_registry,
609                registry,
610                status.clone(),
611            ) {
612                let reason = format!("Postgres consumer failed: {err}");
613                error!("{reason}");
614                status.fail(reason);
615            }
616        });
617
618        Ok(Self {
619            catch_up_status,
620            _handle: handle,
621        })
622    }
623
624    /// Check if startup catch-up is complete.
625    pub fn is_caught_up(&self) -> bool {
626        self.catch_up_status.is_caught_up()
627    }
628
629    /// Wait for startup catch-up with timeout.
630    pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
631        self.catch_up_status.wait_until_caught_up(timeout)
632    }
633}
634
635fn run_consumer_loop(
636    config: &PostgresConfig,
637    host_id: &str,
638    handler_registry: Arc<HandlerRegistry>,
639    registry: Arc<StoreRegistry>,
640    status: Arc<CatchUpStatus>,
641) -> Result<(), String> {
642    let mut reader = connect_consumer_client("reader", config)?;
643    let mut listener = connect_listener_client(config)?;
644
645    info!(
646        "CellPostgresConsumer started (table={}, channel={})",
647        config.table, config.channel
648    );
649
650    let table = qi(&config.table);
651    let high_water_sql = format!("SELECT COALESCE(MAX(id), 0) FROM {table}");
652    let high_water_row = reader
653        .query_one(&high_water_sql, &[])
654        .map_err(|e| format_pg_error("query(high_water)", Some(&config.url), &e))?;
655    let high_water: i64 = high_water_row.get(0);
656    let snapshot_sql = format!(
657        "
658        WITH latest AS (
659            SELECT DISTINCT ON (item_type, item_id)
660                id, change_type
661            FROM {table}
662            WHERE id <= $1
663            ORDER BY item_type, item_id, id DESC
664        )
665        SELECT e.id, e.event::text
666        FROM latest
667        JOIN {table} e ON e.id = latest.id
668        ORDER BY e.id ASC
669        "
670    );
671    let snapshot_rows = reader
672        .query(&snapshot_sql, &[&high_water])
673        .map_err(|e| format_pg_error("query(snapshot latest events)", Some(&config.url), &e))?;
674    let snapshot_count = snapshot_rows.len();
675    for row in snapshot_rows {
676        let id: i64 = row.get(0);
677        let event_json: String = row.get(1);
678        match MEvent::from_str_trim(&event_json) {
679            Ok(event) => {
680                apply_remote_event(event, host_id, &handler_registry, &registry);
681            }
682            Err(err) => {
683                error!("Invalid postgres snapshot row id={id}: {err}");
684            }
685        }
686    }
687    info!(
688        "Postgres snapshot loaded latest state rows={} high_water={}",
689        snapshot_count, high_water
690    );
691
692    let fetch_sql =
693        format!("SELECT id, event::text FROM {table} WHERE id > $1 ORDER BY id ASC LIMIT $2");
694    let mut last_seen_id: i64 = high_water;
695    let mut initial_done = false;
696
697    loop {
698        let rows = match reader.query(&fetch_sql, &[&last_seen_id, &1000_i64]) {
699            Ok(rows) => rows,
700            Err(err) => {
701                warn!(
702                    "{}",
703                    format_pg_error("query(fetch events)", Some(&config.url), &err)
704                );
705                std::thread::sleep(Duration::from_millis(500));
706                reader = connect_consumer_client("reader", config)?;
707                continue;
708            }
709        };
710        if !rows.is_empty() {
711            for row in rows {
712                let id: i64 = row.get(0);
713                let event_json: String = row.get(1);
714                last_seen_id = id;
715
716                match MEvent::from_str_trim(&event_json) {
717                    Ok(event) => {
718                        apply_remote_event(event, host_id, &handler_registry, &registry);
719                    }
720                    Err(err) => {
721                        error!("Invalid postgres event row id={id}: {err}");
722                    }
723                }
724            }
725            continue;
726        }
727
728        if !initial_done {
729            initial_done = true;
730            status.caught_up.store(true, Ordering::SeqCst);
731            info!("Postgres consumer caught up at event_id={last_seen_id}");
732        }
733
734        let mut notified = false;
735        let mut reconnect_listener = false;
736        {
737            let mut notifications = listener.notifications();
738            let mut iter = notifications.timeout_iter(Duration::from_millis(500));
739            match iter.next() {
740                Ok(Some(_n)) => {
741                    notified = true;
742                }
743                Ok(None) => {}
744                Err(err) => {
745                    warn!("Postgres LISTEN error: {err}; reconnecting listener");
746                    reconnect_listener = true;
747                }
748            }
749        }
750        if reconnect_listener {
751            std::thread::sleep(Duration::from_millis(500));
752            listener = connect_listener_client(config)?;
753            // Listener dropped; run an immediate catch-up query pass before waiting again.
754            continue;
755        }
756
757        if !notified {
758            trace!("Postgres consumer poll tick (no notification)");
759        }
760    }
761}
762
763fn connect_consumer_client(role: &str, config: &PostgresConfig) -> Result<Client, String> {
764    let mut backoff_ms = 250u64;
765    loop {
766        match connect_pg_client(config, role) {
767            Ok(mut client) => {
768                if let Err(err) = ensure_schema(&mut client, config) {
769                    warn!(
770                        "{}",
771                        format_pg_error(&format!("ensure_schema({role})"), Some(&config.url), &err)
772                    );
773                }
774                return Ok(client);
775            }
776            Err(err) => {
777                warn!("{err}");
778                std::thread::sleep(Duration::from_millis(backoff_ms));
779                backoff_ms = (backoff_ms * 2).min(5_000);
780            }
781        }
782    }
783}
784
785fn connect_listener_client(config: &PostgresConfig) -> Result<Client, String> {
786    let mut backoff_ms = 250u64;
787    loop {
788        let mut client = connect_consumer_client("listener", config)?;
789        match client.batch_execute(&format!("LISTEN {};", qi(&config.channel))) {
790            Ok(()) => return Ok(client),
791            Err(err) => {
792                warn!(
793                    "{}",
794                    format_pg_error("LISTEN(register)", Some(&config.url), &err)
795                );
796                std::thread::sleep(Duration::from_millis(backoff_ms));
797                backoff_ms = (backoff_ms * 2).min(5_000);
798            }
799        }
800    }
801}
802
803fn apply_remote_event(
804    event: MEvent,
805    host_id: &str,
806    handler_registry: &Arc<HandlerRegistry>,
807    registry: &Arc<StoreRegistry>,
808) {
809    let is_my_event = event.source_id.as_ref().is_some_and(|id| id == host_id);
810    if is_my_event {
811        return;
812    }
813
814    match event.change_type {
815        MEventType::SET => {
816            if let Some(parse) = handler_registry.get_item_parser(&event.item_type) {
817                match parse(event.item.clone()) {
818                    Ok(item) => {
819                        let store = registry.get_or_create(item.entity_type());
820                        store.insert(item.id(), item);
821                    }
822                    Err(e) => {
823                        let msg = e.to_string();
824                        let short = msg
825                            .find(", expected one of")
826                            .map(|pos| msg[..pos].to_string())
827                            .unwrap_or(msg);
828                        error!("Failed to parse {}: {short}", event.item_type);
829                    }
830                }
831            } else {
832                warn!("No parser for entity type: {}", event.item_type);
833            }
834        }
835        MEventType::DEL => {
836            if let Some(id) = event.item.get("id").and_then(|v| v.as_str()) {
837                let store = registry.get_or_create(&event.item_type);
838                store.remove(&id.into());
839            } else {
840                error!("DEL event missing id field: {:?}", event.item);
841            }
842        }
843    }
844}
845
846fn ensure_schema(client: &mut Client, config: &PostgresConfig) -> Result<(), ::postgres::Error> {
847    let table = qi(&config.table);
848    let idx_tx = qi(&format!("{}_tx_idx", config.table));
849    let idx_item = qi(&format!("{}_item_type_item_id_idx", config.table));
850    let idx_item_latest = qi(&format!("{}_item_latest_idx", config.table));
851    let idx_created = qi(&format!("{}_created_at_idx", config.table));
852    let trigger_fn = qi(&format!("{}_notify_insert_fn", config.table));
853    let trigger_name = qi(&format!("{}_notify_insert_trigger", config.table));
854
855    client.batch_execute(&format!(
856        "
857        CREATE TABLE IF NOT EXISTS {table} (
858            id BIGSERIAL PRIMARY KEY,
859            item_type TEXT NOT NULL,
860            item_id TEXT NOT NULL,
861            change_type TEXT NOT NULL,
862            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
863            tx TEXT NOT NULL,
864            source_id TEXT,
865            event JSONB NOT NULL
866        );
867        CREATE INDEX IF NOT EXISTS {idx_tx} ON {table} (tx);
868        CREATE INDEX IF NOT EXISTS {idx_item} ON {table} (item_type, item_id);
869        CREATE INDEX IF NOT EXISTS {idx_item_latest} ON {table} (item_type, item_id, id DESC);
870        CREATE INDEX IF NOT EXISTS {idx_created} ON {table} (created_at);
871        CREATE OR REPLACE FUNCTION {trigger_fn}() RETURNS trigger AS $$
872        BEGIN
873            PERFORM pg_notify('{channel}', NEW.id::text);
874            RETURN NEW;
875        END;
876        $$ LANGUAGE plpgsql;
877        DROP TRIGGER IF EXISTS {trigger_name} ON {table};
878        CREATE TRIGGER {trigger_name}
879            AFTER INSERT ON {table}
880            FOR EACH ROW
881            EXECUTE FUNCTION {trigger_fn}();
882        ",
883        channel = config.channel
884    ))?;
885
886    Ok(())
887}
888
/// Check that `name` is a plain identifier: an ASCII letter or underscore
/// followed by ASCII letters, digits or underscores.
///
/// # Errors
/// Returns a message describing why the name was rejected.
fn validate_ident(name: &str) -> Result<(), String> {
    let mut rest = name.chars();

    let Some(head) = rest.next() else {
        return Err("identifier cannot be empty".to_string());
    };
    if head != '_' && !head.is_ascii_alphabetic() {
        return Err(format!(
            "invalid identifier `{name}`: must start with letter or underscore"
        ));
    }

    let has_illegal_char = rest.any(|c| c != '_' && !c.is_ascii_alphanumeric());
    if has_illegal_char {
        return Err(format!(
            "invalid identifier `{name}`: only letters, numbers, underscore are allowed"
        ));
    }

    Ok(())
}
909
/// Render `name` as a double-quoted SQL identifier.
///
/// Embedded double quotes are doubled, so the result is always a single quoted
/// identifier even when a caller has not run the name through `validate_ident`.
fn qi(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
913
914fn row_to_persisted_event(row: ::postgres::Row) -> Result<PersistedEvent, String> {
915    let id: i64 = row.get(0);
916    let created_at: String = row.get(1);
917    let event_json: String = row.get(2);
918    let event = MEvent::from_str_trim(&event_json)
919        .map_err(|e| format!("invalid history event payload for id={id}: {e}"))?;
920    Ok(PersistedEvent {
921        id,
922        created_at,
923        event,
924    })
925}
926
/// Return `url` with credentials masked, for use in logs and error messages.
///
/// Two forms are handled:
/// - URL userinfo: everything between the scheme separator and the last `@` is
///   replaced by `***` (`postgres://user:pw@host/db` becomes `postgres://***@host/db`).
/// - `password=<value>` assignments, as found in key/value connection strings
///   and URL query strings: the value is replaced by `***`.
fn redact_pg_url(url: &str) -> String {
    let without_userinfo = match url.rfind('@') {
        Some(at) => {
            let after_scheme = url.find("://").map_or(0, |idx| idx + 3);
            format!("{}***{}", &url[..after_scheme], &url[at..])
        }
        None => url.to_string(),
    };
    mask_password_params(&without_userinfo)
}

/// Replace the value of every `password=` assignment in `dsn` with `***`.
///
/// The key only counts when it starts the string or follows whitespace, `?` or
/// `&`, so names that merely contain "password" are left alone.
fn mask_password_params(dsn: &str) -> String {
    const KEY: &str = "password";

    let mut out = String::with_capacity(dsn.len());
    let mut rest = dsn;
    while let Some(pos) = rest.find(KEY) {
        let (before, from_key) = rest.split_at(pos);
        out.push_str(before);
        let at_boundary = out
            .chars()
            .next_back()
            .map_or(true, |c| c.is_whitespace() || c == '?' || c == '&');
        out.push_str(KEY);

        let after_key = &from_key[KEY.len()..];
        // Key/value connection strings allow spaces around the equals sign.
        match after_key.trim_start().strip_prefix('=') {
            Some(raw_value) if at_boundary => {
                let value = raw_value.trim_start();
                out.push_str("=***");
                rest = &value[password_value_len(value)..];
            }
            _ => rest = after_key,
        }
    }
    out.push_str(rest);
    out
}

/// Byte length of the password value at the start of `value`.
///
/// A single-quoted value runs to its closing quote (backslash escapes honoured);
/// an unquoted value runs to the next whitespace or `&`. An unterminated quote
/// consumes the rest of the string.
fn password_value_len(value: &str) -> usize {
    let Some(quoted) = value.strip_prefix('\'') else {
        return value
            .find(|c: char| c.is_whitespace() || c == '&')
            .unwrap_or(value.len());
    };

    let mut escaped = false;
    for (idx, ch) in quoted.char_indices() {
        match ch {
            _ if escaped => escaped = false,
            '\\' => escaped = true,
            // +2 accounts for the opening and the closing quote.
            '\'' => return idx + 2,
            _ => {}
        }
    }
    value.len()
}
936
937fn format_pg_connect_error(role: &str, url: &str, err: &postgres::Error) -> String {
938    format_pg_error(role, Some(url), err)
939}
940
941fn format_pg_error(role: &str, url: Option<&str>, err: &postgres::Error) -> String {
942    let mut msg = match url {
943        Some(url) => format!("{role} failed (dsn={}): {}", redact_pg_url(url), err),
944        None => format!("{role} failed: {err}"),
945    };
946    if let Some(db) = err.as_db_error() {
947        msg.push_str(&format!(
948            " [code={} severity={} message={}]",
949            db.code().code(),
950            db.severity(),
951            db.message()
952        ));
953        if let Some(detail) = db.detail() {
954            msg.push_str(&format!(" [detail={}]", detail));
955        }
956        if let Some(hint) = db.hint() {
957            msg.push_str(&format!(" [hint={}]", hint));
958        }
959    }
960    msg
961}
962
963fn connect_pg_client(config: &PostgresConfig, role: &str) -> Result<Client, String> {
964    let mut client_config = parse_pg_client_config(config, role)?;
965
966    // Avoid default 2h keepalive-idle so long-lived idle sockets are detected quickly.
967    client_config.connect_timeout(Duration::from_secs(PG_CONNECT_TIMEOUT_SECS));
968    client_config.keepalives(true);
969    client_config.keepalives_idle(Duration::from_secs(PG_KEEPALIVE_IDLE_SECS));
970    client_config.keepalives_interval(Duration::from_secs(PG_KEEPALIVE_INTERVAL_SECS));
971    client_config.keepalives_retries(PG_KEEPALIVE_RETRIES);
972
973    client_config
974        .connect(NoTls)
975        .map_err(|err| format_pg_connect_error(role, &config.url, &err))
976}
977
978fn parse_pg_client_config(config: &PostgresConfig, role: &str) -> Result<PgClientConfig, String> {
979    config.url.parse::<PgClientConfig>().map_err(|err| {
980        format!(
981            "postgres config parse failed ({role}, dsn={}): {err}",
982            redact_pg_url(&config.url)
983        )
984    })
985}