Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2
3use teaql_core::{
4    AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7
8use crate::{
9    CheckObjectStatus, RawAuditEvent, RepositoryBehavior, RepositoryError, RuntimeError,
10    clear_record_status, mark_record_status,
11};
12
13use super::{
14    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15    ResolvedRepository, UserContextMetadata, helpers::*,
16};
17
18impl<'a, E> ResolvedRepository<'a, E>
19where
20    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
21{
22    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
23        self.repository.metadata.context.repository_behavior(entity)
24    }
25
26    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
27        self.repository
28            .metadata
29            .context
30            .repository_behavior(&self.entity)
31    }
32
33    pub fn entity(&self) -> &str {
34        &self.entity
35    }
36
37    pub fn select(&self) -> SelectQuery {
38        SelectQuery::new(self.entity.clone())
39    }
40
41    pub fn insert_command(&self) -> InsertCommand {
42        InsertCommand::new(self.entity.clone())
43    }
44
45    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
46        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
47            policy.enforce_insert(self.repository.metadata.context, command)?;
48        }
49        Ok(())
50    }
51
52    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
53        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
54            policy.enforce_update(self.repository.metadata.context, command)?;
55        }
56        Ok(())
57    }
58
59    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
60        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
61            policy.enforce_delete(self.repository.metadata.context, command)?;
62        }
63        Ok(())
64    }
65
66    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
67        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
68            policy.enforce_recover(self.repository.metadata.context, command)?;
69        }
70        Ok(())
71    }
72
73    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
74        let mut query = query.clone();
75        
76        let mut full_trace = self.trace_context.clone();
77        full_trace.extend(query.trace_chain);
78        query.trace_chain = full_trace;
79
80        if let Some(behavior) = self.query_behavior(&query.entity) {
81            behavior.before_select(self.repository.metadata.context, &mut query)?;
82        }
83        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
84            policy.enforce_select(self.repository.metadata.context, &mut query)?;
85        }
86        // Ensure local_key fields for relation loads are projected so that
87        // enhance_query_relations can match parent rows to child records.
88        if !query.relations.is_empty() {
89            if let Some(descriptor) = self.repository.metadata.context.entity(&query.entity) {
90                for load in &query.relations {
91                    if let Some(relation) = descriptor.relation_by_name(&load.name) {
92                        if !query.projection.contains(&relation.local_key) {
93                            query.projection.push(relation.local_key.clone());
94                        }
95                    }
96                }
97            }
98        }
99        Ok(query)
100    }
101
102    pub fn prepare_insert_command(
103        &self,
104        command: &InsertCommand,
105    ) -> Result<InsertCommand, RuntimeError> {
106        let mut command = command.clone();
107        if let Some(behavior) = self.behavior() {
108            behavior.before_insert(self.repository.metadata.context, &mut command)?;
109        }
110        self.enforce_insert_policy(&mut command)?;
111
112        let entity = self
113            .repository
114            .metadata
115            .context
116            .require_entity(&command.entity)?;
117        if let Some(id_property) = entity.id_property() {
118            let needs_id = !command.values.contains_key(&id_property.name)
119                || is_unassigned_id(command.values.get(&id_property.name));
120            if needs_id {
121                let id = self.repository.metadata.context.next_id(&command.entity)?;
122                command
123                    .values
124                    .insert(id_property.name.clone(), Value::U64(id));
125            }
126        }
127        ensure_initial_version(&mut command.values, entity);
128        mark_record_status(&mut command.values, CheckObjectStatus::Create);
129        let check_result = self
130            .repository
131            .metadata
132            .context
133            .check_and_fix_record(&command.entity, &mut command.values);
134        clear_record_status(&mut command.values);
135        check_result?;
136
137        Ok(command)
138    }
139
140    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
141        UpdateCommand::new(self.entity.clone(), id)
142    }
143
144    pub fn prepare_update_command(
145        &self,
146        command: &UpdateCommand,
147    ) -> Result<UpdateCommand, RuntimeError> {
148        let mut command = command.clone();
149        if let Some(behavior) = self.behavior() {
150            behavior.before_update(self.repository.metadata.context, &mut command)?;
151        }
152        self.enforce_update_policy(&mut command)?;
153
154        Ok(command)
155    }
156
157    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
158        DeleteCommand::new(self.entity.clone(), id)
159    }
160
161    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
162        RecoverCommand::new(self.entity.clone(), id, expected_version)
163    }
164
165    pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
166        let query = self
167            .prepare_select_query(query)
168            .map_err(RepositoryError::Runtime)?;
169        self.fetch_prepared_all(&query).await
170    }
171
172    /// Fetch records in streaming mode (chunked).
173    /// Returns a Vec of chunks, each chunk containing up to `chunk_size` rows.
174    /// Each chunk is enhanced (relations, children) before returning.
175    /// Requires E to implement StreamQueryExecutor.
176    pub async fn fetch_stream(
177        &self,
178        query: &SelectQuery,
179    ) -> Result<Vec<teaql_data_service::StreamChunk>, RepositoryError<E::Error>>
180    where
181        E: teaql_data_service::StreamQueryExecutor,
182    {
183        let query = self
184            .prepare_select_query(query)
185            .map_err(RepositoryError::Runtime)?;
186
187        let chunk_size = query.stream_config
188            .as_ref()
189            .map(|c| c.chunk_size)
190            .unwrap_or(1000);
191
192        let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
193        let mut query = query.clone();
194        query.comment = final_comment;
195
196        let request = teaql_data_service::QueryRequest {
197            query: query.clone(),
198            trace_chain: query.trace_chain.clone(),
199            comment: query.comment.clone(),
200        };
201
202        let chunks = self
203            .repository
204            .executor
205            .query_stream(request, chunk_size)
206            .await
207            .map_err(RepositoryError::Executor)?;
208
209        // Enhance each chunk
210        let mut enhanced_chunks = Vec::with_capacity(chunks.len());
211        for mut chunk in chunks {
212            self.enhance_object_group_bys(&mut chunk.rows, &query.object_group_bys, &query.trace_chain).await?;
213            self.enhance_child_queries(&mut chunk.rows, &query.child_enhancements, &query.trace_chain).await?;
214            self.enhance_query_relations(&mut chunk.rows, &query).await?;
215            enhanced_chunks.push(chunk);
216        }
217
218        Ok(enhanced_chunks)
219    }
220
221    async fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
222        let mut rows = self.fetch_prepared_query(query).await?;
223        self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain).await?;
224        self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain).await?;
225        self.enhance_query_relations(&mut rows, query).await?;
226        Ok(rows)
227    }
228
229    async fn fetch_prepared_query(
230        &self,
231        query: &SelectQuery,
232    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
233
234        let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
235        let mut query = query.clone();
236        query.comment = final_comment;
237        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
238            if let Some(cache) = self
239                .repository
240                .metadata
241                .context
242                .get_resource::<Arc<dyn AggregationCacheBackend>>()
243            {
244                return self.fetch_prepared_query_with_cache(
245                    &query,
246                    options,
247                    cache.as_ref(),
248                ).await;
249            }
250            if let Some(cache) = self
251                .repository
252                .metadata
253                .context
254                .get_resource::<InMemoryAggregationCache>()
255            {
256                return self.fetch_prepared_query_with_cache(&query, options, cache).await;
257            }
258        }
259        let request = teaql_data_service::QueryRequest {
260            query: query.clone(),
261            trace_chain: query.trace_chain.clone(),
262            comment: query.comment.clone(),
263        };
264        let res = self
265            .repository
266            .executor
267            .query(request)
268            .await.map_err(RepositoryError::Executor)?;
269        self.repository.metadata.context.record_metadata_log(&res.metadata);
270        Ok(res.rows)
271    }
272
273    async fn fetch_prepared_query_with_cache(
274        &self,
275        query: &SelectQuery,
276        options: AggregationCacheOptions,
277        cache: &dyn AggregationCacheBackend,
278    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
279
280        let key = aggregation_cache_key(
281            cache.namespace(),
282            &aggregation_cache_namespace(&query.entity),
283            query,
284        );
285        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
286            return Ok(rows);
287        }
288        let request = teaql_data_service::QueryRequest {
289            query: query.clone(),
290            trace_chain: query.trace_chain.clone(),
291            comment: query.comment.clone(),
292        };
293        let res = self
294            .repository
295            .executor
296            .query(request)
297            .await.map_err(RepositoryError::Executor)?;
298        self.repository.metadata.context.record_metadata_log(&res.metadata);
299        let rows = res.rows;
300        cache.put(key, rows.clone());
301        Ok(rows)
302    }
303
304    pub async fn fetch_all_with_relation_aggregates(
305        &self,
306        query: &SelectQuery,
307        relation_aggregates: &[RelationAggregate],
308    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
309        let query = self
310            .prepare_select_query(query)
311            .map_err(RepositoryError::Runtime)?;
312
313        let mut rows = self.fetch_prepared_all(&query).await?;
314        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
315        Ok(rows)
316    }
317
318    pub async fn fetch_smart_list(
319        &self,
320        query: &SelectQuery,
321    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
322        let query = self
323            .prepare_select_query(query)
324            .map_err(RepositoryError::Runtime)?;
325
326        self.repository.fetch_smart_list(&query).await
327    }
328
329    pub async fn fetch_smart_list_with_relation_aggregates(
330        &self,
331        query: &SelectQuery,
332        relation_aggregates: &[RelationAggregate],
333    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
334        self.fetch_all_with_relation_aggregates(query, relation_aggregates).await
335            .map(SmartList::from)
336    }
337
338    pub async fn fetch_entities<T>(
339        &self,
340        query: &SelectQuery,
341    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
342    where
343        T: Entity,
344    {
345        let query = self
346            .prepare_select_query(query)
347            .map_err(RepositoryError::Runtime)?;
348
349        self.repository.fetch_entities(&query).await
350    }
351
352    pub async fn fetch_entities_with_relation_aggregates<T>(
353        &self,
354        query: &SelectQuery,
355        relation_aggregates: &[RelationAggregate],
356    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
357    where
358        T: Entity,
359    {
360        self.fetch_all_with_relation_aggregates(query, relation_aggregates).await?
361            .into_iter()
362            .map(|record| {
363                let mut entity = T::from_record(record)?;
364                let root = crate::EntityRoot::default();
365                entity.on_loaded(&root as &dyn std::any::Any);
366                Ok(entity)
367            })
368            .collect::<Result<Vec<_>, _>>()
369            .map(SmartList::from)
370            .map_err(RepositoryError::Entity)
371    }
372
373    pub async fn fetch_enhanced_entities_with_relation_aggregates<T>(
374        &self,
375        query: &SelectQuery,
376        relation_aggregates: &[RelationAggregate],
377    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
378    where
379        T: Entity,
380    {
381        let query = self
382            .prepare_select_query(query)
383            .map_err(RepositoryError::Runtime)?;
384
385        let mut rows = self.fetch_prepared_all(&query).await?;
386        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
387        self.enhance_relations(&mut rows).await?;
388        self.enhance_query_relations(&mut rows, &query).await?;
389        rows.into_iter()
390            .map(|record| {
391                let mut entity = T::from_record(record)?;
392                let root = crate::EntityRoot::default();
393                entity.on_loaded(&root as &dyn std::any::Any);
394                Ok(entity)
395            })
396            .collect::<Result<Vec<_>, _>>()
397            .map(SmartList::from)
398            .map_err(RepositoryError::Entity)
399    }
400
401    pub async fn fetch_enhanced_entities<T>(
402        &self,
403        query: &SelectQuery,
404    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
405    where
406        T: Entity,
407    {
408        let query = self
409            .prepare_select_query(query)
410            .map_err(RepositoryError::Runtime)?;
411
412        let mut rows = self.fetch_prepared_all(&query).await?;
413        self.enhance_relations(&mut rows).await?;
414        self.enhance_query_relations(&mut rows, &query).await?;
415        let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
416        rows.into_iter()
417            .map(|record| {
418                let mut entity = T::from_record(record)?;
419                if let Some(ref root) = root {
420                    entity.on_loaded(root as &dyn std::any::Any);
421                }
422                Ok(entity)
423            })
424            .collect::<Result<Vec<_>, _>>()
425            .map(SmartList::from)
426            .map_err(RepositoryError::Entity)
427    }
428
429    pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
430        let command = self
431            .prepare_insert_command(command)
432            .map_err(RepositoryError::Runtime)?;
433        self.execute_prepared_insert_with_comment(command, self.trace_context.clone()).await
434    }
435
436    pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
437        let command = self
438            .prepare_update_command(command)
439            .map_err(RepositoryError::Runtime)?;
440        self.execute_prepared_update_with_comment(command, self.trace_context.clone()).await
441    }
442
443    pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
444        self.delete_scoped(command, self.trace_context.clone()).await
445    }
446
447    pub async fn delete_scoped(
448        &self,
449        command: &DeleteCommand,
450        trace_chain: Vec<teaql_core::TraceNode>,
451    ) -> Result<u64, RepositoryError<E::Error>> {
452        let mut command = command.clone();
453        command.trace_chain = trace_chain.clone();
454        if let Some(behavior) = self.behavior() {
455            behavior
456                .before_delete(self.repository.metadata.context, &mut command)
457                .map_err(RepositoryError::Runtime)?;
458        }
459        self.enforce_delete_policy(&mut command)
460            .map_err(RepositoryError::Runtime)?;
461
462        let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
463        let affected = self.repository.delete(&command).await?;
464
465        let mut event = RawAuditEvent::deleted_with_old_values(
466            command.entity,
467            command.id,
468            command.expected_version,
469            old_values,
470        );
471        event.trace_chain = trace_chain;
472        self.emit_event(event)
473            .map_err(RepositoryError::Runtime)?;
474        Ok(affected)
475    }
476
477    pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
478        let mut command = command.clone();
479        command.trace_chain = self.trace_context.clone();
480        if let Some(behavior) = self.behavior() {
481            behavior
482                .before_recover(self.repository.metadata.context, &mut command)
483                .map_err(RepositoryError::Runtime)?;
484        }
485        self.enforce_recover_policy(&mut command)
486            .map_err(RepositoryError::Runtime)?;
487        let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
488        let affected = self.repository.recover(&command).await?;
489        let event = RawAuditEvent::recovered_with_old_values(
490            command.entity,
491            command.id,
492            command.expected_version,
493            old_values,
494        );
495        self.emit_event(event)
496            .map_err(RepositoryError::Runtime)?;
497        Ok(affected)
498    }
499
500    fn emit_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
501        self.repository.metadata.context.send_event(event)
502    }
503
504    #[allow(dead_code)]
505    pub(super) async fn execute_prepared_insert(
506        &self,
507        command: InsertCommand,
508    ) -> Result<u64, RepositoryError<E::Error>> {
509        self.execute_prepared_insert_with_comment(command, Vec::new()).await
510    }
511
512    pub(super) async fn execute_prepared_insert_with_comment(
513        &self,
514        mut command: InsertCommand,
515        trace_chain: Vec<teaql_core::TraceNode>,
516    ) -> Result<u64, RepositoryError<E::Error>> {
517        command.trace_chain = trace_chain.clone();
518        let affected = self.repository.insert(&command).await?;
519        let mut event = RawAuditEvent::created(command.entity, command.values);
520        event.trace_chain = trace_chain;
521        self.emit_event(event).map_err(RepositoryError::Runtime)?;
522        Ok(affected)
523    }
524
525    pub(super) async fn execute_prepared_batch_insert(
526        &self,
527        command: teaql_core::BatchInsertCommand,
528    ) -> Result<u64, RepositoryError<E::Error>> {
529        if command.batch_values.is_empty() {
530            return Ok(0);
531        }
532        let affected = self.repository.batch_insert(&command).await?;
533        
534        let entity = command.entity.clone();
535        for (i, values) in command.batch_values.into_iter().enumerate() {
536            let mut event = RawAuditEvent::created(entity.clone(), values);
537            if i < command.trace_chains.len() {
538                event.trace_chain = command.trace_chains[i].clone();
539            }
540            self.emit_event(event).map_err(RepositoryError::Runtime)?;
541        }
542        Ok(affected)
543    }
544
545    #[allow(dead_code)]
546    pub(super) async fn execute_prepared_update(
547        &self,
548        command: UpdateCommand,
549    ) -> Result<u64, RepositoryError<E::Error>> {
550        self.execute_prepared_update_with_comment(command, Vec::new()).await
551    }
552
553    pub(super) async fn execute_prepared_update_with_comment(
554        &self,
555        mut command: UpdateCommand,
556        trace_chain: Vec<teaql_core::TraceNode>,
557    ) -> Result<u64, RepositoryError<E::Error>> {
558        command.trace_chain = trace_chain.clone();
559        
560        let mut old_values = command.old_values.clone();
561        let needs_fetch = match &old_values {
562            Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
563            None => true,
564        };
565        if needs_fetch {
566            old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
567        }
568
569        let affected = self.repository.update(&command).await?;
570        let updated_fields = command.values.keys().cloned().collect();
571        let mut values = command.values.clone();
572        values.insert("id".to_owned(), command.id.clone());
573        if let Some(version) = command.expected_version {
574            values.insert("version".to_owned(), Value::I64(version + 1));
575        }
576        let mut new_values = old_values.clone().unwrap_or_default();
577        for (field, value) in &values {
578            new_values.insert(field.clone(), value.clone());
579        }
580        let mut event = RawAuditEvent::updated_with_old_values(
581            command.entity,
582            values,
583            old_values,
584            new_values,
585            updated_fields,
586        );
587        event.trace_chain = trace_chain;
588        self.emit_event(event).map_err(RepositoryError::Runtime)?;
589        Ok(affected)
590    }
591
592    pub(super) async fn execute_prepared_batch_update(
593        &self,
594        command: teaql_core::BatchUpdateCommand,
595    ) -> Result<u64, RepositoryError<E::Error>> {
596        if command.batch_values.is_empty() {
597            return Ok(0);
598        }
599        let affected = self.repository.batch_update(&command).await?;
600        
601        let entity = command.entity.clone();
602        for (i, values) in command.batch_values.into_iter().enumerate() {
603            let mut full_values = values.clone();
604            full_values.insert("id".to_owned(), command.batch_ids[i].clone());
605            if let Some(Some(version)) = command.batch_expected_versions.get(i) {
606                full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
607            }
608            
609            let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
610            let mut new_values = old_values.clone().unwrap_or_default();
611            for (field, value) in &full_values {
612                new_values.insert(field.clone(), value.clone());
613            }
614            
615            let mut event = RawAuditEvent::updated_with_old_values(
616                entity.clone(),
617                full_values,
618                old_values,
619                new_values,
620                command.update_fields.clone(),
621            );
622            if i < command.trace_chains.len() {
623                event.trace_chain = command.trace_chains[i].clone();
624            }
625            self.emit_event(event).map_err(RepositoryError::Runtime)?;
626        }
627        Ok(affected)
628    }
629
630    fn fetch_current_event_row(
631        &self,
632        _entity: &str,
633        _id: &Value,
634        _trace_chain: Vec<teaql_core::TraceNode>,
635    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
636        // PER THE USER: "我们不需要在审计的时候去抓旧的值"
637        // Avoid DB queries during event emission. We rely on in-memory `original_values`.
638        Ok(None)
639    }
640
641
642    pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, E> {
643        ResolvedRepository {
644            entity,
645            repository: ContextRepository {
646                metadata: UserContextMetadata {
647                    context: self.repository.metadata.context,
648                },
649                executor: self.repository.executor,
650            },
651            trace_context: Vec::new(),
652        }
653    }
654}