use std::io::Cursor;
use riegeli::{
CompressionType, Field, FieldProjection, ReaderOptions, RecordReader, RecordWriter,
WriterOptions,
};
/// Encodes `v` as a little-endian base-128 varint (protobuf wire format).
///
/// Each output byte carries seven payload bits. The high bit is set on every
/// byte except the last, so a `u32` produces between one and five bytes.
fn encode_u32(v: u32) -> Vec<u8> {
    let mut remaining = v;
    let mut bytes = Vec::with_capacity(5);
    while remaining >= 0x80 {
        // Low seven bits plus the continuation flag.
        bytes.push((remaining & 0x7f) as u8 | 0x80);
        remaining >>= 7;
    }
    // Final byte: fits in seven bits, continuation flag clear.
    bytes.push(remaining as u8);
    bytes
}
/// Decodes a base-128 varint from the start of `buf` as a `u32`.
///
/// On success, returns the decoded value and the number of bytes consumed.
/// Bytes after the terminating byte (the first byte with the high bit clear)
/// are ignored.
///
/// # Errors
///
/// * `"varint overflow"` if the value does not fit in 32 bits. This covers a
///   varint longer than five bytes and a fifth byte whose payload has bits set
///   above bit 31.
/// * `"unexpected EOF"` if `buf` ends before a terminating byte is found.
fn decode_u32(buf: &[u8]) -> Result<(u32, usize), String> {
    let mut result = 0u32;
    let mut shift = 0u32;
    for (i, &byte) in buf.iter().enumerate() {
        if shift >= 32 {
            return Err("varint overflow".into());
        }
        let payload = u32::from(byte & 0x7f);
        let shifted = payload << shift;
        // `<<` silently drops bits pushed past bit 31 (only possible on the
        // fifth byte). Detect that instead of returning a truncated value.
        if shifted >> shift != payload {
            return Err("varint overflow".into());
        }
        result |= shifted;
        shift += 7;
        if byte & 0x80 == 0 {
            return Ok((result, i + 1));
        }
    }
    Err("unexpected EOF".into())
}
/// Encodes a protobuf field with wire type 0 (varint).
///
/// The output is the tag `(field_number << 3) | 0` as a varint, followed by
/// `value` as a base-128 varint.
fn encode_varint_field(field_number: u32, value: u64) -> Vec<u8> {
    /// Appends `v` to `out` as a little-endian base-128 varint.
    fn push_varint(out: &mut Vec<u8>, mut v: u64) {
        while v >= 0x80 {
            out.push((v & 0x7f) as u8 | 0x80);
            v >>= 7;
        }
        out.push(v as u8);
    }

    // Wire type 0 (varint) leaves the low three tag bits clear, so no OR is
    // needed. A u32 tag takes at most 5 bytes and a u64 value at most 10.
    let tag = field_number << 3;
    let mut out = Vec::with_capacity(15);
    push_varint(&mut out, u64::from(tag));
    push_varint(&mut out, value);
    out
}
/// Encodes a protobuf field with wire type 5 (fixed32).
///
/// The output is the tag varint followed by the four little-endian bytes of
/// `value`.
fn encode_fixed32_field(field_number: u32, value: u32) -> Vec<u8> {
    const WIRE_TYPE_FIXED32: u32 = 5;
    let key = (field_number << 3) | WIRE_TYPE_FIXED32;
    let mut encoded = encode_u32(key);
    encoded.extend(value.to_le_bytes());
    encoded
}
/// Encodes a protobuf field with wire type 2 (length-delimited).
///
/// The output is the tag varint, then the byte length of `value` as a varint,
/// then the raw bytes of `value`.
///
/// # Panics
///
/// Panics if `value` is longer than `u32::MAX` bytes. The length is encoded
/// with [`encode_u32`], and silently truncating it would produce a corrupt
/// record.
fn encode_string_field(field_number: u32, value: &[u8]) -> Vec<u8> {
    let tag = (field_number << 3) | 2;
    let len = u32::try_from(value.len())
        .expect("length-delimited payload must fit in a u32 length prefix");
    let mut out = encode_u32(tag);
    out.extend_from_slice(&encode_u32(len));
    out.extend_from_slice(value);
    out
}
/// Builds a serialized protobuf message with three fields, in this order:
/// field 1 as a varint (`field1`), field 2 as fixed32 (`field2`), and
/// field 3 as length-delimited bytes (`field3`).
fn make_proto_record(field1: u64, field2: u32, field3: &[u8]) -> Vec<u8> {
    [
        encode_varint_field(1, field1),
        encode_fixed32_field(2, field2),
        encode_string_field(3, field3),
    ]
    .concat()
}
/// Returns the value of top-level field 1 in the serialized protobuf `record`,
/// provided it is encoded with wire type 0 (varint).
///
/// Fields are scanned in order, and the first varint-typed occurrence of
/// field 1 is returned. Other fields, including field 1 with a different wire
/// type, are skipped according to their wire type (0, 1, 2 or 5).
///
/// Returns `None` in any of these cases:
/// * field 1 is not found as a varint;
/// * the record is truncated or a varint is longer than 10 bytes;
/// * a tag does not fit in `u32`;
/// * a group (3/4) or reserved wire type is encountered before field 1.
fn parse_field1_varint(record: &[u8]) -> Option<u64> {
    /// Reads a base-128 varint starting at `*pos` and advances `*pos` past it.
    /// Returns `None` on truncation or if the varint exceeds 10 bytes (the
    /// maximum for a 64-bit value). The bound also keeps the shift below 64,
    /// which would otherwise panic in debug builds on malformed input.
    fn read_varint(buf: &[u8], pos: &mut usize) -> Option<u64> {
        let mut value = 0u64;
        for shift in (0..64u32).step_by(7) {
            let byte = *buf.get(*pos)?;
            *pos += 1;
            value |= u64::from(byte & 0x7f) << shift;
            if byte & 0x80 == 0 {
                return Some(value);
            }
        }
        None
    }

    let mut pos = 0usize;
    while pos < record.len() {
        let tag = u32::try_from(read_varint(record, &mut pos)?).ok()?;
        let (field_number, wire_type) = (tag >> 3, tag & 7);
        match wire_type {
            0 => {
                let value = read_varint(record, &mut pos)?;
                if field_number == 1 {
                    return Some(value);
                }
            }
            // 64-bit fixed.
            1 => pos = pos.checked_add(8)?,
            // Length-delimited: length varint, then that many bytes.
            2 => {
                let len = usize::try_from(read_varint(record, &mut pos)?).ok()?;
                pos = pos.checked_add(len)?;
            }
            // 32-bit fixed.
            5 => pos = pos.checked_add(4)?,
            // Groups (3/4) and reserved wire types cannot be skipped here.
            _ => return None,
        }
    }
    None
}
/// Returns `true` when no top-level field numbered `field_number` is found in
/// the serialized protobuf `record`.
///
/// Fields are walked in order, and the scan stops with `false` at the first
/// tag whose field number matches, whatever its wire type. The scan is
/// deliberately lenient. Each of the following ends it and yields `true`:
/// * an undecodable tag or length prefix;
/// * a group or unknown wire type (3, 4, 6, 7);
/// * running off the end of the buffer.
fn field_absent(record: &[u8], field_number: u32) -> bool {
    let mut cursor = 0usize;
    while cursor < record.len() {
        let (key, key_len) = match decode_u32(&record[cursor..]) {
            Ok(decoded) => decoded,
            Err(_) => return true,
        };
        cursor += key_len;
        if key >> 3 == field_number {
            return false;
        }
        cursor += match key & 7 {
            // Varint: skip through the first byte without the continuation
            // bit, or to the end of the buffer if there is none.
            0 => record[cursor..]
                .iter()
                .position(|&b| b < 0x80)
                .map_or(record.len() - cursor, |idx| idx + 1),
            // 64-bit fixed.
            1 => 8,
            // Length-delimited: length varint followed by that many bytes.
            2 => match decode_u32(&record[cursor..]) {
                Ok((len, len_len)) => len_len + len as usize,
                Err(_) => return true,
            },
            // 32-bit fixed.
            5 => 4,
            _ => return true,
        };
    }
    true
}
/// Writes `records` to an in-memory buffer with a `RecordWriter` configured by
/// `opts`, flushes it, and returns the resulting bytes.
///
/// Panics if creating the writer, writing any record, or flushing fails.
fn write_records(records: &[&[u8]], opts: WriterOptions) -> Vec<u8> {
    let mut sink = Cursor::new(Vec::<u8>::new());
    let mut writer = RecordWriter::new(&mut sink, opts).expect("new ok");
    records.iter().for_each(|record| {
        writer.write_record(record).expect("write ok");
    });
    writer.flush().expect("flush ok");
    // Drop the writer so its mutable borrow of `sink` ends before the buffer
    // is taken out.
    drop(writer);
    sink.into_inner()
}
/// Reads every record from `data` with a `RecordReader` configured by `opts`
/// and returns them in the order the reader yields them.
///
/// Panics if the reader cannot be created or if any read returns an error.
fn read_all(data: &[u8], opts: ReaderOptions) -> Vec<Vec<u8>> {
    let mut reader = RecordReader::new(Cursor::new(data), opts).expect("reader new ok");
    // `read_record` yields `None` once the input is exhausted, which ends the
    // iterator.
    std::iter::from_fn(|| reader.read_record().expect("read ok")).collect()
}
/// Writes ten transposed, uncompressed records and reads them back with a
/// projection onto field 1 only. Checks that field 1 holds the written values
/// and that fields 2 and 3 are not found in any returned record.
#[test]
fn test_projection_field1_only_simple_compression() {
    let records: Vec<Vec<u8>> = (0..10u64)
        .map(|i| make_proto_record(i + 1, (i * 100) as u32, b"hello"))
        .collect();
    let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();

    let data = write_records(
        &record_refs,
        WriterOptions::new()
            .transpose(true)
            .compression(CompressionType::None),
    );
    let field1_only = FieldProjection::new().add_field(Field::new(vec![1]));
    let results = read_all(&data, ReaderOptions::new().field_projection(field1_only));

    assert_eq!(results.len(), 10, "should read 10 records");
    for (i, rec) in results.iter().enumerate() {
        let expected = i as u64 + 1;
        let val = parse_field1_varint(rec).expect("field 1 should be present");
        assert_eq!(val, expected, "field 1 value mismatch at record {i}");
        assert!(field_absent(rec, 2), "field 2 should be absent in record {i}");
        assert!(field_absent(rec, 3), "field 3 should be absent in record {i}");
    }
}
/// Writes 100 transposed, Brotli-compressed records with `bucket_fraction(0.1)`
/// and reads them back with a projection onto field 1. Checks that every
/// record's field 1 holds the written value. Only built with the `brotli`
/// feature.
#[test]
#[cfg(feature = "brotli")]
fn test_projection_with_small_bucket_fraction() {
    const RECORD_COUNT: u64 = 100;
    let records: Vec<Vec<u8>> = (0..RECORD_COUNT)
        .map(|i| make_proto_record(i + 1, (i * 7) as u32, b"world"))
        .collect();
    let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();

    let writer_opts = WriterOptions::new()
        .transpose(true)
        .bucket_fraction(0.1)
        .compression(CompressionType::Brotli);
    let data = write_records(&record_refs, writer_opts);

    let projection = FieldProjection::new().add_field(Field::new(vec![1]));
    let results = read_all(&data, ReaderOptions::new().field_projection(projection));

    assert_eq!(results.len(), 100, "should read 100 records");
    for (i, rec) in results.iter().enumerate() {
        let val = parse_field1_varint(rec).expect("field 1 should be present");
        assert_eq!(val, i as u64 + 1, "field 1 value mismatch at record {i}");
    }
}
/// Records that are not valid protobuf (their first tag decodes to wire type 7)
/// must come back byte-for-byte unchanged from a transposed file even when a
/// field projection is requested.
#[test]
fn test_nonproto_records_passthrough() {
    let nonproto: Vec<Vec<u8>> = (0..5u8).map(|i| vec![0xFF, 0xFE, i, i + 1]).collect();
    let record_refs: Vec<&[u8]> = nonproto.iter().map(Vec::as_slice).collect();

    let writer_opts = WriterOptions::new()
        .transpose(true)
        .compression(CompressionType::None);
    let data = write_records(&record_refs, writer_opts);

    let reader_opts = ReaderOptions::new()
        .field_projection(FieldProjection::new().add_field(Field::new(vec![1])));
    let results = read_all(&data, reader_opts);

    assert_eq!(results.len(), nonproto.len(), "should read all nonproto records");
    for (i, (got, expected)) in results.iter().zip(&nonproto).enumerate() {
        assert_eq!(got, expected, "nonproto record {i} mismatch");
    }
}
/// Writes ten transposed records with `bucket_fraction(1.0)` and reads them back
/// with a projection onto field 1. Checks that field 1 holds the written
/// values (100..110) and that fields 2 and 3 are not found.
#[test]
fn test_projection_single_bucket() {
    const FIRST_VALUE: u64 = 100;
    let records: Vec<Vec<u8>> = (0..10u64)
        .map(|i| make_proto_record(i + FIRST_VALUE, (i * 5) as u32, b"xyz"))
        .collect();
    let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();

    let writer_opts = WriterOptions::new()
        .transpose(true)
        .bucket_fraction(1.0)
        .compression(CompressionType::None);
    let data = write_records(&record_refs, writer_opts);

    let projection = FieldProjection::new().add_field(Field::new(vec![1]));
    let results = read_all(&data, ReaderOptions::new().field_projection(projection));

    assert_eq!(results.len(), 10);
    for (i, rec) in results.iter().enumerate() {
        let val = parse_field1_varint(rec).expect("field 1 should be present");
        assert_eq!(val, i as u64 + FIRST_VALUE, "field 1 mismatch at record {i}");
        assert!(field_absent(rec, 2), "field 2 should be absent at record {i}");
        assert!(field_absent(rec, 3), "field 3 should be absent at record {i}");
    }
}
/// Non-transposed (simple) chunks must read identically with and without a
/// field projection. The unprojected read must also return exactly the
/// records that were written.
#[test]
fn test_simple_chunk_projection_passthrough() {
    let records: Vec<Vec<u8>> = (0..10u64)
        .map(|i| make_proto_record(i + 1, (i * 2) as u32, b"simple"))
        .collect();
    let record_refs: Vec<&[u8]> = records.iter().map(|r| r.as_slice()).collect();
    let opts = WriterOptions::new()
        .transpose(false)
        .compression(CompressionType::None);
    let data = write_records(&record_refs, opts);
    let no_proj = read_all(&data, ReaderOptions::new());
    // Anchor the comparison to the written data. Without this, the test
    // would pass vacuously if both reads returned the same wrong (e.g. empty)
    // output.
    assert_eq!(
        no_proj, records,
        "unprojected read should return the written records"
    );
    let proj = FieldProjection::new().add_field(Field::new(vec![1]));
    let with_proj = read_all(&data, ReaderOptions::new().field_projection(proj));
    assert_eq!(no_proj, with_proj, "simple chunk should ignore projection");
}
/// Writes five transposed records of the nested shape `1 { 2 { 3: i } }` and
/// reads them back with a projection on the path `[1, 2, 3]`. Checks that the
/// record count is preserved and that top-level field 1 is present in every
/// record.
#[test]
fn test_nested_field_projection_roundtrip() {
    let records: Vec<Vec<u8>> = (0..5u64)
        .map(|i| encode_string_field(1, &encode_string_field(2, &encode_varint_field(3, i))))
        .collect();
    let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();

    let writer_opts = WriterOptions::new()
        .transpose(true)
        .compression(CompressionType::None);
    let data = write_records(&record_refs, writer_opts);

    let nested_path = FieldProjection::new().add_field(Field::new(vec![1, 2, 3]));
    let results = read_all(&data, ReaderOptions::new().field_projection(nested_path));

    assert_eq!(results.len(), 5, "should read 5 records");
    for (i, rec) in results.iter().enumerate() {
        assert!(!field_absent(rec, 1), "field 1 should be present in record {i}");
    }
}
/// Calls `size()` on a freshly created reader, expecting 50. Then checks that
/// all 50 records can still be read, in order, afterwards.
#[test]
fn test_size_before_any_reads() {
    let records: Vec<Vec<u8>> = (0..50u8).map(|i| vec![i; 20]).collect();
    let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();
    let data = write_records(&record_refs, WriterOptions::new());
    let mut reader =
        RecordReader::new(Cursor::new(data), ReaderOptions::new()).expect("reader new ok");

    assert_eq!(
        reader.size().expect("size() should not fail"),
        50,
        "size() should return 50"
    );

    let mut count = 0;
    while let Some(rec) = reader.read_record().expect("read after size() ok") {
        let expected = &records[count];
        assert_eq!(&rec, expected, "record {count} mismatch after size()");
        count += 1;
    }
    assert_eq!(count, 50, "should read all 50 records after size()");
}
/// Reads the first ten records and calls `size()`, expecting 100. Then checks
/// that the next read returns the record at index 10, i.e. `size()` did not
/// move the read position.
#[test]
fn test_size_preserves_position_mid_read() {
    let records: Vec<Vec<u8>> = (0..100u8).map(|i| vec![i; 10]).collect();
    let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();
    // A small chunk size is presumably meant to spread the records over
    // several chunks — confirm against the writer's chunk_size semantics.
    let data = write_records(&record_refs, WriterOptions::new().chunk_size(200));
    let mut reader = RecordReader::new(Cursor::new(data), ReaderOptions::new()).expect("new ok");

    for expected in &records[..10] {
        let rec = reader
            .read_record()
            .expect("read ok")
            .expect("should have record");
        assert_eq!(&rec, expected);
    }

    assert_eq!(reader.size().expect("size() ok"), 100);

    let rec11 = reader
        .read_record()
        .expect("read ok after size()")
        .expect("should have record 11");
    assert_eq!(rec11, records[10], "position should be preserved after size()");
}