1use crate::algebra::Term;
7use anyhow::{anyhow, Result};
8use oxirs_core::model::Variable;
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::sync::{Arc, RwLock};
12
13use scirs2_core::ndarray_ext::Array1;
15use scirs2_core::profiling::Profiler;
16use scirs2_core::random::{rng, Rng}; use scirs2_stats::{mean, std};
18
/// A single query solution: a set of bindings from variables to terms.
pub type Solution = HashMap<Variable, Term>;
21
/// Strategy that decides how `MaterializedResults` stores the solutions it is given.
///
/// `Adaptive` is the `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum MaterializationStrategy {
    /// Solutions go into a FIFO buffer; once the buffer holds more than
    /// `chunk_size` entries the oldest entry is dropped.
    Streaming,
    /// Every solution is kept in a single in-memory vector.
    InMemory,
    /// Starts in memory; once the number of held solutions exceeds
    /// `adaptive_threshold` they are serialized to a temporary file and the
    /// container switches to `MemoryMapped`.
    #[default]
    Adaptive,
    /// Chosen by `MaterializationSelector` for the largest result estimates and
    /// entered after an adaptive spill.
    /// NOTE(review): solutions added under this strategy are still pushed onto
    /// the in-memory vector in this file; no actual memory mapping is visible here.
    MemoryMapped,
    /// Solutions are accumulated and moved into chunks once `chunk_size`
    /// entries have been collected.
    Chunked,
    /// Solutions are stored in an index-keyed cache; lookups record cache hits
    /// and misses in the statistics.
    Lazy,
}
39
/// Tunable settings for result materialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MaterializationConfig {
    /// Preferred strategy.
    /// NOTE(review): not read anywhere in this file — confirm how callers use it.
    pub default_strategy: MaterializationStrategy,
    /// Memory budget; the default is `1024 * 1024 * 1024`, presumably bytes
    /// (1 GiB) — TODO confirm. NOTE(review): not enforced anywhere in this file.
    pub memory_limit: usize,
    /// Result count above which `Adaptive` materialization spills to disk; also
    /// used by `MaterializationSelector::select_strategy` as an in-memory cut-off.
    pub adaptive_threshold: usize,
    /// Number of solutions per chunk for `Chunked`, and the buffer bound for
    /// `Streaming`.
    pub chunk_size: usize,
    /// When `true`, `MaterializationSelector::new` creates a `Profiler`.
    pub enable_profiling: bool,
    /// Fallback estimate used by `select_strategy` when the caller passes `None`.
    pub estimated_result_size: Option<usize>,
    /// Compression toggle.
    /// NOTE(review): not read anywhere in this file.
    pub enable_compression: bool,
}
58
59impl Default for MaterializationConfig {
60 fn default() -> Self {
61 Self {
62 default_strategy: MaterializationStrategy::Adaptive,
63 memory_limit: 1024 * 1024 * 1024, adaptive_threshold: 10_000,
65 chunk_size: 1000,
66 enable_profiling: true,
67 estimated_result_size: None,
68 enable_compression: true,
69 }
70 }
71}
72
/// Counters describing one `MaterializedResults` instance.
///
/// A snapshot is obtained through `MaterializedResults::get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MaterializationStats {
    /// Result count recorded by the most recent statistics refresh.
    pub total_results: usize,
    /// Memory estimate recorded by the most recent statistics refresh: a
    /// solution count multiplied by `size_of::<Solution>()`, i.e. a shallow
    /// figure that excludes the heap data owned by each solution.
    pub memory_used: usize,
    /// Length in bytes of the serialized data written by an adaptive spill.
    pub disk_used: usize,
    /// Strategy in effect; set at construction and changed to `MemoryMapped`
    /// when an adaptive spill happens.
    pub strategy_used: Option<MaterializationStrategy>,
    /// Materialization time in milliseconds.
    /// NOTE(review): never written anywhere in this file.
    pub materialization_time_ms: f64,
    /// Number of `Lazy` lookups that found their index in the cache.
    pub cache_hits: usize,
    /// Number of `Lazy` lookups that did not find their index in the cache.
    pub cache_misses: usize,
}
91
92pub struct MaterializedResults {
94 strategy: MaterializationStrategy,
96 in_memory: Vec<Solution>,
98 stream_buffer: VecDeque<Solution>,
100 chunks: Vec<Vec<Solution>>,
102 lazy_cache: HashMap<usize, Solution>,
104 temp_file_path: Option<String>,
106 stats: Arc<RwLock<MaterializationStats>>,
108 config: MaterializationConfig,
110}
111
112impl MaterializedResults {
113 pub fn new(strategy: MaterializationStrategy, config: MaterializationConfig) -> Self {
115 Self {
116 strategy,
117 in_memory: Vec::new(),
118 stream_buffer: VecDeque::new(),
119 chunks: Vec::new(),
120 lazy_cache: HashMap::new(),
121 temp_file_path: None,
122 stats: Arc::new(RwLock::new(MaterializationStats {
123 strategy_used: Some(strategy),
124 ..Default::default()
125 })),
126 config,
127 }
128 }
129
130 pub fn add_solution(&mut self, solution: Solution) -> Result<()> {
132 match self.strategy {
133 MaterializationStrategy::InMemory => {
134 self.in_memory.push(solution);
135 self.update_stats();
136 }
137 MaterializationStrategy::Streaming => {
138 self.stream_buffer.push_back(solution);
139 if self.stream_buffer.len() > self.config.chunk_size {
141 self.stream_buffer.pop_front();
142 }
143 }
144 MaterializationStrategy::Adaptive => {
145 self.in_memory.push(solution);
146 if self.in_memory.len() > self.config.adaptive_threshold {
148 self.switch_to_disk()?;
149 }
150 }
151 MaterializationStrategy::Chunked => {
152 self.in_memory.push(solution);
154 if self.in_memory.len() >= self.config.chunk_size {
155 self.flush_chunk()?;
156 }
157 }
158 MaterializationStrategy::Lazy => {
159 let idx = self.lazy_cache.len();
161 self.lazy_cache.insert(idx, solution);
162 }
163 MaterializationStrategy::MemoryMapped => {
164 self.in_memory.push(solution);
166 }
167 }
168 Ok(())
169 }
170
171 pub fn get_solution(&mut self, index: usize) -> Option<&Solution> {
173 match self.strategy {
174 MaterializationStrategy::InMemory | MaterializationStrategy::Adaptive => {
175 self.in_memory.get(index)
176 }
177 MaterializationStrategy::Lazy => {
178 if self.lazy_cache.contains_key(&index) {
179 let mut stats = self
180 .stats
181 .write()
182 .expect("write lock should not be poisoned");
183 stats.cache_hits += 1;
184 drop(stats);
185 self.lazy_cache.get(&index)
186 } else {
187 let mut stats = self
188 .stats
189 .write()
190 .expect("write lock should not be poisoned");
191 stats.cache_misses += 1;
192 None
193 }
194 }
195 _ => None, }
197 }
198
199 pub fn len(&self) -> usize {
201 match self.strategy {
202 MaterializationStrategy::InMemory | MaterializationStrategy::Adaptive => {
203 self.in_memory.len()
204 }
205 MaterializationStrategy::Lazy => self.lazy_cache.len(),
206 MaterializationStrategy::Chunked => {
207 self.chunks.len() * self.config.chunk_size + self.in_memory.len()
208 }
209 _ => 0,
210 }
211 }
212
213 pub fn is_empty(&self) -> bool {
215 self.len() == 0
216 }
217
218 pub fn iter(&self) -> ResultIterator<'_> {
220 ResultIterator {
221 results: self,
222 current_index: 0,
223 }
224 }
225
226 fn flush_chunk(&mut self) -> Result<()> {
228 if self.in_memory.is_empty() {
229 return Ok(());
230 }
231
232 let chunk = std::mem::take(&mut self.in_memory);
234 self.chunks.push(chunk);
235
236 Ok(())
237 }
238
239 fn switch_to_disk(&mut self) -> Result<()> {
241 let serialized =
243 oxicode::serde::encode_to_vec(&self.in_memory, oxicode::config::standard())
244 .map_err(|e| anyhow!("Failed to serialize results: {}", e))?;
245
246 use std::env::temp_dir;
248 use std::fs::File;
249 use std::io::Write;
250
251 let random_id: u64 = rng().random();
252 let temp_path = temp_dir().join(format!("sparql_results_{}.bin", random_id));
253 let mut file = File::create(&temp_path)?;
254 file.write_all(&serialized)?;
255 drop(file);
256
257 self.temp_file_path = Some(temp_path.to_string_lossy().to_string());
258 self.in_memory.clear();
259 self.strategy = MaterializationStrategy::MemoryMapped;
260
261 let mut stats = self
262 .stats
263 .write()
264 .expect("write lock should not be poisoned");
265 stats.strategy_used = Some(MaterializationStrategy::MemoryMapped);
266 stats.disk_used = serialized.len();
267
268 Ok(())
269 }
270
271 fn update_stats(&self) {
273 let mut stats = self
274 .stats
275 .write()
276 .expect("write lock should not be poisoned");
277 stats.total_results = self.len();
278 stats.memory_used = self.in_memory.len() * std::mem::size_of::<Solution>();
280 }
281
282 pub fn get_stats(&self) -> MaterializationStats {
284 self.stats
285 .read()
286 .expect("read lock should not be poisoned")
287 .clone()
288 }
289
290 pub fn analyze_patterns(&self) -> Result<MaterializationAnalysis> {
292 if self.in_memory.is_empty() {
293 return Ok(MaterializationAnalysis::default());
294 }
295
296 let mut var_cardinalities: HashMap<String, Vec<f64>> = HashMap::new();
298
299 for solution in &self.in_memory {
300 for (var, _term) in solution.iter() {
301 let var_name = format!("{}", var);
302 var_cardinalities.entry(var_name).or_default().push(1.0); }
304 }
305
306 let mut analysis = MaterializationAnalysis::default();
308
309 for (var_name, counts) in var_cardinalities {
310 if !counts.is_empty() {
311 let arr = Array1::from_vec(counts.clone());
312 let arr_view = arr.view();
313
314 let mean_val = mean(&arr_view).unwrap_or(0.0);
315 let std_val = std(&arr_view, 1, None).unwrap_or(0.0);
316
317 analysis.variable_stats.insert(
318 var_name.clone(),
319 VariableStats {
320 mean_cardinality: mean_val,
321 std_cardinality: std_val,
322 total_occurrences: counts.len(),
323 },
324 );
325 }
326 }
327
328 analysis.total_solutions = self.in_memory.len();
329 analysis.estimated_memory = self.in_memory.len() * std::mem::size_of::<Solution>();
330
331 Ok(analysis)
332 }
333}
334
/// Borrowing iterator over a `MaterializedResults`, created by
/// `MaterializedResults::iter`.
pub struct ResultIterator<'a> {
    /// Container being iterated.
    results: &'a MaterializedResults,
    /// Index of the next solution to yield; advanced only when a solution is returned.
    current_index: usize,
}
340
341impl<'a> Iterator for ResultIterator<'a> {
342 type Item = &'a Solution;
343
344 fn next(&mut self) -> Option<Self::Item> {
345 let solution = match self.results.strategy {
346 MaterializationStrategy::InMemory | MaterializationStrategy::Adaptive => {
347 self.results.in_memory.get(self.current_index)
348 }
349 _ => None,
350 };
351
352 if solution.is_some() {
353 self.current_index += 1;
354 }
355
356 solution
357 }
358}
359
/// Result of `MaterializedResults::analyze_patterns`.
#[derive(Debug, Clone, Default)]
pub struct MaterializationAnalysis {
    /// Number of solutions in the analyzed in-memory vector.
    pub total_solutions: usize,
    /// `total_solutions * size_of::<Solution>()`; a shallow estimate that
    /// excludes the heap data owned by each solution.
    pub estimated_memory: usize,
    /// Per-variable statistics, keyed by the variable's `Display` rendering.
    pub variable_stats: HashMap<String, VariableStats>,
}
370
/// Statistics for one variable, produced by `MaterializedResults::analyze_patterns`.
#[derive(Debug, Clone)]
pub struct VariableStats {
    /// Mean of the per-binding values recorded for the variable (each binding
    /// contributes `1.0`); `0.0` if the mean could not be computed.
    pub mean_cardinality: f64,
    /// Standard deviation of the same values, computed via `std(_, 1, None)`
    /// (the `1` is presumably the delta degrees of freedom — TODO confirm);
    /// `0.0` if it could not be computed.
    pub std_cardinality: f64,
    /// Number of analyzed solutions that bind the variable.
    pub total_occurrences: usize,
}
381
/// Chooses a `MaterializationStrategy` from an estimated result count.
pub struct MaterializationSelector {
    /// Settings consulted by `select_strategy` (`adaptive_threshold`,
    /// `estimated_result_size`).
    config: MaterializationConfig,
    /// Created when `config.enable_profiling` is set; not read anywhere in this
    /// file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    profiler: Option<Profiler>,
}
388
389impl MaterializationSelector {
390 pub fn new(config: MaterializationConfig) -> Self {
392 let profiler = if config.enable_profiling {
393 Some(Profiler::new())
394 } else {
395 None
396 };
397
398 Self { config, profiler }
399 }
400
401 pub fn select_strategy(&self, estimated_results: Option<usize>) -> MaterializationStrategy {
403 let result_count = estimated_results.or(self.config.estimated_result_size);
405
406 match result_count {
407 Some(count) if count < 1000 => MaterializationStrategy::InMemory,
408 Some(count) if count < self.config.adaptive_threshold => {
409 MaterializationStrategy::InMemory
410 }
411 Some(count) if count < 100_000 => MaterializationStrategy::Chunked,
412 Some(_) => MaterializationStrategy::MemoryMapped,
413 None => MaterializationStrategy::Adaptive,
414 }
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::{Literal, Term};

    /// Builds a solution with a single binding: variable `name` mapped to a
    /// plain literal (no language tag, no datatype) holding `value` as text.
    fn single_binding(name: String, value: i32) -> Solution {
        let variable = Variable::new(name).unwrap();
        let literal = Term::Literal(Literal {
            value: value.to_string(),
            language: None,
            datatype: None,
        });
        Solution::from([(variable, literal)])
    }

    /// In-memory materialization keeps every solution and tracks the count.
    #[test]
    fn test_in_memory_materialization() {
        let mut results = MaterializedResults::new(
            MaterializationStrategy::InMemory,
            MaterializationConfig::default(),
        );

        for i in 0..100 {
            results
                .add_solution(single_binding(format!("x{}", i), i))
                .unwrap();
        }

        assert_eq!(results.len(), 100);
        assert!(results.get_solution(50).is_some());
        assert_eq!(results.get_stats().total_results, 100);
    }

    /// Exceeding the adaptive threshold switches the container to `MemoryMapped`.
    #[test]
    fn test_adaptive_materialization() {
        let config = MaterializationConfig {
            adaptive_threshold: 10,
            ..Default::default()
        };
        let mut results = MaterializedResults::new(MaterializationStrategy::Adaptive, config);

        for i in 0..20 {
            results
                .add_solution(single_binding(format!("x{}", i), i))
                .unwrap();
        }

        assert_eq!(
            results.get_stats().strategy_used,
            Some(MaterializationStrategy::MemoryMapped)
        );
    }

    /// Small estimates select `InMemory`; very large ones select `MemoryMapped`.
    #[test]
    fn test_strategy_selection() {
        let selector = MaterializationSelector::new(MaterializationConfig::default());

        assert_eq!(
            selector.select_strategy(Some(100)),
            MaterializationStrategy::InMemory
        );
        assert_eq!(
            selector.select_strategy(Some(1_000_000)),
            MaterializationStrategy::MemoryMapped
        );
    }

    /// Pattern analysis counts the solutions and reports the bound variable.
    #[test]
    fn test_result_analysis() {
        let mut results = MaterializedResults::new(
            MaterializationStrategy::InMemory,
            MaterializationConfig::default(),
        );

        for i in 0..50 {
            results
                .add_solution(single_binding("x".to_string(), i))
                .unwrap();
        }

        let analysis = results.analyze_patterns().unwrap();
        assert_eq!(analysis.total_solutions, 50);

        let mentions_x = analysis
            .variable_stats
            .keys()
            .any(|name| name.contains("x"));
        assert!(mentions_x);
    }
}