rdf_fusion_execution/
engine.rs

1use crate::planner::RdfFusionPlanner;
2use crate::results::QueryResults;
3use crate::sparql::error::QueryEvaluationError;
4use crate::sparql::{
5    OptimizationLevel, Query, QueryExplanation, QueryOptions, create_optimizer_rules,
6    create_pyhsical_optimizer_rules, evaluate_query,
7};
8use datafusion::dataframe::DataFrame;
9use datafusion::error::DataFusionError;
10use datafusion::execution::runtime_env::RuntimeEnv;
11use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
12use datafusion::functions_aggregate::first_last::FirstValue;
13use datafusion::logical_expr::AggregateUDF;
14use datafusion::prelude::{SessionConfig, SessionContext};
15use rdf_fusion_encoding::plain_term::PLAIN_TERM_ENCODING;
16use rdf_fusion_encoding::sortable_term::SORTABLE_TERM_ENCODING;
17use rdf_fusion_encoding::typed_value::TYPED_VALUE_ENCODING;
18use rdf_fusion_encoding::{QuadStorageEncoding, RdfFusionEncodings};
19use rdf_fusion_extensions::RdfFusionContextView;
20use rdf_fusion_extensions::functions::{
21    RdfFusionFunctionRegistry, RdfFusionFunctionRegistryRef,
22};
23use rdf_fusion_extensions::storage::QuadStorage;
24use rdf_fusion_functions::registry::DefaultRdfFusionFunctionRegistry;
25use rdf_fusion_logical::{ActiveGraph, RdfFusionLogicalPlanBuilderContext};
26use rdf_fusion_model::{DFResult, NamedOrBlankNodeRef};
27use rdf_fusion_model::{GraphName, GraphNameRef, NamedNodeRef, QuadRef, TermRef};
28use std::sync::Arc;
29
30/// Represents a connection to an instance of an RDF Fusion engine.
31///
32/// An RDF Fusion instance consists of:
33/// - A [SessionContext]. This is the primary interaction point with DataFusion.
34/// - An [RdfFusionFunctionRegistry] that holds the currently registered RDF Fusion built-ins.
35/// - A reference to a quad storage.
36#[derive(Clone)]
37pub struct RdfFusionContext {
38    /// The DataFusion [SessionContext].
39    ctx: SessionContext,
40    /// Holds references to the registered built-in functions.
41    functions: RdfFusionFunctionRegistryRef,
42    /// Encoding configurations
43    encodings: RdfFusionEncodings,
44    /// The storage that backs this instance.
45    storage: Arc<dyn QuadStorage>,
46}
47
48impl RdfFusionContext {
49    /// Creates a new [RdfFusionContext] with the default configuration and the given `storage`.
50    pub fn new(
51        config: SessionConfig,
52        runtime_env: Arc<RuntimeEnv>,
53        storage: Arc<dyn QuadStorage>,
54    ) -> Self {
55        // TODO make a builder
56        let object_id_encoding = match storage.encoding() {
57            QuadStorageEncoding::PlainTerm => None,
58            QuadStorageEncoding::ObjectId(_) => {
59                assert!(storage.object_id_mapping().is_some());
60                storage.object_id_mapping()
61            }
62        };
63        let encodings = RdfFusionEncodings::new(
64            PLAIN_TERM_ENCODING,
65            TYPED_VALUE_ENCODING,
66            object_id_encoding,
67            SORTABLE_TERM_ENCODING,
68        );
69
70        let registry: Arc<dyn RdfFusionFunctionRegistry> =
71            Arc::new(DefaultRdfFusionFunctionRegistry::new(encodings.clone()));
72
73        let context_view = RdfFusionContextView::new(
74            Arc::clone(&registry),
75            encodings.clone(),
76            storage.encoding(),
77        );
78
79        let optimizer_rules =
80            create_optimizer_rules(context_view.clone(), OptimizationLevel::Full);
81        let physical_optimizer_rules =
82            create_pyhsical_optimizer_rules(OptimizationLevel::Full);
83
84        let state = SessionStateBuilder::new()
85            .with_query_planner(Arc::new(RdfFusionPlanner::new(
86                context_view,
87                Arc::clone(&storage),
88            )))
89            .with_aggregate_functions(vec![AggregateUDF::from(FirstValue::new()).into()])
90            .with_optimizer_rules(optimizer_rules)
91            .with_physical_optimizer_rules(physical_optimizer_rules)
92            .with_runtime_env(runtime_env)
93            .with_config(config)
94            .build();
95
96        let session_context = SessionContext::from(state);
97        Self {
98            ctx: session_context,
99            functions: registry,
100            encodings,
101            storage,
102        }
103    }
104
105    /// Creates a new [RdfFusionContextView] on this context. The resulting view should be passed
106    /// around in the RDF Fusion ecosystem to access the current configuration without directly
107    /// depending on the [RdfFusionContext].
108    pub fn create_view(&self) -> RdfFusionContextView {
109        RdfFusionContextView::new(
110            Arc::clone(&self.functions),
111            self.encodings.clone(),
112            self.storage.encoding(),
113        )
114    }
115
116    /// Provides a reference to the [SessionContext].
117    pub fn session_context(&self) -> &SessionContext {
118        &self.ctx
119    }
120
121    /// Returns a reference to the used [RdfFusionFunctionRegistry].
122    pub fn functions(&self) -> &RdfFusionFunctionRegistryRef {
123        &self.functions
124    }
125
126    /// Returns a reference to the used [RdfFusionEncodings].
127    pub fn encodings(&self) -> &RdfFusionEncodings {
128        &self.encodings
129    }
130
131    /// Provides access to the [QuadStorage] of this instance for writing operations.
132    pub fn storage(&self) -> &Arc<dyn QuadStorage> {
133        &self.storage
134    }
135
136    //
137    // Querying
138    //
139
140    /// Checks whether `quad` is contained in the instance.
141    pub async fn contains(&self, quad: &QuadRef<'_>) -> DFResult<bool> {
142        let active_graph_info = graph_name_to_active_graph(Some(quad.graph_name));
143        let pattern_plan = self.plan_builder_context().create_matching_quads(
144            active_graph_info,
145            Some(quad.subject.into_owned()),
146            Some(quad.predicate.into_owned()),
147            Some(quad.object.into_owned()),
148        );
149
150        let count = DataFrame::new(self.ctx.state(), pattern_plan.build()?)
151            .count()
152            .await?;
153
154        Ok(count > 0)
155    }
156
157    /// Used for obtaining a [RdfFusionLogicalPlanBuilderContext] for this RDF Fusion instance.
158    fn plan_builder_context(&self) -> RdfFusionLogicalPlanBuilderContext {
159        RdfFusionLogicalPlanBuilderContext::new(self.create_view())
160    }
161
162    /// Returns the number of quads in the instance.
163    pub async fn len(&self) -> DFResult<usize> {
164        self.storage
165            .len()
166            .await
167            .map_err(|err| DataFusionError::External(Box::new(err)))
168    }
169
170    /// Returns a stream of all quads that match the given pattern.
171    pub async fn quads_for_pattern(
172        &self,
173        graph_name: Option<GraphNameRef<'_>>,
174        subject: Option<NamedOrBlankNodeRef<'_>>,
175        predicate: Option<NamedNodeRef<'_>>,
176        object: Option<TermRef<'_>>,
177    ) -> DFResult<SendableRecordBatchStream> {
178        let active_graph_info = graph_name_to_active_graph(graph_name);
179        let pattern_plan = self
180            .plan_builder_context()
181            .create_matching_quads(
182                active_graph_info,
183                subject.map(NamedOrBlankNodeRef::into_owned),
184                predicate.map(NamedNodeRef::into_owned),
185                object.map(TermRef::into_owned),
186            )
187            .with_plain_terms()?;
188
189        let result = DataFrame::new(self.ctx.state(), pattern_plan.build()?)
190            .execute_stream()
191            .await?;
192        Ok(result)
193    }
194
195    /// Evaluates a SPARQL [Query] over the instance.
196    pub async fn execute_query(
197        &self,
198        query: &Query,
199        options: QueryOptions,
200    ) -> Result<(QueryResults, QueryExplanation), QueryEvaluationError> {
201        Box::pin(evaluate_query(
202            self,
203            self.plan_builder_context(),
204            query,
205            options,
206        ))
207        .await
208    }
209}
210
211fn graph_name_to_active_graph(graph_name: Option<GraphNameRef<'_>>) -> ActiveGraph {
212    let Some(graph_name) = graph_name else {
213        return ActiveGraph::AllGraphs;
214    };
215
216    match graph_name {
217        GraphNameRef::NamedNode(nn) => {
218            ActiveGraph::Union(vec![GraphName::NamedNode(nn.into_owned())])
219        }
220        GraphNameRef::BlankNode(bnode) => {
221            ActiveGraph::Union(vec![GraphName::BlankNode(bnode.into_owned())])
222        }
223        GraphNameRef::DefaultGraph => ActiveGraph::DefaultGraph,
224    }
225}