Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12    SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22    D: SqlDialect,
23    E: QueryExecutor,
24{
25    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26        self.repository.metadata.context.repository_behavior(entity)
27    }
28
29    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30        self.repository
31            .metadata
32            .context
33            .repository_behavior(&self.entity)
34    }
35
36    pub fn entity(&self) -> &str {
37        &self.entity
38    }
39
40    pub fn select(&self) -> SelectQuery {
41        SelectQuery::new(self.entity.clone())
42    }
43
44    pub fn insert_command(&self) -> InsertCommand {
45        InsertCommand::new(self.entity.clone())
46    }
47
48    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50            policy.enforce_insert(self.repository.metadata.context, command)?;
51        }
52        Ok(())
53    }
54
55    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57            policy.enforce_update(self.repository.metadata.context, command)?;
58        }
59        Ok(())
60    }
61
62    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64            policy.enforce_delete(self.repository.metadata.context, command)?;
65        }
66        Ok(())
67    }
68
69    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71            policy.enforce_recover(self.repository.metadata.context, command)?;
72        }
73        Ok(())
74    }
75
76    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77        let mut query = query.clone();
78        if let Some(behavior) = self.query_behavior(&query.entity) {
79            behavior.before_select(self.repository.metadata.context, &mut query)?;
80        }
81        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
82            policy.enforce_select(self.repository.metadata.context, &mut query)?;
83        }
84        Ok(query)
85    }
86
87    pub fn prepare_insert_command(
88        &self,
89        command: &InsertCommand,
90    ) -> Result<InsertCommand, RuntimeError> {
91        let mut command = command.clone();
92        if let Some(behavior) = self.behavior() {
93            behavior.before_insert(self.repository.metadata.context, &mut command)?;
94        }
95        self.enforce_insert_policy(&mut command)?;
96
97        let entity = self
98            .repository
99            .metadata
100            .context
101            .require_entity(&command.entity)?;
102        if let Some(id_property) = entity.id_property() {
103            let needs_id = !command.values.contains_key(&id_property.name)
104                || is_unassigned_id(command.values.get(&id_property.name));
105            if needs_id {
106                let id = self.repository.metadata.context.next_id(&command.entity)?;
107                command
108                    .values
109                    .insert(id_property.name.clone(), Value::U64(id));
110            }
111        }
112        ensure_initial_version(&mut command.values, entity);
113        mark_record_status(&mut command.values, CheckObjectStatus::Create);
114        let check_result = self
115            .repository
116            .metadata
117            .context
118            .check_and_fix_record(&command.entity, &mut command.values);
119        clear_record_status(&mut command.values);
120        check_result?;
121
122        Ok(command)
123    }
124
125    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
126        UpdateCommand::new(self.entity.clone(), id)
127    }
128
129    pub fn prepare_update_command(
130        &self,
131        command: &UpdateCommand,
132    ) -> Result<UpdateCommand, RuntimeError> {
133        let mut command = command.clone();
134        if let Some(behavior) = self.behavior() {
135            behavior.before_update(self.repository.metadata.context, &mut command)?;
136        }
137        self.enforce_update_policy(&mut command)?;
138        mark_record_status(&mut command.values, CheckObjectStatus::Update);
139        let check_result = self
140            .repository
141            .metadata
142            .context
143            .check_and_fix_record(&command.entity, &mut command.values);
144        clear_record_status(&mut command.values);
145        check_result?;
146        Ok(command)
147    }
148
149    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
150        DeleteCommand::new(self.entity.clone(), id)
151    }
152
153    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
154        RecoverCommand::new(self.entity.clone(), id, expected_version)
155    }
156
157    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
158        let query = self.prepare_select_query(query)?;
159        self.repository.compile(&query)
160    }
161
162    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
163        let query = self
164            .prepare_select_query(query)
165            .map_err(RepositoryError::Runtime)?;
166        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
167        let mut rows = self.fetch_prepared_query(&query)?;
168        self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
169        self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
170        Ok(rows)
171    }
172
173    fn fetch_prepared_query(
174        &self,
175        query: &SelectQuery,
176    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
177        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
178        let mut compiled = self
179            .repository
180            .compile(query)
181            .map_err(RepositoryError::Runtime)?;
182        compiled.comment = self.repository.resolve_final_comment(query.comment.clone());
183        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
184            if let Some(cache) = self
185                .repository
186                .metadata
187                .context
188                .get_resource::<Arc<dyn AggregationCacheBackend>>()
189            {
190                return self.fetch_prepared_query_with_cache(
191                    query,
192                    &compiled,
193                    options,
194                    cache.as_ref(),
195                );
196            }
197            if let Some(cache) = self
198                .repository
199                .metadata
200                .context
201                .get_resource::<InMemoryAggregationCache>()
202            {
203                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
204            }
205        }
206        let started_at = SystemTime::now();
207        let started = Instant::now();
208        let rows = self
209            .repository
210            .executor
211            .fetch_all(&compiled)
212            .map_err(RepositoryError::Executor)?;
213        self.repository.log_sql_result(
214            SqlLogOperation::Select,
215            &compiled,
216            started_at,
217            started,
218            Some(rows.len()),
219            Some(query.entity.clone()),
220            None,
221            query.comment.clone(),
222        );
223        Ok(rows)
224    }
225
226    fn fetch_prepared_query_with_cache(
227        &self,
228        query: &SelectQuery,
229        compiled: &CompiledQuery,
230        options: AggregationCacheOptions,
231        cache: &dyn AggregationCacheBackend,
232    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
233        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
234        let key = aggregation_cache_key(
235            cache.namespace(),
236            &aggregation_cache_namespace(&query.entity),
237            compiled,
238        );
239        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
240            return Ok(rows);
241        }
242        let started_at = SystemTime::now();
243        let started = Instant::now();
244        let rows = self
245            .repository
246            .executor
247            .fetch_all(compiled)
248            .map_err(RepositoryError::Executor)?;
249        self.repository.log_sql_result(
250            SqlLogOperation::Select,
251            compiled,
252            started_at,
253            started,
254            Some(rows.len()),
255            Some(query.entity.clone()),
256            None,
257            query.comment.clone(),
258        );
259        cache.put(key, rows.clone());
260        Ok(rows)
261    }
262
263    pub fn fetch_all_with_relation_aggregates(
264        &self,
265        query: &SelectQuery,
266        relation_aggregates: &[RelationAggregate],
267    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
268        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
269        let mut rows = self.fetch_all(query)?;
270        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
271        Ok(rows)
272    }
273
274    pub fn fetch_smart_list(
275        &self,
276        query: &SelectQuery,
277    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
278        let query = self
279            .prepare_select_query(query)
280            .map_err(RepositoryError::Runtime)?;
281        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
282        self.repository.fetch_smart_list(&query)
283    }
284
285    pub fn fetch_smart_list_with_relation_aggregates(
286        &self,
287        query: &SelectQuery,
288        relation_aggregates: &[RelationAggregate],
289    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
290        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
291            .map(SmartList::from)
292    }
293
294    pub fn fetch_entities<T>(
295        &self,
296        query: &SelectQuery,
297    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
298    where
299        T: Entity,
300    {
301        let query = self
302            .prepare_select_query(query)
303            .map_err(RepositoryError::Runtime)?;
304        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
305        self.repository.fetch_entities(&query)
306    }
307
308    pub fn fetch_entities_with_relation_aggregates<T>(
309        &self,
310        query: &SelectQuery,
311        relation_aggregates: &[RelationAggregate],
312    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
313    where
314        T: Entity,
315    {
316        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
317            .into_iter()
318            .map(|record| {
319                let mut entity = T::from_record(record)?;
320                let root = crate::EntityRoot::default();
321                entity.on_loaded(&root as &dyn std::any::Any);
322                Ok(entity)
323            })
324            .collect::<Result<Vec<_>, _>>()
325            .map(SmartList::from)
326            .map_err(RepositoryError::Entity)
327    }
328
329    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
330        &self,
331        query: &SelectQuery,
332        relation_aggregates: &[RelationAggregate],
333    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
334    where
335        T: Entity,
336    {
337        let query = self
338            .prepare_select_query(query)
339            .map_err(RepositoryError::Runtime)?;
340
341        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
342
343        let mut rows = self.repository.fetch_all(&query)?;
344        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
345        self.enhance_query_relations(&mut rows, &query)?;
346        self.enhance_relations(&mut rows)?;
347        rows.into_iter()
348            .map(|record| {
349                let mut entity = T::from_record(record)?;
350                let root = crate::EntityRoot::default();
351                entity.on_loaded(&root as &dyn std::any::Any);
352                Ok(entity)
353            })
354            .collect::<Result<Vec<_>, _>>()
355            .map(SmartList::from)
356            .map_err(RepositoryError::Entity)
357    }
358
359    pub fn fetch_enhanced_entities<T>(
360        &self,
361        query: &SelectQuery,
362    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
363    where
364        T: Entity,
365    {
366        let query = self
367            .prepare_select_query(query)
368            .map_err(RepositoryError::Runtime)?;
369        let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
370
371        let mut rows = self.repository.fetch_all(&query)?;
372        self.enhance_query_relations(&mut rows, &query)?;
373        self.enhance_relations(&mut rows)?;
374        let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
375
376        rows.into_iter()
377            .map(|record| {
378                let mut entity = T::from_record(record)?;
379                if let Some(ref root) = root {
380                    entity.on_loaded(root as &dyn std::any::Any);
381                }
382                Ok(entity)
383            })
384            .collect::<Result<Vec<_>, _>>()
385            .map(SmartList::from)
386            .map_err(RepositoryError::Entity)
387    }
388
389    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
390        let command = self
391            .prepare_insert_command(command)
392            .map_err(RepositoryError::Runtime)?;
393        self.execute_prepared_insert(command)
394    }
395
396    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
397        let command = self
398            .prepare_update_command(command)
399            .map_err(RepositoryError::Runtime)?;
400        self.execute_prepared_update(command)
401    }
402
403    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
404        self.delete_scoped(command, None)
405    }
406
407    pub fn delete_scoped(
408        &self,
409        command: &DeleteCommand,
410        comment_lineage: Option<String>,
411    ) -> Result<u64, RepositoryError<E::Error>> {
412        let mut command = command.clone();
413        if let Some(behavior) = self.behavior() {
414            behavior
415                .before_delete(self.repository.metadata.context, &mut command)
416                .map_err(RepositoryError::Runtime)?;
417        }
418        self.enforce_delete_policy(&mut command)
419            .map_err(RepositoryError::Runtime)?;
420        let resolved_lineage = self.format_lineage_comment(&command.id, comment_lineage);
421        let _guard = resolved_lineage.as_ref().map(|lineage| {
422            crate::context::QueryCommentGuard::new(
423                self.repository.metadata.context,
424                Some(lineage.clone()),
425            )
426        });
427        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
428        let affected = self.repository.delete(&command)?;
429        let mut event = EntityEvent::deleted_with_old_values(
430            command.entity,
431            command.id,
432            command.expected_version,
433            old_values,
434        );
435        event.comment = resolved_lineage;
436        self.emit_event(event)
437            .map_err(RepositoryError::Runtime)?;
438        Ok(affected)
439    }
440
441    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
442        let mut command = command.clone();
443        if let Some(behavior) = self.behavior() {
444            behavior
445                .before_recover(self.repository.metadata.context, &mut command)
446                .map_err(RepositoryError::Runtime)?;
447        }
448        self.enforce_recover_policy(&mut command)
449            .map_err(RepositoryError::Runtime)?;
450        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
451        let affected = self.repository.recover(&command)?;
452        let resolved_lineage = self.format_lineage_comment(&command.id, None);
453        let mut event = EntityEvent::recovered_with_old_values(
454            command.entity,
455            command.id,
456            command.expected_version,
457            old_values,
458        );
459        event.comment = resolved_lineage;
460        self.emit_event(event)
461            .map_err(RepositoryError::Runtime)?;
462        Ok(affected)
463    }
464
465    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
466        self.repository.metadata.context.send_event(event)
467    }
468
469    pub(super) fn execute_prepared_insert(
470        &self,
471        command: InsertCommand,
472    ) -> Result<u64, RepositoryError<E::Error>> {
473        self.execute_prepared_insert_with_comment(command, None)
474    }
475
476    pub(super) fn execute_prepared_insert_with_comment(
477        &self,
478        command: InsertCommand,
479        comment_lineage: Option<String>,
480    ) -> Result<u64, RepositoryError<E::Error>> {
481        let id_val = command.values.get("id").cloned().unwrap_or(Value::Null);
482        let resolved_lineage = self.format_lineage_comment(&id_val, comment_lineage);
483        let _guard = resolved_lineage.as_ref().map(|lineage| {
484            crate::context::QueryCommentGuard::new(
485                self.repository.metadata.context,
486                Some(lineage.clone()),
487            )
488        });
489        let affected = self.repository.insert(&command)?;
490        let mut event = EntityEvent::created(command.entity, command.values);
491        event.comment = resolved_lineage;
492        self.emit_event(event).map_err(RepositoryError::Runtime)?;
493        Ok(affected)
494    }
495
496    pub(super) fn execute_prepared_update(
497        &self,
498        command: UpdateCommand,
499    ) -> Result<u64, RepositoryError<E::Error>> {
500        self.execute_prepared_update_with_comment(command, None)
501    }
502
503    pub(super) fn execute_prepared_update_with_comment(
504        &self,
505        command: UpdateCommand,
506        comment_lineage: Option<String>,
507    ) -> Result<u64, RepositoryError<E::Error>> {
508        let resolved_lineage = self.format_lineage_comment(&command.id, comment_lineage);
509        let _guard = resolved_lineage.as_ref().map(|lineage| {
510            crate::context::QueryCommentGuard::new(
511                self.repository.metadata.context,
512                Some(lineage.clone()),
513            )
514        });
515        let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
516        let affected = self.repository.update(&command)?;
517        let updated_fields = command.values.keys().cloned().collect();
518        let mut values = command.values.clone();
519        values.insert("id".to_owned(), command.id.clone());
520        if let Some(version) = command.expected_version {
521            values.insert("version".to_owned(), Value::I64(version + 1));
522        }
523        let mut new_values = old_values.clone().unwrap_or_default();
524        for (field, value) in &values {
525            new_values.insert(field.clone(), value.clone());
526        }
527        let mut event = EntityEvent::updated_with_old_values(
528            command.entity,
529            values,
530            old_values,
531            new_values,
532            updated_fields,
533        );
534        event.comment = resolved_lineage;
535        self.emit_event(event).map_err(RepositoryError::Runtime)?;
536        Ok(affected)
537    }
538
539    fn fetch_current_event_row(
540        &self,
541        entity: &str,
542        id: &Value,
543    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
544        if self.repository.metadata.context.event_sink.is_none() {
545            return Ok(None);
546        }
547        let descriptor = self
548            .repository
549            .metadata
550            .context
551            .require_entity(entity)
552            .map_err(RepositoryError::Runtime)?;
553        let Some(id_property) = descriptor.id_property() else {
554            return Ok(None);
555        };
556        let mut rows = self.fetch_all(
557            &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
558        )?;
559        Ok(rows.pop())
560    }
561
562    fn format_lineage_comment(
563        &self,
564        id: &Value,
565        comment_lineage: Option<String>,
566    ) -> Option<String> {
567        let raw_comment = comment_lineage.or_else(|| {
568            self.repository.metadata.context.comment_stack.lock().ok().and_then(|stack| {
569                if stack.is_empty() {
570                    None
571                } else {
572                    Some(stack.join(" -> "))
573                }
574            })
575        })?;
576
577        if raw_comment.is_empty() {
578            return None;
579        }
580
581        let id_str = match id {
582            Value::U64(n) => n.to_string(),
583            Value::I64(n) => n.to_string(),
584            Value::Text(s) => s.clone(),
585            other => format!("{other:?}"),
586        };
587        let prefix = format!("{}({}):", self.entity, id_str);
588        if raw_comment.starts_with(&prefix) || raw_comment.contains(&format!(" -> {}({}):", self.entity, id_str)) {
589            Some(raw_comment)
590        } else {
591            Some(format!("{}({}): {}", self.entity, id_str, raw_comment))
592        }
593    }
594
595    pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
596        ResolvedRepository {
597            entity,
598            repository: ContextRepository {
599                metadata: UserContextMetadata {
600                    context: self.repository.metadata.context,
601                },
602                dialect: self.repository.dialect,
603                executor: self.repository.executor,
604            },
605        }
606    }
607}