1use crate::executor::BoxFuture;
2use crate::load::{LoadChain, NoLoad};
3use crate::rel::{ManyToManyThrough, RelationInfo};
4use crate::{Error, Expr, ExprNode};
5use crate::{Executor, GetRelation, ModelValue, Select, SelectExt, SetRelation, Value};
6
7pub trait RunLoads<Out> {
8 fn run<'e, E>(&'e self, ex: &'e E, rows: &'e mut [Out]) -> BoxFuture<'e, Result<(), Error>>
9 where
10 E: Executor + Send + Sync + 'e,
11 Out: Send + 'e;
12}
13
14impl<Out> RunLoads<Out> for NoLoad {
15 fn run<'e, E>(&'e self, _ex: &'e E, _rows: &'e mut [Out]) -> BoxFuture<'e, Result<(), Error>>
16 where
17 E: Executor + Send + Sync + 'e,
18 Out: Send + 'e,
19 {
20 Box::pin(async { Ok(()) })
21 }
22}
23
24impl<Out, Prev, L> RunLoads<Out> for LoadChain<Prev, L>
25where
26 Prev: RunLoads<Out> + Sync,
27 L: RunLoad<Out> + Sync,
28{
29 fn run<'e, E>(&'e self, ex: &'e E, rows: &'e mut [Out]) -> BoxFuture<'e, Result<(), Error>>
30 where
31 E: Executor + Send + Sync + 'e,
32 Out: Send + 'e,
33 {
34 Box::pin(async move {
35 self.prev.run(ex, rows).await?;
36 self.load.run(ex, rows).await
37 })
38 }
39}
40
41pub trait RunLoad<Out> {
42 fn run<'e, E>(&'e self, ex: &'e E, rows: &'e mut [Out]) -> BoxFuture<'e, Result<(), Error>>
43 where
44 E: Executor + Send + Sync + 'e,
45 Out: Send + 'e;
46}
47
48pub fn load_selectin_has_many<'e, E, Out, Rel, ChildOut, Nested>(
49 ex: &'e E,
50 rows: &'e mut [Out],
51 rel: Rel,
52 nested: &'e Nested,
53) -> BoxFuture<'e, Result<(), Error>>
54where
55 E: Executor + Send + Sync + 'e,
56 Rel: RelationInfo + Clone + Send + 'e,
57 Out: ModelValue + SetRelation<Rel, Vec<ChildOut>> + Send,
58 ChildOut: ModelValue + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
59 Nested: RunLoads<ChildOut> + Sync,
60{
61 Box::pin(async move {
62 if rows.is_empty() {
63 return Ok(());
64 }
65
66 let relation = rel.relation();
67 let mut keys: Vec<Value> = Vec::new();
68 for row in rows.iter() {
69 if let Some(value) = row.column_value(relation.parent_key) {
70 if value == Value::Null {
71 continue;
72 }
73 if !keys.iter().any(|existing| existing == &value) {
74 keys.push(value);
75 }
76 }
77 }
78
79 if keys.is_empty() {
80 return Ok(());
81 }
82
83 let filter = Expr::new(ExprNode::In {
84 expr: Box::new(ExprNode::Column(relation.child_key)),
85 values: keys.clone(),
86 });
87
88 let query: Select<ChildOut> = Select::new(relation.child).filter(filter);
89 let mut children = query.all(ex).await?;
90 nested.run(ex, &mut children).await?;
91
92 let mut groups: Vec<(Value, Vec<ChildOut>)> = Vec::new();
93 for child in children {
94 let Some(key) = child.column_value(relation.child_key) else {
95 return Err(Error::RelationMismatch);
96 };
97 if key == Value::Null {
98 continue;
99 }
100 match groups.iter_mut().find(|(k, _)| *k == key) {
101 Some((_, items)) => items.push(child),
102 None => groups.push((key, vec![child])),
103 }
104 }
105
106 for row in rows.iter_mut() {
107 let Some(key) = row.column_value(relation.parent_key) else {
108 return Err(Error::RelationMismatch);
109 };
110 if key == Value::Null {
111 row.set_relation(rel.clone(), Vec::new())?;
112 continue;
113 }
114 let mut value = Vec::new();
115 if let Some((_, items)) = groups.iter_mut().find(|(k, _)| *k == key) {
116 value = std::mem::take(items);
117 }
118 row.set_relation(rel.clone(), value)?;
119 }
120
121 Ok(())
122 })
123}
124
125pub fn load_selectin_belongs_to<'e, E, Out, Rel, ParentOut, Nested>(
126 ex: &'e E,
127 rows: &'e mut [Out],
128 rel: Rel,
129 nested: &'e Nested,
130) -> BoxFuture<'e, Result<(), Error>>
131where
132 E: Executor + Send + Sync + 'e,
133 Rel: RelationInfo + Clone + Send + 'e,
134 Out: ModelValue + SetRelation<Rel, Option<ParentOut>> + Send,
135 ParentOut: ModelValue + Clone + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
136 Nested: RunLoads<ParentOut> + Sync,
137{
138 Box::pin(async move {
139 if rows.is_empty() {
140 return Ok(());
141 }
142
143 let relation = rel.relation();
144 let mut keys: Vec<Value> = Vec::new();
145 for row in rows.iter() {
146 if let Some(value) = row.column_value(relation.child_key) {
147 if value == Value::Null {
148 continue;
149 }
150 if !keys.iter().any(|existing| existing == &value) {
151 keys.push(value);
152 }
153 }
154 }
155
156 if keys.is_empty() {
157 return Ok(());
158 }
159
160 let filter = Expr::new(ExprNode::In {
161 expr: Box::new(ExprNode::Column(relation.parent_key)),
162 values: keys.clone(),
163 });
164
165 let query: Select<ParentOut> = Select::new(relation.parent).filter(filter);
166 let mut parents = query.all(ex).await?;
167 nested.run(ex, &mut parents).await?;
168
169 for row in rows.iter_mut() {
170 let key = row.column_value(relation.child_key);
171 let value = key.and_then(|key| {
172 if key == Value::Null {
173 return None;
174 }
175 parents
176 .iter()
177 .find(|parent| parent.column_value(relation.parent_key) == Some(key.clone()))
178 .cloned()
179 });
180 row.set_relation(rel.clone(), value)?;
181 }
182
183 Ok(())
184 })
185}
186
187pub fn load_joined_has_many<'e, E, Out, Rel, ChildOut, Nested>(
188 ex: &'e E,
189 rows: &'e mut [Out],
190 rel: Rel,
191 nested: &'e Nested,
192) -> BoxFuture<'e, Result<(), Error>>
193where
194 E: Executor + Send + Sync + 'e,
195 Rel: RelationInfo + Clone + Send + 'e,
196 Out: GetRelation<Rel, Vec<ChildOut>> + Send,
197 ChildOut: ModelValue + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
198 Nested: RunLoads<ChildOut> + Sync,
199{
200 Box::pin(async move {
201 for row in rows.iter_mut() {
202 let Some(children) = row.get_relation_mut(rel.clone()) else {
203 return Err(Error::RelationMismatch);
204 };
205 if !children.is_empty() {
206 nested.run(ex, children.as_mut_slice()).await?;
207 }
208 }
209 Ok(())
210 })
211}
212
213pub fn load_selectin_many_to_many<'e, E, Out, Rel, Through, ChildOut, Nested>(
214 ex: &'e E,
215 rows: &'e mut [Out],
216 rel: Rel,
217 nested: &'e Nested,
218) -> BoxFuture<'e, Result<(), Error>>
219where
220 E: Executor + Send + Sync + 'e,
221 Rel: RelationInfo + ManyToManyThrough<Through = Through> + Clone + Send + 'e,
222 Out: ModelValue + SetRelation<Rel, Vec<ChildOut>> + Send,
223 Through: ModelValue + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
224 ChildOut: ModelValue + Clone + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
225 Nested: RunLoads<ChildOut> + Sync,
226{
227 Box::pin(async move {
228 if rows.is_empty() {
229 return Ok(());
230 }
231
232 let relation = rel.relation();
233 let join_parent_key = relation.join_parent_key.ok_or(Error::RelationMismatch)?;
234 let join_child_key = relation.join_child_key.ok_or(Error::RelationMismatch)?;
235 let join_table = relation.join_table.ok_or(Error::RelationMismatch)?;
236
237 let mut parent_keys: Vec<Value> = Vec::new();
238 for row in rows.iter() {
239 if let Some(value) = row.column_value(relation.parent_key) {
240 if value == Value::Null {
241 continue;
242 }
243 if !parent_keys.iter().any(|existing| existing == &value) {
244 parent_keys.push(value);
245 }
246 }
247 }
248
249 if parent_keys.is_empty() {
250 for row in rows.iter_mut() {
251 row.set_relation(rel.clone(), Vec::new())?;
252 }
253 return Ok(());
254 }
255
256 let join_filter = Expr::new(ExprNode::In {
257 expr: Box::new(ExprNode::Column(join_parent_key)),
258 values: parent_keys.clone(),
259 });
260 let join_query: Select<Through> = Select::new(join_table).filter(join_filter);
261 let join_rows = join_query.all(ex).await?;
262
263 let mut parent_to_child: Vec<(Value, Vec<Value>)> = Vec::new();
264 for row in join_rows {
265 let Some(parent_key) = row.column_value(join_parent_key) else {
266 return Err(Error::RelationMismatch);
267 };
268 let Some(child_key) = row.column_value(join_child_key) else {
269 return Err(Error::RelationMismatch);
270 };
271 if parent_key == Value::Null || child_key == Value::Null {
272 continue;
273 }
274 match parent_to_child.iter_mut().find(|(key, _)| *key == parent_key) {
275 Some((_, children)) => {
276 if !children.iter().any(|existing| existing == &child_key) {
277 children.push(child_key);
278 }
279 }
280 None => parent_to_child.push((parent_key, vec![child_key])),
281 }
282 }
283
284 let mut child_keys: Vec<Value> = Vec::new();
285 for (_, children) in parent_to_child.iter() {
286 for child_key in children {
287 if !child_keys.iter().any(|existing| existing == child_key) {
288 child_keys.push(child_key.clone());
289 }
290 }
291 }
292
293 if child_keys.is_empty() {
294 for row in rows.iter_mut() {
295 row.set_relation(rel.clone(), Vec::new())?;
296 }
297 return Ok(());
298 }
299
300 let child_filter = Expr::new(ExprNode::In {
301 expr: Box::new(ExprNode::Column(relation.child_key)),
302 values: child_keys.clone(),
303 });
304 let child_query: Select<ChildOut> = Select::new(relation.child).filter(child_filter);
305 let mut children = child_query.all(ex).await?;
306 nested.run(ex, &mut children).await?;
307
308 for row in rows.iter_mut() {
309 let Some(parent_key) = row.column_value(relation.parent_key) else {
310 return Err(Error::RelationMismatch);
311 };
312 if parent_key == Value::Null {
313 row.set_relation(rel.clone(), Vec::new())?;
314 continue;
315 }
316 let mut value = Vec::new();
317 if let Some((_, child_ids)) = parent_to_child.iter().find(|(key, _)| *key == parent_key) {
318 for child_id in child_ids {
319 if let Some(child) = children
320 .iter()
321 .find(|child| child.column_value(relation.child_key) == Some(child_id.clone()))
322 {
323 value.push(child.clone());
324 }
325 }
326 }
327 row.set_relation(rel.clone(), value)?;
328 }
329
330 Ok(())
331 })
332}
333
334pub fn load_joined_belongs_to<'e, E, Out, Rel, ParentOut, Nested>(
335 ex: &'e E,
336 rows: &'e mut [Out],
337 rel: Rel,
338 nested: &'e Nested,
339) -> BoxFuture<'e, Result<(), Error>>
340where
341 E: Executor + Send + Sync + 'e,
342 Rel: RelationInfo + Clone + Send + 'e,
343 Out: GetRelation<Rel, Option<ParentOut>> + Send,
344 ParentOut: ModelValue + Clone + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
345 Nested: RunLoads<ParentOut> + Sync,
346{
347 Box::pin(async move {
348 for row in rows.iter_mut() {
349 let Some(parent) = row.get_relation_mut(rel.clone()) else {
350 return Err(Error::RelationMismatch);
351 };
352 if let Some(parent) = parent.as_mut() {
353 nested.run(ex, std::slice::from_mut(parent)).await?;
354 }
355 }
356 Ok(())
357 })
358}
359
360pub fn load_joined_many_to_many<'e, E, Out, Rel, Through, ChildOut, Nested>(
361 ex: &'e E,
362 rows: &'e mut [Out],
363 rel: Rel,
364 nested: &'e Nested,
365) -> BoxFuture<'e, Result<(), Error>>
366where
367 E: Executor + Send + Sync + 'e,
368 Rel: RelationInfo + ManyToManyThrough<Through = Through> + Clone + Send + 'e,
369 Out: GetRelation<Rel, Vec<ChildOut>> + Send,
370 Through: ModelValue + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
371 ChildOut: ModelValue + Clone + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send + Unpin,
372 Nested: RunLoads<ChildOut> + Sync,
373{
374 Box::pin(async move {
375 for row in rows.iter_mut() {
376 let Some(children) = row.get_relation_mut(rel.clone()) else {
377 return Err(Error::RelationMismatch);
378 };
379 if !children.is_empty() {
380 nested.run(ex, children.as_mut_slice()).await?;
381 }
382 }
383 Ok(())
384 })
385}