Skip to main content

sif_parser/
packed.rs

// SIF Packed Binary Encoding v1
//
// Hybrid framing: UTF-8 text directives interleaved with binary record frames.
// Implements the format described in SIF-PACKED-SPEC.md.
//
// Frame marker bytes 0x00-0x0F identify binary frames; text starts at 0x23 or
// above (`#` directives), so the first byte tells the two apart.
// All multi-byte values are little-endian, with no alignment padding.
8
9use crate::error::{err, ErrorKind, Result};
10use crate::types::*;
11
12// ── Varint / Zigzag (LEB128) ────────────────────────────────────────
13
/// Append `value` to `out` as an unsigned LEB128 varint.
///
/// Seven payload bits are emitted per byte, least-significant group first; the
/// high bit (0x80) is set on every byte except the last. Zero encodes as the
/// single byte `0x00`. Bytes are appended; existing contents of `out` are kept.
pub fn encode_varint(mut value: u64, out: &mut Vec<u8>) {
    while value >= 0x80 {
        out.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}
28
29/// Decode an unsigned LEB128 varint from a byte slice.
30/// Returns (value, bytes_consumed).
31pub fn decode_varint(data: &[u8]) -> Result<(u64, usize)> {
32    let mut result: u64 = 0;
33    let mut shift = 0u32;
34    for (i, &byte) in data.iter().enumerate() {
35        if i >= 10 {
36            return Err(err(ErrorKind::InvalidRecord, 0, "varint exceeds 10 bytes"));
37        }
38        result |= ((byte & 0x7F) as u64) << shift;
39        if byte & 0x80 == 0 {
40            return Ok((result, i + 1));
41        }
42        shift += 7;
43    }
44    Err(err(ErrorKind::UnexpectedEof, 0, "unterminated varint"))
45}
46
/// Map a signed integer onto an unsigned one so that small magnitudes stay small.
///
/// Ordering: 0 → 0, -1 → 1, 1 → 2, -2 → 3, … This keeps negative numbers
/// compact when written as varints.
pub fn zigzag_encode(n: i64) -> u64 {
    // Arithmetic shift: all ones for negative inputs, all zeros otherwise.
    let sign_mask = (n >> 63) as u64;
    ((n as u64) << 1) ^ sign_mask
}
51
/// Invert zigzag encoding: 0 → 0, 1 → -1, 2 → 1, 3 → -2, …
pub fn zigzag_decode(n: u64) -> i64 {
    let magnitude = (n >> 1) as i64;
    // Odd codes are negative; flipping all bits maps k back to -(k + 1).
    if n & 1 == 0 {
        magnitude
    } else {
        !magnitude
    }
}
56
57// ── Frame Markers ───────────────────────────────────────────────────
58
// Frame marker bytes per the spec. Each binary frame starts with one of these.
// Markers stay within 0x00-0x0F so they cannot be mistaken for text lines.

/// Single-record insert frame.
pub const FRAME_INSERT: u8 = 0x00;
/// Single-record update frame.
pub const FRAME_UPDATE: u8 = 0x01;
/// Single-record delete (tombstone) frame.
pub const FRAME_DELETE: u8 = 0x02;
/// Batch frame of inserted records.
pub const FRAME_BATCH_INSERT: u8 = 0x03;
/// Batch frame of updated records.
pub const FRAME_BATCH_UPDATE: u8 = 0x04;
65
66// ── Type Tags for `any` type ────────────────────────────────────────
67
// One-byte type tags that prefix every value of an `any`-typed field.

/// No payload.
const TAG_NULL: u8 = 0x00;
/// One byte: 0x00 is false, any other value is true.
const TAG_BOOL: u8 = 0x01;
/// Zigzag-encoded varint.
const TAG_INT: u8 = 0x02;
/// Unsigned varint.
const TAG_UINT: u8 = 0x03;
/// `f64`, little-endian.
const TAG_FLOAT: u8 = 0x04;
/// Varint byte length followed by UTF-8 text (also used for enum values).
const TAG_STR: u8 = 0x05;
/// `i32` LE days since 1970-01-01.
const TAG_DATE: u8 = 0x06;
/// `i64` LE microseconds since the Unix epoch.
const TAG_DATETIME: u8 = 0x07;
/// `i64` LE microseconds.
const TAG_DURATION: u8 = 0x08;
/// Varint byte length followed by raw bytes.
const TAG_BYTES: u8 = 0x09;
/// Varint element count followed by that many tagged values.
const TAG_ARRAY: u8 = 0x0A;
/// Varint entry count followed by (length-prefixed key, tagged value) pairs.
const TAG_MAP: u8 = 0x0B;
80
81// ── Width Modifier Parsing ──────────────────────────────────────────
82
83/// Packed width override from field modifiers.
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85pub enum PackedWidth {
86    Default,
87    U8,
88    U16,
89    U32,
90    U64,
91    I8,
92    I16,
93    I32,
94    I64,
95    F32,
96    F64,
97}
98
99impl PackedWidth {
100    /// Parse from a field's modifiers.
101    pub fn from_field(field: &FieldDef) -> Self {
102        for m in &field.modifiers {
103            if m.name == "packed" {
104                if let Some(ref v) = m.value {
105                    return match v.as_str() {
106                        "u8" => PackedWidth::U8,
107                        "u16" => PackedWidth::U16,
108                        "u32" => PackedWidth::U32,
109                        "u64" => PackedWidth::U64,
110                        "i8" => PackedWidth::I8,
111                        "i16" => PackedWidth::I16,
112                        "i32" => PackedWidth::I32,
113                        "i64" => PackedWidth::I64,
114                        "f32" => PackedWidth::F32,
115                        "f64" => PackedWidth::F64,
116                        _ => PackedWidth::Default,
117                    };
118                }
119            }
120        }
121        PackedWidth::Default
122    }
123
124    /// Fixed byte size, or None for default (variable-width).
125    pub fn fixed_size(&self) -> Option<usize> {
126        match self {
127            PackedWidth::Default => None,
128            PackedWidth::U8 | PackedWidth::I8 => Some(1),
129            PackedWidth::U16 | PackedWidth::I16 => Some(2),
130            PackedWidth::U32 | PackedWidth::I32 | PackedWidth::F32 => Some(4),
131            PackedWidth::U64 | PackedWidth::I64 | PackedWidth::F64 => Some(8),
132        }
133    }
134}
135
136// ── Encoder ─────────────────────────────────────────────────────────
137
138/// Encode a single SIF value to packed binary.
139pub fn encode_value(value: &Value, field_type: &Type, width: PackedWidth, out: &mut Vec<u8>) {
140    match field_type {
141        Type::Nullable(inner) => {
142            match value {
143                Value::Null => out.push(0x00),
144                _ => {
145                    out.push(0x01);
146                    encode_value(value, inner, width, out);
147                }
148            }
149        }
150        Type::Bool => {
151            match value {
152                Value::Bool(true) => out.push(0x01),
153                Value::Bool(false) => out.push(0x00),
154                _ => out.push(0x00),
155            }
156        }
157        Type::Uint => encode_uint(value, width, out),
158        Type::Int => encode_int(value, width, out),
159        Type::Float => encode_float(value, width, out),
160        Type::Str => encode_str(value, out),
161        Type::Date => encode_date(value, out),
162        Type::DateTime => encode_datetime(value, out),
163        Type::Duration => encode_duration(value, out),
164        Type::Bytes => encode_bytes_value(value, out),
165        Type::Enum(variants) => encode_enum(value, variants, width, out),
166        Type::Null => {} // zero bytes
167        Type::Any => encode_any(value, out),
168        Type::Map => encode_map(value, out),
169        Type::Array(elem_ty) => encode_array(value, elem_ty, out),
170    }
171}
172
173fn encode_uint(value: &Value, width: PackedWidth, out: &mut Vec<u8>) {
174    let n = match value {
175        Value::Uint(n) => *n,
176        Value::Int(n) => *n as u64,
177        _ => 0,
178    };
179    match width {
180        PackedWidth::U8 => out.push(n as u8),
181        PackedWidth::U16 => out.extend_from_slice(&(n as u16).to_le_bytes()),
182        PackedWidth::U32 => out.extend_from_slice(&(n as u32).to_le_bytes()),
183        PackedWidth::U64 => out.extend_from_slice(&n.to_le_bytes()),
184        _ => encode_varint(n, out),
185    }
186}
187
188fn encode_int(value: &Value, width: PackedWidth, out: &mut Vec<u8>) {
189    let n = match value {
190        Value::Int(n) => *n,
191        Value::Uint(n) => *n as i64,
192        _ => 0,
193    };
194    match width {
195        PackedWidth::I8 => out.push(n as u8),
196        PackedWidth::I16 => out.extend_from_slice(&(n as i16).to_le_bytes()),
197        PackedWidth::I32 => out.extend_from_slice(&(n as i32).to_le_bytes()),
198        PackedWidth::I64 => out.extend_from_slice(&n.to_le_bytes()),
199        PackedWidth::U8 => out.push(n as u8),
200        PackedWidth::U16 => out.extend_from_slice(&(n as u16).to_le_bytes()),
201        PackedWidth::U32 => out.extend_from_slice(&(n as u32).to_le_bytes()),
202        PackedWidth::U64 => out.extend_from_slice(&(n as u64).to_le_bytes()),
203        _ => encode_varint(zigzag_encode(n), out),
204    }
205}
206
207fn encode_float(value: &Value, width: PackedWidth, out: &mut Vec<u8>) {
208    let n = match value {
209        Value::Float(n) => *n,
210        Value::Int(n) => *n as f64,
211        Value::Uint(n) => *n as f64,
212        _ => 0.0,
213    };
214    match width {
215        PackedWidth::F32 => out.extend_from_slice(&(n as f32).to_le_bytes()),
216        _ => out.extend_from_slice(&n.to_le_bytes()),
217    }
218}
219
220fn encode_str(value: &Value, out: &mut Vec<u8>) {
221    let s = match value {
222        Value::Str(s) | Value::Date(s) | Value::DateTime(s)
223        | Value::Duration(s) | Value::Enum(s) => s.as_bytes(),
224        _ => b"",
225    };
226    encode_varint(s.len() as u64, out);
227    out.extend_from_slice(s);
228}
229
230fn encode_date(value: &Value, out: &mut Vec<u8>) {
231    // Days since Unix epoch (1970-01-01), i32 LE.
232    let days = match value {
233        Value::Date(s) => parse_date_to_days(s).unwrap_or(0),
234        _ => 0,
235    };
236    out.extend_from_slice(&days.to_le_bytes());
237}
238
239fn encode_datetime(value: &Value, out: &mut Vec<u8>) {
240    // Microseconds since Unix epoch, i64 LE.
241    let us = match value {
242        Value::DateTime(s) => parse_datetime_to_micros(s).unwrap_or(0),
243        _ => 0,
244    };
245    out.extend_from_slice(&us.to_le_bytes());
246}
247
248fn encode_duration(value: &Value, out: &mut Vec<u8>) {
249    // Microseconds, i64 LE.
250    let us = match value {
251        Value::Duration(s) => parse_duration_to_micros(s).unwrap_or(0),
252        _ => 0,
253    };
254    out.extend_from_slice(&us.to_le_bytes());
255}
256
257fn encode_bytes_value(value: &Value, out: &mut Vec<u8>) {
258    let data = match value {
259        Value::Bytes(b) => b.as_slice(),
260        _ => &[],
261    };
262    encode_varint(data.len() as u64, out);
263    out.extend_from_slice(data);
264}
265
266fn encode_enum(value: &Value, variants: &[String], width: PackedWidth, out: &mut Vec<u8>) {
267    let idx = match value {
268        Value::Enum(s) => variants.iter().position(|v| v == s).unwrap_or(0) as u64,
269        _ => 0,
270    };
271    match width {
272        PackedWidth::U8 => out.push(idx as u8),
273        _ => encode_varint(idx, out),
274    }
275}
276
277fn encode_any(value: &Value, out: &mut Vec<u8>) {
278    match value {
279        Value::Null => out.push(TAG_NULL),
280        Value::Bool(b) => {
281            out.push(TAG_BOOL);
282            out.push(if *b { 0x01 } else { 0x00 });
283        }
284        Value::Int(n) => {
285            out.push(TAG_INT);
286            encode_varint(zigzag_encode(*n), out);
287        }
288        Value::Uint(n) => {
289            out.push(TAG_UINT);
290            encode_varint(*n, out);
291        }
292        Value::Float(n) => {
293            out.push(TAG_FLOAT);
294            out.extend_from_slice(&n.to_le_bytes());
295        }
296        Value::Str(s) => {
297            out.push(TAG_STR);
298            encode_varint(s.len() as u64, out);
299            out.extend_from_slice(s.as_bytes());
300        }
301        Value::Date(s) => {
302            out.push(TAG_DATE);
303            let days = parse_date_to_days(s).unwrap_or(0);
304            out.extend_from_slice(&days.to_le_bytes());
305        }
306        Value::DateTime(s) => {
307            out.push(TAG_DATETIME);
308            let us = parse_datetime_to_micros(s).unwrap_or(0);
309            out.extend_from_slice(&us.to_le_bytes());
310        }
311        Value::Duration(s) => {
312            out.push(TAG_DURATION);
313            let us = parse_duration_to_micros(s).unwrap_or(0);
314            out.extend_from_slice(&us.to_le_bytes());
315        }
316        Value::Bytes(b) => {
317            out.push(TAG_BYTES);
318            encode_varint(b.len() as u64, out);
319            out.extend_from_slice(b);
320        }
321        Value::Array(items) => {
322            out.push(TAG_ARRAY);
323            encode_varint(items.len() as u64, out);
324            for item in items {
325                encode_any(item, out);
326            }
327        }
328        Value::Enum(s) => {
329            // Encode enums as strings in `any` context (no variant table).
330            out.push(TAG_STR);
331            encode_varint(s.len() as u64, out);
332            out.extend_from_slice(s.as_bytes());
333        }
334        Value::Map(entries) => {
335            out.push(TAG_MAP);
336            encode_varint(entries.len() as u64, out);
337            for (k, v) in entries {
338                encode_varint(k.len() as u64, out);
339                out.extend_from_slice(k.as_bytes());
340                encode_any(v, out);
341            }
342        }
343    }
344}
345
346fn encode_array(value: &Value, elem_ty: &Type, out: &mut Vec<u8>) {
347    let items = match value {
348        Value::Array(items) => items,
349        _ => {
350            encode_varint(0, out);
351            return;
352        }
353    };
354    encode_varint(items.len() as u64, out);
355    for item in items {
356        encode_value(item, elem_ty, PackedWidth::Default, out);
357    }
358}
359
360fn encode_map(value: &Value, out: &mut Vec<u8>) {
361    let entries = match value {
362        Value::Map(entries) => entries,
363        _ => {
364            encode_varint(0, out);
365            return;
366        }
367    };
368    encode_varint(entries.len() as u64, out);
369    for (k, v) in entries {
370        encode_varint(k.len() as u64, out);
371        out.extend_from_slice(k.as_bytes());
372        encode_any(v, out);
373    }
374}
375
376// ── Decoder ─────────────────────────────────────────────────────────
377
378/// Decode a single SIF value from packed binary.
379/// Returns (Value, bytes_consumed).
380pub fn decode_value(data: &[u8], field_type: &Type, width: PackedWidth) -> Result<(Value, usize)> {
381    match field_type {
382        Type::Nullable(inner) => {
383            if data.is_empty() {
384                return Err(err(ErrorKind::UnexpectedEof, 0, "expected nullable byte"));
385            }
386            if data[0] == 0x00 {
387                Ok((Value::Null, 1))
388            } else {
389                let (val, n) = decode_value(&data[1..], inner, width)?;
390                Ok((val, 1 + n))
391            }
392        }
393        Type::Bool => {
394            if data.is_empty() {
395                return Err(err(ErrorKind::UnexpectedEof, 0, "expected bool byte"));
396            }
397            Ok((Value::Bool(data[0] != 0), 1))
398        }
399        Type::Uint => decode_uint(data, width),
400        Type::Int => decode_int(data, width),
401        Type::Float => decode_float(data, width),
402        Type::Str => decode_str(data),
403        Type::Date => decode_date(data),
404        Type::DateTime => decode_datetime(data),
405        Type::Duration => decode_duration(data),
406        Type::Bytes => decode_bytes_value(data),
407        Type::Enum(variants) => decode_enum(data, variants, width),
408        Type::Null => Ok((Value::Null, 0)),
409        Type::Any => decode_any(data),
410        Type::Map => decode_map(data),
411        Type::Array(elem_ty) => decode_array(data, elem_ty),
412    }
413}
414
415fn decode_uint(data: &[u8], width: PackedWidth) -> Result<(Value, usize)> {
416    match width {
417        PackedWidth::U8 => {
418            check_len(data, 1)?;
419            Ok((Value::Uint(data[0] as u64), 1))
420        }
421        PackedWidth::U16 => {
422            check_len(data, 2)?;
423            Ok((Value::Uint(u16::from_le_bytes([data[0], data[1]]) as u64), 2))
424        }
425        PackedWidth::U32 => {
426            check_len(data, 4)?;
427            Ok((Value::Uint(u32::from_le_bytes(data[..4].try_into().unwrap()) as u64), 4))
428        }
429        PackedWidth::U64 => {
430            check_len(data, 8)?;
431            Ok((Value::Uint(u64::from_le_bytes(data[..8].try_into().unwrap())), 8))
432        }
433        _ => {
434            let (n, consumed) = decode_varint(data)?;
435            Ok((Value::Uint(n), consumed))
436        }
437    }
438}
439
440fn decode_int(data: &[u8], width: PackedWidth) -> Result<(Value, usize)> {
441    match width {
442        PackedWidth::I8 => {
443            check_len(data, 1)?;
444            Ok((Value::Int(data[0] as i8 as i64), 1))
445        }
446        PackedWidth::I16 => {
447            check_len(data, 2)?;
448            Ok((Value::Int(i16::from_le_bytes([data[0], data[1]]) as i64), 2))
449        }
450        PackedWidth::I32 => {
451            check_len(data, 4)?;
452            Ok((Value::Int(i32::from_le_bytes(data[..4].try_into().unwrap()) as i64), 4))
453        }
454        PackedWidth::I64 => {
455            check_len(data, 8)?;
456            Ok((Value::Int(i64::from_le_bytes(data[..8].try_into().unwrap())), 8))
457        }
458        PackedWidth::U8 => {
459            check_len(data, 1)?;
460            Ok((Value::Int(data[0] as i64), 1))
461        }
462        PackedWidth::U16 => {
463            check_len(data, 2)?;
464            Ok((Value::Int(u16::from_le_bytes([data[0], data[1]]) as i64), 2))
465        }
466        PackedWidth::U32 => {
467            check_len(data, 4)?;
468            Ok((Value::Int(u32::from_le_bytes(data[..4].try_into().unwrap()) as i64), 4))
469        }
470        PackedWidth::U64 => {
471            check_len(data, 8)?;
472            Ok((Value::Int(u64::from_le_bytes(data[..8].try_into().unwrap()) as i64), 8))
473        }
474        _ => {
475            let (n, consumed) = decode_varint(data)?;
476            Ok((Value::Int(zigzag_decode(n)), consumed))
477        }
478    }
479}
480
481fn decode_float(data: &[u8], width: PackedWidth) -> Result<(Value, usize)> {
482    match width {
483        PackedWidth::F32 => {
484            check_len(data, 4)?;
485            let f = f32::from_le_bytes(data[..4].try_into().unwrap());
486            Ok((Value::Float(f as f64), 4))
487        }
488        _ => {
489            check_len(data, 8)?;
490            let f = f64::from_le_bytes(data[..8].try_into().unwrap());
491            Ok((Value::Float(f), 8))
492        }
493    }
494}
495
496fn decode_str(data: &[u8]) -> Result<(Value, usize)> {
497    let (len, hdr) = decode_varint(data)?;
498    let len = len as usize;
499    let total = hdr + len;
500    if data.len() < total {
501        return Err(err(ErrorKind::UnexpectedEof, 0, "string data truncated"));
502    }
503    let s = std::str::from_utf8(&data[hdr..total])
504        .map_err(|_| err(ErrorKind::InvalidString, 0, "invalid UTF-8 in packed string"))?;
505    Ok((Value::Str(s.to_string()), total))
506}
507
508fn decode_date(data: &[u8]) -> Result<(Value, usize)> {
509    check_len(data, 4)?;
510    let days = i32::from_le_bytes(data[..4].try_into().unwrap());
511    Ok((Value::Date(days_to_date_string(days)), 4))
512}
513
514fn decode_datetime(data: &[u8]) -> Result<(Value, usize)> {
515    check_len(data, 8)?;
516    let us = i64::from_le_bytes(data[..8].try_into().unwrap());
517    Ok((Value::DateTime(micros_to_datetime_string(us)), 8))
518}
519
520fn decode_duration(data: &[u8]) -> Result<(Value, usize)> {
521    check_len(data, 8)?;
522    let us = i64::from_le_bytes(data[..8].try_into().unwrap());
523    Ok((Value::Duration(micros_to_duration_string(us)), 8))
524}
525
526fn decode_bytes_value(data: &[u8]) -> Result<(Value, usize)> {
527    let (len, hdr) = decode_varint(data)?;
528    let len = len as usize;
529    let total = hdr + len;
530    if data.len() < total {
531        return Err(err(ErrorKind::UnexpectedEof, 0, "bytes data truncated"));
532    }
533    Ok((Value::Bytes(data[hdr..total].to_vec()), total))
534}
535
536fn decode_enum(data: &[u8], variants: &[String], width: PackedWidth) -> Result<(Value, usize)> {
537    let (idx, consumed) = match width {
538        PackedWidth::U8 => {
539            check_len(data, 1)?;
540            (data[0] as u64, 1)
541        }
542        _ => decode_varint(data)?,
543    };
544    let variant = variants
545        .get(idx as usize)
546        .cloned()
547        .unwrap_or_else(|| format!("unknown_{}", idx));
548    Ok((Value::Enum(variant), consumed))
549}
550
551fn decode_any(data: &[u8]) -> Result<(Value, usize)> {
552    if data.is_empty() {
553        return Err(err(ErrorKind::UnexpectedEof, 0, "expected type tag"));
554    }
555    let tag = data[0];
556    let rest = &data[1..];
557    match tag {
558        TAG_NULL => Ok((Value::Null, 1)),
559        TAG_BOOL => {
560            check_len(rest, 1)?;
561            Ok((Value::Bool(rest[0] != 0), 2))
562        }
563        TAG_INT => {
564            let (n, c) = decode_varint(rest)?;
565            Ok((Value::Int(zigzag_decode(n)), 1 + c))
566        }
567        TAG_UINT => {
568            let (n, c) = decode_varint(rest)?;
569            Ok((Value::Uint(n), 1 + c))
570        }
571        TAG_FLOAT => {
572            check_len(rest, 8)?;
573            let f = f64::from_le_bytes(rest[..8].try_into().unwrap());
574            Ok((Value::Float(f), 9))
575        }
576        TAG_STR => {
577            let (val, c) = decode_str(rest)?;
578            Ok((val, 1 + c))
579        }
580        TAG_DATE => {
581            let (val, c) = decode_date(rest)?;
582            Ok((val, 1 + c))
583        }
584        TAG_DATETIME => {
585            let (val, c) = decode_datetime(rest)?;
586            Ok((val, 1 + c))
587        }
588        TAG_DURATION => {
589            let (val, c) = decode_duration(rest)?;
590            Ok((val, 1 + c))
591        }
592        TAG_BYTES => {
593            let (val, c) = decode_bytes_value(rest)?;
594            Ok((val, 1 + c))
595        }
596        TAG_ARRAY => {
597            let (count, hdr) = decode_varint(rest)?;
598            let mut pos = hdr;
599            let mut items = Vec::with_capacity(count as usize);
600            for _ in 0..count {
601                let (val, c) = decode_any(&rest[pos..])?;
602                items.push(val);
603                pos += c;
604            }
605            Ok((Value::Array(items), 1 + pos))
606        }
607        TAG_MAP => {
608            let (count, hdr) = decode_varint(rest)?;
609            let mut pos = hdr;
610            let mut entries = Vec::with_capacity(count as usize);
611            for _ in 0..count {
612                let (key_val, kc) = decode_str(&rest[pos..])?;
613                let key = match key_val {
614                    Value::Str(s) => s,
615                    _ => String::new(),
616                };
617                pos += kc;
618                let (val, vc) = decode_any(&rest[pos..])?;
619                pos += vc;
620                entries.push((key, val));
621            }
622            Ok((Value::Map(entries), 1 + pos))
623        }
624        _ => Err(err(ErrorKind::InvalidRecord, 0, format!("unknown type tag: 0x{:02x}", tag))),
625    }
626}
627
628fn decode_array(data: &[u8], elem_ty: &Type) -> Result<(Value, usize)> {
629    let (count, hdr) = decode_varint(data)?;
630    let mut pos = hdr;
631    let mut items = Vec::with_capacity(count as usize);
632    for _ in 0..count {
633        let (val, c) = decode_value(&data[pos..], elem_ty, PackedWidth::Default)?;
634        items.push(val);
635        pos += c;
636    }
637    Ok((Value::Array(items), pos))
638}
639
640fn decode_map(data: &[u8]) -> Result<(Value, usize)> {
641    let (count, hdr) = decode_varint(data)?;
642    let mut pos = hdr;
643    let mut entries = Vec::with_capacity(count as usize);
644    for _ in 0..count {
645        let (key_val, kc) = decode_str(&data[pos..])?;
646        let key = match key_val {
647            Value::Str(s) => s,
648            _ => String::new(),
649        };
650        pos += kc;
651        let (val, vc) = decode_any(&data[pos..])?;
652        pos += vc;
653        entries.push((key, val));
654    }
655    Ok((Value::Map(entries), pos))
656}
657
658fn check_len(data: &[u8], need: usize) -> Result<()> {
659    if data.len() < need {
660        Err(err(ErrorKind::UnexpectedEof, 0, format!("need {} bytes, have {}", need, data.len())))
661    } else {
662        Ok(())
663    }
664}
665
666// ── Record-Level Encode / Decode ────────────────────────────────────
667
668/// Encode a record to a packed binary frame (marker + length + payload).
669pub fn encode_record_frame(record: &Record, schema: &Schema, out: &mut Vec<u8>) {
670    let marker = match record.cdc_op {
671        CdcOp::Insert => FRAME_INSERT,
672        CdcOp::Update => FRAME_UPDATE,
673        CdcOp::Delete => FRAME_DELETE,
674    };
675
676    let mut payload = Vec::new();
677
678    if record.cdc_op == CdcOp::Delete {
679        // Tombstone: only :id fields.
680        for (i, field) in schema.fields.iter().enumerate() {
681            if field.semantic.as_deref() == Some("id") {
682                if let Some(val) = record.values.get(i) {
683                    let w = PackedWidth::from_field(field);
684                    encode_value(val, &field.field_type, w, &mut payload);
685                }
686            }
687        }
688    } else {
689        for (i, field) in schema.fields.iter().enumerate() {
690            let val = record.values.get(i).unwrap_or(&Value::Null);
691            let w = PackedWidth::from_field(field);
692            encode_value(val, &field.field_type, w, &mut payload);
693        }
694    }
695
696    out.push(marker);
697    encode_varint(payload.len() as u64, out);
698    out.extend_from_slice(&payload);
699}
700
701/// Decode a record from a packed binary frame payload.
702pub fn decode_record_frame(
703    marker: u8,
704    payload: &[u8],
705    schema: &Schema,
706) -> Result<Record> {
707    let cdc_op = match marker {
708        FRAME_INSERT => CdcOp::Insert,
709        FRAME_UPDATE => CdcOp::Update,
710        FRAME_DELETE => CdcOp::Delete,
711        _ => return Err(err(ErrorKind::InvalidRecord, 0, format!("unknown frame marker: 0x{:02x}", marker))),
712    };
713
714    let mut values = Vec::with_capacity(schema.fields.len());
715    let mut pos = 0;
716
717    if cdc_op == CdcOp::Delete {
718        for field in &schema.fields {
719            if field.semantic.as_deref() == Some("id") {
720                let w = PackedWidth::from_field(field);
721                let (val, consumed) = decode_value(&payload[pos..], &field.field_type, w)?;
722                values.push(val);
723                pos += consumed;
724            } else {
725                values.push(Value::Null);
726            }
727        }
728    } else {
729        for field in &schema.fields {
730            let w = PackedWidth::from_field(field);
731            let (val, consumed) = decode_value(&payload[pos..], &field.field_type, w)?;
732            values.push(val);
733            pos += consumed;
734        }
735    }
736
737    Ok(Record { values, cdc_op })
738}
739
740/// Encode a batch of records into a batch frame.
741pub fn encode_batch_frame(records: &[Record], schema: &Schema, cdc_op: CdcOp, out: &mut Vec<u8>) {
742    let marker = match cdc_op {
743        CdcOp::Insert => FRAME_BATCH_INSERT,
744        CdcOp::Update => FRAME_BATCH_UPDATE,
745        CdcOp::Delete => return, // batch delete not defined
746    };
747
748    let mut payload = Vec::new();
749    for record in records {
750        let mut rec_data = Vec::new();
751        for (i, field) in schema.fields.iter().enumerate() {
752            let val = record.values.get(i).unwrap_or(&Value::Null);
753            let w = PackedWidth::from_field(field);
754            encode_value(val, &field.field_type, w, &mut rec_data);
755        }
756        // Variable-width: length-prefix each record.
757        encode_varint(rec_data.len() as u64, &mut payload);
758        payload.extend_from_slice(&rec_data);
759    }
760
761    out.push(marker);
762    encode_varint(records.len() as u64, out);
763    encode_varint(payload.len() as u64, out);
764    out.extend_from_slice(&payload);
765}
766
767/// Decode a batch frame into records.
768pub fn decode_batch_frame(
769    payload: &[u8],
770    record_count: u64,
771    schema: &Schema,
772    cdc_op: CdcOp,
773) -> Result<Vec<Record>> {
774    let mut records = Vec::with_capacity(record_count as usize);
775    let mut pos = 0;
776
777    for _ in 0..record_count {
778        let (rec_len, hdr) = decode_varint(&payload[pos..])?;
779        pos += hdr;
780        let rec_end = pos + rec_len as usize;
781        if rec_end > payload.len() {
782            return Err(err(ErrorKind::UnexpectedEof, 0, "batch record truncated"));
783        }
784
785        let mut values = Vec::with_capacity(schema.fields.len());
786        let mut rpos = 0;
787        let rec_data = &payload[pos..rec_end];
788        for field in &schema.fields {
789            let w = PackedWidth::from_field(field);
790            let (val, consumed) = decode_value(&rec_data[rpos..], &field.field_type, w)?;
791            values.push(val);
792            rpos += consumed;
793        }
794        records.push(Record { values, cdc_op });
795        pos = rec_end;
796    }
797
798    Ok(records)
799}
800
801// ── Date/Time Conversion Helpers ────────────────────────────────────
802
803fn parse_date_to_days(s: &str) -> Option<i32> {
804    if s.len() < 10 {
805        return None;
806    }
807    let year: i32 = s[0..4].parse().ok()?;
808    let month: u32 = s[5..7].parse().ok()?;
809    let day: u32 = s[8..10].parse().ok()?;
810    Some(civil_to_days(year, month, day))
811}
812
813fn parse_datetime_to_micros(s: &str) -> Option<i64> {
814    let days = parse_date_to_days(&s[..10])? as i64;
815    let mut micros = days * 86_400_000_000;
816    if s.len() > 11 {
817        let time_part = s[11..].trim_end_matches('Z');
818        let parts: Vec<&str> = time_part.split(':').collect();
819        if parts.len() >= 2 {
820            let h: i64 = parts[0].parse().ok()?;
821            let m: i64 = parts[1].parse().ok()?;
822            let s_part = if parts.len() >= 3 { parts[2] } else { "0" };
823            let (sec, frac) = if let Some(dot) = s_part.find('.') {
824                let sec: i64 = s_part[..dot].parse().ok()?;
825                let frac_str = &s_part[dot + 1..];
826                let frac_us: i64 = match frac_str.len() {
827                    1 => frac_str.parse::<i64>().ok()? * 100_000,
828                    2 => frac_str.parse::<i64>().ok()? * 10_000,
829                    3 => frac_str.parse::<i64>().ok()? * 1_000,
830                    4 => frac_str.parse::<i64>().ok()? * 100,
831                    5 => frac_str.parse::<i64>().ok()? * 10,
832                    6 => frac_str.parse::<i64>().ok()?,
833                    _ => frac_str[..6].parse::<i64>().ok()?,
834                };
835                (sec, frac_us)
836            } else {
837                (s_part.parse::<i64>().ok()?, 0)
838            };
839            micros += h * 3_600_000_000 + m * 60_000_000 + sec * 1_000_000 + frac;
840        }
841    }
842    Some(micros)
843}
844
/// Parse an ISO 8601 duration (e.g. `P1DT2H30M`, `PT0.5S`, `P2W`) into microseconds.
///
/// Supported designators:
/// - Before `T`: `W` and `D`.
/// - After `T`: `H`, `M` and `S`.
///
/// Components may be fractional and are rounded to the nearest microsecond.
/// A leading `-` negates the whole duration. Year and month designators (`Y`,
/// and `M` before `T`) have no fixed length, so they are ignored, as are other
/// unknown designators. Very large totals saturate at `i64::MAX` instead of
/// overflowing.
///
/// Returns `None` if the string, after an optional `-`, does not start with `P`.
fn parse_duration_to_micros(s: &str) -> Option<i64> {
    let (negative, body) = match s.strip_prefix('-') {
        Some(rest) => (true, rest),
        None => (false, s),
    };
    let rest = body.strip_prefix('P')?;
    let mut total_us: i64 = 0;
    let mut in_time = false;
    let mut num_buf = String::new();

    for c in rest.chars() {
        if c == 'T' {
            in_time = true;
            continue;
        }
        if c.is_ascii_digit() || c == '.' {
            num_buf.push(c);
            continue;
        }
        let n: f64 = num_buf.parse().unwrap_or(0.0);
        num_buf.clear();
        let unit_us = match (in_time, c) {
            (false, 'W') => 7.0 * 86_400_000_000.0,
            (false, 'D') => 86_400_000_000.0,
            (true, 'H') => 3_600_000_000.0,
            (true, 'M') => 60_000_000.0,
            (true, 'S') => 1_000_000.0,
            _ => continue,
        };
        // Round rather than truncate: decimal fractions are inexact in binary,
        // and `1.005 * 1e6` evaluates just below 1_005_000.
        total_us = total_us.saturating_add((n * unit_us).round() as i64);
    }
    // Components are never negative, so negating the total cannot overflow.
    Some(if negative { -total_us } else { total_us })
}
876
877/// Convert days since epoch to YYYY-MM-DD.
878fn days_to_date_string(days: i32) -> String {
879    let (y, m, d) = days_to_civil(days);
880    format!("{:04}-{:02}-{:02}", y, m, d)
881}
882
883/// Convert microseconds since epoch to ISO 8601 datetime.
884fn micros_to_datetime_string(us: i64) -> String {
885    let total_secs = us.div_euclid(1_000_000);
886    let frac = us.rem_euclid(1_000_000);
887    let days = total_secs.div_euclid(86_400) as i32;
888    let day_secs = total_secs.rem_euclid(86_400);
889    let (y, m, d) = days_to_civil(days);
890    let h = day_secs / 3600;
891    let min = (day_secs % 3600) / 60;
892    let s = day_secs % 60;
893    if frac == 0 {
894        format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z", y, m, d, h, min, s)
895    } else {
896        format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:06}Z", y, m, d, h, min, s, frac)
897    }
898}
899
/// Convert a signed microsecond count to an ISO 8601 duration string.
///
/// Only the time designators `H`, `M` and `S` are used, so days are written
/// as hours (`86_400_000_000` → `"PT24H"`); zero is `"PT0S"`. Sub-second parts
/// are six zero-padded digits (`1_000_005` → `"PT1.000005S"`), matching the
/// datetime formatter. Negative values get a leading `-` (XML Schema
/// `xs:duration` convention) instead of losing their sign.
fn micros_to_duration_string(us: i64) -> String {
    if us == 0 {
        return "PT0S".to_string();
    }
    // `unsigned_abs` cannot overflow, unlike `abs()` on i64::MIN.
    let magnitude = us.unsigned_abs();
    let total_secs = magnitude / 1_000_000;
    let frac = magnitude % 1_000_000;
    let h = total_secs / 3600;
    let m = (total_secs % 3600) / 60;
    let s = total_secs % 60;

    let mut result = String::new();
    if us < 0 {
        result.push('-');
    }
    // `magnitude > 0` here, so at least one time component always follows `T`.
    result.push_str("PT");
    if h > 0 {
        result.push_str(&format!("{}H", h));
    }
    if m > 0 {
        result.push_str(&format!("{}M", m));
    }
    if s > 0 || frac > 0 {
        if frac > 0 {
            // Zero-pad so 5 µs reads as `.000005`, not `.5` (which means 0.5 s).
            result.push_str(&format!("{}.{:06}S", s, frac));
        } else {
            result.push_str(&format!("{}S", s));
        }
    }
    result
}
930
// Civil date algorithms (Howard Hinnant's `days_from_civil` / `civil_from_days`).
// Both use a March-based year so that February, and its leap day, come last.

/// Convert a proleptic Gregorian date to days since 1970-01-01.
///
/// `m` and `d` are 1-based (1 = January, 1 = first of the month). Inputs are
/// not validated.
fn civil_to_days(y: i32, m: u32, d: u32) -> i32 {
    // Shift the year start to March 1: March becomes month 0, February month 11.
    let (march_year, march_month) = if m > 2 { (y, m - 3) } else { (y - 1, m + 9) };
    // Each 400-year era contains exactly 146_097 days.
    let era = march_year.div_euclid(400);
    let year_of_era = march_year.rem_euclid(400) as u32;
    let day_of_year = (153 * march_month + 2) / 5 + d - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    // 0000-03-01 is day -719_468 relative to 1970-01-01.
    era * 146_097 + day_of_era as i32 - 719_468
}
941
/// Convert days since 1970-01-01 to a proleptic Gregorian `(year, month, day)`.
///
/// Month and day are 1-based. This is the inverse of `civil_to_days`.
fn days_to_civil(days: i32) -> (i32, u32, u32) {
    // Re-base onto 0000-03-01 so each 400-year era starts on a March 1.
    let shifted = days + 719_468;
    let era = shifted.div_euclid(146_097);
    let day_of_era = shifted.rem_euclid(146_097) as u32;
    // Remove the leap days accumulated so far in the era before dividing by 365.
    let leap_adjusted =
        day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096;
    let year_of_era = leap_adjusted / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // March-based month index: 0 = March ... 11 = February.
    let march_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_month + 2) / 5 + 1;
    // January and February belong to the next calendar year.
    let (month, year_bump) = if march_month < 10 {
        (march_month + 3, 0)
    } else {
        (march_month - 9, 1)
    };
    (year_of_era as i32 + era * 400 + year_bump, month, day)
}
955
956// ── Tests ───────────────────────────────────────────────────────────
957
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a non-deprecated, modifier-free field definition for test schemas.
    fn field(name: &str, field_type: Type, semantic: Option<&str>) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type,
            semantic: semantic.map(str::to_string),
            deprecated: false,
            modifiers: Vec::new(),
        }
    }

    /// Return the payload of a single-record frame by skipping the marker
    /// byte and the varint length prefix that follows it.
    fn frame_payload(buf: &[u8]) -> &[u8] {
        let (payload_len, hdr) = decode_varint(&buf[1..]).unwrap();
        &buf[1 + hdr..1 + hdr + payload_len as usize]
    }

    #[test]
    fn test_varint_roundtrip() {
        for &n in &[0u64, 1, 127, 128, 16383, 16384, u32::MAX as u64, u64::MAX] {
            let mut buf = Vec::new();
            encode_varint(n, &mut buf);
            let (decoded, consumed) = decode_varint(&buf).unwrap();
            assert_eq!(decoded, n);
            assert_eq!(consumed, buf.len());
        }
    }

    #[test]
    fn test_varint_rejects_malformed() {
        // More than ten bytes cannot encode a u64.
        assert!(decode_varint(&[0x80; 11]).is_err());
        // A continuation bit on the last available byte leaves it unterminated.
        assert!(decode_varint(&[0x80]).is_err());
        assert!(decode_varint(&[]).is_err());
    }

    #[test]
    fn test_zigzag_roundtrip() {
        for &n in &[0i64, -1, 1, -64, 64, i32::MIN as i64, i32::MAX as i64, i64::MIN, i64::MAX] {
            let encoded = zigzag_encode(n);
            let decoded = zigzag_decode(encoded);
            assert_eq!(decoded, n);
        }
    }

    #[test]
    fn test_encode_decode_scalars() {
        // Float literals are exactly representable and avoid
        // clippy::approx_constant (which denies e.g. `3.14` as "approximate PI").
        let cases: Vec<(Value, Type)> = vec![
            (Value::Bool(true), Type::Bool),
            (Value::Bool(false), Type::Bool),
            (Value::Uint(42), Type::Uint),
            (Value::Uint(0), Type::Uint),
            (Value::Int(-42), Type::Int),
            (Value::Int(0), Type::Int),
            (Value::Float(1.5), Type::Float),
            (Value::Str("hello".to_string()), Type::Str),
            (Value::Str("".to_string()), Type::Str),
            (Value::Null, Type::Null),
        ];

        for (value, ty) in cases {
            let mut buf = Vec::new();
            encode_value(&value, &ty, PackedWidth::Default, &mut buf);
            let (decoded, consumed) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
            assert_eq!(decoded, value, "failed for type {:?}", ty);
            assert_eq!(consumed, buf.len());
        }
    }

    #[test]
    fn test_nullable_roundtrip() {
        let ty = Type::Nullable(Box::new(Type::Int));

        let mut buf = Vec::new();
        encode_value(&Value::Int(42), &ty, PackedWidth::Default, &mut buf);
        let (val, _) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
        assert_eq!(val, Value::Int(42));

        let mut buf = Vec::new();
        encode_value(&Value::Null, &ty, PackedWidth::Default, &mut buf);
        let (val, _) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
        assert_eq!(val, Value::Null);
    }

    #[test]
    fn test_width_modifiers() {
        // u16
        let mut buf = Vec::new();
        encode_uint(&Value::Uint(1000), PackedWidth::U16, &mut buf);
        assert_eq!(buf.len(), 2);
        let (val, _) = decode_uint(&buf, PackedWidth::U16).unwrap();
        assert_eq!(val, Value::Uint(1000));

        // f32
        let mut buf = Vec::new();
        encode_float(&Value::Float(1.25), PackedWidth::F32, &mut buf);
        assert_eq!(buf.len(), 4);
        let (val, _) = decode_float(&buf, PackedWidth::F32).unwrap();
        if let Value::Float(f) = val {
            assert!((f - 1.25).abs() < 0.001);
        } else {
            panic!("expected float");
        }
    }

    #[test]
    fn test_record_frame_roundtrip() {
        let schema = Schema {
            fields: vec![
                field("id", Type::Uint, Some("id")),
                field("name", Type::Str, None),
                field("active", Type::Bool, None),
            ],
        };

        let record = Record {
            values: vec![
                Value::Uint(42),
                Value::Str("alice".to_string()),
                Value::Bool(true),
            ],
            cdc_op: CdcOp::Insert,
        };

        let mut buf = Vec::new();
        encode_record_frame(&record, &schema, &mut buf);

        // Decode
        assert_eq!(buf[0], FRAME_INSERT);
        let decoded = decode_record_frame(FRAME_INSERT, frame_payload(&buf), &schema).unwrap();

        assert_eq!(decoded.values, record.values);
        assert_eq!(decoded.cdc_op, CdcOp::Insert);
    }

    #[test]
    fn test_cdc_delete_frame() {
        let schema = Schema {
            fields: vec![
                field("id", Type::Uint, Some("id")),
                field("name", Type::Str, None),
            ],
        };

        let record = Record {
            values: vec![Value::Uint(5), Value::Null],
            cdc_op: CdcOp::Delete,
        };

        let mut buf = Vec::new();
        encode_record_frame(&record, &schema, &mut buf);
        assert_eq!(buf[0], FRAME_DELETE);

        let decoded = decode_record_frame(FRAME_DELETE, frame_payload(&buf), &schema).unwrap();

        assert_eq!(decoded.cdc_op, CdcOp::Delete);
        assert_eq!(decoded.values[0], Value::Uint(5));
        assert_eq!(decoded.values[1], Value::Null); // non-id field is null
    }

    #[test]
    fn test_date_roundtrip() {
        let mut buf = Vec::new();
        let val = Value::Date("2026-03-14".to_string());
        encode_value(&val, &Type::Date, PackedWidth::Default, &mut buf);
        assert_eq!(buf.len(), 4);
        let (decoded, _) = decode_value(&buf, &Type::Date, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    #[test]
    fn test_datetime_roundtrip() {
        let mut buf = Vec::new();
        let val = Value::DateTime("2026-03-14T10:30:00Z".to_string());
        encode_value(&val, &Type::DateTime, PackedWidth::Default, &mut buf);
        assert_eq!(buf.len(), 8);
        let (decoded, _) = decode_value(&buf, &Type::DateTime, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    #[test]
    fn test_datetime_string_around_epoch() {
        assert_eq!(micros_to_datetime_string(0), "1970-01-01T00:00:00Z");
        // Euclidean division maps -1 µs to the last microsecond of 1969.
        assert_eq!(micros_to_datetime_string(-1), "1969-12-31T23:59:59.999999Z");
    }

    #[test]
    fn test_civil_date_roundtrip() {
        assert_eq!(days_to_civil(0), (1970, 1, 1));
        assert_eq!(days_to_civil(-1), (1969, 12, 31));
        assert_eq!(days_to_civil(11_016), (2000, 2, 29));
        assert_eq!(civil_to_days(1970, 1, 1), 0);
        assert_eq!(civil_to_days(2000, 3, 1), 11_017);
        for days in (-1_000_000..1_000_000).step_by(997) {
            let (y, m, d) = days_to_civil(days);
            assert_eq!(civil_to_days(y, m, d), days, "roundtrip failed for day {}", days);
        }
    }

    #[test]
    fn test_duration_roundtrip() {
        let mut buf = Vec::new();
        let val = Value::Duration("PT2H30M".to_string());
        encode_value(&val, &Type::Duration, PackedWidth::Default, &mut buf);
        let (decoded, _) = decode_value(&buf, &Type::Duration, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    #[test]
    fn test_enum_roundtrip() {
        let variants = vec!["open".to_string(), "closed".to_string(), "merged".to_string()];
        let ty = Type::Enum(variants);
        let val = Value::Enum("closed".to_string());
        let mut buf = Vec::new();
        encode_value(&val, &ty, PackedWidth::Default, &mut buf);
        let (decoded, _) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    #[test]
    fn test_array_roundtrip() {
        let ty = Type::Array(Box::new(Type::Int));
        let val = Value::Array(vec![Value::Int(1), Value::Int(-2), Value::Int(3)]);
        let mut buf = Vec::new();
        encode_value(&val, &ty, PackedWidth::Default, &mut buf);
        let (decoded, _) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    #[test]
    fn test_any_roundtrip() {
        let cases: Vec<Value> = vec![
            Value::Null,
            Value::Bool(true),
            Value::Int(-42),
            Value::Uint(100),
            Value::Float(2.5),
            Value::Str("test".to_string()),
        ];
        for val in cases {
            let mut buf = Vec::new();
            encode_value(&val, &Type::Any, PackedWidth::Default, &mut buf);
            let (decoded, _) = decode_value(&buf, &Type::Any, PackedWidth::Default).unwrap();
            assert_eq!(decoded, val, "any roundtrip failed for {:?}", val);
        }
    }

    #[test]
    fn test_batch_frame_roundtrip() {
        let schema = Schema {
            fields: vec![
                field("id", Type::Uint, Some("id")),
                field("val", Type::Str, None),
            ],
        };

        let records: Vec<Record> = [(1, "a"), (2, "b"), (3, "c")]
            .iter()
            .map(|&(id, text)| Record {
                values: vec![Value::Uint(id), Value::Str(text.to_string())],
                cdc_op: CdcOp::Insert,
            })
            .collect();

        let mut buf = Vec::new();
        encode_batch_frame(&records, &schema, CdcOp::Insert, &mut buf);

        // Batch layout: marker, record count varint, payload length varint, payload.
        assert_eq!(buf[0], FRAME_BATCH_INSERT);
        let (count, h1) = decode_varint(&buf[1..]).unwrap();
        assert_eq!(count, 3);
        let (payload_len, h2) = decode_varint(&buf[1 + h1..]).unwrap();
        let payload = &buf[1 + h1 + h2..1 + h1 + h2 + payload_len as usize];

        let decoded = decode_batch_frame(payload, count, &schema, CdcOp::Insert).unwrap();
        assert_eq!(decoded.len(), 3);
        assert_eq!(decoded[0].values[0], Value::Uint(1));
        assert_eq!(decoded[2].values[1], Value::Str("c".to_string()));
    }
}