Skip to main content

teaql_runtime/repository/
relation.rs

1use std::collections::BTreeMap;
2use std::slice;
3
4use teaql_core::{
5    Aggregate, Expr, ObjectGroupBy, Record, RelationAggregate, RelationLoad, SelectQuery, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{RepositoryError, RuntimeError};
10
11use super::{QueryExecutor, RelationLoadPlan, ResolvedRepository, helpers::*};
12
13impl<'a, D, E> ResolvedRepository<'a, D, E>
14where
15    D: SqlDialect,
16    E: QueryExecutor,
17{
18    pub fn relation_loads(&self) -> Vec<String> {
19        self.behavior()
20            .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
21            .unwrap_or_default()
22    }
23
24    pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
25        self.build_relation_plans(&self.entity, &self.relation_loads())
26    }
27
28    pub fn relation_query(
29        &self,
30        relation_name: &str,
31        parent_rows: &[Record],
32    ) -> Result<SelectQuery, RuntimeError> {
33        let plan = self
34            .relation_plans()?
35            .into_iter()
36            .find(|plan| plan.relation_name == relation_name)
37            .ok_or_else(|| RuntimeError::MissingRelation {
38                entity: self.entity.clone(),
39                relation: relation_name.to_owned(),
40            })?;
41        Ok(self.query_for_plan(&plan, parent_rows))
42    }
43
44    pub fn enhance_relations(
45        &self,
46        parent_rows: &mut [Record],
47    ) -> Result<(), RepositoryError<E::Error>> {
48        let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
49        for plan in plans {
50            self.enhance_plan(parent_rows, &plan)?;
51        }
52        Ok(())
53    }
54
55    pub fn enhance_query_relations(
56        &self,
57        parent_rows: &mut [Record],
58        query: &SelectQuery,
59    ) -> Result<(), RepositoryError<E::Error>> {
60        let plans = self
61            .build_relation_plans_from_loads(&query.entity, &query.relations)
62            .map_err(RepositoryError::Runtime)?;
63        for plan in plans {
64            self.enhance_plan(parent_rows, &plan)?;
65        }
66        Ok(())
67    }
68
69    pub fn enhance_relation_aggregates(
70        &self,
71        parent_rows: &mut [Record],
72        relation_aggregates: &[RelationAggregate],
73        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
74        parent_trace_chain: &[teaql_core::TraceNode],
75    ) -> Result<(), RepositoryError<E::Error>> {
76        for aggregate in relation_aggregates {
77            self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options, parent_trace_chain)?;
78        }
79        Ok(())
80    }
81
82    pub fn enhance_object_group_bys(
83        &self,
84        rows: &mut [Record],
85        object_group_bys: &[ObjectGroupBy],
86        parent_trace_chain: &[teaql_core::TraceNode],
87    ) -> Result<(), RepositoryError<E::Error>> {
88        for group_by in object_group_bys {
89            let ids = rows
90                .iter()
91                .filter_map(|row| row.get(&group_by.storage_field).cloned())
92                .collect::<Vec<_>>();
93            if ids.is_empty() {
94                continue;
95            }
96            let mut query = group_by.query.clone();
97            ensure_projection(&mut query, "id");
98            query = query.and_filter(Expr::in_list("id", ids));
99            let object_rows = self
100                .scoped_repository(query.entity.clone())
101                .with_trace_context(parent_trace_chain.to_vec())
102                .fetch_all(&query)?
103                .into_iter()
104                .filter_map(|row| {
105                    row.get("id")
106                        .cloned()
107                        .map(|id| (graph_identity_key(&id), row))
108                })
109                .collect::<BTreeMap<_, _>>();
110            for row in rows.iter_mut() {
111                if let Some(key) = row.get(&group_by.storage_field).map(graph_identity_key) {
112                    let value = object_rows
113                        .get(&key)
114                        .cloned()
115                        .map(Value::object)
116                        .unwrap_or(Value::Null);
117                    row.insert(group_by.property_name.clone(), value);
118                }
119            }
120        }
121        Ok(())
122    }
123
124    pub fn enhance_child_queries(
125        &self,
126        rows: &mut [Record],
127        child_queries: &[SelectQuery],
128        parent_trace_chain: &[teaql_core::TraceNode],
129    ) -> Result<(), RepositoryError<E::Error>> {
130        for child_query in child_queries {
131            let ids = rows
132                .iter()
133                .filter_map(|row| row.get("id").cloned())
134                .collect::<Vec<_>>();
135            if ids.is_empty() {
136                continue;
137            }
138            let mut query = child_query.clone();
139            ensure_projection(&mut query, "id");
140            query = query.and_filter(Expr::in_list("id", ids));
141            let child_rows = self
142                .scoped_repository(query.entity.clone())
143                .with_trace_context(parent_trace_chain.to_vec())
144                .fetch_all(&query)?
145                .into_iter()
146                .filter_map(|row| {
147                    row.get("id")
148                        .cloned()
149                        .map(|id| (graph_identity_key(&id), row))
150                })
151                .collect::<BTreeMap<_, _>>();
152            for row in rows.iter_mut() {
153                if let Some(key) = row.get("id").map(graph_identity_key) {
154                    if let Some(child) = child_rows.get(&key) {
155                        row.extend(child.clone());
156                    }
157                }
158            }
159        }
160        Ok(())
161    }
162
163    fn enhance_relation_aggregate(
164        &self,
165        parent_rows: &mut [Record],
166        aggregate: &RelationAggregate,
167        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
168        parent_trace_chain: &[teaql_core::TraceNode],
169    ) -> Result<(), RepositoryError<E::Error>> {
170        let plan = self
171            .build_relation_plans_from_loads(
172                &self.entity,
173                &[RelationLoad::with_query(
174                    aggregate.relation_name.clone(),
175                    aggregate.query.clone(),
176                )],
177            )
178            .map_err(RepositoryError::Runtime)?
179            .into_iter()
180            .next()
181            .ok_or_else(|| {
182                RepositoryError::Runtime(RuntimeError::MissingRelation {
183                    entity: self.entity.clone(),
184                    relation: aggregate.relation_name.clone(),
185                })
186            })?;
187
188        let ids = parent_rows
189            .iter()
190            .filter_map(|row| row.get(&plan.local_key).cloned())
191            .collect::<Vec<_>>();
192        if ids.is_empty() {
193            attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
194            return Ok(());
195        }
196
197        let child_repo = self.scoped_repository(plan.target_entity.clone());
198        let mut query = aggregate.query.clone();
199        query.entity = plan.target_entity.clone();
200        if query.aggregation_cache.is_none() {
201            if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
202                query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
203                    options.propagate_cache_expired_millis,
204                ));
205            }
206        }
207        query.projection.clear();
208        query.expr_projection.clear();
209        query.order_by.clear();
210        query.slice = None;
211        query.relations.clear();
212        if query.aggregates.is_empty() {
213            let alias = if aggregate.single_result {
214                aggregate.alias.clone()
215            } else {
216                "count".to_owned()
217            };
218            query = query.aggregate(Aggregate::count(alias));
219        }
220        if !query
221            .group_by
222            .iter()
223            .any(|field| field == &plan.foreign_key)
224        {
225            query = query.group_by(plan.foreign_key.clone());
226        }
227        query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
228
229        let mut chain = parent_trace_chain.to_vec();
230        chain.push(teaql_core::TraceNode {
231            entity_type: query.entity.clone(),
232            entity_id: None,
233            comment: aggregate.alias.clone(),
234        });
235
236        let mut aggregate_rows = child_repo.with_trace_context(chain).fetch_all(&query)?;
237        let foreign_key_column = self
238            .repository
239            .metadata
240            .context
241            .entity(&plan.target_entity)
242            .and_then(|descriptor| {
243                descriptor
244                    .properties
245                    .iter()
246                    .find(|property| property.name == plan.foreign_key)
247                    .map(|property| property.column_name.clone())
248            });
249        if let Some(foreign_key_column) =
250            foreign_key_column.filter(|column| column != &plan.foreign_key)
251        {
252            for row in &mut aggregate_rows {
253                if !row.contains_key(&plan.foreign_key) {
254                    if let Some(value) = row.remove(&foreign_key_column) {
255                        row.insert(plan.foreign_key.clone(), value);
256                    }
257                }
258            }
259        }
260        attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
261        Ok(())
262    }
263
264    fn build_relation_plans(
265        &self,
266        entity: &str,
267        loads: &[String],
268    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
269        let descriptor = self.repository.metadata.context.require_entity(entity)?;
270        let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
271        for load in loads {
272            if let Some((head, tail)) = load.split_once('.') {
273                grouped
274                    .entry(head.to_owned())
275                    .or_default()
276                    .push(tail.to_owned());
277            } else {
278                grouped.entry(load.clone()).or_default();
279            }
280        }
281
282        grouped
283            .into_iter()
284            .map(|(name, child_loads)| {
285                let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
286                    RuntimeError::MissingRelation {
287                        entity: entity.to_owned(),
288                        relation: name.clone(),
289                    }
290                })?;
291                let child_repo = self.scoped_repository(relation.target_entity.clone());
292                let children =
293                    child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
294                Ok(RelationLoadPlan {
295                    parent_entity: entity.to_owned(),
296                    relation_name: relation.name.clone(),
297                    path: relation.name.clone(),
298                    target_entity: relation.target_entity.clone(),
299                    local_key: relation.local_key.clone(),
300                    foreign_key: relation.foreign_key.clone(),
301                    many: relation.many,
302                    query: None,
303                    children,
304                })
305            })
306            .collect()
307    }
308
309    fn build_relation_plans_from_loads(
310        &self,
311        entity: &str,
312        loads: &[RelationLoad],
313    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
314        let descriptor = self.repository.metadata.context.require_entity(entity)?;
315        loads
316            .iter()
317            .map(|load| {
318                let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
319                    RuntimeError::MissingRelation {
320                        entity: entity.to_owned(),
321                        relation: load.name.clone(),
322                    }
323                })?;
324                let relation_query = load.query.as_deref().cloned();
325                let child_loads = relation_query
326                    .as_ref()
327                    .map(|query| query.relations.as_slice())
328                    .unwrap_or_default();
329                let child_repo = self.scoped_repository(relation.target_entity.clone());
330                let children = child_repo
331                    .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
332                Ok(RelationLoadPlan {
333                    parent_entity: entity.to_owned(),
334                    relation_name: relation.name.clone(),
335                    path: relation.name.clone(),
336                    target_entity: relation.target_entity.clone(),
337                    local_key: relation.local_key.clone(),
338                    foreign_key: relation.foreign_key.clone(),
339                    many: relation.many,
340                    query: relation_query,
341                    children,
342                })
343            })
344            .collect()
345    }
346    fn enhance_plan(
347        &self,
348        parent_rows: &mut [Record],
349        plan: &RelationLoadPlan,
350    ) -> Result<(), RepositoryError<E::Error>> {
351        let child_repo = self.scoped_repository(plan.target_entity.clone());
352        let query = self.query_for_plan(plan, parent_rows);
353        let child_rows = child_repo.fetch_all(&query)?;
354        self.attach_relation_rows(parent_rows, plan, child_rows);
355
356        if !plan.children.is_empty() {
357            for parent in parent_rows.iter_mut() {
358                match parent.get_mut(&plan.relation_name) {
359                    Some(Value::Object(child)) => {
360                        child_repo.enhance_child_record(child, &plan.children)?;
361                    }
362                    Some(Value::List(values)) => {
363                        for value in values.iter_mut() {
364                            if let Value::Object(child) = value {
365                                child_repo.enhance_child_record(child, &plan.children)?;
366                            }
367                        }
368                    }
369                    _ => {}
370                }
371            }
372        }
373        Ok(())
374    }
375
376    fn enhance_child_record(
377        &self,
378        child: &mut Record,
379        plans: &[RelationLoadPlan],
380    ) -> Result<(), RepositoryError<E::Error>> {
381        for plan in plans {
382            self.enhance_plan(slice::from_mut(child), plan)?;
383        }
384        Ok(())
385    }
386
387    fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
388        let ids = parent_rows
389            .iter()
390            .filter_map(|row| row.get(&plan.local_key).cloned())
391            .collect::<Vec<_>>();
392
393        let mut query = plan
394            .query
395            .clone()
396            .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
397        query.entity = plan.target_entity.clone();
398        ensure_projection(&mut query, &plan.foreign_key);
399        for child in &plan.children {
400            ensure_projection(&mut query, &child.local_key);
401        }
402        if !ids.is_empty() {
403            query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
404        }
405        query
406    }
407
408    fn attach_relation_rows(
409        &self,
410        parent_rows: &mut [Record],
411        plan: &RelationLoadPlan,
412        child_rows: Vec<Record>,
413    ) {
414        let inverse_relation = self
415            .repository
416            .metadata
417            .context
418            .entity(&plan.target_entity)
419            .and_then(|descriptor| {
420                descriptor.relations.iter().find(|relation| {
421                    relation.target_entity == plan.parent_entity
422                        && relation.local_key == plan.foreign_key
423                        && relation.foreign_key == plan.local_key
424                })
425            })
426            .map(|relation| (relation.name.clone(), relation.many));
427
428        let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
429        for child in child_rows {
430            if let Some(key) = child.get(&plan.foreign_key) {
431                buckets
432                    .entry(graph_identity_key(key))
433                    .or_default()
434                    .push(child);
435            }
436        }
437
438        for parent in parent_rows.iter_mut() {
439            let Some(local_value) = parent.get(&plan.local_key) else {
440                continue;
441            };
442            let bucket_key = graph_identity_key(local_value);
443            let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
444            let related = if let Some((inverse_relation, inverse_many)) = &inverse_relation {
445                let mut parent_object = parent.clone();
446                parent_object.remove(&plan.relation_name);
447                related
448                    .into_iter()
449                    .map(|mut child| {
450                        if *inverse_many {
451                            let entry = child
452                                .entry(inverse_relation.clone())
453                                .or_insert_with(|| Value::List(Vec::new()));
454                            if let Value::List(list) = entry {
455                                list.push(Value::object(parent_object.clone()));
456                            }
457                        } else {
458                            child.insert(inverse_relation.clone(), Value::object(parent_object.clone()));
459                        }
460                        child
461                    })
462                    .collect::<Vec<_>>()
463            } else {
464                related
465            };
466            if plan.many {
467                parent.insert(
468                    plan.relation_name.clone(),
469                    Value::List(related.into_iter().map(Value::object).collect()),
470                );
471            } else {
472                let value = related
473                    .into_iter()
474                    .next()
475                    .map(Value::object)
476                    .unwrap_or(Value::Null);
477                parent.insert(plan.relation_name.clone(), value);
478            }
479        }
480    }
481}