1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5 AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11 CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12 SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17 ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22 D: SqlDialect,
23 E: QueryExecutor,
24{
25 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26 self.repository.metadata.context.repository_behavior(entity)
27 }
28
29 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30 self.repository
31 .metadata
32 .context
33 .repository_behavior(&self.entity)
34 }
35
36 pub fn entity(&self) -> &str {
37 &self.entity
38 }
39
40 pub fn select(&self) -> SelectQuery {
41 SelectQuery::new(self.entity.clone())
42 }
43
44 pub fn insert_command(&self) -> InsertCommand {
45 InsertCommand::new(self.entity.clone())
46 }
47
48 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50 policy.enforce_insert(self.repository.metadata.context, command)?;
51 }
52 Ok(())
53 }
54
55 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57 policy.enforce_update(self.repository.metadata.context, command)?;
58 }
59 Ok(())
60 }
61
62 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64 policy.enforce_delete(self.repository.metadata.context, command)?;
65 }
66 Ok(())
67 }
68
69 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71 policy.enforce_recover(self.repository.metadata.context, command)?;
72 }
73 Ok(())
74 }
75
76 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77 let mut query = query.clone();
78
79 let mut full_trace = self.trace_context.clone();
80 full_trace.extend(query.trace_chain);
81 query.trace_chain = full_trace;
82
83 if let Some(behavior) = self.query_behavior(&query.entity) {
84 behavior.before_select(self.repository.metadata.context, &mut query)?;
85 }
86 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
87 policy.enforce_select(self.repository.metadata.context, &mut query)?;
88 }
89 Ok(query)
90 }
91
92 pub fn prepare_insert_command(
93 &self,
94 command: &InsertCommand,
95 ) -> Result<InsertCommand, RuntimeError> {
96 let mut command = command.clone();
97 if let Some(behavior) = self.behavior() {
98 behavior.before_insert(self.repository.metadata.context, &mut command)?;
99 }
100 self.enforce_insert_policy(&mut command)?;
101
102 let entity = self
103 .repository
104 .metadata
105 .context
106 .require_entity(&command.entity)?;
107 if let Some(id_property) = entity.id_property() {
108 let needs_id = !command.values.contains_key(&id_property.name)
109 || is_unassigned_id(command.values.get(&id_property.name));
110 if needs_id {
111 let id = self.repository.metadata.context.next_id(&command.entity)?;
112 command
113 .values
114 .insert(id_property.name.clone(), Value::U64(id));
115 }
116 }
117 ensure_initial_version(&mut command.values, entity);
118 mark_record_status(&mut command.values, CheckObjectStatus::Create);
119 let check_result = self
120 .repository
121 .metadata
122 .context
123 .check_and_fix_record(&command.entity, &mut command.values);
124 clear_record_status(&mut command.values);
125 check_result?;
126
127 Ok(command)
128 }
129
130 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
131 UpdateCommand::new(self.entity.clone(), id)
132 }
133
134 pub fn prepare_update_command(
135 &self,
136 command: &UpdateCommand,
137 ) -> Result<UpdateCommand, RuntimeError> {
138 let mut command = command.clone();
139 if let Some(behavior) = self.behavior() {
140 behavior.before_update(self.repository.metadata.context, &mut command)?;
141 }
142 self.enforce_update_policy(&mut command)?;
143
144 Ok(command)
145 }
146
147 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
148 DeleteCommand::new(self.entity.clone(), id)
149 }
150
151 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
152 RecoverCommand::new(self.entity.clone(), id, expected_version)
153 }
154
155 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
156 let query = self.prepare_select_query(query)?;
157 self.repository.compile(&query)
158 }
159
160 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
161 let query = self
162 .prepare_select_query(query)
163 .map_err(RepositoryError::Runtime)?;
164 self.fetch_prepared_all(&query)
165 }
166
167 fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
168 let mut rows = self.fetch_prepared_query(query)?;
169 self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain)?;
170 self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain)?;
171 Ok(rows)
172 }
173
174 fn fetch_prepared_query(
175 &self,
176 query: &SelectQuery,
177 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
178
179 let mut compiled = self
180 .repository
181 .compile(query)
182 .map_err(RepositoryError::Runtime)?;
183 let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
184 compiled.comment = final_comment;
185 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
186 if let Some(cache) = self
187 .repository
188 .metadata
189 .context
190 .get_resource::<Arc<dyn AggregationCacheBackend>>()
191 {
192 return self.fetch_prepared_query_with_cache(
193 query,
194 &compiled,
195 options,
196 cache.as_ref(),
197 );
198 }
199 if let Some(cache) = self
200 .repository
201 .metadata
202 .context
203 .get_resource::<InMemoryAggregationCache>()
204 {
205 return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
206 }
207 }
208 let started_at = SystemTime::now();
209 let started = Instant::now();
210 let rows = self
211 .repository
212 .executor
213 .fetch_all(&compiled)
214 .map_err(RepositoryError::Executor)?;
215 self.repository.log_sql_result(
216 SqlLogOperation::Select,
217 &compiled,
218 started_at,
219 started,
220 Some(rows.len()),
221 Some(query.entity.clone()),
222 None,
223 query.trace_chain.clone(),
224 );
225 Ok(rows)
226 }
227
228 fn fetch_prepared_query_with_cache(
229 &self,
230 query: &SelectQuery,
231 compiled: &CompiledQuery,
232 options: AggregationCacheOptions,
233 cache: &dyn AggregationCacheBackend,
234 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
235
236 let key = aggregation_cache_key(
237 cache.namespace(),
238 &aggregation_cache_namespace(&query.entity),
239 compiled,
240 );
241 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
242 return Ok(rows);
243 }
244 let started_at = SystemTime::now();
245 let started = Instant::now();
246 let rows = self
247 .repository
248 .executor
249 .fetch_all(compiled)
250 .map_err(RepositoryError::Executor)?;
251 self.repository.log_sql_result(
252 SqlLogOperation::Select,
253 compiled,
254 started_at,
255 started,
256 Some(rows.len()),
257 Some(query.entity.clone()),
258 None,
259 query.trace_chain.clone(),
260 );
261 cache.put(key, rows.clone());
262 Ok(rows)
263 }
264
265 pub fn fetch_all_with_relation_aggregates(
266 &self,
267 query: &SelectQuery,
268 relation_aggregates: &[RelationAggregate],
269 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
270 let query = self
271 .prepare_select_query(query)
272 .map_err(RepositoryError::Runtime)?;
273
274 let mut rows = self.fetch_prepared_all(&query)?;
275 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain)?;
276 Ok(rows)
277 }
278
279 pub fn fetch_smart_list(
280 &self,
281 query: &SelectQuery,
282 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
283 let query = self
284 .prepare_select_query(query)
285 .map_err(RepositoryError::Runtime)?;
286
287 self.repository.fetch_smart_list(&query)
288 }
289
290 pub fn fetch_smart_list_with_relation_aggregates(
291 &self,
292 query: &SelectQuery,
293 relation_aggregates: &[RelationAggregate],
294 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
295 self.fetch_all_with_relation_aggregates(query, relation_aggregates)
296 .map(SmartList::from)
297 }
298
299 pub fn fetch_entities<T>(
300 &self,
301 query: &SelectQuery,
302 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
303 where
304 T: Entity,
305 {
306 let query = self
307 .prepare_select_query(query)
308 .map_err(RepositoryError::Runtime)?;
309
310 self.repository.fetch_entities(&query)
311 }
312
313 pub fn fetch_entities_with_relation_aggregates<T>(
314 &self,
315 query: &SelectQuery,
316 relation_aggregates: &[RelationAggregate],
317 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
318 where
319 T: Entity,
320 {
321 self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
322 .into_iter()
323 .map(|record| {
324 let mut entity = T::from_record(record)?;
325 let root = crate::EntityRoot::default();
326 entity.on_loaded(&root as &dyn std::any::Any);
327 Ok(entity)
328 })
329 .collect::<Result<Vec<_>, _>>()
330 .map(SmartList::from)
331 .map_err(RepositoryError::Entity)
332 }
333
334 pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
335 &self,
336 query: &SelectQuery,
337 relation_aggregates: &[RelationAggregate],
338 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
339 where
340 T: Entity,
341 {
342 let query = self
343 .prepare_select_query(query)
344 .map_err(RepositoryError::Runtime)?;
345
346
347
348 let mut rows = self.fetch_prepared_all(&query)?;
349 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain)?;
350 self.enhance_query_relations(&mut rows, &query)?;
351 self.enhance_relations(&mut rows)?;
352 rows.into_iter()
353 .map(|record| {
354 let mut entity = T::from_record(record)?;
355 let root = crate::EntityRoot::default();
356 entity.on_loaded(&root as &dyn std::any::Any);
357 Ok(entity)
358 })
359 .collect::<Result<Vec<_>, _>>()
360 .map(SmartList::from)
361 .map_err(RepositoryError::Entity)
362 }
363
364 pub fn fetch_enhanced_entities<T>(
365 &self,
366 query: &SelectQuery,
367 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
368 where
369 T: Entity,
370 {
371 let query = self
372 .prepare_select_query(query)
373 .map_err(RepositoryError::Runtime)?;
374
375
376 let mut rows = self.fetch_prepared_all(&query)?;
377 self.enhance_query_relations(&mut rows, &query)?;
378 self.enhance_relations(&mut rows)?;
379 let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
380
381 rows.into_iter()
382 .map(|record| {
383 let mut entity = T::from_record(record)?;
384 if let Some(ref root) = root {
385 entity.on_loaded(root as &dyn std::any::Any);
386 }
387 Ok(entity)
388 })
389 .collect::<Result<Vec<_>, _>>()
390 .map(SmartList::from)
391 .map_err(RepositoryError::Entity)
392 }
393
394 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
395 let command = self
396 .prepare_insert_command(command)
397 .map_err(RepositoryError::Runtime)?;
398 self.execute_prepared_insert_with_comment(command, self.trace_context.clone())
399 }
400
401 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
402 let command = self
403 .prepare_update_command(command)
404 .map_err(RepositoryError::Runtime)?;
405 self.execute_prepared_update_with_comment(command, self.trace_context.clone())
406 }
407
408 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
409 self.delete_scoped(command, self.trace_context.clone())
410 }
411
412 pub fn delete_scoped(
413 &self,
414 command: &DeleteCommand,
415 trace_chain: Vec<teaql_core::TraceNode>,
416 ) -> Result<u64, RepositoryError<E::Error>> {
417 let mut command = command.clone();
418 command.trace_chain = trace_chain.clone();
419 if let Some(behavior) = self.behavior() {
420 behavior
421 .before_delete(self.repository.metadata.context, &mut command)
422 .map_err(RepositoryError::Runtime)?;
423 }
424 self.enforce_delete_policy(&mut command)
425 .map_err(RepositoryError::Runtime)?;
426
427 let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
428 let affected = self.repository.delete(&command)?;
429
430 let mut event = EntityEvent::deleted_with_old_values(
431 command.entity,
432 command.id,
433 command.expected_version,
434 old_values,
435 );
436 event.trace_chain = trace_chain;
437 self.emit_event(event)
438 .map_err(RepositoryError::Runtime)?;
439 Ok(affected)
440 }
441
442 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
443 let mut command = command.clone();
444 command.trace_chain = self.trace_context.clone();
445 if let Some(behavior) = self.behavior() {
446 behavior
447 .before_recover(self.repository.metadata.context, &mut command)
448 .map_err(RepositoryError::Runtime)?;
449 }
450 self.enforce_recover_policy(&mut command)
451 .map_err(RepositoryError::Runtime)?;
452 let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
453 let affected = self.repository.recover(&command)?;
454 let event = EntityEvent::recovered_with_old_values(
455 command.entity,
456 command.id,
457 command.expected_version,
458 old_values,
459 );
460 self.emit_event(event)
461 .map_err(RepositoryError::Runtime)?;
462 Ok(affected)
463 }
464
465 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
466 self.repository.metadata.context.send_event(event)
467 }
468
469 #[allow(dead_code)]
470 pub(super) fn execute_prepared_insert(
471 &self,
472 command: InsertCommand,
473 ) -> Result<u64, RepositoryError<E::Error>> {
474 self.execute_prepared_insert_with_comment(command, Vec::new())
475 }
476
477 pub(super) fn execute_prepared_insert_with_comment(
478 &self,
479 mut command: InsertCommand,
480 trace_chain: Vec<teaql_core::TraceNode>,
481 ) -> Result<u64, RepositoryError<E::Error>> {
482 command.trace_chain = trace_chain.clone();
483 let affected = self.repository.insert(&command)?;
484 let mut event = EntityEvent::created(command.entity, command.values);
485 event.trace_chain = trace_chain;
486 self.emit_event(event).map_err(RepositoryError::Runtime)?;
487 Ok(affected)
488 }
489
490 pub(super) fn execute_prepared_batch_insert(
491 &self,
492 command: teaql_core::BatchInsertCommand,
493 ) -> Result<u64, RepositoryError<E::Error>> {
494 if command.batch_values.is_empty() {
495 return Ok(0);
496 }
497 let affected = self.repository.batch_insert(&command)?;
498
499 let entity = command.entity.clone();
500 for (i, values) in command.batch_values.into_iter().enumerate() {
501 let mut event = EntityEvent::created(entity.clone(), values);
502 if i < command.trace_chains.len() {
503 event.trace_chain = command.trace_chains[i].clone();
504 }
505 self.emit_event(event).map_err(RepositoryError::Runtime)?;
506 }
507 Ok(affected)
508 }
509
510 #[allow(dead_code)]
511 pub(super) fn execute_prepared_update(
512 &self,
513 command: UpdateCommand,
514 ) -> Result<u64, RepositoryError<E::Error>> {
515 self.execute_prepared_update_with_comment(command, Vec::new())
516 }
517
518 pub(super) fn execute_prepared_update_with_comment(
519 &self,
520 mut command: UpdateCommand,
521 trace_chain: Vec<teaql_core::TraceNode>,
522 ) -> Result<u64, RepositoryError<E::Error>> {
523 command.trace_chain = trace_chain.clone();
524
525 let mut old_values = command.old_values.clone();
526 let needs_fetch = match &old_values {
527 Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
528 None => true,
529 };
530 if needs_fetch {
531 old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
532 }
533
534 let affected = self.repository.update(&command)?;
535 let updated_fields = command.values.keys().cloned().collect();
536 let mut values = command.values.clone();
537 values.insert("id".to_owned(), command.id.clone());
538 if let Some(version) = command.expected_version {
539 values.insert("version".to_owned(), Value::I64(version + 1));
540 }
541 let mut new_values = old_values.clone().unwrap_or_default();
542 for (field, value) in &values {
543 new_values.insert(field.clone(), value.clone());
544 }
545 let mut event = EntityEvent::updated_with_old_values(
546 command.entity,
547 values,
548 old_values,
549 new_values,
550 updated_fields,
551 );
552 event.trace_chain = trace_chain;
553 self.emit_event(event).map_err(RepositoryError::Runtime)?;
554 Ok(affected)
555 }
556
557 pub(super) fn execute_prepared_batch_update(
558 &self,
559 command: teaql_core::BatchUpdateCommand,
560 ) -> Result<u64, RepositoryError<E::Error>> {
561 if command.batch_values.is_empty() {
562 return Ok(0);
563 }
564 let affected = self.repository.batch_update(&command)?;
565
566 let entity = command.entity.clone();
567 for (i, values) in command.batch_values.into_iter().enumerate() {
568 let mut full_values = values.clone();
569 full_values.insert("id".to_owned(), command.batch_ids[i].clone());
570 if let Some(Some(version)) = command.batch_expected_versions.get(i) {
571 full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
572 }
573
574 let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
575 let mut new_values = old_values.clone().unwrap_or_default();
576 for (field, value) in &full_values {
577 new_values.insert(field.clone(), value.clone());
578 }
579
580 let mut event = EntityEvent::updated_with_old_values(
581 entity.clone(),
582 full_values,
583 old_values,
584 new_values,
585 command.update_fields.clone(),
586 );
587 if i < command.trace_chains.len() {
588 event.trace_chain = command.trace_chains[i].clone();
589 }
590 self.emit_event(event).map_err(RepositoryError::Runtime)?;
591 }
592 Ok(affected)
593 }
594
595 fn fetch_current_event_row(
596 &self,
597 _entity: &str,
598 _id: &Value,
599 _trace_chain: Vec<teaql_core::TraceNode>,
600 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
601 Ok(None)
604 }
605
606
607 pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
608 ResolvedRepository {
609 entity,
610 repository: ContextRepository {
611 metadata: UserContextMetadata {
612 context: self.repository.metadata.context,
613 },
614 dialect: self.repository.dialect,
615 executor: self.repository.executor,
616 },
617 trace_context: Vec::new(),
618 }
619 }
620}