Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2
3use teaql_core::{
4    AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7
8use crate::{
9    CheckObjectStatus, RawAuditEvent, RepositoryBehavior, RepositoryError, RuntimeError,
10    clear_record_status, mark_record_status,
11};
12
13use super::{
14    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15    ResolvedRepository, UserContextMetadata, helpers::*,
16};
17
18impl<'a, E> ResolvedRepository<'a, E>
19where
20    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
21{
22    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
23        self.repository.metadata.context.repository_behavior(entity)
24    }
25
26    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
27        self.repository
28            .metadata
29            .context
30            .repository_behavior(&self.entity)
31    }
32
33    pub fn entity(&self) -> &str {
34        &self.entity
35    }
36
37    pub fn select(&self) -> SelectQuery {
38        SelectQuery::new(self.entity.clone())
39    }
40
41    pub fn insert_command(&self) -> InsertCommand {
42        InsertCommand::new(self.entity.clone())
43    }
44
45    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
46        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
47            policy.enforce_insert(self.repository.metadata.context, command)?;
48        }
49        Ok(())
50    }
51
52    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
53        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
54            policy.enforce_update(self.repository.metadata.context, command)?;
55        }
56        Ok(())
57    }
58
59    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
60        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
61            policy.enforce_delete(self.repository.metadata.context, command)?;
62        }
63        Ok(())
64    }
65
66    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
67        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
68            policy.enforce_recover(self.repository.metadata.context, command)?;
69        }
70        Ok(())
71    }
72
73    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
74        let mut query = query.clone();
75        
76        let mut full_trace = self.trace_context.clone();
77        full_trace.extend(query.trace_chain);
78        query.trace_chain = full_trace;
79
80        if let Some(behavior) = self.query_behavior(&query.entity) {
81            behavior.before_select(self.repository.metadata.context, &mut query)?;
82        }
83        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
84            policy.enforce_select(self.repository.metadata.context, &mut query)?;
85        }
86        Ok(query)
87    }
88
89    pub fn prepare_insert_command(
90        &self,
91        command: &InsertCommand,
92    ) -> Result<InsertCommand, RuntimeError> {
93        let mut command = command.clone();
94        if let Some(behavior) = self.behavior() {
95            behavior.before_insert(self.repository.metadata.context, &mut command)?;
96        }
97        self.enforce_insert_policy(&mut command)?;
98
99        let entity = self
100            .repository
101            .metadata
102            .context
103            .require_entity(&command.entity)?;
104        if let Some(id_property) = entity.id_property() {
105            let needs_id = !command.values.contains_key(&id_property.name)
106                || is_unassigned_id(command.values.get(&id_property.name));
107            if needs_id {
108                let id = self.repository.metadata.context.next_id(&command.entity)?;
109                command
110                    .values
111                    .insert(id_property.name.clone(), Value::U64(id));
112            }
113        }
114        ensure_initial_version(&mut command.values, entity);
115        mark_record_status(&mut command.values, CheckObjectStatus::Create);
116        let check_result = self
117            .repository
118            .metadata
119            .context
120            .check_and_fix_record(&command.entity, &mut command.values);
121        clear_record_status(&mut command.values);
122        check_result?;
123
124        Ok(command)
125    }
126
127    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
128        UpdateCommand::new(self.entity.clone(), id)
129    }
130
131    pub fn prepare_update_command(
132        &self,
133        command: &UpdateCommand,
134    ) -> Result<UpdateCommand, RuntimeError> {
135        let mut command = command.clone();
136        if let Some(behavior) = self.behavior() {
137            behavior.before_update(self.repository.metadata.context, &mut command)?;
138        }
139        self.enforce_update_policy(&mut command)?;
140
141        Ok(command)
142    }
143
144    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
145        DeleteCommand::new(self.entity.clone(), id)
146    }
147
148    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
149        RecoverCommand::new(self.entity.clone(), id, expected_version)
150    }
151
152    pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
153        let query = self
154            .prepare_select_query(query)
155            .map_err(RepositoryError::Runtime)?;
156        self.fetch_prepared_all(&query).await
157    }
158
159    async fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
160        let mut rows = self.fetch_prepared_query(query).await?;
161        self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain).await?;
162        self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain).await?;
163        self.enhance_query_relations(&mut rows, query).await?;
164        Ok(rows)
165    }
166
167    async fn fetch_prepared_query(
168        &self,
169        query: &SelectQuery,
170    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
171
172        let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
173        let mut query = query.clone();
174        query.comment = final_comment;
175        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
176            if let Some(cache) = self
177                .repository
178                .metadata
179                .context
180                .get_resource::<Arc<dyn AggregationCacheBackend>>()
181            {
182                return self.fetch_prepared_query_with_cache(
183                    &query,
184                    options,
185                    cache.as_ref(),
186                ).await;
187            }
188            if let Some(cache) = self
189                .repository
190                .metadata
191                .context
192                .get_resource::<InMemoryAggregationCache>()
193            {
194                return self.fetch_prepared_query_with_cache(&query, options, cache).await;
195            }
196        }
197        let request = teaql_data_service::QueryRequest {
198            query: query.clone(),
199            trace_chain: query.trace_chain.clone(),
200            comment: query.comment.clone(),
201        };
202        let res = self
203            .repository
204            .executor
205            .query(request)
206            .await.map_err(RepositoryError::Executor)?;
207        self.repository.metadata.context.record_metadata_log(&res.metadata);
208        Ok(res.rows)
209    }
210
211    async fn fetch_prepared_query_with_cache(
212        &self,
213        query: &SelectQuery,
214        options: AggregationCacheOptions,
215        cache: &dyn AggregationCacheBackend,
216    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
217
218        let key = aggregation_cache_key(
219            cache.namespace(),
220            &aggregation_cache_namespace(&query.entity),
221            query,
222        );
223        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
224            return Ok(rows);
225        }
226        let request = teaql_data_service::QueryRequest {
227            query: query.clone(),
228            trace_chain: query.trace_chain.clone(),
229            comment: query.comment.clone(),
230        };
231        let res = self
232            .repository
233            .executor
234            .query(request)
235            .await.map_err(RepositoryError::Executor)?;
236        self.repository.metadata.context.record_metadata_log(&res.metadata);
237        let rows = res.rows;
238        cache.put(key, rows.clone());
239        Ok(rows)
240    }
241
242    pub async fn fetch_all_with_relation_aggregates(
243        &self,
244        query: &SelectQuery,
245        relation_aggregates: &[RelationAggregate],
246    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
247        let query = self
248            .prepare_select_query(query)
249            .map_err(RepositoryError::Runtime)?;
250
251        let mut rows = self.fetch_prepared_all(&query).await?;
252        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
253        Ok(rows)
254    }
255
256    pub async fn fetch_smart_list(
257        &self,
258        query: &SelectQuery,
259    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
260        let query = self
261            .prepare_select_query(query)
262            .map_err(RepositoryError::Runtime)?;
263
264        self.repository.fetch_smart_list(&query).await
265    }
266
267    pub async fn fetch_smart_list_with_relation_aggregates(
268        &self,
269        query: &SelectQuery,
270        relation_aggregates: &[RelationAggregate],
271    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
272        self.fetch_all_with_relation_aggregates(query, relation_aggregates).await
273            .map(SmartList::from)
274    }
275
276    pub async fn fetch_entities<T>(
277        &self,
278        query: &SelectQuery,
279    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
280    where
281        T: Entity,
282    {
283        let query = self
284            .prepare_select_query(query)
285            .map_err(RepositoryError::Runtime)?;
286
287        self.repository.fetch_entities(&query).await
288    }
289
290    pub async fn fetch_entities_with_relation_aggregates<T>(
291        &self,
292        query: &SelectQuery,
293        relation_aggregates: &[RelationAggregate],
294    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
295    where
296        T: Entity,
297    {
298        self.fetch_all_with_relation_aggregates(query, relation_aggregates).await?
299            .into_iter()
300            .map(|record| {
301                let mut entity = T::from_record(record)?;
302                let root = crate::EntityRoot::default();
303                entity.on_loaded(&root as &dyn std::any::Any);
304                Ok(entity)
305            })
306            .collect::<Result<Vec<_>, _>>()
307            .map(SmartList::from)
308            .map_err(RepositoryError::Entity)
309    }
310
311    pub async fn fetch_enhanced_entities_with_relation_aggregates<T>(
312        &self,
313        query: &SelectQuery,
314        relation_aggregates: &[RelationAggregate],
315    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
316    where
317        T: Entity,
318    {
319        let query = self
320            .prepare_select_query(query)
321            .map_err(RepositoryError::Runtime)?;
322
323        let mut rows = self.fetch_prepared_all(&query).await?;
324        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
325        self.enhance_relations(&mut rows).await?;
326        rows.into_iter()
327            .map(|record| {
328                let mut entity = T::from_record(record)?;
329                let root = crate::EntityRoot::default();
330                entity.on_loaded(&root as &dyn std::any::Any);
331                Ok(entity)
332            })
333            .collect::<Result<Vec<_>, _>>()
334            .map(SmartList::from)
335            .map_err(RepositoryError::Entity)
336    }
337
338    pub async fn fetch_enhanced_entities<T>(
339        &self,
340        query: &SelectQuery,
341    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
342    where
343        T: Entity,
344    {
345        let query = self
346            .prepare_select_query(query)
347            .map_err(RepositoryError::Runtime)?;
348
349        let mut rows = self.fetch_prepared_all(&query).await?;
350        self.enhance_relations(&mut rows).await?;
351        let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
352        rows.into_iter()
353            .map(|record| {
354                let mut entity = T::from_record(record)?;
355                if let Some(ref root) = root {
356                    entity.on_loaded(root as &dyn std::any::Any);
357                }
358                Ok(entity)
359            })
360            .collect::<Result<Vec<_>, _>>()
361            .map(SmartList::from)
362            .map_err(RepositoryError::Entity)
363    }
364
365    pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
366        let command = self
367            .prepare_insert_command(command)
368            .map_err(RepositoryError::Runtime)?;
369        self.execute_prepared_insert_with_comment(command, self.trace_context.clone()).await
370    }
371
372    pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
373        let command = self
374            .prepare_update_command(command)
375            .map_err(RepositoryError::Runtime)?;
376        self.execute_prepared_update_with_comment(command, self.trace_context.clone()).await
377    }
378
379    pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
380        self.delete_scoped(command, self.trace_context.clone()).await
381    }
382
383    pub async fn delete_scoped(
384        &self,
385        command: &DeleteCommand,
386        trace_chain: Vec<teaql_core::TraceNode>,
387    ) -> Result<u64, RepositoryError<E::Error>> {
388        let mut command = command.clone();
389        command.trace_chain = trace_chain.clone();
390        if let Some(behavior) = self.behavior() {
391            behavior
392                .before_delete(self.repository.metadata.context, &mut command)
393                .map_err(RepositoryError::Runtime)?;
394        }
395        self.enforce_delete_policy(&mut command)
396            .map_err(RepositoryError::Runtime)?;
397
398        let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
399        let affected = self.repository.delete(&command).await?;
400
401        let mut event = RawAuditEvent::deleted_with_old_values(
402            command.entity,
403            command.id,
404            command.expected_version,
405            old_values,
406        );
407        event.trace_chain = trace_chain;
408        self.emit_event(event)
409            .map_err(RepositoryError::Runtime)?;
410        Ok(affected)
411    }
412
413    pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
414        let mut command = command.clone();
415        command.trace_chain = self.trace_context.clone();
416        if let Some(behavior) = self.behavior() {
417            behavior
418                .before_recover(self.repository.metadata.context, &mut command)
419                .map_err(RepositoryError::Runtime)?;
420        }
421        self.enforce_recover_policy(&mut command)
422            .map_err(RepositoryError::Runtime)?;
423        let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
424        let affected = self.repository.recover(&command).await?;
425        let event = RawAuditEvent::recovered_with_old_values(
426            command.entity,
427            command.id,
428            command.expected_version,
429            old_values,
430        );
431        self.emit_event(event)
432            .map_err(RepositoryError::Runtime)?;
433        Ok(affected)
434    }
435
436    fn emit_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
437        self.repository.metadata.context.send_event(event)
438    }
439
440    #[allow(dead_code)]
441    pub(super) async fn execute_prepared_insert(
442        &self,
443        command: InsertCommand,
444    ) -> Result<u64, RepositoryError<E::Error>> {
445        self.execute_prepared_insert_with_comment(command, Vec::new()).await
446    }
447
448    pub(super) async fn execute_prepared_insert_with_comment(
449        &self,
450        mut command: InsertCommand,
451        trace_chain: Vec<teaql_core::TraceNode>,
452    ) -> Result<u64, RepositoryError<E::Error>> {
453        command.trace_chain = trace_chain.clone();
454        let affected = self.repository.insert(&command).await?;
455        let mut event = RawAuditEvent::created(command.entity, command.values);
456        event.trace_chain = trace_chain;
457        self.emit_event(event).map_err(RepositoryError::Runtime)?;
458        Ok(affected)
459    }
460
461    pub(super) async fn execute_prepared_batch_insert(
462        &self,
463        command: teaql_core::BatchInsertCommand,
464    ) -> Result<u64, RepositoryError<E::Error>> {
465        if command.batch_values.is_empty() {
466            return Ok(0);
467        }
468        let affected = self.repository.batch_insert(&command).await?;
469        
470        let entity = command.entity.clone();
471        for (i, values) in command.batch_values.into_iter().enumerate() {
472            let mut event = RawAuditEvent::created(entity.clone(), values);
473            if i < command.trace_chains.len() {
474                event.trace_chain = command.trace_chains[i].clone();
475            }
476            self.emit_event(event).map_err(RepositoryError::Runtime)?;
477        }
478        Ok(affected)
479    }
480
481    #[allow(dead_code)]
482    pub(super) async fn execute_prepared_update(
483        &self,
484        command: UpdateCommand,
485    ) -> Result<u64, RepositoryError<E::Error>> {
486        self.execute_prepared_update_with_comment(command, Vec::new()).await
487    }
488
489    pub(super) async fn execute_prepared_update_with_comment(
490        &self,
491        mut command: UpdateCommand,
492        trace_chain: Vec<teaql_core::TraceNode>,
493    ) -> Result<u64, RepositoryError<E::Error>> {
494        command.trace_chain = trace_chain.clone();
495        
496        let mut old_values = command.old_values.clone();
497        let needs_fetch = match &old_values {
498            Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
499            None => true,
500        };
501        if needs_fetch {
502            old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
503        }
504
505        let affected = self.repository.update(&command).await?;
506        let updated_fields = command.values.keys().cloned().collect();
507        let mut values = command.values.clone();
508        values.insert("id".to_owned(), command.id.clone());
509        if let Some(version) = command.expected_version {
510            values.insert("version".to_owned(), Value::I64(version + 1));
511        }
512        let mut new_values = old_values.clone().unwrap_or_default();
513        for (field, value) in &values {
514            new_values.insert(field.clone(), value.clone());
515        }
516        let mut event = RawAuditEvent::updated_with_old_values(
517            command.entity,
518            values,
519            old_values,
520            new_values,
521            updated_fields,
522        );
523        event.trace_chain = trace_chain;
524        self.emit_event(event).map_err(RepositoryError::Runtime)?;
525        Ok(affected)
526    }
527
528    pub(super) async fn execute_prepared_batch_update(
529        &self,
530        command: teaql_core::BatchUpdateCommand,
531    ) -> Result<u64, RepositoryError<E::Error>> {
532        if command.batch_values.is_empty() {
533            return Ok(0);
534        }
535        let affected = self.repository.batch_update(&command).await?;
536        
537        let entity = command.entity.clone();
538        for (i, values) in command.batch_values.into_iter().enumerate() {
539            let mut full_values = values.clone();
540            full_values.insert("id".to_owned(), command.batch_ids[i].clone());
541            if let Some(Some(version)) = command.batch_expected_versions.get(i) {
542                full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
543            }
544            
545            let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
546            let mut new_values = old_values.clone().unwrap_or_default();
547            for (field, value) in &full_values {
548                new_values.insert(field.clone(), value.clone());
549            }
550            
551            let mut event = RawAuditEvent::updated_with_old_values(
552                entity.clone(),
553                full_values,
554                old_values,
555                new_values,
556                command.update_fields.clone(),
557            );
558            if i < command.trace_chains.len() {
559                event.trace_chain = command.trace_chains[i].clone();
560            }
561            self.emit_event(event).map_err(RepositoryError::Runtime)?;
562        }
563        Ok(affected)
564    }
565
566    fn fetch_current_event_row(
567        &self,
568        _entity: &str,
569        _id: &Value,
570        _trace_chain: Vec<teaql_core::TraceNode>,
571    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
572        // PER THE USER: "我们不需要在审计的时候去抓旧的值"
573        // Avoid DB queries during event emission. We rely on in-memory `original_values`.
574        Ok(None)
575    }
576
577
578    pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, E> {
579        ResolvedRepository {
580            entity,
581            repository: ContextRepository {
582                metadata: UserContextMetadata {
583                    context: self.repository.metadata.context,
584                },
585                executor: self.repository.executor,
586            },
587            trace_context: Vec::new(),
588        }
589    }
590}