Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2
3use teaql_core::{
4    AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7
8use crate::{
9    CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
10    clear_record_status, mark_record_status,
11};
12
13use super::{
14    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15    ResolvedRepository, UserContextMetadata, helpers::*,
16};
17
18impl<'a, E> ResolvedRepository<'a, E>
19where
20    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
21{
22    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
23        self.repository.metadata.context.repository_behavior(entity)
24    }
25
26    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
27        self.repository
28            .metadata
29            .context
30            .repository_behavior(&self.entity)
31    }
32
33    pub fn entity(&self) -> &str {
34        &self.entity
35    }
36
37    pub fn select(&self) -> SelectQuery {
38        SelectQuery::new(self.entity.clone())
39    }
40
41    pub fn insert_command(&self) -> InsertCommand {
42        InsertCommand::new(self.entity.clone())
43    }
44
45    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
46        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
47            policy.enforce_insert(self.repository.metadata.context, command)?;
48        }
49        Ok(())
50    }
51
52    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
53        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
54            policy.enforce_update(self.repository.metadata.context, command)?;
55        }
56        Ok(())
57    }
58
59    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
60        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
61            policy.enforce_delete(self.repository.metadata.context, command)?;
62        }
63        Ok(())
64    }
65
66    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
67        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
68            policy.enforce_recover(self.repository.metadata.context, command)?;
69        }
70        Ok(())
71    }
72
73    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
74        let mut query = query.clone();
75        
76        let mut full_trace = self.trace_context.clone();
77        full_trace.extend(query.trace_chain);
78        query.trace_chain = full_trace;
79
80        if let Some(behavior) = self.query_behavior(&query.entity) {
81            behavior.before_select(self.repository.metadata.context, &mut query)?;
82        }
83        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
84            policy.enforce_select(self.repository.metadata.context, &mut query)?;
85        }
86        Ok(query)
87    }
88
89    pub fn prepare_insert_command(
90        &self,
91        command: &InsertCommand,
92    ) -> Result<InsertCommand, RuntimeError> {
93        let mut command = command.clone();
94        if let Some(behavior) = self.behavior() {
95            behavior.before_insert(self.repository.metadata.context, &mut command)?;
96        }
97        self.enforce_insert_policy(&mut command)?;
98
99        let entity = self
100            .repository
101            .metadata
102            .context
103            .require_entity(&command.entity)?;
104        if let Some(id_property) = entity.id_property() {
105            let needs_id = !command.values.contains_key(&id_property.name)
106                || is_unassigned_id(command.values.get(&id_property.name));
107            if needs_id {
108                let id = self.repository.metadata.context.next_id(&command.entity)?;
109                command
110                    .values
111                    .insert(id_property.name.clone(), Value::U64(id));
112            }
113        }
114        ensure_initial_version(&mut command.values, entity);
115        mark_record_status(&mut command.values, CheckObjectStatus::Create);
116        let check_result = self
117            .repository
118            .metadata
119            .context
120            .check_and_fix_record(&command.entity, &mut command.values);
121        clear_record_status(&mut command.values);
122        check_result?;
123
124        Ok(command)
125    }
126
127    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
128        UpdateCommand::new(self.entity.clone(), id)
129    }
130
131    pub fn prepare_update_command(
132        &self,
133        command: &UpdateCommand,
134    ) -> Result<UpdateCommand, RuntimeError> {
135        let mut command = command.clone();
136        if let Some(behavior) = self.behavior() {
137            behavior.before_update(self.repository.metadata.context, &mut command)?;
138        }
139        self.enforce_update_policy(&mut command)?;
140
141        Ok(command)
142    }
143
144    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
145        DeleteCommand::new(self.entity.clone(), id)
146    }
147
148    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
149        RecoverCommand::new(self.entity.clone(), id, expected_version)
150    }
151
152    pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
153        let query = self
154            .prepare_select_query(query)
155            .map_err(RepositoryError::Runtime)?;
156        self.fetch_prepared_all(&query).await
157    }
158
159    async fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
160        let mut rows = self.fetch_prepared_query(query).await?;
161        self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain).await?;
162        self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain).await?;
163        self.enhance_query_relations(&mut rows, query).await?;
164        Ok(rows)
165    }
166
167    async fn fetch_prepared_query(
168        &self,
169        query: &SelectQuery,
170    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
171
172        let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
173        let mut query = query.clone();
174        query.comment = final_comment;
175        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
176            if let Some(cache) = self
177                .repository
178                .metadata
179                .context
180                .get_resource::<Arc<dyn AggregationCacheBackend>>()
181            {
182                return self.fetch_prepared_query_with_cache(
183                    &query,
184                    options,
185                    cache.as_ref(),
186                ).await;
187            }
188            if let Some(cache) = self
189                .repository
190                .metadata
191                .context
192                .get_resource::<InMemoryAggregationCache>()
193            {
194                return self.fetch_prepared_query_with_cache(&query, options, cache).await;
195            }
196        }
197        let request = teaql_data_service::QueryRequest {
198            query: query.clone(),
199            trace_chain: query.trace_chain.clone(),
200            comment: query.comment.clone(),
201        };
202        let res = self
203            .repository
204            .executor
205            .query(request)
206            .await.map_err(RepositoryError::Executor)?;
207        self.repository.metadata.context.record_metadata_log(&res.metadata);
208        Ok(res.rows)
209    }
210
211    async fn fetch_prepared_query_with_cache(
212        &self,
213        query: &SelectQuery,
214        options: AggregationCacheOptions,
215        cache: &dyn AggregationCacheBackend,
216    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
217
218        let key = aggregation_cache_key(
219            cache.namespace(),
220            &aggregation_cache_namespace(&query.entity),
221            query,
222        );
223        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
224            return Ok(rows);
225        }
226        let request = teaql_data_service::QueryRequest {
227            query: query.clone(),
228            trace_chain: query.trace_chain.clone(),
229            comment: query.comment.clone(),
230        };
231        let res = self
232            .repository
233            .executor
234            .query(request)
235            .await.map_err(RepositoryError::Executor)?;
236        self.repository.metadata.context.record_metadata_log(&res.metadata);
237        let rows = res.rows;
238        cache.put(key, rows.clone());
239        Ok(rows)
240    }
241
242    pub async fn fetch_all_with_relation_aggregates(
243        &self,
244        query: &SelectQuery,
245        relation_aggregates: &[RelationAggregate],
246    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
247        let query = self
248            .prepare_select_query(query)
249            .map_err(RepositoryError::Runtime)?;
250
251        let mut rows = self.fetch_prepared_all(&query).await?;
252        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
253        Ok(rows)
254    }
255
256    pub async fn fetch_smart_list(
257        &self,
258        query: &SelectQuery,
259    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
260        let query = self
261            .prepare_select_query(query)
262            .map_err(RepositoryError::Runtime)?;
263
264        self.repository.fetch_smart_list(&query).await
265    }
266
267    pub async fn fetch_smart_list_with_relation_aggregates(
268        &self,
269        query: &SelectQuery,
270        relation_aggregates: &[RelationAggregate],
271    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
272        self.fetch_all_with_relation_aggregates(query, relation_aggregates).await
273            .map(SmartList::from)
274    }
275
276    pub async fn fetch_entities<T>(
277        &self,
278        query: &SelectQuery,
279    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
280    where
281        T: Entity,
282    {
283        let query = self
284            .prepare_select_query(query)
285            .map_err(RepositoryError::Runtime)?;
286
287        self.repository.fetch_entities(&query).await
288    }
289
290    pub async fn fetch_entities_with_relation_aggregates<T>(
291        &self,
292        query: &SelectQuery,
293        relation_aggregates: &[RelationAggregate],
294    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
295    where
296        T: Entity,
297    {
298        self.fetch_all_with_relation_aggregates(query, relation_aggregates).await?
299            .into_iter()
300            .map(|record| {
301                let mut entity = T::from_record(record)?;
302                let root = crate::EntityRoot::default();
303                entity.on_loaded(&root as &dyn std::any::Any);
304                Ok(entity)
305            })
306            .collect::<Result<Vec<_>, _>>()
307            .map(SmartList::from)
308            .map_err(RepositoryError::Entity)
309    }
310
311    pub async fn fetch_enhanced_entities_with_relation_aggregates<T>(
312        &self,
313        query: &SelectQuery,
314        relation_aggregates: &[RelationAggregate],
315    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
316    where
317        T: Entity,
318    {
319        let query = self
320            .prepare_select_query(query)
321            .map_err(RepositoryError::Runtime)?;
322
323        let mut rows = self.fetch_prepared_all(&query).await?;
324        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
325        self.enhance_query_relations(&mut rows, &query).await?;
326        self.enhance_relations(&mut rows).await?;
327        rows.into_iter()
328            .map(|record| {
329                let mut entity = T::from_record(record)?;
330                let root = crate::EntityRoot::default();
331                entity.on_loaded(&root as &dyn std::any::Any);
332                Ok(entity)
333            })
334            .collect::<Result<Vec<_>, _>>()
335            .map(SmartList::from)
336            .map_err(RepositoryError::Entity)
337    }
338
339    pub async fn fetch_enhanced_entities<T>(
340        &self,
341        query: &SelectQuery,
342    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
343    where
344        T: Entity,
345    {
346        let query = self
347            .prepare_select_query(query)
348            .map_err(RepositoryError::Runtime)?;
349
350        let mut rows = self.fetch_prepared_all(&query).await?;
351        self.enhance_query_relations(&mut rows, &query).await?;
352        self.enhance_relations(&mut rows).await?;
353        let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
354
355        rows.into_iter()
356            .map(|record| {
357                let mut entity = T::from_record(record)?;
358                if let Some(ref root) = root {
359                    entity.on_loaded(root as &dyn std::any::Any);
360                }
361                Ok(entity)
362            })
363            .collect::<Result<Vec<_>, _>>()
364            .map(SmartList::from)
365            .map_err(RepositoryError::Entity)
366    }
367
368    pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
369        let mut command = command.clone();
370        if let Some(behavior) = self.behavior() {
371            behavior
372                .before_insert(self.repository.metadata.context, &mut command)
373                .map_err(RepositoryError::Runtime)?;
374        }
375        self.enforce_insert_policy(&mut command).map_err(RepositoryError::Runtime)?;
376        self.execute_prepared_insert_with_comment(command, self.trace_context.clone()).await
377    }
378
379    pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
380        let mut command = command.clone();
381        if let Some(behavior) = self.behavior() {
382            behavior
383                .before_update(self.repository.metadata.context, &mut command)
384                .map_err(RepositoryError::Runtime)?;
385        }
386        self.enforce_update_policy(&mut command).map_err(RepositoryError::Runtime)?;
387        self.execute_prepared_update_with_comment(command, self.trace_context.clone()).await
388    }
389
390    pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
391        self.delete_scoped(command, self.trace_context.clone()).await
392    }
393
394    pub async fn delete_scoped(
395        &self,
396        command: &DeleteCommand,
397        trace_chain: Vec<teaql_core::TraceNode>,
398    ) -> Result<u64, RepositoryError<E::Error>> {
399        let mut command = command.clone();
400        command.trace_chain = trace_chain.clone();
401        if let Some(behavior) = self.behavior() {
402            behavior
403                .before_delete(self.repository.metadata.context, &mut command)
404                .map_err(RepositoryError::Runtime)?;
405        }
406        self.enforce_delete_policy(&mut command)
407            .map_err(RepositoryError::Runtime)?;
408
409        let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
410        let affected = self.repository.delete(&command).await?;
411
412        let mut event = EntityEvent::deleted_with_old_values(
413            command.entity,
414            command.id,
415            command.expected_version,
416            old_values,
417        );
418        event.trace_chain = trace_chain;
419        self.emit_event(event)
420            .map_err(RepositoryError::Runtime)?;
421        Ok(affected)
422    }
423
424    pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
425        let mut command = command.clone();
426        command.trace_chain = self.trace_context.clone();
427        if let Some(behavior) = self.behavior() {
428            behavior
429                .before_recover(self.repository.metadata.context, &mut command)
430                .map_err(RepositoryError::Runtime)?;
431        }
432        self.enforce_recover_policy(&mut command)
433            .map_err(RepositoryError::Runtime)?;
434        let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
435        let affected = self.repository.recover(&command).await?;
436        let event = EntityEvent::recovered_with_old_values(
437            command.entity,
438            command.id,
439            command.expected_version,
440            old_values,
441        );
442        self.emit_event(event)
443            .map_err(RepositoryError::Runtime)?;
444        Ok(affected)
445    }
446
447    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
448        self.repository.metadata.context.send_event(event)
449    }
450
451    #[allow(dead_code)]
452    pub(super) async fn execute_prepared_insert(
453        &self,
454        command: InsertCommand,
455    ) -> Result<u64, RepositoryError<E::Error>> {
456        self.execute_prepared_insert_with_comment(command, Vec::new()).await
457    }
458
459    pub(super) async fn execute_prepared_insert_with_comment(
460        &self,
461        mut command: InsertCommand,
462        trace_chain: Vec<teaql_core::TraceNode>,
463    ) -> Result<u64, RepositoryError<E::Error>> {
464        command.trace_chain = trace_chain.clone();
465        let affected = self.repository.insert(&command).await?;
466        let mut event = EntityEvent::created(command.entity, command.values);
467        event.trace_chain = trace_chain;
468        self.emit_event(event).map_err(RepositoryError::Runtime)?;
469        Ok(affected)
470    }
471
472    pub(super) async fn execute_prepared_batch_insert(
473        &self,
474        command: teaql_core::BatchInsertCommand,
475    ) -> Result<u64, RepositoryError<E::Error>> {
476        if command.batch_values.is_empty() {
477            return Ok(0);
478        }
479        let affected = self.repository.batch_insert(&command).await?;
480        
481        let entity = command.entity.clone();
482        for (i, values) in command.batch_values.into_iter().enumerate() {
483            let mut event = EntityEvent::created(entity.clone(), values);
484            if i < command.trace_chains.len() {
485                event.trace_chain = command.trace_chains[i].clone();
486            }
487            self.emit_event(event).map_err(RepositoryError::Runtime)?;
488        }
489        Ok(affected)
490    }
491
492    #[allow(dead_code)]
493    pub(super) async fn execute_prepared_update(
494        &self,
495        command: UpdateCommand,
496    ) -> Result<u64, RepositoryError<E::Error>> {
497        self.execute_prepared_update_with_comment(command, Vec::new()).await
498    }
499
500    pub(super) async fn execute_prepared_update_with_comment(
501        &self,
502        mut command: UpdateCommand,
503        trace_chain: Vec<teaql_core::TraceNode>,
504    ) -> Result<u64, RepositoryError<E::Error>> {
505        command.trace_chain = trace_chain.clone();
506        
507        let mut old_values = command.old_values.clone();
508        let needs_fetch = match &old_values {
509            Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
510            None => true,
511        };
512        if needs_fetch {
513            old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
514        }
515
516        let affected = self.repository.update(&command).await?;
517        let updated_fields = command.values.keys().cloned().collect();
518        let mut values = command.values.clone();
519        values.insert("id".to_owned(), command.id.clone());
520        if let Some(version) = command.expected_version {
521            values.insert("version".to_owned(), Value::I64(version + 1));
522        }
523        let mut new_values = old_values.clone().unwrap_or_default();
524        for (field, value) in &values {
525            new_values.insert(field.clone(), value.clone());
526        }
527        let mut event = EntityEvent::updated_with_old_values(
528            command.entity,
529            values,
530            old_values,
531            new_values,
532            updated_fields,
533        );
534        event.trace_chain = trace_chain;
535        self.emit_event(event).map_err(RepositoryError::Runtime)?;
536        Ok(affected)
537    }
538
539    pub(super) async fn execute_prepared_batch_update(
540        &self,
541        command: teaql_core::BatchUpdateCommand,
542    ) -> Result<u64, RepositoryError<E::Error>> {
543        if command.batch_values.is_empty() {
544            return Ok(0);
545        }
546        let affected = self.repository.batch_update(&command).await?;
547        
548        let entity = command.entity.clone();
549        for (i, values) in command.batch_values.into_iter().enumerate() {
550            let mut full_values = values.clone();
551            full_values.insert("id".to_owned(), command.batch_ids[i].clone());
552            if let Some(Some(version)) = command.batch_expected_versions.get(i) {
553                full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
554            }
555            
556            let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
557            let mut new_values = old_values.clone().unwrap_or_default();
558            for (field, value) in &full_values {
559                new_values.insert(field.clone(), value.clone());
560            }
561            
562            let mut event = EntityEvent::updated_with_old_values(
563                entity.clone(),
564                full_values,
565                old_values,
566                new_values,
567                command.update_fields.clone(),
568            );
569            if i < command.trace_chains.len() {
570                event.trace_chain = command.trace_chains[i].clone();
571            }
572            self.emit_event(event).map_err(RepositoryError::Runtime)?;
573        }
574        Ok(affected)
575    }
576
    /// Hook for loading the currently stored row of an entity so it can be
    /// attached to an event as its old values.
    ///
    /// This is currently a deliberate stub: all parameters are ignored and the
    /// result is always `Ok(None)`, so no query is issued.
    fn fetch_current_event_row(
        &self,
        _entity: &str,
        _id: &Value,
        _trace_chain: Vec<teaql_core::TraceNode>,
    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
        // Per the user's request: "We do not need to fetch the old values during auditing."
        // Avoid DB queries during event emission. We rely on in-memory `original_values`.
        Ok(None)
    }
587
588
589    pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, E> {
590        ResolvedRepository {
591            entity,
592            repository: ContextRepository {
593                metadata: UserContextMetadata {
594                    context: self.repository.metadata.context,
595                },
596                executor: self.repository.executor,
597            },
598            trace_context: Vec::new(),
599        }
600    }
601}