// rdf_fusion_execution/sparql/eval.rs
use crate::RdfFusionContext;
use crate::results::{QueryResults, QuerySolutionStream, QueryTripleStream};
use crate::sparql::error::QueryEvaluationError;
use crate::sparql::optimizer::{create_optimizer_rules, create_pyhsical_optimizer_rules};
use crate::sparql::rewriting::GraphPatternRewriter;
use crate::sparql::{Query, QueryDataset, QueryExplanation, QueryOptions};
use datafusion::arrow::datatypes::Schema;
use datafusion::common::instant::Instant;
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::physical_plan::{ExecutionPlan, execute_stream};
use futures::StreamExt;
use itertools::izip;
use rdf_fusion_logical::RdfFusionLogicalPlanBuilderContext;
use rdf_fusion_model::Iri;
use rdf_fusion_model::Variable;
use spargebra::algebra::GraphPattern;
use spargebra::term::TriplePattern;
use std::sync::Arc;

20pub async fn evaluate_query(
25 ctx: &RdfFusionContext,
26 builder_context: RdfFusionLogicalPlanBuilderContext,
27 query: &Query,
28 options: QueryOptions,
29) -> Result<(QueryResults, QueryExplanation), QueryEvaluationError> {
30 let session_state = SessionStateBuilder::from(ctx.session_context().state())
31 .with_optimizer_rules(create_optimizer_rules(
32 ctx.create_view(),
33 options.optimization_level,
34 ))
35 .with_physical_optimizer_rules(create_pyhsical_optimizer_rules(
36 options.optimization_level,
37 ))
38 .build();
39
40 match &query.inner {
41 spargebra::Query::Select {
42 pattern, base_iri, ..
43 } => {
44 let (stream, explanation) = Box::pin(graph_pattern_to_stream(
45 session_state,
46 builder_context,
47 query,
48 pattern,
49 base_iri,
50 ))
51 .await?;
52 Ok((QueryResults::Solutions(stream), explanation))
53 }
54 spargebra::Query::Construct {
55 template,
56 pattern,
57 base_iri,
58 ..
59 } => {
60 let (stream, explanation) = Box::pin(graph_pattern_to_stream(
61 session_state,
62 builder_context,
63 query,
64 pattern,
65 base_iri,
66 ))
67 .await?;
68 Ok((
69 QueryResults::Graph(QueryTripleStream::new(template.clone(), stream)),
70 explanation,
71 ))
72 }
73 spargebra::Query::Ask {
74 pattern, base_iri, ..
75 } => {
76 let (mut stream, explanation) = Box::pin(graph_pattern_to_stream(
77 session_state,
78 builder_context,
79 query,
80 pattern,
81 base_iri,
82 ))
83 .await?;
84 let count = stream.next().await;
85 Ok((QueryResults::Boolean(count.is_some()), explanation))
86 }
87 spargebra::Query::Describe {
88 pattern, base_iri, ..
89 } => {
90 let mut vars = Vec::new();
93 pattern.on_in_scope_variable(|v| vars.push(v.clone()));
94 let rdf_types = vars
95 .iter()
96 .map(|v| Variable::new(format!("{}__type", v.as_str())).unwrap())
97 .collect::<Vec<_>>();
98
99 let describe_pattern = izip!(vars, rdf_types.iter())
100 .map(|(variable, rdf_type)| {
101 vec![TriplePattern {
102 subject: variable.clone().into(),
103 predicate: rdf_fusion_model::vocab::rdf::TYPE.into_owned().into(),
104 object: rdf_type.clone().into(),
105 }]
106 .into_iter()
107 })
108 .flatten()
109 .collect::<Vec<_>>();
110
111 let pattern = GraphPattern::Join {
113 left: Box::new(pattern.clone()),
114 right: Box::new(GraphPattern::Bgp {
115 patterns: describe_pattern.clone(),
116 }),
117 };
118 let (stream, explanation) = Box::pin(graph_pattern_to_stream(
119 session_state,
120 builder_context,
121 query,
122 &pattern,
123 base_iri,
124 ))
125 .await?;
126
127 Ok((
128 QueryResults::Graph(QueryTripleStream::new(describe_pattern, stream)),
129 explanation,
130 ))
131 }
132 }
133}
134
135async fn graph_pattern_to_stream(
137 state: SessionState,
138 builder_context: RdfFusionLogicalPlanBuilderContext,
139 query: &Query,
140 pattern: &GraphPattern,
141 base_iri: &Option<Iri<String>>,
142) -> Result<(QuerySolutionStream, QueryExplanation), QueryEvaluationError> {
143 let task = state.task_ctx();
144
145 let (execution_plan, explanation) =
146 create_execution_plan(state, builder_context, &query.dataset, pattern, base_iri)
147 .await?;
148 let variables = create_variables(&execution_plan.schema());
149
150 let batch_record_stream = execute_stream(execution_plan, task)?;
151 let stream = QuerySolutionStream::try_new(variables, batch_record_stream)?;
152 Ok((stream, explanation))
153}
154
155async fn create_execution_plan(
158 state: SessionState,
159 builder_context: RdfFusionLogicalPlanBuilderContext,
160 dataset: &QueryDataset,
161 pattern: &GraphPattern,
162 base_iri: &Option<Iri<String>>,
163) -> Result<(Arc<dyn ExecutionPlan>, QueryExplanation), QueryEvaluationError> {
164 let planning_time_start = Instant::now();
165 let logical_plan =
166 GraphPatternRewriter::new(builder_context, dataset.clone(), base_iri.clone())
167 .rewrite(pattern)
168 .map_err(|e| e.context("Cannot rewrite SPARQL query"))?;
169 let optimized_plan = state.optimize(&logical_plan)?;
170 let physical_plan = state
171 .query_planner()
172 .create_physical_plan(&optimized_plan, &state)
173 .await?;
174 let planning_time = planning_time_start.elapsed();
175
176 let explanation = QueryExplanation {
177 planning_time,
178 initial_logical_plan: logical_plan,
179 optimized_logical_plan: optimized_plan,
180 execution_plan: Arc::clone(&physical_plan),
181 };
182 Ok((Arc::clone(&physical_plan), explanation))
183}
184
185#[allow(clippy::expect_used)]
186fn create_variables(schema: &Schema) -> Arc<[Variable]> {
187 schema
188 .fields()
189 .iter()
190 .map(|f| Variable::new(f.name()).expect("Variables already checked."))
191 .collect::<Vec<_>>()
192 .into()
193}