1use crate::algebra::Variable;
42use anyhow::{anyhow, Result};
43use regex::Regex;
44#[allow(unused_imports)]
46use scirs2_core::metrics::{Counter, Timer};
47use serde::{Deserialize, Serialize};
48use std::collections::HashMap;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::{Arc, OnceLock};
51use std::time::Duration;
52
/// Collection of optimizer hints attached to a single query.
///
/// List-valued fields may hold any number of hints; `Option` fields hold at
/// most one. The derived `Default` produces a value with no hints at all.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryHints {
    /// Join algorithm hints; looked up by variable name in `get_join_hint`.
    pub join_hints: Vec<JoinHint>,
    /// Index usage hints; looked up by pattern id in `get_index_hint`.
    pub index_hints: Vec<IndexHint>,
    /// Cardinality hints; looked up by variable name in `get_cardinality_hint`.
    pub cardinality_hints: Vec<CardinalityHint>,
    /// Parallel execution hint, if any.
    pub parallelism_hints: Option<ParallelismHint>,
    /// Materialization hints, one per target.
    pub materialization_hints: Vec<MaterializationHint>,
    /// Requested query timeout, if any.
    pub timeout_hint: Option<Duration>,
    /// Memory usage hint, if any.
    pub memory_hint: Option<MemoryHint>,
    /// Result cache hint, if any.
    pub cache_hints: Option<CacheHint>,
    /// Join ordering hint, if any.
    pub join_order_hint: Option<JoinOrderHint>,
    /// Filter pushdown hints.
    pub filter_hints: Vec<FilterHint>,
    /// Hints not recognised by the parser, stored as key -> raw argument text.
    pub custom_directives: HashMap<String, String>,
}
79
80impl QueryHints {
81 pub fn new() -> Self {
83 Self::default()
84 }
85
86 pub fn builder() -> QueryHintsBuilder {
88 QueryHintsBuilder::new()
89 }
90
91 pub fn is_empty(&self) -> bool {
93 self.join_hints.is_empty()
94 && self.index_hints.is_empty()
95 && self.cardinality_hints.is_empty()
96 && self.parallelism_hints.is_none()
97 && self.materialization_hints.is_empty()
98 && self.timeout_hint.is_none()
99 && self.memory_hint.is_none()
100 && self.cache_hints.is_none()
101 && self.join_order_hint.is_none()
102 && self.filter_hints.is_empty()
103 && self.custom_directives.is_empty()
104 }
105
106 pub fn hint_count(&self) -> usize {
108 let mut count = self.join_hints.len()
109 + self.index_hints.len()
110 + self.cardinality_hints.len()
111 + self.materialization_hints.len()
112 + self.filter_hints.len()
113 + self.custom_directives.len();
114
115 if self.parallelism_hints.is_some() {
116 count += 1;
117 }
118 if self.timeout_hint.is_some() {
119 count += 1;
120 }
121 if self.memory_hint.is_some() {
122 count += 1;
123 }
124 if self.cache_hints.is_some() {
125 count += 1;
126 }
127 if self.join_order_hint.is_some() {
128 count += 1;
129 }
130 count
131 }
132
133 pub fn merge(&mut self, other: QueryHints) {
135 self.join_hints.extend(other.join_hints);
136 self.index_hints.extend(other.index_hints);
137 self.cardinality_hints.extend(other.cardinality_hints);
138 self.materialization_hints
139 .extend(other.materialization_hints);
140 self.filter_hints.extend(other.filter_hints);
141 self.custom_directives.extend(other.custom_directives);
142
143 if other.parallelism_hints.is_some() {
144 self.parallelism_hints = other.parallelism_hints;
145 }
146 if other.timeout_hint.is_some() {
147 self.timeout_hint = other.timeout_hint;
148 }
149 if other.memory_hint.is_some() {
150 self.memory_hint = other.memory_hint;
151 }
152 if other.cache_hints.is_some() {
153 self.cache_hints = other.cache_hints;
154 }
155 if other.join_order_hint.is_some() {
156 self.join_order_hint = other.join_order_hint;
157 }
158 }
159
160 pub fn get_join_hint(&self, vars: &[Variable]) -> Option<&JoinHint> {
162 self.join_hints.iter().find(|hint| {
163 vars.iter()
164 .all(|v| hint.variables.iter().any(|hv| hv == v.name()))
165 })
166 }
167
168 pub fn get_cardinality_hint(&self, var: &Variable) -> Option<u64> {
170 self.cardinality_hints
171 .iter()
172 .find(|hint| hint.variable == var.name())
173 .map(|hint| hint.cardinality)
174 }
175
176 pub fn get_index_hint(&self, pattern_id: &str) -> Option<&IndexHint> {
178 self.index_hints
179 .iter()
180 .find(|hint| hint.pattern_id == pattern_id)
181 }
182}
183
184#[derive(Debug, Default)]
186pub struct QueryHintsBuilder {
187 hints: QueryHints,
188}
189
190impl QueryHintsBuilder {
191 pub fn new() -> Self {
193 Self::default()
194 }
195
196 pub fn with_join_hint(mut self, hint: JoinHint) -> Self {
198 self.hints.join_hints.push(hint);
199 self
200 }
201
202 pub fn hash_join(self, variables: Vec<&str>) -> Self {
204 self.with_join_hint(JoinHint::new(
205 variables.into_iter().map(String::from).collect(),
206 JoinAlgorithmHint::HashJoin,
207 ))
208 }
209
210 pub fn merge_join(self, variables: Vec<&str>) -> Self {
212 self.with_join_hint(JoinHint::new(
213 variables.into_iter().map(String::from).collect(),
214 JoinAlgorithmHint::MergeJoin,
215 ))
216 }
217
218 pub fn nested_loop_join(self, variables: Vec<&str>) -> Self {
220 self.with_join_hint(JoinHint::new(
221 variables.into_iter().map(String::from).collect(),
222 JoinAlgorithmHint::NestedLoop,
223 ))
224 }
225
226 pub fn with_index_hint(mut self, hint: IndexHint) -> Self {
228 self.hints.index_hints.push(hint);
229 self
230 }
231
232 pub fn use_index(self, pattern_id: &str, index_name: &str) -> Self {
234 self.with_index_hint(IndexHint::use_index(pattern_id, index_name))
235 }
236
237 pub fn ignore_index(self, pattern_id: &str, index_name: &str) -> Self {
239 self.with_index_hint(IndexHint::ignore_index(pattern_id, index_name))
240 }
241
242 pub fn with_cardinality_hint(mut self, hint: CardinalityHint) -> Self {
244 self.hints.cardinality_hints.push(hint);
245 self
246 }
247
248 pub fn cardinality(self, variable: &str, cardinality: u64) -> Self {
250 self.with_cardinality_hint(CardinalityHint::new(variable, cardinality))
251 }
252
253 pub fn with_parallelism(mut self, hint: ParallelismHint) -> Self {
255 self.hints.parallelism_hints = Some(hint);
256 self
257 }
258
259 pub fn parallel(self, threads: usize) -> Self {
261 self.with_parallelism(ParallelismHint::new(threads))
262 }
263
264 pub fn no_parallel(self) -> Self {
266 self.with_parallelism(ParallelismHint::disabled())
267 }
268
269 pub fn with_materialization_hint(mut self, hint: MaterializationHint) -> Self {
271 self.hints.materialization_hints.push(hint);
272 self
273 }
274
275 pub fn timeout(mut self, duration: Duration) -> Self {
277 self.hints.timeout_hint = Some(duration);
278 self
279 }
280
281 pub fn timeout_secs(self, secs: u64) -> Self {
283 self.timeout(Duration::from_secs(secs))
284 }
285
286 pub fn with_memory_hint(mut self, hint: MemoryHint) -> Self {
288 self.hints.memory_hint = Some(hint);
289 self
290 }
291
292 pub fn memory_limit(self, bytes: usize) -> Self {
294 self.with_memory_hint(MemoryHint {
295 max_memory: bytes,
296 prefer_streaming: false,
297 spill_to_disk: true,
298 })
299 }
300
301 pub fn prefer_streaming(self) -> Self {
303 self.with_memory_hint(MemoryHint {
304 max_memory: usize::MAX,
305 prefer_streaming: true,
306 spill_to_disk: false,
307 })
308 }
309
310 pub fn with_cache_hint(mut self, hint: CacheHint) -> Self {
312 self.hints.cache_hints = Some(hint);
313 self
314 }
315
316 pub fn no_cache(self) -> Self {
318 self.with_cache_hint(CacheHint {
319 use_cache: false,
320 update_cache: false,
321 cache_ttl: None,
322 })
323 }
324
325 pub fn with_join_order(mut self, hint: JoinOrderHint) -> Self {
327 self.hints.join_order_hint = Some(hint);
328 self
329 }
330
331 pub fn fixed_join_order(self, order: Vec<&str>) -> Self {
333 self.with_join_order(JoinOrderHint {
334 strategy: JoinOrderStrategy::Fixed,
335 order: order.into_iter().map(String::from).collect(),
336 })
337 }
338
339 pub fn with_filter_hint(mut self, hint: FilterHint) -> Self {
341 self.hints.filter_hints.push(hint);
342 self
343 }
344
345 pub fn directive(mut self, key: &str, value: &str) -> Self {
347 self.hints
348 .custom_directives
349 .insert(key.to_string(), value.to_string());
350 self
351 }
352
353 pub fn build(self) -> QueryHints {
355 self.hints
356 }
357}
358
359#[derive(Debug, Clone, Serialize, Deserialize)]
361pub struct JoinHint {
362 pub variables: Vec<String>,
364 pub algorithm: JoinAlgorithmHint,
366 pub build_side: Option<JoinBuildSide>,
368 pub priority: u8,
370}
371
372impl JoinHint {
373 pub fn new(variables: Vec<String>, algorithm: JoinAlgorithmHint) -> Self {
375 Self {
376 variables,
377 algorithm,
378 build_side: None,
379 priority: 1,
380 }
381 }
382
383 pub fn with_build_side(mut self, side: JoinBuildSide) -> Self {
385 self.build_side = Some(side);
386 self
387 }
388
389 pub fn with_priority(mut self, priority: u8) -> Self {
391 self.priority = priority;
392 self
393 }
394}
395
396#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
398pub enum JoinAlgorithmHint {
399 HashJoin,
401 MergeJoin,
403 NestedLoop,
405 IndexJoin,
407 Auto,
409}
410
411impl std::fmt::Display for JoinAlgorithmHint {
412 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413 match self {
414 JoinAlgorithmHint::HashJoin => write!(f, "HASH_JOIN"),
415 JoinAlgorithmHint::MergeJoin => write!(f, "MERGE_JOIN"),
416 JoinAlgorithmHint::NestedLoop => write!(f, "NESTED_LOOP"),
417 JoinAlgorithmHint::IndexJoin => write!(f, "INDEX_JOIN"),
418 JoinAlgorithmHint::Auto => write!(f, "AUTO"),
419 }
420 }
421}
422
/// Build-side preference that can be attached to a [`JoinHint`] via
/// `JoinHint::with_build_side`.
// NOTE(review): the exact meaning of each side is not visible in this file;
// presumably it selects which join input is used to build the lookup
// structure. Confirm against the consumer of this hint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum JoinBuildSide {
    Left,
    Right,
    Auto,
}
433
434#[derive(Debug, Clone, Serialize, Deserialize)]
436pub struct IndexHint {
437 pub pattern_id: String,
439 pub directive: IndexDirective,
441 pub index_names: Vec<String>,
443}
444
445impl IndexHint {
446 pub fn use_index(pattern_id: &str, index_name: &str) -> Self {
448 Self {
449 pattern_id: pattern_id.to_string(),
450 directive: IndexDirective::Use,
451 index_names: vec![index_name.to_string()],
452 }
453 }
454
455 pub fn ignore_index(pattern_id: &str, index_name: &str) -> Self {
457 Self {
458 pattern_id: pattern_id.to_string(),
459 directive: IndexDirective::Ignore,
460 index_names: vec![index_name.to_string()],
461 }
462 }
463
464 pub fn force_index(pattern_id: &str) -> Self {
466 Self {
467 pattern_id: pattern_id.to_string(),
468 directive: IndexDirective::Force,
469 index_names: Vec::new(),
470 }
471 }
472
473 pub fn no_index(pattern_id: &str) -> Self {
475 Self {
476 pattern_id: pattern_id.to_string(),
477 directive: IndexDirective::NoIndex,
478 index_names: Vec::new(),
479 }
480 }
481}
482
/// Action requested by an [`IndexHint`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexDirective {
    /// Produced by `IndexHint::use_index`, which names one index.
    Use,
    /// Produced by `IndexHint::ignore_index`, which names one index.
    Ignore,
    /// Produced by `IndexHint::force_index`, which names no index.
    Force,
    /// Produced by `IndexHint::no_index`, which names no index.
    NoIndex,
}
495
496#[derive(Debug, Clone, Serialize, Deserialize)]
498pub struct CardinalityHint {
499 pub variable: String,
501 pub cardinality: u64,
503 pub confidence: f64,
505}
506
507impl CardinalityHint {
508 pub fn new(variable: &str, cardinality: u64) -> Self {
510 Self {
511 variable: variable.to_string(),
512 cardinality,
513 confidence: 1.0,
514 }
515 }
516
517 pub fn with_confidence(mut self, confidence: f64) -> Self {
519 self.confidence = confidence.clamp(0.0, 1.0);
520 self
521 }
522}
523
524#[derive(Debug, Clone, Serialize, Deserialize)]
526pub struct ParallelismHint {
527 pub enabled: bool,
529 pub threads: Option<usize>,
531 pub min_batch_size: usize,
533 pub work_stealing: bool,
535}
536
537impl ParallelismHint {
538 pub fn new(threads: usize) -> Self {
540 Self {
541 enabled: true,
542 threads: Some(threads),
543 min_batch_size: 1000,
544 work_stealing: true,
545 }
546 }
547
548 pub fn disabled() -> Self {
550 Self {
551 enabled: false,
552 threads: None,
553 min_batch_size: 0,
554 work_stealing: false,
555 }
556 }
557
558 pub fn auto() -> Self {
560 Self {
561 enabled: true,
562 threads: None,
563 min_batch_size: 1000,
564 work_stealing: true,
565 }
566 }
567}
568
569#[derive(Debug, Clone, Serialize, Deserialize)]
571pub struct MaterializationHint {
572 pub target: String,
574 pub strategy: MaterializationStrategy,
576}
577
578impl MaterializationHint {
579 pub fn materialize(target: &str) -> Self {
581 Self {
582 target: target.to_string(),
583 strategy: MaterializationStrategy::Eager,
584 }
585 }
586
587 pub fn lazy(target: &str) -> Self {
589 Self {
590 target: target.to_string(),
591 strategy: MaterializationStrategy::Lazy,
592 }
593 }
594
595 pub fn streaming(target: &str) -> Self {
597 Self {
598 target: target.to_string(),
599 strategy: MaterializationStrategy::Streaming,
600 }
601 }
602}
603
/// Strategy carried by a [`MaterializationHint`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MaterializationStrategy {
    /// Produced by `MaterializationHint::materialize` and the `MATERIALIZE` hint.
    Eager,
    /// Produced by `MaterializationHint::lazy`.
    Lazy,
    /// Produced by `MaterializationHint::streaming`.
    Streaming,
    /// No constructor in the visible code produces this variant.
    Partial,
}
616
/// Hint describing memory constraints for query execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryHint {
    /// Memory limit in bytes; `usize::MAX` is used when only streaming is requested.
    pub max_memory: usize,
    /// Set to `true` by the `STREAMING` hint and `QueryHintsBuilder::prefer_streaming`.
    pub prefer_streaming: bool,
    /// Set to `true` by the memory-limit hints and `false` by the streaming hints.
    pub spill_to_disk: bool,
}
627
/// Hint describing how the result cache should be used.
///
/// The `NO_CACHE` hint sets both flags to `false`; the `CACHE` hint sets both
/// to `true`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheHint {
    /// Whether cached results may be used.
    pub use_cache: bool,
    /// Whether the cache may be updated.
    pub update_cache: bool,
    /// Optional time-to-live; the parser and builder in this file always leave it `None`.
    pub cache_ttl: Option<Duration>,
}
638
/// Hint describing the join ordering to use.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinOrderHint {
    /// Ordering strategy.
    pub strategy: JoinOrderStrategy,
    /// Explicit order of names; filled for `Fixed`, left empty by the `ORDERED` hint.
    pub order: Vec<String>,
}
647
/// Strategy carried by a [`JoinOrderHint`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum JoinOrderStrategy {
    Auto,
    /// Produced by the `LEADING(...)` hint and `QueryHintsBuilder::fixed_join_order`.
    Fixed,
    /// Produced by the `ORDERED` / `FIXED_ORDER` hints.
    LeftToRight,
    SmallestFirst,
    MostSelectiveFirst,
}
662
/// Hint about pushdown of a single filter.
///
/// The parser in this file never produces this hint; it can only be added
/// through `QueryHintsBuilder::with_filter_hint`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FilterHint {
    /// Identifier of the filter the hint applies to.
    pub filter_id: String,
    /// Requested pushdown behaviour.
    pub directive: FilterPushdownDirective,
    /// Optional pattern name; presumably the destination for `PushTo` (TODO confirm).
    pub target_pattern: Option<String>,
}
673
/// Pushdown behaviour carried by a [`FilterHint`].
// NOTE(review): no code in this file interprets these variants; their exact
// semantics are defined by whatever consumes `FilterHint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FilterPushdownDirective {
    Push,
    NoPush,
    PushTo,
}
684
/// Parser that extracts `/*+ ... */` hint comments from query text and keeps
/// running statistics about what it has parsed.
pub struct HintParser {
    /// Running total of hints returned by successful `parse_query` calls.
    hints_parsed: Arc<AtomicU64>,
    /// Running total of time spent in successful `parse_query` calls, in nanoseconds.
    parse_timer: Arc<AtomicU64>,
}

impl Default for HintParser {
    /// Same as [`HintParser::new`].
    fn default() -> Self {
        Self::new()
    }
}
698
699impl HintParser {
700 pub fn new() -> Self {
702 Self {
703 hints_parsed: Arc::new(AtomicU64::new(0)),
704 parse_timer: Arc::new(AtomicU64::new(0)),
705 }
706 }
707
708 pub fn parse(query: &str) -> Result<QueryHints> {
710 let parser = Self::new();
711 parser.parse_query(query)
712 }
713
714 pub fn parse_query(&self, query: &str) -> Result<QueryHints> {
716 let start = std::time::Instant::now();
717 let mut hints = QueryHints::new();
718
719 let line_hint_pattern = line_hint_regex();
722 let mut line_hint_positions: std::collections::HashSet<usize> =
723 std::collections::HashSet::new();
724
725 for cap in line_hint_pattern.captures_iter(query) {
726 if let Some(hint_text) = cap.get(1) {
727 let parsed = self.parse_hint_block(hint_text.as_str())?;
728 hints.merge(parsed);
729 if let Some(m) = cap.get(0) {
731 let match_start = m.start();
734 let match_str = m.as_str();
735 if let Some(inner_pos) = match_str.find("/*+") {
736 line_hint_positions.insert(match_start + inner_pos);
737 }
738 }
739 }
740 }
741
742 let hint_pattern = regex();
745
746 for cap in hint_pattern.captures_iter(query) {
747 if let Some(m) = cap.get(0) {
748 if line_hint_positions.contains(&m.start()) {
750 continue;
751 }
752 }
753 if let Some(hint_text) = cap.get(1) {
754 let parsed = self.parse_hint_block(hint_text.as_str())?;
755 hints.merge(parsed);
756 }
757 }
758
759 self.hints_parsed
760 .fetch_add(hints.hint_count() as u64, Ordering::Relaxed);
761 self.parse_timer
762 .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
763
764 Ok(hints)
765 }
766
767 fn parse_hint_block(&self, text: &str) -> Result<QueryHints> {
769 let mut hints = QueryHints::new();
770
771 let tokens = self.tokenize_hints(text);
773 let mut i = 0;
774
775 while i < tokens.len() {
776 let token = &tokens[i];
777 let hint_upper = token.to_uppercase();
778
779 match hint_upper.as_str() {
780 "HASH_JOIN" => {
781 if i + 1 < tokens.len() {
782 let vars = self.parse_variable_list(&tokens[i + 1])?;
783 hints
784 .join_hints
785 .push(JoinHint::new(vars, JoinAlgorithmHint::HashJoin));
786 i += 1;
787 }
788 }
789 "MERGE_JOIN" => {
790 if i + 1 < tokens.len() {
791 let vars = self.parse_variable_list(&tokens[i + 1])?;
792 hints
793 .join_hints
794 .push(JoinHint::new(vars, JoinAlgorithmHint::MergeJoin));
795 i += 1;
796 }
797 }
798 "NESTED_LOOP" | "NL_JOIN" => {
799 if i + 1 < tokens.len() {
800 let vars = self.parse_variable_list(&tokens[i + 1])?;
801 hints
802 .join_hints
803 .push(JoinHint::new(vars, JoinAlgorithmHint::NestedLoop));
804 i += 1;
805 }
806 }
807 "INDEX_JOIN" => {
808 if i + 1 < tokens.len() {
809 let vars = self.parse_variable_list(&tokens[i + 1])?;
810 hints
811 .join_hints
812 .push(JoinHint::new(vars, JoinAlgorithmHint::IndexJoin));
813 i += 1;
814 }
815 }
816 "CARDINALITY" => {
817 if i + 1 < tokens.len() {
818 let (var, card) = self.parse_cardinality_hint(&tokens[i + 1])?;
819 hints
820 .cardinality_hints
821 .push(CardinalityHint::new(&var, card));
822 i += 1;
823 }
824 }
825 "PARALLEL" => {
826 if i + 1 < tokens.len() {
827 let threads = self.parse_single_value(&tokens[i + 1])?;
828 hints.parallelism_hints = Some(ParallelismHint::new(threads as usize));
829 i += 1;
830 } else {
831 hints.parallelism_hints = Some(ParallelismHint::auto());
832 }
833 }
834 "NO_PARALLEL" | "NOPARALLEL" => {
835 hints.parallelism_hints = Some(ParallelismHint::disabled());
836 }
837 "TIMEOUT" => {
838 if i + 1 < tokens.len() {
839 let timeout = self.parse_duration(&tokens[i + 1])?;
840 hints.timeout_hint = Some(timeout);
841 i += 1;
842 }
843 }
844 "MEMORY_LIMIT" | "MAX_MEMORY" => {
845 if i + 1 < tokens.len() {
846 let bytes = self.parse_memory_size(&tokens[i + 1])?;
847 hints.memory_hint = Some(MemoryHint {
848 max_memory: bytes,
849 prefer_streaming: false,
850 spill_to_disk: true,
851 });
852 i += 1;
853 }
854 }
855 "STREAMING" => {
856 hints.memory_hint = Some(MemoryHint {
857 max_memory: usize::MAX,
858 prefer_streaming: true,
859 spill_to_disk: false,
860 });
861 }
862 "NO_CACHE" | "NOCACHE" => {
863 hints.cache_hints = Some(CacheHint {
864 use_cache: false,
865 update_cache: false,
866 cache_ttl: None,
867 });
868 }
869 "CACHE" => {
870 hints.cache_hints = Some(CacheHint {
871 use_cache: true,
872 update_cache: true,
873 cache_ttl: None,
874 });
875 }
876 "USE_INDEX" | "FORCE_INDEX" => {
877 if i + 1 < tokens.len() {
878 let (pattern, index) = self.parse_index_hint(&tokens[i + 1])?;
879 hints
880 .index_hints
881 .push(IndexHint::use_index(&pattern, &index));
882 i += 1;
883 }
884 }
885 "IGNORE_INDEX" | "NO_INDEX" => {
886 if i + 1 < tokens.len() {
887 let (pattern, index) = self.parse_index_hint(&tokens[i + 1])?;
888 hints
889 .index_hints
890 .push(IndexHint::ignore_index(&pattern, &index));
891 i += 1;
892 }
893 }
894 "ORDERED" | "FIXED_ORDER" => {
895 hints.join_order_hint = Some(JoinOrderHint {
896 strategy: JoinOrderStrategy::LeftToRight,
897 order: Vec::new(),
898 });
899 }
900 "LEADING" => {
901 if i + 1 < tokens.len() {
902 let order = self.parse_variable_list(&tokens[i + 1])?;
903 hints.join_order_hint = Some(JoinOrderHint {
904 strategy: JoinOrderStrategy::Fixed,
905 order,
906 });
907 i += 1;
908 }
909 }
910 "MATERIALIZE" => {
911 if i + 1 < tokens.len() {
912 let target = self.parse_single_string(&tokens[i + 1])?;
913 hints
914 .materialization_hints
915 .push(MaterializationHint::materialize(&target));
916 i += 1;
917 }
918 }
919 _ => {
920 if i + 1 < tokens.len() && tokens[i + 1].starts_with('(') {
922 hints.custom_directives.insert(
923 hint_upper,
924 tokens[i + 1]
925 .trim_matches(|c| c == '(' || c == ')')
926 .to_string(),
927 );
928 i += 1;
929 }
930 }
931 }
932 i += 1;
933 }
934
935 Ok(hints)
936 }
937
938 fn tokenize_hints(&self, text: &str) -> Vec<String> {
941 let mut tokens = Vec::new();
942 let mut current = String::new();
943 let mut paren_depth = 0;
944
945 for ch in text.chars() {
946 match ch {
947 '(' if paren_depth == 0 => {
948 if !current.is_empty() {
951 tokens.push(current.clone());
952 current.clear();
953 }
954 paren_depth += 1;
955 current.push(ch);
956 }
957 '(' => {
958 paren_depth += 1;
960 current.push(ch);
961 }
962 ')' => {
963 paren_depth -= 1;
964 current.push(ch);
965 if paren_depth == 0 && !current.is_empty() {
966 tokens.push(current.clone());
968 current.clear();
969 }
970 }
971 ' ' | '\t' | '\n' | '\r' if paren_depth == 0 => {
972 if !current.is_empty() {
973 tokens.push(current.clone());
974 current.clear();
975 }
976 }
977 _ => {
978 current.push(ch);
979 }
980 }
981 }
982
983 if !current.is_empty() {
984 tokens.push(current);
985 }
986
987 tokens
988 }
989
990 fn parse_variable_list(&self, text: &str) -> Result<Vec<String>> {
992 let inner = text.trim_matches(|c| c == '(' || c == ')');
993 let vars: Vec<String> = inner
994 .split(',')
995 .map(|s| s.trim().trim_start_matches('?').to_string())
996 .filter(|s| !s.is_empty())
997 .collect();
998
999 if vars.is_empty() {
1000 return Err(anyhow!("Empty variable list"));
1001 }
1002 Ok(vars)
1003 }
1004
1005 fn parse_cardinality_hint(&self, text: &str) -> Result<(String, u64)> {
1007 let inner = text.trim_matches(|c| c == '(' || c == ')');
1008 let parts: Vec<&str> = inner.split(',').map(|s| s.trim()).collect();
1009
1010 if parts.len() != 2 {
1011 return Err(anyhow!(
1012 "Invalid cardinality hint format: expected (?var, number)"
1013 ));
1014 }
1015
1016 let var = parts[0].trim_start_matches('?').to_string();
1017 let card: u64 = parts[1]
1018 .parse()
1019 .map_err(|_| anyhow!("Invalid cardinality value: {}", parts[1]))?;
1020
1021 Ok((var, card))
1022 }
1023
1024 fn parse_single_value(&self, text: &str) -> Result<u64> {
1026 let inner = text.trim_matches(|c| c == '(' || c == ')');
1027 inner
1028 .parse()
1029 .map_err(|_| anyhow!("Invalid numeric value: {}", inner))
1030 }
1031
1032 fn parse_single_string(&self, text: &str) -> Result<String> {
1034 let inner = text.trim_matches(|c| c == '(' || c == ')');
1035 Ok(inner.trim().to_string())
1036 }
1037
1038 fn parse_duration(&self, text: &str) -> Result<Duration> {
1040 let inner = text.trim_matches(|c| c == '(' || c == ')').to_lowercase();
1041
1042 if let Some(secs) = inner.strip_suffix('s') {
1043 let val: u64 = secs
1044 .parse()
1045 .map_err(|_| anyhow!("Invalid duration: {}", text))?;
1046 return Ok(Duration::from_secs(val));
1047 }
1048 if let Some(mins) = inner.strip_suffix('m') {
1049 let val: u64 = mins
1050 .parse()
1051 .map_err(|_| anyhow!("Invalid duration: {}", text))?;
1052 return Ok(Duration::from_secs(val * 60));
1053 }
1054 if let Some(hours) = inner.strip_suffix('h') {
1055 let val: u64 = hours
1056 .parse()
1057 .map_err(|_| anyhow!("Invalid duration: {}", text))?;
1058 return Ok(Duration::from_secs(val * 3600));
1059 }
1060
1061 let val: u64 = inner
1063 .parse()
1064 .map_err(|_| anyhow!("Invalid duration: {}", text))?;
1065 Ok(Duration::from_millis(val))
1066 }
1067
1068 fn parse_memory_size(&self, text: &str) -> Result<usize> {
1070 let inner = text.trim_matches(|c| c == '(' || c == ')').to_uppercase();
1071
1072 if let Some(gb) = inner.strip_suffix("GB") {
1073 let val: usize = gb
1074 .parse()
1075 .map_err(|_| anyhow!("Invalid memory size: {}", text))?;
1076 return Ok(val * 1024 * 1024 * 1024);
1077 }
1078 if let Some(mb) = inner.strip_suffix("MB") {
1079 let val: usize = mb
1080 .parse()
1081 .map_err(|_| anyhow!("Invalid memory size: {}", text))?;
1082 return Ok(val * 1024 * 1024);
1083 }
1084 if let Some(kb) = inner.strip_suffix("KB") {
1085 let val: usize = kb
1086 .parse()
1087 .map_err(|_| anyhow!("Invalid memory size: {}", text))?;
1088 return Ok(val * 1024);
1089 }
1090
1091 inner
1093 .parse()
1094 .map_err(|_| anyhow!("Invalid memory size: {}", text))
1095 }
1096
1097 fn parse_index_hint(&self, text: &str) -> Result<(String, String)> {
1099 let inner = text.trim_matches(|c| c == '(' || c == ')');
1100 let parts: Vec<&str> = inner.split(',').map(|s| s.trim()).collect();
1101
1102 if parts.len() != 2 {
1103 return Err(anyhow!(
1104 "Invalid index hint format: expected (pattern, index_name)"
1105 ));
1106 }
1107
1108 Ok((parts[0].to_string(), parts[1].to_string()))
1109 }
1110
1111 pub fn statistics(&self) -> HintParserStats {
1113 HintParserStats {
1114 hints_parsed: self.hints_parsed.load(Ordering::Relaxed),
1115 total_parse_time_ns: self.parse_timer.load(Ordering::Relaxed),
1116 }
1117 }
1118}
1119
/// Snapshot of a [`HintParser`]'s counters, as returned by `HintParser::statistics`.
#[derive(Debug, Clone)]
pub struct HintParserStats {
    /// Total number of hints returned by successful parses.
    pub hints_parsed: u64,
    /// Total time spent in successful parses, in nanoseconds.
    pub total_parse_time_ns: u64,
}
1128
1129fn regex() -> &'static Regex {
1131 static HINT_REGEX: OnceLock<Regex> = OnceLock::new();
1132 HINT_REGEX.get_or_init(|| {
1133 Regex::new(r"/\*\+\s*(.+?)\s*\*/").expect("Invalid regex")
1136 })
1137}
1138
1139fn line_hint_regex() -> &'static Regex {
1141 static LINE_HINT_REGEX: OnceLock<Regex> = OnceLock::new();
1142 LINE_HINT_REGEX.get_or_init(|| {
1143 Regex::new(r"#\s*/\*\+\s*(.+?)\s*\*/").expect("Invalid regex")
1145 })
1146}
1147
/// Record of what happened to each hint when hints were applied.
#[derive(Debug, Clone, Default)]
pub struct HintApplicationResult {
    /// Hints that were applied.
    pub applied: Vec<String>,
    /// Hints that were ignored.
    pub ignored: Vec<String>,
    /// Hints that conflicted.
    pub conflicts: Vec<String>,
}

impl HintApplicationResult {
    /// Creates a result with all three lists empty.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records `hint` as applied.
    pub fn applied(&mut self, hint: &str) {
        self.applied.push(String::from(hint));
    }

    /// Records `hint` as ignored.
    pub fn ignored(&mut self, hint: &str) {
        self.ignored.push(String::from(hint));
    }

    /// Records `hint` as conflicting.
    pub fn conflict(&mut self, hint: &str) {
        self.conflicts.push(String::from(hint));
    }
}
1190
1191pub struct HintValidator;
1193
1194impl HintValidator {
1195 pub fn validate(hints: &QueryHints) -> Vec<HintValidationWarning> {
1197 let mut warnings = Vec::new();
1198
1199 let mut seen_vars: HashMap<String, Vec<JoinAlgorithmHint>> = HashMap::new();
1201 for hint in &hints.join_hints {
1202 for var in &hint.variables {
1203 seen_vars
1204 .entry(var.clone())
1205 .or_default()
1206 .push(hint.algorithm);
1207 }
1208 }
1209 for (var, algorithms) in seen_vars {
1210 if algorithms.len() > 1 {
1211 let unique: std::collections::HashSet<_> = algorithms.iter().collect();
1212 if unique.len() > 1 {
1213 warnings.push(HintValidationWarning {
1214 severity: WarningSeverity::Warning,
1215 message: format!(
1216 "Conflicting join hints for variable '{}': {:?}",
1217 var, algorithms
1218 ),
1219 });
1220 }
1221 }
1222 }
1223
1224 if let Some(ref mem) = hints.memory_hint {
1226 if mem.prefer_streaming {
1227 if let Some(ref par) = hints.parallelism_hints {
1228 if par.enabled && par.threads.is_some_and(|t| t > 1) {
1229 warnings.push(HintValidationWarning {
1230 severity: WarningSeverity::Info,
1231 message:
1232 "Streaming mode with parallelism may reduce streaming benefits"
1233 .to_string(),
1234 });
1235 }
1236 }
1237 }
1238 }
1239
1240 for hint in &hints.cardinality_hints {
1242 if hint.cardinality > 1_000_000_000 {
1243 warnings.push(HintValidationWarning {
1244 severity: WarningSeverity::Warning,
1245 message: format!(
1246 "Very high cardinality hint for '{}': {} (may affect optimization)",
1247 hint.variable, hint.cardinality
1248 ),
1249 });
1250 }
1251 }
1252
1253 if let Some(timeout) = hints.timeout_hint {
1255 if timeout < Duration::from_millis(100) {
1256 warnings.push(HintValidationWarning {
1257 severity: WarningSeverity::Warning,
1258 message: format!(
1259 "Very short timeout: {:?} (query may timeout immediately)",
1260 timeout
1261 ),
1262 });
1263 }
1264 if timeout > Duration::from_secs(3600) {
1265 warnings.push(HintValidationWarning {
1266 severity: WarningSeverity::Info,
1267 message: format!(
1268 "Very long timeout: {:?} (consider using async execution)",
1269 timeout
1270 ),
1271 });
1272 }
1273 }
1274
1275 warnings
1276 }
1277}
1278
/// One finding produced by `HintValidator::validate`.
#[derive(Debug, Clone)]
pub struct HintValidationWarning {
    /// How serious the finding is.
    pub severity: WarningSeverity,
    /// Human-readable description of the finding.
    pub message: String,
}
1287
/// Severity attached to a [`HintValidationWarning`].
///
/// `HintValidator::validate` as written here emits only `Info` and `Warning`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WarningSeverity {
    Info,
    Warning,
    Error,
}
1298
// Unit tests for hint parsing, the builder, merging, lookup and validation.
#[cfg(test)]
mod tests {
    use super::*;

    // The block-comment regex must match a hint at the start of a query.
    #[test]
    fn test_regex_matching() {
        let re = regex();
        let query = "/*+ HASH_JOIN(?s, ?o) */ SELECT ?s ?o WHERE { ?s ?p ?o }";

        let caps: Vec<_> = re.captures_iter(query).collect();
        // Diagnostic output only; visible with `cargo test -- --nocapture`.
        println!("Number of captures: {}", caps.len());
        for cap in &caps {
            println!("Full match: {:?}", cap.get(0).map(|m| m.as_str()));
            println!("Group 1: {:?}", cap.get(1).map(|m| m.as_str()));
        }

        assert!(!caps.is_empty(), "Regex should match the hint comment");
    }

    // HASH_JOIN yields one join hint whose variables have the `?` stripped.
    #[test]
    fn test_parse_hash_join_hint() {
        let query = "/*+ HASH_JOIN(?s, ?o) */ SELECT ?s ?o WHERE { ?s ?p ?o }";
        let hints = HintParser::parse(query).unwrap();

        assert_eq!(hints.join_hints.len(), 1);
        assert_eq!(hints.join_hints[0].algorithm, JoinAlgorithmHint::HashJoin);
        assert_eq!(hints.join_hints[0].variables, vec!["s", "o"]);
    }

    // CARDINALITY(?var, n) yields the variable name and the number.
    #[test]
    fn test_parse_cardinality_hint() {
        let query = "/*+ CARDINALITY(?person, 1000) */ SELECT ?person WHERE { ?person a <Person> }";
        let hints = HintParser::parse(query).unwrap();

        assert_eq!(hints.cardinality_hints.len(), 1);
        assert_eq!(hints.cardinality_hints[0].variable, "person");
        assert_eq!(hints.cardinality_hints[0].cardinality, 1000);
    }

    // PARALLEL(n) enables parallelism with an explicit thread count.
    #[test]
    fn test_parse_parallel_hint() {
        let query = "/*+ PARALLEL(4) */ SELECT * WHERE { ?s ?p ?o }";
        let hints = HintParser::parse(query).unwrap();

        assert!(hints.parallelism_hints.is_some());
        let par = hints.parallelism_hints.unwrap();
        assert!(par.enabled);
        assert_eq!(par.threads, Some(4));
    }

    // NO_PARALLEL produces a disabled parallelism hint.
    #[test]
    fn test_parse_no_parallel_hint() {
        let query = "/*+ NO_PARALLEL */ SELECT * WHERE { ?s ?p ?o }";
        let hints = HintParser::parse(query).unwrap();

        assert!(hints.parallelism_hints.is_some());
        assert!(!hints.parallelism_hints.unwrap().enabled);
    }

    // The `s` suffix is interpreted as seconds.
    #[test]
    fn test_parse_timeout_hint() {
        let query = "/*+ TIMEOUT(30s) */ SELECT * WHERE { ?s ?p ?o }";
        let hints = HintParser::parse(query).unwrap();

        assert_eq!(hints.timeout_hint, Some(Duration::from_secs(30)));
    }

    // The `GB` suffix multiplies by 1024^3.
    #[test]
    fn test_parse_memory_limit_hint() {
        let query = "/*+ MEMORY_LIMIT(1GB) */ SELECT * WHERE { ?s ?p ?o }";
        let hints = HintParser::parse(query).unwrap();

        assert!(hints.memory_hint.is_some());
        assert_eq!(hints.memory_hint.unwrap().max_memory, 1024 * 1024 * 1024);
    }

    // NO_CACHE produces a cache hint with `use_cache` off.
    #[test]
    fn test_parse_no_cache_hint() {
        let query = "/*+ NO_CACHE */ SELECT * WHERE { ?s ?p ?o }";
        let hints = HintParser::parse(query).unwrap();

        assert!(hints.cache_hints.is_some());
        assert!(!hints.cache_hints.unwrap().use_cache);
    }

    // Several hints inside one comment are all picked up.
    #[test]
    fn test_parse_multiple_hints() {
        let query =
            "/*+ HASH_JOIN(?s, ?o) PARALLEL(8) TIMEOUT(60s) */ SELECT ?s ?o WHERE { ?s ?p ?o }";
        let hints = HintParser::parse(query).unwrap();

        assert_eq!(hints.join_hints.len(), 1);
        assert!(hints.parallelism_hints.is_some());
        assert_eq!(hints.timeout_hint, Some(Duration::from_secs(60)));
    }

    // A hint behind `#` is parsed once, not once per regex pass: exactly one
    // cardinality hint is expected.
    #[test]
    fn test_parse_line_comment_hint() {
        let query = r#"
            # /*+ CARDINALITY(?x, 5000) PARALLEL(2) */
            SELECT ?x WHERE { ?x a <Thing> }
        "#;
        let hints = HintParser::parse(query).unwrap();

        assert_eq!(hints.cardinality_hints.len(), 1);
        assert!(hints.parallelism_hints.is_some());
    }

    // Builder shortcuts populate the corresponding fields.
    #[test]
    fn test_hints_builder() {
        let hints = QueryHints::builder()
            .hash_join(vec!["s", "o"])
            .cardinality("person", 1000)
            .parallel(4)
            .timeout_secs(30)
            .no_cache()
            .build();

        assert_eq!(hints.join_hints.len(), 1);
        assert_eq!(hints.cardinality_hints.len(), 1);
        assert!(hints.parallelism_hints.is_some());
        assert_eq!(hints.timeout_hint, Some(Duration::from_secs(30)));
        assert!(hints.cache_hints.is_some());
    }

    // The validator flags conflicting join algorithms and very short timeouts.
    #[test]
    fn test_hint_validator() {
        let hints = QueryHints::builder()
            .hash_join(vec!["x", "y"])
            // `x` now appears under two different join algorithms.
            .merge_join(vec!["x", "z"])
            // 50 ms is below the validator's 100 ms threshold.
            .timeout(Duration::from_millis(50))
            .build();

        let warnings = HintValidator::validate(&hints);

        assert!(warnings.iter().any(|w| w.message.contains("Conflicting")));
        assert!(warnings.iter().any(|w| w.message.contains("short timeout")));
    }

    // Merging keeps existing hints and adds those from the other set.
    #[test]
    fn test_hint_merge() {
        let mut hints1 = QueryHints::builder().hash_join(vec!["a", "b"]).build();

        let hints2 = QueryHints::builder()
            .cardinality("c", 500)
            .parallel(4)
            .build();

        hints1.merge(hints2);

        assert_eq!(hints1.join_hints.len(), 1);
        assert_eq!(hints1.cardinality_hints.len(), 1);
        assert!(hints1.parallelism_hints.is_some());
    }

    // A freshly created hint set is empty and counts zero hints.
    #[test]
    fn test_empty_hints() {
        let hints = QueryHints::new();
        assert!(hints.is_empty());
        assert_eq!(hints.hint_count(), 0);
    }

    // A join hint is found when it covers every requested variable.
    #[test]
    fn test_get_join_hint() {
        let hints = QueryHints::builder().hash_join(vec!["s", "o"]).build();

        let var_s = Variable::new("s").unwrap();
        let var_o = Variable::new("o").unwrap();

        let hint = hints.get_join_hint(&[var_s, var_o]);
        assert!(hint.is_some());
        assert_eq!(hint.unwrap().algorithm, JoinAlgorithmHint::HashJoin);
    }

    // Cardinality lookup is by variable name.
    #[test]
    fn test_get_cardinality_hint() {
        let hints = QueryHints::builder().cardinality("person", 1000).build();

        let var = Variable::new("person").unwrap();
        let card = hints.get_cardinality_hint(&var);

        assert_eq!(card, Some(1000));
    }

    // USE_INDEX(pattern, index) yields a `Use` directive for that pattern.
    #[test]
    fn test_parse_index_hint() {
        let query = "/*+ USE_INDEX(pattern1, idx_subject) */ SELECT * WHERE { ?s ?p ?o }";
        let hints = HintParser::parse(query).unwrap();

        assert_eq!(hints.index_hints.len(), 1);
        assert_eq!(hints.index_hints[0].pattern_id, "pattern1");
        assert_eq!(hints.index_hints[0].directive, IndexDirective::Use);
    }

    // LEADING(...) yields a fixed join order in the order written.
    #[test]
    fn test_parse_leading_hint() {
        let query = "/*+ LEADING(?a, ?b, ?c) */ SELECT * WHERE { ?a ?p ?b . ?b ?q ?c }";
        let hints = HintParser::parse(query).unwrap();

        assert!(hints.join_order_hint.is_some());
        let order = hints.join_order_hint.unwrap();
        assert_eq!(order.strategy, JoinOrderStrategy::Fixed);
        assert_eq!(order.order, vec!["a", "b", "c"]);
    }
}