teaql_runtime/repository/
relation.rs1use std::collections::BTreeMap;
2use std::slice;
3
4use teaql_core::{
5 Aggregate, Expr, ObjectGroupBy, Record, RelationAggregate, RelationLoad, SelectQuery, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{RepositoryError, RuntimeError};
10
11use super::{QueryExecutor, RelationLoadPlan, ResolvedRepository, helpers::*};
12
13impl<'a, D, E> ResolvedRepository<'a, D, E>
14where
15 D: SqlDialect,
16 E: QueryExecutor,
17{
18 pub fn relation_loads(&self) -> Vec<String> {
19 self.behavior()
20 .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
21 .unwrap_or_default()
22 }
23
24 pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
25 self.build_relation_plans(&self.entity, &self.relation_loads())
26 }
27
28 pub fn relation_query(
29 &self,
30 relation_name: &str,
31 parent_rows: &[Record],
32 ) -> Result<SelectQuery, RuntimeError> {
33 let plan = self
34 .relation_plans()?
35 .into_iter()
36 .find(|plan| plan.relation_name == relation_name)
37 .ok_or_else(|| RuntimeError::MissingRelation {
38 entity: self.entity.clone(),
39 relation: relation_name.to_owned(),
40 })?;
41 Ok(self.query_for_plan(&plan, parent_rows))
42 }
43
44 pub fn enhance_relations(
45 &self,
46 parent_rows: &mut [Record],
47 ) -> Result<(), RepositoryError<E::Error>> {
48 let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
49 for plan in plans {
50 self.enhance_plan(parent_rows, &plan)?;
51 }
52 Ok(())
53 }
54
55 pub fn enhance_query_relations(
56 &self,
57 parent_rows: &mut [Record],
58 query: &SelectQuery,
59 ) -> Result<(), RepositoryError<E::Error>> {
60 let plans = self
61 .build_relation_plans_from_loads(&query.entity, &query.relations)
62 .map_err(RepositoryError::Runtime)?;
63 for plan in plans {
64 self.enhance_plan(parent_rows, &plan)?;
65 }
66 Ok(())
67 }
68
69 pub fn enhance_relation_aggregates(
70 &self,
71 parent_rows: &mut [Record],
72 relation_aggregates: &[RelationAggregate],
73 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
74 parent_trace_chain: &[teaql_core::TraceNode],
75 ) -> Result<(), RepositoryError<E::Error>> {
76 for aggregate in relation_aggregates {
77 self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options, parent_trace_chain)?;
78 }
79 Ok(())
80 }
81
82 pub fn enhance_object_group_bys(
83 &self,
84 rows: &mut [Record],
85 object_group_bys: &[ObjectGroupBy],
86 parent_trace_chain: &[teaql_core::TraceNode],
87 ) -> Result<(), RepositoryError<E::Error>> {
88 for group_by in object_group_bys {
89 let ids = rows
90 .iter()
91 .filter_map(|row| row.get(&group_by.storage_field).cloned())
92 .collect::<Vec<_>>();
93 if ids.is_empty() {
94 continue;
95 }
96 let mut query = group_by.query.clone();
97 ensure_projection(&mut query, "id");
98 query = query.and_filter(Expr::in_list("id", ids));
99 let object_rows = self
100 .scoped_repository(query.entity.clone())
101 .with_trace_context(parent_trace_chain.to_vec())
102 .fetch_all(&query)?
103 .into_iter()
104 .filter_map(|row| {
105 row.get("id")
106 .cloned()
107 .map(|id| (graph_identity_key(&id), row))
108 })
109 .collect::<BTreeMap<_, _>>();
110 for row in rows.iter_mut() {
111 if let Some(key) = row.get(&group_by.storage_field).map(graph_identity_key) {
112 let value = object_rows
113 .get(&key)
114 .cloned()
115 .map(Value::object)
116 .unwrap_or(Value::Null);
117 row.insert(group_by.property_name.clone(), value);
118 }
119 }
120 }
121 Ok(())
122 }
123
124 pub fn enhance_child_queries(
125 &self,
126 rows: &mut [Record],
127 child_queries: &[SelectQuery],
128 parent_trace_chain: &[teaql_core::TraceNode],
129 ) -> Result<(), RepositoryError<E::Error>> {
130 for child_query in child_queries {
131 let ids = rows
132 .iter()
133 .filter_map(|row| row.get("id").cloned())
134 .collect::<Vec<_>>();
135 if ids.is_empty() {
136 continue;
137 }
138 let mut query = child_query.clone();
139 ensure_projection(&mut query, "id");
140 query = query.and_filter(Expr::in_list("id", ids));
141 let child_rows = self
142 .scoped_repository(query.entity.clone())
143 .with_trace_context(parent_trace_chain.to_vec())
144 .fetch_all(&query)?
145 .into_iter()
146 .filter_map(|row| {
147 row.get("id")
148 .cloned()
149 .map(|id| (graph_identity_key(&id), row))
150 })
151 .collect::<BTreeMap<_, _>>();
152 for row in rows.iter_mut() {
153 if let Some(key) = row.get("id").map(graph_identity_key) {
154 if let Some(child) = child_rows.get(&key) {
155 row.extend(child.clone());
156 }
157 }
158 }
159 }
160 Ok(())
161 }
162
163 fn enhance_relation_aggregate(
164 &self,
165 parent_rows: &mut [Record],
166 aggregate: &RelationAggregate,
167 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
168 parent_trace_chain: &[teaql_core::TraceNode],
169 ) -> Result<(), RepositoryError<E::Error>> {
170 let plan = self
171 .build_relation_plans_from_loads(
172 &self.entity,
173 &[RelationLoad::with_query(
174 aggregate.relation_name.clone(),
175 aggregate.query.clone(),
176 )],
177 )
178 .map_err(RepositoryError::Runtime)?
179 .into_iter()
180 .next()
181 .ok_or_else(|| {
182 RepositoryError::Runtime(RuntimeError::MissingRelation {
183 entity: self.entity.clone(),
184 relation: aggregate.relation_name.clone(),
185 })
186 })?;
187
188 let ids = parent_rows
189 .iter()
190 .filter_map(|row| row.get(&plan.local_key).cloned())
191 .collect::<Vec<_>>();
192 if ids.is_empty() {
193 attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
194 return Ok(());
195 }
196
197 let child_repo = self.scoped_repository(plan.target_entity.clone());
198 let mut query = aggregate.query.clone();
199 query.entity = plan.target_entity.clone();
200 if query.aggregation_cache.is_none() {
201 if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
202 query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
203 options.propagate_cache_expired_millis,
204 ));
205 }
206 }
207 query.projection.clear();
208 query.expr_projection.clear();
209 query.order_by.clear();
210 query.slice = None;
211 query.relations.clear();
212 if query.aggregates.is_empty() {
213 let alias = if aggregate.single_result {
214 aggregate.alias.clone()
215 } else {
216 "count".to_owned()
217 };
218 query = query.aggregate(Aggregate::count(alias));
219 }
220 if !query
221 .group_by
222 .iter()
223 .any(|field| field == &plan.foreign_key)
224 {
225 query = query.group_by(plan.foreign_key.clone());
226 }
227 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
228
229 let mut chain = parent_trace_chain.to_vec();
230 chain.push(teaql_core::TraceNode {
231 entity_type: query.entity.clone(),
232 entity_id: None,
233 comment: aggregate.alias.clone(),
234 });
235
236 let mut aggregate_rows = child_repo.with_trace_context(chain).fetch_all(&query)?;
237 let foreign_key_column = self
238 .repository
239 .metadata
240 .context
241 .entity(&plan.target_entity)
242 .and_then(|descriptor| {
243 descriptor
244 .properties
245 .iter()
246 .find(|property| property.name == plan.foreign_key)
247 .map(|property| property.column_name.clone())
248 });
249 if let Some(foreign_key_column) =
250 foreign_key_column.filter(|column| column != &plan.foreign_key)
251 {
252 for row in &mut aggregate_rows {
253 if !row.contains_key(&plan.foreign_key) {
254 if let Some(value) = row.remove(&foreign_key_column) {
255 row.insert(plan.foreign_key.clone(), value);
256 }
257 }
258 }
259 }
260 attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
261 Ok(())
262 }
263
264 fn build_relation_plans(
265 &self,
266 entity: &str,
267 loads: &[String],
268 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
269 let descriptor = self.repository.metadata.context.require_entity(entity)?;
270 let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
271 for load in loads {
272 if let Some((head, tail)) = load.split_once('.') {
273 grouped
274 .entry(head.to_owned())
275 .or_default()
276 .push(tail.to_owned());
277 } else {
278 grouped.entry(load.clone()).or_default();
279 }
280 }
281
282 grouped
283 .into_iter()
284 .map(|(name, child_loads)| {
285 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
286 RuntimeError::MissingRelation {
287 entity: entity.to_owned(),
288 relation: name.clone(),
289 }
290 })?;
291 let child_repo = self.scoped_repository(relation.target_entity.clone());
292 let children =
293 child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
294 Ok(RelationLoadPlan {
295 parent_entity: entity.to_owned(),
296 relation_name: relation.name.clone(),
297 path: relation.name.clone(),
298 target_entity: relation.target_entity.clone(),
299 local_key: relation.local_key.clone(),
300 foreign_key: relation.foreign_key.clone(),
301 many: relation.many,
302 query: None,
303 children,
304 })
305 })
306 .collect()
307 }
308
309 fn build_relation_plans_from_loads(
310 &self,
311 entity: &str,
312 loads: &[RelationLoad],
313 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
314 let descriptor = self.repository.metadata.context.require_entity(entity)?;
315 loads
316 .iter()
317 .map(|load| {
318 let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
319 RuntimeError::MissingRelation {
320 entity: entity.to_owned(),
321 relation: load.name.clone(),
322 }
323 })?;
324 let relation_query = load.query.as_deref().cloned();
325 let child_loads = relation_query
326 .as_ref()
327 .map(|query| query.relations.as_slice())
328 .unwrap_or_default();
329 let child_repo = self.scoped_repository(relation.target_entity.clone());
330 let children = child_repo
331 .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
332 Ok(RelationLoadPlan {
333 parent_entity: entity.to_owned(),
334 relation_name: relation.name.clone(),
335 path: relation.name.clone(),
336 target_entity: relation.target_entity.clone(),
337 local_key: relation.local_key.clone(),
338 foreign_key: relation.foreign_key.clone(),
339 many: relation.many,
340 query: relation_query,
341 children,
342 })
343 })
344 .collect()
345 }
346 fn enhance_plan(
347 &self,
348 parent_rows: &mut [Record],
349 plan: &RelationLoadPlan,
350 ) -> Result<(), RepositoryError<E::Error>> {
351 let child_repo = self.scoped_repository(plan.target_entity.clone());
352 let query = self.query_for_plan(plan, parent_rows);
353 let child_rows = child_repo.fetch_all(&query)?;
354 self.attach_relation_rows(parent_rows, plan, child_rows);
355
356 if !plan.children.is_empty() {
357 for parent in parent_rows.iter_mut() {
358 match parent.get_mut(&plan.relation_name) {
359 Some(Value::Object(child)) => {
360 child_repo.enhance_child_record(child, &plan.children)?;
361 }
362 Some(Value::List(values)) => {
363 for value in values.iter_mut() {
364 if let Value::Object(child) = value {
365 child_repo.enhance_child_record(child, &plan.children)?;
366 }
367 }
368 }
369 _ => {}
370 }
371 }
372 }
373 Ok(())
374 }
375
376 fn enhance_child_record(
377 &self,
378 child: &mut Record,
379 plans: &[RelationLoadPlan],
380 ) -> Result<(), RepositoryError<E::Error>> {
381 for plan in plans {
382 self.enhance_plan(slice::from_mut(child), plan)?;
383 }
384 Ok(())
385 }
386
387 fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
388 let ids = parent_rows
389 .iter()
390 .filter_map(|row| row.get(&plan.local_key).cloned())
391 .collect::<Vec<_>>();
392
393 let mut query = plan
394 .query
395 .clone()
396 .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
397 query.entity = plan.target_entity.clone();
398 ensure_projection(&mut query, &plan.foreign_key);
399 for child in &plan.children {
400 ensure_projection(&mut query, &child.local_key);
401 }
402 if !ids.is_empty() {
403 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
404 }
405 query
406 }
407
408 fn attach_relation_rows(
409 &self,
410 parent_rows: &mut [Record],
411 plan: &RelationLoadPlan,
412 child_rows: Vec<Record>,
413 ) {
414 let inverse_relation = self
415 .repository
416 .metadata
417 .context
418 .entity(&plan.target_entity)
419 .and_then(|descriptor| {
420 descriptor.relations.iter().find(|relation| {
421 relation.target_entity == plan.parent_entity
422 && relation.local_key == plan.foreign_key
423 && relation.foreign_key == plan.local_key
424 })
425 })
426 .map(|relation| (relation.name.clone(), relation.many));
427
428 let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
429 for child in child_rows {
430 if let Some(key) = child.get(&plan.foreign_key) {
431 buckets
432 .entry(graph_identity_key(key))
433 .or_default()
434 .push(child);
435 }
436 }
437
438 for parent in parent_rows.iter_mut() {
439 let Some(local_value) = parent.get(&plan.local_key) else {
440 continue;
441 };
442 let bucket_key = graph_identity_key(local_value);
443 let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
444 let related = if let Some((inverse_relation, inverse_many)) = &inverse_relation {
445 let mut parent_object = parent.clone();
446 parent_object.remove(&plan.relation_name);
447 related
448 .into_iter()
449 .map(|mut child| {
450 if *inverse_many {
451 let entry = child
452 .entry(inverse_relation.clone())
453 .or_insert_with(|| Value::List(Vec::new()));
454 if let Value::List(list) = entry {
455 list.push(Value::object(parent_object.clone()));
456 }
457 } else {
458 child.insert(inverse_relation.clone(), Value::object(parent_object.clone()));
459 }
460 child
461 })
462 .collect::<Vec<_>>()
463 } else {
464 related
465 };
466 if plan.many {
467 parent.insert(
468 plan.relation_name.clone(),
469 Value::List(related.into_iter().map(Value::object).collect()),
470 );
471 } else {
472 let value = related
473 .into_iter()
474 .next()
475 .map(Value::object)
476 .unwrap_or(Value::Null);
477 parent.insert(plan.relation_name.clone(), value);
478 }
479 }
480 }
481}