use std::ops::ControlFlow;
use std::sync::Arc;
use arrow::array::{AsArray as _, RecordBatch};
use arrow::error::ArrowError;
use itertools::Itertools as _;
use re_auth::client::AuthDecorator;
use re_byte_size::SizeBytes as _;
use re_chunk::{Chunk, ChunkId};
use re_log_channel::{DataSourceMessage, DataSourceUiCommand};
use re_log_types::{
BlueprintActivationCommand, EntryId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind,
StoreSource,
};
use re_protos::cloud::v1alpha1::rerun_cloud_service_client::RerunCloudServiceClient;
use re_protos::common::v1alpha1::ext::SegmentId;
use re_uri::Origin;
use tokio_stream::StreamExt as _;
use crate::{
ApiError, ApiErrorKind, ApiResult, ConnectionClient, MAX_DECODING_MESSAGE_SIZE,
SegmentQueryParams,
};
#[cfg(target_arch = "wasm32")]
pub async fn channel(origin: Origin) -> ApiResult<tonic_web_wasm_client::Client> {
let channel = tonic_web_wasm_client::Client::new_with_options(
origin.as_url(),
tonic_web_wasm_client::options::FetchOptions::new(),
);
Ok(channel)
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn channel(origin: Origin) -> ApiResult<tonic::transport::Channel> {
use std::net::Ipv4Addr;
use tonic::transport::Endpoint;
let http_url = origin.as_url();
let tls_config = if let Ok(cert_path) = std::env::var("RERUN_REDAP_LOCAL_CERT_PATH") {
use tonic::transport::{Certificate, ClientTlsConfig};
re_log::info!(cert_path, "starting client with local TLS cert");
let ca_cert = tokio::fs::read_to_string(&cert_path).await.map_err(|err| {
ApiError::internal_with_source(
None,
err,
format!("couldn't load local cert at {cert_path:?}"),
)
})?;
let ca_cert = Certificate::from_pem(ca_cert);
ClientTlsConfig::new()
.with_enabled_roots()
.ca_certificate(ca_cert)
.domain_name("localhost") .assume_http2(true)
} else {
tonic::transport::ClientTlsConfig::new()
.with_enabled_roots()
.assume_http2(true)
};
let endpoint = {
let mut endpoint = Endpoint::new(http_url)
.and_then(|ep| ep.tls_config(tls_config))
.map_err(|err| ApiError::connection_with_source(None, err, "connecting to server"))?
.http2_adaptive_window(true) .connect_timeout(std::time::Duration::from_secs(10))
.http2_keep_alive_interval(std::time::Duration::from_secs(30))
.keep_alive_timeout(std::time::Duration::from_secs(20))
.keep_alive_while_idle(true)
.tcp_keepalive(Some(std::time::Duration::from_secs(30)));
if false {
endpoint = endpoint.initial_stream_window_size(Some(4 * 1024 * 1024));
endpoint = endpoint.initial_connection_window_size(Some(16 * 1024 * 1024));
}
endpoint.connect().await.map_err(|err| {
ApiError::connection_with_source(
None,
err,
format!("failed to connect to server at {origin}"),
)
})
};
match endpoint {
Ok(channel) => Ok(channel),
Err(original_err) => {
if ![
url::Host::Domain("localhost".to_owned()),
url::Host::Ipv4(Ipv4Addr::LOCALHOST),
]
.contains(&origin.host)
{
return Err(original_err);
}
let Ok(endpoint) = Endpoint::new(origin.coerce_http_url()) else {
return Err(original_err);
};
let endpoint = endpoint.http2_adaptive_window(true);
if endpoint.connect().await.is_ok() {
Err(ApiError::connection(
"the server is expecting an unencrypted connection (try `rerun+http://` if you are sure)",
))
} else {
Err(original_err)
}
}
}
}
#[cfg(target_arch = "wasm32")]
pub type RedapClientInner = re_auth::client::AuthService<
tonic::service::interceptor::InterceptedService<
re_protos::headers::PropagateHeaders<tonic_web_wasm_client::Client>,
re_protos::headers::RerunVersionInterceptor,
>,
>;
#[cfg(target_arch = "wasm32")]
pub(crate) fn assemble_client(
channel: tonic_web_wasm_client::Client,
credentials: Option<Arc<dyn re_auth::credentials::CredentialsProvider + Send + Sync + 'static>>,
) -> (RedapClient, RedapClientInner) {
let middlewares = tower::ServiceBuilder::new()
.layer(AuthDecorator::new(credentials))
.layer(re_protos::headers::new_rerun_client_headers_layer());
let svc: RedapClientInner = tower::ServiceBuilder::new()
.layer(middlewares.into_inner())
.service(channel);
let client = RerunCloudServiceClient::new(svc.clone())
.max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);
(client, svc)
}
#[cfg(target_arch = "wasm32")]
pub(crate) async fn client(
origin: Origin,
credentials: Option<Arc<dyn re_auth::credentials::CredentialsProvider + Send + Sync + 'static>>,
) -> ApiResult<(RedapClient, RedapClientInner)> {
let channel = crate::with_retry("redap_connection", || async {
channel(origin.clone()).await
})
.await?;
Ok(assemble_client(channel, credentials))
}
#[cfg(all(not(target_arch = "wasm32"), feature = "perf_telemetry"))]
pub type RedapClientInner = re_auth::client::AuthService<
tonic::service::interceptor::InterceptedService<
re_protos::headers::PropagateHeaders<
re_perf_telemetry::external::tower_http::trace::Trace<
tonic::service::interceptor::InterceptedService<
tonic::transport::Channel,
re_perf_telemetry::TracingInjectorInterceptor,
>,
re_perf_telemetry::external::tower_http::classify::SharedClassifier<
re_perf_telemetry::external::tower_http::classify::GrpcErrorsAsFailures,
>,
re_perf_telemetry::GrpcMakeSpan,
re_perf_telemetry::external::tower_http::trace::DefaultOnRequest,
re_perf_telemetry::ClientOnResponse,
>,
>,
re_protos::headers::RerunVersionInterceptor,
>,
>;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "perf_telemetry")))]
pub type RedapClientInner = re_auth::client::AuthService<
tonic::service::interceptor::InterceptedService<
re_protos::headers::PropagateHeaders<tonic::transport::Channel>,
re_protos::headers::RerunVersionInterceptor,
>,
>;
pub type RedapClient = RerunCloudServiceClient<RedapClientInner>;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn assemble_client(
channel: tonic::transport::Channel,
credentials: Option<Arc<dyn re_auth::credentials::CredentialsProvider + Send + Sync + 'static>>,
) -> (RedapClient, RedapClientInner) {
let middlewares = tower::ServiceBuilder::new()
.layer(AuthDecorator::new(credentials))
.layer(re_protos::headers::new_rerun_client_headers_layer());
#[cfg(feature = "perf_telemetry")]
let middlewares = middlewares.layer(re_perf_telemetry::new_client_telemetry_layer());
let svc: RedapClientInner = tower::ServiceBuilder::new()
.layer(middlewares.into_inner())
.service(channel);
let client = RerunCloudServiceClient::new(svc.clone())
.max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);
(client, svc)
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn client(
origin: Origin,
credentials: Option<Arc<dyn re_auth::credentials::CredentialsProvider + Send + Sync + 'static>>,
) -> ApiResult<(RedapClient, RedapClientInner)> {
let channel = crate::with_retry("redap_connection", || async {
channel(origin.clone()).await
})
.await?;
Ok(assemble_client(channel, credentials))
}
pub type ChunksWithSegment = Vec<(Chunk, Option<SegmentId>)>;
#[tracing::instrument(level = "debug", skip_all)]
#[cfg(not(target_arch = "wasm32"))]
pub fn fetch_chunks_response_to_chunk_and_segment_id(
response: crate::FetchChunksResponseStream,
) -> crate::ApiResponseStream<ChunksWithSegment> {
let trace_id = response.trace_id();
let parent_span = tracing::Span::current();
let stream = response
.then(move |resp| {
let trace_id = trace_id;
let parent_span = parent_span.clone();
tokio::task::spawn_blocking(move || {
let _parent_guard = parent_span.enter();
let r = resp?;
let _span = tracing::trace_span!(
parent: &parent_span,
"fetch_chunks::batch_decode",
num_chunks = r.chunks.len(),
)
.entered();
r.chunks
.into_iter()
.map(|arrow_msg| {
re_tracing::profile_scope!("fetch_chunks_response_to_chunk_and_segment_id");
let segment_id = arrow_msg
.store_id
.clone()
.map(|id| SegmentId::from(id.recording_id));
use re_log_encoding::ToApplication as _;
let arrow_msg = arrow_msg.to_application(()).map_err(|err| {
ApiError::deserialization_with_source(
trace_id,
err,
"failed to get arrow data for item in /FetchChunks response stream",
)
})?;
let chunk = re_chunk::Chunk::from_record_batch(&arrow_msg.batch).map_err(
|err| {
ApiError::deserialization_with_source(
trace_id,
err,
"failed to parse item in /FetchChunks response stream",
)
},
)?;
Ok((chunk, segment_id))
})
.try_collect()
})
})
.map(move |res| {
res.map_err(|err| {
ApiError::internal_with_source(
trace_id,
err,
"failed to sync on /FetchChunks response stream",
)
})
.flatten()
});
crate::ApiResponseStream::new(stream, trace_id)
}
#[cfg(target_arch = "wasm32")]
pub fn fetch_chunks_response_to_chunk_and_segment_id(
response: crate::FetchChunksResponseStream,
) -> crate::ApiResponseStream<ChunksWithSegment> {
let trace_id = response.trace_id();
let stream = response.map(move |resp| {
let resp = resp?;
let _span =
tracing::trace_span!("fetch_chunks::batch_decode", num_chunks = resp.chunks.len())
.entered();
resp.chunks
.into_iter()
.map(|arrow_msg| {
let segment_id = arrow_msg
.store_id
.clone()
.map(|id| SegmentId::from(id.recording_id));
use re_log_encoding::ToApplication as _;
let arrow_msg = arrow_msg.to_application(()).map_err(|err| {
ApiError::deserialization_with_source(
trace_id,
err,
"failed to get arrow data for item in /FetchChunks response stream",
)
})?;
let chunk =
re_chunk::Chunk::from_record_batch(&arrow_msg.batch).map_err(|err| {
ApiError::deserialization_with_source(
trace_id,
err,
"failed to parse item in /FetchChunks response stream",
)
})?;
Ok((chunk, segment_id))
})
.try_collect()
});
crate::ApiResponseStream::new(stream, trace_id)
}
pub type ProgressCallback = std::sync::Arc<dyn Fn(u64, Option<u64>) + Send + Sync>;
#[derive(Clone, Default)]
pub struct StreamingOptions {
pub force_full_download: bool,
pub on_progress: Option<ProgressCallback>,
}
impl std::fmt::Debug for StreamingOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StreamingOptions")
.field("force_full_download", &self.force_full_download)
.field("on_progress", &self.on_progress.as_ref().map(|_| "…"))
.finish()
}
}
pub async fn stream_blueprint_and_segment_from_server(
mut client: ConnectionClient,
tx: re_log_channel::LogSender,
uri: re_uri::DatasetSegmentUri,
options: StreamingOptions,
) -> ApiResult {
re_log::debug!("Loading {uri}…");
let dataset_entry = client.read_dataset_entry(uri.dataset_id.into()).await?;
let recording_store_id = uri.store_id();
if let Some((blueprint_dataset, blueprint_segment)) =
dataset_entry.dataset_details.default_blueprint()
{
re_log::debug!("Streaming blueprint dataset {blueprint_dataset}");
let blueprint_store_id = StoreId::random(
StoreKind::Blueprint,
recording_store_id.application_id().clone(),
);
let blueprint_store_info = StoreInfo {
store_id: blueprint_store_id.clone(),
cloned_from: None,
store_source: StoreSource::Unknown,
store_version: None,
};
if stream_segment_from_server(
&mut client,
blueprint_store_info,
&tx,
blueprint_dataset,
blueprint_segment,
re_uri::Fragment::default(),
&StreamingOptions::default(),
)
.await?
.is_break()
{
return Ok(());
}
if tx
.send(
LogMsg::BlueprintActivationCommand(BlueprintActivationCommand {
blueprint_id: blueprint_store_id,
make_active: false,
make_default: true,
})
.into(),
)
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(());
}
} else {
re_log::debug!("No blueprint dataset found for {uri}");
}
let re_uri::DatasetSegmentUri {
origin: _,
dataset_id,
segment_id,
fragment,
} = uri;
let store_info = StoreInfo {
store_id: recording_store_id,
cloned_from: None,
store_source: StoreSource::Unknown,
store_version: None,
};
if stream_segment_from_server(
&mut client,
store_info,
&tx,
dataset_id.into(),
segment_id.into(),
fragment,
&options,
)
.await?
.is_break()
{
return Ok(());
}
Ok(())
}
async fn stream_segment_from_server(
client: &mut ConnectionClient,
store_info: StoreInfo,
tx: &re_log_channel::LogSender,
dataset_id: EntryId,
segment_id: SegmentId,
fragment: re_uri::Fragment,
options: &StreamingOptions,
) -> ApiResult<ControlFlow<()>> {
let store_id = store_info.store_id.clone();
re_log::debug!("Streaming {store_id:?}…");
if tx
.send(
LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: store_info,
})
.into(),
)
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(ControlFlow::Break(()));
}
#[expect(clippy::collapsible_if)]
if store_id.is_recording() && !fragment.is_empty() {
if tx
.send(
DataSourceUiCommand::SetUrlFragment {
store_id: store_id.clone(),
fragment: fragment.to_string(),
}
.into(),
)
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(ControlFlow::Break(()));
}
}
let start_time = web_time::Instant::now();
let manifest_stream_result = client
.get_rrd_manifest_stream(dataset_id, segment_id.clone())
.await;
let trace_id = manifest_stream_result
.as_ref()
.ok()
.and_then(|s| s.trace_id());
match manifest_stream_result {
Ok(manifest_stream) => {
let mut manifest_stream = std::pin::pin!(manifest_stream);
let mut rrd_manifest_parts: Vec<Arc<re_log_encoding::RrdManifest>> = Vec::new();
while let Some(part_result) = manifest_stream.next().await {
let raw_rrd_manifest_part = part_result?;
let part_nr = rrd_manifest_parts.len() + 1;
re_log::debug!(
"Received RRD manifest part #{part_nr}/? ({} deflated, {:.1}s elapsed)",
re_format::format_bytes(raw_rrd_manifest_part.total_size_bytes() as _),
start_time.elapsed().as_secs_f32(),
);
let rrd_manifest = re_log_encoding::RrdManifest::try_new(&raw_rrd_manifest_part)
.map_err(|err| {
ApiError::invalid_arguments_with_source(
trace_id,
err,
"Invalid RRD manifest part",
)
})?;
let rrd_manifest = Arc::new(rrd_manifest);
if tx
.send(DataSourceMessage::RrdManifest(
store_id.clone(),
rrd_manifest.clone(),
))
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(ControlFlow::Break(()));
}
rrd_manifest_parts.push(rrd_manifest);
}
if rrd_manifest_parts.is_empty() {
return Err(ApiError::deserialization(
trace_id,
"failed to parse the response for /GetRrdManifest (no data)",
));
}
let part_nr = rrd_manifest_parts.len();
re_log::debug!(
"Full RRD manifest loaded in {:.1}s in {}",
start_time.elapsed().as_secs_f32(),
re_format::format_plural_s(part_nr, "part")
);
if tx
.send(DataSourceMessage::RrdManifestComplete(store_id.clone()))
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(ControlFlow::Break(()));
}
match store_id.kind() {
StoreKind::Recording if !options.force_full_download => {
re_log::debug!("Letting the viewer load chunks on-demand");
return Ok(ControlFlow::Continue(()));
}
StoreKind::Recording | StoreKind::Blueprint => {
re_log::debug!("Loading all of the chunks in one go; most important first");
let refs: Vec<&re_log_encoding::RrdManifest> =
rrd_manifest_parts.iter().map(|m| m.as_ref()).collect();
let combined = re_log_encoding::RrdManifest::concat(&refs).map_err(|err| {
ApiError::invalid_arguments_with_source(
trace_id,
err,
"Failed to concatenate RRD manifest parts",
)
})?;
let batch = sort_batch(combined.chunk_fetcher_rb()).map_err(|err| {
ApiError::invalid_arguments_with_source(
trace_id,
err,
"Failed to sort chunk index",
)
})?;
return load_chunks(client, tx, &store_id, batch, options).await;
}
}
}
Err(err) => {
if err.kind == ApiErrorKind::Unimplemented {
re_log::debug_once!("The server does not support on-demand streaming"); } else {
re_log::warn!("Failed to load RRD manifest: {err}");
}
}
}
let mut already_loaded_chunk_ids: ahash::HashSet<ChunkId> = Default::default();
if let Some(time_selection) = fragment.time_selection {
let time_selection_batches = client
.query_dataset_chunk_index(SegmentQueryParams {
dataset_id,
segment_id: segment_id.clone(),
include_static_data: true,
include_temporal_data: true,
query: Some(
re_protos::cloud::v1alpha1::ext::Query::latest_at_range(
*time_selection.timeline.name(),
time_selection.range,
)
.into(),
),
generate_direct_urls: false,
})
.await?;
if time_selection_batches.is_empty() {
re_log::debug!(
"No chunks found for time selection {:?} in recording {:?}",
time_selection,
store_id
);
} else {
let batch = arrow::compute::concat_batches(
&time_selection_batches[0].schema(),
&time_selection_batches,
)
.map_err(|err| {
ApiError::invalid_arguments_with_source(
None,
err,
"Failed to concat chunk index batches",
)
})?;
let batch = sort_batch(&batch).map_err(|err| {
ApiError::invalid_arguments_with_source(trace_id, err, "Failed to sort chunk index")
})?;
if let Some(chunk_ids) = chunk_id_column(&batch) {
already_loaded_chunk_ids = chunk_ids.iter().copied().collect();
} else {
re_log::warn_once!(
"Failed to find 'chunk_id' column in chunk index response. Schema: {}",
batch.schema()
);
}
if load_chunks(client, tx, &store_id, batch, options)
.await?
.is_break()
{
return Ok(ControlFlow::Break(()));
}
}
}
let batches = client
.query_dataset_chunk_index(SegmentQueryParams {
dataset_id,
segment_id: segment_id.clone(),
include_static_data: true,
include_temporal_data: true,
query: None, generate_direct_urls: false,
})
.await?;
if batches.is_empty() {
re_log::info!("Empty recording"); return Ok(ControlFlow::Continue(()));
}
let batch = arrow::compute::concat_batches(&batches[0].schema(), &batches).map_err(|err| {
ApiError::invalid_arguments_with_source(
trace_id,
err,
"Failed to concat chunk index batches",
)
})?;
let batch = sort_batch(&batch).map_err(|err| {
ApiError::invalid_arguments_with_source(trace_id, err, "Failed to sort chunk index")
})?;
if let Some(chunk_ids) = chunk_id_column(&batch)
&& !already_loaded_chunk_ids.is_empty()
{
let filtered_indices: Vec<usize> = chunk_ids
.iter()
.enumerate()
.filter_map(|(idx, chunk_id)| {
if already_loaded_chunk_ids.contains(chunk_id) {
None
} else {
Some(idx)
}
})
.collect();
let filtered_batch =
re_arrow_util::take_record_batch(&batch, &filtered_indices).map_err(|err| {
ApiError::invalid_arguments_with_source(trace_id, err, "take_record_batch")
})?;
load_chunks(client, tx, &store_id, filtered_batch, options).await
} else {
load_chunks(client, tx, &store_id, batch, options).await
}
}
fn chunk_id_column(batch: &RecordBatch) -> Option<&[ChunkId]> {
let array = batch
.column_by_name(re_log_encoding::RawRrdManifest::FIELD_CHUNK_ID)
.and_then(|array| array.as_fixed_size_binary_opt())?;
ChunkId::try_slice_from_arrow(array).ok()
}
#[tracing::instrument(skip_all, fields(
num_chunks = tracing::field::Empty,
total_size_bytes = tracing::field::Empty,
downloaded_bytes = tracing::field::Empty,
))]
async fn load_chunks(
client: &ConnectionClient,
tx: &re_log_channel::LogSender,
store_id: &StoreId,
full_batch: RecordBatch,
options: &StreamingOptions,
) -> ApiResult<ControlFlow<()>> {
let num_chunks = full_batch.num_rows();
let total_size_bytes = total_size_bytes_from_batch(&full_batch);
let span = tracing::Span::current();
span.record("num_chunks", num_chunks);
if let Some(total_size_bytes) = total_size_bytes {
span.record("total_size_bytes", total_size_bytes);
}
let total_size_str = total_size_bytes
.map(|bytes| re_format::format_bytes(bytes as _))
.unwrap_or_else(|| "unknown size".to_owned());
re_log::debug!(
"Downloading {} chunks ({}) from server…",
re_format::format_uint(num_chunks),
total_size_str,
);
if 25_000 < num_chunks {
re_log::debug_warn!(
"There are {} chunks in this recording. Consider running `rerun rrd optimize` on it!",
re_format::format_uint(num_chunks)
);
}
use futures::stream::FuturesUnordered;
const BATCH_SIZE: usize = 32;
let mut futures = FuturesUnordered::new();
for start in (0..num_chunks).step_by(BATCH_SIZE) {
let end = usize::min(start + BATCH_SIZE, num_chunks);
let small_batch = full_batch.slice(start, end - start);
let mut client = client.clone();
let tx = tx.clone();
let store_id = store_id.clone();
futures.push(async move {
load_small_chunk_batch(&mut client, &tx, &store_id, &small_batch).await
});
}
let mut downloaded_bytes: u64 = 0;
while let Some(res) = futures::stream::StreamExt::next(&mut futures).await {
let (result, batch_bytes) = res?;
downloaded_bytes += batch_bytes;
if let Some(on_progress) = &options.on_progress {
on_progress(downloaded_bytes, total_size_bytes);
}
if result.is_break() {
return Ok(ControlFlow::Break(()));
}
}
span.record("downloaded_bytes", downloaded_bytes);
re_log::trace!(
"Finished downloading {} chunks ({}).",
re_format::format_uint(num_chunks),
re_format::format_bytes(downloaded_bytes as _),
);
Ok(ControlFlow::Continue(()))
}
fn total_size_bytes_from_batch(batch: &RecordBatch) -> Option<u64> {
let col = batch.column_by_name(re_log_encoding::RawRrdManifest::FIELD_CHUNK_BYTE_SIZE)?;
let array = col.as_primitive_opt::<arrow::datatypes::UInt64Type>()?;
Some(array.iter().map(|v| v.unwrap_or(0)).sum())
}
#[tracing::instrument(skip_all, fields(
num_chunks_in_batch = batch.num_rows(),
batch_bytes = tracing::field::Empty,
num_chunks_received = tracing::field::Empty,
))]
async fn load_small_chunk_batch(
client: &mut ConnectionClient,
tx: &re_log_channel::LogSender,
store_id: &StoreId,
batch: &RecordBatch,
) -> ApiResult<(ControlFlow<()>, u64)> {
let chunk_stream = client.fetch_segment_chunks_by_id(batch).await?;
let mut chunk_stream = fetch_chunks_response_to_chunk_and_segment_id(chunk_stream);
let trace_id = chunk_stream.trace_id();
let mut batch_bytes: u64 = 0;
let mut num_chunks_received: u64 = 0;
while let Some(chunks) = chunk_stream.next().await {
for (chunk, _partition_id) in chunks? {
batch_bytes += chunk.heap_size_bytes();
num_chunks_received += 1;
if tx
.send(
LogMsg::ArrowMsg(
store_id.clone(),
chunk.to_arrow_msg().map_err(|err| {
ApiError::deserialization_with_source(
trace_id,
err,
"failed to parse chunk in /FetchChunks response stream",
)
})?,
)
.into(),
)
.is_err()
{
re_log::debug!("Receiver disconnected");
let span = tracing::Span::current();
span.record("batch_bytes", batch_bytes);
span.record("num_chunks_received", num_chunks_received);
return Ok((ControlFlow::Break(()), batch_bytes));
}
}
}
let span = tracing::Span::current();
span.record("batch_bytes", batch_bytes);
span.record("num_chunks_received", num_chunks_received);
Ok((ControlFlow::Continue(()), batch_bytes))
}
fn sort_batch(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
use std::sync::Arc;
let schema = batch.schema();
let chunk_is_static = schema.index_of(re_log_encoding::RrdManifest::FIELD_CHUNK_IS_STATIC)?;
let chunk_id = schema.index_of(re_log_encoding::RrdManifest::FIELD_CHUNK_ID)?;
let sort_keys = vec![
arrow::compute::SortColumn {
values: Arc::new(batch.column(chunk_is_static).clone()),
options: Some(arrow::compute::SortOptions {
descending: true,
nulls_first: true,
}),
},
arrow::compute::SortColumn {
values: Arc::new(batch.column(chunk_id).clone()),
options: Some(arrow::compute::SortOptions {
descending: false,
nulls_first: true,
}),
},
];
let indices = arrow::compute::lexsort_to_indices(&sort_keys, None)?;
let sorted = arrow::compute::take_record_batch(batch, &indices)?;
Ok(sorted)
}