use std::collections::BTreeMap;
use std::{error::Error as _, fmt::Write as _};
use arrow::array::{Array as _, DictionaryArray, RecordBatch, StringArray, UInt64Array};
use arrow::datatypes::Int32Type;
use futures::StreamExt as _;
use tonic::IntoRequest as _;
use tracing::Instrument as _;
use re_dataframe::external::re_chunk::Chunk;
use re_protos::cloud::v1alpha1::{FetchChunksRequest, QueryDatasetResponse};
use re_redap_client::ApiResult;
use crate::analytics::{DirectFetchFailureReason, PendingQueryAnalytics, TaskFetchStats};
use crate::dataframe_query_common::DataframeClientAPI;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod metrics {
    //! OpenTelemetry counters for chunk fetching (native targets only).

    use std::sync::OnceLock;

    use opentelemetry::{KeyValue, metrics::Counter};

    /// Counters registered under the `chunk_fetch` meter.
    struct ChunkFetchMetrics {
        direct_result: Counter<u64>,
        bytes_fetched: Counter<u64>,
        grpc_no_direct_urls: Counter<u64>,
    }

    /// Returns the process-wide metrics instance, creating it on first use.
    fn get() -> &'static ChunkFetchMetrics {
        static INSTANCE: OnceLock<ChunkFetchMetrics> = OnceLock::new();
        INSTANCE.get_or_init(|| {
            let meter = opentelemetry::global::meter("chunk_fetch");
            let direct_result = meter
                .u64_counter("chunk_fetch.direct.result")
                .with_description("Direct fetch outcomes")
                .build();
            let bytes_fetched = meter
                .u64_counter("chunk_fetch.bytes")
                .with_description("Bytes fetched for chunk data")
                .with_unit("B")
                .build();
            let grpc_no_direct_urls = meter
                .u64_counter("chunk_fetch.grpc_no_direct_urls")
                .with_description("gRPC fetches for chunks without direct URLs")
                .build();
            ChunkFetchMetrics {
                direct_result,
                bytes_fetched,
                grpc_no_direct_urls,
            }
        })
    }

    /// Counts one successful direct fetch and the bytes it transferred.
    pub fn record_direct_success(bytes: u64) {
        let metrics = get();
        metrics
            .direct_result
            .add(1, &[KeyValue::new("result", "success")]);
        metrics
            .bytes_fetched
            .add(bytes, &[KeyValue::new("method", "direct")]);
    }

    /// Counts one terminally-failed direct fetch, labeled with `reason`.
    pub fn record_direct_failure(reason: &str) {
        let labels = [
            KeyValue::new("result", "failure"),
            KeyValue::new("reason", reason.to_owned()),
        ];
        get().direct_result.add(1, &labels);
    }

    /// Counts one gRPC fallback fetch for rows lacking a direct URL.
    pub fn record_grpc_no_direct_urls(bytes: u64) {
        let metrics = get();
        metrics.grpc_no_direct_urls.add(1, &[]);
        metrics
            .bytes_fetched
            .add(bytes, &[KeyValue::new("method", "grpc")]);
    }
}
/// Decoded chunks paired with the optional segment/recording id each came from.
pub type ChunksWithSegment = Vec<(Chunk, Option<String>)>;
/// A segment id together with the chunks sorted into it.
pub type SortedChunksWithSegment = (String, Vec<Chunk>);
/// Upper bound on a single merged byte-range request (16 MiB).
const MAX_MERGED_RANGE_SIZE: usize = 16 * 1024 * 1024;
/// Maximum number of attempts for one direct-fetch range request.
const DIRECT_FETCH_MAX_RETRIES: usize = 10;
/// Where one chunk lives inside a merged byte range.
struct ChunkInMergedRange {
    /// Row index of this chunk in the original record batch.
    original_row_index: usize,
    /// Byte offset of this chunk relative to the start of the merged range.
    offset_in_merged: usize,
    /// Byte length of this chunk.
    length: usize,
}
/// One HTTP range request covering several nearby chunks of the same file.
struct MergedRangeRequest {
    /// URL to fetch from.
    url: String,
    /// Inclusive start byte offset within the file.
    file_range_start: usize,
    /// Exclusive end byte offset within the file (converted to an inclusive
    /// bound when building the HTTP `Range` header).
    file_range_end: usize,
    /// The chunks contained in this merged range.
    chunks: Vec<ChunkInMergedRange>,
}
/// Error raised while fetching chunk bytes via a direct URL.
///
/// `retryable` marks failures worth retrying with backoff, as opposed to
/// permanent ones (bad request, decode failure, …).
#[derive(Debug)]
pub struct DirectFetchError {
    msg: String,
    retryable: bool,
}

impl DirectFetchError {
    /// Builds an error from a preformatted message and a retryability flag.
    fn new(msg: String, retryable: bool) -> Self {
        Self { msg, retryable }
    }
}

impl std::fmt::Display for DirectFetchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.msg)
    }
}

impl std::error::Error for DirectFetchError {}
/// Returns `true` when at least one row of `batch` carries a non-null
/// direct-download URL.
pub fn batch_has_any_direct_urls(batch: &RecordBatch) -> bool {
    match batch.column_by_name(QueryDatasetResponse::FIELD_DIRECT_URL) {
        // Fewer nulls than rows means at least one URL is present.
        Some(col) => col.null_count() < col.len(),
        None => false,
    }
}
/// Splits `batch` into (rows with a direct URL, rows without one).
///
/// Either half is `None` when it would be empty. A batch lacking the
/// direct-URL column entirely goes to the gRPC side unchanged.
pub fn split_batch_by_direct_url(
    batch: &RecordBatch,
) -> (Option<RecordBatch>, Option<RecordBatch>) {
    use arrow::compute::{filter_record_batch, is_not_null, not};

    let url_col = match batch.column_by_name(QueryDatasetResponse::FIELD_DIRECT_URL) {
        Some(col) => col,
        None => return (None, Some(batch.clone())),
    };

    let has_url = is_not_null(url_col).expect("is_not_null on direct_url column");
    let no_url = not(&has_url).expect("boolean not");

    // Only materialize a filtered batch when the mask selects at least one row.
    let direct_batch = (has_url.true_count() > 0).then(|| {
        filter_record_batch(batch, &has_url).expect("filter_record_batch for direct URL rows")
    });
    let grpc_batch = (no_url.true_count() > 0)
        .then(|| filter_record_batch(batch, &no_url).expect("filter_record_batch for gRPC rows"));

    (direct_batch, grpc_batch)
}
/// Sums the chunk-byte-length column of `batch`.
///
/// Returns 0 when the column is absent or has an unexpected type; null
/// lengths count as zero bytes.
pub fn batch_byte_size(batch: &RecordBatch) -> u64 {
    let Some(lengths) = batch
        .column_by_name(QueryDatasetResponse::FIELD_CHUNK_BYTE_LENGTH)
        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
    else {
        return 0;
    };
    // `flatten` skips nulls, which is equivalent to treating them as 0.
    lengths.iter().flatten().sum()
}
/// Fetches every chunk referenced by `batch` via its direct URL.
///
/// Records tracing span fields, OTel metrics (native targets only), and
/// analytics for terminal failures. On error, the failure is classified and
/// wrapped into a connection-flavored `ApiError`.
#[tracing::instrument(level = "info", skip_all, fields(num_chunks, byte_size))]
pub async fn fetch_batch_direct(
    batch: &RecordBatch,
    http_client: &reqwest::Client,
    stats: &mut TaskFetchStats,
    pending: Option<&PendingQueryAnalytics>,
) -> ApiResult<Vec<ChunksWithSegment>> {
    // Byte size is only needed for the native-only metrics below.
    #[cfg(not(target_arch = "wasm32"))]
    let byte_size = batch_byte_size(batch);
    let span = tracing::Span::current();
    span.record("num_chunks", batch.num_rows());
    #[cfg(not(target_arch = "wasm32"))]
    span.record("byte_size", byte_size);
    match fetch_batch_via_direct_urls(http_client, batch, stats).await {
        Ok(chunks) => {
            #[cfg(not(target_arch = "wasm32"))]
            metrics::record_direct_success(byte_size);
            Ok(chunks)
        }
        Err(err) => {
            // Bucket the failure for analytics/metrics before wrapping it.
            let reason = DirectFetchFailureReason::classify(&err);
            if let Some(pending) = pending {
                pending.record_direct_terminal_failure(reason);
            }
            #[cfg(not(target_arch = "wasm32"))]
            metrics::record_direct_failure(reason.as_str());
            Err(re_redap_client::ApiError::connection_with_source(
                None,
                err,
                "fetching chunks via direct URLs",
            ))
        }
    }
}
impl DirectFetchFailureReason {
    /// Heuristically buckets a direct-fetch error by substring-matching its
    /// message text. The checks are ordered: e.g. a timeout message wins over
    /// a generic connection message.
    fn classify(err: &DirectFetchError) -> Self {
        let msg = err.msg.as_str();
        let has = |needle: &str| msg.contains(needle);

        if has("timed out") || has("Timeout") {
            return Self::Timeout;
        }
        if has("status 4") {
            return Self::Http4xx;
        }
        if has("status 5") {
            return Self::Http5xx;
        }
        if has("connection") || has("dns") || has("connect") {
            return Self::Connection;
        }
        if has("decode") || has("from_rrd_bytes") || has("from_record_batch") {
            return Self::Decode;
        }
        Self::Other
    }
}
/// Fetches chunks over gRPC, issuing one `FetchChunks` request per batch in
/// `batch_group` and collecting all resulting chunks in order.
///
/// The first transport or decoding error aborts the whole group.
pub async fn fetch_batch_group_via_grpc<T: DataframeClientAPI>(
    batch_group: &[RecordBatch],
    client: &T,
) -> ApiResult<Vec<ChunksWithSegment>> {
    let mut all_chunks = Vec::new();
    let mut client = client.clone();
    for batch in batch_group {
        // The record batch itself describes which chunks to fetch.
        let chunk_info: re_protos::common::v1alpha1::DataframePart = batch.clone().into();
        let fetch_chunks_request = FetchChunksRequest {
            chunk_infos: vec![chunk_info],
        };
        let response = client
            .fetch_chunks(fetch_chunks_request.into_request())
            .instrument(tracing::trace_span!("batched_fetch_chunks"))
            .await
            .map_err(|err| re_redap_client::ApiError::tonic(err, "FetchChunks request failed"))?;
        let response_stream =
            re_redap_client::ApiResponseStream::from_tonic_response(response, "/FetchChunks");
        let chunk_stream =
            re_redap_client::fetch_chunks_response_to_chunk_and_segment_id(response_stream);
        // Drain the stream fully, then propagate the first error (if any).
        let batch_chunks: Vec<ApiResult<ChunksWithSegment>> = chunk_stream.collect().await;
        for chunk_result in batch_chunks {
            all_chunks.push(chunk_result?);
        }
    }
    Ok(all_chunks)
}
fn classify_http_status(status: reqwest::StatusCode) -> DirectFetchError {
DirectFetchError {
msg: format!("HTTP request returned status {status}"),
retryable: status_retryable(status),
}
}
fn status_retryable(status: reqwest::StatusCode) -> bool {
!matches!(
status,
reqwest::StatusCode::BAD_REQUEST
| reqwest::StatusCode::UNAUTHORIZED
| reqwest::StatusCode::FORBIDDEN
| reqwest::StatusCode::METHOD_NOT_ALLOWED
)
}
impl From<reqwest::Error> for DirectFetchError {
    /// Converts a transport-level error, preserving the status code (when
    /// present) and the underlying source in the message.
    fn from(err: reqwest::Error) -> Self {
        let status = err.status();
        let mut msg = if let Some(status) = status {
            format!("HTTP request failed with status {status}: {err}")
        } else {
            format!("HTTP request failed: {err}")
        };
        if let Some(source) = err.source() {
            write!(msg, " ({source})").expect("Can append");
        }
        // Errors without any HTTP status (connection-level) are retryable.
        Self {
            msg,
            retryable: status.is_none_or(status_retryable),
        }
    }
}
/// Picks the maximum gap (in bytes) allowed between two chunk ranges before
/// they stop being merged into one request: a quarter of the average chunk
/// size, or 0 when there are fewer than two ranges.
fn calculate_optimal_gap_size(ranges: &[(u64, u64)]) -> usize {
    match ranges {
        [] | [_] => 0,
        _ => {
            let total_len: f64 = ranges.iter().map(|&(_, len)| len as f64).sum();
            let avg_chunk_size = total_len / ranges.len() as f64;
            (avg_chunk_size * 0.25) as usize
        }
    }
}
/// Coalesces the byte ranges of `chunks` (row index, offset, length) within
/// the single file at `url` into as few HTTP range requests as possible.
///
/// Adjacent/nearby chunks are merged while the gap between them is at most
/// `max_gap_size` and the merged span stays within `MAX_MERGED_RANGE_SIZE`.
fn merge_ranges_for_url(
    url: String,
    mut chunks: Vec<(usize, u64, u64)>,
    max_gap_size: usize,
) -> Vec<MergedRangeRequest> {
    if chunks.is_empty() {
        return vec![];
    }
    // Sort by file offset so merging can be done in one forward pass.
    chunks.sort_by_key(|&(_, offset, _)| offset);
    // Drop duplicate offsets (same chunk referenced twice).
    // NOTE(review): this also drops the duplicate row's index, so that row
    // produces no decoded chunk downstream — confirm duplicates are truly
    // redundant for all callers.
    chunks.dedup_by_key(|(_, offset, _)| *offset);
    let mut merged_ranges = Vec::new();
    // Seed the current merged span with the first chunk.
    let (first_row, first_offset, first_length) = chunks[0];
    let mut current_start = first_offset as usize;
    let mut current_end = (first_offset + first_length) as usize;
    let mut chunk_infos = vec![ChunkInMergedRange {
        original_row_index: first_row,
        offset_in_merged: 0,
        length: first_length as usize,
    }];
    for (row_idx, offset, length) in chunks.into_iter().skip(1) {
        let offset = offset as usize;
        let length = length as usize;
        // Gap between the current span and this chunk (0 if they overlap).
        let gap_size = offset.saturating_sub(current_end);
        // `max` handles chunks fully contained in the current span.
        let new_end = (offset + length).max(current_end);
        let new_merged_size = new_end - current_start;
        let should_merge = gap_size <= max_gap_size && new_merged_size <= MAX_MERGED_RANGE_SIZE;
        if should_merge {
            chunk_infos.push(ChunkInMergedRange {
                original_row_index: row_idx,
                offset_in_merged: offset - current_start,
                length,
            });
            current_end = new_end;
        } else {
            // Flush the current span and start a new one at this chunk.
            merged_ranges.push(MergedRangeRequest {
                url: url.clone(),
                file_range_start: current_start,
                file_range_end: current_end,
                chunks: chunk_infos,
            });
            current_start = offset;
            current_end = offset + length;
            chunk_infos = vec![ChunkInMergedRange {
                original_row_index: row_idx,
                offset_in_merged: 0,
                length,
            }];
        }
    }
    // Flush the trailing span.
    merged_ranges.push(MergedRangeRequest {
        url,
        file_range_start: current_start,
        file_range_end: current_end,
        chunks: chunk_infos,
    });
    merged_ranges
}
/// Chooses how many merged range requests to run concurrently.
///
/// Small average ranges allow high parallelism; a large total download
/// volume caps it to bound peak memory usage.
fn calculate_adaptive_concurrency(ranges: &[(u64, u64)]) -> usize {
    if ranges.is_empty() {
        return 1;
    }

    const KIB: usize = 1024;
    const MIB: usize = 1024 * 1024;

    let total_bytes: usize = ranges.iter().map(|&(_, len)| len as usize).sum();
    let avg_bytes = total_bytes / ranges.len();

    // More, smaller requests → higher concurrency pays off.
    let by_range_size = if avg_bytes <= 128 * KIB {
        130
    } else if avg_bytes <= 2 * MIB {
        90
    } else {
        30
    };

    // Large total volume → throttle to limit in-flight memory.
    let by_total_volume = if total_bytes <= 50 * MIB {
        by_range_size
    } else if total_bytes <= 200 * MIB {
        25
    } else {
        8
    };

    by_range_size.min(by_total_volume)
}
/// Decodes the RRD-encoded bytes of a single chunk.
///
/// Returns the decoded [`Chunk`] together with the recording id from the
/// message's store id (used as the segment id), if present. All decode
/// failures are non-retryable: re-fetching the same bytes won't help.
fn decode_chunk_from_bytes(bytes: &[u8]) -> Result<(Chunk, Option<String>), DirectFetchError> {
    use re_log_encoding::Decodable;
    // Parse the wire-level log message; empty payloads are an error here.
    let raw_msg =
        <Option<re_protos::log_msg::v1alpha1::log_msg::Msg> as Decodable>::from_rrd_bytes(bytes)
            .map_err(|err| {
                DirectFetchError::new(format!("Msg::from_rrd_bytes failed: {err}"), false)
            })?
            .ok_or_else(|| DirectFetchError::new("empty msg".to_owned(), false))?;
    // Only Arrow messages carry chunk data.
    let re_protos::log_msg::v1alpha1::log_msg::Msg::ArrowMsg(arrow_msg) = raw_msg else {
        return Err(DirectFetchError::new("invalid msg type".to_owned(), false));
    };
    // Cloned because `to_application` consumes `arrow_msg` below.
    let segment_id_opt = arrow_msg.store_id.clone().map(|id| id.recording_id);
    use re_log_encoding::ToApplication as _;
    let app_msg = arrow_msg.to_application(()).map_err(|err| {
        DirectFetchError::new(format!("ArrowMsg::to_application() failed: {err}"), false)
    })?;
    let chunk = Chunk::from_record_batch(&app_msg.batch).map_err(|err| {
        DirectFetchError::new(format!("Chunk::from_record_batch failed: {err}"), false)
    })?;
    Ok((chunk, segment_id_opt))
}
/// Fetches every chunk in `batch` via direct HTTP range requests.
///
/// Pipeline: group rows by URL → merge nearby byte ranges into fewer
/// requests → issue the requests concurrently (each with retry + jittered
/// backoff) → decode the chunks and restore the original row order.
///
/// Returns the decoded chunks as a single ordered group, or the first
/// terminal error encountered.
#[tracing::instrument(
    level = "info",
    skip_all,
    fields(num_chunks, num_merged_requests, concurrency)
)]
async fn fetch_batch_via_direct_urls(
    http_client: &reqwest::Client,
    batch: &RecordBatch,
    stats: &mut TaskFetchStats,
) -> Result<Vec<ChunksWithSegment>, DirectFetchError> {
    // Looks up a column by name and downcasts it to the expected array type.
    fn batch_column<'a, T: arrow::array::Array + 'static>(
        batch: &'a RecordBatch,
        column_name: &'static str,
    ) -> Result<&'a T, DirectFetchError> {
        let column = batch
            .column_by_name(column_name)
            .ok_or_else(|| DirectFetchError::new(format!("missing column {column_name}"), false))?;
        column
            .as_any()
            .downcast_ref::<T>()
            .ok_or_else(|| DirectFetchError::new(format!("invalid column {column_name}"), false))
    }
    let byte_offsets: &UInt64Array =
        batch_column(batch, QueryDatasetResponse::FIELD_CHUNK_BYTE_OFFSET)?;
    let byte_lengths: &UInt64Array =
        batch_column(batch, QueryDatasetResponse::FIELD_CHUNK_BYTE_LENGTH)?;
    let direct_urls: &DictionaryArray<Int32Type> =
        batch_column(batch, QueryDatasetResponse::FIELD_DIRECT_URL)?;
    let num_rows = batch.num_rows();
    // Group (row index, offset, length) triples by their download URL.
    // BTreeMap keeps URL iteration order deterministic.
    let mut url_groups: BTreeMap<String, Vec<(usize, u64, u64)>> = BTreeMap::new();
    let mut all_ranges: Vec<(u64, u64)> = Vec::with_capacity(num_rows);
    let url_values = direct_urls
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("direct_url dictionary values must be strings");
    for i in 0..num_rows {
        let offset = byte_offsets.value(i);
        let length = byte_lengths.value(i);
        // Callers are expected to route null-URL rows to the gRPC path first.
        re_log::debug_assert!(
            !direct_urls.is_null(i),
            "split_batch_by_direct_url should have filtered null URLs"
        );
        if direct_urls.is_null(i) {
            return Err(DirectFetchError::new(
                format!("no direct URL for chunk at row {i}"),
                false,
            ));
        }
        // Resolve the dictionary key to the actual URL string.
        let key = direct_urls.keys().value(i);
        let url = url_values.value(key as usize).to_owned();
        url_groups.entry(url).or_default().push((i, offset, length));
        all_ranges.push((offset, length));
    }
    // Merge nearby ranges per URL and size the concurrency to the workload.
    let max_gap_size = calculate_optimal_gap_size(&all_ranges);
    let merged_requests: Vec<MergedRangeRequest> = url_groups
        .into_iter()
        .flat_map(|(url, chunks)| merge_ranges_for_url(url, chunks, max_gap_size))
        .collect();
    let concurrency = calculate_adaptive_concurrency(&all_ranges);
    let span = tracing::Span::current();
    span.record("num_chunks", num_rows);
    span.record("num_merged_requests", merged_requests.len());
    span.record("concurrency", concurrency);
    stats.record_direct_ranges(all_ranges.len() as u64, merged_requests.len() as u64);
    re_log::debug!(
        "Range merging: {num_rows} chunks → {} merged requests, concurrency={concurrency}",
        merged_requests.len()
    );
    // Build one future per merged request; each handles its own retries and
    // accumulates stats locally (merged into `stats` on completion).
    let fetches = merged_requests
        .into_iter()
        .enumerate()
        .map(|(req_idx, request)| {
            let MergedRangeRequest {
                url,
                file_range_start,
                file_range_end,
                chunks,
            } = request;
            let http_client = http_client.clone();
            async move {
                let mut local_stats = TaskFetchStats::default();
                // The HTTP Range header uses inclusive end offsets.
                let range_end = file_range_end - 1;
                re_log::debug!(
                    "Merged fetch [{req_idx}]: {file_range_start}..={range_end} ({} chunks)",
                    chunks.len()
                );
                let mut backoff_gen = re_backoff::BackoffGenerator::new(
                    std::time::Duration::from_millis(100),
                    std::time::Duration::from_secs(3),
                )
                .expect("base is less than max");
                let mut last_err: Option<DirectFetchError> = None;
                for attempt in 1..=DIRECT_FETCH_MAX_RETRIES {
                    // Every attempt after a failure waits out a jittered backoff.
                    if last_err.is_some() {
                        let backoff = backoff_gen.gen_next();
                        let jittered = backoff.jittered();
                        re_log::debug!(
                            "Direct fetch [{req_idx}] retry attempt {attempt}/{DIRECT_FETCH_MAX_RETRIES} after {jittered:?}"
                        );
                        // Count the request as "was retried" only once.
                        if attempt == 2 {
                            local_stats.record_direct_request_was_retried();
                        }
                        local_stats.record_direct_retry(jittered, attempt as u64);
                        backoff.sleep().await;
                    }
                    let fetch_result = fetch_merged_range(
                        &http_client,
                        &url,
                        file_range_start,
                        range_end,
                        &chunks,
                    )
                    .await;
                    match fetch_result {
                        Ok(results) => {
                            if attempt > 1 {
                                re_log::debug!(
                                    "Direct fetch [{req_idx}] succeeded on attempt {attempt}"
                                );
                            }
                            return (Ok(results), local_stats);
                        }
                        Err(err) if err.retryable => {
                            re_log::debug!(
                                "Direct fetch [{req_idx}] failure (attempt {attempt}/{DIRECT_FETCH_MAX_RETRIES}): {err}"
                            );
                            last_err = Some(err);
                        }
                        Err(err) => {
                            // Permanent failure: give up immediately.
                            re_log::error!(
                                "Non-retryable direct fetch failure on attempt {attempt}: {err}"
                            );
                            return (Err(err), local_stats);
                        }
                    }
                }
                // Retry budget exhausted; surface the last retryable error,
                // now marked non-retryable for upstream.
                let err = last_err.expect("at least one attempt was made");
                (
                    Err(DirectFetchError::new(
                        format!(
                            "request [{req_idx}] failed after {DIRECT_FETCH_MAX_RETRIES} attempts: {err}"
                        ),
                        false,
                    )),
                    local_stats,
                )
            }
            .instrument(tracing::info_span!(
                "direct_fetch_request",
                req = req_idx,
                bytes = tracing::field::Empty
            ))
        });
    let mut all_chunks: Vec<(usize, (Chunk, Option<String>))> = Vec::new();
    let mut first_err: Option<DirectFetchError> = None;
    // Drive all fetches with bounded concurrency; drain the whole stream even
    // after a failure so every future's stats are merged, keeping the first
    // error to report.
    async {
        let mut stream = futures::stream::iter(fetches).buffer_unordered(concurrency);
        while let Some((result, local_stats)) = stream.next().await {
            stats.merge_from(local_stats);
            match result {
                Ok(chunks) => all_chunks.extend(chunks),
                Err(err) => {
                    if first_err.is_none() {
                        first_err = Some(err);
                    }
                }
            }
        }
    }
    .instrument(tracing::info_span!("direct_fetch_all"))
    .await;
    if let Some(err) = first_err {
        return Err(err);
    }
    // `buffer_unordered` scrambles completion order; restore batch row order.
    all_chunks.sort_by_key(|(idx, _)| *idx);
    let ordered: Vec<(Chunk, Option<String>)> = all_chunks
        .into_iter()
        .map(|(_, chunk_with_segment)| chunk_with_segment)
        .collect();
    Ok(vec![ordered])
}
/// Original batch-row index paired with a decoded chunk and its segment id.
type DecodedChunk = (usize, (Chunk, Option<String>));
/// Issues one HTTP range request (`range_start..=range_end`, inclusive) and
/// slices/decodes each chunk described in `chunks` out of the response body.
///
/// Chunk offsets in `chunks` are relative to `range_start`. The first slicing
/// or decoding error aborts the whole request.
async fn fetch_merged_range(
    http_client: &reqwest::Client,
    url: &str,
    range_start: usize,
    range_end: usize,
    chunks: &[ChunkInMergedRange],
) -> Result<Vec<DecodedChunk>, DirectFetchError> {
    let response = http_client
        .get(url)
        .header("Range", format!("bytes={range_start}-{range_end}"))
        .send()
        .await?;
    let status = response.status();
    if !status.is_success() {
        return Err(classify_http_status(status));
    }
    let merged_bytes = response
        .bytes()
        .await
        .map_err(|err| DirectFetchError::new(format!("failed to read body: {err}"), true))?;
    tracing::Span::current().record("bytes", merged_bytes.len());

    let mut decoded = Vec::with_capacity(chunks.len());
    for info in chunks {
        let start = info.offset_in_merged;
        let end = start + info.length;
        let Some(chunk_bytes) = merged_bytes.get(start..end) else {
            // The server returned fewer bytes than the requested range.
            return Err(DirectFetchError::new(
                format!(
                    "merged range shorter than expected: need {end} bytes, got {}",
                    merged_bytes.len()
                ),
                false,
            ));
        };
        let chunk_with_segment = decode_chunk_from_bytes(chunk_bytes)?;
        decoded.push((info.original_row_index, chunk_with_segment));
    }
    Ok(decoded)
}