Skip to main content

nodedb_query/expr/
codec.rs

1//! Manual zerompk wire format for [`SqlExpr`].
2//!
3//! Each variant encodes as an array `[tag_u8, field1, field2, ...]`. Tags
4//! are stable and MUST NOT be renumbered — they are on-wire values in
5//! physical-plan envelopes. `Value`, `BinaryOp`, and `CastType` implement
6//! zerompk natively so they nest transparently.
7//!
8//! Tags: Column=0, Literal=1, BinaryOp=2, Negate=3, Function=4, Cast=5,
9//!       Case=6, Coalesce=7, NullIf=8, IsNull=9, OldColumn=10,
10//!       ExcludedColumn=11.
11
12use nodedb_types::Value;
13
14use super::types::{BinaryOp, CastType, SqlExpr};
15
16impl zerompk::ToMessagePack for SqlExpr {
17    fn write<W: zerompk::Write>(&self, writer: &mut W) -> zerompk::Result<()> {
18        match self {
19            SqlExpr::Column(s) => {
20                writer.write_array_len(2)?;
21                writer.write_u8(0)?;
22                writer.write_string(s)
23            }
24            SqlExpr::Literal(v) => {
25                writer.write_array_len(2)?;
26                writer.write_u8(1)?;
27                v.write(writer)
28            }
29            SqlExpr::BinaryOp { left, op, right } => {
30                writer.write_array_len(4)?;
31                writer.write_u8(2)?;
32                left.write(writer)?;
33                op.write(writer)?;
34                right.write(writer)
35            }
36            SqlExpr::Negate(inner) => {
37                writer.write_array_len(2)?;
38                writer.write_u8(3)?;
39                inner.write(writer)
40            }
41            SqlExpr::Function { name, args } => {
42                writer.write_array_len(3)?;
43                writer.write_u8(4)?;
44                writer.write_string(name)?;
45                args.write(writer)
46            }
47            SqlExpr::Cast { expr, to_type } => {
48                writer.write_array_len(3)?;
49                writer.write_u8(5)?;
50                expr.write(writer)?;
51                to_type.write(writer)
52            }
53            SqlExpr::Case {
54                operand,
55                when_thens,
56                else_expr,
57            } => {
58                writer.write_array_len(4)?;
59                writer.write_u8(6)?;
60                operand.write(writer)?;
61                writer.write_array_len(when_thens.len())?;
62                for (cond, val) in when_thens {
63                    writer.write_array_len(2)?;
64                    cond.write(writer)?;
65                    val.write(writer)?;
66                }
67                else_expr.write(writer)
68            }
69            SqlExpr::Coalesce(exprs) => {
70                writer.write_array_len(2)?;
71                writer.write_u8(7)?;
72                exprs.write(writer)
73            }
74            SqlExpr::NullIf(e1, e2) => {
75                writer.write_array_len(3)?;
76                writer.write_u8(8)?;
77                e1.write(writer)?;
78                e2.write(writer)
79            }
80            SqlExpr::IsNull { expr, negated } => {
81                writer.write_array_len(3)?;
82                writer.write_u8(9)?;
83                expr.write(writer)?;
84                writer.write_boolean(*negated)
85            }
86            SqlExpr::OldColumn(s) => {
87                writer.write_array_len(2)?;
88                writer.write_u8(10)?;
89                writer.write_string(s)
90            }
91            SqlExpr::ExcludedColumn(s) => {
92                writer.write_array_len(2)?;
93                writer.write_u8(11)?;
94                writer.write_string(s)
95            }
96        }
97    }
98}
99
100impl<'a> zerompk::FromMessagePack<'a> for SqlExpr {
101    fn read<R: zerompk::Read<'a>>(reader: &mut R) -> zerompk::Result<Self> {
102        let len = reader.read_array_len()?;
103        if len == 0 {
104            return Err(zerompk::Error::ArrayLengthMismatch {
105                expected: 1,
106                actual: 0,
107            });
108        }
109        let tag = reader.read_u8()?;
110        match tag {
111            0 => Ok(SqlExpr::Column(reader.read_string()?.into_owned())),
112            1 => {
113                let v = Value::read(reader)?;
114                Ok(SqlExpr::Literal(v))
115            }
116            2 => {
117                let left = SqlExpr::read(reader)?;
118                let op = BinaryOp::read(reader)?;
119                let right = SqlExpr::read(reader)?;
120                Ok(SqlExpr::BinaryOp {
121                    left: Box::new(left),
122                    op,
123                    right: Box::new(right),
124                })
125            }
126            3 => {
127                let inner = SqlExpr::read(reader)?;
128                Ok(SqlExpr::Negate(Box::new(inner)))
129            }
130            4 => {
131                let name = reader.read_string()?.into_owned();
132                let args = Vec::<SqlExpr>::read(reader)?;
133                Ok(SqlExpr::Function { name, args })
134            }
135            5 => {
136                let expr = SqlExpr::read(reader)?;
137                let to_type = CastType::read(reader)?;
138                Ok(SqlExpr::Cast {
139                    expr: Box::new(expr),
140                    to_type,
141                })
142            }
143            6 => {
144                let operand = Option::<Box<SqlExpr>>::read(reader)?;
145                let wt_len = reader.read_array_len()?;
146                let mut when_thens = Vec::with_capacity(wt_len);
147                for _ in 0..wt_len {
148                    let pair_len = reader.read_array_len()?;
149                    if pair_len != 2 {
150                        return Err(zerompk::Error::ArrayLengthMismatch {
151                            expected: 2,
152                            actual: pair_len,
153                        });
154                    }
155                    let cond = SqlExpr::read(reader)?;
156                    let val = SqlExpr::read(reader)?;
157                    when_thens.push((cond, val));
158                }
159                let else_expr = Option::<Box<SqlExpr>>::read(reader)?;
160                Ok(SqlExpr::Case {
161                    operand,
162                    when_thens,
163                    else_expr,
164                })
165            }
166            7 => {
167                let exprs = Vec::<SqlExpr>::read(reader)?;
168                Ok(SqlExpr::Coalesce(exprs))
169            }
170            8 => {
171                let e1 = SqlExpr::read(reader)?;
172                let e2 = SqlExpr::read(reader)?;
173                Ok(SqlExpr::NullIf(Box::new(e1), Box::new(e2)))
174            }
175            9 => {
176                let expr = SqlExpr::read(reader)?;
177                let negated = reader.read_boolean()?;
178                Ok(SqlExpr::IsNull {
179                    expr: Box::new(expr),
180                    negated,
181                })
182            }
183            10 => Ok(SqlExpr::OldColumn(reader.read_string()?.into_owned())),
184            11 => Ok(SqlExpr::ExcludedColumn(reader.read_string()?.into_owned())),
185            _ => Err(zerompk::Error::InvalidMarker(tag)),
186        }
187    }
188}