1use crate::error::Result;
11use serde::{Deserialize, Serialize};
12use sqlx::{PgPool, Row};
13
/// One column of a foreign key constraint, as reported by [`get_foreign_keys`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForeignKeyInfo {
    /// Name of the foreign key constraint.
    pub constraint_name: String,
    /// Table that declares the constraint (the referencing side).
    pub table_name: String,
    /// Column of `table_name` that takes part in the constraint.
    pub column_name: String,
    /// Table the constraint points at.
    pub referenced_table: String,
    /// Column of `referenced_table` the constraint points at.
    pub referenced_column: String,
    /// Whether the index lookup in [`get_foreign_keys`] found an index
    /// involving `column_name` on the referencing table.
    pub has_index: bool,
}
30
/// A foreign key column that holds values with no matching referenced row.
///
/// Produced by [`get_all_orphaned_records`], which only records entries
/// whose count is greater than zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrphanedRecordInfo {
    /// Table containing the orphaned rows.
    pub table_name: String,
    /// Foreign key column whose values are unmatched.
    pub column_name: String,
    /// Table in which the matching rows are missing.
    pub referenced_table: String,
    /// Number of rows counted by [`detect_orphaned_records`].
    pub orphaned_count: i64,
}
43
/// A `CHECK` constraint, as reported by [`get_check_constraints`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckConstraintInfo {
    /// Name of the constraint.
    pub constraint_name: String,
    /// Table the constraint is attached to.
    pub table_name: String,
    /// Constraint definition text returned by `pg_get_constraintdef`.
    pub check_definition: String,
    /// Value of `pg_constraint.convalidated` for this constraint.
    pub is_validated: bool,
}
56
/// Severity level attached to a [`ValidationIssue`].
///
/// Variants are declared from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ValidationSeverity {
    /// Informational finding; no check in [`validate_schema`] emits this level.
    Info,
    /// Used by [`validate_schema`] for missing indexes, unvalidated check
    /// constraints, and orphan counts of 10 or fewer.
    Warning,
    /// Used by [`validate_schema`] for orphan counts above 10 (up to 100).
    Error,
    /// Used by [`validate_schema`] for orphan counts above 100.
    Critical,
}
69
/// A single finding produced while validating the schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationIssue {
    /// How serious the finding is.
    pub severity: ValidationSeverity,
    /// Short label grouping similar findings (for example `"Missing Index"`).
    pub category: String,
    /// Human-readable explanation of the finding.
    pub description: String,
    /// Identifier of the object concerned, formatted as `table.column` or
    /// `table.constraint` by [`validate_schema`].
    pub affected_object: String,
    /// Suggested remediation; may be a SQL statement or free text.
    pub recommendation: String,
}
84
/// Aggregated result of [`validate_schema`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaValidationReport {
    /// Every finding, in the order the checks produced them.
    pub issues: Vec<ValidationIssue>,
    /// Foreign keys that were inspected.
    pub foreign_keys: Vec<ForeignKeyInfo>,
    /// Foreign key columns found to contain orphaned rows.
    pub orphaned_records: Vec<OrphanedRecordInfo>,
    /// Check constraints that were inspected.
    pub check_constraints: Vec<CheckConstraintInfo>,
    /// Number of entries in `issues` with `Critical` severity.
    pub critical_count: usize,
    /// Number of entries in `issues` with `Error` severity.
    pub error_count: usize,
    /// Number of entries in `issues` with `Warning` severity.
    pub warning_count: usize,
    /// Number of entries in `issues` with `Info` severity.
    pub info_count: usize,
    /// UTC timestamp taken when the report was assembled.
    pub validated_at: chrono::DateTime<chrono::Utc>,
}
107
108pub async fn get_foreign_keys(pool: &PgPool) -> Result<Vec<ForeignKeyInfo>> {
110 let rows = sqlx::query(
111 r#"
112 SELECT
113 tc.constraint_name,
114 tc.table_name,
115 kcu.column_name,
116 ccu.table_name AS referenced_table,
117 ccu.column_name AS referenced_column,
118 EXISTS(
119 SELECT 1 FROM pg_indexes
120 WHERE tablename = tc.table_name
121 AND indexdef LIKE '%' || kcu.column_name || '%'
122 ) AS has_index
123 FROM information_schema.table_constraints AS tc
124 JOIN information_schema.key_column_usage AS kcu
125 ON tc.constraint_name = kcu.constraint_name
126 AND tc.table_schema = kcu.table_schema
127 JOIN information_schema.constraint_column_usage AS ccu
128 ON ccu.constraint_name = tc.constraint_name
129 AND ccu.table_schema = tc.table_schema
130 WHERE tc.constraint_type = 'FOREIGN KEY'
131 AND tc.table_schema = 'public'
132 ORDER BY tc.table_name, kcu.column_name
133 "#,
134 )
135 .fetch_all(pool)
136 .await?;
137
138 let mut foreign_keys = Vec::new();
139 for row in rows {
140 foreign_keys.push(ForeignKeyInfo {
141 constraint_name: row.try_get("constraint_name")?,
142 table_name: row.try_get("table_name")?,
143 column_name: row.try_get("column_name")?,
144 referenced_table: row.try_get("referenced_table").unwrap_or_default(),
145 referenced_column: row.try_get("referenced_column").unwrap_or_default(),
146 has_index: row.try_get("has_index").unwrap_or(false),
147 });
148 }
149
150 Ok(foreign_keys)
151}
152
153pub async fn detect_orphaned_records(
155 pool: &PgPool,
156 table_name: &str,
157 column_name: &str,
158 referenced_table: &str,
159 referenced_column: &str,
160) -> Result<i64> {
161 let query = format!(
162 r#"
163 SELECT COUNT(*) as count
164 FROM "{}" AS t
165 WHERE t."{}" IS NOT NULL
166 AND NOT EXISTS (
167 SELECT 1 FROM "{}" AS r
168 WHERE r."{}" = t."{}"
169 )
170 "#,
171 table_name, column_name, referenced_table, referenced_column, column_name
172 );
173
174 let row: (i64,) = sqlx::query_as(&query).fetch_one(pool).await?;
175 Ok(row.0)
176}
177
178pub async fn get_all_orphaned_records(pool: &PgPool) -> Result<Vec<OrphanedRecordInfo>> {
180 let foreign_keys = get_foreign_keys(pool).await?;
181 let mut orphaned_records = Vec::new();
182
183 for fk in foreign_keys {
184 let count = detect_orphaned_records(
185 pool,
186 &fk.table_name,
187 &fk.column_name,
188 &fk.referenced_table,
189 &fk.referenced_column,
190 )
191 .await?;
192
193 if count > 0 {
194 orphaned_records.push(OrphanedRecordInfo {
195 table_name: fk.table_name,
196 column_name: fk.column_name,
197 referenced_table: fk.referenced_table,
198 orphaned_count: count,
199 });
200 }
201 }
202
203 Ok(orphaned_records)
204}
205
206pub async fn get_check_constraints(pool: &PgPool) -> Result<Vec<CheckConstraintInfo>> {
208 let rows = sqlx::query(
209 r#"
210 SELECT
211 tc.constraint_name,
212 tc.table_name,
213 pg_get_constraintdef(pgc.oid) AS check_definition,
214 pgc.convalidated AS is_validated
215 FROM information_schema.table_constraints AS tc
216 JOIN pg_constraint AS pgc
217 ON pgc.conname = tc.constraint_name
218 WHERE tc.constraint_type = 'CHECK'
219 AND tc.table_schema = 'public'
220 ORDER BY tc.table_name, tc.constraint_name
221 "#,
222 )
223 .fetch_all(pool)
224 .await?;
225
226 let mut constraints = Vec::new();
227 for row in rows {
228 constraints.push(CheckConstraintInfo {
229 constraint_name: row.try_get("constraint_name")?,
230 table_name: row.try_get("table_name")?,
231 check_definition: row.try_get("check_definition").unwrap_or_default(),
232 is_validated: row.try_get("is_validated").unwrap_or(false),
233 });
234 }
235
236 Ok(constraints)
237}
238
239pub async fn validate_schema(pool: &PgPool) -> Result<SchemaValidationReport> {
241 let mut issues = Vec::new();
242
243 let foreign_keys = get_foreign_keys(pool).await?;
245 for fk in &foreign_keys {
246 if !fk.has_index {
247 issues.push(ValidationIssue {
248 severity: ValidationSeverity::Warning,
249 category: "Missing Index".to_string(),
250 description: format!(
251 "Foreign key column '{}' in table '{}' lacks an index",
252 fk.column_name, fk.table_name
253 ),
254 affected_object: format!("{}.{}", fk.table_name, fk.column_name),
255 recommendation: format!(
256 "CREATE INDEX idx_{}_{} ON \"{}\" (\"{}\");",
257 fk.table_name, fk.column_name, fk.table_name, fk.column_name
258 ),
259 });
260 }
261 }
262
263 let orphaned_records = get_all_orphaned_records(pool).await?;
265 for orphaned in &orphaned_records {
266 let severity = if orphaned.orphaned_count > 100 {
267 ValidationSeverity::Critical
268 } else if orphaned.orphaned_count > 10 {
269 ValidationSeverity::Error
270 } else {
271 ValidationSeverity::Warning
272 };
273
274 issues.push(ValidationIssue {
275 severity,
276 category: "Orphaned Records".to_string(),
277 description: format!(
278 "Table '{}' has {} orphaned records in column '{}'",
279 orphaned.table_name, orphaned.orphaned_count, orphaned.column_name
280 ),
281 affected_object: format!("{}.{}", orphaned.table_name, orphaned.column_name),
282 recommendation: format!(
283 "Clean up orphaned records or fix references to '{}'",
284 orphaned.referenced_table
285 ),
286 });
287 }
288
289 let check_constraints = get_check_constraints(pool).await?;
291 for constraint in &check_constraints {
292 if !constraint.is_validated {
293 issues.push(ValidationIssue {
294 severity: ValidationSeverity::Warning,
295 category: "Unvalidated Constraint".to_string(),
296 description: format!(
297 "Check constraint '{}' on table '{}' is not validated",
298 constraint.constraint_name, constraint.table_name
299 ),
300 affected_object: format!(
301 "{}.{}",
302 constraint.table_name, constraint.constraint_name
303 ),
304 recommendation: format!(
305 "ALTER TABLE \"{}\" VALIDATE CONSTRAINT \"{}\";",
306 constraint.table_name, constraint.constraint_name
307 ),
308 });
309 }
310 }
311
312 let critical_count = issues
314 .iter()
315 .filter(|i| i.severity == ValidationSeverity::Critical)
316 .count();
317 let error_count = issues
318 .iter()
319 .filter(|i| i.severity == ValidationSeverity::Error)
320 .count();
321 let warning_count = issues
322 .iter()
323 .filter(|i| i.severity == ValidationSeverity::Warning)
324 .count();
325 let info_count = issues
326 .iter()
327 .filter(|i| i.severity == ValidationSeverity::Info)
328 .count();
329
330 Ok(SchemaValidationReport {
331 issues,
332 foreign_keys,
333 orphaned_records,
334 check_constraints,
335 critical_count,
336 error_count,
337 warning_count,
338 info_count,
339 validated_at: chrono::Utc::now(),
340 })
341}
342
343pub async fn get_tables_without_primary_keys(pool: &PgPool) -> Result<Vec<String>> {
345 let rows = sqlx::query(
346 r#"
347 SELECT table_name
348 FROM information_schema.tables
349 WHERE table_schema = 'public'
350 AND table_type = 'BASE TABLE'
351 AND table_name NOT IN (
352 SELECT table_name
353 FROM information_schema.table_constraints
354 WHERE constraint_type = 'PRIMARY KEY'
355 AND table_schema = 'public'
356 )
357 ORDER BY table_name
358 "#,
359 )
360 .fetch_all(pool)
361 .await?;
362
363 let mut tables = Vec::new();
364 for row in rows {
365 tables.push(row.try_get("table_name")?);
366 }
367
368 Ok(tables)
369}
370
371pub async fn get_nullable_foreign_key_columns(pool: &PgPool) -> Result<Vec<String>> {
373 let rows = sqlx::query(
374 r#"
375 SELECT DISTINCT
376 kcu.table_name || '.' || kcu.column_name AS column_path
377 FROM information_schema.key_column_usage AS kcu
378 JOIN information_schema.table_constraints AS tc
379 ON kcu.constraint_name = tc.constraint_name
380 JOIN information_schema.columns AS c
381 ON c.table_name = kcu.table_name
382 AND c.column_name = kcu.column_name
383 WHERE tc.constraint_type = 'FOREIGN KEY'
384 AND c.is_nullable = 'YES'
385 AND kcu.table_schema = 'public'
386 ORDER BY column_path
387 "#,
388 )
389 .fetch_all(pool)
390 .await?;
391
392 let mut columns = Vec::new();
393 for row in rows {
394 columns.push(row.try_get("column_path")?);
395 }
396
397 Ok(columns)
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ForeignKeyInfo` from string slices.
    fn foreign_key(
        constraint: &str,
        table: &str,
        column: &str,
        ref_table: &str,
        ref_column: &str,
        has_index: bool,
    ) -> ForeignKeyInfo {
        ForeignKeyInfo {
            constraint_name: constraint.to_string(),
            table_name: table.to_string(),
            column_name: column.to_string(),
            referenced_table: ref_table.to_string(),
            referenced_column: ref_column.to_string(),
            has_index,
        }
    }

    /// Builds an `orders.user_id -> users` orphan record with the given count.
    fn orphaned_orders(orphaned_count: i64) -> OrphanedRecordInfo {
        OrphanedRecordInfo {
            table_name: "orders".to_string(),
            column_name: "user_id".to_string(),
            referenced_table: "users".to_string(),
            orphaned_count,
        }
    }

    /// Builds a `ValidationIssue` from string slices.
    fn issue(
        severity: ValidationSeverity,
        category: &str,
        description: &str,
        affected_object: &str,
        recommendation: &str,
    ) -> ValidationIssue {
        ValidationIssue {
            severity,
            category: category.to_string(),
            description: description.to_string(),
            affected_object: affected_object.to_string(),
            recommendation: recommendation.to_string(),
        }
    }

    /// Builds a report that carries only issues and hand-set counters.
    fn report(
        issues: Vec<ValidationIssue>,
        critical_count: usize,
        warning_count: usize,
    ) -> SchemaValidationReport {
        SchemaValidationReport {
            issues,
            foreign_keys: vec![],
            orphaned_records: vec![],
            check_constraints: vec![],
            critical_count,
            error_count: 0,
            warning_count,
            info_count: 0,
            validated_at: chrono::Utc::now(),
        }
    }

    #[test]
    fn test_foreign_key_info_creation() {
        let info = foreign_key("fk_user_id", "orders", "user_id", "users", "id", true);

        assert_eq!(info.table_name, "orders");
        assert_eq!(info.referenced_table, "users");
        assert!(info.has_index);
    }

    #[test]
    fn test_orphaned_record_info_creation() {
        let record = orphaned_orders(42);

        assert_eq!(record.orphaned_count, 42);
    }

    #[test]
    fn test_validation_issue_severity() {
        let finding = issue(
            ValidationSeverity::Critical,
            "Data Integrity",
            "Test issue",
            "test_table",
            "Fix it",
        );

        assert_eq!(finding.severity, ValidationSeverity::Critical);
    }

    #[test]
    fn test_validation_severity_ordering() {
        // Only distinctness is checked here; the enum does not derive `Ord`.
        assert!(ValidationSeverity::Critical != ValidationSeverity::Error);
        assert!(ValidationSeverity::Warning != ValidationSeverity::Info);
    }

    #[test]
    fn test_check_constraint_info() {
        let info = CheckConstraintInfo {
            constraint_name: "check_positive".to_string(),
            table_name: "balances".to_string(),
            check_definition: "CHECK (amount >= 0)".to_string(),
            is_validated: true,
        };

        assert_eq!(info.table_name, "balances");
        assert!(info.is_validated);
    }

    #[test]
    fn test_schema_validation_report_counts() {
        let findings = vec![
            issue(
                ValidationSeverity::Critical,
                "Test",
                "Critical issue",
                "obj1",
                "Fix",
            ),
            issue(
                ValidationSeverity::Warning,
                "Test",
                "Warning issue",
                "obj2",
                "Fix",
            ),
        ];
        let summary = report(findings, 1, 1);

        assert_eq!(summary.critical_count, 1);
        assert_eq!(summary.warning_count, 1);
        assert_eq!(summary.issues.len(), 2);
    }

    #[test]
    fn test_validation_issue_serialization() {
        let original = issue(
            ValidationSeverity::Error,
            "Integrity",
            "Test",
            "table.column",
            "Fix immediately",
        );

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ValidationIssue = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.severity, decoded.severity);
        assert_eq!(original.category, decoded.category);
    }

    #[test]
    fn test_foreign_key_serialization() {
        let original = foreign_key("fk_test", "test", "id", "ref", "id", false);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ForeignKeyInfo = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.table_name, decoded.table_name);
        assert_eq!(original.has_index, decoded.has_index);
    }

    #[test]
    fn test_orphaned_record_serialization() {
        let original = orphaned_orders(10);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: OrphanedRecordInfo = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.orphaned_count, decoded.orphaned_count);
    }

    #[test]
    fn test_schema_validation_report_serialization() {
        let original = report(vec![], 0, 0);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: SchemaValidationReport = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.critical_count, decoded.critical_count);
    }
}