// Source file: teaql_runtime/repository/relation.rs

1use std::collections::BTreeMap;
2use std::slice;
3
4use teaql_core::{
5    Aggregate, Expr, ObjectGroupBy, Record, RelationAggregate, RelationLoad, SelectQuery, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{RepositoryError, RuntimeError};
10
11use super::{QueryExecutor, RelationLoadPlan, ResolvedRepository, helpers::*};
12
13impl<'a, D, E> ResolvedRepository<'a, D, E>
14where
15    D: SqlDialect,
16    E: QueryExecutor,
17{
18    pub fn relation_loads(&self) -> Vec<String> {
19        self.behavior()
20            .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
21            .unwrap_or_default()
22    }
23
24    pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
25        self.build_relation_plans(&self.entity, &self.relation_loads())
26    }
27
28    pub fn relation_query(
29        &self,
30        relation_name: &str,
31        parent_rows: &[Record],
32    ) -> Result<SelectQuery, RuntimeError> {
33        let plan = self
34            .relation_plans()?
35            .into_iter()
36            .find(|plan| plan.relation_name == relation_name)
37            .ok_or_else(|| RuntimeError::MissingRelation {
38                entity: self.entity.clone(),
39                relation: relation_name.to_owned(),
40            })?;
41        Ok(self.query_for_plan(&plan, parent_rows))
42    }
43
44    pub fn enhance_relations(
45        &self,
46        parent_rows: &mut [Record],
47    ) -> Result<(), RepositoryError<E::Error>> {
48        let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
49        for plan in plans {
50            self.enhance_plan(parent_rows, &plan)?;
51        }
52        Ok(())
53    }
54
55    pub fn enhance_query_relations(
56        &self,
57        parent_rows: &mut [Record],
58        query: &SelectQuery,
59    ) -> Result<(), RepositoryError<E::Error>> {
60        let plans = self
61            .build_relation_plans_from_loads(&query.entity, &query.relations)
62            .map_err(RepositoryError::Runtime)?;
63        for plan in plans {
64            self.enhance_plan(parent_rows, &plan)?;
65        }
66        Ok(())
67    }
68
69    pub fn enhance_relation_aggregates(
70        &self,
71        parent_rows: &mut [Record],
72        relation_aggregates: &[RelationAggregate],
73        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
74    ) -> Result<(), RepositoryError<E::Error>> {
75        for aggregate in relation_aggregates {
76            self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options)?;
77        }
78        Ok(())
79    }
80
81    pub fn enhance_object_group_bys(
82        &self,
83        rows: &mut [Record],
84        object_group_bys: &[ObjectGroupBy],
85    ) -> Result<(), RepositoryError<E::Error>> {
86        for group_by in object_group_bys {
87            let ids = rows
88                .iter()
89                .filter_map(|row| row.get(&group_by.storage_field).cloned())
90                .collect::<Vec<_>>();
91            if ids.is_empty() {
92                continue;
93            }
94            let mut query = group_by.query.clone();
95            ensure_projection(&mut query, "id");
96            query = query.and_filter(Expr::in_list("id", ids));
97            let object_rows = self
98                .scoped_repository(query.entity.clone())
99                .fetch_all(&query)?
100                .into_iter()
101                .filter_map(|row| {
102                    row.get("id")
103                        .cloned()
104                        .map(|id| (graph_identity_key(&id), row))
105                })
106                .collect::<BTreeMap<_, _>>();
107            for row in rows.iter_mut() {
108                if let Some(key) = row.get(&group_by.storage_field).map(graph_identity_key) {
109                    let value = object_rows
110                        .get(&key)
111                        .cloned()
112                        .map(Value::object)
113                        .unwrap_or(Value::Null);
114                    row.insert(group_by.property_name.clone(), value);
115                }
116            }
117        }
118        Ok(())
119    }
120
121    pub fn enhance_child_queries(
122        &self,
123        rows: &mut [Record],
124        child_queries: &[SelectQuery],
125    ) -> Result<(), RepositoryError<E::Error>> {
126        for child_query in child_queries {
127            let ids = rows
128                .iter()
129                .filter_map(|row| row.get("id").cloned())
130                .collect::<Vec<_>>();
131            if ids.is_empty() {
132                continue;
133            }
134            let mut query = child_query.clone();
135            ensure_projection(&mut query, "id");
136            query = query.and_filter(Expr::in_list("id", ids));
137            let child_rows = self
138                .scoped_repository(query.entity.clone())
139                .fetch_all(&query)?
140                .into_iter()
141                .filter_map(|row| {
142                    row.get("id")
143                        .cloned()
144                        .map(|id| (graph_identity_key(&id), row))
145                })
146                .collect::<BTreeMap<_, _>>();
147            for row in rows.iter_mut() {
148                if let Some(key) = row.get("id").map(graph_identity_key) {
149                    if let Some(child) = child_rows.get(&key) {
150                        row.extend(child.clone());
151                    }
152                }
153            }
154        }
155        Ok(())
156    }
157
158    fn enhance_relation_aggregate(
159        &self,
160        parent_rows: &mut [Record],
161        aggregate: &RelationAggregate,
162        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
163    ) -> Result<(), RepositoryError<E::Error>> {
164        let plan = self
165            .build_relation_plans_from_loads(
166                &self.entity,
167                &[RelationLoad::with_query(
168                    aggregate.relation_name.clone(),
169                    aggregate.query.clone(),
170                )],
171            )
172            .map_err(RepositoryError::Runtime)?
173            .into_iter()
174            .next()
175            .ok_or_else(|| {
176                RepositoryError::Runtime(RuntimeError::MissingRelation {
177                    entity: self.entity.clone(),
178                    relation: aggregate.relation_name.clone(),
179                })
180            })?;
181
182        let ids = parent_rows
183            .iter()
184            .filter_map(|row| row.get(&plan.local_key).cloned())
185            .collect::<Vec<_>>();
186        if ids.is_empty() {
187            attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
188            return Ok(());
189        }
190
191        let child_repo = self.scoped_repository(plan.target_entity.clone());
192        let mut query = aggregate.query.clone();
193        query.entity = plan.target_entity.clone();
194        if query.aggregation_cache.is_none() {
195            if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
196                query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
197                    options.propagate_cache_expired_millis,
198                ));
199            }
200        }
201        query.projection.clear();
202        query.expr_projection.clear();
203        query.order_by.clear();
204        query.slice = None;
205        query.relations.clear();
206        if query.aggregates.is_empty() {
207            let alias = if aggregate.single_result {
208                aggregate.alias.clone()
209            } else {
210                "count".to_owned()
211            };
212            query = query.aggregate(Aggregate::count(alias));
213        }
214        if !query
215            .group_by
216            .iter()
217            .any(|field| field == &plan.foreign_key)
218        {
219            query = query.group_by(plan.foreign_key.clone());
220        }
221        query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
222
223        let mut aggregate_rows = child_repo.fetch_all(&query)?;
224        let foreign_key_column = self
225            .repository
226            .metadata
227            .context
228            .entity(&plan.target_entity)
229            .and_then(|descriptor| {
230                descriptor
231                    .properties
232                    .iter()
233                    .find(|property| property.name == plan.foreign_key)
234                    .map(|property| property.column_name.clone())
235            });
236        if let Some(foreign_key_column) =
237            foreign_key_column.filter(|column| column != &plan.foreign_key)
238        {
239            for row in &mut aggregate_rows {
240                if !row.contains_key(&plan.foreign_key) {
241                    if let Some(value) = row.remove(&foreign_key_column) {
242                        row.insert(plan.foreign_key.clone(), value);
243                    }
244                }
245            }
246        }
247        attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
248        Ok(())
249    }
250
251    fn build_relation_plans(
252        &self,
253        entity: &str,
254        loads: &[String],
255    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
256        let descriptor = self.repository.metadata.context.require_entity(entity)?;
257        let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
258        for load in loads {
259            if let Some((head, tail)) = load.split_once('.') {
260                grouped
261                    .entry(head.to_owned())
262                    .or_default()
263                    .push(tail.to_owned());
264            } else {
265                grouped.entry(load.clone()).or_default();
266            }
267        }
268
269        grouped
270            .into_iter()
271            .map(|(name, child_loads)| {
272                let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
273                    RuntimeError::MissingRelation {
274                        entity: entity.to_owned(),
275                        relation: name.clone(),
276                    }
277                })?;
278                let child_repo = self.scoped_repository(relation.target_entity.clone());
279                let children =
280                    child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
281                Ok(RelationLoadPlan {
282                    parent_entity: entity.to_owned(),
283                    relation_name: relation.name.clone(),
284                    path: relation.name.clone(),
285                    target_entity: relation.target_entity.clone(),
286                    local_key: relation.local_key.clone(),
287                    foreign_key: relation.foreign_key.clone(),
288                    many: relation.many,
289                    query: None,
290                    children,
291                })
292            })
293            .collect()
294    }
295
296    fn build_relation_plans_from_loads(
297        &self,
298        entity: &str,
299        loads: &[RelationLoad],
300    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
301        let descriptor = self.repository.metadata.context.require_entity(entity)?;
302        loads
303            .iter()
304            .map(|load| {
305                let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
306                    RuntimeError::MissingRelation {
307                        entity: entity.to_owned(),
308                        relation: load.name.clone(),
309                    }
310                })?;
311                let relation_query = load.query.as_deref().cloned();
312                let child_loads = relation_query
313                    .as_ref()
314                    .map(|query| query.relations.as_slice())
315                    .unwrap_or_default();
316                let child_repo = self.scoped_repository(relation.target_entity.clone());
317                let children = child_repo
318                    .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
319                Ok(RelationLoadPlan {
320                    parent_entity: entity.to_owned(),
321                    relation_name: relation.name.clone(),
322                    path: relation.name.clone(),
323                    target_entity: relation.target_entity.clone(),
324                    local_key: relation.local_key.clone(),
325                    foreign_key: relation.foreign_key.clone(),
326                    many: relation.many,
327                    query: relation_query,
328                    children,
329                })
330            })
331            .collect()
332    }
333    fn enhance_plan(
334        &self,
335        parent_rows: &mut [Record],
336        plan: &RelationLoadPlan,
337    ) -> Result<(), RepositoryError<E::Error>> {
338        let child_repo = self.scoped_repository(plan.target_entity.clone());
339        let query = self.query_for_plan(plan, parent_rows);
340        let child_rows = child_repo.fetch_all(&query)?;
341        self.attach_relation_rows(parent_rows, plan, child_rows);
342
343        if !plan.children.is_empty() {
344            for parent in parent_rows.iter_mut() {
345                match parent.get_mut(&plan.relation_name) {
346                    Some(Value::Object(child)) => {
347                        child_repo.enhance_child_record(child, &plan.children)?;
348                    }
349                    Some(Value::List(values)) => {
350                        for value in values.iter_mut() {
351                            if let Value::Object(child) = value {
352                                child_repo.enhance_child_record(child, &plan.children)?;
353                            }
354                        }
355                    }
356                    _ => {}
357                }
358            }
359        }
360        Ok(())
361    }
362
363    fn enhance_child_record(
364        &self,
365        child: &mut Record,
366        plans: &[RelationLoadPlan],
367    ) -> Result<(), RepositoryError<E::Error>> {
368        for plan in plans {
369            self.enhance_plan(slice::from_mut(child), plan)?;
370        }
371        Ok(())
372    }
373
374    fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
375        let ids = parent_rows
376            .iter()
377            .filter_map(|row| row.get(&plan.local_key).cloned())
378            .collect::<Vec<_>>();
379
380        let mut query = plan
381            .query
382            .clone()
383            .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
384        query.entity = plan.target_entity.clone();
385        ensure_projection(&mut query, &plan.foreign_key);
386        for child in &plan.children {
387            ensure_projection(&mut query, &child.local_key);
388        }
389        if !ids.is_empty() {
390            query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
391        }
392        query
393    }
394
395    fn attach_relation_rows(
396        &self,
397        parent_rows: &mut [Record],
398        plan: &RelationLoadPlan,
399        child_rows: Vec<Record>,
400    ) {
401        let inverse_relation = self
402            .repository
403            .metadata
404            .context
405            .entity(&plan.target_entity)
406            .and_then(|descriptor| {
407                descriptor.relations.iter().find(|relation| {
408                    relation.target_entity == plan.parent_entity
409                        && relation.local_key == plan.foreign_key
410                        && relation.foreign_key == plan.local_key
411                })
412            })
413            .map(|relation| (relation.name.clone(), relation.many));
414
415        let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
416        for child in child_rows {
417            if let Some(key) = child.get(&plan.foreign_key) {
418                buckets
419                    .entry(graph_identity_key(key))
420                    .or_default()
421                    .push(child);
422            }
423        }
424
425        for parent in parent_rows.iter_mut() {
426            let Some(local_value) = parent.get(&plan.local_key) else {
427                continue;
428            };
429            let bucket_key = graph_identity_key(local_value);
430            let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
431            let related = if let Some((inverse_relation, inverse_many)) = &inverse_relation {
432                let mut parent_object = parent.clone();
433                parent_object.remove(&plan.relation_name);
434                related
435                    .into_iter()
436                    .map(|mut child| {
437                        if *inverse_many {
438                            let entry = child
439                                .entry(inverse_relation.clone())
440                                .or_insert_with(|| Value::List(Vec::new()));
441                            if let Value::List(list) = entry {
442                                list.push(Value::object(parent_object.clone()));
443                            }
444                        } else {
445                            child.insert(inverse_relation.clone(), Value::object(parent_object.clone()));
446                        }
447                        child
448                    })
449                    .collect::<Vec<_>>()
450            } else {
451                related
452            };
453            if plan.many {
454                parent.insert(
455                    plan.relation_name.clone(),
456                    Value::List(related.into_iter().map(Value::object).collect()),
457                );
458            } else {
459                let value = related
460                    .into_iter()
461                    .next()
462                    .map(Value::object)
463                    .unwrap_or(Value::Null);
464                parent.insert(plan.relation_name.clone(), value);
465            }
466        }
467    }
468}