1use core::fmt;
4
5#[cfg(feature = "serde")]
6use serde::{Deserialize, Serialize};
7
/// Identifier of a table registered in a [`Catalog`].
///
/// A plain newtype over `u32`. It is `Copy`, totally ordered and hashable, so
/// it can be used directly as a map key or sorted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TableId(pub u32);
12
13impl fmt::Display for TableId {
14 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
15 write!(f, "table:{}", self.0)
16 }
17}
18
/// Identifier of an edge registered in a [`Catalog`].
///
/// A plain newtype over `u32`. It is `Copy`, totally ordered and hashable, so
/// it can be used directly as a map key or sorted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct EdgeId(pub u32);
23
24impl fmt::Display for EdgeId {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 write!(f, "edge:{}", self.0)
27 }
28}
29
/// Packed 64-bit key identifying one row (node) of a registered table.
///
/// Keys built with [`NodeKey::registered`] hold the [`TableId`] in the upper
/// 32 bits and the row's primary key in the lower 32 bits; the accessors
/// [`NodeKey::table_id`] and [`NodeKey::primary_key`] split the value again.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct NodeKey(pub u64);
37
38impl NodeKey {
39 #[must_use]
45 pub const fn registered(table: TableId, primary_key: u64) -> Self {
46 Self(((table.0 as u64) << 32) | primary_key)
47 }
48
49 #[must_use]
51 pub const fn table_id(self) -> TableId {
52 TableId((self.0 >> 32) as u32)
53 }
54
55 #[must_use]
57 pub const fn primary_key(self) -> u64 {
58 self.0 & 0xFFFF_FFFF
59 }
60}
61
62impl fmt::Display for NodeKey {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 write!(f, "node:{}:{}", self.table_id(), self.primary_key())
65 }
66}
67
68#[derive(Clone, Debug, PartialEq, Eq)]
70#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
71pub struct FilterColumn {
72 pub table: TableId,
74 pub column: String,
76}
77
78#[derive(Clone, Debug, PartialEq, Eq)]
80#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
81pub struct RegisteredTable {
82 pub id: TableId,
84 pub schema: String,
86 pub name: String,
88 pub primary_key_column: String,
90}
91
92#[derive(Clone, Debug, PartialEq, Eq)]
94#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
95pub struct RegisteredEdge {
96 pub id: EdgeId,
98 pub source_table: TableId,
100 pub target_table: TableId,
102 pub source_column: String,
104 pub target_column: String,
106 pub schema: String,
108 pub name: String,
110}
111
112#[derive(Clone, Debug, Default, PartialEq, Eq)]
114#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
115pub struct Catalog {
116 pub tables: Vec<RegisteredTable>,
118 pub edges: Vec<RegisteredEdge>,
120 pub filter_columns: Vec<FilterColumn>,
122}
123
124impl Catalog {
125 #[must_use]
127 pub fn new() -> Self {
128 Self::default()
129 }
130
131 pub const fn validate_for_build(&self) -> Result<(), CatalogError> {
141 if self.tables.is_empty() {
142 return Err(CatalogError::EmptyCatalog);
143 }
144 Ok(())
145 }
146
147 pub fn add_table(&mut self, table: RegisteredTable) -> Result<(), CatalogError> {
157 if self.tables.iter().any(|existing| existing.id == table.id) {
158 return Err(CatalogError::DuplicateTableId(table.id));
159 }
160 if self
161 .tables
162 .iter()
163 .any(|existing| existing.schema == table.schema && existing.name == table.name)
164 {
165 return Err(CatalogError::DuplicateTableName {
166 schema: table.schema,
167 name: table.name,
168 });
169 }
170 self.tables.push(table);
171 Ok(())
172 }
173
174 pub fn add_edge(&mut self, edge: RegisteredEdge) -> Result<(), CatalogError> {
184 if self.edges.iter().any(|existing| existing.id == edge.id) {
185 return Err(CatalogError::DuplicateEdgeId(edge.id));
186 }
187 if !self
188 .tables
189 .iter()
190 .any(|table| table.id == edge.source_table)
191 {
192 return Err(CatalogError::MissingTable(edge.source_table));
193 }
194 if !self
195 .tables
196 .iter()
197 .any(|table| table.id == edge.target_table)
198 {
199 return Err(CatalogError::MissingTable(edge.target_table));
200 }
201 self.edges.push(edge);
202 Ok(())
203 }
204
205 pub fn add_filter_column(&mut self, column: FilterColumn) -> Result<(), CatalogError> {
215 if !self.tables.iter().any(|table| table.id == column.table) {
216 return Err(CatalogError::MissingTable(column.table));
217 }
218 self.filter_columns.push(column);
219 Ok(())
220 }
221
222 #[must_use]
224 pub fn table(&self, id: TableId) -> Option<&RegisteredTable> {
225 self.tables.iter().find(|table| table.id == id)
226 }
227
228 pub fn from_registration_rows(
238 tables: impl IntoIterator<Item = RegisteredTable>,
239 edges: impl IntoIterator<Item = RegisteredEdge>,
240 filter_columns: impl IntoIterator<Item = FilterColumn>,
241 ) -> Result<Self, CatalogError> {
242 let mut catalog = Self::new();
243 for table in tables {
244 catalog.add_table(table)?;
245 }
246 for edge in edges {
247 catalog.add_edge(edge)?;
248 }
249 for column in filter_columns {
250 catalog.add_filter_column(column)?;
251 }
252 Ok(catalog)
253 }
254}
255
256impl RegisteredEdge {
257 pub fn edge_scan_sql(&self, catalog: &Catalog) -> Result<alloc::string::String, CatalogError> {
273 validate_sql_ident(&self.schema)?;
274 validate_sql_ident(&self.name)?;
275 validate_sql_ident(&self.source_column)?;
276 validate_sql_ident(&self.target_column)?;
277
278 let source = catalog
279 .table(self.source_table)
280 .ok_or(CatalogError::MissingTable(self.source_table))?;
281 let target = catalog
282 .table(self.target_table)
283 .ok_or(CatalogError::MissingTable(self.target_table))?;
284 validate_sql_ident(&source.schema)?;
285 validate_sql_ident(&source.name)?;
286 validate_sql_ident(&target.schema)?;
287 validate_sql_ident(&target.name)?;
288 validate_sql_ident(&source.primary_key_column)?;
289 validate_sql_ident(&target.primary_key_column)?;
290
291 if self.source_table == self.target_table {
292 return Ok(alloc::format!(
293 "SELECT \"{}\"::bigint, \"{}\"::bigint FROM \"{}\".\"{}\"",
294 self.source_column,
295 self.target_column,
296 self.schema,
297 self.name
298 ));
299 }
300
301 Ok(alloc::format!(
302 "SELECT src.\"{}\"::bigint, tgt.\"{}\"::bigint \
303 FROM \"{}\".\"{}\" e \
304 JOIN \"{}\".\"{}\" src ON e.\"{}\" = src.\"{}\" \
305 JOIN \"{}\".\"{}\" tgt ON e.\"{}\" = tgt.\"{}\"",
306 source.primary_key_column,
307 target.primary_key_column,
308 self.schema,
309 self.name,
310 source.schema,
311 source.name,
312 self.source_column,
313 source.primary_key_column,
314 target.schema,
315 target.name,
316 self.target_column,
317 target.primary_key_column,
318 ))
319 }
320}
321
322pub fn validate_primary_key(value: i64) -> Result<u64, CatalogError> {
334 if value.is_negative() {
335 return Err(CatalogError::InvalidPrimaryKey);
336 }
337 let primary_key = u64::try_from(value).map_err(|_| CatalogError::PrimaryKeyOutOfRange)?;
338 if primary_key > u64::from(u32::MAX) {
339 return Err(CatalogError::PrimaryKeyOutOfRange);
340 }
341 Ok(primary_key)
342}
343
344pub fn table_id_from_i32(value: i32) -> Result<TableId, CatalogError> {
354 if value.is_negative() {
355 return Err(CatalogError::InvalidTableId);
356 }
357 u32::try_from(value)
358 .map(TableId)
359 .map_err(|_| CatalogError::InvalidTableId)
360}
361
362pub fn edge_id_from_i32(value: i32) -> Result<EdgeId, CatalogError> {
372 if value.is_negative() {
373 return Err(CatalogError::InvalidEdgeId);
374 }
375 u32::try_from(value)
376 .map(EdgeId)
377 .map_err(|_| CatalogError::InvalidEdgeId)
378}
379
380pub fn validate_sql_ident(ident: &str) -> Result<(), CatalogError> {
391 let mut chars = ident.chars();
392 let Some(first) = chars.next() else {
393 return Err(CatalogError::InvalidSqlIdent);
394 };
395 if !first.is_ascii_alphabetic() && first != '_' {
396 return Err(CatalogError::InvalidSqlIdent);
397 }
398 if !chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_') {
399 return Err(CatalogError::InvalidSqlIdent);
400 }
401 Ok(())
402}
403
404#[derive(Debug, Clone, PartialEq, Eq)]
406pub enum CatalogError {
407 EmptyCatalog,
409 DuplicateTableId(TableId),
411 DuplicateEdgeId(EdgeId),
413 DuplicateTableName {
415 schema: String,
417 name: String,
419 },
420 MissingTable(TableId),
422 InvalidSqlIdent,
424 InvalidPrimaryKey,
426 PrimaryKeyOutOfRange,
428 InvalidTableId,
430 InvalidEdgeId,
432}
433
434impl fmt::Display for CatalogError {
435 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
436 match self {
437 Self::EmptyCatalog => f.write_str("catalog must register at least one table"),
438 Self::DuplicateTableId(id) => write!(f, "duplicate table id {id}"),
439 Self::DuplicateEdgeId(id) => write!(f, "duplicate edge id {id}"),
440 Self::DuplicateTableName { schema, name } => {
441 write!(f, "duplicate table name {schema}.{name}")
442 }
443 Self::MissingTable(id) => write!(f, "missing catalog table {id}"),
444 Self::InvalidSqlIdent => f.write_str("invalid SQL identifier"),
445 Self::InvalidPrimaryKey => f.write_str("primary key must be non-negative"),
446 Self::PrimaryKeyOutOfRange => {
447 f.write_str("primary key must fit in u32 for NodeKey encoding")
448 }
449 Self::InvalidTableId => f.write_str("invalid registered table id"),
450 Self::InvalidEdgeId => f.write_str("invalid registered edge id"),
451 }
452 }
453}
454
// Marker impl: `Error` has no required methods, so the derived `Debug` and the
// hand-written `Display` on `CatalogError` are all it needs. The trait is named
// through `core` rather than `std`, so this line does not itself require `std`.
impl core::error::Error for CatalogError {}