rdf_fusion_execution/
engine.rs1use crate::planner::RdfFusionPlanner;
2use crate::results::QueryResults;
3use crate::sparql::error::QueryEvaluationError;
4use crate::sparql::{
5 OptimizationLevel, Query, QueryExplanation, QueryOptions, create_optimizer_rules,
6 create_pyhsical_optimizer_rules, evaluate_query,
7};
8use datafusion::dataframe::DataFrame;
9use datafusion::error::DataFusionError;
10use datafusion::execution::runtime_env::RuntimeEnv;
11use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
12use datafusion::functions_aggregate::first_last::FirstValue;
13use datafusion::logical_expr::AggregateUDF;
14use datafusion::prelude::{SessionConfig, SessionContext};
15use rdf_fusion_encoding::plain_term::PLAIN_TERM_ENCODING;
16use rdf_fusion_encoding::sortable_term::SORTABLE_TERM_ENCODING;
17use rdf_fusion_encoding::typed_value::TYPED_VALUE_ENCODING;
18use rdf_fusion_encoding::{QuadStorageEncoding, RdfFusionEncodings};
19use rdf_fusion_extensions::RdfFusionContextView;
20use rdf_fusion_extensions::functions::{
21 RdfFusionFunctionRegistry, RdfFusionFunctionRegistryRef,
22};
23use rdf_fusion_extensions::storage::QuadStorage;
24use rdf_fusion_functions::registry::DefaultRdfFusionFunctionRegistry;
25use rdf_fusion_logical::{ActiveGraph, RdfFusionLogicalPlanBuilderContext};
26use rdf_fusion_model::{DFResult, NamedOrBlankNodeRef};
27use rdf_fusion_model::{GraphName, GraphNameRef, NamedNodeRef, QuadRef, TermRef};
28use std::sync::Arc;
29
30#[derive(Clone)]
37pub struct RdfFusionContext {
38 ctx: SessionContext,
40 functions: RdfFusionFunctionRegistryRef,
42 encodings: RdfFusionEncodings,
44 storage: Arc<dyn QuadStorage>,
46}
47
48impl RdfFusionContext {
49 pub fn new(
51 config: SessionConfig,
52 runtime_env: Arc<RuntimeEnv>,
53 storage: Arc<dyn QuadStorage>,
54 ) -> Self {
55 let object_id_encoding = match storage.encoding() {
57 QuadStorageEncoding::PlainTerm => None,
58 QuadStorageEncoding::ObjectId(_) => {
59 assert!(storage.object_id_mapping().is_some());
60 storage.object_id_mapping()
61 }
62 };
63 let encodings = RdfFusionEncodings::new(
64 PLAIN_TERM_ENCODING,
65 TYPED_VALUE_ENCODING,
66 object_id_encoding,
67 SORTABLE_TERM_ENCODING,
68 );
69
70 let registry: Arc<dyn RdfFusionFunctionRegistry> =
71 Arc::new(DefaultRdfFusionFunctionRegistry::new(encodings.clone()));
72
73 let context_view = RdfFusionContextView::new(
74 Arc::clone(®istry),
75 encodings.clone(),
76 storage.encoding(),
77 );
78
79 let optimizer_rules =
80 create_optimizer_rules(context_view.clone(), OptimizationLevel::Full);
81 let physical_optimizer_rules =
82 create_pyhsical_optimizer_rules(OptimizationLevel::Full);
83
84 let state = SessionStateBuilder::new()
85 .with_query_planner(Arc::new(RdfFusionPlanner::new(
86 context_view,
87 Arc::clone(&storage),
88 )))
89 .with_aggregate_functions(vec![AggregateUDF::from(FirstValue::new()).into()])
90 .with_optimizer_rules(optimizer_rules)
91 .with_physical_optimizer_rules(physical_optimizer_rules)
92 .with_runtime_env(runtime_env)
93 .with_config(config)
94 .build();
95
96 let session_context = SessionContext::from(state);
97 Self {
98 ctx: session_context,
99 functions: registry,
100 encodings,
101 storage,
102 }
103 }
104
105 pub fn create_view(&self) -> RdfFusionContextView {
109 RdfFusionContextView::new(
110 Arc::clone(&self.functions),
111 self.encodings.clone(),
112 self.storage.encoding(),
113 )
114 }
115
116 pub fn session_context(&self) -> &SessionContext {
118 &self.ctx
119 }
120
121 pub fn functions(&self) -> &RdfFusionFunctionRegistryRef {
123 &self.functions
124 }
125
126 pub fn encodings(&self) -> &RdfFusionEncodings {
128 &self.encodings
129 }
130
131 pub fn storage(&self) -> &Arc<dyn QuadStorage> {
133 &self.storage
134 }
135
136 pub async fn contains(&self, quad: &QuadRef<'_>) -> DFResult<bool> {
142 let active_graph_info = graph_name_to_active_graph(Some(quad.graph_name));
143 let pattern_plan = self.plan_builder_context().create_matching_quads(
144 active_graph_info,
145 Some(quad.subject.into_owned()),
146 Some(quad.predicate.into_owned()),
147 Some(quad.object.into_owned()),
148 );
149
150 let count = DataFrame::new(self.ctx.state(), pattern_plan.build()?)
151 .count()
152 .await?;
153
154 Ok(count > 0)
155 }
156
157 fn plan_builder_context(&self) -> RdfFusionLogicalPlanBuilderContext {
159 RdfFusionLogicalPlanBuilderContext::new(self.create_view())
160 }
161
162 pub async fn len(&self) -> DFResult<usize> {
164 self.storage
165 .len()
166 .await
167 .map_err(|err| DataFusionError::External(Box::new(err)))
168 }
169
170 pub async fn quads_for_pattern(
172 &self,
173 graph_name: Option<GraphNameRef<'_>>,
174 subject: Option<NamedOrBlankNodeRef<'_>>,
175 predicate: Option<NamedNodeRef<'_>>,
176 object: Option<TermRef<'_>>,
177 ) -> DFResult<SendableRecordBatchStream> {
178 let active_graph_info = graph_name_to_active_graph(graph_name);
179 let pattern_plan = self
180 .plan_builder_context()
181 .create_matching_quads(
182 active_graph_info,
183 subject.map(NamedOrBlankNodeRef::into_owned),
184 predicate.map(NamedNodeRef::into_owned),
185 object.map(TermRef::into_owned),
186 )
187 .with_plain_terms()?;
188
189 let result = DataFrame::new(self.ctx.state(), pattern_plan.build()?)
190 .execute_stream()
191 .await?;
192 Ok(result)
193 }
194
195 pub async fn execute_query(
197 &self,
198 query: &Query,
199 options: QueryOptions,
200 ) -> Result<(QueryResults, QueryExplanation), QueryEvaluationError> {
201 Box::pin(evaluate_query(
202 self,
203 self.plan_builder_context(),
204 query,
205 options,
206 ))
207 .await
208 }
209}
210
211fn graph_name_to_active_graph(graph_name: Option<GraphNameRef<'_>>) -> ActiveGraph {
212 let Some(graph_name) = graph_name else {
213 return ActiveGraph::AllGraphs;
214 };
215
216 match graph_name {
217 GraphNameRef::NamedNode(nn) => {
218 ActiveGraph::Union(vec![GraphName::NamedNode(nn.into_owned())])
219 }
220 GraphNameRef::BlankNode(bnode) => {
221 ActiveGraph::Union(vec![GraphName::BlankNode(bnode.into_owned())])
222 }
223 GraphNameRef::DefaultGraph => ActiveGraph::DefaultGraph,
224 }
225}