Skip to main content

oxirs_arq/
materialization.rs

1//! Query Result Materialization Strategies
2//!
3//! This module provides sophisticated materialization strategies for SPARQL query results,
4//! balancing memory usage, latency, and throughput using scirs2-core features.
5
6use crate::algebra::Term;
7use anyhow::{anyhow, Result};
8use oxirs_core::model::Variable;
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::sync::{Arc, RwLock};
12
13// Use scirs2-core for advanced features
14use scirs2_core::ndarray_ext::Array1;
15use scirs2_core::profiling::Profiler;
16use scirs2_core::random::{rng, Rng}; // Use scirs2-core for random number generation
17use scirs2_stats::{mean, std};
18
19/// Solution type alias - mapping of variables to terms
20pub type Solution = HashMap<Variable, Term>;
21
22/// Materialization strategy determines how query results are stored and processed
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
24pub enum MaterializationStrategy {
25    /// Stream results without materialization (lowest memory, highest latency for reuse)
26    Streaming,
27    /// Materialize to memory (high memory, low latency)
28    InMemory,
29    /// Hybrid: Stream until threshold, then materialize
30    #[default]
31    Adaptive,
32    /// Materialize to disk with memory-mapped access
33    MemoryMapped,
34    /// Chunked processing with configurable chunk size
35    Chunked,
36    /// Lazy evaluation with caching
37    Lazy,
38}
39
40/// Configuration for materialization
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct MaterializationConfig {
43    /// Default strategy
44    pub default_strategy: MaterializationStrategy,
45    /// Memory limit for in-memory materialization (bytes)
46    pub memory_limit: usize,
47    /// Threshold for adaptive switching (number of results)
48    pub adaptive_threshold: usize,
49    /// Chunk size for chunked processing
50    pub chunk_size: usize,
51    /// Enable profiling for materialization decisions
52    pub enable_profiling: bool,
53    /// Estimated result size for strategy selection
54    pub estimated_result_size: Option<usize>,
55    /// Enable compression for disk-based materialization
56    pub enable_compression: bool,
57}
58
59impl Default for MaterializationConfig {
60    fn default() -> Self {
61        Self {
62            default_strategy: MaterializationStrategy::Adaptive,
63            memory_limit: 1024 * 1024 * 1024, // 1GB
64            adaptive_threshold: 10_000,
65            chunk_size: 1000,
66            enable_profiling: true,
67            estimated_result_size: None,
68            enable_compression: true,
69        }
70    }
71}
72
73/// Materialization statistics
74#[derive(Debug, Clone, Default, Serialize, Deserialize)]
75pub struct MaterializationStats {
76    /// Total results materialized
77    pub total_results: usize,
78    /// Memory used (bytes)
79    pub memory_used: usize,
80    /// Disk space used (bytes)
81    pub disk_used: usize,
82    /// Strategy used
83    pub strategy_used: Option<MaterializationStrategy>,
84    /// Time taken for materialization (ms)
85    pub materialization_time_ms: f64,
86    /// Number of cache hits
87    pub cache_hits: usize,
88    /// Number of cache misses
89    pub cache_misses: usize,
90}
91
92/// Materialized result container
93pub struct MaterializedResults {
94    /// Strategy used
95    strategy: MaterializationStrategy,
96    /// In-memory results (for InMemory strategy)
97    in_memory: Vec<Solution>,
98    /// Streaming iterator state (for Streaming strategy)
99    stream_buffer: VecDeque<Solution>,
100    /// Chunked results (for Chunked strategy)
101    chunks: Vec<Vec<Solution>>,
102    /// Lazy results with cache (for Lazy strategy)
103    lazy_cache: HashMap<usize, Solution>,
104    /// Temporary file path for memory-mapped results
105    temp_file_path: Option<String>,
106    /// Statistics
107    stats: Arc<RwLock<MaterializationStats>>,
108    /// Configuration
109    config: MaterializationConfig,
110}
111
112impl MaterializedResults {
113    /// Create new materialized results with the given strategy
114    pub fn new(strategy: MaterializationStrategy, config: MaterializationConfig) -> Self {
115        Self {
116            strategy,
117            in_memory: Vec::new(),
118            stream_buffer: VecDeque::new(),
119            chunks: Vec::new(),
120            lazy_cache: HashMap::new(),
121            temp_file_path: None,
122            stats: Arc::new(RwLock::new(MaterializationStats {
123                strategy_used: Some(strategy),
124                ..Default::default()
125            })),
126            config,
127        }
128    }
129
130    /// Add a solution to the materialized results
131    pub fn add_solution(&mut self, solution: Solution) -> Result<()> {
132        match self.strategy {
133            MaterializationStrategy::InMemory => {
134                self.in_memory.push(solution);
135                self.update_stats();
136            }
137            MaterializationStrategy::Streaming => {
138                self.stream_buffer.push_back(solution);
139                // Keep buffer bounded
140                if self.stream_buffer.len() > self.config.chunk_size {
141                    self.stream_buffer.pop_front();
142                }
143            }
144            MaterializationStrategy::Adaptive => {
145                self.in_memory.push(solution);
146                // Check if we should switch strategy
147                if self.in_memory.len() > self.config.adaptive_threshold {
148                    self.switch_to_disk()?;
149                }
150            }
151            MaterializationStrategy::Chunked => {
152                // Add to current chunk
153                self.in_memory.push(solution);
154                if self.in_memory.len() >= self.config.chunk_size {
155                    self.flush_chunk()?;
156                }
157            }
158            MaterializationStrategy::Lazy => {
159                // Cache only, actual data loaded on demand
160                let idx = self.lazy_cache.len();
161                self.lazy_cache.insert(idx, solution);
162            }
163            MaterializationStrategy::MemoryMapped => {
164                // For now, collect in memory and flush to mmap later
165                self.in_memory.push(solution);
166            }
167        }
168        Ok(())
169    }
170
171    /// Get solution at index
172    pub fn get_solution(&mut self, index: usize) -> Option<&Solution> {
173        match self.strategy {
174            MaterializationStrategy::InMemory | MaterializationStrategy::Adaptive => {
175                self.in_memory.get(index)
176            }
177            MaterializationStrategy::Lazy => {
178                if self.lazy_cache.contains_key(&index) {
179                    let mut stats = self
180                        .stats
181                        .write()
182                        .expect("write lock should not be poisoned");
183                    stats.cache_hits += 1;
184                    drop(stats);
185                    self.lazy_cache.get(&index)
186                } else {
187                    let mut stats = self
188                        .stats
189                        .write()
190                        .expect("write lock should not be poisoned");
191                    stats.cache_misses += 1;
192                    None
193                }
194            }
195            _ => None, // Other strategies don't support random access
196        }
197    }
198
199    /// Get total number of results
200    pub fn len(&self) -> usize {
201        match self.strategy {
202            MaterializationStrategy::InMemory | MaterializationStrategy::Adaptive => {
203                self.in_memory.len()
204            }
205            MaterializationStrategy::Lazy => self.lazy_cache.len(),
206            MaterializationStrategy::Chunked => {
207                self.chunks.len() * self.config.chunk_size + self.in_memory.len()
208            }
209            _ => 0,
210        }
211    }
212
213    /// Check if results are empty
214    pub fn is_empty(&self) -> bool {
215        self.len() == 0
216    }
217
218    /// Get iterator over all results
219    pub fn iter(&self) -> ResultIterator<'_> {
220        ResultIterator {
221            results: self,
222            current_index: 0,
223        }
224    }
225
226    /// Flush current chunk to disk
227    fn flush_chunk(&mut self) -> Result<()> {
228        if self.in_memory.is_empty() {
229            return Ok(());
230        }
231
232        // Store chunk in memory for now (simplified implementation)
233        let chunk = std::mem::take(&mut self.in_memory);
234        self.chunks.push(chunk);
235
236        Ok(())
237    }
238
239    /// Switch from in-memory to disk-based storage
240    fn switch_to_disk(&mut self) -> Result<()> {
241        // Serialize all in-memory data
242        let serialized =
243            oxicode::serde::encode_to_vec(&self.in_memory, oxicode::config::standard())
244                .map_err(|e| anyhow!("Failed to serialize results: {}", e))?;
245
246        // Create memory-mapped file
247        use std::env::temp_dir;
248        use std::fs::File;
249        use std::io::Write;
250
251        let random_id: u64 = rng().random();
252        let temp_path = temp_dir().join(format!("sparql_results_{}.bin", random_id));
253        let mut file = File::create(&temp_path)?;
254        file.write_all(&serialized)?;
255        drop(file);
256
257        self.temp_file_path = Some(temp_path.to_string_lossy().to_string());
258        self.in_memory.clear();
259        self.strategy = MaterializationStrategy::MemoryMapped;
260
261        let mut stats = self
262            .stats
263            .write()
264            .expect("write lock should not be poisoned");
265        stats.strategy_used = Some(MaterializationStrategy::MemoryMapped);
266        stats.disk_used = serialized.len();
267
268        Ok(())
269    }
270
271    /// Update statistics
272    fn update_stats(&self) {
273        let mut stats = self
274            .stats
275            .write()
276            .expect("write lock should not be poisoned");
277        stats.total_results = self.len();
278        // Estimate memory usage (simplified)
279        stats.memory_used = self.in_memory.len() * std::mem::size_of::<Solution>();
280    }
281
282    /// Get statistics
283    pub fn get_stats(&self) -> MaterializationStats {
284        self.stats
285            .read()
286            .expect("read lock should not be poisoned")
287            .clone()
288    }
289
290    /// Analyze result patterns using scirs2-stats
291    pub fn analyze_patterns(&self) -> Result<MaterializationAnalysis> {
292        if self.in_memory.is_empty() {
293            return Ok(MaterializationAnalysis::default());
294        }
295
296        // Analyze cardinality distribution across variables
297        let mut var_cardinalities: HashMap<String, Vec<f64>> = HashMap::new();
298
299        for solution in &self.in_memory {
300            for (var, _term) in solution.iter() {
301                let var_name = format!("{}", var);
302                var_cardinalities.entry(var_name).or_default().push(1.0); // Count occurrences
303            }
304        }
305
306        // Calculate statistics for each variable using scirs2-stats
307        let mut analysis = MaterializationAnalysis::default();
308
309        for (var_name, counts) in var_cardinalities {
310            if !counts.is_empty() {
311                let arr = Array1::from_vec(counts.clone());
312                let arr_view = arr.view();
313
314                let mean_val = mean(&arr_view).unwrap_or(0.0);
315                let std_val = std(&arr_view, 1, None).unwrap_or(0.0);
316
317                analysis.variable_stats.insert(
318                    var_name.clone(),
319                    VariableStats {
320                        mean_cardinality: mean_val,
321                        std_cardinality: std_val,
322                        total_occurrences: counts.len(),
323                    },
324                );
325            }
326        }
327
328        analysis.total_solutions = self.in_memory.len();
329        analysis.estimated_memory = self.in_memory.len() * std::mem::size_of::<Solution>();
330
331        Ok(analysis)
332    }
333}
334
335/// Iterator over materialized results
336pub struct ResultIterator<'a> {
337    results: &'a MaterializedResults,
338    current_index: usize,
339}
340
341impl<'a> Iterator for ResultIterator<'a> {
342    type Item = &'a Solution;
343
344    fn next(&mut self) -> Option<Self::Item> {
345        let solution = match self.results.strategy {
346            MaterializationStrategy::InMemory | MaterializationStrategy::Adaptive => {
347                self.results.in_memory.get(self.current_index)
348            }
349            _ => None,
350        };
351
352        if solution.is_some() {
353            self.current_index += 1;
354        }
355
356        solution
357    }
358}
359
360/// Analysis of materialized results
361#[derive(Debug, Clone, Default)]
362pub struct MaterializationAnalysis {
363    /// Total number of solutions
364    pub total_solutions: usize,
365    /// Estimated memory usage
366    pub estimated_memory: usize,
367    /// Statistics per variable
368    pub variable_stats: HashMap<String, VariableStats>,
369}
370
/// Binding statistics for one variable.
#[derive(Clone, Debug)]
pub struct VariableStats {
    /// Mean of the per-binding cardinality counts.
    pub mean_cardinality: f64,
    /// Standard deviation of the per-binding cardinality counts.
    pub std_cardinality: f64,
    /// Number of solutions in which the variable is bound.
    pub total_occurrences: usize,
}
381
382/// Materialization strategy selector
383pub struct MaterializationSelector {
384    config: MaterializationConfig,
385    #[allow(dead_code)]
386    profiler: Option<Profiler>,
387}
388
389impl MaterializationSelector {
390    /// Create a new selector
391    pub fn new(config: MaterializationConfig) -> Self {
392        let profiler = if config.enable_profiling {
393            Some(Profiler::new())
394        } else {
395            None
396        };
397
398        Self { config, profiler }
399    }
400
401    /// Select the best materialization strategy based on query characteristics
402    pub fn select_strategy(&self, estimated_results: Option<usize>) -> MaterializationStrategy {
403        // Use estimated results or fall back to query analysis
404        let result_count = estimated_results.or(self.config.estimated_result_size);
405
406        match result_count {
407            Some(count) if count < 1000 => MaterializationStrategy::InMemory,
408            Some(count) if count < self.config.adaptive_threshold => {
409                MaterializationStrategy::InMemory
410            }
411            Some(count) if count < 100_000 => MaterializationStrategy::Chunked,
412            Some(_) => MaterializationStrategy::MemoryMapped,
413            None => MaterializationStrategy::Adaptive,
414        }
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::{Literal, Term};

    /// Build a solution with a single binding: variable `var_name` bound to the
    /// plain literal `value`.
    fn literal_solution(var_name: &str, value: usize) -> Solution {
        let var = Variable::new(var_name.to_string()).unwrap();
        let mut solution = Solution::new();
        solution.insert(
            var,
            Term::Literal(Literal {
                value: value.to_string(),
                language: None,
                datatype: None,
            }),
        );
        solution
    }

    #[test]
    fn test_in_memory_materialization() {
        let config = MaterializationConfig::default();
        let mut results = MaterializedResults::new(MaterializationStrategy::InMemory, config);

        for i in 0..100 {
            results
                .add_solution(literal_solution(&format!("x{}", i), i))
                .unwrap();
        }

        assert_eq!(results.len(), 100);
        assert!(!results.is_empty());
        assert_eq!(results.iter().count(), 100);
        assert!(results.get_solution(50).is_some());
        // One past the end must not resolve.
        assert!(results.get_solution(100).is_none());

        let stats = results.get_stats();
        assert_eq!(stats.total_results, 100);
        assert_eq!(stats.strategy_used, Some(MaterializationStrategy::InMemory));
    }

    #[test]
    fn test_adaptive_materialization() {
        let config = MaterializationConfig {
            adaptive_threshold: 10,
            ..Default::default()
        };

        let mut results = MaterializedResults::new(MaterializationStrategy::Adaptive, config);

        // Add solutions beyond the threshold so the container spills to disk.
        for i in 0..20 {
            results
                .add_solution(literal_solution(&format!("x{}", i), i))
                .unwrap();
        }

        let stats = results.get_stats();
        let spill_path = results.temp_file_path.clone();
        let spill_existed = spill_path
            .as_deref()
            .map(|path| std::path::Path::new(path).exists());

        // Clean up before asserting so a failing assertion cannot leak the file.
        if let Some(path) = &spill_path {
            let _ = std::fs::remove_file(path);
        }

        // Strategy should have switched and the spill file should have been written.
        assert_eq!(
            stats.strategy_used,
            Some(MaterializationStrategy::MemoryMapped)
        );
        assert!(stats.disk_used > 0);
        assert_eq!(spill_existed, Some(true));
    }

    #[test]
    fn test_strategy_selection() {
        let config = MaterializationConfig::default();
        let selector = MaterializationSelector::new(config);

        // Small result set
        assert_eq!(
            selector.select_strategy(Some(100)),
            MaterializationStrategy::InMemory
        );

        // Below the default adaptive threshold (10 000)
        assert_eq!(
            selector.select_strategy(Some(5_000)),
            MaterializationStrategy::InMemory
        );

        // Medium result set
        assert_eq!(
            selector.select_strategy(Some(50_000)),
            MaterializationStrategy::Chunked
        );

        // Large result set
        assert_eq!(
            selector.select_strategy(Some(1_000_000)),
            MaterializationStrategy::MemoryMapped
        );

        // No estimate from the caller or the configuration
        assert_eq!(
            selector.select_strategy(None),
            MaterializationStrategy::Adaptive
        );
    }

    #[test]
    fn test_result_analysis() {
        let config = MaterializationConfig::default();
        let mut results = MaterializedResults::new(MaterializationStrategy::InMemory, config);

        for i in 0..50 {
            results.add_solution(literal_solution("x", i)).unwrap();
        }

        let analysis = results.analyze_patterns().unwrap();
        assert_eq!(analysis.total_solutions, 50);

        // The key is the variable's display form, which may carry a prefix such as
        // '?', so match on the name rather than on the exact key.
        let x_stats = analysis
            .variable_stats
            .iter()
            .find(|(name, _)| name.contains('x'))
            .map(|(_, stats)| stats)
            .expect("variable x should have statistics");
        assert_eq!(x_stats.total_occurrences, 50);
    }
}