Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12    SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22    D: SqlDialect,
23    E: QueryExecutor,
24{
25    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26        self.repository.metadata.context.repository_behavior(entity)
27    }
28
29    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30        self.repository
31            .metadata
32            .context
33            .repository_behavior(&self.entity)
34    }
35
36    pub fn entity(&self) -> &str {
37        &self.entity
38    }
39
40    pub fn select(&self) -> SelectQuery {
41        SelectQuery::new(self.entity.clone())
42    }
43
44    pub fn insert_command(&self) -> InsertCommand {
45        InsertCommand::new(self.entity.clone())
46    }
47
48    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50            policy.enforce_insert(self.repository.metadata.context, command)?;
51        }
52        Ok(())
53    }
54
55    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57            policy.enforce_update(self.repository.metadata.context, command)?;
58        }
59        Ok(())
60    }
61
62    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64            policy.enforce_delete(self.repository.metadata.context, command)?;
65        }
66        Ok(())
67    }
68
69    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71            policy.enforce_recover(self.repository.metadata.context, command)?;
72        }
73        Ok(())
74    }
75
76    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77        let mut query = query.clone();
78        
79        let mut full_trace = self.trace_context.clone();
80        full_trace.extend(query.trace_chain);
81        query.trace_chain = full_trace;
82
83        if let Some(behavior) = self.query_behavior(&query.entity) {
84            behavior.before_select(self.repository.metadata.context, &mut query)?;
85        }
86        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
87            policy.enforce_select(self.repository.metadata.context, &mut query)?;
88        }
89        Ok(query)
90    }
91
92    pub fn prepare_insert_command(
93        &self,
94        command: &InsertCommand,
95    ) -> Result<InsertCommand, RuntimeError> {
96        let mut command = command.clone();
97        if let Some(behavior) = self.behavior() {
98            behavior.before_insert(self.repository.metadata.context, &mut command)?;
99        }
100        self.enforce_insert_policy(&mut command)?;
101
102        let entity = self
103            .repository
104            .metadata
105            .context
106            .require_entity(&command.entity)?;
107        if let Some(id_property) = entity.id_property() {
108            let needs_id = !command.values.contains_key(&id_property.name)
109                || is_unassigned_id(command.values.get(&id_property.name));
110            if needs_id {
111                let id = self.repository.metadata.context.next_id(&command.entity)?;
112                command
113                    .values
114                    .insert(id_property.name.clone(), Value::U64(id));
115            }
116        }
117        ensure_initial_version(&mut command.values, entity);
118        mark_record_status(&mut command.values, CheckObjectStatus::Create);
119        let check_result = self
120            .repository
121            .metadata
122            .context
123            .check_and_fix_record(&command.entity, &mut command.values);
124        clear_record_status(&mut command.values);
125        check_result?;
126
127        Ok(command)
128    }
129
130    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
131        UpdateCommand::new(self.entity.clone(), id)
132    }
133
134    pub fn prepare_update_command(
135        &self,
136        command: &UpdateCommand,
137    ) -> Result<UpdateCommand, RuntimeError> {
138        let mut command = command.clone();
139        if let Some(behavior) = self.behavior() {
140            behavior.before_update(self.repository.metadata.context, &mut command)?;
141        }
142        self.enforce_update_policy(&mut command)?;
143        mark_record_status(&mut command.values, CheckObjectStatus::Update);
144        let check_result = self
145            .repository
146            .metadata
147            .context
148            .check_and_fix_record(&command.entity, &mut command.values);
149        clear_record_status(&mut command.values);
150        check_result?;
151        Ok(command)
152    }
153
154    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
155        DeleteCommand::new(self.entity.clone(), id)
156    }
157
158    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
159        RecoverCommand::new(self.entity.clone(), id, expected_version)
160    }
161
162    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
163        let query = self.prepare_select_query(query)?;
164        self.repository.compile(&query)
165    }
166
167    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
168        let query = self
169            .prepare_select_query(query)
170            .map_err(RepositoryError::Runtime)?;
171        self.fetch_prepared_all(&query)
172    }
173
174    fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
175        let mut rows = self.fetch_prepared_query(query)?;
176        self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain)?;
177        self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain)?;
178        Ok(rows)
179    }
180
181    fn fetch_prepared_query(
182        &self,
183        query: &SelectQuery,
184    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
185
186        let mut compiled = self
187            .repository
188            .compile(query)
189            .map_err(RepositoryError::Runtime)?;
190        let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
191        compiled.comment = final_comment;
192        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
193            if let Some(cache) = self
194                .repository
195                .metadata
196                .context
197                .get_resource::<Arc<dyn AggregationCacheBackend>>()
198            {
199                return self.fetch_prepared_query_with_cache(
200                    query,
201                    &compiled,
202                    options,
203                    cache.as_ref(),
204                );
205            }
206            if let Some(cache) = self
207                .repository
208                .metadata
209                .context
210                .get_resource::<InMemoryAggregationCache>()
211            {
212                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
213            }
214        }
215        let started_at = SystemTime::now();
216        let started = Instant::now();
217        let rows = self
218            .repository
219            .executor
220            .fetch_all(&compiled)
221            .map_err(RepositoryError::Executor)?;
222        self.repository.log_sql_result(
223            SqlLogOperation::Select,
224            &compiled,
225            started_at,
226            started,
227            Some(rows.len()),
228            Some(query.entity.clone()),
229            None,
230            query.trace_chain.clone(),
231        );
232        Ok(rows)
233    }
234
235    fn fetch_prepared_query_with_cache(
236        &self,
237        query: &SelectQuery,
238        compiled: &CompiledQuery,
239        options: AggregationCacheOptions,
240        cache: &dyn AggregationCacheBackend,
241    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
242
243        let key = aggregation_cache_key(
244            cache.namespace(),
245            &aggregation_cache_namespace(&query.entity),
246            compiled,
247        );
248        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
249            return Ok(rows);
250        }
251        let started_at = SystemTime::now();
252        let started = Instant::now();
253        let rows = self
254            .repository
255            .executor
256            .fetch_all(compiled)
257            .map_err(RepositoryError::Executor)?;
258        self.repository.log_sql_result(
259            SqlLogOperation::Select,
260            compiled,
261            started_at,
262            started,
263            Some(rows.len()),
264            Some(query.entity.clone()),
265            None,
266            query.trace_chain.clone(),
267        );
268        cache.put(key, rows.clone());
269        Ok(rows)
270    }
271
272    pub fn fetch_all_with_relation_aggregates(
273        &self,
274        query: &SelectQuery,
275        relation_aggregates: &[RelationAggregate],
276    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
277        let query = self
278            .prepare_select_query(query)
279            .map_err(RepositoryError::Runtime)?;
280
281        let mut rows = self.fetch_prepared_all(&query)?;
282        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain)?;
283        Ok(rows)
284    }
285
286    pub fn fetch_smart_list(
287        &self,
288        query: &SelectQuery,
289    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
290        let query = self
291            .prepare_select_query(query)
292            .map_err(RepositoryError::Runtime)?;
293
294        self.repository.fetch_smart_list(&query)
295    }
296
297    pub fn fetch_smart_list_with_relation_aggregates(
298        &self,
299        query: &SelectQuery,
300        relation_aggregates: &[RelationAggregate],
301    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
302        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
303            .map(SmartList::from)
304    }
305
306    pub fn fetch_entities<T>(
307        &self,
308        query: &SelectQuery,
309    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
310    where
311        T: Entity,
312    {
313        let query = self
314            .prepare_select_query(query)
315            .map_err(RepositoryError::Runtime)?;
316
317        self.repository.fetch_entities(&query)
318    }
319
320    pub fn fetch_entities_with_relation_aggregates<T>(
321        &self,
322        query: &SelectQuery,
323        relation_aggregates: &[RelationAggregate],
324    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
325    where
326        T: Entity,
327    {
328        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
329            .into_iter()
330            .map(|record| {
331                let mut entity = T::from_record(record)?;
332                let root = crate::EntityRoot::default();
333                entity.on_loaded(&root as &dyn std::any::Any);
334                Ok(entity)
335            })
336            .collect::<Result<Vec<_>, _>>()
337            .map(SmartList::from)
338            .map_err(RepositoryError::Entity)
339    }
340
341    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
342        &self,
343        query: &SelectQuery,
344        relation_aggregates: &[RelationAggregate],
345    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
346    where
347        T: Entity,
348    {
349        let query = self
350            .prepare_select_query(query)
351            .map_err(RepositoryError::Runtime)?;
352
353
354
355        let mut rows = self.fetch_prepared_all(&query)?;
356        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain)?;
357        self.enhance_query_relations(&mut rows, &query)?;
358        self.enhance_relations(&mut rows)?;
359        rows.into_iter()
360            .map(|record| {
361                let mut entity = T::from_record(record)?;
362                let root = crate::EntityRoot::default();
363                entity.on_loaded(&root as &dyn std::any::Any);
364                Ok(entity)
365            })
366            .collect::<Result<Vec<_>, _>>()
367            .map(SmartList::from)
368            .map_err(RepositoryError::Entity)
369    }
370
371    pub fn fetch_enhanced_entities<T>(
372        &self,
373        query: &SelectQuery,
374    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
375    where
376        T: Entity,
377    {
378        let query = self
379            .prepare_select_query(query)
380            .map_err(RepositoryError::Runtime)?;
381
382
383        let mut rows = self.fetch_prepared_all(&query)?;
384        self.enhance_query_relations(&mut rows, &query)?;
385        self.enhance_relations(&mut rows)?;
386        let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
387
388        rows.into_iter()
389            .map(|record| {
390                let mut entity = T::from_record(record)?;
391                if let Some(ref root) = root {
392                    entity.on_loaded(root as &dyn std::any::Any);
393                }
394                Ok(entity)
395            })
396            .collect::<Result<Vec<_>, _>>()
397            .map(SmartList::from)
398            .map_err(RepositoryError::Entity)
399    }
400
401    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
402        let command = self
403            .prepare_insert_command(command)
404            .map_err(RepositoryError::Runtime)?;
405        self.execute_prepared_insert_with_comment(command, self.trace_context.clone())
406    }
407
408    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
409        let command = self
410            .prepare_update_command(command)
411            .map_err(RepositoryError::Runtime)?;
412        self.execute_prepared_update_with_comment(command, self.trace_context.clone())
413    }
414
415    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
416        self.delete_scoped(command, self.trace_context.clone())
417    }
418
419    pub fn delete_scoped(
420        &self,
421        command: &DeleteCommand,
422        trace_chain: Vec<teaql_core::TraceNode>,
423    ) -> Result<u64, RepositoryError<E::Error>> {
424        let mut command = command.clone();
425        command.trace_chain = trace_chain.clone();
426        if let Some(behavior) = self.behavior() {
427            behavior
428                .before_delete(self.repository.metadata.context, &mut command)
429                .map_err(RepositoryError::Runtime)?;
430        }
431        self.enforce_delete_policy(&mut command)
432            .map_err(RepositoryError::Runtime)?;
433
434        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
435        let affected = self.repository.delete(&command)?;
436
437        let mut event = EntityEvent::deleted_with_old_values(
438            command.entity,
439            command.id,
440            command.expected_version,
441            old_values,
442        );
443        event.trace_chain = trace_chain;
444        self.emit_event(event)
445            .map_err(RepositoryError::Runtime)?;
446        Ok(affected)
447    }
448
449    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
450        let mut command = command.clone();
451        command.trace_chain = self.trace_context.clone();
452        if let Some(behavior) = self.behavior() {
453            behavior
454                .before_recover(self.repository.metadata.context, &mut command)
455                .map_err(RepositoryError::Runtime)?;
456        }
457        self.enforce_recover_policy(&mut command)
458            .map_err(RepositoryError::Runtime)?;
459        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
460        let affected = self.repository.recover(&command)?;
461        let mut event = EntityEvent::recovered_with_old_values(
462            command.entity,
463            command.id,
464            command.expected_version,
465            old_values,
466        );
467        self.emit_event(event)
468            .map_err(RepositoryError::Runtime)?;
469        Ok(affected)
470    }
471
472    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
473        self.repository.metadata.context.send_event(event)
474    }
475
476    pub(super) fn execute_prepared_insert(
477        &self,
478        command: InsertCommand,
479    ) -> Result<u64, RepositoryError<E::Error>> {
480        self.execute_prepared_insert_with_comment(command, Vec::new())
481    }
482
483    pub(super) fn execute_prepared_insert_with_comment(
484        &self,
485        mut command: InsertCommand,
486        trace_chain: Vec<teaql_core::TraceNode>,
487    ) -> Result<u64, RepositoryError<E::Error>> {
488        command.trace_chain = trace_chain.clone();
489        let affected = self.repository.insert(&command)?;
490        let mut event = EntityEvent::created(command.entity, command.values);
491        event.trace_chain = trace_chain;
492        self.emit_event(event).map_err(RepositoryError::Runtime)?;
493        Ok(affected)
494    }
495
496    pub(super) fn execute_prepared_update(
497        &self,
498        command: UpdateCommand,
499    ) -> Result<u64, RepositoryError<E::Error>> {
500        self.execute_prepared_update_with_comment(command, Vec::new())
501    }
502
503    pub(super) fn execute_prepared_update_with_comment(
504        &self,
505        mut command: UpdateCommand,
506        trace_chain: Vec<teaql_core::TraceNode>,
507    ) -> Result<u64, RepositoryError<E::Error>> {
508        command.trace_chain = trace_chain.clone();
509        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
510        let affected = self.repository.update(&command)?;
511        let updated_fields = command.values.keys().cloned().collect();
512        let mut values = command.values.clone();
513        values.insert("id".to_owned(), command.id.clone());
514        if let Some(version) = command.expected_version {
515            values.insert("version".to_owned(), Value::I64(version + 1));
516        }
517        let mut new_values = old_values.clone().unwrap_or_default();
518        for (field, value) in &values {
519            new_values.insert(field.clone(), value.clone());
520        }
521        let mut event = EntityEvent::updated_with_old_values(
522            command.entity,
523            values,
524            old_values,
525            new_values,
526            updated_fields,
527        );
528        event.trace_chain = trace_chain;
529        self.emit_event(event).map_err(RepositoryError::Runtime)?;
530        Ok(affected)
531    }
532
533    fn fetch_current_event_row(
534        &self,
535        entity: &str,
536        id: &Value,
537    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
538        if self.repository.metadata.context.event_sink.is_none() {
539            return Ok(None);
540        }
541        let descriptor = self
542            .repository
543            .metadata
544            .context
545            .require_entity(entity)
546            .map_err(RepositoryError::Runtime)?;
547        let Some(id_property) = descriptor.id_property() else {
548            return Ok(None);
549        };
550        let mut rows = self.fetch_all(
551            &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
552        )?;
553        Ok(rows.pop())
554    }
555
556
557    pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
558        ResolvedRepository {
559            entity,
560            repository: ContextRepository {
561                metadata: UserContextMetadata {
562                    context: self.repository.metadata.context,
563                },
564                dialect: self.repository.dialect,
565                executor: self.repository.executor,
566            },
567            trace_context: Vec::new(),
568        }
569    }
570}