1use crate::error::{err, ErrorKind, Result};
10use crate::types::*;
11
/// Appends `value` to `out` as an unsigned LEB128 varint.
///
/// Each output byte carries seven payload bits, least-significant group
/// first; every byte except the final one has its high bit (`0x80`) set.
/// Zero encodes as the single byte `0x00`, and `u64::MAX` takes ten bytes.
pub fn encode_varint(value: u64, out: &mut Vec<u8>) {
    let mut remaining = value;
    // While more than seven bits remain, emit a continuation byte.
    while remaining >= 0x80 {
        out.push(((remaining as u8) & 0x7F) | 0x80);
        remaining >>= 7;
    }
    // Final group: fits in seven bits, continuation bit clear.
    out.push(remaining as u8);
}
28
29pub fn decode_varint(data: &[u8]) -> Result<(u64, usize)> {
32 let mut result: u64 = 0;
33 let mut shift = 0u32;
34 for (i, &byte) in data.iter().enumerate() {
35 if i >= 10 {
36 return Err(err(ErrorKind::InvalidRecord, 0, "varint exceeds 10 bytes"));
37 }
38 result |= ((byte & 0x7F) as u64) << shift;
39 if byte & 0x80 == 0 {
40 return Ok((result, i + 1));
41 }
42 shift += 7;
43 }
44 Err(err(ErrorKind::UnexpectedEof, 0, "unterminated varint"))
45}
46
/// Maps a signed integer onto an unsigned one so that values of small
/// magnitude stay small regardless of sign: 0 → 0, -1 → 1, 1 → 2, -2 → 3, …
pub fn zigzag_encode(n: i64) -> u64 {
    // Arithmetic shift yields all ones for negatives and zero otherwise.
    let sign_mask = (n >> 63) as u64;
    ((n as u64) << 1) ^ sign_mask
}
51
/// Inverse of `zigzag_encode`: even inputs map to non-negative values and
/// odd inputs map to negative ones.
pub fn zigzag_decode(n: u64) -> i64 {
    let magnitude = (n >> 1) as i64;
    // The low bit records the sign; negatives are stored bit-inverted.
    if n & 1 == 0 {
        magnitude
    } else {
        !magnitude
    }
}
56
/// Frame marker for a single inserted record (see `encode_record_frame`).
pub const FRAME_INSERT: u8 = 0x00;
/// Frame marker for a single updated record.
pub const FRAME_UPDATE: u8 = 0x01;
/// Frame marker for a single deleted record; its payload carries only the
/// fields whose `semantic` is `"id"`.
pub const FRAME_DELETE: u8 = 0x02;
/// Frame marker for a batch of inserted records (see `encode_batch_frame`).
pub const FRAME_BATCH_INSERT: u8 = 0x03;
/// Frame marker for a batch of updated records.
pub const FRAME_BATCH_UPDATE: u8 = 0x04;
65
// Type tags prefixed to every value encoded as `Type::Any` (and to each
// element/value nested inside an `Any` array or map), so the decoder can
// recover the value's type without a schema.
const TAG_NULL: u8 = 0x00;
const TAG_BOOL: u8 = 0x01;
const TAG_INT: u8 = 0x02; // zigzag varint
const TAG_UINT: u8 = 0x03; // plain varint
const TAG_FLOAT: u8 = 0x04; // 8-byte little-endian f64
const TAG_STR: u8 = 0x05; // varint length + UTF-8 bytes (also used for enums)
const TAG_DATE: u8 = 0x06; // 4-byte little-endian day count
const TAG_DATETIME: u8 = 0x07; // 8-byte little-endian microsecond count
const TAG_DURATION: u8 = 0x08; // 8-byte little-endian microsecond count
const TAG_BYTES: u8 = 0x09; // varint length + raw bytes
const TAG_ARRAY: u8 = 0x0A; // varint count + tagged elements
const TAG_MAP: u8 = 0x0B; // varint count + (length-prefixed key, tagged value) pairs
80
/// Explicit encoding width requested through a field's `packed` modifier.
///
/// `Default` selects each type's natural encoding (varints for integers,
/// 8-byte floats); the other variants name a fixed little-endian width.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PackedWidth {
    Default,
    U8,
    U16,
    U32,
    U64,
    I8,
    I16,
    I32,
    I64,
    F32,
    F64,
}
98
99impl PackedWidth {
100 pub fn from_field(field: &FieldDef) -> Self {
102 for m in &field.modifiers {
103 if m.name == "packed" {
104 if let Some(ref v) = m.value {
105 return match v.as_str() {
106 "u8" => PackedWidth::U8,
107 "u16" => PackedWidth::U16,
108 "u32" => PackedWidth::U32,
109 "u64" => PackedWidth::U64,
110 "i8" => PackedWidth::I8,
111 "i16" => PackedWidth::I16,
112 "i32" => PackedWidth::I32,
113 "i64" => PackedWidth::I64,
114 "f32" => PackedWidth::F32,
115 "f64" => PackedWidth::F64,
116 _ => PackedWidth::Default,
117 };
118 }
119 }
120 }
121 PackedWidth::Default
122 }
123
124 pub fn fixed_size(&self) -> Option<usize> {
126 match self {
127 PackedWidth::Default => None,
128 PackedWidth::U8 | PackedWidth::I8 => Some(1),
129 PackedWidth::U16 | PackedWidth::I16 => Some(2),
130 PackedWidth::U32 | PackedWidth::I32 | PackedWidth::F32 => Some(4),
131 PackedWidth::U64 | PackedWidth::I64 | PackedWidth::F64 => Some(8),
132 }
133 }
134}
135
136pub fn encode_value(value: &Value, field_type: &Type, width: PackedWidth, out: &mut Vec<u8>) {
140 match field_type {
141 Type::Nullable(inner) => {
142 match value {
143 Value::Null => out.push(0x00),
144 _ => {
145 out.push(0x01);
146 encode_value(value, inner, width, out);
147 }
148 }
149 }
150 Type::Bool => {
151 match value {
152 Value::Bool(true) => out.push(0x01),
153 Value::Bool(false) => out.push(0x00),
154 _ => out.push(0x00),
155 }
156 }
157 Type::Uint => encode_uint(value, width, out),
158 Type::Int => encode_int(value, width, out),
159 Type::Float => encode_float(value, width, out),
160 Type::Str => encode_str(value, out),
161 Type::Date => encode_date(value, out),
162 Type::DateTime => encode_datetime(value, out),
163 Type::Duration => encode_duration(value, out),
164 Type::Bytes => encode_bytes_value(value, out),
165 Type::Enum(variants) => encode_enum(value, variants, width, out),
166 Type::Null => {} Type::Any => encode_any(value, out),
168 Type::Map => encode_map(value, out),
169 Type::Array(elem_ty) => encode_array(value, elem_ty, out),
170 }
171}
172
173fn encode_uint(value: &Value, width: PackedWidth, out: &mut Vec<u8>) {
174 let n = match value {
175 Value::Uint(n) => *n,
176 Value::Int(n) => *n as u64,
177 _ => 0,
178 };
179 match width {
180 PackedWidth::U8 => out.push(n as u8),
181 PackedWidth::U16 => out.extend_from_slice(&(n as u16).to_le_bytes()),
182 PackedWidth::U32 => out.extend_from_slice(&(n as u32).to_le_bytes()),
183 PackedWidth::U64 => out.extend_from_slice(&n.to_le_bytes()),
184 _ => encode_varint(n, out),
185 }
186}
187
188fn encode_int(value: &Value, width: PackedWidth, out: &mut Vec<u8>) {
189 let n = match value {
190 Value::Int(n) => *n,
191 Value::Uint(n) => *n as i64,
192 _ => 0,
193 };
194 match width {
195 PackedWidth::I8 => out.push(n as u8),
196 PackedWidth::I16 => out.extend_from_slice(&(n as i16).to_le_bytes()),
197 PackedWidth::I32 => out.extend_from_slice(&(n as i32).to_le_bytes()),
198 PackedWidth::I64 => out.extend_from_slice(&n.to_le_bytes()),
199 PackedWidth::U8 => out.push(n as u8),
200 PackedWidth::U16 => out.extend_from_slice(&(n as u16).to_le_bytes()),
201 PackedWidth::U32 => out.extend_from_slice(&(n as u32).to_le_bytes()),
202 PackedWidth::U64 => out.extend_from_slice(&(n as u64).to_le_bytes()),
203 _ => encode_varint(zigzag_encode(n), out),
204 }
205}
206
207fn encode_float(value: &Value, width: PackedWidth, out: &mut Vec<u8>) {
208 let n = match value {
209 Value::Float(n) => *n,
210 Value::Int(n) => *n as f64,
211 Value::Uint(n) => *n as f64,
212 _ => 0.0,
213 };
214 match width {
215 PackedWidth::F32 => out.extend_from_slice(&(n as f32).to_le_bytes()),
216 _ => out.extend_from_slice(&n.to_le_bytes()),
217 }
218}
219
220fn encode_str(value: &Value, out: &mut Vec<u8>) {
221 let s = match value {
222 Value::Str(s) | Value::Date(s) | Value::DateTime(s)
223 | Value::Duration(s) | Value::Enum(s) => s.as_bytes(),
224 _ => b"",
225 };
226 encode_varint(s.len() as u64, out);
227 out.extend_from_slice(s);
228}
229
230fn encode_date(value: &Value, out: &mut Vec<u8>) {
231 let days = match value {
233 Value::Date(s) => parse_date_to_days(s).unwrap_or(0),
234 _ => 0,
235 };
236 out.extend_from_slice(&days.to_le_bytes());
237}
238
239fn encode_datetime(value: &Value, out: &mut Vec<u8>) {
240 let us = match value {
242 Value::DateTime(s) => parse_datetime_to_micros(s).unwrap_or(0),
243 _ => 0,
244 };
245 out.extend_from_slice(&us.to_le_bytes());
246}
247
248fn encode_duration(value: &Value, out: &mut Vec<u8>) {
249 let us = match value {
251 Value::Duration(s) => parse_duration_to_micros(s).unwrap_or(0),
252 _ => 0,
253 };
254 out.extend_from_slice(&us.to_le_bytes());
255}
256
257fn encode_bytes_value(value: &Value, out: &mut Vec<u8>) {
258 let data = match value {
259 Value::Bytes(b) => b.as_slice(),
260 _ => &[],
261 };
262 encode_varint(data.len() as u64, out);
263 out.extend_from_slice(data);
264}
265
266fn encode_enum(value: &Value, variants: &[String], width: PackedWidth, out: &mut Vec<u8>) {
267 let idx = match value {
268 Value::Enum(s) => variants.iter().position(|v| v == s).unwrap_or(0) as u64,
269 _ => 0,
270 };
271 match width {
272 PackedWidth::U8 => out.push(idx as u8),
273 _ => encode_varint(idx, out),
274 }
275}
276
277fn encode_any(value: &Value, out: &mut Vec<u8>) {
278 match value {
279 Value::Null => out.push(TAG_NULL),
280 Value::Bool(b) => {
281 out.push(TAG_BOOL);
282 out.push(if *b { 0x01 } else { 0x00 });
283 }
284 Value::Int(n) => {
285 out.push(TAG_INT);
286 encode_varint(zigzag_encode(*n), out);
287 }
288 Value::Uint(n) => {
289 out.push(TAG_UINT);
290 encode_varint(*n, out);
291 }
292 Value::Float(n) => {
293 out.push(TAG_FLOAT);
294 out.extend_from_slice(&n.to_le_bytes());
295 }
296 Value::Str(s) => {
297 out.push(TAG_STR);
298 encode_varint(s.len() as u64, out);
299 out.extend_from_slice(s.as_bytes());
300 }
301 Value::Date(s) => {
302 out.push(TAG_DATE);
303 let days = parse_date_to_days(s).unwrap_or(0);
304 out.extend_from_slice(&days.to_le_bytes());
305 }
306 Value::DateTime(s) => {
307 out.push(TAG_DATETIME);
308 let us = parse_datetime_to_micros(s).unwrap_or(0);
309 out.extend_from_slice(&us.to_le_bytes());
310 }
311 Value::Duration(s) => {
312 out.push(TAG_DURATION);
313 let us = parse_duration_to_micros(s).unwrap_or(0);
314 out.extend_from_slice(&us.to_le_bytes());
315 }
316 Value::Bytes(b) => {
317 out.push(TAG_BYTES);
318 encode_varint(b.len() as u64, out);
319 out.extend_from_slice(b);
320 }
321 Value::Array(items) => {
322 out.push(TAG_ARRAY);
323 encode_varint(items.len() as u64, out);
324 for item in items {
325 encode_any(item, out);
326 }
327 }
328 Value::Enum(s) => {
329 out.push(TAG_STR);
331 encode_varint(s.len() as u64, out);
332 out.extend_from_slice(s.as_bytes());
333 }
334 Value::Map(entries) => {
335 out.push(TAG_MAP);
336 encode_varint(entries.len() as u64, out);
337 for (k, v) in entries {
338 encode_varint(k.len() as u64, out);
339 out.extend_from_slice(k.as_bytes());
340 encode_any(v, out);
341 }
342 }
343 }
344}
345
346fn encode_array(value: &Value, elem_ty: &Type, out: &mut Vec<u8>) {
347 let items = match value {
348 Value::Array(items) => items,
349 _ => {
350 encode_varint(0, out);
351 return;
352 }
353 };
354 encode_varint(items.len() as u64, out);
355 for item in items {
356 encode_value(item, elem_ty, PackedWidth::Default, out);
357 }
358}
359
360fn encode_map(value: &Value, out: &mut Vec<u8>) {
361 let entries = match value {
362 Value::Map(entries) => entries,
363 _ => {
364 encode_varint(0, out);
365 return;
366 }
367 };
368 encode_varint(entries.len() as u64, out);
369 for (k, v) in entries {
370 encode_varint(k.len() as u64, out);
371 out.extend_from_slice(k.as_bytes());
372 encode_any(v, out);
373 }
374}
375
376pub fn decode_value(data: &[u8], field_type: &Type, width: PackedWidth) -> Result<(Value, usize)> {
381 match field_type {
382 Type::Nullable(inner) => {
383 if data.is_empty() {
384 return Err(err(ErrorKind::UnexpectedEof, 0, "expected nullable byte"));
385 }
386 if data[0] == 0x00 {
387 Ok((Value::Null, 1))
388 } else {
389 let (val, n) = decode_value(&data[1..], inner, width)?;
390 Ok((val, 1 + n))
391 }
392 }
393 Type::Bool => {
394 if data.is_empty() {
395 return Err(err(ErrorKind::UnexpectedEof, 0, "expected bool byte"));
396 }
397 Ok((Value::Bool(data[0] != 0), 1))
398 }
399 Type::Uint => decode_uint(data, width),
400 Type::Int => decode_int(data, width),
401 Type::Float => decode_float(data, width),
402 Type::Str => decode_str(data),
403 Type::Date => decode_date(data),
404 Type::DateTime => decode_datetime(data),
405 Type::Duration => decode_duration(data),
406 Type::Bytes => decode_bytes_value(data),
407 Type::Enum(variants) => decode_enum(data, variants, width),
408 Type::Null => Ok((Value::Null, 0)),
409 Type::Any => decode_any(data),
410 Type::Map => decode_map(data),
411 Type::Array(elem_ty) => decode_array(data, elem_ty),
412 }
413}
414
415fn decode_uint(data: &[u8], width: PackedWidth) -> Result<(Value, usize)> {
416 match width {
417 PackedWidth::U8 => {
418 check_len(data, 1)?;
419 Ok((Value::Uint(data[0] as u64), 1))
420 }
421 PackedWidth::U16 => {
422 check_len(data, 2)?;
423 Ok((Value::Uint(u16::from_le_bytes([data[0], data[1]]) as u64), 2))
424 }
425 PackedWidth::U32 => {
426 check_len(data, 4)?;
427 Ok((Value::Uint(u32::from_le_bytes(data[..4].try_into().unwrap()) as u64), 4))
428 }
429 PackedWidth::U64 => {
430 check_len(data, 8)?;
431 Ok((Value::Uint(u64::from_le_bytes(data[..8].try_into().unwrap())), 8))
432 }
433 _ => {
434 let (n, consumed) = decode_varint(data)?;
435 Ok((Value::Uint(n), consumed))
436 }
437 }
438}
439
440fn decode_int(data: &[u8], width: PackedWidth) -> Result<(Value, usize)> {
441 match width {
442 PackedWidth::I8 => {
443 check_len(data, 1)?;
444 Ok((Value::Int(data[0] as i8 as i64), 1))
445 }
446 PackedWidth::I16 => {
447 check_len(data, 2)?;
448 Ok((Value::Int(i16::from_le_bytes([data[0], data[1]]) as i64), 2))
449 }
450 PackedWidth::I32 => {
451 check_len(data, 4)?;
452 Ok((Value::Int(i32::from_le_bytes(data[..4].try_into().unwrap()) as i64), 4))
453 }
454 PackedWidth::I64 => {
455 check_len(data, 8)?;
456 Ok((Value::Int(i64::from_le_bytes(data[..8].try_into().unwrap())), 8))
457 }
458 PackedWidth::U8 => {
459 check_len(data, 1)?;
460 Ok((Value::Int(data[0] as i64), 1))
461 }
462 PackedWidth::U16 => {
463 check_len(data, 2)?;
464 Ok((Value::Int(u16::from_le_bytes([data[0], data[1]]) as i64), 2))
465 }
466 PackedWidth::U32 => {
467 check_len(data, 4)?;
468 Ok((Value::Int(u32::from_le_bytes(data[..4].try_into().unwrap()) as i64), 4))
469 }
470 PackedWidth::U64 => {
471 check_len(data, 8)?;
472 Ok((Value::Int(u64::from_le_bytes(data[..8].try_into().unwrap()) as i64), 8))
473 }
474 _ => {
475 let (n, consumed) = decode_varint(data)?;
476 Ok((Value::Int(zigzag_decode(n)), consumed))
477 }
478 }
479}
480
481fn decode_float(data: &[u8], width: PackedWidth) -> Result<(Value, usize)> {
482 match width {
483 PackedWidth::F32 => {
484 check_len(data, 4)?;
485 let f = f32::from_le_bytes(data[..4].try_into().unwrap());
486 Ok((Value::Float(f as f64), 4))
487 }
488 _ => {
489 check_len(data, 8)?;
490 let f = f64::from_le_bytes(data[..8].try_into().unwrap());
491 Ok((Value::Float(f), 8))
492 }
493 }
494}
495
496fn decode_str(data: &[u8]) -> Result<(Value, usize)> {
497 let (len, hdr) = decode_varint(data)?;
498 let len = len as usize;
499 let total = hdr + len;
500 if data.len() < total {
501 return Err(err(ErrorKind::UnexpectedEof, 0, "string data truncated"));
502 }
503 let s = std::str::from_utf8(&data[hdr..total])
504 .map_err(|_| err(ErrorKind::InvalidString, 0, "invalid UTF-8 in packed string"))?;
505 Ok((Value::Str(s.to_string()), total))
506}
507
508fn decode_date(data: &[u8]) -> Result<(Value, usize)> {
509 check_len(data, 4)?;
510 let days = i32::from_le_bytes(data[..4].try_into().unwrap());
511 Ok((Value::Date(days_to_date_string(days)), 4))
512}
513
514fn decode_datetime(data: &[u8]) -> Result<(Value, usize)> {
515 check_len(data, 8)?;
516 let us = i64::from_le_bytes(data[..8].try_into().unwrap());
517 Ok((Value::DateTime(micros_to_datetime_string(us)), 8))
518}
519
520fn decode_duration(data: &[u8]) -> Result<(Value, usize)> {
521 check_len(data, 8)?;
522 let us = i64::from_le_bytes(data[..8].try_into().unwrap());
523 Ok((Value::Duration(micros_to_duration_string(us)), 8))
524}
525
526fn decode_bytes_value(data: &[u8]) -> Result<(Value, usize)> {
527 let (len, hdr) = decode_varint(data)?;
528 let len = len as usize;
529 let total = hdr + len;
530 if data.len() < total {
531 return Err(err(ErrorKind::UnexpectedEof, 0, "bytes data truncated"));
532 }
533 Ok((Value::Bytes(data[hdr..total].to_vec()), total))
534}
535
536fn decode_enum(data: &[u8], variants: &[String], width: PackedWidth) -> Result<(Value, usize)> {
537 let (idx, consumed) = match width {
538 PackedWidth::U8 => {
539 check_len(data, 1)?;
540 (data[0] as u64, 1)
541 }
542 _ => decode_varint(data)?,
543 };
544 let variant = variants
545 .get(idx as usize)
546 .cloned()
547 .unwrap_or_else(|| format!("unknown_{}", idx));
548 Ok((Value::Enum(variant), consumed))
549}
550
551fn decode_any(data: &[u8]) -> Result<(Value, usize)> {
552 if data.is_empty() {
553 return Err(err(ErrorKind::UnexpectedEof, 0, "expected type tag"));
554 }
555 let tag = data[0];
556 let rest = &data[1..];
557 match tag {
558 TAG_NULL => Ok((Value::Null, 1)),
559 TAG_BOOL => {
560 check_len(rest, 1)?;
561 Ok((Value::Bool(rest[0] != 0), 2))
562 }
563 TAG_INT => {
564 let (n, c) = decode_varint(rest)?;
565 Ok((Value::Int(zigzag_decode(n)), 1 + c))
566 }
567 TAG_UINT => {
568 let (n, c) = decode_varint(rest)?;
569 Ok((Value::Uint(n), 1 + c))
570 }
571 TAG_FLOAT => {
572 check_len(rest, 8)?;
573 let f = f64::from_le_bytes(rest[..8].try_into().unwrap());
574 Ok((Value::Float(f), 9))
575 }
576 TAG_STR => {
577 let (val, c) = decode_str(rest)?;
578 Ok((val, 1 + c))
579 }
580 TAG_DATE => {
581 let (val, c) = decode_date(rest)?;
582 Ok((val, 1 + c))
583 }
584 TAG_DATETIME => {
585 let (val, c) = decode_datetime(rest)?;
586 Ok((val, 1 + c))
587 }
588 TAG_DURATION => {
589 let (val, c) = decode_duration(rest)?;
590 Ok((val, 1 + c))
591 }
592 TAG_BYTES => {
593 let (val, c) = decode_bytes_value(rest)?;
594 Ok((val, 1 + c))
595 }
596 TAG_ARRAY => {
597 let (count, hdr) = decode_varint(rest)?;
598 let mut pos = hdr;
599 let mut items = Vec::with_capacity(count as usize);
600 for _ in 0..count {
601 let (val, c) = decode_any(&rest[pos..])?;
602 items.push(val);
603 pos += c;
604 }
605 Ok((Value::Array(items), 1 + pos))
606 }
607 TAG_MAP => {
608 let (count, hdr) = decode_varint(rest)?;
609 let mut pos = hdr;
610 let mut entries = Vec::with_capacity(count as usize);
611 for _ in 0..count {
612 let (key_val, kc) = decode_str(&rest[pos..])?;
613 let key = match key_val {
614 Value::Str(s) => s,
615 _ => String::new(),
616 };
617 pos += kc;
618 let (val, vc) = decode_any(&rest[pos..])?;
619 pos += vc;
620 entries.push((key, val));
621 }
622 Ok((Value::Map(entries), 1 + pos))
623 }
624 _ => Err(err(ErrorKind::InvalidRecord, 0, format!("unknown type tag: 0x{:02x}", tag))),
625 }
626}
627
628fn decode_array(data: &[u8], elem_ty: &Type) -> Result<(Value, usize)> {
629 let (count, hdr) = decode_varint(data)?;
630 let mut pos = hdr;
631 let mut items = Vec::with_capacity(count as usize);
632 for _ in 0..count {
633 let (val, c) = decode_value(&data[pos..], elem_ty, PackedWidth::Default)?;
634 items.push(val);
635 pos += c;
636 }
637 Ok((Value::Array(items), pos))
638}
639
640fn decode_map(data: &[u8]) -> Result<(Value, usize)> {
641 let (count, hdr) = decode_varint(data)?;
642 let mut pos = hdr;
643 let mut entries = Vec::with_capacity(count as usize);
644 for _ in 0..count {
645 let (key_val, kc) = decode_str(&data[pos..])?;
646 let key = match key_val {
647 Value::Str(s) => s,
648 _ => String::new(),
649 };
650 pos += kc;
651 let (val, vc) = decode_any(&data[pos..])?;
652 pos += vc;
653 entries.push((key, val));
654 }
655 Ok((Value::Map(entries), pos))
656}
657
658fn check_len(data: &[u8], need: usize) -> Result<()> {
659 if data.len() < need {
660 Err(err(ErrorKind::UnexpectedEof, 0, format!("need {} bytes, have {}", need, data.len())))
661 } else {
662 Ok(())
663 }
664}
665
666pub fn encode_record_frame(record: &Record, schema: &Schema, out: &mut Vec<u8>) {
670 let marker = match record.cdc_op {
671 CdcOp::Insert => FRAME_INSERT,
672 CdcOp::Update => FRAME_UPDATE,
673 CdcOp::Delete => FRAME_DELETE,
674 };
675
676 let mut payload = Vec::new();
677
678 if record.cdc_op == CdcOp::Delete {
679 for (i, field) in schema.fields.iter().enumerate() {
681 if field.semantic.as_deref() == Some("id") {
682 if let Some(val) = record.values.get(i) {
683 let w = PackedWidth::from_field(field);
684 encode_value(val, &field.field_type, w, &mut payload);
685 }
686 }
687 }
688 } else {
689 for (i, field) in schema.fields.iter().enumerate() {
690 let val = record.values.get(i).unwrap_or(&Value::Null);
691 let w = PackedWidth::from_field(field);
692 encode_value(val, &field.field_type, w, &mut payload);
693 }
694 }
695
696 out.push(marker);
697 encode_varint(payload.len() as u64, out);
698 out.extend_from_slice(&payload);
699}
700
701pub fn decode_record_frame(
703 marker: u8,
704 payload: &[u8],
705 schema: &Schema,
706) -> Result<Record> {
707 let cdc_op = match marker {
708 FRAME_INSERT => CdcOp::Insert,
709 FRAME_UPDATE => CdcOp::Update,
710 FRAME_DELETE => CdcOp::Delete,
711 _ => return Err(err(ErrorKind::InvalidRecord, 0, format!("unknown frame marker: 0x{:02x}", marker))),
712 };
713
714 let mut values = Vec::with_capacity(schema.fields.len());
715 let mut pos = 0;
716
717 if cdc_op == CdcOp::Delete {
718 for field in &schema.fields {
719 if field.semantic.as_deref() == Some("id") {
720 let w = PackedWidth::from_field(field);
721 let (val, consumed) = decode_value(&payload[pos..], &field.field_type, w)?;
722 values.push(val);
723 pos += consumed;
724 } else {
725 values.push(Value::Null);
726 }
727 }
728 } else {
729 for field in &schema.fields {
730 let w = PackedWidth::from_field(field);
731 let (val, consumed) = decode_value(&payload[pos..], &field.field_type, w)?;
732 values.push(val);
733 pos += consumed;
734 }
735 }
736
737 Ok(Record { values, cdc_op })
738}
739
740pub fn encode_batch_frame(records: &[Record], schema: &Schema, cdc_op: CdcOp, out: &mut Vec<u8>) {
742 let marker = match cdc_op {
743 CdcOp::Insert => FRAME_BATCH_INSERT,
744 CdcOp::Update => FRAME_BATCH_UPDATE,
745 CdcOp::Delete => return, };
747
748 let mut payload = Vec::new();
749 for record in records {
750 let mut rec_data = Vec::new();
751 for (i, field) in schema.fields.iter().enumerate() {
752 let val = record.values.get(i).unwrap_or(&Value::Null);
753 let w = PackedWidth::from_field(field);
754 encode_value(val, &field.field_type, w, &mut rec_data);
755 }
756 encode_varint(rec_data.len() as u64, &mut payload);
758 payload.extend_from_slice(&rec_data);
759 }
760
761 out.push(marker);
762 encode_varint(records.len() as u64, out);
763 encode_varint(payload.len() as u64, out);
764 out.extend_from_slice(&payload);
765}
766
767pub fn decode_batch_frame(
769 payload: &[u8],
770 record_count: u64,
771 schema: &Schema,
772 cdc_op: CdcOp,
773) -> Result<Vec<Record>> {
774 let mut records = Vec::with_capacity(record_count as usize);
775 let mut pos = 0;
776
777 for _ in 0..record_count {
778 let (rec_len, hdr) = decode_varint(&payload[pos..])?;
779 pos += hdr;
780 let rec_end = pos + rec_len as usize;
781 if rec_end > payload.len() {
782 return Err(err(ErrorKind::UnexpectedEof, 0, "batch record truncated"));
783 }
784
785 let mut values = Vec::with_capacity(schema.fields.len());
786 let mut rpos = 0;
787 let rec_data = &payload[pos..rec_end];
788 for field in &schema.fields {
789 let w = PackedWidth::from_field(field);
790 let (val, consumed) = decode_value(&rec_data[rpos..], &field.field_type, w)?;
791 values.push(val);
792 rpos += consumed;
793 }
794 records.push(Record { values, cdc_op });
795 pos = rec_end;
796 }
797
798 Ok(records)
799}
800
801fn parse_date_to_days(s: &str) -> Option<i32> {
804 if s.len() < 10 {
805 return None;
806 }
807 let year: i32 = s[0..4].parse().ok()?;
808 let month: u32 = s[5..7].parse().ok()?;
809 let day: u32 = s[8..10].parse().ok()?;
810 Some(civil_to_days(year, month, day))
811}
812
813fn parse_datetime_to_micros(s: &str) -> Option<i64> {
814 let days = parse_date_to_days(&s[..10])? as i64;
815 let mut micros = days * 86_400_000_000;
816 if s.len() > 11 {
817 let time_part = s[11..].trim_end_matches('Z');
818 let parts: Vec<&str> = time_part.split(':').collect();
819 if parts.len() >= 2 {
820 let h: i64 = parts[0].parse().ok()?;
821 let m: i64 = parts[1].parse().ok()?;
822 let s_part = if parts.len() >= 3 { parts[2] } else { "0" };
823 let (sec, frac) = if let Some(dot) = s_part.find('.') {
824 let sec: i64 = s_part[..dot].parse().ok()?;
825 let frac_str = &s_part[dot + 1..];
826 let frac_us: i64 = match frac_str.len() {
827 1 => frac_str.parse::<i64>().ok()? * 100_000,
828 2 => frac_str.parse::<i64>().ok()? * 10_000,
829 3 => frac_str.parse::<i64>().ok()? * 1_000,
830 4 => frac_str.parse::<i64>().ok()? * 100,
831 5 => frac_str.parse::<i64>().ok()? * 10,
832 6 => frac_str.parse::<i64>().ok()?,
833 _ => frac_str[..6].parse::<i64>().ok()?,
834 };
835 (sec, frac_us)
836 } else {
837 (s_part.parse::<i64>().ok()?, 0)
838 };
839 micros += h * 3_600_000_000 + m * 60_000_000 + sec * 1_000_000 + frac;
840 }
841 }
842 Some(micros)
843}
844
/// Parses an ISO-8601 style duration (`P[nW][nD][T[nH][nM][nS]]`, numbers may
/// be fractional) into microseconds.
///
/// An optional leading `-` negates the result, matching the output of
/// `micros_to_duration_string` for negative values. Year and month
/// designators (no fixed length) and unknown designators are skipped.
/// Returns `None` if the string (after an optional `-`) does not start with
/// `P`, or if the total overflows `i64`.
fn parse_duration_to_micros(s: &str) -> Option<i64> {
    const MICROS_PER_DAY: f64 = 86_400_000_000.0;

    let (negative, unsigned) = match s.strip_prefix('-') {
        Some(rest) => (true, rest),
        None => (false, s),
    };
    let body = unsigned.strip_prefix('P')?;

    let mut total_us: i64 = 0;
    let mut in_time = false;
    let mut num_buf = String::new();

    for c in body.chars() {
        if c == 'T' {
            in_time = true;
            continue;
        }
        if c.is_ascii_digit() || c == '.' {
            num_buf.push(c);
            continue;
        }
        let n: f64 = num_buf.parse().unwrap_or(0.0);
        num_buf.clear();
        let unit_us = match (in_time, c) {
            (false, 'D') => MICROS_PER_DAY,
            (false, 'W') => 7.0 * MICROS_PER_DAY,
            (true, 'H') => 3_600_000_000.0,
            (true, 'M') => 60_000_000.0,
            (true, 'S') => 1_000_000.0,
            _ => continue,
        };
        // Round, don't truncate: e.g. 8.2 * 1e6 is 8199999.999999999 in f64.
        total_us = total_us.checked_add((n * unit_us).round() as i64)?;
    }
    // Components are never negative, so `total_us >= 0` and negation is safe.
    Some(if negative { -total_us } else { total_us })
}
876
877fn days_to_date_string(days: i32) -> String {
879 let (y, m, d) = days_to_civil(days);
880 format!("{:04}-{:02}-{:02}", y, m, d)
881}
882
883fn micros_to_datetime_string(us: i64) -> String {
885 let total_secs = us.div_euclid(1_000_000);
886 let frac = us.rem_euclid(1_000_000);
887 let days = total_secs.div_euclid(86_400) as i32;
888 let day_secs = total_secs.rem_euclid(86_400);
889 let (y, m, d) = days_to_civil(days);
890 let h = day_secs / 3600;
891 let min = (day_secs % 3600) / 60;
892 let s = day_secs % 60;
893 if frac == 0 {
894 format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z", y, m, d, h, min, s)
895 } else {
896 format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:06}Z", y, m, d, h, min, s, frac)
897 }
898}
899
/// Formats a signed microsecond count as an ISO-8601 style duration using
/// hours, minutes and (fractional) seconds, e.g. `PT2H30M` or `PT1.05S`.
///
/// Zero is `PT0S`, and negative values get a leading `-` (`-PT1H`). Days are
/// not used, so long durations show large hour counts. Fractional seconds are
/// printed with up to six digits, with trailing zeros removed.
fn micros_to_duration_string(us: i64) -> String {
    if us == 0 {
        return "PT0S".to_string();
    }
    // `unsigned_abs` cannot overflow, unlike `abs()` on i64::MIN.
    let magnitude = us.unsigned_abs();
    let total_secs = magnitude / 1_000_000;
    let frac = magnitude % 1_000_000;
    let h = total_secs / 3600;
    let m = (total_secs % 3600) / 60;
    let s = total_secs % 60;

    let mut result = String::new();
    if us < 0 {
        result.push('-');
    }
    // us != 0, so some time component is non-zero and the `T` is always needed.
    result.push_str("PT");
    if h > 0 {
        result.push_str(&format!("{}H", h));
    }
    if m > 0 {
        result.push_str(&format!("{}M", m));
    }
    if frac > 0 {
        // Zero-pad to six digits first: 50_000 µs must read as `.05`, not `.50000`.
        let padded = format!("{:06}", frac);
        result.push_str(&format!("{}.{}S", s, padded.trim_end_matches('0')));
    } else if s > 0 {
        result.push_str(&format!("{}S", s));
    }
    result
}
930
/// Converts a (proleptic Gregorian) calendar date to a day count relative to
/// 1970-01-01.
///
/// Uses the shifted-year formulation: January and February are treated as
/// months 10 and 11 of the previous year, so the leap day falls at the end of
/// the computational year. Expects `1 <= m <= 12` and `d >= 1`. With `m == 3`
/// and `d == 0`, the unsigned day-of-year expression underflows (a panic in
/// debug builds).
fn civil_to_days(y: i32, m: u32, d: u32) -> i32 {
    let y = if m <= 2 { y - 1 } else { y }; // Jan/Feb belong to the previous March-based year
    let era = y.div_euclid(400); // index of the 400-year cycle
    let yoe = y.rem_euclid(400) as u32; // year of era, 0..=399
    let m = if m > 2 { m - 3 } else { m + 9 }; // March = 0 … February = 11
    let doy = (153 * m + 2) / 5 + d - 1; // day of the March-based year
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // day of era (leap-day corrections)
    era * 146097 + doe as i32 - 719468 // 146097 days per era; shift origin to 1970-01-01
}
941
/// Converts a day count relative to 1970-01-01 to a (proleptic Gregorian)
/// `(year, month, day)` triple, with month in 1..=12 and day in 1..=31.
///
/// This is the inverse of `civil_to_days`. It accepts every `i32`, including
/// raw day counts read off the wire.
fn days_to_civil(days: i32) -> (i32, u32, u32) {
    // Work in i64: shifting the origin by 719_468 days overflows i32 for
    // inputs near i32::MAX (a panic in debug builds).
    let z = i64::from(days) + 719_468; // days since 0000-03-01
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097) as u32; // day of era, 0..=146096
    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365; // 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // March = 0 … February = 11
    let d = doy - (153 * mp + 2) / 5 + 1;
    let m = if mp < 10 { mp + 3 } else { mp - 9 };
    // Jan/Feb belong to the following civil year.
    let y = i64::from(yoe) + era * 400 + i64::from(m <= 2);
    // |days| <= 2^31 spans under ±5.9 million years, so the year fits in i32.
    (y as i32, m, d)
}
955
// Round-trip tests for the primitive codecs (varint, zigzag), the per-type
// value encoders/decoders, and the single-record and batch frame formats.
#[cfg(test)]
mod tests {
    use super::*;

    // Varints must decode to the written value and consume exactly the bytes
    // written, up to and including the 10-byte u64::MAX encoding.
    #[test]
    fn test_varint_roundtrip() {
        for &n in &[0u64, 1, 127, 128, 16383, 16384, u32::MAX as u64, u64::MAX] {
            let mut buf = Vec::new();
            encode_varint(n, &mut buf);
            let (decoded, consumed) = decode_varint(&buf).unwrap();
            assert_eq!(decoded, n);
            assert_eq!(consumed, buf.len());
        }
    }

    // Zigzag must be a bijection, including both i64 extremes.
    #[test]
    fn test_zigzag_roundtrip() {
        for &n in &[0i64, -1, 1, -64, 64, i32::MIN as i64, i32::MAX as i64, i64::MIN, i64::MAX] {
            let encoded = zigzag_encode(n);
            let decoded = zigzag_decode(encoded);
            assert_eq!(decoded, n);
        }
    }

    // Scalar types at default width round-trip and report the full byte count.
    #[test]
    fn test_encode_decode_scalars() {
        let cases: Vec<(Value, Type)> = vec![
            (Value::Bool(true), Type::Bool),
            (Value::Bool(false), Type::Bool),
            (Value::Uint(42), Type::Uint),
            (Value::Uint(0), Type::Uint),
            (Value::Int(-42), Type::Int),
            (Value::Int(0), Type::Int),
            (Value::Float(3.14), Type::Float),
            (Value::Str("hello".to_string()), Type::Str),
            (Value::Str("".to_string()), Type::Str),
            (Value::Null, Type::Null),
        ];

        for (value, ty) in cases {
            let mut buf = Vec::new();
            encode_value(&value, &ty, PackedWidth::Default, &mut buf);
            let (decoded, consumed) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
            assert_eq!(decoded, value, "failed for type {:?}", ty);
            assert_eq!(consumed, buf.len());
        }
    }

    // Nullable fields carry both present and null values.
    #[test]
    fn test_nullable_roundtrip() {
        let ty = Type::Nullable(Box::new(Type::Int));

        let mut buf = Vec::new();
        encode_value(&Value::Int(42), &ty, PackedWidth::Default, &mut buf);
        let (val, _) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
        assert_eq!(val, Value::Int(42));

        let mut buf = Vec::new();
        encode_value(&Value::Null, &ty, PackedWidth::Default, &mut buf);
        let (val, _) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
        assert_eq!(val, Value::Null);
    }

    // Explicit widths produce fixed-size encodings (U16 = 2 bytes, F32 = 4
    // bytes); F32 is compared approximately because of the precision loss.
    #[test]
    fn test_width_modifiers() {
        let mut buf = Vec::new();
        encode_uint(&Value::Uint(1000), PackedWidth::U16, &mut buf);
        assert_eq!(buf.len(), 2);
        let (val, _) = decode_uint(&buf, PackedWidth::U16).unwrap();
        assert_eq!(val, Value::Uint(1000));

        let mut buf = Vec::new();
        encode_float(&Value::Float(3.14), PackedWidth::F32, &mut buf);
        assert_eq!(buf.len(), 4);
        let (val, _) = decode_float(&buf, PackedWidth::F32).unwrap();
        if let Value::Float(f) = val {
            assert!((f - 3.14).abs() < 0.001);
        } else {
            panic!("expected float");
        }
    }

    // An insert frame is marker + varint payload length + all fields, and
    // decodes back to the same values.
    #[test]
    fn test_record_frame_roundtrip() {
        let schema = Schema {
            fields: vec![
                FieldDef {
                    name: "id".to_string(),
                    field_type: Type::Uint,
                    semantic: Some("id".to_string()),
                    deprecated: false,
                    modifiers: Vec::new(),
                },
                FieldDef {
                    name: "name".to_string(),
                    field_type: Type::Str,
                    semantic: None,
                    deprecated: false,
                    modifiers: Vec::new(),
                },
                FieldDef {
                    name: "active".to_string(),
                    field_type: Type::Bool,
                    semantic: None,
                    deprecated: false,
                    modifiers: Vec::new(),
                },
            ],
        };

        let record = Record {
            values: vec![
                Value::Uint(42),
                Value::Str("alice".to_string()),
                Value::Bool(true),
            ],
            cdc_op: CdcOp::Insert,
        };

        let mut buf = Vec::new();
        encode_record_frame(&record, &schema, &mut buf);

        assert_eq!(buf[0], FRAME_INSERT);
        let (payload_len, hdr) = decode_varint(&buf[1..]).unwrap();
        let payload = &buf[1 + hdr..1 + hdr + payload_len as usize];
        let decoded = decode_record_frame(FRAME_INSERT, payload, &schema).unwrap();

        assert_eq!(decoded.values, record.values);
        assert_eq!(decoded.cdc_op, CdcOp::Insert);
    }

    // A delete frame carries only the id field; non-id fields decode as Null.
    #[test]
    fn test_cdc_delete_frame() {
        let schema = Schema {
            fields: vec![
                FieldDef {
                    name: "id".to_string(),
                    field_type: Type::Uint,
                    semantic: Some("id".to_string()),
                    deprecated: false,
                    modifiers: Vec::new(),
                },
                FieldDef {
                    name: "name".to_string(),
                    field_type: Type::Str,
                    semantic: None,
                    deprecated: false,
                    modifiers: Vec::new(),
                },
            ],
        };

        let record = Record {
            values: vec![Value::Uint(5), Value::Null],
            cdc_op: CdcOp::Delete,
        };

        let mut buf = Vec::new();
        encode_record_frame(&record, &schema, &mut buf);
        assert_eq!(buf[0], FRAME_DELETE);

        let (payload_len, hdr) = decode_varint(&buf[1..]).unwrap();
        let payload = &buf[1 + hdr..1 + hdr + payload_len as usize];
        let decoded = decode_record_frame(FRAME_DELETE, payload, &schema).unwrap();

        assert_eq!(decoded.cdc_op, CdcOp::Delete);
        assert_eq!(decoded.values[0], Value::Uint(5));
        assert_eq!(decoded.values[1], Value::Null);
    }

    // Dates are packed into a 4-byte day count and rendered back identically.
    #[test]
    fn test_date_roundtrip() {
        let mut buf = Vec::new();
        let val = Value::Date("2026-03-14".to_string());
        encode_value(&val, &Type::Date, PackedWidth::Default, &mut buf);
        assert_eq!(buf.len(), 4);
        let (decoded, _) = decode_value(&buf, &Type::Date, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    // Timestamps are packed into an 8-byte microsecond count.
    #[test]
    fn test_datetime_roundtrip() {
        let mut buf = Vec::new();
        let val = Value::DateTime("2026-03-14T10:30:00Z".to_string());
        encode_value(&val, &Type::DateTime, PackedWidth::Default, &mut buf);
        assert_eq!(buf.len(), 8);
        let (decoded, _) = decode_value(&buf, &Type::DateTime, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    // An hours+minutes duration renders back in the same canonical form.
    #[test]
    fn test_duration_roundtrip() {
        let mut buf = Vec::new();
        let val = Value::Duration("PT2H30M".to_string());
        encode_value(&val, &Type::Duration, PackedWidth::Default, &mut buf);
        let (decoded, _) = decode_value(&buf, &Type::Duration, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    // Enums are encoded by variant index and decoded back to the name.
    #[test]
    fn test_enum_roundtrip() {
        let variants = vec!["open".to_string(), "closed".to_string(), "merged".to_string()];
        let ty = Type::Enum(variants);
        let val = Value::Enum("closed".to_string());
        let mut buf = Vec::new();
        encode_value(&val, &ty, PackedWidth::Default, &mut buf);
        let (decoded, _) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    // Typed arrays encode a count plus schema-typed elements (here zigzag ints).
    #[test]
    fn test_array_roundtrip() {
        let ty = Type::Array(Box::new(Type::Int));
        let val = Value::Array(vec![Value::Int(1), Value::Int(-2), Value::Int(3)]);
        let mut buf = Vec::new();
        encode_value(&val, &ty, PackedWidth::Default, &mut buf);
        let (decoded, _) = decode_value(&buf, &ty, PackedWidth::Default).unwrap();
        assert_eq!(decoded, val);
    }

    // Tagged `Any` values recover their own type without a schema.
    #[test]
    fn test_any_roundtrip() {
        let cases: Vec<Value> = vec![
            Value::Null,
            Value::Bool(true),
            Value::Int(-42),
            Value::Uint(100),
            Value::Float(2.718),
            Value::Str("test".to_string()),
        ];
        for val in cases {
            let mut buf = Vec::new();
            encode_value(&val, &Type::Any, PackedWidth::Default, &mut buf);
            let (decoded, _) = decode_value(&buf, &Type::Any, PackedWidth::Default).unwrap();
            assert_eq!(decoded, val, "any roundtrip failed for {:?}", val);
        }
    }

    // A batch frame is marker + record count + payload length + length-prefixed
    // records, and every record decodes back in order.
    #[test]
    fn test_batch_frame_roundtrip() {
        let schema = Schema {
            fields: vec![
                FieldDef {
                    name: "id".to_string(),
                    field_type: Type::Uint,
                    semantic: Some("id".to_string()),
                    deprecated: false,
                    modifiers: Vec::new(),
                },
                FieldDef {
                    name: "val".to_string(),
                    field_type: Type::Str,
                    semantic: None,
                    deprecated: false,
                    modifiers: Vec::new(),
                },
            ],
        };

        let records = vec![
            Record {
                values: vec![Value::Uint(1), Value::Str("a".to_string())],
                cdc_op: CdcOp::Insert,
            },
            Record {
                values: vec![Value::Uint(2), Value::Str("b".to_string())],
                cdc_op: CdcOp::Insert,
            },
            Record {
                values: vec![Value::Uint(3), Value::Str("c".to_string())],
                cdc_op: CdcOp::Insert,
            },
        ];

        let mut buf = Vec::new();
        encode_batch_frame(&records, &schema, CdcOp::Insert, &mut buf);

        assert_eq!(buf[0], FRAME_BATCH_INSERT);
        let (count, h1) = decode_varint(&buf[1..]).unwrap();
        assert_eq!(count, 3);
        let (payload_len, h2) = decode_varint(&buf[1 + h1..]).unwrap();
        let payload = &buf[1 + h1 + h2..1 + h1 + h2 + payload_len as usize];

        let decoded = decode_batch_frame(payload, count, &schema, CdcOp::Insert).unwrap();
        assert_eq!(decoded.len(), 3);
        assert_eq!(decoded[0].values[0], Value::Uint(1));
        assert_eq!(decoded[2].values[1], Value::Str("c".to_string()));
    }
}