Skip to main content

nodedb_query/msgpack_scan/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Msgpack scalar writers for constructing response messages, aggregate results,
4//! projections, and other output data — all in raw msgpack without intermediary types.
5//!
6//! These are the building blocks for zero-serialization response construction.
7//! Follow the timeseries ingest pattern: `SqlValue → row_to_msgpack()` at ingress,
8//! raw msgpack throughout, `msgpack_to_json_string()` at outermost pgwire/HTTP layer.
9
/// Append a msgpack map header announcing `len` key-value pairs to `buf`.
///
/// The narrowest header that can hold `len` is chosen:
/// - fewer than 16 pairs: one fixmap byte (`0x80 | len`),
/// - up to `u16::MAX` pairs: `0xDE` followed by the count as a big-endian `u16`,
/// - anything larger: `0xDF` followed by the count as a big-endian `u32`.
///
/// Only the header is written; the caller appends the entries afterwards.
/// A `len` above `u32::MAX` is truncated by the `as u32` cast.
#[inline]
pub fn write_map_header(buf: &mut Vec<u8>, len: usize) {
    match len {
        // fixmap: the count lives in the low nibble of the marker byte.
        0..=0x0F => buf.push(0x80 | len as u8),
        // map 16
        0x10..=0xFFFF => {
            buf.push(0xDE);
            buf.extend_from_slice(&(len as u16).to_be_bytes());
        }
        // map 32
        _ => {
            buf.push(0xDF);
            buf.extend_from_slice(&(len as u32).to_be_bytes());
        }
    }
}
23
/// Append a msgpack array header announcing `len` elements to `buf`.
///
/// The narrowest header that can hold `len` is chosen:
/// - fewer than 16 elements: one fixarray byte (`0x90 | len`),
/// - up to `u16::MAX` elements: `0xDC` followed by the count as a big-endian `u16`,
/// - anything larger: `0xDD` followed by the count as a big-endian `u32`.
///
/// Only the header is written; the caller appends the elements afterwards.
/// A `len` above `u32::MAX` is truncated by the `as u32` cast.
#[inline]
pub fn write_array_header(buf: &mut Vec<u8>, len: usize) {
    match len {
        // fixarray: the count lives in the low nibble of the marker byte.
        0..=0x0F => buf.push(0x90 | len as u8),
        // array 16
        0x10..=0xFFFF => {
            buf.push(0xDC);
            buf.extend_from_slice(&(len as u16).to_be_bytes());
        }
        // array 32
        _ => {
            buf.push(0xDD);
            buf.extend_from_slice(&(len as u32).to_be_bytes());
        }
    }
}
37
/// Append `s` to `buf` as a msgpack string: a length header, then the UTF-8 bytes.
///
/// The header is selected from the byte length of `s`:
/// - under 32 bytes: one fixstr byte (`0xA0 | len`),
/// - up to `u8::MAX`: `0xD9` plus a one-byte length,
/// - up to `u16::MAX`: `0xDA` plus a big-endian `u16` length,
/// - anything larger: `0xDB` plus a big-endian `u32` length.
///
/// A byte length above `u32::MAX` is truncated by the `as u32` cast while the
/// full payload is still appended.
#[inline]
pub fn write_str(buf: &mut Vec<u8>, s: &str) {
    let byte_len = s.len();
    match byte_len {
        // fixstr: the length lives in the low five bits of the marker byte.
        0..=31 => buf.push(0xA0 | byte_len as u8),
        // str 8
        32..=0xFF => buf.extend_from_slice(&[0xD9, byte_len as u8]),
        // str 16
        0x100..=0xFFFF => {
            buf.push(0xDA);
            buf.extend_from_slice(&(byte_len as u16).to_be_bytes());
        }
        // str 32
        _ => {
            buf.push(0xDB);
            buf.extend_from_slice(&(byte_len as u32).to_be_bytes());
        }
    }
    buf.extend_from_slice(s.as_bytes());
}
57
/// Append `i` to `buf` using the narrowest *signed* msgpack integer encoding.
///
/// - `-32..=127`: a single fixint byte (the value's low byte),
/// - rest of the `i8` range: `0xD0` plus one byte,
/// - rest of the `i16` range: `0xD1` plus a big-endian `i16`,
/// - rest of the `i32` range: `0xD2` plus a big-endian `i32`,
/// - everything else: `0xD3` plus a big-endian `i64`.
///
/// The unsigned families are never emitted, so a positive value just above a
/// signed boundary (for example 128) takes the next wider signed form.
#[inline]
pub fn write_i64(buf: &mut Vec<u8>, i: i64) {
    match i {
        // Positive and negative fixint share one rule: the two's-complement
        // low byte of the value is the encoding.
        -32..=127 => buf.push(i as u8),
        // int 8 — only -128..=-33 is left once the fixints are taken.
        -128..=-33 => buf.extend_from_slice(&[0xD0, i as i8 as u8]),
        // int 16
        -32_768..=32_767 => {
            buf.push(0xD1);
            buf.extend_from_slice(&(i as i16).to_be_bytes());
        }
        // int 32
        -2_147_483_648..=2_147_483_647 => {
            buf.push(0xD2);
            buf.extend_from_slice(&(i as i32).to_be_bytes());
        }
        // int 64
        _ => {
            buf.push(0xD3);
            buf.extend_from_slice(&i.to_be_bytes());
        }
    }
}
79
/// Append `f` to `buf` as a msgpack float 64: the `0xCB` marker followed by
/// the eight big-endian bytes of the value.
#[inline]
pub fn write_f64(buf: &mut Vec<u8>, f: f64) {
    // Assemble marker + payload on the stack, then append in one call.
    let mut encoded = [0xCB_u8; 9];
    encoded[1..].copy_from_slice(&f.to_be_bytes());
    buf.extend_from_slice(&encoded);
}
86
/// Append `b` to `buf` as a msgpack boolean: `0xC3` for `true`, `0xC2` for `false`.
#[inline]
pub fn write_bool(buf: &mut Vec<u8>, b: bool) {
    let marker: u8 = match b {
        true => 0xC3,
        false => 0xC2,
    };
    buf.push(marker);
}
92
/// Append a msgpack nil (the single byte `0xC0`) to `buf`.
#[inline]
pub fn write_null(buf: &mut Vec<u8>) {
    const NIL: u8 = 0xC0;
    buf.push(NIL);
}
98
/// Append `data` to `buf` as a msgpack binary blob: a length header, then the bytes.
///
/// The header is selected from `data.len()`:
/// - up to `u8::MAX`: `0xC4` plus a one-byte length (bin 8),
/// - up to `u16::MAX`: `0xC5` plus a big-endian `u16` length (bin 16),
/// - anything larger: `0xC6` plus a big-endian `u32` length (bin 32).
///
/// A length above `u32::MAX` is truncated by the `as u32` cast while the full
/// payload is still appended.
#[inline]
pub fn write_bin(buf: &mut Vec<u8>, data: &[u8]) {
    let byte_len = data.len();
    match byte_len {
        // bin 8
        0..=0xFF => buf.extend_from_slice(&[0xC4, byte_len as u8]),
        // bin 16
        0x100..=0xFFFF => {
            buf.push(0xC5);
            buf.extend_from_slice(&(byte_len as u16).to_be_bytes());
        }
        // bin 32
        _ => {
            buf.push(0xC6);
            buf.extend_from_slice(&(byte_len as u32).to_be_bytes());
        }
    }
    buf.extend_from_slice(data);
}
115
116/// Write a key-value pair into a msgpack map being built.
117///
118/// Caller is responsible for writing the map header first via [`write_map_header`].
119#[inline]
120pub fn write_kv_str(buf: &mut Vec<u8>, key: &str, value: &str) {
121    write_str(buf, key);
122    write_str(buf, value);
123}
124
125/// Write a key-value pair (string key, i64 value).
126#[inline]
127pub fn write_kv_i64(buf: &mut Vec<u8>, key: &str, value: i64) {
128    write_str(buf, key);
129    write_i64(buf, value);
130}
131
132/// Write a key-value pair (string key, f64 value).
133#[inline]
134pub fn write_kv_f64(buf: &mut Vec<u8>, key: &str, value: f64) {
135    write_str(buf, key);
136    write_f64(buf, value);
137}
138
139/// Write a key-value pair (string key, bool value).
140#[inline]
141pub fn write_kv_bool(buf: &mut Vec<u8>, key: &str, value: bool) {
142    write_str(buf, key);
143    write_bool(buf, value);
144}
145
146/// Write a key-value pair (string key, raw msgpack value bytes).
147///
148/// The value bytes must be valid msgpack. This enables splicing raw
149/// field bytes extracted via `msgpack_scan::extract_field` directly
150/// into a new map without decode/re-encode.
151#[inline]
152pub fn write_kv_raw(buf: &mut Vec<u8>, key: &str, raw_value: &[u8]) {
153    write_str(buf, key);
154    buf.extend_from_slice(raw_value);
155}
156
157/// Write a key-value pair (string key, null value).
158#[inline]
159pub fn write_kv_null(buf: &mut Vec<u8>, key: &str) {
160    write_str(buf, key);
161    write_null(buf);
162}
163
164/// Inject a string field into a msgpack map without full decode.
165///
166/// If `value` is a valid msgpack map that already contains `field_name`,
167/// the existing value is preserved and the input is returned unchanged —
168/// the caller's body is authoritative for its own primary key. If the map
169/// does not contain the field, it is prepended. If `value` is not a map,
170/// wraps as `{field_name: field_value, "value": raw}`.
171pub fn inject_str_field(value: &[u8], field_name: &str, field_value: &str) -> Vec<u8> {
172    if let Some((count, body_start)) = crate::msgpack_scan::reader::map_header(value, 0) {
173        if crate::msgpack_scan::extract_field(value, 0, field_name).is_some() {
174            return value.to_vec();
175        }
176        let mut buf = Vec::with_capacity(value.len() + field_name.len() + field_value.len() + 16);
177        write_map_header(&mut buf, count + 1);
178        write_kv_str(&mut buf, field_name, field_value);
179        buf.extend_from_slice(&value[body_start..]);
180        buf
181    } else {
182        let mut buf = Vec::with_capacity(value.len() + field_name.len() + field_value.len() + 16);
183        write_map_header(&mut buf, 2);
184        write_kv_str(&mut buf, field_name, field_value);
185        write_str(&mut buf, "value");
186        buf.extend_from_slice(value);
187        buf
188    }
189}
190
191/// Merge field updates into a msgpack map without full decode.
192///
193/// Takes a base msgpack map and a list of `(field_name, raw_msgpack_value)` updates.
194/// Returns a new msgpack map with updated fields replaced and new fields appended.
195/// Fields not in `updates` are copied from the original.
196pub fn merge_fields(base: &[u8], updates: &[(&str, &[u8])]) -> Vec<u8> {
197    use std::collections::HashSet;
198
199    let update_names: HashSet<&str> = updates.iter().map(|(k, _)| *k).collect();
200
201    let (count, body_start) = match crate::msgpack_scan::reader::map_header(base, 0) {
202        Some(v) => v,
203        None => {
204            // Not a valid map — build from updates only.
205            let mut buf = Vec::with_capacity(updates.len() * 32);
206            write_map_header(&mut buf, updates.len());
207            for (k, v) in updates {
208                write_str(&mut buf, k);
209                buf.extend_from_slice(v);
210            }
211            return buf;
212        }
213    };
214
215    // Count fields: existing (not overwritten) + updates.
216    let mut kept = 0usize;
217    let mut pos = body_start;
218    for _ in 0..count {
219        let key = crate::msgpack_scan::reader::read_str(base, pos);
220        pos = match crate::msgpack_scan::reader::skip_value(base, pos) {
221            Some(p) => p,
222            None => break,
223        };
224        pos = match crate::msgpack_scan::reader::skip_value(base, pos) {
225            Some(p) => p,
226            None => break,
227        };
228        if let Some(k) = &key {
229            if !update_names.contains(&k[..]) {
230                kept += 1;
231            }
232        } else {
233            kept += 1;
234        }
235    }
236
237    let new_count = kept + updates.len();
238    let mut buf = Vec::with_capacity(
239        base.len()
240            + updates
241                .iter()
242                .map(|(k, v)| k.len() + v.len() + 4)
243                .sum::<usize>(),
244    );
245    write_map_header(&mut buf, new_count);
246
247    // Copy non-overwritten fields from base.
248    pos = body_start;
249    for _ in 0..count {
250        let key_start = pos;
251        let key = crate::msgpack_scan::reader::read_str(base, pos);
252        pos = match crate::msgpack_scan::reader::skip_value(base, pos) {
253            Some(p) => p,
254            None => break,
255        };
256        pos = match crate::msgpack_scan::reader::skip_value(base, pos) {
257            Some(p) => p,
258            None => break,
259        };
260        if let Some(k) = &key
261            && update_names.contains(&k[..])
262        {
263            continue; // Will be written from updates.
264        }
265        // Copy key + value bytes.
266        buf.extend_from_slice(&base[key_start..pos]);
267    }
268
269    // Write updated/new fields.
270    for (k, v) in updates {
271        write_str(&mut buf, k);
272        buf.extend_from_slice(v);
273    }
274
275    buf
276}
277
278/// Build a simple msgpack map from string key-value pairs.
279pub fn build_str_map(pairs: &[(&str, &str)]) -> Vec<u8> {
280    let mut buf = Vec::with_capacity(pairs.len() * 32);
281    write_map_header(&mut buf, pairs.len());
282    for (k, v) in pairs {
283        write_kv_str(&mut buf, k, v);
284    }
285    buf
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// A two-entry map written with the kv helpers reads back, pair by pair,
    /// through the scanner's reader functions.
    #[test]
    fn roundtrip_map() {
        let mut encoded = Vec::new();
        write_map_header(&mut encoded, 2);
        write_kv_str(&mut encoded, "name", "alice");
        write_kv_i64(&mut encoded, "age", 30);

        let (entries, first_key_at) =
            crate::msgpack_scan::reader::map_header(&encoded, 0).unwrap();
        assert_eq!(entries, 2);

        // First pair: "name" => "alice".
        let first_key = crate::msgpack_scan::reader::read_str(&encoded, first_key_at).unwrap();
        assert_eq!(first_key, "name");
        let first_val_at =
            crate::msgpack_scan::reader::skip_value(&encoded, first_key_at).unwrap();
        let first_val = crate::msgpack_scan::reader::read_str(&encoded, first_val_at).unwrap();
        assert_eq!(first_val, "alice");
        let second_key_at =
            crate::msgpack_scan::reader::skip_value(&encoded, first_val_at).unwrap();

        // Second pair: "age" => 30.
        let second_key = crate::msgpack_scan::reader::read_str(&encoded, second_key_at).unwrap();
        assert_eq!(second_key, "age");
        let second_val_at =
            crate::msgpack_scan::reader::skip_value(&encoded, second_key_at).unwrap();
        let second_val = crate::msgpack_scan::reader::read_i64(&encoded, second_val_at).unwrap();
        assert_eq!(second_val, 30);
    }

    /// A pre-encoded value spliced in with `write_kv_raw` is found again by
    /// field extraction and decodes to the original string.
    #[test]
    fn write_kv_raw_splices_correctly() {
        let mut raw_value = Vec::new();
        write_str(&mut raw_value, "hello");

        let mut encoded = Vec::new();
        write_map_header(&mut encoded, 1);
        write_kv_raw(&mut encoded, "greeting", &raw_value);

        let located =
            crate::msgpack_scan::field::extract_field(&encoded, 0, "greeting").unwrap();
        let decoded = crate::msgpack_scan::reader::read_str(&encoded, located.0).unwrap();
        assert_eq!(decoded, "hello");
    }

    /// Small integers use the fixint and int 8 encodings.
    #[test]
    fn compact_integer_encoding() {
        let cases: [(i64, Vec<u8>); 3] = [
            (42, vec![42]),                     // positive fixint
            (-1, vec![0xFF]),                   // negative fixint
            (-100, vec![0xD0, (-100i8) as u8]), // int 8
        ];

        let mut buf = Vec::new();
        for (input, expected) in cases {
            buf.clear();
            write_i64(&mut buf, input);
            assert_eq!(buf, expected);
        }
    }
}