use std::fs::File;
use std::io::{Read as _, Seek as _, SeekFrom};
use std::sync::Arc;
use itertools::Itertools as _;
use re_chunk::{Chunk, ChunkId};
use re_span::Span;
use crate::RrdManifest;
use crate::ToApplication as _;
use crate::rrd::CodecError;
/// Largest gap (in bytes) that may separate a span from the end of the current group
/// for `coalesce_spans` to still merge it into that group: 64 KiB.
///
/// The comparison is inclusive: a gap of exactly this many bytes is still merged.
const MERGE_GAP_BYTES: u64 = 64 * 1024;
/// Reads and decodes the chunks identified by `chunk_ids` from an RRD `file`.
///
/// The byte offset and byte size of every requested chunk are looked up in `manifest`.
/// The resulting spans are sorted by file offset and nearby spans are grouped by
/// `coalesce_spans`, so that each group costs a single seek plus a single read. Every
/// chunk is then decoded from its own slice of the group's buffer.
///
/// The returned chunks are ordered by ascending byte offset in the file, which is not
/// necessarily the order of `chunk_ids`. An ID listed several times in `chunk_ids` is
/// decoded once per occurrence.
///
/// Returns an empty vector when `chunk_ids` is empty, without touching `file`.
///
/// # Errors
///
/// * [`CodecError::ChunkNotInManifest`] if a requested ID is not listed in the manifest.
/// * Any failure while seeking or reading `file`, while converting an offset or size to
///   `usize`, or while decoding a chunk, converted into [`CodecError`].
pub fn read_chunks(
    file: &mut File,
    manifest: &RrdManifest,
    chunk_ids: &[ChunkId],
) -> Result<Vec<Arc<Chunk>>, CodecError> {
    if chunk_ids.is_empty() {
        return Ok(Vec::new());
    }

    let manifest_ids = manifest.col_chunk_ids();
    let byte_offsets = manifest.col_chunk_byte_offset();
    let byte_sizes = manifest.col_chunk_byte_size();

    // Manifest row index of every chunk ID, so each requested ID is resolved in O(1).
    let row_by_id: std::collections::HashMap<ChunkId, usize> = manifest_ids
        .iter()
        .enumerate()
        .map(|(row, &id)| (id, row))
        .collect();

    // One entry per requested ID, or an early error: since `chunk_ids` is non-empty at
    // this point, `entries` is guaranteed to be non-empty as well.
    let mut entries: Vec<(ChunkId, Span<u64>)> = chunk_ids
        .iter()
        .map(|&id| -> Result<_, CodecError> {
            let &row = row_by_id
                .get(&id)
                .ok_or(CodecError::ChunkNotInManifest { chunk_id: id })?;
            Ok((id, Span::from_start_len(byte_offsets[row], byte_sizes[row])))
        })
        .try_collect()?;

    // `coalesce_spans` only compares against the latest group, so it needs file order.
    entries.sort_by_key(|&(_, span)| span.start);

    let groups = coalesce_spans(&entries);
    let mut chunks = Vec::with_capacity(entries.len());

    for group in &groups {
        // Fetch the whole group (including any gaps between its chunks) in one read.
        file.seek(SeekFrom::Start(group.byte_span.start))?;
        let mut buf = vec![0u8; usize::try_from(group.byte_span.len)?];
        file.read_exact(&mut buf)?;

        for &(_, chunk_span) in &entries[group.entry_range.clone()] {
            // Translate the chunk's absolute file span into a span relative to `buf`.
            let offset_in_group = usize::try_from(chunk_span.start - group.byte_span.start)?;
            let chunk_len = usize::try_from(chunk_span.len)?;
            let local_span = Span::from_start_len(offset_in_group, chunk_len);

            let chunk = decode_chunk_from_bytes(&buf[local_span.range()])?;
            chunks.push(Arc::new(chunk));
        }
    }

    Ok(chunks)
}
/// A group of entries whose byte spans were merged into one contiguous read.
struct CoalescedSpan {
/// Byte range in the file covering every entry of the group, gaps included.
byte_span: Span<u64>,
/// Half-open range of indices into the entry slice that belong to this group.
entry_range: std::ops::Range<usize>,
}
/// Groups consecutive entries whose byte spans are close enough to be read together.
///
/// An entry joins the most recent group when its start lies at most `MERGE_GAP_BYTES`
/// past that group's end (overlapping and adjacent spans therefore always merge);
/// otherwise it opens a new group. Only the most recent group is ever considered, so
/// `entries` is expected to be sorted by span start, as `read_chunks` does.
///
/// Each returned group records the byte range it covers and the half-open range of
/// indices into `entries` it contains. An empty input yields no groups.
fn coalesce_spans(entries: &[(ChunkId, Span<u64>)]) -> Vec<CoalescedSpan> {
    let mut coalesced: Vec<CoalescedSpan> = Vec::new();

    for (index, &(_, span)) in entries.iter().enumerate() {
        match coalesced.last_mut() {
            // Close enough to the current group: extend it to cover this entry too.
            Some(current) if span.start <= current.byte_span.end() + MERGE_GAP_BYTES => {
                let group_end = current.byte_span.end();
                // `max` keeps the group's end when this span lies fully inside it.
                let merged_end = span.end().max(group_end);
                current.byte_span.len = merged_end - current.byte_span.start;
                current.entry_range.end = index + 1;
            }
            // First entry, or too far from the current group: start a new one.
            _ => coalesced.push(CoalescedSpan {
                byte_span: span,
                entry_range: index..index + 1,
            }),
        }
    }

    coalesced
}
fn decode_chunk_from_bytes(buf: &[u8]) -> Result<Chunk, CodecError> {
use crate::rrd::Decodable as _;
let transport_arrow_msg = re_protos::log_msg::v1alpha1::ArrowMsg::from_rrd_bytes(buf)?;
let app_arrow_msg = transport_arrow_msg.to_application(())?;
let chunk = Chunk::from_arrow_msg(&app_arrow_msg)?;
Ok(chunk)
}
// Unit tests: end-to-end reads through `read_chunks` against encoded test files, and
// pure tests of the span grouping done by `coalesce_spans`.
#[cfg(test)]
mod tests {
use super::*;
use crate::rrd::test_util::{
encode_test_rrd, encode_test_rrd_to_file_with_options, make_test_chunks,
};
// Requests every chunk listed in the manifest and checks that each loaded chunk has the
// entity path and row count of the original chunk at the same index.
#[test]
fn test_read_chunks_roundtrip() {
let chunks = make_test_chunks(5);
let (rrd, store_id) = encode_test_rrd(&chunks);
let mut file = File::open(rrd.path()).unwrap();
let footer = crate::read_rrd_footer(&mut file).unwrap().unwrap();
let raw_manifest = &footer.manifests[&store_id];
let manifest = RrdManifest::try_new(raw_manifest).unwrap();
let chunk_ids = manifest.col_chunk_ids();
assert_eq!(chunk_ids.len(), chunks.len());
let loaded = read_chunks(&mut file, &manifest, chunk_ids).unwrap();
assert_eq!(loaded.len(), chunks.len());
// NOTE(review): the index-wise comparison relies on `read_chunks` returning chunks in
// the same order as `make_test_chunks` produced them — presumably the file order.
for (i, loaded_chunk) in loaded.iter().enumerate() {
assert_eq!(loaded_chunk.entity_path(), chunks[i].entity_path());
assert_eq!(loaded_chunk.num_rows(), chunks[i].num_rows());
}
}
// Requests only the first and last manifest entries and checks that exactly two chunks
// come back.
#[test]
fn test_read_chunks_subset() {
let chunks = make_test_chunks(5);
let (rrd, store_id) = encode_test_rrd(&chunks);
let mut file = File::open(rrd.path()).unwrap();
let footer = crate::read_rrd_footer(&mut file).unwrap().unwrap();
let raw_manifest = &footer.manifests[&store_id];
let manifest = RrdManifest::try_new(raw_manifest).unwrap();
let chunk_ids = manifest.col_chunk_ids();
let subset = [chunk_ids[0], chunk_ids[chunk_ids.len() - 1]];
let loaded = read_chunks(&mut file, &manifest, &subset).unwrap();
assert_eq!(loaded.len(), 2);
}
// A freshly generated ID that is not in the manifest must produce
// `CodecError::ChunkNotInManifest`.
#[test]
fn test_read_chunks_unknown_id_errors() {
let chunks = make_test_chunks(3);
let (rrd, store_id) = encode_test_rrd(&chunks);
let mut file = File::open(rrd.path()).unwrap();
let footer = crate::read_rrd_footer(&mut file).unwrap().unwrap();
let raw_manifest = &footer.manifests[&store_id];
let manifest = RrdManifest::try_new(raw_manifest).unwrap();
let bogus_id = ChunkId::new();
let result = read_chunks(&mut file, &manifest, &[bogus_id]);
assert!(
matches!(result, Err(crate::CodecError::ChunkNotInManifest { .. })),
"Expected ChunkNotInManifest error, got: {result:?}"
);
}
// Shorthand for building a `Span<u64>` from a start offset and a length.
fn span(start: u64, len: u64) -> Span<u64> {
Span { start, len }
}
// Three back-to-back spans (no gaps) collapse into one group covering bytes 100..250.
#[test]
fn test_coalesce_spans_single_group() {
let entries = vec![
(ChunkId::new(), span(100, 50)),
(ChunkId::new(), span(150, 50)), (ChunkId::new(), span(200, 50)), ];
let groups = coalesce_spans(&entries);
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].byte_span, span(100, 150)); assert_eq!(groups[0].entry_range, 0..3);
}
// Two adjacent spans, then a gap one byte larger than `MERGE_GAP_BYTES`, then two more
// adjacent spans: expect two groups of two entries each.
#[test]
fn test_coalesce_spans_multiple_groups() {
let gap = MERGE_GAP_BYTES + 1;
let entries = vec![
(ChunkId::new(), span(0, 100)),
(ChunkId::new(), span(100, 100)), (ChunkId::new(), span(200 + gap, 100)), (ChunkId::new(), span(200 + gap + 100, 50)), ];
let groups = coalesce_spans(&entries);
assert_eq!(groups.len(), 2);
assert_eq!(groups[0].byte_span, span(0, 200)); assert_eq!(groups[0].entry_range, 0..2);
assert_eq!(groups[1].byte_span, span(200 + gap, 150)); assert_eq!(groups[1].entry_range, 2..4);
}
// Pins the inclusive threshold: a gap of exactly `MERGE_GAP_BYTES` still merges, while
// one extra byte splits the entries into two groups.
#[test]
fn test_coalesce_spans_merge_gap_boundary() {
// Gap of exactly `MERGE_GAP_BYTES` between the end of the first span and the second.
let entries = vec![
(ChunkId::new(), span(0, 100)),
(ChunkId::new(), span(100 + MERGE_GAP_BYTES, 50)), ];
let groups = coalesce_spans(&entries);
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].byte_span, span(0, 100 + MERGE_GAP_BYTES + 50));
assert_eq!(groups[0].entry_range, 0..2);
// Gap of `MERGE_GAP_BYTES + 1`: just past the threshold.
let entries = vec![
(ChunkId::new(), span(0, 100)),
(ChunkId::new(), span(100 + MERGE_GAP_BYTES + 1, 50)),
];
let groups = coalesce_spans(&entries);
assert_eq!(groups.len(), 2);
}
// No entries in, no groups out.
#[test]
fn test_coalesce_spans_empty() {
let entries: Vec<(ChunkId, Span<u64>)> = vec![];
let groups = coalesce_spans(&entries);
assert!(groups.is_empty());
}
// Same round trip as `test_read_chunks_roundtrip`, but on a file written with
// `EncodingOptions::PROTOBUF_UNCOMPRESSED`.
#[test]
fn test_read_chunks_uncompressed() {
let chunks = make_test_chunks(3);
let rrd = tempfile::NamedTempFile::new().unwrap();
let store_id = re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test");
encode_test_rrd_to_file_with_options(
rrd.path(),
&chunks,
&store_id,
// NOTE(review): the meaning of this flag is defined by the test helper and is not
// visible from this file — confirm against `test_util`.
true,
crate::EncodingOptions::PROTOBUF_UNCOMPRESSED,
);
let mut file = File::open(rrd.path()).unwrap();
let footer = crate::read_rrd_footer(&mut file).unwrap().unwrap();
let raw_manifest = &footer.manifests[&store_id];
let manifest = RrdManifest::try_new(raw_manifest).unwrap();
let loaded = read_chunks(&mut file, &manifest, manifest.col_chunk_ids()).unwrap();
assert_eq!(loaded.len(), chunks.len());
for (i, loaded_chunk) in loaded.iter().enumerate() {
assert_eq!(loaded_chunk.entity_path(), chunks[i].entity_path());
assert_eq!(loaded_chunk.num_rows(), chunks[i].num_rows());
}
}
}