Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12    SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22    D: SqlDialect,
23    E: QueryExecutor,
24{
25    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26        self.repository.metadata.context.repository_behavior(entity)
27    }
28
29    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30        self.repository
31            .metadata
32            .context
33            .repository_behavior(&self.entity)
34    }
35
36    pub fn entity(&self) -> &str {
37        &self.entity
38    }
39
40    pub fn select(&self) -> SelectQuery {
41        SelectQuery::new(self.entity.clone())
42    }
43
44    pub fn insert_command(&self) -> InsertCommand {
45        InsertCommand::new(self.entity.clone())
46    }
47
48    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50            policy.enforce_insert(self.repository.metadata.context, command)?;
51        }
52        Ok(())
53    }
54
55    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57            policy.enforce_update(self.repository.metadata.context, command)?;
58        }
59        Ok(())
60    }
61
62    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64            policy.enforce_delete(self.repository.metadata.context, command)?;
65        }
66        Ok(())
67    }
68
69    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71            policy.enforce_recover(self.repository.metadata.context, command)?;
72        }
73        Ok(())
74    }
75
76    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77        let mut query = query.clone();
78        if let Some(behavior) = self.query_behavior(&query.entity) {
79            behavior.before_select(self.repository.metadata.context, &mut query)?;
80        }
81        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
82            policy.enforce_select(self.repository.metadata.context, &mut query)?;
83        }
84        Ok(query)
85    }
86
87    pub fn prepare_insert_command(
88        &self,
89        command: &InsertCommand,
90    ) -> Result<InsertCommand, RuntimeError> {
91        let mut command = command.clone();
92        if let Some(behavior) = self.behavior() {
93            behavior.before_insert(self.repository.metadata.context, &mut command)?;
94        }
95        self.enforce_insert_policy(&mut command)?;
96
97        let entity = self
98            .repository
99            .metadata
100            .context
101            .require_entity(&command.entity)?;
102        if let Some(id_property) = entity.id_property() {
103            let needs_id = !command.values.contains_key(&id_property.name)
104                || is_unassigned_id(command.values.get(&id_property.name));
105            if needs_id {
106                let id = self.repository.metadata.context.next_id(&command.entity)?;
107                command
108                    .values
109                    .insert(id_property.name.clone(), Value::U64(id));
110            }
111        }
112        mark_record_status(&mut command.values, CheckObjectStatus::Create);
113        let check_result = self
114            .repository
115            .metadata
116            .context
117            .check_and_fix_record(&command.entity, &mut command.values);
118        clear_record_status(&mut command.values);
119        check_result?;
120
121        Ok(command)
122    }
123
124    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
125        UpdateCommand::new(self.entity.clone(), id)
126    }
127
128    pub fn prepare_update_command(
129        &self,
130        command: &UpdateCommand,
131    ) -> Result<UpdateCommand, RuntimeError> {
132        let mut command = command.clone();
133        if let Some(behavior) = self.behavior() {
134            behavior.before_update(self.repository.metadata.context, &mut command)?;
135        }
136        self.enforce_update_policy(&mut command)?;
137        mark_record_status(&mut command.values, CheckObjectStatus::Update);
138        let check_result = self
139            .repository
140            .metadata
141            .context
142            .check_and_fix_record(&command.entity, &mut command.values);
143        clear_record_status(&mut command.values);
144        check_result?;
145        Ok(command)
146    }
147
148    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
149        DeleteCommand::new(self.entity.clone(), id)
150    }
151
152    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
153        RecoverCommand::new(self.entity.clone(), id, expected_version)
154    }
155
156    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
157        let query = self.prepare_select_query(query)?;
158        self.repository.compile(&query)
159    }
160
161    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
162        let query = self
163            .prepare_select_query(query)
164            .map_err(RepositoryError::Runtime)?;
165        let mut rows = self.fetch_prepared_query(&query)?;
166        self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
167        self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
168        Ok(rows)
169    }
170
171    fn fetch_prepared_query(
172        &self,
173        query: &SelectQuery,
174    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
175        let compiled = self
176            .repository
177            .compile(query)
178            .map_err(RepositoryError::Runtime)?;
179        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
180            if let Some(cache) = self
181                .repository
182                .metadata
183                .context
184                .get_resource::<Arc<dyn AggregationCacheBackend>>()
185            {
186                return self.fetch_prepared_query_with_cache(
187                    query,
188                    &compiled,
189                    options,
190                    cache.as_ref(),
191                );
192            }
193            if let Some(cache) = self
194                .repository
195                .metadata
196                .context
197                .get_resource::<InMemoryAggregationCache>()
198            {
199                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
200            }
201        }
202        let started_at = SystemTime::now();
203        let started = Instant::now();
204        let rows = self
205            .repository
206            .executor
207            .fetch_all(&compiled)
208            .map_err(RepositoryError::Executor)?;
209        self.repository.log_sql_result(
210            SqlLogOperation::Select,
211            &compiled,
212            started_at,
213            started,
214            Some(rows.len()),
215            Some(query.entity.clone()),
216            None,
217        );
218        Ok(rows)
219    }
220
221    fn fetch_prepared_query_with_cache(
222        &self,
223        query: &SelectQuery,
224        compiled: &CompiledQuery,
225        options: AggregationCacheOptions,
226        cache: &dyn AggregationCacheBackend,
227    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
228        let key = aggregation_cache_key(
229            cache.namespace(),
230            &aggregation_cache_namespace(&query.entity),
231            compiled,
232        );
233        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
234            return Ok(rows);
235        }
236        let started_at = SystemTime::now();
237        let started = Instant::now();
238        let rows = self
239            .repository
240            .executor
241            .fetch_all(compiled)
242            .map_err(RepositoryError::Executor)?;
243        self.repository.log_sql_result(
244            SqlLogOperation::Select,
245            compiled,
246            started_at,
247            started,
248            Some(rows.len()),
249            Some(query.entity.clone()),
250            None,
251        );
252        cache.put(key, rows.clone());
253        Ok(rows)
254    }
255
256    pub fn fetch_all_with_relation_aggregates(
257        &self,
258        query: &SelectQuery,
259        relation_aggregates: &[RelationAggregate],
260    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
261        let mut rows = self.fetch_all(query)?;
262        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
263        Ok(rows)
264    }
265
266    pub fn fetch_smart_list(
267        &self,
268        query: &SelectQuery,
269    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
270        let query = self
271            .prepare_select_query(query)
272            .map_err(RepositoryError::Runtime)?;
273        self.repository.fetch_smart_list(&query)
274    }
275
276    pub fn fetch_smart_list_with_relation_aggregates(
277        &self,
278        query: &SelectQuery,
279        relation_aggregates: &[RelationAggregate],
280    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
281        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
282            .map(SmartList::from)
283    }
284
285    pub fn fetch_entities<T>(
286        &self,
287        query: &SelectQuery,
288    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
289    where
290        T: Entity,
291    {
292        let query = self
293            .prepare_select_query(query)
294            .map_err(RepositoryError::Runtime)?;
295        self.repository.fetch_entities(&query)
296    }
297
298    pub fn fetch_entities_with_relation_aggregates<T>(
299        &self,
300        query: &SelectQuery,
301        relation_aggregates: &[RelationAggregate],
302    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
303    where
304        T: Entity,
305    {
306        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
307            .into_iter()
308            .map(T::from_record)
309            .collect::<Result<Vec<_>, _>>()
310            .map(SmartList::from)
311            .map_err(RepositoryError::Entity)
312    }
313
314    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
315        &self,
316        query: &SelectQuery,
317        relation_aggregates: &[RelationAggregate],
318    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
319    where
320        T: Entity,
321    {
322        let query = self
323            .prepare_select_query(query)
324            .map_err(RepositoryError::Runtime)?;
325
326        let mut rows = self.repository.fetch_all(&query)?;
327        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
328        self.enhance_query_relations(&mut rows, &query)?;
329        self.enhance_relations(&mut rows)?;
330        rows.into_iter()
331            .map(T::from_record)
332            .collect::<Result<Vec<_>, _>>()
333            .map(SmartList::from)
334            .map_err(RepositoryError::Entity)
335    }
336
337    pub fn fetch_enhanced_entities<T>(
338        &self,
339        query: &SelectQuery,
340    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
341    where
342        T: Entity,
343    {
344        let query = self
345            .prepare_select_query(query)
346            .map_err(RepositoryError::Runtime)?;
347
348        let mut rows = self.repository.fetch_all(&query)?;
349        self.enhance_query_relations(&mut rows, &query)?;
350        self.enhance_relations(&mut rows)?;
351        rows.into_iter()
352            .map(T::from_record)
353            .collect::<Result<Vec<_>, _>>()
354            .map(SmartList::from)
355            .map_err(RepositoryError::Entity)
356    }
357
358    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
359        let command = self
360            .prepare_insert_command(command)
361            .map_err(RepositoryError::Runtime)?;
362        self.execute_prepared_insert(command)
363    }
364
365    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
366        let command = self
367            .prepare_update_command(command)
368            .map_err(RepositoryError::Runtime)?;
369        self.execute_prepared_update(command)
370    }
371
372    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
373        let mut command = command.clone();
374        if let Some(behavior) = self.behavior() {
375            behavior
376                .before_delete(self.repository.metadata.context, &mut command)
377                .map_err(RepositoryError::Runtime)?;
378        }
379        self.enforce_delete_policy(&mut command)
380            .map_err(RepositoryError::Runtime)?;
381        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
382        let affected = self.repository.delete(&command)?;
383        self.emit_event(EntityEvent::deleted_with_old_values(
384            command.entity,
385            command.id,
386            command.expected_version,
387            old_values,
388        ))
389        .map_err(RepositoryError::Runtime)?;
390        Ok(affected)
391    }
392
393    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
394        let mut command = command.clone();
395        if let Some(behavior) = self.behavior() {
396            behavior
397                .before_recover(self.repository.metadata.context, &mut command)
398                .map_err(RepositoryError::Runtime)?;
399        }
400        self.enforce_recover_policy(&mut command)
401            .map_err(RepositoryError::Runtime)?;
402        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
403        let affected = self.repository.recover(&command)?;
404        self.emit_event(EntityEvent::recovered_with_old_values(
405            command.entity,
406            command.id,
407            command.expected_version,
408            old_values,
409        ))
410        .map_err(RepositoryError::Runtime)?;
411        Ok(affected)
412    }
413
414    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
415        self.repository.metadata.context.send_event(event)
416    }
417
418    pub(super) fn execute_prepared_insert(
419        &self,
420        command: InsertCommand,
421    ) -> Result<u64, RepositoryError<E::Error>> {
422        let affected = self.repository.insert(&command)?;
423        self.emit_event(EntityEvent::created(command.entity, command.values))
424            .map_err(RepositoryError::Runtime)?;
425        Ok(affected)
426    }
427
428    pub(super) fn execute_prepared_update(
429        &self,
430        command: UpdateCommand,
431    ) -> Result<u64, RepositoryError<E::Error>> {
432        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
433        let affected = self.repository.update(&command)?;
434        let updated_fields = command.values.keys().cloned().collect();
435        let mut values = command.values.clone();
436        values.insert("id".to_owned(), command.id.clone());
437        if let Some(version) = command.expected_version {
438            values.insert("version".to_owned(), Value::I64(version + 1));
439        }
440        let mut new_values = old_values.clone().unwrap_or_default();
441        for (field, value) in &values {
442            new_values.insert(field.clone(), value.clone());
443        }
444        self.emit_event(EntityEvent::updated_with_old_values(
445            command.entity,
446            values,
447            old_values,
448            new_values,
449            updated_fields,
450        ))
451        .map_err(RepositoryError::Runtime)?;
452        Ok(affected)
453    }
454
455    fn fetch_current_event_row(
456        &self,
457        entity: &str,
458        id: &Value,
459    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
460        if self.repository.metadata.context.event_sink.is_none() {
461            return Ok(None);
462        }
463        let descriptor = self
464            .repository
465            .metadata
466            .context
467            .require_entity(entity)
468            .map_err(RepositoryError::Runtime)?;
469        let Some(id_property) = descriptor.id_property() else {
470            return Ok(None);
471        };
472        let mut rows = self.fetch_all(
473            &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
474        )?;
475        Ok(rows.pop())
476    }
477
478    pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
479        ResolvedRepository {
480            entity,
481            repository: ContextRepository {
482                metadata: UserContextMetadata {
483                    context: self.repository.metadata.context,
484                },
485                dialect: self.repository.dialect,
486                executor: self.repository.executor,
487            },
488        }
489    }
490}
491
492fn is_unassigned_id(value: Option<&Value>) -> bool {
493    matches!(
494        value,
495        None | Some(Value::Null) | Some(Value::U64(0)) | Some(Value::I64(0))
496    )
497}