Skip to main content

nodedb_query/msgpack_scan/
writer.rs

1//! Msgpack scalar writers for constructing response messages, aggregate results,
2//! projections, and other output data — all in raw msgpack without intermediary types.
3//!
4//! These are the building blocks for zero-serialization response construction.
5//! Follow the timeseries ingest pattern: `SqlValue → row_to_msgpack()` at ingress,
6//! raw msgpack throughout, `msgpack_to_json_string()` at outermost pgwire/HTTP layer.
7
/// Append a msgpack map header announcing `len` key-value pairs.
///
/// The shortest header that can hold `len` is chosen:
/// fixmap (`0x80 | len`) for fewer than 16 pairs, map 16 (`0xDE` + big-endian
/// `u16`) up to `u16::MAX`, and map 32 (`0xDF` + big-endian `u32`) otherwise.
///
/// Only the header is written; the caller appends the pairs afterwards.
/// A `len` above `u32::MAX` is truncated by the `as u32` cast.
#[inline]
pub fn write_map_header(buf: &mut Vec<u8>, len: usize) {
    match len {
        // fixmap: the pair count lives in the low nibble of the tag byte.
        0..=15 => buf.push(0x80 | len as u8),
        16..=0xFFFF => {
            buf.push(0xDE);
            buf.extend_from_slice(&(len as u16).to_be_bytes());
        }
        _ => {
            buf.push(0xDF);
            buf.extend_from_slice(&(len as u32).to_be_bytes());
        }
    }
}
21
/// Append a msgpack array header announcing `len` elements.
///
/// Uses fixarray (`0x90 | len`) for fewer than 16 elements, array 16
/// (`0xDC` + big-endian `u16`) up to `u16::MAX`, and array 32
/// (`0xDD` + big-endian `u32`) for anything larger.
///
/// Only the header is written; the caller appends the elements afterwards.
/// A `len` above `u32::MAX` is truncated by the `as u32` cast.
#[inline]
pub fn write_array_header(buf: &mut Vec<u8>, len: usize) {
    // fixarray: the element count lives in the low nibble of the tag byte.
    if len < 16 {
        buf.push(0x90 | len as u8);
        return;
    }

    if len > usize::from(u16::MAX) {
        buf.push(0xDD);
        buf.extend_from_slice(&(len as u32).to_be_bytes());
        return;
    }

    buf.push(0xDC);
    buf.extend_from_slice(&(len as u16).to_be_bytes());
}
35
/// Append `s` as a msgpack string: a length header followed by the UTF-8 bytes.
///
/// The header is the shortest one that fits the byte length:
/// fixstr (`0xA0 | len`, under 32 bytes), str 8 (`0xD9`), str 16 (`0xDA`)
/// or str 32 (`0xDB`), with multi-byte lengths in big-endian order.
///
/// A byte length above `u32::MAX` is truncated by the `as u32` cast.
#[inline]
pub fn write_str(buf: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    match payload.len() {
        // fixstr: the length lives in the low five bits of the tag byte.
        n @ 0..=31 => buf.push(0xA0 | n as u8),
        n @ 32..=0xFF => buf.extend_from_slice(&[0xD9, n as u8]),
        n @ 0x100..=0xFFFF => {
            buf.push(0xDA);
            buf.extend_from_slice(&(n as u16).to_be_bytes());
        }
        n => {
            buf.push(0xDB);
            buf.extend_from_slice(&(n as u32).to_be_bytes());
        }
    }
    buf.extend_from_slice(payload);
}
55
/// Append `i` as a msgpack integer using the narrowest *signed* format.
///
/// Values in `-32..=127` become a single fixint byte. Everything else is
/// written as int 8 (`0xD0`), int 16 (`0xD1`), int 32 (`0xD2`) or int 64
/// (`0xD3`), whichever is the smallest signed width that holds the value,
/// followed by the big-endian two's-complement payload.
///
/// The unsigned formats are never emitted, so e.g. `200` is written as
/// int 16 rather than uint 8.
#[inline]
pub fn write_i64(buf: &mut Vec<u8>, i: i64) {
    // Positive and negative fixint: the low byte of the value is the encoding.
    if (-32..=127).contains(&i) {
        buf.push(i as u8);
        return;
    }

    // A width fits when truncating to it and widening back is lossless.
    let (tag, width) = if i == i64::from(i as i8) {
        (0xD0, 1)
    } else if i == i64::from(i as i16) {
        (0xD1, 2)
    } else if i == i64::from(i as i32) {
        (0xD2, 4)
    } else {
        (0xD3, 8)
    };

    buf.push(tag);
    // The trailing `width` bytes of the big-endian i64 are exactly the
    // big-endian bytes of the value truncated to that width.
    buf.extend_from_slice(&i.to_be_bytes()[8 - width..]);
}
77
/// Append `f` as a msgpack float 64: tag `0xCB` followed by the eight
/// big-endian bytes of the IEEE 754 representation.
#[inline]
pub fn write_f64(buf: &mut Vec<u8>, f: f64) {
    // Assemble tag + payload on the stack so the buffer grows once.
    let mut encoded = [0xCB_u8; 9];
    encoded[1..].copy_from_slice(&f.to_be_bytes());
    buf.extend_from_slice(&encoded);
}
84
/// Append `b` as a msgpack boolean: `0xC2` for `false`, `0xC3` for `true`.
#[inline]
pub fn write_bool(buf: &mut Vec<u8>, b: bool) {
    // The two tags are adjacent, so `true` is simply `false + 1`.
    buf.push(0xC2 + u8::from(b));
}
90
/// Append a msgpack nil value (the single byte `0xC0`).
#[inline]
pub fn write_null(buf: &mut Vec<u8>) {
    const NIL: u8 = 0xC0;
    buf.push(NIL);
}
96
/// Append `data` as a msgpack binary blob: a length header followed by the
/// raw bytes.
///
/// The header is bin 8 (`0xC4`), bin 16 (`0xC5`) or bin 32 (`0xC6`),
/// whichever is the narrowest that holds `data.len()`; multi-byte lengths
/// are big-endian. A length above `u32::MAX` is truncated by the `as u32` cast.
#[inline]
pub fn write_bin(buf: &mut Vec<u8>, data: &[u8]) {
    // Big-endian length; narrower headers reuse its trailing bytes.
    let size = (data.len() as u32).to_be_bytes();
    match data.len() {
        0..=0xFF => buf.extend_from_slice(&[0xC4, size[3]]),
        0x100..=0xFFFF => buf.extend_from_slice(&[0xC5, size[2], size[3]]),
        _ => {
            buf.push(0xC6);
            buf.extend_from_slice(&size);
        }
    }
    buf.extend_from_slice(data);
}
113
114/// Write a key-value pair into a msgpack map being built.
115///
116/// Caller is responsible for writing the map header first via [`write_map_header`].
117#[inline]
118pub fn write_kv_str(buf: &mut Vec<u8>, key: &str, value: &str) {
119    write_str(buf, key);
120    write_str(buf, value);
121}
122
123/// Write a key-value pair (string key, i64 value).
124#[inline]
125pub fn write_kv_i64(buf: &mut Vec<u8>, key: &str, value: i64) {
126    write_str(buf, key);
127    write_i64(buf, value);
128}
129
130/// Write a key-value pair (string key, f64 value).
131#[inline]
132pub fn write_kv_f64(buf: &mut Vec<u8>, key: &str, value: f64) {
133    write_str(buf, key);
134    write_f64(buf, value);
135}
136
137/// Write a key-value pair (string key, bool value).
138#[inline]
139pub fn write_kv_bool(buf: &mut Vec<u8>, key: &str, value: bool) {
140    write_str(buf, key);
141    write_bool(buf, value);
142}
143
144/// Write a key-value pair (string key, raw msgpack value bytes).
145///
146/// The value bytes must be valid msgpack. This enables splicing raw
147/// field bytes extracted via `msgpack_scan::extract_field` directly
148/// into a new map without decode/re-encode.
149#[inline]
150pub fn write_kv_raw(buf: &mut Vec<u8>, key: &str, raw_value: &[u8]) {
151    write_str(buf, key);
152    buf.extend_from_slice(raw_value);
153}
154
155/// Write a key-value pair (string key, null value).
156#[inline]
157pub fn write_kv_null(buf: &mut Vec<u8>, key: &str) {
158    write_str(buf, key);
159    write_null(buf);
160}
161
162/// Inject a string field into a msgpack map without full decode.
163///
164/// If `value` is a valid msgpack map, returns a new map with the field prepended.
165/// If `value` is not a map, wraps as `{field_name: field_value, "value": raw}`.
166pub fn inject_str_field(value: &[u8], field_name: &str, field_value: &str) -> Vec<u8> {
167    if let Some((count, body_start)) = crate::msgpack_scan::reader::map_header(value, 0) {
168        let mut buf = Vec::with_capacity(value.len() + field_name.len() + field_value.len() + 16);
169        write_map_header(&mut buf, count + 1);
170        write_kv_str(&mut buf, field_name, field_value);
171        buf.extend_from_slice(&value[body_start..]);
172        buf
173    } else {
174        let mut buf = Vec::with_capacity(value.len() + field_name.len() + field_value.len() + 16);
175        write_map_header(&mut buf, 2);
176        write_kv_str(&mut buf, field_name, field_value);
177        write_str(&mut buf, "value");
178        buf.extend_from_slice(value);
179        buf
180    }
181}
182
183/// Merge field updates into a msgpack map without full decode.
184///
185/// Takes a base msgpack map and a list of `(field_name, raw_msgpack_value)` updates.
186/// Returns a new msgpack map with updated fields replaced and new fields appended.
187/// Fields not in `updates` are copied from the original.
188pub fn merge_fields(base: &[u8], updates: &[(&str, &[u8])]) -> Vec<u8> {
189    use std::collections::HashSet;
190
191    let update_names: HashSet<&str> = updates.iter().map(|(k, _)| *k).collect();
192
193    let (count, body_start) = match crate::msgpack_scan::reader::map_header(base, 0) {
194        Some(v) => v,
195        None => {
196            // Not a valid map — build from updates only.
197            let mut buf = Vec::with_capacity(updates.len() * 32);
198            write_map_header(&mut buf, updates.len());
199            for (k, v) in updates {
200                write_str(&mut buf, k);
201                buf.extend_from_slice(v);
202            }
203            return buf;
204        }
205    };
206
207    // Count fields: existing (not overwritten) + updates.
208    let mut kept = 0usize;
209    let mut pos = body_start;
210    for _ in 0..count {
211        let key = crate::msgpack_scan::reader::read_str(base, pos);
212        pos = match crate::msgpack_scan::reader::skip_value(base, pos) {
213            Some(p) => p,
214            None => break,
215        };
216        pos = match crate::msgpack_scan::reader::skip_value(base, pos) {
217            Some(p) => p,
218            None => break,
219        };
220        if let Some(k) = &key {
221            if !update_names.contains(&k[..]) {
222                kept += 1;
223            }
224        } else {
225            kept += 1;
226        }
227    }
228
229    let new_count = kept + updates.len();
230    let mut buf = Vec::with_capacity(
231        base.len()
232            + updates
233                .iter()
234                .map(|(k, v)| k.len() + v.len() + 4)
235                .sum::<usize>(),
236    );
237    write_map_header(&mut buf, new_count);
238
239    // Copy non-overwritten fields from base.
240    pos = body_start;
241    for _ in 0..count {
242        let key_start = pos;
243        let key = crate::msgpack_scan::reader::read_str(base, pos);
244        pos = match crate::msgpack_scan::reader::skip_value(base, pos) {
245            Some(p) => p,
246            None => break,
247        };
248        pos = match crate::msgpack_scan::reader::skip_value(base, pos) {
249            Some(p) => p,
250            None => break,
251        };
252        if let Some(k) = &key
253            && update_names.contains(&k[..])
254        {
255            continue; // Will be written from updates.
256        }
257        // Copy key + value bytes.
258        buf.extend_from_slice(&base[key_start..pos]);
259    }
260
261    // Write updated/new fields.
262    for (k, v) in updates {
263        write_str(&mut buf, k);
264        buf.extend_from_slice(v);
265    }
266
267    buf
268}
269
270/// Build a simple msgpack map from string key-value pairs.
271pub fn build_str_map(pairs: &[(&str, &str)]) -> Vec<u8> {
272    let mut buf = Vec::with_capacity(pairs.len() * 32);
273    write_map_header(&mut buf, pairs.len());
274    for (k, v) in pairs {
275        write_kv_str(&mut buf, k, v);
276    }
277    buf
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msgpack_scan::{field, reader};

    /// Encode a single integer into a fresh buffer.
    fn encode_i64(value: i64) -> Vec<u8> {
        let mut out = Vec::new();
        write_i64(&mut out, value);
        out
    }

    /// A map built from the writers can be walked back with the reader.
    #[test]
    fn roundtrip_map() {
        let mut encoded = Vec::new();
        write_map_header(&mut encoded, 2);
        write_kv_str(&mut encoded, "name", "alice");
        write_kv_i64(&mut encoded, "age", 30);

        let (count, first_key_at) = reader::map_header(&encoded, 0).unwrap();
        assert_eq!(count, 2);

        // First entry: "name" -> "alice".
        assert_eq!(reader::read_str(&encoded, first_key_at).unwrap(), "name");
        let first_value_at = reader::skip_value(&encoded, first_key_at).unwrap();
        assert_eq!(reader::read_str(&encoded, first_value_at).unwrap(), "alice");

        // Second entry: "age" -> 30.
        let second_key_at = reader::skip_value(&encoded, first_value_at).unwrap();
        assert_eq!(reader::read_str(&encoded, second_key_at).unwrap(), "age");
        let second_value_at = reader::skip_value(&encoded, second_key_at).unwrap();
        assert_eq!(reader::read_i64(&encoded, second_value_at).unwrap(), 30);
    }

    /// Pre-encoded value bytes spliced via `write_kv_raw` stay readable.
    #[test]
    fn write_kv_raw_splices_correctly() {
        let mut raw_value = Vec::new();
        write_str(&mut raw_value, "hello");

        let mut encoded = Vec::new();
        write_map_header(&mut encoded, 1);
        write_kv_raw(&mut encoded, "greeting", &raw_value);

        let located = field::extract_field(&encoded, 0, "greeting").unwrap();
        let text = reader::read_str(&encoded, located.0).unwrap();
        assert_eq!(text, "hello");
    }

    /// Small integers use the one- and two-byte encodings.
    #[test]
    fn compact_integer_encoding() {
        // Positive fixint
        assert_eq!(encode_i64(42), vec![42]);

        // Negative fixint
        assert_eq!(encode_i64(-1), vec![0xFF]);

        // int8
        assert_eq!(encode_i64(-100), vec![0xD0, (-100i8) as u8]);
    }
}