1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5 AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11 CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12 SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17 ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22 D: SqlDialect,
23 E: QueryExecutor,
24{
25 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26 self.repository.metadata.context.repository_behavior(entity)
27 }
28
29 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30 self.repository
31 .metadata
32 .context
33 .repository_behavior(&self.entity)
34 }
35
36 pub fn entity(&self) -> &str {
37 &self.entity
38 }
39
40 pub fn select(&self) -> SelectQuery {
41 SelectQuery::new(self.entity.clone())
42 }
43
44 pub fn insert_command(&self) -> InsertCommand {
45 InsertCommand::new(self.entity.clone())
46 }
47
48 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50 policy.enforce_insert(self.repository.metadata.context, command)?;
51 }
52 Ok(())
53 }
54
55 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57 policy.enforce_update(self.repository.metadata.context, command)?;
58 }
59 Ok(())
60 }
61
62 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64 policy.enforce_delete(self.repository.metadata.context, command)?;
65 }
66 Ok(())
67 }
68
69 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71 policy.enforce_recover(self.repository.metadata.context, command)?;
72 }
73 Ok(())
74 }
75
76 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77 let mut query = query.clone();
78
79 let mut full_trace = self.trace_context.clone();
80 full_trace.extend(query.trace_chain);
81 query.trace_chain = full_trace;
82
83 if let Some(behavior) = self.query_behavior(&query.entity) {
84 behavior.before_select(self.repository.metadata.context, &mut query)?;
85 }
86 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
87 policy.enforce_select(self.repository.metadata.context, &mut query)?;
88 }
89 Ok(query)
90 }
91
92 pub fn prepare_insert_command(
93 &self,
94 command: &InsertCommand,
95 ) -> Result<InsertCommand, RuntimeError> {
96 let mut command = command.clone();
97 if let Some(behavior) = self.behavior() {
98 behavior.before_insert(self.repository.metadata.context, &mut command)?;
99 }
100 self.enforce_insert_policy(&mut command)?;
101
102 let entity = self
103 .repository
104 .metadata
105 .context
106 .require_entity(&command.entity)?;
107 if let Some(id_property) = entity.id_property() {
108 let needs_id = !command.values.contains_key(&id_property.name)
109 || is_unassigned_id(command.values.get(&id_property.name));
110 if needs_id {
111 let id = self.repository.metadata.context.next_id(&command.entity)?;
112 command
113 .values
114 .insert(id_property.name.clone(), Value::U64(id));
115 }
116 }
117 ensure_initial_version(&mut command.values, entity);
118 mark_record_status(&mut command.values, CheckObjectStatus::Create);
119 let check_result = self
120 .repository
121 .metadata
122 .context
123 .check_and_fix_record(&command.entity, &mut command.values);
124 clear_record_status(&mut command.values);
125 check_result?;
126
127 Ok(command)
128 }
129
130 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
131 UpdateCommand::new(self.entity.clone(), id)
132 }
133
134 pub fn prepare_update_command(
135 &self,
136 command: &UpdateCommand,
137 ) -> Result<UpdateCommand, RuntimeError> {
138 let mut command = command.clone();
139 if let Some(behavior) = self.behavior() {
140 behavior.before_update(self.repository.metadata.context, &mut command)?;
141 }
142 self.enforce_update_policy(&mut command)?;
143 mark_record_status(&mut command.values, CheckObjectStatus::Update);
144 let check_result = self
145 .repository
146 .metadata
147 .context
148 .check_and_fix_record(&command.entity, &mut command.values);
149 clear_record_status(&mut command.values);
150 check_result?;
151 Ok(command)
152 }
153
154 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
155 DeleteCommand::new(self.entity.clone(), id)
156 }
157
158 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
159 RecoverCommand::new(self.entity.clone(), id, expected_version)
160 }
161
162 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
163 let query = self.prepare_select_query(query)?;
164 self.repository.compile(&query)
165 }
166
167 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
168 let query = self
169 .prepare_select_query(query)
170 .map_err(RepositoryError::Runtime)?;
171 self.fetch_prepared_all(&query)
172 }
173
174 fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
175 let mut rows = self.fetch_prepared_query(query)?;
176 self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain)?;
177 self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain)?;
178 Ok(rows)
179 }
180
181 fn fetch_prepared_query(
182 &self,
183 query: &SelectQuery,
184 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
185
186 let mut compiled = self
187 .repository
188 .compile(query)
189 .map_err(RepositoryError::Runtime)?;
190 let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
191 compiled.comment = final_comment;
192 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
193 if let Some(cache) = self
194 .repository
195 .metadata
196 .context
197 .get_resource::<Arc<dyn AggregationCacheBackend>>()
198 {
199 return self.fetch_prepared_query_with_cache(
200 query,
201 &compiled,
202 options,
203 cache.as_ref(),
204 );
205 }
206 if let Some(cache) = self
207 .repository
208 .metadata
209 .context
210 .get_resource::<InMemoryAggregationCache>()
211 {
212 return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
213 }
214 }
215 let started_at = SystemTime::now();
216 let started = Instant::now();
217 let rows = self
218 .repository
219 .executor
220 .fetch_all(&compiled)
221 .map_err(RepositoryError::Executor)?;
222 self.repository.log_sql_result(
223 SqlLogOperation::Select,
224 &compiled,
225 started_at,
226 started,
227 Some(rows.len()),
228 Some(query.entity.clone()),
229 None,
230 query.trace_chain.clone(),
231 );
232 Ok(rows)
233 }
234
235 fn fetch_prepared_query_with_cache(
236 &self,
237 query: &SelectQuery,
238 compiled: &CompiledQuery,
239 options: AggregationCacheOptions,
240 cache: &dyn AggregationCacheBackend,
241 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
242
243 let key = aggregation_cache_key(
244 cache.namespace(),
245 &aggregation_cache_namespace(&query.entity),
246 compiled,
247 );
248 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
249 return Ok(rows);
250 }
251 let started_at = SystemTime::now();
252 let started = Instant::now();
253 let rows = self
254 .repository
255 .executor
256 .fetch_all(compiled)
257 .map_err(RepositoryError::Executor)?;
258 self.repository.log_sql_result(
259 SqlLogOperation::Select,
260 compiled,
261 started_at,
262 started,
263 Some(rows.len()),
264 Some(query.entity.clone()),
265 None,
266 query.trace_chain.clone(),
267 );
268 cache.put(key, rows.clone());
269 Ok(rows)
270 }
271
272 pub fn fetch_all_with_relation_aggregates(
273 &self,
274 query: &SelectQuery,
275 relation_aggregates: &[RelationAggregate],
276 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
277 let query = self
278 .prepare_select_query(query)
279 .map_err(RepositoryError::Runtime)?;
280
281 let mut rows = self.fetch_prepared_all(&query)?;
282 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain)?;
283 Ok(rows)
284 }
285
286 pub fn fetch_smart_list(
287 &self,
288 query: &SelectQuery,
289 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
290 let query = self
291 .prepare_select_query(query)
292 .map_err(RepositoryError::Runtime)?;
293
294 self.repository.fetch_smart_list(&query)
295 }
296
297 pub fn fetch_smart_list_with_relation_aggregates(
298 &self,
299 query: &SelectQuery,
300 relation_aggregates: &[RelationAggregate],
301 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
302 self.fetch_all_with_relation_aggregates(query, relation_aggregates)
303 .map(SmartList::from)
304 }
305
306 pub fn fetch_entities<T>(
307 &self,
308 query: &SelectQuery,
309 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
310 where
311 T: Entity,
312 {
313 let query = self
314 .prepare_select_query(query)
315 .map_err(RepositoryError::Runtime)?;
316
317 self.repository.fetch_entities(&query)
318 }
319
320 pub fn fetch_entities_with_relation_aggregates<T>(
321 &self,
322 query: &SelectQuery,
323 relation_aggregates: &[RelationAggregate],
324 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
325 where
326 T: Entity,
327 {
328 self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
329 .into_iter()
330 .map(|record| {
331 let mut entity = T::from_record(record)?;
332 let root = crate::EntityRoot::default();
333 entity.on_loaded(&root as &dyn std::any::Any);
334 Ok(entity)
335 })
336 .collect::<Result<Vec<_>, _>>()
337 .map(SmartList::from)
338 .map_err(RepositoryError::Entity)
339 }
340
341 pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
342 &self,
343 query: &SelectQuery,
344 relation_aggregates: &[RelationAggregate],
345 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
346 where
347 T: Entity,
348 {
349 let query = self
350 .prepare_select_query(query)
351 .map_err(RepositoryError::Runtime)?;
352
353
354
355 let mut rows = self.fetch_prepared_all(&query)?;
356 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain)?;
357 self.enhance_query_relations(&mut rows, &query)?;
358 self.enhance_relations(&mut rows)?;
359 rows.into_iter()
360 .map(|record| {
361 let mut entity = T::from_record(record)?;
362 let root = crate::EntityRoot::default();
363 entity.on_loaded(&root as &dyn std::any::Any);
364 Ok(entity)
365 })
366 .collect::<Result<Vec<_>, _>>()
367 .map(SmartList::from)
368 .map_err(RepositoryError::Entity)
369 }
370
371 pub fn fetch_enhanced_entities<T>(
372 &self,
373 query: &SelectQuery,
374 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
375 where
376 T: Entity,
377 {
378 let query = self
379 .prepare_select_query(query)
380 .map_err(RepositoryError::Runtime)?;
381
382
383 let mut rows = self.fetch_prepared_all(&query)?;
384 self.enhance_query_relations(&mut rows, &query)?;
385 self.enhance_relations(&mut rows)?;
386 let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
387
388 rows.into_iter()
389 .map(|record| {
390 let mut entity = T::from_record(record)?;
391 if let Some(ref root) = root {
392 entity.on_loaded(root as &dyn std::any::Any);
393 }
394 Ok(entity)
395 })
396 .collect::<Result<Vec<_>, _>>()
397 .map(SmartList::from)
398 .map_err(RepositoryError::Entity)
399 }
400
401 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
402 let command = self
403 .prepare_insert_command(command)
404 .map_err(RepositoryError::Runtime)?;
405 self.execute_prepared_insert_with_comment(command, self.trace_context.clone())
406 }
407
408 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
409 let command = self
410 .prepare_update_command(command)
411 .map_err(RepositoryError::Runtime)?;
412 self.execute_prepared_update_with_comment(command, self.trace_context.clone())
413 }
414
415 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
416 self.delete_scoped(command, self.trace_context.clone())
417 }
418
419 pub fn delete_scoped(
420 &self,
421 command: &DeleteCommand,
422 trace_chain: Vec<teaql_core::TraceNode>,
423 ) -> Result<u64, RepositoryError<E::Error>> {
424 let mut command = command.clone();
425 command.trace_chain = trace_chain.clone();
426 if let Some(behavior) = self.behavior() {
427 behavior
428 .before_delete(self.repository.metadata.context, &mut command)
429 .map_err(RepositoryError::Runtime)?;
430 }
431 self.enforce_delete_policy(&mut command)
432 .map_err(RepositoryError::Runtime)?;
433
434 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
435 let affected = self.repository.delete(&command)?;
436
437 let mut event = EntityEvent::deleted_with_old_values(
438 command.entity,
439 command.id,
440 command.expected_version,
441 old_values,
442 );
443 event.trace_chain = trace_chain;
444 self.emit_event(event)
445 .map_err(RepositoryError::Runtime)?;
446 Ok(affected)
447 }
448
449 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
450 let mut command = command.clone();
451 command.trace_chain = self.trace_context.clone();
452 if let Some(behavior) = self.behavior() {
453 behavior
454 .before_recover(self.repository.metadata.context, &mut command)
455 .map_err(RepositoryError::Runtime)?;
456 }
457 self.enforce_recover_policy(&mut command)
458 .map_err(RepositoryError::Runtime)?;
459 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
460 let affected = self.repository.recover(&command)?;
461 let event = EntityEvent::recovered_with_old_values(
462 command.entity,
463 command.id,
464 command.expected_version,
465 old_values,
466 );
467 self.emit_event(event)
468 .map_err(RepositoryError::Runtime)?;
469 Ok(affected)
470 }
471
472 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
473 self.repository.metadata.context.send_event(event)
474 }
475
476 #[allow(dead_code)]
477 pub(super) fn execute_prepared_insert(
478 &self,
479 command: InsertCommand,
480 ) -> Result<u64, RepositoryError<E::Error>> {
481 self.execute_prepared_insert_with_comment(command, Vec::new())
482 }
483
484 pub(super) fn execute_prepared_insert_with_comment(
485 &self,
486 mut command: InsertCommand,
487 trace_chain: Vec<teaql_core::TraceNode>,
488 ) -> Result<u64, RepositoryError<E::Error>> {
489 command.trace_chain = trace_chain.clone();
490 let affected = self.repository.insert(&command)?;
491 let mut event = EntityEvent::created(command.entity, command.values);
492 event.trace_chain = trace_chain;
493 self.emit_event(event).map_err(RepositoryError::Runtime)?;
494 Ok(affected)
495 }
496
497 #[allow(dead_code)]
498 pub(super) fn execute_prepared_update(
499 &self,
500 command: UpdateCommand,
501 ) -> Result<u64, RepositoryError<E::Error>> {
502 self.execute_prepared_update_with_comment(command, Vec::new())
503 }
504
505 pub(super) fn execute_prepared_update_with_comment(
506 &self,
507 mut command: UpdateCommand,
508 trace_chain: Vec<teaql_core::TraceNode>,
509 ) -> Result<u64, RepositoryError<E::Error>> {
510 command.trace_chain = trace_chain.clone();
511 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
512 let affected = self.repository.update(&command)?;
513 let updated_fields = command.values.keys().cloned().collect();
514 let mut values = command.values.clone();
515 values.insert("id".to_owned(), command.id.clone());
516 if let Some(version) = command.expected_version {
517 values.insert("version".to_owned(), Value::I64(version + 1));
518 }
519 let mut new_values = old_values.clone().unwrap_or_default();
520 for (field, value) in &values {
521 new_values.insert(field.clone(), value.clone());
522 }
523 let mut event = EntityEvent::updated_with_old_values(
524 command.entity,
525 values,
526 old_values,
527 new_values,
528 updated_fields,
529 );
530 event.trace_chain = trace_chain;
531 self.emit_event(event).map_err(RepositoryError::Runtime)?;
532 Ok(affected)
533 }
534
535 fn fetch_current_event_row(
536 &self,
537 entity: &str,
538 id: &Value,
539 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
540 if self.repository.metadata.context.event_sink.is_none() {
541 return Ok(None);
542 }
543 let descriptor = self
544 .repository
545 .metadata
546 .context
547 .require_entity(entity)
548 .map_err(RepositoryError::Runtime)?;
549 let Some(id_property) = descriptor.id_property() else {
550 return Ok(None);
551 };
552 let mut rows = self.fetch_all(
553 &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
554 )?;
555 Ok(rows.pop())
556 }
557
558
559 pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
560 ResolvedRepository {
561 entity,
562 repository: ContextRepository {
563 metadata: UserContextMetadata {
564 context: self.repository.metadata.context,
565 },
566 dialect: self.repository.dialect,
567 executor: self.repository.executor,
568 },
569 trace_context: Vec::new(),
570 }
571 }
572}