use crate::connection::Connection;
use crate::error::{Error, Result};
use crate::table_definition::TableDefinition;
use hyperdb_api_core::types::SqlType;
#[derive(Debug)]
pub struct Catalog<'conn> {
connection: &'conn Connection,
}
impl<'conn> Catalog<'conn> {
pub fn new(connection: &'conn Connection) -> Self {
Catalog { connection }
}
pub fn create_database(&self, path: &str) -> Result<()> {
self.connection.create_database(path)
}
pub fn drop_database(&self, path: &str) -> Result<()> {
self.connection.drop_database(path)
}
pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
self.connection.attach_database(path, alias)
}
pub fn detach_database(&self, alias: &str) -> Result<()> {
self.connection.detach_database(alias)
}
pub fn detach_all_databases(&self) -> Result<()> {
self.connection.detach_all_databases()
}
pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
where
T: TryInto<crate::SchemaName>,
crate::Error: From<T::Error>,
{
let schema = schema_name.try_into()?;
let sql = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
self.connection.execute_command(&sql)?;
Ok(())
}
pub fn get_schema_names<T>(&self, database: Option<T>) -> Result<Vec<String>>
where
T: TryInto<crate::DatabaseName>,
crate::Error: From<T::Error>,
{
let database = match database {
Some(db) => Some(db.try_into()?),
None => None,
};
let query = if let Some(db) = database {
format!(
"SELECT nspname FROM {db}.pg_catalog.pg_namespace WHERE nspname NOT IN ('pg_catalog', 'pg_temp', 'information_schema')"
)
} else {
"SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname NOT IN ('pg_catalog', 'pg_temp', 'information_schema')".to_string()
};
let mut result = self.connection.execute_query(&query)?;
let mut names = Vec::new();
while let Some(chunk) = result.next_chunk()? {
for row in &chunk {
if let Some(name) = row.get::<String>(0) {
names.push(name);
}
}
}
Ok(names)
}
pub fn get_table_names<T>(&self, schema: T) -> Result<Vec<String>>
where
T: TryInto<crate::SchemaName>,
crate::Error: From<T::Error>,
{
let schema = schema.try_into()?;
let db_prefix = if let Some(db) = schema.database() {
format!("{db}.")
} else {
String::new()
};
let query = format!(
"SELECT tablename FROM {}pg_catalog.pg_tables WHERE schemaname = '{}'",
db_prefix,
schema.unescaped().replace('\'', "''")
);
let mut result = self.connection.execute_query(&query)?;
let mut names = Vec::new();
while let Some(chunk) = result.next_chunk()? {
for row in &chunk {
if let Some(name) = row.get::<String>(0) {
names.push(name);
}
}
}
Ok(names)
}
pub fn has_schema<T>(&self, schema: T) -> Result<bool>
where
T: TryInto<crate::SchemaName>,
crate::Error: From<T::Error>,
{
let schema = schema.try_into()?;
let db_prefix = if let Some(db) = schema.database() {
format!("{db}.")
} else {
String::new()
};
let query = format!(
"SELECT 1 FROM {}pg_catalog.pg_namespace WHERE nspname = '{}'",
db_prefix,
schema.unescaped().replace('\'', "''")
);
let mut result = self.connection.execute_query(&query)?;
if let Some(chunk) = result.next_chunk()? {
Ok(!chunk.is_empty())
} else {
Ok(false)
}
}
pub fn has_table<T>(&self, table_name: T) -> Result<bool>
where
T: TryInto<crate::TableName>,
crate::Error: From<T::Error>,
{
let table_name = table_name.try_into()?;
let schema = table_name
.schema()
.map_or("public", super::names::Name::unescaped);
let db_prefix = if let Some(db) = table_name.database() {
format!("{db}.")
} else {
String::new()
};
let query = format!(
"SELECT 1 FROM {}pg_catalog.pg_tables WHERE schemaname = '{}' AND tablename = '{}'",
db_prefix,
schema.replace('\'', "''"),
table_name.table().unescaped().replace('\'', "''")
);
let mut result = self.connection.execute_query(&query)?;
if let Some(chunk) = result.next_chunk()? {
Ok(!chunk.is_empty())
} else {
Ok(false)
}
}
pub fn get_table_definition<T>(&self, table_name: T) -> Result<TableDefinition>
where
T: TryInto<crate::TableName>,
crate::Error: From<T::Error>,
{
let table_name = table_name.try_into()?;
let schema = table_name
.schema()
.map_or("public", super::names::Name::unescaped);
let table = table_name.table().unescaped();
let query = if let Some(db) = table_name.database() {
format!(
r"SELECT a.attname, t.typname, NOT a.attnotnull as is_nullable, a.atttypid, a.atttypmod
FROM {db}.pg_catalog.pg_attribute a
JOIN {db}.pg_catalog.pg_type t ON a.atttypid = t.oid
JOIN {db}.pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN {db}.pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = '{schema}' AND c.relname = '{table}'
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum",
db = db,
schema = schema.replace('\'', "''"),
table = table.replace('\'', "''")
)
} else {
format!(
r"SELECT a.attname, t.typname, NOT a.attnotnull as is_nullable, a.atttypid, a.atttypmod
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_type t ON a.atttypid = t.oid
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = '{schema}' AND c.relname = '{table}'
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum",
schema = schema.replace('\'', "''"),
table = table.replace('\'', "''")
)
};
let mut result = self.connection.execute_query(&query)?;
let mut table_def = TableDefinition::new(table);
table_def.schema = Some(schema.to_string());
if let Some(db) = table_name.database() {
table_def.database = Some(db.unescaped().to_string());
}
let mut found_columns = false;
while let Some(chunk) = result.next_chunk()? {
for row in &chunk {
found_columns = true;
let col_name = row.get::<String>(0).unwrap_or_default();
let _data_type = row.get::<String>(1).unwrap_or_default();
let is_nullable = row.get::<bool>(2).unwrap_or(false);
#[expect(
clippy::cast_sign_loss,
reason = "intentional u32 bit-pattern reinterpret of PostgreSQL oid transported as Int4"
)]
let type_oid = row.get::<i32>(3).unwrap_or(0) as u32;
let type_mod = row.get::<i32>(4).unwrap_or(-1);
let sql_type = SqlType::from_oid_and_modifier(type_oid, type_mod);
table_def.add_column_with_sql_type(&col_name, sql_type, is_nullable);
}
}
if !found_columns {
return Err(Error::NotFound(format!("Table {schema}.{table}")));
}
Ok(table_def)
}
pub fn create_table(&self, table_def: &TableDefinition) -> Result<()> {
let sql = table_def.to_create_sql(true)?;
self.connection.execute_command(&sql)?;
Ok(())
}
pub fn create_table_if_not_exists(&self, table_def: &TableDefinition) -> Result<()> {
let sql = table_def.to_create_sql(false)?;
self.connection.execute_command(&sql)?;
Ok(())
}
pub fn drop_table<T>(&self, table_name: T) -> Result<()>
where
T: TryInto<crate::TableName>,
crate::Error: From<T::Error>,
{
let table_name = table_name.try_into()?;
let sql = format!("DROP TABLE {table_name}");
self.connection.execute_command(&sql)?;
Ok(())
}
pub fn drop_table_if_exists<T>(&self, table_name: T) -> Result<()>
where
T: TryInto<crate::TableName>,
crate::Error: From<T::Error>,
{
let table_name = table_name.try_into()?;
let sql = format!("DROP TABLE IF EXISTS {table_name}");
self.connection.execute_command(&sql)?;
Ok(())
}
pub fn drop_schema<T>(&self, schema_name: T, cascade: bool) -> Result<()>
where
T: TryInto<crate::SchemaName>,
crate::Error: From<T::Error>,
{
let schema_name = schema_name.try_into()?;
let sql = if cascade {
format!("DROP SCHEMA {schema_name} CASCADE")
} else {
format!("DROP SCHEMA {schema_name}")
};
self.connection.execute_command(&sql)?;
Ok(())
}
pub fn drop_schema_if_exists<T>(&self, schema_name: T, cascade: bool) -> Result<()>
where
T: TryInto<crate::SchemaName>,
crate::Error: From<T::Error>,
{
let schema_name = schema_name.try_into()?;
let sql = if cascade {
format!("DROP SCHEMA IF EXISTS {schema_name} CASCADE")
} else {
format!("DROP SCHEMA IF EXISTS {schema_name}")
};
self.connection.execute_command(&sql)?;
Ok(())
}
pub fn get_row_count<T>(&self, table_name: T) -> Result<i64>
where
T: TryInto<crate::TableName>,
crate::Error: From<T::Error>,
{
let table_name = table_name.try_into()?;
self.connection
.query_count(&format!("SELECT COUNT(*) FROM {table_name}"))
}
pub fn get_column_names<T>(&self, table_name: T) -> Result<Vec<String>>
where
T: TryInto<crate::TableName>,
crate::Error: From<T::Error>,
{
let table_def = self.get_table_definition(table_name)?;
Ok(table_def.columns().iter().map(|c| c.name.clone()).collect())
}
pub fn get_database_names(&self) -> Result<Vec<String>> {
let query = "SELECT datname FROM pg_catalog.pg_database";
let mut result = self.connection.execute_query(query)?;
let mut names = Vec::new();
while let Some(chunk) = result.next_chunk()? {
for row in &chunk {
if let Some(name) = row.get::<String>(0) {
names.push(name);
}
}
}
Ok(names)
}
}