1use crate::algebra::{Binding, Solution, Term, Variable};
7use anyhow::{bail, Result};
8use std::collections::{HashMap, HashSet};
9
10#[derive(Debug, Clone, PartialEq)]
12pub struct ValuesClause {
13 pub variables: Vec<Variable>,
15 pub data: Vec<Vec<Option<Term>>>,
17}
18
19impl ValuesClause {
20 pub fn new(variables: Vec<Variable>) -> Self {
22 Self {
23 variables,
24 data: Vec::new(),
25 }
26 }
27
28 pub fn with_data(variables: Vec<Variable>, data: Vec<Vec<Option<Term>>>) -> Result<Self> {
30 for row in &data {
32 if row.len() != variables.len() {
33 bail!(
34 "VALUES data row has {} values but {} variables defined",
35 row.len(),
36 variables.len()
37 );
38 }
39 }
40
41 Ok(Self { variables, data })
42 }
43
44 pub fn add_row(&mut self, row: Vec<Option<Term>>) -> Result<()> {
46 if row.len() != self.variables.len() {
47 bail!(
48 "Row has {} values but {} variables defined",
49 row.len(),
50 self.variables.len()
51 );
52 }
53 self.data.push(row);
54 Ok(())
55 }
56
57 pub fn row_count(&self) -> usize {
59 self.data.len()
60 }
61
62 pub fn variable_count(&self) -> usize {
64 self.variables.len()
65 }
66
67 pub fn is_empty(&self) -> bool {
69 self.data.is_empty()
70 }
71
72 pub fn to_solution(&self) -> Solution {
74 let mut solution = Solution::new();
75
76 for row in &self.data {
77 let mut binding = Binding::new();
78 for (var, value) in self.variables.iter().zip(row.iter()) {
79 if let Some(term) = value {
80 binding.insert(var.clone(), term.clone());
81 }
82 }
84 solution.push(binding);
85 }
86
87 solution
88 }
89
90 pub fn statistics(&self) -> ValuesStatistics {
92 let mut undef_count = 0;
93 let mut distinct_values_per_var = HashMap::new();
94
95 for var in &self.variables {
96 distinct_values_per_var.insert(var.clone(), HashSet::new());
97 }
98
99 for row in &self.data {
100 for (var, value) in self.variables.iter().zip(row.iter()) {
101 if let Some(term) = value {
102 distinct_values_per_var
103 .get_mut(var)
104 .expect("variable should exist in initialized map")
105 .insert(term.clone());
106 } else {
107 undef_count += 1;
108 }
109 }
110 }
111
112 let selectivity: HashMap<Variable, f64> = distinct_values_per_var
113 .iter()
114 .map(|(var, values)| {
115 let selectivity = if self.data.is_empty() {
116 1.0
117 } else {
118 values.len() as f64 / self.data.len() as f64
119 };
120 (var.clone(), selectivity)
121 })
122 .collect();
123
124 ValuesStatistics {
125 row_count: self.data.len(),
126 variable_count: self.variables.len(),
127 undef_count,
128 selectivity,
129 estimated_memory: self.estimate_memory_usage(),
130 }
131 }
132
133 fn estimate_memory_usage(&self) -> usize {
134 self.data.len() * self.variables.len() * 32
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct ValuesStatistics {
142 pub row_count: usize,
143 pub variable_count: usize,
144 pub undef_count: usize,
145 pub selectivity: HashMap<Variable, f64>,
146 pub estimated_memory: usize,
147}
148
149pub struct ValuesOptimizer;
151
152impl ValuesOptimizer {
153 pub fn optimize(values: &ValuesClause) -> OptimizedValues {
155 let stats = values.statistics();
156
157 let strategy = if values.row_count() <= 10 {
159 ValuesExecutionStrategy::Inline
160 } else if values.row_count() <= 1000 {
161 ValuesExecutionStrategy::HashIndex
162 } else {
163 ValuesExecutionStrategy::Materialized
164 };
165
166 let pushable = values.row_count() < 100 && stats.undef_count == 0;
168
169 let most_selective = stats
171 .selectivity
172 .iter()
173 .min_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
174 .map(|(var, _)| var.clone());
175
176 OptimizedValues {
177 original: values.clone(),
178 strategy,
179 pushable,
180 most_selective_var: most_selective,
181 statistics: stats,
182 }
183 }
184
185 pub fn suggest_variable_order(values: &ValuesClause) -> Vec<Variable> {
187 let stats = values.statistics();
188
189 let mut vars_with_selectivity: Vec<_> = stats
191 .selectivity
192 .iter()
193 .map(|(var, sel)| (var.clone(), *sel))
194 .collect();
195
196 vars_with_selectivity
197 .sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
198
199 vars_with_selectivity
200 .into_iter()
201 .map(|(var, _)| var)
202 .collect()
203 }
204}
205
206#[derive(Debug, Clone)]
208pub struct OptimizedValues {
209 pub original: ValuesClause,
210 pub strategy: ValuesExecutionStrategy,
211 pub pushable: bool,
212 pub most_selective_var: Option<Variable>,
213 pub statistics: ValuesStatistics,
214}
215
216#[derive(Debug, Clone, Copy, PartialEq, Eq)]
218pub enum ValuesExecutionStrategy {
219 Inline,
221 HashIndex,
223 Materialized,
225}
226
227pub struct IndexedValues {
229 values: ValuesClause,
230 indexes: HashMap<Variable, HashMap<Term, Vec<usize>>>,
232}
233
234impl IndexedValues {
235 pub fn new(values: ValuesClause) -> Self {
237 let mut indexes = HashMap::new();
238
239 for (var_idx, var) in values.variables.iter().enumerate() {
240 let mut var_index: HashMap<Term, Vec<usize>> = HashMap::new();
241
242 for (row_idx, row) in values.data.iter().enumerate() {
243 if let Some(Some(term)) = row.get(var_idx) {
244 var_index.entry(term.clone()).or_default().push(row_idx);
245 }
246 }
247
248 indexes.insert(var.clone(), var_index);
249 }
250
251 Self { values, indexes }
252 }
253
254 pub fn lookup(&self, var: &Variable, term: &Term) -> Vec<Binding> {
256 if let Some(var_index) = self.indexes.get(var) {
257 if let Some(row_indices) = var_index.get(term) {
258 return row_indices
259 .iter()
260 .filter_map(|&idx| self.row_to_binding(idx))
261 .collect();
262 }
263 }
264 Vec::new()
265 }
266
267 pub fn probe(&self, binding: &Binding) -> Vec<Binding> {
269 let mut candidates: Option<HashSet<usize>> = None;
270
271 for (var, term) in binding {
273 if let Some(var_index) = self.indexes.get(var) {
274 if let Some(row_indices) = var_index.get(term) {
275 let row_set: HashSet<usize> = row_indices.iter().copied().collect();
276
277 candidates = Some(if let Some(existing) = candidates {
278 existing.intersection(&row_set).copied().collect()
279 } else {
280 row_set
281 });
282 }
283 }
284 }
285
286 if let Some(candidate_rows) = candidates {
288 candidate_rows
289 .into_iter()
290 .filter_map(|idx| self.row_to_binding(idx))
291 .collect()
292 } else {
293 (0..self.values.data.len())
295 .filter_map(|idx| self.row_to_binding(idx))
296 .collect()
297 }
298 }
299
300 fn row_to_binding(&self, row_idx: usize) -> Option<Binding> {
301 let row = self.values.data.get(row_idx)?;
302 let mut binding = Binding::new();
303
304 for (var, value) in self.values.variables.iter().zip(row.iter()) {
305 if let Some(term) = value {
306 binding.insert(var.clone(), term.clone());
307 }
308 }
309
310 Some(binding)
311 }
312
313 pub fn all_bindings(&self) -> Vec<Binding> {
315 (0..self.values.data.len())
316 .filter_map(|idx| self.row_to_binding(idx))
317 .collect()
318 }
319
320 pub fn statistics(&self) -> ValuesStatistics {
322 self.values.statistics()
323 }
324}
325
326pub struct ValuesJoinOptimizer;
328
329impl ValuesJoinOptimizer {
330 pub fn optimize_join(
332 values: &ValuesClause,
333 other_variables: &HashSet<Variable>,
334 ) -> JoinStrategy {
335 let _stats = values.statistics();
336
337 let values_vars: HashSet<_> = values.variables.iter().collect();
339 let shared_vars: Vec<_> = values_vars
340 .intersection(&other_variables.iter().collect::<HashSet<_>>())
341 .map(|&v| v.clone())
342 .collect();
343
344 if shared_vars.is_empty() {
345 return JoinStrategy::CrossProduct;
346 }
347
348 if values.row_count() <= 10 {
350 JoinStrategy::NestedLoop
351 } else if shared_vars.len() == 1 {
352 JoinStrategy::IndexNestedLoop {
353 index_var: shared_vars[0].clone(),
354 }
355 } else {
356 JoinStrategy::HashJoin {
357 join_vars: shared_vars,
358 }
359 }
360 }
361
362 pub fn can_push_values(values: &ValuesClause, join_vars: &[Variable]) -> bool {
364 if values.row_count() >= 100 {
370 return false;
371 }
372
373 let stats = values.statistics();
374 if stats.undef_count > 0 {
375 return false;
377 }
378
379 values.variables.iter().any(|v| join_vars.contains(v))
381 }
382}
383
384#[derive(Debug, Clone, PartialEq, Eq)]
386pub enum JoinStrategy {
387 CrossProduct,
388 NestedLoop,
389 IndexNestedLoop { index_var: Variable },
390 HashJoin { join_vars: Vec<Variable> },
391}
392
393pub struct ValuesExecutor {
395 indexed_cache: HashMap<String, IndexedValues>,
396}
397
398impl ValuesExecutor {
399 pub fn new() -> Self {
401 Self {
402 indexed_cache: HashMap::new(),
403 }
404 }
405
406 pub fn execute(&mut self, values: &ValuesClause) -> Result<Solution> {
408 Ok(values.to_solution())
409 }
410
411 pub fn execute_indexed(&mut self, values: &ValuesClause) -> Result<IndexedValues> {
413 let cache_key = self.compute_cache_key(values);
414
415 if let Some(indexed) = self.indexed_cache.get(&cache_key) {
416 return Ok(IndexedValues {
417 values: indexed.values.clone(),
418 indexes: indexed.indexes.clone(),
419 });
420 }
421
422 let indexed = IndexedValues::new(values.clone());
423 self.indexed_cache
424 .insert(cache_key, indexed.clone_structure());
425
426 Ok(indexed)
427 }
428
429 pub fn join_indexed(&mut self, values: &ValuesClause, solution: Solution) -> Result<Solution> {
431 let indexed = self.execute_indexed(values)?;
432 let mut result = Solution::new();
433
434 for binding in solution {
435 let matches = indexed.probe(&binding);
437
438 for values_binding in &matches {
439 let mut merged = binding.clone();
441 for (var, term) in values_binding {
442 if let Some(existing) = merged.get(var) {
443 if existing != term {
444 continue;
446 }
447 } else {
448 merged.insert(var.clone(), term.clone());
449 }
450 }
451 result.push(merged);
452 }
453
454 if matches.is_empty() && !has_shared_vars(values, &binding) {
456 result.push(binding);
457 }
458 }
459
460 Ok(result)
461 }
462
463 fn compute_cache_key(&self, values: &ValuesClause) -> String {
464 format!("{:?}", values)
465 }
466}
467
468impl Default for ValuesExecutor {
469 fn default() -> Self {
470 Self::new()
471 }
472}
473
474impl IndexedValues {
475 fn clone_structure(&self) -> Self {
476 Self {
477 values: self.values.clone(),
478 indexes: self.indexes.clone(),
479 }
480 }
481}
482
483fn has_shared_vars(values: &ValuesClause, binding: &Binding) -> bool {
485 values.variables.iter().any(|v| binding.contains_key(v))
486}
487
488pub struct ValuesBuilder {
490 variables: Vec<Variable>,
491 rows: Vec<Vec<Option<Term>>>,
492}
493
494impl ValuesBuilder {
495 pub fn new() -> Self {
497 Self {
498 variables: Vec::new(),
499 rows: Vec::new(),
500 }
501 }
502
503 pub fn add_variable(mut self, var: Variable) -> Self {
505 self.variables.push(var);
506 self
507 }
508
509 pub fn add_variables(mut self, vars: Vec<Variable>) -> Self {
511 self.variables.extend(vars);
512 self
513 }
514
515 pub fn add_row(mut self, values: Vec<Option<Term>>) -> Result<Self> {
517 if values.len() != self.variables.len() {
518 bail!(
519 "Row has {} values but {} variables defined",
520 values.len(),
521 self.variables.len()
522 );
523 }
524 self.rows.push(values);
525 Ok(self)
526 }
527
528 pub fn build(self) -> Result<ValuesClause> {
530 ValuesClause::with_data(self.variables, self.rows)
531 }
532}
533
534impl Default for ValuesBuilder {
535 fn default() -> Self {
536 Self::new()
537 }
538}
539
540#[cfg(test)]
541mod tests {
542 use super::*;
543 use oxirs_core::model::NamedNode;
544
545 fn create_test_variable(name: &str) -> Variable {
546 Variable::new(name).unwrap()
547 }
548
549 fn create_test_iri(iri: &str) -> Term {
550 Term::Iri(NamedNode::new(iri).unwrap())
551 }
552
553 #[test]
554 fn test_values_clause_creation() {
555 let var_x = create_test_variable("x");
556 let var_y = create_test_variable("y");
557
558 let mut values = ValuesClause::new(vec![var_x.clone(), var_y.clone()]);
559
560 assert_eq!(values.variable_count(), 2);
561 assert_eq!(values.row_count(), 0);
562 assert!(values.is_empty());
563
564 values
565 .add_row(vec![
566 Some(create_test_iri("http://example.org/a")),
567 Some(create_test_iri("http://example.org/b")),
568 ])
569 .unwrap();
570
571 assert_eq!(values.row_count(), 1);
572 assert!(!values.is_empty());
573 }
574
575 #[test]
576 fn test_values_with_undef() {
577 let var_x = create_test_variable("x");
578 let var_y = create_test_variable("y");
579
580 let values = ValuesClause::with_data(
581 vec![var_x.clone(), var_y.clone()],
582 vec![
583 vec![
584 Some(create_test_iri("http://example.org/a")),
585 Some(create_test_iri("http://example.org/b")),
586 ],
587 vec![
588 Some(create_test_iri("http://example.org/c")),
589 None, ],
591 ],
592 )
593 .unwrap();
594
595 let solution = values.to_solution();
596 assert_eq!(solution.len(), 2);
597
598 assert_eq!(solution[0].len(), 2);
600
601 assert_eq!(solution[1].len(), 1);
603 assert!(solution[1].contains_key(&var_x));
604 assert!(!solution[1].contains_key(&var_y));
605 }
606
607 #[test]
608 fn test_values_statistics() {
609 let var_x = create_test_variable("x");
610 let values = ValuesClause::with_data(
611 vec![var_x.clone()],
612 vec![
613 vec![Some(create_test_iri("http://example.org/a"))],
614 vec![Some(create_test_iri("http://example.org/a"))],
615 vec![Some(create_test_iri("http://example.org/b"))],
616 ],
617 )
618 .unwrap();
619
620 let stats = values.statistics();
621 assert_eq!(stats.row_count, 3);
622 assert_eq!(stats.variable_count, 1);
623 assert_eq!(stats.undef_count, 0);
624
625 let selectivity = stats.selectivity.get(&var_x).unwrap();
627 assert!((selectivity - 0.666).abs() < 0.01);
628 }
629
630 #[test]
631 fn test_values_optimizer() {
632 let var_x = create_test_variable("x");
633 let values = ValuesClause::with_data(
634 vec![var_x],
635 vec![
636 vec![Some(create_test_iri("http://example.org/a"))],
637 vec![Some(create_test_iri("http://example.org/b"))],
638 ],
639 )
640 .unwrap();
641
642 let optimized = ValuesOptimizer::optimize(&values);
643 assert_eq!(optimized.strategy, ValuesExecutionStrategy::Inline);
644 assert!(optimized.pushable);
645 }
646
647 #[test]
648 fn test_indexed_values() {
649 let var_x = create_test_variable("x");
650 let var_y = create_test_variable("y");
651
652 let values = ValuesClause::with_data(
653 vec![var_x.clone(), var_y.clone()],
654 vec![
655 vec![
656 Some(create_test_iri("http://example.org/a")),
657 Some(create_test_iri("http://example.org/1")),
658 ],
659 vec![
660 Some(create_test_iri("http://example.org/b")),
661 Some(create_test_iri("http://example.org/2")),
662 ],
663 ],
664 )
665 .unwrap();
666
667 let indexed = IndexedValues::new(values);
668
669 let results = indexed.lookup(&var_x, &create_test_iri("http://example.org/a"));
671 assert_eq!(results.len(), 1);
672 assert_eq!(results[0].len(), 2);
673
674 let all = indexed.all_bindings();
676 assert_eq!(all.len(), 2);
677 }
678
679 #[test]
680 fn test_indexed_values_probe() {
681 let var_x = create_test_variable("x");
682 let var_y = create_test_variable("y");
683
684 let values = ValuesClause::with_data(
685 vec![var_x.clone(), var_y.clone()],
686 vec![
687 vec![
688 Some(create_test_iri("http://example.org/a")),
689 Some(create_test_iri("http://example.org/1")),
690 ],
691 vec![
692 Some(create_test_iri("http://example.org/a")),
693 Some(create_test_iri("http://example.org/2")),
694 ],
695 ],
696 )
697 .unwrap();
698
699 let indexed = IndexedValues::new(values);
700
701 let mut probe_binding = Binding::new();
703 probe_binding.insert(var_x.clone(), create_test_iri("http://example.org/a"));
704
705 let results = indexed.probe(&probe_binding);
706 assert_eq!(results.len(), 2); }
708
709 #[test]
710 fn test_values_join_optimizer() {
711 let var_x = create_test_variable("x");
712 let data: Vec<Vec<Option<Term>>> = (0..15)
714 .map(|i| {
715 vec![Some(create_test_iri(&format!(
716 "http://example.org/item{}",
717 i
718 )))]
719 })
720 .collect();
721 let values = ValuesClause::with_data(vec![var_x.clone()], data).unwrap();
722
723 let other_vars = [var_x.clone()].into_iter().collect();
724
725 let strategy = ValuesJoinOptimizer::optimize_join(&values, &other_vars);
726 assert_eq!(strategy, JoinStrategy::IndexNestedLoop { index_var: var_x });
727 }
728
729 #[test]
730 fn test_values_executor() {
731 let var_x = create_test_variable("x");
732 let values = ValuesClause::with_data(
733 vec![var_x],
734 vec![
735 vec![Some(create_test_iri("http://example.org/a"))],
736 vec![Some(create_test_iri("http://example.org/b"))],
737 ],
738 )
739 .unwrap();
740
741 let mut executor = ValuesExecutor::new();
742 let solution = executor.execute(&values).unwrap();
743
744 assert_eq!(solution.len(), 2);
745 }
746
747 #[test]
748 fn test_values_builder() {
749 let var_x = create_test_variable("x");
750 let var_y = create_test_variable("y");
751
752 let values = ValuesBuilder::new()
753 .add_variable(var_x)
754 .add_variable(var_y)
755 .add_row(vec![
756 Some(create_test_iri("http://example.org/a")),
757 Some(create_test_iri("http://example.org/1")),
758 ])
759 .unwrap()
760 .build()
761 .unwrap();
762
763 assert_eq!(values.variable_count(), 2);
764 assert_eq!(values.row_count(), 1);
765 }
766
767 #[test]
768 fn test_values_invalid_row() {
769 let var_x = create_test_variable("x");
770 let mut values = ValuesClause::new(vec![var_x]);
771
772 let result = values.add_row(vec![
774 Some(create_test_iri("http://example.org/a")),
775 Some(create_test_iri("http://example.org/b")),
776 ]);
777
778 assert!(result.is_err());
779 }
780
781 #[test]
782 fn test_variable_reordering() {
783 let var_x = create_test_variable("x");
784 let var_y = create_test_variable("y");
785
786 let values = ValuesClause::with_data(
787 vec![var_x.clone(), var_y.clone()],
788 vec![
789 vec![
790 Some(create_test_iri("http://example.org/a")),
791 Some(create_test_iri("http://example.org/1")),
792 ],
793 vec![
794 Some(create_test_iri("http://example.org/a")),
795 Some(create_test_iri("http://example.org/2")),
796 ],
797 vec![
798 Some(create_test_iri("http://example.org/b")),
799 Some(create_test_iri("http://example.org/3")),
800 ],
801 ],
802 )
803 .unwrap();
804
805 let ordered = ValuesOptimizer::suggest_variable_order(&values);
806 assert_eq!(ordered.len(), 2);
807 assert_eq!(ordered[0], var_x);
809 }
810
811 #[test]
812 fn test_can_push_values() {
813 let var_x = create_test_variable("x");
814 let values = ValuesClause::with_data(
815 vec![var_x.clone()],
816 vec![vec![Some(create_test_iri("http://example.org/a"))]],
817 )
818 .unwrap();
819
820 assert!(ValuesJoinOptimizer::can_push_values(&values, &[var_x]));
821 }
822
823 #[test]
824 fn test_large_values_strategy() {
825 let var_x = create_test_variable("x");
826 let mut data = Vec::new();
827 for i in 0..2000 {
828 data.push(vec![Some(create_test_iri(&format!(
829 "http://example.org/{}",
830 i
831 )))]);
832 }
833
834 let values = ValuesClause::with_data(vec![var_x], data).unwrap();
835 let optimized = ValuesOptimizer::optimize(&values);
836
837 assert_eq!(optimized.strategy, ValuesExecutionStrategy::Materialized);
838 assert!(!optimized.pushable); }
840}