Skip to main content

teaql_runtime/repository/
relation.rs

1use std::collections::BTreeMap;
2use std::slice;
3
4use teaql_core::{
5    Aggregate, Expr, ObjectGroupBy, Record, RelationAggregate, RelationLoad, SelectQuery, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{RepositoryError, RuntimeError};
10
11use super::{QueryExecutor, RelationLoadPlan, ResolvedRepository, helpers::*};
12
13impl<'a, D, E> ResolvedRepository<'a, D, E>
14where
15    D: SqlDialect,
16    E: QueryExecutor,
17{
18    pub fn relation_loads(&self) -> Vec<String> {
19        self.behavior()
20            .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
21            .unwrap_or_default()
22    }
23
24    pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
25        self.build_relation_plans(&self.entity, &self.relation_loads())
26    }
27
28    pub fn relation_query(
29        &self,
30        relation_name: &str,
31        parent_rows: &[Record],
32    ) -> Result<SelectQuery, RuntimeError> {
33        let plan = self
34            .relation_plans()?
35            .into_iter()
36            .find(|plan| plan.relation_name == relation_name)
37            .ok_or_else(|| RuntimeError::MissingRelation {
38                entity: self.entity.clone(),
39                relation: relation_name.to_owned(),
40            })?;
41        Ok(self.query_for_plan(&plan, parent_rows))
42    }
43
44    pub fn enhance_relations(
45        &self,
46        parent_rows: &mut [Record],
47    ) -> Result<(), RepositoryError<E::Error>> {
48        let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
49        for plan in plans {
50            self.enhance_plan(parent_rows, &plan)?;
51        }
52        Ok(())
53    }
54
55    pub fn enhance_query_relations(
56        &self,
57        parent_rows: &mut [Record],
58        query: &SelectQuery,
59    ) -> Result<(), RepositoryError<E::Error>> {
60        let plans = self
61            .build_relation_plans_from_loads(&query.entity, &query.relations)
62            .map_err(RepositoryError::Runtime)?;
63        for plan in plans {
64            self.enhance_plan(parent_rows, &plan)?;
65        }
66        Ok(())
67    }
68
69    pub fn enhance_relation_aggregates(
70        &self,
71        parent_rows: &mut [Record],
72        relation_aggregates: &[RelationAggregate],
73        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
74    ) -> Result<(), RepositoryError<E::Error>> {
75        for aggregate in relation_aggregates {
76            self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options)?;
77        }
78        Ok(())
79    }
80
81    pub fn enhance_object_group_bys(
82        &self,
83        rows: &mut [Record],
84        object_group_bys: &[ObjectGroupBy],
85    ) -> Result<(), RepositoryError<E::Error>> {
86        for group_by in object_group_bys {
87            let ids = rows
88                .iter()
89                .filter_map(|row| row.get(&group_by.storage_field).cloned())
90                .collect::<Vec<_>>();
91            if ids.is_empty() {
92                continue;
93            }
94            let mut query = group_by.query.clone();
95            ensure_projection(&mut query, "id");
96            query = query.and_filter(Expr::in_list("id", ids));
97            let object_rows = self
98                .scoped_repository(query.entity.clone())
99                .fetch_all(&query)?
100                .into_iter()
101                .filter_map(|row| {
102                    row.get("id")
103                        .cloned()
104                        .map(|id| (relation_bucket_key(&id), row))
105                })
106                .collect::<BTreeMap<_, _>>();
107            for row in rows.iter_mut() {
108                if let Some(key) = row.get(&group_by.storage_field).map(relation_bucket_key) {
109                    let value = object_rows
110                        .get(&key)
111                        .cloned()
112                        .map(Value::object)
113                        .unwrap_or(Value::Null);
114                    row.insert(group_by.property_name.clone(), value);
115                }
116            }
117        }
118        Ok(())
119    }
120
121    pub fn enhance_child_queries(
122        &self,
123        rows: &mut [Record],
124        child_queries: &[SelectQuery],
125    ) -> Result<(), RepositoryError<E::Error>> {
126        for child_query in child_queries {
127            let ids = rows
128                .iter()
129                .filter_map(|row| row.get("id").cloned())
130                .collect::<Vec<_>>();
131            if ids.is_empty() {
132                continue;
133            }
134            let mut query = child_query.clone();
135            ensure_projection(&mut query, "id");
136            query = query.and_filter(Expr::in_list("id", ids));
137            let child_rows = self
138                .scoped_repository(query.entity.clone())
139                .fetch_all(&query)?
140                .into_iter()
141                .filter_map(|row| {
142                    row.get("id")
143                        .cloned()
144                        .map(|id| (relation_bucket_key(&id), row))
145                })
146                .collect::<BTreeMap<_, _>>();
147            for row in rows.iter_mut() {
148                if let Some(key) = row.get("id").map(relation_bucket_key) {
149                    if let Some(child) = child_rows.get(&key) {
150                        row.extend(child.clone());
151                    }
152                }
153            }
154        }
155        Ok(())
156    }
157
158    fn enhance_relation_aggregate(
159        &self,
160        parent_rows: &mut [Record],
161        aggregate: &RelationAggregate,
162        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
163    ) -> Result<(), RepositoryError<E::Error>> {
164        let plan = self
165            .build_relation_plans_from_loads(
166                &self.entity,
167                &[RelationLoad::with_query(
168                    aggregate.relation_name.clone(),
169                    aggregate.query.clone(),
170                )],
171            )
172            .map_err(RepositoryError::Runtime)?
173            .into_iter()
174            .next()
175            .ok_or_else(|| {
176                RepositoryError::Runtime(RuntimeError::MissingRelation {
177                    entity: self.entity.clone(),
178                    relation: aggregate.relation_name.clone(),
179                })
180            })?;
181
182        let ids = parent_rows
183            .iter()
184            .filter_map(|row| row.get(&plan.local_key).cloned())
185            .collect::<Vec<_>>();
186        if ids.is_empty() {
187            attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
188            return Ok(());
189        }
190
191        let child_repo = self.scoped_repository(plan.target_entity.clone());
192        let mut query = aggregate.query.clone();
193        query.entity = plan.target_entity.clone();
194        if query.aggregation_cache.is_none() {
195            if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
196                query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
197                    options.propagate_cache_expired_millis,
198                ));
199            }
200        }
201        query.projection.clear();
202        query.expr_projection.clear();
203        query.order_by.clear();
204        query.slice = None;
205        query.relations.clear();
206        if query.aggregates.is_empty() {
207            let alias = if aggregate.single_result {
208                aggregate.alias.clone()
209            } else {
210                "count".to_owned()
211            };
212            query = query.aggregate(Aggregate::count(alias));
213        }
214        if !query
215            .group_by
216            .iter()
217            .any(|field| field == &plan.foreign_key)
218        {
219            query = query.group_by(plan.foreign_key.clone());
220        }
221        query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
222
223        let aggregate_rows = child_repo.fetch_all(&query)?;
224        attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
225        Ok(())
226    }
227
228    fn build_relation_plans(
229        &self,
230        entity: &str,
231        loads: &[String],
232    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
233        let descriptor = self.repository.metadata.context.require_entity(entity)?;
234        let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
235        for load in loads {
236            if let Some((head, tail)) = load.split_once('.') {
237                grouped
238                    .entry(head.to_owned())
239                    .or_default()
240                    .push(tail.to_owned());
241            } else {
242                grouped.entry(load.clone()).or_default();
243            }
244        }
245
246        grouped
247            .into_iter()
248            .map(|(name, child_loads)| {
249                let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
250                    RuntimeError::MissingRelation {
251                        entity: entity.to_owned(),
252                        relation: name.clone(),
253                    }
254                })?;
255                let child_repo = self.scoped_repository(relation.target_entity.clone());
256                let children =
257                    child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
258                Ok(RelationLoadPlan {
259                    parent_entity: entity.to_owned(),
260                    relation_name: relation.name.clone(),
261                    path: relation.name.clone(),
262                    target_entity: relation.target_entity.clone(),
263                    local_key: relation.local_key.clone(),
264                    foreign_key: relation.foreign_key.clone(),
265                    many: relation.many,
266                    query: None,
267                    children,
268                })
269            })
270            .collect()
271    }
272
273    fn build_relation_plans_from_loads(
274        &self,
275        entity: &str,
276        loads: &[RelationLoad],
277    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
278        let descriptor = self.repository.metadata.context.require_entity(entity)?;
279        loads
280            .iter()
281            .map(|load| {
282                let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
283                    RuntimeError::MissingRelation {
284                        entity: entity.to_owned(),
285                        relation: load.name.clone(),
286                    }
287                })?;
288                let relation_query = load.query.as_deref().cloned();
289                let child_loads = relation_query
290                    .as_ref()
291                    .map(|query| query.relations.as_slice())
292                    .unwrap_or_default();
293                let child_repo = self.scoped_repository(relation.target_entity.clone());
294                let children = child_repo
295                    .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
296                Ok(RelationLoadPlan {
297                    parent_entity: entity.to_owned(),
298                    relation_name: relation.name.clone(),
299                    path: relation.name.clone(),
300                    target_entity: relation.target_entity.clone(),
301                    local_key: relation.local_key.clone(),
302                    foreign_key: relation.foreign_key.clone(),
303                    many: relation.many,
304                    query: relation_query,
305                    children,
306                })
307            })
308            .collect()
309    }
310    fn enhance_plan(
311        &self,
312        parent_rows: &mut [Record],
313        plan: &RelationLoadPlan,
314    ) -> Result<(), RepositoryError<E::Error>> {
315        let child_repo = self.scoped_repository(plan.target_entity.clone());
316        let query = self.query_for_plan(plan, parent_rows);
317        let child_rows = child_repo.fetch_all(&query)?;
318        self.attach_relation_rows(parent_rows, plan, child_rows);
319
320        if !plan.children.is_empty() {
321            for parent in parent_rows.iter_mut() {
322                match parent.get_mut(&plan.relation_name) {
323                    Some(Value::Object(child)) => {
324                        child_repo.enhance_child_record(child, &plan.children)?;
325                    }
326                    Some(Value::List(values)) => {
327                        for value in values.iter_mut() {
328                            if let Value::Object(child) = value {
329                                child_repo.enhance_child_record(child, &plan.children)?;
330                            }
331                        }
332                    }
333                    _ => {}
334                }
335            }
336        }
337        Ok(())
338    }
339
340    fn enhance_child_record(
341        &self,
342        child: &mut Record,
343        plans: &[RelationLoadPlan],
344    ) -> Result<(), RepositoryError<E::Error>> {
345        for plan in plans {
346            self.enhance_plan(slice::from_mut(child), plan)?;
347        }
348        Ok(())
349    }
350
351    fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
352        let ids = parent_rows
353            .iter()
354            .filter_map(|row| row.get(&plan.local_key).cloned())
355            .collect::<Vec<_>>();
356
357        let mut query = plan
358            .query
359            .clone()
360            .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
361        query.entity = plan.target_entity.clone();
362        ensure_projection(&mut query, &plan.foreign_key);
363        for child in &plan.children {
364            ensure_projection(&mut query, &child.local_key);
365        }
366        if !ids.is_empty() {
367            query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
368        }
369        query
370    }
371
372    fn attach_relation_rows(
373        &self,
374        parent_rows: &mut [Record],
375        plan: &RelationLoadPlan,
376        child_rows: Vec<Record>,
377    ) {
378        let inverse_relation = self
379            .repository
380            .metadata
381            .context
382            .entity(&plan.target_entity)
383            .and_then(|descriptor| {
384                descriptor.relations.iter().find(|relation| {
385                    relation.target_entity == plan.parent_entity
386                        && relation.local_key == plan.foreign_key
387                        && relation.foreign_key == plan.local_key
388                })
389            })
390            .map(|relation| relation.name.clone());
391
392        let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
393        for child in child_rows {
394            if let Some(key) = child.get(&plan.foreign_key) {
395                buckets
396                    .entry(relation_bucket_key(key))
397                    .or_default()
398                    .push(child);
399            }
400        }
401
402        for parent in parent_rows.iter_mut() {
403            let Some(local_value) = parent.get(&plan.local_key) else {
404                continue;
405            };
406            let bucket_key = relation_bucket_key(local_value);
407            let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
408            let related = if let Some(inverse_relation) = &inverse_relation {
409                let mut parent_object = parent.clone();
410                parent_object.remove(&plan.relation_name);
411                related
412                    .into_iter()
413                    .map(|mut child| {
414                        child
415                            .entry(inverse_relation.clone())
416                            .or_insert_with(|| Value::object(parent_object.clone()));
417                        child
418                    })
419                    .collect::<Vec<_>>()
420            } else {
421                related
422            };
423            if plan.many {
424                parent.insert(
425                    plan.relation_name.clone(),
426                    Value::List(related.into_iter().map(Value::object).collect()),
427                );
428            } else {
429                let value = related
430                    .into_iter()
431                    .next()
432                    .map(Value::object)
433                    .unwrap_or(Value::Null);
434                parent.insert(plan.relation_name.clone(), value);
435            }
436        }
437    }
438}