1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::RwLock;
10
11use super::{
12 TableSchema, ColumnSchema, IndexSchema, AccessPattern,
13 DataTemperature, WorkloadType, Relationship,
14};
15use super::registry::{StorageType, IndexType, RelationshipType};
16
/// Settings that control how schema discovery runs.
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
    /// Master switch for discovery.
    // NOTE(review): no code visible in this file reads this flag — confirm where it is enforced.
    pub enabled: bool,
    /// Age after which `SchemaDiscovery::needs_refresh` reports the cache as stale.
    pub refresh_interval: Duration,
    /// Schema names used to restrict the table-listing query; an empty list adds no restriction.
    pub schemas: Vec<String>,
    /// Table names to leave out of discovery.
    // NOTE(review): not consulted by the code visible in this file — confirm.
    pub exclude_tables: Vec<String>,
    /// Whether system tables should be included.
    // NOTE(review): not consulted by the code visible in this file — confirm.
    pub include_system_tables: bool,
    /// When `false`, `SchemaDiscovery::discover_relationships` returns an empty list.
    pub discover_relationships: bool,
    /// When `false`, `SchemaDiscovery::discover_indexes` returns an empty list.
    pub discover_indexes: bool,
    /// Whether access patterns should be inferred from statistics.
    // NOTE(review): not consulted by the code visible in this file — confirm.
    pub infer_access_patterns: bool,
    /// When `false`, `SchemaDiscovery::get_statistics` fails with `StatisticsDisabled`.
    pub sample_statistics: bool,
}
39
40impl Default for DiscoveryConfig {
41 fn default() -> Self {
42 Self {
43 enabled: true,
44 refresh_interval: Duration::from_secs(300),
45 schemas: vec!["public".to_string()],
46 exclude_tables: Vec::new(),
47 include_system_tables: false,
48 discover_relationships: true,
49 discover_indexes: true,
50 infer_access_patterns: true,
51 sample_statistics: true,
52 }
53 }
54}
55
56pub struct SchemaDiscovery {
58 config: DiscoveryConfig,
59 cache: Arc<RwLock<DiscoveryCache>>,
61 last_refresh: Arc<RwLock<Option<std::time::Instant>>>,
63}
64
65#[derive(Debug, Default)]
67struct DiscoveryCache {
68 tables: HashMap<String, TableSchema>,
69 indexes: HashMap<String, Vec<IndexSchema>>,
70 relationships: Vec<Relationship>,
71 statistics: HashMap<String, TableStatistics>,
72}
73
/// Usage and size counters for a single table.
///
/// The scan and tuple counters are the inputs to `SchemaDiscovery::infer_temperature`,
/// `infer_access_pattern` and `infer_workload`.
#[derive(Debug, Clone)]
pub struct TableStatistics {
    /// Name of the table these numbers describe.
    pub table_name: String,
    /// Number of rows.
    pub row_count: u64,
    /// Table size in bytes.
    pub size_bytes: u64,
    /// Size of the table's indexes in bytes.
    pub index_size_bytes: u64,
    /// Number of sequential scans.
    pub seq_scan_count: u64,
    /// Number of index scans.
    pub idx_scan_count: u64,
    /// Number of inserted tuples.
    pub n_tup_ins: u64,
    /// Number of updated tuples.
    pub n_tup_upd: u64,
    /// Number of deleted tuples.
    pub n_tup_del: u64,
    /// Timestamp text of the last vacuum, if one is known.
    pub last_vacuum: Option<String>,
    /// Timestamp text of the last analyze, if one is known.
    pub last_analyze: Option<String>,
}
100
101impl SchemaDiscovery {
102 pub fn new(config: DiscoveryConfig) -> Self {
104 Self {
105 config,
106 cache: Arc::new(RwLock::new(DiscoveryCache::default())),
107 last_refresh: Arc::new(RwLock::new(None)),
108 }
109 }
110
111 pub async fn discover(&self) -> Result<Vec<TableSchema>, DiscoveryError> {
115 let _queries = self.build_discovery_queries();
117
118 let mut tables = Vec::new();
121
122 let users_table = TableSchema {
124 name: "users".to_string(),
125 columns: vec![
126 ColumnSchema {
127 name: "id".to_string(),
128 data_type: "bigint".to_string(),
129 nullable: false,
130 is_primary_key: true,
131 is_indexed: true,
132 storage_type: StorageType::Row,
133 },
134 ColumnSchema {
135 name: "email".to_string(),
136 data_type: "varchar(255)".to_string(),
137 nullable: false,
138 is_primary_key: false,
139 is_indexed: true,
140 storage_type: StorageType::Row,
141 },
142 ColumnSchema {
143 name: "created_at".to_string(),
144 data_type: "timestamp".to_string(),
145 nullable: false,
146 is_primary_key: false,
147 is_indexed: true,
148 storage_type: StorageType::Row,
149 },
150 ],
151 access_pattern: AccessPattern::PointLookup,
152 temperature: DataTemperature::Hot,
153 workload: WorkloadType::OLTP,
154 primary_key: vec!["id".to_string()],
155 shard_key: Some("id".to_string()),
156 estimated_rows: 1_000_000,
157 avg_row_size: 100,
158 partition_key: None,
159 preferred_nodes: Vec::new(),
160 };
161 tables.push(users_table);
162
163 let mut cache = self.cache.write().await;
165 for table in &tables {
166 cache.tables.insert(table.name.clone(), table.clone());
167 }
168
169 let mut last_refresh = self.last_refresh.write().await;
171 *last_refresh = Some(std::time::Instant::now());
172
173 Ok(tables)
174 }
175
176 pub async fn discover_table(&self, table_name: &str) -> Result<TableSchema, DiscoveryError> {
178 {
180 let cache = self.cache.read().await;
181 if let Some(table) = cache.tables.get(table_name) {
182 return Ok(table.clone());
183 }
184 }
185
186 let _query = self.build_table_query(table_name);
188
189 Err(DiscoveryError::TableNotFound(table_name.to_string()))
191 }
192
193 pub async fn discover_indexes(&self, table_name: &str) -> Result<Vec<IndexSchema>, DiscoveryError> {
195 if !self.config.discover_indexes {
196 return Ok(Vec::new());
197 }
198
199 {
201 let cache = self.cache.read().await;
202 if let Some(indexes) = cache.indexes.get(table_name) {
203 return Ok(indexes.clone());
204 }
205 }
206
207 let indexes = vec![
209 IndexSchema {
210 name: format!("{}_pkey", table_name),
211 table: table_name.to_string(),
212 columns: vec!["id".to_string()],
213 is_unique: true,
214 index_type: IndexType::BTree,
215 },
216 IndexSchema {
217 name: format!("{}_email_idx", table_name),
218 table: table_name.to_string(),
219 columns: vec!["email".to_string()],
220 is_unique: true,
221 index_type: IndexType::BTree,
222 },
223 ];
224
225 let mut cache = self.cache.write().await;
227 cache.indexes.insert(table_name.to_string(), indexes.clone());
228
229 Ok(indexes)
230 }
231
232 pub async fn discover_relationships(&self) -> Result<Vec<Relationship>, DiscoveryError> {
234 if !self.config.discover_relationships {
235 return Ok(Vec::new());
236 }
237
238 {
240 let cache = self.cache.read().await;
241 if !cache.relationships.is_empty() {
242 return Ok(cache.relationships.clone());
243 }
244 }
245
246 let relationships = vec![
248 Relationship {
249 from_table: "orders".to_string(),
250 from_column: "user_id".to_string(),
251 to_table: "users".to_string(),
252 to_column: "id".to_string(),
253 relationship_type: RelationshipType::ManyToOne,
254 },
255 Relationship {
256 from_table: "order_items".to_string(),
257 from_column: "order_id".to_string(),
258 to_table: "orders".to_string(),
259 to_column: "id".to_string(),
260 relationship_type: RelationshipType::ManyToOne,
261 },
262 ];
263
264 let mut cache = self.cache.write().await;
266 cache.relationships = relationships.clone();
267
268 Ok(relationships)
269 }
270
271 pub async fn get_statistics(&self, table_name: &str) -> Result<TableStatistics, DiscoveryError> {
273 if !self.config.sample_statistics {
274 return Err(DiscoveryError::StatisticsDisabled);
275 }
276
277 {
279 let cache = self.cache.read().await;
280 if let Some(stats) = cache.statistics.get(table_name) {
281 return Ok(stats.clone());
282 }
283 }
284
285 let stats = TableStatistics {
287 table_name: table_name.to_string(),
288 row_count: 1_000_000,
289 size_bytes: 100_000_000,
290 index_size_bytes: 20_000_000,
291 seq_scan_count: 100,
292 idx_scan_count: 50_000,
293 n_tup_ins: 1000,
294 n_tup_upd: 500,
295 n_tup_del: 100,
296 last_vacuum: Some("2024-01-15 10:00:00".to_string()),
297 last_analyze: Some("2024-01-15 10:00:00".to_string()),
298 };
299
300 let mut cache = self.cache.write().await;
302 cache.statistics.insert(table_name.to_string(), stats.clone());
303
304 Ok(stats)
305 }
306
307 pub fn infer_temperature(&self, stats: &TableStatistics) -> DataTemperature {
309 let total_scans = stats.seq_scan_count + stats.idx_scan_count;
311 let write_rate = stats.n_tup_ins + stats.n_tup_upd + stats.n_tup_del;
312
313 if total_scans > 10_000 && write_rate > 100 {
315 return DataTemperature::Hot;
316 }
317
318 if total_scans > 1_000 || write_rate > 10 {
320 return DataTemperature::Warm;
321 }
322
323 if total_scans > 100 {
325 return DataTemperature::Cold;
326 }
327
328 DataTemperature::Frozen
330 }
331
332 pub fn infer_access_pattern(&self, stats: &TableStatistics) -> AccessPattern {
334 let total_scans = stats.seq_scan_count + stats.idx_scan_count;
335
336 if total_scans == 0 {
337 return AccessPattern::Mixed;
338 }
339
340 let index_ratio = stats.idx_scan_count as f64 / total_scans as f64;
341
342 if index_ratio > 0.9 {
344 return AccessPattern::PointLookup;
345 }
346
347 if index_ratio > 0.5 {
349 return AccessPattern::RangeScan;
350 }
351
352 if index_ratio < 0.1 {
354 return AccessPattern::FullScan;
355 }
356
357 AccessPattern::Mixed
358 }
359
360 pub fn infer_workload(&self, stats: &TableStatistics) -> WorkloadType {
362 let total_scans = stats.seq_scan_count + stats.idx_scan_count;
363 let write_rate = stats.n_tup_ins + stats.n_tup_upd + stats.n_tup_del;
364
365 if write_rate > 100 && stats.idx_scan_count > stats.seq_scan_count {
367 return WorkloadType::OLTP;
368 }
369
370 if total_scans > 1000 && stats.seq_scan_count > stats.idx_scan_count * 2 {
372 return WorkloadType::OLAP;
373 }
374
375 if write_rate > 50 && total_scans > 500 {
377 return WorkloadType::HTAP;
378 }
379
380 WorkloadType::Mixed
381 }
382
383 pub async fn needs_refresh(&self) -> bool {
385 let last_refresh = self.last_refresh.read().await;
386 match *last_refresh {
387 None => true,
388 Some(time) => time.elapsed() > self.config.refresh_interval,
389 }
390 }
391
392 pub async fn refresh(&self) -> Result<(), DiscoveryError> {
394 self.discover().await?;
395 if self.config.discover_relationships {
396 self.discover_relationships().await?;
397 }
398 Ok(())
399 }
400
401 pub async fn clear_cache(&self) {
403 let mut cache = self.cache.write().await;
404 *cache = DiscoveryCache::default();
405
406 let mut last_refresh = self.last_refresh.write().await;
407 *last_refresh = None;
408 }
409
410 fn build_discovery_queries(&self) -> Vec<String> {
412 let mut queries = Vec::new();
413
414 let schemas_filter = if self.config.schemas.is_empty() {
416 String::new()
417 } else {
418 let schemas = self.config.schemas.iter()
419 .map(|s| format!("'{}'", s))
420 .collect::<Vec<_>>()
421 .join(", ");
422 format!("AND table_schema IN ({})", schemas)
423 };
424
425 queries.push(format!(
426 r#"
427 SELECT
428 table_schema,
429 table_name,
430 table_type
431 FROM information_schema.tables
432 WHERE table_type = 'BASE TABLE'
433 {}
434 ORDER BY table_schema, table_name
435 "#,
436 schemas_filter
437 ));
438
439 queries.push(
441 r#"
442 SELECT
443 table_schema,
444 table_name,
445 column_name,
446 data_type,
447 is_nullable,
448 column_default
449 FROM information_schema.columns
450 ORDER BY table_schema, table_name, ordinal_position
451 "#.to_string()
452 );
453
454 queries.push(
456 r#"
457 SELECT
458 schemaname,
459 tablename,
460 indexname,
461 indexdef
462 FROM pg_indexes
463 ORDER BY schemaname, tablename, indexname
464 "#.to_string()
465 );
466
467 if self.config.discover_relationships {
469 queries.push(
470 r#"
471 SELECT
472 tc.table_schema,
473 tc.table_name,
474 kcu.column_name,
475 ccu.table_schema AS foreign_table_schema,
476 ccu.table_name AS foreign_table_name,
477 ccu.column_name AS foreign_column_name
478 FROM information_schema.table_constraints AS tc
479 JOIN information_schema.key_column_usage AS kcu
480 ON tc.constraint_name = kcu.constraint_name
481 JOIN information_schema.constraint_column_usage AS ccu
482 ON ccu.constraint_name = tc.constraint_name
483 WHERE tc.constraint_type = 'FOREIGN KEY'
484 "#.to_string()
485 );
486 }
487
488 if self.config.sample_statistics {
490 queries.push(
491 r#"
492 SELECT
493 schemaname,
494 relname as tablename,
495 n_live_tup as row_count,
496 seq_scan,
497 idx_scan,
498 n_tup_ins,
499 n_tup_upd,
500 n_tup_del,
501 last_vacuum,
502 last_analyze
503 FROM pg_stat_user_tables
504 "#.to_string()
505 );
506 }
507
508 queries
509 }
510
511 fn build_table_query(&self, table_name: &str) -> String {
513 format!(
514 r#"
515 SELECT
516 c.column_name,
517 c.data_type,
518 c.is_nullable,
519 c.column_default,
520 CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END as is_primary_key
521 FROM information_schema.columns c
522 LEFT JOIN (
523 SELECT kcu.column_name
524 FROM information_schema.table_constraints tc
525 JOIN information_schema.key_column_usage kcu
526 ON tc.constraint_name = kcu.constraint_name
527 WHERE tc.constraint_type = 'PRIMARY KEY'
528 AND tc.table_name = '{}'
529 ) pk ON c.column_name = pk.column_name
530 WHERE c.table_name = '{}'
531 ORDER BY c.ordinal_position
532 "#,
533 table_name, table_name
534 )
535 }
536}
537
/// Failures reported by the schema discovery service.
#[derive(Debug, Clone)]
pub enum DiscoveryError {
    /// The named table is not known to the service.
    TableNotFound(String),
    /// A connection problem, with a description.
    ConnectionError(String),
    /// A query failure, with a description.
    QueryError(String),
    /// Statistics were requested while `sample_statistics` is turned off.
    StatisticsDisabled,
    /// A cache refresh failure, with a description.
    RefreshFailed(String),
}
552
553impl std::fmt::Display for DiscoveryError {
554 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
555 match self {
556 Self::TableNotFound(name) => write!(f, "Table not found: {}", name),
557 Self::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
558 Self::QueryError(msg) => write!(f, "Query error: {}", msg),
559 Self::StatisticsDisabled => write!(f, "Statistics collection is disabled"),
560 Self::RefreshFailed(msg) => write!(f, "Cache refresh failed: {}", msg),
561 }
562 }
563}
564
565impl std::error::Error for DiscoveryError {}
566
#[cfg(test)]
mod tests {
    use super::*;

    /// Discovery service built from the default configuration.
    fn default_discovery() -> SchemaDiscovery {
        SchemaDiscovery::new(DiscoveryConfig::default())
    }

    /// Builds statistics with no vacuum/analyze timestamps.
    ///
    /// * `sizes`: `[row_count, size_bytes, index_size_bytes]`
    /// * `scans`: `[seq_scan_count, idx_scan_count]`
    /// * `writes`: `[n_tup_ins, n_tup_upd, n_tup_del]`
    fn sample_stats(
        table: &str,
        sizes: [u64; 3],
        scans: [u64; 2],
        writes: [u64; 3],
    ) -> TableStatistics {
        let [row_count, size_bytes, index_size_bytes] = sizes;
        let [seq_scan_count, idx_scan_count] = scans;
        let [n_tup_ins, n_tup_upd, n_tup_del] = writes;

        TableStatistics {
            table_name: table.to_string(),
            row_count,
            size_bytes,
            index_size_bytes,
            seq_scan_count,
            idx_scan_count,
            n_tup_ins,
            n_tup_upd,
            n_tup_del,
            last_vacuum: None,
            last_analyze: None,
        }
    }

    #[tokio::test]
    async fn test_discovery_config_default() {
        let defaults = DiscoveryConfig::default();
        assert!(defaults.enabled);
        assert_eq!(defaults.schemas, vec!["public"]);
        assert!(defaults.discover_relationships);
        assert!(defaults.discover_indexes);
    }

    #[tokio::test]
    async fn test_schema_discovery_new() {
        // A freshly built service has never refreshed.
        assert!(default_discovery().needs_refresh().await);
    }

    #[tokio::test]
    async fn test_discover_tables() {
        let service = default_discovery();

        let found = service.discover().await.unwrap();
        assert!(!found.is_empty());

        let users = found.iter().find(|table| table.name == "users").unwrap();
        assert_eq!(users.temperature, DataTemperature::Hot);
        assert_eq!(users.workload, WorkloadType::OLTP);
    }

    #[tokio::test]
    async fn test_discover_indexes() {
        let service = default_discovery();

        let found = service.discover_indexes("users").await.unwrap();
        assert!(!found.is_empty());

        let primary = found
            .iter()
            .find(|index| index.name.ends_with("_pkey"))
            .unwrap();
        assert!(primary.is_unique);
    }

    #[tokio::test]
    async fn test_discover_relationships() {
        let service = default_discovery();

        let found = service.discover_relationships().await.unwrap();
        assert!(!found.is_empty());

        let order_user = found
            .iter()
            .find(|rel| rel.from_table == "orders" && rel.to_table == "users")
            .unwrap();
        assert_eq!(order_user.relationship_type, RelationshipType::ManyToOne);
    }

    #[tokio::test]
    async fn test_get_statistics() {
        let service = default_discovery();

        let stats = service.get_statistics("users").await.unwrap();
        assert_eq!(stats.table_name, "users");
        assert!(stats.row_count > 0);
    }

    #[tokio::test]
    async fn test_infer_temperature() {
        let service = default_discovery();

        // Heavy scanning plus heavy writing.
        let hot_stats = sample_stats(
            "active_sessions",
            [10000, 1_000_000, 100_000],
            [1000, 50000],
            [500, 200, 100],
        );
        assert_eq!(service.infer_temperature(&hot_stats), DataTemperature::Hot);

        // A little scanning and almost no writing.
        let cold_stats = sample_stats(
            "audit_logs",
            [1_000_000, 100_000_000, 10_000_000],
            [50, 100],
            [5, 0, 0],
        );
        assert_eq!(service.infer_temperature(&cold_stats), DataTemperature::Cold);
    }

    #[tokio::test]
    async fn test_infer_access_pattern() {
        let service = default_discovery();

        // Nearly every scan goes through an index.
        let point_stats = sample_stats(
            "users",
            [100000, 10_000_000, 1_000_000],
            [10, 10000],
            [0, 0, 0],
        );
        assert_eq!(
            service.infer_access_pattern(&point_stats),
            AccessPattern::PointLookup
        );

        // Nearly every scan is sequential.
        let scan_stats = sample_stats(
            "reports",
            [100000, 10_000_000, 1_000_000],
            [1000, 50],
            [0, 0, 0],
        );
        assert_eq!(
            service.infer_access_pattern(&scan_stats),
            AccessPattern::FullScan
        );
    }

    #[tokio::test]
    async fn test_infer_workload() {
        let service = default_discovery();

        // Write-heavy and index-driven.
        let oltp_stats = sample_stats(
            "orders",
            [100000, 10_000_000, 1_000_000],
            [100, 5000],
            [200, 50, 10],
        );
        assert_eq!(service.infer_workload(&oltp_stats), WorkloadType::OLTP);

        // Scan-heavy and mostly sequential.
        let olap_stats = sample_stats(
            "sales_history",
            [10_000_000, 1_000_000_000, 100_000_000],
            [5000, 100],
            [10, 0, 0],
        );
        assert_eq!(service.infer_workload(&olap_stats), WorkloadType::OLAP);
    }

    #[tokio::test]
    async fn test_cache_clear() {
        let service = default_discovery();

        service.discover().await.unwrap();
        assert!(!service.needs_refresh().await);

        service.clear_cache().await;
        assert!(service.needs_refresh().await);
    }

    #[tokio::test]
    async fn test_table_not_found() {
        let service = default_discovery();

        let outcome = service.discover_table("nonexistent_table").await;
        assert!(matches!(outcome, Err(DiscoveryError::TableNotFound(_))));
    }

    #[tokio::test]
    async fn test_statistics_disabled() {
        let service = SchemaDiscovery::new(DiscoveryConfig {
            sample_statistics: false,
            ..Default::default()
        });

        let outcome = service.get_statistics("users").await;
        assert!(matches!(outcome, Err(DiscoveryError::StatisticsDisabled)));
    }

    #[test]
    fn test_discovery_error_display() {
        let missing = DiscoveryError::TableNotFound("users".to_string());
        assert_eq!(missing.to_string(), "Table not found: users");

        let unreachable_db = DiscoveryError::ConnectionError("timeout".to_string());
        assert_eq!(unreachable_db.to_string(), "Connection error: timeout");
    }
}