Skip to main content

dbkit/
runtime.rs

1use crate::executor::BoxFuture;
2use crate::load::{LoadChain, NoLoad};
3use crate::rel::{ManyToManyThrough, RelationInfo};
4use crate::{Error, Expr, ExprNode};
5use crate::{Executor, GetRelation, ModelValue, Select, SelectExt, SetRelation, Value};
6
7pub trait RunLoads<Out> {
8    fn run<'e, E>(&'e self, ex: &'e E, rows: &'e mut [Out]) -> BoxFuture<'e, Result<(), Error>>
9    where
10        E: Executor + Send + Sync + 'e,
11        Out: Send + 'e;
12}
13
14impl<Out> RunLoads<Out> for NoLoad {
15    fn run<'e, E>(&'e self, _ex: &'e E, _rows: &'e mut [Out]) -> BoxFuture<'e, Result<(), Error>>
16    where
17        E: Executor + Send + Sync + 'e,
18        Out: Send + 'e,
19    {
20        Box::pin(async { Ok(()) })
21    }
22}
23
24impl<Out, Prev, L> RunLoads<Out> for LoadChain<Prev, L>
25where
26    Prev: RunLoads<Out> + Sync,
27    L: RunLoad<Out> + Sync,
28{
29    fn run<'e, E>(&'e self, ex: &'e E, rows: &'e mut [Out]) -> BoxFuture<'e, Result<(), Error>>
30    where
31        E: Executor + Send + Sync + 'e,
32        Out: Send + 'e,
33    {
34        Box::pin(async move {
35            self.prev.run(ex, rows).await?;
36            self.load.run(ex, rows).await
37        })
38    }
39}
40
41pub trait RunLoad<Out> {
42    fn run<'e, E>(&'e self, ex: &'e E, rows: &'e mut [Out]) -> BoxFuture<'e, Result<(), Error>>
43    where
44        E: Executor + Send + Sync + 'e,
45        Out: Send + 'e;
46}
47
48pub fn load_selectin_has_many<'e, E, Out, Rel, ChildOut, Nested>(
49    ex: &'e E,
50    rows: &'e mut [Out],
51    rel: Rel,
52    nested: &'e Nested,
53) -> BoxFuture<'e, Result<(), Error>>
54where
55    E: Executor + Send + Sync + 'e,
56    Rel: RelationInfo + Clone + Send + 'e,
57    Out: ModelValue + SetRelation<Rel, Vec<ChildOut>> + Send,
58    ChildOut: ModelValue + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
59    Nested: RunLoads<ChildOut> + Sync,
60{
61    Box::pin(async move {
62        if rows.is_empty() {
63            return Ok(());
64        }
65
66        let relation = rel.relation();
67        let mut keys: Vec<Value> = Vec::new();
68        for row in rows.iter() {
69            if let Some(value) = row.column_value(relation.parent_key) {
70                if value == Value::Null {
71                    continue;
72                }
73                if !keys.iter().any(|existing| existing == &value) {
74                    keys.push(value);
75                }
76            }
77        }
78
79        if keys.is_empty() {
80            return Ok(());
81        }
82
83        let filter = Expr::new(ExprNode::In {
84            expr: Box::new(ExprNode::Column(relation.child_key)),
85            values: keys.clone(),
86        });
87
88        let query: Select<ChildOut> = Select::new(relation.child).filter(filter);
89        let mut children = query.all(ex).await?;
90        nested.run(ex, &mut children).await?;
91
92        let mut groups: Vec<(Value, Vec<ChildOut>)> = Vec::new();
93        for child in children {
94            let Some(key) = child.column_value(relation.child_key) else {
95                return Err(Error::RelationMismatch);
96            };
97            if key == Value::Null {
98                continue;
99            }
100            match groups.iter_mut().find(|(k, _)| *k == key) {
101                Some((_, items)) => items.push(child),
102                None => groups.push((key, vec![child])),
103            }
104        }
105
106        for row in rows.iter_mut() {
107            let Some(key) = row.column_value(relation.parent_key) else {
108                return Err(Error::RelationMismatch);
109            };
110            if key == Value::Null {
111                row.set_relation(rel.clone(), Vec::new())?;
112                continue;
113            }
114            let mut value = Vec::new();
115            if let Some((_, items)) = groups.iter_mut().find(|(k, _)| *k == key) {
116                value = std::mem::take(items);
117            }
118            row.set_relation(rel.clone(), value)?;
119        }
120
121        Ok(())
122    })
123}
124
125pub fn load_selectin_belongs_to<'e, E, Out, Rel, ParentOut, Nested>(
126    ex: &'e E,
127    rows: &'e mut [Out],
128    rel: Rel,
129    nested: &'e Nested,
130) -> BoxFuture<'e, Result<(), Error>>
131where
132    E: Executor + Send + Sync + 'e,
133    Rel: RelationInfo + Clone + Send + 'e,
134    Out: ModelValue + SetRelation<Rel, Option<ParentOut>> + Send,
135    ParentOut: ModelValue + Clone + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
136    Nested: RunLoads<ParentOut> + Sync,
137{
138    Box::pin(async move {
139        if rows.is_empty() {
140            return Ok(());
141        }
142
143        let relation = rel.relation();
144        let mut keys: Vec<Value> = Vec::new();
145        for row in rows.iter() {
146            if let Some(value) = row.column_value(relation.child_key) {
147                if value == Value::Null {
148                    continue;
149                }
150                if !keys.iter().any(|existing| existing == &value) {
151                    keys.push(value);
152                }
153            }
154        }
155
156        if keys.is_empty() {
157            return Ok(());
158        }
159
160        let filter = Expr::new(ExprNode::In {
161            expr: Box::new(ExprNode::Column(relation.parent_key)),
162            values: keys.clone(),
163        });
164
165        let query: Select<ParentOut> = Select::new(relation.parent).filter(filter);
166        let mut parents = query.all(ex).await?;
167        nested.run(ex, &mut parents).await?;
168
169        for row in rows.iter_mut() {
170            let key = row.column_value(relation.child_key);
171            let value = key.and_then(|key| {
172                if key == Value::Null {
173                    return None;
174                }
175                parents
176                    .iter()
177                    .find(|parent| parent.column_value(relation.parent_key) == Some(key.clone()))
178                    .cloned()
179            });
180            row.set_relation(rel.clone(), value)?;
181        }
182
183        Ok(())
184    })
185}
186
187pub fn load_joined_has_many<'e, E, Out, Rel, ChildOut, Nested>(
188    ex: &'e E,
189    rows: &'e mut [Out],
190    rel: Rel,
191    nested: &'e Nested,
192) -> BoxFuture<'e, Result<(), Error>>
193where
194    E: Executor + Send + Sync + 'e,
195    Rel: RelationInfo + Clone + Send + 'e,
196    Out: GetRelation<Rel, Vec<ChildOut>> + Send,
197    ChildOut: ModelValue + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
198    Nested: RunLoads<ChildOut> + Sync,
199{
200    Box::pin(async move {
201        for row in rows.iter_mut() {
202            let Some(children) = row.get_relation_mut(rel.clone()) else {
203                return Err(Error::RelationMismatch);
204            };
205            if !children.is_empty() {
206                nested.run(ex, children.as_mut_slice()).await?;
207            }
208        }
209        Ok(())
210    })
211}
212
213pub fn load_selectin_many_to_many<'e, E, Out, Rel, Through, ChildOut, Nested>(
214    ex: &'e E,
215    rows: &'e mut [Out],
216    rel: Rel,
217    nested: &'e Nested,
218) -> BoxFuture<'e, Result<(), Error>>
219where
220    E: Executor + Send + Sync + 'e,
221    Rel: RelationInfo + ManyToManyThrough<Through = Through> + Clone + Send + 'e,
222    Out: ModelValue + SetRelation<Rel, Vec<ChildOut>> + Send,
223    Through: ModelValue + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
224    ChildOut: ModelValue + Clone + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
225    Nested: RunLoads<ChildOut> + Sync,
226{
227    Box::pin(async move {
228        if rows.is_empty() {
229            return Ok(());
230        }
231
232        let relation = rel.relation();
233        let join_parent_key = relation.join_parent_key.ok_or(Error::RelationMismatch)?;
234        let join_child_key = relation.join_child_key.ok_or(Error::RelationMismatch)?;
235        let join_table = relation.join_table.ok_or(Error::RelationMismatch)?;
236
237        let mut parent_keys: Vec<Value> = Vec::new();
238        for row in rows.iter() {
239            if let Some(value) = row.column_value(relation.parent_key) {
240                if value == Value::Null {
241                    continue;
242                }
243                if !parent_keys.iter().any(|existing| existing == &value) {
244                    parent_keys.push(value);
245                }
246            }
247        }
248
249        if parent_keys.is_empty() {
250            for row in rows.iter_mut() {
251                row.set_relation(rel.clone(), Vec::new())?;
252            }
253            return Ok(());
254        }
255
256        let join_filter = Expr::new(ExprNode::In {
257            expr: Box::new(ExprNode::Column(join_parent_key)),
258            values: parent_keys.clone(),
259        });
260        let join_query: Select<Through> = Select::new(join_table).filter(join_filter);
261        let join_rows = join_query.all(ex).await?;
262
263        let mut parent_to_child: Vec<(Value, Vec<Value>)> = Vec::new();
264        for row in join_rows {
265            let Some(parent_key) = row.column_value(join_parent_key) else {
266                return Err(Error::RelationMismatch);
267            };
268            let Some(child_key) = row.column_value(join_child_key) else {
269                return Err(Error::RelationMismatch);
270            };
271            if parent_key == Value::Null || child_key == Value::Null {
272                continue;
273            }
274            match parent_to_child.iter_mut().find(|(key, _)| *key == parent_key) {
275                Some((_, children)) => {
276                    if !children.iter().any(|existing| existing == &child_key) {
277                        children.push(child_key);
278                    }
279                }
280                None => parent_to_child.push((parent_key, vec![child_key])),
281            }
282        }
283
284        let mut child_keys: Vec<Value> = Vec::new();
285        for (_, children) in parent_to_child.iter() {
286            for child_key in children {
287                if !child_keys.iter().any(|existing| existing == child_key) {
288                    child_keys.push(child_key.clone());
289                }
290            }
291        }
292
293        if child_keys.is_empty() {
294            for row in rows.iter_mut() {
295                row.set_relation(rel.clone(), Vec::new())?;
296            }
297            return Ok(());
298        }
299
300        let child_filter = Expr::new(ExprNode::In {
301            expr: Box::new(ExprNode::Column(relation.child_key)),
302            values: child_keys.clone(),
303        });
304        let child_query: Select<ChildOut> = Select::new(relation.child).filter(child_filter);
305        let mut children = child_query.all(ex).await?;
306        nested.run(ex, &mut children).await?;
307
308        for row in rows.iter_mut() {
309            let Some(parent_key) = row.column_value(relation.parent_key) else {
310                return Err(Error::RelationMismatch);
311            };
312            if parent_key == Value::Null {
313                row.set_relation(rel.clone(), Vec::new())?;
314                continue;
315            }
316            let mut value = Vec::new();
317            if let Some((_, child_ids)) = parent_to_child.iter().find(|(key, _)| *key == parent_key) {
318                for child_id in child_ids {
319                    if let Some(child) = children
320                        .iter()
321                        .find(|child| child.column_value(relation.child_key) == Some(child_id.clone()))
322                    {
323                        value.push(child.clone());
324                    }
325                }
326            }
327            row.set_relation(rel.clone(), value)?;
328        }
329
330        Ok(())
331    })
332}
333
334pub fn load_joined_belongs_to<'e, E, Out, Rel, ParentOut, Nested>(
335    ex: &'e E,
336    rows: &'e mut [Out],
337    rel: Rel,
338    nested: &'e Nested,
339) -> BoxFuture<'e, Result<(), Error>>
340where
341    E: Executor + Send + Sync + 'e,
342    Rel: RelationInfo + Clone + Send + 'e,
343    Out: GetRelation<Rel, Option<ParentOut>> + Send,
344    ParentOut: ModelValue + Clone + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
345    Nested: RunLoads<ParentOut> + Sync,
346{
347    Box::pin(async move {
348        for row in rows.iter_mut() {
349            let Some(parent) = row.get_relation_mut(rel.clone()) else {
350                return Err(Error::RelationMismatch);
351            };
352            if let Some(parent) = parent.as_mut() {
353                nested.run(ex, std::slice::from_mut(parent)).await?;
354            }
355        }
356        Ok(())
357    })
358}
359
360pub fn load_joined_many_to_many<'e, E, Out, Rel, Through, ChildOut, Nested>(
361    ex: &'e E,
362    rows: &'e mut [Out],
363    rel: Rel,
364    nested: &'e Nested,
365) -> BoxFuture<'e, Result<(), Error>>
366where
367    E: Executor + Send + Sync + 'e,
368    Rel: RelationInfo + ManyToManyThrough<Through = Through> + Clone + Send + 'e,
369    Out: GetRelation<Rel, Vec<ChildOut>> + Send,
370    Through: ModelValue + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
371    ChildOut: ModelValue + Clone + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
372    Nested: RunLoads<ChildOut> + Sync,
373{
374    Box::pin(async move {
375        for row in rows.iter_mut() {
376            let Some(children) = row.get_relation_mut(rel.clone()) else {
377                return Err(Error::RelationMismatch);
378            };
379            if !children.is_empty() {
380                nested.run(ex, children.as_mut_slice()).await?;
381            }
382        }
383        Ok(())
384    })
385}