faucet_source_postgres_cdc/pgoutput/
registry.rs1use super::messages::Relation;
5use faucet_core::FaucetError;
6use std::collections::HashMap;
7
8#[derive(Debug, Default)]
9pub struct RelationRegistry {
10 by_oid: HashMap<u32, Relation>,
11}
12
13impl RelationRegistry {
14 pub fn new() -> Self {
15 Self::default()
16 }
17
18 pub fn insert(&mut self, rel: Relation) {
19 if let Some(prev) = self.by_oid.get(&rel.oid) {
25 let prev_cols: Vec<(&str, u32)> = prev
26 .columns
27 .iter()
28 .map(|c| (c.name.as_str(), c.type_oid))
29 .collect();
30 let new_cols: Vec<(&str, u32)> = rel
31 .columns
32 .iter()
33 .map(|c| (c.name.as_str(), c.type_oid))
34 .collect();
35 if prev_cols != new_cols {
36 tracing::warn!(
37 relation = %rel.name,
38 oid = rel.oid,
39 "postgres-cdc: relation column set changed mid-stream (schema change); \
40 subsequent rows decode against the new descriptor"
41 );
42 }
43 }
44 self.by_oid.insert(rel.oid, rel);
45 }
46
47 pub fn get(&self, oid: u32) -> Result<&Relation, FaucetError> {
48 self.by_oid.get(&oid).ok_or_else(|| {
49 FaucetError::Source(format!(
50 "pgoutput: change event for unknown relation oid {oid} \
51 (Relation message must precede first change)"
52 ))
53 })
54 }
55}
56
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pgoutput::messages::ReplicaIdentity;

    /// Builds a column-less relation in the `public` namespace with the
    /// given OID and name.
    fn rel(oid: u32, name: &str) -> Relation {
        Relation {
            oid,
            namespace: "public".into(),
            name: name.into(),
            replica_identity: ReplicaIdentity::Default,
            columns: Vec::new(),
        }
    }

    /// A descriptor that was inserted can be fetched back by its OID.
    #[test]
    fn insert_then_get() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel(16384, "users"));

        let found = registry
            .get(16384)
            .expect("descriptor was inserted just above");
        assert_eq!(found.name, "users");
    }

    /// Inserting twice under one OID keeps only the later descriptor.
    #[test]
    fn second_insert_replaces() {
        let mut registry = RelationRegistry::new();
        for name in ["users_v1", "users_v2"] {
            registry.insert(rel(16384, name));
        }

        let found = registry
            .get(16384)
            .expect("descriptor was inserted just above");
        assert_eq!(found.name, "users_v2");
    }

    /// Looking up an OID that was never inserted fails, and the error text
    /// names the offending OID.
    #[test]
    fn missing_oid_errors() {
        let registry = RelationRegistry::new();
        let message = registry
            .get(99999)
            .expect_err("nothing was inserted, so the lookup must fail")
            .to_string();
        assert!(message.contains("99999"));
    }
}