oxirs_tdb/
materialized_views.rs

1//! Materialized Views for Query Acceleration
2//!
3//! This module provides materialized views (pre-computed query results) to accelerate
4//! common SPARQL queries. Views can be maintained incrementally or refreshed on-demand.
5//!
6//! Features:
7//! - View definition and storage
8//! - Automatic view maintenance on data changes
9//! - Query rewriting to use materialized views
10//! - Multiple refresh strategies (immediate, deferred, on-demand)
11//! - View invalidation tracking
12//! - View statistics and monitoring
13
14use crate::error::{Result, TdbError};
15use parking_lot::RwLock;
16use std::collections::{HashMap, HashSet};
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21/// Materialized view configuration
22#[derive(Debug, Clone)]
23pub struct MaterializedViewConfig {
24    /// Maximum number of views
25    pub max_views: usize,
26    /// Default refresh strategy
27    pub default_refresh_strategy: RefreshStrategy,
28    /// Enable automatic view selection for queries
29    pub enable_auto_selection: bool,
30    /// Maximum view size (number of results)
31    pub max_view_size: usize,
32    /// View expiration time (0 = never expire)
33    pub view_expiration: Duration,
34}
35
36impl Default for MaterializedViewConfig {
37    fn default() -> Self {
38        Self {
39            max_views: 100,
40            default_refresh_strategy: RefreshStrategy::Deferred,
41            enable_auto_selection: true,
42            max_view_size: 1_000_000,
43            view_expiration: Duration::from_secs(3600), // 1 hour
44        }
45    }
46}
47
48/// Refresh strategy for materialized views
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum RefreshStrategy {
51    /// Refresh immediately on data changes
52    Immediate,
53    /// Refresh periodically or on demand
54    Deferred,
55    /// Only refresh when explicitly requested
56    Manual,
57}
58
59/// Materialized view definition
60#[derive(Debug, Clone)]
61pub struct MaterializedView {
62    /// Unique view ID
63    pub id: u64,
64    /// View name
65    pub name: String,
66    /// Query pattern (simplified representation)
67    pub query_pattern: String,
68    /// Refresh strategy
69    pub refresh_strategy: RefreshStrategy,
70    /// Whether view is currently valid
71    pub is_valid: bool,
72    /// Creation time
73    pub created_at: Instant,
74    /// Last refresh time
75    pub last_refreshed: Instant,
76    /// Number of results in view
77    pub result_count: usize,
78    /// View data (in real implementation, this would be stored on disk)
79    /// For now, we store as opaque byte arrays representing serialized results
80    pub data: Vec<Vec<u8>>,
81}
82
83impl MaterializedView {
84    /// Create a new materialized view
85    pub fn new(id: u64, name: String, query_pattern: String, strategy: RefreshStrategy) -> Self {
86        let now = Instant::now();
87        Self {
88            id,
89            name,
90            query_pattern,
91            refresh_strategy: strategy,
92            is_valid: false,
93            created_at: now,
94            last_refreshed: now,
95            result_count: 0,
96            data: Vec::new(),
97        }
98    }
99
100    /// Check if view needs refresh
101    pub fn needs_refresh(&self, max_age: Duration) -> bool {
102        !self.is_valid || self.last_refreshed.elapsed() > max_age
103    }
104
105    /// Mark view as invalid (needs refresh)
106    pub fn invalidate(&mut self) {
107        self.is_valid = false;
108    }
109
110    /// Refresh view with new data
111    pub fn refresh(&mut self, data: Vec<Vec<u8>>) {
112        self.data = data;
113        self.result_count = self.data.len();
114        self.last_refreshed = Instant::now();
115        self.is_valid = true;
116    }
117
118    /// Get view age
119    pub fn age(&self) -> Duration {
120        self.last_refreshed.elapsed()
121    }
122}
123
124/// Materialized view manager
125pub struct MaterializedViewManager {
126    /// Configuration
127    config: MaterializedViewConfig,
128    /// Active views
129    views: RwLock<HashMap<u64, Arc<RwLock<MaterializedView>>>>,
130    /// View name to ID mapping
131    name_to_id: RwLock<HashMap<String, u64>>,
132    /// Query pattern to view IDs mapping (for query rewriting)
133    pattern_index: RwLock<HashMap<String, HashSet<u64>>>,
134    /// Next view ID
135    next_view_id: AtomicU64,
136    /// Statistics
137    stats: MaterializedViewStats,
138}
139
140impl MaterializedViewManager {
141    /// Create a new materialized view manager
142    pub fn new(config: MaterializedViewConfig) -> Self {
143        Self {
144            config,
145            views: RwLock::new(HashMap::new()),
146            name_to_id: RwLock::new(HashMap::new()),
147            pattern_index: RwLock::new(HashMap::new()),
148            next_view_id: AtomicU64::new(1),
149            stats: MaterializedViewStats::default(),
150        }
151    }
152
153    /// Create a new materialized view
154    pub fn create_view(
155        &self,
156        name: String,
157        query_pattern: String,
158        strategy: RefreshStrategy,
159    ) -> Result<u64> {
160        // Check if name already exists
161        if self.name_to_id.read().contains_key(&name) {
162            return Err(TdbError::Other(format!(
163                "View with name '{}' already exists",
164                name
165            )));
166        }
167
168        // Check view limit
169        if self.views.read().len() >= self.config.max_views {
170            return Err(TdbError::Other(format!(
171                "Maximum number of views ({}) reached",
172                self.config.max_views
173            )));
174        }
175
176        let view_id = self.next_view_id.fetch_add(1, Ordering::Relaxed);
177        let view = MaterializedView::new(view_id, name.clone(), query_pattern.clone(), strategy);
178
179        // Store view
180        self.views
181            .write()
182            .insert(view_id, Arc::new(RwLock::new(view)));
183
184        // Update name mapping
185        self.name_to_id.write().insert(name, view_id);
186
187        // Update pattern index
188        self.pattern_index
189            .write()
190            .entry(query_pattern)
191            .or_default()
192            .insert(view_id);
193
194        self.stats
195            .total_views_created
196            .fetch_add(1, Ordering::Relaxed);
197
198        Ok(view_id)
199    }
200
201    /// Drop a materialized view
202    pub fn drop_view(&self, view_id: u64) -> Result<()> {
203        let views = self.views.read();
204        let view = views
205            .get(&view_id)
206            .ok_or_else(|| TdbError::Other(format!("View {} not found", view_id)))?;
207
208        let view_data = view.read();
209        let name = view_data.name.clone();
210        let pattern = view_data.query_pattern.clone();
211        drop(view_data);
212        drop(views);
213
214        // Remove from all indexes
215        self.views.write().remove(&view_id);
216        self.name_to_id.write().remove(&name);
217
218        if let Some(view_set) = self.pattern_index.write().get_mut(&pattern) {
219            view_set.remove(&view_id);
220        }
221
222        self.stats
223            .total_views_dropped
224            .fetch_add(1, Ordering::Relaxed);
225
226        Ok(())
227    }
228
229    /// Refresh a specific view
230    pub fn refresh_view(&self, view_id: u64, data: Vec<Vec<u8>>) -> Result<()> {
231        let views = self.views.read();
232        let view = views
233            .get(&view_id)
234            .ok_or_else(|| TdbError::Other(format!("View {} not found", view_id)))?;
235
236        let mut view_data = view.write();
237
238        // Check size limit
239        if data.len() > self.config.max_view_size {
240            return Err(TdbError::Other(format!(
241                "View size {} exceeds limit {}",
242                data.len(),
243                self.config.max_view_size
244            )));
245        }
246
247        view_data.refresh(data);
248        self.stats.total_refreshes.fetch_add(1, Ordering::Relaxed);
249
250        Ok(())
251    }
252
253    /// Invalidate views affected by data changes
254    pub fn invalidate_views(&self, affected_patterns: &[String]) {
255        let pattern_index = self.pattern_index.read();
256
257        for pattern in affected_patterns {
258            if let Some(view_ids) = pattern_index.get(pattern) {
259                for &view_id in view_ids {
260                    if let Some(view) = self.views.read().get(&view_id) {
261                        let mut view_data = view.write();
262                        if view_data.refresh_strategy == RefreshStrategy::Immediate {
263                            // In real implementation, trigger immediate refresh
264                            view_data.invalidate();
265                            self.stats
266                                .total_invalidations
267                                .fetch_add(1, Ordering::Relaxed);
268                        } else {
269                            view_data.invalidate();
270                            self.stats
271                                .total_invalidations
272                                .fetch_add(1, Ordering::Relaxed);
273                        }
274                    }
275                }
276            }
277        }
278    }
279
280    /// Find views that can answer a query pattern
281    pub fn find_applicable_views(&self, query_pattern: &str) -> Vec<u64> {
282        let pattern_index = self.pattern_index.read();
283
284        pattern_index
285            .get(query_pattern)
286            .map(|view_ids| view_ids.iter().copied().collect())
287            .unwrap_or_default()
288    }
289
290    /// Get view by ID
291    pub fn get_view(&self, view_id: u64) -> Option<Arc<RwLock<MaterializedView>>> {
292        self.views.read().get(&view_id).cloned()
293    }
294
295    /// Get view by name
296    pub fn get_view_by_name(&self, name: &str) -> Option<Arc<RwLock<MaterializedView>>> {
297        let name_to_id = self.name_to_id.read();
298        let view_id = name_to_id.get(name)?;
299        self.views.read().get(view_id).cloned()
300    }
301
302    /// List all views
303    pub fn list_views(&self) -> Vec<ViewInfo> {
304        self.views
305            .read()
306            .values()
307            .map(|view| {
308                let v = view.read();
309                ViewInfo {
310                    id: v.id,
311                    name: v.name.clone(),
312                    query_pattern: v.query_pattern.clone(),
313                    refresh_strategy: v.refresh_strategy,
314                    is_valid: v.is_valid,
315                    result_count: v.result_count,
316                    age: v.age(),
317                }
318            })
319            .collect()
320    }
321
322    /// Cleanup expired views
323    pub fn cleanup_expired_views(&self) -> usize {
324        if self.config.view_expiration.is_zero() {
325            return 0;
326        }
327
328        let expired_views: Vec<u64> = self
329            .views
330            .read()
331            .values()
332            .filter_map(|view| {
333                let v = view.read();
334                if v.age() > self.config.view_expiration {
335                    Some(v.id)
336                } else {
337                    None
338                }
339            })
340            .collect();
341
342        let count = expired_views.len();
343        for view_id in expired_views {
344            let _ = self.drop_view(view_id);
345        }
346
347        count
348    }
349
350    /// Get manager statistics
351    pub fn stats(&self) -> MaterializedViewManagerStats {
352        MaterializedViewManagerStats {
353            total_views: self.views.read().len(),
354            total_views_created: self.stats.total_views_created.load(Ordering::Relaxed),
355            total_views_dropped: self.stats.total_views_dropped.load(Ordering::Relaxed),
356            total_refreshes: self.stats.total_refreshes.load(Ordering::Relaxed),
357            total_invalidations: self.stats.total_invalidations.load(Ordering::Relaxed),
358            total_hits: self.stats.total_hits.load(Ordering::Relaxed),
359            total_misses: self.stats.total_misses.load(Ordering::Relaxed),
360        }
361    }
362
363    /// Record a view hit (query answered by view)
364    pub fn record_hit(&self) {
365        self.stats.total_hits.fetch_add(1, Ordering::Relaxed);
366    }
367
368    /// Record a view miss (query not answered by view)
369    pub fn record_miss(&self) {
370        self.stats.total_misses.fetch_add(1, Ordering::Relaxed);
371    }
372}
373
374/// View information for listing
375#[derive(Debug, Clone)]
376pub struct ViewInfo {
377    /// View ID
378    pub id: u64,
379    /// View name
380    pub name: String,
381    /// Query pattern
382    pub query_pattern: String,
383    /// Refresh strategy
384    pub refresh_strategy: RefreshStrategy,
385    /// Whether view is valid
386    pub is_valid: bool,
387    /// Number of results
388    pub result_count: usize,
389    /// View age
390    pub age: Duration,
391}
392
393/// Materialized view statistics
394#[derive(Debug, Default)]
395struct MaterializedViewStats {
396    /// Total views created
397    total_views_created: AtomicU64,
398    /// Total views dropped
399    total_views_dropped: AtomicU64,
400    /// Total view refreshes
401    total_refreshes: AtomicU64,
402    /// Total view invalidations
403    total_invalidations: AtomicU64,
404    /// Total view hits (queries answered by views)
405    total_hits: AtomicU64,
406    /// Total view misses (queries not answered by views)
407    total_misses: AtomicU64,
408}
409
410/// Snapshot of materialized view manager statistics
411#[derive(Debug, Clone)]
412pub struct MaterializedViewManagerStats {
413    /// Current number of views
414    pub total_views: usize,
415    /// Total views created
416    pub total_views_created: u64,
417    /// Total views dropped
418    pub total_views_dropped: u64,
419    /// Total refreshes performed
420    pub total_refreshes: u64,
421    /// Total invalidations
422    pub total_invalidations: u64,
423    /// Total view hits
424    pub total_hits: u64,
425    /// Total view misses
426    pub total_misses: u64,
427}
428
429impl MaterializedViewManagerStats {
430    /// Calculate view hit rate
431    pub fn hit_rate(&self) -> f64 {
432        let total = self.total_hits + self.total_misses;
433        if total == 0 {
434            0.0
435        } else {
436            (self.total_hits as f64 / total as f64) * 100.0
437        }
438    }
439
440    /// Calculate average refreshes per view
441    pub fn avg_refreshes_per_view(&self) -> f64 {
442        if self.total_views_created == 0 {
443            0.0
444        } else {
445            self.total_refreshes as f64 / self.total_views_created as f64
446        }
447    }
448}
449
450#[cfg(test)]
451#[allow(clippy::field_reassign_with_default)]
452mod tests {
453    use super::*;
454
455    #[test]
456    fn test_materialized_view_creation() {
457        let view = MaterializedView::new(
458            1,
459            "test_view".to_string(),
460            "SELECT * WHERE { ?s ?p ?o }".to_string(),
461            RefreshStrategy::Deferred,
462        );
463
464        assert_eq!(view.id, 1);
465        assert_eq!(view.name, "test_view");
466        assert!(!view.is_valid);
467        assert_eq!(view.result_count, 0);
468    }
469
470    #[test]
471    fn test_view_refresh() {
472        let mut view = MaterializedView::new(
473            1,
474            "test".to_string(),
475            "pattern".to_string(),
476            RefreshStrategy::Deferred,
477        );
478
479        assert!(!view.is_valid);
480
481        let data = vec![vec![1, 2, 3], vec![4, 5, 6]];
482        view.refresh(data.clone());
483
484        assert!(view.is_valid);
485        assert_eq!(view.result_count, 2);
486        assert_eq!(view.data, data);
487    }
488
489    #[test]
490    fn test_view_invalidation() {
491        let mut view = MaterializedView::new(
492            1,
493            "test".to_string(),
494            "pattern".to_string(),
495            RefreshStrategy::Deferred,
496        );
497
498        view.refresh(vec![vec![1, 2, 3]]);
499        assert!(view.is_valid);
500
501        view.invalidate();
502        assert!(!view.is_valid);
503    }
504
505    #[test]
506    fn test_view_manager_creation() {
507        let config = MaterializedViewConfig::default();
508        let manager = MaterializedViewManager::new(config);
509
510        let stats = manager.stats();
511        assert_eq!(stats.total_views, 0);
512        assert_eq!(stats.total_views_created, 0);
513    }
514
515    #[test]
516    fn test_create_and_drop_view() {
517        let config = MaterializedViewConfig::default();
518        let manager = MaterializedViewManager::new(config);
519
520        // Create view
521        let view_id = manager
522            .create_view(
523                "test_view".to_string(),
524                "SELECT * WHERE { ?s ?p ?o }".to_string(),
525                RefreshStrategy::Deferred,
526            )
527            .unwrap();
528
529        assert_eq!(view_id, 1);
530
531        let stats = manager.stats();
532        assert_eq!(stats.total_views, 1);
533        assert_eq!(stats.total_views_created, 1);
534
535        // Drop view
536        manager.drop_view(view_id).unwrap();
537
538        let stats = manager.stats();
539        assert_eq!(stats.total_views, 0);
540        assert_eq!(stats.total_views_dropped, 1);
541    }
542
543    #[test]
544    fn test_duplicate_view_name() {
545        let config = MaterializedViewConfig::default();
546        let manager = MaterializedViewManager::new(config);
547
548        // Create first view
549        manager
550            .create_view(
551                "dup".to_string(),
552                "pattern1".to_string(),
553                RefreshStrategy::Deferred,
554            )
555            .unwrap();
556
557        // Try to create view with same name
558        let result = manager.create_view(
559            "dup".to_string(),
560            "pattern2".to_string(),
561            RefreshStrategy::Deferred,
562        );
563
564        assert!(result.is_err());
565    }
566
567    #[test]
568    fn test_max_views_limit() {
569        let mut config = MaterializedViewConfig::default();
570        config.max_views = 2;
571
572        let manager = MaterializedViewManager::new(config);
573
574        // Create two views (at limit)
575        manager
576            .create_view(
577                "view1".to_string(),
578                "pattern1".to_string(),
579                RefreshStrategy::Deferred,
580            )
581            .unwrap();
582        manager
583            .create_view(
584                "view2".to_string(),
585                "pattern2".to_string(),
586                RefreshStrategy::Deferred,
587            )
588            .unwrap();
589
590        // Third view should fail
591        let result = manager.create_view(
592            "view3".to_string(),
593            "pattern3".to_string(),
594            RefreshStrategy::Deferred,
595        );
596
597        assert!(result.is_err());
598    }
599
600    #[test]
601    fn test_refresh_view() {
602        let config = MaterializedViewConfig::default();
603        let manager = MaterializedViewManager::new(config);
604
605        let view_id = manager
606            .create_view(
607                "test".to_string(),
608                "pattern".to_string(),
609                RefreshStrategy::Deferred,
610            )
611            .unwrap();
612
613        let data = vec![vec![1, 2, 3], vec![4, 5, 6]];
614        manager.refresh_view(view_id, data.clone()).unwrap();
615
616        let view = manager.get_view(view_id).unwrap();
617        let view_data = view.read();
618
619        assert!(view_data.is_valid);
620        assert_eq!(view_data.result_count, 2);
621    }
622
623    #[test]
624    fn test_view_size_limit() {
625        let mut config = MaterializedViewConfig::default();
626        config.max_view_size = 2;
627
628        let manager = MaterializedViewManager::new(config);
629
630        let view_id = manager
631            .create_view(
632                "test".to_string(),
633                "pattern".to_string(),
634                RefreshStrategy::Deferred,
635            )
636            .unwrap();
637
638        // Try to refresh with too much data
639        let data = vec![vec![1], vec![2], vec![3]]; // 3 results > limit of 2
640        let result = manager.refresh_view(view_id, data);
641
642        assert!(result.is_err());
643    }
644
645    #[test]
646    fn test_invalidate_views() {
647        let config = MaterializedViewConfig::default();
648        let manager = MaterializedViewManager::new(config);
649
650        let view_id = manager
651            .create_view(
652                "test".to_string(),
653                "pattern1".to_string(),
654                RefreshStrategy::Deferred,
655            )
656            .unwrap();
657
658        // Refresh view
659        manager.refresh_view(view_id, vec![vec![1, 2, 3]]).unwrap();
660
661        let view = manager.get_view(view_id).unwrap();
662        assert!(view.read().is_valid);
663
664        // Invalidate
665        manager.invalidate_views(&["pattern1".to_string()]);
666
667        assert!(!view.read().is_valid);
668
669        let stats = manager.stats();
670        assert_eq!(stats.total_invalidations, 1);
671    }
672
673    #[test]
674    fn test_find_applicable_views() {
675        let config = MaterializedViewConfig::default();
676        let manager = MaterializedViewManager::new(config);
677
678        let view_id1 = manager
679            .create_view(
680                "view1".to_string(),
681                "pattern1".to_string(),
682                RefreshStrategy::Deferred,
683            )
684            .unwrap();
685        let view_id2 = manager
686            .create_view(
687                "view2".to_string(),
688                "pattern1".to_string(),
689                RefreshStrategy::Deferred,
690            )
691            .unwrap();
692
693        let applicable = manager.find_applicable_views("pattern1");
694
695        assert_eq!(applicable.len(), 2);
696        assert!(applicable.contains(&view_id1));
697        assert!(applicable.contains(&view_id2));
698    }
699
700    #[test]
701    fn test_get_view_by_name() {
702        let config = MaterializedViewConfig::default();
703        let manager = MaterializedViewManager::new(config);
704
705        manager
706            .create_view(
707                "myview".to_string(),
708                "pattern".to_string(),
709                RefreshStrategy::Deferred,
710            )
711            .unwrap();
712
713        let view = manager.get_view_by_name("myview");
714        assert!(view.is_some());
715
716        let view_arc = view.unwrap();
717        let view_data = view_arc.read();
718        assert_eq!(view_data.name, "myview");
719    }
720
721    #[test]
722    fn test_list_views() {
723        let config = MaterializedViewConfig::default();
724        let manager = MaterializedViewManager::new(config);
725
726        manager
727            .create_view(
728                "view1".to_string(),
729                "pattern1".to_string(),
730                RefreshStrategy::Deferred,
731            )
732            .unwrap();
733        manager
734            .create_view(
735                "view2".to_string(),
736                "pattern2".to_string(),
737                RefreshStrategy::Immediate,
738            )
739            .unwrap();
740
741        let views = manager.list_views();
742        assert_eq!(views.len(), 2);
743
744        let names: Vec<String> = views.iter().map(|v| v.name.clone()).collect();
745        assert!(names.contains(&"view1".to_string()));
746        assert!(names.contains(&"view2".to_string()));
747    }
748
749    #[test]
750    fn test_hit_miss_tracking() {
751        let config = MaterializedViewConfig::default();
752        let manager = MaterializedViewManager::new(config);
753
754        manager.record_hit();
755        manager.record_hit();
756        manager.record_miss();
757
758        let stats = manager.stats();
759        assert_eq!(stats.total_hits, 2);
760        assert_eq!(stats.total_misses, 1);
761        assert!((stats.hit_rate() - 66.67).abs() < 0.1);
762    }
763
764    #[test]
765    fn test_stats_calculations() {
766        let stats = MaterializedViewManagerStats {
767            total_views: 5,
768            total_views_created: 10,
769            total_views_dropped: 5,
770            total_refreshes: 30,
771            total_invalidations: 15,
772            total_hits: 80,
773            total_misses: 20,
774        };
775
776        assert_eq!(stats.hit_rate(), 80.0);
777        assert_eq!(stats.avg_refreshes_per_view(), 3.0);
778    }
779
780    #[test]
781    fn test_view_needs_refresh() {
782        let view = MaterializedView::new(
783            1,
784            "test".to_string(),
785            "pattern".to_string(),
786            RefreshStrategy::Deferred,
787        );
788
789        // Invalid view needs refresh
790        assert!(view.needs_refresh(Duration::from_secs(60)));
791
792        // Valid view might not need refresh if fresh
793        let mut view2 = view.clone();
794        view2.refresh(vec![vec![1, 2, 3]]);
795        assert!(!view2.needs_refresh(Duration::from_secs(3600)));
796    }
797}