Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12    SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22    D: SqlDialect,
23    E: QueryExecutor,
24{
25    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26        self.repository.metadata.context.repository_behavior(entity)
27    }
28
29    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30        self.repository
31            .metadata
32            .context
33            .repository_behavior(&self.entity)
34    }
35
36    pub fn entity(&self) -> &str {
37        &self.entity
38    }
39
40    pub fn select(&self) -> SelectQuery {
41        SelectQuery::new(self.entity.clone())
42    }
43
44    pub fn insert_command(&self) -> InsertCommand {
45        InsertCommand::new(self.entity.clone())
46    }
47
48    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50            policy.enforce_insert(self.repository.metadata.context, command)?;
51        }
52        Ok(())
53    }
54
55    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57            policy.enforce_update(self.repository.metadata.context, command)?;
58        }
59        Ok(())
60    }
61
62    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64            policy.enforce_delete(self.repository.metadata.context, command)?;
65        }
66        Ok(())
67    }
68
69    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71            policy.enforce_recover(self.repository.metadata.context, command)?;
72        }
73        Ok(())
74    }
75
76    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77        let mut query = query.clone();
78        if let Some(behavior) = self.query_behavior(&query.entity) {
79            behavior.before_select(self.repository.metadata.context, &mut query)?;
80        }
81        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
82            policy.enforce_select(self.repository.metadata.context, &mut query)?;
83        }
84        Ok(query)
85    }
86
87    pub fn prepare_insert_command(
88        &self,
89        command: &InsertCommand,
90    ) -> Result<InsertCommand, RuntimeError> {
91        let mut command = command.clone();
92        if let Some(behavior) = self.behavior() {
93            behavior.before_insert(self.repository.metadata.context, &mut command)?;
94        }
95        self.enforce_insert_policy(&mut command)?;
96
97        let entity = self
98            .repository
99            .metadata
100            .context
101            .require_entity(&command.entity)?;
102        if let Some(id_property) = entity.id_property() {
103            let needs_id = !command.values.contains_key(&id_property.name)
104                || is_unassigned_id(command.values.get(&id_property.name));
105            if needs_id {
106                let id = self.repository.metadata.context.next_id(&command.entity)?;
107                command
108                    .values
109                    .insert(id_property.name.clone(), Value::U64(id));
110            }
111        }
112        ensure_initial_version(&mut command.values, entity);
113        mark_record_status(&mut command.values, CheckObjectStatus::Create);
114        let check_result = self
115            .repository
116            .metadata
117            .context
118            .check_and_fix_record(&command.entity, &mut command.values);
119        clear_record_status(&mut command.values);
120        check_result?;
121
122        Ok(command)
123    }
124
125    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
126        UpdateCommand::new(self.entity.clone(), id)
127    }
128
129    pub fn prepare_update_command(
130        &self,
131        command: &UpdateCommand,
132    ) -> Result<UpdateCommand, RuntimeError> {
133        let mut command = command.clone();
134        if let Some(behavior) = self.behavior() {
135            behavior.before_update(self.repository.metadata.context, &mut command)?;
136        }
137        self.enforce_update_policy(&mut command)?;
138        mark_record_status(&mut command.values, CheckObjectStatus::Update);
139        let check_result = self
140            .repository
141            .metadata
142            .context
143            .check_and_fix_record(&command.entity, &mut command.values);
144        clear_record_status(&mut command.values);
145        check_result?;
146        Ok(command)
147    }
148
149    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
150        DeleteCommand::new(self.entity.clone(), id)
151    }
152
153    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
154        RecoverCommand::new(self.entity.clone(), id, expected_version)
155    }
156
157    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
158        let query = self.prepare_select_query(query)?;
159        self.repository.compile(&query)
160    }
161
162    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
163        let query = self
164            .prepare_select_query(query)
165            .map_err(RepositoryError::Runtime)?;
166        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
167        let mut rows = self.fetch_prepared_query(&query)?;
168        self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
169        self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
170        Ok(rows)
171    }
172
173    fn fetch_prepared_query(
174        &self,
175        query: &SelectQuery,
176    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
177        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
178        let mut compiled = self
179            .repository
180            .compile(query)
181            .map_err(RepositoryError::Runtime)?;
182        compiled.comment = self.repository.resolve_final_comment(query.comment.clone());
183        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
184            if let Some(cache) = self
185                .repository
186                .metadata
187                .context
188                .get_resource::<Arc<dyn AggregationCacheBackend>>()
189            {
190                return self.fetch_prepared_query_with_cache(
191                    query,
192                    &compiled,
193                    options,
194                    cache.as_ref(),
195                );
196            }
197            if let Some(cache) = self
198                .repository
199                .metadata
200                .context
201                .get_resource::<InMemoryAggregationCache>()
202            {
203                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
204            }
205        }
206        let started_at = SystemTime::now();
207        let started = Instant::now();
208        let rows = self
209            .repository
210            .executor
211            .fetch_all(&compiled)
212            .map_err(RepositoryError::Executor)?;
213        self.repository.log_sql_result(
214            SqlLogOperation::Select,
215            &compiled,
216            started_at,
217            started,
218            Some(rows.len()),
219            Some(query.entity.clone()),
220            None,
221            query.comment.clone(),
222        );
223        Ok(rows)
224    }
225
226    fn fetch_prepared_query_with_cache(
227        &self,
228        query: &SelectQuery,
229        compiled: &CompiledQuery,
230        options: AggregationCacheOptions,
231        cache: &dyn AggregationCacheBackend,
232    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
233        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
234        let key = aggregation_cache_key(
235            cache.namespace(),
236            &aggregation_cache_namespace(&query.entity),
237            compiled,
238        );
239        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
240            return Ok(rows);
241        }
242        let started_at = SystemTime::now();
243        let started = Instant::now();
244        let rows = self
245            .repository
246            .executor
247            .fetch_all(compiled)
248            .map_err(RepositoryError::Executor)?;
249        self.repository.log_sql_result(
250            SqlLogOperation::Select,
251            compiled,
252            started_at,
253            started,
254            Some(rows.len()),
255            Some(query.entity.clone()),
256            None,
257            query.comment.clone(),
258        );
259        cache.put(key, rows.clone());
260        Ok(rows)
261    }
262
263    pub fn fetch_all_with_relation_aggregates(
264        &self,
265        query: &SelectQuery,
266        relation_aggregates: &[RelationAggregate],
267    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
268        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
269        let mut rows = self.fetch_all(query)?;
270        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
271        Ok(rows)
272    }
273
274    pub fn fetch_smart_list(
275        &self,
276        query: &SelectQuery,
277    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
278        let query = self
279            .prepare_select_query(query)
280            .map_err(RepositoryError::Runtime)?;
281        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
282        self.repository.fetch_smart_list(&query)
283    }
284
285    pub fn fetch_smart_list_with_relation_aggregates(
286        &self,
287        query: &SelectQuery,
288        relation_aggregates: &[RelationAggregate],
289    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
290        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
291            .map(SmartList::from)
292    }
293
294    pub fn fetch_entities<T>(
295        &self,
296        query: &SelectQuery,
297    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
298    where
299        T: Entity,
300    {
301        let query = self
302            .prepare_select_query(query)
303            .map_err(RepositoryError::Runtime)?;
304        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
305        self.repository.fetch_entities(&query)
306    }
307
308    pub fn fetch_entities_with_relation_aggregates<T>(
309        &self,
310        query: &SelectQuery,
311        relation_aggregates: &[RelationAggregate],
312    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
313    where
314        T: Entity,
315    {
316        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
317            .into_iter()
318            .map(T::from_record)
319            .collect::<Result<Vec<_>, _>>()
320            .map(SmartList::from)
321            .map_err(RepositoryError::Entity)
322    }
323
324    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
325        &self,
326        query: &SelectQuery,
327        relation_aggregates: &[RelationAggregate],
328    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
329    where
330        T: Entity,
331    {
332        let query = self
333            .prepare_select_query(query)
334            .map_err(RepositoryError::Runtime)?;
335
336        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
337
338        let mut rows = self.repository.fetch_all(&query)?;
339        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
340        self.enhance_query_relations(&mut rows, &query)?;
341        self.enhance_relations(&mut rows)?;
342        rows.into_iter()
343            .map(T::from_record)
344            .collect::<Result<Vec<_>, _>>()
345            .map(SmartList::from)
346            .map_err(RepositoryError::Entity)
347    }
348
349    pub fn fetch_enhanced_entities<T>(
350        &self,
351        query: &SelectQuery,
352    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
353    where
354        T: Entity,
355    {
356        let query = self
357            .prepare_select_query(query)
358            .map_err(RepositoryError::Runtime)?;
359        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
360
361        let mut rows = self.repository.fetch_all(&query)?;
362        self.enhance_query_relations(&mut rows, &query)?;
363        self.enhance_relations(&mut rows)?;
364        rows.into_iter()
365            .map(T::from_record)
366            .collect::<Result<Vec<_>, _>>()
367            .map(SmartList::from)
368            .map_err(RepositoryError::Entity)
369    }
370
371    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
372        let command = self
373            .prepare_insert_command(command)
374            .map_err(RepositoryError::Runtime)?;
375        self.execute_prepared_insert(command)
376    }
377
378    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
379        let command = self
380            .prepare_update_command(command)
381            .map_err(RepositoryError::Runtime)?;
382        self.execute_prepared_update(command)
383    }
384
385    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
386        self.delete_scoped(command, None)
387    }
388
389    pub fn delete_scoped(
390        &self,
391        command: &DeleteCommand,
392        comment_lineage: Option<String>,
393    ) -> Result<u64, RepositoryError<E::Error>> {
394        let mut command = command.clone();
395        if let Some(behavior) = self.behavior() {
396            behavior
397                .before_delete(self.repository.metadata.context, &mut command)
398                .map_err(RepositoryError::Runtime)?;
399        }
400        self.enforce_delete_policy(&mut command)
401            .map_err(RepositoryError::Runtime)?;
402        let resolved_lineage = self.format_lineage_comment(&command.id, comment_lineage);
403        let _guard = resolved_lineage.as_ref().map(|lineage| {
404            crate::context::QueryCommentGuard::new(
405                self.repository.metadata.context,
406                Some(lineage.clone()),
407            )
408        });
409        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
410        let affected = self.repository.delete(&command)?;
411        let mut event = EntityEvent::deleted_with_old_values(
412            command.entity,
413            command.id,
414            command.expected_version,
415            old_values,
416        );
417        event.comment = resolved_lineage;
418        self.emit_event(event)
419            .map_err(RepositoryError::Runtime)?;
420        Ok(affected)
421    }
422
423    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
424        let mut command = command.clone();
425        if let Some(behavior) = self.behavior() {
426            behavior
427                .before_recover(self.repository.metadata.context, &mut command)
428                .map_err(RepositoryError::Runtime)?;
429        }
430        self.enforce_recover_policy(&mut command)
431            .map_err(RepositoryError::Runtime)?;
432        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
433        let affected = self.repository.recover(&command)?;
434        let resolved_lineage = self.format_lineage_comment(&command.id, None);
435        let mut event = EntityEvent::recovered_with_old_values(
436            command.entity,
437            command.id,
438            command.expected_version,
439            old_values,
440        );
441        event.comment = resolved_lineage;
442        self.emit_event(event)
443            .map_err(RepositoryError::Runtime)?;
444        Ok(affected)
445    }
446
447    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
448        self.repository.metadata.context.send_event(event)
449    }
450
451    pub(super) fn execute_prepared_insert(
452        &self,
453        command: InsertCommand,
454    ) -> Result<u64, RepositoryError<E::Error>> {
455        self.execute_prepared_insert_with_comment(command, None)
456    }
457
458    pub(super) fn execute_prepared_insert_with_comment(
459        &self,
460        command: InsertCommand,
461        comment_lineage: Option<String>,
462    ) -> Result<u64, RepositoryError<E::Error>> {
463        let id_val = command.values.get("id").cloned().unwrap_or(Value::Null);
464        let resolved_lineage = self.format_lineage_comment(&id_val, comment_lineage);
465        let _guard = resolved_lineage.as_ref().map(|lineage| {
466            crate::context::QueryCommentGuard::new(
467                self.repository.metadata.context,
468                Some(lineage.clone()),
469            )
470        });
471        let affected = self.repository.insert(&command)?;
472        let mut event = EntityEvent::created(command.entity, command.values);
473        event.comment = resolved_lineage;
474        self.emit_event(event).map_err(RepositoryError::Runtime)?;
475        Ok(affected)
476    }
477
478    pub(super) fn execute_prepared_update(
479        &self,
480        command: UpdateCommand,
481    ) -> Result<u64, RepositoryError<E::Error>> {
482        self.execute_prepared_update_with_comment(command, None)
483    }
484
485    pub(super) fn execute_prepared_update_with_comment(
486        &self,
487        command: UpdateCommand,
488        comment_lineage: Option<String>,
489    ) -> Result<u64, RepositoryError<E::Error>> {
490        let resolved_lineage = self.format_lineage_comment(&command.id, comment_lineage);
491        let _guard = resolved_lineage.as_ref().map(|lineage| {
492            crate::context::QueryCommentGuard::new(
493                self.repository.metadata.context,
494                Some(lineage.clone()),
495            )
496        });
497        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
498        let affected = self.repository.update(&command)?;
499        let updated_fields = command.values.keys().cloned().collect();
500        let mut values = command.values.clone();
501        values.insert("id".to_owned(), command.id.clone());
502        if let Some(version) = command.expected_version {
503            values.insert("version".to_owned(), Value::I64(version + 1));
504        }
505        let mut new_values = old_values.clone().unwrap_or_default();
506        for (field, value) in &values {
507            new_values.insert(field.clone(), value.clone());
508        }
509        let mut event = EntityEvent::updated_with_old_values(
510            command.entity,
511            values,
512            old_values,
513            new_values,
514            updated_fields,
515        );
516        event.comment = resolved_lineage;
517        self.emit_event(event).map_err(RepositoryError::Runtime)?;
518        Ok(affected)
519    }
520
521    fn fetch_current_event_row(
522        &self,
523        entity: &str,
524        id: &Value,
525    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
526        if self.repository.metadata.context.event_sink.is_none() {
527            return Ok(None);
528        }
529        let descriptor = self
530            .repository
531            .metadata
532            .context
533            .require_entity(entity)
534            .map_err(RepositoryError::Runtime)?;
535        let Some(id_property) = descriptor.id_property() else {
536            return Ok(None);
537        };
538        let mut rows = self.fetch_all(
539            &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
540        )?;
541        Ok(rows.pop())
542    }
543
544    fn format_lineage_comment(
545        &self,
546        id: &Value,
547        comment_lineage: Option<String>,
548    ) -> Option<String> {
549        let raw_comment = comment_lineage.or_else(|| {
550            self.repository.metadata.context.comment_stack.lock().ok().and_then(|stack| {
551                if stack.is_empty() {
552                    None
553                } else {
554                    Some(stack.join(" -> "))
555                }
556            })
557        })?;
558
559        if raw_comment.is_empty() {
560            return None;
561        }
562
563        let id_str = match id {
564            Value::U64(n) => n.to_string(),
565            Value::I64(n) => n.to_string(),
566            Value::Text(s) => s.clone(),
567            other => format!("{other:?}"),
568        };
569        let prefix = format!("{}({}):", self.entity, id_str);
570        if raw_comment.starts_with(&prefix) || raw_comment.contains(&format!(" -> {}({}):", self.entity, id_str)) {
571            Some(raw_comment)
572        } else {
573            Some(format!("{}({}): {}", self.entity, id_str, raw_comment))
574        }
575    }
576
577    pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
578        ResolvedRepository {
579            entity,
580            repository: ContextRepository {
581                metadata: UserContextMetadata {
582                    context: self.repository.metadata.context,
583                },
584                dialect: self.repository.dialect,
585                executor: self.repository.executor,
586            },
587        }
588    }
589}