teaql_runtime/repository/
relation.rs1use std::collections::BTreeMap;
2use std::slice;
3
4use teaql_core::{
5 Aggregate, Expr, ObjectGroupBy, Record, RelationAggregate, RelationLoad, SelectQuery, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{RepositoryError, RuntimeError};
10
11use super::{QueryExecutor, RelationLoadPlan, ResolvedRepository, helpers::*};
12
13impl<'a, D, E> ResolvedRepository<'a, D, E>
14where
15 D: SqlDialect,
16 E: QueryExecutor,
17{
18 pub fn relation_loads(&self) -> Vec<String> {
19 self.behavior()
20 .map(|behavior| behavior.relation_loads(self.repository.metadata.context))
21 .unwrap_or_default()
22 }
23
24 pub fn relation_plans(&self) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
25 self.build_relation_plans(&self.entity, &self.relation_loads())
26 }
27
28 pub fn relation_query(
29 &self,
30 relation_name: &str,
31 parent_rows: &[Record],
32 ) -> Result<SelectQuery, RuntimeError> {
33 let plan = self
34 .relation_plans()?
35 .into_iter()
36 .find(|plan| plan.relation_name == relation_name)
37 .ok_or_else(|| RuntimeError::MissingRelation {
38 entity: self.entity.clone(),
39 relation: relation_name.to_owned(),
40 })?;
41 Ok(self.query_for_plan(&plan, parent_rows))
42 }
43
44 pub fn enhance_relations(
45 &self,
46 parent_rows: &mut [Record],
47 ) -> Result<(), RepositoryError<E::Error>> {
48 let plans = self.relation_plans().map_err(RepositoryError::Runtime)?;
49 for plan in plans {
50 self.enhance_plan(parent_rows, &plan)?;
51 }
52 Ok(())
53 }
54
55 pub fn enhance_query_relations(
56 &self,
57 parent_rows: &mut [Record],
58 query: &SelectQuery,
59 ) -> Result<(), RepositoryError<E::Error>> {
60 let plans = self
61 .build_relation_plans_from_loads(&query.entity, &query.relations)
62 .map_err(RepositoryError::Runtime)?;
63 for plan in plans {
64 self.enhance_plan(parent_rows, &plan)?;
65 }
66 Ok(())
67 }
68
69 pub fn enhance_relation_aggregates(
70 &self,
71 parent_rows: &mut [Record],
72 relation_aggregates: &[RelationAggregate],
73 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
74 ) -> Result<(), RepositoryError<E::Error>> {
75 for aggregate in relation_aggregates {
76 self.enhance_relation_aggregate(parent_rows, aggregate, parent_cache_options)?;
77 }
78 Ok(())
79 }
80
81 pub fn enhance_object_group_bys(
82 &self,
83 rows: &mut [Record],
84 object_group_bys: &[ObjectGroupBy],
85 ) -> Result<(), RepositoryError<E::Error>> {
86 for group_by in object_group_bys {
87 let ids = rows
88 .iter()
89 .filter_map(|row| row.get(&group_by.storage_field).cloned())
90 .collect::<Vec<_>>();
91 if ids.is_empty() {
92 continue;
93 }
94 let mut query = group_by.query.clone();
95 ensure_projection(&mut query, "id");
96 query = query.and_filter(Expr::in_list("id", ids));
97 let object_rows = self
98 .scoped_repository(query.entity.clone())
99 .fetch_all(&query)?
100 .into_iter()
101 .filter_map(|row| {
102 row.get("id")
103 .cloned()
104 .map(|id| (relation_bucket_key(&id), row))
105 })
106 .collect::<BTreeMap<_, _>>();
107 for row in rows.iter_mut() {
108 if let Some(key) = row.get(&group_by.storage_field).map(relation_bucket_key) {
109 let value = object_rows
110 .get(&key)
111 .cloned()
112 .map(Value::object)
113 .unwrap_or(Value::Null);
114 row.insert(group_by.property_name.clone(), value);
115 }
116 }
117 }
118 Ok(())
119 }
120
121 pub fn enhance_child_queries(
122 &self,
123 rows: &mut [Record],
124 child_queries: &[SelectQuery],
125 ) -> Result<(), RepositoryError<E::Error>> {
126 for child_query in child_queries {
127 let ids = rows
128 .iter()
129 .filter_map(|row| row.get("id").cloned())
130 .collect::<Vec<_>>();
131 if ids.is_empty() {
132 continue;
133 }
134 let mut query = child_query.clone();
135 ensure_projection(&mut query, "id");
136 query = query.and_filter(Expr::in_list("id", ids));
137 let child_rows = self
138 .scoped_repository(query.entity.clone())
139 .fetch_all(&query)?
140 .into_iter()
141 .filter_map(|row| {
142 row.get("id")
143 .cloned()
144 .map(|id| (relation_bucket_key(&id), row))
145 })
146 .collect::<BTreeMap<_, _>>();
147 for row in rows.iter_mut() {
148 if let Some(key) = row.get("id").map(relation_bucket_key) {
149 if let Some(child) = child_rows.get(&key) {
150 row.extend(child.clone());
151 }
152 }
153 }
154 }
155 Ok(())
156 }
157
158 fn enhance_relation_aggregate(
159 &self,
160 parent_rows: &mut [Record],
161 aggregate: &RelationAggregate,
162 parent_cache_options: Option<teaql_core::AggregationCacheOptions>,
163 ) -> Result<(), RepositoryError<E::Error>> {
164 let plan = self
165 .build_relation_plans_from_loads(
166 &self.entity,
167 &[RelationLoad::with_query(
168 aggregate.relation_name.clone(),
169 aggregate.query.clone(),
170 )],
171 )
172 .map_err(RepositoryError::Runtime)?
173 .into_iter()
174 .next()
175 .ok_or_else(|| {
176 RepositoryError::Runtime(RuntimeError::MissingRelation {
177 entity: self.entity.clone(),
178 relation: aggregate.relation_name.clone(),
179 })
180 })?;
181
182 let ids = parent_rows
183 .iter()
184 .filter_map(|row| row.get(&plan.local_key).cloned())
185 .collect::<Vec<_>>();
186 if ids.is_empty() {
187 attach_empty_relation_aggregate(parent_rows, &aggregate.alias, aggregate.single_result);
188 return Ok(());
189 }
190
191 let child_repo = self.scoped_repository(plan.target_entity.clone());
192 let mut query = aggregate.query.clone();
193 query.entity = plan.target_entity.clone();
194 if query.aggregation_cache.is_none() {
195 if let Some(options) = parent_cache_options.filter(|options| options.propagate) {
196 query.aggregation_cache = Some(teaql_core::AggregationCacheOptions::enabled(
197 options.propagate_cache_expired_millis,
198 ));
199 }
200 }
201 query.projection.clear();
202 query.expr_projection.clear();
203 query.order_by.clear();
204 query.slice = None;
205 query.relations.clear();
206 if query.aggregates.is_empty() {
207 let alias = if aggregate.single_result {
208 aggregate.alias.clone()
209 } else {
210 "count".to_owned()
211 };
212 query = query.aggregate(Aggregate::count(alias));
213 }
214 if !query
215 .group_by
216 .iter()
217 .any(|field| field == &plan.foreign_key)
218 {
219 query = query.group_by(plan.foreign_key.clone());
220 }
221 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
222
223 let aggregate_rows = child_repo.fetch_all(&query)?;
224 attach_relation_aggregate_rows(parent_rows, &plan, aggregate, aggregate_rows);
225 Ok(())
226 }
227
228 fn build_relation_plans(
229 &self,
230 entity: &str,
231 loads: &[String],
232 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
233 let descriptor = self.repository.metadata.context.require_entity(entity)?;
234 let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
235 for load in loads {
236 if let Some((head, tail)) = load.split_once('.') {
237 grouped
238 .entry(head.to_owned())
239 .or_default()
240 .push(tail.to_owned());
241 } else {
242 grouped.entry(load.clone()).or_default();
243 }
244 }
245
246 grouped
247 .into_iter()
248 .map(|(name, child_loads)| {
249 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
250 RuntimeError::MissingRelation {
251 entity: entity.to_owned(),
252 relation: name.clone(),
253 }
254 })?;
255 let child_repo = self.scoped_repository(relation.target_entity.clone());
256 let children =
257 child_repo.build_relation_plans(&relation.target_entity, &child_loads)?;
258 Ok(RelationLoadPlan {
259 parent_entity: entity.to_owned(),
260 relation_name: relation.name.clone(),
261 path: relation.name.clone(),
262 target_entity: relation.target_entity.clone(),
263 local_key: relation.local_key.clone(),
264 foreign_key: relation.foreign_key.clone(),
265 many: relation.many,
266 query: None,
267 children,
268 })
269 })
270 .collect()
271 }
272
273 fn build_relation_plans_from_loads(
274 &self,
275 entity: &str,
276 loads: &[RelationLoad],
277 ) -> Result<Vec<RelationLoadPlan>, RuntimeError> {
278 let descriptor = self.repository.metadata.context.require_entity(entity)?;
279 loads
280 .iter()
281 .map(|load| {
282 let relation = descriptor.relation_by_name(&load.name).ok_or_else(|| {
283 RuntimeError::MissingRelation {
284 entity: entity.to_owned(),
285 relation: load.name.clone(),
286 }
287 })?;
288 let relation_query = load.query.as_deref().cloned();
289 let child_loads = relation_query
290 .as_ref()
291 .map(|query| query.relations.as_slice())
292 .unwrap_or_default();
293 let child_repo = self.scoped_repository(relation.target_entity.clone());
294 let children = child_repo
295 .build_relation_plans_from_loads(&relation.target_entity, child_loads)?;
296 Ok(RelationLoadPlan {
297 parent_entity: entity.to_owned(),
298 relation_name: relation.name.clone(),
299 path: relation.name.clone(),
300 target_entity: relation.target_entity.clone(),
301 local_key: relation.local_key.clone(),
302 foreign_key: relation.foreign_key.clone(),
303 many: relation.many,
304 query: relation_query,
305 children,
306 })
307 })
308 .collect()
309 }
310 fn enhance_plan(
311 &self,
312 parent_rows: &mut [Record],
313 plan: &RelationLoadPlan,
314 ) -> Result<(), RepositoryError<E::Error>> {
315 let child_repo = self.scoped_repository(plan.target_entity.clone());
316 let query = self.query_for_plan(plan, parent_rows);
317 let child_rows = child_repo.fetch_all(&query)?;
318 self.attach_relation_rows(parent_rows, plan, child_rows);
319
320 if !plan.children.is_empty() {
321 for parent in parent_rows.iter_mut() {
322 match parent.get_mut(&plan.relation_name) {
323 Some(Value::Object(child)) => {
324 child_repo.enhance_child_record(child, &plan.children)?;
325 }
326 Some(Value::List(values)) => {
327 for value in values.iter_mut() {
328 if let Value::Object(child) = value {
329 child_repo.enhance_child_record(child, &plan.children)?;
330 }
331 }
332 }
333 _ => {}
334 }
335 }
336 }
337 Ok(())
338 }
339
340 fn enhance_child_record(
341 &self,
342 child: &mut Record,
343 plans: &[RelationLoadPlan],
344 ) -> Result<(), RepositoryError<E::Error>> {
345 for plan in plans {
346 self.enhance_plan(slice::from_mut(child), plan)?;
347 }
348 Ok(())
349 }
350
351 fn query_for_plan(&self, plan: &RelationLoadPlan, parent_rows: &[Record]) -> SelectQuery {
352 let ids = parent_rows
353 .iter()
354 .filter_map(|row| row.get(&plan.local_key).cloned())
355 .collect::<Vec<_>>();
356
357 let mut query = plan
358 .query
359 .clone()
360 .unwrap_or_else(|| SelectQuery::new(plan.target_entity.clone()));
361 query.entity = plan.target_entity.clone();
362 ensure_projection(&mut query, &plan.foreign_key);
363 for child in &plan.children {
364 ensure_projection(&mut query, &child.local_key);
365 }
366 if !ids.is_empty() {
367 query = query.and_filter(Expr::in_list(plan.foreign_key.clone(), ids));
368 }
369 query
370 }
371
372 fn attach_relation_rows(
373 &self,
374 parent_rows: &mut [Record],
375 plan: &RelationLoadPlan,
376 child_rows: Vec<Record>,
377 ) {
378 let inverse_relation = self
379 .repository
380 .metadata
381 .context
382 .entity(&plan.target_entity)
383 .and_then(|descriptor| {
384 descriptor.relations.iter().find(|relation| {
385 relation.target_entity == plan.parent_entity
386 && relation.local_key == plan.foreign_key
387 && relation.foreign_key == plan.local_key
388 })
389 })
390 .map(|relation| relation.name.clone());
391
392 let mut buckets: BTreeMap<String, Vec<Record>> = BTreeMap::new();
393 for child in child_rows {
394 if let Some(key) = child.get(&plan.foreign_key) {
395 buckets
396 .entry(relation_bucket_key(key))
397 .or_default()
398 .push(child);
399 }
400 }
401
402 for parent in parent_rows.iter_mut() {
403 let Some(local_value) = parent.get(&plan.local_key) else {
404 continue;
405 };
406 let bucket_key = relation_bucket_key(local_value);
407 let related = buckets.get(&bucket_key).cloned().unwrap_or_default();
408 let related = if let Some(inverse_relation) = &inverse_relation {
409 let mut parent_object = parent.clone();
410 parent_object.remove(&plan.relation_name);
411 related
412 .into_iter()
413 .map(|mut child| {
414 child
415 .entry(inverse_relation.clone())
416 .or_insert_with(|| Value::object(parent_object.clone()));
417 child
418 })
419 .collect::<Vec<_>>()
420 } else {
421 related
422 };
423 if plan.many {
424 parent.insert(
425 plan.relation_name.clone(),
426 Value::List(related.into_iter().map(Value::object).collect()),
427 );
428 } else {
429 let value = related
430 .into_iter()
431 .next()
432 .map(Value::object)
433 .unwrap_or(Value::Null);
434 parent.insert(plan.relation_name.clone(), value);
435 }
436 }
437 }
438}