use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use arrow::array::{Array as _, BooleanArray, RecordBatch, StringArray, UInt64Array};
use futures::StreamExt as _;
use re_dataframe::TimelineName;
use re_log_types::{EntityPath, TimeInt};
use re_protos::common::v1alpha1::ext::SegmentId;
use re_redap_client::{ApiError, ApiResult};
use tokio::sync::mpsc::Sender;
use tracing::Instrument as _;
use crate::analytics::{QueryErrorKind, TaskFetchStats};
use crate::chunk_fetcher::{
ChunksWithSegment, SortedChunksWithSegment, batch_byte_size, batch_byte_size_uncompressed,
batch_has_any_direct_urls, fetch_batch_direct, fetch_batch_group_via_grpc,
split_batch_by_direct_url,
};
use crate::dataframe_query_common::{DataframeClientAPI, force_grpc};
use crate::metrics_capture::QueryMetrics;
use crate::pipeline_budget::PipelineBudget;
use crate::segment_chunk_manifest::SegmentChunkManifest;
use re_dataframe::external::re_chunk::{Chunk, TimeColumn};
use super::cpu_worker::CpuWorkerMsg;
/// Target total `chunk_byte_len`, in bytes, of one request batch built by
/// `create_request_batches` (8 MiB).
const TARGET_BATCH_SIZE_BYTES: usize = 8 << 20;

/// Number of request batches handed to a single `fetch_batch_group_via_grpc` call on the
/// all-gRPC fetch path.
const GRPC_BATCH_SIZE: usize = 12;

/// Maximum number of fetch tasks in flight at once on the direct/hybrid fetch path
/// (the `buffer_unordered` limit).
const IO_PIPELINE_BUFFER: usize = 24;
/// Builds one [`SegmentChunkManifest`] per segment, listing the temporal chunks expected
/// on `filtered_timeline`.
///
/// Every row of `chunk_infos` that is not static and has a non-null `<timeline>:start`
/// value contributes its entity path and start time (via
/// `TimeInt::saturated_temporal_i64`) to its segment's manifest. All manifests are locked
/// before being returned.
///
/// The returned map is empty when:
/// - `filtered_timeline` is `None`, or
/// - any batch lacks the `<timeline>:start` column (all-or-nothing fallback).
///
/// Segments without a single qualifying row get no entry.
///
/// # Errors
/// Returns an internal error if the start column has a type `TimeColumn` cannot read, or
/// if the segment-id, entity-path or is-static column is missing or mistyped.
fn build_segment_manifests(
    chunk_infos: &[RecordBatch],
    filtered_timeline: Option<TimelineName>,
) -> ApiResult<HashMap<SegmentId, SegmentChunkManifest>> {
    let Some(timeline_name) = filtered_timeline else {
        return Ok(HashMap::new());
    };

    let start_col_name = format!("{timeline_name}:start");

    // All-or-nothing: if even one batch has no start times we cannot build complete
    // manifests, so we produce none at all.
    let all_batches_have_start = chunk_infos
        .iter()
        .all(|rb| rb.column_by_name(&start_col_name).is_some());
    if !all_batches_have_start {
        return Ok(HashMap::new());
    }

    let mut manifests: HashMap<SegmentId, SegmentChunkManifest> = HashMap::new();

    for rb in chunk_infos {
        let start_col = rb
            .column_by_name(&start_col_name)
            .expect("pre-check above guarantees presence on every batch");
        let (start_values, start_nulls) = TimeColumn::read_nullable_array(start_col.as_ref())
            .map_err(|err| {
                ApiError::internal(format!(
                    "`{start_col_name}` column has unsupported type: {err}"
                ))
            })?;

        let segment_ids = rb
            .column_by_name(
                re_protos::cloud::v1alpha1::QueryDatasetResponse::FIELD_CHUNK_SEGMENT_ID,
            )
            .ok_or_else(|| ApiError::internal("missing segment_id column in chunk_info batch"))?
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| ApiError::internal("segment_id column is not a string array"))?;
        let entity_paths = rb
            .column_by_name(
                re_protos::cloud::v1alpha1::QueryDatasetResponse::FIELD_CHUNK_ENTITY_PATH,
            )
            .ok_or_else(|| ApiError::internal("missing entity_path column in chunk_info batch"))?
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| ApiError::internal("entity_path column is not a string array"))?;
        let static_flags = rb
            .column_by_name(re_protos::cloud::v1alpha1::QueryDatasetResponse::FIELD_CHUNK_IS_STATIC)
            .ok_or_else(|| ApiError::internal("missing is_static column in chunk_info batch"))?
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| ApiError::internal("is_static column is not a boolean array"))?;

        for row in 0..rb.num_rows() {
            // Rows without a start time, and static chunks, are not tracked.
            let start_is_null = start_nulls
                .as_ref()
                .is_some_and(|nulls| nulls.is_null(row));
            if start_is_null || static_flags.value(row) {
                continue;
            }

            let time_min = TimeInt::saturated_temporal_i64(start_values[row]);
            if time_min.is_static() {
                // Guard: never register a chunk at the static time.
                continue;
            }

            manifests
                .entry(SegmentId::from(segment_ids.value(row)))
                .or_default()
                .expect_chunk(EntityPath::from(entity_paths.value(row)), time_min);
        }
    }

    for manifest in manifests.values_mut() {
        manifest.lock();
    }

    Ok(manifests)
}
/// Counts chunk-info rows per segment across all of `chunk_infos`.
///
/// Segments are returned in the order in which they first appear. Rows of the same
/// segment spread over several batches are summed into a single entry.
///
/// # Errors
/// Returns an internal error if a batch lacks the segment-id column or it is not a
/// string array.
fn count_chunks_per_segment(chunk_infos: &[RecordBatch]) -> ApiResult<Vec<(SegmentId, usize)>> {
    // `tallies` keeps first-seen order; `slot_of` maps a segment to its index in it.
    let mut tallies: Vec<(String, usize)> = Vec::new();
    let mut slot_of: HashMap<String, usize> = HashMap::new();

    for batch in chunk_infos {
        let segment_column = batch
            .column_by_name(
                re_protos::cloud::v1alpha1::QueryDatasetResponse::FIELD_CHUNK_SEGMENT_ID,
            )
            .ok_or_else(|| ApiError::internal("missing segment_id column in chunk_info batch"))?
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| ApiError::internal("segment_id column is not a string array"))?;

        for row in 0..batch.num_rows() {
            let segment = segment_column.value(row);
            match slot_of.get(segment) {
                Some(&slot) => tallies[slot].1 += 1,
                None => {
                    slot_of.insert(segment.to_owned(), tallies.len());
                    tallies.push((segment.to_owned(), 1));
                }
            }
        }
    }

    Ok(tallies
        .into_iter()
        .map(|(segment, count)| (SegmentId::from(segment), count))
        .collect())
}
/// Reads the segment id of a chunk-info batch from its first row.
///
/// Callers treat each chunk-info batch as belonging to a single segment, so only row 0
/// is inspected.
///
/// # Errors
/// Returns an internal error if the segment-id column is missing, is not a string
/// array, or the batch has no rows. An empty batch would otherwise panic in `value(0)`.
fn extract_segment_id(chunk_info: &RecordBatch) -> ApiResult<SegmentId> {
    let segment_ids = chunk_info
        .column_by_name(re_protos::cloud::v1alpha1::QueryDatasetResponse::FIELD_CHUNK_SEGMENT_ID)
        .ok_or_else(|| ApiError::internal("missing segment_id column in chunk_info batch"))?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| ApiError::internal("segment_id column is not a string array"))?;

    if segment_ids.is_empty() {
        return Err(ApiError::internal(
            "chunk_info batch has no rows, cannot determine segment_id",
        ));
    }

    Ok(SegmentId::from(segment_ids.value(0)))
}
/// Borrows the `chunk_byte_len` column of a chunk-info batch as a `UInt64Array`.
///
/// # Errors
/// Returns an internal error if the column is missing or is not a `UInt64` array.
fn extract_chunk_sizes(chunk_info: &RecordBatch) -> ApiResult<&UInt64Array> {
    let Some(column) = chunk_info
        .column_by_name(re_protos::cloud::v1alpha1::QueryDatasetResponse::FIELD_CHUNK_BYTE_LENGTH)
    else {
        return Err(ApiError::internal(
            "missing chunk_byte_len column in chunk_info batch",
        ));
    };

    column
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(|| ApiError::internal("chunk_byte_len column is not a uint64 array"))
}
/// Request batches produced by [`create_request_batches`], together with the ids of the
/// segments they cover, in first-seen order.
type BatchingResult = (Vec<RecordBatch>, Vec<SegmentId>);

/// Groups chunk-info batches into request batches whose total `chunk_byte_len` is
/// roughly `target_size_bytes`.
///
/// Each input batch is treated as one segment. Its segment id is read from row 0, and
/// its size is the sum of its chunk sizes, with null sizes counted as 0.
/// - Segments are concatenated in input order until adding the next one would exceed
///   `target_size_bytes`; the accumulated batch is then emitted.
/// - A segment that alone exceeds `target_size_bytes` is split by
///   [`split_large_segments`] into several request batches.
/// - Batches with no rows are skipped, since they describe nothing to fetch.
///
/// Also returns the ids of all segments seen, in first-seen order. Callers use this to
/// order fetched chunks.
///
/// # Errors
/// Returns an error if a batch lacks a valid segment-id or chunk-size column, or if
/// concatenating or splitting batches fails.
#[tracing::instrument(
    level = "info",
    skip_all,
    fields(num_chunk_infos = chunk_infos.len(), target_size_bytes = target_size_bytes)
)]
fn create_request_batches(
    chunk_infos: Vec<RecordBatch>,
    target_size_bytes: u64,
) -> ApiResult<BatchingResult> {
    re_tracing::profile_function!();

    /// Concatenates the pending chunk-info batches into one request batch and appends it
    /// to `out`, leaving `pending` empty. Does nothing when `pending` is empty.
    fn flush_pending(
        pending: &mut Vec<RecordBatch>,
        out: &mut Vec<RecordBatch>,
        ctx: &'static str,
    ) -> ApiResult<()> {
        if pending.is_empty() {
            return Ok(());
        }
        let batches = std::mem::take(pending);
        let merged = re_arrow_util::concat_polymorphic_batches(&batches)
            .map_err(|err| ApiError::deserialization_with_source(None, err, ctx))?;
        out.push(merged);
        Ok(())
    }

    let mut request_batches = Vec::new();
    let mut current_batch: Vec<RecordBatch> = Vec::new();
    let mut current_batch_size = 0u64;
    let mut segment_order = Vec::new();
    let mut segment_seen = HashSet::new();

    for chunk_info in chunk_infos {
        // A zero-row batch has nothing to fetch and no row 0 to read a segment id from.
        if chunk_info.num_rows() == 0 {
            continue;
        }

        let segment_id = extract_segment_id(&chunk_info)?;
        let chunk_sizes = extract_chunk_sizes(&chunk_info)?;
        let segment_size: u64 = chunk_sizes.iter().map(|v| v.unwrap_or(0)).sum();

        if segment_seen.insert(segment_id.clone()) {
            segment_order.push(segment_id.clone());
        }

        // Close the current request batch if this segment would overflow it. This also
        // guarantees `current_batch` is empty whenever an oversized segment is split below.
        if current_batch_size + segment_size > target_size_bytes {
            flush_pending(
                &mut current_batch,
                &mut request_batches,
                "merging chunk-info batches",
            )?;
            current_batch_size = 0;
        }

        if segment_size > target_size_bytes {
            request_batches.extend(split_large_segments(
                &segment_id,
                &chunk_info,
                target_size_bytes,
                chunk_sizes,
            )?);
        } else {
            current_batch.push(chunk_info);
            current_batch_size += segment_size;
        }
    }

    flush_pending(
        &mut current_batch,
        &mut request_batches,
        "merging final chunk-info batch",
    )?;

    tracing::debug!(
        "Batching complete: {} segments → {} batches (target_size={}KB)",
        segment_order.len(),
        request_batches.len(),
        target_size_bytes / 1024
    );

    Ok((request_batches, segment_order))
}
/// Splits one segment whose chunks together exceed `target_size` into several request
/// batches, each made of consecutive rows of `chunk_info`.
///
/// Rows are packed greedily in their original order. A row joins the current sub-batch
/// while the running size stays within `target_size`; otherwise the current sub-batch is
/// emitted and a new one starts. A single row larger than `target_size` ends up alone in
/// its own sub-batch.
///
/// `chunk_sizes` is expected to be the `chunk_byte_len` column of `chunk_info`. Null
/// sizes count as 0 bytes, the same way [`create_request_batches`] sums segment sizes.
/// `segment_id` is only used for logging.
///
/// # Errors
/// Returns a deserialization error if extracting a sub-batch from `chunk_info` fails.
fn split_large_segments(
    segment_id: &SegmentId,
    chunk_info: &RecordBatch,
    target_size: u64,
    chunk_sizes: &UInt64Array,
) -> ApiResult<Vec<RecordBatch>> {
    re_tracing::profile_function!();

    let take_err = |err: arrow::error::ArrowError| {
        ApiError::deserialization_with_source(None, err, "slicing large segment into sub-batches")
    };
    // The value stored in a null slot is unspecified, so read nulls as 0 explicitly.
    let row_size = |row: usize| {
        if chunk_sizes.is_null(row) {
            0
        } else {
            chunk_sizes.value(row)
        }
    };

    let mut result_batches = Vec::new();
    let mut current_indices: Vec<usize> = Vec::new();
    let mut current_size = 0u64;
    let mut total_size = 0u64;

    for row_idx in 0..chunk_info.num_rows() {
        let chunk_size = row_size(row_idx);
        total_size += chunk_size;

        // Emit the current sub-batch if this row would push it past the target.
        if !current_indices.is_empty() && current_size + chunk_size > target_size {
            let batch = re_arrow_util::take_record_batch(chunk_info, &current_indices)
                .map_err(take_err)?;
            result_batches.push(batch);
            current_indices.clear();
            current_size = 0;
        }

        current_indices.push(row_idx);
        current_size += chunk_size;
    }

    if !current_indices.is_empty() {
        let batch =
            re_arrow_util::take_record_batch(chunk_info, &current_indices).map_err(take_err)?;
        result_batches.push(batch);
    }

    tracing::debug!(
        "Split large segment '{}' ({}) into {} requests",
        segment_id,
        re_format::format_bytes(total_size as _),
        result_batches.len()
    );

    Ok(result_batches)
}
/// Regroups fetched chunks by segment and returns the groups ordered by `segment_order`.
///
/// Chunks without a segment id are dropped, and so are chunks whose segment does not
/// appear in `segment_order`. Within a segment, chunks keep the order in which they
/// appear in `chunks`.
fn sort_chunks_by_segment_order(
    chunks: Vec<ChunksWithSegment>,
    segment_order: &[SegmentId],
) -> Vec<SortedChunksWithSegment> {
    let mut by_segment: HashMap<SegmentId, Vec<Chunk>> = HashMap::default();

    let tagged = chunks
        .into_iter()
        .flatten()
        .filter_map(|(chunk, segment_id)| Some((segment_id?, chunk)));
    for (segment_id, chunk) in tagged {
        by_segment.entry(segment_id).or_default().push(chunk);
    }

    let mut ordered = Vec::new();
    for segment_id in segment_order {
        if let Some(group) = by_segment.remove_entry(segment_id) {
            ordered.push(group);
        }
    }
    ordered
}
/// Sorts `chunks` by `global_segment_order` and sends one `CpuWorkerMsg::Chunks` message
/// per segment group over `output_channel`.
///
/// Returns `false` as soon as a send fails, meaning the receiving side is gone.
/// Otherwise returns `true`. Sorting and sending are traced as the `sort_chunks` and
/// `send_chunks` spans.
async fn send_sorted_chunks(
    chunks: Vec<ChunksWithSegment>,
    global_segment_order: &[SegmentId],
    output_channel: &Sender<ApiResult<CpuWorkerMsg>>,
) -> bool {
    let sorted = tracing::info_span!("sort_chunks")
        .in_scope(|| sort_chunks_by_segment_order(chunks, global_segment_order));

    let send_span = tracing::info_span!("send_chunks", n = sorted.len());
    let forward_all = async {
        for segment_chunks in sorted {
            let delivered = output_channel
                .send(Ok(CpuWorkerMsg::Chunks(segment_chunks)))
                .await
                .is_ok();
            if !delivered {
                return false;
            }
        }
        true
    };

    forward_all.instrument(send_span).await
}
/// Fetches every request batch through gRPC in groups of `GRPC_BATCH_SIZE`, and forwards
/// the chunks of each group downstream, sorted by `global_segment_order`.
///
/// Before each group is fetched, `pipeline_budget` is reserved for the group's estimated
/// size: the uncompressed size when known, otherwise `batch_byte_size`. After the fetch,
/// the reservation is committed with the in-memory size of the chunks received.
///
/// Returns `Ok(())` early, and logs how many batches were skipped, if the downstream
/// receiver has been closed.
///
/// # Errors
/// Propagates the first error returned by `fetch_batch_group_via_grpc`.
async fn fetch_remaining_via_grpc<T: DataframeClientAPI>(
    batches: &[RecordBatch],
    client: &T,
    global_segment_order: &[SegmentId],
    output_channel: &Sender<ApiResult<CpuWorkerMsg>>,
    pipeline_budget: &PipelineBudget,
) -> ApiResult<()> {
    let total_batches = batches.len();
    let mut batches_completed = 0usize;

    for group in batches.chunks(GRPC_BATCH_SIZE) {
        #[cfg(not(target_arch = "wasm32"))]
        crate::chunk_fetcher::metrics::record_grpc_no_direct_urls(
            group.iter().map(batch_byte_size).sum::<u64>(),
        );

        let reservation_estimate = group
            .iter()
            .map(|batch| {
                batch_byte_size_uncompressed(batch).unwrap_or_else(|| batch_byte_size(batch))
            })
            .sum::<u64>() as usize;
        let guard = pipeline_budget.reserve_guarded(reservation_estimate).await;

        let fetched = fetch_batch_group_via_grpc(group, client).await?;
        let received_bytes: usize = fetched
            .iter()
            .flatten()
            .map(|(chunk, _)| re_byte_size::SizeBytes::total_size_bytes(chunk) as usize)
            .sum();
        guard.commit(received_bytes);

        batches_completed += group.len();

        let downstream_open =
            send_sorted_chunks(fetched, global_segment_order, output_channel).await;
        if !downstream_open {
            tracing::info!(
                total_batches,
                batches_completed,
                batches_skipped = total_batches.saturating_sub(batches_completed),
                "FetchChunks IO loop short-circuited: downstream consumer closed (likely LIMIT or plan cancellation)"
            );
            return Ok(());
        }
    }

    Ok(())
}
/// IO half of the chunk pipeline. It first announces per-segment metadata to the CPU
/// worker, then fetches every chunk described by `chunk_infos` and forwards it over
/// `output_channel`.
///
/// Messages are sent in this order:
/// 1. `CpuWorkerMsg::SegmentChunkCount` for every segment (row counts of `chunk_infos`);
/// 2. `CpuWorkerMsg::SegmentManifest` for every segment that got a manifest from
///    `build_segment_manifests` (none when `filtered_index_timeline` is `None`);
/// 3. `CpuWorkerMsg::Chunks` as fetches complete.
///
/// Two fetch strategies are used:
/// - **all gRPC**: when `force_grpc()` is set, or no request batch has any direct URL,
///   everything goes through `fetch_remaining_via_grpc`.
/// - **direct / hybrid**: otherwise each request batch is split by
///   `split_batch_by_direct_url` into a direct part (fetched with a `reqwest` client) and
///   a gRPC-fallback part. Up to `IO_PIPELINE_BUFFER` of these tasks run concurrently,
///   and their results are forwarded in completion order (`buffer_unordered`).
///
/// Each fetch reserves `pipeline_budget` for its estimated size before fetching. It then
/// commits the in-memory size of the chunks it actually received.
///
/// Returns `Ok(())` as soon as a send on `output_channel` fails, i.e. the downstream
/// consumer has gone away.
///
/// # Errors
/// Propagates errors from reading the `chunk_infos` columns, from building request
/// batches, and from the first failed fetch.
#[tracing::instrument(
    level = "info",
    skip_all,
    fields(n_chunks, n_batches, n_segments, fetch_strategy)
)]
pub(super) async fn chunk_stream_io_loop<T: DataframeClientAPI>(
    client: T,
    chunk_infos: Vec<RecordBatch>,
    filtered_index_timeline: Option<TimelineName>,
    output_channel: Sender<ApiResult<CpuWorkerMsg>>,
    pending_analytics: crate::PendingQueryAnalytics,
    pipeline_budget: Arc<PipelineBudget>,
    metrics: Arc<QueryMetrics>,
) -> ApiResult<()> {
    let target_size_bytes = TARGET_BATCH_SIZE_BYTES as u64;
    // One chunk-info row per chunk to fetch.
    let n_chunks: usize = chunk_infos.iter().map(|rb| rb.num_rows()).sum();

    // Announce how many chunks each segment will receive. A failed send means the
    // receiver is gone, so stop quietly.
    let segment_chunk_counts = count_chunks_per_segment(&chunk_infos)?;
    for (segment_id, count) in segment_chunk_counts {
        if output_channel
            .send(Ok(CpuWorkerMsg::SegmentChunkCount { segment_id, count }))
            .await
            .is_err()
        {
            return Ok(());
        }
    }

    // Announce per-segment manifests. The map is empty when there is no filtered index
    // timeline, or when some batch lacks its `:start` column.
    let manifests = build_segment_manifests(&chunk_infos, filtered_index_timeline)?;
    for (segment_id, manifest) in manifests {
        if output_channel
            .send(Ok(CpuWorkerMsg::SegmentManifest {
                segment_id,
                manifest: Box::new(manifest),
            }))
            .await
            .is_err()
        {
            return Ok(());
        }
    }

    // Group chunk infos into request batches of about `target_size_bytes`. Also keep the
    // first-seen segment order, which is used to sort fetched chunks before forwarding.
    let (request_batches, global_segment_order) =
        create_request_batches(chunk_infos, target_size_bytes)?;

    // Fill in the span fields declared (empty) in `#[tracing::instrument]` above.
    let span = tracing::Span::current();
    span.record("n_chunks", n_chunks);
    span.record("n_batches", request_batches.len());
    span.record("n_segments", global_segment_order.len());
    re_log::debug!(
        "Fetching {n_chunks} chunks in {} batches ({} segments)",
        request_batches.len(),
        global_segment_order.len()
    );

    // All-gRPC path: either forced, or no request batch can be fetched directly.
    let force_grpc = force_grpc();
    if force_grpc || !request_batches.iter().any(batch_has_any_direct_urls) {
        let reason = if force_grpc {
            "grpc_forced"
        } else {
            "no_direct_urls"
        };
        span.record("fetch_strategy", reason);
        re_log::debug!(
            "{reason}, fetching all {} chunks via FetchChunks gRPC",
            request_batches.len()
        );
        let result = fetch_remaining_via_grpc(
            &request_batches,
            &client,
            &global_segment_order,
            &output_channel,
            &pipeline_budget,
        )
        .await;
        // On this path, stats and analytics are recorded once for the whole fetch.
        match &result {
            Ok(()) => {
                // NOTE(review): this counts the bytes of every request batch, even when
                // `fetch_remaining_via_grpc` returned `Ok(())` early because the consumer
                // went away.
                let total_bytes: u64 = request_batches.iter().map(batch_byte_size).sum();
                let mut stats = TaskFetchStats::default();
                stats.record_grpc_fetch(total_bytes);
                stats.flush_into(&metrics);
            }
            Err(_) => {
                pending_analytics.record_error(QueryErrorKind::GrpcFetch);
            }
        }
        return result;
    }

    // Direct / hybrid path: one fetch task per direct part and per gRPC-fallback part.
    enum FetchTask {
        /// Part of a request batch returned as direct by `split_batch_by_direct_url`.
        Direct(RecordBatch),
        /// Part of a request batch returned for gRPC fallback by `split_batch_by_direct_url`.
        Grpc(RecordBatch),
    }
    let mut work_items: Vec<FetchTask> = Vec::new();
    let mut n_direct = 0usize;
    let mut n_grpc = 0usize;
    for batch in &request_batches {
        let (direct_batch, grpc_batch) = split_batch_by_direct_url(batch);
        if let Some(b) = direct_batch {
            n_direct += 1;
            work_items.push(FetchTask::Direct(b));
        }
        if let Some(b) = grpc_batch {
            n_grpc += 1;
            work_items.push(FetchTask::Grpc(b));
        }
    }
    if n_grpc == 0 {
        span.record("fetch_strategy", "direct");
    } else {
        span.record(
            "fetch_strategy",
            format!("hybrid(direct={n_direct},grpc={n_grpc})"),
        );
    }
    re_log::debug!("Fetch tasks: {n_direct} direct, {n_grpc} gRPC fallback");
    let http_client = reqwest::Client::new();
    let total_tasks = work_items.len();
    // Each task owns clones of the shared handles so it can run concurrently with others.
    let fetch_stream = futures::stream::iter(work_items.into_iter().enumerate())
        .map(|(task_idx, task)| {
            let http_client = http_client.clone();
            let client = client.clone();
            let pending_analytics = pending_analytics.clone();
            let pipeline_budget = Arc::clone(&pipeline_budget);
            let metrics = Arc::clone(&metrics);
            async move {
                let mut stats = TaskFetchStats::default();
                // Reserve budget up front: the uncompressed size if known, else the raw size.
                let estimated = match &task {
                    FetchTask::Direct(b) | FetchTask::Grpc(b) => batch_byte_size_uncompressed(b)
                        .unwrap_or_else(|| batch_byte_size(b))
                        as usize,
                };
                let guard = pipeline_budget.reserve_guarded(estimated).await;
                let chunks = match task {
                    FetchTask::Direct(batch) => {
                        let bytes = batch_byte_size(&batch);
                        let chunks = match fetch_batch_direct(
                            &batch,
                            &http_client,
                            &mut stats,
                            &pending_analytics,
                        )
                        .await
                        {
                            Ok(chunks) => chunks,
                            Err(err) => {
                                // Flush this task's stats together with the failure kind.
                                stats.try_flush_into(
                                    &pending_analytics,
                                    &metrics,
                                    Err(QueryErrorKind::DirectFetch),
                                );
                                return Err(err);
                            }
                        };
                        stats.record_direct_fetch(bytes);
                        chunks
                    }
                    FetchTask::Grpc(batch) => {
                        let bytes = batch_byte_size(&batch);
                        #[cfg(not(target_arch = "wasm32"))]
                        crate::chunk_fetcher::metrics::record_grpc_no_direct_urls(bytes);
                        let chunks =
                            match fetch_batch_group_via_grpc(std::slice::from_ref(&batch), &client)
                                .await
                            {
                                Ok(chunks) => chunks,
                                Err(err) => {
                                    // Flush this task's stats together with the failure kind.
                                    stats.try_flush_into(
                                        &pending_analytics,
                                        &metrics,
                                        Err(QueryErrorKind::GrpcFetch),
                                    );
                                    return Err(err);
                                }
                            };
                        stats.record_grpc_fetch(bytes);
                        chunks
                    }
                };
                stats.try_flush_into(&pending_analytics, &metrics, Ok(()));
                // Commit the reservation with the in-memory size of the received chunks.
                let actual: usize = chunks
                    .iter()
                    .flat_map(|seg| {
                        seg.iter()
                            .map(|(c, _)| re_byte_size::SizeBytes::total_size_bytes(c) as usize)
                    })
                    .sum();
                guard.commit(actual);
                Ok::<_, ApiError>(chunks)
            }
            .instrument(tracing::info_span!("fetch_task", task_idx))
        })
        .buffer_unordered(IO_PIPELINE_BUFFER);
    // Forward results in completion order; the first fetch error aborts the loop.
    tokio::pin!(fetch_stream);
    let mut tasks_completed: usize = 0;
    while let Some(result) = fetch_stream.next().await {
        let chunks = result?;
        tasks_completed += 1;
        if !send_sorted_chunks(chunks, &global_segment_order, &output_channel).await {
            tracing::info!(
                total_tasks,
                tasks_completed,
                in_flight_or_pending = total_tasks.saturating_sub(tasks_completed),
                "FetchChunks IO loop short-circuited (hybrid path): downstream consumer closed (likely LIMIT or plan cancellation)"
            );
            return Ok(());
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow::array::{Array as _, FixedSizeBinaryBuilder, Int64Array, RecordBatchOptions};
    use arrow::datatypes::{Field, Schema};

    use super::*;

    /// Borrows the segment id of a sorted `(segment, chunks)` group.
    fn extract_segment_id_from_chunk((segment_id, _chunks): &SortedChunksWithSegment) -> &str {
        segment_id.as_ref()
    }

    /// Builds a single-segment chunk-info batch with one row per entry of `chunk_sizes`.
    ///
    /// Columns: segment id, `chunk_byte_len`, and a synthetic 16-byte chunk id whose
    /// first 4 bytes hold the row index (little-endian).
    fn create_test_chunk_info(segment_id: &str, chunk_sizes: &[u64]) -> RecordBatch {
        let num_chunks = chunk_sizes.len();
        let segment_ids = StringArray::from(vec![segment_id; num_chunks]);
        let sizes = UInt64Array::from(chunk_sizes.to_vec());
        let mut chunk_id_builder = FixedSizeBinaryBuilder::with_capacity(num_chunks, 16);
        for i in 0..num_chunks {
            let mut id_bytes = [0u8; 16];
            id_bytes[0..4].copy_from_slice(&(i as u32).to_le_bytes());
            chunk_id_builder.append_value(id_bytes).unwrap();
        }
        let chunk_ids = chunk_id_builder.finish();
        let schema = Arc::new(Schema::new_with_metadata(
            vec![
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_chunk_segment_id()
                    .as_ref()
                    .clone(),
                Field::new(
                    re_protos::cloud::v1alpha1::QueryDatasetResponse::FIELD_CHUNK_BYTE_LENGTH,
                    arrow::datatypes::DataType::UInt64,
                    false,
                ),
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_chunk_id()
                    .as_ref()
                    .clone(),
            ],
            HashMap::default(),
        ));
        RecordBatch::try_new_with_options(
            schema,
            vec![Arc::new(segment_ids), Arc::new(sizes), Arc::new(chunk_ids)],
            &RecordBatchOptions::new().with_row_count(Some(num_chunks)),
        )
        .unwrap()
    }

    /// Converts segment ids to `&str` for readable assertions.
    fn segment_order_as_strs(segment_order: &[SegmentId]) -> Vec<&str> {
        segment_order.iter().map(SegmentId::as_ref).collect()
    }

    #[test]
    fn test_create_request_batches_single_small_segment() {
        // 600 bytes in total fits in a single 1000-byte request batch.
        let chunk_info = create_test_chunk_info("seg1", &[100, 200, 300]);
        let target_size = 1000;
        let (batches, segment_order) =
            create_request_batches(vec![chunk_info], target_size).unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(segment_order_as_strs(&segment_order), vec!["seg1"]);
    }

    #[test]
    fn test_create_request_batches_single_large_segment() {
        // 1800 bytes in total exceeds 1000, so the segment is split into
        // [300, 400], [500] and [600].
        let chunk_info = create_test_chunk_info("seg1", &[300, 400, 500, 600]);
        let target_size = 1000;
        let (batches, segment_order) =
            create_request_batches(vec![chunk_info], target_size).unwrap();
        assert_eq!(batches.len(), 3);
        assert_eq!(segment_order_as_strs(&segment_order), vec!["seg1"]);
    }

    #[test]
    fn test_create_request_batches_multiple_small_segments() {
        // seg1 (250) + seg2 (450) fit in 800. Adding seg3 (300) would overflow, so a second
        // batch starts with seg3 + seg4 (300 + 100).
        let chunk_infos = vec![
            create_test_chunk_info("seg1", &[100, 150]),
            create_test_chunk_info("seg2", &[200, 250]),
            create_test_chunk_info("seg3", &[300]),
            create_test_chunk_info("seg4", &[100]),
        ];
        let target_size = 800;
        let (batches, segment_order) = create_request_batches(chunk_infos, target_size).unwrap();
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].num_rows(), 4);
        assert_eq!(batches[1].num_rows(), 2);
        assert_eq!(
            segment_order_as_strs(&segment_order),
            vec!["seg1", "seg2", "seg3", "seg4"]
        );
    }

    #[test]
    fn test_create_request_batches_mixed_small_and_large() {
        // seg1 (300) gets its own batch, seg2 (2400) is split into three single-chunk
        // batches, and seg3 (150) gets its own batch: 5 in total.
        let chunk_infos = vec![
            create_test_chunk_info("seg1", &[100, 200]),
            create_test_chunk_info("seg2", &[800, 900, 700]),
            create_test_chunk_info("seg3", &[150]),
        ];
        let target_size = 1000;
        let (batches, segment_order) = create_request_batches(chunk_infos, target_size).unwrap();
        assert_eq!(batches.len(), 5);
        assert_eq!(
            segment_order_as_strs(&segment_order),
            vec!["seg1", "seg2", "seg3"]
        );
    }

    #[test]
    fn test_segment_order_within_batches_is_preserved() {
        // 600 bytes in total: all three segments are merged into one batch, in input order.
        let chunk_infos = vec![
            create_test_chunk_info("segA", &[100]),
            create_test_chunk_info("segB", &[200]),
            create_test_chunk_info("segC", &[300]),
        ];
        let target_size = 1000;
        let (batches, segment_order) = create_request_batches(chunk_infos, target_size).unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(
            segment_order_as_strs(&segment_order),
            vec!["segA", "segB", "segC"]
        );
        let segment_id_column = batches[0]
            .column_by_name(
                re_protos::cloud::v1alpha1::QueryDatasetResponse::FIELD_CHUNK_SEGMENT_ID,
            )
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let batch_segment_ids: Vec<String> = (0..segment_id_column.len())
            .map(|i| segment_id_column.value(i).to_owned())
            .collect();
        assert_eq!(batch_segment_ids, vec!["segA", "segB", "segC"]);
    }

    #[test]
    fn test_sort_chunks_by_segment_order_simple_case() {
        use re_dataframe::external::re_chunk::Chunk;
        use re_log_types::EntityPath;
        let empty_chunk = Chunk::builder(EntityPath::root()).build().unwrap();
        let segment_order: Vec<SegmentId> = vec!["segA".into(), "segB".into(), "segC".into()];
        // One response per segment, arriving out of order.
        let chunks: Vec<ChunksWithSegment> = vec![
            vec![(empty_chunk.clone(), Some("segC".into()))],
            vec![(empty_chunk.clone(), Some("segA".into()))],
            vec![(empty_chunk.clone(), Some("segB".into()))],
        ];
        let sorted_chunks = sort_chunks_by_segment_order(chunks, &segment_order);
        let sorted_segments: Vec<&str> = sorted_chunks
            .iter()
            .map(extract_segment_id_from_chunk)
            .collect();
        assert_eq!(sorted_segments, vec!["segA", "segB", "segC"]);
    }

    #[test]
    fn test_sort_chunks_by_segment_order_multi_segment_response() {
        use re_dataframe::external::re_chunk::Chunk;
        use re_log_types::EntityPath;
        let empty_chunk = Chunk::builder(EntityPath::root()).build().unwrap();
        let segment_order: Vec<SegmentId> = vec!["segA".into(), "segB".into(), "segC".into()];
        // A single response interleaving segments: segA x2, segB x3, segC x2.
        let chunks: Vec<ChunksWithSegment> = vec![vec![
            (empty_chunk.clone(), Some("segC".into())),
            (empty_chunk.clone(), Some("segC".into())),
            (empty_chunk.clone(), Some("segA".into())),
            (empty_chunk.clone(), Some("segB".into())),
            (empty_chunk.clone(), Some("segB".into())),
            (empty_chunk.clone(), Some("segA".into())),
            (empty_chunk.clone(), Some("segB".into())),
        ]];
        let sorted_chunks = sort_chunks_by_segment_order(chunks, &segment_order);
        assert_eq!(sorted_chunks.len(), 3);
        let sorted_segments: Vec<&str> = sorted_chunks
            .iter()
            .map(extract_segment_id_from_chunk)
            .collect();
        assert_eq!(sorted_segments, vec!["segA", "segB", "segC"]);
        let seg_a_chunks = sorted_chunks[0].1.len();
        let seg_b_chunks = sorted_chunks[1].1.len();
        let seg_c_chunks = sorted_chunks[2].1.len();
        assert_eq!(seg_a_chunks, 2);
        assert_eq!(seg_b_chunks, 3);
        assert_eq!(seg_c_chunks, 2);
    }

    #[test]
    fn test_sort_chunks_by_segment_order_mixed_responses() {
        use re_dataframe::external::re_chunk::Chunk;
        use re_log_types::EntityPath;
        let empty_chunk = Chunk::builder(EntityPath::root()).build().unwrap();
        let segment_order: Vec<SegmentId> = vec!["segA".into(), "segB".into(), "segC".into()];
        // segB shows up in two different responses and must be merged into one group.
        let chunks: Vec<ChunksWithSegment> = vec![
            vec![(empty_chunk.clone(), Some("segC".into()))],
            vec![
                (empty_chunk.clone(), Some("segB".into())),
                (empty_chunk.clone(), Some("segA".into())),
            ],
            vec![(empty_chunk.clone(), Some("segB".into()))],
        ];
        let sorted_chunks = sort_chunks_by_segment_order(chunks, &segment_order);
        assert_eq!(sorted_chunks.len(), 3);
        let sorted_segments: Vec<&str> = sorted_chunks
            .iter()
            .map(extract_segment_id_from_chunk)
            .collect();
        assert_eq!(sorted_segments, vec!["segA", "segB", "segC"]);
        let seg_b_chunks = sorted_chunks[1].1.len();
        assert_eq!(seg_b_chunks, 2);
    }

    #[test]
    fn test_count_chunks_per_segment_basic() {
        let chunk_infos = vec![
            create_test_chunk_info("segA", &[10, 20, 30]),
            create_test_chunk_info("segB", &[40]),
            create_test_chunk_info("segC", &[50, 60]),
        ];
        let counts = count_chunks_per_segment(&chunk_infos).unwrap();
        assert_eq!(
            counts,
            vec![
                (SegmentId::from("segA"), 3),
                (SegmentId::from("segB"), 1),
                (SegmentId::from("segC"), 2),
            ]
        );
    }

    #[test]
    fn test_count_chunks_per_segment_sums_across_batches() {
        // segA appears in two batches (2 + 3 rows) and keeps its first-seen position.
        let chunk_infos = vec![
            create_test_chunk_info("segA", &[1, 2]),
            create_test_chunk_info("segB", &[3]),
            create_test_chunk_info("segA", &[4, 5, 6]),
        ];
        let counts = count_chunks_per_segment(&chunk_infos).unwrap();
        assert_eq!(
            counts,
            vec![(SegmentId::from("segA"), 5), (SegmentId::from("segB"), 1),]
        );
    }

    /// Builds a chunk-info batch with a `<timeline_name>:start` column.
    ///
    /// Each row is `(segment_id, entity_path, is_static, start)`; a `None` start is null.
    fn create_chunk_info_with_starts(
        timeline_name: &str,
        rows: &[(&str, &str, bool, Option<i64>)],
    ) -> RecordBatch {
        let num_rows = rows.len();
        let segment_ids = StringArray::from(rows.iter().map(|r| r.0).collect::<Vec<_>>());
        let entity_paths = StringArray::from(rows.iter().map(|r| r.1).collect::<Vec<_>>());
        let is_static = BooleanArray::from(rows.iter().map(|r| r.2).collect::<Vec<_>>());
        let starts = Int64Array::from(rows.iter().map(|r| r.3).collect::<Vec<_>>());
        let schema = Arc::new(Schema::new_with_metadata(
            vec![
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_chunk_segment_id()
                    .as_ref()
                    .clone(),
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_chunk_entity_path()
                    .as_ref()
                    .clone(),
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_chunk_is_static()
                    .as_ref()
                    .clone(),
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_timeline_start(
                    timeline_name,
                )
                .as_ref()
                .clone(),
            ],
            HashMap::default(),
        ));
        RecordBatch::try_new_with_options(
            schema,
            vec![
                Arc::new(segment_ids),
                Arc::new(entity_paths),
                Arc::new(is_static),
                Arc::new(starts),
            ],
            &RecordBatchOptions::new().with_row_count(Some(num_rows)),
        )
        .unwrap()
    }

    #[test]
    fn test_build_segment_manifests_no_timeline_yields_empty() {
        let rb = create_chunk_info_with_starts("time", &[("seg1", "/a", false, Some(10))]);
        let manifests = build_segment_manifests(&[rb], None).unwrap();
        assert!(manifests.is_empty());
    }

    #[test]
    fn test_build_segment_manifests_missing_start_column_yields_empty() {
        let rb = create_test_chunk_info("seg1", &[100]);
        let manifests = build_segment_manifests(&[rb], Some(TimelineName::new("time"))).unwrap();
        assert!(manifests.is_empty());
    }

    #[test]
    fn test_build_segment_manifests_mixed_batches_yields_empty() {
        let with_start = create_chunk_info_with_starts("time", &[("seg1", "/a", false, Some(10))]);
        let without_start = create_test_chunk_info("seg1", &[20]);
        let manifests = build_segment_manifests(
            &[with_start, without_start],
            Some(TimelineName::new("time")),
        )
        .unwrap();
        assert!(
            manifests.is_empty(),
            "any batch missing `:start` must trigger full fallback, even if other batches carry it",
        );
    }

    #[test]
    fn test_build_segment_manifests_filters_null_and_static() {
        // /a: null start and /b: static are skipped. /c and /d are expected; i64::MIN still
        // counts as a temporal start.
        let rb = create_chunk_info_with_starts(
            "time",
            &[
                ("seg1", "/a", false, None),
                ("seg1", "/b", true, Some(10)),
                ("seg1", "/c", false, Some(20)),
                ("seg1", "/d", false, Some(i64::MIN)),
            ],
        );
        let manifests = build_segment_manifests(&[rb], Some(TimelineName::new("time"))).unwrap();
        let m = manifests
            .get(&SegmentId::from("seg1"))
            .expect("seg1 has at least one temporal chunk");
        assert_eq!(m.outstanding_count(), 2);
        assert!(m.is_locked());
        assert_eq!(m.safe_horizon(), Some(TimeInt::MIN));
    }

    #[test]
    fn test_build_segment_manifests_entity_path_keys_roundtrip() {
        let rb = create_chunk_info_with_starts("time", &[("seg1", "/foo/bar", false, Some(42))]);
        let mut manifests =
            build_segment_manifests(&[rb], Some(TimelineName::new("time"))).unwrap();
        let m = manifests.get_mut(&SegmentId::from("seg1")).unwrap();
        assert!(m.record_arrival(
            &EntityPath::from("/foo/bar"),
            TimeInt::saturated_temporal_i64(42)
        ));
        assert!(m.is_complete());
    }

    #[test]
    fn test_build_segment_manifests_all_static_segment_absent() {
        // segA only has static chunks, so it gets no manifest at all.
        let rb = create_chunk_info_with_starts(
            "time",
            &[
                ("segA", "/a", true, Some(10)),
                ("segA", "/b", true, Some(20)),
                ("segB", "/c", false, Some(30)),
            ],
        );
        let manifests = build_segment_manifests(&[rb], Some(TimelineName::new("time"))).unwrap();
        assert!(!manifests.contains_key(&SegmentId::from("segA")));
        let b = &manifests[&SegmentId::from("segB")];
        assert_eq!(b.outstanding_count(), 1);
    }

    #[test]
    fn test_build_segment_manifests_accepts_timestamp_ns_start_column() {
        use arrow::array::TimestampNanosecondArray;
        // Same layout as `create_chunk_info_with_starts`, but with a Timestamp(ns) start column.
        let num_rows = 2;
        let segment_ids = StringArray::from(vec!["seg1", "seg1"]);
        let entity_paths = StringArray::from(vec!["/a", "/b"]);
        let is_static = BooleanArray::from(vec![false, false]);
        let starts = TimestampNanosecondArray::from(vec![Some(100_i64), Some(200_i64)]);
        let schema = Arc::new(Schema::new_with_metadata(
            vec![
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_chunk_segment_id()
                    .as_ref()
                    .clone(),
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_chunk_entity_path()
                    .as_ref()
                    .clone(),
                re_protos::cloud::v1alpha1::QueryDatasetResponse::field_chunk_is_static()
                    .as_ref()
                    .clone(),
                Field::new(
                    "time:start",
                    arrow::datatypes::DataType::Timestamp(
                        arrow::datatypes::TimeUnit::Nanosecond,
                        None,
                    ),
                    true,
                ),
            ],
            HashMap::default(),
        ));
        let rb = RecordBatch::try_new_with_options(
            schema,
            vec![
                Arc::new(segment_ids),
                Arc::new(entity_paths),
                Arc::new(is_static),
                Arc::new(starts),
            ],
            &RecordBatchOptions::new().with_row_count(Some(num_rows)),
        )
        .unwrap();
        let manifests = build_segment_manifests(&[rb], Some(TimelineName::new("time"))).unwrap();
        let m = manifests
            .get(&SegmentId::from("seg1"))
            .expect("seg1 has temporal chunks");
        assert_eq!(m.outstanding_count(), 2);
        assert_eq!(m.safe_horizon(), Some(TimeInt::new_temporal(99)));
    }
}