use std::ops::ControlFlow;
use std::sync::Arc;
use arrow::array::{AsArray as _, RecordBatch};
use arrow::error::ArrowError;
use re_auth::client::AuthDecorator;
use re_byte_size::SizeBytes as _;
use re_chunk::{Chunk, ChunkId};
use re_log_channel::{DataSourceMessage, DataSourceUiCommand};
use re_log_types::{
BlueprintActivationCommand, EntryId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind,
StoreSource,
};
use re_protos::cloud::v1alpha1::rerun_cloud_service_client::RerunCloudServiceClient;
use re_protos::common::v1alpha1::ext::SegmentId;
use re_uri::Origin;
use tokio_stream::{Stream, StreamExt as _};
use crate::{
ApiError, ApiErrorKind, ApiResult, ConnectionClient, MAX_DECODING_MESSAGE_SIZE,
SegmentQueryParams,
};
/// Creates the gRPC-web transport for `origin` on `wasm32` targets.
///
/// The client is built from `origin.as_url()` with freshly constructed fetch options.
/// This body has no failure path: it always returns `Ok`.
#[cfg(target_arch = "wasm32")]
pub async fn channel(origin: Origin) -> ApiResult<tonic_web_wasm_client::Client> {
    let base_url = origin.as_url();
    let fetch_options = tonic_web_wasm_client::options::FetchOptions::new();

    Ok(tonic_web_wasm_client::Client::new_with_options(
        base_url,
        fetch_options,
    ))
}
/// Opens a `tonic` transport channel to `origin` on native targets.
///
/// TLS setup:
/// - If the `RERUN_REDAP_LOCAL_CERT_PATH` environment variable is set, the PEM file at that
///   path is added as a CA certificate and the expected domain name is set to `localhost`.
/// - Otherwise a default TLS configuration with the enabled roots is used.
///
/// If the connection fails and the host is a loopback host (`localhost`, `127.0.0.1` or
/// `::1`), a second connection to `origin.coerce_http_url()` is attempted without any TLS
/// configuration, purely as a probe. When the probe succeeds, the returned error tells the
/// user that the server expects an unencrypted connection. In every other case the original
/// connection error is returned.
///
/// # Errors
///
/// - An internal error if the local certificate cannot be read.
/// - A connection error if the endpoint cannot be built or the connection fails.
#[cfg(not(target_arch = "wasm32"))]
pub async fn channel(origin: Origin) -> ApiResult<tonic::transport::Channel> {
    use std::net::{Ipv4Addr, Ipv6Addr};

    use tonic::transport::Endpoint;

    let http_url = origin.as_url();

    let tls_config = if let Ok(cert_path) = std::env::var("RERUN_REDAP_LOCAL_CERT_PATH") {
        use tonic::transport::{Certificate, ClientTlsConfig};

        re_log::info!(cert_path, "starting client with local TLS cert");
        let ca_cert = tokio::fs::read_to_string(&cert_path).await.map_err(|err| {
            ApiError::internal_with_source(
                err,
                format!("couldn't load local cert at {cert_path:?}"),
            )
        })?;
        let ca_cert = Certificate::from_pem(ca_cert);
        ClientTlsConfig::new()
            .with_enabled_roots()
            .ca_certificate(ca_cert)
            .domain_name("localhost")
            .assume_http2(true)
    } else {
        tonic::transport::ClientTlsConfig::new()
            .with_enabled_roots()
            .assume_http2(true)
    };

    let endpoint = {
        let mut endpoint = Endpoint::new(http_url)
            .and_then(|ep| ep.tls_config(tls_config))
            .map_err(|err| ApiError::connection_with_source(err, "connecting to server"))?
            .http2_adaptive_window(true)
            .connect_timeout(std::time::Duration::from_secs(10));

        // Deliberately disabled: the explicit window sizes stay compiled but are never applied.
        // NOTE(review): the reason for disabling them is not recorded here.
        if false {
            endpoint = endpoint.initial_stream_window_size(Some(4 * 1024 * 1024));
            endpoint = endpoint.initial_connection_window_size(Some(16 * 1024 * 1024));
        }

        endpoint.connect().await.map_err(|err| {
            ApiError::connection_with_source(
                err,
                format!("failed to connect to server at {origin}"),
            )
        })
    };

    let original_error = match endpoint {
        Ok(channel) => return Ok(channel),
        Err(err) => err,
    };

    // The "maybe the server is unencrypted" hint is only offered for loopback hosts,
    // including the IPv6 loopback address.
    let is_loopback = [
        url::Host::Domain("localhost".to_owned()),
        url::Host::Ipv4(Ipv4Addr::LOCALHOST),
        url::Host::Ipv6(Ipv6Addr::LOCALHOST),
    ]
    .contains(&origin.host);
    if !is_loopback {
        return Err(original_error);
    }

    let Ok(probe) = Endpoint::new(origin.coerce_http_url()) else {
        return Err(original_error);
    };
    let probe = probe.http2_adaptive_window(true);

    // The probe's channel is discarded: only whether it could connect matters.
    if probe.connect().await.is_ok() {
        Err(ApiError::connection(
            "the server is expecting an unencrypted connection (try `rerun+http://` if you are sure)",
        ))
    } else {
        Err(original_error)
    }
}
/// Concrete service stack wrapped by the generated cloud service client on `wasm32` targets.
///
/// From the outside in: the auth service, an intercepted service using
/// `RerunVersionInterceptor`, header propagation, and finally the gRPC-web client.
#[cfg(target_arch = "wasm32")]
pub type RedapClientInner = re_auth::client::AuthService<
tonic::service::interceptor::InterceptedService<
re_protos::headers::PropagateHeaders<tonic_web_wasm_client::Client>,
re_protos::headers::RerunVersionInterceptor,
>,
>;
/// Builds a [`RedapClient`] for `origin` on `wasm32` targets.
///
/// The transport is obtained from [`channel`], called through `crate::with_retry` under the
/// `"redap_connection"` label. The auth decorator (fed with `credentials`) and the Rerun
/// headers layer (named `"rerun-web"`) are stacked on top of it. The resulting client has
/// `max_decoding_message_size` set to `MAX_DECODING_MESSAGE_SIZE`.
///
/// # Errors
///
/// Propagates the error of the retried connection attempt.
#[cfg(target_arch = "wasm32")]
pub(crate) async fn client(
    origin: Origin,
    credentials: Option<Arc<dyn re_auth::credentials::CredentialsProvider + Send + Sync + 'static>>,
) -> ApiResult<RedapClient> {
    let transport = crate::with_retry("redap_connection", || async {
        channel(origin.clone()).await
    })
    .await?;

    let auth_layer = AuthDecorator::new(credentials);
    let headers_layer = re_protos::headers::new_rerun_headers_layer(
        Some("rerun-web".to_owned()), // name
        None,                         // version
        true,                         // is_client
    );
    let middlewares = tower::ServiceBuilder::new()
        .layer(auth_layer)
        .layer(headers_layer);

    let service = tower::ServiceBuilder::new()
        .layer(middlewares.into_inner())
        .service(transport);

    let grpc_client = RerunCloudServiceClient::new(service)
        .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);
    Ok(grpc_client)
}
/// Concrete service stack wrapped by the generated cloud service client on native targets
/// when the `perf_telemetry` feature is enabled.
///
/// Compared to the variant without telemetry, the transport channel is additionally wrapped
/// in an intercepted service using `TracingInjectorInterceptor` and in a `tower_http` `Trace`
/// service (classifier `GrpcErrorsAsFailures`, span maker `GrpcMakeSpan`).
#[cfg(all(not(target_arch = "wasm32"), feature = "perf_telemetry"))]
pub type RedapClientInner = re_auth::client::AuthService<
tonic::service::interceptor::InterceptedService<
re_protos::headers::PropagateHeaders<
re_perf_telemetry::external::tower_http::trace::Trace<
tonic::service::interceptor::InterceptedService<
tonic::transport::Channel,
re_perf_telemetry::TracingInjectorInterceptor,
>,
re_perf_telemetry::external::tower_http::classify::SharedClassifier<
re_perf_telemetry::external::tower_http::classify::GrpcErrorsAsFailures,
>,
re_perf_telemetry::GrpcMakeSpan,
>,
>,
re_protos::headers::RerunVersionInterceptor,
>,
>;
/// Concrete service stack wrapped by the generated cloud service client on native targets
/// when the `perf_telemetry` feature is disabled.
///
/// From the outside in: the auth service, an intercepted service using
/// `RerunVersionInterceptor`, header propagation, and finally the `tonic` transport channel.
#[cfg(all(not(target_arch = "wasm32"), not(feature = "perf_telemetry")))]
pub type RedapClientInner = re_auth::client::AuthService<
tonic::service::interceptor::InterceptedService<
re_protos::headers::PropagateHeaders<tonic::transport::Channel>,
re_protos::headers::RerunVersionInterceptor,
>,
>;
/// The generated Rerun cloud service client, instantiated over the target-specific
/// `RedapClientInner` service stack.
pub type RedapClient = RerunCloudServiceClient<RedapClientInner>;
/// Builds a [`RedapClient`] for `origin` on native targets.
///
/// The transport is obtained from [`channel`], called through `crate::with_retry` under the
/// `"redap_connection"` label. The auth decorator (fed with `credentials`) and the Rerun
/// headers layer (no explicit name or version) are stacked on top of it. With the
/// `perf_telemetry` feature, the client telemetry layer is added as well. The resulting
/// client has `max_decoding_message_size` set to `MAX_DECODING_MESSAGE_SIZE`.
///
/// # Errors
///
/// Propagates the error of the retried connection attempt.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn client(
    origin: Origin,
    credentials: Option<Arc<dyn re_auth::credentials::CredentialsProvider + Send + Sync + 'static>>,
) -> ApiResult<RedapClient> {
    let transport = crate::with_retry("redap_connection", || async {
        channel(origin.clone()).await
    })
    .await?;

    let auth_layer = AuthDecorator::new(credentials);
    let headers_layer = re_protos::headers::new_rerun_headers_layer(
        None, // name
        None, // version
        true, // is_client
    );
    let middlewares = tower::ServiceBuilder::new()
        .layer(auth_layer)
        .layer(headers_layer);

    #[cfg(feature = "perf_telemetry")]
    let middlewares = middlewares.layer(re_perf_telemetry::new_client_telemetry_layer());

    // The annotation pins the stack to the alias selected by the active feature set.
    let service: RedapClientInner = tower::ServiceBuilder::new()
        .layer(middlewares.into_inner())
        .service(transport);

    let grpc_client = RerunCloudServiceClient::new(service)
        .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);
    Ok(grpc_client)
}
/// Converts a `/FetchChunks` response stream into a stream of decoded chunk batches
/// (native targets).
///
/// Each response message is decoded inside `tokio::task::spawn_blocking`. Every chunk in the
/// message is paired with the `recording_id` of its `store_id`, if it has one.
///
/// Each stream item is an error if:
/// - the response item itself is a `tonic` error,
/// - a message cannot be converted to its application form or parsed into a [`Chunk`]
///   (the first failure aborts that message), or
/// - the blocking task cannot be joined.
#[cfg(not(target_arch = "wasm32"))]
pub fn fetch_chunks_response_to_chunk_and_segment_id<S>(
    response: S,
) -> impl Stream<Item = ApiResult<Vec<(Chunk, Option<String>)>>>
where
    S: Stream<Item = tonic::Result<re_protos::cloud::v1alpha1::FetchChunksResponse>>,
{
    use re_log_encoding::ToApplication as _;

    response
        .then(|item| {
            tokio::task::spawn_blocking(move || -> ApiResult<Vec<(Chunk, Option<String>)>> {
                let message = item.map_err(|err| {
                    ApiError::tonic(err, "failed to get item in /FetchChunks response stream")
                })?;

                let _span = tracing::trace_span!(
                    "fetch_chunks::batch_decode",
                    num_chunks = message.chunks.len()
                )
                .entered();

                let mut decoded = Vec::with_capacity(message.chunks.len());
                for arrow_msg in message.chunks {
                    let segment_id = arrow_msg
                        .store_id
                        .as_ref()
                        .map(|id| id.recording_id.clone());

                    let arrow_msg = arrow_msg.to_application(()).map_err(|err| {
                        ApiError::serialization_with_source(
                            err,
                            "failed to get arrow data for item in /FetchChunks response stream",
                        )
                    })?;

                    let chunk =
                        re_chunk::Chunk::from_record_batch(&arrow_msg.batch).map_err(|err| {
                            ApiError::serialization_with_source(
                                err,
                                "failed to parse item in /FetchChunks response stream",
                            )
                        })?;

                    decoded.push((chunk, segment_id));
                }
                Ok(decoded)
            })
        })
        .map(|joined| match joined {
            // The task ran to completion: forward its own result unchanged.
            Ok(decode_result) => decode_result,
            Err(join_err) => Err(ApiError::internal_with_source(
                join_err,
                "failed to sync on /FetchChunks response stream",
            )),
        })
}
/// Converts a `/FetchChunks` response stream into a stream of decoded chunk batches
/// (`wasm32` targets).
///
/// Each response message is decoded inline, inside the stream's `map` closure. Every chunk in
/// the message is paired with the `recording_id` of its `store_id`, if it has one.
///
/// Each stream item is an error if the response item itself is a `tonic` error, or if a
/// message cannot be converted to its application form or parsed into a [`Chunk`] (the first
/// failure aborts that message).
#[cfg(target_arch = "wasm32")]
pub fn fetch_chunks_response_to_chunk_and_segment_id<S>(
    response: S,
) -> impl Stream<Item = ApiResult<Vec<(Chunk, Option<String>)>>>
where
    S: Stream<Item = tonic::Result<re_protos::cloud::v1alpha1::FetchChunksResponse>>,
{
    use re_log_encoding::ToApplication as _;

    response.map(|item| -> ApiResult<Vec<(Chunk, Option<String>)>> {
        let message = item.map_err(|err| {
            ApiError::tonic(err, "failed to get item in /FetchChunks response stream")
        })?;

        let _span = tracing::trace_span!(
            "fetch_chunks::batch_decode",
            num_chunks = message.chunks.len()
        )
        .entered();

        let mut decoded = Vec::with_capacity(message.chunks.len());
        for arrow_msg in message.chunks {
            let segment_id = arrow_msg
                .store_id
                .as_ref()
                .map(|id| id.recording_id.clone());

            let arrow_msg = arrow_msg.to_application(()).map_err(|err| {
                ApiError::serialization_with_source(
                    err,
                    "failed to get arrow data for item in /FetchChunks response stream",
                )
            })?;

            let chunk = re_chunk::Chunk::from_record_batch(&arrow_msg.batch).map_err(|err| {
                ApiError::serialization_with_source(
                    err,
                    "failed to parse item in /FetchChunks response stream",
                )
            })?;

            decoded.push((chunk, segment_id));
        }
        Ok(decoded)
    })
}
/// Shared, thread-safe progress callback.
///
/// Within this file it is invoked as `on_progress(downloaded_bytes, total_size_bytes)`.
pub type ProgressCallback = std::sync::Arc<dyn Fn(u64, Option<u64>) + Send + Sync>;

/// Options controlling how a segment is streamed from the server.
#[derive(Clone, Default)]
pub struct StreamingOptions {
    /// When set, recordings are downloaded in full instead of leaving chunk loading to the
    /// viewer on demand.
    pub force_full_download: bool,

    /// Optional callback notified as chunk batches finish downloading.
    pub on_progress: Option<ProgressCallback>,
}

impl std::fmt::Debug for StreamingOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            force_full_download,
            on_progress,
        } = self;

        // The callback is an opaque closure, so only its presence is shown.
        let callback_placeholder = on_progress.is_some().then_some("…");

        f.debug_struct("StreamingOptions")
            .field("force_full_download", force_full_download)
            .field("on_progress", &callback_placeholder)
            .finish()
    }
}
/// Streams the default blueprint (if any) and then the segment designated by `uri` into `tx`.
///
/// The dataset entry is read first. If it declares a default blueprint, that blueprint is
/// streamed into a freshly generated blueprint store that shares the recording's application
/// id, followed by a `BlueprintActivationCommand` that makes it the default (but not the
/// active) blueprint. The segment itself is streamed afterwards with the caller's `options`.
///
/// A disconnected receiver is not an error: streaming simply stops and `Ok(())` is returned.
///
/// # Errors
///
/// Propagates errors from reading the dataset entry and from streaming either store.
pub async fn stream_blueprint_and_segment_from_server(
    mut client: ConnectionClient,
    tx: re_log_channel::LogSender,
    uri: re_uri::DatasetSegmentUri,
    options: StreamingOptions,
) -> ApiResult {
    re_log::debug!("Loading {uri}…");

    let dataset_entry = client.read_dataset_entry(uri.dataset_id.into()).await?;
    let recording_store_id = uri.store_id();

    match dataset_entry.dataset_details.default_blueprint() {
        None => {
            re_log::debug!("No blueprint dataset found for {uri}");
        }
        Some((blueprint_dataset, blueprint_segment)) => {
            re_log::debug!("Streaming blueprint dataset {blueprint_dataset}");

            let blueprint_store_id = StoreId::random(
                StoreKind::Blueprint,
                recording_store_id.application_id().clone(),
            );

            let blueprint_flow = stream_segment_from_server(
                &mut client,
                StoreInfo {
                    store_id: blueprint_store_id.clone(),
                    cloned_from: None,
                    store_source: StoreSource::Unknown,
                    store_version: None,
                },
                &tx,
                blueprint_dataset,
                blueprint_segment,
                re_uri::Fragment::default(),
                &StreamingOptions::default(),
            )
            .await?;
            if blueprint_flow.is_break() {
                return Ok(());
            }

            let activation = LogMsg::BlueprintActivationCommand(BlueprintActivationCommand {
                blueprint_id: blueprint_store_id,
                make_active: false,
                make_default: true,
            });
            if tx.send(activation.into()).is_err() {
                re_log::debug!("Receiver disconnected");
                return Ok(());
            }
        }
    }

    let re_uri::DatasetSegmentUri {
        origin: _,
        dataset_id,
        segment_id,
        fragment,
    } = uri;

    let recording_store_info = StoreInfo {
        store_id: recording_store_id,
        cloned_from: None,
        store_source: StoreSource::Unknown,
        store_version: None,
    };

    // This is the last step, so `Break` and `Continue` both end in `Ok(())`.
    let _final_flow = stream_segment_from_server(
        &mut client,
        recording_store_info,
        &tx,
        dataset_id.into(),
        segment_id.into(),
        fragment,
        &options,
    )
    .await?;

    Ok(())
}
/// Streams one segment of a dataset into `tx`.
///
/// Steps, in order:
/// 1. Sends a `SetStoreInfo` message for `store_info`.
/// 2. For recording stores with a non-empty `fragment`, sends a `SetUrlFragment` UI command.
/// 3. Requests the RRD manifest stream and forwards each part, followed by
///    `RrdManifestComplete`. Recordings then return immediately unless
///    `options.force_full_download` is set; otherwise every chunk listed in the combined
///    manifest is downloaded.
/// 4. If the manifest request itself fails, falls back to querying the chunk index: first for
///    the fragment's time selection (if any), then for the whole segment, skipping chunks that
///    the first pass already loaded.
///
/// Returns `ControlFlow::Break(())` when a send on `tx` fails (here or inside `load_chunks`),
/// and `ControlFlow::Continue(())` otherwise.
///
/// # Errors
///
/// Propagates errors from manifest stream parts, manifest parsing and concatenation,
/// chunk-index queries, batch concatenation and sorting, and `load_chunks`. An empty manifest
/// stream is reported as a serialization error. A failure of the manifest request itself is
/// only logged and triggers the fallback path.
async fn stream_segment_from_server(
client: &mut ConnectionClient,
store_info: StoreInfo,
tx: &re_log_channel::LogSender,
dataset_id: EntryId,
segment_id: SegmentId,
fragment: re_uri::Fragment,
options: &StreamingOptions,
) -> ApiResult<ControlFlow<()>> {
let store_id = store_info.store_id.clone();
re_log::debug!("Streaming {store_id:?}…");
// Announce the store before sending anything else for it.
if tx
.send(
LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: store_info,
})
.into(),
)
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(ControlFlow::Break(()));
}
// Only recording stores get the URL fragment forwarded, and only if there is one.
#[expect(clippy::collapsible_if)]
if store_id.is_recording() && !fragment.is_empty() {
if tx
.send(
DataSourceUiCommand::SetUrlFragment {
store_id: store_id.clone(),
fragment: fragment.to_string(),
}
.into(),
)
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(ControlFlow::Break(()));
}
}
// Used only for the elapsed-time figures in the debug logs below.
let start_time = web_time::Instant::now();
let manifest_stream_result = client
.get_rrd_manifest_stream(dataset_id, segment_id.clone())
.await;
match manifest_stream_result {
Ok(manifest_stream) => {
let mut manifest_stream = std::pin::pin!(manifest_stream);
// Every part is forwarded as soon as it is parsed, and also kept so that the parts can be
// concatenated if a full download is needed.
let mut rrd_manifest_parts: Vec<Arc<re_log_encoding::RrdManifest>> = Vec::new();
while let Some(part_result) = manifest_stream.next().await {
let raw_rrd_manifest_part = part_result?;
let part_nr = rrd_manifest_parts.len() + 1;
re_log::debug!(
"Received RRD manifest part #{part_nr}/? ({} deflated, {:.1}s elapsed)",
re_format::format_bytes(raw_rrd_manifest_part.total_size_bytes() as _),
start_time.elapsed().as_secs_f32(),
);
let rrd_manifest = re_log_encoding::RrdManifest::try_new(raw_rrd_manifest_part)
.map_err(|err| {
ApiError::invalid_arguments_with_source(err, "Invalid RRD manifest part")
})?;
let rrd_manifest = Arc::new(rrd_manifest);
if tx
.send(DataSourceMessage::RrdManifest(
store_id.clone(),
rrd_manifest.clone(),
))
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(ControlFlow::Break(()));
}
rrd_manifest_parts.push(rrd_manifest);
}
// A stream that opened successfully but yielded no part is treated as an error.
if rrd_manifest_parts.is_empty() {
return Err(ApiError::serialization(
"failed to parse the response for /GetRrdManifest (no data)",
));
}
let part_nr = rrd_manifest_parts.len();
re_log::debug!(
"Full RRD manifest loaded in {:.1}s in {}",
start_time.elapsed().as_secs_f32(),
re_format::format_plural_s(part_nr, "part")
);
if tx
.send(DataSourceMessage::RrdManifestComplete(store_id.clone()))
.is_err()
{
re_log::debug!("Receiver disconnected");
return Ok(ControlFlow::Break(()));
}
// Both arms return, so the fallback code after the outer `match` is never reached from
// this `Ok` branch.
match store_id.kind() {
StoreKind::Recording if !options.force_full_download => {
re_log::debug!("Letting the viewer load chunks on-demand");
return Ok(ControlFlow::Continue(()));
}
StoreKind::Recording | StoreKind::Blueprint => {
re_log::debug!("Loading all of the chunks in one go; most important first");
let refs: Vec<&re_log_encoding::RrdManifest> =
rrd_manifest_parts.iter().map(|m| m.as_ref()).collect();
let combined = re_log_encoding::RrdManifest::concat(&refs).map_err(|err| {
ApiError::invalid_arguments_with_source(
err,
"Failed to concatenate RRD manifest parts",
)
})?;
let batch = sort_batch(combined.data()).map_err(|err| {
ApiError::invalid_arguments_with_source(err, "Failed to sort chunk index")
})?;
return load_chunks(client, tx, &store_id, batch, options).await;
}
}
}
// The manifest request failed: log it and fall through to the chunk-index path below.
Err(err) => {
if err.kind == ApiErrorKind::Unimplemented {
re_log::debug_once!("The server does not support on-demand streaming"); } else {
re_log::warn!("Failed to load RRD manifest: {err}");
}
}
}
// Fallback path. Ids of chunks loaded by the time-selection pass are remembered so that the
// full pass can skip them.
let mut already_loaded_chunk_ids: ahash::HashSet<ChunkId> = Default::default();
if let Some(time_selection) = fragment.time_selection {
let time_selection_batches = client
.query_dataset_chunk_index(SegmentQueryParams {
dataset_id,
segment_id: segment_id.clone(),
include_static_data: true,
include_temporal_data: true,
query: Some(
re_protos::cloud::v1alpha1::ext::Query::latest_at_range(
time_selection.timeline.name(),
time_selection.range,
)
.into(),
),
})
.await?;
if time_selection_batches.is_empty() {
re_log::debug!(
"No chunks found for time selection {:?} in recording {:?}",
time_selection,
store_id
);
} else {
// Indexing `[0]` is safe here: the vector was just checked to be non-empty.
let batch = arrow::compute::concat_batches(
&time_selection_batches[0].schema(),
&time_selection_batches,
)
.map_err(|err| {
ApiError::invalid_arguments_with_source(err, "Failed to concat chunk index batches")
})?;
let batch = sort_batch(&batch).map_err(|err| {
ApiError::invalid_arguments_with_source(err, "Failed to sort chunk index")
})?;
if let Some(chunk_ids) = chunk_id_column(&batch) {
already_loaded_chunk_ids = chunk_ids.iter().copied().collect();
} else {
re_log::warn_once!(
"Failed to find 'chunk_id' column in chunk index response. Schema: {}",
batch.schema()
);
}
if load_chunks(client, tx, &store_id, batch, options)
.await?
.is_break()
{
return Ok(ControlFlow::Break(()));
}
}
}
// Full pass: query the chunk index without any query filter.
let batches = client
.query_dataset_chunk_index(SegmentQueryParams {
dataset_id,
segment_id: segment_id.clone(),
include_static_data: true,
include_temporal_data: true,
query: None, })
.await?;
if batches.is_empty() {
re_log::info!("Empty recording"); return Ok(ControlFlow::Continue(()));
}
let batch = arrow::compute::concat_batches(&batches[0].schema(), &batches).map_err(|err| {
ApiError::invalid_arguments_with_source(err, "Failed to concat chunk index batches")
})?;
let batch = sort_batch(&batch).map_err(|err| {
ApiError::invalid_arguments_with_source(err, "Failed to sort chunk index")
})?;
// Drop the rows whose chunk the time-selection pass already loaded. If nothing was loaded,
// or the `chunk_id` column cannot be read, the whole batch is loaded.
if let Some(chunk_ids) = chunk_id_column(&batch)
&& !already_loaded_chunk_ids.is_empty()
{
let filtered_indices: Vec<usize> = chunk_ids
.iter()
.enumerate()
.filter_map(|(idx, chunk_id)| {
if already_loaded_chunk_ids.contains(chunk_id) {
None
} else {
Some(idx)
}
})
.collect();
let filtered_batch = re_arrow_util::take_record_batch(&batch, &filtered_indices)
.map_err(|err| ApiError::invalid_arguments_with_source(err, "take_record_batch"))?;
load_chunks(client, tx, &store_id, filtered_batch, options).await
} else {
load_chunks(client, tx, &store_id, batch, options).await
}
}
/// Returns the contents of the `chunk_id` column of `batch` as a slice of [`ChunkId`]s.
///
/// Yields `None` if the column is missing, is not a fixed-size binary array, or cannot be
/// reinterpreted as chunk ids.
fn chunk_id_column(batch: &RecordBatch) -> Option<&[ChunkId]> {
    let column = batch.column_by_name("chunk_id")?;
    let fixed_size_binary = column.as_fixed_size_binary_opt()?;
    ChunkId::try_slice_from_arrow(fixed_size_binary).ok()
}
/// Downloads every chunk listed in `full_batch` and forwards it to `tx`.
///
/// The rows are split into slices of at most `BATCH_SIZE` rows. One download future is created
/// per slice (each with its own clone of `client`, `tx` and `store_id`), and all of them are
/// driven together through a `FuturesUnordered`. After each slice completes,
/// `options.on_progress` (if set) is called with the bytes downloaded so far and the total
/// computed from the batch, if available.
///
/// Returns `ControlFlow::Break(())` as soon as one slice reports that the receiver is gone.
///
/// # Errors
///
/// Returns the first error produced by any slice download.
async fn load_chunks(
    client: &ConnectionClient,
    tx: &re_log_channel::LogSender,
    store_id: &StoreId,
    full_batch: RecordBatch,
    options: &StreamingOptions,
) -> ApiResult<ControlFlow<()>> {
    use futures::stream::FuturesUnordered;

    const BATCH_SIZE: usize = 32;

    let num_chunks = full_batch.num_rows();
    re_log::debug!(
        "Downloading {} chunks from server…",
        re_format::format_uint(num_chunks)
    );
    if num_chunks > 10_000 {
        re_log::warn!(
            "There are {} chunks in this recording. Consider running `rerun rrd compact` on it!",
            re_format::format_uint(num_chunks)
        );
    }

    let total_size_bytes = total_size_bytes_from_batch(&full_batch);

    let mut pending: FuturesUnordered<_> = (0..num_chunks)
        .step_by(BATCH_SIZE)
        .map(|start| {
            // `start < num_chunks` always holds here, so the subtraction cannot underflow.
            let len = BATCH_SIZE.min(num_chunks - start);
            let small_batch = full_batch.slice(start, len);
            let mut client = client.clone();
            let tx = tx.clone();
            let store_id = store_id.clone();
            async move { load_small_chunk_batch(&mut client, &tx, &store_id, &small_batch).await }
        })
        .collect();

    let mut downloaded_bytes: u64 = 0;
    while let Some(outcome) = futures::stream::StreamExt::next(&mut pending).await {
        let (flow, batch_bytes) = outcome?;
        downloaded_bytes += batch_bytes;

        if let Some(on_progress) = options.on_progress.as_ref() {
            on_progress(downloaded_bytes, total_size_bytes);
        }

        if flow.is_break() {
            return Ok(ControlFlow::Break(()));
        }
    }

    re_log::trace!(
        "Finished downloading {} chunks.",
        re_format::format_uint(num_chunks)
    );
    Ok(ControlFlow::Continue(()))
}
/// Sums the `chunk_byte_size` column of `batch`, counting null entries as zero.
///
/// Returns `None` if the column is missing or is not a `UInt64` primitive array.
fn total_size_bytes_from_batch(batch: &RecordBatch) -> Option<u64> {
    batch
        .column_by_name("chunk_byte_size")
        .and_then(|column| column.as_primitive_opt::<arrow::datatypes::UInt64Type>())
        .map(|sizes| sizes.iter().flatten().sum())
}
/// Fetches the chunks referenced by `batch` and forwards each one to `tx` as an `ArrowMsg`
/// for `store_id`.
///
/// Returns the control-flow outcome together with the accumulated `heap_size_bytes` of the
/// chunks processed so far. The outcome is `Break` once the receiver has disconnected.
///
/// # Errors
///
/// Propagates errors from the fetch request, from decoding the response stream, and from
/// converting a chunk into an arrow message.
async fn load_small_chunk_batch(
    client: &mut ConnectionClient,
    tx: &re_log_channel::LogSender,
    store_id: &StoreId,
    batch: &RecordBatch,
) -> ApiResult<(ControlFlow<()>, u64)> {
    let response_stream = client.fetch_segment_chunks_by_id(batch).await?;
    let mut decoded_stream = fetch_chunks_response_to_chunk_and_segment_id(response_stream);

    let mut batch_bytes: u64 = 0;
    while let Some(decoded) = decoded_stream.next().await {
        for (chunk, _segment_id) in decoded? {
            // The chunk is counted before it is converted and sent.
            batch_bytes += chunk.heap_size_bytes();

            let arrow_msg = chunk.to_arrow_msg().map_err(|err| {
                ApiError::serialization_with_source(
                    err,
                    "failed to parse chunk in /FetchChunks response stream",
                )
            })?;
            let log_msg = LogMsg::ArrowMsg(store_id.clone(), arrow_msg);

            if tx.send(log_msg.into()).is_err() {
                re_log::debug!("Receiver disconnected");
                return Ok((ControlFlow::Break(()), batch_bytes));
            }
        }
    }

    Ok((ControlFlow::Continue(()), batch_bytes))
}
/// Returns a copy of `batch` with its rows sorted by `chunk_is_static` (descending) and then
/// by `chunk_id` (ascending), with nulls first for both keys.
///
/// NOTE(review): a descending sort presumably puts static chunks first; this assumes
/// `chunk_is_static` is a boolean column, so confirm against the chunk index schema.
///
/// # Errors
///
/// Returns an [`ArrowError`] if either column is missing from the schema, or if sorting or
/// reordering the batch fails.
fn sort_batch(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    use std::sync::Arc;

    let schema = batch.schema();
    let chunk_is_static = schema.index_of("chunk_is_static")?;
    let chunk_id = schema.index_of("chunk_id")?;

    // `RecordBatch::column` already hands out a ref-counted array. Cloning that handle avoids
    // wrapping it in a second `Arc`, which would cost an allocation and an extra indirection.
    let sort_keys = [
        arrow::compute::SortColumn {
            values: Arc::clone(batch.column(chunk_is_static)),
            options: Some(arrow::compute::SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
        arrow::compute::SortColumn {
            values: Arc::clone(batch.column(chunk_id)),
            options: Some(arrow::compute::SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
    ];

    let indices = arrow::compute::lexsort_to_indices(&sort_keys, None)?;
    arrow::compute::take_record_batch(batch, &indices)
}