teaql_runtime/repository/
relation.rs1use std::collections::BTreeMap;
2use std::slice;
3
4use teaql_core::{
5 Aggregate, Expr, ObjectGroupBy, Record, RelationAggregate, RelationLoad, SelectQuery, Value,
6};
7
8use crate::{RepositoryError, RuntimeError};
9
10use super::{RelationLoadPlan, ResolvedRepository, helpers::*};
11
12impl<'a, E> ResolvedRepository<'a, E>
13where
14 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
15{
16 pub fn relation_loads(&self) -> Vec<String> {
17 self.behavior()
18 .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
19 .unwrap_or_default()
20 }
21
22 pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
23 self.build_relation_plans(&self.entity, &self.relation_loads())
24 }
25
26 pub fn relation_query(
27 &self,
28 relation_name: &str,
29 parent_rows: &[Record],
30 ) -> Result<SelectQuery, RuntimeError> {
31 let plan = self
32 .relation_plans()?
33 .into_iter()
34 .find(|plan| plan.relation_name == relation_name)
35 .ok_or_else(|| RuntimeError::MissingRelation {
36 entity: self.entity.clone(),
37 relation: relation_name.to_owned(),
38 })?;
39 Ok(self.query_for_plan(&plan, parent_rows))
40 }
41
42 pub async fn enhance_relations(
43 &self,
44 parent_rows: &mut [Record],
45 ) -> Result<(), RepositoryError<E::Error>> {
46 let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
47 for plan in plans {
48 self.enhance_plan(parent_rows, &plan).await?;
49 }
50 Ok(())
51 }
52
53 pub async fn enhance_query_relations(
54 &self,
55 parent_rows: &mut [Record],
56 query: &SelectQuery,
57 ) -> Result<(), RepositoryError<E::Error>> {
58 let plans = self
59 .build_relation_plans_from_loads(&query.entity, &query.relations)
60 .map_err(RepositoryError::Runtime)?;
61 for plan in plans {
62 self.enhance_plan(parent_rows, &plan).await?;
63 }
64 Ok(())
65 }
66
67 pub fn enhance_relation_aggregates<'b>(
68 &'b self,
69 parent_rows: &'b mut [Record],
70 relation_aggregates: &'b [RelationAggregate],
71 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
72 parent_trace_chain: &'b [teaql_core::TraceNode],
73 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
74 Box::pin(async move {
75 for aggregate in relation_aggregates {
76 self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options, parent_trace_chain).await?;
77 }
78 Ok(())
79 })
80 }
81
82 pub fn enhance_object_group_bys<'b>(
83 &'b self,
84 rows: &'b mut [Record],
85 object_group_bys: &'b [ObjectGroupBy],
86 parent_trace_chain: &'b [teaql_core::TraceNode],
87 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
88 Box::pin(async move {
89 for group_by in object_group_bys {
90 let ids = rows
91 .iter()
92 .filter_map(|row| row.get(&group_by.storage_field).cloned())
93 .collect::<Vec<_>>();
94 if ids.is_empty() {
95 continue;
96 }
97 let mut query = group_by.query.clone();
98 ensure_projection(&mut query, "id");
99 query = query.and_filter(Expr::in_list("id", ids));
100 let object_rows = self
101 .scoped_repository(query.entity.clone())
102 .with_trace_context(parent_trace_chain.to_vec())
103 .fetch_all(&query).await?
104 .into_iter()
105 .filter_map(|row| {
106 row.get("id")
107 .cloned()
108 .map(|id| (graph_identity_key(&id), row))
109 })
110 .collect::<BTreeMap<_, _>>();
111 for row in rows.iter_mut() {
112 if let Some(key) = row.get(&group_by.storage_field).map(graph_identity_key) {
113 let value = object_rows
114 .get(&key)
115 .cloned()
116 .map(Value::object)
117 .unwrap_or(Value::Null);
118 row.insert(group_by.property_name.clone(), value);
119 }
120 }
121 }
122 Ok(())
123 })
124 }
125
126 pub fn enhance_child_queries<'b>(
127 &'b self,
128 rows: &'b mut [Record],
129 child_queries: &'b [SelectQuery],
130 parent_trace_chain: &'b [teaql_core::TraceNode],
131 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
132 Box::pin(async move {
133 for child_query in child_queries {
134 let ids = rows
135 .iter()
136 .filter_map(|row| row.get("id").cloned())
137 .collect::<Vec<_>>();
138 if ids.is_empty() {
139 continue;
140 }
141 let mut query = child_query.clone();
142 ensure_projection(&mut query, "id");
143 query = query.and_filter(Expr::in_list("id", ids));
144 let child_rows = self
145 .scoped_repository(query.entity.clone())
146 .with_trace_context(parent_trace_chain.to_vec())
147 .fetch_all(&query).await?
148 .into_iter()
149 .filter_map(|row| {
150 row.get("id")
151 .cloned()
152 .map(|id| (graph_identity_key(&id), row))
153 })
154 .collect::<BTreeMap<_, _>>();
155 for row in rows.iter_mut() {
156 if let Some(key) = row.get("id").map(graph_identity_key) {
157 if let Some(child) = child_rows.get(&key) {
158 row.extend(child.clone());
159 }
160 }
161 }
162 }
163 Ok(())
164 })
165 }
166
167 async fn enhance_relation_aggregate(
168 &self,
169 parent_rows: &mut [Record],
170 aggregate: &RelationAggregate,
171 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
172 parent_trace_chain: &[teaql_core::TraceNode],
173 ) -> Result<(), RepositoryError<E::Error>> {
174 let plan = self
175 .build_relation_plans_from_loads(
176 &self.entity,
177 &[RelationLoad::with_query(
178 aggregate.relation_name.clone(),
179 aggregate.query.clone(),
180 )],
181 )
182 .map_err(RepositoryError::Runtime)?
183 .into_iter()
184 .next()
185 .ok_or_else(|| {
186 RepositoryError::Runtime(RuntimeError::MissingRelation {
187 entity: self.entity.clone(),
188 relation: aggregate.relation_name.clone(),
189 })
190 })?;
191
192 let ids = parent_rows
193 .iter()
194 .filter_map(|row| row.get(&plan.local_key).cloned())
195 .collect::<Vec<_>>();
196 if ids.is_empty() {
197 attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
198 return Ok(());
199 }
200
201 let child_repo = self.scoped_repository(plan.target_entity.clone());
202 let mut query = aggregate.query.clone();
203 query.entity = plan.target_entity.clone();
204 if query.aggregation_cache.is_none() {
205 if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
206 query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
207 options.propagate_cache_expired_millis,
208 ));
209 }
210 }
211 query.projection.clear();
212 query.expr_projection.clear();
213 query.order_by.clear();
214 query.slice = None;
215 query.relations.clear();
216 if query.aggregates.is_empty() {
217 let alias = if aggregate.single_result {
218 aggregate.alias.clone()
219 } else {
220 "count".to_owned()
221 };
222 query = query.aggregate(Aggregate::count(alias));
223 }
224 if !query
225 .group_by
226 .iter()
227 .any(|field| field == &plan.foreign_key)
228 {
229 query = query.group_by(plan.foreign_key.clone());
230 }
231 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
232
233 let mut chain = parent_trace_chain.to_vec();
234 chain.push(teaql_core::TraceNode {
235 entity_type: query.entity.clone(),
236 entity_id: None,
237 comment: aggregate.alias.clone(),
238 });
239
240 let mut aggregate_rows = child_repo.with_trace_context(chain).fetch_all(&query).await?;
241 let foreign_key_column = self
242 .repository
243 .metadata
244 .context
245 .entity(&plan.target_entity)
246 .and_then(|descriptor| {
247 descriptor
248 .properties
249 .iter()
250 .find(|property| property.name == plan.foreign_key)
251 .map(|property| property.column_name.clone())
252 });
253 if let Some(foreign_key_column) =
254 foreign_key_column.filter(|column| column != &plan.foreign_key)
255 {
256 for row in &mut aggregate_rows {
257 if !row.contains_key(&plan.foreign_key) {
258 if let Some(value) = row.remove(&foreign_key_column) {
259 row.insert(plan.foreign_key.clone(), value);
260 }
261 }
262 }
263 }
264 attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
265 Ok(())
266 }
267
268 fn build_relation_plans(
269 &self,
270 entity: &str,
271 loads: &[String],
272 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
273 let descriptor = self.repository.metadata.context.require_entity(entity)?;
274 let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
275 for load in loads {
276 if let Some((head, tail)) = load.split_once('.') {
277 grouped
278 .entry(head.to_owned())
279 .or_default()
280 .push(tail.to_owned());
281 } else {
282 grouped.entry(load.clone()).or_default();
283 }
284 }
285
286 grouped
287 .into_iter()
288 .map(|(name, child_loads)| {
289 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
290 RuntimeError::MissingRelation {
291 entity: entity.to_owned(),
292 relation: name.clone(),
293 }
294 })?;
295 let child_repo = self.scoped_repository(relation.target_entity.clone());
296 let children =
297 child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
298 Ok(RelationLoadPlan {
299 parent_entity: entity.to_owned(),
300 relation_name: relation.name.clone(),
301 path: relation.name.clone(),
302 target_entity: relation.target_entity.clone(),
303 local_key: relation.local_key.clone(),
304 foreign_key: relation.foreign_key.clone(),
305 many: relation.many,
306 query: None,
307 children,
308 })
309 })
310 .collect()
311 }
312
313 fn build_relation_plans_from_loads(
314 &self,
315 entity: &str,
316 loads: &[RelationLoad],
317 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
318 let descriptor = self.repository.metadata.context.require_entity(entity)?;
319 loads
320 .iter()
321 .map(|load| {
322 let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
323 RuntimeError::MissingRelation {
324 entity: entity.to_owned(),
325 relation: load.name.clone(),
326 }
327 })?;
328 let relation_query = load.query.as_deref().cloned();
329 let child_loads = relation_query
330 .as_ref()
331 .map(|query| query.relations.as_slice())
332 .unwrap_or_default();
333 let child_repo = self.scoped_repository(relation.target_entity.clone());
334 let children = child_repo
335 .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
336 Ok(RelationLoadPlan {
337 parent_entity: entity.to_owned(),
338 relation_name: relation.name.clone(),
339 path: relation.name.clone(),
340 target_entity: relation.target_entity.clone(),
341 local_key: relation.local_key.clone(),
342 foreign_key: relation.foreign_key.clone(),
343 many: relation.many,
344 query: relation_query,
345 children,
346 })
347 })
348 .collect()
349 }
350 fn enhance_plan<'b>(
351 &'b self,
352 parent_rows: &'b mut [Record],
353 plan: &'b RelationLoadPlan,
354 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
355 Box::pin(async move {
356 let child_repo = self.scoped_repository(plan.target_entity.clone());
357 let query = self.query_for_plan(plan, parent_rows);
358 let child_rows = child_repo.fetch_all(&query).await?;
359 self.attach_relation_rows(parent_rows, plan, child_rows);
360
361 if !plan.children.is_empty() {
362 for parent in parent_rows.iter_mut() {
363 match parent.get_mut(&plan.relation_name) {
364 Some(Value::Object(child)) => {
365 child_repo.enhance_child_record(child, &plan.children).await?;
366 }
367 Some(Value::List(values)) => {
368 for value in values.iter_mut() {
369 if let Value::Object(child) = value {
370 child_repo.enhance_child_record(child, &plan.children).await?;
371 }
372 }
373 }
374 _ => {}
375 }
376 }
377 }
378 Ok(())
379 })
380 }
381
382 fn enhance_child_record<'b>(
383 &'b self,
384 child: &'b mut Record,
385 plans: &'b [RelationLoadPlan],
386 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
387 Box::pin(async move {
388 for plan in plans {
389 self.enhance_plan(slice::from_mut(child), plan).await?;
390 }
391 Ok(())
392 })
393 }
394
395 fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
396 let ids = parent_rows
397 .iter()
398 .filter_map(|row| row.get(&plan.local_key).cloned())
399 .collect::<Vec<_>>();
400
401 let mut query = plan
402 .query
403 .clone()
404 .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
405 query.entity = plan.target_entity.clone();
406 ensure_projection(&mut query, &plan.foreign_key);
407 for child in &plan.children {
408 ensure_projection(&mut query, &child.local_key);
409 }
410 if !ids.is_empty() {
411 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
412 }
413 query
414 }
415
416 fn attach_relation_rows(
417 &self,
418 parent_rows: &mut [Record],
419 plan: &RelationLoadPlan,
420 child_rows: Vec<Record>,
421 ) {
422 let inverse_relation = self
423 .repository
424 .metadata
425 .context
426 .entity(&plan.target_entity)
427 .and_then(|descriptor| {
428 descriptor.relations.iter().find(|relation| {
429 relation.target_entity == plan.parent_entity
430 && relation.local_key == plan.foreign_key
431 && relation.foreign_key == plan.local_key
432 })
433 })
434 .map(|relation| (relation.name.clone(), relation.many));
435
436 let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
437 for child in child_rows {
438 if let Some(key) = child.get(&plan.foreign_key) {
439 buckets
440 .entry(graph_identity_key(key))
441 .or_default()
442 .push(child);
443 }
444 }
445
446 for parent in parent_rows.iter_mut() {
447 let Some(local_value) = parent.get(&plan.local_key) else {
448 continue;
449 };
450 let bucket_key = graph_identity_key(local_value);
451 let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
452 let related = if let Some((inverse_relation, inverse_many)) = &inverse_relation {
453 let mut parent_object = parent.clone();
454 parent_object.remove(&plan.relation_name);
455 related
456 .into_iter()
457 .map(|mut child| {
458 if *inverse_many {
459 let entry = child
460 .entry(inverse_relation.clone())
461 .or_insert_with(|| Value::List(Vec::new()));
462 if let Value::List(list) = entry {
463 list.push(Value::object(parent_object.clone()));
464 }
465 } else {
466 child.insert(
467 inverse_relation.clone(),
468 Value::object(parent_object.clone()),
469 );
470 }
471 child
472 })
473 .collect::<Vec<_>>()
474 } else {
475 related
476 };
477 if plan.many {
478 parent.insert(
479 plan.relation_name.clone(),
480 Value::List(related.into_iter().map(Value::object).collect()),
481 );
482 } else {
483 let value = related
484 .into_iter()
485 .next()
486 .map(Value::object)
487 .unwrap_or(Value::Null);
488 parent.insert(plan.relation_name.clone(), value);
489 }
490 }
491 }
492}