Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12    SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22    D: SqlDialect,
23    E: QueryExecutor,
24{
25    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26        self.repository.metadata.context.repository_behavior(entity)
27    }
28
29    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30        self.repository
31            .metadata
32            .context
33            .repository_behavior(&self.entity)
34    }
35
36    pub fn entity(&self) -> &str {
37        &self.entity
38    }
39
40    pub fn select(&self) -> SelectQuery {
41        SelectQuery::new(self.entity.clone())
42    }
43
44    pub fn insert_command(&self) -> InsertCommand {
45        InsertCommand::new(self.entity.clone())
46    }
47
48    pub fn prepare_insert_command(
49        &self,
50        command: &InsertCommand,
51    ) -> Result<InsertCommand, RuntimeError> {
52        let mut command = command.clone();
53        if let Some(behavior) = self.behavior() {
54            behavior.before_insert(self.repository.metadata.context, &mut command)?;
55        }
56
57        let entity = self
58            .repository
59            .metadata
60            .context
61            .require_entity(&command.entity)?;
62        if let Some(id_property) = entity.id_property() {
63            let needs_id = !command.values.contains_key(&id_property.name)
64                || matches!(command.values.get(&id_property.name), Some(Value::Null));
65            if needs_id {
66                let id = self.repository.metadata.context.next_id(&command.entity)?;
67                command
68                    .values
69                    .insert(id_property.name.clone(), Value::U64(id));
70            }
71        }
72        mark_record_status(&mut command.values, CheckObjectStatus::Create);
73        let check_result = self
74            .repository
75            .metadata
76            .context
77            .check_and_fix_record(&command.entity, &mut command.values);
78        clear_record_status(&mut command.values);
79        check_result?;
80
81        Ok(command)
82    }
83
84    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
85        UpdateCommand::new(self.entity.clone(), id)
86    }
87
88    pub fn prepare_update_command(
89        &self,
90        command: &UpdateCommand,
91    ) -> Result<UpdateCommand, RuntimeError> {
92        let mut command = command.clone();
93        if let Some(behavior) = self.behavior() {
94            behavior.before_update(self.repository.metadata.context, &mut command)?;
95        }
96        mark_record_status(&mut command.values, CheckObjectStatus::Update);
97        let check_result = self
98            .repository
99            .metadata
100            .context
101            .check_and_fix_record(&command.entity, &mut command.values);
102        clear_record_status(&mut command.values);
103        check_result?;
104        Ok(command)
105    }
106
107    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
108        DeleteCommand::new(self.entity.clone(), id)
109    }
110
111    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
112        RecoverCommand::new(self.entity.clone(), id, expected_version)
113    }
114
115    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
116        let mut query = query.clone();
117        if let Some(behavior) = self.query_behavior(&query.entity) {
118            behavior.before_select(self.repository.metadata.context, &mut query)?;
119        }
120        self.repository.compile(&query)
121    }
122
123    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
124        let mut query = query.clone();
125        if let Some(behavior) = self.query_behavior(&query.entity) {
126            behavior
127                .before_select(self.repository.metadata.context, &mut query)
128                .map_err(RepositoryError::Runtime)?;
129        }
130        let mut rows = self.fetch_prepared_query(&query)?;
131        self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
132        self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
133        Ok(rows)
134    }
135
136    fn fetch_prepared_query(
137        &self,
138        query: &SelectQuery,
139    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
140        let compiled = self
141            .repository
142            .compile(query)
143            .map_err(RepositoryError::Runtime)?;
144        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
145            if let Some(cache) = self
146                .repository
147                .metadata
148                .context
149                .get_resource::<Arc<dyn AggregationCacheBackend>>()
150            {
151                return self.fetch_prepared_query_with_cache(
152                    query,
153                    &compiled,
154                    options,
155                    cache.as_ref(),
156                );
157            }
158            if let Some(cache) = self
159                .repository
160                .metadata
161                .context
162                .get_resource::<InMemoryAggregationCache>()
163            {
164                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
165            }
166        }
167        let started_at = SystemTime::now();
168        let started = Instant::now();
169        let rows = self
170            .repository
171            .executor
172            .fetch_all(&compiled)
173            .map_err(RepositoryError::Executor)?;
174        self.repository.log_sql_result(
175            SqlLogOperation::Select,
176            &compiled,
177            started_at,
178            started,
179            Some(rows.len()),
180            Some(query.entity.clone()),
181            None,
182        );
183        Ok(rows)
184    }
185
186    fn fetch_prepared_query_with_cache(
187        &self,
188        query: &SelectQuery,
189        compiled: &CompiledQuery,
190        options: AggregationCacheOptions,
191        cache: &dyn AggregationCacheBackend,
192    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
193        let key = aggregation_cache_key(
194            cache.namespace(),
195            &aggregation_cache_namespace(&query.entity),
196            compiled,
197        );
198        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
199            return Ok(rows);
200        }
201        let started_at = SystemTime::now();
202        let started = Instant::now();
203        let rows = self
204            .repository
205            .executor
206            .fetch_all(compiled)
207            .map_err(RepositoryError::Executor)?;
208        self.repository.log_sql_result(
209            SqlLogOperation::Select,
210            compiled,
211            started_at,
212            started,
213            Some(rows.len()),
214            Some(query.entity.clone()),
215            None,
216        );
217        cache.put(key, rows.clone());
218        Ok(rows)
219    }
220
221    pub fn fetch_all_with_relation_aggregates(
222        &self,
223        query: &SelectQuery,
224        relation_aggregates: &[RelationAggregate],
225    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
226        let mut rows = self.fetch_all(query)?;
227        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
228        Ok(rows)
229    }
230
231    pub fn fetch_smart_list(
232        &self,
233        query: &SelectQuery,
234    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
235        let mut query = query.clone();
236        if let Some(behavior) = self.query_behavior(&query.entity) {
237            behavior
238                .before_select(self.repository.metadata.context, &mut query)
239                .map_err(RepositoryError::Runtime)?;
240        }
241        self.repository.fetch_smart_list(&query)
242    }
243
244    pub fn fetch_smart_list_with_relation_aggregates(
245        &self,
246        query: &SelectQuery,
247        relation_aggregates: &[RelationAggregate],
248    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
249        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
250            .map(SmartList::from)
251    }
252
253    pub fn fetch_entities<T>(
254        &self,
255        query: &SelectQuery,
256    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
257    where
258        T: Entity,
259    {
260        let mut query = query.clone();
261        if let Some(behavior) = self.query_behavior(&query.entity) {
262            behavior
263                .before_select(self.repository.metadata.context, &mut query)
264                .map_err(RepositoryError::Runtime)?;
265        }
266        self.repository.fetch_entities(&query)
267    }
268
269    pub fn fetch_entities_with_relation_aggregates<T>(
270        &self,
271        query: &SelectQuery,
272        relation_aggregates: &[RelationAggregate],
273    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
274    where
275        T: Entity,
276    {
277        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
278            .into_iter()
279            .map(T::from_record)
280            .collect::<Result<Vec<_>, _>>()
281            .map(SmartList::from)
282            .map_err(RepositoryError::Entity)
283    }
284
285    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
286        &self,
287        query: &SelectQuery,
288        relation_aggregates: &[RelationAggregate],
289    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
290    where
291        T: Entity,
292    {
293        let mut query = query.clone();
294        if let Some(behavior) = self.query_behavior(&query.entity) {
295            behavior
296                .before_select(self.repository.metadata.context, &mut query)
297                .map_err(RepositoryError::Runtime)?;
298        }
299
300        let mut rows = self.repository.fetch_all(&query)?;
301        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
302        self.enhance_query_relations(&mut rows, &query)?;
303        self.enhance_relations(&mut rows)?;
304        rows.into_iter()
305            .map(T::from_record)
306            .collect::<Result<Vec<_>, _>>()
307            .map(SmartList::from)
308            .map_err(RepositoryError::Entity)
309    }
310
311    pub fn fetch_enhanced_entities<T>(
312        &self,
313        query: &SelectQuery,
314    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
315    where
316        T: Entity,
317    {
318        let mut query = query.clone();
319        if let Some(behavior) = self.query_behavior(&query.entity) {
320            behavior
321                .before_select(self.repository.metadata.context, &mut query)
322                .map_err(RepositoryError::Runtime)?;
323        }
324
325        let mut rows = self.repository.fetch_all(&query)?;
326        self.enhance_query_relations(&mut rows, &query)?;
327        self.enhance_relations(&mut rows)?;
328        rows.into_iter()
329            .map(T::from_record)
330            .collect::<Result<Vec<_>, _>>()
331            .map(SmartList::from)
332            .map_err(RepositoryError::Entity)
333    }
334
335    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
336        let command = self
337            .prepare_insert_command(command)
338            .map_err(RepositoryError::Runtime)?;
339        self.execute_prepared_insert(command)
340    }
341
342    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
343        let command = self
344            .prepare_update_command(command)
345            .map_err(RepositoryError::Runtime)?;
346        self.execute_prepared_update(command)
347    }
348
349    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
350        let mut command = command.clone();
351        if let Some(behavior) = self.behavior() {
352            behavior
353                .before_delete(self.repository.metadata.context, &mut command)
354                .map_err(RepositoryError::Runtime)?;
355        }
356        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
357        let affected = self.repository.delete(&command)?;
358        self.emit_event(EntityEvent::deleted_with_old_values(
359            command.entity,
360            command.id,
361            command.expected_version,
362            old_values,
363        ))
364        .map_err(RepositoryError::Runtime)?;
365        Ok(affected)
366    }
367
368    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
369        let mut command = command.clone();
370        if let Some(behavior) = self.behavior() {
371            behavior
372                .before_recover(self.repository.metadata.context, &mut command)
373                .map_err(RepositoryError::Runtime)?;
374        }
375        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
376        let affected = self.repository.recover(&command)?;
377        self.emit_event(EntityEvent::recovered_with_old_values(
378            command.entity,
379            command.id,
380            command.expected_version,
381            old_values,
382        ))
383        .map_err(RepositoryError::Runtime)?;
384        Ok(affected)
385    }
386
387    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
388        self.repository.metadata.context.send_event(event)
389    }
390
391    pub(super) fn execute_prepared_insert(
392        &self,
393        command: InsertCommand,
394    ) -> Result<u64, RepositoryError<E::Error>> {
395        let affected = self.repository.insert(&command)?;
396        self.emit_event(EntityEvent::created(command.entity, command.values))
397            .map_err(RepositoryError::Runtime)?;
398        Ok(affected)
399    }
400
401    pub(super) fn execute_prepared_update(
402        &self,
403        command: UpdateCommand,
404    ) -> Result<u64, RepositoryError<E::Error>> {
405        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
406        let affected = self.repository.update(&command)?;
407        let updated_fields = command.values.keys().cloned().collect();
408        let mut values = command.values.clone();
409        values.insert("id".to_owned(), command.id.clone());
410        if let Some(version) = command.expected_version {
411            values.insert("version".to_owned(), Value::I64(version + 1));
412        }
413        let mut new_values = old_values.clone().unwrap_or_default();
414        for (field, value) in &values {
415            new_values.insert(field.clone(), value.clone());
416        }
417        self.emit_event(EntityEvent::updated_with_old_values(
418            command.entity,
419            values,
420            old_values,
421            new_values,
422            updated_fields,
423        ))
424        .map_err(RepositoryError::Runtime)?;
425        Ok(affected)
426    }
427
428    fn fetch_current_event_row(
429        &self,
430        entity: &str,
431        id: &Value,
432    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
433        if self.repository.metadata.context.event_sink.is_none() {
434            return Ok(None);
435        }
436        let descriptor = self
437            .repository
438            .metadata
439            .context
440            .require_entity(entity)
441            .map_err(RepositoryError::Runtime)?;
442        let Some(id_property) = descriptor.id_property() else {
443            return Ok(None);
444        };
445        let mut rows = self.fetch_all(
446            &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
447        )?;
448        Ok(rows.pop())
449    }
450
451    pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
452        ResolvedRepository {
453            entity,
454            repository: ContextRepository {
455                metadata: UserContextMetadata {
456                    context: self.repository.metadata.context,
457                },
458                dialect: self.repository.dialect,
459                executor: self.repository.executor,
460            },
461        }
462    }
463}