teaql_runtime/repository/
relation.rs1use std::collections::BTreeMap;
2use std::slice;
3
4use teaql_core::{
5 Aggregate, Expr, ObjectGroupBy, Record, RelationAggregate, RelationLoad, SelectQuery, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{RepositoryError, RuntimeError};
10
11use super::{QueryExecutor, RelationLoadPlan, ResolvedRepository, helpers::*};
12
13impl<'a, D, E> ResolvedRepository<'a, D, E>
14where
15 D: SqlDialect,
16 E: QueryExecutor,
17{
18 pub fn relation_loads(&self) -> Vec<String> {
19 self.behavior()
20 .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
21 .unwrap_or_default()
22 }
23
24 pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
25 self.build_relation_plans(&self.entity, &self.relation_loads())
26 }
27
28 pub fn relation_query(
29 &self,
30 relation_name: &str,
31 parent_rows: &[Record],
32 ) -> Result<SelectQuery, RuntimeError> {
33 let plan = self
34 .relation_plans()?
35 .into_iter()
36 .find(|plan| plan.relation_name == relation_name)
37 .ok_or_else(|| RuntimeError::MissingRelation {
38 entity: self.entity.clone(),
39 relation: relation_name.to_owned(),
40 })?;
41 Ok(self.query_for_plan(&plan, parent_rows))
42 }
43
44 pub fn enhance_relations(
45 &self,
46 parent_rows: &mut [Record],
47 ) -> Result<(), RepositoryError<E::Error>> {
48 let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
49 for plan in plans {
50 self.enhance_plan(parent_rows, &plan)?;
51 }
52 Ok(())
53 }
54
55 pub fn enhance_query_relations(
56 &self,
57 parent_rows: &mut [Record],
58 query: &SelectQuery,
59 ) -> Result<(), RepositoryError<E::Error>> {
60 let plans = self
61 .build_relation_plans_from_loads(&query.entity, &query.relations)
62 .map_err(RepositoryError::Runtime)?;
63 for plan in plans {
64 self.enhance_plan(parent_rows, &plan)?;
65 }
66 Ok(())
67 }
68
69 pub fn enhance_relation_aggregates(
70 &self,
71 parent_rows: &mut [Record],
72 relation_aggregates: &[RelationAggregate],
73 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
74 ) -> Result<(), RepositoryError<E::Error>> {
75 for aggregate in relation_aggregates {
76 self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options)?;
77 }
78 Ok(())
79 }
80
81 pub fn enhance_object_group_bys(
82 &self,
83 rows: &mut [Record],
84 object_group_bys: &[ObjectGroupBy],
85 ) -> Result<(), RepositoryError<E::Error>> {
86 for group_by in object_group_bys {
87 let ids = rows
88 .iter()
89 .filter_map(|row| row.get(&group_by.storage_field).cloned())
90 .collect::<Vec<_>>();
91 if ids.is_empty() {
92 continue;
93 }
94 let mut query = group_by.query.clone();
95 ensure_projection(&mut query, "id");
96 query = query.and_filter(Expr::in_list("id", ids));
97 let object_rows = self
98 .scoped_repository(query.entity.clone())
99 .fetch_all(&query)?
100 .into_iter()
101 .filter_map(|row| {
102 row.get("id")
103 .cloned()
104 .map(|id| (graph_identity_key(&id), row))
105 })
106 .collect::<BTreeMap<_, _>>();
107 for row in rows.iter_mut() {
108 if let Some(key) = row.get(&group_by.storage_field).map(graph_identity_key) {
109 let value = object_rows
110 .get(&key)
111 .cloned()
112 .map(Value::object)
113 .unwrap_or(Value::Null);
114 row.insert(group_by.property_name.clone(), value);
115 }
116 }
117 }
118 Ok(())
119 }
120
121 pub fn enhance_child_queries(
122 &self,
123 rows: &mut [Record],
124 child_queries: &[SelectQuery],
125 ) -> Result<(), RepositoryError<E::Error>> {
126 for child_query in child_queries {
127 let ids = rows
128 .iter()
129 .filter_map(|row| row.get("id").cloned())
130 .collect::<Vec<_>>();
131 if ids.is_empty() {
132 continue;
133 }
134 let mut query = child_query.clone();
135 ensure_projection(&mut query, "id");
136 query = query.and_filter(Expr::in_list("id", ids));
137 let child_rows = self
138 .scoped_repository(query.entity.clone())
139 .fetch_all(&query)?
140 .into_iter()
141 .filter_map(|row| {
142 row.get("id")
143 .cloned()
144 .map(|id| (graph_identity_key(&id), row))
145 })
146 .collect::<BTreeMap<_, _>>();
147 for row in rows.iter_mut() {
148 if let Some(key) = row.get("id").map(graph_identity_key) {
149 if let Some(child) = child_rows.get(&key) {
150 row.extend(child.clone());
151 }
152 }
153 }
154 }
155 Ok(())
156 }
157
158 fn enhance_relation_aggregate(
159 &self,
160 parent_rows: &mut [Record],
161 aggregate: &RelationAggregate,
162 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
163 ) -> Result<(), RepositoryError<E::Error>> {
164 let plan = self
165 .build_relation_plans_from_loads(
166 &self.entity,
167 &[RelationLoad::with_query(
168 aggregate.relation_name.clone(),
169 aggregate.query.clone(),
170 )],
171 )
172 .map_err(RepositoryError::Runtime)?
173 .into_iter()
174 .next()
175 .ok_or_else(|| {
176 RepositoryError::Runtime(RuntimeError::MissingRelation {
177 entity: self.entity.clone(),
178 relation: aggregate.relation_name.clone(),
179 })
180 })?;
181
182 let ids = parent_rows
183 .iter()
184 .filter_map(|row| row.get(&plan.local_key).cloned())
185 .collect::<Vec<_>>();
186 if ids.is_empty() {
187 attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
188 return Ok(());
189 }
190
191 let child_repo = self.scoped_repository(plan.target_entity.clone());
192 let mut query = aggregate.query.clone();
193 query.entity = plan.target_entity.clone();
194 if query.aggregation_cache.is_none() {
195 if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
196 query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
197 options.propagate_cache_expired_millis,
198 ));
199 }
200 }
201 query.projection.clear();
202 query.expr_projection.clear();
203 query.order_by.clear();
204 query.slice = None;
205 query.relations.clear();
206 if query.aggregates.is_empty() {
207 let alias = if aggregate.single_result {
208 aggregate.alias.clone()
209 } else {
210 "count".to_owned()
211 };
212 query = query.aggregate(Aggregate::count(alias));
213 }
214 if !query
215 .group_by
216 .iter()
217 .any(|field| field == &plan.foreign_key)
218 {
219 query = query.group_by(plan.foreign_key.clone());
220 }
221 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
222
223 let mut aggregate_rows = child_repo.fetch_all(&query)?;
224 let foreign_key_column = self
225 .repository
226 .metadata
227 .context
228 .entity(&plan.target_entity)
229 .and_then(|descriptor| {
230 descriptor
231 .properties
232 .iter()
233 .find(|property| property.name == plan.foreign_key)
234 .map(|property| property.column_name.clone())
235 });
236 if let Some(foreign_key_column) =
237 foreign_key_column.filter(|column| column != &plan.foreign_key)
238 {
239 for row in &mut aggregate_rows {
240 if !row.contains_key(&plan.foreign_key) {
241 if let Some(value) = row.remove(&foreign_key_column) {
242 row.insert(plan.foreign_key.clone(), value);
243 }
244 }
245 }
246 }
247 attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
248 Ok(())
249 }
250
251 fn build_relation_plans(
252 &self,
253 entity: &str,
254 loads: &[String],
255 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
256 let descriptor = self.repository.metadata.context.require_entity(entity)?;
257 let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
258 for load in loads {
259 if let Some((head, tail)) = load.split_once('.') {
260 grouped
261 .entry(head.to_owned())
262 .or_default()
263 .push(tail.to_owned());
264 } else {
265 grouped.entry(load.clone()).or_default();
266 }
267 }
268
269 grouped
270 .into_iter()
271 .map(|(name, child_loads)| {
272 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
273 RuntimeError::MissingRelation {
274 entity: entity.to_owned(),
275 relation: name.clone(),
276 }
277 })?;
278 let child_repo = self.scoped_repository(relation.target_entity.clone());
279 let children =
280 child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
281 Ok(RelationLoadPlan {
282 parent_entity: entity.to_owned(),
283 relation_name: relation.name.clone(),
284 path: relation.name.clone(),
285 target_entity: relation.target_entity.clone(),
286 local_key: relation.local_key.clone(),
287 foreign_key: relation.foreign_key.clone(),
288 many: relation.many,
289 query: None,
290 children,
291 })
292 })
293 .collect()
294 }
295
296 fn build_relation_plans_from_loads(
297 &self,
298 entity: &str,
299 loads: &[RelationLoad],
300 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
301 let descriptor = self.repository.metadata.context.require_entity(entity)?;
302 loads
303 .iter()
304 .map(|load| {
305 let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
306 RuntimeError::MissingRelation {
307 entity: entity.to_owned(),
308 relation: load.name.clone(),
309 }
310 })?;
311 let relation_query = load.query.as_deref().cloned();
312 let child_loads = relation_query
313 .as_ref()
314 .map(|query| query.relations.as_slice())
315 .unwrap_or_default();
316 let child_repo = self.scoped_repository(relation.target_entity.clone());
317 let children = child_repo
318 .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
319 Ok(RelationLoadPlan {
320 parent_entity: entity.to_owned(),
321 relation_name: relation.name.clone(),
322 path: relation.name.clone(),
323 target_entity: relation.target_entity.clone(),
324 local_key: relation.local_key.clone(),
325 foreign_key: relation.foreign_key.clone(),
326 many: relation.many,
327 query: relation_query,
328 children,
329 })
330 })
331 .collect()
332 }
333 fn enhance_plan(
334 &self,
335 parent_rows: &mut [Record],
336 plan: &RelationLoadPlan,
337 ) -> Result<(), RepositoryError<E::Error>> {
338 let child_repo = self.scoped_repository(plan.target_entity.clone());
339 let query = self.query_for_plan(plan, parent_rows);
340 let child_rows = child_repo.fetch_all(&query)?;
341 self.attach_relation_rows(parent_rows, plan, child_rows);
342
343 if !plan.children.is_empty() {
344 for parent in parent_rows.iter_mut() {
345 match parent.get_mut(&plan.relation_name) {
346 Some(Value::Object(child)) => {
347 child_repo.enhance_child_record(child, &plan.children)?;
348 }
349 Some(Value::List(values)) => {
350 for value in values.iter_mut() {
351 if let Value::Object(child) = value {
352 child_repo.enhance_child_record(child, &plan.children)?;
353 }
354 }
355 }
356 _ => {}
357 }
358 }
359 }
360 Ok(())
361 }
362
363 fn enhance_child_record(
364 &self,
365 child: &mut Record,
366 plans: &[RelationLoadPlan],
367 ) -> Result<(), RepositoryError<E::Error>> {
368 for plan in plans {
369 self.enhance_plan(slice::from_mut(child), plan)?;
370 }
371 Ok(())
372 }
373
374 fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
375 let ids = parent_rows
376 .iter()
377 .filter_map(|row| row.get(&plan.local_key).cloned())
378 .collect::<Vec<_>>();
379
380 let mut query = plan
381 .query
382 .clone()
383 .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
384 query.entity = plan.target_entity.clone();
385 ensure_projection(&mut query, &plan.foreign_key);
386 for child in &plan.children {
387 ensure_projection(&mut query, &child.local_key);
388 }
389 if !ids.is_empty() {
390 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
391 }
392 query
393 }
394
395 fn attach_relation_rows(
396 &self,
397 parent_rows: &mut [Record],
398 plan: &RelationLoadPlan,
399 child_rows: Vec<Record>,
400 ) {
401 let inverse_relation = self
402 .repository
403 .metadata
404 .context
405 .entity(&plan.target_entity)
406 .and_then(|descriptor| {
407 descriptor.relations.iter().find(|relation| {
408 relation.target_entity == plan.parent_entity
409 && relation.local_key == plan.foreign_key
410 && relation.foreign_key == plan.local_key
411 })
412 })
413 .map(|relation| relation.name.clone());
414
415 let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
416 for child in child_rows {
417 if let Some(key) = child.get(&plan.foreign_key) {
418 buckets
419 .entry(graph_identity_key(key))
420 .or_default()
421 .push(child);
422 }
423 }
424
425 for parent in parent_rows.iter_mut() {
426 let Some(local_value) = parent.get(&plan.local_key) else {
427 continue;
428 };
429 let bucket_key = graph_identity_key(local_value);
430 let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
431 let related = if let Some(inverse_relation) = &inverse_relation {
432 let mut parent_object = parent.clone();
433 parent_object.remove(&plan.relation_name);
434 related
435 .into_iter()
436 .map(|mut child| {
437 child
438 .entry(inverse_relation.clone())
439 .or_insert_with(|| Value::object(parent_object.clone()));
440 child
441 })
442 .collect::<Vec<_>>()
443 } else {
444 related
445 };
446 if plan.many {
447 parent.insert(
448 plan.relation_name.clone(),
449 Value::List(related.into_iter().map(Value::object).collect()),
450 );
451 } else {
452 let value = related
453 .into_iter()
454 .next()
455 .map(Value::object)
456 .unwrap_or(Value::Null);
457 parent.insert(plan.relation_name.clone(), value);
458 }
459 }
460 }
461}