Skip to main content

teaql_runtime/
repository.rs

1use std::collections::{BTreeMap, HashMap};
2use std::slice;
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, Instant};
5
6use teaql_core::{
7    Aggregate, AggregationCacheOptions, DeleteCommand, Entity, EntityDescriptor, Expr,
8    InsertCommand, ObjectGroupBy, PropertyDescriptor, Record, RecoverCommand, RelationAggregate,
9    RelationLoad, SelectQuery, SmartList, UpdateCommand, Value,
10};
11use teaql_sql::{CompiledQuery, SqlDialect};
12
13use crate::{
14    CheckObjectStatus, ContextError, EntityEvent, GraphMutationKind, GraphMutationPlan, GraphNode,
15    GraphOperation, MetadataStore, RepositoryBehavior, RepositoryError, RuntimeError,
16    SqlLogOperation, UserContext, clear_record_status, mark_record_status, sorted_update_fields,
17};
18
19#[derive(Debug, Default)]
20pub struct InMemoryAggregationCache {
21    namespace: String,
22    entries: Mutex<HashMap<String, AggregationCacheEntry>>,
23}
24
25pub trait AggregationCacheBackend: Send + Sync {
26    fn namespace(&self) -> &str;
27    fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>>;
28    fn put(&self, key: String, rows: Vec<Record>);
29    fn invalidate_namespace(&self, namespace: &str);
30}
31
32#[derive(Debug, Clone)]
33struct AggregationCacheEntry {
34    stored_at: Instant,
35    rows: Vec<Record>,
36}
37
38impl InMemoryAggregationCache {
39    pub fn with_namespace(namespace: impl Into<String>) -> Self {
40        Self {
41            namespace: namespace.into(),
42            entries: Mutex::new(HashMap::new()),
43        }
44    }
45
46    pub fn namespace(&self) -> &str {
47        &self.namespace
48    }
49}
50
51impl AggregationCacheBackend for InMemoryAggregationCache {
52    fn namespace(&self) -> &str {
53        &self.namespace
54    }
55
56    fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>> {
57        let entries = self.entries.lock().ok()?;
58        let entry = entries.get(key)?;
59        if max_age_millis == 0 || entry.stored_at.elapsed() <= Duration::from_millis(max_age_millis)
60        {
61            Some(entry.rows.clone())
62        } else {
63            None
64        }
65    }
66
67    fn put(&self, key: String, rows: Vec<Record>) {
68        if let Ok(mut entries) = self.entries.lock() {
69            entries.insert(
70                key,
71                AggregationCacheEntry {
72                    stored_at: Instant::now(),
73                    rows,
74                },
75            );
76        }
77    }
78
79    fn invalidate_namespace(&self, namespace: &str) {
80        if let Ok(mut entries) = self.entries.lock() {
81            let prefix = format!("{namespace}::");
82            entries.retain(|key, _| !key.starts_with(&prefix));
83        }
84    }
85}
86
87impl InMemoryAggregationCache {
88    pub fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>> {
89        AggregationCacheBackend::get(self, key, max_age_millis)
90    }
91
92    pub fn put(&self, key: String, rows: Vec<Record>) {
93        AggregationCacheBackend::put(self, key, rows);
94    }
95
96    pub fn clear(&self) {
97        if let Ok(mut entries) = self.entries.lock() {
98            entries.clear();
99        }
100    }
101
102    pub fn invalidate_namespace(&self, namespace: &str) {
103        AggregationCacheBackend::invalidate_namespace(self, namespace);
104    }
105}
106
107pub trait QueryExecutor {
108    type Error: std::error::Error + Send + Sync + 'static;
109
110    fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error>;
111    fn execute(&self, query: &CompiledQuery) -> Result<u64, Self::Error>;
112
113    fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
114        Ok(GraphTransactionBoundary::Unsupported)
115    }
116
117    fn commit_transaction(&self) -> Result<(), Self::Error> {
118        Ok(())
119    }
120
121    fn rollback_transaction(&self) -> Result<(), Self::Error> {
122        Ok(())
123    }
124}
125
/// Answer of [`QueryExecutor::begin_transaction`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphTransactionBoundary {
    /// `save_graph` commits the transaction itself only for this variant.
    Started,
    /// `save_graph` does not commit for this variant, but it still requests a
    /// rollback when saving fails.
    AlreadyActive,
    /// Default answer of the trait; `save_graph` refuses to run with it.
    Unsupported,
}
132
/// Compiles queries and commands with a dialect, resolving entity descriptors
/// from a metadata store, and runs the result on an executor.
pub struct Repository<'a, D, M, E> {
    /// SQL dialect used for compilation.
    dialect: &'a D,
    /// Source of entity descriptors.
    metadata: &'a M,
    /// Executor that runs the compiled statements.
    executor: &'a E,
}
138
139pub struct ContextRepository<'a, D, E> {
140    metadata: UserContextMetadata<'a>,
141    pub(crate) dialect: &'a D,
142    pub(crate) executor: &'a E,
143}
144
145pub struct ResolvedRepository<'a, D, E> {
146    entity: String,
147    repository: ContextRepository<'a, D, E>,
148}
149
150#[derive(Debug, Clone, PartialEq)]
151pub struct RelationLoadPlan {
152    pub parent_entity: String,
153    pub relation_name: String,
154    pub path: String,
155    pub target_entity: String,
156    pub local_key: String,
157    pub foreign_key: String,
158    pub many: bool,
159    pub query: Option<SelectQuery>,
160    pub children: Vec<RelationLoadPlan>,
161}
162
163pub(crate) struct UserContextMetadata<'a> {
164    pub(crate) context: &'a UserContext,
165}
166
167impl MetadataStore for UserContextMetadata<'_> {
168    fn entity(&self, name: &str) -> Option<&EntityDescriptor> {
169        self.context.entity(name)
170    }
171
172    fn all_entities(&self) -> Vec<&EntityDescriptor> {
173        self.context
174            .metadata
175            .as_ref()
176            .map(|metadata| metadata.all_entities())
177            .unwrap_or_default()
178    }
179}
180
181impl<'a, D, M, E> Repository<'a, D, M, E>
182where
183    D: SqlDialect,
184    M: MetadataStore,
185    E: QueryExecutor,
186{
187    pub fn new(dialect: &'a D, metadata: &'a M, executor: &'a E) -> Self {
188        Self {
189            dialect,
190            metadata,
191            executor,
192        }
193    }
194
195    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
196        let entity = self
197            .metadata
198            .entity(&query.entity)
199            .ok_or_else(|| RuntimeError::MissingEntity(query.entity.clone()))?;
200        Ok(self.dialect.compile_select(entity, query)?)
201    }
202
203    pub fn compile_insert(&self, command: &InsertCommand) -> Result<CompiledQuery, RuntimeError> {
204        let entity = self
205            .metadata
206            .entity(&command.entity)
207            .ok_or_else(|| RuntimeError::MissingEntity(command.entity.clone()))?;
208        Ok(self.dialect.compile_insert(entity, command)?)
209    }
210
211    pub fn compile_update(&self, command: &UpdateCommand) -> Result<CompiledQuery, RuntimeError> {
212        let entity = self
213            .metadata
214            .entity(&command.entity)
215            .ok_or_else(|| RuntimeError::MissingEntity(command.entity.clone()))?;
216        Ok(self.dialect.compile_update(entity, command)?)
217    }
218
219    pub fn compile_delete(&self, command: &DeleteCommand) -> Result<CompiledQuery, RuntimeError> {
220        let entity = self
221            .metadata
222            .entity(&command.entity)
223            .ok_or_else(|| RuntimeError::MissingEntity(command.entity.clone()))?;
224        Ok(self.dialect.compile_delete(entity, command)?)
225    }
226
227    pub fn compile_recover(&self, command: &RecoverCommand) -> Result<CompiledQuery, RuntimeError> {
228        let entity = self
229            .metadata
230            .entity(&command.entity)
231            .ok_or_else(|| RuntimeError::MissingEntity(command.entity.clone()))?;
232        Ok(self.dialect.compile_recover(entity, command)?)
233    }
234
235    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
236        let compiled = self.compile(query).map_err(RepositoryError::Runtime)?;
237        self.executor
238            .fetch_all(&compiled)
239            .map_err(RepositoryError::Executor)
240    }
241
242    pub fn fetch_smart_list(
243        &self,
244        query: &SelectQuery,
245    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
246        self.fetch_all(query).map(SmartList::from)
247    }
248
249    pub fn fetch_entities<T>(
250        &self,
251        query: &SelectQuery,
252    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
253    where
254        T: Entity,
255    {
256        self.fetch_all(query)?
257            .into_iter()
258            .map(T::from_record)
259            .collect::<Result<Vec<_>, _>>()
260            .map(SmartList::from)
261            .map_err(RepositoryError::Entity)
262    }
263
264    pub fn fetch_enhanced_entities<T>(
265        &self,
266        query: &SelectQuery,
267    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
268    where
269        T: Entity,
270    {
271        self.fetch_entities(query)
272    }
273
274    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
275        let compiled = self
276            .compile_insert(command)
277            .map_err(RepositoryError::Runtime)?;
278        self.executor
279            .execute(&compiled)
280            .map_err(RepositoryError::Executor)
281    }
282
283    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
284        let compiled = self
285            .compile_update(command)
286            .map_err(RepositoryError::Runtime)?;
287        let affected = self
288            .executor
289            .execute(&compiled)
290            .map_err(RepositoryError::Executor)?;
291
292        if command.expected_version.is_some() && affected == 0 {
293            return Err(RepositoryError::Runtime(
294                RuntimeError::OptimisticLockConflict {
295                    entity: command.entity.clone(),
296                    id: format!("{:?}", command.id),
297                },
298            ));
299        }
300
301        Ok(affected)
302    }
303
304    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
305        let compiled = self
306            .compile_delete(command)
307            .map_err(RepositoryError::Runtime)?;
308        let affected = self
309            .executor
310            .execute(&compiled)
311            .map_err(RepositoryError::Executor)?;
312
313        if command.expected_version.is_some() && affected == 0 {
314            return Err(RepositoryError::Runtime(
315                RuntimeError::OptimisticLockConflict {
316                    entity: command.entity.clone(),
317                    id: format!("{:?}", command.id),
318                },
319            ));
320        }
321
322        Ok(affected)
323    }
324
325    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
326        let compiled = self
327            .compile_recover(command)
328            .map_err(RepositoryError::Runtime)?;
329        let affected = self
330            .executor
331            .execute(&compiled)
332            .map_err(RepositoryError::Executor)?;
333
334        if affected == 0 {
335            return Err(RepositoryError::Runtime(
336                RuntimeError::OptimisticLockConflict {
337                    entity: command.entity.clone(),
338                    id: format!("{:?}", command.id),
339                },
340            ));
341        }
342
343        Ok(affected)
344    }
345
346    pub fn insert_many(
347        &self,
348        commands: &[InsertCommand],
349    ) -> Result<u64, RepositoryError<E::Error>> {
350        let mut total = 0;
351        for command in commands {
352            total += self.insert(command)?;
353        }
354        Ok(total)
355    }
356
357    pub fn update_many(
358        &self,
359        commands: &[UpdateCommand],
360    ) -> Result<u64, RepositoryError<E::Error>> {
361        let mut total = 0;
362        for command in commands {
363            total += self.update(command)?;
364        }
365        Ok(total)
366    }
367
368    pub fn delete_many(
369        &self,
370        commands: &[DeleteCommand],
371    ) -> Result<u64, RepositoryError<E::Error>> {
372        let mut total = 0;
373        for command in commands {
374            total += self.delete(command)?;
375        }
376        Ok(total)
377    }
378
379    pub fn recover_many(
380        &self,
381        commands: &[RecoverCommand],
382    ) -> Result<u64, RepositoryError<E::Error>> {
383        let mut total = 0;
384        for command in commands {
385            total += self.recover(command)?;
386        }
387        Ok(total)
388    }
389}
390
391impl UserContext {
392    pub fn repository<D, E>(&self) -> Result<ContextRepository<'_, D, E>, ContextError>
393    where
394        D: SqlDialect + Send + Sync + 'static,
395        E: QueryExecutor + Send + Sync + 'static,
396    {
397        if self.metadata.is_none() {
398            return Err(ContextError::MissingResource("metadata".to_owned()));
399        }
400
401        let dialect = self.require_resource::<D>()?;
402        let executor = self.require_resource::<E>()?;
403        Ok(ContextRepository {
404            metadata: UserContextMetadata { context: self },
405            dialect,
406            executor,
407        })
408    }
409
410    pub fn resolve_repository<D, E>(
411        &self,
412        entity: impl Into<String>,
413    ) -> Result<ResolvedRepository<'_, D, E>, ContextError>
414    where
415        D: SqlDialect + Send + Sync + 'static,
416        E: QueryExecutor + Send + Sync + 'static,
417    {
418        let entity = entity.into();
419        if !self.has_repository(&entity) {
420            return Err(ContextError::MissingRepository(entity));
421        }
422        Ok(ResolvedRepository {
423            entity,
424            repository: self.repository::<D, E>()?,
425        })
426    }
427
428    pub fn plan_for_save_graph<D, E>(
429        &self,
430        node: GraphNode,
431    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>>
432    where
433        D: SqlDialect + Send + Sync + 'static,
434        E: QueryExecutor + Send + Sync + 'static,
435    {
436        let repository = self
437            .resolve_repository::<D, E>(node.entity.clone())
438            .map_err(|err| RepositoryError::Runtime(RuntimeError::Graph(err.to_string())))?;
439        repository.plan_graph(node)
440    }
441}
442
443impl<'a, D, E> ContextRepository<'a, D, E>
444where
445    D: SqlDialect,
446    E: QueryExecutor,
447{
448    fn repository(&self) -> Repository<'_, D, UserContextMetadata<'_>, E> {
449        Repository::new(self.dialect, &self.metadata, self.executor)
450    }
451
452    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
453        self.repository().compile(query)
454    }
455
456    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
457        let compiled = self.compile(query).map_err(RepositoryError::Runtime)?;
458        self.log_sql(SqlLogOperation::Select, &compiled);
459        self.executor
460            .fetch_all(&compiled)
461            .map_err(RepositoryError::Executor)
462    }
463
464    pub fn fetch_smart_list(
465        &self,
466        query: &SelectQuery,
467    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
468        self.repository().fetch_smart_list(query)
469    }
470
471    pub fn fetch_entities<T>(
472        &self,
473        query: &SelectQuery,
474    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
475    where
476        T: Entity,
477    {
478        self.repository().fetch_entities(query)
479    }
480
481    pub fn fetch_enhanced_entities<T>(
482        &self,
483        query: &SelectQuery,
484    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
485    where
486        T: Entity,
487    {
488        self.repository().fetch_enhanced_entities(query)
489    }
490
491    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
492        let compiled = self
493            .repository()
494            .compile_insert(command)
495            .map_err(RepositoryError::Runtime)?;
496        self.log_sql(SqlLogOperation::Insert, &compiled);
497        let affected = self
498            .executor
499            .execute(&compiled)
500            .map_err(RepositoryError::Executor)?;
501        self.invalidate_aggregation_cache_for(&command.entity);
502        Ok(affected)
503    }
504
505    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
506        let affected = self.execute_mutation(
507            SqlLogOperation::Update,
508            &command.entity,
509            self.repository()
510                .compile_update(command)
511                .map_err(RepositoryError::Runtime)?,
512        )?;
513        if command.expected_version.is_some() && affected == 0 {
514            return Err(RepositoryError::Runtime(
515                RuntimeError::OptimisticLockConflict {
516                    entity: command.entity.clone(),
517                    id: format!("{:?}", command.id),
518                },
519            ));
520        }
521        Ok(affected)
522    }
523
524    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
525        let affected = self.execute_mutation(
526            SqlLogOperation::Delete,
527            &command.entity,
528            self.repository()
529                .compile_delete(command)
530                .map_err(RepositoryError::Runtime)?,
531        )?;
532        if command.expected_version.is_some() && affected == 0 {
533            return Err(RepositoryError::Runtime(
534                RuntimeError::OptimisticLockConflict {
535                    entity: command.entity.clone(),
536                    id: format!("{:?}", command.id),
537                },
538            ));
539        }
540        Ok(affected)
541    }
542
543    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
544        let affected = self.execute_mutation(
545            SqlLogOperation::Recover,
546            &command.entity,
547            self.repository()
548                .compile_recover(command)
549                .map_err(RepositoryError::Runtime)?,
550        )?;
551        if affected == 0 {
552            return Err(RepositoryError::Runtime(
553                RuntimeError::OptimisticLockConflict {
554                    entity: command.entity.clone(),
555                    id: format!("{:?}", command.id),
556                },
557            ));
558        }
559        Ok(affected)
560    }
561
562    fn execute_mutation(
563        &self,
564        operation: SqlLogOperation,
565        entity: &str,
566        compiled: CompiledQuery,
567    ) -> Result<u64, RepositoryError<E::Error>> {
568        self.log_sql(operation, &compiled);
569        let affected = self
570            .executor
571            .execute(&compiled)
572            .map_err(RepositoryError::Executor)?;
573        self.invalidate_aggregation_cache_for(entity);
574        Ok(affected)
575    }
576
577    fn log_sql(&self, operation: SqlLogOperation, compiled: &CompiledQuery) {
578        self.metadata
579            .context
580            .record_sql_log(operation, compiled, self.dialect.kind());
581    }
582
583    fn invalidate_aggregation_cache_for(&self, entity: &str) {
584        if let Some(cache) = self
585            .metadata
586            .context
587            .get_resource::<Arc<dyn AggregationCacheBackend>>()
588        {
589            invalidate_aggregation_cache_namespace(cache.as_ref(), entity);
590        }
591        if let Some(cache) = self
592            .metadata
593            .context
594            .get_resource::<InMemoryAggregationCache>()
595        {
596            invalidate_aggregation_cache_namespace(cache, entity);
597        }
598    }
599}
600
601impl<'a, D, E> ResolvedRepository<'a, D, E>
602where
603    D: SqlDialect,
604    E: QueryExecutor,
605{
606    fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
607        self.repository.metadata.context.repository_behavior(entity)
608    }
609
610    fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
611        self.repository
612            .metadata
613            .context
614            .repository_behavior(&self.entity)
615    }
616
617    pub fn entity(&self) -> &str {
618        &self.entity
619    }
620
621    pub fn select(&self) -> SelectQuery {
622        SelectQuery::new(self.entity.clone())
623    }
624
625    pub fn insert_command(&self) -> InsertCommand {
626        InsertCommand::new(self.entity.clone())
627    }
628
629    pub fn prepare_insert_command(
630        &self,
631        command: &InsertCommand,
632    ) -> Result<InsertCommand, RuntimeError> {
633        let mut command = command.clone();
634        if let Some(behavior) = self.behavior() {
635            behavior.before_insert(self.repository.metadata.context, &mut command)?;
636        }
637
638        let entity = self
639            .repository
640            .metadata
641            .context
642            .require_entity(&command.entity)?;
643        if let Some(id_property) = entity.id_property() {
644            let needs_id = !command.values.contains_key(&id_property.name)
645                || matches!(command.values.get(&id_property.name), Some(Value::Null));
646            if needs_id {
647                let id = self.repository.metadata.context.next_id(&command.entity)?;
648                command
649                    .values
650                    .insert(id_property.name.clone(), Value::U64(id));
651            }
652        }
653        mark_record_status(&mut command.values, CheckObjectStatus::Create);
654        let check_result = self
655            .repository
656            .metadata
657            .context
658            .check_and_fix_record(&command.entity, &mut command.values);
659        clear_record_status(&mut command.values);
660        check_result?;
661
662        Ok(command)
663    }
664
665    pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
666        UpdateCommand::new(self.entity.clone(), id)
667    }
668
669    pub fn prepare_update_command(
670        &self,
671        command: &UpdateCommand,
672    ) -> Result<UpdateCommand, RuntimeError> {
673        let mut command = command.clone();
674        if let Some(behavior) = self.behavior() {
675            behavior.before_update(self.repository.metadata.context, &mut command)?;
676        }
677        mark_record_status(&mut command.values, CheckObjectStatus::Update);
678        let check_result = self
679            .repository
680            .metadata
681            .context
682            .check_and_fix_record(&command.entity, &mut command.values);
683        clear_record_status(&mut command.values);
684        check_result?;
685        Ok(command)
686    }
687
688    pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
689        DeleteCommand::new(self.entity.clone(), id)
690    }
691
692    pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
693        RecoverCommand::new(self.entity.clone(), id, expected_version)
694    }
695
696    pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
697        let mut query = query.clone();
698        if let Some(behavior) = self.query_behavior(&query.entity) {
699            behavior.before_select(self.repository.metadata.context, &mut query)?;
700        }
701        self.repository.compile(&query)
702    }
703
704    pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
705        let mut query = query.clone();
706        if let Some(behavior) = self.query_behavior(&query.entity) {
707            behavior
708                .before_select(self.repository.metadata.context, &mut query)
709                .map_err(RepositoryError::Runtime)?;
710        }
711        let mut rows = self.fetch_prepared_query(&query)?;
712        self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
713        self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
714        Ok(rows)
715    }
716
717    fn fetch_prepared_query(
718        &self,
719        query: &SelectQuery,
720    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
721        let compiled = self
722            .repository
723            .compile(query)
724            .map_err(RepositoryError::Runtime)?;
725        if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
726            if let Some(cache) = self
727                .repository
728                .metadata
729                .context
730                .get_resource::<Arc<dyn AggregationCacheBackend>>()
731            {
732                return self.fetch_prepared_query_with_cache(
733                    query,
734                    &compiled,
735                    options,
736                    cache.as_ref(),
737                );
738            }
739            if let Some(cache) = self
740                .repository
741                .metadata
742                .context
743                .get_resource::<InMemoryAggregationCache>()
744            {
745                return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
746            }
747        }
748        self.repository.log_sql(SqlLogOperation::Select, &compiled);
749        self.repository
750            .executor
751            .fetch_all(&compiled)
752            .map_err(RepositoryError::Executor)
753    }
754
755    fn fetch_prepared_query_with_cache(
756        &self,
757        query: &SelectQuery,
758        compiled: &CompiledQuery,
759        options: AggregationCacheOptions,
760        cache: &dyn AggregationCacheBackend,
761    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
762        let key = aggregation_cache_key(
763            cache.namespace(),
764            &aggregation_cache_namespace(&query.entity),
765            compiled,
766        );
767        if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
768            return Ok(rows);
769        }
770        self.repository.log_sql(SqlLogOperation::Select, compiled);
771        let rows = self
772            .repository
773            .executor
774            .fetch_all(compiled)
775            .map_err(RepositoryError::Executor)?;
776        cache.put(key, rows.clone());
777        Ok(rows)
778    }
779
780    pub fn fetch_all_with_relation_aggregates(
781        &self,
782        query: &SelectQuery,
783        relation_aggregates: &[RelationAggregate],
784    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
785        let mut rows = self.fetch_all(query)?;
786        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
787        Ok(rows)
788    }
789
790    pub fn fetch_smart_list(
791        &self,
792        query: &SelectQuery,
793    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
794        let mut query = query.clone();
795        if let Some(behavior) = self.query_behavior(&query.entity) {
796            behavior
797                .before_select(self.repository.metadata.context, &mut query)
798                .map_err(RepositoryError::Runtime)?;
799        }
800        self.repository.fetch_smart_list(&query)
801    }
802
803    pub fn fetch_smart_list_with_relation_aggregates(
804        &self,
805        query: &SelectQuery,
806        relation_aggregates: &[RelationAggregate],
807    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
808        self.fetch_all_with_relation_aggregates(query, relation_aggregates)
809            .map(SmartList::from)
810    }
811
812    pub fn fetch_entities<T>(
813        &self,
814        query: &SelectQuery,
815    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
816    where
817        T: Entity,
818    {
819        let mut query = query.clone();
820        if let Some(behavior) = self.query_behavior(&query.entity) {
821            behavior
822                .before_select(self.repository.metadata.context, &mut query)
823                .map_err(RepositoryError::Runtime)?;
824        }
825        self.repository.fetch_entities(&query)
826    }
827
828    pub fn fetch_entities_with_relation_aggregates<T>(
829        &self,
830        query: &SelectQuery,
831        relation_aggregates: &[RelationAggregate],
832    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
833    where
834        T: Entity,
835    {
836        self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
837            .into_iter()
838            .map(T::from_record)
839            .collect::<Result<Vec<_>, _>>()
840            .map(SmartList::from)
841            .map_err(RepositoryError::Entity)
842    }
843
844    pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
845        &self,
846        query: &SelectQuery,
847        relation_aggregates: &[RelationAggregate],
848    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
849    where
850        T: Entity,
851    {
852        let mut query = query.clone();
853        if let Some(behavior) = self.query_behavior(&query.entity) {
854            behavior
855                .before_select(self.repository.metadata.context, &mut query)
856                .map_err(RepositoryError::Runtime)?;
857        }
858
859        let mut rows = self.repository.fetch_all(&query)?;
860        self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
861        self.enhance_query_relations(&mut rows, &query)?;
862        self.enhance_relations(&mut rows)?;
863        rows.into_iter()
864            .map(T::from_record)
865            .collect::<Result<Vec<_>, _>>()
866            .map(SmartList::from)
867            .map_err(RepositoryError::Entity)
868    }
869
870    pub fn fetch_enhanced_entities<T>(
871        &self,
872        query: &SelectQuery,
873    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
874    where
875        T: Entity,
876    {
877        let mut query = query.clone();
878        if let Some(behavior) = self.query_behavior(&query.entity) {
879            behavior
880                .before_select(self.repository.metadata.context, &mut query)
881                .map_err(RepositoryError::Runtime)?;
882        }
883
884        let mut rows = self.repository.fetch_all(&query)?;
885        self.enhance_query_relations(&mut rows, &query)?;
886        self.enhance_relations(&mut rows)?;
887        rows.into_iter()
888            .map(T::from_record)
889            .collect::<Result<Vec<_>, _>>()
890            .map(SmartList::from)
891            .map_err(RepositoryError::Entity)
892    }
893
894    pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
895        let command = self
896            .prepare_insert_command(command)
897            .map_err(RepositoryError::Runtime)?;
898        self.execute_prepared_insert(command)
899    }
900
901    pub fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
902        if node.entity != self.entity {
903            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
904                "resolved repository {} cannot save graph root {}",
905                self.entity, node.entity
906            ))));
907        }
908        let boundary = self
909            .repository
910            .executor
911            .begin_transaction()
912            .map_err(RepositoryError::Executor)?;
913        if matches!(boundary, GraphTransactionBoundary::Unsupported) {
914            return Err(RepositoryError::Runtime(RuntimeError::Graph(
915                "save_graph requires a transactional executor".to_owned(),
916            )));
917        }
918        let result = self
919            .plan_graph(node)
920            .and_then(|plan| self.execute_graph_plan(plan));
921        match result {
922            Ok(saved) => {
923                if matches!(boundary, GraphTransactionBoundary::Started) {
924                    self.repository
925                        .executor
926                        .commit_transaction()
927                        .map_err(RepositoryError::Executor)?;
928                }
929                Ok(saved)
930            }
931            Err(err) => {
932                if !matches!(boundary, GraphTransactionBoundary::Unsupported) {
933                    self.repository
934                        .executor
935                        .rollback_transaction()
936                        .map_err(RepositoryError::Executor)?;
937                }
938                Err(err)
939            }
940        }
941    }
942
943    pub fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
944    where
945        T: Entity,
946    {
947        let node = self
948            .graph_node_from_entity(entity)
949            .map_err(RepositoryError::Runtime)?;
950        self.save_graph(node)
951    }
952
953    pub fn plan_graph(
954        &self,
955        node: GraphNode,
956    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
957        if node.entity != self.entity {
958            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
959                "resolved repository {} cannot plan graph root {}",
960                self.entity, node.entity
961            ))));
962        }
963        let mut node = node;
964        let mut plan = GraphMutationPlan::default();
965        self.collect_graph_plan(&mut node, &mut plan)?;
966        plan.planned_root = Some(node);
967        plan.rebuild_batches();
968        Ok(plan)
969    }
970
971    pub fn execute_graph_plan(
972        &self,
973        plan: GraphMutationPlan,
974    ) -> Result<GraphNode, RepositoryError<E::Error>> {
975        let Some(root) = plan.planned_root else {
976            return Err(RepositoryError::Runtime(RuntimeError::Graph(
977                "graph mutation plan has no planned root".to_owned(),
978            )));
979        };
980        self.upsert_graph_node(root)
981    }
982
983    pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
984    where
985        T: Entity,
986    {
987        let descriptor = T::entity_descriptor();
988        if descriptor.name != self.entity {
989            return Err(RuntimeError::Graph(format!(
990                "resolved repository {} cannot extract graph root {}",
991                self.entity, descriptor.name
992            )));
993        }
994        self.graph_node_from_record(&descriptor.name, entity.into_record())
995    }
996
997    fn collect_graph_plan(
998        &self,
999        node: &mut GraphNode,
1000        plan: &mut GraphMutationPlan,
1001    ) -> Result<(), RepositoryError<E::Error>> {
1002        match node.operation {
1003            GraphOperation::Reference => {
1004                plan.push(
1005                    node.entity.clone(),
1006                    GraphMutationKind::Reference,
1007                    node.values.clone(),
1008                    Vec::new(),
1009                );
1010                return Ok(());
1011            }
1012            GraphOperation::Remove => {
1013                plan.push(
1014                    node.entity.clone(),
1015                    GraphMutationKind::Delete,
1016                    node.values.clone(),
1017                    Vec::new(),
1018                );
1019                return Ok(());
1020            }
1021            GraphOperation::Upsert => {}
1022        }
1023
1024        let descriptor = self
1025            .repository
1026            .metadata
1027            .context
1028            .require_entity(&node.entity)
1029            .map_err(RepositoryError::Runtime)?;
1030        let id_property = descriptor.id_property().cloned();
1031        let id = id_property.as_ref().and_then(|property| {
1032            node.values
1033                .get(&property.name)
1034                .filter(|value| !matches!(value, Value::Null))
1035                .cloned()
1036        });
1037        let is_update = match (id_property.as_ref(), id.as_ref()) {
1038            (Some(id_property), Some(id)) => self
1039                .fetch_graph_current_row(&node.entity, &id_property.name, id)?
1040                .is_some(),
1041            _ => false,
1042        };
1043        if !is_update {
1044            if let Some(id_property) = id_property.as_ref() {
1045                let needs_id = !node.values.contains_key(&id_property.name)
1046                    || matches!(node.values.get(&id_property.name), Some(Value::Null));
1047                if needs_id {
1048                    let id = self
1049                        .repository
1050                        .metadata
1051                        .context
1052                        .next_id(&node.entity)
1053                        .map_err(RepositoryError::Runtime)?;
1054                    node.values.insert(id_property.name.clone(), Value::U64(id));
1055                }
1056            }
1057        }
1058        let update_fields = if is_update {
1059            let mut excluded = Vec::new();
1060            if let Some(id_property) = id_property.as_ref() {
1061                excluded.push(id_property.name.clone());
1062            }
1063            if let Some(version_property) = descriptor.version_property() {
1064                excluded.push(version_property.name.clone());
1065            }
1066            sorted_update_fields(&node.values, excluded)
1067        } else {
1068            Vec::new()
1069        };
1070        plan.push(
1071            node.entity.clone(),
1072            if is_update {
1073                GraphMutationKind::Update
1074            } else {
1075                GraphMutationKind::Create
1076            },
1077            node.values.clone(),
1078            update_fields,
1079        );
1080
1081        for (name, children) in &mut node.relations {
1082            let relation = descriptor.relation_by_name(name).ok_or_else(|| {
1083                RepositoryError::Runtime(RuntimeError::MissingRelation {
1084                    entity: node.entity.clone(),
1085                    relation: name.clone(),
1086                })
1087            })?;
1088            let child_repo = self.scoped_repository(relation.target_entity.clone());
1089            for child in children {
1090                ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
1091                child_repo.collect_graph_plan(child, plan)?;
1092            }
1093        }
1094        Ok(())
1095    }
1096
1097    pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
1098        let command = self
1099            .prepare_update_command(command)
1100            .map_err(RepositoryError::Runtime)?;
1101        self.execute_prepared_update(command)
1102    }
1103
1104    pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
1105        let mut command = command.clone();
1106        if let Some(behavior) = self.behavior() {
1107            behavior
1108                .before_delete(self.repository.metadata.context, &mut command)
1109                .map_err(RepositoryError::Runtime)?;
1110        }
1111        let affected = self.repository.delete(&command)?;
1112        self.emit_event(EntityEvent::deleted(
1113            command.entity,
1114            command.id,
1115            command.expected_version,
1116        ))
1117        .map_err(RepositoryError::Runtime)?;
1118        Ok(affected)
1119    }
1120
1121    pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
1122        let mut command = command.clone();
1123        if let Some(behavior) = self.behavior() {
1124            behavior
1125                .before_recover(self.repository.metadata.context, &mut command)
1126                .map_err(RepositoryError::Runtime)?;
1127        }
1128        let affected = self.repository.recover(&command)?;
1129        self.emit_event(EntityEvent::recovered(
1130            command.entity,
1131            command.id,
1132            command.expected_version,
1133        ))
1134        .map_err(RepositoryError::Runtime)?;
1135        Ok(affected)
1136    }
1137
1138    fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
1139        self.repository.metadata.context.send_event(event)
1140    }
1141
1142    fn execute_prepared_insert(
1143        &self,
1144        command: InsertCommand,
1145    ) -> Result<u64, RepositoryError<E::Error>> {
1146        let affected = self.repository.insert(&command)?;
1147        self.emit_event(EntityEvent::created(command.entity, command.values))
1148            .map_err(RepositoryError::Runtime)?;
1149        Ok(affected)
1150    }
1151
1152    fn execute_prepared_update(
1153        &self,
1154        command: UpdateCommand,
1155    ) -> Result<u64, RepositoryError<E::Error>> {
1156        let affected = self.repository.update(&command)?;
1157        let updated_fields = command.values.keys().cloned().collect();
1158        let mut values = command.values.clone();
1159        values.insert("id".to_owned(), command.id.clone());
1160        if let Some(version) = command.expected_version {
1161            values.insert("version".to_owned(), Value::I64(version + 1));
1162        }
1163        self.emit_event(EntityEvent {
1164            kind: crate::EntityEventKind::Updated,
1165            entity: command.entity,
1166            values,
1167            updated_fields,
1168        })
1169        .map_err(RepositoryError::Runtime)?;
1170        Ok(affected)
1171    }
1172
1173    fn insert_graph_node(
1174        &self,
1175        mut node: GraphNode,
1176    ) -> Result<GraphNode, RepositoryError<E::Error>> {
1177        match node.operation {
1178            GraphOperation::Upsert => {}
1179            GraphOperation::Reference => return self.validate_reference_node(node),
1180            GraphOperation::Remove => {
1181                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1182                    "create graph cannot remove node {}",
1183                    node.entity
1184                ))));
1185            }
1186        }
1187
1188        let descriptor = self
1189            .repository
1190            .metadata
1191            .context
1192            .require_entity(&node.entity)
1193            .map_err(RepositoryError::Runtime)?;
1194
1195        let mut one_relations = Vec::new();
1196        let mut many_relations = Vec::new();
1197        for (name, children) in std::mem::take(&mut node.relations) {
1198            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
1199                RepositoryError::Runtime(RuntimeError::MissingRelation {
1200                    entity: node.entity.clone(),
1201                    relation: name.clone(),
1202                })
1203            })?;
1204            if relation.many {
1205                many_relations.push((name, relation.clone(), children));
1206            } else {
1207                one_relations.push((name, relation.clone(), children));
1208            }
1209        }
1210
1211        for (name, relation, children) in one_relations {
1212            if children.len() > 1 {
1213                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1214                    "relation {}.{} expects one child, got {}",
1215                    node.entity,
1216                    name,
1217                    children.len()
1218                ))));
1219            }
1220            let mut saved_children = Vec::new();
1221            for child in children {
1222                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
1223                let child_repo = self.scoped_repository(child.entity.clone());
1224                let saved_child = child_repo.insert_graph_node(child)?;
1225                if relation.attach {
1226                    let foreign_value = saved_child
1227                        .values
1228                        .get(&relation.foreign_key)
1229                        .cloned()
1230                        .ok_or_else(|| {
1231                            RepositoryError::Runtime(RuntimeError::Graph(format!(
1232                                "saved child {} missing foreign key {} for relation {}.{}",
1233                                relation.target_entity, relation.foreign_key, node.entity, name
1234                            )))
1235                        })?;
1236                    node.values
1237                        .insert(relation.local_key.clone(), foreign_value);
1238                }
1239                saved_children.push(saved_child);
1240            }
1241            node.relations.insert(name, saved_children);
1242        }
1243
1244        let command = self
1245            .prepare_insert_command(&InsertCommand {
1246                entity: node.entity.clone(),
1247                values: node.values.clone(),
1248            })
1249            .map_err(RepositoryError::Runtime)?;
1250        self.execute_prepared_insert(command.clone())?;
1251        node.values = command.values;
1252
1253        for (name, relation, children) in many_relations {
1254            let local_value = node
1255                .values
1256                .get(&relation.local_key)
1257                .cloned()
1258                .ok_or_else(|| {
1259                    RepositoryError::Runtime(RuntimeError::Graph(format!(
1260                        "parent {} missing local key {} for relation {}",
1261                        node.entity, relation.local_key, name
1262                    )))
1263                })?;
1264            let mut saved_children = Vec::new();
1265            for mut child in children {
1266                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
1267                if relation.attach {
1268                    child
1269                        .values
1270                        .insert(relation.foreign_key.clone(), local_value.clone());
1271                }
1272                let child_repo = self.scoped_repository(child.entity.clone());
1273                saved_children.push(child_repo.insert_graph_node(child)?);
1274            }
1275            node.relations.insert(name, saved_children);
1276        }
1277
1278        Ok(node)
1279    }
1280
1281    fn upsert_graph_node(
1282        &self,
1283        mut node: GraphNode,
1284    ) -> Result<GraphNode, RepositoryError<E::Error>> {
1285        match node.operation {
1286            GraphOperation::Upsert => {}
1287            GraphOperation::Reference => return self.validate_reference_node(node),
1288            GraphOperation::Remove => {
1289                self.validate_remove_node(&node)?;
1290                self.delete_graph_node(&node)?;
1291                return Ok(node);
1292            }
1293        }
1294
1295        let descriptor = self
1296            .repository
1297            .metadata
1298            .context
1299            .require_entity(&node.entity)
1300            .map_err(RepositoryError::Runtime)?;
1301        let Some(id_property) = descriptor.id_property() else {
1302            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1303                "entity {} has no id property for graph upsert",
1304                node.entity
1305            ))));
1306        };
1307        let Some(id) = node
1308            .values
1309            .get(&id_property.name)
1310            .filter(|value| !matches!(value, Value::Null))
1311            .cloned()
1312        else {
1313            return self.insert_graph_node(node);
1314        };
1315
1316        if self
1317            .fetch_graph_current_row(&node.entity, &id_property.name, &id)?
1318            .is_none()
1319        {
1320            return self.insert_graph_node(node);
1321        }
1322
1323        let mut one_relations = Vec::new();
1324        let mut many_relations = Vec::new();
1325        for (name, children) in std::mem::take(&mut node.relations) {
1326            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
1327                RepositoryError::Runtime(RuntimeError::MissingRelation {
1328                    entity: node.entity.clone(),
1329                    relation: name.clone(),
1330                })
1331            })?;
1332            if relation.many {
1333                many_relations.push((name, relation.clone(), children));
1334            } else {
1335                one_relations.push((name, relation.clone(), children));
1336            }
1337        }
1338
1339        for (name, relation, children) in one_relations {
1340            if children.len() > 1 {
1341                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1342                    "relation {}.{} expects one child, got {}",
1343                    node.entity,
1344                    name,
1345                    children.len()
1346                ))));
1347            }
1348            let mut saved_children = Vec::new();
1349            for child in children {
1350                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
1351                let child_repo = self.scoped_repository(child.entity.clone());
1352                let saved_child = child_repo.upsert_graph_node(child)?;
1353                if relation.attach {
1354                    let foreign_value = saved_child
1355                        .values
1356                        .get(&relation.foreign_key)
1357                        .cloned()
1358                        .ok_or_else(|| {
1359                            RepositoryError::Runtime(RuntimeError::Graph(format!(
1360                                "saved child {} missing foreign key {} for relation {}.{}",
1361                                relation.target_entity, relation.foreign_key, node.entity, name
1362                            )))
1363                        })?;
1364                    node.values
1365                        .insert(relation.local_key.clone(), foreign_value);
1366                }
1367                saved_children.push(saved_child);
1368            }
1369            node.relations.insert(name, saved_children);
1370        }
1371
1372        let update = self.graph_update_command(&node, descriptor, id_property, &id);
1373        if !update.values.is_empty() || update.expected_version.is_some() {
1374            let prepared_update = self
1375                .prepare_update_command(&update)
1376                .map_err(RepositoryError::Runtime)?;
1377            self.execute_prepared_update(prepared_update.clone())?;
1378            for (field, value) in &prepared_update.values {
1379                node.values.insert(field.clone(), value.clone());
1380            }
1381            if let Some(version_property) = descriptor.version_property() {
1382                if let Some(expected_version) = prepared_update.expected_version {
1383                    node.values.insert(
1384                        version_property.name.clone(),
1385                        Value::I64(expected_version + 1),
1386                    );
1387                }
1388            }
1389        }
1390
1391        for (name, relation, children) in many_relations {
1392            let local_value = node
1393                .values
1394                .get(&relation.local_key)
1395                .cloned()
1396                .ok_or_else(|| {
1397                    RepositoryError::Runtime(RuntimeError::Graph(format!(
1398                        "parent {} missing local key {} for relation {}",
1399                        node.entity, relation.local_key, name
1400                    )))
1401                })?;
1402            let child_repo = self.scoped_repository(relation.target_entity.clone());
1403            let existing_children = child_repo.fetch_graph_children(
1404                &relation.target_entity,
1405                &relation.foreign_key,
1406                &local_value,
1407            )?;
1408            let child_descriptor = self
1409                .repository
1410                .metadata
1411                .context
1412                .require_entity(&relation.target_entity)
1413                .map_err(RepositoryError::Runtime)?;
1414            let child_id_property = child_descriptor.id_property().ok_or_else(|| {
1415                RepositoryError::Runtime(RuntimeError::Graph(format!(
1416                    "entity {} has no id property for graph diff",
1417                    relation.target_entity
1418                )))
1419            })?;
1420            let mut existing_by_id = BTreeMap::new();
1421            for child in existing_children {
1422                if let Some(id) = child.get(&child_id_property.name) {
1423                    existing_by_id.insert(graph_identity_key(id), child);
1424                }
1425            }
1426
1427            let mut seen = std::collections::BTreeSet::new();
1428            let mut saved_children = Vec::new();
1429            for mut child in children {
1430                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
1431                if relation.attach && child.operation != GraphOperation::Reference {
1432                    child
1433                        .values
1434                        .insert(relation.foreign_key.clone(), local_value.clone());
1435                }
1436                if let Some(child_id) = child
1437                    .values
1438                    .get(&child_id_property.name)
1439                    .filter(|value| !matches!(value, Value::Null))
1440                {
1441                    let key = graph_identity_key(child_id);
1442                    if !seen.insert(key.clone()) {
1443                        return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1444                            "duplicate child id {key} in relation {}.{}",
1445                            node.entity, name
1446                        ))));
1447                    }
1448                }
1449                saved_children.push(child_repo.upsert_graph_node(child)?);
1450            }
1451
1452            if relation.delete_missing {
1453                for (id_key, existing) in existing_by_id {
1454                    if seen.contains(&id_key) {
1455                        continue;
1456                    }
1457                    let Some(existing_id) = existing.get(&child_id_property.name).cloned() else {
1458                        continue;
1459                    };
1460                    let mut delete =
1461                        DeleteCommand::new(relation.target_entity.clone(), existing_id);
1462                    if let Some(version) = graph_record_version(&existing, child_descriptor) {
1463                        delete = delete.expected_version(version);
1464                    }
1465                    child_repo.delete(&delete)?;
1466                }
1467            }
1468
1469            node.relations.insert(name, saved_children);
1470        }
1471
1472        Ok(node)
1473    }
1474
1475    fn validate_reference_node(
1476        &self,
1477        node: GraphNode,
1478    ) -> Result<GraphNode, RepositoryError<E::Error>> {
1479        if !node.relations.is_empty() {
1480            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1481                "reference node {} cannot contain child relations",
1482                node.entity
1483            ))));
1484        }
1485        let descriptor = self
1486            .repository
1487            .metadata
1488            .context
1489            .require_entity(&node.entity)
1490            .map_err(RepositoryError::Runtime)?;
1491        let id_property = descriptor.id_property().ok_or_else(|| {
1492            RepositoryError::Runtime(RuntimeError::Graph(format!(
1493                "entity {} has no id property for graph reference",
1494                node.entity
1495            )))
1496        })?;
1497        let id = node
1498            .values
1499            .get(&id_property.name)
1500            .filter(|value| !matches!(value, Value::Null))
1501            .cloned()
1502            .ok_or_else(|| {
1503                RepositoryError::Runtime(RuntimeError::Graph(format!(
1504                    "reference node {} missing id property {}",
1505                    node.entity, id_property.name
1506                )))
1507            })?;
1508
1509        for field in node.values.keys() {
1510            if field == &id_property.name {
1511                continue;
1512            }
1513            if descriptor
1514                .version_property()
1515                .map(|property| field == &property.name)
1516                .unwrap_or(false)
1517            {
1518                continue;
1519            }
1520            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1521                "reference node {} cannot carry mutable field {}",
1522                node.entity, field
1523            ))));
1524        }
1525
1526        let current = self
1527            .fetch_graph_current_row(&node.entity, &id_property.name, &id)?
1528            .ok_or_else(|| {
1529                RepositoryError::Runtime(RuntimeError::Graph(format!(
1530                    "reference node {}({}) does not exist",
1531                    node.entity,
1532                    graph_identity_key(&id)
1533                )))
1534            })?;
1535
1536        if let Some(version_property) = descriptor.version_property() {
1537            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
1538                if *existing_version < 0 {
1539                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1540                        "reference node {}({}) is deleted",
1541                        node.entity,
1542                        graph_identity_key(&id)
1543                    ))));
1544                }
1545                if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
1546                {
1547                    if expected_version != existing_version {
1548                        return Err(RepositoryError::Runtime(
1549                            RuntimeError::OptimisticLockConflict {
1550                                entity: node.entity,
1551                                id: graph_identity_key(&id),
1552                            },
1553                        ));
1554                    }
1555                }
1556            }
1557        }
1558
1559        Ok(GraphNode {
1560            entity: node.entity,
1561            values: current,
1562            relations: BTreeMap::new(),
1563            operation: GraphOperation::Reference,
1564        })
1565    }
1566
1567    fn validate_remove_node(&self, node: &GraphNode) -> Result<(), RepositoryError<E::Error>> {
1568        if !node.relations.is_empty() {
1569            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1570                "remove node {} cannot contain child relations",
1571                node.entity
1572            ))));
1573        }
1574        let descriptor = self
1575            .repository
1576            .metadata
1577            .context
1578            .require_entity(&node.entity)
1579            .map_err(RepositoryError::Runtime)?;
1580        let id_property = descriptor.id_property().ok_or_else(|| {
1581            RepositoryError::Runtime(RuntimeError::Graph(format!(
1582                "entity {} has no id property for graph remove",
1583                node.entity
1584            )))
1585        })?;
1586        let id = node
1587            .values
1588            .get(&id_property.name)
1589            .filter(|value| !matches!(value, Value::Null))
1590            .cloned()
1591            .ok_or_else(|| {
1592                RepositoryError::Runtime(RuntimeError::Graph(format!(
1593                    "remove node {} missing id property {}",
1594                    node.entity, id_property.name
1595                )))
1596            })?;
1597        let current = self
1598            .fetch_graph_current_row(&node.entity, &id_property.name, &id)?
1599            .ok_or_else(|| {
1600                RepositoryError::Runtime(RuntimeError::Graph(format!(
1601                    "remove node {}({}) does not exist",
1602                    node.entity,
1603                    graph_identity_key(&id)
1604                )))
1605            })?;
1606        if let Some(version_property) = descriptor.version_property() {
1607            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
1608                if *existing_version < 0 {
1609                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1610                        "remove node {}({}) is already deleted",
1611                        node.entity,
1612                        graph_identity_key(&id)
1613                    ))));
1614                }
1615            }
1616        }
1617        Ok(())
1618    }
1619
1620    fn graph_node_from_record(
1621        &self,
1622        entity: &str,
1623        record: Record,
1624    ) -> Result<GraphNode, RuntimeError> {
1625        let descriptor = self.repository.metadata.context.require_entity(entity)?;
1626        let mut node = GraphNode::new(entity);
1627
1628        for (field, value) in record {
1629            let Some(relation) = descriptor.relation_by_name(&field) else {
1630                node.values.insert(field, value);
1631                continue;
1632            };
1633
1634            match value {
1635                Value::Null => {
1636                    node.relations.entry(field).or_default();
1637                }
1638                Value::Object(record) => {
1639                    let child = self.graph_node_from_record(&relation.target_entity, record)?;
1640                    node.relations.entry(field).or_default().push(child);
1641                }
1642                Value::List(values) => {
1643                    let children = node.relations.entry(field.clone()).or_default();
1644                    for value in values {
1645                        let Value::Object(record) = value else {
1646                            return Err(RuntimeError::Graph(format!(
1647                                "relation {}.{} expects object children, got {:?}",
1648                                entity, field, value
1649                            )));
1650                        };
1651                        children
1652                            .push(self.graph_node_from_record(&relation.target_entity, record)?);
1653                    }
1654                }
1655                other => {
1656                    return Err(RuntimeError::Graph(format!(
1657                        "relation {}.{} expects object/list/null, got {:?}",
1658                        entity, field, other
1659                    )));
1660                }
1661            }
1662        }
1663
1664        Ok(node)
1665    }
1666
1667    fn graph_update_command(
1668        &self,
1669        node: &GraphNode,
1670        descriptor: &EntityDescriptor,
1671        id_property: &PropertyDescriptor,
1672        id: &Value,
1673    ) -> UpdateCommand {
1674        let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
1675        if let Some(version_property) = descriptor.version_property() {
1676            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1677                command = command.expected_version(*version);
1678            }
1679        }
1680        for property in descriptor.properties.iter().filter(|property| {
1681            !property.is_id && !property.is_version && node.values.contains_key(&property.name)
1682        }) {
1683            if property.name == id_property.name {
1684                continue;
1685            }
1686            if let Some(value) = node.values.get(&property.name) {
1687                command.values.insert(property.name.clone(), value.clone());
1688            }
1689        }
1690        command
1691    }
1692
1693    fn delete_graph_node(&self, node: &GraphNode) -> Result<u64, RepositoryError<E::Error>> {
1694        let descriptor = self
1695            .repository
1696            .metadata
1697            .context
1698            .require_entity(&node.entity)
1699            .map_err(RepositoryError::Runtime)?;
1700        let id_property = descriptor.id_property().ok_or_else(|| {
1701            RepositoryError::Runtime(RuntimeError::Graph(format!(
1702                "entity {} has no id property for graph remove",
1703                node.entity
1704            )))
1705        })?;
1706        let id = node.values.get(&id_property.name).cloned().ok_or_else(|| {
1707            RepositoryError::Runtime(RuntimeError::Graph(format!(
1708                "remove node {} missing id property {}",
1709                node.entity, id_property.name
1710            )))
1711        })?;
1712        let mut delete = DeleteCommand::new(node.entity.clone(), id);
1713        if let Some(version_property) = descriptor.version_property() {
1714            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1715                delete = delete.expected_version(*version);
1716            }
1717        }
1718        self.delete(&delete)
1719    }
1720
1721    fn fetch_graph_current_row(
1722        &self,
1723        entity: &str,
1724        id_property: &str,
1725        id: &Value,
1726    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1727        let mut rows = self
1728            .scoped_repository(entity.to_owned())
1729            .fetch_all(&SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone())))?;
1730        Ok(rows.pop())
1731    }
1732
1733    fn fetch_graph_children(
1734        &self,
1735        entity: &str,
1736        foreign_key: &str,
1737        parent_value: &Value,
1738    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1739        self.scoped_repository(entity.to_owned()).fetch_all(
1740            &SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone())),
1741        )
1742    }
1743
1744    pub fn relation_loads(&self) -> Vec<String> {
1745        self.behavior()
1746            .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
1747            .unwrap_or_default()
1748    }
1749
1750    pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
1751        self.build_relation_plans(&self.entity, &self.relation_loads())
1752    }
1753
1754    pub fn relation_query(
1755        &self,
1756        relation_name: &str,
1757        parent_rows: &[Record],
1758    ) -> Result<SelectQuery, RuntimeError> {
1759        let plan = self
1760            .relation_plans()?
1761            .into_iter()
1762            .find(|plan| plan.relation_name == relation_name)
1763            .ok_or_else(|| RuntimeError::MissingRelation {
1764                entity: self.entity.clone(),
1765                relation: relation_name.to_owned(),
1766            })?;
1767        Ok(self.query_for_plan(&plan, parent_rows))
1768    }
1769
1770    pub fn enhance_relations(
1771        &self,
1772        parent_rows: &mut [Record],
1773    ) -> Result<(), RepositoryError<E::Error>> {
1774        let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
1775        for plan in plans {
1776            self.enhance_plan(parent_rows, &plan)?;
1777        }
1778        Ok(())
1779    }
1780
1781    pub fn enhance_query_relations(
1782        &self,
1783        parent_rows: &mut [Record],
1784        query: &SelectQuery,
1785    ) -> Result<(), RepositoryError<E::Error>> {
1786        let plans = self
1787            .build_relation_plans_from_loads(&query.entity, &query.relations)
1788            .map_err(RepositoryError::Runtime)?;
1789        for plan in plans {
1790            self.enhance_plan(parent_rows, &plan)?;
1791        }
1792        Ok(())
1793    }
1794
1795    pub fn enhance_relation_aggregates(
1796        &self,
1797        parent_rows: &mut [Record],
1798        relation_aggregates: &[RelationAggregate],
1799        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
1800    ) -> Result<(), RepositoryError<E::Error>> {
1801        for aggregate in relation_aggregates {
1802            self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options)?;
1803        }
1804        Ok(())
1805    }
1806
1807    pub fn enhance_object_group_bys(
1808        &self,
1809        rows: &mut [Record],
1810        object_group_bys: &[ObjectGroupBy],
1811    ) -> Result<(), RepositoryError<E::Error>> {
1812        for group_by in object_group_bys {
1813            let ids = rows
1814                .iter()
1815                .filter_map(|row| row.get(&group_by.storage_field).cloned())
1816                .collect::<Vec<_>>();
1817            if ids.is_empty() {
1818                continue;
1819            }
1820            let mut query = group_by.query.clone();
1821            ensure_projection(&mut query, "id");
1822            query = query.and_filter(Expr::in_list("id", ids));
1823            let object_rows = self
1824                .scoped_repository(query.entity.clone())
1825                .fetch_all(&query)?
1826                .into_iter()
1827                .filter_map(|row| {
1828                    row.get("id")
1829                        .cloned()
1830                        .map(|id| (relation_bucket_key(&id), row))
1831                })
1832                .collect::<BTreeMap<_, _>>();
1833            for row in rows.iter_mut() {
1834                if let Some(key) = row.get(&group_by.storage_field).map(relation_bucket_key) {
1835                    let value = object_rows
1836                        .get(&key)
1837                        .cloned()
1838                        .map(Value::object)
1839                        .unwrap_or(Value::Null);
1840                    row.insert(group_by.property_name.clone(), value);
1841                }
1842            }
1843        }
1844        Ok(())
1845    }
1846
1847    pub fn enhance_child_queries(
1848        &self,
1849        rows: &mut [Record],
1850        child_queries: &[SelectQuery],
1851    ) -> Result<(), RepositoryError<E::Error>> {
1852        for child_query in child_queries {
1853            let ids = rows
1854                .iter()
1855                .filter_map(|row| row.get("id").cloned())
1856                .collect::<Vec<_>>();
1857            if ids.is_empty() {
1858                continue;
1859            }
1860            let mut query = child_query.clone();
1861            ensure_projection(&mut query, "id");
1862            query = query.and_filter(Expr::in_list("id", ids));
1863            let child_rows = self
1864                .scoped_repository(query.entity.clone())
1865                .fetch_all(&query)?
1866                .into_iter()
1867                .filter_map(|row| {
1868                    row.get("id")
1869                        .cloned()
1870                        .map(|id| (relation_bucket_key(&id), row))
1871                })
1872                .collect::<BTreeMap<_, _>>();
1873            for row in rows.iter_mut() {
1874                if let Some(key) = row.get("id").map(relation_bucket_key) {
1875                    if let Some(child) = child_rows.get(&key) {
1876                        row.extend(child.clone());
1877                    }
1878                }
1879            }
1880        }
1881        Ok(())
1882    }
1883
1884    fn enhance_relation_aggregate(
1885        &self,
1886        parent_rows: &mut [Record],
1887        aggregate: &RelationAggregate,
1888        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
1889    ) -> Result<(), RepositoryError<E::Error>> {
1890        let plan = self
1891            .build_relation_plans_from_loads(
1892                &self.entity,
1893                &[RelationLoad::with_query(
1894                    aggregate.relation_name.clone(),
1895                    aggregate.query.clone(),
1896                )],
1897            )
1898            .map_err(RepositoryError::Runtime)?
1899            .into_iter()
1900            .next()
1901            .ok_or_else(|| {
1902                RepositoryError::Runtime(RuntimeError::MissingRelation {
1903                    entity: self.entity.clone(),
1904                    relation: aggregate.relation_name.clone(),
1905                })
1906            })?;
1907
1908        let ids = parent_rows
1909            .iter()
1910            .filter_map(|row| row.get(&plan.local_key).cloned())
1911            .collect::<Vec<_>>();
1912        if ids.is_empty() {
1913            attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
1914            return Ok(());
1915        }
1916
1917        let child_repo = self.scoped_repository(plan.target_entity.clone());
1918        let mut query = aggregate.query.clone();
1919        query.entity = plan.target_entity.clone();
1920        if query.aggregation_cache.is_none() {
1921            if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
1922                query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
1923                    options.propagate_cache_expired_millis,
1924                ));
1925            }
1926        }
1927        query.projection.clear();
1928        query.expr_projection.clear();
1929        query.order_by.clear();
1930        query.slice = None;
1931        query.relations.clear();
1932        if query.aggregates.is_empty() {
1933            let alias = if aggregate.single_result {
1934                aggregate.alias.clone()
1935            } else {
1936                "count".to_owned()
1937            };
1938            query = query.aggregate(Aggregate::count(alias));
1939        }
1940        if !query
1941            .group_by
1942            .iter()
1943            .any(|field| field == &plan.foreign_key)
1944        {
1945            query = query.group_by(plan.foreign_key.clone());
1946        }
1947        query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
1948
1949        let aggregate_rows = child_repo.fetch_all(&query)?;
1950        attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
1951        Ok(())
1952    }
1953
1954    fn build_relation_plans(
1955        &self,
1956        entity: &str,
1957        loads: &[String],
1958    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
1959        let descriptor = self.repository.metadata.context.require_entity(entity)?;
1960        let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
1961        for load in loads {
1962            if let Some((head, tail)) = load.split_once('.') {
1963                grouped
1964                    .entry(head.to_owned())
1965                    .or_default()
1966                    .push(tail.to_owned());
1967            } else {
1968                grouped.entry(load.clone()).or_default();
1969            }
1970        }
1971
1972        grouped
1973            .into_iter()
1974            .map(|(name, child_loads)| {
1975                let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
1976                    RuntimeError::MissingRelation {
1977                        entity: entity.to_owned(),
1978                        relation: name.clone(),
1979                    }
1980                })?;
1981                let child_repo = self.scoped_repository(relation.target_entity.clone());
1982                let children =
1983                    child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
1984                Ok(RelationLoadPlan {
1985                    parent_entity: entity.to_owned(),
1986                    relation_name: relation.name.clone(),
1987                    path: relation.name.clone(),
1988                    target_entity: relation.target_entity.clone(),
1989                    local_key: relation.local_key.clone(),
1990                    foreign_key: relation.foreign_key.clone(),
1991                    many: relation.many,
1992                    query: None,
1993                    children,
1994                })
1995            })
1996            .collect()
1997    }
1998
1999    fn build_relation_plans_from_loads(
2000        &self,
2001        entity: &str,
2002        loads: &[RelationLoad],
2003    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
2004        let descriptor = self.repository.metadata.context.require_entity(entity)?;
2005        loads
2006            .iter()
2007            .map(|load| {
2008                let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
2009                    RuntimeError::MissingRelation {
2010                        entity: entity.to_owned(),
2011                        relation: load.name.clone(),
2012                    }
2013                })?;
2014                let relation_query = load.query.as_deref().cloned();
2015                let child_loads = relation_query
2016                    .as_ref()
2017                    .map(|query| query.relations.as_slice())
2018                    .unwrap_or_default();
2019                let child_repo = self.scoped_repository(relation.target_entity.clone());
2020                let children = child_repo
2021                    .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
2022                Ok(RelationLoadPlan {
2023                    parent_entity: entity.to_owned(),
2024                    relation_name: relation.name.clone(),
2025                    path: relation.name.clone(),
2026                    target_entity: relation.target_entity.clone(),
2027                    local_key: relation.local_key.clone(),
2028                    foreign_key: relation.foreign_key.clone(),
2029                    many: relation.many,
2030                    query: relation_query,
2031                    children,
2032                })
2033            })
2034            .collect()
2035    }
2036
2037    fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
2038        ResolvedRepository {
2039            entity,
2040            repository: ContextRepository {
2041                metadata: UserContextMetadata {
2042                    context: self.repository.metadata.context,
2043                },
2044                dialect: self.repository.dialect,
2045                executor: self.repository.executor,
2046            },
2047        }
2048    }
2049
2050    fn enhance_plan(
2051        &self,
2052        parent_rows: &mut [Record],
2053        plan: &RelationLoadPlan,
2054    ) -> Result<(), RepositoryError<E::Error>> {
2055        let child_repo = self.scoped_repository(plan.target_entity.clone());
2056        let query = self.query_for_plan(plan, parent_rows);
2057        let child_rows = child_repo.fetch_all(&query)?;
2058        self.attach_relation_rows(parent_rows, plan, child_rows);
2059
2060        if !plan.children.is_empty() {
2061            for parent in parent_rows.iter_mut() {
2062                match parent.get_mut(&plan.relation_name) {
2063                    Some(Value::Object(child)) => {
2064                        child_repo.enhance_child_record(child, &plan.children)?;
2065                    }
2066                    Some(Value::List(values)) => {
2067                        for value in values.iter_mut() {
2068                            if let Value::Object(child) = value {
2069                                child_repo.enhance_child_record(child, &plan.children)?;
2070                            }
2071                        }
2072                    }
2073                    _ => {}
2074                }
2075            }
2076        }
2077        Ok(())
2078    }
2079
2080    fn enhance_child_record(
2081        &self,
2082        child: &mut Record,
2083        plans: &[RelationLoadPlan],
2084    ) -> Result<(), RepositoryError<E::Error>> {
2085        for plan in plans {
2086            self.enhance_plan(slice::from_mut(child), plan)?;
2087        }
2088        Ok(())
2089    }
2090
2091    fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
2092        let ids = parent_rows
2093            .iter()
2094            .filter_map(|row| row.get(&plan.local_key).cloned())
2095            .collect::<Vec<_>>();
2096
2097        let mut query = plan
2098            .query
2099            .clone()
2100            .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
2101        query.entity = plan.target_entity.clone();
2102        ensure_projection(&mut query, &plan.foreign_key);
2103        for child in &plan.children {
2104            ensure_projection(&mut query, &child.local_key);
2105        }
2106        if !ids.is_empty() {
2107            query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
2108        }
2109        query
2110    }
2111
2112    fn attach_relation_rows(
2113        &self,
2114        parent_rows: &mut [Record],
2115        plan: &RelationLoadPlan,
2116        child_rows: Vec<Record>,
2117    ) {
2118        let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
2119        for child in child_rows {
2120            if let Some(key) = child.get(&plan.foreign_key) {
2121                buckets
2122                    .entry(relation_bucket_key(key))
2123                    .or_default()
2124                    .push(child);
2125            }
2126        }
2127
2128        for parent in parent_rows.iter_mut() {
2129            let Some(local_value) = parent.get(&plan.local_key) else {
2130                continue;
2131            };
2132            let bucket_key = relation_bucket_key(local_value);
2133            let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
2134            if plan.many {
2135                parent.insert(
2136                    plan.relation_name.clone(),
2137                    Value::List(related.into_iter().map(Value::object).collect()),
2138                );
2139            } else {
2140                let value = related
2141                    .into_iter()
2142                    .next()
2143                    .map(Value::object)
2144                    .unwrap_or(Value::Null);
2145                parent.insert(plan.relation_name.clone(), value);
2146            }
2147        }
2148    }
2149}
2150
2151fn relation_bucket_key(value: &Value) -> String {
2152    match value {
2153        Value::Null => "null".to_owned(),
2154        Value::Bool(v) => format!("b:{v}"),
2155        Value::I64(v) => format!("i:{v}"),
2156        Value::U64(v) => format!("u:{v}"),
2157        Value::F64(v) => format!("f:{v}"),
2158        Value::Decimal(v) => format!("d:{v}"),
2159        Value::Text(v) => format!("t:{v}"),
2160        Value::Json(v) => format!("j:{v}"),
2161        Value::Date(v) => format!("d:{v}"),
2162        Value::Timestamp(v) => format!("ts:{}", v.to_rfc3339()),
2163        Value::Object(_) => "o".to_owned(),
2164        Value::List(_) => "l".to_owned(),
2165    }
2166}
2167
/// Returns the aggregation-cache namespace for `entity`: the entity name prefixed with
/// `entity:`.
fn aggregation_cache_namespace(entity: &str) -> String {
    ["entity:", entity].concat()
}
2171
2172fn invalidate_aggregation_cache_namespace(cache: &dyn AggregationCacheBackend, entity: &str) {
2173    let namespace = format!(
2174        "{}::{}",
2175        cache.namespace(),
2176        aggregation_cache_namespace(entity)
2177    );
2178    cache.invalidate_namespace(&namespace);
2179}
2180
2181fn aggregation_cache_key(
2182    cache_namespace: &str,
2183    query_namespace: &str,
2184    query: &CompiledQuery,
2185) -> String {
2186    format!(
2187        "{cache_namespace}::{query_namespace}::{}::{:?}",
2188        query.sql, query.params
2189    )
2190}
2191
2192fn ensure_projection(query: &mut SelectQuery, field: &str) {
2193    if !query.projection.is_empty()
2194        && !query
2195            .projection
2196            .iter()
2197            .any(|projection| projection == field)
2198    {
2199        query.projection.push(field.to_owned());
2200    }
2201}
2202
2203fn attach_empty_relation_aggregate(parent_rows: &mut [Record], alias: &str, single_result: bool) {
2204    let value = if single_result {
2205        Value::U64(0)
2206    } else {
2207        Value::List(Vec::new())
2208    };
2209    for parent in parent_rows {
2210        parent.insert(alias.to_owned(), value.clone());
2211    }
2212}
2213
2214fn attach_relation_aggregate_rows(
2215    parent_rows: &mut [Record],
2216    plan: &RelationLoadPlan,
2217    aggregate: &RelationAggregate,
2218    aggregate_rows: Vec<Record>,
2219) {
2220    let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
2221    for mut row in aggregate_rows {
2222        if let Some(key) = row.remove(&plan.foreign_key) {
2223            buckets
2224                .entry(relation_bucket_key(&key))
2225                .or_default()
2226                .push(row);
2227        }
2228    }
2229
2230    for parent in parent_rows {
2231        let value = parent
2232            .get(&plan.local_key)
2233            .and_then(|local_value| buckets.get(&relation_bucket_key(local_value)))
2234            .map(|rows| relation_aggregate_value(rows, aggregate.single_result))
2235            .unwrap_or_else(|| {
2236                if aggregate.single_result {
2237                    Value::U64(0)
2238                } else {
2239                    Value::List(Vec::new())
2240                }
2241            });
2242        parent.insert(aggregate.alias.clone(), value);
2243    }
2244}
2245
2246fn relation_aggregate_value(rows: &[Record], single_result: bool) -> Value {
2247    if single_result {
2248        rows.first()
2249            .map(single_relation_aggregate_value)
2250            .unwrap_or(Value::U64(0))
2251    } else {
2252        Value::List(rows.iter().cloned().map(Value::object).collect())
2253    }
2254}
2255
2256fn single_relation_aggregate_value(row: &Record) -> Value {
2257    if row.len() == 1 {
2258        row.values().next().cloned().unwrap_or(Value::Null)
2259    } else {
2260        Value::object(row.clone())
2261    }
2262}
2263
2264fn graph_record_version(record: &Record, descriptor: &EntityDescriptor) -> Option<i64> {
2265    descriptor
2266        .version_property()
2267        .and_then(|property| match record.get(&property.name) {
2268            Some(Value::I64(version)) => Some(*version),
2269            _ => None,
2270        })
2271}
2272
2273fn graph_identity_key(value: &Value) -> String {
2274    match value {
2275        Value::I64(value) if *value >= 0 => format!("u:{}", *value as u64),
2276        Value::U64(value) => format!("u:{value}"),
2277        _ => relation_bucket_key(value),
2278    }
2279}
2280
2281fn ensure_relation_target<ExecError>(
2282    parent_entity: &str,
2283    relation_name: &str,
2284    expected_entity: &str,
2285    child: &GraphNode,
2286) -> Result<(), RepositoryError<ExecError>> {
2287    if child.entity == expected_entity {
2288        return Ok(());
2289    }
2290    Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
2291        "relation {parent_entity}.{relation_name} expects {expected_entity}, got {}",
2292        child.entity
2293    ))))
2294}