Skip to main content

faucet_source_postgres_cdc/pgoutput/
registry.rs

//! In-process cache of `Relation` messages so subsequent `Insert`/`Update`/
//! `Delete` events can look up column names + type OIDs by relation OID.
3
4use super::messages::Relation;
5use faucet_core::FaucetError;
6use std::collections::HashMap;
7
8#[derive(Debug, Default)]
9pub struct RelationRegistry {
10    by_oid: HashMap<u32, Relation>,
11}
12
13impl RelationRegistry {
14    pub fn new() -> Self {
15        Self::default()
16    }
17
18    pub fn insert(&mut self, rel: Relation) {
19        // A re-sent Relation with a different column set means the table's
20        // schema changed mid-stream (ALTER TABLE). Subsequent tuples decode
21        // against the *new* descriptor, but a same-arity rename/type change
22        // can silently bind values to the wrong column names — surface it so
23        // an operator can correlate any downstream surprise (#78/#46).
24        if let Some(prev) = self.by_oid.get(&rel.oid) {
25            let prev_cols: Vec<(&str, u32)> = prev
26                .columns
27                .iter()
28                .map(|c| (c.name.as_str(), c.type_oid))
29                .collect();
30            let new_cols: Vec<(&str, u32)> = rel
31                .columns
32                .iter()
33                .map(|c| (c.name.as_str(), c.type_oid))
34                .collect();
35            if prev_cols != new_cols {
36                tracing::warn!(
37                    relation = %rel.name,
38                    oid = rel.oid,
39                    "postgres-cdc: relation column set changed mid-stream (schema change); \
40                     subsequent rows decode against the new descriptor"
41                );
42            }
43        }
44        self.by_oid.insert(rel.oid, rel);
45    }
46
47    pub fn get(&self, oid: u32) -> Result<&Relation, FaucetError> {
48        self.by_oid.get(&oid).ok_or_else(|| {
49            FaucetError::Source(format!(
50                "pgoutput: change event for unknown relation oid {oid} \
51                 (Relation message must precede first change)"
52            ))
53        })
54    }
55}
56
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pgoutput::messages::ReplicaIdentity;

    /// Builds a column-less `public.<name>` descriptor for relation `oid`.
    fn rel(oid: u32, name: &str) -> Relation {
        Relation {
            oid,
            namespace: "public".into(),
            name: name.into(),
            replica_identity: ReplicaIdentity::Default,
            columns: vec![],
        }
    }

    #[test]
    fn insert_then_get() {
        let mut r = RelationRegistry::new();
        r.insert(rel(16384, "users"));
        assert_eq!(r.get(16384).unwrap().name, "users");
    }

    #[test]
    fn second_insert_replaces() {
        let mut r = RelationRegistry::new();
        r.insert(rel(16384, "users_v1"));
        r.insert(rel(16384, "users_v2"));
        assert_eq!(r.get(16384).unwrap().name, "users_v2");
    }

    #[test]
    fn distinct_oids_are_kept_separately() {
        let mut r = RelationRegistry::new();
        r.insert(rel(16384, "users"));
        r.insert(rel(16385, "orders"));
        assert_eq!(r.get(16384).unwrap().name, "users");
        assert_eq!(r.get(16385).unwrap().name, "orders");
        // An OID never announced still errors even when others are cached.
        assert!(r.get(16386).is_err());
    }

    #[test]
    fn missing_oid_errors() {
        let r = RelationRegistry::new();
        let msg = r.get(99999).unwrap_err().to_string();
        assert!(msg.contains("99999"));
        assert!(msg.contains("Relation message must precede first change"));
    }
}
89        let r = RelationRegistry::new();
90        let err = r.get(99999).unwrap_err();
91        assert!(format!("{err}").contains("99999"));
92    }
93}