Skip to main content

hyperdb_api/
catalog.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database catalog operations.
5//!
6//! The `Catalog` struct provides methods for working with database metadata,
7//! including creating and dropping databases, schemas, and tables.
8//!
9//! # SQL Injection Prevention
10//!
11//! All catalog methods use SQL identifier and literal escaping to prevent
12//! SQL injection attacks:
13//!
14//! - Identifiers (database names, schema names, table names) are quoted with
15//!   double quotes and internal quotes are escaped (e.g., `"` → `""`)
16//! - String literals (comparison values) are quoted with single quotes and
17//!   internal quotes are escaped (e.g., `'` → `''`)
18//!
19//! While this provides protection against basic SQL injection, parameterized
20//! queries would be more robust. The escaping methods used are:
21//!
22//! - `name.replace('"', "\"\"")` for identifiers
23//! - `value.replace('\'', "''")` for literals
24//!
25//! **Note**: User-provided names should still be validated against expected
26//! patterns when possible, as a defense-in-depth measure.
27
28use crate::connection::Connection;
29use crate::error::{Error, Result};
30use crate::table_definition::TableDefinition;
31use hyperdb_api_core::types::SqlType;
32
33/// Provides catalog operations for database metadata.
34///
35/// # Example
36///
37/// ```no_run
38/// use hyperdb_api::{Connection, Catalog, CreateMode, Result};
39///
40/// fn main() -> Result<()> {
41///     let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
42///     let catalog = Catalog::new(&conn);
43///
44///     // Check if a schema exists
45///     if !catalog.has_schema("my_schema")? {
46///         catalog.create_schema("my_schema")?;
47///     }
48///
49///     // List tables
50///     let tables = catalog.get_table_names("my_schema")?;
51///     for table in tables {
52///         println!("Table: {}", table);
53///     }
54///     Ok(())
55/// }
56/// ```
57#[derive(Debug)]
58pub struct Catalog<'conn> {
59    connection: &'conn Connection,
60}
61
62impl<'conn> Catalog<'conn> {
63    /// Creates a new Catalog for the given connection.
64    pub fn new(connection: &'conn Connection) -> Self {
65        Catalog { connection }
66    }
67
68    // ============================================================
69    // Database Operations
70    // ============================================================
71
72    /// Creates a new database file (delegates to Connection).
73    ///
74    /// # Errors
75    ///
76    /// Forwards the error from [`Connection::create_database`].
77    pub fn create_database(&self, path: &str) -> Result<()> {
78        self.connection.create_database(path)
79    }
80
81    /// Drops (deletes) a database file (delegates to Connection).
82    ///
83    /// # Errors
84    ///
85    /// Forwards the error from [`Connection::drop_database`].
86    pub fn drop_database(&self, path: &str) -> Result<()> {
87        self.connection.drop_database(path)
88    }
89
90    /// Attaches a database file to the connection.
91    ///
92    /// Once attached, the database can be queried and modified.
93    /// The database is identified by its alias (or by its path if no alias is provided).
94    ///
95    /// # Arguments
96    ///
97    /// * `path` - The path to the database file to attach.
98    /// * `alias` - Optional alias for the database. If `None`, the database is
99    ///   attached without an explicit alias (typically using its filename).
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if the database file doesn't exist or if attachment fails.
104    pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
105        self.connection.attach_database(path, alias)
106    }
107
108    /// Detaches a database from the connection.
109    ///
110    /// After detaching, the database file is released and can be accessed
111    /// externally (e.g., copied, moved, etc.). All pending updates are
112    /// written to disk before detaching.
113    ///
114    /// # Arguments
115    ///
116    /// * `alias` - The alias of the database to detach.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the database is not attached or if detachment fails.
121    pub fn detach_database(&self, alias: &str) -> Result<()> {
122        self.connection.detach_database(alias)
123    }
124
125    /// Detaches all databases from the connection.
126    ///
127    /// This is useful for cleanup before closing a connection or when
128    /// you need to release all database files.
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if the databases could not be detached.
133    pub fn detach_all_databases(&self) -> Result<()> {
134        self.connection.detach_all_databases()
135    }
136
137    // ============================================================
138    // Schema Operations
139    // ============================================================
140
141    /// Creates a schema.
142    ///
143    /// # Errors
144    ///
145    /// - Returns an error if `schema_name` cannot be converted to a
146    ///   [`SchemaName`](crate::SchemaName).
147    /// - Returns [`Error::Client`] if the server rejects
148    ///   `CREATE SCHEMA IF NOT EXISTS`.
149    pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
150    where
151        T: TryInto<crate::SchemaName>,
152        crate::Error: From<T::Error>,
153    {
154        let schema = schema_name.try_into()?;
155        let sql = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
156        self.connection.execute_command(&sql)?;
157        Ok(())
158    }
159
160    // ============================================================
161    // Query Operations
162    // ============================================================
163
164    /// Returns a list of schema names in the database.
165    ///
166    /// # Arguments
167    ///
168    /// * `database` - The database name, or `None` to use the first database
169    ///   in the search path.
170    ///
171    /// # Returns
172    ///
173    /// A vector of schema names.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the query fails.
178    pub fn get_schema_names<T>(&self, database: Option<T>) -> Result<Vec<String>>
179    where
180        T: TryInto<crate::DatabaseName>,
181        crate::Error: From<T::Error>,
182    {
183        let database = match database {
184            Some(db) => Some(db.try_into()?),
185            None => None,
186        };
187
188        let query = if let Some(db) = database {
189            format!(
190                "SELECT nspname FROM {db}.pg_catalog.pg_namespace WHERE nspname NOT IN ('pg_catalog', 'pg_temp', 'information_schema')"
191            )
192        } else {
193            "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname NOT IN ('pg_catalog', 'pg_temp', 'information_schema')".to_string()
194        };
195
196        let mut result = self.connection.execute_query(&query)?;
197        let mut names = Vec::new();
198        while let Some(chunk) = result.next_chunk()? {
199            for row in &chunk {
200                if let Some(name) = row.get::<String>(0) {
201                    names.push(name);
202                }
203            }
204        }
205        Ok(names)
206    }
207
208    /// Returns a list of table names in the given schema.
209    ///
210    /// # Arguments
211    ///
212    /// * `schema` - The schema name (can include database qualifier).
213    ///
214    /// # Returns
215    ///
216    /// A vector of table names.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the query fails.
221    pub fn get_table_names<T>(&self, schema: T) -> Result<Vec<String>>
222    where
223        T: TryInto<crate::SchemaName>,
224        crate::Error: From<T::Error>,
225    {
226        let schema = schema.try_into()?;
227        let db_prefix = if let Some(db) = schema.database() {
228            format!("{db}.")
229        } else {
230            String::new()
231        };
232
233        let query = format!(
234            "SELECT tablename FROM {}pg_catalog.pg_tables WHERE schemaname = '{}'",
235            db_prefix,
236            schema.unescaped().replace('\'', "''")
237        );
238
239        let mut result = self.connection.execute_query(&query)?;
240        let mut names = Vec::new();
241        while let Some(chunk) = result.next_chunk()? {
242            for row in &chunk {
243                if let Some(name) = row.get::<String>(0) {
244                    names.push(name);
245                }
246            }
247        }
248        Ok(names)
249    }
250
251    /// Checks whether a schema exists.
252    ///
253    /// # Arguments
254    ///
255    /// * `schema` - The schema name (can include database qualifier).
256    ///
257    /// # Returns
258    ///
259    /// `true` if the schema exists, `false` otherwise.
260    ///
261    /// # Errors
262    ///
263    /// - Returns an error if `schema` cannot be converted to a
264    ///   [`SchemaName`](crate::SchemaName).
265    /// - Returns [`Error::Client`] if the `pg_catalog.pg_namespace` lookup
266    ///   query fails.
267    pub fn has_schema<T>(&self, schema: T) -> Result<bool>
268    where
269        T: TryInto<crate::SchemaName>,
270        crate::Error: From<T::Error>,
271    {
272        let schema = schema.try_into()?;
273        let db_prefix = if let Some(db) = schema.database() {
274            format!("{db}.")
275        } else {
276            String::new()
277        };
278
279        let query = format!(
280            "SELECT 1 FROM {}pg_catalog.pg_namespace WHERE nspname = '{}'",
281            db_prefix,
282            schema.unescaped().replace('\'', "''")
283        );
284
285        let mut result = self.connection.execute_query(&query)?;
286        if let Some(chunk) = result.next_chunk()? {
287            Ok(!chunk.is_empty())
288        } else {
289            Ok(false)
290        }
291    }
292
293    /// Checks whether a table exists.
294    ///
295    /// # Arguments
296    ///
297    /// * `table_name` - The table name (can include database and schema qualifiers).
298    ///
299    /// # Returns
300    ///
301    /// `true` if the table exists, `false` otherwise.
302    ///
303    /// # Errors
304    ///
305    /// - Returns an error if `table_name` cannot be converted to a
306    ///   [`TableName`](crate::TableName).
307    /// - Returns [`Error::Client`] if the `pg_catalog.pg_tables` lookup
308    ///   query fails.
309    pub fn has_table<T>(&self, table_name: T) -> Result<bool>
310    where
311        T: TryInto<crate::TableName>,
312        crate::Error: From<T::Error>,
313    {
314        let table_name = table_name.try_into()?;
315        let schema = table_name
316            .schema()
317            .map_or("public", super::names::Name::unescaped);
318        let db_prefix = if let Some(db) = table_name.database() {
319            format!("{db}.")
320        } else {
321            String::new()
322        };
323
324        let query = format!(
325            "SELECT 1 FROM {}pg_catalog.pg_tables WHERE schemaname = '{}' AND tablename = '{}'",
326            db_prefix,
327            schema.replace('\'', "''"),
328            table_name.table().unescaped().replace('\'', "''")
329        );
330
331        let mut result = self.connection.execute_query(&query)?;
332        if let Some(chunk) = result.next_chunk()? {
333            Ok(!chunk.is_empty())
334        } else {
335            Ok(false)
336        }
337    }
338
339    /// Retrieves the table definition for an existing table.
340    ///
341    /// # Arguments
342    ///
343    /// * `table_name` - The table name (can include database and schema qualifiers).
344    ///
345    /// # Returns
346    ///
347    /// A [`TableDefinition`] representing the table's schema.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if the table does not exist or if retrieval fails.
352    ///
353    /// # Example
354    ///
355    /// ```no_run
356    /// use hyperdb_api::{Connection, Catalog, Result};
357    ///
358    /// fn main() -> Result<()> {
359    ///     let conn = Connection::without_database("localhost:7483")?;
360    ///     let catalog = Catalog::new(&conn);
361    ///
362    ///     let table_def = catalog.get_table_definition("public.products")?;
363    ///     println!("Columns: {}", table_def.column_count());
364    ///     for col in table_def.columns() {
365    ///         println!("  - {}: {}", col.name, col.type_name());
366    ///     }
367    ///     Ok(())
368    /// }
369    /// ```
370    pub fn get_table_definition<T>(&self, table_name: T) -> Result<TableDefinition>
371    where
372        T: TryInto<crate::TableName>,
373        crate::Error: From<T::Error>,
374    {
375        let table_name = table_name.try_into()?;
376        let schema = table_name
377            .schema()
378            .map_or("public", super::names::Name::unescaped);
379        let table = table_name.table().unescaped();
380
381        // Query column information from pg_catalog
382        // Join pg_attribute with pg_type to get column names, types, and type modifiers
383        let query = if let Some(db) = table_name.database() {
384            format!(
385                r"SELECT a.attname, t.typname, NOT a.attnotnull as is_nullable, a.atttypid, a.atttypmod
386                 FROM {db}.pg_catalog.pg_attribute a
387                 JOIN {db}.pg_catalog.pg_type t ON a.atttypid = t.oid
388                 JOIN {db}.pg_catalog.pg_class c ON a.attrelid = c.oid
389                 JOIN {db}.pg_catalog.pg_namespace n ON c.relnamespace = n.oid
390                 WHERE n.nspname = '{schema}' AND c.relname = '{table}'
391                   AND a.attnum > 0 AND NOT a.attisdropped
392                 ORDER BY a.attnum",
393                db = db,
394                schema = schema.replace('\'', "''"),
395                table = table.replace('\'', "''")
396            )
397        } else {
398            format!(
399                r"SELECT a.attname, t.typname, NOT a.attnotnull as is_nullable, a.atttypid, a.atttypmod
400                 FROM pg_catalog.pg_attribute a
401                 JOIN pg_catalog.pg_type t ON a.atttypid = t.oid
402                 JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
403                 JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
404                 WHERE n.nspname = '{schema}' AND c.relname = '{table}'
405                   AND a.attnum > 0 AND NOT a.attisdropped
406                 ORDER BY a.attnum",
407                schema = schema.replace('\'', "''"),
408                table = table.replace('\'', "''")
409            )
410        };
411
412        let mut result = self.connection.execute_query(&query)?;
413
414        let mut table_def = TableDefinition::new(table);
415        table_def.schema = Some(schema.to_string());
416        if let Some(db) = table_name.database() {
417            table_def.database = Some(db.unescaped().to_string());
418        }
419
420        let mut found_columns = false;
421        while let Some(chunk) = result.next_chunk()? {
422            for row in &chunk {
423                found_columns = true;
424                let col_name = row.get::<String>(0).unwrap_or_default();
425                let _data_type = row.get::<String>(1).unwrap_or_default();
426                // Hyper returns boolean as binary bool
427                let is_nullable = row.get::<bool>(2).unwrap_or(false);
428
429                // Get type OID and modifier for proper type construction.
430                // Bit-pattern reinterpret: pg_type.oid is transported as Int4 on the
431                // wire but semantically is a u32 OID; this `as u32` recovers the
432                // original bit pattern.
433                #[expect(
434                    clippy::cast_sign_loss,
435                    reason = "intentional u32 bit-pattern reinterpret of PostgreSQL oid transported as Int4"
436                )]
437                let type_oid = row.get::<i32>(3).unwrap_or(0) as u32;
438                let type_mod = row.get::<i32>(4).unwrap_or(-1);
439
440                // Use OID and modifier to create proper SqlType with precision/scale
441                let sql_type = SqlType::from_oid_and_modifier(type_oid, type_mod);
442                table_def.add_column_with_sql_type(&col_name, sql_type, is_nullable);
443            }
444        }
445
446        if !found_columns {
447            return Err(Error::NotFound(format!("Table {schema}.{table}")));
448        }
449
450        Ok(table_def)
451    }
452
453    // ============================================================
454    // Table Operations
455    // ============================================================
456
457    /// Creates a table from a definition.
458    ///
459    /// # Arguments
460    ///
461    /// * `table_def` - The table definition describing the table to create.
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the table already exists or if creation fails.
466    pub fn create_table(&self, table_def: &TableDefinition) -> Result<()> {
467        let sql = table_def.to_create_sql(true)?;
468        self.connection.execute_command(&sql)?;
469        Ok(())
470    }
471
472    /// Creates a table from a definition if it doesn't exist.
473    ///
474    /// Unlike [`create_table`](Self::create_table), this method does not fail
475    /// if the table already exists.
476    ///
477    /// # Errors
478    ///
479    /// - Returns [`Error::InvalidTableDefinition`] if `table_def` cannot be
480    ///   rendered as valid SQL (zero columns, bad identifiers).
481    /// - Returns [`Error::Client`] if the server rejects
482    ///   `CREATE TABLE IF NOT EXISTS`.
483    pub fn create_table_if_not_exists(&self, table_def: &TableDefinition) -> Result<()> {
484        let sql = table_def.to_create_sql(false)?;
485        self.connection.execute_command(&sql)?;
486        Ok(())
487    }
488
489    /// Drops a table.
490    ///
491    /// # Arguments
492    ///
493    /// * `table_name` - The table name (can include database and schema qualifiers).
494    ///
495    /// # Errors
496    ///
497    /// Returns an error if the table doesn't exist or if deletion fails.
498    pub fn drop_table<T>(&self, table_name: T) -> Result<()>
499    where
500        T: TryInto<crate::TableName>,
501        crate::Error: From<T::Error>,
502    {
503        let table_name = table_name.try_into()?;
504        let sql = format!("DROP TABLE {table_name}");
505        self.connection.execute_command(&sql)?;
506        Ok(())
507    }
508
509    /// Drops a table if it exists.
510    ///
511    /// Unlike [`drop_table`](Self::drop_table), this method does not fail
512    /// if the table doesn't exist.
513    ///
514    /// # Errors
515    ///
516    /// - Returns an error if `table_name` cannot be converted to a
517    ///   [`TableName`](crate::TableName).
518    /// - Returns [`Error::Client`] if the server rejects
519    ///   `DROP TABLE IF EXISTS`.
520    pub fn drop_table_if_exists<T>(&self, table_name: T) -> Result<()>
521    where
522        T: TryInto<crate::TableName>,
523        crate::Error: From<T::Error>,
524    {
525        let table_name = table_name.try_into()?;
526        let sql = format!("DROP TABLE IF EXISTS {table_name}");
527        self.connection.execute_command(&sql)?;
528        Ok(())
529    }
530
531    /// Drops a schema.
532    ///
533    /// # Arguments
534    ///
535    /// * `schema_name` - The schema name (can include database qualifier).
536    /// * `cascade` - If true, drop all objects in the schema.
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if the schema doesn't exist or if deletion fails.
541    pub fn drop_schema<T>(&self, schema_name: T, cascade: bool) -> Result<()>
542    where
543        T: TryInto<crate::SchemaName>,
544        crate::Error: From<T::Error>,
545    {
546        let schema_name = schema_name.try_into()?;
547        let sql = if cascade {
548            format!("DROP SCHEMA {schema_name} CASCADE")
549        } else {
550            format!("DROP SCHEMA {schema_name}")
551        };
552        self.connection.execute_command(&sql)?;
553        Ok(())
554    }
555
556    /// Drops a schema if it exists.
557    ///
558    /// # Errors
559    ///
560    /// - Returns an error if `schema_name` cannot be converted to a
561    ///   [`SchemaName`](crate::SchemaName).
562    /// - Returns [`Error::Client`] if the server rejects
563    ///   `DROP SCHEMA IF EXISTS` — typically because `cascade` was `false`
564    ///   and the schema is not empty.
565    pub fn drop_schema_if_exists<T>(&self, schema_name: T, cascade: bool) -> Result<()>
566    where
567        T: TryInto<crate::SchemaName>,
568        crate::Error: From<T::Error>,
569    {
570        let schema_name = schema_name.try_into()?;
571        let sql = if cascade {
572            format!("DROP SCHEMA IF EXISTS {schema_name} CASCADE")
573        } else {
574            format!("DROP SCHEMA IF EXISTS {schema_name}")
575        };
576        self.connection.execute_command(&sql)?;
577        Ok(())
578    }
579
580    // ============================================================
581    // Metadata Helpers
582    // ============================================================
583
584    /// Returns the approximate row count for a table.
585    ///
586    /// This executes `SELECT COUNT(*) FROM table_name`.
587    ///
588    /// # Example
589    ///
590    /// ```no_run
591    /// # use hyperdb_api::{Connection, Catalog, CreateMode, Result};
592    /// # fn example(conn: &Connection) -> Result<()> {
593    /// let catalog = Catalog::new(&conn);
594    /// let count = catalog.get_row_count("public.users")?;
595    /// println!("Users: {}", count);
596    /// # Ok(())
597    /// # }
598    /// ```
599    ///
600    /// # Errors
601    ///
602    /// - Returns an error if `table_name` cannot be converted to a
603    ///   [`TableName`](crate::TableName).
604    /// - Returns [`Error::Client`] if the `SELECT COUNT(*)` query fails
605    ///   (e.g. table does not exist).
606    pub fn get_row_count<T>(&self, table_name: T) -> Result<i64>
607    where
608        T: TryInto<crate::TableName>,
609        crate::Error: From<T::Error>,
610    {
611        let table_name = table_name.try_into()?;
612        self.connection
613            .query_count(&format!("SELECT COUNT(*) FROM {table_name}"))
614    }
615
616    /// Returns the column names for a table.
617    ///
618    /// # Example
619    ///
620    /// ```no_run
621    /// # use hyperdb_api::{Connection, Catalog, CreateMode, Result};
622    /// # fn example(conn: &Connection) -> Result<()> {
623    /// let catalog = Catalog::new(&conn);
624    /// let columns = catalog.get_column_names("public.users")?;
625    /// for col in &columns {
626    ///     println!("Column: {}", col);
627    /// }
628    /// # Ok(())
629    /// # }
630    /// ```
631    ///
632    /// # Errors
633    ///
634    /// Forwards the error from
635    /// [`get_table_definition`](Self::get_table_definition) — invalid
636    /// `table_name`, missing table, or a failed catalog query.
637    pub fn get_column_names<T>(&self, table_name: T) -> Result<Vec<String>>
638    where
639        T: TryInto<crate::TableName>,
640        crate::Error: From<T::Error>,
641    {
642        let table_def = self.get_table_definition(table_name)?;
643        Ok(table_def.columns().iter().map(|c| c.name.clone()).collect())
644    }
645
646    /// Returns a list of attached database names.
647    ///
648    /// # Example
649    ///
650    /// ```no_run
651    /// # use hyperdb_api::{Connection, Catalog, CreateMode, Result};
652    /// # fn example(conn: &Connection) -> Result<()> {
653    /// let catalog = Catalog::new(&conn);
654    /// let databases = catalog.get_database_names()?;
655    /// for db in &databases {
656    ///     println!("Database: {}", db);
657    /// }
658    /// # Ok(())
659    /// # }
660    /// ```
661    ///
662    /// # Errors
663    ///
664    /// Returns [`Error::Client`] if the
665    /// `SELECT datname FROM pg_catalog.pg_database` query fails or a
666    /// streaming error occurs while draining the result.
667    pub fn get_database_names(&self) -> Result<Vec<String>> {
668        let query = "SELECT datname FROM pg_catalog.pg_database";
669        let mut result = self.connection.execute_query(query)?;
670        let mut names = Vec::new();
671        while let Some(chunk) = result.next_chunk()? {
672            for row in &chunk {
673                if let Some(name) = row.get::<String>(0) {
674                    names.push(name);
675                }
676            }
677        }
678        Ok(names)
679    }
680}