use std::str::FromStr;
use sqlx::postgres::{PgConnectOptions, PgRow};
use sqlx::{Connection, PgConnection, Row};
use crate::store::postgres_backend::{
application_name_for_with_namespace, mask_dsn_pub, resolve_dsn, StoreError,
};
use crate::store_introspect::{
build_manifest_store, IntrospectionRow, OmittedColumn,
};
use crate::store_schema_manifest::Manifest;
/// Namespace handed to `application_name_for_with_namespace` when building the
/// `application_name` of connections opened for introspection.
pub const INTROSPECT_NAMESPACE: &str = "introspect";
pub async fn introspect_store(
connection: &str,
store_name: &str,
) -> Result<(Manifest, Vec<OmittedColumn>), StoreError> {
let dsn = resolve_dsn(connection)?;
let mut conn = open_introspection_connection(&dsn, store_name).await?;
let resolved_schema = resolve_table_schema(&mut conn, store_name)
.await?
.ok_or_else(|| StoreError::TableNotResolved {
table: store_name.to_string(),
})?;
let rows = fetch_introspection_rows(&mut conn, &resolved_schema, store_name).await?;
if rows.is_empty() {
return Err(StoreError::TableNotResolved {
table: store_name.to_string(),
});
}
let (manifest_store, omissions) = build_manifest_store(&rows);
let mut manifest = Manifest::new();
let qualified = format!("{resolved_schema}.{store_name}");
manifest.stores.insert(qualified, manifest_store);
manifest.refresh_content_hash();
Ok((manifest, omissions))
}
/// Introspects every table named in `store_names` and folds the results into
/// one [`Manifest`].
///
/// Tables are processed sequentially, in slice order, each through
/// [`introspect_store`]. Stores are inserted into the combined manifest under
/// the keys the per-table manifests used; a later table with the same key
/// replaces an earlier one. Omitted columns are concatenated in processing
/// order. The content hash is refreshed once, after all tables are merged.
///
/// # Errors
///
/// Returns the first error produced by [`introspect_store`]; tables after the
/// failing one are not visited.
pub async fn introspect_stores(
    connection: &str,
    store_names: &[String],
) -> Result<(Manifest, Vec<OmittedColumn>), StoreError> {
    let mut combined = Manifest::new();
    let mut omitted = Vec::<OmittedColumn>::new();
    for store_name in store_names.iter() {
        let (single, mut skipped) = introspect_store(connection, store_name).await?;
        single.stores.into_iter().for_each(|(qualified, store)| {
            combined.stores.insert(qualified, store);
        });
        omitted.append(&mut skipped);
    }
    combined.refresh_content_hash();
    Ok((combined, omitted))
}
/// Opens a standalone Postgres connection configured for introspection.
///
/// The DSN is parsed into `PgConnectOptions`, the statement cache capacity is
/// set to zero, and the `application_name` is derived from `store_name` and
/// [`INTROSPECT_NAMESPACE`].
///
/// # Errors
///
/// * [`StoreError::PoolInit`] (carrying the masked DSN) when the DSN cannot be
///   parsed.
/// * [`StoreError::Connect`] when the connection attempt fails.
async fn open_introspection_connection(
    dsn: &str,
    store_name: &str,
) -> Result<PgConnection, StoreError> {
    let parsed = match PgConnectOptions::from_str(dsn) {
        Ok(parsed) => parsed,
        Err(err) => {
            return Err(StoreError::PoolInit {
                dsn_masked: mask_dsn_pub(dsn),
                source: err.to_string(),
            });
        }
    };
    let uncached = parsed.statement_cache_capacity(0);
    let app_name = application_name_for_with_namespace(store_name, Some(INTROSPECT_NAMESPACE));
    let options = uncached.application_name(&app_name);
    match PgConnection::connect_with(&options).await {
        Ok(conn) => Ok(conn),
        Err(err) => Err(StoreError::Connect {
            source: err.to_string(),
        }),
    }
}
/// Determines which schema holds the relation named `table`.
///
/// Resolution happens in two steps:
///
/// 1. The bare name, written as a quoted identifier, is handed to
///    `to_regclass`; if that yields a relation, its schema is returned.
/// 2. Otherwise `pg_class` is scanned for relations of kind `r`, `v`, `m`,
///    `p` or `f` with that exact name, skipping schemas whose name starts
///    with `pg_` and `information_schema`.
///
/// Returns `Ok(None)` when neither step finds a relation.
///
/// # Errors
///
/// * [`StoreError::Query`] when either catalog query fails.
/// * [`StoreError::AmbiguousTable`] when the scan finds the name in more than
///   one schema; the error lists the schemas in name order.
async fn resolve_table_schema(
    conn: &mut PgConnection,
    table: &str,
) -> Result<Option<String>, StoreError> {
    // Inside a quoted identifier a literal `"` must be written as `""`;
    // without the doubling, a name containing a quote produces a malformed
    // identifier instead of a lookup of the intended relation.
    let quoted_table = format!("\"{}\"", table.replace('"', "\"\""));
    let primary: Option<(String,)> = sqlx::query_as(
        "SELECT n.nspname \
         FROM pg_catalog.pg_class c \
         JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace \
         WHERE c.oid = to_regclass($1)",
    )
    .persistent(false)
    .bind(quoted_table)
    .fetch_optional(&mut *conn)
    .await
    .map_err(|e| StoreError::Query {
        op: "introspect",
        source: e.to_string(),
    })?;
    if let Some((schema,)) = primary {
        return Ok(Some(schema));
    }
    // Fallback: look the name up across all non-system schemas. Here the
    // name is compared as a plain value, so it is bound without quoting.
    let mut scan: Vec<(String,)> = sqlx::query_as(
        "SELECT n.nspname \
         FROM pg_catalog.pg_class c \
         JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace \
         WHERE c.relname = $1 \
         AND c.relkind IN ('r', 'v', 'm', 'p', 'f') \
         AND left(n.nspname, 3) <> 'pg_' \
         AND n.nspname <> 'information_schema' \
         ORDER BY n.nspname",
    )
    .persistent(false)
    .bind(table)
    .fetch_all(&mut *conn)
    .await
    .map_err(|e| StoreError::Query {
        op: "introspect",
        source: e.to_string(),
    })?;
    match scan.len() {
        // Zero hits pops `None`; exactly one hit pops that schema.
        0 | 1 => Ok(scan.pop().map(|(schema,)| schema)),
        _ => Err(StoreError::AmbiguousTable {
            table: table.to_string(),
            schemas: scan.into_iter().map(|(s,)| s).collect(),
        }),
    }
}
async fn fetch_introspection_rows(
conn: &mut PgConnection,
schema: &str,
table: &str,
) -> Result<Vec<IntrospectionRow>, StoreError> {
let qualified = format!("\"{schema}\".\"{table}\"");
let pg_rows = sqlx::query(
"SELECT \
a.attname AS column_name, \
t.typname AS pg_udt, \
a.attnotnull AS not_null, \
COALESCE( \
(SELECT pg_get_expr(d.adbin, d.adrelid) \
FROM pg_catalog.pg_attrdef d \
WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum), \
'' \
) AS default_expression, \
a.attidentity::text AS identity_kind, \
EXISTS ( \
SELECT 1 FROM pg_catalog.pg_constraint c \
WHERE c.conrelid = a.attrelid \
AND c.contype = 'p' \
AND a.attnum = ANY(c.conkey) \
) AS primary_key, \
EXISTS ( \
SELECT 1 FROM pg_catalog.pg_constraint c \
WHERE c.conrelid = a.attrelid \
AND c.contype = 'u' \
AND c.conkey = ARRAY[a.attnum] \
) AS unique_col \
FROM pg_catalog.pg_class cl \
JOIN pg_catalog.pg_namespace n ON n.oid = cl.relnamespace \
JOIN pg_catalog.pg_attribute a ON a.attrelid = cl.oid \
JOIN pg_catalog.pg_type t ON t.oid = a.atttypid \
WHERE cl.oid = to_regclass($1) \
AND a.attnum > 0 \
AND NOT a.attisdropped \
ORDER BY a.attnum",
)
.persistent(false)
.bind(qualified)
.fetch_all(&mut *conn)
.await
.map_err(|e| StoreError::Query {
op: "introspect",
source: e.to_string(),
})?;
let mut out: Vec<IntrospectionRow> = Vec::with_capacity(pg_rows.len());
for row in pg_rows {
out.push(decode_introspection_row(&row)?);
}
Ok(out)
}
fn decode_introspection_row(row: &PgRow) -> Result<IntrospectionRow, StoreError> {
let column_name: String =
row.try_get("column_name").map_err(|e| StoreError::Decode {
column: "column_name".into(),
pg_type: "name".into(),
source: e.to_string(),
})?;
let pg_udt: String = row.try_get("pg_udt").map_err(|e| StoreError::Decode {
column: "pg_udt".into(),
pg_type: "name".into(),
source: e.to_string(),
})?;
let not_null: bool = row.try_get("not_null").map_err(|e| StoreError::Decode {
column: "not_null".into(),
pg_type: "bool".into(),
source: e.to_string(),
})?;
let default_expression: String = row
.try_get("default_expression")
.map_err(|e| StoreError::Decode {
column: "default_expression".into(),
pg_type: "text".into(),
source: e.to_string(),
})?;
let primary_key: bool = row.try_get("primary_key").map_err(|e| StoreError::Decode {
column: "primary_key".into(),
pg_type: "bool".into(),
source: e.to_string(),
})?;
let unique_col: bool = row.try_get("unique_col").map_err(|e| StoreError::Decode {
column: "unique_col".into(),
pg_type: "bool".into(),
source: e.to_string(),
})?;
let identity_kind_text: String =
row.try_get("identity_kind").map_err(|e| StoreError::Decode {
column: "identity_kind".into(),
pg_type: "text".into(),
source: e.to_string(),
})?;
let identity_kind: Option<char> = match identity_kind_text.chars().next() {
Some('a') => Some('a'),
Some('d') => Some('d'),
Some(_) | None => None, };
Ok(IntrospectionRow {
column_name,
pg_udt,
not_null,
primary_key,
unique: unique_col,
default_expression,
identity_kind,
})
}
/// Renders the manifest followed by one comment line per omitted column.
///
/// The output starts with `manifest.canonical_serialize(true)`. When
/// `omissions` is empty nothing else is appended. Otherwise a newline is
/// added, then each omission's `as_comment_line()` text, each terminated by
/// its own newline.
pub fn render_introspection_output(
    manifest: &Manifest,
    omissions: &[OmittedColumn],
) -> String {
    let mut rendered = manifest.canonical_serialize(true);
    if omissions.is_empty() {
        return rendered;
    }
    rendered.push('\n');
    for entry in omissions {
        let line = entry.as_comment_line();
        rendered.push_str(&line);
        rendered.push('\n');
    }
    rendered
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store_introspect::{IntrospectionRow, OmittedColumn};

    /// Builds an empty manifest whose content hash has been refreshed.
    fn hashed_empty_manifest() -> Manifest {
        let mut manifest = Manifest::new();
        manifest.refresh_content_hash();
        manifest
    }

    /// With no omissions the output is the serialized manifest alone: no
    /// comment tail and no trailing newline.
    #[test]
    fn render_introspection_output_emits_canonical_json_with_no_omission_tail() {
        let manifest = hashed_empty_manifest();
        let rendered = render_introspection_output(&manifest, &[]);
        assert!(rendered.contains(r#""version":1"#));
        assert!(!rendered.contains("# omitted"));
        assert!(!rendered.ends_with('\n'));
    }

    /// Every omitted column contributes its own comment line, and the output
    /// ends with a newline.
    #[test]
    fn render_introspection_output_appends_per_column_omission_comments() {
        let manifest = hashed_empty_manifest();
        let omitted = |name: &str, pg_udt: &str| OmittedColumn {
            name: name.into(),
            pg_udt: pg_udt.into(),
            reason: "outside the v1.38.0 closed type catalog".into(),
        };
        let omissions = vec![omitted("tier", "tier_enum"), omitted("shape", "geometry")];
        let rendered = render_introspection_output(&manifest, &omissions);
        assert!(rendered.contains("# omitted: column `tier` (pg type `tier_enum`)"));
        assert!(rendered.contains("# omitted: column `shape` (pg type `geometry`)"));
        assert!(rendered.ends_with('\n'));
    }

    /// Pins the namespace literal so it cannot change unnoticed.
    #[test]
    fn introspect_namespace_constant_is_stable() {
        assert_eq!(INTROSPECT_NAMESPACE, "introspect");
    }
}