1use std::{collections::HashMap, sync::Arc};
16
17use drasi_query_ast::api::QueryParser;
18use drasi_query_cypher::CypherParser;
19
20use crate::{
21 evaluation::{
22 functions::{
23 future::RegisterFutureFunctions, past::RegisterPastFunctions, FunctionRegistry,
24 },
25 ExpressionEvaluator, QueryPartEvaluator,
26 },
27 in_memory_index::{
28 in_memory_element_index::InMemoryElementIndex, in_memory_future_queue::InMemoryFutureQueue,
29 in_memory_result_index::InMemoryResultIndex,
30 },
31 index_cache::shadowed_future_queue::ShadowedFutureQueue,
32 interface::{
33 ElementArchiveIndex, ElementIndex, FutureQueue, MiddlewareSetupError, QueryBuilderError,
34 ResultIndex,
35 },
36 middleware::{
37 MiddlewareContainer, MiddlewareTypeRegistry, SourceMiddlewarePipeline,
38 SourceMiddlewarePipelineCollection,
39 },
40 models::{QueryJoin, SourceMiddlewareConfig},
41 path_solver::{match_path::MatchPath, MatchPathSolver},
42};
43
44use super::ContinuousQuery;
45
46pub struct QueryBuilder {
47 function_registry: Option<Arc<FunctionRegistry>>,
48 expr_evaluator: Option<Arc<ExpressionEvaluator>>,
49 element_index: Option<Arc<dyn ElementIndex>>,
50 archive_index: Option<Arc<dyn ElementArchiveIndex>>,
51 result_index: Option<Arc<dyn ResultIndex>>,
52 future_queue: Option<Arc<dyn FutureQueue>>,
53 part_evaluator: Option<Arc<QueryPartEvaluator>>,
54 joins: Vec<Arc<QueryJoin>>,
55 middleware_registry: Option<Arc<MiddlewareTypeRegistry>>,
56 source_middleware: Vec<Arc<SourceMiddlewareConfig>>,
57 source_pipelines: HashMap<Arc<str>, Vec<Arc<str>>>,
58
59 query_source: String,
60 query_parser: Option<Arc<dyn QueryParser>>,
61}
62
63impl QueryBuilder {
64 pub fn new(query: impl Into<String>) -> Self {
65 QueryBuilder {
66 function_registry: None,
67 expr_evaluator: None,
68 element_index: None,
69 archive_index: None,
70 result_index: None,
71 future_queue: None,
72 part_evaluator: None,
73 joins: Vec::new(),
74 middleware_registry: None,
75 source_middleware: Vec::new(),
76 source_pipelines: HashMap::new(),
77 query_source: query.into(),
78 query_parser: None,
79 }
80 }
81
82 pub fn with_query_parser(mut self, query_parser: Arc<dyn QueryParser>) -> Self {
83 self.query_parser = Some(query_parser);
84 self
85 }
86
87 pub fn with_middleware_registry(
88 mut self,
89 middleware_registry: Arc<MiddlewareTypeRegistry>,
90 ) -> Self {
91 self.middleware_registry = Some(middleware_registry);
92 self
93 }
94
95 pub fn with_source_middleware(
96 mut self,
97 source_middleware: Arc<SourceMiddlewareConfig>,
98 ) -> Self {
99 self.source_middleware.push(source_middleware);
100 self
101 }
102
103 pub fn with_source_pipeline(mut self, source: impl Into<String>, pipeline: &[String]) -> Self {
104 let pipeline = pipeline.iter().map(|s| Arc::from(s.as_str())).collect();
105 self.source_pipelines
106 .insert(Arc::from(source.into()), pipeline);
107 self
108 }
109
110 pub fn with_join(mut self, join: QueryJoin) -> Self {
111 self.joins.push(Arc::new(join));
112 self
113 }
114
115 pub fn with_joins(mut self, joins: Vec<QueryJoin>) -> Self {
116 for join in joins {
117 self.joins.push(Arc::new(join));
118 }
119 self
120 }
121
122 pub fn with_function_registry(mut self, function_registry: Arc<FunctionRegistry>) -> Self {
123 self.function_registry = Some(function_registry);
124 self
125 }
126
127 pub fn with_element_index(mut self, element_index: Arc<dyn ElementIndex>) -> Self {
128 self.element_index = Some(element_index);
129 self
130 }
131
132 pub fn with_archive_index(mut self, archive_index: Arc<dyn ElementArchiveIndex>) -> Self {
133 self.archive_index = Some(archive_index);
134 self
135 }
136
137 pub fn with_result_index(mut self, accumulator_result_index: Arc<dyn ResultIndex>) -> Self {
138 self.result_index = Some(accumulator_result_index);
139 self
140 }
141
142 pub fn with_future_queue(mut self, future_queue: Arc<dyn FutureQueue>) -> Self {
143 self.future_queue = Some(future_queue);
144 self
145 }
146
147 pub fn get_joins(&self) -> &Vec<Arc<QueryJoin>> {
148 &self.joins
149 }
150
151 pub async fn build(self) -> ContinuousQuery {
152 self.try_build().await.unwrap()
153 }
154
155 pub async fn try_build(mut self) -> Result<ContinuousQuery, QueryBuilderError> {
156 let function_registry = match self.function_registry.take() {
157 Some(registry) => registry,
158 None => Arc::new(FunctionRegistry::new()),
159 };
160
161 let query_parser = match self.query_parser.take() {
162 Some(index) => index,
163 None => Arc::new(CypherParser::new(function_registry.clone())),
164 };
165
166 let query = query_parser.parse(self.query_source.as_str())?;
167 let match_path = Arc::new(MatchPath::from_query(&query.parts[0])?);
168
169 let element_index = match self.element_index.take() {
170 Some(index) => index,
171 None => Arc::new(InMemoryElementIndex::new()),
172 };
173
174 if let Some(archive_index) = self.archive_index.take() {
175 function_registry.register_past_functions(archive_index);
176 }
177
178 let result_index = match self.result_index.take() {
179 Some(index) => index,
180 None => Arc::new(InMemoryResultIndex::new()),
181 };
182
183 let future_queue = match self.future_queue.take() {
184 Some(queue) => queue,
185 None => Arc::new(InMemoryFutureQueue::new()),
186 };
187
188 let future_queue = Arc::new(ShadowedFutureQueue::new(future_queue));
189
190 let expr_evaluator = match self.expr_evaluator.take() {
191 Some(evaluator) => evaluator,
192 None => Arc::new(ExpressionEvaluator::new(
193 function_registry.clone(),
194 result_index.clone(),
195 )),
196 };
197
198 let part_evaluator = match self.part_evaluator.take() {
199 Some(evaluator) => evaluator,
200 None => Arc::new(QueryPartEvaluator::new(
201 expr_evaluator.clone(),
202 result_index.clone(),
203 )),
204 };
205
206 let path_solver = Arc::new(MatchPathSolver::new(element_index.clone()));
207
208 function_registry.register_future_functions(
209 future_queue.clone(),
210 result_index.clone(),
211 Arc::downgrade(&expr_evaluator.clone()),
212 );
213
214 let source_pipelines: SourceMiddlewarePipelineCollection = {
215 if self.source_middleware.is_empty() {
216 Ok(SourceMiddlewarePipelineCollection::new())
217 } else {
218 match self.middleware_registry.as_ref() {
219 Some(registry) => {
220 let container =
221 MiddlewareContainer::new(registry, self.source_middleware.clone())?;
222 let mut pipelines = SourceMiddlewarePipelineCollection::new();
223 for (source_id, pipeline_keys) in self.source_pipelines.iter() {
224 let pipeline =
225 SourceMiddlewarePipeline::new(&container, pipeline_keys.clone())?;
226 pipelines.insert(source_id.clone(), pipeline);
227 }
228 Ok(pipelines)
229 }
230 None => Err(MiddlewareSetupError::NoRegistry),
231 }
232 }
233 }?;
234
235 element_index.set_joins(&match_path, &self.joins).await;
236
237 Ok(ContinuousQuery::new(
238 Arc::new(query),
239 match_path,
240 expr_evaluator,
241 element_index,
242 path_solver,
243 part_evaluator,
244 future_queue,
245 source_pipelines,
246 ))
247 }
248}