Skip to main content

reddb_server/storage/query/parser/
join.rs

1//! Join query parsing (FROM ... JOIN GRAPH/PATH/TABLE/VECTOR ...)
2
3use super::super::ast::{
4    FieldRef, GraphQuery, JoinCondition, JoinQuery, JoinType, Projection, QueryExpr, SelectItem,
5    TableQuery,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use super::Parser;
10use crate::storage::query::sql_lowering::{filter_to_expr, projection_to_select_item};
11impl<'a> Parser<'a> {
12    /// Parse FROM ... JOIN (GRAPH / PATH / TABLE / VECTOR / HYBRID) query
13    pub fn parse_from_query(&mut self) -> Result<QueryExpr, ParseError> {
14        self.expect(Token::From)?;
15
16        // Fase 1.7 unlock: `FROM (SELECT … FROM t) AS alias` — subquery
17        // in FROM position. Detect by peeking LParen before falling
18        // through to the legacy identifier-only parse_table_source.
19        // The subquery branch builds the outer TableQuery via
20        // `TableQuery::from_subquery` which sets `source` to
21        // `TableSource::Subquery` and marks `table` with a sentinel
22        // `__subq_<alias>` so code that still reads `table.as_str()`
23        // errors loudly instead of silently mis-resolving.
24        let mut table_query = if self.check(&Token::LParen) {
25            self.advance()?; // consume `(`
26                             // Only SELECT is allowed in a FROM subquery — reject
27                             // `FROM (MATCH … RETURN)` and other non-SELECT shapes.
28            if !self.check(&Token::Select) {
29                return Err(ParseError::new(
30                    "subquery in FROM must start with SELECT".to_string(),
31                    self.position(),
32                ));
33            }
34            let inner = self.parse_select_query()?;
35            self.expect(Token::RParen)?;
36            let alias = if self.consume(&Token::As)?
37                || (self.check(&Token::Ident("".into())) && !self.is_join_keyword())
38            {
39                Some(self.expect_ident()?)
40            } else {
41                None
42            };
43            TableQuery::from_subquery(inner, alias)
44        } else {
45            // Parse table name and alias
46            let table = self.parse_table_source()?;
47            let alias = if self.consume(&Token::As)?
48                || (self.check(&Token::Ident("".into())) && !self.is_join_keyword())
49            {
50                Some(self.expect_ident()?)
51            } else {
52                None
53            };
54            TableQuery {
55                table,
56                source: None,
57                alias,
58                select_items: Vec::new(),
59                columns: Vec::new(),
60                where_expr: None,
61                filter: None,
62                group_by_exprs: Vec::new(),
63                group_by: Vec::new(),
64                having_expr: None,
65                having: None,
66                order_by: Vec::new(),
67                limit: None,
68                limit_param: None,
69                offset: None,
70                offset_param: None,
71                expand: None,
72                as_of: None,
73            }
74        };
75
76        // Check for JOIN
77        if self.is_join_keyword() {
78            return self.parse_join_query(QueryExpr::Table(table_query));
79        }
80
81        // Parse optional WHERE clause
82        if self.consume(&Token::Where)? {
83            let filter = self.parse_filter()?;
84            table_query.where_expr = Some(filter_to_expr(&filter));
85            table_query.filter = Some(filter);
86        }
87
88        // Parse optional ORDER BY
89        if self.consume(&Token::Order)? {
90            self.expect(Token::By)?;
91            table_query.order_by = self.parse_order_by_list()?;
92        }
93
94        // Parse optional LIMIT/OFFSET
95        if self.consume(&Token::Limit)? {
96            table_query.limit = Some(self.parse_integer()? as u64);
97        }
98        if self.consume(&Token::Offset)? {
99            table_query.offset = Some(self.parse_integer()? as u64);
100        }
101
102        // Check for RETURN (shorthand for column selection)
103        if self.consume(&Token::Return)? {
104            let (select_items, columns) = self.parse_select_items_and_projections()?;
105            table_query.select_items = select_items;
106            table_query.columns = columns;
107        }
108
109        Ok(QueryExpr::Table(table_query))
110    }
111
112    /// Check if current token is a join keyword
113    pub fn is_join_keyword(&self) -> bool {
114        matches!(
115            self.peek(),
116            Token::Join | Token::Inner | Token::Left | Token::Right | Token::Full | Token::Cross
117        )
118    }
119
120    /// Parse JOIN query
121    pub(crate) fn parse_join_query(&mut self, left: QueryExpr) -> Result<QueryExpr, ParseError> {
122        // Parse join type
123        let join_type = if self.consume(&Token::Inner)? {
124            self.expect(Token::Join)?;
125            JoinType::Inner
126        } else if self.consume(&Token::Left)? {
127            self.consume(&Token::Outer)?;
128            self.expect(Token::Join)?;
129            JoinType::LeftOuter
130        } else if self.consume(&Token::Right)? {
131            self.consume(&Token::Outer)?;
132            self.expect(Token::Join)?;
133            JoinType::RightOuter
134        } else if self.consume(&Token::Full)? {
135            // `FULL JOIN` and `FULL OUTER JOIN` are aliases.
136            self.consume(&Token::Outer)?;
137            self.expect(Token::Join)?;
138            JoinType::FullOuter
139        } else if self.consume(&Token::Cross)? {
140            self.expect(Token::Join)?;
141            JoinType::Cross
142        } else {
143            self.expect(Token::Join)?;
144            JoinType::Inner
145        };
146
147        if self.consume(&Token::Graph)? {
148            return self.parse_graph_join_query(left, join_type);
149        }
150        if self.check(&Token::Path) {
151            return self.parse_path_join_query(left, join_type);
152        }
153        if self.check(&Token::Vector) {
154            return self.parse_vector_join_query(left, join_type);
155        }
156        if self.check(&Token::Hybrid) {
157            return self.parse_hybrid_join_query(left, join_type);
158        }
159
160        self.parse_table_join_query(left, join_type)
161    }
162
163    fn parse_graph_join_query(
164        &mut self,
165        left: QueryExpr,
166        join_type: JoinType,
167    ) -> Result<QueryExpr, ParseError> {
168        // Parse graph pattern
169        let pattern = self.parse_graph_pattern()?;
170        let alias = self.parse_join_rhs_alias()?;
171
172        // Parse ON condition
173        self.expect(Token::On)?;
174        let on = self.parse_graph_join_condition()?;
175        let (filter, order_by, limit, offset, return_items, return_) =
176            self.parse_join_post_clauses()?;
177        let graph_query = GraphQuery {
178            alias,
179            pattern,
180            filter: None,
181            return_: Vec::new(),
182            limit: None,
183        };
184
185        Ok(QueryExpr::Join(JoinQuery {
186            left: Box::new(left),
187            right: Box::new(QueryExpr::Graph(graph_query)),
188            join_type,
189            on,
190            filter,
191            order_by,
192            limit,
193            offset,
194            return_items,
195            return_,
196        }))
197    }
198
199    fn parse_table_join_query(
200        &mut self,
201        left: QueryExpr,
202        join_type: JoinType,
203    ) -> Result<QueryExpr, ParseError> {
204        let table = self.parse_table_source()?;
205        let alias = if self.consume(&Token::As)?
206            || (self.check(&Token::Ident("".into())) && !self.is_clause_keyword())
207        {
208            Some(self.expect_ident()?)
209        } else {
210            None
211        };
212
213        // CROSS JOIN has no ON clause — emit a sentinel JoinCondition
214        // that the runtime join loops treat as "always matches".
215        let on = if matches!(join_type, JoinType::Cross) {
216            cross_join_sentinel()
217        } else {
218            self.expect(Token::On)?;
219            self.parse_table_join_condition()?
220        };
221        let table_query = TableQuery {
222            table,
223            source: None,
224            alias,
225            select_items: Vec::new(),
226            columns: Vec::new(),
227            where_expr: None,
228            filter: None,
229            group_by_exprs: Vec::new(),
230            group_by: Vec::new(),
231            having_expr: None,
232            having: None,
233            order_by: Vec::new(),
234            limit: None,
235            limit_param: None,
236            offset: None,
237            offset_param: None,
238            expand: None,
239            as_of: None,
240        };
241
242        let mut expr = QueryExpr::Join(JoinQuery {
243            left: Box::new(left),
244            right: Box::new(QueryExpr::Table(table_query)),
245            join_type,
246            on,
247            filter: None,
248            order_by: Vec::new(),
249            limit: None,
250            offset: None,
251            return_items: Vec::new(),
252            return_: Vec::new(),
253        });
254
255        if self.is_join_keyword() {
256            return self.parse_join_query(expr);
257        }
258
259        let (filter, order_by, limit, offset, _, return_) = self.parse_join_post_clauses()?;
260        let return_ = normalize_table_join_return_projections(return_);
261        let return_items = return_
262            .iter()
263            .filter_map(projection_to_select_item)
264            .collect();
265        if let QueryExpr::Join(join) = &mut expr {
266            join.filter = filter;
267            join.order_by = order_by;
268            join.limit = limit;
269            join.offset = offset;
270            join.return_items = return_items;
271            join.return_ = return_;
272        }
273        Ok(expr)
274    }
275
276    fn parse_vector_join_query(
277        &mut self,
278        left: QueryExpr,
279        join_type: JoinType,
280    ) -> Result<QueryExpr, ParseError> {
281        let mut right = match self.parse_vector_query()? {
282            QueryExpr::Vector(query) => query,
283            _ => unreachable!("vector parser must return QueryExpr::Vector"),
284        };
285        right.alias = self.parse_join_rhs_alias()?;
286        self.expect(Token::On)?;
287        let on = self.parse_table_join_condition()?;
288        let (filter, order_by, limit, offset, return_items, return_) =
289            self.parse_join_post_clauses()?;
290
291        Ok(QueryExpr::Join(JoinQuery {
292            left: Box::new(left),
293            right: Box::new(QueryExpr::Vector(right)),
294            join_type,
295            on,
296            filter,
297            order_by,
298            limit,
299            offset,
300            return_items,
301            return_,
302        }))
303    }
304
305    fn parse_path_join_query(
306        &mut self,
307        left: QueryExpr,
308        join_type: JoinType,
309    ) -> Result<QueryExpr, ParseError> {
310        let mut right = match self.parse_path_query()? {
311            QueryExpr::Path(query) => query,
312            _ => unreachable!("path parser must return QueryExpr::Path"),
313        };
314        right.alias = self.parse_join_rhs_alias()?;
315        self.expect(Token::On)?;
316        let on = self.parse_table_join_condition()?;
317        let (filter, order_by, limit, offset, return_items, return_) =
318            self.parse_join_post_clauses()?;
319
320        Ok(QueryExpr::Join(JoinQuery {
321            left: Box::new(left),
322            right: Box::new(QueryExpr::Path(right)),
323            join_type,
324            on,
325            filter,
326            order_by,
327            limit,
328            offset,
329            return_items,
330            return_,
331        }))
332    }
333
334    fn parse_hybrid_join_query(
335        &mut self,
336        left: QueryExpr,
337        join_type: JoinType,
338    ) -> Result<QueryExpr, ParseError> {
339        let mut right = match self.parse_hybrid_query()? {
340            QueryExpr::Hybrid(query) => query,
341            _ => unreachable!("hybrid parser must return QueryExpr::Hybrid"),
342        };
343        right.alias = self.parse_join_rhs_alias()?;
344        self.expect(Token::On)?;
345        let on = self.parse_table_join_condition()?;
346        let (filter, order_by, limit, offset, return_items, return_) =
347            self.parse_join_post_clauses()?;
348
349        Ok(QueryExpr::Join(JoinQuery {
350            left: Box::new(left),
351            right: Box::new(QueryExpr::Hybrid(right)),
352            join_type,
353            on,
354            filter,
355            order_by,
356            limit,
357            offset,
358            return_items,
359            return_,
360        }))
361    }
362
363    fn parse_join_post_clauses(
364        &mut self,
365    ) -> Result<
366        (
367            Option<super::super::ast::Filter>,
368            Vec<super::super::ast::OrderByClause>,
369            Option<u64>,
370            Option<u64>,
371            Vec<SelectItem>,
372            Vec<super::super::ast::Projection>,
373        ),
374        ParseError,
375    > {
376        let mut filter = None;
377        let mut order_by = Vec::new();
378        let mut limit = None;
379        let mut offset = None;
380        let mut return_items = Vec::new();
381        let mut return_ = Vec::new();
382
383        loop {
384            if self.consume(&Token::Where)? {
385                filter = Some(self.parse_filter()?);
386            } else if self.consume(&Token::Order)? {
387                self.expect(Token::By)?;
388                order_by = self.parse_order_by_list()?;
389            } else if self.consume(&Token::Limit)? {
390                limit = Some(self.parse_integer()? as u64);
391            } else if self.consume(&Token::Offset)? {
392                offset = Some(self.parse_integer()? as u64);
393            } else if self.consume(&Token::Return)? {
394                return_ = self.parse_return_list()?;
395                return_items = return_
396                    .iter()
397                    .filter_map(projection_to_select_item)
398                    .collect();
399            } else {
400                break;
401            }
402        }
403
404        Ok((filter, order_by, limit, offset, return_items, return_))
405    }
406
407    fn parse_join_rhs_alias(&mut self) -> Result<Option<String>, ParseError> {
408        if self.consume(&Token::As)?
409            || (self.check(&Token::Ident("".into())) && !self.is_join_rhs_clause_keyword())
410        {
411            Ok(Some(self.expect_ident()?))
412        } else {
413            Ok(None)
414        }
415    }
416
417    fn is_join_rhs_clause_keyword(&self) -> bool {
418        matches!(
419            self.peek(),
420            Token::On
421                | Token::Where
422                | Token::Order
423                | Token::Limit
424                | Token::Offset
425                | Token::Return
426                | Token::Join
427                | Token::Inner
428                | Token::Left
429                | Token::Right
430        )
431    }
432
433    fn parse_table_source(&mut self) -> Result<String, ParseError> {
434        if self.consume(&Token::Star)? {
435            Ok("*".to_string())
436        } else if self.consume(&Token::All)? {
437            Ok("all".to_string())
438        } else {
439            self.expect_ident()
440        }
441    }
442
443    /// Parse join condition: table.col = node.prop
444    fn parse_graph_join_condition(&mut self) -> Result<JoinCondition, ParseError> {
445        let left_field = self.parse_field_ref()?;
446        self.expect(Token::Eq)?;
447        let right_first = self.expect_ident()?;
448        self.expect(Token::Dot)?;
449        let right_second = self.expect_ident()?;
450
451        // Try to determine if right side is node property or ID
452        let right_field = if right_second == "id" {
453            FieldRef::NodeId { alias: right_first }
454        } else {
455            FieldRef::NodeProperty {
456                alias: right_first,
457                property: right_second,
458            }
459        };
460
461        Ok(JoinCondition {
462            left_field,
463            right_field,
464        })
465    }
466
467    fn parse_table_join_condition(&mut self) -> Result<JoinCondition, ParseError> {
468        let left_field = self.parse_field_ref()?;
469        self.expect(Token::Eq)?;
470        let right_field = self.parse_field_ref()?;
471
472        Ok(JoinCondition {
473            left_field,
474            right_field,
475        })
476    }
477}
478
479fn normalize_table_join_return_projections(projections: Vec<Projection>) -> Vec<Projection> {
480    projections
481        .into_iter()
482        .map(|projection| match projection {
483            Projection::Field(FieldRef::NodeProperty { alias, property }, output_alias) => {
484                let output_alias = output_alias.or_else(|| Some(property.clone()));
485                Projection::Field(
486                    FieldRef::TableColumn {
487                        table: alias,
488                        column: property,
489                    },
490                    output_alias,
491                )
492            }
493            other => other,
494        })
495        .collect()
496}
497
498/// Sentinel JoinCondition used for CROSS JOIN — neither field references
499/// anything real. The runtime join loops must detect
500/// `join_type == JoinType::Cross` and skip predicate evaluation entirely
501/// rather than resolving these empty fields.
502fn cross_join_sentinel() -> JoinCondition {
503    JoinCondition {
504        left_field: FieldRef::TableColumn {
505            table: String::new(),
506            column: String::new(),
507        },
508        right_field: FieldRef::TableColumn {
509            table: String::new(),
510            column: String::new(),
511        },
512    }
513}