Skip to main content

teaql_runtime/repository/
relation.rs

1use std::collections::BTreeMap;
2use std::slice;
3
4use teaql_core::{
5    Aggregate, Expr, ObjectGroupBy, Record, RelationAggregate, RelationLoad, SelectQuery, Value,
6};
7
8use crate::{RepositoryError, RuntimeError};
9
10use super::{RelationLoadPlan, ResolvedRepository, helpers::*};
11
12impl<'a, E> ResolvedRepository<'a, E>
13where
14    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
15{
16    pub fn relation_loads(&self) -> Vec<String> {
17        self.behavior()
18            .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
19            .unwrap_or_default()
20    }
21
22    pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
23        self.build_relation_plans(&self.entity, &self.relation_loads())
24    }
25
26    pub fn relation_query(
27        &self,
28        relation_name: &str,
29        parent_rows: &[Record],
30    ) -> Result<SelectQuery, RuntimeError> {
31        let plan = self
32            .relation_plans()?
33            .into_iter()
34            .find(|plan| plan.relation_name == relation_name)
35            .ok_or_else(|| RuntimeError::MissingRelation {
36                entity: self.entity.clone(),
37                relation: relation_name.to_owned(),
38            })?;
39        Ok(self.query_for_plan(&plan, parent_rows))
40    }
41
42    pub async fn enhance_relations(
43        &self,
44        parent_rows: &mut [Record],
45    ) -> Result<(), RepositoryError<E::Error>> {
46        let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
47        for plan in plans {
48            self.enhance_plan(parent_rows, &plan).await?;
49        }
50        Ok(())
51    }
52
53    pub async fn enhance_query_relations(
54        &self,
55        parent_rows: &mut [Record],
56        query: &SelectQuery,
57    ) -> Result<(), RepositoryError<E::Error>> {
58        let plans = self
59            .build_relation_plans_from_loads(&query.entity, &query.relations)
60            .map_err(RepositoryError::Runtime)?;
61        for plan in plans {
62            self.enhance_plan(parent_rows, &plan).await?;
63        }
64        Ok(())
65    }
66
67    pub fn enhance_relation_aggregates<'b>(
68        &'b self,
69        parent_rows: &'b mut [Record],
70        relation_aggregates: &'b [RelationAggregate],
71        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
72        parent_trace_chain: &'b [teaql_core::TraceNode],
73    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
74        Box::pin(async move {
75        for aggregate in relation_aggregates {
76            self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options, parent_trace_chain).await?;
77        }
78        Ok(())
79        })
80    }
81
82    pub fn enhance_object_group_bys<'b>(
83        &'b self,
84        rows: &'b mut [Record],
85        object_group_bys: &'b [ObjectGroupBy],
86        parent_trace_chain: &'b [teaql_core::TraceNode],
87    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
88        Box::pin(async move {
89        for group_by in object_group_bys {
90            let ids = rows
91                .iter()
92                .filter_map(|row| row.get(&group_by.storage_field).cloned())
93                .collect::<Vec<_>>();
94            if ids.is_empty() {
95                continue;
96            }
97            let mut query = group_by.query.clone();
98            ensure_projection(&mut query, "id");
99            query = query.and_filter(Expr::in_list("id", ids));
100            let object_rows = self
101                .scoped_repository(query.entity.clone())
102                .with_trace_context(parent_trace_chain.to_vec())
103                .fetch_all(&query).await?
104                .into_iter()
105                .filter_map(|row| {
106                    row.get("id")
107                        .cloned()
108                        .map(|id| (graph_identity_key(&id), row))
109                })
110                .collect::<BTreeMap<_, _>>();
111            for row in rows.iter_mut() {
112                if let Some(key) = row.get(&group_by.storage_field).map(graph_identity_key) {
113                    let value = object_rows
114                        .get(&key)
115                        .cloned()
116                        .map(Value::object)
117                        .unwrap_or(Value::Null);
118                    row.insert(group_by.property_name.clone(), value);
119                }
120            }
121        }
122        Ok(())
123        })
124    }
125
126    pub fn enhance_child_queries<'b>(
127        &'b self,
128        rows: &'b mut [Record],
129        child_queries: &'b [SelectQuery],
130        parent_trace_chain: &'b [teaql_core::TraceNode],
131    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
132        Box::pin(async move {
133        for child_query in child_queries {
134            let ids = rows
135                .iter()
136                .filter_map(|row| row.get("id").cloned())
137                .collect::<Vec<_>>();
138            if ids.is_empty() {
139                continue;
140            }
141            let mut query = child_query.clone();
142            ensure_projection(&mut query, "id");
143            query = query.and_filter(Expr::in_list("id", ids));
144            let child_rows = self
145                .scoped_repository(query.entity.clone())
146                .with_trace_context(parent_trace_chain.to_vec())
147                .fetch_all(&query).await?
148                .into_iter()
149                .filter_map(|row| {
150                    row.get("id")
151                        .cloned()
152                        .map(|id| (graph_identity_key(&id), row))
153                })
154                .collect::<BTreeMap<_, _>>();
155            for row in rows.iter_mut() {
156                if let Some(key) = row.get("id").map(graph_identity_key) {
157                    if let Some(child) = child_rows.get(&key) {
158                        row.extend(child.clone());
159                    }
160                }
161            }
162        }
163        Ok(())
164        })
165    }
166
167    async fn enhance_relation_aggregate(
168        &self,
169        parent_rows: &mut [Record],
170        aggregate: &RelationAggregate,
171        parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
172        parent_trace_chain: &[teaql_core::TraceNode],
173    ) -> Result<(), RepositoryError<E::Error>> {
174        let plan = self
175            .build_relation_plans_from_loads(
176                &self.entity,
177                &[RelationLoad::with_query(
178                    aggregate.relation_name.clone(),
179                    aggregate.query.clone(),
180                )],
181            )
182            .map_err(RepositoryError::Runtime)?
183            .into_iter()
184            .next()
185            .ok_or_else(|| {
186                RepositoryError::Runtime(RuntimeError::MissingRelation {
187                    entity: self.entity.clone(),
188                    relation: aggregate.relation_name.clone(),
189                })
190            })?;
191
192        let ids = parent_rows
193            .iter()
194            .filter_map(|row| row.get(&plan.local_key).cloned())
195            .collect::<Vec<_>>();
196        if ids.is_empty() {
197            attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
198            return Ok(());
199        }
200
201        let child_repo = self.scoped_repository(plan.target_entity.clone());
202        let mut query = aggregate.query.clone();
203        query.entity = plan.target_entity.clone();
204        if query.aggregation_cache.is_none() {
205            if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
206                query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
207                    options.propagate_cache_expired_millis,
208                ));
209            }
210        }
211        query.projection.clear();
212        query.expr_projection.clear();
213        query.order_by.clear();
214        query.slice = None;
215        query.relations.clear();
216        if query.aggregates.is_empty() {
217            let alias = if aggregate.single_result {
218                aggregate.alias.clone()
219            } else {
220                "count".to_owned()
221            };
222            query = query.aggregate(Aggregate::count(alias));
223        }
224        if !query
225            .group_by
226            .iter()
227            .any(|field| field == &plan.foreign_key)
228        {
229            query = query.group_by(plan.foreign_key.clone());
230        }
231        query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
232
233        let mut chain = parent_trace_chain.to_vec();
234        chain.push(teaql_core::TraceNode {
235            entity_type: query.entity.clone(),
236            entity_id: None,
237            comment: aggregate.alias.clone(),
238        });
239
240        let mut aggregate_rows = child_repo.with_trace_context(chain).fetch_all(&query).await?;
241        let foreign_key_column = self
242            .repository
243            .metadata
244            .context
245            .entity(&plan.target_entity)
246            .and_then(|descriptor| {
247                descriptor
248                    .properties
249                    .iter()
250                    .find(|property| property.name == plan.foreign_key)
251                    .map(|property| property.column_name.clone())
252            });
253        if let Some(foreign_key_column) =
254            foreign_key_column.filter(|column| column != &plan.foreign_key)
255        {
256            for row in &mut aggregate_rows {
257                if !row.contains_key(&plan.foreign_key) {
258                    if let Some(value) = row.remove(&foreign_key_column) {
259                        row.insert(plan.foreign_key.clone(), value);
260                    }
261                }
262            }
263        }
264        attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
265        Ok(())
266    }
267
268    fn build_relation_plans(
269        &self,
270        entity: &str,
271        loads: &[String],
272    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
273        let descriptor = self.repository.metadata.context.require_entity(entity)?;
274        let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
275        for load in loads {
276            if let Some((head, tail)) = load.split_once('.') {
277                grouped
278                    .entry(head.to_owned())
279                    .or_default()
280                    .push(tail.to_owned());
281            } else {
282                grouped.entry(load.clone()).or_default();
283            }
284        }
285
286        grouped
287            .into_iter()
288            .map(|(name, child_loads)| {
289                let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
290                    RuntimeError::MissingRelation {
291                        entity: entity.to_owned(),
292                        relation: name.clone(),
293                    }
294                })?;
295                let child_repo = self.scoped_repository(relation.target_entity.clone());
296                let children =
297                    child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
298                Ok(RelationLoadPlan {
299                    parent_entity: entity.to_owned(),
300                    relation_name: relation.name.clone(),
301                    path: relation.name.clone(),
302                    target_entity: relation.target_entity.clone(),
303                    local_key: relation.local_key.clone(),
304                    foreign_key: relation.foreign_key.clone(),
305                    many: relation.many,
306                    query: None,
307                    children,
308                })
309            })
310            .collect()
311    }
312
313    fn build_relation_plans_from_loads(
314        &self,
315        entity: &str,
316        loads: &[RelationLoad],
317    ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
318        let descriptor = self.repository.metadata.context.require_entity(entity)?;
319        loads
320            .iter()
321            .map(|load| {
322                let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
323                    RuntimeError::MissingRelation {
324                        entity: entity.to_owned(),
325                        relation: load.name.clone(),
326                    }
327                })?;
328                let relation_query = load.query.as_deref().cloned();
329                let child_loads = relation_query
330                    .as_ref()
331                    .map(|query| query.relations.as_slice())
332                    .unwrap_or_default();
333                let child_repo = self.scoped_repository(relation.target_entity.clone());
334                let children = child_repo
335                    .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
336                Ok(RelationLoadPlan {
337                    parent_entity: entity.to_owned(),
338                    relation_name: relation.name.clone(),
339                    path: relation.name.clone(),
340                    target_entity: relation.target_entity.clone(),
341                    local_key: relation.local_key.clone(),
342                    foreign_key: relation.foreign_key.clone(),
343                    many: relation.many,
344                    query: relation_query,
345                    children,
346                })
347            })
348            .collect()
349    }
350    fn enhance_plan<'b>(
351        &'b self,
352        parent_rows: &'b mut [Record],
353        plan: &'b RelationLoadPlan,
354    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
355        Box::pin(async move {
356            let child_repo = self.scoped_repository(plan.target_entity.clone());
357            let query = self.query_for_plan(plan, parent_rows);
358            let child_rows = child_repo.fetch_all(&query).await?;
359            self.attach_relation_rows(parent_rows, plan, child_rows);
360
361            if !plan.children.is_empty() {
362                for parent in parent_rows.iter_mut() {
363                    match parent.get_mut(&plan.relation_name) {
364                        Some(Value::Object(child)) => {
365                            child_repo.enhance_child_record(child, &plan.children).await?;
366                        }
367                        Some(Value::List(values)) => {
368                            for value in values.iter_mut() {
369                                if let Value::Object(child) = value {
370                                    child_repo.enhance_child_record(child, &plan.children).await?;
371                                }
372                            }
373                        }
374                        _ => {}
375                    }
376                }
377            }
378            Ok(())
379        })
380    }
381
382    fn enhance_child_record<'b>(
383        &'b self,
384        child: &'b mut Record,
385        plans: &'b [RelationLoadPlan],
386    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + 'b>> {
387        Box::pin(async move {
388            for plan in plans {
389                self.enhance_plan(slice::from_mut(child), plan).await?;
390            }
391            Ok(())
392        })
393    }
394
395    fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
396        let ids = parent_rows
397            .iter()
398            .filter_map(|row| row.get(&plan.local_key).cloned())
399            .collect::<Vec<_>>();
400
401        let mut query = plan
402            .query
403            .clone()
404            .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
405        query.entity = plan.target_entity.clone();
406        ensure_projection(&mut query, &plan.foreign_key);
407        for child in &plan.children {
408            ensure_projection(&mut query, &child.local_key);
409        }
410        if !ids.is_empty() {
411            query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
412        }
413        query
414    }
415
416    fn attach_relation_rows(
417        &self,
418        parent_rows: &mut [Record],
419        plan: &RelationLoadPlan,
420        child_rows: Vec<Record>,
421    ) {
422        let inverse_relation = self
423            .repository
424            .metadata
425            .context
426            .entity(&plan.target_entity)
427            .and_then(|descriptor| {
428                descriptor.relations.iter().find(|relation| {
429                    relation.target_entity == plan.parent_entity
430                        && relation.local_key == plan.foreign_key
431                        && relation.foreign_key == plan.local_key
432                })
433            })
434            .map(|relation| (relation.name.clone(), relation.many));
435
436        let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
437        for child in child_rows {
438            if let Some(key) = child.get(&plan.foreign_key) {
439                buckets
440                    .entry(graph_identity_key(key))
441                    .or_default()
442                    .push(child);
443            }
444        }
445
446        for parent in parent_rows.iter_mut() {
447            let Some(local_value) = parent.get(&plan.local_key) else {
448                continue;
449            };
450            let bucket_key = graph_identity_key(local_value);
451            let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
452            let related = if let Some((inverse_relation, inverse_many)) = &inverse_relation {
453                let mut parent_object = parent.clone();
454                parent_object.remove(&plan.relation_name);
455                related
456                    .into_iter()
457                    .map(|mut child| {
458                        if *inverse_many {
459                            let entry = child
460                                .entry(inverse_relation.clone())
461                                .or_insert_with(|| Value::List(Vec::new()));
462                            if let Value::List(list) = entry {
463                                list.push(Value::object(parent_object.clone()));
464                            }
465                        } else {
466                            child.insert(
467                                inverse_relation.clone(),
468                                Value::object(parent_object.clone()),
469                            );
470                        }
471                        child
472                    })
473                    .collect::<Vec<_>>()
474            } else {
475                related
476            };
477            if plan.many {
478                parent.insert(
479                    plan.relation_name.clone(),
480                    Value::List(related.into_iter().map(Value::object).collect()),
481                );
482            } else {
483                let value = related
484                    .into_iter()
485                    .next()
486                    .map(Value::object)
487                    .unwrap_or(Value::Null);
488                parent.insert(plan.relation_name.clone(), value);
489            }
490        }
491    }
492}