// rdf_fusion_execution/sparql/eval.rs

1use crate::RdfFusionContext;
2use crate::results::{QueryResults, QuerySolutionStream, QueryTripleStream};
3use crate::sparql::error::QueryEvaluationError;
4use crate::sparql::optimizer::{create_optimizer_rules, create_pyhsical_optimizer_rules};
5use crate::sparql::rewriting::GraphPatternRewriter;
6use crate::sparql::{Query, QueryDataset, QueryExplanation, QueryOptions};
7use datafusion::arrow::datatypes::Schema;
8use datafusion::common::instant::Instant;
9use datafusion::execution::{SessionState, SessionStateBuilder};
10use datafusion::physical_plan::{ExecutionPlan, execute_stream};
11use futures::StreamExt;
12use itertools::izip;
13use rdf_fusion_logical::RdfFusionLogicalPlanBuilderContext;
14use rdf_fusion_model::Iri;
15use rdf_fusion_model::Variable;
16use spargebra::algebra::GraphPattern;
17use spargebra::term::TriplePattern;
18use std::sync::Arc;
19
20/// Evaluates a SPARQL query and returns the results along with execution information.
21///
22/// Most users should refrain from directly using this function, as there are higher-level
23/// abstractions that provide APIs for querying.
24pub async fn evaluate_query(
25    ctx: &RdfFusionContext,
26    builder_context: RdfFusionLogicalPlanBuilderContext,
27    query: &Query,
28    options: QueryOptions,
29) -> Result<(QueryResults, QueryExplanation), QueryEvaluationError> {
30    let session_state = SessionStateBuilder::from(ctx.session_context().state())
31        .with_optimizer_rules(create_optimizer_rules(
32            ctx.create_view(),
33            options.optimization_level,
34        ))
35        .with_physical_optimizer_rules(create_pyhsical_optimizer_rules(
36            options.optimization_level,
37        ))
38        .build();
39
40    match &query.inner {
41        spargebra::Query::Select {
42            pattern, base_iri, ..
43        } => {
44            let (stream, explanation) = Box::pin(graph_pattern_to_stream(
45                session_state,
46                builder_context,
47                query,
48                pattern,
49                base_iri,
50            ))
51            .await?;
52            Ok((QueryResults::Solutions(stream), explanation))
53        }
54        spargebra::Query::Construct {
55            template,
56            pattern,
57            base_iri,
58            ..
59        } => {
60            let (stream, explanation) = Box::pin(graph_pattern_to_stream(
61                session_state,
62                builder_context,
63                query,
64                pattern,
65                base_iri,
66            ))
67            .await?;
68            Ok((
69                QueryResults::Graph(QueryTripleStream::new(template.clone(), stream)),
70                explanation,
71            ))
72        }
73        spargebra::Query::Ask {
74            pattern, base_iri, ..
75        } => {
76            let (mut stream, explanation) = Box::pin(graph_pattern_to_stream(
77                session_state,
78                builder_context,
79                query,
80                pattern,
81                base_iri,
82            ))
83            .await?;
84            let count = stream.next().await;
85            Ok((QueryResults::Boolean(count.is_some()), explanation))
86        }
87        spargebra::Query::Describe {
88            pattern, base_iri, ..
89        } => {
90            // TODO: Research what a good DESCRIBE implementation would look like.
91
92            let mut vars = Vec::new();
93            pattern.on_in_scope_variable(|v| vars.push(v.clone()));
94            let rdf_types = vars
95                .iter()
96                .map(|v| Variable::new(format!("{}__type", v.as_str())).unwrap())
97                .collect::<Vec<_>>();
98
99            let describe_pattern = izip!(vars, rdf_types.iter())
100                .map(|(variable, rdf_type)| {
101                    vec![TriplePattern {
102                        subject: variable.clone().into(),
103                        predicate: rdf_fusion_model::vocab::rdf::TYPE.into_owned().into(),
104                        object: rdf_type.clone().into(),
105                    }]
106                    .into_iter()
107                })
108                .flatten()
109                .collect::<Vec<_>>();
110
111            // Compute the label / comment results
112            let pattern = GraphPattern::Join {
113                left: Box::new(pattern.clone()),
114                right: Box::new(GraphPattern::Bgp {
115                    patterns: describe_pattern.clone(),
116                }),
117            };
118            let (stream, explanation) = Box::pin(graph_pattern_to_stream(
119                session_state,
120                builder_context,
121                query,
122                &pattern,
123                base_iri,
124            ))
125            .await?;
126
127            Ok((
128                QueryResults::Graph(QueryTripleStream::new(describe_pattern, stream)),
129                explanation,
130            ))
131        }
132    }
133}
134
135/// Converts a SPARQL graph pattern to a stream of query solutions.
136async fn graph_pattern_to_stream(
137    state: SessionState,
138    builder_context: RdfFusionLogicalPlanBuilderContext,
139    query: &Query,
140    pattern: &GraphPattern,
141    base_iri: &Option<Iri<String>>,
142) -> Result<(QuerySolutionStream, QueryExplanation), QueryEvaluationError> {
143    let task = state.task_ctx();
144
145    let (execution_plan, explanation) =
146        create_execution_plan(state, builder_context, &query.dataset, pattern, base_iri)
147            .await?;
148    let variables = create_variables(&execution_plan.schema());
149
150    let batch_record_stream = execute_stream(execution_plan, task)?;
151    let stream = QuerySolutionStream::try_new(variables, batch_record_stream)?;
152    Ok((stream, explanation))
153}
154
155/// Creates a physical execution plan from a SPARQL graph pattern, doing further processing on the
156/// resulting query plan (e.g., optimization).
157async fn create_execution_plan(
158    state: SessionState,
159    builder_context: RdfFusionLogicalPlanBuilderContext,
160    dataset: &QueryDataset,
161    pattern: &GraphPattern,
162    base_iri: &Option<Iri<String>>,
163) -> Result<(Arc<dyn ExecutionPlan>, QueryExplanation), QueryEvaluationError> {
164    let planning_time_start = Instant::now();
165    let logical_plan =
166        GraphPatternRewriter::new(builder_context, dataset.clone(), base_iri.clone())
167            .rewrite(pattern)
168            .map_err(|e| e.context("Cannot rewrite SPARQL query"))?;
169    let optimized_plan = state.optimize(&logical_plan)?;
170    let physical_plan = state
171        .query_planner()
172        .create_physical_plan(&optimized_plan, &state)
173        .await?;
174    let planning_time = planning_time_start.elapsed();
175
176    let explanation = QueryExplanation {
177        planning_time,
178        initial_logical_plan: logical_plan,
179        optimized_logical_plan: optimized_plan,
180        execution_plan: Arc::clone(&physical_plan),
181    };
182    Ok((Arc::clone(&physical_plan), explanation))
183}
184
185#[allow(clippy::expect_used)]
186fn create_variables(schema: &Schema) -> Arc<[Variable]> {
187    schema
188        .fields()
189        .iter()
190        .map(|f| Variable::new(f.name()).expect("Variables already checked."))
191        .collect::<Vec<_>>()
192        .into()
193}