// teaql_runtime/repository/resolved.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12    SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22    D: SqlDialect,
23    E: QueryExecutor,
24{
25    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26        self.repository.metadata.context.repository_behavior(entity)
27    }
28
29    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30        self.repository
31            .metadata
32            .context
33            .repository_behavior(&self.entity)
34    }
35
36    pub fn entity(&self) -> &str {
37        &self.entity
38    }
39
40    pub fn select(&self) -> SelectQuery {
41        SelectQuery::new(self.entity.clone())
42    }
43
44    pub fn insert_command(&self) -> InsertCommand {
45        InsertCommand::new(self.entity.clone())
46    }
47
48    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50            policy.enforce_insert(self.repository.metadata.context, command)?;
51        }
52        Ok(())
53    }
54
55    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57            policy.enforce_update(self.repository.metadata.context, command)?;
58        }
59        Ok(())
60    }
61
62    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64            policy.enforce_delete(self.repository.metadata.context, command)?;
65        }
66        Ok(())
67    }
68
69    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71            policy.enforce_recover(self.repository.metadata.context, command)?;
72        }
73        Ok(())
74    }
75
76    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77        let mut query = query.clone();
78        if let Some(behavior) = self.query_behavior(&query.entity) {
79            behavior.before_select(self.repository.metadata.context, &mut query)?;
80        }
81        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
82            policy.enforce_select(self.repository.metadata.context, &mut query)?;
83        }
84        Ok(query)
85    }
86
87    pub fn prepare_insert_command(
88        &self,
89        command: &InsertCommand,
90    ) -> Result<InsertCommand, RuntimeError> {
91        let mut command = command.clone();
92        if let Some(behavior) = self.behavior() {
93            behavior.before_insert(self.repository.metadata.context, &mut command)?;
94        }
95        self.enforce_insert_policy(&mut command)?;
96
97        let entity = self
98            .repository
99            .metadata
100            .context
101            .require_entity(&command.entity)?;
102        if let Some(id_property) = entity.id_property() {
103            let needs_id = !command.values.contains_key(&id_property.name)
104                || is_unassigned_id(command.values.get(&id_property.name));
105            if needs_id {
106                let id = self.repository.metadata.context.next_id(&command.entity)?;
107                command
108                    .values
109                    .insert(id_property.name.clone(), Value::U64(id));
110            }
111        }
112        ensure_initial_version(&mut command.values, entity);
113        mark_record_status(&mut command.values, CheckObjectStatus::Create);
114        let check_result = self
115            .repository
116            .metadata
117            .context
118            .check_and_fix_record(&command.entity, &mut command.values);
119        clear_record_status(&mut command.values);
120        check_result?;
121
122        Ok(command)
123    }
124
125    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
126        UpdateCommand::new(self.entity.clone(), id)
127    }
128
129    pub fn prepare_update_command(
130        &self,
131        command: &UpdateCommand,
132    ) -> Result<UpdateCommand, RuntimeError> {
133        let mut command = command.clone();
134        if let Some(behavior) = self.behavior() {
135            behavior.before_update(self.repository.metadata.context, &mut command)?;
136        }
137        self.enforce_update_policy(&mut command)?;
138        mark_record_status(&mut command.values, CheckObjectStatus::Update);
139        let check_result = self
140            .repository
141            .metadata
142            .context
143            .check_and_fix_record(&command.entity, &mut command.values);
144        clear_record_status(&mut command.values);
145        check_result?;
146        Ok(command)
147    }
148
149    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
150        DeleteCommand::new(self.entity.clone(), id)
151    }
152
153    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
154        RecoverCommand::new(self.entity.clone(), id, expected_version)
155    }
156
157    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
158        let query = self.prepare_select_query(query)?;
159        self.repository.compile(&query)
160    }
161
162    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
163        let query = self
164            .prepare_select_query(query)
165            .map_err(RepositoryError::Runtime)?;
166        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
167        let mut rows = self.fetch_prepared_query(&query)?;
168        self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
169        self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
170        Ok(rows)
171    }
172
173    fn fetch_prepared_query(
174        &self,
175        query: &SelectQuery,
176    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
177        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
178        let mut compiled = self
179            .repository
180            .compile(query)
181            .map_err(RepositoryError::Runtime)?;
182        compiled.comment = self.repository.resolve_final_comment(query.comment.clone());
183        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
184            if let Some(cache) = self
185                .repository
186                .metadata
187                .context
188                .get_resource::<Arc<dyn AggregationCacheBackend>>()
189            {
190                return self.fetch_prepared_query_with_cache(
191                    query,
192                    &compiled,
193                    options,
194                    cache.as_ref(),
195                );
196            }
197            if let Some(cache) = self
198                .repository
199                .metadata
200                .context
201                .get_resource::<InMemoryAggregationCache>()
202            {
203                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
204            }
205        }
206        let started_at = SystemTime::now();
207        let started = Instant::now();
208        let rows = self
209            .repository
210            .executor
211            .fetch_all(&compiled)
212            .map_err(RepositoryError::Executor)?;
213        self.repository.log_sql_result(
214            SqlLogOperation::Select,
215            &compiled,
216            started_at,
217            started,
218            Some(rows.len()),
219            Some(query.entity.clone()),
220            None,
221            query.comment.clone(),
222        );
223        Ok(rows)
224    }
225
226    fn fetch_prepared_query_with_cache(
227        &self,
228        query: &SelectQuery,
229        compiled: &CompiledQuery,
230        options: AggregationCacheOptions,
231        cache: &dyn AggregationCacheBackend,
232    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
233        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
234        let key = aggregation_cache_key(
235            cache.namespace(),
236            &aggregation_cache_namespace(&query.entity),
237            compiled,
238        );
239        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
240            return Ok(rows);
241        }
242        let started_at = SystemTime::now();
243        let started = Instant::now();
244        let rows = self
245            .repository
246            .executor
247            .fetch_all(compiled)
248            .map_err(RepositoryError::Executor)?;
249        self.repository.log_sql_result(
250            SqlLogOperation::Select,
251            compiled,
252            started_at,
253            started,
254            Some(rows.len()),
255            Some(query.entity.clone()),
256            None,
257            query.comment.clone(),
258        );
259        cache.put(key, rows.clone());
260        Ok(rows)
261    }
262
263    pub fn fetch_all_with_relation_aggregates(
264        &self,
265        query: &SelectQuery,
266        relation_aggregates: &[RelationAggregate],
267    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
268        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
269        let mut rows = self.fetch_all(query)?;
270        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
271        Ok(rows)
272    }
273
274    pub fn fetch_smart_list(
275        &self,
276        query: &SelectQuery,
277    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
278        let query = self
279            .prepare_select_query(query)
280            .map_err(RepositoryError::Runtime)?;
281        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
282        self.repository.fetch_smart_list(&query)
283    }
284
285    pub fn fetch_smart_list_with_relation_aggregates(
286        &self,
287        query: &SelectQuery,
288        relation_aggregates: &[RelationAggregate],
289    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
290        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
291            .map(SmartList::from)
292    }
293
294    pub fn fetch_entities<T>(
295        &self,
296        query: &SelectQuery,
297    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
298    where
299        T: Entity,
300    {
301        let query = self
302            .prepare_select_query(query)
303            .map_err(RepositoryError::Runtime)?;
304        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
305        self.repository.fetch_entities(&query)
306    }
307
308    pub fn fetch_entities_with_relation_aggregates<T>(
309        &self,
310        query: &SelectQuery,
311        relation_aggregates: &[RelationAggregate],
312    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
313    where
314        T: Entity,
315    {
316        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
317            .into_iter()
318            .map(T::from_record)
319            .collect::<Result<Vec<_>, _>>()
320            .map(SmartList::from)
321            .map_err(RepositoryError::Entity)
322    }
323
324    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
325        &self,
326        query: &SelectQuery,
327        relation_aggregates: &[RelationAggregate],
328    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
329    where
330        T: Entity,
331    {
332        let query = self
333            .prepare_select_query(query)
334            .map_err(RepositoryError::Runtime)?;
335
336        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
337
338        let mut rows = self.repository.fetch_all(&query)?;
339        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
340        self.enhance_query_relations(&mut rows, &query)?;
341        self.enhance_relations(&mut rows)?;
342        rows.into_iter()
343            .map(T::from_record)
344            .collect::<Result<Vec<_>, _>>()
345            .map(SmartList::from)
346            .map_err(RepositoryError::Entity)
347    }
348
349    pub fn fetch_enhanced_entities<T>(
350        &self,
351        query: &SelectQuery,
352    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
353    where
354        T: Entity,
355    {
356        let query = self
357            .prepare_select_query(query)
358            .map_err(RepositoryError::Runtime)?;
359        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
360
361        let mut rows = self.repository.fetch_all(&query)?;
362        self.enhance_query_relations(&mut rows, &query)?;
363        self.enhance_relations(&mut rows)?;
364        rows.into_iter()
365            .map(T::from_record)
366            .collect::<Result<Vec<_>, _>>()
367            .map(SmartList::from)
368            .map_err(RepositoryError::Entity)
369    }
370
371    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
372        let command = self
373            .prepare_insert_command(command)
374            .map_err(RepositoryError::Runtime)?;
375        self.execute_prepared_insert(command)
376    }
377
378    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
379        let command = self
380            .prepare_update_command(command)
381            .map_err(RepositoryError::Runtime)?;
382        self.execute_prepared_update(command)
383    }
384
385    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
386        let mut command = command.clone();
387        if let Some(behavior) = self.behavior() {
388            behavior
389                .before_delete(self.repository.metadata.context, &mut command)
390                .map_err(RepositoryError::Runtime)?;
391        }
392        self.enforce_delete_policy(&mut command)
393            .map_err(RepositoryError::Runtime)?;
394        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
395        let affected = self.repository.delete(&command)?;
396        self.emit_event(EntityEvent::deleted_with_old_values(
397            command.entity,
398            command.id,
399            command.expected_version,
400            old_values,
401        ))
402        .map_err(RepositoryError::Runtime)?;
403        Ok(affected)
404    }
405
406    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
407        let mut command = command.clone();
408        if let Some(behavior) = self.behavior() {
409            behavior
410                .before_recover(self.repository.metadata.context, &mut command)
411                .map_err(RepositoryError::Runtime)?;
412        }
413        self.enforce_recover_policy(&mut command)
414            .map_err(RepositoryError::Runtime)?;
415        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
416        let affected = self.repository.recover(&command)?;
417        self.emit_event(EntityEvent::recovered_with_old_values(
418            command.entity,
419            command.id,
420            command.expected_version,
421            old_values,
422        ))
423        .map_err(RepositoryError::Runtime)?;
424        Ok(affected)
425    }
426
427    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
428        self.repository.metadata.context.send_event(event)
429    }
430
431    pub(super) fn execute_prepared_insert(
432        &self,
433        command: InsertCommand,
434    ) -> Result<u64, RepositoryError<E::Error>> {
435        let affected = self.repository.insert(&command)?;
436        self.emit_event(EntityEvent::created(command.entity, command.values))
437            .map_err(RepositoryError::Runtime)?;
438        Ok(affected)
439    }
440
441    pub(super) fn execute_prepared_update(
442        &self,
443        command: UpdateCommand,
444    ) -> Result<u64, RepositoryError<E::Error>> {
445        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
446        let affected = self.repository.update(&command)?;
447        let updated_fields = command.values.keys().cloned().collect();
448        let mut values = command.values.clone();
449        values.insert("id".to_owned(), command.id.clone());
450        if let Some(version) = command.expected_version {
451            values.insert("version".to_owned(), Value::I64(version + 1));
452        }
453        let mut new_values = old_values.clone().unwrap_or_default();
454        for (field, value) in &values {
455            new_values.insert(field.clone(), value.clone());
456        }
457        self.emit_event(EntityEvent::updated_with_old_values(
458            command.entity,
459            values,
460            old_values,
461            new_values,
462            updated_fields,
463        ))
464        .map_err(RepositoryError::Runtime)?;
465        Ok(affected)
466    }
467
468    fn fetch_current_event_row(
469        &self,
470        entity: &str,
471        id: &Value,
472    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
473        if self.repository.metadata.context.event_sink.is_none() {
474            return Ok(None);
475        }
476        let descriptor = self
477            .repository
478            .metadata
479            .context
480            .require_entity(entity)
481            .map_err(RepositoryError::Runtime)?;
482        let Some(id_property) = descriptor.id_property() else {
483            return Ok(None);
484        };
485        let mut rows = self.fetch_all(
486            &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
487        )?;
488        Ok(rows.pop())
489    }
490
491    pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
492        ResolvedRepository {
493            entity,
494            repository: ContextRepository {
495                metadata: UserContextMetadata {
496                    context: self.repository.metadata.context,
497                },
498                dialect: self.repository.dialect,
499                executor: self.repository.executor,
500            },
501        }
502    }
503}