1use std::collections::HashSet;
2
3use crate::ast::{Condition, Expr, LogicalOp, Operator, Value};
4use crate::schema::RelationRegistry;
5
6use super::{FilterClause, NormalizedSelect};
7
/// Direction of the relation that links a parent table to the table being batch-fetched.
///
/// The planner picks the variant from which side of the relation is registered
/// in the `RelationRegistry`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NestedRelationKind {
    /// The relation is registered from the parent table to the related table,
    /// i.e. the parent row carries the foreign key.
    // NOTE(review): the name suggests each parent resolves to at most one
    // related row (an object) — confirm against the consumer of the plan.
    ForwardObject,
    /// The relation is registered from the related table back to the parent
    /// table, i.e. the related rows carry the foreign key.
    // NOTE(review): the name suggests a parent may resolve to many related
    // rows (an array) — confirm against the consumer of the plan.
    ReverseArray,
}
16
17#[derive(Debug, Clone, PartialEq)]
19pub struct NestedBatchPlan {
20 pub kind: NestedRelationKind,
21 pub parent_table: String,
22 pub related_table: String,
23 pub parent_key_column: String,
25 pub related_match_column: String,
27 pub query: NormalizedSelect,
29}
30
31impl NestedBatchPlan {
32 pub fn to_qail(&self) -> crate::ast::Qail {
34 self.query.to_qail()
35 }
36}
37
/// Failures that can occur while planning a nested batch fetch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchPlanError {
    /// Neither direction of the requested table pair is a registered relation.
    RelationNotFound {
        /// Parent table that was passed to the planner.
        parent_table: String,
        /// Related table that was passed to the planner.
        related_table: String,
    },
}

impl std::fmt::Display for BatchPlanError {
    /// Writes a human-readable description naming both tables involved.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pull the payload out first so the message is formatted in one place.
        let (parent, related) = match self {
            Self::RelationNotFound {
                parent_table,
                related_table,
            } => (parent_table, related_table),
        };
        write!(
            f,
            "no relation found between '{}' and '{}'",
            parent, related
        )
    }
}

impl std::error::Error for BatchPlanError {}
63
64pub fn plan_nested_batch_fetch(
68 relations: &RelationRegistry,
69 parent_table: &str,
70 related_table: &str,
71 parent_keys: Vec<Value>,
72) -> Result<Option<NestedBatchPlan>, BatchPlanError> {
73 let normalized_keys = canonicalize_non_null_values(parent_keys);
74 if normalized_keys.is_empty() {
75 return Ok(None);
76 }
77
78 let (kind, parent_key_column, related_match_column) =
79 if let Some((fk_col, ref_col)) = relations.get(parent_table, related_table) {
80 (
81 NestedRelationKind::ForwardObject,
82 fk_col.to_string(),
83 ref_col.to_string(),
84 )
85 } else if let Some((fk_col, ref_col)) = relations.get(related_table, parent_table) {
86 (
87 NestedRelationKind::ReverseArray,
88 ref_col.to_string(),
89 fk_col.to_string(),
90 )
91 } else {
92 return Err(BatchPlanError::RelationNotFound {
93 parent_table: parent_table.to_string(),
94 related_table: related_table.to_string(),
95 });
96 };
97
98 let query = NormalizedSelect {
99 table: related_table.to_string(),
100 columns: vec![Expr::Star],
101 joins: Vec::new(),
102 filters: vec![FilterClause {
103 logical_op: LogicalOp::And,
104 conditions: vec![Condition {
105 left: Expr::Named(related_match_column.clone()),
106 op: Operator::In,
107 value: Value::Array(normalized_keys),
108 is_array_unnest: false,
109 }],
110 }],
111 order_by: Vec::new(),
112 limit: None,
113 offset: None,
114 }
115 .cleaned();
116
117 Ok(Some(NestedBatchPlan {
118 kind,
119 parent_table: parent_table.to_string(),
120 related_table: related_table.to_string(),
121 parent_key_column,
122 related_match_column,
123 query,
124 }))
125}
126
127fn canonicalize_non_null_values(values: Vec<Value>) -> Vec<Value> {
128 let mut pairs: Vec<(String, Value)> = values
129 .into_iter()
130 .filter(|v| !is_null_like(v))
131 .map(|v| (value_signature(&v), v))
132 .collect();
133 pairs.sort_by(|a, b| a.0.cmp(&b.0));
134
135 let mut seen = HashSet::new();
136 let mut deduped = Vec::with_capacity(pairs.len());
137 for (sig, value) in pairs {
138 if seen.insert(sig) {
139 deduped.push(value);
140 }
141 }
142 deduped
143}
144
145fn is_null_like(value: &Value) -> bool {
146 matches!(value, Value::Null | Value::NullUuid)
147}
148
149fn value_signature(value: &Value) -> String {
150 format!("{}", value)
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{Action, Qail};
    use crate::optimizer::normalize_select;
    use uuid::Uuid;

    /// A relation registered parent -> related yields a `ForwardObject` plan
    /// whose key list has nulls removed, duplicates collapsed, and is sorted.
    #[test]
    fn forward_relation_builds_object_plan() {
        let mut registry = RelationRegistry::new();
        registry.register("orders", "user_id", "users", "id");

        // Unsorted input containing a null and a repeated "u2".
        let raw_keys = vec![
            Value::String("u2".to_string()),
            Value::Null,
            Value::String("u1".to_string()),
            Value::String("u2".to_string()),
        ];
        let plan = plan_nested_batch_fetch(&registry, "orders", "users", raw_keys)
            .expect("planning should succeed")
            .expect("keys are non-empty");

        assert_eq!(plan.kind, NestedRelationKind::ForwardObject);
        assert_eq!(plan.parent_key_column, "user_id");
        assert_eq!(plan.related_match_column, "id");
        assert_eq!(plan.related_table, "users");
        assert_eq!(plan.query.table, "users");

        let filter = plan.query.filters.first().expect("missing filter");
        assert_eq!(filter.conditions.len(), 1);

        let condition = &filter.conditions[0];
        assert_eq!(condition.left, Expr::Named("id".to_string()));
        assert_eq!(condition.op, Operator::In);
        assert_eq!(
            condition.value,
            Value::Array(vec![
                Value::String("u1".to_string()),
                Value::String("u2".to_string()),
            ])
        );
    }

    /// A relation registered related -> parent yields a `ReverseArray` plan
    /// with the column roles swapped.
    #[test]
    fn reverse_relation_builds_array_plan() {
        let mut registry = RelationRegistry::new();
        registry.register("posts", "user_id", "users", "id");

        let raw_keys = vec![Value::Int(2), Value::Int(1)];
        let plan = plan_nested_batch_fetch(&registry, "users", "posts", raw_keys)
            .expect("planning should succeed")
            .expect("keys are non-empty");

        assert_eq!(plan.kind, NestedRelationKind::ReverseArray);
        assert_eq!(plan.parent_key_column, "id");
        assert_eq!(plan.related_match_column, "user_id");
        assert_eq!(plan.related_table, "posts");

        let filter = plan.query.filters.first().expect("missing filter");
        let condition = &filter.conditions[0];
        assert_eq!(condition.left, Expr::Named("user_id".to_string()));
        assert_eq!(
            condition.value,
            Value::Array(vec![Value::Int(1), Value::Int(2)])
        );
    }

    /// With usable keys but no registered relation, planning fails with
    /// `RelationNotFound` naming both tables.
    #[test]
    fn missing_relation_returns_error() {
        let registry = RelationRegistry::new();

        let raw_keys = vec![Value::Int(1), Value::Int(2)];
        let err = plan_nested_batch_fetch(&registry, "users", "invoices", raw_keys)
            .expect_err("relation should be required");

        let expected = BatchPlanError::RelationNotFound {
            parent_table: "users".to_string(),
            related_table: "invoices".to_string(),
        };
        assert_eq!(err, expected);
    }

    /// When every key is null-like there is nothing to fetch, so no plan is
    /// produced.
    #[test]
    fn null_only_keys_skip_plan() {
        let mut registry = RelationRegistry::new();
        registry.register("posts", "user_id", "users", "id");

        let raw_keys = vec![Value::Null, Value::NullUuid];
        let plan = plan_nested_batch_fetch(&registry, "users", "posts", raw_keys)
            .expect("planning should succeed");

        assert!(plan.is_none());
    }

    /// The planned query converts to a `Qail` `Get` command that normalizes
    /// back to an equivalent shape.
    #[test]
    fn plan_query_roundtrips_through_qail() {
        let mut registry = RelationRegistry::new();
        registry.register("posts", "user_id", "users", "id");

        let raw_keys = vec![Value::Uuid(Uuid::nil()), Value::Uuid(Uuid::nil())];
        let plan = plan_nested_batch_fetch(&registry, "users", "posts", raw_keys)
            .expect("planning should succeed")
            .expect("keys are non-empty");

        let qail: Qail = plan.to_qail();
        assert_eq!(qail.action, Action::Get);

        let normalized = normalize_select(&qail).expect("planned query should normalize");
        assert!(plan.query.equivalent_shape(&normalized));
    }
}