Skip to main content

teaql_runtime/repository/
resolved.rs

1use std::sync::Arc;
2
3use teaql_core::{
4    AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5    RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7
8use crate::{
9    CheckObjectStatus, RawAuditEvent, RepositoryBehavior, RepositoryError, RuntimeError,
10    clear_record_status, mark_record_status,
11};
12
13use super::{
14    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15    ResolvedRepository, UserContextMetadata, helpers::*,
16};
17
18impl<'a, E> ResolvedRepository<'a, E>
19where
20    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
21{
22    pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
23        self.repository.metadata.context.repository_behavior(entity)
24    }
25
26    pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
27        self.repository
28            .metadata
29            .context
30            .repository_behavior(&self.entity)
31    }
32
33    pub fn entity(&self) -> &str {
34        &self.entity
35    }
36
37    pub fn select(&self) -> SelectQuery {
38        SelectQuery::new(self.entity.clone())
39    }
40
41    pub fn insert_command(&self) -> InsertCommand {
42        InsertCommand::new(self.entity.clone())
43    }
44
45    fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
46        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
47            policy.enforce_insert(self.repository.metadata.context, command)?;
48        }
49        Ok(())
50    }
51
52    fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
53        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
54            policy.enforce_update(self.repository.metadata.context, command)?;
55        }
56        Ok(())
57    }
58
59    fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
60        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
61            policy.enforce_delete(self.repository.metadata.context, command)?;
62        }
63        Ok(())
64    }
65
66    fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
67        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
68            policy.enforce_recover(self.repository.metadata.context, command)?;
69        }
70        Ok(())
71    }
72
73    fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
74        let mut query = query.clone();
75        
76        let mut full_trace = self.trace_context.clone();
77        full_trace.extend(query.trace_chain);
78        query.trace_chain = full_trace;
79
80        if let Some(behavior) = self.query_behavior(&query.entity) {
81            behavior.before_select(self.repository.metadata.context, &mut query)?;
82        }
83        if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
84            policy.enforce_select(self.repository.metadata.context, &mut query)?;
85        }
86        Ok(query)
87    }
88
89    pub fn prepare_insert_command(
90        &self,
91        command: &InsertCommand,
92    ) -> Result<InsertCommand, RuntimeError> {
93        let mut command = command.clone();
94        if let Some(behavior) = self.behavior() {
95            behavior.before_insert(self.repository.metadata.context, &mut command)?;
96        }
97        self.enforce_insert_policy(&mut command)?;
98
99        let entity = self
100            .repository
101            .metadata
102            .context
103            .require_entity(&command.entity)?;
104        if let Some(id_property) = entity.id_property() {
105            let needs_id = !command.values.contains_key(&id_property.name)
106                || is_unassigned_id(command.values.get(&id_property.name));
107            if needs_id {
108                let id = self.repository.metadata.context.next_id(&command.entity)?;
109                command
110                    .values
111                    .insert(id_property.name.clone(), Value::U64(id));
112            }
113        }
114        ensure_initial_version(&mut command.values, entity);
115        mark_record_status(&mut command.values, CheckObjectStatus::Create);
116        let check_result = self
117            .repository
118            .metadata
119            .context
120            .check_and_fix_record(&command.entity, &mut command.values);
121        clear_record_status(&mut command.values);
122        check_result?;
123
124        Ok(command)
125    }
126
127    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
128        UpdateCommand::new(self.entity.clone(), id)
129    }
130
131    pub fn prepare_update_command(
132        &self,
133        command: &UpdateCommand,
134    ) -> Result<UpdateCommand, RuntimeError> {
135        let mut command = command.clone();
136        if let Some(behavior) = self.behavior() {
137            behavior.before_update(self.repository.metadata.context, &mut command)?;
138        }
139        self.enforce_update_policy(&mut command)?;
140
141        Ok(command)
142    }
143
144    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
145        DeleteCommand::new(self.entity.clone(), id)
146    }
147
148    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
149        RecoverCommand::new(self.entity.clone(), id, expected_version)
150    }
151
152    pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
153        let query = self
154            .prepare_select_query(query)
155            .map_err(RepositoryError::Runtime)?;
156        self.fetch_prepared_all(&query).await
157    }
158
159    /// Fetch records in streaming mode (chunked).
160    /// Returns a Vec of chunks, each chunk containing up to `chunk_size` rows.
161    /// Each chunk is enhanced (relations, children) before returning.
162    /// Requires E to implement StreamQueryExecutor.
163    pub async fn fetch_stream(
164        &self,
165        query: &SelectQuery,
166    ) -> Result<Vec<teaql_data_service::StreamChunk>, RepositoryError<E::Error>>
167    where
168        E: teaql_data_service::StreamQueryExecutor,
169    {
170        let query = self
171            .prepare_select_query(query)
172            .map_err(RepositoryError::Runtime)?;
173
174        let chunk_size = query.stream_config
175            .as_ref()
176            .map(|c| c.chunk_size)
177            .unwrap_or(1000);
178
179        let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
180        let mut query = query.clone();
181        query.comment = final_comment;
182
183        let request = teaql_data_service::QueryRequest {
184            query: query.clone(),
185            trace_chain: query.trace_chain.clone(),
186            comment: query.comment.clone(),
187        };
188
189        let chunks = self
190            .repository
191            .executor
192            .query_stream(request, chunk_size)
193            .await
194            .map_err(RepositoryError::Executor)?;
195
196        // Enhance each chunk
197        let mut enhanced_chunks = Vec::with_capacity(chunks.len());
198        for mut chunk in chunks {
199            self.enhance_object_group_bys(&mut chunk.rows, &query.object_group_bys, &query.trace_chain).await?;
200            self.enhance_child_queries(&mut chunk.rows, &query.child_enhancements, &query.trace_chain).await?;
201            self.enhance_query_relations(&mut chunk.rows, &query).await?;
202            enhanced_chunks.push(chunk);
203        }
204
205        Ok(enhanced_chunks)
206    }
207
208    async fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
209        let mut rows = self.fetch_prepared_query(query).await?;
210        self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain).await?;
211        self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain).await?;
212        self.enhance_query_relations(&mut rows, query).await?;
213        Ok(rows)
214    }
215
216    async fn fetch_prepared_query(
217        &self,
218        query: &SelectQuery,
219    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
220
221        let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
222        let mut query = query.clone();
223        query.comment = final_comment;
224        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
225            if let Some(cache) = self
226                .repository
227                .metadata
228                .context
229                .get_resource::<Arc<dyn AggregationCacheBackend>>()
230            {
231                return self.fetch_prepared_query_with_cache(
232                    &query,
233                    options,
234                    cache.as_ref(),
235                ).await;
236            }
237            if let Some(cache) = self
238                .repository
239                .metadata
240                .context
241                .get_resource::<InMemoryAggregationCache>()
242            {
243                return self.fetch_prepared_query_with_cache(&query, options, cache).await;
244            }
245        }
246        let request = teaql_data_service::QueryRequest {
247            query: query.clone(),
248            trace_chain: query.trace_chain.clone(),
249            comment: query.comment.clone(),
250        };
251        let res = self
252            .repository
253            .executor
254            .query(request)
255            .await.map_err(RepositoryError::Executor)?;
256        self.repository.metadata.context.record_metadata_log(&res.metadata);
257        Ok(res.rows)
258    }
259
260    async fn fetch_prepared_query_with_cache(
261        &self,
262        query: &SelectQuery,
263        options: AggregationCacheOptions,
264        cache: &dyn AggregationCacheBackend,
265    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
266
267        let key = aggregation_cache_key(
268            cache.namespace(),
269            &aggregation_cache_namespace(&query.entity),
270            query,
271        );
272        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
273            return Ok(rows);
274        }
275        let request = teaql_data_service::QueryRequest {
276            query: query.clone(),
277            trace_chain: query.trace_chain.clone(),
278            comment: query.comment.clone(),
279        };
280        let res = self
281            .repository
282            .executor
283            .query(request)
284            .await.map_err(RepositoryError::Executor)?;
285        self.repository.metadata.context.record_metadata_log(&res.metadata);
286        let rows = res.rows;
287        cache.put(key, rows.clone());
288        Ok(rows)
289    }
290
291    pub async fn fetch_all_with_relation_aggregates(
292        &self,
293        query: &SelectQuery,
294        relation_aggregates: &[RelationAggregate],
295    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
296        let query = self
297            .prepare_select_query(query)
298            .map_err(RepositoryError::Runtime)?;
299
300        let mut rows = self.fetch_prepared_all(&query).await?;
301        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
302        Ok(rows)
303    }
304
305    pub async fn fetch_smart_list(
306        &self,
307        query: &SelectQuery,
308    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
309        let query = self
310            .prepare_select_query(query)
311            .map_err(RepositoryError::Runtime)?;
312
313        self.repository.fetch_smart_list(&query).await
314    }
315
316    pub async fn fetch_smart_list_with_relation_aggregates(
317        &self,
318        query: &SelectQuery,
319        relation_aggregates: &[RelationAggregate],
320    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
321        self.fetch_all_with_relation_aggregates(query, relation_aggregates).await
322            .map(SmartList::from)
323    }
324
325    pub async fn fetch_entities<T>(
326        &self,
327        query: &SelectQuery,
328    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
329    where
330        T: Entity,
331    {
332        let query = self
333            .prepare_select_query(query)
334            .map_err(RepositoryError::Runtime)?;
335
336        self.repository.fetch_entities(&query).await
337    }
338
339    pub async fn fetch_entities_with_relation_aggregates<T>(
340        &self,
341        query: &SelectQuery,
342        relation_aggregates: &[RelationAggregate],
343    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
344    where
345        T: Entity,
346    {
347        self.fetch_all_with_relation_aggregates(query, relation_aggregates).await?
348            .into_iter()
349            .map(|record| {
350                let mut entity = T::from_record(record)?;
351                let root = crate::EntityRoot::default();
352                entity.on_loaded(&root as &dyn std::any::Any);
353                Ok(entity)
354            })
355            .collect::<Result<Vec<_>, _>>()
356            .map(SmartList::from)
357            .map_err(RepositoryError::Entity)
358    }
359
360    pub async fn fetch_enhanced_entities_with_relation_aggregates<T>(
361        &self,
362        query: &SelectQuery,
363        relation_aggregates: &[RelationAggregate],
364    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
365    where
366        T: Entity,
367    {
368        let query = self
369            .prepare_select_query(query)
370            .map_err(RepositoryError::Runtime)?;
371
372        let mut rows = self.fetch_prepared_all(&query).await?;
373        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
374        self.enhance_relations(&mut rows).await?;
375        rows.into_iter()
376            .map(|record| {
377                let mut entity = T::from_record(record)?;
378                let root = crate::EntityRoot::default();
379                entity.on_loaded(&root as &dyn std::any::Any);
380                Ok(entity)
381            })
382            .collect::<Result<Vec<_>, _>>()
383            .map(SmartList::from)
384            .map_err(RepositoryError::Entity)
385    }
386
387    pub async fn fetch_enhanced_entities<T>(
388        &self,
389        query: &SelectQuery,
390    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
391    where
392        T: Entity,
393    {
394        let query = self
395            .prepare_select_query(query)
396            .map_err(RepositoryError::Runtime)?;
397
398        let mut rows = self.fetch_prepared_all(&query).await?;
399        self.enhance_relations(&mut rows).await?;
400        let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
401        rows.into_iter()
402            .map(|record| {
403                let mut entity = T::from_record(record)?;
404                if let Some(ref root) = root {
405                    entity.on_loaded(root as &dyn std::any::Any);
406                }
407                Ok(entity)
408            })
409            .collect::<Result<Vec<_>, _>>()
410            .map(SmartList::from)
411            .map_err(RepositoryError::Entity)
412    }
413
414    pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
415        let command = self
416            .prepare_insert_command(command)
417            .map_err(RepositoryError::Runtime)?;
418        self.execute_prepared_insert_with_comment(command, self.trace_context.clone()).await
419    }
420
421    pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
422        let command = self
423            .prepare_update_command(command)
424            .map_err(RepositoryError::Runtime)?;
425        self.execute_prepared_update_with_comment(command, self.trace_context.clone()).await
426    }
427
428    pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
429        self.delete_scoped(command, self.trace_context.clone()).await
430    }
431
432    pub async fn delete_scoped(
433        &self,
434        command: &DeleteCommand,
435        trace_chain: Vec<teaql_core::TraceNode>,
436    ) -> Result<u64, RepositoryError<E::Error>> {
437        let mut command = command.clone();
438        command.trace_chain = trace_chain.clone();
439        if let Some(behavior) = self.behavior() {
440            behavior
441                .before_delete(self.repository.metadata.context, &mut command)
442                .map_err(RepositoryError::Runtime)?;
443        }
444        self.enforce_delete_policy(&mut command)
445            .map_err(RepositoryError::Runtime)?;
446
447        let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
448        let affected = self.repository.delete(&command).await?;
449
450        let mut event = RawAuditEvent::deleted_with_old_values(
451            command.entity,
452            command.id,
453            command.expected_version,
454            old_values,
455        );
456        event.trace_chain = trace_chain;
457        self.emit_event(event)
458            .map_err(RepositoryError::Runtime)?;
459        Ok(affected)
460    }
461
462    pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
463        let mut command = command.clone();
464        command.trace_chain = self.trace_context.clone();
465        if let Some(behavior) = self.behavior() {
466            behavior
467                .before_recover(self.repository.metadata.context, &mut command)
468                .map_err(RepositoryError::Runtime)?;
469        }
470        self.enforce_recover_policy(&mut command)
471            .map_err(RepositoryError::Runtime)?;
472        let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
473        let affected = self.repository.recover(&command).await?;
474        let event = RawAuditEvent::recovered_with_old_values(
475            command.entity,
476            command.id,
477            command.expected_version,
478            old_values,
479        );
480        self.emit_event(event)
481            .map_err(RepositoryError::Runtime)?;
482        Ok(affected)
483    }
484
485    fn emit_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
486        self.repository.metadata.context.send_event(event)
487    }
488
489    #[allow(dead_code)]
490    pub(super) async fn execute_prepared_insert(
491        &self,
492        command: InsertCommand,
493    ) -> Result<u64, RepositoryError<E::Error>> {
494        self.execute_prepared_insert_with_comment(command, Vec::new()).await
495    }
496
497    pub(super) async fn execute_prepared_insert_with_comment(
498        &self,
499        mut command: InsertCommand,
500        trace_chain: Vec<teaql_core::TraceNode>,
501    ) -> Result<u64, RepositoryError<E::Error>> {
502        command.trace_chain = trace_chain.clone();
503        let affected = self.repository.insert(&command).await?;
504        let mut event = RawAuditEvent::created(command.entity, command.values);
505        event.trace_chain = trace_chain;
506        self.emit_event(event).map_err(RepositoryError::Runtime)?;
507        Ok(affected)
508    }
509
510    pub(super) async fn execute_prepared_batch_insert(
511        &self,
512        command: teaql_core::BatchInsertCommand,
513    ) -> Result<u64, RepositoryError<E::Error>> {
514        if command.batch_values.is_empty() {
515            return Ok(0);
516        }
517        let affected = self.repository.batch_insert(&command).await?;
518        
519        let entity = command.entity.clone();
520        for (i, values) in command.batch_values.into_iter().enumerate() {
521            let mut event = RawAuditEvent::created(entity.clone(), values);
522            if i < command.trace_chains.len() {
523                event.trace_chain = command.trace_chains[i].clone();
524            }
525            self.emit_event(event).map_err(RepositoryError::Runtime)?;
526        }
527        Ok(affected)
528    }
529
530    #[allow(dead_code)]
531    pub(super) async fn execute_prepared_update(
532        &self,
533        command: UpdateCommand,
534    ) -> Result<u64, RepositoryError<E::Error>> {
535        self.execute_prepared_update_with_comment(command, Vec::new()).await
536    }
537
538    pub(super) async fn execute_prepared_update_with_comment(
539        &self,
540        mut command: UpdateCommand,
541        trace_chain: Vec<teaql_core::TraceNode>,
542    ) -> Result<u64, RepositoryError<E::Error>> {
543        command.trace_chain = trace_chain.clone();
544        
545        let mut old_values = command.old_values.clone();
546        let needs_fetch = match &old_values {
547            Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
548            None => true,
549        };
550        if needs_fetch {
551            old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
552        }
553
554        let affected = self.repository.update(&command).await?;
555        let updated_fields = command.values.keys().cloned().collect();
556        let mut values = command.values.clone();
557        values.insert("id".to_owned(), command.id.clone());
558        if let Some(version) = command.expected_version {
559            values.insert("version".to_owned(), Value::I64(version + 1));
560        }
561        let mut new_values = old_values.clone().unwrap_or_default();
562        for (field, value) in &values {
563            new_values.insert(field.clone(), value.clone());
564        }
565        let mut event = RawAuditEvent::updated_with_old_values(
566            command.entity,
567            values,
568            old_values,
569            new_values,
570            updated_fields,
571        );
572        event.trace_chain = trace_chain;
573        self.emit_event(event).map_err(RepositoryError::Runtime)?;
574        Ok(affected)
575    }
576
577    pub(super) async fn execute_prepared_batch_update(
578        &self,
579        command: teaql_core::BatchUpdateCommand,
580    ) -> Result<u64, RepositoryError<E::Error>> {
581        if command.batch_values.is_empty() {
582            return Ok(0);
583        }
584        let affected = self.repository.batch_update(&command).await?;
585        
586        let entity = command.entity.clone();
587        for (i, values) in command.batch_values.into_iter().enumerate() {
588            let mut full_values = values.clone();
589            full_values.insert("id".to_owned(), command.batch_ids[i].clone());
590            if let Some(Some(version)) = command.batch_expected_versions.get(i) {
591                full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
592            }
593            
594            let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
595            let mut new_values = old_values.clone().unwrap_or_default();
596            for (field, value) in &full_values {
597                new_values.insert(field.clone(), value.clone());
598            }
599            
600            let mut event = RawAuditEvent::updated_with_old_values(
601                entity.clone(),
602                full_values,
603                old_values,
604                new_values,
605                command.update_fields.clone(),
606            );
607            if i < command.trace_chains.len() {
608                event.trace_chain = command.trace_chains[i].clone();
609            }
610            self.emit_event(event).map_err(RepositoryError::Runtime)?;
611        }
612        Ok(affected)
613    }
614
615    fn fetch_current_event_row(
616        &self,
617        _entity: &str,
618        _id: &Value,
619        _trace_chain: Vec<teaql_core::TraceNode>,
620    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
621        // PER THE USER: "我们不需要在审计的时候去抓旧的值"
622        // Avoid DB queries during event emission. We rely on in-memory `original_values`.
623        Ok(None)
624    }
625
626
627    pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, E> {
628        ResolvedRepository {
629            entity,
630            repository: ContextRepository {
631                metadata: UserContextMetadata {
632                    context: self.repository.metadata.context,
633                },
634                executor: self.repository.executor,
635            },
636            trace_context: Vec::new(),
637        }
638    }
639}