1use crate::error::{Result, TdbError};
15use parking_lot::RwLock;
16use std::collections::{HashMap, HashSet};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21#[derive(Debug, Clone)]
23pub struct MaterializedViewConfig {
24 pub max_views: usize,
26 pub default_refresh_strategy: RefreshStrategy,
28 pub enable_auto_selection: bool,
30 pub max_view_size: usize,
32 pub view_expiration: Duration,
34}
35
36impl Default for MaterializedViewConfig {
37 fn default() -> Self {
38 Self {
39 max_views: 100,
40 default_refresh_strategy: RefreshStrategy::Deferred,
41 enable_auto_selection: true,
42 max_view_size: 1_000_000,
43 view_expiration: Duration::from_secs(3600), }
45 }
46}
47
/// How a materialized view is expected to be brought up to date.
///
/// NOTE(review): the visible code stores and compares this value but treats
/// all variants the same during invalidation; the descriptions below are
/// inferred from the variant names and should be confirmed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RefreshStrategy {
    /// Presumably: refresh as soon as underlying data changes.
    Immediate,
    /// Presumably: refresh lazily at some later point.
    Deferred,
    /// Presumably: refresh only on explicit request.
    Manual,
}
58
59#[derive(Debug, Clone)]
61pub struct MaterializedView {
62 pub id: u64,
64 pub name: String,
66 pub query_pattern: String,
68 pub refresh_strategy: RefreshStrategy,
70 pub is_valid: bool,
72 pub created_at: Instant,
74 pub last_refreshed: Instant,
76 pub result_count: usize,
78 pub data: Vec<Vec<u8>>,
81}
82
83impl MaterializedView {
84 pub fn new(id: u64, name: String, query_pattern: String, strategy: RefreshStrategy) -> Self {
86 let now = Instant::now();
87 Self {
88 id,
89 name,
90 query_pattern,
91 refresh_strategy: strategy,
92 is_valid: false,
93 created_at: now,
94 last_refreshed: now,
95 result_count: 0,
96 data: Vec::new(),
97 }
98 }
99
100 pub fn needs_refresh(&self, max_age: Duration) -> bool {
102 !self.is_valid || self.last_refreshed.elapsed() > max_age
103 }
104
105 pub fn invalidate(&mut self) {
107 self.is_valid = false;
108 }
109
110 pub fn refresh(&mut self, data: Vec<Vec<u8>>) {
112 self.data = data;
113 self.result_count = self.data.len();
114 self.last_refreshed = Instant::now();
115 self.is_valid = true;
116 }
117
118 pub fn age(&self) -> Duration {
120 self.last_refreshed.elapsed()
121 }
122}
123
124pub struct MaterializedViewManager {
126 config: MaterializedViewConfig,
128 views: RwLock<HashMap<u64, Arc<RwLock<MaterializedView>>>>,
130 name_to_id: RwLock<HashMap<String, u64>>,
132 pattern_index: RwLock<HashMap<String, HashSet<u64>>>,
134 next_view_id: AtomicU64,
136 stats: MaterializedViewStats,
138}
139
140impl MaterializedViewManager {
141 pub fn new(config: MaterializedViewConfig) -> Self {
143 Self {
144 config,
145 views: RwLock::new(HashMap::new()),
146 name_to_id: RwLock::new(HashMap::new()),
147 pattern_index: RwLock::new(HashMap::new()),
148 next_view_id: AtomicU64::new(1),
149 stats: MaterializedViewStats::default(),
150 }
151 }
152
153 pub fn create_view(
155 &self,
156 name: String,
157 query_pattern: String,
158 strategy: RefreshStrategy,
159 ) -> Result<u64> {
160 if self.name_to_id.read().contains_key(&name) {
162 return Err(TdbError::Other(format!(
163 "View with name '{}' already exists",
164 name
165 )));
166 }
167
168 if self.views.read().len() >= self.config.max_views {
170 return Err(TdbError::Other(format!(
171 "Maximum number of views ({}) reached",
172 self.config.max_views
173 )));
174 }
175
176 let view_id = self.next_view_id.fetch_add(1, Ordering::Relaxed);
177 let view = MaterializedView::new(view_id, name.clone(), query_pattern.clone(), strategy);
178
179 self.views
181 .write()
182 .insert(view_id, Arc::new(RwLock::new(view)));
183
184 self.name_to_id.write().insert(name, view_id);
186
187 self.pattern_index
189 .write()
190 .entry(query_pattern)
191 .or_default()
192 .insert(view_id);
193
194 self.stats
195 .total_views_created
196 .fetch_add(1, Ordering::Relaxed);
197
198 Ok(view_id)
199 }
200
201 pub fn drop_view(&self, view_id: u64) -> Result<()> {
203 let views = self.views.read();
204 let view = views
205 .get(&view_id)
206 .ok_or_else(|| TdbError::Other(format!("View {} not found", view_id)))?;
207
208 let view_data = view.read();
209 let name = view_data.name.clone();
210 let pattern = view_data.query_pattern.clone();
211 drop(view_data);
212 drop(views);
213
214 self.views.write().remove(&view_id);
216 self.name_to_id.write().remove(&name);
217
218 if let Some(view_set) = self.pattern_index.write().get_mut(&pattern) {
219 view_set.remove(&view_id);
220 }
221
222 self.stats
223 .total_views_dropped
224 .fetch_add(1, Ordering::Relaxed);
225
226 Ok(())
227 }
228
229 pub fn refresh_view(&self, view_id: u64, data: Vec<Vec<u8>>) -> Result<()> {
231 let views = self.views.read();
232 let view = views
233 .get(&view_id)
234 .ok_or_else(|| TdbError::Other(format!("View {} not found", view_id)))?;
235
236 let mut view_data = view.write();
237
238 if data.len() > self.config.max_view_size {
240 return Err(TdbError::Other(format!(
241 "View size {} exceeds limit {}",
242 data.len(),
243 self.config.max_view_size
244 )));
245 }
246
247 view_data.refresh(data);
248 self.stats.total_refreshes.fetch_add(1, Ordering::Relaxed);
249
250 Ok(())
251 }
252
253 pub fn invalidate_views(&self, affected_patterns: &[String]) {
255 let pattern_index = self.pattern_index.read();
256
257 for pattern in affected_patterns {
258 if let Some(view_ids) = pattern_index.get(pattern) {
259 for &view_id in view_ids {
260 if let Some(view) = self.views.read().get(&view_id) {
261 let mut view_data = view.write();
262 if view_data.refresh_strategy == RefreshStrategy::Immediate {
263 view_data.invalidate();
265 self.stats
266 .total_invalidations
267 .fetch_add(1, Ordering::Relaxed);
268 } else {
269 view_data.invalidate();
270 self.stats
271 .total_invalidations
272 .fetch_add(1, Ordering::Relaxed);
273 }
274 }
275 }
276 }
277 }
278 }
279
280 pub fn find_applicable_views(&self, query_pattern: &str) -> Vec<u64> {
282 let pattern_index = self.pattern_index.read();
283
284 pattern_index
285 .get(query_pattern)
286 .map(|view_ids| view_ids.iter().copied().collect())
287 .unwrap_or_default()
288 }
289
290 pub fn get_view(&self, view_id: u64) -> Option<Arc<RwLock<MaterializedView>>> {
292 self.views.read().get(&view_id).cloned()
293 }
294
295 pub fn get_view_by_name(&self, name: &str) -> Option<Arc<RwLock<MaterializedView>>> {
297 let name_to_id = self.name_to_id.read();
298 let view_id = name_to_id.get(name)?;
299 self.views.read().get(view_id).cloned()
300 }
301
302 pub fn list_views(&self) -> Vec<ViewInfo> {
304 self.views
305 .read()
306 .values()
307 .map(|view| {
308 let v = view.read();
309 ViewInfo {
310 id: v.id,
311 name: v.name.clone(),
312 query_pattern: v.query_pattern.clone(),
313 refresh_strategy: v.refresh_strategy,
314 is_valid: v.is_valid,
315 result_count: v.result_count,
316 age: v.age(),
317 }
318 })
319 .collect()
320 }
321
322 pub fn cleanup_expired_views(&self) -> usize {
324 if self.config.view_expiration.is_zero() {
325 return 0;
326 }
327
328 let expired_views: Vec<u64> = self
329 .views
330 .read()
331 .values()
332 .filter_map(|view| {
333 let v = view.read();
334 if v.age() > self.config.view_expiration {
335 Some(v.id)
336 } else {
337 None
338 }
339 })
340 .collect();
341
342 let count = expired_views.len();
343 for view_id in expired_views {
344 let _ = self.drop_view(view_id);
345 }
346
347 count
348 }
349
350 pub fn stats(&self) -> MaterializedViewManagerStats {
352 MaterializedViewManagerStats {
353 total_views: self.views.read().len(),
354 total_views_created: self.stats.total_views_created.load(Ordering::Relaxed),
355 total_views_dropped: self.stats.total_views_dropped.load(Ordering::Relaxed),
356 total_refreshes: self.stats.total_refreshes.load(Ordering::Relaxed),
357 total_invalidations: self.stats.total_invalidations.load(Ordering::Relaxed),
358 total_hits: self.stats.total_hits.load(Ordering::Relaxed),
359 total_misses: self.stats.total_misses.load(Ordering::Relaxed),
360 }
361 }
362
363 pub fn record_hit(&self) {
365 self.stats.total_hits.fetch_add(1, Ordering::Relaxed);
366 }
367
368 pub fn record_miss(&self) {
370 self.stats.total_misses.fetch_add(1, Ordering::Relaxed);
371 }
372}
373
374#[derive(Debug, Clone)]
376pub struct ViewInfo {
377 pub id: u64,
379 pub name: String,
381 pub query_pattern: String,
383 pub refresh_strategy: RefreshStrategy,
385 pub is_valid: bool,
387 pub result_count: usize,
389 pub age: Duration,
391}
392
/// Internal activity counters for the manager. They are atomics so they can
/// be bumped through a shared reference; all start at zero via `Default`.
#[derive(Default, Debug)]
struct MaterializedViewStats {
    /// Successful `create_view` calls.
    total_views_created: AtomicU64,
    /// Successful `drop_view` calls.
    total_views_dropped: AtomicU64,
    /// Successful `refresh_view` calls.
    total_refreshes: AtomicU64,
    /// Views invalidated through `invalidate_views`.
    total_invalidations: AtomicU64,
    /// Calls to `record_hit`.
    total_hits: AtomicU64,
    /// Calls to `record_miss`.
    total_misses: AtomicU64,
}
409
/// Plain-value snapshot of the manager's counters, as returned by
/// `MaterializedViewManager::stats`.
#[derive(Debug, Clone)]
pub struct MaterializedViewManagerStats {
    /// Number of views registered when the snapshot was taken.
    pub total_views: usize,
    /// Views created over the manager's lifetime.
    pub total_views_created: u64,
    /// Views dropped over the manager's lifetime.
    pub total_views_dropped: u64,
    /// Successful refreshes over the manager's lifetime.
    pub total_refreshes: u64,
    /// View invalidations over the manager's lifetime.
    pub total_invalidations: u64,
    /// Recorded hits.
    pub total_hits: u64,
    /// Recorded misses.
    pub total_misses: u64,
}

impl MaterializedViewManagerStats {
    /// Hits as a percentage (0.0 to 100.0) of all recorded lookups.
    ///
    /// Returns 0.0 when nothing has been recorded.
    pub fn hit_rate(&self) -> f64 {
        // The fields are public and may hold any value; summing in u128 keeps
        // `hits + misses` from overflowing u64.
        let total = u128::from(self.total_hits) + u128::from(self.total_misses);
        if total == 0 {
            0.0
        } else {
            (self.total_hits as f64 / total as f64) * 100.0
        }
    }

    /// Average number of refreshes per view ever created.
    ///
    /// Returns 0.0 when no view has been created.
    pub fn avg_refreshes_per_view(&self) -> f64 {
        if self.total_views_created == 0 {
            0.0
        } else {
            self.total_refreshes as f64 / self.total_views_created as f64
        }
    }
}
449
#[cfg(test)]
// Some tests tweak a single field of the default config after construction.
#[allow(clippy::field_reassign_with_default)]
mod tests {
    use super::*;

    /// Manager built from the default configuration.
    fn default_manager() -> MaterializedViewManager {
        MaterializedViewManager::new(MaterializedViewConfig::default())
    }

    /// Registers a deferred-refresh view and returns its id, panicking on failure.
    fn add_deferred(manager: &MaterializedViewManager, name: &str, pattern: &str) -> u64 {
        manager
            .create_view(
                name.to_string(),
                pattern.to_string(),
                RefreshStrategy::Deferred,
            )
            .unwrap()
    }

    /// Stand-alone view that is not attached to any manager.
    fn detached_view() -> MaterializedView {
        MaterializedView::new(
            1,
            "test".to_string(),
            "pattern".to_string(),
            RefreshStrategy::Deferred,
        )
    }

    #[test]
    fn test_materialized_view_creation() {
        let fresh = MaterializedView::new(
            1,
            "test_view".to_string(),
            "SELECT * WHERE { ?s ?p ?o }".to_string(),
            RefreshStrategy::Deferred,
        );

        assert_eq!(fresh.id, 1);
        assert_eq!(fresh.name, "test_view");
        assert!(!fresh.is_valid);
        assert_eq!(fresh.result_count, 0);
    }

    #[test]
    fn test_view_refresh() {
        let mut subject = detached_view();
        assert!(!subject.is_valid);

        let rows = vec![vec![1, 2, 3], vec![4, 5, 6]];
        subject.refresh(rows.clone());

        assert!(subject.is_valid);
        assert_eq!(subject.result_count, 2);
        assert_eq!(subject.data, rows);
    }

    #[test]
    fn test_view_invalidation() {
        let mut subject = detached_view();

        subject.refresh(vec![vec![1, 2, 3]]);
        assert!(subject.is_valid);

        subject.invalidate();
        assert!(!subject.is_valid);
    }

    #[test]
    fn test_view_manager_creation() {
        let snapshot = default_manager().stats();

        assert_eq!(snapshot.total_views, 0);
        assert_eq!(snapshot.total_views_created, 0);
    }

    #[test]
    fn test_create_and_drop_view() {
        let manager = default_manager();

        let id = add_deferred(&manager, "test_view", "SELECT * WHERE { ?s ?p ?o }");
        assert_eq!(id, 1);

        let after_create = manager.stats();
        assert_eq!(after_create.total_views, 1);
        assert_eq!(after_create.total_views_created, 1);

        manager.drop_view(id).unwrap();

        let after_drop = manager.stats();
        assert_eq!(after_drop.total_views, 0);
        assert_eq!(after_drop.total_views_dropped, 1);
    }

    #[test]
    fn test_duplicate_view_name() {
        let manager = default_manager();
        add_deferred(&manager, "dup", "pattern1");

        let second = manager.create_view(
            "dup".to_string(),
            "pattern2".to_string(),
            RefreshStrategy::Deferred,
        );

        assert!(second.is_err());
    }

    #[test]
    fn test_max_views_limit() {
        let mut config = MaterializedViewConfig::default();
        config.max_views = 2;
        let manager = MaterializedViewManager::new(config);

        add_deferred(&manager, "view1", "pattern1");
        add_deferred(&manager, "view2", "pattern2");

        let overflow = manager.create_view(
            "view3".to_string(),
            "pattern3".to_string(),
            RefreshStrategy::Deferred,
        );

        assert!(overflow.is_err());
    }

    #[test]
    fn test_refresh_view() {
        let manager = default_manager();
        let id = add_deferred(&manager, "test", "pattern");

        let rows = vec![vec![1, 2, 3], vec![4, 5, 6]];
        manager.refresh_view(id, rows.clone()).unwrap();

        let handle = manager.get_view(id).unwrap();
        let guard = handle.read();

        assert!(guard.is_valid);
        assert_eq!(guard.result_count, 2);
    }

    #[test]
    fn test_view_size_limit() {
        let mut config = MaterializedViewConfig::default();
        config.max_view_size = 2;
        let manager = MaterializedViewManager::new(config);

        let id = add_deferred(&manager, "test", "pattern");

        // Three rows against a limit of two must be rejected.
        let too_many = vec![vec![1], vec![2], vec![3]];
        let outcome = manager.refresh_view(id, too_many);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_invalidate_views() {
        let manager = default_manager();
        let id = add_deferred(&manager, "test", "pattern1");

        manager.refresh_view(id, vec![vec![1, 2, 3]]).unwrap();

        let handle = manager.get_view(id).unwrap();
        assert!(handle.read().is_valid);

        manager.invalidate_views(&["pattern1".to_string()]);

        assert!(!handle.read().is_valid);

        let snapshot = manager.stats();
        assert_eq!(snapshot.total_invalidations, 1);
    }

    #[test]
    fn test_find_applicable_views() {
        let manager = default_manager();

        let first = add_deferred(&manager, "view1", "pattern1");
        let second = add_deferred(&manager, "view2", "pattern1");

        let matches = manager.find_applicable_views("pattern1");

        assert_eq!(matches.len(), 2);
        assert!(matches.contains(&first));
        assert!(matches.contains(&second));
    }

    #[test]
    fn test_get_view_by_name() {
        let manager = default_manager();
        add_deferred(&manager, "myview", "pattern");

        let found = manager.get_view_by_name("myview");
        assert!(found.is_some());

        let handle = found.unwrap();
        let guard = handle.read();
        assert_eq!(guard.name, "myview");
    }

    #[test]
    fn test_list_views() {
        let manager = default_manager();

        add_deferred(&manager, "view1", "pattern1");
        manager
            .create_view(
                "view2".to_string(),
                "pattern2".to_string(),
                RefreshStrategy::Immediate,
            )
            .unwrap();

        let listing = manager.list_views();
        assert_eq!(listing.len(), 2);

        let names: Vec<String> = listing.iter().map(|info| info.name.clone()).collect();
        assert!(names.contains(&"view1".to_string()));
        assert!(names.contains(&"view2".to_string()));
    }

    #[test]
    fn test_hit_miss_tracking() {
        let manager = default_manager();

        manager.record_hit();
        manager.record_hit();
        manager.record_miss();

        let snapshot = manager.stats();
        assert_eq!(snapshot.total_hits, 2);
        assert_eq!(snapshot.total_misses, 1);
        assert!((snapshot.hit_rate() - 66.67).abs() < 0.1);
    }

    #[test]
    fn test_stats_calculations() {
        let snapshot = MaterializedViewManagerStats {
            total_views: 5,
            total_views_created: 10,
            total_views_dropped: 5,
            total_refreshes: 30,
            total_invalidations: 15,
            total_hits: 80,
            total_misses: 20,
        };

        assert_eq!(snapshot.hit_rate(), 80.0);
        assert_eq!(snapshot.avg_refreshes_per_view(), 3.0);
    }

    #[test]
    fn test_view_needs_refresh() {
        let stale = detached_view();

        // A view that has never been refreshed is invalid, so it always needs one.
        assert!(stale.needs_refresh(Duration::from_secs(60)));

        let mut refreshed = stale.clone();
        refreshed.refresh(vec![vec![1, 2, 3]]);
        assert!(!refreshed.needs_refresh(Duration::from_secs(3600)));
    }
}