kaccy_db/
schema_validator.rs

1//! Database schema validation and integrity checking
2//!
3//! This module provides comprehensive schema validation including:
4//! - Foreign key relationship validation
5//! - Missing index detection on foreign keys
6//! - Orphaned record detection
7//! - Constraint validation
8//! - Data integrity checks
9
10use crate::error::Result;
11use serde::{Deserialize, Serialize};
12use sqlx::{PgPool, Row};
13
14/// Foreign key constraint information
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct ForeignKeyInfo {
17    /// Constraint name
18    pub constraint_name: String,
19    /// Source table name
20    pub table_name: String,
21    /// Source column name
22    pub column_name: String,
23    /// Referenced table name
24    pub referenced_table: String,
25    /// Referenced column name
26    pub referenced_column: String,
27    /// Whether the foreign key has an index
28    pub has_index: bool,
29}
30
31/// Orphaned record information
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct OrphanedRecordInfo {
34    /// Table containing orphaned records
35    pub table_name: String,
36    /// Foreign key column
37    pub column_name: String,
38    /// Referenced table
39    pub referenced_table: String,
40    /// Number of orphaned records
41    pub orphaned_count: i64,
42}
43
44/// Check constraint information
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct CheckConstraintInfo {
47    /// Constraint name
48    pub constraint_name: String,
49    /// Table name
50    pub table_name: String,
51    /// Check definition
52    pub check_definition: String,
53    /// Whether the constraint is validated
54    pub is_validated: bool,
55}
56
57/// Schema validation issue severity
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
59pub enum ValidationSeverity {
60    /// Informational issue
61    Info,
62    /// Warning - should be addressed
63    Warning,
64    /// Error - requires immediate attention
65    Error,
66    /// Critical - data integrity at risk
67    Critical,
68}
69
70/// Schema validation issue
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct ValidationIssue {
73    /// Issue severity
74    pub severity: ValidationSeverity,
75    /// Issue category
76    pub category: String,
77    /// Issue description
78    pub description: String,
79    /// Affected object (table, constraint, etc.)
80    pub affected_object: String,
81    /// Recommendation for fixing
82    pub recommendation: String,
83}
84
85/// Complete schema validation report
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct SchemaValidationReport {
88    /// All validation issues found
89    pub issues: Vec<ValidationIssue>,
90    /// Foreign key information
91    pub foreign_keys: Vec<ForeignKeyInfo>,
92    /// Orphaned records found
93    pub orphaned_records: Vec<OrphanedRecordInfo>,
94    /// Check constraints
95    pub check_constraints: Vec<CheckConstraintInfo>,
96    /// Total number of issues by severity
97    pub critical_count: usize,
98    /// Number of errors
99    pub error_count: usize,
100    /// Number of warnings
101    pub warning_count: usize,
102    /// Number of info items
103    pub info_count: usize,
104    /// Validation timestamp
105    pub validated_at: chrono::DateTime<chrono::Utc>,
106}
107
108/// Get all foreign key constraints in the database
109pub async fn get_foreign_keys(pool: &PgPool) -> Result<Vec<ForeignKeyInfo>> {
110    let rows = sqlx::query(
111        r#"
112        SELECT
113            tc.constraint_name,
114            tc.table_name,
115            kcu.column_name,
116            ccu.table_name AS referenced_table,
117            ccu.column_name AS referenced_column,
118            EXISTS(
119                SELECT 1 FROM pg_indexes
120                WHERE tablename = tc.table_name
121                AND indexdef LIKE '%' || kcu.column_name || '%'
122            ) AS has_index
123        FROM information_schema.table_constraints AS tc
124        JOIN information_schema.key_column_usage AS kcu
125            ON tc.constraint_name = kcu.constraint_name
126            AND tc.table_schema = kcu.table_schema
127        JOIN information_schema.constraint_column_usage AS ccu
128            ON ccu.constraint_name = tc.constraint_name
129            AND ccu.table_schema = tc.table_schema
130        WHERE tc.constraint_type = 'FOREIGN KEY'
131            AND tc.table_schema = 'public'
132        ORDER BY tc.table_name, kcu.column_name
133        "#,
134    )
135    .fetch_all(pool)
136    .await?;
137
138    let mut foreign_keys = Vec::new();
139    for row in rows {
140        foreign_keys.push(ForeignKeyInfo {
141            constraint_name: row.try_get("constraint_name")?,
142            table_name: row.try_get("table_name")?,
143            column_name: row.try_get("column_name")?,
144            referenced_table: row.try_get("referenced_table").unwrap_or_default(),
145            referenced_column: row.try_get("referenced_column").unwrap_or_default(),
146            has_index: row.try_get("has_index").unwrap_or(false),
147        });
148    }
149
150    Ok(foreign_keys)
151}
152
153/// Detect orphaned records for a specific foreign key
154pub async fn detect_orphaned_records(
155    pool: &PgPool,
156    table_name: &str,
157    column_name: &str,
158    referenced_table: &str,
159    referenced_column: &str,
160) -> Result<i64> {
161    let query = format!(
162        r#"
163        SELECT COUNT(*) as count
164        FROM "{}" AS t
165        WHERE t."{}" IS NOT NULL
166        AND NOT EXISTS (
167            SELECT 1 FROM "{}" AS r
168            WHERE r."{}" = t."{}"
169        )
170        "#,
171        table_name, column_name, referenced_table, referenced_column, column_name
172    );
173
174    let row: (i64,) = sqlx::query_as(&query).fetch_one(pool).await?;
175    Ok(row.0)
176}
177
178/// Get all orphaned records in the database
179pub async fn get_all_orphaned_records(pool: &PgPool) -> Result<Vec<OrphanedRecordInfo>> {
180    let foreign_keys = get_foreign_keys(pool).await?;
181    let mut orphaned_records = Vec::new();
182
183    for fk in foreign_keys {
184        let count = detect_orphaned_records(
185            pool,
186            &fk.table_name,
187            &fk.column_name,
188            &fk.referenced_table,
189            &fk.referenced_column,
190        )
191        .await?;
192
193        if count > 0 {
194            orphaned_records.push(OrphanedRecordInfo {
195                table_name: fk.table_name,
196                column_name: fk.column_name,
197                referenced_table: fk.referenced_table,
198                orphaned_count: count,
199            });
200        }
201    }
202
203    Ok(orphaned_records)
204}
205
206/// Get all check constraints in the database
207pub async fn get_check_constraints(pool: &PgPool) -> Result<Vec<CheckConstraintInfo>> {
208    let rows = sqlx::query(
209        r#"
210        SELECT
211            tc.constraint_name,
212            tc.table_name,
213            pg_get_constraintdef(pgc.oid) AS check_definition,
214            pgc.convalidated AS is_validated
215        FROM information_schema.table_constraints AS tc
216        JOIN pg_constraint AS pgc
217            ON pgc.conname = tc.constraint_name
218        WHERE tc.constraint_type = 'CHECK'
219            AND tc.table_schema = 'public'
220        ORDER BY tc.table_name, tc.constraint_name
221        "#,
222    )
223    .fetch_all(pool)
224    .await?;
225
226    let mut constraints = Vec::new();
227    for row in rows {
228        constraints.push(CheckConstraintInfo {
229            constraint_name: row.try_get("constraint_name")?,
230            table_name: row.try_get("table_name")?,
231            check_definition: row.try_get("check_definition").unwrap_or_default(),
232            is_validated: row.try_get("is_validated").unwrap_or(false),
233        });
234    }
235
236    Ok(constraints)
237}
238
239/// Validate database schema and generate a comprehensive report
240pub async fn validate_schema(pool: &PgPool) -> Result<SchemaValidationReport> {
241    let mut issues = Vec::new();
242
243    // Get foreign keys and check for missing indexes
244    let foreign_keys = get_foreign_keys(pool).await?;
245    for fk in &foreign_keys {
246        if !fk.has_index {
247            issues.push(ValidationIssue {
248                severity: ValidationSeverity::Warning,
249                category: "Missing Index".to_string(),
250                description: format!(
251                    "Foreign key column '{}' in table '{}' lacks an index",
252                    fk.column_name, fk.table_name
253                ),
254                affected_object: format!("{}.{}", fk.table_name, fk.column_name),
255                recommendation: format!(
256                    "CREATE INDEX idx_{}_{} ON \"{}\" (\"{}\");",
257                    fk.table_name, fk.column_name, fk.table_name, fk.column_name
258                ),
259            });
260        }
261    }
262
263    // Check for orphaned records
264    let orphaned_records = get_all_orphaned_records(pool).await?;
265    for orphaned in &orphaned_records {
266        let severity = if orphaned.orphaned_count > 100 {
267            ValidationSeverity::Critical
268        } else if orphaned.orphaned_count > 10 {
269            ValidationSeverity::Error
270        } else {
271            ValidationSeverity::Warning
272        };
273
274        issues.push(ValidationIssue {
275            severity,
276            category: "Orphaned Records".to_string(),
277            description: format!(
278                "Table '{}' has {} orphaned records in column '{}'",
279                orphaned.table_name, orphaned.orphaned_count, orphaned.column_name
280            ),
281            affected_object: format!("{}.{}", orphaned.table_name, orphaned.column_name),
282            recommendation: format!(
283                "Clean up orphaned records or fix references to '{}'",
284                orphaned.referenced_table
285            ),
286        });
287    }
288
289    // Check for unvalidated constraints
290    let check_constraints = get_check_constraints(pool).await?;
291    for constraint in &check_constraints {
292        if !constraint.is_validated {
293            issues.push(ValidationIssue {
294                severity: ValidationSeverity::Warning,
295                category: "Unvalidated Constraint".to_string(),
296                description: format!(
297                    "Check constraint '{}' on table '{}' is not validated",
298                    constraint.constraint_name, constraint.table_name
299                ),
300                affected_object: format!(
301                    "{}.{}",
302                    constraint.table_name, constraint.constraint_name
303                ),
304                recommendation: format!(
305                    "ALTER TABLE \"{}\" VALIDATE CONSTRAINT \"{}\";",
306                    constraint.table_name, constraint.constraint_name
307                ),
308            });
309        }
310    }
311
312    // Count issues by severity
313    let critical_count = issues
314        .iter()
315        .filter(|i| i.severity == ValidationSeverity::Critical)
316        .count();
317    let error_count = issues
318        .iter()
319        .filter(|i| i.severity == ValidationSeverity::Error)
320        .count();
321    let warning_count = issues
322        .iter()
323        .filter(|i| i.severity == ValidationSeverity::Warning)
324        .count();
325    let info_count = issues
326        .iter()
327        .filter(|i| i.severity == ValidationSeverity::Info)
328        .count();
329
330    Ok(SchemaValidationReport {
331        issues,
332        foreign_keys,
333        orphaned_records,
334        check_constraints,
335        critical_count,
336        error_count,
337        warning_count,
338        info_count,
339        validated_at: chrono::Utc::now(),
340    })
341}
342
343/// Get tables with missing primary keys
344pub async fn get_tables_without_primary_keys(pool: &PgPool) -> Result<Vec<String>> {
345    let rows = sqlx::query(
346        r#"
347        SELECT table_name
348        FROM information_schema.tables
349        WHERE table_schema = 'public'
350        AND table_type = 'BASE TABLE'
351        AND table_name NOT IN (
352            SELECT table_name
353            FROM information_schema.table_constraints
354            WHERE constraint_type = 'PRIMARY KEY'
355            AND table_schema = 'public'
356        )
357        ORDER BY table_name
358        "#,
359    )
360    .fetch_all(pool)
361    .await?;
362
363    let mut tables = Vec::new();
364    for row in rows {
365        tables.push(row.try_get("table_name")?);
366    }
367
368    Ok(tables)
369}
370
371/// Get columns that should be NOT NULL but aren't
372pub async fn get_nullable_foreign_key_columns(pool: &PgPool) -> Result<Vec<String>> {
373    let rows = sqlx::query(
374        r#"
375        SELECT DISTINCT
376            kcu.table_name || '.' || kcu.column_name AS column_path
377        FROM information_schema.key_column_usage AS kcu
378        JOIN information_schema.table_constraints AS tc
379            ON kcu.constraint_name = tc.constraint_name
380        JOIN information_schema.columns AS c
381            ON c.table_name = kcu.table_name
382            AND c.column_name = kcu.column_name
383        WHERE tc.constraint_type = 'FOREIGN KEY'
384            AND c.is_nullable = 'YES'
385            AND kcu.table_schema = 'public'
386        ORDER BY column_path
387        "#,
388    )
389    .fetch_all(pool)
390    .await?;
391
392    let mut columns = Vec::new();
393    for row in rows {
394        columns.push(row.try_get("column_path")?);
395    }
396
397    Ok(columns)
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ForeignKeyInfo` from borrowed parts.
    fn foreign_key(
        constraint_name: &str,
        table_name: &str,
        column_name: &str,
        referenced_table: &str,
        referenced_column: &str,
        has_index: bool,
    ) -> ForeignKeyInfo {
        ForeignKeyInfo {
            constraint_name: constraint_name.into(),
            table_name: table_name.into(),
            column_name: column_name.into(),
            referenced_table: referenced_table.into(),
            referenced_column: referenced_column.into(),
            has_index,
        }
    }

    /// Builds an `orders.user_id -> users` orphan record with the given count.
    fn orders_orphan(orphaned_count: i64) -> OrphanedRecordInfo {
        OrphanedRecordInfo {
            table_name: "orders".into(),
            column_name: "user_id".into(),
            referenced_table: "users".into(),
            orphaned_count,
        }
    }

    /// Builds a `ValidationIssue` from borrowed parts.
    fn issue(
        severity: ValidationSeverity,
        category: &str,
        description: &str,
        affected_object: &str,
        recommendation: &str,
    ) -> ValidationIssue {
        ValidationIssue {
            severity,
            category: category.into(),
            description: description.into(),
            affected_object: affected_object.into(),
            recommendation: recommendation.into(),
        }
    }

    /// Builds a report holding only `issues` and the critical/warning totals.
    fn report(
        issues: Vec<ValidationIssue>,
        critical_count: usize,
        warning_count: usize,
    ) -> SchemaValidationReport {
        SchemaValidationReport {
            issues,
            foreign_keys: Vec::new(),
            orphaned_records: Vec::new(),
            check_constraints: Vec::new(),
            critical_count,
            error_count: 0,
            warning_count,
            info_count: 0,
            validated_at: chrono::Utc::now(),
        }
    }

    /// Serializes `value` to JSON and parses it back.
    fn round_trip<T>(value: &T) -> T
    where
        T: Serialize + serde::de::DeserializeOwned,
    {
        let json = serde_json::to_string(value).unwrap();
        serde_json::from_str(&json).unwrap()
    }

    #[test]
    fn test_foreign_key_info_creation() {
        let fk = foreign_key("fk_user_id", "orders", "user_id", "users", "id", true);

        assert_eq!(fk.table_name, "orders");
        assert_eq!(fk.referenced_table, "users");
        assert!(fk.has_index);
    }

    #[test]
    fn test_orphaned_record_info_creation() {
        let orphaned = orders_orphan(42);

        assert_eq!(orphaned.orphaned_count, 42);
    }

    #[test]
    fn test_validation_issue_severity() {
        let critical = issue(
            ValidationSeverity::Critical,
            "Data Integrity",
            "Test issue",
            "test_table",
            "Fix it",
        );

        assert_eq!(critical.severity, ValidationSeverity::Critical);
    }

    #[test]
    fn test_validation_severity_ordering() {
        // Only distinctness is checked; the enum does not implement ordering.
        assert_ne!(ValidationSeverity::Critical, ValidationSeverity::Error);
        assert_ne!(ValidationSeverity::Warning, ValidationSeverity::Info);
    }

    #[test]
    fn test_check_constraint_info() {
        let constraint = CheckConstraintInfo {
            constraint_name: "check_positive".into(),
            table_name: "balances".into(),
            check_definition: "CHECK (amount >= 0)".into(),
            is_validated: true,
        };

        assert_eq!(constraint.table_name, "balances");
        assert!(constraint.is_validated);
    }

    #[test]
    fn test_schema_validation_report_counts() {
        let issues = vec![
            issue(
                ValidationSeverity::Critical,
                "Test",
                "Critical issue",
                "obj1",
                "Fix",
            ),
            issue(
                ValidationSeverity::Warning,
                "Test",
                "Warning issue",
                "obj2",
                "Fix",
            ),
        ];
        let summary = report(issues, 1, 1);

        assert_eq!(summary.critical_count, 1);
        assert_eq!(summary.warning_count, 1);
        assert_eq!(summary.issues.len(), 2);
    }

    #[test]
    fn test_validation_issue_serialization() {
        let original = issue(
            ValidationSeverity::Error,
            "Integrity",
            "Test",
            "table.column",
            "Fix immediately",
        );

        let restored = round_trip(&original);
        assert_eq!(original.severity, restored.severity);
        assert_eq!(original.category, restored.category);
    }

    #[test]
    fn test_foreign_key_serialization() {
        let original = foreign_key("fk_test", "test", "id", "ref", "id", false);

        let restored = round_trip(&original);
        assert_eq!(original.table_name, restored.table_name);
        assert_eq!(original.has_index, restored.has_index);
    }

    #[test]
    fn test_orphaned_record_serialization() {
        let original = orders_orphan(10);

        let restored = round_trip(&original);
        assert_eq!(original.orphaned_count, restored.orphaned_count);
    }

    #[test]
    fn test_schema_validation_report_serialization() {
        let original = report(Vec::new(), 0, 0);

        let restored = round_trip(&original);
        assert_eq!(original.critical_count, restored.critical_count);
    }
}