Skip to main content

oxgraph_postgres/
catalog.rs

1//! Registration model for relational source tables and edges.
2
3use core::fmt;
4
5#[cfg(feature = "serde")]
6use serde::{Deserialize, Serialize};
7
/// Stable identifier assigned to a source table when it is registered.
///
/// Formats (via `Display`) as `table:<n>`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TableId(pub u32);

impl fmt::Display for TableId {
    /// Writes the `table:` prefix followed by the decimal id.
    ///
    /// Width/fill flags of the outer formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("table:{}", self.0))
    }
}
18
/// Stable identifier assigned to an edge mapping when it is registered.
///
/// Formats (via `Display`) as `edge:<n>`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct EdgeId(pub u32);

impl fmt::Display for EdgeId {
    /// Writes the `edge:` prefix followed by the decimal id.
    ///
    /// Width/fill flags of the outer formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("edge:{}", self.0))
    }
}
29
30/// External node key supplied by a registered table row.
31///
32/// Encodes `(table_id, primary_key)` as `(table_id << 32) | primary_key` so keys from
33/// different registered tables remain distinct at build time.
34#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
35#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
36pub struct NodeKey(pub u64);
37
38impl NodeKey {
39    /// Builds a node key from a registered table id and SQL primary-key value.
40    ///
41    /// # Performance
42    ///
43    /// This method is `O(1)`.
44    #[must_use]
45    pub const fn registered(table: TableId, primary_key: u64) -> Self {
46        Self(((table.0 as u64) << 32) | primary_key)
47    }
48
49    /// Returns the registered table id embedded in this key.
50    #[must_use]
51    pub const fn table_id(self) -> TableId {
52        TableId((self.0 >> 32) as u32)
53    }
54
55    /// Returns the SQL primary-key component embedded in this key.
56    #[must_use]
57    pub const fn primary_key(self) -> u64 {
58        self.0 & 0xFFFF_FFFF
59    }
60}
61
62impl fmt::Display for NodeKey {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        write!(f, "node:{}:{}", self.table_id(), self.primary_key())
65    }
66}
67
68/// Column used for filter/search indexing at query time.
69#[derive(Clone, Debug, PartialEq, Eq)]
70#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
71pub struct FilterColumn {
72    /// Owning registered table.
73    pub table: TableId,
74    /// SQL column name (semantic boundary — interpreted only by the extension).
75    pub column: String,
76}
77
78/// Registered node table metadata.
79#[derive(Clone, Debug, PartialEq, Eq)]
80#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
81pub struct RegisteredTable {
82    /// Stable table id assigned at registration time.
83    pub id: TableId,
84    /// SQL schema name.
85    pub schema: String,
86    /// SQL table name.
87    pub name: String,
88    /// Primary-key column used to derive [`NodeKey`] values.
89    pub primary_key_column: String,
90}
91
92/// Registered directed edge between two node tables.
93#[derive(Clone, Debug, PartialEq, Eq)]
94#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
95pub struct RegisteredEdge {
96    /// Stable edge id assigned at registration time.
97    pub id: EdgeId,
98    /// Source endpoint table.
99    pub source_table: TableId,
100    /// Target endpoint table.
101    pub target_table: TableId,
102    /// Source foreign-key column on the edge table.
103    pub source_column: String,
104    /// Target foreign-key column on the edge table.
105    pub target_column: String,
106    /// Edge table schema.
107    pub schema: String,
108    /// Edge table name.
109    pub name: String,
110}
111
112/// In-memory catalog of registered tables, edges, and filter columns.
113#[derive(Clone, Debug, Default, PartialEq, Eq)]
114#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
115pub struct Catalog {
116    /// Registered node tables.
117    pub tables: Vec<RegisteredTable>,
118    /// Registered edge mappings.
119    pub edges: Vec<RegisteredEdge>,
120    /// Optional filter columns indexed at query time.
121    pub filter_columns: Vec<FilterColumn>,
122}
123
124impl Catalog {
125    /// Creates an empty catalog.
126    #[must_use]
127    pub fn new() -> Self {
128        Self::default()
129    }
130
131    /// Returns whether registration is sufficient to run a snapshot rebuild.
132    ///
133    /// # Errors
134    ///
135    /// Returns [`CatalogError::EmptyCatalog`] when no tables are registered.
136    ///
137    /// # Performance
138    ///
139    /// This method is `O(1)`.
140    pub const fn validate_for_build(&self) -> Result<(), CatalogError> {
141        if self.tables.is_empty() {
142            return Err(CatalogError::EmptyCatalog);
143        }
144        Ok(())
145    }
146
147    /// Registers a node table after validating uniqueness.
148    ///
149    /// # Errors
150    ///
151    /// Returns [`CatalogError`] when ids or names collide.
152    ///
153    /// # Performance
154    ///
155    /// This method is `O(t)` where `t` is the number of registered tables.
156    pub fn add_table(&mut self, table: RegisteredTable) -> Result<(), CatalogError> {
157        if self.tables.iter().any(|existing| existing.id == table.id) {
158            return Err(CatalogError::DuplicateTableId(table.id));
159        }
160        if self
161            .tables
162            .iter()
163            .any(|existing| existing.schema == table.schema && existing.name == table.name)
164        {
165            return Err(CatalogError::DuplicateTableName {
166                schema: table.schema,
167                name: table.name,
168            });
169        }
170        self.tables.push(table);
171        Ok(())
172    }
173
174    /// Registers an edge mapping after validating endpoint tables exist.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`CatalogError`] when endpoints are missing or ids collide.
179    ///
180    /// # Performance
181    ///
182    /// This method is `O(t + e)`.
183    pub fn add_edge(&mut self, edge: RegisteredEdge) -> Result<(), CatalogError> {
184        if self.edges.iter().any(|existing| existing.id == edge.id) {
185            return Err(CatalogError::DuplicateEdgeId(edge.id));
186        }
187        if !self
188            .tables
189            .iter()
190            .any(|table| table.id == edge.source_table)
191        {
192            return Err(CatalogError::MissingTable(edge.source_table));
193        }
194        if !self
195            .tables
196            .iter()
197            .any(|table| table.id == edge.target_table)
198        {
199            return Err(CatalogError::MissingTable(edge.target_table));
200        }
201        self.edges.push(edge);
202        Ok(())
203    }
204
205    /// Registers a filter column for search indexing.
206    ///
207    /// # Errors
208    ///
209    /// Returns [`CatalogError`] when the table id is unknown.
210    ///
211    /// # Performance
212    ///
213    /// This method is `O(t + f)`.
214    pub fn add_filter_column(&mut self, column: FilterColumn) -> Result<(), CatalogError> {
215        if !self.tables.iter().any(|table| table.id == column.table) {
216            return Err(CatalogError::MissingTable(column.table));
217        }
218        self.filter_columns.push(column);
219        Ok(())
220    }
221
222    /// Looks up a registered table by id.
223    #[must_use]
224    pub fn table(&self, id: TableId) -> Option<&RegisteredTable> {
225        self.tables.iter().find(|table| table.id == id)
226    }
227
228    /// Hydrates a catalog from registration rows (SPI adapters collect rows first).
229    ///
230    /// # Errors
231    ///
232    /// Returns [`CatalogError`] when any registration row violates catalog invariants.
233    ///
234    /// # Performance
235    ///
236    /// This method is `O(t + e + f)` for table, edge, and filter row counts.
237    pub fn from_registration_rows(
238        tables: impl IntoIterator<Item = RegisteredTable>,
239        edges: impl IntoIterator<Item = RegisteredEdge>,
240        filter_columns: impl IntoIterator<Item = FilterColumn>,
241    ) -> Result<Self, CatalogError> {
242        let mut catalog = Self::new();
243        for table in tables {
244            catalog.add_table(table)?;
245        }
246        for edge in edges {
247            catalog.add_edge(edge)?;
248        }
249        for column in filter_columns {
250            catalog.add_filter_column(column)?;
251        }
252        Ok(catalog)
253    }
254}
255
256impl RegisteredEdge {
257    /// Builds a validated `SELECT` that resolves endpoint [`NodeKey`] primary-key values.
258    ///
259    /// When endpoints reference different registered tables, the scan joins each node
260    /// table on the edge foreign-key columns. When both endpoints use the same registered
261    /// table and edge columns store that table's primary-key values directly, a single-table
262    /// scan is used.
263    ///
264    /// # Errors
265    ///
266    /// Returns [`CatalogError::InvalidSqlIdent`] when any identifier is unsafe, or
267    /// [`CatalogError::MissingTable`] when endpoint tables are not registered.
268    ///
269    /// # Performance
270    ///
271    /// This method is `O(i)` where `i` is total identifier length.
272    pub fn edge_scan_sql(&self, catalog: &Catalog) -> Result<alloc::string::String, CatalogError> {
273        validate_sql_ident(&self.schema)?;
274        validate_sql_ident(&self.name)?;
275        validate_sql_ident(&self.source_column)?;
276        validate_sql_ident(&self.target_column)?;
277
278        let source = catalog
279            .table(self.source_table)
280            .ok_or(CatalogError::MissingTable(self.source_table))?;
281        let target = catalog
282            .table(self.target_table)
283            .ok_or(CatalogError::MissingTable(self.target_table))?;
284        validate_sql_ident(&source.schema)?;
285        validate_sql_ident(&source.name)?;
286        validate_sql_ident(&target.schema)?;
287        validate_sql_ident(&target.name)?;
288        validate_sql_ident(&source.primary_key_column)?;
289        validate_sql_ident(&target.primary_key_column)?;
290
291        if self.source_table == self.target_table {
292            return Ok(alloc::format!(
293                "SELECT \"{}\"::bigint, \"{}\"::bigint FROM \"{}\".\"{}\"",
294                self.source_column,
295                self.target_column,
296                self.schema,
297                self.name
298            ));
299        }
300
301        Ok(alloc::format!(
302            "SELECT src.\"{}\"::bigint, tgt.\"{}\"::bigint \
303             FROM \"{}\".\"{}\" e \
304             JOIN \"{}\".\"{}\" src ON e.\"{}\" = src.\"{}\" \
305             JOIN \"{}\".\"{}\" tgt ON e.\"{}\" = tgt.\"{}\"",
306            source.primary_key_column,
307            target.primary_key_column,
308            self.schema,
309            self.name,
310            source.schema,
311            source.name,
312            self.source_column,
313            source.primary_key_column,
314            target.schema,
315            target.name,
316            self.target_column,
317            target.primary_key_column,
318        ))
319    }
320}
321
322/// Validates a SQL primary-key value for [`NodeKey::registered`].
323///
324/// # Errors
325///
326/// Returns [`CatalogError::InvalidPrimaryKey`] for negative values and
327/// [`CatalogError::PrimaryKeyOutOfRange`] when the key does not fit the lower
328/// 32 bits of a [`NodeKey`].
329///
330/// # Performance
331///
332/// This function is `O(1)`.
333pub fn validate_primary_key(value: i64) -> Result<u64, CatalogError> {
334    if value.is_negative() {
335        return Err(CatalogError::InvalidPrimaryKey);
336    }
337    let primary_key = u64::try_from(value).map_err(|_| CatalogError::PrimaryKeyOutOfRange)?;
338    if primary_key > u64::from(u32::MAX) {
339        return Err(CatalogError::PrimaryKeyOutOfRange);
340    }
341    Ok(primary_key)
342}
343
344/// Parses a registered table id from SPI/catalog storage.
345///
346/// # Errors
347///
348/// Returns [`CatalogError::InvalidTableId`] when `value` is negative or overflows `u32`.
349///
350/// # Performance
351///
352/// This function is `O(1)`.
353pub fn table_id_from_i32(value: i32) -> Result<TableId, CatalogError> {
354    if value.is_negative() {
355        return Err(CatalogError::InvalidTableId);
356    }
357    u32::try_from(value)
358        .map(TableId)
359        .map_err(|_| CatalogError::InvalidTableId)
360}
361
362/// Parses a registered edge id from SPI/catalog storage.
363///
364/// # Errors
365///
366/// Returns [`CatalogError::InvalidEdgeId`] when `value` is negative or overflows `u32`.
367///
368/// # Performance
369///
370/// This function is `O(1)`.
371pub fn edge_id_from_i32(value: i32) -> Result<EdgeId, CatalogError> {
372    if value.is_negative() {
373        return Err(CatalogError::InvalidEdgeId);
374    }
375    u32::try_from(value)
376        .map(EdgeId)
377        .map_err(|_| CatalogError::InvalidEdgeId)
378}
379
380/// Returns whether `ident` is safe for double-quoted SQL identifiers.
381///
382/// # Errors
383///
384/// Returns [`CatalogError::InvalidSqlIdent`] when the identifier is empty,
385/// starts with an unsafe character, or contains non-identifier characters.
386///
387/// # Performance
388///
389/// This function is `O(i)` where `i` is identifier length.
390pub fn validate_sql_ident(ident: &str) -> Result<(), CatalogError> {
391    let mut chars = ident.chars();
392    let Some(first) = chars.next() else {
393        return Err(CatalogError::InvalidSqlIdent);
394    };
395    if !first.is_ascii_alphabetic() && first != '_' {
396        return Err(CatalogError::InvalidSqlIdent);
397    }
398    if !chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_') {
399        return Err(CatalogError::InvalidSqlIdent);
400    }
401    Ok(())
402}
403
404/// Catalog validation failures.
405#[derive(Debug, Clone, PartialEq, Eq)]
406pub enum CatalogError {
407    /// No tables were registered.
408    EmptyCatalog,
409    /// Duplicate registered table id.
410    DuplicateTableId(TableId),
411    /// Duplicate registered edge id.
412    DuplicateEdgeId(EdgeId),
413    /// Duplicate schema-qualified table name.
414    DuplicateTableName {
415        /// SQL schema name.
416        schema: String,
417        /// SQL table name.
418        name: String,
419    },
420    /// Referenced table id was not registered.
421    MissingTable(TableId),
422    /// SQL identifier failed validation.
423    InvalidSqlIdent,
424    /// SQL primary key was negative.
425    InvalidPrimaryKey,
426    /// SQL primary key does not fit in a [`NodeKey`] payload.
427    PrimaryKeyOutOfRange,
428    /// Registered table id was negative or overflowed `u32`.
429    InvalidTableId,
430    /// Registered edge id was negative or overflowed `u32`.
431    InvalidEdgeId,
432}
433
434impl fmt::Display for CatalogError {
435    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
436        match self {
437            Self::EmptyCatalog => f.write_str("catalog must register at least one table"),
438            Self::DuplicateTableId(id) => write!(f, "duplicate table id {id}"),
439            Self::DuplicateEdgeId(id) => write!(f, "duplicate edge id {id}"),
440            Self::DuplicateTableName { schema, name } => {
441                write!(f, "duplicate table name {schema}.{name}")
442            }
443            Self::MissingTable(id) => write!(f, "missing catalog table {id}"),
444            Self::InvalidSqlIdent => f.write_str("invalid SQL identifier"),
445            Self::InvalidPrimaryKey => f.write_str("primary key must be non-negative"),
446            Self::PrimaryKeyOutOfRange => {
447                f.write_str("primary key must fit in u32 for NodeKey encoding")
448            }
449            Self::InvalidTableId => f.write_str("invalid registered table id"),
450            Self::InvalidEdgeId => f.write_str("invalid registered edge id"),
451        }
452    }
453}
454
455impl core::error::Error for CatalogError {}