Skip to main content

oxirs_core/sparql/
executor.rs

1//! SPARQL query execution engine
2//!
3//! This module provides the core query execution logic for SPARQL queries.
4//! It delegates to parser, pattern, filter, expression, aggregate, and modifier modules.
5//!
6//! ## Performance Monitoring
7//!
8//! The executor integrates SciRS2-core metrics for comprehensive query performance tracking:
9//! - Query execution time (per query type: SELECT, ASK, CONSTRUCT, DESCRIBE)
10//! - Pattern matching operations
11//! - Result set sizes
12//! - Query complexity metrics
13
14use super::*;
15use crate::model::*;
16use crate::rdf_store::{OxirsQueryResults, StorageBackend, VariableBinding};
17use crate::{OxirsError, Result};
18use scirs2_core::metrics::{Counter, Histogram, Timer};
19use std::collections::HashMap;
20use std::sync::Arc;
21
22/// Executor performance statistics
23#[derive(Debug, Clone)]
24pub struct ExecutorStats {
25    /// Total number of queries executed
26    pub total_queries: u64,
27    /// Number of SELECT queries
28    pub select_queries: u64,
29    /// Number of ASK queries
30    pub ask_queries: u64,
31    /// Number of CONSTRUCT queries
32    pub construct_queries: u64,
33    /// Number of DESCRIBE queries
34    pub describe_queries: u64,
35    /// Total pattern matching operations
36    pub pattern_matches: u64,
37    /// Average execution time in seconds
38    pub avg_execution_time_secs: f64,
39    /// Total number of observations
40    pub total_observations: u64,
41}
42
43/// VALUES clause for inline data (SPARQL-specific)
44#[derive(Debug, Clone)]
45struct ValuesClause {
46    variables: Vec<String>,
47    rows: Vec<Vec<String>>, // Each row contains values for the variables
48}
49
50/// SPARQL query executor with integrated performance monitoring
51///
52/// This executor tracks query performance metrics using SciRS2-core:
53/// - Query execution time broken down by query type
54/// - Pattern matching statistics
55/// - Result set size distribution
56/// - Query complexity indicators
57pub struct QueryExecutor<'a> {
58    backend: &'a StorageBackend,
59    /// Query execution timer
60    query_timer: Arc<Timer>,
61    /// SELECT query counter
62    select_counter: Arc<Counter>,
63    /// ASK query counter
64    ask_counter: Arc<Counter>,
65    /// CONSTRUCT query counter
66    construct_counter: Arc<Counter>,
67    /// DESCRIBE query counter
68    describe_counter: Arc<Counter>,
69    /// Pattern matching counter
70    pattern_counter: Arc<Counter>,
71    /// Result set size histogram
72    result_size_histogram: Arc<Histogram>,
73}
74
75impl<'a> QueryExecutor<'a> {
76    /// Create a new query executor for the given storage backend with performance monitoring
77    ///
78    /// The executor automatically tracks:
79    /// - Query execution times
80    /// - Query type distribution
81    /// - Pattern matching operations
82    /// - Result set sizes
83    pub fn new(backend: &'a StorageBackend) -> Self {
84        Self {
85            backend,
86            query_timer: Arc::new(Timer::new("query_execution_time".to_string())),
87            select_counter: Arc::new(Counter::new("select_queries".to_string())),
88            ask_counter: Arc::new(Counter::new("ask_queries".to_string())),
89            construct_counter: Arc::new(Counter::new("construct_queries".to_string())),
90            describe_counter: Arc::new(Counter::new("describe_queries".to_string())),
91            pattern_counter: Arc::new(Counter::new("pattern_matches".to_string())),
92            result_size_histogram: Arc::new(Histogram::new("result_set_size".to_string())),
93        }
94    }
95
96    /// Get query execution statistics
97    ///
98    /// Returns the current metrics including:
99    /// - Total queries executed by type
100    /// - Average execution time
101    /// - Pattern matching statistics
102    /// - Result set size distribution
103    pub fn get_stats(&self) -> ExecutorStats {
104        let select = self.select_counter.get();
105        let ask = self.ask_counter.get();
106        let construct = self.construct_counter.get();
107        let describe = self.describe_counter.get();
108
109        let timer_stats = self.query_timer.get_stats();
110
111        ExecutorStats {
112            total_queries: select + ask + construct + describe,
113            select_queries: select,
114            ask_queries: ask,
115            construct_queries: construct,
116            describe_queries: describe,
117            pattern_matches: self.pattern_counter.get(),
118            avg_execution_time_secs: timer_stats.mean,
119            total_observations: timer_stats.count,
120        }
121    }
122
123    /// Execute a SPARQL query
124    pub fn execute(&self, sparql: &str) -> Result<OxirsQueryResults> {
125        self.query(sparql)
126    }
127
128    pub fn query(&self, sparql: &str) -> Result<OxirsQueryResults> {
129        // Start timing the query execution
130        let start = std::time::Instant::now();
131
132        // Basic SPARQL query processor for common patterns
133        let sparql = sparql.trim();
134
135        // Extract PREFIX declarations and expand prefixed names
136        let (prefixes, expanded_query) = self.extract_and_expand_prefixes(sparql)?;
137
138        let query_to_execute = if !prefixes.is_empty() {
139            &expanded_query
140        } else {
141            sparql
142        };
143
144        // Execute query and track metrics
145        let result = if query_to_execute.to_uppercase().contains("SELECT") {
146            self.select_counter.inc();
147            self.execute_select_query(query_to_execute)
148        } else if query_to_execute.to_uppercase().starts_with("ASK") {
149            self.ask_counter.inc();
150            self.execute_ask_query(query_to_execute)
151        } else if query_to_execute.to_uppercase().starts_with("CONSTRUCT") {
152            self.construct_counter.inc();
153            self.execute_construct_query(query_to_execute)
154        } else if query_to_execute.to_uppercase().starts_with("DESCRIBE") {
155            self.describe_counter.inc();
156            self.execute_describe_query(query_to_execute)
157        } else {
158            return Err(OxirsError::Query(format!(
159                "Unsupported SPARQL query type: {sparql}"
160            )));
161        };
162
163        // Record execution time
164        let duration = start.elapsed();
165        self.query_timer.observe(duration);
166
167        // Track result set size if available
168        if let Ok(ref query_result) = result {
169            self.result_size_histogram
170                .observe(query_result.len() as f64);
171        }
172
173        result
174    }
175
176    /// Execute a SELECT query
177    fn execute_select_query(&self, sparql: &str) -> Result<OxirsQueryResults> {
178        // Basic pattern matching for simple SELECT queries
179        // Pattern: SELECT ?var WHERE { ?s ?p ?o } LIMIT n OFFSET m
180
181        if sparql.contains("WHERE") {
182            // Check for DISTINCT modifier
183            let has_distinct = sparql.to_uppercase().contains("SELECT DISTINCT");
184
185            let mut variables = self.extract_select_variables(sparql)?;
186
187            // Check for UNION - handle separately
188            if let Some(union_groups) = self.extract_union_groups(sparql)? {
189                return self.execute_union_query(sparql, variables, has_distinct, union_groups);
190            }
191
192            let pattern_groups = self.extract_pattern_groups(sparql)?;
193
194            // Track pattern matching operations
195            let total_patterns: usize = pattern_groups.iter().map(|g| g.patterns.len()).sum();
196            for _ in 0..total_patterns {
197                self.pattern_counter.inc();
198            }
199
200            // Extract all patterns for variable detection
201            let all_patterns: Vec<&SimpleTriplePattern> =
202                pattern_groups.iter().flat_map(|g| &g.patterns).collect();
203
204            // Handle SELECT * - extract variables from pattern
205            if variables.len() == 1 && variables[0] == "*" {
206                variables.clear();
207                for pattern in &all_patterns {
208                    if let Some(var) = &pattern.subject {
209                        if let Some(var_name) = var.strip_prefix('?') {
210                            if !variables.contains(&var_name.to_string()) {
211                                variables.push(var_name.to_string());
212                            }
213                        }
214                    }
215                    if let Some(var) = &pattern.predicate {
216                        if let Some(var_name) = var.strip_prefix('?') {
217                            if !variables.contains(&var_name.to_string()) {
218                                variables.push(var_name.to_string());
219                            }
220                        }
221                    }
222                    if let Some(var) = &pattern.object {
223                        if let Some(var_name) = var.strip_prefix('?') {
224                            if !variables.contains(&var_name.to_string()) {
225                                variables.push(var_name.to_string());
226                            }
227                        }
228                    }
229                }
230            }
231
232            // Separate required and optional patterns
233            let required_groups: Vec<&PatternGroup> =
234                pattern_groups.iter().filter(|g| !g.optional).collect();
235            let optional_groups: Vec<&PatternGroup> =
236                pattern_groups.iter().filter(|g| g.optional).collect();
237
238            let mut results = Vec::new();
239
240            // Check for VALUES clause
241            let values_clause = self.extract_values_clause(sparql)?;
242            if let Some(values) = &values_clause {
243                // Apply VALUES to create initial bindings
244                results = self.apply_values_clause(values)?;
245            }
246
247            // Execute required patterns and join with VALUES if present
248            let pattern_results = if !required_groups.is_empty() {
249                let mut pattern_bindings = Vec::new();
250
251                for group in required_groups {
252                    for pattern in &group.patterns {
253                        let matching_quads = self.query_quads_by_pattern(pattern)?;
254
255                        for quad in matching_quads {
256                            let mut binding = VariableBinding::new();
257
258                            // Bind variables based on the pattern
259                            if let Some(var) = &pattern.subject {
260                                if let Some(var_name) = var.strip_prefix('?') {
261                                    binding.bind(
262                                        var_name.to_string(),
263                                        Term::from(quad.subject().clone()),
264                                    );
265                                }
266                            }
267
268                            if let Some(var) = &pattern.predicate {
269                                if let Some(var_name) = var.strip_prefix('?') {
270                                    binding.bind(
271                                        var_name.to_string(),
272                                        Term::from(quad.predicate().clone()),
273                                    );
274                                }
275                            }
276
277                            if let Some(var) = &pattern.object {
278                                if let Some(var_name) = var.strip_prefix('?') {
279                                    binding.bind(
280                                        var_name.to_string(),
281                                        Term::from(quad.object().clone()),
282                                    );
283                                }
284                            }
285
286                            pattern_bindings.push(binding);
287                        }
288                    }
289                }
290                pattern_bindings
291            } else {
292                Vec::new()
293            };
294
295            // Join VALUES results with pattern results if both present
296            if !results.is_empty() && !pattern_results.is_empty() {
297                // Join: keep only bindings that are compatible
298                let mut joined = Vec::new();
299                for values_binding in &results {
300                    for pattern_binding in &pattern_results {
301                        // Check compatibility: shared variables must have same values
302                        let mut compatible = true;
303                        for (var, val) in values_binding.bindings.iter() {
304                            if let Some(pattern_val) = pattern_binding.get(var) {
305                                if val != pattern_val {
306                                    compatible = false;
307                                    break;
308                                }
309                            }
310                        }
311
312                        if compatible {
313                            // Merge bindings
314                            let mut merged = values_binding.clone();
315                            for (var, val) in pattern_binding.bindings.iter() {
316                                if !merged.bindings.contains_key(var) {
317                                    merged.bind(var.clone(), val.clone());
318                                }
319                            }
320                            joined.push(merged);
321                        }
322                    }
323                }
324                results = joined;
325            } else if !pattern_results.is_empty() {
326                // Only patterns, no VALUES
327                results = pattern_results;
328            }
329            // else: Only VALUES, results already set
330
331            // If no required patterns and no VALUES, start with empty binding for OPTIONAL
332            if results.is_empty() && !optional_groups.is_empty() {
333                results.push(VariableBinding::new());
334            }
335
336            // Apply optional patterns to each existing result
337            for optional_group in optional_groups {
338                results = self.apply_optional_patterns(results, &optional_group.patterns)?;
339            }
340
341            // Apply BIND expressions
342            let bind_expressions = self.extract_bind_expressions(sparql)?;
343            if !bind_expressions.is_empty() {
344                results = self.apply_bind_expressions(results, &bind_expressions)?;
345                // Add BIND variables to output variables
346                for bind_expr in &bind_expressions {
347                    if !variables.contains(&bind_expr.variable) {
348                        variables.push(bind_expr.variable.clone());
349                    }
350                }
351            }
352
353            // Apply FILTER if present
354            let filter_expressions = self.extract_filter_expressions(sparql)?;
355            if !filter_expressions.is_empty() {
356                results.retain(|binding| self.evaluate_filters(binding, &filter_expressions));
357            }
358
359            // Check for aggregate expressions
360            let aggregates = self.extract_aggregates(sparql)?;
361            if !aggregates.is_empty() {
362                // Apply aggregates and replace variables with aggregate results
363                let (agg_results, agg_vars) = self.apply_aggregates(results, &aggregates)?;
364                return Ok(OxirsQueryResults::from_bindings(agg_results, agg_vars));
365            }
366
367            // Apply DISTINCT if present
368            if has_distinct {
369                results = self.remove_duplicate_bindings(results, &variables);
370            }
371
372            // Apply ORDER BY if present
373            if let Some(order_by) = self.extract_order_by(sparql)? {
374                self.sort_results(&mut results, &order_by);
375            }
376
377            // Apply OFFSET and LIMIT
378            let offset = self.extract_offset(sparql)?;
379            let limit = self.extract_limit(sparql)?;
380
381            // Apply offset
382            if offset > 0 {
383                results = results.into_iter().skip(offset).collect();
384            }
385
386            // Apply limit
387            if let Some(limit_value) = limit {
388                results.truncate(limit_value);
389            }
390
391            Ok(OxirsQueryResults::from_bindings(results, variables))
392        } else {
393            Ok(OxirsQueryResults::new())
394        }
395    }
396
397    /// Execute a SELECT query with UNION
398    fn execute_union_query(
399        &self,
400        sparql: &str,
401        mut variables: Vec<String>,
402        has_distinct: bool,
403        union_groups: UnionGroup,
404    ) -> Result<OxirsQueryResults> {
405        let mut all_results = Vec::new();
406
407        // Execute each UNION branch independently
408        for branch_groups in union_groups.branches {
409            // Execute this branch
410            let mut branch_results = Vec::new();
411
412            // Extract all patterns from this branch for variable detection
413            let all_patterns: Vec<&SimpleTriplePattern> =
414                branch_groups.iter().flat_map(|g| &g.patterns).collect();
415
416            // Handle SELECT * - extract variables from all patterns
417            if variables.len() == 1 && variables[0] == "*" {
418                variables.clear();
419                for pattern in &all_patterns {
420                    if let Some(var) = &pattern.subject {
421                        if let Some(var_name) = var.strip_prefix('?') {
422                            if !variables.contains(&var_name.to_string()) {
423                                variables.push(var_name.to_string());
424                            }
425                        }
426                    }
427                    if let Some(var) = &pattern.predicate {
428                        if let Some(var_name) = var.strip_prefix('?') {
429                            if !variables.contains(&var_name.to_string()) {
430                                variables.push(var_name.to_string());
431                            }
432                        }
433                    }
434                    if let Some(var) = &pattern.object {
435                        if let Some(var_name) = var.strip_prefix('?') {
436                            if !variables.contains(&var_name.to_string()) {
437                                variables.push(var_name.to_string());
438                            }
439                        }
440                    }
441                }
442            }
443
444            // Separate required and optional patterns for this branch
445            let required_groups: Vec<&PatternGroup> =
446                branch_groups.iter().filter(|g| !g.optional).collect();
447            let optional_groups: Vec<&PatternGroup> =
448                branch_groups.iter().filter(|g| g.optional).collect();
449
450            // Execute required patterns first
451            for group in required_groups {
452                for pattern in &group.patterns {
453                    let matching_quads = self.query_quads_by_pattern(pattern)?;
454
455                    for quad in matching_quads {
456                        let mut binding = VariableBinding::new();
457
458                        // Bind variables based on the pattern
459                        if let Some(var) = &pattern.subject {
460                            if let Some(var_name) = var.strip_prefix('?') {
461                                binding
462                                    .bind(var_name.to_string(), Term::from(quad.subject().clone()));
463                            }
464                        }
465
466                        if let Some(var) = &pattern.predicate {
467                            if let Some(var_name) = var.strip_prefix('?') {
468                                binding.bind(
469                                    var_name.to_string(),
470                                    Term::from(quad.predicate().clone()),
471                                );
472                            }
473                        }
474
475                        if let Some(var) = &pattern.object {
476                            if let Some(var_name) = var.strip_prefix('?') {
477                                binding
478                                    .bind(var_name.to_string(), Term::from(quad.object().clone()));
479                            }
480                        }
481
482                        branch_results.push(binding);
483                    }
484                }
485            }
486
487            // If no required patterns, start with empty binding
488            if branch_results.is_empty() && !optional_groups.is_empty() {
489                branch_results.push(VariableBinding::new());
490            }
491
492            // Apply optional patterns to each existing result
493            for optional_group in optional_groups {
494                branch_results =
495                    self.apply_optional_patterns(branch_results, &optional_group.patterns)?;
496            }
497
498            // Merge branch results into all results
499            all_results.extend(branch_results);
500        }
501
502        // Apply BIND expressions
503        let bind_expressions = self.extract_bind_expressions(sparql)?;
504        if !bind_expressions.is_empty() {
505            all_results = self.apply_bind_expressions(all_results, &bind_expressions)?;
506        }
507
508        // Apply FILTER if present
509        let filter_expressions = self.extract_filter_expressions(sparql)?;
510        if !filter_expressions.is_empty() {
511            all_results.retain(|binding| self.evaluate_filters(binding, &filter_expressions));
512        }
513
514        // Apply DISTINCT if present
515        if has_distinct {
516            all_results = self.remove_duplicate_bindings(all_results, &variables);
517        }
518
519        // Apply ORDER BY if present
520        if let Some(order_by) = self.extract_order_by(sparql)? {
521            self.sort_results(&mut all_results, &order_by);
522        }
523
524        // Apply OFFSET and LIMIT
525        let offset = self.extract_offset(sparql)?;
526        let limit = self.extract_limit(sparql)?;
527
528        if offset > 0 {
529            all_results = all_results.into_iter().skip(offset).collect();
530        }
531
532        if let Some(limit_value) = limit {
533            all_results.truncate(limit_value);
534        }
535
536        Ok(OxirsQueryResults::from_bindings(all_results, variables))
537    }
538
539    /// Execute an ASK query
540    fn execute_ask_query(&self, sparql: &str) -> Result<OxirsQueryResults> {
541        // Basic ASK query: check if any triples match the pattern
542        let triple_patterns = self.extract_triple_patterns(sparql)?;
543
544        for pattern in triple_patterns {
545            let matching_quads = self.query_quads_by_pattern(&pattern)?;
546            if !matching_quads.is_empty() {
547                return Ok(OxirsQueryResults::from_boolean(true));
548            }
549        }
550
551        Ok(OxirsQueryResults::from_boolean(false))
552    }
553
554    /// Execute a CONSTRUCT query
555    fn execute_construct_query(&self, sparql: &str) -> Result<OxirsQueryResults> {
556        // Basic CONSTRUCT query: return matching triples
557        let triple_patterns = self.extract_triple_patterns(sparql)?;
558        let mut result_quads = Vec::new();
559
560        for pattern in triple_patterns {
561            let matching_quads = self.query_quads_by_pattern(&pattern)?;
562            result_quads.extend(matching_quads);
563        }
564
565        Ok(OxirsQueryResults::from_graph(result_quads))
566    }
567
568    /// Execute a DESCRIBE query
569    fn execute_describe_query(&self, sparql: &str) -> Result<OxirsQueryResults> {
570        // Basic DESCRIBE query: return all triples about the specified resource
571        let resources = self.extract_describe_resources(sparql)?;
572        let mut result_quads = Vec::new();
573
574        for resource in resources {
575            // Find all triples where the resource is subject or object
576            let subject_pattern = SimpleTriplePattern {
577                subject: Some(resource.clone()),
578                predicate: None,
579                object: None,
580            };
581            let object_pattern = SimpleTriplePattern {
582                subject: None,
583                predicate: None,
584                object: Some(resource),
585            };
586
587            let subject_quads = self.query_quads(
588                subject_pattern
589                    .subject
590                    .as_ref()
591                    .and_then(|s| Self::string_to_subject(s))
592                    .as_ref(),
593                subject_pattern
594                    .predicate
595                    .as_ref()
596                    .and_then(|p| Self::string_to_predicate(p))
597                    .as_ref(),
598                subject_pattern
599                    .object
600                    .as_ref()
601                    .and_then(|o| Self::string_to_object(o))
602                    .as_ref(),
603                None,
604            )?;
605            let object_quads = self.query_quads(
606                object_pattern
607                    .subject
608                    .as_ref()
609                    .and_then(|s| Self::string_to_subject(s))
610                    .as_ref(),
611                object_pattern
612                    .predicate
613                    .as_ref()
614                    .and_then(|p| Self::string_to_predicate(p))
615                    .as_ref(),
616                object_pattern
617                    .object
618                    .as_ref()
619                    .and_then(|o| Self::string_to_object(o))
620                    .as_ref(),
621                None,
622            )?;
623
624            result_quads.extend(subject_quads);
625            result_quads.extend(object_quads);
626        }
627
628        Ok(OxirsQueryResults::from_graph(result_quads))
629    }
630
631    /// Extract variables from SELECT clause
632    fn extract_select_variables(&self, sparql: &str) -> Result<Vec<String>> {
633        extract_select_variables(sparql)
634    }
635
636    /// Extract aggregate expressions from SELECT clause
637    fn extract_aggregates(&self, sparql: &str) -> Result<Vec<AggregateExpression>> {
638        extract_aggregates(sparql)
639    }
640
641    /// Find matching closing parenthesis
642    #[allow(dead_code)]
643    fn find_matching_paren(&self, text: &str) -> Option<usize> {
644        find_matching_paren(text)
645    }
646
647    /// Extract LIMIT value from query
648    fn extract_limit(&self, sparql: &str) -> Result<Option<usize>> {
649        let sparql_upper = sparql.to_uppercase();
650
651        if let Some(limit_start) = sparql_upper.find("LIMIT") {
652            let after_limit = &sparql[limit_start + 5..];
653
654            // Find the first token after LIMIT
655            for token in after_limit.split_whitespace() {
656                if let Ok(limit_value) = token.parse::<usize>() {
657                    return Ok(Some(limit_value));
658                }
659            }
660        }
661
662        Ok(None)
663    }
664
665    /// Extract ORDER BY clause
666    fn extract_order_by(&self, sparql: &str) -> Result<Option<OrderBy>> {
667        let sparql_upper = sparql.to_uppercase();
668
669        if let Some(order_start) = sparql_upper.find("ORDER BY") {
670            let after_order = &sparql[order_start + 8..];
671
672            // Check for DESC or ASC
673            let mut descending = false;
674            let tokens: Vec<&str> = after_order.split_whitespace().collect();
675
676            if tokens.is_empty() {
677                return Ok(None);
678            }
679
680            let mut var_token = tokens[0];
681
682            // Check for DESC/ASC modifier
683            if tokens.len() > 1 {
684                let modifier = tokens[1].to_uppercase();
685                if modifier == "DESC" {
686                    descending = true;
687                } else if modifier == "ASC" {
688                    descending = false;
689                }
690            } else if var_token.to_uppercase().ends_with("DESC") {
691                // Handle DESC attached to variable
692                var_token = var_token.trim_end_matches("DESC").trim_end_matches("desc");
693                descending = true;
694            } else if var_token.to_uppercase().ends_with("ASC") {
695                // Handle ASC attached to variable
696                var_token = var_token.trim_end_matches("ASC").trim_end_matches("asc");
697            }
698
699            // Check if DESC() or ASC() function
700            if var_token.to_uppercase().starts_with("DESC(") {
701                descending = true;
702                var_token = var_token[5..].trim_end_matches(')');
703            } else if var_token.to_uppercase().starts_with("ASC(") {
704                var_token = var_token[4..].trim_end_matches(')');
705            }
706
707            // Extract variable name
708            let variable = var_token.trim_start_matches('?').trim().to_string();
709
710            if !variable.is_empty() {
711                return Ok(Some(OrderBy {
712                    variable,
713                    descending,
714                }));
715            }
716        }
717
718        Ok(None)
719    }
720
721    /// Remove duplicate bindings for DISTINCT
722    fn remove_duplicate_bindings(
723        &self,
724        results: Vec<VariableBinding>,
725        variables: &[String],
726    ) -> Vec<VariableBinding> {
727        use std::collections::HashSet;
728
729        let mut seen = HashSet::new();
730        let mut unique_results = Vec::new();
731
732        for binding in results {
733            // Create a signature for this binding based on the selected variables
734            let mut signature = String::new();
735            for var in variables {
736                if let Some(term) = binding.get(var) {
737                    signature.push_str(&format!("{:?}|", term));
738                } else {
739                    signature.push_str("UNBOUND|");
740                }
741            }
742
743            if seen.insert(signature) {
744                unique_results.push(binding);
745            }
746        }
747
748        unique_results
749    }
750
751    /// Apply aggregate functions to results
752    fn apply_aggregates(
753        &self,
754        results: Vec<VariableBinding>,
755        aggregates: &[AggregateExpression],
756    ) -> Result<(Vec<VariableBinding>, Vec<String>)> {
757        apply_aggregates(results, aggregates)
758    }
759
760    /// Sort results according to ORDER BY
761    fn sort_results(&self, results: &mut [VariableBinding], order_by: &OrderBy) {
762        results.sort_by(|a, b| {
763            let a_val = a.get(&order_by.variable);
764            let b_val = b.get(&order_by.variable);
765
766            let cmp = match (a_val, b_val) {
767                (Some(a_term), Some(b_term)) => self.compare_terms(a_term, b_term),
768                (Some(_), None) => std::cmp::Ordering::Less,
769                (None, Some(_)) => std::cmp::Ordering::Greater,
770                (None, None) => std::cmp::Ordering::Equal,
771            };
772
773            if order_by.descending {
774                cmp.reverse()
775            } else {
776                cmp
777            }
778        });
779    }
780
781    /// Compare two terms for ordering
782    fn compare_terms(&self, a: &Term, b: &Term) -> std::cmp::Ordering {
783        use std::cmp::Ordering;
784
785        match (a, b) {
786            (Term::Literal(a_lit), Term::Literal(b_lit)) => {
787                let a_val = a_lit.value();
788                let b_val = b_lit.value();
789
790                // Try numeric comparison first
791                if let (Ok(a_num), Ok(b_num)) = (a_val.parse::<f64>(), b_val.parse::<f64>()) {
792                    if a_num < b_num {
793                        Ordering::Less
794                    } else if a_num > b_num {
795                        Ordering::Greater
796                    } else {
797                        Ordering::Equal
798                    }
799                } else {
800                    // Lexicographic comparison
801                    a_val.cmp(b_val)
802                }
803            }
804            (Term::NamedNode(a_node), Term::NamedNode(b_node)) => {
805                a_node.as_str().cmp(b_node.as_str())
806            }
807            (Term::BlankNode(a_bnode), Term::BlankNode(b_bnode)) => {
808                a_bnode.as_str().cmp(b_bnode.as_str())
809            }
810            // Mixed types: Literals < URIs < BlankNodes
811            (Term::Literal(_), _) => Ordering::Less,
812            (_, Term::Literal(_)) => Ordering::Greater,
813            (Term::NamedNode(_), Term::BlankNode(_)) => Ordering::Less,
814            (Term::BlankNode(_), Term::NamedNode(_)) => Ordering::Greater,
815            _ => Ordering::Equal,
816        }
817    }
818
819    /// Extract OFFSET value from query
820    fn extract_offset(&self, sparql: &str) -> Result<usize> {
821        let sparql_upper = sparql.to_uppercase();
822
823        if let Some(offset_start) = sparql_upper.find("OFFSET") {
824            let after_offset = &sparql[offset_start + 6..];
825
826            // Find the first token after OFFSET
827            for token in after_offset.split_whitespace() {
828                if let Ok(offset_value) = token.parse::<usize>() {
829                    return Ok(offset_value);
830                }
831            }
832        }
833
834        Ok(0)
835    }
836
837    /// Extract BIND clauses from WHERE clause
838    fn extract_bind_expressions(&self, sparql: &str) -> Result<Vec<BindExpression>> {
839        extract_bind_expressions(sparql)
840    }
841
842    /// Split function arguments respecting quotes and parentheses
843    #[allow(dead_code)]
844    fn split_function_args(&self, args_text: &str) -> Vec<String> {
845        split_function_args(args_text)
846    }
847
848    /// Parse an expression for BIND
849    #[allow(dead_code)]
850    fn parse_expression(&self, expr: &str) -> Result<Expression> {
851        parse_expression(expr)
852    }
853
854    /// Evaluate an expression against a binding
855    #[allow(dead_code)]
856    fn evaluate_expression(&self, expr: &Expression, binding: &VariableBinding) -> Result<Term> {
857        evaluate_expression(expr, binding)
858    }
859
860    /// Convert a term to a number for arithmetic
861    #[allow(dead_code)]
862    fn term_to_number(&self, term: &Term) -> Result<f64> {
863        term_to_number(term)
864    }
865
866    /// Convert a term to a string
867    #[allow(dead_code)]
868    fn term_to_string(&self, term: &Term) -> String {
869        term_to_string(term)
870    }
871
872    /// Apply BIND expressions to results
873    fn apply_bind_expressions(
874        &self,
875        results: Vec<VariableBinding>,
876        binds: &[BindExpression],
877    ) -> Result<Vec<VariableBinding>> {
878        apply_bind_expressions(results, binds)
879    }
880
881    /// Extract VALUES clause from WHERE clause
882    fn extract_values_clause(&self, sparql: &str) -> Result<Option<ValuesClause>> {
883        let sparql_upper = sparql.to_uppercase();
884
885        if let Some(values_start) = sparql_upper.find("VALUES") {
886            let after_values = &sparql[values_start + 6..].trim_start();
887
888            // Parse variable list: (?var1 ?var2 ...)
889            if let Some(paren_start) = after_values.find('(') {
890                if let Some(paren_end) = after_values[paren_start..].find(')') {
891                    let var_list = &after_values[paren_start + 1..paren_start + paren_end];
892                    let variables: Vec<String> = var_list
893                        .split_whitespace()
894                        .filter_map(|v| v.strip_prefix('?'))
895                        .map(|v| v.to_string())
896                        .collect();
897
898                    if variables.is_empty() {
899                        return Ok(None);
900                    }
901
902                    // Parse data rows: { (val1 val2) (val3 val4) ... }
903                    let after_vars = &after_values[paren_start + paren_end + 1..].trim_start();
904                    if let Some(brace_start) = after_vars.find('{') {
905                        if let Some(brace_end) = after_vars[brace_start..].rfind('}') {
906                            let data_block = &after_vars[brace_start + 1..brace_start + brace_end];
907
908                            let mut rows = Vec::new();
909                            let mut current_pos = 0;
910
911                            // Parse each row (val1 val2 ...)
912                            while current_pos < data_block.len() {
913                                if let Some(row_start) = data_block[current_pos..].find('(') {
914                                    let abs_row_start = current_pos + row_start;
915                                    if let Some(row_end) = data_block[abs_row_start..].find(')') {
916                                        let row_content =
917                                            &data_block[abs_row_start + 1..abs_row_start + row_end];
918
919                                        // Parse row values
920                                        let values: Vec<String> = row_content
921                                            .split_whitespace()
922                                            .map(|v| {
923                                                // Remove quotes if present
924                                                if (v.starts_with('"') && v.ends_with('"'))
925                                                    || (v.starts_with('\'') && v.ends_with('\''))
926                                                {
927                                                    v[1..v.len() - 1].to_string()
928                                                } else {
929                                                    v.to_string()
930                                                }
931                                            })
932                                            .collect();
933
934                                        if values.len() == variables.len() {
935                                            rows.push(values);
936                                        }
937
938                                        current_pos = abs_row_start + row_end + 1;
939                                    } else {
940                                        break;
941                                    }
942                                } else {
943                                    break;
944                                }
945                            }
946
947                            if !rows.is_empty() {
948                                return Ok(Some(ValuesClause { variables, rows }));
949                            }
950                        }
951                    }
952                }
953            }
954        }
955
956        Ok(None)
957    }
958
959    /// Apply VALUES clause to create initial bindings
960    fn apply_values_clause(&self, values: &ValuesClause) -> Result<Vec<VariableBinding>> {
961        let mut bindings = Vec::new();
962
963        for row in &values.rows {
964            let mut binding = VariableBinding::new();
965
966            for (idx, var_name) in values.variables.iter().enumerate() {
967                if idx < row.len() {
968                    let value_str = &row[idx];
969
970                    // Parse the value into a Term
971                    let term = if value_str.starts_with('<') && value_str.ends_with('>') {
972                        // URI
973                        let uri = &value_str[1..value_str.len() - 1];
974                        Term::from(NamedNode::new_unchecked(uri))
975                    } else {
976                        // Literal
977                        Term::from(Literal::new(value_str.clone()))
978                    };
979
980                    binding.bind(var_name.clone(), term);
981                }
982            }
983
984            bindings.push(binding);
985        }
986
987        Ok(bindings)
988    }
989
990    /// Extract FILTER expressions from query
991    fn extract_filter_expressions(&self, sparql: &str) -> Result<Vec<FilterExpression>> {
992        extract_filter_expressions(sparql)
993    }
994
995    /// Evaluate FILTER expressions against a binding
996    fn evaluate_filters(&self, binding: &VariableBinding, filters: &[FilterExpression]) -> bool {
997        evaluate_filters(binding, filters)
998    }
999
1000    /// Apply optional patterns to existing bindings
1001    fn apply_optional_patterns(
1002        &self,
1003        current_results: Vec<VariableBinding>,
1004        optional_patterns: &[SimpleTriplePattern],
1005    ) -> Result<Vec<VariableBinding>> {
1006        let mut new_results = Vec::new();
1007
1008        for binding in current_results {
1009            let mut extended = false;
1010
1011            // Try to match optional patterns with current binding
1012            for pattern in optional_patterns {
1013                let matching_quads = self.query_quads_by_pattern(pattern)?;
1014
1015                for quad in matching_quads {
1016                    let mut new_binding = binding.clone();
1017
1018                    // Try to extend the binding with optional pattern
1019                    let mut compatible = true;
1020
1021                    // Check subject compatibility
1022                    if let Some(var) = &pattern.subject {
1023                        if let Some(var_name) = var.strip_prefix('?') {
1024                            if let Some(existing) = binding.get(var_name) {
1025                                // Variable already bound - check compatibility
1026                                let new_term = Term::from(quad.subject().clone());
1027                                if format!("{:?}", existing) != format!("{:?}", new_term) {
1028                                    compatible = false;
1029                                }
1030                            } else {
1031                                // New binding
1032                                new_binding
1033                                    .bind(var_name.to_string(), Term::from(quad.subject().clone()));
1034                            }
1035                        }
1036                    }
1037
1038                    // Check predicate compatibility
1039                    if compatible {
1040                        if let Some(var) = &pattern.predicate {
1041                            if let Some(var_name) = var.strip_prefix('?') {
1042                                if let Some(existing) = binding.get(var_name) {
1043                                    let new_term = Term::from(quad.predicate().clone());
1044                                    if format!("{:?}", existing) != format!("{:?}", new_term) {
1045                                        compatible = false;
1046                                    }
1047                                } else {
1048                                    new_binding.bind(
1049                                        var_name.to_string(),
1050                                        Term::from(quad.predicate().clone()),
1051                                    );
1052                                }
1053                            }
1054                        }
1055                    }
1056
1057                    // Check object compatibility
1058                    if compatible {
1059                        if let Some(var) = &pattern.object {
1060                            if let Some(var_name) = var.strip_prefix('?') {
1061                                if let Some(existing) = binding.get(var_name) {
1062                                    let new_term = Term::from(quad.object().clone());
1063                                    if format!("{:?}", existing) != format!("{:?}", new_term) {
1064                                        compatible = false;
1065                                    }
1066                                } else {
1067                                    new_binding.bind(
1068                                        var_name.to_string(),
1069                                        Term::from(quad.object().clone()),
1070                                    );
1071                                }
1072                            }
1073                        }
1074                    }
1075
1076                    if compatible {
1077                        new_results.push(new_binding);
1078                        extended = true;
1079                    }
1080                }
1081            }
1082
1083            // If no optional pattern matched, keep original binding
1084            if !extended {
1085                new_results.push(binding);
1086            }
1087        }
1088
1089        Ok(new_results)
1090    }
1091
1092    /// Extract pattern groups (required and optional) from WHERE clause
1093    fn extract_pattern_groups(&self, sparql: &str) -> Result<Vec<PatternGroup>> {
1094        let mut groups = Vec::new();
1095
1096        if let Some(where_start) = sparql.to_uppercase().find("WHERE") {
1097            let where_clause = &sparql[where_start + 5..];
1098
1099            // Find the main WHERE block
1100            if let Some(start_brace) = where_clause.find('{') {
1101                if let Some(end_brace) = self.find_matching_brace(where_clause, start_brace) {
1102                    let pattern_text = &where_clause[start_brace + 1..end_brace];
1103
1104                    // Check for OPTIONAL blocks
1105                    let sparql_upper = pattern_text.to_uppercase();
1106                    if sparql_upper.contains("OPTIONAL") {
1107                        // Parse with OPTIONAL support
1108                        let mut pos = 0;
1109                        let mut required_patterns = Vec::new();
1110
1111                        while pos < pattern_text.len() {
1112                            // Look for OPTIONAL keyword
1113                            if let Some(opt_pos) =
1114                                pattern_text[pos..].to_uppercase().find("OPTIONAL")
1115                            {
1116                                let abs_pos = pos + opt_pos;
1117
1118                                // Add any required patterns before OPTIONAL
1119                                let before_optional = &pattern_text[pos..abs_pos];
1120                                if let Some(req_pattern) =
1121                                    self.parse_simple_pattern(before_optional)
1122                                {
1123                                    required_patterns.push(req_pattern);
1124                                }
1125
1126                                // Find OPTIONAL block
1127                                let after_optional = &pattern_text[abs_pos + 8..];
1128                                if let Some(opt_brace) = after_optional.find('{') {
1129                                    if let Some(opt_end) =
1130                                        self.find_matching_brace(after_optional, opt_brace)
1131                                    {
1132                                        let optional_content =
1133                                            &after_optional[opt_brace + 1..opt_end];
1134
1135                                        // Parse optional patterns
1136                                        if let Some(opt_pattern) =
1137                                            self.parse_simple_pattern(optional_content)
1138                                        {
1139                                            groups.push(PatternGroup {
1140                                                patterns: vec![opt_pattern],
1141                                                optional: true,
1142                                            });
1143                                        }
1144
1145                                        pos = abs_pos + 8 + opt_end + 1;
1146                                    } else {
1147                                        break;
1148                                    }
1149                                } else {
1150                                    break;
1151                                }
1152                            } else {
1153                                // No more OPTIONAL, add remaining as required
1154                                if let Some(req_pattern) =
1155                                    self.parse_simple_pattern(&pattern_text[pos..])
1156                                {
1157                                    required_patterns.push(req_pattern);
1158                                }
1159                                break;
1160                            }
1161                        }
1162
1163                        // Add required patterns group
1164                        if !required_patterns.is_empty() {
1165                            groups.push(PatternGroup {
1166                                patterns: required_patterns,
1167                                optional: false,
1168                            });
1169                        }
1170                    } else {
1171                        // No OPTIONAL, parse as simple patterns
1172                        let patterns = self.extract_triple_patterns(sparql)?;
1173                        if !patterns.is_empty() {
1174                            groups.push(PatternGroup {
1175                                patterns,
1176                                optional: false,
1177                            });
1178                        }
1179                    }
1180                }
1181            }
1182        }
1183
1184        // Fallback: if no groups found, try simple extraction
1185        if groups.is_empty() {
1186            let patterns = self.extract_triple_patterns(sparql)?;
1187            if !patterns.is_empty() {
1188                groups.push(PatternGroup {
1189                    patterns,
1190                    optional: false,
1191                });
1192            }
1193        }
1194
1195        Ok(groups)
1196    }
1197
1198    /// Find matching closing brace
1199    fn find_matching_brace(&self, text: &str, start_pos: usize) -> Option<usize> {
1200        let mut brace_count = 1;
1201        let chars: Vec<char> = text.chars().collect();
1202
1203        for (i, &ch) in chars.iter().enumerate().skip(start_pos + 1) {
1204            if ch == '{' {
1205                brace_count += 1;
1206            } else if ch == '}' {
1207                brace_count -= 1;
1208                if brace_count == 0 {
1209                    return Some(i);
1210                }
1211            }
1212        }
1213
1214        None
1215    }
1216
1217    /// Parse a simple triple pattern from text
1218    fn parse_simple_pattern(&self, text: &str) -> Option<SimpleTriplePattern> {
1219        let tokens: Vec<&str> = text
1220            .split_whitespace()
1221            .filter(|t| !t.is_empty() && *t != "." && *t != ";")
1222            .collect();
1223
1224        if tokens.len() >= 3 {
1225            Some(SimpleTriplePattern {
1226                subject: Some(tokens[0].to_string()),
1227                predicate: Some(tokens[1].to_string()),
1228                object: Some(tokens[2].to_string()),
1229            })
1230        } else {
1231            None
1232        }
1233    }
1234
1235    /// Check if query contains UNION clause
1236    fn has_union(&self, sparql: &str) -> bool {
1237        sparql.to_uppercase().contains("UNION")
1238    }
1239
1240    /// Extract UNION groups from WHERE clause
1241    fn extract_union_groups(&self, sparql: &str) -> Result<Option<UnionGroup>> {
1242        if !self.has_union(sparql) {
1243            return Ok(None);
1244        }
1245
1246        if let Some(where_start) = sparql.to_uppercase().find("WHERE") {
1247            let where_clause = &sparql[where_start + 5..];
1248
1249            if let Some(start_brace) = where_clause.find('{') {
1250                if let Some(end_brace) = self.find_matching_brace(where_clause, start_brace) {
1251                    let pattern_text = &where_clause[start_brace + 1..end_brace];
1252
1253                    // Split by UNION keyword
1254                    let mut branches = Vec::new();
1255                    let mut current_pos = 0;
1256                    let pattern_upper = pattern_text.to_uppercase();
1257
1258                    // Find all UNION positions
1259                    let mut union_positions = Vec::new();
1260                    let mut search_pos = 0;
1261                    while let Some(pos) = pattern_upper[search_pos..].find("UNION") {
1262                        let abs_pos = search_pos + pos;
1263                        union_positions.push(abs_pos);
1264                        search_pos = abs_pos + 5;
1265                    }
1266
1267                    // Extract branches between UNION keywords
1268                    for &union_pos in &union_positions {
1269                        let branch_text = &pattern_text[current_pos..union_pos];
1270                        if let Some(branch_groups) = self.parse_union_branch(branch_text)? {
1271                            branches.push(branch_groups);
1272                        }
1273                        current_pos = union_pos + 5;
1274                    }
1275
1276                    // Add the last branch after the final UNION
1277                    if current_pos < pattern_text.len() {
1278                        let branch_text = &pattern_text[current_pos..];
1279                        if let Some(branch_groups) = self.parse_union_branch(branch_text)? {
1280                            branches.push(branch_groups);
1281                        }
1282                    }
1283
1284                    if !branches.is_empty() {
1285                        return Ok(Some(UnionGroup { branches }));
1286                    }
1287                }
1288            }
1289        }
1290
1291        Ok(None)
1292    }
1293
1294    /// Parse a single UNION branch into pattern groups
1295    fn parse_union_branch(&self, branch_text: &str) -> Result<Option<Vec<PatternGroup>>> {
1296        let trimmed = branch_text.trim();
1297
1298        // Check if this branch is wrapped in braces
1299        let content = if trimmed.starts_with('{') && trimmed.ends_with('}') {
1300            &trimmed[1..trimmed.len() - 1]
1301        } else {
1302            trimmed
1303        };
1304
1305        // Parse the content as pattern groups (could contain OPTIONAL)
1306        // Create a temporary SPARQL query with WHERE clause for parsing
1307        let temp_query = format!("SELECT * WHERE {{ {} }}", content);
1308        let groups = self.extract_pattern_groups(&temp_query)?;
1309
1310        if groups.is_empty() {
1311            Ok(None)
1312        } else {
1313            Ok(Some(groups))
1314        }
1315    }
1316
1317    /// Extract triple patterns from WHERE clause
1318    fn extract_triple_patterns(&self, sparql: &str) -> Result<Vec<SimpleTriplePattern>> {
1319        let mut patterns = Vec::new();
1320
1321        if let Some(where_start) = sparql.to_uppercase().find("WHERE") {
1322            let where_clause = &sparql[where_start + 5..];
1323
1324            // Simple pattern extraction for { ?s ?p ?o }
1325            if let Some(start_brace) = where_clause.find('{') {
1326                if let Some(end_brace) = where_clause.find('}') {
1327                    let pattern_text = &where_clause[start_brace + 1..end_brace];
1328                    let tokens: Vec<&str> = pattern_text.split_whitespace().collect();
1329
1330                    if tokens.len() >= 3 {
1331                        let subject = if tokens[0] == "." {
1332                            None
1333                        } else {
1334                            Some(tokens[0].to_string())
1335                        };
1336                        let predicate = if tokens[1] == "." {
1337                            None
1338                        } else {
1339                            Some(tokens[1].to_string())
1340                        };
1341                        let object = if tokens[2] == "." {
1342                            None
1343                        } else {
1344                            Some(tokens[2].to_string())
1345                        };
1346
1347                        patterns.push(SimpleTriplePattern {
1348                            subject,
1349                            predicate,
1350                            object,
1351                        });
1352                    }
1353                }
1354            }
1355        }
1356
1357        Ok(patterns)
1358    }
1359
1360    /// Extract resources from DESCRIBE clause
1361    fn extract_describe_resources(&self, sparql: &str) -> Result<Vec<String>> {
1362        let mut resources = Vec::new();
1363
1364        if let Some(describe_start) = sparql.to_uppercase().find("DESCRIBE") {
1365            let describe_clause = &sparql[describe_start + 8..];
1366
1367            for token in describe_clause.split_whitespace() {
1368                if token.starts_with('<') && token.ends_with('>') {
1369                    resources.push(token.to_string());
1370                } else if token.starts_with('?') {
1371                    // Variable - for now just treat as literal
1372                    resources.push(token.to_string());
1373                }
1374
1375                if token.to_uppercase() == "WHERE" {
1376                    break;
1377                }
1378            }
1379        }
1380
1381        Ok(resources)
1382    }
1383
1384    /// Query quads by pattern (helper method for SPARQL execution)
1385    fn query_quads_by_pattern(&self, pattern: &SimpleTriplePattern) -> Result<Vec<Quad>> {
1386        match &self.backend {
1387            StorageBackend::UltraMemory(index, _arena) => {
1388                let mut results = Vec::new();
1389
1390                // Convert pattern to quad pattern
1391                let _subject_filter = pattern.subject.as_deref();
1392                let _predicate_filter = pattern.predicate.as_deref();
1393                let _object_filter = pattern.object.as_deref();
1394
1395                // Query the ultra index using find_quads method
1396                // Convert string filters to proper types
1397                let results_vec = index.find_quads(
1398                    pattern
1399                        .subject
1400                        .as_ref()
1401                        .and_then(|s| Self::string_to_subject(s))
1402                        .as_ref(),
1403                    pattern
1404                        .predicate
1405                        .as_ref()
1406                        .and_then(|p| Self::string_to_predicate(p))
1407                        .as_ref(),
1408                    pattern
1409                        .object
1410                        .as_ref()
1411                        .and_then(|o| Self::string_to_object(o))
1412                        .as_ref(),
1413                    None, // graph_name
1414                );
1415                results.extend(results_vec);
1416
1417                Ok(results)
1418            }
1419            StorageBackend::Memory(storage) => {
1420                let storage = storage.read().expect("storage lock should not be poisoned");
1421                let mut results = Vec::new();
1422
1423                for quad in &storage.quads {
1424                    let mut matches = true;
1425
1426                    if let Some(s) = &pattern.subject {
1427                        if !s.starts_with('?') && quad.subject().to_string() != *s {
1428                            matches = false;
1429                        }
1430                    }
1431
1432                    if let Some(p) = &pattern.predicate {
1433                        if !p.starts_with('?') && quad.predicate().to_string() != *p {
1434                            matches = false;
1435                        }
1436                    }
1437
1438                    if let Some(o) = &pattern.object {
1439                        if !o.starts_with('?') && quad.object().to_string() != *o {
1440                            matches = false;
1441                        }
1442                    }
1443
1444                    if matches {
1445                        results.push(quad.clone());
1446                    }
1447                }
1448
1449                Ok(results)
1450            }
1451            StorageBackend::Persistent(storage, _) => {
1452                let storage = storage.read().expect("storage lock should not be poisoned");
1453                let mut results = Vec::new();
1454
1455                for quad in &storage.quads {
1456                    let mut matches = true;
1457
1458                    if let Some(s) = &pattern.subject {
1459                        if !s.starts_with('?') && quad.subject().to_string() != *s {
1460                            matches = false;
1461                        }
1462                    }
1463
1464                    if let Some(p) = &pattern.predicate {
1465                        if !p.starts_with('?') && quad.predicate().to_string() != *p {
1466                            matches = false;
1467                        }
1468                    }
1469
1470                    if let Some(o) = &pattern.object {
1471                        if !o.starts_with('?') && quad.object().to_string() != *o {
1472                            matches = false;
1473                        }
1474                    }
1475
1476                    if matches {
1477                        results.push(quad.clone());
1478                    }
1479                }
1480
1481                Ok(results)
1482            }
1483        }
1484    }
1485
1486    /// Extract and expand PREFIX declarations
1487    #[allow(dead_code)]
1488    fn extract_and_expand_prefixes(
1489        &self,
1490        sparql: &str,
1491    ) -> Result<(HashMap<String, String>, String)> {
1492        extract_and_expand_prefixes(sparql)
1493    }
1494
1495    /// Query quads from the backend
1496    fn query_quads(
1497        &self,
1498        subject: Option<&Subject>,
1499        predicate: Option<&Predicate>,
1500        object: Option<&Object>,
1501        graph_name: Option<&GraphName>,
1502    ) -> Result<Vec<Quad>> {
1503        match &self.backend {
1504            StorageBackend::UltraMemory(index, _arena) => {
1505                Ok(index.find_quads(subject, predicate, object, graph_name))
1506            }
1507            StorageBackend::Memory(storage) => {
1508                let storage = storage.read().expect("storage lock should not be poisoned");
1509                Ok(storage.query_quads(subject, predicate, object, graph_name))
1510            }
1511            StorageBackend::Persistent(storage, _) => {
1512                let storage = storage.read().expect("storage lock should not be poisoned");
1513                Ok(storage.query_quads(subject, predicate, object, graph_name))
1514            }
1515        }
1516    }
1517
1518    /// Convert string to Subject
1519    fn string_to_subject(s: &str) -> Option<Subject> {
1520        if s.starts_with('<') && s.ends_with('>') {
1521            Some(Subject::NamedNode(NamedNode::new_unchecked(
1522                &s[1..s.len() - 1],
1523            )))
1524        } else if s.starts_with('_') {
1525            Some(Subject::BlankNode(BlankNode::new_unchecked(s)))
1526        } else {
1527            None
1528        }
1529    }
1530
1531    /// Convert string to Predicate
1532    fn string_to_predicate(p: &str) -> Option<Predicate> {
1533        if p.starts_with('<') && p.ends_with('>') {
1534            Some(Predicate::NamedNode(NamedNode::new_unchecked(
1535                &p[1..p.len() - 1],
1536            )))
1537        } else {
1538            None
1539        }
1540    }
1541
1542    /// Convert string to Object
1543    fn string_to_object(o: &str) -> Option<Object> {
1544        if o.starts_with('<') && o.ends_with('>') {
1545            Some(Object::NamedNode(NamedNode::new_unchecked(
1546                &o[1..o.len() - 1],
1547            )))
1548        } else if o.starts_with('_') {
1549            Some(Object::BlankNode(BlankNode::new_unchecked(o)))
1550        } else if o.starts_with('"') {
1551            Some(Object::Literal(Literal::new(o.trim_matches('"'))))
1552        } else {
1553            None
1554        }
1555    }
1556}
1557
1558#[cfg(test)]
1559mod tests {
1560    use super::*;
1561    use crate::rdf_store::storage::{MemoryStorage, StorageBackend};
1562    use std::sync::{Arc, RwLock};
1563
1564    /// Helper: create a Memory-backed StorageBackend with no data
1565    fn make_empty_backend() -> StorageBackend {
1566        StorageBackend::Memory(Arc::new(RwLock::new(MemoryStorage::new())))
1567    }
1568
1569    /// Helper: create a Memory-backed StorageBackend pre-populated with sample RDF triples
1570    fn make_populated_backend() -> StorageBackend {
1571        let mut storage = MemoryStorage::new();
1572
1573        // Insert: <http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice"
1574        let alice = NamedNode::new_unchecked("http://example.org/alice");
1575        let name_pred = NamedNode::new_unchecked("http://xmlns.com/foaf/0.1/name");
1576        let bob = NamedNode::new_unchecked("http://example.org/bob");
1577        let knows_pred = NamedNode::new_unchecked("http://xmlns.com/foaf/0.1/knows");
1578        let age_pred = NamedNode::new_unchecked("http://xmlns.com/foaf/0.1/age");
1579
1580        let alice_name_quad = Quad::new(
1581            Subject::NamedNode(alice.clone()),
1582            Predicate::NamedNode(name_pred.clone()),
1583            Object::Literal(Literal::new("Alice")),
1584            GraphName::DefaultGraph,
1585        );
1586        let bob_name_quad = Quad::new(
1587            Subject::NamedNode(bob.clone()),
1588            Predicate::NamedNode(name_pred),
1589            Object::Literal(Literal::new("Bob")),
1590            GraphName::DefaultGraph,
1591        );
1592        let alice_knows_bob = Quad::new(
1593            Subject::NamedNode(alice.clone()),
1594            Predicate::NamedNode(knows_pred.clone()),
1595            Object::NamedNode(bob.clone()),
1596            GraphName::DefaultGraph,
1597        );
1598        let bob_knows_alice = Quad::new(
1599            Subject::NamedNode(bob.clone()),
1600            Predicate::NamedNode(knows_pred),
1601            Object::NamedNode(alice.clone()),
1602            GraphName::DefaultGraph,
1603        );
1604        let alice_age_quad = Quad::new(
1605            Subject::NamedNode(alice),
1606            Predicate::NamedNode(age_pred),
1607            Object::Literal(Literal::new("30")),
1608            GraphName::DefaultGraph,
1609        );
1610
1611        storage.insert_quad(alice_name_quad);
1612        storage.insert_quad(bob_name_quad);
1613        storage.insert_quad(alice_knows_bob);
1614        storage.insert_quad(bob_knows_alice);
1615        storage.insert_quad(alice_age_quad);
1616
1617        StorageBackend::Memory(Arc::new(RwLock::new(storage)))
1618    }
1619
1620    // --- Executor construction and stats ---
1621
1622    #[test]
1623    fn test_executor_new_initial_stats_are_zero() {
1624        let backend = make_empty_backend();
1625        let executor = QueryExecutor::new(&backend);
1626        let stats = executor.get_stats();
1627        assert_eq!(
1628            stats.total_queries, 0,
1629            "Initially total_queries should be 0"
1630        );
1631        assert_eq!(
1632            stats.select_queries, 0,
1633            "Initially select_queries should be 0"
1634        );
1635        assert_eq!(stats.ask_queries, 0, "Initially ask_queries should be 0");
1636        assert_eq!(stats.construct_queries, 0);
1637        assert_eq!(stats.describe_queries, 0);
1638    }
1639
1640    // --- SELECT query tests ---
1641
1642    #[test]
1643    fn test_select_star_on_empty_store_returns_empty() {
1644        let backend = make_empty_backend();
1645        let executor = QueryExecutor::new(&backend);
1646        let result = executor
1647            .execute("SELECT * WHERE { ?s ?p ?o }")
1648            .expect("SELECT * on empty store should not error");
1649        assert!(
1650            result.is_empty(),
1651            "Result should be empty for an empty store"
1652        );
1653    }
1654
1655    #[test]
1656    fn test_select_star_returns_all_triples() {
1657        let backend = make_populated_backend();
1658        let executor = QueryExecutor::new(&backend);
1659        let result = executor
1660            .execute("SELECT * WHERE { ?s ?p ?o }")
1661            .expect("SELECT * should succeed");
1662        // We inserted 5 quads
1663        assert_eq!(result.len(), 5, "Expected 5 results, got {}", result.len());
1664    }
1665
1666    #[test]
1667    fn test_select_with_specific_subject() {
1668        let backend = make_populated_backend();
1669        let executor = QueryExecutor::new(&backend);
1670        let query = "SELECT ?p ?o WHERE { <http://example.org/alice> ?p ?o }";
1671        let result = executor
1672            .execute(query)
1673            .expect("SELECT with specific subject should succeed");
1674        // Alice has: name, knows, age = 3 triples
1675        assert_eq!(
1676            result.len(),
1677            3,
1678            "Alice should have 3 properties, got {}",
1679            result.len()
1680        );
1681    }
1682
1683    #[test]
1684    fn test_select_with_specific_predicate() {
1685        let backend = make_populated_backend();
1686        let executor = QueryExecutor::new(&backend);
1687        let query = "SELECT ?s ?o WHERE { ?s <http://xmlns.com/foaf/0.1/name> ?o }";
1688        let result = executor
1689            .execute(query)
1690            .expect("SELECT with specific predicate should succeed");
1691        // Two people have names: alice and bob
1692        assert_eq!(
1693            result.len(),
1694            2,
1695            "Expected 2 name bindings, got {}",
1696            result.len()
1697        );
1698    }
1699
1700    #[test]
1701    fn test_select_with_specific_object_iri() {
1702        let backend = make_populated_backend();
1703        let executor = QueryExecutor::new(&backend);
1704        let query =
1705            "SELECT ?s WHERE { ?s <http://xmlns.com/foaf/0.1/knows> <http://example.org/bob> }";
1706        let result = executor
1707            .execute(query)
1708            .expect("SELECT with specific object should succeed");
1709        // Only alice knows bob
1710        assert_eq!(
1711            result.len(),
1712            1,
1713            "Expected 1 result (alice knows bob), got {}",
1714            result.len()
1715        );
1716    }
1717
1718    #[test]
1719    fn test_select_with_limit() {
1720        let backend = make_populated_backend();
1721        let executor = QueryExecutor::new(&backend);
1722        let query = "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 2";
1723        let result = executor
1724            .execute(query)
1725            .expect("SELECT with LIMIT should succeed");
1726        assert!(
1727            result.len() <= 2,
1728            "LIMIT 2 should return at most 2 results, got {}",
1729            result.len()
1730        );
1731    }
1732
1733    #[test]
1734    fn test_select_with_offset_and_limit() {
1735        let backend = make_populated_backend();
1736        let executor = QueryExecutor::new(&backend);
1737
1738        let all_results = executor
1739            .execute("SELECT * WHERE { ?s ?p ?o }")
1740            .expect("Unbounded SELECT should succeed");
1741        let total = all_results.len();
1742
1743        let limited_results = executor
1744            .execute("SELECT * WHERE { ?s ?p ?o } LIMIT 3")
1745            .expect("SELECT with LIMIT 3 should succeed");
1746
1747        // LIMIT 3 should yield exactly 3 results (since total >= 3)
1748        if total >= 3 {
1749            assert_eq!(
1750                limited_results.len(),
1751                3,
1752                "Expected exactly 3 results with LIMIT 3"
1753            );
1754        }
1755    }
1756
1757    // --- ASK query tests ---
1758
1759    #[test]
1760    fn test_ask_returns_true_when_matching() {
1761        let backend = make_populated_backend();
1762        let executor = QueryExecutor::new(&backend);
1763        // Use WHERE keyword so the executor's extract_triple_patterns can find the pattern
1764        let result = executor
1765            .execute("ASK WHERE { ?s ?p ?o }")
1766            .expect("ASK WHERE should succeed");
1767        // Result should be a boolean true
1768        match result.results() {
1769            crate::rdf_store::types::QueryResults::Boolean(val) => {
1770                assert!(*val, "ASK WHERE should return true when store is non-empty");
1771            }
1772            other => panic!("Expected Boolean result from ASK WHERE, got: {:?}", other),
1773        }
1774    }
1775
1776    #[test]
1777    fn test_ask_returns_false_on_empty_store() {
1778        let backend = make_empty_backend();
1779        let executor = QueryExecutor::new(&backend);
1780        let result = executor
1781            .execute("ASK WHERE { ?s ?p ?o }")
1782            .expect("ASK WHERE on empty store should succeed");
1783        match result.results() {
1784            crate::rdf_store::types::QueryResults::Boolean(val) => {
1785                assert!(!val, "ASK WHERE on empty store should return false");
1786            }
1787            other => panic!("Expected Boolean result from ASK WHERE, got: {:?}", other),
1788        }
1789    }
1790
1791    // --- CONSTRUCT query tests ---
1792
1793    #[test]
1794    fn test_construct_query_basic() {
1795        let backend = make_populated_backend();
1796        let executor = QueryExecutor::new(&backend);
1797        let query = "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }";
1798        let result = executor.execute(query).expect("CONSTRUCT should succeed");
1799        assert!(
1800            !result.is_empty(),
1801            "CONSTRUCT should return non-empty graph"
1802        );
1803    }
1804
1805    // --- DESCRIBE query tests ---
1806
1807    #[test]
1808    fn test_describe_query_specific_resource() {
1809        let backend = make_populated_backend();
1810        let executor = QueryExecutor::new(&backend);
1811        let query = "DESCRIBE <http://example.org/alice>";
1812        let result = executor.execute(query).expect("DESCRIBE should succeed");
1813        // DESCRIBE should return triples about alice
1814        // At minimum we should not error out
1815        let _ = result.len(); // Just verify it runs without panic
1816    }
1817
1818    // --- Error handling ---
1819
1820    #[test]
1821    fn test_unsupported_query_type_returns_error() {
1822        let backend = make_empty_backend();
1823        let executor = QueryExecutor::new(&backend);
1824        let result = executor.execute("INSERT DATA { <s> <p> <o> }");
1825        assert!(
1826            result.is_err(),
1827            "Unsupported query type should return an error"
1828        );
1829    }
1830
1831    #[test]
1832    fn test_execute_and_query_are_equivalent() {
1833        let backend = make_populated_backend();
1834        let executor = QueryExecutor::new(&backend);
1835        let sparql = "SELECT * WHERE { ?s ?p ?o }";
1836        let via_execute = executor.execute(sparql).expect("execute() should succeed");
1837        let via_query = executor.query(sparql).expect("query() should succeed");
1838        assert_eq!(
1839            via_execute.len(),
1840            via_query.len(),
1841            "execute() and query() should return the same number of results"
1842        );
1843    }
1844
1845    // --- Stats tracking after queries ---
1846
1847    #[test]
1848    fn test_stats_track_select_queries() {
1849        let backend = make_empty_backend();
1850        let executor = QueryExecutor::new(&backend);
1851        executor.execute("SELECT * WHERE { ?s ?p ?o }").ok();
1852        executor.execute("SELECT ?s WHERE { ?s ?p ?o }").ok();
1853        let stats = executor.get_stats();
1854        assert_eq!(
1855            stats.select_queries, 2,
1856            "Should have tracked 2 SELECT queries"
1857        );
1858        assert_eq!(stats.total_queries, 2);
1859    }
1860
1861    #[test]
1862    fn test_stats_track_ask_queries() {
1863        let backend = make_empty_backend();
1864        let executor = QueryExecutor::new(&backend);
1865        executor.execute("ASK WHERE { ?s ?p ?o }").ok();
1866        let stats = executor.get_stats();
1867        assert_eq!(stats.ask_queries, 1, "Should have tracked 1 ASK query");
1868    }
1869
1870    #[test]
1871    fn test_stats_track_mixed_query_types() {
1872        let backend = make_populated_backend();
1873        let executor = QueryExecutor::new(&backend);
1874        executor.execute("SELECT * WHERE { ?s ?p ?o }").ok();
1875        executor.execute("ASK WHERE { ?s ?p ?o }").ok();
1876        executor
1877            .execute("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }")
1878            .ok();
1879        executor.execute("DESCRIBE <http://example.org/alice>").ok();
1880        let stats = executor.get_stats();
1881        assert_eq!(stats.select_queries, 1);
1882        assert_eq!(stats.ask_queries, 1);
1883        assert_eq!(stats.construct_queries, 1);
1884        assert_eq!(stats.describe_queries, 1);
1885        assert_eq!(stats.total_queries, 4);
1886    }
1887
1888    // --- PREFIX expansion ---
1889
1890    #[test]
1891    fn test_prefix_expansion_in_select() {
1892        let backend = make_populated_backend();
1893        let executor = QueryExecutor::new(&backend);
1894        let query = "PREFIX foaf: <http://xmlns.com/foaf/0.1/> \
1895                     SELECT ?s ?o WHERE { ?s foaf:name ?o }";
1896        let result = executor
1897            .execute(query)
1898            .expect("PREFIX expansion SELECT should succeed");
1899        // Should find both alice and bob's names
1900        assert_eq!(
1901            result.len(),
1902            2,
1903            "Expected 2 results with prefix expansion, got {}",
1904            result.len()
1905        );
1906    }
1907
1908    // --- SELECT DISTINCT ---
1909
1910    #[test]
1911    fn test_select_distinct_reduces_duplicates() {
1912        let backend = make_populated_backend();
1913        let executor = QueryExecutor::new(&backend);
1914        let all_subjects = executor
1915            .execute("SELECT ?s WHERE { ?s ?p ?o }")
1916            .expect("SELECT should succeed");
1917        let distinct_subjects = executor
1918            .execute("SELECT DISTINCT ?s WHERE { ?s ?p ?o }")
1919            .expect("SELECT DISTINCT should succeed");
1920        // DISTINCT should return <= the total (possibly fewer due to deduplication)
1921        assert!(
1922            distinct_subjects.len() <= all_subjects.len(),
1923            "DISTINCT should not exceed total results: {} vs {}",
1924            distinct_subjects.len(),
1925            all_subjects.len()
1926        );
1927    }
1928}