1use std::sync::Arc;
2
3use teaql_core::{
4 AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7use teaql_sql::{CompiledQuery, SqlDialect};
8
9use crate::{
10 CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
11 SqlLogOperation, clear_record_status, mark_record_status,
12};
13
14use super::{
15 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
16 ResolvedRepository, UserContextMetadata, helpers::*,
17};
18
19impl<'a, D, E> ResolvedRepository<'a, D, E>
20where
21 D: SqlDialect,
22 E: QueryExecutor,
23{
24 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
25 self.repository.metadata.context.repository_behavior(entity)
26 }
27
28 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
29 self.repository
30 .metadata
31 .context
32 .repository_behavior(&self.entity)
33 }
34
35 pub fn entity(&self) -> &str {
36 &self.entity
37 }
38
39 pub fn select(&self) -> SelectQuery {
40 SelectQuery::new(self.entity.clone())
41 }
42
43 pub fn insert_command(&self) -> InsertCommand {
44 InsertCommand::new(self.entity.clone())
45 }
46
47 pub fn prepare_insert_command(
48 &self,
49 command: &InsertCommand,
50 ) -> Result<InsertCommand, RuntimeError> {
51 let mut command = command.clone();
52 if let Some(behavior) = self.behavior() {
53 behavior.before_insert(self.repository.metadata.context, &mut command)?;
54 }
55
56 let entity = self
57 .repository
58 .metadata
59 .context
60 .require_entity(&command.entity)?;
61 if let Some(id_property) = entity.id_property() {
62 let needs_id = !command.values.contains_key(&id_property.name)
63 || matches!(command.values.get(&id_property.name), Some(Value::Null));
64 if needs_id {
65 let id = self.repository.metadata.context.next_id(&command.entity)?;
66 command
67 .values
68 .insert(id_property.name.clone(), Value::U64(id));
69 }
70 }
71 mark_record_status(&mut command.values, CheckObjectStatus::Create);
72 let check_result = self
73 .repository
74 .metadata
75 .context
76 .check_and_fix_record(&command.entity, &mut command.values);
77 clear_record_status(&mut command.values);
78 check_result?;
79
80 Ok(command)
81 }
82
83 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
84 UpdateCommand::new(self.entity.clone(), id)
85 }
86
87 pub fn prepare_update_command(
88 &self,
89 command: &UpdateCommand,
90 ) -> Result<UpdateCommand, RuntimeError> {
91 let mut command = command.clone();
92 if let Some(behavior) = self.behavior() {
93 behavior.before_update(self.repository.metadata.context, &mut command)?;
94 }
95 mark_record_status(&mut command.values, CheckObjectStatus::Update);
96 let check_result = self
97 .repository
98 .metadata
99 .context
100 .check_and_fix_record(&command.entity, &mut command.values);
101 clear_record_status(&mut command.values);
102 check_result?;
103 Ok(command)
104 }
105
106 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
107 DeleteCommand::new(self.entity.clone(), id)
108 }
109
110 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
111 RecoverCommand::new(self.entity.clone(), id, expected_version)
112 }
113
114 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
115 let mut query = query.clone();
116 if let Some(behavior) = self.query_behavior(&query.entity) {
117 behavior.before_select(self.repository.metadata.context, &mut query)?;
118 }
119 self.repository.compile(&query)
120 }
121
122 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
123 let mut query = query.clone();
124 if let Some(behavior) = self.query_behavior(&query.entity) {
125 behavior
126 .before_select(self.repository.metadata.context, &mut query)
127 .map_err(RepositoryError::Runtime)?;
128 }
129 let mut rows = self.fetch_prepared_query(&query)?;
130 self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
131 self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
132 Ok(rows)
133 }
134
135 fn fetch_prepared_query(
136 &self,
137 query: &SelectQuery,
138 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
139 let compiled = self
140 .repository
141 .compile(query)
142 .map_err(RepositoryError::Runtime)?;
143 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
144 if let Some(cache) = self
145 .repository
146 .metadata
147 .context
148 .get_resource::<Arc<dyn AggregationCacheBackend>>()
149 {
150 return self.fetch_prepared_query_with_cache(
151 query,
152 &compiled,
153 options,
154 cache.as_ref(),
155 );
156 }
157 if let Some(cache) = self
158 .repository
159 .metadata
160 .context
161 .get_resource::<InMemoryAggregationCache>()
162 {
163 return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
164 }
165 }
166 self.repository.log_sql(SqlLogOperation::Select, &compiled);
167 self.repository
168 .executor
169 .fetch_all(&compiled)
170 .map_err(RepositoryError::Executor)
171 }
172
173 fn fetch_prepared_query_with_cache(
174 &self,
175 query: &SelectQuery,
176 compiled: &CompiledQuery,
177 options: AggregationCacheOptions,
178 cache: &dyn AggregationCacheBackend,
179 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
180 let key = aggregation_cache_key(
181 cache.namespace(),
182 &aggregation_cache_namespace(&query.entity),
183 compiled,
184 );
185 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
186 return Ok(rows);
187 }
188 self.repository.log_sql(SqlLogOperation::Select, compiled);
189 let rows = self
190 .repository
191 .executor
192 .fetch_all(compiled)
193 .map_err(RepositoryError::Executor)?;
194 cache.put(key, rows.clone());
195 Ok(rows)
196 }
197
198 pub fn fetch_all_with_relation_aggregates(
199 &self,
200 query: &SelectQuery,
201 relation_aggregates: &[RelationAggregate],
202 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
203 let mut rows = self.fetch_all(query)?;
204 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
205 Ok(rows)
206 }
207
208 pub fn fetch_smart_list(
209 &self,
210 query: &SelectQuery,
211 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
212 let mut query = query.clone();
213 if let Some(behavior) = self.query_behavior(&query.entity) {
214 behavior
215 .before_select(self.repository.metadata.context, &mut query)
216 .map_err(RepositoryError::Runtime)?;
217 }
218 self.repository.fetch_smart_list(&query)
219 }
220
221 pub fn fetch_smart_list_with_relation_aggregates(
222 &self,
223 query: &SelectQuery,
224 relation_aggregates: &[RelationAggregate],
225 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
226 self.fetch_all_with_relation_aggregates(query, relation_aggregates)
227 .map(SmartList::from)
228 }
229
230 pub fn fetch_entities<T>(
231 &self,
232 query: &SelectQuery,
233 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
234 where
235 T: Entity,
236 {
237 let mut query = query.clone();
238 if let Some(behavior) = self.query_behavior(&query.entity) {
239 behavior
240 .before_select(self.repository.metadata.context, &mut query)
241 .map_err(RepositoryError::Runtime)?;
242 }
243 self.repository.fetch_entities(&query)
244 }
245
246 pub fn fetch_entities_with_relation_aggregates<T>(
247 &self,
248 query: &SelectQuery,
249 relation_aggregates: &[RelationAggregate],
250 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
251 where
252 T: Entity,
253 {
254 self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
255 .into_iter()
256 .map(T::from_record)
257 .collect::<Result<Vec<_>, _>>()
258 .map(SmartList::from)
259 .map_err(RepositoryError::Entity)
260 }
261
262 pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
263 &self,
264 query: &SelectQuery,
265 relation_aggregates: &[RelationAggregate],
266 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
267 where
268 T: Entity,
269 {
270 let mut query = query.clone();
271 if let Some(behavior) = self.query_behavior(&query.entity) {
272 behavior
273 .before_select(self.repository.metadata.context, &mut query)
274 .map_err(RepositoryError::Runtime)?;
275 }
276
277 let mut rows = self.repository.fetch_all(&query)?;
278 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
279 self.enhance_query_relations(&mut rows, &query)?;
280 self.enhance_relations(&mut rows)?;
281 rows.into_iter()
282 .map(T::from_record)
283 .collect::<Result<Vec<_>, _>>()
284 .map(SmartList::from)
285 .map_err(RepositoryError::Entity)
286 }
287
288 pub fn fetch_enhanced_entities<T>(
289 &self,
290 query: &SelectQuery,
291 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
292 where
293 T: Entity,
294 {
295 let mut query = query.clone();
296 if let Some(behavior) = self.query_behavior(&query.entity) {
297 behavior
298 .before_select(self.repository.metadata.context, &mut query)
299 .map_err(RepositoryError::Runtime)?;
300 }
301
302 let mut rows = self.repository.fetch_all(&query)?;
303 self.enhance_query_relations(&mut rows, &query)?;
304 self.enhance_relations(&mut rows)?;
305 rows.into_iter()
306 .map(T::from_record)
307 .collect::<Result<Vec<_>, _>>()
308 .map(SmartList::from)
309 .map_err(RepositoryError::Entity)
310 }
311
312 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
313 let command = self
314 .prepare_insert_command(command)
315 .map_err(RepositoryError::Runtime)?;
316 self.execute_prepared_insert(command)
317 }
318
319 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
320 let command = self
321 .prepare_update_command(command)
322 .map_err(RepositoryError::Runtime)?;
323 self.execute_prepared_update(command)
324 }
325
326 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
327 let mut command = command.clone();
328 if let Some(behavior) = self.behavior() {
329 behavior
330 .before_delete(self.repository.metadata.context, &mut command)
331 .map_err(RepositoryError::Runtime)?;
332 }
333 let affected = self.repository.delete(&command)?;
334 self.emit_event(EntityEvent::deleted(
335 command.entity,
336 command.id,
337 command.expected_version,
338 ))
339 .map_err(RepositoryError::Runtime)?;
340 Ok(affected)
341 }
342
343 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
344 let mut command = command.clone();
345 if let Some(behavior) = self.behavior() {
346 behavior
347 .before_recover(self.repository.metadata.context, &mut command)
348 .map_err(RepositoryError::Runtime)?;
349 }
350 let affected = self.repository.recover(&command)?;
351 self.emit_event(EntityEvent::recovered(
352 command.entity,
353 command.id,
354 command.expected_version,
355 ))
356 .map_err(RepositoryError::Runtime)?;
357 Ok(affected)
358 }
359
360 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
361 self.repository.metadata.context.send_event(event)
362 }
363
364 pub(super) fn execute_prepared_insert(
365 &self,
366 command: InsertCommand,
367 ) -> Result<u64, RepositoryError<E::Error>> {
368 let affected = self.repository.insert(&command)?;
369 self.emit_event(EntityEvent::created(command.entity, command.values))
370 .map_err(RepositoryError::Runtime)?;
371 Ok(affected)
372 }
373
374 pub(super) fn execute_prepared_update(
375 &self,
376 command: UpdateCommand,
377 ) -> Result<u64, RepositoryError<E::Error>> {
378 let affected = self.repository.update(&command)?;
379 let updated_fields = command.values.keys().cloned().collect();
380 let mut values = command.values.clone();
381 values.insert("id".to_owned(), command.id.clone());
382 if let Some(version) = command.expected_version {
383 values.insert("version".to_owned(), Value::I64(version + 1));
384 }
385 self.emit_event(EntityEvent {
386 kind: crate::EntityEventKind::Updated,
387 entity: command.entity,
388 values,
389 updated_fields,
390 })
391 .map_err(RepositoryError::Runtime)?;
392 Ok(affected)
393 }
394 pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
395 ResolvedRepository {
396 entity,
397 repository: ContextRepository {
398 metadata: UserContextMetadata {
399 context: self.repository.metadata.context,
400 },
401 dialect: self.repository.dialect,
402 executor: self.repository.executor,
403 },
404 }
405 }
406}