1use std::collections::{BTreeMap, HashMap};
2use std::slice;
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, Instant};
5
6use teaql_core::{
7 Aggregate, AggregationCacheOptions, DeleteCommand, Entity, EntityDescriptor, Expr,
8 InsertCommand, ObjectGroupBy, PropertyDescriptor, Record, RecoverCommand, RelationAggregate,
9 RelationLoad, SelectQuery, SmartList, UpdateCommand, Value,
10};
11use teaql_sql::{CompiledQuery, SqlDialect};
12
13use crate::{
14 CheckObjectStatus, ContextError, EntityEvent, GraphMutationKind, GraphMutationPlan, GraphNode,
15 GraphOperation, MetadataStore, RepositoryBehavior, RepositoryError, RuntimeError,
16 SqlLogOperation, UserContext, clear_record_status, mark_record_status, sorted_update_fields,
17};
18
19#[derive(Debug, Default)]
20pub struct InMemoryAggregationCache {
21 namespace: String,
22 entries: Mutex<HashMap<String, AggregationCacheEntry>>,
23}
24
25pub trait AggregationCacheBackend: Send + Sync {
26 fn namespace(&self) -> &str;
27 fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>>;
28 fn put(&self, key: String, rows: Vec<Record>);
29 fn invalidate_namespace(&self, namespace: &str);
30}
31
32#[derive(Debug, Clone)]
33struct AggregationCacheEntry {
34 stored_at: Instant,
35 rows: Vec<Record>,
36}
37
38impl InMemoryAggregationCache {
39 pub fn with_namespace(namespace: impl Into<String>) -> Self {
40 Self {
41 namespace: namespace.into(),
42 entries: Mutex::new(HashMap::new()),
43 }
44 }
45
46 pub fn namespace(&self) -> &str {
47 &self.namespace
48 }
49}
50
51impl AggregationCacheBackend for InMemoryAggregationCache {
52 fn namespace(&self) -> &str {
53 &self.namespace
54 }
55
56 fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>> {
57 let entries = self.entries.lock().ok()?;
58 let entry = entries.get(key)?;
59 if max_age_millis == 0 || entry.stored_at.elapsed() <= Duration::from_millis(max_age_millis)
60 {
61 Some(entry.rows.clone())
62 } else {
63 None
64 }
65 }
66
67 fn put(&self, key: String, rows: Vec<Record>) {
68 if let Ok(mut entries) = self.entries.lock() {
69 entries.insert(
70 key,
71 AggregationCacheEntry {
72 stored_at: Instant::now(),
73 rows,
74 },
75 );
76 }
77 }
78
79 fn invalidate_namespace(&self, namespace: &str) {
80 if let Ok(mut entries) = self.entries.lock() {
81 let prefix = format!("{namespace}::");
82 entries.retain(|key, _| !key.starts_with(&prefix));
83 }
84 }
85}
86
87impl InMemoryAggregationCache {
88 pub fn get(&self, key: &str, max_age_millis: u64) -> Option<Vec<Record>> {
89 AggregationCacheBackend::get(self, key, max_age_millis)
90 }
91
92 pub fn put(&self, key: String, rows: Vec<Record>) {
93 AggregationCacheBackend::put(self, key, rows);
94 }
95
96 pub fn clear(&self) {
97 if let Ok(mut entries) = self.entries.lock() {
98 entries.clear();
99 }
100 }
101
102 pub fn invalidate_namespace(&self, namespace: &str) {
103 AggregationCacheBackend::invalidate_namespace(self, namespace);
104 }
105}
106
107pub trait QueryExecutor {
108 type Error: std::error::Error + Send + Sync + 'static;
109
110 fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error>;
111 fn execute(&self, query: &CompiledQuery) -> Result<u64, Self::Error>;
112
113 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
114 Ok(GraphTransactionBoundary::Unsupported)
115 }
116
117 fn commit_transaction(&self) -> Result<(), Self::Error> {
118 Ok(())
119 }
120
121 fn rollback_transaction(&self) -> Result<(), Self::Error> {
122 Ok(())
123 }
124}
125
126#[derive(Debug, Clone, Copy, PartialEq, Eq)]
127pub enum GraphTransactionBoundary {
128 Started,
129 AlreadyActive,
130 Unsupported,
131}
132
133pub struct Repository<'a, D, M, E> {
134 dialect: &'a D,
135 metadata: &'a M,
136 executor: &'a E,
137}
138
139pub struct ContextRepository<'a, D, E> {
140 metadata: UserContextMetadata<'a>,
141 pub(crate) dialect: &'a D,
142 pub(crate) executor: &'a E,
143}
144
145pub struct ResolvedRepository<'a, D, E> {
146 entity: String,
147 repository: ContextRepository<'a, D, E>,
148}
149
150#[derive(Debug, Clone, PartialEq)]
151pub struct RelationLoadPlan {
152 pub parent_entity: String,
153 pub relation_name: String,
154 pub path: String,
155 pub target_entity: String,
156 pub local_key: String,
157 pub foreign_key: String,
158 pub many: bool,
159 pub query: Option<SelectQuery>,
160 pub children: Vec<RelationLoadPlan>,
161}
162
163pub(crate) struct UserContextMetadata<'a> {
164 pub(crate) context: &'a UserContext,
165}
166
167impl MetadataStore for UserContextMetadata<'_> {
168 fn entity(&self, name: &str) -> Option<&EntityDescriptor> {
169 self.context.entity(name)
170 }
171
172 fn all_entities(&self) -> Vec<&EntityDescriptor> {
173 self.context
174 .metadata
175 .as_ref()
176 .map(|metadata| metadata.all_entities())
177 .unwrap_or_default()
178 }
179}
180
181impl<'a, D, M, E> Repository<'a, D, M, E>
182where
183 D: SqlDialect,
184 M: MetadataStore,
185 E: QueryExecutor,
186{
187 pub fn new(dialect: &'a D, metadata: &'a M, executor: &'a E) -> Self {
188 Self {
189 dialect,
190 metadata,
191 executor,
192 }
193 }
194
195 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
196 let entity = self
197 .metadata
198 .entity(&query.entity)
199 .ok_or_else(|| RuntimeError::MissingEntity(query.entity.clone()))?;
200 Ok(self.dialect.compile_select(entity, query)?)
201 }
202
203 pub fn compile_insert(&self, command: &InsertCommand) -> Result<CompiledQuery, RuntimeError> {
204 let entity = self
205 .metadata
206 .entity(&command.entity)
207 .ok_or_else(|| RuntimeError::MissingEntity(command.entity.clone()))?;
208 Ok(self.dialect.compile_insert(entity, command)?)
209 }
210
211 pub fn compile_update(&self, command: &UpdateCommand) -> Result<CompiledQuery, RuntimeError> {
212 let entity = self
213 .metadata
214 .entity(&command.entity)
215 .ok_or_else(|| RuntimeError::MissingEntity(command.entity.clone()))?;
216 Ok(self.dialect.compile_update(entity, command)?)
217 }
218
219 pub fn compile_delete(&self, command: &DeleteCommand) -> Result<CompiledQuery, RuntimeError> {
220 let entity = self
221 .metadata
222 .entity(&command.entity)
223 .ok_or_else(|| RuntimeError::MissingEntity(command.entity.clone()))?;
224 Ok(self.dialect.compile_delete(entity, command)?)
225 }
226
227 pub fn compile_recover(&self, command: &RecoverCommand) -> Result<CompiledQuery, RuntimeError> {
228 let entity = self
229 .metadata
230 .entity(&command.entity)
231 .ok_or_else(|| RuntimeError::MissingEntity(command.entity.clone()))?;
232 Ok(self.dialect.compile_recover(entity, command)?)
233 }
234
235 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
236 let compiled = self.compile(query).map_err(RepositoryError::Runtime)?;
237 self.executor
238 .fetch_all(&compiled)
239 .map_err(RepositoryError::Executor)
240 }
241
242 pub fn fetch_smart_list(
243 &self,
244 query: &SelectQuery,
245 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
246 self.fetch_all(query).map(SmartList::from)
247 }
248
249 pub fn fetch_entities<T>(
250 &self,
251 query: &SelectQuery,
252 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
253 where
254 T: Entity,
255 {
256 self.fetch_all(query)?
257 .into_iter()
258 .map(T::from_record)
259 .collect::<Result<Vec<_>, _>>()
260 .map(SmartList::from)
261 .map_err(RepositoryError::Entity)
262 }
263
264 pub fn fetch_enhanced_entities<T>(
265 &self,
266 query: &SelectQuery,
267 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
268 where
269 T: Entity,
270 {
271 self.fetch_entities(query)
272 }
273
274 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
275 let compiled = self
276 .compile_insert(command)
277 .map_err(RepositoryError::Runtime)?;
278 self.executor
279 .execute(&compiled)
280 .map_err(RepositoryError::Executor)
281 }
282
283 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
284 let compiled = self
285 .compile_update(command)
286 .map_err(RepositoryError::Runtime)?;
287 let affected = self
288 .executor
289 .execute(&compiled)
290 .map_err(RepositoryError::Executor)?;
291
292 if command.expected_version.is_some() && affected == 0 {
293 return Err(RepositoryError::Runtime(
294 RuntimeError::OptimisticLockConflict {
295 entity: command.entity.clone(),
296 id: format!("{:?}", command.id),
297 },
298 ));
299 }
300
301 Ok(affected)
302 }
303
304 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
305 let compiled = self
306 .compile_delete(command)
307 .map_err(RepositoryError::Runtime)?;
308 let affected = self
309 .executor
310 .execute(&compiled)
311 .map_err(RepositoryError::Executor)?;
312
313 if command.expected_version.is_some() && affected == 0 {
314 return Err(RepositoryError::Runtime(
315 RuntimeError::OptimisticLockConflict {
316 entity: command.entity.clone(),
317 id: format!("{:?}", command.id),
318 },
319 ));
320 }
321
322 Ok(affected)
323 }
324
325 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
326 let compiled = self
327 .compile_recover(command)
328 .map_err(RepositoryError::Runtime)?;
329 let affected = self
330 .executor
331 .execute(&compiled)
332 .map_err(RepositoryError::Executor)?;
333
334 if affected == 0 {
335 return Err(RepositoryError::Runtime(
336 RuntimeError::OptimisticLockConflict {
337 entity: command.entity.clone(),
338 id: format!("{:?}", command.id),
339 },
340 ));
341 }
342
343 Ok(affected)
344 }
345
346 pub fn insert_many(
347 &self,
348 commands: &[InsertCommand],
349 ) -> Result<u64, RepositoryError<E::Error>> {
350 let mut total = 0;
351 for command in commands {
352 total += self.insert(command)?;
353 }
354 Ok(total)
355 }
356
357 pub fn update_many(
358 &self,
359 commands: &[UpdateCommand],
360 ) -> Result<u64, RepositoryError<E::Error>> {
361 let mut total = 0;
362 for command in commands {
363 total += self.update(command)?;
364 }
365 Ok(total)
366 }
367
368 pub fn delete_many(
369 &self,
370 commands: &[DeleteCommand],
371 ) -> Result<u64, RepositoryError<E::Error>> {
372 let mut total = 0;
373 for command in commands {
374 total += self.delete(command)?;
375 }
376 Ok(total)
377 }
378
379 pub fn recover_many(
380 &self,
381 commands: &[RecoverCommand],
382 ) -> Result<u64, RepositoryError<E::Error>> {
383 let mut total = 0;
384 for command in commands {
385 total += self.recover(command)?;
386 }
387 Ok(total)
388 }
389}
390
391impl UserContext {
392 pub fn repository<D, E>(&self) -> Result<ContextRepository<'_, D, E>, ContextError>
393 where
394 D: SqlDialect + Send + Sync + 'static,
395 E: QueryExecutor + Send + Sync + 'static,
396 {
397 if self.metadata.is_none() {
398 return Err(ContextError::MissingResource("metadata".to_owned()));
399 }
400
401 let dialect = self.require_resource::<D>()?;
402 let executor = self.require_resource::<E>()?;
403 Ok(ContextRepository {
404 metadata: UserContextMetadata { context: self },
405 dialect,
406 executor,
407 })
408 }
409
410 pub fn resolve_repository<D, E>(
411 &self,
412 entity: impl Into<String>,
413 ) -> Result<ResolvedRepository<'_, D, E>, ContextError>
414 where
415 D: SqlDialect + Send + Sync + 'static,
416 E: QueryExecutor + Send + Sync + 'static,
417 {
418 let entity = entity.into();
419 if !self.has_repository(&entity) {
420 return Err(ContextError::MissingRepository(entity));
421 }
422 Ok(ResolvedRepository {
423 entity,
424 repository: self.repository::<D, E>()?,
425 })
426 }
427
428 pub fn plan_for_save_graph<D, E>(
429 &self,
430 node: GraphNode,
431 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>>
432 where
433 D: SqlDialect + Send + Sync + 'static,
434 E: QueryExecutor + Send + Sync + 'static,
435 {
436 let repository = self
437 .resolve_repository::<D, E>(node.entity.clone())
438 .map_err(|err| RepositoryError::Runtime(RuntimeError::Graph(err.to_string())))?;
439 repository.plan_graph(node)
440 }
441}
442
443impl<'a, D, E> ContextRepository<'a, D, E>
444where
445 D: SqlDialect,
446 E: QueryExecutor,
447{
448 fn repository(&self) -> Repository<'_, D, UserContextMetadata<'_>, E> {
449 Repository::new(self.dialect, &self.metadata, self.executor)
450 }
451
452 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
453 self.repository().compile(query)
454 }
455
456 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
457 let compiled = self.compile(query).map_err(RepositoryError::Runtime)?;
458 self.log_sql(SqlLogOperation::Select, &compiled);
459 self.executor
460 .fetch_all(&compiled)
461 .map_err(RepositoryError::Executor)
462 }
463
464 pub fn fetch_smart_list(
465 &self,
466 query: &SelectQuery,
467 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
468 self.repository().fetch_smart_list(query)
469 }
470
471 pub fn fetch_entities<T>(
472 &self,
473 query: &SelectQuery,
474 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
475 where
476 T: Entity,
477 {
478 self.repository().fetch_entities(query)
479 }
480
481 pub fn fetch_enhanced_entities<T>(
482 &self,
483 query: &SelectQuery,
484 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
485 where
486 T: Entity,
487 {
488 self.repository().fetch_enhanced_entities(query)
489 }
490
491 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
492 let compiled = self
493 .repository()
494 .compile_insert(command)
495 .map_err(RepositoryError::Runtime)?;
496 self.log_sql(SqlLogOperation::Insert, &compiled);
497 let affected = self
498 .executor
499 .execute(&compiled)
500 .map_err(RepositoryError::Executor)?;
501 self.invalidate_aggregation_cache_for(&command.entity);
502 Ok(affected)
503 }
504
505 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
506 let affected = self.execute_mutation(
507 SqlLogOperation::Update,
508 &command.entity,
509 self.repository()
510 .compile_update(command)
511 .map_err(RepositoryError::Runtime)?,
512 )?;
513 if command.expected_version.is_some() && affected == 0 {
514 return Err(RepositoryError::Runtime(
515 RuntimeError::OptimisticLockConflict {
516 entity: command.entity.clone(),
517 id: format!("{:?}", command.id),
518 },
519 ));
520 }
521 Ok(affected)
522 }
523
524 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
525 let affected = self.execute_mutation(
526 SqlLogOperation::Delete,
527 &command.entity,
528 self.repository()
529 .compile_delete(command)
530 .map_err(RepositoryError::Runtime)?,
531 )?;
532 if command.expected_version.is_some() && affected == 0 {
533 return Err(RepositoryError::Runtime(
534 RuntimeError::OptimisticLockConflict {
535 entity: command.entity.clone(),
536 id: format!("{:?}", command.id),
537 },
538 ));
539 }
540 Ok(affected)
541 }
542
543 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
544 let affected = self.execute_mutation(
545 SqlLogOperation::Recover,
546 &command.entity,
547 self.repository()
548 .compile_recover(command)
549 .map_err(RepositoryError::Runtime)?,
550 )?;
551 if affected == 0 {
552 return Err(RepositoryError::Runtime(
553 RuntimeError::OptimisticLockConflict {
554 entity: command.entity.clone(),
555 id: format!("{:?}", command.id),
556 },
557 ));
558 }
559 Ok(affected)
560 }
561
562 fn execute_mutation(
563 &self,
564 operation: SqlLogOperation,
565 entity: &str,
566 compiled: CompiledQuery,
567 ) -> Result<u64, RepositoryError<E::Error>> {
568 self.log_sql(operation, &compiled);
569 let affected = self
570 .executor
571 .execute(&compiled)
572 .map_err(RepositoryError::Executor)?;
573 self.invalidate_aggregation_cache_for(entity);
574 Ok(affected)
575 }
576
577 fn log_sql(&self, operation: SqlLogOperation, compiled: &CompiledQuery) {
578 self.metadata
579 .context
580 .record_sql_log(operation, compiled, self.dialect.kind());
581 }
582
583 fn invalidate_aggregation_cache_for(&self, entity: &str) {
584 if let Some(cache) = self
585 .metadata
586 .context
587 .get_resource::<Arc<dyn AggregationCacheBackend>>()
588 {
589 invalidate_aggregation_cache_namespace(cache.as_ref(), entity);
590 }
591 if let Some(cache) = self
592 .metadata
593 .context
594 .get_resource::<InMemoryAggregationCache>()
595 {
596 invalidate_aggregation_cache_namespace(cache, entity);
597 }
598 }
599}
600
601impl<'a, D, E> ResolvedRepository<'a, D, E>
602where
603 D: SqlDialect,
604 E: QueryExecutor,
605{
606 fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
607 self.repository.metadata.context.repository_behavior(entity)
608 }
609
610 fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
611 self.repository
612 .metadata
613 .context
614 .repository_behavior(&self.entity)
615 }
616
617 pub fn entity(&self) -> &str {
618 &self.entity
619 }
620
621 pub fn select(&self) -> SelectQuery {
622 SelectQuery::new(self.entity.clone())
623 }
624
625 pub fn insert_command(&self) -> InsertCommand {
626 InsertCommand::new(self.entity.clone())
627 }
628
629 pub fn prepare_insert_command(
630 &self,
631 command: &InsertCommand,
632 ) -> Result<InsertCommand, RuntimeError> {
633 let mut command = command.clone();
634 if let Some(behavior) = self.behavior() {
635 behavior.before_insert(self.repository.metadata.context, &mut command)?;
636 }
637
638 let entity = self
639 .repository
640 .metadata
641 .context
642 .require_entity(&command.entity)?;
643 if let Some(id_property) = entity.id_property() {
644 let needs_id = !command.values.contains_key(&id_property.name)
645 || matches!(command.values.get(&id_property.name), Some(Value::Null));
646 if needs_id {
647 let id = self.repository.metadata.context.next_id(&command.entity)?;
648 command
649 .values
650 .insert(id_property.name.clone(), Value::U64(id));
651 }
652 }
653 mark_record_status(&mut command.values, CheckObjectStatus::Create);
654 let check_result = self
655 .repository
656 .metadata
657 .context
658 .check_and_fix_record(&command.entity, &mut command.values);
659 clear_record_status(&mut command.values);
660 check_result?;
661
662 Ok(command)
663 }
664
665 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
666 UpdateCommand::new(self.entity.clone(), id)
667 }
668
669 pub fn prepare_update_command(
670 &self,
671 command: &UpdateCommand,
672 ) -> Result<UpdateCommand, RuntimeError> {
673 let mut command = command.clone();
674 if let Some(behavior) = self.behavior() {
675 behavior.before_update(self.repository.metadata.context, &mut command)?;
676 }
677 mark_record_status(&mut command.values, CheckObjectStatus::Update);
678 let check_result = self
679 .repository
680 .metadata
681 .context
682 .check_and_fix_record(&command.entity, &mut command.values);
683 clear_record_status(&mut command.values);
684 check_result?;
685 Ok(command)
686 }
687
688 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
689 DeleteCommand::new(self.entity.clone(), id)
690 }
691
692 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
693 RecoverCommand::new(self.entity.clone(), id, expected_version)
694 }
695
696 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
697 let mut query = query.clone();
698 if let Some(behavior) = self.query_behavior(&query.entity) {
699 behavior.before_select(self.repository.metadata.context, &mut query)?;
700 }
701 self.repository.compile(&query)
702 }
703
704 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
705 let mut query = query.clone();
706 if let Some(behavior) = self.query_behavior(&query.entity) {
707 behavior
708 .before_select(self.repository.metadata.context, &mut query)
709 .map_err(RepositoryError::Runtime)?;
710 }
711 let mut rows = self.fetch_prepared_query(&query)?;
712 self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
713 self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
714 Ok(rows)
715 }
716
717 fn fetch_prepared_query(
718 &self,
719 query: &SelectQuery,
720 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
721 let compiled = self
722 .repository
723 .compile(query)
724 .map_err(RepositoryError::Runtime)?;
725 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
726 if let Some(cache) = self
727 .repository
728 .metadata
729 .context
730 .get_resource::<Arc<dyn AggregationCacheBackend>>()
731 {
732 return self.fetch_prepared_query_with_cache(
733 query,
734 &compiled,
735 options,
736 cache.as_ref(),
737 );
738 }
739 if let Some(cache) = self
740 .repository
741 .metadata
742 .context
743 .get_resource::<InMemoryAggregationCache>()
744 {
745 return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
746 }
747 }
748 self.repository.log_sql(SqlLogOperation::Select, &compiled);
749 self.repository
750 .executor
751 .fetch_all(&compiled)
752 .map_err(RepositoryError::Executor)
753 }
754
755 fn fetch_prepared_query_with_cache(
756 &self,
757 query: &SelectQuery,
758 compiled: &CompiledQuery,
759 options: AggregationCacheOptions,
760 cache: &dyn AggregationCacheBackend,
761 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
762 let key = aggregation_cache_key(
763 cache.namespace(),
764 &aggregation_cache_namespace(&query.entity),
765 compiled,
766 );
767 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
768 return Ok(rows);
769 }
770 self.repository.log_sql(SqlLogOperation::Select, compiled);
771 let rows = self
772 .repository
773 .executor
774 .fetch_all(compiled)
775 .map_err(RepositoryError::Executor)?;
776 cache.put(key, rows.clone());
777 Ok(rows)
778 }
779
780 pub fn fetch_all_with_relation_aggregates(
781 &self,
782 query: &SelectQuery,
783 relation_aggregates: &[RelationAggregate],
784 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
785 let mut rows = self.fetch_all(query)?;
786 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
787 Ok(rows)
788 }
789
790 pub fn fetch_smart_list(
791 &self,
792 query: &SelectQuery,
793 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
794 let mut query = query.clone();
795 if let Some(behavior) = self.query_behavior(&query.entity) {
796 behavior
797 .before_select(self.repository.metadata.context, &mut query)
798 .map_err(RepositoryError::Runtime)?;
799 }
800 self.repository.fetch_smart_list(&query)
801 }
802
803 pub fn fetch_smart_list_with_relation_aggregates(
804 &self,
805 query: &SelectQuery,
806 relation_aggregates: &[RelationAggregate],
807 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
808 self.fetch_all_with_relation_aggregates(query, relation_aggregates)
809 .map(SmartList::from)
810 }
811
812 pub fn fetch_entities<T>(
813 &self,
814 query: &SelectQuery,
815 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
816 where
817 T: Entity,
818 {
819 let mut query = query.clone();
820 if let Some(behavior) = self.query_behavior(&query.entity) {
821 behavior
822 .before_select(self.repository.metadata.context, &mut query)
823 .map_err(RepositoryError::Runtime)?;
824 }
825 self.repository.fetch_entities(&query)
826 }
827
828 pub fn fetch_entities_with_relation_aggregates<T>(
829 &self,
830 query: &SelectQuery,
831 relation_aggregates: &[RelationAggregate],
832 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
833 where
834 T: Entity,
835 {
836 self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
837 .into_iter()
838 .map(T::from_record)
839 .collect::<Result<Vec<_>, _>>()
840 .map(SmartList::from)
841 .map_err(RepositoryError::Entity)
842 }
843
844 pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
845 &self,
846 query: &SelectQuery,
847 relation_aggregates: &[RelationAggregate],
848 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
849 where
850 T: Entity,
851 {
852 let mut query = query.clone();
853 if let Some(behavior) = self.query_behavior(&query.entity) {
854 behavior
855 .before_select(self.repository.metadata.context, &mut query)
856 .map_err(RepositoryError::Runtime)?;
857 }
858
859 let mut rows = self.repository.fetch_all(&query)?;
860 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
861 self.enhance_query_relations(&mut rows, &query)?;
862 self.enhance_relations(&mut rows)?;
863 rows.into_iter()
864 .map(T::from_record)
865 .collect::<Result<Vec<_>, _>>()
866 .map(SmartList::from)
867 .map_err(RepositoryError::Entity)
868 }
869
870 pub fn fetch_enhanced_entities<T>(
871 &self,
872 query: &SelectQuery,
873 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
874 where
875 T: Entity,
876 {
877 let mut query = query.clone();
878 if let Some(behavior) = self.query_behavior(&query.entity) {
879 behavior
880 .before_select(self.repository.metadata.context, &mut query)
881 .map_err(RepositoryError::Runtime)?;
882 }
883
884 let mut rows = self.repository.fetch_all(&query)?;
885 self.enhance_query_relations(&mut rows, &query)?;
886 self.enhance_relations(&mut rows)?;
887 rows.into_iter()
888 .map(T::from_record)
889 .collect::<Result<Vec<_>, _>>()
890 .map(SmartList::from)
891 .map_err(RepositoryError::Entity)
892 }
893
894 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
895 let command = self
896 .prepare_insert_command(command)
897 .map_err(RepositoryError::Runtime)?;
898 self.execute_prepared_insert(command)
899 }
900
901 pub fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
902 if node.entity != self.entity {
903 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
904 "resolved repository {} cannot save graph root {}",
905 self.entity, node.entity
906 ))));
907 }
908 let boundary = self
909 .repository
910 .executor
911 .begin_transaction()
912 .map_err(RepositoryError::Executor)?;
913 if matches!(boundary, GraphTransactionBoundary::Unsupported) {
914 return Err(RepositoryError::Runtime(RuntimeError::Graph(
915 "save_graph requires a transactional executor".to_owned(),
916 )));
917 }
918 let result = self
919 .plan_graph(node)
920 .and_then(|plan| self.execute_graph_plan(plan));
921 match result {
922 Ok(saved) => {
923 if matches!(boundary, GraphTransactionBoundary::Started) {
924 self.repository
925 .executor
926 .commit_transaction()
927 .map_err(RepositoryError::Executor)?;
928 }
929 Ok(saved)
930 }
931 Err(err) => {
932 if !matches!(boundary, GraphTransactionBoundary::Unsupported) {
933 self.repository
934 .executor
935 .rollback_transaction()
936 .map_err(RepositoryError::Executor)?;
937 }
938 Err(err)
939 }
940 }
941 }
942
943 pub fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
944 where
945 T: Entity,
946 {
947 let node = self
948 .graph_node_from_entity(entity)
949 .map_err(RepositoryError::Runtime)?;
950 self.save_graph(node)
951 }
952
953 pub fn plan_graph(
954 &self,
955 node: GraphNode,
956 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
957 if node.entity != self.entity {
958 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
959 "resolved repository {} cannot plan graph root {}",
960 self.entity, node.entity
961 ))));
962 }
963 let mut node = node;
964 let mut plan = GraphMutationPlan::default();
965 self.collect_graph_plan(&mut node, &mut plan)?;
966 plan.planned_root = Some(node);
967 plan.rebuild_batches();
968 Ok(plan)
969 }
970
971 pub fn execute_graph_plan(
972 &self,
973 plan: GraphMutationPlan,
974 ) -> Result<GraphNode, RepositoryError<E::Error>> {
975 let Some(root) = plan.planned_root else {
976 return Err(RepositoryError::Runtime(RuntimeError::Graph(
977 "graph mutation plan has no planned root".to_owned(),
978 )));
979 };
980 self.upsert_graph_node(root)
981 }
982
983 pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
984 where
985 T: Entity,
986 {
987 let descriptor = T::entity_descriptor();
988 if descriptor.name != self.entity {
989 return Err(RuntimeError::Graph(format!(
990 "resolved repository {} cannot extract graph root {}",
991 self.entity, descriptor.name
992 )));
993 }
994 self.graph_node_from_record(&descriptor.name, entity.into_record())
995 }
996
997 fn collect_graph_plan(
998 &self,
999 node: &mut GraphNode,
1000 plan: &mut GraphMutationPlan,
1001 ) -> Result<(), RepositoryError<E::Error>> {
1002 match node.operation {
1003 GraphOperation::Reference => {
1004 plan.push(
1005 node.entity.clone(),
1006 GraphMutationKind::Reference,
1007 node.values.clone(),
1008 Vec::new(),
1009 );
1010 return Ok(());
1011 }
1012 GraphOperation::Remove => {
1013 plan.push(
1014 node.entity.clone(),
1015 GraphMutationKind::Delete,
1016 node.values.clone(),
1017 Vec::new(),
1018 );
1019 return Ok(());
1020 }
1021 GraphOperation::Upsert => {}
1022 }
1023
1024 let descriptor = self
1025 .repository
1026 .metadata
1027 .context
1028 .require_entity(&node.entity)
1029 .map_err(RepositoryError::Runtime)?;
1030 let id_property = descriptor.id_property().cloned();
1031 let id = id_property.as_ref().and_then(|property| {
1032 node.values
1033 .get(&property.name)
1034 .filter(|value| !matches!(value, Value::Null))
1035 .cloned()
1036 });
1037 let is_update = match (id_property.as_ref(), id.as_ref()) {
1038 (Some(id_property), Some(id)) => self
1039 .fetch_graph_current_row(&node.entity, &id_property.name, id)?
1040 .is_some(),
1041 _ => false,
1042 };
1043 if !is_update {
1044 if let Some(id_property) = id_property.as_ref() {
1045 let needs_id = !node.values.contains_key(&id_property.name)
1046 || matches!(node.values.get(&id_property.name), Some(Value::Null));
1047 if needs_id {
1048 let id = self
1049 .repository
1050 .metadata
1051 .context
1052 .next_id(&node.entity)
1053 .map_err(RepositoryError::Runtime)?;
1054 node.values.insert(id_property.name.clone(), Value::U64(id));
1055 }
1056 }
1057 }
1058 let update_fields = if is_update {
1059 let mut excluded = Vec::new();
1060 if let Some(id_property) = id_property.as_ref() {
1061 excluded.push(id_property.name.clone());
1062 }
1063 if let Some(version_property) = descriptor.version_property() {
1064 excluded.push(version_property.name.clone());
1065 }
1066 sorted_update_fields(&node.values, excluded)
1067 } else {
1068 Vec::new()
1069 };
1070 plan.push(
1071 node.entity.clone(),
1072 if is_update {
1073 GraphMutationKind::Update
1074 } else {
1075 GraphMutationKind::Create
1076 },
1077 node.values.clone(),
1078 update_fields,
1079 );
1080
1081 for (name, children) in &mut node.relations {
1082 let relation = descriptor.relation_by_name(name).ok_or_else(|| {
1083 RepositoryError::Runtime(RuntimeError::MissingRelation {
1084 entity: node.entity.clone(),
1085 relation: name.clone(),
1086 })
1087 })?;
1088 let child_repo = self.scoped_repository(relation.target_entity.clone());
1089 for child in children {
1090 ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
1091 child_repo.collect_graph_plan(child, plan)?;
1092 }
1093 }
1094 Ok(())
1095 }
1096
1097 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
1098 let command = self
1099 .prepare_update_command(command)
1100 .map_err(RepositoryError::Runtime)?;
1101 self.execute_prepared_update(command)
1102 }
1103
1104 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
1105 let mut command = command.clone();
1106 if let Some(behavior) = self.behavior() {
1107 behavior
1108 .before_delete(self.repository.metadata.context, &mut command)
1109 .map_err(RepositoryError::Runtime)?;
1110 }
1111 let affected = self.repository.delete(&command)?;
1112 self.emit_event(EntityEvent::deleted(
1113 command.entity,
1114 command.id,
1115 command.expected_version,
1116 ))
1117 .map_err(RepositoryError::Runtime)?;
1118 Ok(affected)
1119 }
1120
1121 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
1122 let mut command = command.clone();
1123 if let Some(behavior) = self.behavior() {
1124 behavior
1125 .before_recover(self.repository.metadata.context, &mut command)
1126 .map_err(RepositoryError::Runtime)?;
1127 }
1128 let affected = self.repository.recover(&command)?;
1129 self.emit_event(EntityEvent::recovered(
1130 command.entity,
1131 command.id,
1132 command.expected_version,
1133 ))
1134 .map_err(RepositoryError::Runtime)?;
1135 Ok(affected)
1136 }
1137
1138 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
1139 self.repository.metadata.context.send_event(event)
1140 }
1141
1142 fn execute_prepared_insert(
1143 &self,
1144 command: InsertCommand,
1145 ) -> Result<u64, RepositoryError<E::Error>> {
1146 let affected = self.repository.insert(&command)?;
1147 self.emit_event(EntityEvent::created(command.entity, command.values))
1148 .map_err(RepositoryError::Runtime)?;
1149 Ok(affected)
1150 }
1151
1152 fn execute_prepared_update(
1153 &self,
1154 command: UpdateCommand,
1155 ) -> Result<u64, RepositoryError<E::Error>> {
1156 let affected = self.repository.update(&command)?;
1157 let updated_fields = command.values.keys().cloned().collect();
1158 let mut values = command.values.clone();
1159 values.insert("id".to_owned(), command.id.clone());
1160 if let Some(version) = command.expected_version {
1161 values.insert("version".to_owned(), Value::I64(version + 1));
1162 }
1163 self.emit_event(EntityEvent {
1164 kind: crate::EntityEventKind::Updated,
1165 entity: command.entity,
1166 values,
1167 updated_fields,
1168 })
1169 .map_err(RepositoryError::Runtime)?;
1170 Ok(affected)
1171 }
1172
1173 fn insert_graph_node(
1174 &self,
1175 mut node: GraphNode,
1176 ) -> Result<GraphNode, RepositoryError<E::Error>> {
1177 match node.operation {
1178 GraphOperation::Upsert => {}
1179 GraphOperation::Reference => return self.validate_reference_node(node),
1180 GraphOperation::Remove => {
1181 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1182 "create graph cannot remove node {}",
1183 node.entity
1184 ))));
1185 }
1186 }
1187
1188 let descriptor = self
1189 .repository
1190 .metadata
1191 .context
1192 .require_entity(&node.entity)
1193 .map_err(RepositoryError::Runtime)?;
1194
1195 let mut one_relations = Vec::new();
1196 let mut many_relations = Vec::new();
1197 for (name, children) in std::mem::take(&mut node.relations) {
1198 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
1199 RepositoryError::Runtime(RuntimeError::MissingRelation {
1200 entity: node.entity.clone(),
1201 relation: name.clone(),
1202 })
1203 })?;
1204 if relation.many {
1205 many_relations.push((name, relation.clone(), children));
1206 } else {
1207 one_relations.push((name, relation.clone(), children));
1208 }
1209 }
1210
1211 for (name, relation, children) in one_relations {
1212 if children.len() > 1 {
1213 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1214 "relation {}.{} expects one child, got {}",
1215 node.entity,
1216 name,
1217 children.len()
1218 ))));
1219 }
1220 let mut saved_children = Vec::new();
1221 for child in children {
1222 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
1223 let child_repo = self.scoped_repository(child.entity.clone());
1224 let saved_child = child_repo.insert_graph_node(child)?;
1225 if relation.attach {
1226 let foreign_value = saved_child
1227 .values
1228 .get(&relation.foreign_key)
1229 .cloned()
1230 .ok_or_else(|| {
1231 RepositoryError::Runtime(RuntimeError::Graph(format!(
1232 "saved child {} missing foreign key {} for relation {}.{}",
1233 relation.target_entity, relation.foreign_key, node.entity, name
1234 )))
1235 })?;
1236 node.values
1237 .insert(relation.local_key.clone(), foreign_value);
1238 }
1239 saved_children.push(saved_child);
1240 }
1241 node.relations.insert(name, saved_children);
1242 }
1243
1244 let command = self
1245 .prepare_insert_command(&InsertCommand {
1246 entity: node.entity.clone(),
1247 values: node.values.clone(),
1248 })
1249 .map_err(RepositoryError::Runtime)?;
1250 self.execute_prepared_insert(command.clone())?;
1251 node.values = command.values;
1252
1253 for (name, relation, children) in many_relations {
1254 let local_value = node
1255 .values
1256 .get(&relation.local_key)
1257 .cloned()
1258 .ok_or_else(|| {
1259 RepositoryError::Runtime(RuntimeError::Graph(format!(
1260 "parent {} missing local key {} for relation {}",
1261 node.entity, relation.local_key, name
1262 )))
1263 })?;
1264 let mut saved_children = Vec::new();
1265 for mut child in children {
1266 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
1267 if relation.attach {
1268 child
1269 .values
1270 .insert(relation.foreign_key.clone(), local_value.clone());
1271 }
1272 let child_repo = self.scoped_repository(child.entity.clone());
1273 saved_children.push(child_repo.insert_graph_node(child)?);
1274 }
1275 node.relations.insert(name, saved_children);
1276 }
1277
1278 Ok(node)
1279 }
1280
1281 fn upsert_graph_node(
1282 &self,
1283 mut node: GraphNode,
1284 ) -> Result<GraphNode, RepositoryError<E::Error>> {
1285 match node.operation {
1286 GraphOperation::Upsert => {}
1287 GraphOperation::Reference => return self.validate_reference_node(node),
1288 GraphOperation::Remove => {
1289 self.validate_remove_node(&node)?;
1290 self.delete_graph_node(&node)?;
1291 return Ok(node);
1292 }
1293 }
1294
1295 let descriptor = self
1296 .repository
1297 .metadata
1298 .context
1299 .require_entity(&node.entity)
1300 .map_err(RepositoryError::Runtime)?;
1301 let Some(id_property) = descriptor.id_property() else {
1302 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1303 "entity {} has no id property for graph upsert",
1304 node.entity
1305 ))));
1306 };
1307 let Some(id) = node
1308 .values
1309 .get(&id_property.name)
1310 .filter(|value| !matches!(value, Value::Null))
1311 .cloned()
1312 else {
1313 return self.insert_graph_node(node);
1314 };
1315
1316 if self
1317 .fetch_graph_current_row(&node.entity, &id_property.name, &id)?
1318 .is_none()
1319 {
1320 return self.insert_graph_node(node);
1321 }
1322
1323 let mut one_relations = Vec::new();
1324 let mut many_relations = Vec::new();
1325 for (name, children) in std::mem::take(&mut node.relations) {
1326 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
1327 RepositoryError::Runtime(RuntimeError::MissingRelation {
1328 entity: node.entity.clone(),
1329 relation: name.clone(),
1330 })
1331 })?;
1332 if relation.many {
1333 many_relations.push((name, relation.clone(), children));
1334 } else {
1335 one_relations.push((name, relation.clone(), children));
1336 }
1337 }
1338
1339 for (name, relation, children) in one_relations {
1340 if children.len() > 1 {
1341 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1342 "relation {}.{} expects one child, got {}",
1343 node.entity,
1344 name,
1345 children.len()
1346 ))));
1347 }
1348 let mut saved_children = Vec::new();
1349 for child in children {
1350 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
1351 let child_repo = self.scoped_repository(child.entity.clone());
1352 let saved_child = child_repo.upsert_graph_node(child)?;
1353 if relation.attach {
1354 let foreign_value = saved_child
1355 .values
1356 .get(&relation.foreign_key)
1357 .cloned()
1358 .ok_or_else(|| {
1359 RepositoryError::Runtime(RuntimeError::Graph(format!(
1360 "saved child {} missing foreign key {} for relation {}.{}",
1361 relation.target_entity, relation.foreign_key, node.entity, name
1362 )))
1363 })?;
1364 node.values
1365 .insert(relation.local_key.clone(), foreign_value);
1366 }
1367 saved_children.push(saved_child);
1368 }
1369 node.relations.insert(name, saved_children);
1370 }
1371
1372 let update = self.graph_update_command(&node, descriptor, id_property, &id);
1373 if !update.values.is_empty() || update.expected_version.is_some() {
1374 let prepared_update = self
1375 .prepare_update_command(&update)
1376 .map_err(RepositoryError::Runtime)?;
1377 self.execute_prepared_update(prepared_update.clone())?;
1378 for (field, value) in &prepared_update.values {
1379 node.values.insert(field.clone(), value.clone());
1380 }
1381 if let Some(version_property) = descriptor.version_property() {
1382 if let Some(expected_version) = prepared_update.expected_version {
1383 node.values.insert(
1384 version_property.name.clone(),
1385 Value::I64(expected_version + 1),
1386 );
1387 }
1388 }
1389 }
1390
1391 for (name, relation, children) in many_relations {
1392 let local_value = node
1393 .values
1394 .get(&relation.local_key)
1395 .cloned()
1396 .ok_or_else(|| {
1397 RepositoryError::Runtime(RuntimeError::Graph(format!(
1398 "parent {} missing local key {} for relation {}",
1399 node.entity, relation.local_key, name
1400 )))
1401 })?;
1402 let child_repo = self.scoped_repository(relation.target_entity.clone());
1403 let existing_children = child_repo.fetch_graph_children(
1404 &relation.target_entity,
1405 &relation.foreign_key,
1406 &local_value,
1407 )?;
1408 let child_descriptor = self
1409 .repository
1410 .metadata
1411 .context
1412 .require_entity(&relation.target_entity)
1413 .map_err(RepositoryError::Runtime)?;
1414 let child_id_property = child_descriptor.id_property().ok_or_else(|| {
1415 RepositoryError::Runtime(RuntimeError::Graph(format!(
1416 "entity {} has no id property for graph diff",
1417 relation.target_entity
1418 )))
1419 })?;
1420 let mut existing_by_id = BTreeMap::new();
1421 for child in existing_children {
1422 if let Some(id) = child.get(&child_id_property.name) {
1423 existing_by_id.insert(graph_identity_key(id), child);
1424 }
1425 }
1426
1427 let mut seen = std::collections::BTreeSet::new();
1428 let mut saved_children = Vec::new();
1429 for mut child in children {
1430 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
1431 if relation.attach && child.operation != GraphOperation::Reference {
1432 child
1433 .values
1434 .insert(relation.foreign_key.clone(), local_value.clone());
1435 }
1436 if let Some(child_id) = child
1437 .values
1438 .get(&child_id_property.name)
1439 .filter(|value| !matches!(value, Value::Null))
1440 {
1441 let key = graph_identity_key(child_id);
1442 if !seen.insert(key.clone()) {
1443 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1444 "duplicate child id {key} in relation {}.{}",
1445 node.entity, name
1446 ))));
1447 }
1448 }
1449 saved_children.push(child_repo.upsert_graph_node(child)?);
1450 }
1451
1452 if relation.delete_missing {
1453 for (id_key, existing) in existing_by_id {
1454 if seen.contains(&id_key) {
1455 continue;
1456 }
1457 let Some(existing_id) = existing.get(&child_id_property.name).cloned() else {
1458 continue;
1459 };
1460 let mut delete =
1461 DeleteCommand::new(relation.target_entity.clone(), existing_id);
1462 if let Some(version) = graph_record_version(&existing, child_descriptor) {
1463 delete = delete.expected_version(version);
1464 }
1465 child_repo.delete(&delete)?;
1466 }
1467 }
1468
1469 node.relations.insert(name, saved_children);
1470 }
1471
1472 Ok(node)
1473 }
1474
1475 fn validate_reference_node(
1476 &self,
1477 node: GraphNode,
1478 ) -> Result<GraphNode, RepositoryError<E::Error>> {
1479 if !node.relations.is_empty() {
1480 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1481 "reference node {} cannot contain child relations",
1482 node.entity
1483 ))));
1484 }
1485 let descriptor = self
1486 .repository
1487 .metadata
1488 .context
1489 .require_entity(&node.entity)
1490 .map_err(RepositoryError::Runtime)?;
1491 let id_property = descriptor.id_property().ok_or_else(|| {
1492 RepositoryError::Runtime(RuntimeError::Graph(format!(
1493 "entity {} has no id property for graph reference",
1494 node.entity
1495 )))
1496 })?;
1497 let id = node
1498 .values
1499 .get(&id_property.name)
1500 .filter(|value| !matches!(value, Value::Null))
1501 .cloned()
1502 .ok_or_else(|| {
1503 RepositoryError::Runtime(RuntimeError::Graph(format!(
1504 "reference node {} missing id property {}",
1505 node.entity, id_property.name
1506 )))
1507 })?;
1508
1509 for field in node.values.keys() {
1510 if field == &id_property.name {
1511 continue;
1512 }
1513 if descriptor
1514 .version_property()
1515 .map(|property| field == &property.name)
1516 .unwrap_or(false)
1517 {
1518 continue;
1519 }
1520 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1521 "reference node {} cannot carry mutable field {}",
1522 node.entity, field
1523 ))));
1524 }
1525
1526 let current = self
1527 .fetch_graph_current_row(&node.entity, &id_property.name, &id)?
1528 .ok_or_else(|| {
1529 RepositoryError::Runtime(RuntimeError::Graph(format!(
1530 "reference node {}({}) does not exist",
1531 node.entity,
1532 graph_identity_key(&id)
1533 )))
1534 })?;
1535
1536 if let Some(version_property) = descriptor.version_property() {
1537 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
1538 if *existing_version < 0 {
1539 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1540 "reference node {}({}) is deleted",
1541 node.entity,
1542 graph_identity_key(&id)
1543 ))));
1544 }
1545 if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
1546 {
1547 if expected_version != existing_version {
1548 return Err(RepositoryError::Runtime(
1549 RuntimeError::OptimisticLockConflict {
1550 entity: node.entity,
1551 id: graph_identity_key(&id),
1552 },
1553 ));
1554 }
1555 }
1556 }
1557 }
1558
1559 Ok(GraphNode {
1560 entity: node.entity,
1561 values: current,
1562 relations: BTreeMap::new(),
1563 operation: GraphOperation::Reference,
1564 })
1565 }
1566
1567 fn validate_remove_node(&self, node: &GraphNode) -> Result<(), RepositoryError<E::Error>> {
1568 if !node.relations.is_empty() {
1569 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1570 "remove node {} cannot contain child relations",
1571 node.entity
1572 ))));
1573 }
1574 let descriptor = self
1575 .repository
1576 .metadata
1577 .context
1578 .require_entity(&node.entity)
1579 .map_err(RepositoryError::Runtime)?;
1580 let id_property = descriptor.id_property().ok_or_else(|| {
1581 RepositoryError::Runtime(RuntimeError::Graph(format!(
1582 "entity {} has no id property for graph remove",
1583 node.entity
1584 )))
1585 })?;
1586 let id = node
1587 .values
1588 .get(&id_property.name)
1589 .filter(|value| !matches!(value, Value::Null))
1590 .cloned()
1591 .ok_or_else(|| {
1592 RepositoryError::Runtime(RuntimeError::Graph(format!(
1593 "remove node {} missing id property {}",
1594 node.entity, id_property.name
1595 )))
1596 })?;
1597 let current = self
1598 .fetch_graph_current_row(&node.entity, &id_property.name, &id)?
1599 .ok_or_else(|| {
1600 RepositoryError::Runtime(RuntimeError::Graph(format!(
1601 "remove node {}({}) does not exist",
1602 node.entity,
1603 graph_identity_key(&id)
1604 )))
1605 })?;
1606 if let Some(version_property) = descriptor.version_property() {
1607 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
1608 if *existing_version < 0 {
1609 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
1610 "remove node {}({}) is already deleted",
1611 node.entity,
1612 graph_identity_key(&id)
1613 ))));
1614 }
1615 }
1616 }
1617 Ok(())
1618 }
1619
1620 fn graph_node_from_record(
1621 &self,
1622 entity: &str,
1623 record: Record,
1624 ) -> Result<GraphNode, RuntimeError> {
1625 let descriptor = self.repository.metadata.context.require_entity(entity)?;
1626 let mut node = GraphNode::new(entity);
1627
1628 for (field, value) in record {
1629 let Some(relation) = descriptor.relation_by_name(&field) else {
1630 node.values.insert(field, value);
1631 continue;
1632 };
1633
1634 match value {
1635 Value::Null => {
1636 node.relations.entry(field).or_default();
1637 }
1638 Value::Object(record) => {
1639 let child = self.graph_node_from_record(&relation.target_entity, record)?;
1640 node.relations.entry(field).or_default().push(child);
1641 }
1642 Value::List(values) => {
1643 let children = node.relations.entry(field.clone()).or_default();
1644 for value in values {
1645 let Value::Object(record) = value else {
1646 return Err(RuntimeError::Graph(format!(
1647 "relation {}.{} expects object children, got {:?}",
1648 entity, field, value
1649 )));
1650 };
1651 children
1652 .push(self.graph_node_from_record(&relation.target_entity, record)?);
1653 }
1654 }
1655 other => {
1656 return Err(RuntimeError::Graph(format!(
1657 "relation {}.{} expects object/list/null, got {:?}",
1658 entity, field, other
1659 )));
1660 }
1661 }
1662 }
1663
1664 Ok(node)
1665 }
1666
1667 fn graph_update_command(
1668 &self,
1669 node: &GraphNode,
1670 descriptor: &EntityDescriptor,
1671 id_property: &PropertyDescriptor,
1672 id: &Value,
1673 ) -> UpdateCommand {
1674 let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
1675 if let Some(version_property) = descriptor.version_property() {
1676 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1677 command = command.expected_version(*version);
1678 }
1679 }
1680 for property in descriptor.properties.iter().filter(|property| {
1681 !property.is_id && !property.is_version && node.values.contains_key(&property.name)
1682 }) {
1683 if property.name == id_property.name {
1684 continue;
1685 }
1686 if let Some(value) = node.values.get(&property.name) {
1687 command.values.insert(property.name.clone(), value.clone());
1688 }
1689 }
1690 command
1691 }
1692
1693 fn delete_graph_node(&self, node: &GraphNode) -> Result<u64, RepositoryError<E::Error>> {
1694 let descriptor = self
1695 .repository
1696 .metadata
1697 .context
1698 .require_entity(&node.entity)
1699 .map_err(RepositoryError::Runtime)?;
1700 let id_property = descriptor.id_property().ok_or_else(|| {
1701 RepositoryError::Runtime(RuntimeError::Graph(format!(
1702 "entity {} has no id property for graph remove",
1703 node.entity
1704 )))
1705 })?;
1706 let id = node.values.get(&id_property.name).cloned().ok_or_else(|| {
1707 RepositoryError::Runtime(RuntimeError::Graph(format!(
1708 "remove node {} missing id property {}",
1709 node.entity, id_property.name
1710 )))
1711 })?;
1712 let mut delete = DeleteCommand::new(node.entity.clone(), id);
1713 if let Some(version_property) = descriptor.version_property() {
1714 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1715 delete = delete.expected_version(*version);
1716 }
1717 }
1718 self.delete(&delete)
1719 }
1720
1721 fn fetch_graph_current_row(
1722 &self,
1723 entity: &str,
1724 id_property: &str,
1725 id: &Value,
1726 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1727 let mut rows = self
1728 .scoped_repository(entity.to_owned())
1729 .fetch_all(&SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone())))?;
1730 Ok(rows.pop())
1731 }
1732
1733 fn fetch_graph_children(
1734 &self,
1735 entity: &str,
1736 foreign_key: &str,
1737 parent_value: &Value,
1738 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1739 self.scoped_repository(entity.to_owned()).fetch_all(
1740 &SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone())),
1741 )
1742 }
1743
1744 pub fn relation_loads(&self) -> Vec<String> {
1745 self.behavior()
1746 .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
1747 .unwrap_or_default()
1748 }
1749
1750 pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
1751 self.build_relation_plans(&self.entity, &self.relation_loads())
1752 }
1753
1754 pub fn relation_query(
1755 &self,
1756 relation_name: &str,
1757 parent_rows: &[Record],
1758 ) -> Result<SelectQuery, RuntimeError> {
1759 let plan = self
1760 .relation_plans()?
1761 .into_iter()
1762 .find(|plan| plan.relation_name == relation_name)
1763 .ok_or_else(|| RuntimeError::MissingRelation {
1764 entity: self.entity.clone(),
1765 relation: relation_name.to_owned(),
1766 })?;
1767 Ok(self.query_for_plan(&plan, parent_rows))
1768 }
1769
1770 pub fn enhance_relations(
1771 &self,
1772 parent_rows: &mut [Record],
1773 ) -> Result<(), RepositoryError<E::Error>> {
1774 let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
1775 for plan in plans {
1776 self.enhance_plan(parent_rows, &plan)?;
1777 }
1778 Ok(())
1779 }
1780
1781 pub fn enhance_query_relations(
1782 &self,
1783 parent_rows: &mut [Record],
1784 query: &SelectQuery,
1785 ) -> Result<(), RepositoryError<E::Error>> {
1786 let plans = self
1787 .build_relation_plans_from_loads(&query.entity, &query.relations)
1788 .map_err(RepositoryError::Runtime)?;
1789 for plan in plans {
1790 self.enhance_plan(parent_rows, &plan)?;
1791 }
1792 Ok(())
1793 }
1794
1795 pub fn enhance_relation_aggregates(
1796 &self,
1797 parent_rows: &mut [Record],
1798 relation_aggregates: &[RelationAggregate],
1799 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
1800 ) -> Result<(), RepositoryError<E::Error>> {
1801 for aggregate in relation_aggregates {
1802 self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options)?;
1803 }
1804 Ok(())
1805 }
1806
1807 pub fn enhance_object_group_bys(
1808 &self,
1809 rows: &mut [Record],
1810 object_group_bys: &[ObjectGroupBy],
1811 ) -> Result<(), RepositoryError<E::Error>> {
1812 for group_by in object_group_bys {
1813 let ids = rows
1814 .iter()
1815 .filter_map(|row| row.get(&group_by.storage_field).cloned())
1816 .collect::<Vec<_>>();
1817 if ids.is_empty() {
1818 continue;
1819 }
1820 let mut query = group_by.query.clone();
1821 ensure_projection(&mut query, "id");
1822 query = query.and_filter(Expr::in_list("id", ids));
1823 let object_rows = self
1824 .scoped_repository(query.entity.clone())
1825 .fetch_all(&query)?
1826 .into_iter()
1827 .filter_map(|row| {
1828 row.get("id")
1829 .cloned()
1830 .map(|id| (relation_bucket_key(&id), row))
1831 })
1832 .collect::<BTreeMap<_, _>>();
1833 for row in rows.iter_mut() {
1834 if let Some(key) = row.get(&group_by.storage_field).map(relation_bucket_key) {
1835 let value = object_rows
1836 .get(&key)
1837 .cloned()
1838 .map(Value::object)
1839 .unwrap_or(Value::Null);
1840 row.insert(group_by.property_name.clone(), value);
1841 }
1842 }
1843 }
1844 Ok(())
1845 }
1846
1847 pub fn enhance_child_queries(
1848 &self,
1849 rows: &mut [Record],
1850 child_queries: &[SelectQuery],
1851 ) -> Result<(), RepositoryError<E::Error>> {
1852 for child_query in child_queries {
1853 let ids = rows
1854 .iter()
1855 .filter_map(|row| row.get("id").cloned())
1856 .collect::<Vec<_>>();
1857 if ids.is_empty() {
1858 continue;
1859 }
1860 let mut query = child_query.clone();
1861 ensure_projection(&mut query, "id");
1862 query = query.and_filter(Expr::in_list("id", ids));
1863 let child_rows = self
1864 .scoped_repository(query.entity.clone())
1865 .fetch_all(&query)?
1866 .into_iter()
1867 .filter_map(|row| {
1868 row.get("id")
1869 .cloned()
1870 .map(|id| (relation_bucket_key(&id), row))
1871 })
1872 .collect::<BTreeMap<_, _>>();
1873 for row in rows.iter_mut() {
1874 if let Some(key) = row.get("id").map(relation_bucket_key) {
1875 if let Some(child) = child_rows.get(&key) {
1876 row.extend(child.clone());
1877 }
1878 }
1879 }
1880 }
1881 Ok(())
1882 }
1883
1884 fn enhance_relation_aggregate(
1885 &self,
1886 parent_rows: &mut [Record],
1887 aggregate: &RelationAggregate,
1888 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
1889 ) -> Result<(), RepositoryError<E::Error>> {
1890 let plan = self
1891 .build_relation_plans_from_loads(
1892 &self.entity,
1893 &[RelationLoad::with_query(
1894 aggregate.relation_name.clone(),
1895 aggregate.query.clone(),
1896 )],
1897 )
1898 .map_err(RepositoryError::Runtime)?
1899 .into_iter()
1900 .next()
1901 .ok_or_else(|| {
1902 RepositoryError::Runtime(RuntimeError::MissingRelation {
1903 entity: self.entity.clone(),
1904 relation: aggregate.relation_name.clone(),
1905 })
1906 })?;
1907
1908 let ids = parent_rows
1909 .iter()
1910 .filter_map(|row| row.get(&plan.local_key).cloned())
1911 .collect::<Vec<_>>();
1912 if ids.is_empty() {
1913 attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
1914 return Ok(());
1915 }
1916
1917 let child_repo = self.scoped_repository(plan.target_entity.clone());
1918 let mut query = aggregate.query.clone();
1919 query.entity = plan.target_entity.clone();
1920 if query.aggregation_cache.is_none() {
1921 if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
1922 query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
1923 options.propagate_cache_expired_millis,
1924 ));
1925 }
1926 }
1927 query.projection.clear();
1928 query.expr_projection.clear();
1929 query.order_by.clear();
1930 query.slice = None;
1931 query.relations.clear();
1932 if query.aggregates.is_empty() {
1933 let alias = if aggregate.single_result {
1934 aggregate.alias.clone()
1935 } else {
1936 "count".to_owned()
1937 };
1938 query = query.aggregate(Aggregate::count(alias));
1939 }
1940 if !query
1941 .group_by
1942 .iter()
1943 .any(|field| field == &plan.foreign_key)
1944 {
1945 query = query.group_by(plan.foreign_key.clone());
1946 }
1947 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
1948
1949 let aggregate_rows = child_repo.fetch_all(&query)?;
1950 attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
1951 Ok(())
1952 }
1953
1954 fn build_relation_plans(
1955 &self,
1956 entity: &str,
1957 loads: &[String],
1958 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
1959 let descriptor = self.repository.metadata.context.require_entity(entity)?;
1960 let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
1961 for load in loads {
1962 if let Some((head, tail)) = load.split_once('.') {
1963 grouped
1964 .entry(head.to_owned())
1965 .or_default()
1966 .push(tail.to_owned());
1967 } else {
1968 grouped.entry(load.clone()).or_default();
1969 }
1970 }
1971
1972 grouped
1973 .into_iter()
1974 .map(|(name, child_loads)| {
1975 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
1976 RuntimeError::MissingRelation {
1977 entity: entity.to_owned(),
1978 relation: name.clone(),
1979 }
1980 })?;
1981 let child_repo = self.scoped_repository(relation.target_entity.clone());
1982 let children =
1983 child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
1984 Ok(RelationLoadPlan {
1985 parent_entity: entity.to_owned(),
1986 relation_name: relation.name.clone(),
1987 path: relation.name.clone(),
1988 target_entity: relation.target_entity.clone(),
1989 local_key: relation.local_key.clone(),
1990 foreign_key: relation.foreign_key.clone(),
1991 many: relation.many,
1992 query: None,
1993 children,
1994 })
1995 })
1996 .collect()
1997 }
1998
1999 fn build_relation_plans_from_loads(
2000 &self,
2001 entity: &str,
2002 loads: &[RelationLoad],
2003 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
2004 let descriptor = self.repository.metadata.context.require_entity(entity)?;
2005 loads
2006 .iter()
2007 .map(|load| {
2008 let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
2009 RuntimeError::MissingRelation {
2010 entity: entity.to_owned(),
2011 relation: load.name.clone(),
2012 }
2013 })?;
2014 let relation_query = load.query.as_deref().cloned();
2015 let child_loads = relation_query
2016 .as_ref()
2017 .map(|query| query.relations.as_slice())
2018 .unwrap_or_default();
2019 let child_repo = self.scoped_repository(relation.target_entity.clone());
2020 let children = child_repo
2021 .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
2022 Ok(RelationLoadPlan {
2023 parent_entity: entity.to_owned(),
2024 relation_name: relation.name.clone(),
2025 path: relation.name.clone(),
2026 target_entity: relation.target_entity.clone(),
2027 local_key: relation.local_key.clone(),
2028 foreign_key: relation.foreign_key.clone(),
2029 many: relation.many,
2030 query: relation_query,
2031 children,
2032 })
2033 })
2034 .collect()
2035 }
2036
2037 fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
2038 ResolvedRepository {
2039 entity,
2040 repository: ContextRepository {
2041 metadata: UserContextMetadata {
2042 context: self.repository.metadata.context,
2043 },
2044 dialect: self.repository.dialect,
2045 executor: self.repository.executor,
2046 },
2047 }
2048 }
2049
2050 fn enhance_plan(
2051 &self,
2052 parent_rows: &mut [Record],
2053 plan: &RelationLoadPlan,
2054 ) -> Result<(), RepositoryError<E::Error>> {
2055 let child_repo = self.scoped_repository(plan.target_entity.clone());
2056 let query = self.query_for_plan(plan, parent_rows);
2057 let child_rows = child_repo.fetch_all(&query)?;
2058 self.attach_relation_rows(parent_rows, plan, child_rows);
2059
2060 if !plan.children.is_empty() {
2061 for parent in parent_rows.iter_mut() {
2062 match parent.get_mut(&plan.relation_name) {
2063 Some(Value::Object(child)) => {
2064 child_repo.enhance_child_record(child, &plan.children)?;
2065 }
2066 Some(Value::List(values)) => {
2067 for value in values.iter_mut() {
2068 if let Value::Object(child) = value {
2069 child_repo.enhance_child_record(child, &plan.children)?;
2070 }
2071 }
2072 }
2073 _ => {}
2074 }
2075 }
2076 }
2077 Ok(())
2078 }
2079
2080 fn enhance_child_record(
2081 &self,
2082 child: &mut Record,
2083 plans: &[RelationLoadPlan],
2084 ) -> Result<(), RepositoryError<E::Error>> {
2085 for plan in plans {
2086 self.enhance_plan(slice::from_mut(child), plan)?;
2087 }
2088 Ok(())
2089 }
2090
2091 fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
2092 let ids = parent_rows
2093 .iter()
2094 .filter_map(|row| row.get(&plan.local_key).cloned())
2095 .collect::<Vec<_>>();
2096
2097 let mut query = plan
2098 .query
2099 .clone()
2100 .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
2101 query.entity = plan.target_entity.clone();
2102 ensure_projection(&mut query, &plan.foreign_key);
2103 for child in &plan.children {
2104 ensure_projection(&mut query, &child.local_key);
2105 }
2106 if !ids.is_empty() {
2107 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
2108 }
2109 query
2110 }
2111
2112 fn attach_relation_rows(
2113 &self,
2114 parent_rows: &mut [Record],
2115 plan: &RelationLoadPlan,
2116 child_rows: Vec<Record>,
2117 ) {
2118 let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
2119 for child in child_rows {
2120 if let Some(key) = child.get(&plan.foreign_key) {
2121 buckets
2122 .entry(relation_bucket_key(key))
2123 .or_default()
2124 .push(child);
2125 }
2126 }
2127
2128 for parent in parent_rows.iter_mut() {
2129 let Some(local_value) = parent.get(&plan.local_key) else {
2130 continue;
2131 };
2132 let bucket_key = relation_bucket_key(local_value);
2133 let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
2134 if plan.many {
2135 parent.insert(
2136 plan.relation_name.clone(),
2137 Value::List(related.into_iter().map(Value::object).collect()),
2138 );
2139 } else {
2140 let value = related
2141 .into_iter()
2142 .next()
2143 .map(Value::object)
2144 .unwrap_or(Value::Null);
2145 parent.insert(plan.relation_name.clone(), value);
2146 }
2147 }
2148 }
2149}
2150
2151fn relation_bucket_key(value: &Value) -> String {
2152 match value {
2153 Value::Null => "null".to_owned(),
2154 Value::Bool(v) => format!("b:{v}"),
2155 Value::I64(v) => format!("i:{v}"),
2156 Value::U64(v) => format!("u:{v}"),
2157 Value::F64(v) => format!("f:{v}"),
2158 Value::Decimal(v) => format!("d:{v}"),
2159 Value::Text(v) => format!("t:{v}"),
2160 Value::Json(v) => format!("j:{v}"),
2161 Value::Date(v) => format!("d:{v}"),
2162 Value::Timestamp(v) => format!("ts:{}", v.to_rfc3339()),
2163 Value::Object(_) => "o".to_owned(),
2164 Value::List(_) => "l".to_owned(),
2165 }
2166}
2167
2168fn aggregation_cache_namespace(entity: &str) -> String {
2169 format!("entity:{entity}")
2170}
2171
2172fn invalidate_aggregation_cache_namespace(cache: &dyn AggregationCacheBackend, entity: &str) {
2173 let namespace = format!(
2174 "{}::{}",
2175 cache.namespace(),
2176 aggregation_cache_namespace(entity)
2177 );
2178 cache.invalidate_namespace(&namespace);
2179}
2180
2181fn aggregation_cache_key(
2182 cache_namespace: &str,
2183 query_namespace: &str,
2184 query: &CompiledQuery,
2185) -> String {
2186 format!(
2187 "{cache_namespace}::{query_namespace}::{}::{:?}",
2188 query.sql, query.params
2189 )
2190}
2191
2192fn ensure_projection(query: &mut SelectQuery, field: &str) {
2193 if !query.projection.is_empty()
2194 && !query
2195 .projection
2196 .iter()
2197 .any(|projection| projection == field)
2198 {
2199 query.projection.push(field.to_owned());
2200 }
2201}
2202
2203fn attach_empty_relation_aggregate(parent_rows: &mut [Record], alias: &str, single_result: bool) {
2204 let value = if single_result {
2205 Value::U64(0)
2206 } else {
2207 Value::List(Vec::new())
2208 };
2209 for parent in parent_rows {
2210 parent.insert(alias.to_owned(), value.clone());
2211 }
2212}
2213
2214fn attach_relation_aggregate_rows(
2215 parent_rows: &mut [Record],
2216 plan: &RelationLoadPlan,
2217 aggregate: &RelationAggregate,
2218 aggregate_rows: Vec<Record>,
2219) {
2220 let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
2221 for mut row in aggregate_rows {
2222 if let Some(key) = row.remove(&plan.foreign_key) {
2223 buckets
2224 .entry(relation_bucket_key(&key))
2225 .or_default()
2226 .push(row);
2227 }
2228 }
2229
2230 for parent in parent_rows {
2231 let value = parent
2232 .get(&plan.local_key)
2233 .and_then(|local_value| buckets.get(&relation_bucket_key(local_value)))
2234 .map(|rows| relation_aggregate_value(rows, aggregate.single_result))
2235 .unwrap_or_else(|| {
2236 if aggregate.single_result {
2237 Value::U64(0)
2238 } else {
2239 Value::List(Vec::new())
2240 }
2241 });
2242 parent.insert(aggregate.alias.clone(), value);
2243 }
2244}
2245
2246fn relation_aggregate_value(rows: &[Record], single_result: bool) -> Value {
2247 if single_result {
2248 rows.first()
2249 .map(single_relation_aggregate_value)
2250 .unwrap_or(Value::U64(0))
2251 } else {
2252 Value::List(rows.iter().cloned().map(Value::object).collect())
2253 }
2254}
2255
2256fn single_relation_aggregate_value(row: &Record) -> Value {
2257 if row.len() == 1 {
2258 row.values().next().cloned().unwrap_or(Value::Null)
2259 } else {
2260 Value::object(row.clone())
2261 }
2262}
2263
2264fn graph_record_version(record: &Record, descriptor: &EntityDescriptor) -> Option<i64> {
2265 descriptor
2266 .version_property()
2267 .and_then(|property| match record.get(&property.name) {
2268 Some(Value::I64(version)) => Some(*version),
2269 _ => None,
2270 })
2271}
2272
2273fn graph_identity_key(value: &Value) -> String {
2274 match value {
2275 Value::I64(value) if *value >= 0 => format!("u:{}", *value as u64),
2276 Value::U64(value) => format!("u:{value}"),
2277 _ => relation_bucket_key(value),
2278 }
2279}
2280
2281fn ensure_relation_target<ExecError>(
2282 parent_entity: &str,
2283 relation_name: &str,
2284 expected_entity: &str,
2285 child: &GraphNode,
2286) -> Result<(), RepositoryError<ExecError>> {
2287 if child.entity == expected_entity {
2288 return Ok(());
2289 }
2290 Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
2291 "relation {parent_entity}.{relation_name} expects {expected_entity}, got {}",
2292 child.entity
2293 ))))
2294}