Skip to main content

drasi_core/query/
query_builder.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, sync::Arc};
16
17use drasi_query_ast::api::QueryParser;
18
19use crate::{
20    evaluation::{
21        functions::{
22            future::RegisterFutureFunctions, past::RegisterPastFunctions, FunctionRegistry,
23        },
24        ExpressionEvaluator, QueryPartEvaluator,
25    },
26    in_memory_index::{
27        in_memory_element_index::InMemoryElementIndex, in_memory_future_queue::InMemoryFutureQueue,
28        in_memory_result_index::InMemoryResultIndex,
29    },
30    index_cache::shadowed_future_queue::ShadowedFutureQueue,
31    interface::{
32        ElementArchiveIndex, ElementIndex, FutureQueue, MiddlewareSetupError, NoOpSessionControl,
33        QueryBuilderError, ResultIndex, SessionControl,
34    },
35    middleware::{
36        MiddlewareContainer, MiddlewareTypeRegistry, SourceMiddlewarePipeline,
37        SourceMiddlewarePipelineCollection,
38    },
39    models::{QueryJoin, SourceMiddlewareConfig},
40    path_solver::{match_path::MatchPath, MatchPathSolver},
41};
42
43use super::ContinuousQuery;
44
45pub struct QueryBuilder {
46    function_registry: Option<Arc<FunctionRegistry>>,
47    expr_evaluator: Option<Arc<ExpressionEvaluator>>,
48    element_index: Option<Arc<dyn ElementIndex>>,
49    archive_index: Option<Arc<dyn ElementArchiveIndex>>,
50    result_index: Option<Arc<dyn ResultIndex>>,
51    future_queue: Option<Arc<dyn FutureQueue>>,
52    part_evaluator: Option<Arc<QueryPartEvaluator>>,
53    joins: Vec<Arc<QueryJoin>>,
54    middleware_registry: Option<Arc<MiddlewareTypeRegistry>>,
55    source_middleware: Vec<Arc<SourceMiddlewareConfig>>,
56    source_pipelines: HashMap<Arc<str>, Vec<Arc<str>>>,
57    session_control: Option<Arc<dyn SessionControl>>,
58
59    query_source: String,
60    query_parser: Arc<dyn QueryParser>,
61}
62
63impl QueryBuilder {
64    pub fn new(query: impl Into<String>, parser: Arc<dyn QueryParser>) -> Self {
65        QueryBuilder {
66            function_registry: None,
67            expr_evaluator: None,
68            element_index: None,
69            archive_index: None,
70            result_index: None,
71            future_queue: None,
72            part_evaluator: None,
73            joins: Vec::new(),
74            middleware_registry: None,
75            source_middleware: Vec::new(),
76            source_pipelines: HashMap::new(),
77            session_control: None,
78            query_source: query.into(),
79            query_parser: parser,
80        }
81    }
82
83    pub fn with_middleware_registry(
84        mut self,
85        middleware_registry: Arc<MiddlewareTypeRegistry>,
86    ) -> Self {
87        self.middleware_registry = Some(middleware_registry);
88        self
89    }
90
91    pub fn with_source_middleware(
92        mut self,
93        source_middleware: Arc<SourceMiddlewareConfig>,
94    ) -> Self {
95        self.source_middleware.push(source_middleware);
96        self
97    }
98
99    pub fn with_source_pipeline(mut self, source: impl Into<String>, pipeline: &[String]) -> Self {
100        let pipeline = pipeline.iter().map(|s| Arc::from(s.as_str())).collect();
101        self.source_pipelines
102            .insert(Arc::from(source.into()), pipeline);
103        self
104    }
105
106    pub fn with_join(mut self, join: QueryJoin) -> Self {
107        self.joins.push(Arc::new(join));
108        self
109    }
110
111    pub fn with_joins(mut self, joins: Vec<QueryJoin>) -> Self {
112        for join in joins {
113            self.joins.push(Arc::new(join));
114        }
115        self
116    }
117
118    pub fn with_function_registry(mut self, function_registry: Arc<FunctionRegistry>) -> Self {
119        self.function_registry = Some(function_registry);
120        self
121    }
122
123    pub fn with_element_index(mut self, element_index: Arc<dyn ElementIndex>) -> Self {
124        self.element_index = Some(element_index);
125        self
126    }
127
128    pub fn with_archive_index(mut self, archive_index: Arc<dyn ElementArchiveIndex>) -> Self {
129        self.archive_index = Some(archive_index);
130        self
131    }
132
133    pub fn with_result_index(mut self, accumulator_result_index: Arc<dyn ResultIndex>) -> Self {
134        self.result_index = Some(accumulator_result_index);
135        self
136    }
137
138    pub fn with_future_queue(mut self, future_queue: Arc<dyn FutureQueue>) -> Self {
139        self.future_queue = Some(future_queue);
140        self
141    }
142
143    pub fn with_session_control(mut self, sc: Arc<dyn SessionControl>) -> Self {
144        self.session_control = Some(sc);
145        self
146    }
147
148    pub fn get_joins(&self) -> &Vec<Arc<QueryJoin>> {
149        &self.joins
150    }
151
152    pub async fn build(self) -> ContinuousQuery {
153        self.try_build().await.unwrap()
154    }
155
156    pub async fn try_build(mut self) -> Result<ContinuousQuery, QueryBuilderError> {
157        let function_registry = match self.function_registry.take() {
158            Some(registry) => registry,
159            None => Arc::new(FunctionRegistry::new()),
160        };
161
162        let query = self.query_parser.parse(self.query_source.as_str())?;
163
164        let match_path = Arc::new(MatchPath::from_query(&query.parts[0])?);
165
166        let element_index = match self.element_index.take() {
167            Some(index) => index,
168            None => Arc::new(InMemoryElementIndex::new()),
169        };
170
171        if let Some(archive_index) = self.archive_index.take() {
172            function_registry.register_past_functions(archive_index);
173        }
174
175        let result_index = match self.result_index.take() {
176            Some(index) => index,
177            None => Arc::new(InMemoryResultIndex::new()),
178        };
179
180        let future_queue = match self.future_queue.take() {
181            Some(queue) => queue,
182            None => Arc::new(InMemoryFutureQueue::new()),
183        };
184
185        let future_queue = Arc::new(ShadowedFutureQueue::new(future_queue));
186
187        let expr_evaluator = match self.expr_evaluator.take() {
188            Some(evaluator) => evaluator,
189            None => Arc::new(ExpressionEvaluator::new(
190                function_registry.clone(),
191                result_index.clone(),
192            )),
193        };
194
195        let part_evaluator = match self.part_evaluator.take() {
196            Some(evaluator) => evaluator,
197            None => Arc::new(QueryPartEvaluator::new(
198                expr_evaluator.clone(),
199                result_index.clone(),
200            )),
201        };
202
203        let path_solver = Arc::new(MatchPathSolver::new(element_index.clone()));
204
205        function_registry.register_future_functions(
206            future_queue.clone(),
207            result_index.clone(),
208            Arc::downgrade(&expr_evaluator.clone()),
209        );
210
211        let source_pipelines: SourceMiddlewarePipelineCollection = {
212            if self.source_middleware.is_empty() {
213                Ok(SourceMiddlewarePipelineCollection::new())
214            } else {
215                match self.middleware_registry.as_ref() {
216                    Some(registry) => {
217                        let container =
218                            MiddlewareContainer::new(registry, self.source_middleware.clone())?;
219                        let mut pipelines = SourceMiddlewarePipelineCollection::new();
220                        for (source_id, pipeline_keys) in self.source_pipelines.iter() {
221                            let pipeline =
222                                SourceMiddlewarePipeline::new(&container, pipeline_keys.clone())?;
223                            pipelines.insert(source_id.clone(), pipeline);
224                        }
225                        Ok(pipelines)
226                    }
227                    None => Err(MiddlewareSetupError::NoRegistry),
228                }
229            }
230        }?;
231
232        let session_control = match self.session_control.take() {
233            Some(sc) => sc,
234            None => Arc::new(NoOpSessionControl),
235        };
236
237        element_index.set_joins(&match_path, &self.joins).await;
238
239        Ok(ContinuousQuery::new(
240            Arc::new(query),
241            match_path,
242            expr_evaluator,
243            element_index,
244            path_solver,
245            part_evaluator,
246            future_queue,
247            source_pipelines,
248            session_control,
249        ))
250    }
251}