Skip to main content

nodedb_query/expr/
codec.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Manual zerompk wire format for [`SqlExpr`].
4//!
5//! Each variant encodes as an array `[tag_u8, field1, field2, ...]`. Tags
6//! are stable and MUST NOT be renumbered — they are on-wire values in
7//! physical-plan envelopes. `Value`, `BinaryOp`, and `CastType` implement
8//! zerompk natively so they nest transparently.
9//!
10//! Tags: Column=0, Literal=1, BinaryOp=2, Negate=3, Function=4, Cast=5,
11//!       Case=6, Coalesce=7, NullIf=8, IsNull=9, OldColumn=10,
12//!       ExcludedColumn=11.
13
14use nodedb_types::Value;
15
16use super::types::{BinaryOp, CastType, SqlExpr};
17
18impl zerompk::ToMessagePack for SqlExpr {
19    fn write<W: zerompk::Write>(&self, writer: &mut W) -> zerompk::Result<()> {
20        match self {
21            SqlExpr::Column(s) => {
22                writer.write_array_len(2)?;
23                writer.write_u8(0)?;
24                writer.write_string(s)
25            }
26            SqlExpr::Literal(v) => {
27                writer.write_array_len(2)?;
28                writer.write_u8(1)?;
29                v.write(writer)
30            }
31            SqlExpr::BinaryOp { left, op, right } => {
32                writer.write_array_len(4)?;
33                writer.write_u8(2)?;
34                left.write(writer)?;
35                op.write(writer)?;
36                right.write(writer)
37            }
38            SqlExpr::Negate(inner) => {
39                writer.write_array_len(2)?;
40                writer.write_u8(3)?;
41                inner.write(writer)
42            }
43            SqlExpr::Function { name, args } => {
44                writer.write_array_len(3)?;
45                writer.write_u8(4)?;
46                writer.write_string(name)?;
47                args.write(writer)
48            }
49            SqlExpr::Cast { expr, to_type } => {
50                writer.write_array_len(3)?;
51                writer.write_u8(5)?;
52                expr.write(writer)?;
53                to_type.write(writer)
54            }
55            SqlExpr::Case {
56                operand,
57                when_thens,
58                else_expr,
59            } => {
60                writer.write_array_len(4)?;
61                writer.write_u8(6)?;
62                operand.write(writer)?;
63                writer.write_array_len(when_thens.len())?;
64                for (cond, val) in when_thens {
65                    writer.write_array_len(2)?;
66                    cond.write(writer)?;
67                    val.write(writer)?;
68                }
69                else_expr.write(writer)
70            }
71            SqlExpr::Coalesce(exprs) => {
72                writer.write_array_len(2)?;
73                writer.write_u8(7)?;
74                exprs.write(writer)
75            }
76            SqlExpr::NullIf(e1, e2) => {
77                writer.write_array_len(3)?;
78                writer.write_u8(8)?;
79                e1.write(writer)?;
80                e2.write(writer)
81            }
82            SqlExpr::IsNull { expr, negated } => {
83                writer.write_array_len(3)?;
84                writer.write_u8(9)?;
85                expr.write(writer)?;
86                writer.write_boolean(*negated)
87            }
88            SqlExpr::OldColumn(s) => {
89                writer.write_array_len(2)?;
90                writer.write_u8(10)?;
91                writer.write_string(s)
92            }
93            SqlExpr::ExcludedColumn(s) => {
94                writer.write_array_len(2)?;
95                writer.write_u8(11)?;
96                writer.write_string(s)
97            }
98        }
99    }
100}
101
102impl<'a> zerompk::FromMessagePack<'a> for SqlExpr {
103    fn read<R: zerompk::Read<'a>>(reader: &mut R) -> zerompk::Result<Self> {
104        let len = reader.read_array_len()?;
105        if len == 0 {
106            return Err(zerompk::Error::ArrayLengthMismatch {
107                expected: 1,
108                actual: 0,
109            });
110        }
111        let tag = reader.read_u8()?;
112        match tag {
113            0 => Ok(SqlExpr::Column(reader.read_string()?.into_owned())),
114            1 => {
115                let v = Value::read(reader)?;
116                Ok(SqlExpr::Literal(v))
117            }
118            2 => {
119                let left = SqlExpr::read(reader)?;
120                let op = BinaryOp::read(reader)?;
121                let right = SqlExpr::read(reader)?;
122                Ok(SqlExpr::BinaryOp {
123                    left: Box::new(left),
124                    op,
125                    right: Box::new(right),
126                })
127            }
128            3 => {
129                let inner = SqlExpr::read(reader)?;
130                Ok(SqlExpr::Negate(Box::new(inner)))
131            }
132            4 => {
133                let name = reader.read_string()?.into_owned();
134                let args = Vec::<SqlExpr>::read(reader)?;
135                Ok(SqlExpr::Function { name, args })
136            }
137            5 => {
138                let expr = SqlExpr::read(reader)?;
139                let to_type = CastType::read(reader)?;
140                Ok(SqlExpr::Cast {
141                    expr: Box::new(expr),
142                    to_type,
143                })
144            }
145            6 => {
146                let operand = Option::<Box<SqlExpr>>::read(reader)?;
147                let wt_len = reader.read_array_len()?;
148                let mut when_thens = Vec::with_capacity(wt_len);
149                for _ in 0..wt_len {
150                    let pair_len = reader.read_array_len()?;
151                    if pair_len != 2 {
152                        return Err(zerompk::Error::ArrayLengthMismatch {
153                            expected: 2,
154                            actual: pair_len,
155                        });
156                    }
157                    let cond = SqlExpr::read(reader)?;
158                    let val = SqlExpr::read(reader)?;
159                    when_thens.push((cond, val));
160                }
161                let else_expr = Option::<Box<SqlExpr>>::read(reader)?;
162                Ok(SqlExpr::Case {
163                    operand,
164                    when_thens,
165                    else_expr,
166                })
167            }
168            7 => {
169                let exprs = Vec::<SqlExpr>::read(reader)?;
170                Ok(SqlExpr::Coalesce(exprs))
171            }
172            8 => {
173                let e1 = SqlExpr::read(reader)?;
174                let e2 = SqlExpr::read(reader)?;
175                Ok(SqlExpr::NullIf(Box::new(e1), Box::new(e2)))
176            }
177            9 => {
178                let expr = SqlExpr::read(reader)?;
179                let negated = reader.read_boolean()?;
180                Ok(SqlExpr::IsNull {
181                    expr: Box::new(expr),
182                    negated,
183                })
184            }
185            10 => Ok(SqlExpr::OldColumn(reader.read_string()?.into_owned())),
186            11 => Ok(SqlExpr::ExcludedColumn(reader.read_string()?.into_owned())),
187            _ => Err(zerompk::Error::InvalidMarker(tag)),
188        }
189    }
190}