// schema_risk/db.rs
1//! Live PostgreSQL schema introspection.
2//!
3//! When the user passes `--db-url postgres://...` the engine fetches real
4//! metadata (table sizes, row counts, index definitions, FK constraints) and
5//! merges it into the risk analysis so scores are accurate instead of
6//! guesswork.
7//!
8//! Compiled only when the `db` feature is enabled.
9
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13// ─────────────────────────────────────────────
14// Live schema snapshot from a running database
15// ─────────────────────────────────────────────
16
/// Everything we know about the live database.
///
/// Populated by `connector::fetch` when the `db` feature is enabled;
/// otherwise left at `Default` (all collections empty).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LiveSchema {
    /// table_name → metadata
    pub tables: HashMap<String, TableMeta>,
    /// index_name → metadata
    pub indexes: HashMap<String, IndexMeta>,
    /// Foreign-key constraints — a flat list (NOT keyed by constraint name).
    pub foreign_keys: Vec<FkMeta>,
}

/// Metadata for one base table, as reported by the introspection queries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableMeta {
    /// Unqualified table name.
    pub name: String,
    /// Schema the table lives in (e.g. "public").
    pub schema: String,
    /// pg_class.reltuples estimate (may be stale between VACUUMs;
    /// NOTE(review): on PostgreSQL 14+ it is -1 for tables never
    /// vacuumed/analyzed, so callers should treat negatives as "unknown")
    pub estimated_rows: i64,
    /// actual disk size in bytes (pg_total_relation_size — includes TOAST and indexes)
    pub total_size_bytes: i64,
    /// human-readable (e.g. "42 MB")
    pub total_size_pretty: String,
    /// Column definitions, in ordinal position order.
    pub columns: Vec<ColumnMeta>,
}

/// One column, as reported by `information_schema.columns`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnMeta {
    pub name: String,
    /// information_schema `data_type` text (e.g. "integer", "character varying").
    pub data_type: String,
    /// true when information_schema reports `is_nullable = 'YES'`.
    pub is_nullable: bool,
    /// Default expression as SQL text, if any (e.g. "nextval('…')").
    pub column_default: Option<String>,
}

/// One index, joined from `pg_indexes` and `pg_index`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMeta {
    pub name: String,
    /// Table the index belongs to.
    pub table: String,
    /// Full `CREATE INDEX …` statement (pg_indexes.indexdef).
    pub definition: String,
    pub is_unique: bool,
    pub is_primary: bool,
}

/// One foreign-key edge: from_table.from_column → to_table.to_column.
///
/// NOTE(review): the introspection query joins key_column_usage, so a
/// composite (multi-column) FK appears as several `FkMeta` entries sharing
/// one `constraint_name` — confirm downstream consumers expect that.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FkMeta {
    pub constraint_name: String,
    pub from_schema: String,
    pub from_table: String,
    pub from_column: String,
    pub to_schema: String,
    pub to_table: String,
    pub to_column: String,
    /// Referential action text, e.g. "CASCADE", "NO ACTION" (delete_rule).
    pub on_delete: String,
    /// Referential action text (update_rule).
    pub on_update: String,
}
70
71// ─────────────────────────────────────────────
72// Feature-gated connector
73// ─────────────────────────────────────────────
74
#[cfg(feature = "db")]
pub mod connector {
    use super::*;
    use crate::error::SchemaRiskError;
    use tokio_postgres::{Client, NoTls, Row};

    /// Read one named column out of a row, mapping driver errors (unexpected
    /// NULL, type mismatch) to `SchemaRiskError::DbQuery` instead of
    /// panicking the way `Row::get` does.
    fn col<'a, T>(row: &'a Row, name: &str) -> Result<T, SchemaRiskError>
    where
        T: tokio_postgres::types::FromSql<'a>,
    {
        row.try_get(name)
            .map_err(|e| SchemaRiskError::DbQuery(format!("column `{}`: {}", name, e)))
    }

    /// Connect to PostgreSQL and return a full `LiveSchema`.
    ///
    /// # Errors
    /// * `SchemaRiskError::DbConnect` — the connection could not be established.
    /// * `SchemaRiskError::DbQuery` — an introspection query failed or a row
    ///   held an unexpected NULL / wrongly-typed value.
    pub async fn fetch(db_url: &str) -> Result<LiveSchema, SchemaRiskError> {
        let (client, connection) = tokio_postgres::connect(db_url, NoTls)
            .await
            .map_err(|e| SchemaRiskError::DbConnect(e.to_string()))?;

        // Drive the connection in the background; the task ends once the
        // client is dropped and the connection closes.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("db connection error: {}", e);
            }
        });

        let tables = fetch_tables(&client).await?;
        let indexes = fetch_indexes(&client).await?;
        let foreign_keys = fetch_foreign_keys(&client).await?;

        Ok(LiveSchema {
            tables,
            indexes,
            foreign_keys,
        })
    }

    // ── table sizes & row counts ──────────────────────────────────────────

    /// Fetch every user base table (size + row estimate), then attach its
    /// columns via one bulk query.
    async fn fetch_tables(client: &Client) -> Result<HashMap<String, TableMeta>, SchemaRiskError> {
        // pg_class.reltuples is a float estimate updated by VACUUM/ANALYZE.
        // On PostgreSQL 14+ it is -1 for tables never vacuumed/analyzed, so
        // clamp with GREATEST(…, 0) instead of surfacing a negative "count".
        // pg_total_relation_size includes toast + indexes.
        let rows = client
            .query(
                r#"
SELECT
    t.table_schema            AS schema,
    t.table_name              AS name,
    GREATEST(COALESCE(c.reltuples, 0), 0)::bigint  AS estimated_rows,
    pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name))
                              AS total_size_bytes,
    pg_size_pretty(pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name)))
                              AS total_size_pretty
FROM information_schema.tables t
LEFT JOIN pg_class c
    ON c.relname = t.table_name
    AND c.relnamespace = (
        SELECT oid FROM pg_namespace WHERE nspname = t.table_schema
    )
WHERE t.table_schema NOT IN ('pg_catalog', 'information_schema')
  AND t.table_type = 'BASE TABLE'
ORDER BY t.table_schema, t.table_name
                "#,
                &[],
            )
            .await
            .map_err(|e| SchemaRiskError::DbQuery(e.to_string()))?;

        let mut tables: HashMap<String, TableMeta> = HashMap::with_capacity(rows.len());

        for row in &rows {
            let name: String = col(row, "name")?;
            // NOTE(review): keyed by unqualified table name, so identically
            // named tables in different schemas collide (last one wins).
            // Acceptable while the risk engine is schema-unaware — confirm.
            tables.insert(
                name.clone(),
                TableMeta {
                    name,
                    schema: col(row, "schema")?,
                    estimated_rows: col(row, "estimated_rows")?,
                    total_size_bytes: col(row, "total_size_bytes")?,
                    total_size_pretty: col(row, "total_size_pretty")?,
                    columns: vec![], // filled below
                },
            );
        }

        // Fetch columns in a single bulk query (avoids N+1)
        let col_rows = client
            .query(
                r#"
SELECT
    table_schema,
    table_name,
    column_name,
    data_type,
    is_nullable,
    column_default
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY table_name, ordinal_position
                "#,
                &[],
            )
            .await
            .map_err(|e| SchemaRiskError::DbQuery(e.to_string()))?;

        for row in &col_rows {
            let table_name: String = col(row, "table_name")?;
            let column = ColumnMeta {
                name: col(row, "column_name")?,
                data_type: col(row, "data_type")?,
                // information_schema encodes nullability as 'YES' / 'NO'.
                is_nullable: col::<&str>(row, "is_nullable")? == "YES",
                column_default: col(row, "column_default")?,
            };
            // Columns whose table we did not capture above (e.g. views,
            // filtered out by table_type = 'BASE TABLE') are skipped.
            if let Some(t) = tables.get_mut(&table_name) {
                t.columns.push(column);
            }
        }

        Ok(tables)
    }

    // ── indexes ───────────────────────────────────────────────────────────

    /// Fetch every user index with its definition and uniqueness flags.
    async fn fetch_indexes(client: &Client) -> Result<HashMap<String, IndexMeta>, SchemaRiskError> {
        let rows = client
            .query(
                r#"
SELECT
    i.indexname        AS name,
    i.tablename        AS tablename,
    i.indexdef         AS definition,
    ix.indisunique     AS is_unique,
    ix.indisprimary    AS is_primary
FROM pg_indexes i
JOIN pg_class c   ON c.relname = i.indexname
JOIN pg_index ix  ON ix.indexrelid = c.oid
WHERE i.schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY i.tablename, i.indexname
                "#,
                &[],
            )
            .await
            .map_err(|e| SchemaRiskError::DbQuery(e.to_string()))?;

        let mut indexes = HashMap::with_capacity(rows.len());
        for row in &rows {
            // NOTE(review): keyed by bare index name; collides across
            // schemas just like the tables map — confirm acceptable.
            let name: String = col(row, "name")?;
            indexes.insert(
                name.clone(),
                IndexMeta {
                    name,
                    table: col(row, "tablename")?,
                    definition: col(row, "definition")?,
                    is_unique: col(row, "is_unique")?,
                    is_primary: col(row, "is_primary")?,
                },
            );
        }
        Ok(indexes)
    }

    // ── foreign keys ──────────────────────────────────────────────────────

    /// Fetch every FOREIGN KEY constraint (one entry per referencing column,
    /// so composite FKs yield multiple entries with the same name).
    async fn fetch_foreign_keys(client: &Client) -> Result<Vec<FkMeta>, SchemaRiskError> {
        let rows = client
            .query(
                r#"
SELECT
    tc.constraint_name,
    tc.table_schema     AS from_schema,
    tc.table_name       AS from_table,
    kcu.column_name     AS from_column,
    ccu.table_schema    AS to_schema,
    ccu.table_name      AS to_table,
    ccu.column_name     AS to_column,
    rc.delete_rule      AS on_delete,
    rc.update_rule      AS on_update
FROM information_schema.table_constraints       AS tc
JOIN information_schema.key_column_usage        AS kcu
    ON tc.constraint_name = kcu.constraint_name
    AND tc.table_schema   = kcu.table_schema
JOIN information_schema.referential_constraints AS rc
    ON tc.constraint_name = rc.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
    ON ccu.constraint_name = rc.unique_constraint_name
    AND ccu.table_schema   = rc.unique_constraint_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
ORDER BY tc.table_name, tc.constraint_name
                "#,
                &[],
            )
            .await
            .map_err(|e| SchemaRiskError::DbQuery(e.to_string()))?;

        rows.iter()
            .map(|r| {
                Ok(FkMeta {
                    constraint_name: col(r, "constraint_name")?,
                    from_schema: col(r, "from_schema")?,
                    from_table: col(r, "from_table")?,
                    from_column: col(r, "from_column")?,
                    to_schema: col(r, "to_schema")?,
                    to_table: col(r, "to_table")?,
                    to_column: col(r, "to_column")?,
                    on_delete: col(r, "on_delete")?,
                    on_update: col(r, "on_update")?,
                })
            })
            .collect()
    }
}
286
287// ─────────────────────────────────────────────
288// Stub for builds without the `db` feature
289// ─────────────────────────────────────────────
290
291#[cfg(not(feature = "db"))]
292pub mod connector {
293    use super::*;
294    use crate::error::SchemaRiskError;
295
296    pub async fn fetch(_db_url: &str) -> Result<LiveSchema, SchemaRiskError> {
297        Err(SchemaRiskError::FeatureDisabled(
298            "Database introspection requires the `db` feature. \
299             Rebuild with: cargo build --features db"
300                .to_string(),
301        ))
302    }
303}
304
305// ─────────────────────────────────────────────
306// Convert LiveSchema into the row_counts map
307// ─────────────────────────────────────────────
308
309impl LiveSchema {
310    /// Produce the `HashMap<table → rows>` that `RiskEngine` expects.
311    pub fn to_row_counts(&self) -> HashMap<String, u64> {
312        self.tables
313            .values()
314            .map(|t| (t.name.clone(), t.estimated_rows.max(0) as u64))
315            .collect()
316    }
317
318    /// Total size in bytes for a given table (0 if unknown).
319    pub fn table_size_bytes(&self, table: &str) -> i64 {
320        self.tables
321            .get(table)
322            .map(|t| t.total_size_bytes)
323            .unwrap_or(0)
324    }
325}