use crate::pg::connection::PgConnection;
use crate::util::{Result, SchemaError};
use serde::{Deserialize, Serialize};
use sqlx::Row;
/// A database object that the detection queries in this module report as
/// unsupported.
///
/// Every variant carries the names needed to identify the object; see
/// [`UnsupportedObject::qualified_name`] for the dotted form.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnsupportedObject {
    /// A composite type.
    CompositeType {
        /// Schema (namespace) containing the type.
        schema: String,
        /// Name of the type.
        name: String,
    },
    /// An aggregate function.
    Aggregate {
        /// Schema (namespace) containing the aggregate.
        schema: String,
        /// Name of the aggregate.
        name: String,
    },
    /// A rewrite rule attached to a table.
    Rule {
        /// Schema (namespace) of the table the rule belongs to.
        schema: String,
        /// Table the rule is attached to.
        table: String,
        /// Name of the rule.
        name: String,
    },
    /// A table that inherits from another table.
    InheritedTable {
        /// Schema (namespace) containing the table.
        schema: String,
        /// Name of the inheriting table.
        name: String,
    },
    /// A foreign table.
    ForeignTable {
        /// Schema (namespace) containing the foreign table.
        schema: String,
        /// Name of the foreign table.
        name: String,
    },
}
impl UnsupportedObject {
    /// Returns a short, lower-case, human-readable label for the kind of
    /// object this value describes (for example `"composite type"`).
    pub fn kind(&self) -> &'static str {
        match self {
            Self::ForeignTable { .. } => "foreign table",
            Self::InheritedTable { .. } => "inherited table",
            Self::Rule { .. } => "rule",
            Self::Aggregate { .. } => "aggregate",
            Self::CompositeType { .. } => "composite type",
        }
    }

    /// Returns the dot-separated name of the object.
    ///
    /// Rules are rendered as `schema.table.name`; every other kind is
    /// rendered as `schema.name`. The parts are joined verbatim, without any
    /// identifier quoting.
    pub fn qualified_name(&self) -> String {
        match self {
            // Rules are the only kind scoped to a table, so they get a third part.
            Self::Rule {
                schema,
                table,
                name,
            } => [schema.as_str(), table.as_str(), name.as_str()].join("."),
            Self::CompositeType { schema, name }
            | Self::Aggregate { schema, name }
            | Self::InheritedTable { schema, name }
            | Self::ForeignTable { schema, name } => [schema.as_str(), name.as_str()].join("."),
        }
    }
}
/// Collects every unsupported object found in `target_schemas`.
///
/// The individual detectors run one after another, and their results are
/// returned in a fixed order: composite types, aggregates, rules, inherited
/// tables, then foreign tables.
///
/// # Errors
///
/// Returns the error of the first detector that fails; later detectors are
/// not run in that case.
pub async fn detect_unsupported_objects(
    connection: &PgConnection,
    target_schemas: &[String],
) -> Result<Vec<UnsupportedObject>> {
    let composite_types = detect_composite_types(connection, target_schemas).await?;
    let aggregates = detect_aggregates(connection, target_schemas).await?;
    let rules = detect_rules(connection, target_schemas).await?;
    let inherited_tables = detect_inherited_tables(connection, target_schemas).await?;
    let foreign_tables = detect_foreign_tables(connection, target_schemas).await?;

    // Flatten in the same order the detectors ran.
    Ok([
        composite_types,
        aggregates,
        rules,
        inherited_tables,
        foreign_tables,
    ]
    .into_iter()
    .flatten()
    .collect())
}
/// Finds stand-alone composite types in `target_schemas`.
///
/// PostgreSQL gives relations such as tables and views an implicit composite
/// row type in `pg_type`, and those must not be reported here. Rather than
/// enumerating every relation kind to exclude (which previously missed
/// partitioned tables, `relkind = 'p'`), the query keeps only types whose
/// backing `pg_class` entry is itself a composite type (`relkind = 'c'`).
///
/// # Errors
///
/// Returns `SchemaError::DatabaseError` if the catalog query fails.
async fn detect_composite_types(
    connection: &PgConnection,
    target_schemas: &[String],
) -> Result<Vec<UnsupportedObject>> {
    let rows = sqlx::query(
        r#"
SELECT n.nspname, t.typname
FROM pg_type t
JOIN pg_namespace n ON t.typnamespace = n.oid
JOIN pg_class c ON c.oid = t.typrelid
WHERE t.typtype = 'c'
AND c.relkind = 'c'
AND n.nspname = ANY($1)
"#,
    )
    .bind(target_schemas)
    .fetch_all(connection.pool())
    .await
    .map_err(|e| SchemaError::DatabaseError(format!("Failed to detect composite types: {e}")))?;

    Ok(rows
        .into_iter()
        .map(|row| UnsupportedObject::CompositeType {
            schema: row.get("nspname"),
            name: row.get("typname"),
        })
        .collect())
}
/// Finds aggregate functions (`pg_proc.prokind = 'a'`) in `target_schemas`.
///
/// NOTE(review): the query selects one row per `pg_proc` entry, so overloaded
/// aggregates presumably produce several entries with the same name —
/// confirm that this is the desired reporting.
///
/// # Errors
///
/// Returns `SchemaError::DatabaseError` if the catalog query fails.
async fn detect_aggregates(
    connection: &PgConnection,
    target_schemas: &[String],
) -> Result<Vec<UnsupportedObject>> {
    const QUERY: &str = r#"
SELECT n.nspname, p.proname
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE p.prokind = 'a' AND n.nspname = ANY($1)
"#;

    let fetched = sqlx::query(QUERY)
        .bind(target_schemas)
        .fetch_all(connection.pool())
        .await;
    let rows = fetched
        .map_err(|e| SchemaError::DatabaseError(format!("Failed to detect aggregates: {e}")))?;

    let mut aggregates = Vec::with_capacity(rows.len());
    for row in &rows {
        aggregates.push(UnsupportedObject::Aggregate {
            schema: row.get("nspname"),
            name: row.get("proname"),
        });
    }
    Ok(aggregates)
}
/// Finds user-defined rewrite rules on relations in `target_schemas`.
///
/// Rules named exactly `_RETURN` are skipped. The comparison is an exact
/// inequality rather than `NOT LIKE '_RETURN'`: in a `LIKE` pattern the
/// underscore matches any single character, so the old filter also dropped
/// user rules such as `xRETURN`.
///
/// # Errors
///
/// Returns `SchemaError::DatabaseError` if the catalog query fails.
async fn detect_rules(
    connection: &PgConnection,
    target_schemas: &[String],
) -> Result<Vec<UnsupportedObject>> {
    let rows = sqlx::query(
        r#"
SELECT schemaname, tablename, rulename
FROM pg_rules
WHERE schemaname = ANY($1) AND rulename <> '_RETURN'
"#,
    )
    .bind(target_schemas)
    .fetch_all(connection.pool())
    .await
    .map_err(|e| SchemaError::DatabaseError(format!("Failed to detect rules: {e}")))?;

    Ok(rows
        .into_iter()
        .map(|row| UnsupportedObject::Rule {
            schema: row.get("schemaname"),
            table: row.get("tablename"),
            name: row.get("rulename"),
        })
        .collect())
}
/// Finds relations in `target_schemas` that inherit from another relation
/// and are not partitions (`NOT relispartition`).
///
/// Membership in `pg_inherits` is tested with `EXISTS` rather than an inner
/// join, so a table that inherits from several parents is reported once
/// instead of once per parent.
///
/// # Errors
///
/// Returns `SchemaError::DatabaseError` if the catalog query fails.
async fn detect_inherited_tables(
    connection: &PgConnection,
    target_schemas: &[String],
) -> Result<Vec<UnsupportedObject>> {
    let rows = sqlx::query(
        r#"
SELECT n.nspname, c.relname
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = ANY($1)
AND NOT c.relispartition
AND EXISTS (
SELECT 1 FROM pg_inherits i
WHERE i.inhrelid = c.oid
)
"#,
    )
    .bind(target_schemas)
    .fetch_all(connection.pool())
    .await
    .map_err(|e| SchemaError::DatabaseError(format!("Failed to detect inherited tables: {e}")))?;

    Ok(rows
        .into_iter()
        .map(|row| UnsupportedObject::InheritedTable {
            schema: row.get("nspname"),
            name: row.get("relname"),
        })
        .collect())
}
/// Finds foreign tables (`pg_class.relkind = 'f'`) in `target_schemas`.
///
/// # Errors
///
/// Returns `SchemaError::DatabaseError` if the catalog query fails.
async fn detect_foreign_tables(
    connection: &PgConnection,
    target_schemas: &[String],
) -> Result<Vec<UnsupportedObject>> {
    const QUERY: &str = r#"
SELECT n.nspname, c.relname
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE c.relkind = 'f' AND n.nspname = ANY($1)
"#;

    let fetched = sqlx::query(QUERY)
        .bind(target_schemas)
        .fetch_all(connection.pool())
        .await;
    let rows = fetched
        .map_err(|e| SchemaError::DatabaseError(format!("Failed to detect foreign tables: {e}")))?;

    let mut foreign_tables = Vec::with_capacity(rows.len());
    for row in &rows {
        foreign_tables.push(UnsupportedObject::ForeignTable {
            schema: row.get("nspname"),
            name: row.get("relname"),
        });
    }
    Ok(foreign_tables)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `address` composite-type fixture in the given schema.
    fn address_type(schema: &str) -> UnsupportedObject {
        UnsupportedObject::CompositeType {
            schema: schema.to_owned(),
            name: "address".to_owned(),
        }
    }

    /// Builds the `public.users.protect_users` rule fixture.
    fn protect_users_rule() -> UnsupportedObject {
        UnsupportedObject::Rule {
            schema: "public".to_owned(),
            table: "users".to_owned(),
            name: "protect_users".to_owned(),
        }
    }

    #[test]
    fn unsupported_object_kind() {
        assert_eq!(address_type("public").kind(), "composite type");
        assert_eq!(protect_users_rule().kind(), "rule");
    }

    #[test]
    fn unsupported_object_qualified_name() {
        assert_eq!(
            address_type("analytics").qualified_name(),
            "analytics.address"
        );
        assert_eq!(
            protect_users_rule().qualified_name(),
            "public.users.protect_users"
        );
    }
}