Skip to main content

nodedb_query/expr/
codec.rs

1//! Manual zerompk wire format for [`SqlExpr`].
2//!
3//! Each variant encodes as an array `[tag_u8, field1, field2, ...]`. Tags
4//! are stable and MUST NOT be renumbered — they are on-wire values in
5//! physical-plan envelopes. `Value`, `BinaryOp`, and `CastType` implement
6//! zerompk natively so they nest transparently.
7//!
8//! Tags: Column=0, Literal=1, BinaryOp=2, Negate=3, Function=4, Cast=5,
9//!       Case=6, Coalesce=7, NullIf=8, IsNull=9, OldColumn=10.
10
11use nodedb_types::Value;
12
13use super::types::{BinaryOp, CastType, SqlExpr};
14
15impl zerompk::ToMessagePack for SqlExpr {
16    fn write<W: zerompk::Write>(&self, writer: &mut W) -> zerompk::Result<()> {
17        match self {
18            SqlExpr::Column(s) => {
19                writer.write_array_len(2)?;
20                writer.write_u8(0)?;
21                writer.write_string(s)
22            }
23            SqlExpr::Literal(v) => {
24                writer.write_array_len(2)?;
25                writer.write_u8(1)?;
26                v.write(writer)
27            }
28            SqlExpr::BinaryOp { left, op, right } => {
29                writer.write_array_len(4)?;
30                writer.write_u8(2)?;
31                left.write(writer)?;
32                op.write(writer)?;
33                right.write(writer)
34            }
35            SqlExpr::Negate(inner) => {
36                writer.write_array_len(2)?;
37                writer.write_u8(3)?;
38                inner.write(writer)
39            }
40            SqlExpr::Function { name, args } => {
41                writer.write_array_len(3)?;
42                writer.write_u8(4)?;
43                writer.write_string(name)?;
44                args.write(writer)
45            }
46            SqlExpr::Cast { expr, to_type } => {
47                writer.write_array_len(3)?;
48                writer.write_u8(5)?;
49                expr.write(writer)?;
50                to_type.write(writer)
51            }
52            SqlExpr::Case {
53                operand,
54                when_thens,
55                else_expr,
56            } => {
57                writer.write_array_len(4)?;
58                writer.write_u8(6)?;
59                operand.write(writer)?;
60                writer.write_array_len(when_thens.len())?;
61                for (cond, val) in when_thens {
62                    writer.write_array_len(2)?;
63                    cond.write(writer)?;
64                    val.write(writer)?;
65                }
66                else_expr.write(writer)
67            }
68            SqlExpr::Coalesce(exprs) => {
69                writer.write_array_len(2)?;
70                writer.write_u8(7)?;
71                exprs.write(writer)
72            }
73            SqlExpr::NullIf(e1, e2) => {
74                writer.write_array_len(3)?;
75                writer.write_u8(8)?;
76                e1.write(writer)?;
77                e2.write(writer)
78            }
79            SqlExpr::IsNull { expr, negated } => {
80                writer.write_array_len(3)?;
81                writer.write_u8(9)?;
82                expr.write(writer)?;
83                writer.write_boolean(*negated)
84            }
85            SqlExpr::OldColumn(s) => {
86                writer.write_array_len(2)?;
87                writer.write_u8(10)?;
88                writer.write_string(s)
89            }
90        }
91    }
92}
93
94impl<'a> zerompk::FromMessagePack<'a> for SqlExpr {
95    fn read<R: zerompk::Read<'a>>(reader: &mut R) -> zerompk::Result<Self> {
96        let len = reader.read_array_len()?;
97        if len == 0 {
98            return Err(zerompk::Error::ArrayLengthMismatch {
99                expected: 1,
100                actual: 0,
101            });
102        }
103        let tag = reader.read_u8()?;
104        match tag {
105            0 => Ok(SqlExpr::Column(reader.read_string()?.into_owned())),
106            1 => {
107                let v = Value::read(reader)?;
108                Ok(SqlExpr::Literal(v))
109            }
110            2 => {
111                let left = SqlExpr::read(reader)?;
112                let op = BinaryOp::read(reader)?;
113                let right = SqlExpr::read(reader)?;
114                Ok(SqlExpr::BinaryOp {
115                    left: Box::new(left),
116                    op,
117                    right: Box::new(right),
118                })
119            }
120            3 => {
121                let inner = SqlExpr::read(reader)?;
122                Ok(SqlExpr::Negate(Box::new(inner)))
123            }
124            4 => {
125                let name = reader.read_string()?.into_owned();
126                let args = Vec::<SqlExpr>::read(reader)?;
127                Ok(SqlExpr::Function { name, args })
128            }
129            5 => {
130                let expr = SqlExpr::read(reader)?;
131                let to_type = CastType::read(reader)?;
132                Ok(SqlExpr::Cast {
133                    expr: Box::new(expr),
134                    to_type,
135                })
136            }
137            6 => {
138                let operand = Option::<Box<SqlExpr>>::read(reader)?;
139                let wt_len = reader.read_array_len()?;
140                let mut when_thens = Vec::with_capacity(wt_len);
141                for _ in 0..wt_len {
142                    let pair_len = reader.read_array_len()?;
143                    if pair_len != 2 {
144                        return Err(zerompk::Error::ArrayLengthMismatch {
145                            expected: 2,
146                            actual: pair_len,
147                        });
148                    }
149                    let cond = SqlExpr::read(reader)?;
150                    let val = SqlExpr::read(reader)?;
151                    when_thens.push((cond, val));
152                }
153                let else_expr = Option::<Box<SqlExpr>>::read(reader)?;
154                Ok(SqlExpr::Case {
155                    operand,
156                    when_thens,
157                    else_expr,
158                })
159            }
160            7 => {
161                let exprs = Vec::<SqlExpr>::read(reader)?;
162                Ok(SqlExpr::Coalesce(exprs))
163            }
164            8 => {
165                let e1 = SqlExpr::read(reader)?;
166                let e2 = SqlExpr::read(reader)?;
167                Ok(SqlExpr::NullIf(Box::new(e1), Box::new(e2)))
168            }
169            9 => {
170                let expr = SqlExpr::read(reader)?;
171                let negated = reader.read_boolean()?;
172                Ok(SqlExpr::IsNull {
173                    expr: Box::new(expr),
174                    negated,
175                })
176            }
177            10 => Ok(SqlExpr::OldColumn(reader.read_string()?.into_owned())),
178            _ => Err(zerompk::Error::InvalidMarker(tag)),
179        }
180    }
181}