1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::RwLock;
10
11use super::registry::{IndexType, RelationshipType, StorageType};
12use super::{
13 AccessPattern, ColumnSchema, DataTemperature, IndexSchema, Relationship, TableSchema,
14 WorkloadType,
15};
16
/// Settings controlling what [`SchemaDiscovery`] inspects and how often.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiscoveryConfig {
    /// Whether schema discovery is enabled.
    /// NOTE(review): not consulted by `SchemaDiscovery` itself — presumably checked by the caller.
    pub enabled: bool,
    /// Maximum age of discovered metadata before `needs_refresh` reports `true`.
    pub refresh_interval: Duration,
    /// Schemas to scan; an empty list applies no schema filter.
    pub schemas: Vec<String>,
    /// Table names that discovery should skip.
    pub exclude_tables: Vec<String>,
    /// Whether system tables are in scope for discovery.
    pub include_system_tables: bool,
    /// Whether foreign-key relationships are discovered.
    pub discover_relationships: bool,
    /// Whether indexes are discovered.
    pub discover_indexes: bool,
    /// Whether access patterns should be inferred from usage statistics.
    pub infer_access_patterns: bool,
    /// Whether per-table usage statistics are collected.
    pub sample_statistics: bool,
}

impl Default for DiscoveryConfig {
    /// Enabled discovery of the `public` schema, refreshed every 5 minutes,
    /// with relationships, indexes, access-pattern inference and statistics
    /// all turned on and system tables excluded.
    fn default() -> Self {
        Self {
            enabled: true,
            refresh_interval: Duration::from_secs(300),
            schemas: vec!["public".to_string()],
            exclude_tables: Vec::new(),
            include_system_tables: false,
            discover_relationships: true,
            discover_indexes: true,
            infer_access_patterns: true,
            sample_statistics: true,
        }
    }
}
55
56pub struct SchemaDiscovery {
58 config: DiscoveryConfig,
59 cache: Arc<RwLock<DiscoveryCache>>,
61 last_refresh: Arc<RwLock<Option<std::time::Instant>>>,
63}
64
65#[derive(Debug, Default)]
67struct DiscoveryCache {
68 tables: HashMap<String, TableSchema>,
69 indexes: HashMap<String, Vec<IndexSchema>>,
70 relationships: Vec<Relationship>,
71 statistics: HashMap<String, TableStatistics>,
72}
73
/// Usage statistics for a single table, mirroring the counters read from
/// PostgreSQL's `pg_stat_user_tables` view.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TableStatistics {
    /// Name of the table the statistics describe.
    pub table_name: String,
    /// Estimated number of live rows (`n_live_tup`).
    pub row_count: u64,
    /// Size of the table data in bytes.
    pub size_bytes: u64,
    /// Combined size of the table's indexes in bytes.
    pub index_size_bytes: u64,
    /// Number of sequential scans (`seq_scan`).
    pub seq_scan_count: u64,
    /// Number of index scans (`idx_scan`).
    pub idx_scan_count: u64,
    /// Rows inserted (`n_tup_ins`).
    pub n_tup_ins: u64,
    /// Rows updated (`n_tup_upd`).
    pub n_tup_upd: u64,
    /// Rows deleted (`n_tup_del`).
    pub n_tup_del: u64,
    /// `last_vacuum` timestamp as text, e.g. `2024-01-15 10:00:00`.
    pub last_vacuum: Option<String>,
    /// `last_analyze` timestamp as text, e.g. `2024-01-15 10:00:00`.
    pub last_analyze: Option<String>,
}
100
101impl SchemaDiscovery {
102 pub fn new(config: DiscoveryConfig) -> Self {
104 Self {
105 config,
106 cache: Arc::new(RwLock::new(DiscoveryCache::default())),
107 last_refresh: Arc::new(RwLock::new(None)),
108 }
109 }
110
111 pub async fn discover(&self) -> Result<Vec<TableSchema>, DiscoveryError> {
115 let _queries = self.build_discovery_queries();
117
118 let mut tables = Vec::new();
121
122 let users_table = TableSchema {
124 name: "users".to_string(),
125 columns: vec![
126 ColumnSchema {
127 name: "id".to_string(),
128 data_type: "bigint".to_string(),
129 nullable: false,
130 is_primary_key: true,
131 is_indexed: true,
132 storage_type: StorageType::Row,
133 },
134 ColumnSchema {
135 name: "email".to_string(),
136 data_type: "varchar(255)".to_string(),
137 nullable: false,
138 is_primary_key: false,
139 is_indexed: true,
140 storage_type: StorageType::Row,
141 },
142 ColumnSchema {
143 name: "created_at".to_string(),
144 data_type: "timestamp".to_string(),
145 nullable: false,
146 is_primary_key: false,
147 is_indexed: true,
148 storage_type: StorageType::Row,
149 },
150 ],
151 access_pattern: AccessPattern::PointLookup,
152 temperature: DataTemperature::Hot,
153 workload: WorkloadType::OLTP,
154 primary_key: vec!["id".to_string()],
155 shard_key: Some("id".to_string()),
156 estimated_rows: 1_000_000,
157 avg_row_size: 100,
158 partition_key: None,
159 preferred_nodes: Vec::new(),
160 };
161 tables.push(users_table);
162
163 let mut cache = self.cache.write().await;
165 for table in &tables {
166 cache.tables.insert(table.name.clone(), table.clone());
167 }
168
169 let mut last_refresh = self.last_refresh.write().await;
171 *last_refresh = Some(std::time::Instant::now());
172
173 Ok(tables)
174 }
175
176 pub async fn discover_table(&self, table_name: &str) -> Result<TableSchema, DiscoveryError> {
178 {
180 let cache = self.cache.read().await;
181 if let Some(table) = cache.tables.get(table_name) {
182 return Ok(table.clone());
183 }
184 }
185
186 let _query = self.build_table_query(table_name);
188
189 Err(DiscoveryError::TableNotFound(table_name.to_string()))
191 }
192
193 pub async fn discover_indexes(
195 &self,
196 table_name: &str,
197 ) -> Result<Vec<IndexSchema>, DiscoveryError> {
198 if !self.config.discover_indexes {
199 return Ok(Vec::new());
200 }
201
202 {
204 let cache = self.cache.read().await;
205 if let Some(indexes) = cache.indexes.get(table_name) {
206 return Ok(indexes.clone());
207 }
208 }
209
210 let indexes = vec![
212 IndexSchema {
213 name: format!("{}_pkey", table_name),
214 table: table_name.to_string(),
215 columns: vec!["id".to_string()],
216 is_unique: true,
217 index_type: IndexType::BTree,
218 },
219 IndexSchema {
220 name: format!("{}_email_idx", table_name),
221 table: table_name.to_string(),
222 columns: vec!["email".to_string()],
223 is_unique: true,
224 index_type: IndexType::BTree,
225 },
226 ];
227
228 let mut cache = self.cache.write().await;
230 cache
231 .indexes
232 .insert(table_name.to_string(), indexes.clone());
233
234 Ok(indexes)
235 }
236
237 pub async fn discover_relationships(&self) -> Result<Vec<Relationship>, DiscoveryError> {
239 if !self.config.discover_relationships {
240 return Ok(Vec::new());
241 }
242
243 {
245 let cache = self.cache.read().await;
246 if !cache.relationships.is_empty() {
247 return Ok(cache.relationships.clone());
248 }
249 }
250
251 let relationships = vec![
253 Relationship {
254 from_table: "orders".to_string(),
255 from_column: "user_id".to_string(),
256 to_table: "users".to_string(),
257 to_column: "id".to_string(),
258 relationship_type: RelationshipType::ManyToOne,
259 },
260 Relationship {
261 from_table: "order_items".to_string(),
262 from_column: "order_id".to_string(),
263 to_table: "orders".to_string(),
264 to_column: "id".to_string(),
265 relationship_type: RelationshipType::ManyToOne,
266 },
267 ];
268
269 let mut cache = self.cache.write().await;
271 cache.relationships = relationships.clone();
272
273 Ok(relationships)
274 }
275
276 pub async fn get_statistics(
278 &self,
279 table_name: &str,
280 ) -> Result<TableStatistics, DiscoveryError> {
281 if !self.config.sample_statistics {
282 return Err(DiscoveryError::StatisticsDisabled);
283 }
284
285 {
287 let cache = self.cache.read().await;
288 if let Some(stats) = cache.statistics.get(table_name) {
289 return Ok(stats.clone());
290 }
291 }
292
293 let stats = TableStatistics {
295 table_name: table_name.to_string(),
296 row_count: 1_000_000,
297 size_bytes: 100_000_000,
298 index_size_bytes: 20_000_000,
299 seq_scan_count: 100,
300 idx_scan_count: 50_000,
301 n_tup_ins: 1000,
302 n_tup_upd: 500,
303 n_tup_del: 100,
304 last_vacuum: Some("2024-01-15 10:00:00".to_string()),
305 last_analyze: Some("2024-01-15 10:00:00".to_string()),
306 };
307
308 let mut cache = self.cache.write().await;
310 cache
311 .statistics
312 .insert(table_name.to_string(), stats.clone());
313
314 Ok(stats)
315 }
316
317 pub fn infer_temperature(&self, stats: &TableStatistics) -> DataTemperature {
319 let total_scans = stats.seq_scan_count + stats.idx_scan_count;
321 let write_rate = stats.n_tup_ins + stats.n_tup_upd + stats.n_tup_del;
322
323 if total_scans > 10_000 && write_rate > 100 {
325 return DataTemperature::Hot;
326 }
327
328 if total_scans > 1_000 || write_rate > 10 {
330 return DataTemperature::Warm;
331 }
332
333 if total_scans > 100 {
335 return DataTemperature::Cold;
336 }
337
338 DataTemperature::Frozen
340 }
341
342 pub fn infer_access_pattern(&self, stats: &TableStatistics) -> AccessPattern {
344 let total_scans = stats.seq_scan_count + stats.idx_scan_count;
345
346 if total_scans == 0 {
347 return AccessPattern::Mixed;
348 }
349
350 let index_ratio = stats.idx_scan_count as f64 / total_scans as f64;
351
352 if index_ratio > 0.9 {
354 return AccessPattern::PointLookup;
355 }
356
357 if index_ratio > 0.5 {
359 return AccessPattern::RangeScan;
360 }
361
362 if index_ratio < 0.1 {
364 return AccessPattern::FullScan;
365 }
366
367 AccessPattern::Mixed
368 }
369
370 pub fn infer_workload(&self, stats: &TableStatistics) -> WorkloadType {
372 let total_scans = stats.seq_scan_count + stats.idx_scan_count;
373 let write_rate = stats.n_tup_ins + stats.n_tup_upd + stats.n_tup_del;
374
375 if write_rate > 100 && stats.idx_scan_count > stats.seq_scan_count {
377 return WorkloadType::OLTP;
378 }
379
380 if total_scans > 1000 && stats.seq_scan_count > stats.idx_scan_count * 2 {
382 return WorkloadType::OLAP;
383 }
384
385 if write_rate > 50 && total_scans > 500 {
387 return WorkloadType::HTAP;
388 }
389
390 WorkloadType::Mixed
391 }
392
393 pub async fn needs_refresh(&self) -> bool {
395 let last_refresh = self.last_refresh.read().await;
396 match *last_refresh {
397 None => true,
398 Some(time) => time.elapsed() > self.config.refresh_interval,
399 }
400 }
401
402 pub async fn refresh(&self) -> Result<(), DiscoveryError> {
404 self.discover().await?;
405 if self.config.discover_relationships {
406 self.discover_relationships().await?;
407 }
408 Ok(())
409 }
410
411 pub async fn clear_cache(&self) {
413 let mut cache = self.cache.write().await;
414 *cache = DiscoveryCache::default();
415
416 let mut last_refresh = self.last_refresh.write().await;
417 *last_refresh = None;
418 }
419
420 fn build_discovery_queries(&self) -> Vec<String> {
422 let mut queries = Vec::new();
423
424 let schemas_filter = if self.config.schemas.is_empty() {
426 String::new()
427 } else {
428 let schemas = self
429 .config
430 .schemas
431 .iter()
432 .map(|s| format!("'{}'", s))
433 .collect::<Vec<_>>()
434 .join(", ");
435 format!("AND table_schema IN ({})", schemas)
436 };
437
438 queries.push(format!(
439 r#"
440 SELECT
441 table_schema,
442 table_name,
443 table_type
444 FROM information_schema.tables
445 WHERE table_type = 'BASE TABLE'
446 {}
447 ORDER BY table_schema, table_name
448 "#,
449 schemas_filter
450 ));
451
452 queries.push(
454 r#"
455 SELECT
456 table_schema,
457 table_name,
458 column_name,
459 data_type,
460 is_nullable,
461 column_default
462 FROM information_schema.columns
463 ORDER BY table_schema, table_name, ordinal_position
464 "#
465 .to_string(),
466 );
467
468 queries.push(
470 r#"
471 SELECT
472 schemaname,
473 tablename,
474 indexname,
475 indexdef
476 FROM pg_indexes
477 ORDER BY schemaname, tablename, indexname
478 "#
479 .to_string(),
480 );
481
482 if self.config.discover_relationships {
484 queries.push(
485 r#"
486 SELECT
487 tc.table_schema,
488 tc.table_name,
489 kcu.column_name,
490 ccu.table_schema AS foreign_table_schema,
491 ccu.table_name AS foreign_table_name,
492 ccu.column_name AS foreign_column_name
493 FROM information_schema.table_constraints AS tc
494 JOIN information_schema.key_column_usage AS kcu
495 ON tc.constraint_name = kcu.constraint_name
496 JOIN information_schema.constraint_column_usage AS ccu
497 ON ccu.constraint_name = tc.constraint_name
498 WHERE tc.constraint_type = 'FOREIGN KEY'
499 "#
500 .to_string(),
501 );
502 }
503
504 if self.config.sample_statistics {
506 queries.push(
507 r#"
508 SELECT
509 schemaname,
510 relname as tablename,
511 n_live_tup as row_count,
512 seq_scan,
513 idx_scan,
514 n_tup_ins,
515 n_tup_upd,
516 n_tup_del,
517 last_vacuum,
518 last_analyze
519 FROM pg_stat_user_tables
520 "#
521 .to_string(),
522 );
523 }
524
525 queries
526 }
527
528 fn build_table_query(&self, table_name: &str) -> String {
530 format!(
531 r#"
532 SELECT
533 c.column_name,
534 c.data_type,
535 c.is_nullable,
536 c.column_default,
537 CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END as is_primary_key
538 FROM information_schema.columns c
539 LEFT JOIN (
540 SELECT kcu.column_name
541 FROM information_schema.table_constraints tc
542 JOIN information_schema.key_column_usage kcu
543 ON tc.constraint_name = kcu.constraint_name
544 WHERE tc.constraint_type = 'PRIMARY KEY'
545 AND tc.table_name = '{}'
546 ) pk ON c.column_name = pk.column_name
547 WHERE c.table_name = '{}'
548 ORDER BY c.ordinal_position
549 "#,
550 table_name, table_name
551 )
552 }
553}
554
/// Errors produced while discovering or caching schema metadata.
#[derive(Debug, Clone)]
pub enum DiscoveryError {
    /// The named table was not found.
    TableNotFound(String),
    /// A connection problem, described by the message.
    ConnectionError(String),
    /// A query problem, described by the message.
    QueryError(String),
    /// Statistics were requested while statistics sampling is turned off.
    StatisticsDisabled,
    /// Refreshing the cache failed, described by the message.
    RefreshFailed(String),
}

impl std::fmt::Display for DiscoveryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants are imported explicitly (not via a glob) so a misspelled
        // variant is a compile error rather than a catch-all binding.
        use DiscoveryError::{
            ConnectionError, QueryError, RefreshFailed, StatisticsDisabled, TableNotFound,
        };

        match self {
            StatisticsDisabled => f.write_str("Statistics collection is disabled"),
            TableNotFound(table) => write!(f, "Table not found: {}", table),
            ConnectionError(detail) => write!(f, "Connection error: {}", detail),
            QueryError(detail) => write!(f, "Query error: {}", detail),
            RefreshFailed(detail) => write!(f, "Cache refresh failed: {}", detail),
        }
    }
}

impl std::error::Error for DiscoveryError {}
583
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds statistics in which only the counters read by the `infer_*`
    /// heuristics are set; row counts and sizes do not affect them.
    fn stats_with(
        table_name: &str,
        seq_scan_count: u64,
        idx_scan_count: u64,
        n_tup_ins: u64,
        n_tup_upd: u64,
        n_tup_del: u64,
    ) -> TableStatistics {
        TableStatistics {
            table_name: table_name.to_string(),
            row_count: 0,
            size_bytes: 0,
            index_size_bytes: 0,
            seq_scan_count,
            idx_scan_count,
            n_tup_ins,
            n_tup_upd,
            n_tup_del,
            last_vacuum: None,
            last_analyze: None,
        }
    }

    // Purely synchronous checks use `#[test]`; no async runtime is needed.
    #[test]
    fn test_discovery_config_default() {
        let config = DiscoveryConfig::default();
        assert!(config.enabled);
        assert_eq!(config.refresh_interval, Duration::from_secs(300));
        assert_eq!(config.schemas, vec!["public"]);
        assert!(config.exclude_tables.is_empty());
        assert!(!config.include_system_tables);
        assert!(config.discover_relationships);
        assert!(config.discover_indexes);
        assert!(config.sample_statistics);
    }

    #[tokio::test]
    async fn test_schema_discovery_new() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());
        assert!(discovery.needs_refresh().await);
    }

    #[tokio::test]
    async fn test_discover_tables() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        let tables = discovery.discover().await.unwrap();
        assert!(!tables.is_empty());

        let users = tables.iter().find(|t| t.name == "users").unwrap();
        assert_eq!(users.temperature, DataTemperature::Hot);
        assert_eq!(users.workload, WorkloadType::OLTP);
    }

    #[tokio::test]
    async fn test_discover_table_hits_cache_after_discover() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());
        discovery.discover().await.unwrap();

        let users = discovery.discover_table("users").await.unwrap();
        assert_eq!(users.name, "users");
        assert_eq!(users.primary_key, vec!["id".to_string()]);
    }

    #[tokio::test]
    async fn test_discover_indexes() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        let indexes = discovery.discover_indexes("users").await.unwrap();
        assert!(!indexes.is_empty());

        let pkey = indexes.iter().find(|i| i.name.ends_with("_pkey")).unwrap();
        assert!(pkey.is_unique);
    }

    #[tokio::test]
    async fn test_discover_indexes_disabled() {
        let config = DiscoveryConfig {
            discover_indexes: false,
            ..Default::default()
        };
        let discovery = SchemaDiscovery::new(config);

        assert!(discovery.discover_indexes("users").await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_discover_relationships() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        let rels = discovery.discover_relationships().await.unwrap();
        assert!(!rels.is_empty());

        let order_user = rels
            .iter()
            .find(|r| r.from_table == "orders" && r.to_table == "users")
            .unwrap();
        assert_eq!(order_user.relationship_type, RelationshipType::ManyToOne);
    }

    #[tokio::test]
    async fn test_discover_relationships_disabled() {
        let config = DiscoveryConfig {
            discover_relationships: false,
            ..Default::default()
        };
        let discovery = SchemaDiscovery::new(config);

        assert!(discovery.discover_relationships().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_get_statistics() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        let stats = discovery.get_statistics("users").await.unwrap();
        assert_eq!(stats.table_name, "users");
        assert!(stats.row_count > 0);
    }

    #[test]
    fn test_infer_temperature() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        let hot = stats_with("active_sessions", 1000, 50000, 500, 200, 100);
        assert_eq!(discovery.infer_temperature(&hot), DataTemperature::Hot);

        let warm = stats_with("profiles", 0, 2000, 0, 0, 0);
        assert_eq!(discovery.infer_temperature(&warm), DataTemperature::Warm);

        let cold = stats_with("audit_logs", 50, 100, 5, 0, 0);
        assert_eq!(discovery.infer_temperature(&cold), DataTemperature::Cold);

        let frozen = stats_with("archive", 0, 0, 0, 0, 0);
        assert_eq!(discovery.infer_temperature(&frozen), DataTemperature::Frozen);
    }

    #[test]
    fn test_infer_access_pattern() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        let point = stats_with("users", 10, 10000, 0, 0, 0);
        assert_eq!(discovery.infer_access_pattern(&point), AccessPattern::PointLookup);

        let range = stats_with("events", 300, 700, 0, 0, 0);
        assert_eq!(discovery.infer_access_pattern(&range), AccessPattern::RangeScan);

        let mixed = stats_with("notes", 700, 300, 0, 0, 0);
        assert_eq!(discovery.infer_access_pattern(&mixed), AccessPattern::Mixed);

        let scan = stats_with("reports", 1000, 50, 0, 0, 0);
        assert_eq!(discovery.infer_access_pattern(&scan), AccessPattern::FullScan);

        let untouched = stats_with("fresh", 0, 0, 0, 0, 0);
        assert_eq!(discovery.infer_access_pattern(&untouched), AccessPattern::Mixed);
    }

    #[test]
    fn test_infer_workload() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        let oltp = stats_with("orders", 100, 5000, 200, 50, 10);
        assert_eq!(discovery.infer_workload(&oltp), WorkloadType::OLTP);

        let olap = stats_with("sales_history", 5000, 100, 10, 0, 0);
        assert_eq!(discovery.infer_workload(&olap), WorkloadType::OLAP);

        let htap = stats_with("inventory", 300, 300, 60, 0, 0);
        assert_eq!(discovery.infer_workload(&htap), WorkloadType::HTAP);

        let idle = stats_with("lookup", 0, 0, 0, 0, 0);
        assert_eq!(discovery.infer_workload(&idle), WorkloadType::Mixed);
    }

    #[tokio::test]
    async fn test_cache_clear() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        discovery.discover().await.unwrap();
        assert!(!discovery.needs_refresh().await);

        discovery.clear_cache().await;
        assert!(discovery.needs_refresh().await);
    }

    #[tokio::test]
    async fn test_refresh_updates_timestamp() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());
        assert!(discovery.needs_refresh().await);

        discovery.refresh().await.unwrap();
        assert!(!discovery.needs_refresh().await);
    }

    #[tokio::test]
    async fn test_table_not_found() {
        let discovery = SchemaDiscovery::new(DiscoveryConfig::default());

        let result = discovery.discover_table("nonexistent_table").await;
        assert!(matches!(result, Err(DiscoveryError::TableNotFound(_))));
    }

    #[tokio::test]
    async fn test_statistics_disabled() {
        let config = DiscoveryConfig {
            sample_statistics: false,
            ..Default::default()
        };
        let discovery = SchemaDiscovery::new(config);

        let result = discovery.get_statistics("users").await;
        assert!(matches!(result, Err(DiscoveryError::StatisticsDisabled)));
    }

    #[test]
    fn test_discovery_error_display() {
        let err = DiscoveryError::TableNotFound("users".to_string());
        assert_eq!(err.to_string(), "Table not found: users");

        let err = DiscoveryError::ConnectionError("timeout".to_string());
        assert_eq!(err.to_string(), "Connection error: timeout");

        let err = DiscoveryError::QueryError("syntax".to_string());
        assert_eq!(err.to_string(), "Query error: syntax");

        let err = DiscoveryError::StatisticsDisabled;
        assert_eq!(err.to_string(), "Statistics collection is disabled");

        let err = DiscoveryError::RefreshFailed("db down".to_string());
        assert_eq!(err.to_string(), "Cache refresh failed: db down");
    }
}
823}