Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12    SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22    D: SqlDialect,
23    E: QueryExecutor,
24{
25    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26        self.repository.metadata.context.repository_behavior(entity)
27    }
28
29    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30        self.repository
31            .metadata
32            .context
33            .repository_behavior(&self.entity)
34    }
35
36    pub fn entity(&self) -> &str {
37        &self.entity
38    }
39
40    pub fn select(&self) -> SelectQuery {
41        SelectQuery::new(self.entity.clone())
42    }
43
44    pub fn insert_command(&self) -> InsertCommand {
45        InsertCommand::new(self.entity.clone())
46    }
47
48    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50            policy.enforce_insert(self.repository.metadata.context, command)?;
51        }
52        Ok(())
53    }
54
55    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57            policy.enforce_update(self.repository.metadata.context, command)?;
58        }
59        Ok(())
60    }
61
62    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64            policy.enforce_delete(self.repository.metadata.context, command)?;
65        }
66        Ok(())
67    }
68
69    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71            policy.enforce_recover(self.repository.metadata.context, command)?;
72        }
73        Ok(())
74    }
75
76    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77        let mut query = query.clone();
78        if let Some(behavior) = self.query_behavior(&query.entity) {
79            behavior.before_select(self.repository.metadata.context, &mut query)?;
80        }
81        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
82            policy.enforce_select(self.repository.metadata.context, &mut query)?;
83        }
84        Ok(query)
85    }
86
87    pub fn prepare_insert_command(
88        &self,
89        command: &InsertCommand,
90    ) -> Result<InsertCommand, RuntimeError> {
91        let mut command = command.clone();
92        if let Some(behavior) = self.behavior() {
93            behavior.before_insert(self.repository.metadata.context, &mut command)?;
94        }
95        self.enforce_insert_policy(&mut command)?;
96
97        let entity = self
98            .repository
99            .metadata
100            .context
101            .require_entity(&command.entity)?;
102        if let Some(id_property) = entity.id_property() {
103            let needs_id = !command.values.contains_key(&id_property.name)
104                || is_unassigned_id(command.values.get(&id_property.name));
105            if needs_id {
106                let id = self.repository.metadata.context.next_id(&command.entity)?;
107                command
108                    .values
109                    .insert(id_property.name.clone(), Value::U64(id));
110            }
111        }
112        ensure_initial_version(&mut command.values, entity);
113        mark_record_status(&mut command.values, CheckObjectStatus::Create);
114        let check_result = self
115            .repository
116            .metadata
117            .context
118            .check_and_fix_record(&command.entity, &mut command.values);
119        clear_record_status(&mut command.values);
120        check_result?;
121
122        Ok(command)
123    }
124
125    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
126        UpdateCommand::new(self.entity.clone(), id)
127    }
128
129    pub fn prepare_update_command(
130        &self,
131        command: &UpdateCommand,
132    ) -> Result<UpdateCommand, RuntimeError> {
133        let mut command = command.clone();
134        if let Some(behavior) = self.behavior() {
135            behavior.before_update(self.repository.metadata.context, &mut command)?;
136        }
137        self.enforce_update_policy(&mut command)?;
138        mark_record_status(&mut command.values, CheckObjectStatus::Update);
139        let check_result = self
140            .repository
141            .metadata
142            .context
143            .check_and_fix_record(&command.entity, &mut command.values);
144        clear_record_status(&mut command.values);
145        check_result?;
146        Ok(command)
147    }
148
149    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
150        DeleteCommand::new(self.entity.clone(), id)
151    }
152
153    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
154        RecoverCommand::new(self.entity.clone(), id, expected_version)
155    }
156
157    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
158        let query = self.prepare_select_query(query)?;
159        self.repository.compile(&query)
160    }
161
162    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
163        let query = self
164            .prepare_select_query(query)
165            .map_err(RepositoryError::Runtime)?;
166        let mut rows = self.fetch_prepared_query(&query)?;
167        self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
168        self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
169        Ok(rows)
170    }
171
172    fn fetch_prepared_query(
173        &self,
174        query: &SelectQuery,
175    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
176        let compiled = self
177            .repository
178            .compile(query)
179            .map_err(RepositoryError::Runtime)?;
180        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
181            if let Some(cache) = self
182                .repository
183                .metadata
184                .context
185                .get_resource::<Arc<dyn AggregationCacheBackend>>()
186            {
187                return self.fetch_prepared_query_with_cache(
188                    query,
189                    &compiled,
190                    options,
191                    cache.as_ref(),
192                );
193            }
194            if let Some(cache) = self
195                .repository
196                .metadata
197                .context
198                .get_resource::<InMemoryAggregationCache>()
199            {
200                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
201            }
202        }
203        let started_at = SystemTime::now();
204        let started = Instant::now();
205        let rows = self
206            .repository
207            .executor
208            .fetch_all(&compiled)
209            .map_err(RepositoryError::Executor)?;
210        self.repository.log_sql_result(
211            SqlLogOperation::Select,
212            &compiled,
213            started_at,
214            started,
215            Some(rows.len()),
216            Some(query.entity.clone()),
217            None,
218        );
219        Ok(rows)
220    }
221
222    fn fetch_prepared_query_with_cache(
223        &self,
224        query: &SelectQuery,
225        compiled: &CompiledQuery,
226        options: AggregationCacheOptions,
227        cache: &dyn AggregationCacheBackend,
228    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
229        let key = aggregation_cache_key(
230            cache.namespace(),
231            &aggregation_cache_namespace(&query.entity),
232            compiled,
233        );
234        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
235            return Ok(rows);
236        }
237        let started_at = SystemTime::now();
238        let started = Instant::now();
239        let rows = self
240            .repository
241            .executor
242            .fetch_all(compiled)
243            .map_err(RepositoryError::Executor)?;
244        self.repository.log_sql_result(
245            SqlLogOperation::Select,
246            compiled,
247            started_at,
248            started,
249            Some(rows.len()),
250            Some(query.entity.clone()),
251            None,
252        );
253        cache.put(key, rows.clone());
254        Ok(rows)
255    }
256
257    pub fn fetch_all_with_relation_aggregates(
258        &self,
259        query: &SelectQuery,
260        relation_aggregates: &[RelationAggregate],
261    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
262        let mut rows = self.fetch_all(query)?;
263        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
264        Ok(rows)
265    }
266
267    pub fn fetch_smart_list(
268        &self,
269        query: &SelectQuery,
270    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
271        let query = self
272            .prepare_select_query(query)
273            .map_err(RepositoryError::Runtime)?;
274        self.repository.fetch_smart_list(&query)
275    }
276
277    pub fn fetch_smart_list_with_relation_aggregates(
278        &self,
279        query: &SelectQuery,
280        relation_aggregates: &[RelationAggregate],
281    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
282        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
283            .map(SmartList::from)
284    }
285
286    pub fn fetch_entities<T>(
287        &self,
288        query: &SelectQuery,
289    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
290    where
291        T: Entity,
292    {
293        let query = self
294            .prepare_select_query(query)
295            .map_err(RepositoryError::Runtime)?;
296        self.repository.fetch_entities(&query)
297    }
298
299    pub fn fetch_entities_with_relation_aggregates<T>(
300        &self,
301        query: &SelectQuery,
302        relation_aggregates: &[RelationAggregate],
303    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
304    where
305        T: Entity,
306    {
307        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
308            .into_iter()
309            .map(T::from_record)
310            .collect::<Result<Vec<_>, _>>()
311            .map(SmartList::from)
312            .map_err(RepositoryError::Entity)
313    }
314
315    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
316        &self,
317        query: &SelectQuery,
318        relation_aggregates: &[RelationAggregate],
319    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
320    where
321        T: Entity,
322    {
323        let query = self
324            .prepare_select_query(query)
325            .map_err(RepositoryError::Runtime)?;
326
327        let mut rows = self.repository.fetch_all(&query)?;
328        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
329        self.enhance_query_relations(&mut rows, &query)?;
330        self.enhance_relations(&mut rows)?;
331        rows.into_iter()
332            .map(T::from_record)
333            .collect::<Result<Vec<_>, _>>()
334            .map(SmartList::from)
335            .map_err(RepositoryError::Entity)
336    }
337
338    pub fn fetch_enhanced_entities<T>(
339        &self,
340        query: &SelectQuery,
341    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
342    where
343        T: Entity,
344    {
345        let query = self
346            .prepare_select_query(query)
347            .map_err(RepositoryError::Runtime)?;
348
349        let mut rows = self.repository.fetch_all(&query)?;
350        self.enhance_query_relations(&mut rows, &query)?;
351        self.enhance_relations(&mut rows)?;
352        rows.into_iter()
353            .map(T::from_record)
354            .collect::<Result<Vec<_>, _>>()
355            .map(SmartList::from)
356            .map_err(RepositoryError::Entity)
357    }
358
359    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
360        let command = self
361            .prepare_insert_command(command)
362            .map_err(RepositoryError::Runtime)?;
363        self.execute_prepared_insert(command)
364    }
365
366    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
367        let command = self
368            .prepare_update_command(command)
369            .map_err(RepositoryError::Runtime)?;
370        self.execute_prepared_update(command)
371    }
372
373    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
374        let mut command = command.clone();
375        if let Some(behavior) = self.behavior() {
376            behavior
377                .before_delete(self.repository.metadata.context, &mut command)
378                .map_err(RepositoryError::Runtime)?;
379        }
380        self.enforce_delete_policy(&mut command)
381            .map_err(RepositoryError::Runtime)?;
382        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
383        let affected = self.repository.delete(&command)?;
384        self.emit_event(EntityEvent::deleted_with_old_values(
385            command.entity,
386            command.id,
387            command.expected_version,
388            old_values,
389        ))
390        .map_err(RepositoryError::Runtime)?;
391        Ok(affected)
392    }
393
394    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
395        let mut command = command.clone();
396        if let Some(behavior) = self.behavior() {
397            behavior
398                .before_recover(self.repository.metadata.context, &mut command)
399                .map_err(RepositoryError::Runtime)?;
400        }
401        self.enforce_recover_policy(&mut command)
402            .map_err(RepositoryError::Runtime)?;
403        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
404        let affected = self.repository.recover(&command)?;
405        self.emit_event(EntityEvent::recovered_with_old_values(
406            command.entity,
407            command.id,
408            command.expected_version,
409            old_values,
410        ))
411        .map_err(RepositoryError::Runtime)?;
412        Ok(affected)
413    }
414
415    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
416        self.repository.metadata.context.send_event(event)
417    }
418
419    pub(super) fn execute_prepared_insert(
420        &self,
421        command: InsertCommand,
422    ) -> Result<u64, RepositoryError<E::Error>> {
423        let affected = self.repository.insert(&command)?;
424        self.emit_event(EntityEvent::created(command.entity, command.values))
425            .map_err(RepositoryError::Runtime)?;
426        Ok(affected)
427    }
428
429    pub(super) fn execute_prepared_update(
430        &self,
431        command: UpdateCommand,
432    ) -> Result<u64, RepositoryError<E::Error>> {
433        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
434        let affected = self.repository.update(&command)?;
435        let updated_fields = command.values.keys().cloned().collect();
436        let mut values = command.values.clone();
437        values.insert("id".to_owned(), command.id.clone());
438        if let Some(version) = command.expected_version {
439            values.insert("version".to_owned(), Value::I64(version + 1));
440        }
441        let mut new_values = old_values.clone().unwrap_or_default();
442        for (field, value) in &values {
443            new_values.insert(field.clone(), value.clone());
444        }
445        self.emit_event(EntityEvent::updated_with_old_values(
446            command.entity,
447            values,
448            old_values,
449            new_values,
450            updated_fields,
451        ))
452        .map_err(RepositoryError::Runtime)?;
453        Ok(affected)
454    }
455
456    fn fetch_current_event_row(
457        &self,
458        entity: &str,
459        id: &Value,
460    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
461        if self.repository.metadata.context.event_sink.is_none() {
462            return Ok(None);
463        }
464        let descriptor = self
465            .repository
466            .metadata
467            .context
468            .require_entity(entity)
469            .map_err(RepositoryError::Runtime)?;
470        let Some(id_property) = descriptor.id_property() else {
471            return Ok(None);
472        };
473        let mut rows = self.fetch_all(
474            &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
475        )?;
476        Ok(rows.pop())
477    }
478
479    pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
480        ResolvedRepository {
481            entity,
482            repository: ContextRepository {
483                metadata: UserContextMetadata {
484                    context: self.repository.metadata.context,
485                },
486                dialect: self.repository.dialect,
487                executor: self.repository.executor,
488            },
489        }
490    }
491}