// myko_server/postgres.rs
1//! PostgreSQL producer and consumer for the cell-based server.
2//!
3//! Architecture:
4//! - Producer persists `MEvent` rows into a durable table.
5//! - Consumer replays table rows (catch-up), then follows new inserts via LISTEN/NOTIFY.
6
7use std::{
8    sync::{
9        Arc,
10        atomic::{AtomicBool, Ordering},
11    },
12    time::Duration,
13};
14
15use ::postgres::{Client, Config as PgClientConfig, NoTls};
16use log::{error, info, trace, warn};
17use myko::{
18    event::{MEvent, MEventType},
19    server::{HandlerRegistry, PersistError, PersistHealth, Persister},
20    store::StoreRegistry,
21};
22use postgres::fallible_iterator::FallibleIterator;
23use uuid::Uuid;
24
/// Timeout for establishing a new Postgres connection.
const PG_CONNECT_TIMEOUT_SECS: u64 = 10;
/// TCP keepalive idle time before probes start (the stack default is ~2h;
/// see the note in `connect_pg_client`).
const PG_KEEPALIVE_IDLE_SECS: u64 = 30;
/// Interval between TCP keepalive probes.
const PG_KEEPALIVE_INTERVAL_SECS: u64 = 10;
/// Failed keepalive probes before the connection is considered dead.
const PG_KEEPALIVE_RETRIES: u32 = 3;
/// Maximum number of events the producer inserts per batch.
const PG_PRODUCER_MAX_BATCH: usize = 256;
30
/// PostgreSQL configuration.
///
/// `table` and `channel` are spliced into SQL as identifiers, so they are
/// checked by `validate_ident` wherever a component is constructed.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// PostgreSQL connection URL (DSN; may contain credentials — log only
    /// via `redact_pg_url`).
    pub url: String,
    /// Events table name.
    pub table: String,
    /// LISTEN/NOTIFY channel name.
    pub channel: String,
}
41
/// Persisted event row fetched from Postgres.
#[derive(Debug, Clone)]
pub struct PersistedEvent {
    /// Monotonic BIGSERIAL primary key of the row.
    pub id: i64,
    /// `created_at` column rendered as text (`created_at::text` in queries).
    pub created_at: String,
    /// Deserialized event payload from the `event` JSONB column.
    pub event: MEvent,
}
49
50impl PostgresConfig {
51    /// Create config from environment.
52    ///
53    /// - `MYKO_POSTGRES_URL` (required)
54    /// - `MYKO_POSTGRES_TABLE` (optional, default `myko_events`)
55    /// - `MYKO_POSTGRES_CHANNEL` (optional, default `myko_events_notify`)
56    pub fn from_env() -> Option<Self> {
57        let url = std::env::var("MYKO_POSTGRES_URL").ok()?;
58        let table = std::env::var("MYKO_POSTGRES_TABLE").unwrap_or_else(|_| "myko_events".into());
59        let channel =
60            std::env::var("MYKO_POSTGRES_CHANNEL").unwrap_or_else(|_| "myko_events_notify".into());
61        Some(Self {
62            url,
63            table,
64            channel,
65        })
66    }
67}
68
/// Read API for durable event history (windback/replay providers).
///
/// Opens a fresh connection per query via `connect_pg_client`.
pub struct PostgresHistoryStore {
    config: PostgresConfig,
}
73
74impl PostgresHistoryStore {
75    /// Create a history store and ensure schema exists.
76    pub fn new(config: PostgresConfig) -> Result<Self, String> {
77        validate_ident(&config.table)?;
78        validate_ident(&config.channel)?;
79        Ok(Self { config })
80    }
81
82    /// Read events with `id > after_id`, ascending.
83    pub fn load_after_id(&self, after_id: i64, limit: i64) -> Result<Vec<PersistedEvent>, String> {
84        let mut client = connect_pg_client(&self.config, "history(load_after_id)")?;
85        let sql = format!(
86            "SELECT id, created_at::text, event::text FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
87            qi(&self.config.table)
88        );
89        let rows = client
90            .query(&sql, &[&after_id, &limit])
91            .map_err(|e| format!("history query failed: {e}"))?;
92        rows.into_iter()
93            .map(row_to_persisted_event)
94            .collect::<Result<Vec<_>, _>>()
95    }
96
97    /// Read events with `id > after_id` and `created_at <= until`, ascending.
98    pub fn load_until(
99        &self,
100        after_id: i64,
101        until: &str,
102        limit: i64,
103    ) -> Result<Vec<PersistedEvent>, String> {
104        let mut client = connect_pg_client(&self.config, "history(load_until)")?;
105        // NOTE(ts): Validate timestamp format to prevent SQL injection
106        if !until
107            .chars()
108            .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
109        {
110            return Err(format!("Invalid timestamp format: {}", until));
111        }
112        let sql = format!(
113            "SELECT id, created_at::text, event::text FROM {} WHERE id > {} AND created_at <= '{}'::timestamptz ORDER BY id ASC LIMIT {}",
114            qi(&self.config.table),
115            after_id,
116            until,
117            limit
118        );
119        let rows = client
120            .query(&sql, &[])
121            .map_err(|e| format!("history query failed: {e}"))?;
122        rows.into_iter()
123            .map(row_to_persisted_event)
124            .collect::<Result<Vec<_>, _>>()
125    }
126
127    /// Read events in a time window, ascending.
128    pub fn load_between(
129        &self,
130        from_iso: &str,
131        to_iso: &str,
132        limit: i64,
133    ) -> Result<Vec<PersistedEvent>, String> {
134        let mut client = connect_pg_client(&self.config, "history(load_between)")?;
135        let sql = format!(
136            "SELECT id, created_at::text, event::text FROM {} WHERE created_at >= $1::timestamptz AND created_at <= $2::timestamptz ORDER BY id ASC LIMIT $3",
137            qi(&self.config.table)
138        );
139        let rows = client
140            .query(&sql, &[&from_iso, &to_iso, &limit])
141            .map_err(|e| format!("history query failed: {e}"))?;
142        rows.into_iter()
143            .map(row_to_persisted_event)
144            .collect::<Result<Vec<_>, _>>()
145    }
146}
147
/// HistoryReplayProvider backed by PostgresHistoryStore.
pub struct PostgresHistoryReplayProvider {
    config: PostgresConfig,
}

impl PostgresHistoryReplayProvider {
    /// Wrap a config; no connection is made until `replay_to_store` runs.
    ///
    /// NOTE(review): unlike the other constructors in this file, this does
    /// not call `validate_ident` on `table`/`channel` — confirm callers
    /// always pass an already-validated config.
    pub fn new(config: PostgresConfig) -> Self {
        Self { config }
    }
}
158
159impl myko::server::HistoryReplayProvider for PostgresHistoryReplayProvider {
160    fn replay_to_store(
161        &self,
162        until: &str,
163        handler_registry: &HandlerRegistry,
164    ) -> Result<Arc<StoreRegistry>, String> {
165        eprintln!(
166            "[HistoryReplay] loading snapshot as of {} from {}",
167            until, self.config.url
168        );
169
170        // NOTE(ts): Validate timestamp format to prevent SQL injection
171        if !until
172            .chars()
173            .all(|c| c.is_ascii_alphanumeric() || "-.:+TZ ".contains(c))
174        {
175            return Err(format!("Invalid timestamp format: {}", until));
176        }
177
178        let mut client = connect_pg_client(&self.config, "history_replay")?;
179        let table = qi(&self.config.table);
180        let registry = StoreRegistry::new();
181
182        // NOTE(ts): Use DISTINCT ON to get the latest event per entity as of the
183        // timestamp, same approach as the bootstrap snapshot but time-bounded.
184        // Only include entities whose latest event is a SET (not deleted).
185        let sql = format!(
186            "
187            WITH latest AS (
188                SELECT DISTINCT ON (item_type, item_id)
189                    id, change_type
190                FROM {table}
191                WHERE created_at <= '{until}'::timestamptz
192                ORDER BY item_type, item_id, id DESC
193            )
194            SELECT e.event::text
195            FROM latest
196            JOIN {table} e ON e.id = latest.id
197            WHERE latest.change_type = 'SET'
198            ORDER BY e.id ASC
199            "
200        );
201
202        let rows = client
203            .query(&sql, &[])
204            .map_err(|e| format!("history snapshot query failed: {e}"))?;
205
206        let mut count = 0usize;
207        for row in &rows {
208            let event_json: String = row.get(0);
209            match MEvent::from_str_trim(&event_json) {
210                Ok(event) => {
211                    if let Some(parse) = handler_registry.get_item_parser(&event.item_type)
212                        && let Ok(item) = parse(event.item.clone())
213                    {
214                        let store = registry.get_or_create(&event.item_type);
215                        store.insert(item.id(), item);
216                        count += 1;
217                    }
218                }
219                Err(err) => {
220                    eprintln!("[HistoryReplay] invalid event row: {}", err);
221                }
222            }
223        }
224
225        eprintln!(
226            "[HistoryReplay] loaded {} entities from {} rows as of {}",
227            count,
228            rows.len(),
229            until
230        );
231
232        Ok(Arc::new(registry))
233    }
234}
235
/// Message type flowing over the producer channel (currently just the event).
type ProducerRequest = MEvent;
237
/// Handle to the PostgreSQL producer.
///
/// Cheaply cloneable; all clones feed the same background writer thread.
#[derive(Clone)]
pub struct PostgresProducerHandle {
    // Sending side of the channel drained by `run_producer_loop`.
    sender: flume::Sender<ProducerRequest>,
    // This host's id, stamped into events that have no `source_id`.
    host_id: Uuid,
    config: PostgresConfig,
    // Shared counters/flags reported through `Persister::health`.
    health: Arc<PersistHealth>,
}
246
impl PostgresProducerHandle {
    /// Persist an event. Enqueues to the background producer thread and
    /// returns immediately.
    ///
    /// NOTE(review): the channel is created with `flume::unbounded` (see
    /// `CellPostgresProducer::new`), so `send` cannot fail on backpressure;
    /// it fails only when the producer thread has exited and the channel is
    /// disconnected. The previous "channel is full" wording was wrong.
    pub fn produce(&self, mut event: MEvent) -> Result<(), PersistError> {
        // Stamp the event with this host's id (when unset) so the consumer
        // can recognize and skip events that originated locally.
        if event.source_id.is_none() {
            event.source_id = Some(self.host_id.to_string());
        }
        // Captured before `event` is moved into `send`, so a failure can
        // still be attributed to the entity type.
        let entity_type = event.item_type.clone();

        match self.sender.send(event) {
            Ok(()) => {
                self.health.record_enqueue();
                Ok(())
            }
            Err(_) => {
                let msg = "Postgres producer thread not running".to_string();
                self.health.record_dropped(msg.clone());
                Err(PersistError {
                    entity_type,
                    message: msg,
                })
            }
        }
    }
}
272
impl Persister for PostgresProducerHandle {
    /// Delegate to `produce` (enqueue to the background writer thread).
    fn persist(&self, event: MEvent) -> Result<(), PersistError> {
        self.produce(event)
    }

    fn health(&self) -> Arc<PersistHealth> {
        self.health.clone()
    }

    /// Cheap startup check: re-validates the SQL identifiers only. Does not
    /// touch the network or verify the database is reachable.
    fn startup_healthcheck(&self) -> Result<(), String> {
        validate_ident(&self.config.table)?;
        validate_ident(&self.config.channel)?;
        Ok(())
    }
}
288
/// PostgreSQL producer — background thread with fail-fast error propagation.
pub struct CellPostgresProducer {
    // Prototype handle; `handle()` clones it for each caller.
    handle: PostgresProducerHandle,
}
293
294impl CellPostgresProducer {
295    /// Create a new PostgreSQL producer.
296    pub fn new(config: &PostgresConfig, host_id: Uuid) -> Result<Self, String> {
297        validate_ident(&config.table)?;
298        validate_ident(&config.channel)?;
299
300        let health = Arc::new(PersistHealth::default());
301        let (tx, rx) = flume::unbounded::<ProducerRequest>();
302        let cfg = config.clone();
303        let thread_health = health.clone();
304        std::thread::spawn(move || run_producer_loop(cfg, rx, thread_health));
305
306        Ok(Self {
307            handle: PostgresProducerHandle {
308                sender: tx,
309                host_id,
310                config: config.clone(),
311                health,
312            },
313        })
314    }
315
316    /// Get a shareable persister handle.
317    pub fn handle(&self) -> PostgresProducerHandle {
318        self.handle.clone()
319    }
320}
321
/// Background writer loop: drains the channel into batches and inserts them.
///
/// On insert or connect failure the current batch is kept in `retry_batch`
/// and retried after a 1s sleep, so events are not lost while Postgres is
/// down. NOTE(review): if a failure happens after a commit was applied, the
/// retry can duplicate rows — confirm downstream tolerates redelivery.
/// Exits only when every sender handle has been dropped.
fn run_producer_loop(
    config: PostgresConfig,
    rx: flume::Receiver<ProducerRequest>,
    health: Arc<PersistHealth>,
) {
    // Lazily (re)established connection; `None` means "reconnect next round".
    let mut client: Option<Client> = None;
    // Events from a failed attempt, replayed before reading new ones.
    let mut retry_batch: Vec<MEvent> = Vec::new();

    loop {
        // NOTE(ts): Collect a batch — start with any retry events, then drain the channel.
        let mut batch: Vec<MEvent> = Vec::new();
        if !retry_batch.is_empty() {
            std::mem::swap(&mut batch, &mut retry_batch);
        } else {
            // Block until at least one event arrives; a closed channel means
            // all producer handles are gone, so the loop ends.
            match rx.recv() {
                Ok(ev) => batch.push(ev),
                Err(_) => break, // channel closed
            }
        }
        // NOTE(ts): Drain additional ready events up to the batch limit.
        while batch.len() < PG_PRODUCER_MAX_BATCH {
            match rx.try_recv() {
                Ok(ev) => batch.push(ev),
                Err(_) => break,
            }
        }

        if client.is_none() {
            client = connect_producer_client(&config);
        }

        let batch_len = batch.len();
        if let Some(c) = client.as_mut() {
            match insert_event_batch(c, &config, &batch) {
                Ok(()) => {
                    health.record_success_batch(batch_len as u64);
                }
                Err(err) => {
                    let msg =
                        format_pg_error("insert_event_batch(producer)", Some(&config.url), &err);
                    error!("{}", msg);
                    health.record_error_no_dequeue(msg);
                    // Drop the connection — it may be broken or mid-transaction.
                    client = None;
                    retry_batch = batch;
                    // NOTE(ts): Back off before retrying to avoid tight-looping on persistent failures.
                    std::thread::sleep(Duration::from_secs(1));
                }
            }
        } else {
            // Could not (re)connect at all; keep the batch and retry later.
            let msg = format!(
                "Postgres producer connection failed (url: {})",
                redact_pg_url(&config.url),
            );
            error!("{}", msg);
            health.record_error_no_dequeue(msg);
            retry_batch = batch;
            std::thread::sleep(Duration::from_secs(1));
        }
    }
}
382
383fn connect_producer_client(config: &PostgresConfig) -> Option<Client> {
384    match connect_pg_client(config, "producer") {
385        Ok(mut c) => match ensure_schema(&mut c, config) {
386            Ok(()) => Some(c),
387            Err(err) => {
388                error!(
389                    "{}",
390                    format_pg_error("ensure_schema(producer)", Some(&config.url), &err)
391                );
392                None
393            }
394        },
395        Err(err) => {
396            error!("{err}");
397            None
398        }
399    }
400}
401
/// Insert a batch of events into the events table.
///
/// - Empty batch: no-op.
/// - Single event: one plain INSERT, skipping transaction overhead.
/// - Multiple events: one multi-row INSERT executed inside a transaction.
///
/// Text parameters are cast server-side (`($n::text)::timestamptz`,
/// `($n::text)::jsonb`) so plain Rust strings can be bound without custom
/// `ToSql` implementations.
fn insert_event_batch(
    client: &mut Client,
    config: &PostgresConfig,
    events: &[MEvent],
) -> Result<(), ::postgres::Error> {
    if events.is_empty() {
        return Ok(());
    }

    let table = qi(&config.table);

    // NOTE(ts): Single event — skip transaction overhead.
    if events.len() == 1 {
        let event = &events[0];
        let sql = format!(
            "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES ($1, $2, $3, ($4::text)::timestamptz, $5, ($6::text), ($7::text)::jsonb)"
        );
        // Events without a string `id` field are stored under "unknown".
        let item_id = event
            .item
            .get("id")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown")
            .to_string();
        // Serialization failure degrades to `{}` rather than dropping the row.
        let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
        let change_type = match event.change_type {
            MEventType::SET => "SET",
            MEventType::DEL => "DEL",
        };
        client.execute(
            &sql,
            &[
                &event.item_type,
                &item_id,
                &change_type,
                &event.created_at,
                &event.tx,
                &event.source_id,
                &event_json,
            ],
        )?;
        return Ok(());
    }

    // NOTE(ts): Build a multi-row INSERT for the batch within a single transaction.
    let mut sql = format!(
        "INSERT INTO {table} (item_type, item_id, change_type, created_at, tx, source_id, event) VALUES "
    );
    // Boxed trait objects because each row contributes values of several
    // different `ToSql` types; seven placeholders per row.
    let mut params: Vec<Box<dyn postgres::types::ToSql + Sync>> =
        Vec::with_capacity(events.len() * 7);
    for (i, event) in events.iter().enumerate() {
        if i > 0 {
            sql.push_str(", ");
        }
        // Placeholder numbers are 1-based and offset by 7 per preceding row.
        let base = i * 7;
        sql.push_str(&format!(
            "(${}, ${}, ${}, (${}::text)::timestamptz, ${}, (${}::text), (${}::text)::jsonb)",
            base + 1,
            base + 2,
            base + 3,
            base + 4,
            base + 5,
            base + 6,
            base + 7
        ));
        // Same per-event extraction as the single-row path above.
        let item_id = event
            .item
            .get("id")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown")
            .to_string();
        let event_json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
        let change_type = match event.change_type {
            MEventType::SET => "SET",
            MEventType::DEL => "DEL",
        };
        params.push(Box::new(event.item_type.clone()));
        params.push(Box::new(item_id));
        params.push(Box::new(change_type.to_string()));
        params.push(Box::new(event.created_at.clone()));
        params.push(Box::new(event.tx.clone()));
        params.push(Box::new(event.source_id.clone()));
        params.push(Box::new(event_json));
    }

    // Re-borrow the boxed params as the `&[&dyn ToSql]` slice `execute` expects.
    let param_refs: Vec<&(dyn postgres::types::ToSql + Sync)> =
        params.iter().map(|p| p.as_ref()).collect();

    let mut txn = client.transaction()?;
    txn.execute(&sql, &param_refs)?;
    txn.commit()?;

    Ok(())
}
495
/// Shared status for startup catch-up.
#[derive(Debug)]
pub struct CatchUpStatus {
    caught_up: AtomicBool,
    failed: AtomicBool,
    failure_reason: std::sync::RwLock<Option<String>>,
}

impl CatchUpStatus {
    /// Fresh status: not caught up, not failed, no reason recorded.
    fn new() -> Self {
        Self {
            caught_up: AtomicBool::new(false),
            failed: AtomicBool::new(false),
            failure_reason: std::sync::RwLock::new(None),
        }
    }

    /// Check if startup catch-up completed.
    pub fn is_caught_up(&self) -> bool {
        self.caught_up.load(Ordering::SeqCst)
    }

    /// Check if catch-up has failed.
    pub fn is_failed(&self) -> bool {
        self.failed.load(Ordering::SeqCst)
    }

    /// Record the failure reason, then flip failed=true and caught_up=false.
    fn fail(&self, reason: impl Into<String>) {
        let reason = reason.into();
        *self.failure_reason.write().unwrap() = Some(reason.clone());
        self.failed.store(true, Ordering::SeqCst);
        self.caught_up.store(false, Ordering::SeqCst);
    }

    /// Block until caught up, polling every 50ms.
    ///
    /// Returns the recorded failure reason if catch-up failed, or a timeout
    /// message once `timeout` has elapsed.
    pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
        let started = std::time::Instant::now();
        loop {
            if self.is_caught_up() {
                return Ok(());
            }
            if self.is_failed() {
                let reason = self.failure_reason.read().unwrap().clone();
                return Err(reason.unwrap_or_else(|| "Postgres catch-up failed".to_string()));
            }
            if started.elapsed() >= timeout {
                return Err(format!(
                    "Postgres catch-up timed out after {}s",
                    timeout.as_secs()
                ));
            }
            std::thread::sleep(Duration::from_millis(50));
        }
    }
}
553
/// PostgreSQL consumer that replays + tails events.
pub struct CellPostgresConsumer {
    // Shared with the consumer thread, which flips it once caught up.
    catch_up_status: Arc<CatchUpStatus>,
    // Held so the thread handle is not detached; never joined.
    _handle: std::thread::JoinHandle<()>,
}
559
560impl CellPostgresConsumer {
561    /// Start a PostgreSQL event consumer thread.
562    pub fn start(
563        config: &PostgresConfig,
564        host_id: Uuid,
565        handler_registry: Arc<HandlerRegistry>,
566        registry: Arc<StoreRegistry>,
567    ) -> Result<Self, String> {
568        validate_ident(&config.table)?;
569        validate_ident(&config.channel)?;
570
571        let catch_up_status = Arc::new(CatchUpStatus::new());
572        let status = catch_up_status.clone();
573        let cfg = config.clone();
574        let host_id_string = host_id.to_string();
575
576        let handle = std::thread::spawn(move || {
577            if let Err(err) = run_consumer_loop(
578                &cfg,
579                &host_id_string,
580                handler_registry,
581                registry,
582                status.clone(),
583            ) {
584                let reason = format!("Postgres consumer failed: {err}");
585                error!("{reason}");
586                status.fail(reason);
587            }
588        });
589
590        Ok(Self {
591            catch_up_status,
592            _handle: handle,
593        })
594    }
595
596    /// Check if startup catch-up is complete.
597    pub fn is_caught_up(&self) -> bool {
598        self.catch_up_status.is_caught_up()
599    }
600
601    /// Wait for startup catch-up with timeout.
602    pub fn wait_until_caught_up(&self, timeout: Duration) -> Result<(), String> {
603        self.catch_up_status.wait_until_caught_up(timeout)
604    }
605}
606
/// Consumer loop: snapshot catch-up, then incremental tailing via LISTEN/NOTIFY.
///
/// Phases:
/// 1. Record the table's max id (`high_water`) and apply the latest event per
///    entity up to it (`DISTINCT ON` snapshot). DEL rows flow through
///    `apply_remote_event` too, which removes the entity (a no-op on an
///    empty store).
/// 2. Poll rows with `id > last_seen_id` until a poll returns nothing, then
///    mark the status caught up.
/// 3. Wait (max 500ms) for a NOTIFY, then re-poll. Reader and listener
///    connections are replaced independently when they error.
///
/// Runs for the life of the thread; `Err` is only produced by the initial
/// snapshot queries — later reader failures reconnect in-loop.
fn run_consumer_loop(
    config: &PostgresConfig,
    host_id: &str,
    handler_registry: Arc<HandlerRegistry>,
    registry: Arc<StoreRegistry>,
    status: Arc<CatchUpStatus>,
) -> Result<(), String> {
    // Two connections: `reader` runs queries, `listener` only LISTENs.
    let mut reader = connect_consumer_client("reader", config)?;
    let mut listener = connect_listener_client(config)?;

    info!(
        "CellPostgresConsumer started (table={}, channel={})",
        config.table, config.channel
    );

    let table = qi(&config.table);
    // High-water mark: everything at or below it belongs to the snapshot;
    // everything above is consumed incrementally.
    let high_water_sql = format!("SELECT COALESCE(MAX(id), 0) FROM {table}");
    let high_water_row = reader
        .query_one(&high_water_sql, &[])
        .map_err(|e| format_pg_error("query(high_water)", Some(&config.url), &e))?;
    let high_water: i64 = high_water_row.get(0);
    // Latest event per (item_type, item_id) up to the high-water mark.
    let snapshot_sql = format!(
        "
        WITH latest AS (
            SELECT DISTINCT ON (item_type, item_id)
                id, change_type
            FROM {table}
            WHERE id <= $1
            ORDER BY item_type, item_id, id DESC
        )
        SELECT e.id, e.event::text
        FROM latest
        JOIN {table} e ON e.id = latest.id
        ORDER BY e.id ASC
        "
    );
    let snapshot_rows = reader
        .query(&snapshot_sql, &[&high_water])
        .map_err(|e| format_pg_error("query(snapshot latest events)", Some(&config.url), &e))?;
    let snapshot_count = snapshot_rows.len();
    for row in snapshot_rows {
        let id: i64 = row.get(0);
        let event_json: String = row.get(1);
        match MEvent::from_str_trim(&event_json) {
            Ok(event) => {
                // NOTE(review): rows whose source_id matches this host are
                // skipped inside apply_remote_event — presumably fine since
                // those events were applied locally before persisting.
                apply_remote_event(event, host_id, &handler_registry, &registry);
            }
            Err(err) => {
                error!("Invalid postgres snapshot row id={id}: {err}");
            }
        }
    }
    info!(
        "Postgres snapshot loaded latest state rows={} high_water={}",
        snapshot_count, high_water
    );

    let fetch_sql =
        format!("SELECT id, event::text FROM {table} WHERE id > $1 ORDER BY id ASC LIMIT $2");
    let mut last_seen_id: i64 = high_water;
    let mut initial_done = false;

    loop {
        // Fetch up to 1000 new rows past the last applied id.
        let rows = match reader.query(&fetch_sql, &[&last_seen_id, &1000_i64]) {
            Ok(rows) => rows,
            Err(err) => {
                warn!(
                    "{}",
                    format_pg_error("query(fetch events)", Some(&config.url), &err)
                );
                std::thread::sleep(Duration::from_millis(500));
                // Replace the reader and retry the same fetch.
                reader = connect_consumer_client("reader", config)?;
                continue;
            }
        };
        if !rows.is_empty() {
            for row in rows {
                let id: i64 = row.get(0);
                let event_json: String = row.get(1);
                // Advance even for rows that fail to parse, so one bad row
                // cannot wedge the consumer.
                last_seen_id = id;

                match MEvent::from_str_trim(&event_json) {
                    Ok(event) => {
                        apply_remote_event(event, host_id, &handler_registry, &registry);
                    }
                    Err(err) => {
                        error!("Invalid postgres event row id={id}: {err}");
                    }
                }
            }
            // A full page may have more behind it — poll again immediately.
            continue;
        }

        // First empty poll after startup: catch-up is complete.
        if !initial_done {
            initial_done = true;
            status.caught_up.store(true, Ordering::SeqCst);
            info!("Postgres consumer caught up at event_id={last_seen_id}");
        }

        // Wait up to 500ms for a NOTIFY; the payload (new row id) is ignored —
        // any notification just triggers another fetch pass.
        let mut notified = false;
        let mut reconnect_listener = false;
        {
            let mut notifications = listener.notifications();
            let mut iter = notifications.timeout_iter(Duration::from_millis(500));
            match iter.next() {
                Ok(Some(_n)) => {
                    notified = true;
                }
                Ok(None) => {}
                Err(err) => {
                    warn!("Postgres LISTEN error: {err}; reconnecting listener");
                    reconnect_listener = true;
                }
            }
        }
        if reconnect_listener {
            std::thread::sleep(Duration::from_millis(500));
            listener = connect_listener_client(config)?;
            // Listener dropped; run an immediate catch-up query pass before waiting again.
            continue;
        }

        if !notified {
            trace!("Postgres consumer poll tick (no notification)");
        }
    }
}
734
735fn connect_consumer_client(role: &str, config: &PostgresConfig) -> Result<Client, String> {
736    let mut backoff_ms = 250u64;
737    loop {
738        match connect_pg_client(config, role) {
739            Ok(mut client) => {
740                if let Err(err) = ensure_schema(&mut client, config) {
741                    warn!(
742                        "{}",
743                        format_pg_error(&format!("ensure_schema({role})"), Some(&config.url), &err)
744                    );
745                }
746                return Ok(client);
747            }
748            Err(err) => {
749                warn!("{err}");
750                std::thread::sleep(Duration::from_millis(backoff_ms));
751                backoff_ms = (backoff_ms * 2).min(5_000);
752            }
753        }
754    }
755}
756
757fn connect_listener_client(config: &PostgresConfig) -> Result<Client, String> {
758    let mut backoff_ms = 250u64;
759    loop {
760        let mut client = connect_consumer_client("listener", config)?;
761        match client.batch_execute(&format!("LISTEN {};", qi(&config.channel))) {
762            Ok(()) => return Ok(client),
763            Err(err) => {
764                warn!(
765                    "{}",
766                    format_pg_error("LISTEN(register)", Some(&config.url), &err)
767                );
768                std::thread::sleep(Duration::from_millis(backoff_ms));
769                backoff_ms = (backoff_ms * 2).min(5_000);
770            }
771        }
772    }
773}
774
775fn apply_remote_event(
776    event: MEvent,
777    host_id: &str,
778    handler_registry: &Arc<HandlerRegistry>,
779    registry: &Arc<StoreRegistry>,
780) {
781    let is_my_event = event.source_id.as_ref().is_some_and(|id| id == host_id);
782    if is_my_event {
783        return;
784    }
785
786    match event.change_type {
787        MEventType::SET => {
788            if let Some(parse) = handler_registry.get_item_parser(&event.item_type) {
789                match parse(event.item.clone()) {
790                    Ok(item) => {
791                        let store = registry.get_or_create(item.entity_type());
792                        store.insert(item.id(), item);
793                    }
794                    Err(e) => {
795                        let msg = e.to_string();
796                        let short = msg
797                            .find(", expected one of")
798                            .map(|pos| msg[..pos].to_string())
799                            .unwrap_or(msg);
800                        error!("Failed to parse {}: {short}", event.item_type);
801                    }
802                }
803            } else {
804                warn!("No parser for entity type: {}", event.item_type);
805            }
806        }
807        MEventType::DEL => {
808            if let Some(id) = event.item.get("id").and_then(|v| v.as_str()) {
809                let store = registry.get_or_create(&event.item_type);
810                store.remove(&id.into());
811            } else {
812                error!("DEL event missing id field: {:?}", event.item);
813            }
814        }
815    }
816}
817
/// Create the events table, its indexes, and the NOTIFY trigger if absent.
///
/// All identifiers are quoted via `qi` and were validated by
/// `validate_ident` at component construction; `channel` is embedded into
/// the trigger function body as a literal, which is safe for the same
/// reason. `CREATE OR REPLACE FUNCTION` plus `DROP`/`CREATE TRIGGER` keep
/// the trigger pointing at the currently configured channel across restarts.
fn ensure_schema(client: &mut Client, config: &PostgresConfig) -> Result<(), ::postgres::Error> {
    let table = qi(&config.table);
    let idx_tx = qi(&format!("{}_tx_idx", config.table));
    let idx_item = qi(&format!("{}_item_type_item_id_idx", config.table));
    let idx_item_latest = qi(&format!("{}_item_latest_idx", config.table));
    let idx_created = qi(&format!("{}_created_at_idx", config.table));
    let trigger_fn = qi(&format!("{}_notify_insert_fn", config.table));
    let trigger_name = qi(&format!("{}_notify_insert_trigger", config.table));

    client.batch_execute(&format!(
        "
        CREATE TABLE IF NOT EXISTS {table} (
            id BIGSERIAL PRIMARY KEY,
            item_type TEXT NOT NULL,
            item_id TEXT NOT NULL,
            change_type TEXT NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            tx TEXT NOT NULL,
            source_id TEXT,
            event JSONB NOT NULL
        );
        CREATE INDEX IF NOT EXISTS {idx_tx} ON {table} (tx);
        CREATE INDEX IF NOT EXISTS {idx_item} ON {table} (item_type, item_id);
        CREATE INDEX IF NOT EXISTS {idx_item_latest} ON {table} (item_type, item_id, id DESC);
        CREATE INDEX IF NOT EXISTS {idx_created} ON {table} (created_at);
        CREATE OR REPLACE FUNCTION {trigger_fn}() RETURNS trigger AS $$
        BEGIN
            PERFORM pg_notify('{channel}', NEW.id::text);
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        DROP TRIGGER IF EXISTS {trigger_name} ON {table};
        CREATE TRIGGER {trigger_name}
            AFTER INSERT ON {table}
            FOR EACH ROW
            EXECUTE FUNCTION {trigger_fn}();
        ",
        channel = config.channel
    ))?;

    Ok(())
}
860
/// Validate a SQL identifier (table/channel name) before it is spliced into
/// SQL via `qi`.
///
/// Accepts `[A-Za-z_][A-Za-z0-9_]*` only, so identifier interpolation
/// cannot be abused for SQL injection.
fn validate_ident(name: &str) -> Result<(), String> {
    let mut chars = name.chars();
    // First character: letter or underscore. Matching on `next()` also
    // rejects the empty string, removing the previous redundant
    // `is_empty()` check and unreachable `ok_or_else` fallback.
    match chars.next() {
        None => return Err("identifier cannot be empty".to_string()),
        Some(first) if first == '_' || first.is_ascii_alphabetic() => {}
        Some(_) => {
            return Err(format!(
                "invalid identifier `{name}`: must start with letter or underscore"
            ));
        }
    }
    // Remaining characters: letters, digits, underscore.
    if !chars.all(|c| c == '_' || c.is_ascii_alphanumeric()) {
        return Err(format!(
            "invalid identifier `{name}`: only letters, numbers, underscore are allowed"
        ));
    }
    Ok(())
}
881
/// Quote a SQL identifier, doubling any embedded `"` per the SQL standard so
/// the result is always a single well-formed quoted identifier.
///
/// `validate_ident` already rejects quotes, so for all names used by this
/// module the output is unchanged; the escaping is defense in depth for any
/// future caller that skips validation.
fn qi(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
885
/// Convert a `(id, created_at::text, event::text)` row — the shape produced
/// by the history queries above — into a `PersistedEvent`.
///
/// Fails (rather than skipping) when the JSON payload does not parse as an
/// `MEvent`, carrying the row id in the error for diagnostics.
fn row_to_persisted_event(row: ::postgres::Row) -> Result<PersistedEvent, String> {
    let id: i64 = row.get(0);
    let created_at: String = row.get(1);
    let event_json: String = row.get(2);
    let event = MEvent::from_str_trim(&event_json)
        .map_err(|e| format!("invalid history event payload for id={id}: {e}"))?;
    Ok(PersistedEvent {
        id,
        created_at,
        event,
    })
}
898
/// Mask the credential section of a Postgres DSN for safe logging.
///
/// `postgres://user:pass@host/db` becomes `postgres://***@host/db`; a URL
/// without an `@` is returned unchanged. Uses the *last* `@` so passwords
/// containing `@` are still fully masked.
fn redact_pg_url(url: &str) -> String {
    let Some(at) = url.rfind('@') else {
        return url.to_string();
    };
    let scheme_end = url.find("://").map_or(0, |idx| idx + 3);
    format!("{}***{}", &url[..scheme_end], &url[at..])
}
908
/// Convenience wrapper for connection failures, which always have a DSN to
/// report; delegates to `format_pg_error`.
fn format_pg_connect_error(role: &str, url: &str, err: &postgres::Error) -> String {
    format_pg_error(role, Some(url), err)
}
912
/// Format a Postgres error for logging.
///
/// Includes the redacted DSN when available, and — for server-side errors —
/// appends the SQLSTATE code, severity, message, and optional detail/hint
/// extracted from the underlying `DbError`.
fn format_pg_error(role: &str, url: Option<&str>, err: &postgres::Error) -> String {
    let mut msg = match url {
        Some(url) => format!("{role} failed (dsn={}): {}", redact_pg_url(url), err),
        None => format!("{role} failed: {err}"),
    };
    // `as_db_error` is `Some` only for errors reported by the server itself
    // (not connect/IO failures).
    if let Some(db) = err.as_db_error() {
        msg.push_str(&format!(
            " [code={} severity={} message={}]",
            db.code().code(),
            db.severity(),
            db.message()
        ));
        if let Some(detail) = db.detail() {
            msg.push_str(&format!(" [detail={}]", detail));
        }
        if let Some(hint) = db.hint() {
            msg.push_str(&format!(" [hint={}]", hint));
        }
    }
    msg
}
934
/// Open a new blocking Postgres connection with aggressive TCP keepalives.
///
/// `role` only labels error messages. NOTE(review): always connects with
/// `NoTls` — confirm the deployment never needs TLS to reach the database.
fn connect_pg_client(config: &PostgresConfig, role: &str) -> Result<Client, String> {
    let mut client_config = parse_pg_client_config(config, role)?;

    // Avoid default 2h keepalive-idle so long-lived idle sockets are detected quickly.
    client_config.connect_timeout(Duration::from_secs(PG_CONNECT_TIMEOUT_SECS));
    client_config.keepalives(true);
    client_config.keepalives_idle(Duration::from_secs(PG_KEEPALIVE_IDLE_SECS));
    client_config.keepalives_interval(Duration::from_secs(PG_KEEPALIVE_INTERVAL_SECS));
    client_config.keepalives_retries(PG_KEEPALIVE_RETRIES);

    client_config
        .connect(NoTls)
        .map_err(|err| format_pg_connect_error(role, &config.url, &err))
}
949
/// Parse the configured URL/DSN into a `postgres` client `Config`,
/// redacting credentials from the error message on failure.
fn parse_pg_client_config(config: &PostgresConfig, role: &str) -> Result<PgClientConfig, String> {
    config.url.parse::<PgClientConfig>().map_err(|err| {
        format!(
            "postgres config parse failed ({role}, dsn={}): {err}",
            redact_pg_url(&config.url)
        )
    })
}