// Source: teaql_runtime/repository/resolved.rs

1use std::sync::Arc;
2
3use teaql_core::{
4    AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7use teaql_sql::{CompiledQuery, SqlDialect};
8
9use crate::{
10    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
11    SqlLogOperation, clear_record_status, mark_record_status,
12};
13
14use super::{
15    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
16    ResolvedRepository, UserContextMetadata, helpers::*,
17};
18
19impl<'a, D, E> ResolvedRepository<'a, D, E>
20where
21    D: SqlDialect,
22    E: QueryExecutor,
23{
24    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
25        self.repository.metadata.context.repository_behavior(entity)
26    }
27
28    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
29        self.repository
30            .metadata
31            .context
32            .repository_behavior(&self.entity)
33    }
34
35    pub fn entity(&self) -> &str {
36        &self.entity
37    }
38
39    pub fn select(&self) -> SelectQuery {
40        SelectQuery::new(self.entity.clone())
41    }
42
43    pub fn insert_command(&self) -> InsertCommand {
44        InsertCommand::new(self.entity.clone())
45    }
46
47    pub fn prepare_insert_command(
48        &self,
49        command: &InsertCommand,
50    ) -> Result<InsertCommand, RuntimeError> {
51        let mut command = command.clone();
52        if let Some(behavior) = self.behavior() {
53            behavior.before_insert(self.repository.metadata.context, &mut command)?;
54        }
55
56        let entity = self
57            .repository
58            .metadata
59            .context
60            .require_entity(&command.entity)?;
61        if let Some(id_property) = entity.id_property() {
62            let needs_id = !command.values.contains_key(&id_property.name)
63                || matches!(command.values.get(&id_property.name), Some(Value::Null));
64            if needs_id {
65                let id = self.repository.metadata.context.next_id(&command.entity)?;
66                command
67                    .values
68                    .insert(id_property.name.clone(), Value::U64(id));
69            }
70        }
71        mark_record_status(&mut command.values, CheckObjectStatus::Create);
72        let check_result = self
73            .repository
74            .metadata
75            .context
76            .check_and_fix_record(&command.entity, &mut command.values);
77        clear_record_status(&mut command.values);
78        check_result?;
79
80        Ok(command)
81    }
82
83    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
84        UpdateCommand::new(self.entity.clone(), id)
85    }
86
87    pub fn prepare_update_command(
88        &self,
89        command: &UpdateCommand,
90    ) -> Result<UpdateCommand, RuntimeError> {
91        let mut command = command.clone();
92        if let Some(behavior) = self.behavior() {
93            behavior.before_update(self.repository.metadata.context, &mut command)?;
94        }
95        mark_record_status(&mut command.values, CheckObjectStatus::Update);
96        let check_result = self
97            .repository
98            .metadata
99            .context
100            .check_and_fix_record(&command.entity, &mut command.values);
101        clear_record_status(&mut command.values);
102        check_result?;
103        Ok(command)
104    }
105
106    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
107        DeleteCommand::new(self.entity.clone(), id)
108    }
109
110    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
111        RecoverCommand::new(self.entity.clone(), id, expected_version)
112    }
113
114    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
115        let mut query = query.clone();
116        if let Some(behavior) = self.query_behavior(&query.entity) {
117            behavior.before_select(self.repository.metadata.context, &mut query)?;
118        }
119        self.repository.compile(&query)
120    }
121
122    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
123        let mut query = query.clone();
124        if let Some(behavior) = self.query_behavior(&query.entity) {
125            behavior
126                .before_select(self.repository.metadata.context, &mut query)
127                .map_err(RepositoryError::Runtime)?;
128        }
129        let mut rows = self.fetch_prepared_query(&query)?;
130        self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
131        self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
132        Ok(rows)
133    }
134
135    fn fetch_prepared_query(
136        &self,
137        query: &SelectQuery,
138    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
139        let compiled = self
140            .repository
141            .compile(query)
142            .map_err(RepositoryError::Runtime)?;
143        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
144            if let Some(cache) = self
145                .repository
146                .metadata
147                .context
148                .get_resource::<Arc<dyn AggregationCacheBackend>>()
149            {
150                return self.fetch_prepared_query_with_cache(
151                    query,
152                    &compiled,
153                    options,
154                    cache.as_ref(),
155                );
156            }
157            if let Some(cache) = self
158                .repository
159                .metadata
160                .context
161                .get_resource::<InMemoryAggregationCache>()
162            {
163                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
164            }
165        }
166        self.repository.log_sql(SqlLogOperation::Select, &compiled);
167        self.repository
168            .executor
169            .fetch_all(&compiled)
170            .map_err(RepositoryError::Executor)
171    }
172
173    fn fetch_prepared_query_with_cache(
174        &self,
175        query: &SelectQuery,
176        compiled: &CompiledQuery,
177        options: AggregationCacheOptions,
178        cache: &dyn AggregationCacheBackend,
179    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
180        let key = aggregation_cache_key(
181            cache.namespace(),
182            &aggregation_cache_namespace(&query.entity),
183            compiled,
184        );
185        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
186            return Ok(rows);
187        }
188        self.repository.log_sql(SqlLogOperation::Select, compiled);
189        let rows = self
190            .repository
191            .executor
192            .fetch_all(compiled)
193            .map_err(RepositoryError::Executor)?;
194        cache.put(key, rows.clone());
195        Ok(rows)
196    }
197
198    pub fn fetch_all_with_relation_aggregates(
199        &self,
200        query: &SelectQuery,
201        relation_aggregates: &[RelationAggregate],
202    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
203        let mut rows = self.fetch_all(query)?;
204        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
205        Ok(rows)
206    }
207
208    pub fn fetch_smart_list(
209        &self,
210        query: &SelectQuery,
211    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
212        let mut query = query.clone();
213        if let Some(behavior) = self.query_behavior(&query.entity) {
214            behavior
215                .before_select(self.repository.metadata.context, &mut query)
216                .map_err(RepositoryError::Runtime)?;
217        }
218        self.repository.fetch_smart_list(&query)
219    }
220
221    pub fn fetch_smart_list_with_relation_aggregates(
222        &self,
223        query: &SelectQuery,
224        relation_aggregates: &[RelationAggregate],
225    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
226        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
227            .map(SmartList::from)
228    }
229
230    pub fn fetch_entities<T>(
231        &self,
232        query: &SelectQuery,
233    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
234    where
235        T: Entity,
236    {
237        let mut query = query.clone();
238        if let Some(behavior) = self.query_behavior(&query.entity) {
239            behavior
240                .before_select(self.repository.metadata.context, &mut query)
241                .map_err(RepositoryError::Runtime)?;
242        }
243        self.repository.fetch_entities(&query)
244    }
245
246    pub fn fetch_entities_with_relation_aggregates<T>(
247        &self,
248        query: &SelectQuery,
249        relation_aggregates: &[RelationAggregate],
250    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
251    where
252        T: Entity,
253    {
254        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
255            .into_iter()
256            .map(T::from_record)
257            .collect::<Result<Vec<_>, _>>()
258            .map(SmartList::from)
259            .map_err(RepositoryError::Entity)
260    }
261
262    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
263        &self,
264        query: &SelectQuery,
265        relation_aggregates: &[RelationAggregate],
266    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
267    where
268        T: Entity,
269    {
270        let mut query = query.clone();
271        if let Some(behavior) = self.query_behavior(&query.entity) {
272            behavior
273                .before_select(self.repository.metadata.context, &mut query)
274                .map_err(RepositoryError::Runtime)?;
275        }
276
277        let mut rows = self.repository.fetch_all(&query)?;
278        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
279        self.enhance_query_relations(&mut rows, &query)?;
280        self.enhance_relations(&mut rows)?;
281        rows.into_iter()
282            .map(T::from_record)
283            .collect::<Result<Vec<_>, _>>()
284            .map(SmartList::from)
285            .map_err(RepositoryError::Entity)
286    }
287
288    pub fn fetch_enhanced_entities<T>(
289        &self,
290        query: &SelectQuery,
291    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
292    where
293        T: Entity,
294    {
295        let mut query = query.clone();
296        if let Some(behavior) = self.query_behavior(&query.entity) {
297            behavior
298                .before_select(self.repository.metadata.context, &mut query)
299                .map_err(RepositoryError::Runtime)?;
300        }
301
302        let mut rows = self.repository.fetch_all(&query)?;
303        self.enhance_query_relations(&mut rows, &query)?;
304        self.enhance_relations(&mut rows)?;
305        rows.into_iter()
306            .map(T::from_record)
307            .collect::<Result<Vec<_>, _>>()
308            .map(SmartList::from)
309            .map_err(RepositoryError::Entity)
310    }
311
312    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
313        let command = self
314            .prepare_insert_command(command)
315            .map_err(RepositoryError::Runtime)?;
316        self.execute_prepared_insert(command)
317    }
318
319    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
320        let command = self
321            .prepare_update_command(command)
322            .map_err(RepositoryError::Runtime)?;
323        self.execute_prepared_update(command)
324    }
325
326    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
327        let mut command = command.clone();
328        if let Some(behavior) = self.behavior() {
329            behavior
330                .before_delete(self.repository.metadata.context, &mut command)
331                .map_err(RepositoryError::Runtime)?;
332        }
333        let affected = self.repository.delete(&command)?;
334        self.emit_event(EntityEvent::deleted(
335            command.entity,
336            command.id,
337            command.expected_version,
338        ))
339        .map_err(RepositoryError::Runtime)?;
340        Ok(affected)
341    }
342
343    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
344        let mut command = command.clone();
345        if let Some(behavior) = self.behavior() {
346            behavior
347                .before_recover(self.repository.metadata.context, &mut command)
348                .map_err(RepositoryError::Runtime)?;
349        }
350        let affected = self.repository.recover(&command)?;
351        self.emit_event(EntityEvent::recovered(
352            command.entity,
353            command.id,
354            command.expected_version,
355        ))
356        .map_err(RepositoryError::Runtime)?;
357        Ok(affected)
358    }
359
360    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
361        self.repository.metadata.context.send_event(event)
362    }
363
364    pub(super) fn execute_prepared_insert(
365        &self,
366        command: InsertCommand,
367    ) -> Result<u64, RepositoryError<E::Error>> {
368        let affected = self.repository.insert(&command)?;
369        self.emit_event(EntityEvent::created(command.entity, command.values))
370            .map_err(RepositoryError::Runtime)?;
371        Ok(affected)
372    }
373
374    pub(super) fn execute_prepared_update(
375        &self,
376        command: UpdateCommand,
377    ) -> Result<u64, RepositoryError<E::Error>> {
378        let affected = self.repository.update(&command)?;
379        let updated_fields = command.values.keys().cloned().collect();
380        let mut values = command.values.clone();
381        values.insert("id".to_owned(), command.id.clone());
382        if let Some(version) = command.expected_version {
383            values.insert("version".to_owned(), Value::I64(version + 1));
384        }
385        self.emit_event(EntityEvent {
386            kind: crate::EntityEventKind::Updated,
387            entity: command.entity,
388            values,
389            updated_fields,
390        })
391        .map_err(RepositoryError::Runtime)?;
392        Ok(affected)
393    }
394    pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
395        ResolvedRepository {
396            entity,
397            repository: ContextRepository {
398                metadata: UserContextMetadata {
399                    context: self.repository.metadata.context,
400                },
401                dialect: self.repository.dialect,
402                executor: self.repository.executor,
403            },
404        }
405    }
406}