Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5    AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12    SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17    ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22    D: SqlDialect,
23    E: QueryExecutor,
24{
25    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26        self.repository.metadata.context.repository_behavior(entity)
27    }
28
29    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30        self.repository
31            .metadata
32            .context
33            .repository_behavior(&self.entity)
34    }
35
36    pub fn entity(&self) -> &str {
37        &self.entity
38    }
39
40    pub fn select(&self) -> SelectQuery {
41        SelectQuery::new(self.entity.clone())
42    }
43
44    pub fn insert_command(&self) -> InsertCommand {
45        InsertCommand::new(self.entity.clone())
46    }
47
48    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50            policy.enforce_insert(self.repository.metadata.context, command)?;
51        }
52        Ok(())
53    }
54
55    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57            policy.enforce_update(self.repository.metadata.context, command)?;
58        }
59        Ok(())
60    }
61
62    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64            policy.enforce_delete(self.repository.metadata.context, command)?;
65        }
66        Ok(())
67    }
68
69    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71            policy.enforce_recover(self.repository.metadata.context, command)?;
72        }
73        Ok(())
74    }
75
76    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77        let mut query = query.clone();
78        
79        let mut full_trace = self.trace_context.clone();
80        full_trace.extend(query.trace_chain);
81        query.trace_chain = full_trace;
82
83        if let Some(behavior) = self.query_behavior(&query.entity) {
84            behavior.before_select(self.repository.metadata.context, &mut query)?;
85        }
86        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
87            policy.enforce_select(self.repository.metadata.context, &mut query)?;
88        }
89        Ok(query)
90    }
91
92    pub fn prepare_insert_command(
93        &self,
94        command: &InsertCommand,
95    ) -> Result<InsertCommand, RuntimeError> {
96        let mut command = command.clone();
97        if let Some(behavior) = self.behavior() {
98            behavior.before_insert(self.repository.metadata.context, &mut command)?;
99        }
100        self.enforce_insert_policy(&mut command)?;
101
102        let entity = self
103            .repository
104            .metadata
105            .context
106            .require_entity(&command.entity)?;
107        if let Some(id_property) = entity.id_property() {
108            let needs_id = !command.values.contains_key(&id_property.name)
109                || is_unassigned_id(command.values.get(&id_property.name));
110            if needs_id {
111                let id = self.repository.metadata.context.next_id(&command.entity)?;
112                command
113                    .values
114                    .insert(id_property.name.clone(), Value::U64(id));
115            }
116        }
117        ensure_initial_version(&mut command.values, entity);
118        mark_record_status(&mut command.values, CheckObjectStatus::Create);
119        let check_result = self
120            .repository
121            .metadata
122            .context
123            .check_and_fix_record(&command.entity, &mut command.values);
124        clear_record_status(&mut command.values);
125        check_result?;
126
127        Ok(command)
128    }
129
130    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
131        UpdateCommand::new(self.entity.clone(), id)
132    }
133
134    pub fn prepare_update_command(
135        &self,
136        command: &UpdateCommand,
137    ) -> Result<UpdateCommand, RuntimeError> {
138        let mut command = command.clone();
139        if let Some(behavior) = self.behavior() {
140            behavior.before_update(self.repository.metadata.context, &mut command)?;
141        }
142        self.enforce_update_policy(&mut command)?;
143
144        Ok(command)
145    }
146
147    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
148        DeleteCommand::new(self.entity.clone(), id)
149    }
150
151    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
152        RecoverCommand::new(self.entity.clone(), id, expected_version)
153    }
154
155    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
156        let query = self.prepare_select_query(query)?;
157        self.repository.compile(&query)
158    }
159
160    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
161        let query = self
162            .prepare_select_query(query)
163            .map_err(RepositoryError::Runtime)?;
164        self.fetch_prepared_all(&query)
165    }
166
167    fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
168        let mut rows = self.fetch_prepared_query(query)?;
169        self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain)?;
170        self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain)?;
171        Ok(rows)
172    }
173
174    fn fetch_prepared_query(
175        &self,
176        query: &SelectQuery,
177    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
178
179        let mut compiled = self
180            .repository
181            .compile(query)
182            .map_err(RepositoryError::Runtime)?;
183        let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
184        compiled.comment = final_comment;
185        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
186            if let Some(cache) = self
187                .repository
188                .metadata
189                .context
190                .get_resource::<Arc<dyn AggregationCacheBackend>>()
191            {
192                return self.fetch_prepared_query_with_cache(
193                    query,
194                    &compiled,
195                    options,
196                    cache.as_ref(),
197                );
198            }
199            if let Some(cache) = self
200                .repository
201                .metadata
202                .context
203                .get_resource::<InMemoryAggregationCache>()
204            {
205                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
206            }
207        }
208        let started_at = SystemTime::now();
209        let started = Instant::now();
210        let rows = self
211            .repository
212            .executor
213            .fetch_all(&compiled)
214            .map_err(RepositoryError::Executor)?;
215        self.repository.log_sql_result(
216            SqlLogOperation::Select,
217            &compiled,
218            started_at,
219            started,
220            Some(rows.len()),
221            Some(query.entity.clone()),
222            None,
223            query.trace_chain.clone(),
224        );
225        Ok(rows)
226    }
227
228    fn fetch_prepared_query_with_cache(
229        &self,
230        query: &SelectQuery,
231        compiled: &CompiledQuery,
232        options: AggregationCacheOptions,
233        cache: &dyn AggregationCacheBackend,
234    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
235
236        let key = aggregation_cache_key(
237            cache.namespace(),
238            &aggregation_cache_namespace(&query.entity),
239            compiled,
240        );
241        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
242            return Ok(rows);
243        }
244        let started_at = SystemTime::now();
245        let started = Instant::now();
246        let rows = self
247            .repository
248            .executor
249            .fetch_all(compiled)
250            .map_err(RepositoryError::Executor)?;
251        self.repository.log_sql_result(
252            SqlLogOperation::Select,
253            compiled,
254            started_at,
255            started,
256            Some(rows.len()),
257            Some(query.entity.clone()),
258            None,
259            query.trace_chain.clone(),
260        );
261        cache.put(key, rows.clone());
262        Ok(rows)
263    }
264
265    pub fn fetch_all_with_relation_aggregates(
266        &self,
267        query: &SelectQuery,
268        relation_aggregates: &[RelationAggregate],
269    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
270        let query = self
271            .prepare_select_query(query)
272            .map_err(RepositoryError::Runtime)?;
273
274        let mut rows = self.fetch_prepared_all(&query)?;
275        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain)?;
276        Ok(rows)
277    }
278
279    pub fn fetch_smart_list(
280        &self,
281        query: &SelectQuery,
282    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
283        let query = self
284            .prepare_select_query(query)
285            .map_err(RepositoryError::Runtime)?;
286
287        self.repository.fetch_smart_list(&query)
288    }
289
290    pub fn fetch_smart_list_with_relation_aggregates(
291        &self,
292        query: &SelectQuery,
293        relation_aggregates: &[RelationAggregate],
294    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
295        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
296            .map(SmartList::from)
297    }
298
299    pub fn fetch_entities<T>(
300        &self,
301        query: &SelectQuery,
302    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
303    where
304        T: Entity,
305    {
306        let query = self
307            .prepare_select_query(query)
308            .map_err(RepositoryError::Runtime)?;
309
310        self.repository.fetch_entities(&query)
311    }
312
313    pub fn fetch_entities_with_relation_aggregates<T>(
314        &self,
315        query: &SelectQuery,
316        relation_aggregates: &[RelationAggregate],
317    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
318    where
319        T: Entity,
320    {
321        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
322            .into_iter()
323            .map(|record| {
324                let mut entity = T::from_record(record)?;
325                let root = crate::EntityRoot::default();
326                entity.on_loaded(&root as &dyn std::any::Any);
327                Ok(entity)
328            })
329            .collect::<Result<Vec<_>, _>>()
330            .map(SmartList::from)
331            .map_err(RepositoryError::Entity)
332    }
333
334    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
335        &self,
336        query: &SelectQuery,
337        relation_aggregates: &[RelationAggregate],
338    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
339    where
340        T: Entity,
341    {
342        let query = self
343            .prepare_select_query(query)
344            .map_err(RepositoryError::Runtime)?;
345
346
347
348        let mut rows = self.fetch_prepared_all(&query)?;
349        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain)?;
350        self.enhance_query_relations(&mut rows, &query)?;
351        self.enhance_relations(&mut rows)?;
352        rows.into_iter()
353            .map(|record| {
354                let mut entity = T::from_record(record)?;
355                let root = crate::EntityRoot::default();
356                entity.on_loaded(&root as &dyn std::any::Any);
357                Ok(entity)
358            })
359            .collect::<Result<Vec<_>, _>>()
360            .map(SmartList::from)
361            .map_err(RepositoryError::Entity)
362    }
363
364    pub fn fetch_enhanced_entities<T>(
365        &self,
366        query: &SelectQuery,
367    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
368    where
369        T: Entity,
370    {
371        let query = self
372            .prepare_select_query(query)
373            .map_err(RepositoryError::Runtime)?;
374
375
376        let mut rows = self.fetch_prepared_all(&query)?;
377        self.enhance_query_relations(&mut rows, &query)?;
378        self.enhance_relations(&mut rows)?;
379        let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
380
381        rows.into_iter()
382            .map(|record| {
383                let mut entity = T::from_record(record)?;
384                if let Some(ref root) = root {
385                    entity.on_loaded(root as &dyn std::any::Any);
386                }
387                Ok(entity)
388            })
389            .collect::<Result<Vec<_>, _>>()
390            .map(SmartList::from)
391            .map_err(RepositoryError::Entity)
392    }
393
394    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
395        let command = self
396            .prepare_insert_command(command)
397            .map_err(RepositoryError::Runtime)?;
398        self.execute_prepared_insert_with_comment(command, self.trace_context.clone())
399    }
400
401    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
402        let command = self
403            .prepare_update_command(command)
404            .map_err(RepositoryError::Runtime)?;
405        self.execute_prepared_update_with_comment(command, self.trace_context.clone())
406    }
407
408    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
409        self.delete_scoped(command, self.trace_context.clone())
410    }
411
412    pub fn delete_scoped(
413        &self,
414        command: &DeleteCommand,
415        trace_chain: Vec<teaql_core::TraceNode>,
416    ) -> Result<u64, RepositoryError<E::Error>> {
417        let mut command = command.clone();
418        command.trace_chain = trace_chain.clone();
419        if let Some(behavior) = self.behavior() {
420            behavior
421                .before_delete(self.repository.metadata.context, &mut command)
422                .map_err(RepositoryError::Runtime)?;
423        }
424        self.enforce_delete_policy(&mut command)
425            .map_err(RepositoryError::Runtime)?;
426
427        let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
428        let affected = self.repository.delete(&command)?;
429
430        let mut event = EntityEvent::deleted_with_old_values(
431            command.entity,
432            command.id,
433            command.expected_version,
434            old_values,
435        );
436        event.trace_chain = trace_chain;
437        self.emit_event(event)
438            .map_err(RepositoryError::Runtime)?;
439        Ok(affected)
440    }
441
442    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
443        let mut command = command.clone();
444        command.trace_chain = self.trace_context.clone();
445        if let Some(behavior) = self.behavior() {
446            behavior
447                .before_recover(self.repository.metadata.context, &mut command)
448                .map_err(RepositoryError::Runtime)?;
449        }
450        self.enforce_recover_policy(&mut command)
451            .map_err(RepositoryError::Runtime)?;
452        let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
453        let affected = self.repository.recover(&command)?;
454        let event = EntityEvent::recovered_with_old_values(
455            command.entity,
456            command.id,
457            command.expected_version,
458            old_values,
459        );
460        self.emit_event(event)
461            .map_err(RepositoryError::Runtime)?;
462        Ok(affected)
463    }
464
465    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
466        self.repository.metadata.context.send_event(event)
467    }
468
469    #[allow(dead_code)]
470    pub(super) fn execute_prepared_insert(
471        &self,
472        command: InsertCommand,
473    ) -> Result<u64, RepositoryError<E::Error>> {
474        self.execute_prepared_insert_with_comment(command, Vec::new())
475    }
476
477    pub(super) fn execute_prepared_insert_with_comment(
478        &self,
479        mut command: InsertCommand,
480        trace_chain: Vec<teaql_core::TraceNode>,
481    ) -> Result<u64, RepositoryError<E::Error>> {
482        command.trace_chain = trace_chain.clone();
483        let affected = self.repository.insert(&command)?;
484        let mut event = EntityEvent::created(command.entity, command.values);
485        event.trace_chain = trace_chain;
486        self.emit_event(event).map_err(RepositoryError::Runtime)?;
487        Ok(affected)
488    }
489
490    pub(super) fn execute_prepared_batch_insert(
491        &self,
492        command: teaql_core::BatchInsertCommand,
493    ) -> Result<u64, RepositoryError<E::Error>> {
494        if command.batch_values.is_empty() {
495            return Ok(0);
496        }
497        let affected = self.repository.batch_insert(&command)?;
498        
499        let entity = command.entity.clone();
500        for (i, values) in command.batch_values.into_iter().enumerate() {
501            let mut event = EntityEvent::created(entity.clone(), values);
502            if i < command.trace_chains.len() {
503                event.trace_chain = command.trace_chains[i].clone();
504            }
505            self.emit_event(event).map_err(RepositoryError::Runtime)?;
506        }
507        Ok(affected)
508    }
509
510    #[allow(dead_code)]
511    pub(super) fn execute_prepared_update(
512        &self,
513        command: UpdateCommand,
514    ) -> Result<u64, RepositoryError<E::Error>> {
515        self.execute_prepared_update_with_comment(command, Vec::new())
516    }
517
518    pub(super) fn execute_prepared_update_with_comment(
519        &self,
520        mut command: UpdateCommand,
521        trace_chain: Vec<teaql_core::TraceNode>,
522    ) -> Result<u64, RepositoryError<E::Error>> {
523        command.trace_chain = trace_chain.clone();
524        
525        let mut old_values = command.old_values.clone();
526        let needs_fetch = match &old_values {
527            Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
528            None => true,
529        };
530        if needs_fetch {
531            old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
532        }
533
534        let affected = self.repository.update(&command)?;
535        let updated_fields = command.values.keys().cloned().collect();
536        let mut values = command.values.clone();
537        values.insert("id".to_owned(), command.id.clone());
538        if let Some(version) = command.expected_version {
539            values.insert("version".to_owned(), Value::I64(version + 1));
540        }
541        let mut new_values = old_values.clone().unwrap_or_default();
542        for (field, value) in &values {
543            new_values.insert(field.clone(), value.clone());
544        }
545        let mut event = EntityEvent::updated_with_old_values(
546            command.entity,
547            values,
548            old_values,
549            new_values,
550            updated_fields,
551        );
552        event.trace_chain = trace_chain;
553        self.emit_event(event).map_err(RepositoryError::Runtime)?;
554        Ok(affected)
555    }
556
557    pub(super) fn execute_prepared_batch_update(
558        &self,
559        command: teaql_core::BatchUpdateCommand,
560    ) -> Result<u64, RepositoryError<E::Error>> {
561        if command.batch_values.is_empty() {
562            return Ok(0);
563        }
564        let affected = self.repository.batch_update(&command)?;
565        
566        let entity = command.entity.clone();
567        for (i, values) in command.batch_values.into_iter().enumerate() {
568            let mut full_values = values.clone();
569            full_values.insert("id".to_owned(), command.batch_ids[i].clone());
570            if let Some(Some(version)) = command.batch_expected_versions.get(i) {
571                full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
572            }
573            
574            let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
575            let mut new_values = old_values.clone().unwrap_or_default();
576            for (field, value) in &full_values {
577                new_values.insert(field.clone(), value.clone());
578            }
579            
580            let mut event = EntityEvent::updated_with_old_values(
581                entity.clone(),
582                full_values,
583                old_values,
584                new_values,
585                command.update_fields.clone(),
586            );
587            if i < command.trace_chains.len() {
588                event.trace_chain = command.trace_chains[i].clone();
589            }
590            self.emit_event(event).map_err(RepositoryError::Runtime)?;
591        }
592        Ok(affected)
593    }
594
595    fn fetch_current_event_row(
596        &self,
597        _entity: &str,
598        _id: &Value,
599        _trace_chain: Vec<teaql_core::TraceNode>,
600    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
601        // PER THE USER: "我们不需要在审计的时候去抓旧的值"
602        // Avoid DB queries during event emission. We rely on in-memory `original_values`.
603        Ok(None)
604    }
605
606
607    pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
608        ResolvedRepository {
609            entity,
610            repository: ContextRepository {
611                metadata: UserContextMetadata {
612                    context: self.repository.metadata.context,
613                },
614                dialect: self.repository.dialect,
615                executor: self.repository.executor,
616            },
617            trace_context: Vec::new(),
618        }
619    }
620}