hyperdb_api/catalog.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database catalog operations.
5//!
6//! The `Catalog` struct provides methods for working with database metadata,
7//! including creating and dropping databases, schemas, and tables.
8//!
9//! # SQL Injection Prevention
10//!
11//! All catalog methods use SQL identifier and literal escaping to prevent
12//! SQL injection attacks:
13//!
14//! - Identifiers (database names, schema names, table names) are quoted with
15//! double quotes and internal quotes are escaped (e.g., `"` → `""`)
16//! - String literals (comparison values) are quoted with single quotes and
17//! internal quotes are escaped (e.g., `'` → `''`)
18//!
19//! While this provides protection against basic SQL injection, parameterized
20//! queries would be more robust. The escaping methods used are:
21//!
22//! - `name.replace('"', "\"\"")` for identifiers
23//! - `value.replace('\'', "''")` for literals
24//!
25//! **Note**: User-provided names should still be validated against expected
26//! patterns when possible, as a defense-in-depth measure.
27
28use crate::connection::Connection;
29use crate::error::{Error, Result};
30use crate::table_definition::TableDefinition;
31use hyperdb_api_core::types::SqlType;
32
33/// Provides catalog operations for database metadata.
34///
35/// # Example
36///
37/// ```no_run
38/// use hyperdb_api::{Connection, Catalog, CreateMode, Result};
39///
40/// fn main() -> Result<()> {
41/// let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
42/// let catalog = Catalog::new(&conn);
43///
44/// // Check if a schema exists
45/// if !catalog.has_schema("my_schema")? {
46/// catalog.create_schema("my_schema")?;
47/// }
48///
49/// // List tables
50/// let tables = catalog.get_table_names("my_schema")?;
51/// for table in tables {
52/// println!("Table: {}", table);
53/// }
54/// Ok(())
55/// }
56/// ```
57#[derive(Debug)]
58pub struct Catalog<'conn> {
59 connection: &'conn Connection,
60}
61
62impl<'conn> Catalog<'conn> {
63 /// Creates a new Catalog for the given connection.
64 pub fn new(connection: &'conn Connection) -> Self {
65 Catalog { connection }
66 }
67
68 // ============================================================
69 // Database Operations
70 // ============================================================
71
72 /// Creates a new database file (delegates to Connection).
73 ///
74 /// # Errors
75 ///
76 /// Forwards the error from [`Connection::create_database`].
77 pub fn create_database(&self, path: &str) -> Result<()> {
78 self.connection.create_database(path)
79 }
80
81 /// Drops (deletes) a database file (delegates to Connection).
82 ///
83 /// # Errors
84 ///
85 /// Forwards the error from [`Connection::drop_database`].
86 pub fn drop_database(&self, path: &str) -> Result<()> {
87 self.connection.drop_database(path)
88 }
89
90 /// Attaches a database file to the connection.
91 ///
92 /// Once attached, the database can be queried and modified.
93 /// The database is identified by its alias (or by its path if no alias is provided).
94 ///
95 /// # Arguments
96 ///
97 /// * `path` - The path to the database file to attach.
98 /// * `alias` - Optional alias for the database. If `None`, the database is
99 /// attached without an explicit alias (typically using its filename).
100 ///
101 /// # Errors
102 ///
103 /// Returns an error if the database file doesn't exist or if attachment fails.
104 pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
105 self.connection.attach_database(path, alias)
106 }
107
108 /// Detaches a database from the connection.
109 ///
110 /// After detaching, the database file is released and can be accessed
111 /// externally (e.g., copied, moved, etc.). All pending updates are
112 /// written to disk before detaching.
113 ///
114 /// # Arguments
115 ///
116 /// * `alias` - The alias of the database to detach.
117 ///
118 /// # Errors
119 ///
120 /// Returns an error if the database is not attached or if detachment fails.
121 pub fn detach_database(&self, alias: &str) -> Result<()> {
122 self.connection.detach_database(alias)
123 }
124
125 /// Detaches all databases from the connection.
126 ///
127 /// This is useful for cleanup before closing a connection or when
128 /// you need to release all database files.
129 ///
130 /// # Errors
131 ///
132 /// Returns an error if the databases could not be detached.
133 pub fn detach_all_databases(&self) -> Result<()> {
134 self.connection.detach_all_databases()
135 }
136
137 // ============================================================
138 // Schema Operations
139 // ============================================================
140
141 /// Creates a schema.
142 ///
143 /// # Errors
144 ///
145 /// - Returns an error if `schema_name` cannot be converted to a
146 /// [`SchemaName`](crate::SchemaName).
147 /// - Returns [`Error::Client`] if the server rejects
148 /// `CREATE SCHEMA IF NOT EXISTS`.
149 pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
150 where
151 T: TryInto<crate::SchemaName>,
152 crate::Error: From<T::Error>,
153 {
154 let schema = schema_name.try_into()?;
155 let sql = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
156 self.connection.execute_command(&sql)?;
157 Ok(())
158 }
159
160 // ============================================================
161 // Query Operations
162 // ============================================================
163
164 /// Returns a list of schema names in the database.
165 ///
166 /// # Arguments
167 ///
168 /// * `database` - The database name, or `None` to use the first database
169 /// in the search path.
170 ///
171 /// # Returns
172 ///
173 /// A vector of schema names.
174 ///
175 /// # Errors
176 ///
177 /// Returns an error if the query fails.
178 pub fn get_schema_names<T>(&self, database: Option<T>) -> Result<Vec<String>>
179 where
180 T: TryInto<crate::DatabaseName>,
181 crate::Error: From<T::Error>,
182 {
183 let database = match database {
184 Some(db) => Some(db.try_into()?),
185 None => None,
186 };
187
188 let query = if let Some(db) = database {
189 format!(
190 "SELECT nspname FROM {db}.pg_catalog.pg_namespace WHERE nspname NOT IN ('pg_catalog', 'pg_temp', 'information_schema')"
191 )
192 } else {
193 "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname NOT IN ('pg_catalog', 'pg_temp', 'information_schema')".to_string()
194 };
195
196 let mut result = self.connection.execute_query(&query)?;
197 let mut names = Vec::new();
198 while let Some(chunk) = result.next_chunk()? {
199 for row in &chunk {
200 if let Some(name) = row.get::<String>(0) {
201 names.push(name);
202 }
203 }
204 }
205 Ok(names)
206 }
207
208 /// Returns a list of table names in the given schema.
209 ///
210 /// # Arguments
211 ///
212 /// * `schema` - The schema name (can include database qualifier).
213 ///
214 /// # Returns
215 ///
216 /// A vector of table names.
217 ///
218 /// # Errors
219 ///
220 /// Returns an error if the query fails.
221 pub fn get_table_names<T>(&self, schema: T) -> Result<Vec<String>>
222 where
223 T: TryInto<crate::SchemaName>,
224 crate::Error: From<T::Error>,
225 {
226 let schema = schema.try_into()?;
227 let db_prefix = if let Some(db) = schema.database() {
228 format!("{db}.")
229 } else {
230 String::new()
231 };
232
233 let query = format!(
234 "SELECT tablename FROM {}pg_catalog.pg_tables WHERE schemaname = '{}'",
235 db_prefix,
236 schema.unescaped().replace('\'', "''")
237 );
238
239 let mut result = self.connection.execute_query(&query)?;
240 let mut names = Vec::new();
241 while let Some(chunk) = result.next_chunk()? {
242 for row in &chunk {
243 if let Some(name) = row.get::<String>(0) {
244 names.push(name);
245 }
246 }
247 }
248 Ok(names)
249 }
250
251 /// Checks whether a schema exists.
252 ///
253 /// # Arguments
254 ///
255 /// * `schema` - The schema name (can include database qualifier).
256 ///
257 /// # Returns
258 ///
259 /// `true` if the schema exists, `false` otherwise.
260 ///
261 /// # Errors
262 ///
263 /// - Returns an error if `schema` cannot be converted to a
264 /// [`SchemaName`](crate::SchemaName).
265 /// - Returns [`Error::Client`] if the `pg_catalog.pg_namespace` lookup
266 /// query fails.
267 pub fn has_schema<T>(&self, schema: T) -> Result<bool>
268 where
269 T: TryInto<crate::SchemaName>,
270 crate::Error: From<T::Error>,
271 {
272 let schema = schema.try_into()?;
273 let db_prefix = if let Some(db) = schema.database() {
274 format!("{db}.")
275 } else {
276 String::new()
277 };
278
279 let query = format!(
280 "SELECT 1 FROM {}pg_catalog.pg_namespace WHERE nspname = '{}'",
281 db_prefix,
282 schema.unescaped().replace('\'', "''")
283 );
284
285 let mut result = self.connection.execute_query(&query)?;
286 if let Some(chunk) = result.next_chunk()? {
287 Ok(!chunk.is_empty())
288 } else {
289 Ok(false)
290 }
291 }
292
293 /// Checks whether a table exists.
294 ///
295 /// # Arguments
296 ///
297 /// * `table_name` - The table name (can include database and schema qualifiers).
298 ///
299 /// # Returns
300 ///
301 /// `true` if the table exists, `false` otherwise.
302 ///
303 /// # Errors
304 ///
305 /// - Returns an error if `table_name` cannot be converted to a
306 /// [`TableName`](crate::TableName).
307 /// - Returns [`Error::Client`] if the `pg_catalog.pg_tables` lookup
308 /// query fails.
309 pub fn has_table<T>(&self, table_name: T) -> Result<bool>
310 where
311 T: TryInto<crate::TableName>,
312 crate::Error: From<T::Error>,
313 {
314 let table_name = table_name.try_into()?;
315 let schema = table_name
316 .schema()
317 .map_or("public", super::names::Name::unescaped);
318 let db_prefix = if let Some(db) = table_name.database() {
319 format!("{db}.")
320 } else {
321 String::new()
322 };
323
324 let query = format!(
325 "SELECT 1 FROM {}pg_catalog.pg_tables WHERE schemaname = '{}' AND tablename = '{}'",
326 db_prefix,
327 schema.replace('\'', "''"),
328 table_name.table().unescaped().replace('\'', "''")
329 );
330
331 let mut result = self.connection.execute_query(&query)?;
332 if let Some(chunk) = result.next_chunk()? {
333 Ok(!chunk.is_empty())
334 } else {
335 Ok(false)
336 }
337 }
338
339 /// Retrieves the table definition for an existing table.
340 ///
341 /// # Arguments
342 ///
343 /// * `table_name` - The table name (can include database and schema qualifiers).
344 ///
345 /// # Returns
346 ///
347 /// A [`TableDefinition`] representing the table's schema.
348 ///
349 /// # Errors
350 ///
351 /// Returns an error if the table does not exist or if retrieval fails.
352 ///
353 /// # Example
354 ///
355 /// ```no_run
356 /// use hyperdb_api::{Connection, Catalog, Result};
357 ///
358 /// fn main() -> Result<()> {
359 /// let conn = Connection::without_database("localhost:7483")?;
360 /// let catalog = Catalog::new(&conn);
361 ///
362 /// let table_def = catalog.get_table_definition("public.products")?;
363 /// println!("Columns: {}", table_def.column_count());
364 /// for col in table_def.columns() {
365 /// println!(" - {}: {}", col.name, col.type_name());
366 /// }
367 /// Ok(())
368 /// }
369 /// ```
370 pub fn get_table_definition<T>(&self, table_name: T) -> Result<TableDefinition>
371 where
372 T: TryInto<crate::TableName>,
373 crate::Error: From<T::Error>,
374 {
375 let table_name = table_name.try_into()?;
376 let schema = table_name
377 .schema()
378 .map_or("public", super::names::Name::unescaped);
379 let table = table_name.table().unescaped();
380
381 // Query column information from pg_catalog
382 // Join pg_attribute with pg_type to get column names, types, and type modifiers
383 let query = if let Some(db) = table_name.database() {
384 format!(
385 r"SELECT a.attname, t.typname, NOT a.attnotnull as is_nullable, a.atttypid, a.atttypmod
386 FROM {db}.pg_catalog.pg_attribute a
387 JOIN {db}.pg_catalog.pg_type t ON a.atttypid = t.oid
388 JOIN {db}.pg_catalog.pg_class c ON a.attrelid = c.oid
389 JOIN {db}.pg_catalog.pg_namespace n ON c.relnamespace = n.oid
390 WHERE n.nspname = '{schema}' AND c.relname = '{table}'
391 AND a.attnum > 0 AND NOT a.attisdropped
392 ORDER BY a.attnum",
393 db = db,
394 schema = schema.replace('\'', "''"),
395 table = table.replace('\'', "''")
396 )
397 } else {
398 format!(
399 r"SELECT a.attname, t.typname, NOT a.attnotnull as is_nullable, a.atttypid, a.atttypmod
400 FROM pg_catalog.pg_attribute a
401 JOIN pg_catalog.pg_type t ON a.atttypid = t.oid
402 JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
403 JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
404 WHERE n.nspname = '{schema}' AND c.relname = '{table}'
405 AND a.attnum > 0 AND NOT a.attisdropped
406 ORDER BY a.attnum",
407 schema = schema.replace('\'', "''"),
408 table = table.replace('\'', "''")
409 )
410 };
411
412 let mut result = self.connection.execute_query(&query)?;
413
414 let mut table_def = TableDefinition::new(table);
415 table_def.schema = Some(schema.to_string());
416 if let Some(db) = table_name.database() {
417 table_def.database = Some(db.unescaped().to_string());
418 }
419
420 let mut found_columns = false;
421 while let Some(chunk) = result.next_chunk()? {
422 for row in &chunk {
423 found_columns = true;
424 let col_name = row.get::<String>(0).unwrap_or_default();
425 let _data_type = row.get::<String>(1).unwrap_or_default();
426 // Hyper returns boolean as binary bool
427 let is_nullable = row.get::<bool>(2).unwrap_or(false);
428
429 // Get type OID and modifier for proper type construction.
430 // Bit-pattern reinterpret: pg_type.oid is transported as Int4 on the
431 // wire but semantically is a u32 OID; this `as u32` recovers the
432 // original bit pattern.
433 #[expect(
434 clippy::cast_sign_loss,
435 reason = "intentional u32 bit-pattern reinterpret of PostgreSQL oid transported as Int4"
436 )]
437 let type_oid = row.get::<i32>(3).unwrap_or(0) as u32;
438 let type_mod = row.get::<i32>(4).unwrap_or(-1);
439
440 // Use OID and modifier to create proper SqlType with precision/scale
441 let sql_type = SqlType::from_oid_and_modifier(type_oid, type_mod);
442 table_def.add_column_with_sql_type(&col_name, sql_type, is_nullable);
443 }
444 }
445
446 if !found_columns {
447 return Err(Error::NotFound(format!("Table {schema}.{table}")));
448 }
449
450 Ok(table_def)
451 }
452
453 // ============================================================
454 // Table Operations
455 // ============================================================
456
457 /// Creates a table from a definition.
458 ///
459 /// # Arguments
460 ///
461 /// * `table_def` - The table definition describing the table to create.
462 ///
463 /// # Errors
464 ///
465 /// Returns an error if the table already exists or if creation fails.
466 pub fn create_table(&self, table_def: &TableDefinition) -> Result<()> {
467 let sql = table_def.to_create_sql(true)?;
468 self.connection.execute_command(&sql)?;
469 Ok(())
470 }
471
472 /// Creates a table from a definition if it doesn't exist.
473 ///
474 /// Unlike [`create_table`](Self::create_table), this method does not fail
475 /// if the table already exists.
476 ///
477 /// # Errors
478 ///
479 /// - Returns [`Error::InvalidTableDefinition`] if `table_def` cannot be
480 /// rendered as valid SQL (zero columns, bad identifiers).
481 /// - Returns [`Error::Client`] if the server rejects
482 /// `CREATE TABLE IF NOT EXISTS`.
483 pub fn create_table_if_not_exists(&self, table_def: &TableDefinition) -> Result<()> {
484 let sql = table_def.to_create_sql(false)?;
485 self.connection.execute_command(&sql)?;
486 Ok(())
487 }
488
489 /// Drops a table.
490 ///
491 /// # Arguments
492 ///
493 /// * `table_name` - The table name (can include database and schema qualifiers).
494 ///
495 /// # Errors
496 ///
497 /// Returns an error if the table doesn't exist or if deletion fails.
498 pub fn drop_table<T>(&self, table_name: T) -> Result<()>
499 where
500 T: TryInto<crate::TableName>,
501 crate::Error: From<T::Error>,
502 {
503 let table_name = table_name.try_into()?;
504 let sql = format!("DROP TABLE {table_name}");
505 self.connection.execute_command(&sql)?;
506 Ok(())
507 }
508
509 /// Drops a table if it exists.
510 ///
511 /// Unlike [`drop_table`](Self::drop_table), this method does not fail
512 /// if the table doesn't exist.
513 ///
514 /// # Errors
515 ///
516 /// - Returns an error if `table_name` cannot be converted to a
517 /// [`TableName`](crate::TableName).
518 /// - Returns [`Error::Client`] if the server rejects
519 /// `DROP TABLE IF EXISTS`.
520 pub fn drop_table_if_exists<T>(&self, table_name: T) -> Result<()>
521 where
522 T: TryInto<crate::TableName>,
523 crate::Error: From<T::Error>,
524 {
525 let table_name = table_name.try_into()?;
526 let sql = format!("DROP TABLE IF EXISTS {table_name}");
527 self.connection.execute_command(&sql)?;
528 Ok(())
529 }
530
531 /// Drops a schema.
532 ///
533 /// # Arguments
534 ///
535 /// * `schema_name` - The schema name (can include database qualifier).
536 /// * `cascade` - If true, drop all objects in the schema.
537 ///
538 /// # Errors
539 ///
540 /// Returns an error if the schema doesn't exist or if deletion fails.
541 pub fn drop_schema<T>(&self, schema_name: T, cascade: bool) -> Result<()>
542 where
543 T: TryInto<crate::SchemaName>,
544 crate::Error: From<T::Error>,
545 {
546 let schema_name = schema_name.try_into()?;
547 let sql = if cascade {
548 format!("DROP SCHEMA {schema_name} CASCADE")
549 } else {
550 format!("DROP SCHEMA {schema_name}")
551 };
552 self.connection.execute_command(&sql)?;
553 Ok(())
554 }
555
556 /// Drops a schema if it exists.
557 ///
558 /// # Errors
559 ///
560 /// - Returns an error if `schema_name` cannot be converted to a
561 /// [`SchemaName`](crate::SchemaName).
562 /// - Returns [`Error::Client`] if the server rejects
563 /// `DROP SCHEMA IF EXISTS` — typically because `cascade` was `false`
564 /// and the schema is not empty.
565 pub fn drop_schema_if_exists<T>(&self, schema_name: T, cascade: bool) -> Result<()>
566 where
567 T: TryInto<crate::SchemaName>,
568 crate::Error: From<T::Error>,
569 {
570 let schema_name = schema_name.try_into()?;
571 let sql = if cascade {
572 format!("DROP SCHEMA IF EXISTS {schema_name} CASCADE")
573 } else {
574 format!("DROP SCHEMA IF EXISTS {schema_name}")
575 };
576 self.connection.execute_command(&sql)?;
577 Ok(())
578 }
579
580 // ============================================================
581 // Metadata Helpers
582 // ============================================================
583
584 /// Returns the approximate row count for a table.
585 ///
586 /// This executes `SELECT COUNT(*) FROM table_name`.
587 ///
588 /// # Example
589 ///
590 /// ```no_run
591 /// # use hyperdb_api::{Connection, Catalog, CreateMode, Result};
592 /// # fn example(conn: &Connection) -> Result<()> {
593 /// let catalog = Catalog::new(&conn);
594 /// let count = catalog.get_row_count("public.users")?;
595 /// println!("Users: {}", count);
596 /// # Ok(())
597 /// # }
598 /// ```
599 ///
600 /// # Errors
601 ///
602 /// - Returns an error if `table_name` cannot be converted to a
603 /// [`TableName`](crate::TableName).
604 /// - Returns [`Error::Client`] if the `SELECT COUNT(*)` query fails
605 /// (e.g. table does not exist).
606 pub fn get_row_count<T>(&self, table_name: T) -> Result<i64>
607 where
608 T: TryInto<crate::TableName>,
609 crate::Error: From<T::Error>,
610 {
611 let table_name = table_name.try_into()?;
612 self.connection
613 .query_count(&format!("SELECT COUNT(*) FROM {table_name}"))
614 }
615
616 /// Returns the column names for a table.
617 ///
618 /// # Example
619 ///
620 /// ```no_run
621 /// # use hyperdb_api::{Connection, Catalog, CreateMode, Result};
622 /// # fn example(conn: &Connection) -> Result<()> {
623 /// let catalog = Catalog::new(&conn);
624 /// let columns = catalog.get_column_names("public.users")?;
625 /// for col in &columns {
626 /// println!("Column: {}", col);
627 /// }
628 /// # Ok(())
629 /// # }
630 /// ```
631 ///
632 /// # Errors
633 ///
634 /// Forwards the error from
635 /// [`get_table_definition`](Self::get_table_definition) — invalid
636 /// `table_name`, missing table, or a failed catalog query.
637 pub fn get_column_names<T>(&self, table_name: T) -> Result<Vec<String>>
638 where
639 T: TryInto<crate::TableName>,
640 crate::Error: From<T::Error>,
641 {
642 let table_def = self.get_table_definition(table_name)?;
643 Ok(table_def.columns().iter().map(|c| c.name.clone()).collect())
644 }
645
646 /// Returns a list of attached database names.
647 ///
648 /// # Example
649 ///
650 /// ```no_run
651 /// # use hyperdb_api::{Connection, Catalog, CreateMode, Result};
652 /// # fn example(conn: &Connection) -> Result<()> {
653 /// let catalog = Catalog::new(&conn);
654 /// let databases = catalog.get_database_names()?;
655 /// for db in &databases {
656 /// println!("Database: {}", db);
657 /// }
658 /// # Ok(())
659 /// # }
660 /// ```
661 ///
662 /// # Errors
663 ///
664 /// Returns [`Error::Client`] if the
665 /// `SELECT datname FROM pg_catalog.pg_database` query fails or a
666 /// streaming error occurs while draining the result.
667 pub fn get_database_names(&self) -> Result<Vec<String>> {
668 let query = "SELECT datname FROM pg_catalog.pg_database";
669 let mut result = self.connection.execute_query(query)?;
670 let mut names = Vec::new();
671 while let Some(chunk) = result.next_chunk()? {
672 for row in &chunk {
673 if let Some(name) = row.get::<String>(0) {
674 names.push(name);
675 }
676 }
677 }
678 Ok(names)
679 }
680}