1use std::{collections::HashMap, sync::Arc};
16
17use drasi_query_ast::api::QueryParser;
18
19use crate::{
20 evaluation::{
21 functions::{
22 future::RegisterFutureFunctions, past::RegisterPastFunctions, FunctionRegistry,
23 },
24 ExpressionEvaluator, QueryPartEvaluator,
25 },
26 in_memory_index::{
27 in_memory_element_index::InMemoryElementIndex, in_memory_future_queue::InMemoryFutureQueue,
28 in_memory_result_index::InMemoryResultIndex,
29 },
30 index_cache::shadowed_future_queue::ShadowedFutureQueue,
31 interface::{
32 ElementArchiveIndex, ElementIndex, FutureQueue, MiddlewareSetupError, NoOpSessionControl,
33 QueryBuilderError, ResultIndex, SessionControl,
34 },
35 middleware::{
36 MiddlewareContainer, MiddlewareTypeRegistry, SourceMiddlewarePipeline,
37 SourceMiddlewarePipelineCollection,
38 },
39 models::{QueryJoin, SourceMiddlewareConfig},
40 path_solver::{match_path::MatchPath, MatchPathSolver},
41};
42
43use super::ContinuousQuery;
44
45pub struct QueryBuilder {
46 function_registry: Option<Arc<FunctionRegistry>>,
47 expr_evaluator: Option<Arc<ExpressionEvaluator>>,
48 element_index: Option<Arc<dyn ElementIndex>>,
49 archive_index: Option<Arc<dyn ElementArchiveIndex>>,
50 result_index: Option<Arc<dyn ResultIndex>>,
51 future_queue: Option<Arc<dyn FutureQueue>>,
52 part_evaluator: Option<Arc<QueryPartEvaluator>>,
53 joins: Vec<Arc<QueryJoin>>,
54 middleware_registry: Option<Arc<MiddlewareTypeRegistry>>,
55 source_middleware: Vec<Arc<SourceMiddlewareConfig>>,
56 source_pipelines: HashMap<Arc<str>, Vec<Arc<str>>>,
57 session_control: Option<Arc<dyn SessionControl>>,
58
59 query_source: String,
60 query_parser: Arc<dyn QueryParser>,
61}
62
63impl QueryBuilder {
64 pub fn new(query: impl Into<String>, parser: Arc<dyn QueryParser>) -> Self {
65 QueryBuilder {
66 function_registry: None,
67 expr_evaluator: None,
68 element_index: None,
69 archive_index: None,
70 result_index: None,
71 future_queue: None,
72 part_evaluator: None,
73 joins: Vec::new(),
74 middleware_registry: None,
75 source_middleware: Vec::new(),
76 source_pipelines: HashMap::new(),
77 session_control: None,
78 query_source: query.into(),
79 query_parser: parser,
80 }
81 }
82
83 pub fn with_middleware_registry(
84 mut self,
85 middleware_registry: Arc<MiddlewareTypeRegistry>,
86 ) -> Self {
87 self.middleware_registry = Some(middleware_registry);
88 self
89 }
90
91 pub fn with_source_middleware(
92 mut self,
93 source_middleware: Arc<SourceMiddlewareConfig>,
94 ) -> Self {
95 self.source_middleware.push(source_middleware);
96 self
97 }
98
99 pub fn with_source_pipeline(mut self, source: impl Into<String>, pipeline: &[String]) -> Self {
100 let pipeline = pipeline.iter().map(|s| Arc::from(s.as_str())).collect();
101 self.source_pipelines
102 .insert(Arc::from(source.into()), pipeline);
103 self
104 }
105
106 pub fn with_join(mut self, join: QueryJoin) -> Self {
107 self.joins.push(Arc::new(join));
108 self
109 }
110
111 pub fn with_joins(mut self, joins: Vec<QueryJoin>) -> Self {
112 for join in joins {
113 self.joins.push(Arc::new(join));
114 }
115 self
116 }
117
118 pub fn with_function_registry(mut self, function_registry: Arc<FunctionRegistry>) -> Self {
119 self.function_registry = Some(function_registry);
120 self
121 }
122
123 pub fn with_element_index(mut self, element_index: Arc<dyn ElementIndex>) -> Self {
124 self.element_index = Some(element_index);
125 self
126 }
127
128 pub fn with_archive_index(mut self, archive_index: Arc<dyn ElementArchiveIndex>) -> Self {
129 self.archive_index = Some(archive_index);
130 self
131 }
132
133 pub fn with_result_index(mut self, accumulator_result_index: Arc<dyn ResultIndex>) -> Self {
134 self.result_index = Some(accumulator_result_index);
135 self
136 }
137
138 pub fn with_future_queue(mut self, future_queue: Arc<dyn FutureQueue>) -> Self {
139 self.future_queue = Some(future_queue);
140 self
141 }
142
143 pub fn with_session_control(mut self, sc: Arc<dyn SessionControl>) -> Self {
144 self.session_control = Some(sc);
145 self
146 }
147
148 pub fn get_joins(&self) -> &Vec<Arc<QueryJoin>> {
149 &self.joins
150 }
151
152 pub async fn build(self) -> ContinuousQuery {
153 self.try_build().await.unwrap()
154 }
155
156 pub async fn try_build(mut self) -> Result<ContinuousQuery, QueryBuilderError> {
157 let function_registry = match self.function_registry.take() {
158 Some(registry) => registry,
159 None => Arc::new(FunctionRegistry::new()),
160 };
161
162 let query = self.query_parser.parse(self.query_source.as_str())?;
163
164 let match_path = Arc::new(MatchPath::from_query(&query.parts[0])?);
165
166 let element_index = match self.element_index.take() {
167 Some(index) => index,
168 None => Arc::new(InMemoryElementIndex::new()),
169 };
170
171 if let Some(archive_index) = self.archive_index.take() {
172 function_registry.register_past_functions(archive_index);
173 }
174
175 let result_index = match self.result_index.take() {
176 Some(index) => index,
177 None => Arc::new(InMemoryResultIndex::new()),
178 };
179
180 let future_queue = match self.future_queue.take() {
181 Some(queue) => queue,
182 None => Arc::new(InMemoryFutureQueue::new()),
183 };
184
185 let future_queue = Arc::new(ShadowedFutureQueue::new(future_queue));
186
187 let expr_evaluator = match self.expr_evaluator.take() {
188 Some(evaluator) => evaluator,
189 None => Arc::new(ExpressionEvaluator::new(
190 function_registry.clone(),
191 result_index.clone(),
192 )),
193 };
194
195 let part_evaluator = match self.part_evaluator.take() {
196 Some(evaluator) => evaluator,
197 None => Arc::new(QueryPartEvaluator::new(
198 expr_evaluator.clone(),
199 result_index.clone(),
200 )),
201 };
202
203 let path_solver = Arc::new(MatchPathSolver::new(element_index.clone()));
204
205 function_registry.register_future_functions(
206 future_queue.clone(),
207 result_index.clone(),
208 Arc::downgrade(&expr_evaluator.clone()),
209 );
210
211 let source_pipelines: SourceMiddlewarePipelineCollection = {
212 if self.source_middleware.is_empty() {
213 Ok(SourceMiddlewarePipelineCollection::new())
214 } else {
215 match self.middleware_registry.as_ref() {
216 Some(registry) => {
217 let container =
218 MiddlewareContainer::new(registry, self.source_middleware.clone())?;
219 let mut pipelines = SourceMiddlewarePipelineCollection::new();
220 for (source_id, pipeline_keys) in self.source_pipelines.iter() {
221 let pipeline =
222 SourceMiddlewarePipeline::new(&container, pipeline_keys.clone())?;
223 pipelines.insert(source_id.clone(), pipeline);
224 }
225 Ok(pipelines)
226 }
227 None => Err(MiddlewareSetupError::NoRegistry),
228 }
229 }
230 }?;
231
232 let session_control = match self.session_control.take() {
233 Some(sc) => sc,
234 None => Arc::new(NoOpSessionControl),
235 };
236
237 element_index.set_joins(&match_path, &self.joins).await;
238
239 Ok(ContinuousQuery::new(
240 Arc::new(query),
241 match_path,
242 expr_evaluator,
243 element_index,
244 path_solver,
245 part_evaluator,
246 future_queue,
247 source_pipelines,
248 session_control,
249 ))
250 }
251}