drasi_core/query/
query_builder.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, sync::Arc};
16
17use drasi_query_ast::api::QueryParser;
18use drasi_query_cypher::CypherParser;
19
20use crate::{
21    evaluation::{
22        functions::{
23            future::RegisterFutureFunctions, past::RegisterPastFunctions, FunctionRegistry,
24        },
25        ExpressionEvaluator, QueryPartEvaluator,
26    },
27    in_memory_index::{
28        in_memory_element_index::InMemoryElementIndex, in_memory_future_queue::InMemoryFutureQueue,
29        in_memory_result_index::InMemoryResultIndex,
30    },
31    index_cache::shadowed_future_queue::ShadowedFutureQueue,
32    interface::{
33        ElementArchiveIndex, ElementIndex, FutureQueue, MiddlewareSetupError, QueryBuilderError,
34        ResultIndex,
35    },
36    middleware::{
37        MiddlewareContainer, MiddlewareTypeRegistry, SourceMiddlewarePipeline,
38        SourceMiddlewarePipelineCollection,
39    },
40    models::{QueryJoin, SourceMiddlewareConfig},
41    path_solver::{match_path::MatchPath, MatchPathSolver},
42};
43
44use super::ContinuousQuery;
45
46pub struct QueryBuilder {
47    function_registry: Option<Arc<FunctionRegistry>>,
48    expr_evaluator: Option<Arc<ExpressionEvaluator>>,
49    element_index: Option<Arc<dyn ElementIndex>>,
50    archive_index: Option<Arc<dyn ElementArchiveIndex>>,
51    result_index: Option<Arc<dyn ResultIndex>>,
52    future_queue: Option<Arc<dyn FutureQueue>>,
53    part_evaluator: Option<Arc<QueryPartEvaluator>>,
54    joins: Vec<Arc<QueryJoin>>,
55    middleware_registry: Option<Arc<MiddlewareTypeRegistry>>,
56    source_middleware: Vec<Arc<SourceMiddlewareConfig>>,
57    source_pipelines: HashMap<Arc<str>, Vec<Arc<str>>>,
58
59    query_source: String,
60    query_parser: Option<Arc<dyn QueryParser>>,
61}
62
63impl QueryBuilder {
64    pub fn new(query: impl Into<String>) -> Self {
65        QueryBuilder {
66            function_registry: None,
67            expr_evaluator: None,
68            element_index: None,
69            archive_index: None,
70            result_index: None,
71            future_queue: None,
72            part_evaluator: None,
73            joins: Vec::new(),
74            middleware_registry: None,
75            source_middleware: Vec::new(),
76            source_pipelines: HashMap::new(),
77            query_source: query.into(),
78            query_parser: None,
79        }
80    }
81
82    pub fn with_query_parser(mut self, query_parser: Arc<dyn QueryParser>) -> Self {
83        self.query_parser = Some(query_parser);
84        self
85    }
86
87    pub fn with_middleware_registry(
88        mut self,
89        middleware_registry: Arc<MiddlewareTypeRegistry>,
90    ) -> Self {
91        self.middleware_registry = Some(middleware_registry);
92        self
93    }
94
95    pub fn with_source_middleware(
96        mut self,
97        source_middleware: Arc<SourceMiddlewareConfig>,
98    ) -> Self {
99        self.source_middleware.push(source_middleware);
100        self
101    }
102
103    pub fn with_source_pipeline(mut self, source: impl Into<String>, pipeline: &[String]) -> Self {
104        let pipeline = pipeline.iter().map(|s| Arc::from(s.as_str())).collect();
105        self.source_pipelines
106            .insert(Arc::from(source.into()), pipeline);
107        self
108    }
109
110    pub fn with_join(mut self, join: QueryJoin) -> Self {
111        self.joins.push(Arc::new(join));
112        self
113    }
114
115    pub fn with_joins(mut self, joins: Vec<QueryJoin>) -> Self {
116        for join in joins {
117            self.joins.push(Arc::new(join));
118        }
119        self
120    }
121
122    pub fn with_function_registry(mut self, function_registry: Arc<FunctionRegistry>) -> Self {
123        self.function_registry = Some(function_registry);
124        self
125    }
126
127    pub fn with_element_index(mut self, element_index: Arc<dyn ElementIndex>) -> Self {
128        self.element_index = Some(element_index);
129        self
130    }
131
132    pub fn with_archive_index(mut self, archive_index: Arc<dyn ElementArchiveIndex>) -> Self {
133        self.archive_index = Some(archive_index);
134        self
135    }
136
137    pub fn with_result_index(mut self, accumulator_result_index: Arc<dyn ResultIndex>) -> Self {
138        self.result_index = Some(accumulator_result_index);
139        self
140    }
141
142    pub fn with_future_queue(mut self, future_queue: Arc<dyn FutureQueue>) -> Self {
143        self.future_queue = Some(future_queue);
144        self
145    }
146
147    pub fn get_joins(&self) -> &Vec<Arc<QueryJoin>> {
148        &self.joins
149    }
150
151    pub async fn build(self) -> ContinuousQuery {
152        self.try_build().await.unwrap()
153    }
154
155    pub async fn try_build(mut self) -> Result<ContinuousQuery, QueryBuilderError> {
156        let function_registry = match self.function_registry.take() {
157            Some(registry) => registry,
158            None => Arc::new(FunctionRegistry::new()),
159        };
160
161        let query_parser = match self.query_parser.take() {
162            Some(index) => index,
163            None => Arc::new(CypherParser::new(function_registry.clone())),
164        };
165
166        let query = query_parser.parse(self.query_source.as_str())?;
167        let match_path = Arc::new(MatchPath::from_query(&query.parts[0])?);
168
169        let element_index = match self.element_index.take() {
170            Some(index) => index,
171            None => Arc::new(InMemoryElementIndex::new()),
172        };
173
174        if let Some(archive_index) = self.archive_index.take() {
175            function_registry.register_past_functions(archive_index);
176        }
177
178        let result_index = match self.result_index.take() {
179            Some(index) => index,
180            None => Arc::new(InMemoryResultIndex::new()),
181        };
182
183        let future_queue = match self.future_queue.take() {
184            Some(queue) => queue,
185            None => Arc::new(InMemoryFutureQueue::new()),
186        };
187
188        let future_queue = Arc::new(ShadowedFutureQueue::new(future_queue));
189
190        let expr_evaluator = match self.expr_evaluator.take() {
191            Some(evaluator) => evaluator,
192            None => Arc::new(ExpressionEvaluator::new(
193                function_registry.clone(),
194                result_index.clone(),
195            )),
196        };
197
198        let part_evaluator = match self.part_evaluator.take() {
199            Some(evaluator) => evaluator,
200            None => Arc::new(QueryPartEvaluator::new(
201                expr_evaluator.clone(),
202                result_index.clone(),
203            )),
204        };
205
206        let path_solver = Arc::new(MatchPathSolver::new(element_index.clone()));
207
208        function_registry.register_future_functions(
209            future_queue.clone(),
210            result_index.clone(),
211            Arc::downgrade(&expr_evaluator.clone()),
212        );
213
214        let source_pipelines: SourceMiddlewarePipelineCollection = {
215            if self.source_middleware.is_empty() {
216                Ok(SourceMiddlewarePipelineCollection::new())
217            } else {
218                match self.middleware_registry.as_ref() {
219                    Some(registry) => {
220                        let container =
221                            MiddlewareContainer::new(registry, self.source_middleware.clone())?;
222                        let mut pipelines = SourceMiddlewarePipelineCollection::new();
223                        for (source_id, pipeline_keys) in self.source_pipelines.iter() {
224                            let pipeline =
225                                SourceMiddlewarePipeline::new(&container, pipeline_keys.clone())?;
226                            pipelines.insert(source_id.clone(), pipeline);
227                        }
228                        Ok(pipelines)
229                    }
230                    None => Err(MiddlewareSetupError::NoRegistry),
231                }
232            }
233        }?;
234
235        element_index.set_joins(&match_path, &self.joins).await;
236
237        Ok(ContinuousQuery::new(
238            Arc::new(query),
239            match_path,
240            expr_evaluator,
241            element_index,
242            path_solver,
243            part_evaluator,
244            future_queue,
245            source_pipelines,
246        ))
247    }
248}