Skip to main content

reddb_server/storage/query/parser/
join.rs

1//! Join query parsing (FROM ... JOIN GRAPH/PATH/TABLE/VECTOR ...)
2
3use super::super::ast::{
4    FieldRef, GraphQuery, JoinCondition, JoinQuery, JoinType, Projection, QueryExpr, SelectItem,
5    TableQuery,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::storage::query::sql_lowering::{filter_to_expr, projection_to_select_item};
11impl<'a> Parser<'a> {
12    /// Parse FROM ... JOIN (GRAPH / PATH / TABLE / VECTOR / HYBRID) query
13    pub fn parse_from_query(&mut self) -> Result<QueryExpr, ParseError> {
14        self.expect(Token::From)?;
15
16        // Fase 1.7 unlock: `FROM (SELECT … FROM t) AS alias` — subquery
17        // in FROM position. Detect by peeking LParen before falling
18        // through to the legacy identifier-only parse_table_source.
19        // The subquery branch builds the outer TableQuery via
20        // `TableQuery::from_subquery` which sets `source` to
21        // `TableSource::Subquery` and marks `table` with a sentinel
22        // `__subq_<alias>` so code that still reads `table.as_str()`
23        // errors loudly instead of silently mis-resolving.
24        let mut table_query = if self.check(&Token::LParen) {
25            self.advance()?; // consume `(`
26                             // Only SELECT is allowed in a FROM subquery — reject
27                             // `FROM (MATCH … RETURN)` and other non-SELECT shapes.
28            if !self.check(&Token::Select) {
29                return Err(ParseError::new(
30                    "subquery in FROM must start with SELECT".to_string(),
31                    self.position(),
32                ));
33            }
34            let inner = self.parse_select_query()?;
35            self.expect(Token::RParen)?;
36            let alias = if self.consume(&Token::As)?
37                || (self.check(&Token::Ident("".into())) && !self.is_join_keyword())
38            {
39                Some(self.expect_ident()?)
40            } else {
41                None
42            };
43            TableQuery::from_subquery(inner, alias)
44        } else {
45            // Parse table name and alias
46            let table = self.parse_table_source()?;
47            let alias = if self.consume(&Token::As)?
48                || (self.check(&Token::Ident("".into())) && !self.is_join_keyword())
49            {
50                Some(self.expect_ident()?)
51            } else {
52                None
53            };
54            TableQuery {
55                table,
56                source: None,
57                alias,
58                select_items: Vec::new(),
59                columns: Vec::new(),
60                where_expr: None,
61                filter: None,
62                group_by_exprs: Vec::new(),
63                group_by: Vec::new(),
64                having_expr: None,
65                having: None,
66                order_by: Vec::new(),
67                limit: None,
68                limit_param: None,
69                offset: None,
70                offset_param: None,
71                expand: None,
72                as_of: None,
73                sessionize: None,
74            }
75        };
76
77        // Check for JOIN
78        if self.is_join_keyword() {
79            return self.parse_join_query(QueryExpr::Table(table_query));
80        }
81
82        // Parse optional WHERE clause
83        if self.consume(&Token::Where)? {
84            let filter = self.parse_filter()?;
85            table_query.where_expr = Some(filter_to_expr(&filter));
86            table_query.filter = Some(filter);
87        }
88
89        // Parse optional ORDER BY
90        if self.consume(&Token::Order)? {
91            self.expect(Token::By)?;
92            table_query.order_by = self.parse_order_by_list()?;
93        }
94
95        // Parse optional LIMIT/OFFSET
96        if self.consume(&Token::Limit)? {
97            table_query.limit = Some(self.parse_integer()? as u64);
98        }
99        if self.consume(&Token::Offset)? {
100            table_query.offset = Some(self.parse_integer()? as u64);
101        }
102
103        // Check for RETURN (shorthand for column selection)
104        if self.consume(&Token::Return)? {
105            let (select_items, columns) = self.parse_select_items_and_projections()?;
106            table_query.select_items = select_items;
107            table_query.columns = columns;
108        }
109
110        Ok(QueryExpr::Table(table_query))
111    }
112
113    /// Check if current token is a join keyword
114    pub fn is_join_keyword(&self) -> bool {
115        matches!(
116            self.peek(),
117            Token::Join | Token::Inner | Token::Left | Token::Right | Token::Full | Token::Cross
118        )
119    }
120
121    /// Parse JOIN query
122    pub(crate) fn parse_join_query(&mut self, left: QueryExpr) -> Result<QueryExpr, ParseError> {
123        // Parse join type
124        let join_type = if self.consume(&Token::Inner)? {
125            self.expect(Token::Join)?;
126            JoinType::Inner
127        } else if self.consume(&Token::Left)? {
128            self.consume(&Token::Outer)?;
129            self.expect(Token::Join)?;
130            JoinType::LeftOuter
131        } else if self.consume(&Token::Right)? {
132            self.consume(&Token::Outer)?;
133            self.expect(Token::Join)?;
134            JoinType::RightOuter
135        } else if self.consume(&Token::Full)? {
136            // `FULL JOIN` and `FULL OUTER JOIN` are aliases.
137            self.consume(&Token::Outer)?;
138            self.expect(Token::Join)?;
139            JoinType::FullOuter
140        } else if self.consume(&Token::Cross)? {
141            self.expect(Token::Join)?;
142            JoinType::Cross
143        } else {
144            self.expect(Token::Join)?;
145            JoinType::Inner
146        };
147
148        if self.consume(&Token::Graph)? {
149            return self.parse_graph_join_query(left, join_type);
150        }
151        if self.check(&Token::Path) {
152            return self.parse_path_join_query(left, join_type);
153        }
154        if self.check(&Token::Vector) {
155            return self.parse_vector_join_query(left, join_type);
156        }
157        if self.check(&Token::Hybrid) {
158            return self.parse_hybrid_join_query(left, join_type);
159        }
160
161        self.parse_table_join_query(left, join_type)
162    }
163
164    fn parse_graph_join_query(
165        &mut self,
166        left: QueryExpr,
167        join_type: JoinType,
168    ) -> Result<QueryExpr, ParseError> {
169        // Parse graph pattern
170        let pattern = self.parse_graph_pattern()?;
171        let alias = self.parse_join_rhs_alias()?;
172
173        // Parse ON condition
174        self.expect(Token::On)?;
175        let on = self.parse_graph_join_condition()?;
176        let (filter, order_by, limit, offset, return_items, return_) =
177            self.parse_join_post_clauses()?;
178        let graph_query = GraphQuery {
179            alias,
180            pattern,
181            filter: None,
182            return_: Vec::new(),
183            limit: None,
184        };
185
186        Ok(QueryExpr::Join(JoinQuery {
187            left: Box::new(left),
188            right: Box::new(QueryExpr::Graph(graph_query)),
189            join_type,
190            on,
191            filter,
192            order_by,
193            limit,
194            offset,
195            return_items,
196            return_,
197        }))
198    }
199
200    fn parse_table_join_query(
201        &mut self,
202        left: QueryExpr,
203        join_type: JoinType,
204    ) -> Result<QueryExpr, ParseError> {
205        let table = self.parse_table_source()?;
206        let alias = if self.consume(&Token::As)?
207            || (self.check(&Token::Ident("".into())) && !self.is_clause_keyword())
208        {
209            Some(self.expect_ident()?)
210        } else {
211            None
212        };
213
214        // CROSS JOIN has no ON clause — emit a sentinel JoinCondition
215        // that the runtime join loops treat as "always matches".
216        let on = if matches!(join_type, JoinType::Cross) {
217            cross_join_sentinel()
218        } else {
219            self.expect(Token::On)?;
220            self.parse_table_join_condition()?
221        };
222        let table_query = TableQuery {
223            table,
224            source: None,
225            alias,
226            select_items: Vec::new(),
227            columns: Vec::new(),
228            where_expr: None,
229            filter: None,
230            group_by_exprs: Vec::new(),
231            group_by: Vec::new(),
232            having_expr: None,
233            having: None,
234            order_by: Vec::new(),
235            limit: None,
236            limit_param: None,
237            offset: None,
238            offset_param: None,
239            expand: None,
240            as_of: None,
241            sessionize: None,
242        };
243
244        let mut expr = QueryExpr::Join(JoinQuery {
245            left: Box::new(left),
246            right: Box::new(QueryExpr::Table(table_query)),
247            join_type,
248            on,
249            filter: None,
250            order_by: Vec::new(),
251            limit: None,
252            offset: None,
253            return_items: Vec::new(),
254            return_: Vec::new(),
255        });
256
257        if self.is_join_keyword() {
258            return self.parse_join_query(expr);
259        }
260
261        let (filter, order_by, limit, offset, _, return_) = self.parse_join_post_clauses()?;
262        let return_ = normalize_table_join_return_projections(return_);
263        let return_items = return_
264            .iter()
265            .filter_map(projection_to_select_item)
266            .collect();
267        if let QueryExpr::Join(join) = &mut expr {
268            join.filter = filter;
269            join.order_by = order_by;
270            join.limit = limit;
271            join.offset = offset;
272            join.return_items = return_items;
273            join.return_ = return_;
274        }
275        Ok(expr)
276    }
277
278    fn parse_vector_join_query(
279        &mut self,
280        left: QueryExpr,
281        join_type: JoinType,
282    ) -> Result<QueryExpr, ParseError> {
283        let mut right = match self.parse_vector_query()? {
284            QueryExpr::Vector(query) => query,
285            _ => unreachable!("vector parser must return QueryExpr::Vector"),
286        };
287        right.alias = self.parse_join_rhs_alias()?;
288        self.expect(Token::On)?;
289        let on = self.parse_table_join_condition()?;
290        let (filter, order_by, limit, offset, return_items, return_) =
291            self.parse_join_post_clauses()?;
292
293        Ok(QueryExpr::Join(JoinQuery {
294            left: Box::new(left),
295            right: Box::new(QueryExpr::Vector(right)),
296            join_type,
297            on,
298            filter,
299            order_by,
300            limit,
301            offset,
302            return_items,
303            return_,
304        }))
305    }
306
307    fn parse_path_join_query(
308        &mut self,
309        left: QueryExpr,
310        join_type: JoinType,
311    ) -> Result<QueryExpr, ParseError> {
312        let mut right = match self.parse_path_query()? {
313            QueryExpr::Path(query) => query,
314            _ => unreachable!("path parser must return QueryExpr::Path"),
315        };
316        right.alias = self.parse_join_rhs_alias()?;
317        self.expect(Token::On)?;
318        let on = self.parse_table_join_condition()?;
319        let (filter, order_by, limit, offset, return_items, return_) =
320            self.parse_join_post_clauses()?;
321
322        Ok(QueryExpr::Join(JoinQuery {
323            left: Box::new(left),
324            right: Box::new(QueryExpr::Path(right)),
325            join_type,
326            on,
327            filter,
328            order_by,
329            limit,
330            offset,
331            return_items,
332            return_,
333        }))
334    }
335
336    fn parse_hybrid_join_query(
337        &mut self,
338        left: QueryExpr,
339        join_type: JoinType,
340    ) -> Result<QueryExpr, ParseError> {
341        let mut right = match self.parse_hybrid_query()? {
342            QueryExpr::Hybrid(query) => query,
343            _ => unreachable!("hybrid parser must return QueryExpr::Hybrid"),
344        };
345        right.alias = self.parse_join_rhs_alias()?;
346        self.expect(Token::On)?;
347        let on = self.parse_table_join_condition()?;
348        let (filter, order_by, limit, offset, return_items, return_) =
349            self.parse_join_post_clauses()?;
350
351        Ok(QueryExpr::Join(JoinQuery {
352            left: Box::new(left),
353            right: Box::new(QueryExpr::Hybrid(right)),
354            join_type,
355            on,
356            filter,
357            order_by,
358            limit,
359            offset,
360            return_items,
361            return_,
362        }))
363    }
364
365    fn parse_join_post_clauses(
366        &mut self,
367    ) -> Result<
368        (
369            Option<super::super::ast::Filter>,
370            Vec<super::super::ast::OrderByClause>,
371            Option<u64>,
372            Option<u64>,
373            Vec<SelectItem>,
374            Vec<super::super::ast::Projection>,
375        ),
376        ParseError,
377    > {
378        let mut filter = None;
379        let mut order_by = Vec::new();
380        let mut limit = None;
381        let mut offset = None;
382        let mut return_items = Vec::new();
383        let mut return_ = Vec::new();
384
385        loop {
386            if self.consume(&Token::Where)? {
387                filter = Some(self.parse_filter()?);
388            } else if self.consume(&Token::Order)? {
389                self.expect(Token::By)?;
390                order_by = self.parse_order_by_list()?;
391            } else if self.consume(&Token::Limit)? {
392                limit = Some(self.parse_integer()? as u64);
393            } else if self.consume(&Token::Offset)? {
394                offset = Some(self.parse_integer()? as u64);
395            } else if self.consume(&Token::Return)? {
396                return_ = self.parse_return_list()?;
397                return_items = return_
398                    .iter()
399                    .filter_map(projection_to_select_item)
400                    .collect();
401            } else {
402                break;
403            }
404        }
405
406        Ok((filter, order_by, limit, offset, return_items, return_))
407    }
408
409    fn parse_join_rhs_alias(&mut self) -> Result<Option<String>, ParseError> {
410        if self.consume(&Token::As)?
411            || (self.check(&Token::Ident("".into())) && !self.is_join_rhs_clause_keyword())
412        {
413            Ok(Some(self.expect_ident()?))
414        } else {
415            Ok(None)
416        }
417    }
418
419    fn is_join_rhs_clause_keyword(&self) -> bool {
420        matches!(
421            self.peek(),
422            Token::On
423                | Token::Where
424                | Token::Order
425                | Token::Limit
426                | Token::Offset
427                | Token::Return
428                | Token::Join
429                | Token::Inner
430                | Token::Left
431                | Token::Right
432        )
433    }
434
435    fn parse_table_source(&mut self) -> Result<String, ParseError> {
436        if self.consume(&Token::Star)? {
437            Ok("*".to_string())
438        } else if self.consume(&Token::All)? {
439            Ok("all".to_string())
440        } else {
441            self.expect_ident()
442        }
443    }
444
445    /// Parse join condition: table.col = node.prop
446    fn parse_graph_join_condition(&mut self) -> Result<JoinCondition, ParseError> {
447        let left_field = self.parse_field_ref()?;
448        self.expect(Token::Eq)?;
449        let right_first = self.expect_ident()?;
450        self.expect(Token::Dot)?;
451        let right_second = self.expect_ident()?;
452
453        // Try to determine if right side is node property or ID
454        let right_field = if right_second == "id" {
455            FieldRef::NodeId { alias: right_first }
456        } else {
457            FieldRef::NodeProperty {
458                alias: right_first,
459                property: right_second,
460            }
461        };
462
463        Ok(JoinCondition {
464            left_field,
465            right_field,
466        })
467    }
468
469    fn parse_table_join_condition(&mut self) -> Result<JoinCondition, ParseError> {
470        let left_field = self.parse_field_ref()?;
471        self.expect(Token::Eq)?;
472        let right_field = self.parse_field_ref()?;
473
474        Ok(JoinCondition {
475            left_field,
476            right_field,
477        })
478    }
479}
480
481fn normalize_table_join_return_projections(projections: Vec<Projection>) -> Vec<Projection> {
482    projections
483        .into_iter()
484        .map(|projection| match projection {
485            Projection::Field(FieldRef::NodeProperty { alias, property }, output_alias) => {
486                let output_alias = output_alias.or_else(|| Some(property.clone()));
487                Projection::Field(
488                    FieldRef::TableColumn {
489                        table: alias,
490                        column: property,
491                    },
492                    output_alias,
493                )
494            }
495            other => other,
496        })
497        .collect()
498}
499
500/// Sentinel JoinCondition used for CROSS JOIN — neither field references
501/// anything real. The runtime join loops must detect
502/// `join_type == JoinType::Cross` and skip predicate evaluation entirely
503/// rather than resolving these empty fields.
504fn cross_join_sentinel() -> JoinCondition {
505    JoinCondition {
506        left_field: FieldRef::TableColumn {
507            table: String::new(),
508            column: String::new(),
509        },
510        right_field: FieldRef::TableColumn {
511            table: String::new(),
512            column: String::new(),
513        },
514    }
515}