use arrow::array::RecordBatch;
use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
use itertools::Itertools as _;
use re_arrow_util::ArrowArrayDowncastRef as _;
use re_log_encoding::{RawRrdManifest, ToApplication as _};
use re_log_types::EntryId;
use re_protos::EntryName;
use re_protos::cloud::v1alpha1::ext::{
CreateDatasetEntryResponse, CreateTableEntryRequest, DataSource, DataSourceKind,
DatasetDetails, DatasetEntry, EntryDetails, EntryDetailsUpdate, LanceTable, ProviderDetails,
QueryDatasetRequest, QueryTasksOnCompletionRequest, QueryTasksRequest,
ReadDatasetEntryResponse, ReadTableEntryResponse, RegisterTableResponse,
RegisterWithDatasetRequest, RegisterWithDatasetTaskDescriptor, TableEntry, TableInsertMode,
UnregisterFromDatasetRequest, UpdateDatasetEntryRequest, UpdateDatasetEntryResponse,
UpdateEntryRequest, UpdateEntryResponse, VersionResponse,
};
use re_protos::cloud::v1alpha1::rerun_cloud_service_client::RerunCloudServiceClient;
use re_protos::cloud::v1alpha1::{
CancelTasksRequest, CreateDatasetEntryRequest, DeleteEntryRequest, EntryFilter, EntryKind,
FetchChunksRequest, FindEntriesRequest, GetDatasetManifestSchemaRequest,
GetDatasetManifestSchemaResponse, GetDatasetSchemaRequest, GetRrdManifestResponse,
GetSegmentTableSchemaRequest, GetSegmentTableSchemaResponse, QueryDatasetResponse,
QueryTasksOnCompletionResponse, QueryTasksResponse, ReadDatasetEntryRequest,
ReadTableEntryRequest, RegisterWithDatasetResponse, ScanSegmentTableRequest,
ScanSegmentTableResponse, UnregisterFromDatasetResponse, VersionRequest, WriteTableRequest,
};
use re_protos::common::v1alpha1::ext::{IfDuplicateBehavior, ScanParameters, SegmentId};
use re_protos::common::v1alpha1::{DataframePart, TaskId};
use re_protos::external::prost::bytes::Bytes;
use re_protos::headers::RerunHeadersInjectorExt as _;
use re_protos::{TypeConversionError, invalid_schema, missing_column, missing_field};
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_stream::{Stream, StreamExt as _};
use tonic::IntoStreamingRequest as _;
use tonic::codegen::{Body, StdError};
use url::Url;
use crate::{ApiError, ApiResponseStream, ApiResult, TraceId, extract_trace_id};
/// Splits a [`tonic::Response`] into its payload and the OpenTelemetry trace id
/// carried in its metadata, if any.
trait TonicResponseExt<T> {
    /// Consumes the response, returning the inner message together with the trace id
    /// extracted from the response metadata.
    fn into_inner_and_trace_id(self) -> (T, Option<opentelemetry::TraceId>);
}

impl<T> TonicResponseExt<T> for tonic::Response<T> {
    fn into_inner_and_trace_id(self) -> (T, Option<opentelemetry::TraceId>) {
        // Take the response apart once so the metadata can be inspected without
        // keeping a borrow of `self` alive.
        let (metadata, message, _extensions) = self.into_parts();
        (message, extract_trace_id(&metadata))
    }
}
pub type FetchChunksResponseStream =
ApiResponseStream<re_protos::cloud::v1alpha1::FetchChunksResponse>;
pub type QueryDatasetResponseStream =
ApiResponseStream<re_protos::cloud::v1alpha1::QueryDatasetResponse>;
/// Parameters of a single-segment `/QueryDataset` request.
///
/// Consumed by [`GenericConnectionClient::query_dataset_raw`] and the helpers built on it.
#[derive(Debug, Clone)]
pub struct SegmentQueryParams {
    /// Dataset entry that owns the segment; sent as the request's entry-id header.
    pub dataset_id: EntryId,

    /// The single segment to query.
    pub segment_id: SegmentId,

    /// When `false`, the request sets `exclude_static_data`.
    pub include_static_data: bool,

    /// When `false`, the request sets `exclude_temporal_data`.
    pub include_temporal_data: bool,

    /// Forwarded unchanged as the request's `generate_direct_urls` flag.
    pub generate_direct_urls: bool,

    /// Optional query, converted to its ext form when the request is built.
    pub query: Option<re_protos::cloud::v1alpha1::Query>,
}
/// Typed wrapper around the generated [`RerunCloudServiceClient`].
///
/// Converts between the wire types and the `ext` types and maps failures to [`ApiError`].
#[derive(Debug, Clone)]
pub struct GenericConnectionClient<T> {
    // The generated tonic client that actually issues the RPCs.
    inner: RerunCloudServiceClient<T>,
    // Feature list reported by `/Version`. It is filled lazily by `supports_feature`,
    // and because it sits behind an `Arc`, clones of the client share it.
    features: Arc<OnceCell<Vec<String>>>,
}
impl<T> GenericConnectionClient<T> {
pub fn new(client: RerunCloudServiceClient<T>) -> Self {
Self {
inner: client,
features: Arc::new(OnceCell::new()),
}
}
pub fn inner(&mut self) -> &mut RerunCloudServiceClient<T> {
&mut self.inner
}
}
/// Connection client specialised to the crate's gRPC service stack
/// (`crate::grpc::RedapClientInner`).
///
/// Dereferences to [`GenericConnectionClient`], so all of its methods are available directly.
#[derive(Debug, Clone)]
pub struct ConnectionClient {
    inner: GenericConnectionClient<crate::grpc::RedapClientInner>,
    // A separate handle to the same service stack; a clone is handed out by `service()`.
    service: crate::grpc::RedapClientInner,
}
impl ConnectionClient {
    /// Combines an assembled generic client with the service stack it was built from.
    pub(crate) fn new(
        inner: GenericConnectionClient<crate::grpc::RedapClientInner>,
        service: crate::grpc::RedapClientInner,
    ) -> Self {
        Self { service, inner }
    }

    /// Returns a clone of the underlying service stack.
    pub fn service(&self) -> crate::grpc::RedapClientInner {
        Clone::clone(&self.service)
    }

    /// Test helper: builds a client over a lazily-connecting channel to
    /// `http://127.0.0.1:1`. Construction itself does not open a connection.
    #[cfg(any(test, feature = "test_utils"))]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn new_disconnected() -> Self {
        let lazy_channel =
            tonic::transport::Channel::from_static("http://127.0.0.1:1").connect_lazy();
        let (raw_client, service) = crate::grpc::assemble_client(lazy_channel, None);
        let generic = GenericConnectionClient::new(raw_client);
        Self::new(generic, service)
    }
}
/// Exposes every [`GenericConnectionClient`] method directly on [`ConnectionClient`].
impl std::ops::Deref for ConnectionClient {
    type Target = GenericConnectionClient<crate::grpc::RedapClientInner>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}

/// Mutable counterpart of the `Deref` impl; the generic client's methods take `&mut self`.
impl std::ops::DerefMut for ConnectionClient {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
impl<T> GenericConnectionClient<T>
where
T: tonic::client::GrpcService<tonic::body::Body>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
/// Checks that the server answers by issuing a `/Version` request and
/// discarding its payload.
#[tracing::instrument(level = "info", skip_all)]
pub async fn ping(&mut self) -> ApiResult<()> {
    let request = VersionRequest {};
    match self.inner().version(request).await {
        Ok(_) => Ok(()),
        Err(err) => Err(ApiError::tonic(err, "/Version failed")),
    }
}
/// Fetches the server's `/Version` response and converts it into its ext form.
#[tracing::instrument(level = "info", skip_all)]
pub async fn version_info(&mut self) -> ApiResult<VersionResponse> {
    let raw = match self.inner().version(VersionRequest {}).await {
        Ok(response) => response.into_inner(),
        Err(err) => return Err(ApiError::tonic(err, "/Version failed")),
    };
    Ok(raw.into())
}
pub async fn supports_feature(&mut self, feature: &str) -> ApiResult<bool> {
let features_cache;
let features = if let Some(features) = self.features.get() {
features
} else {
features_cache = self.features.clone();
features_cache
.get_or_try_init(|| async {
let info = self.version_info().await?;
Ok(info.features)
})
.await?
};
Ok(features.iter().any(|f| f == feature))
}
/// Issues a `/WhoAmI` request and returns the raw response message.
#[tracing::instrument(level = "info", skip_all)]
pub async fn who_am_i(&mut self) -> ApiResult<re_protos::cloud::v1alpha1::WhoAmIResponse> {
    let request = re_protos::cloud::v1alpha1::WhoAmIRequest {};
    let response = self
        .inner()
        .who_am_i(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/WhoAmI failed"))?;
    Ok(response.into_inner())
}
pub async fn rtt(&mut self, num_pings: usize) -> ApiResult<std::time::Duration> {
if num_pings == 0 {
return Err(ApiError::invalid_arguments(
"rtt requires at least one ping",
));
}
let mut best = std::time::Duration::MAX;
for _ in 0..num_pings {
let start = web_time::Instant::now();
let mut stream = self
.inner()
.do_bandwidth_test(re_protos::cloud::v1alpha1::DoBandwidthTestRequest {
num_bytes: 1,
})
.await
.map_err(|err| ApiError::tonic(err, "/DoBandwidthTest failed"))?
.into_inner();
while stream
.next()
.await
.transpose()
.map_err(|err| ApiError::tonic(err, "/DoBandwidthTest stream error"))?
.is_some()
{}
best = best.min(start.elapsed());
}
Ok(best)
}
pub async fn bandwidth_bytes_per_sec(
&mut self,
num_bytes: u64,
rtt: std::time::Duration,
) -> ApiResult<Option<f64>> {
let max = re_protos::cloud::v1alpha1::ext::MAX_BANDWIDTH_TEST_BYTES;
if num_bytes > max {
return Err(ApiError::invalid_arguments(format!(
"num_bytes ({num_bytes}) exceeds the maximum of {max}"
)));
}
let start = web_time::Instant::now();
let mut stream = self
.inner()
.do_bandwidth_test(re_protos::cloud::v1alpha1::DoBandwidthTestRequest { num_bytes })
.await
.map_err(|err| ApiError::tonic(err, "/DoBandwidthTest failed"))?
.into_inner();
let mut received: u64 = 0;
while let Some(item) = stream
.next()
.await
.transpose()
.map_err(|err| ApiError::tonic(err, "/DoBandwidthTest stream error"))?
{
received += item.payload.len() as u64;
}
let elapsed = start.elapsed();
let Some(transfer) = elapsed.checked_sub(rtt).filter(|t| !t.is_zero()) else {
return Ok(None);
};
Ok(Some(received as f64 / transfer.as_secs_f64()))
}
/// Lists the entries matching `filter` via `/FindEntries`.
///
/// # Errors
/// Propagates the gRPC error, or returns a deserialization error if any returned
/// entry cannot be converted into [`EntryDetails`].
#[tracing::instrument(level = "info", skip_all)]
pub async fn find_entries(&mut self, filter: EntryFilter) -> ApiResult<Vec<EntryDetails>> {
    let request = FindEntriesRequest {
        filter: Some(filter),
    };
    let (response, trace_id) = self
        .inner()
        .find_entries(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/FindEntries failed"))?
        .into_inner_and_trace_id();

    let mut entries = Vec::with_capacity(response.entries.len());
    for raw_entry in response.entries {
        let entry: EntryDetails = raw_entry.try_into().map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing /FindEntries response",
            )
        })?;
        entries.push(entry);
    }
    Ok(entries)
}
/// Deletes the entry `entry_id` via `/DeleteEntry`.
#[tracing::instrument(level = "info", skip_all)]
pub async fn delete_entry(&mut self, entry_id: EntryId) -> ApiResult {
    let request = tonic::Request::new(DeleteEntryRequest {
        id: Some(entry_id.into()),
    })
    .with_entry_id(entry_id);
    match self.inner().delete_entry(request).await {
        Ok(_) => Ok(()),
        Err(err) => Err(ApiError::tonic(err, "/DeleteEntry failed")),
    }
}
/// Applies `entry_details_update` to entry `entry_id` via `/UpdateEntry` and returns
/// the entry details sent back by the server.
#[tracing::instrument(level = "info", skip_all)]
pub async fn update_entry(
    &mut self,
    entry_id: EntryId,
    entry_details_update: EntryDetailsUpdate,
) -> ApiResult<EntryDetails> {
    let payload = UpdateEntryRequest {
        id: entry_id,
        entry_details_update,
    };
    let request = tonic::Request::new(payload.into()).with_entry_id(entry_id);
    let (message, trace_id) = self
        .inner()
        .update_entry(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/UpdateEntry failed"))?
        .into_inner_and_trace_id();

    message
        .try_into()
        .map(|response: UpdateEntryResponse| response.entry_details)
        .map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing /UpdateEntry response",
            )
        })
}
/// Fetches the Arrow schema of dataset `entry_id` via `/GetDatasetSchema`.
#[tracing::instrument(level = "info", skip_all)]
pub async fn get_dataset_schema(&mut self, entry_id: EntryId) -> ApiResult<ArrowSchema> {
    let request = tonic::Request::new(GetDatasetSchemaRequest {}).with_entry_id(entry_id);
    let (message, trace_id) = self
        .inner()
        .get_dataset_schema(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/GetDatasetSchema failed"))?
        .into_inner_and_trace_id();

    match message.schema() {
        Ok(schema) => Ok(schema),
        Err(err) => Err(ApiError::deserialization_with_source(
            trace_id,
            err,
            "failed parsing /GetDatasetSchema response",
        )),
    }
}
/// Creates a dataset entry named `name` via `/CreateDatasetEntry` and returns it.
///
/// `entry_id`, when given, is sent as the requested id of the new entry.
#[tracing::instrument(level = "info", skip_all)]
pub async fn create_dataset_entry(
    &mut self,
    name: String,
    entry_id: Option<EntryId>,
) -> ApiResult<DatasetEntry> {
    let request = CreateDatasetEntryRequest {
        name: Some(name),
        id: entry_id.map(Into::into),
    };
    let (message, trace_id) = self
        .inner()
        .create_dataset_entry(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/CreateDatasetEntry failed"))?
        .into_inner_and_trace_id();

    message
        .try_into()
        .map(|response: CreateDatasetEntryResponse| response.dataset)
        .map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing /CreateDatasetEntry response",
            )
        })
}
/// Reads dataset entry `entry_id` via `/ReadDatasetEntry`.
#[tracing::instrument(level = "info", skip_all)]
pub async fn read_dataset_entry(&mut self, entry_id: EntryId) -> ApiResult<DatasetEntry> {
    let request = tonic::Request::new(ReadDatasetEntryRequest {}).with_entry_id(entry_id);
    let (message, trace_id) = self
        .inner()
        .read_dataset_entry(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/ReadDatasetEntry failed"))?
        .into_inner_and_trace_id();

    message
        .try_into()
        .map(|response: ReadDatasetEntryResponse| response.dataset_entry)
        .map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing /ReadDatasetEntry response",
            )
        })
}
/// Replaces the details of dataset `entry_id` with `dataset_details` via
/// `/UpdateDatasetEntry` and returns the dataset entry sent back by the server.
#[tracing::instrument(level = "info", skip_all)]
pub async fn update_dataset_entry(
    &mut self,
    entry_id: EntryId,
    dataset_details: DatasetDetails,
) -> ApiResult<DatasetEntry> {
    let payload = UpdateDatasetEntryRequest {
        id: entry_id,
        dataset_details,
    };
    let request = tonic::Request::new(payload.into()).with_entry_id(entry_id);
    let (message, trace_id) = self
        .inner()
        .update_dataset_entry(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/UpdateDatasetEntry failed"))?
        .into_inner_and_trace_id();

    message
        .try_into()
        .map(|response: UpdateDatasetEntryResponse| response.dataset_entry)
        .map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing /UpdateDatasetEntry response",
            )
        })
}
/// Reads table entry `entry_id` via `/ReadTableEntry`.
#[tracing::instrument(level = "info", skip_all)]
pub async fn read_table_entry(&mut self, entry_id: EntryId) -> ApiResult<TableEntry> {
    let request = tonic::Request::new(ReadTableEntryRequest {
        id: Some(entry_id.into()),
    })
    .with_entry_id(entry_id);
    let (message, trace_id) = self
        .inner()
        .read_table_entry(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/ReadTableEntry failed"))?
        .into_inner_and_trace_id();

    message
        .try_into()
        .map(|response: ReadTableEntryResponse| response.table_entry)
        .map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing /ReadTableEntry response",
            )
        })
}
/// Fetches the Arrow schema of the segment table of dataset `entry_id` via
/// `/GetSegmentTableSchema`.
///
/// # Errors
/// Propagates the gRPC error. Returns a deserialization error if the response has
/// no `schema` or the schema cannot be converted.
#[tracing::instrument(level = "info", skip_all)]
pub async fn get_segment_table_schema(&mut self, entry_id: EntryId) -> ApiResult<ArrowSchema> {
    let (inner, trace_id) = TonicResponseExt::into_inner_and_trace_id(
        self.inner()
            .get_segment_table_schema(
                tonic::Request::new(GetSegmentTableSchemaRequest {}).with_entry_id(entry_id),
            )
            .await
            // The leading `/` matches the error context used for every other endpoint.
            .map_err(|err| ApiError::tonic(err, "/GetSegmentTableSchema failed"))?,
    );

    inner
        .schema
        .ok_or_else(|| {
            let err = missing_field!(GetSegmentTableSchemaResponse, "schema");
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "missing field in /GetSegmentTableSchema response",
            )
        })?
        .try_into()
        .map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing /GetSegmentTableSchema response",
            )
        })
}
/// Lists the ids of all segments in dataset `entry_id`.
///
/// Only the segment-id column of the segment table is scanned (`/ScanSegmentTable`).
/// Null ids in the response are skipped.
#[tracing::instrument(level = "info", skip_all)]
pub async fn get_dataset_segment_ids(
    &mut self,
    entry_id: EntryId,
) -> ApiResult<Vec<SegmentId>> {
    const COLUMN_NAME: &str = ScanSegmentTableResponse::FIELD_SEGMENT_ID;

    let request = tonic::Request::new(ScanSegmentTableRequest {
        columns: vec![COLUMN_NAME.to_owned()],
    })
    .with_entry_id(entry_id);
    let response = self
        .inner()
        .scan_segment_table(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/ScanSegmentTable failed"))?;

    let mut stream = ApiResponseStream::from_tonic_response(response, "/ScanSegmentTable");
    let trace_id = stream.trace_id();

    let mut segment_ids = Vec::new();
    while let Some(item) = stream.next().await {
        let response_item = item?;
        let part = response_item.data().map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing item from /ScanSegmentTable stream",
            )
        })?;
        let record_batch: RecordBatch = part.try_into().map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed decoding item from /ScanSegmentTable stream",
            )
        })?;

        let Some(column) = record_batch.column_by_name(COLUMN_NAME) else {
            return Err(ApiError::deserialization_with_source(
                trace_id,
                missing_column!(ScanSegmentTableResponse, COLUMN_NAME),
                "missing column from item in /ScanSegmentTable stream",
            ));
        };
        let strings = column
            .try_downcast_array_ref::<arrow::array::StringArray>()
            .map_err(|err| {
                ApiError::deserialization_with_source(
                    trace_id,
                    err,
                    "unexpected types in item in /ScanSegmentTable stream",
                )
            })?;

        // `flatten` drops null entries.
        for value in strings.iter().flatten() {
            segment_ids.push(SegmentId::new(value.to_owned()));
        }
    }
    Ok(segment_ids)
}
/// Fetches the Arrow schema of the manifest of dataset `entry_id` via
/// `/GetDatasetManifestSchema`.
///
/// # Errors
/// Propagates the gRPC error. Returns a deserialization error if the response has
/// no `schema` or the schema cannot be converted.
#[tracing::instrument(level = "info", skip_all)]
pub async fn get_dataset_manifest_schema(
    &mut self,
    entry_id: EntryId,
) -> ApiResult<ArrowSchema> {
    let request =
        tonic::Request::new(GetDatasetManifestSchemaRequest {}).with_entry_id(entry_id);
    let (message, trace_id) = self
        .inner()
        .get_dataset_manifest_schema(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/GetDatasetManifestSchema failed"))?
        .into_inner_and_trace_id();

    let Some(raw_schema) = message.schema else {
        return Err(ApiError::deserialization_with_source(
            trace_id,
            missing_field!(GetDatasetManifestSchemaResponse, "schema"),
            "missing field in /GetDatasetManifestSchema response",
        ));
    };
    raw_schema.try_into().map_err(|err| {
        ApiError::deserialization_with_source(
            trace_id,
            err,
            "failed parsing /GetDatasetManifestSchema response",
        )
    })
}
/// Streams the RRD manifest of `segment_id` in dataset `dataset_id` from
/// `/GetRrdManifest`, converting each response message into a [`RawRrdManifest`] part.
///
/// Use [`Self::get_rrd_manifest`] to collect and merge all parts.
///
/// # Errors
/// The initial call propagates the gRPC error. A stream item fails with a
/// deserialization error if its message lacks `rrd_manifest` or fails conversion.
#[tracing::instrument(level = "info", skip_all)]
pub async fn get_rrd_manifest_stream(
    &mut self,
    dataset_id: EntryId,
    segment_id: SegmentId,
) -> ApiResult<ApiResponseStream<RawRrdManifest>> {
    let request = tonic::Request::new(re_protos::cloud::v1alpha1::GetRrdManifestRequest {
        // `segment_id` is owned and not used again, so move it in instead of cloning.
        segment_id: Some(segment_id.into()),
    })
    .with_entry_id(dataset_id);
    let response = self
        .inner()
        .get_rrd_manifest(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/GetRrdManifest failed"))?;

    let stream = ApiResponseStream::from_tonic_response(response, "/GetRrdManifest");
    let trace_id = stream.trace_id();
    let stream = stream.map(move |resp| {
        resp?
            .rrd_manifest
            .ok_or_else(|| {
                let err = missing_field!(GetRrdManifestResponse, "rrd_manifest");
                ApiError::deserialization_with_source(
                    trace_id,
                    err,
                    "missing field in /GetRrdManifest response",
                )
            })?
            .to_application(())
            .map_err(|err| {
                ApiError::deserialization_with_source(
                    trace_id,
                    err,
                    "failed parsing /GetRrdManifest response",
                )
            })
    });

    Ok(ApiResponseStream::new(stream, trace_id))
}
#[tracing::instrument(level = "info", skip_all)]
pub async fn get_rrd_manifest(
&mut self,
dataset_id: EntryId,
segment_id: SegmentId,
) -> ApiResult<RawRrdManifest> {
let stream = self.get_rrd_manifest_stream(dataset_id, segment_id).await?;
let trace_id = stream.trace_id();
futures::pin_mut!(stream);
let mut rrd_manifest_parts = Vec::new();
while let Some(part) = stream.next().await {
rrd_manifest_parts.push(part?);
}
let Some(mut rrd_manifest) = rrd_manifest_parts.first().cloned() else {
return Err(ApiError::deserialization(
trace_id,
"failed to parse the response for /GetRrdManifest (no data)",
));
};
let data_parts = rrd_manifest_parts.into_iter().map(|p| p.data).collect_vec();
rrd_manifest.data =
re_arrow_util::concat_polymorphic_batches(&data_parts).map_err(|err| {
ApiError::deserialization_with_source(
trace_id,
err,
"failed concatenating /GetRrdManifest response parts",
)
})?;
Ok(rrd_manifest)
}
/// Issues a single-segment `/QueryDataset` request and returns the raw response stream.
///
/// All entity paths are selected. Only the columns that `/FetchChunks` requires are
/// requested, so the results can be passed on to it (see `fetch_segment_chunks_by_query`).
///
/// # Errors
/// Returns an error if the optional query cannot be converted, or if the call fails.
#[tracing::instrument(level = "info", skip_all)]
pub async fn query_dataset_raw(
    &mut self,
    params: SegmentQueryParams,
) -> ApiResult<QueryDatasetResponseStream> {
    let dataset_id = params.dataset_id;
    let query_request = QueryDatasetRequest {
        segment_ids: vec![params.segment_id],
        chunk_ids: Vec::new(),
        entity_paths: Vec::new(),
        select_all_entity_paths: true,
        fuzzy_descriptors: Vec::new(),
        exclude_static_data: !params.include_static_data,
        exclude_temporal_data: !params.include_temporal_data,
        query: params
            .query
            .map(|q| q.try_into())
            .transpose()
            .map_err(|err| ApiError::tonic(err, "failed building /QueryDataset request"))?,
        scan_parameters: Some(ScanParameters {
            columns: FetchChunksRequest::required_column_names(),
            ..Default::default()
        }),
        generate_direct_urls: params.generate_direct_urls,
    };

    let request = tonic::Request::new(query_request.into()).with_entry_id(dataset_id);
    let response = self
        .inner()
        .query_dataset(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/QueryDataset failed"))?;
    Ok(ApiResponseStream::from_tonic_response(response, "/QueryDataset"))
}
#[tracing::instrument(level = "info", skip_all)]
pub async fn query_dataset_chunk_index(
&mut self,
params: SegmentQueryParams,
) -> ApiResult<Vec<RecordBatch>> {
let stream = self.query_dataset_raw(params).await?;
let trace_id = stream.trace_id();
let responses: Vec<_> = stream.collect::<Vec<_>>().await.into_iter().try_collect()?;
responses
.into_iter()
.map(|resp| {
resp.data.ok_or_else(|| {
let err = missing_field!(QueryDatasetResponse, "data");
ApiError::deserialization_with_source(
trace_id,
err,
"missing field in item in /QueryDataset response stream",
)
})
})
.map(|batch| {
arrow::array::RecordBatch::try_from(batch?).map_err(|err| {
ApiError::deserialization_with_source(
trace_id,
err,
"failed converting to RecordBatch",
)
})
})
.collect()
}
/// Fetches the chunks described by `record_batch` via `/FetchChunks`.
///
/// `record_batch` is sent as the request's only chunk-info part. It is presumably
/// a batch like those produced by `query_dataset_chunk_index` — TODO confirm.
/// The request uses the crate-wide `FETCH_CHUNKS_DEADLINE` timeout.
#[tracing::instrument(level = "info", skip_all)]
pub async fn fetch_segment_chunks_by_id(
    &mut self,
    record_batch: &RecordBatch,
) -> ApiResult<FetchChunksResponseStream> {
    let chunk_info = DataframePart::from(record_batch);
    let mut request = tonic::Request::new(FetchChunksRequest {
        chunk_infos: vec![chunk_info],
    });
    request.set_timeout(crate::FETCH_CHUNKS_DEADLINE);

    let response = match self.inner().fetch_chunks(request).await {
        Ok(response) => response,
        Err(err) => return Err(ApiError::tonic(err, "/FetchChunks failed")),
    };
    Ok(ApiResponseStream::from_tonic_response(response, "/FetchChunks"))
}
/// Runs a `/QueryDataset` for one segment and fetches every chunk it reports in a
/// single `/FetchChunks` call.
///
/// When the query yields no chunk-info parts, an empty stream with no trace id is
/// returned and `/FetchChunks` is not called.
///
/// # Errors
/// Propagates query and stream errors (the whole query stream is drained first).
/// A query response without `data` yields a deserialization error, and a failed
/// `/FetchChunks` call propagates its gRPC error.
#[tracing::instrument(level = "info", skip_all)]
pub async fn fetch_segment_chunks_by_query(
    &mut self,
    params: SegmentQueryParams,
) -> ApiResult<FetchChunksResponseStream> {
    let stream = self.query_dataset_raw(params).await?;
    let query_trace_id = stream.trace_id();
    let responses = stream
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect::<ApiResult<Vec<_>>>()?;

    let mut chunk_infos = Vec::with_capacity(responses.len());
    for response in responses {
        match response.data {
            Some(part) => chunk_infos.push(part),
            None => {
                return Err(ApiError::deserialization_with_source(
                    query_trace_id,
                    missing_field!(QueryDatasetResponse, "data"),
                    "missing field in item in /QueryDataset response stream",
                ));
            }
        }
    }

    if chunk_infos.is_empty() {
        // Nothing to fetch, so skip the round-trip entirely.
        let empty =
            tokio_stream::empty::<ApiResult<re_protos::cloud::v1alpha1::FetchChunksResponse>>();
        return Ok(ApiResponseStream::new(empty, None));
    }

    let mut request = tonic::Request::new(FetchChunksRequest { chunk_infos });
    request.set_timeout(crate::FETCH_CHUNKS_DEADLINE);
    let response = self
        .inner()
        .fetch_chunks(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/FetchChunks failed"))?;
    Ok(ApiResponseStream::from_tonic_response(response, "/FetchChunks"))
}
/// Registers `data_sources` with dataset `dataset_id` via `/RegisterWithDataset`.
///
/// `on_duplicate` is forwarded to the server unchanged. The response carries a
/// single record batch, and each of its rows is turned into a
/// [`RegisterWithDatasetTaskDescriptor`] (segment id, segment type, storage URL,
/// task id). The response trace id is returned alongside the descriptors.
///
/// # Errors
/// Propagates the gRPC error. Returns a deserialization error in any of these cases:
/// - the response has no `data`, or it cannot be decoded;
/// - an expected column is missing or is not a string column;
/// - the segment types cannot be parsed;
/// - a row has a null segment id, storage URL or task id;
/// - a storage URL fails to parse.
#[tracing::instrument(level = "info", skip_all)]
pub async fn register_with_dataset(
    &mut self,
    dataset_id: EntryId,
    data_sources: Vec<DataSource>,
    on_duplicate: IfDuplicateBehavior,
) -> ApiResult<(Option<TraceId>, Vec<RegisterWithDatasetTaskDescriptor>)> {
    let req = tonic::Request::new(RegisterWithDatasetRequest {
        data_sources,
        on_duplicate,
    })
    .with_entry_id(dataset_id);
    // `Request::map` converts the ext message into its wire type and keeps the
    // metadata (entry-id header) attached above.
    let (inner, trace_id) = TonicResponseExt::into_inner_and_trace_id(
        self.inner()
            .register_with_dataset(req.map(Into::into))
            .await
            .map_err(|err| ApiError::tonic(err, "/RegisterWithDataset failed"))?,
    );
    let response: RecordBatch = inner
        .data
        .ok_or_else(|| {
            let err = missing_field!(RegisterWithDatasetResponse, "data");
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "missing field in /RegisterWithDataset response",
            )
        })?
        .try_into()
        .map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed decoding /RegisterWithDataset response",
            )
        })?;
    // NOTE(review): the `false &&` short-circuit disables this schema check completely.
    // The reason is not visible here; presumably the server's response schema does not
    // yet exactly match `RegisterWithDatasetResponse::schema()`. TODO confirm, then
    // either re-enable or remove the check.
    #[expect(clippy::overly_complex_bool_expr)]
    if false
        && !response
            .schema()
            .contains(&RegisterWithDatasetResponse::schema())
    {
        let err = invalid_schema!(RegisterWithDatasetResponse);
        return Err(ApiError::deserialization_with_source(
            trace_id,
            err,
            "invalid schema in /RegisterWithDataset response",
        ));
    }
    // Looks up `column_name` as a `StringArray`. A column of another type is
    // reported exactly like a missing column.
    let get_string_array = |column_name: &'static str| {
        response
            .column_by_name(column_name)
            .and_then(|column| {
                column
                    .try_downcast_array_ref::<arrow::array::StringArray>()
                    .ok()
            })
            .ok_or_else(|| {
                let err = missing_column!(RegisterWithDatasetResponse, column_name);
                ApiError::deserialization_with_source(
                    trace_id,
                    err,
                    "missing column in /RegisterWithDataset response",
                )
            })
    };
    let segment_id_column = get_string_array(RegisterWithDatasetResponse::FIELD_SEGMENT_ID)?;
    let segment_type_column = DataSourceKind::many_from_arrow(
        response
            .column_by_name(RegisterWithDatasetResponse::FIELD_SEGMENT_TYPE)
            .ok_or_else(|| {
                let err = missing_column!(
                    RegisterWithDatasetResponse,
                    RegisterWithDatasetResponse::FIELD_SEGMENT_TYPE
                );
                ApiError::deserialization_with_source(
                    trace_id,
                    err,
                    "missing column in /RegisterWithDataset response",
                )
            })?,
    )
    .map_err(|err| {
        ApiError::deserialization_with_source(
            trace_id,
            err,
            "failed parsing /RegisterWithDataset response",
        )
    })?;
    let storage_url_column = get_string_array(RegisterWithDatasetResponse::FIELD_STORAGE_URL)?;
    let task_id_column = get_string_array(RegisterWithDatasetResponse::FIELD_TASK_ID)?;
    // Walk the columns row by row. `izip!` stops at the shortest input; the string
    // columns all come from the same batch.
    let descriptors = itertools::izip!(
        segment_id_column,
        segment_type_column,
        storage_url_column,
        task_id_column,
    )
    .map(|(segment_id, segment_type, storage_url, task_id)| {
        Ok(RegisterWithDatasetTaskDescriptor {
            segment_id: SegmentId::new(
                segment_id
                    .ok_or_else(|| {
                        let err = missing_field!(RegisterWithDatasetResponse, "segment_id");
                        ApiError::deserialization_with_source(
                            trace_id,
                            err,
                            "missing field in /RegisterWithDataset response",
                        )
                    })?
                    .to_owned(),
            ),
            segment_type,
            storage_url: url::Url::parse(storage_url.ok_or_else(|| {
                let err = missing_field!(RegisterWithDatasetResponse, "storage_url");
                ApiError::deserialization_with_source(
                    trace_id,
                    err,
                    "missing field in /RegisterWithDataset response",
                )
            })?)
            .map_err(|err| {
                ApiError::deserialization_with_source(
                    trace_id,
                    TypeConversionError::UrlParseError(err),
                    "failed to parse /RegisterWithDataset response",
                )
            })?,
            task_id: TaskId {
                id: task_id
                    .ok_or_else(|| {
                        let err = missing_field!(RegisterWithDatasetResponse, "task_id");
                        ApiError::deserialization_with_source(
                            trace_id,
                            err,
                            "missing field in /RegisterWithDataset response",
                        )
                    })?
                    .to_owned(),
            },
        })
    })
    .try_collect()?;
    Ok((trace_id, descriptors))
}
/// Unregisters segments and/or layers from dataset `dataset_id` via
/// `/UnregisterFromDataset`, returning the decoded response batches.
///
/// `force` is forwarded to the server unchanged.
///
/// # Errors
/// Propagates the gRPC error and stream errors. A response without `data`, or one
/// that fails to decode, yields a deserialization error.
#[tracing::instrument(level = "info", skip_all)]
pub async fn unregister_from_dataset(
    &mut self,
    dataset_id: EntryId,
    segments_to_drop: Vec<String>,
    layers_to_drop: Vec<String>,
    force: bool,
) -> ApiResult<Vec<RecordBatch>> {
    use futures::TryStreamExt as _;

    let payload = UnregisterFromDatasetRequest {
        segments_to_drop: segments_to_drop.into_iter().map(Into::into).collect(),
        layers_to_drop,
        force,
    };
    let request = tonic::Request::new(payload.into()).with_entry_id(dataset_id);
    let response = self
        .inner()
        .unregister_from_dataset(request)
        .await
        .map_err(|err| ApiError::tonic(err, "/UnregisterFromDataset failed"))?;

    let stream = ApiResponseStream::from_tonic_response(response, "/UnregisterFromDataset");
    let trace_id = stream.trace_id();
    let responses: Vec<_> = stream.try_collect().await?;

    responses
        .into_iter()
        .map(|response| {
            let Some(part) = response.data else {
                return Err(ApiError::deserialization_with_source(
                    trace_id,
                    missing_field!(UnregisterFromDatasetResponse, "data"),
                    "missing field in /UnregisterFromDataset response",
                ));
            };
            part.try_into().map_err(|err| {
                ApiError::deserialization_with_source(
                    trace_id,
                    err,
                    "failed decoding /UnregisterFromDataset response",
                )
            })
        })
        .collect()
}
/// Registers the Lance table at `url` under `name` via `/RegisterTable` and
/// returns the resulting table entry.
#[tracing::instrument(level = "info", skip_all)]
pub async fn register_table(
    &mut self,
    name: EntryName,
    url: url::Url,
) -> ApiResult<TableEntry> {
    let provider_details = ProviderDetails::LanceTable(LanceTable { table_url: url });
    let request = re_protos::cloud::v1alpha1::ext::RegisterTableRequest {
        name,
        provider_details,
    };
    let proto_request = request.try_into().map_err(|err| {
        ApiError::serialization_with_source(err, "failed building /RegisterTable request")
    })?;

    let (message, trace_id) = self
        .inner()
        .register_table(tonic::Request::new(proto_request))
        .await
        .map_err(|err| ApiError::tonic(err, "/RegisterTable failed"))?
        .into_inner_and_trace_id();

    message
        .try_into()
        .map(|response: RegisterTableResponse| response.table_entry)
        .map_err(|err| {
            ApiError::deserialization_with_source(
                trace_id,
                err,
                "failed parsing /RegisterTable response",
            )
        })
}
/// Requests maintenance on dataset `dataset_id` via `/DoMaintenance`.
///
/// The flags and `cleanup_before` are forwarded to the server unchanged.
#[expect(clippy::fn_params_excessive_bools)]
#[tracing::instrument(level = "info", skip_all)]
pub async fn do_maintenance(
    &mut self,
    dataset_id: EntryId,
    optimize_indexes: bool,
    retrain_indexes: bool,
    compact_fragments: bool,
    cleanup_before: Option<jiff::Timestamp>,
    unsafe_allow_recent_cleanup: bool,
) -> ApiResult {
    let payload = re_protos::cloud::v1alpha1::ext::DoMaintenanceRequest {
        optimize_indexes,
        retrain_indexes,
        compact_fragments,
        cleanup_before,
        unsafe_allow_recent_cleanup,
    };
    let request = tonic::Request::new(payload.into()).with_entry_id(dataset_id);
    if let Err(err) = self.inner().do_maintenance(request).await {
        return Err(ApiError::tonic(err, "/DoMaintenance failed"));
    }
    Ok(())
}
/// Requests server-wide maintenance via `/DoGlobalMaintenance`.
#[tracing::instrument(level = "info", skip_all)]
pub async fn do_global_maintenance(&mut self) -> ApiResult {
    let request =
        tonic::Request::new(re_protos::cloud::v1alpha1::DoGlobalMaintenanceRequest {});
    self.inner()
        .do_global_maintenance(request)
        .await
        .map(|_| ())
        .map_err(|err| ApiError::tonic(err, "/DoGlobalMaintenance failed"))
}
/// Lists the names of all table entries (a `/FindEntries` call filtered to `EntryKind::Table`).
#[tracing::instrument(level = "info", skip_all)]
pub async fn get_table_names(&mut self) -> ApiResult<Vec<EntryName>> {
    let filter = re_protos::cloud::v1alpha1::EntryFilter {
        entry_kind: Some(EntryKind::Table.into()),
        ..Default::default()
    };
    let entries = self.find_entries(filter).await?;
    // The entries are owned, so each name is moved out rather than cloned.
    Ok(entries.into_iter().map(|entry| entry.name).collect())
}
/// Issues `/QueryTasksOnCompletion` for `task_ids` (with `timeout` forwarded in the
/// request) and returns the server's response stream.
#[tracing::instrument(level = "info", skip_all)]
pub async fn query_tasks_on_completion(
    &mut self,
    task_ids: Vec<TaskId>,
    timeout: std::time::Duration,
) -> ApiResult<ApiResponseStream<QueryTasksOnCompletionResponse>> {
    let proto_request = QueryTasksOnCompletionRequest { task_ids, timeout }
        .try_into()
        .map_err(|err| {
            ApiError::serialization_with_source(
                err,
                "failed building /QueryTasksOnCompletion request",
            )
        })?;

    let response = self
        .inner()
        .query_tasks_on_completion(tonic::Request::new(proto_request))
        .await
        .map_err(|err| ApiError::tonic(err, "/QueryTasksOnCompletion failed"))?;
    Ok(ApiResponseStream::from_tonic_response(
        response,
        "/QueryTasksOnCompletion",
    ))
}
/// Sends `/CancelTasks` for `task_ids`.
#[tracing::instrument(level = "info", skip_all)]
pub async fn cancel_tasks(&mut self, task_ids: Vec<TaskId>) -> ApiResult {
    let request = CancelTasksRequest { ids: task_ids };
    match self.inner().cancel_tasks(request).await {
        Ok(_) => Ok(()),
        Err(err) => Err(ApiError::tonic(err, "/CancelTasks failed")),
    }
}
/// Issues `/QueryTasks` for `task_ids` and returns the raw response message.
#[tracing::instrument(level = "info", skip_all)]
pub async fn query_tasks(&mut self, task_ids: Vec<TaskId>) -> ApiResult<QueryTasksResponse> {
    let proto_request = QueryTasksRequest { task_ids }.try_into().map_err(|err| {
        ApiError::serialization_with_source(err, "failed building /QueryTasks request")
    })?;
    let response = self
        .inner()
        .query_tasks(tonic::Request::new(proto_request))
        .await
        .map_err(|err| ApiError::tonic(err, "/QueryTasks failed"))?;
    Ok(response.into_inner())
}
/// Looks up the id of the entry named `entry_name`, optionally restricted to `entry_kind`.
///
/// Returns `Ok(None)` when nothing matches, or when the first match carries no id.
/// If several entries match, only the first one returned by the server is used.
///
/// # Errors
/// Propagates the gRPC error, or returns a deserialization error if the id cannot be
/// converted into an [`EntryId`].
#[tracing::instrument(level = "info", skip_all)]
pub async fn get_entry_id(
    &mut self,
    entry_name: &EntryName,
    entry_kind: Option<EntryKind>,
) -> ApiResult<Option<EntryId>> {
    let (inner, trace_id) = TonicResponseExt::into_inner_and_trace_id(
        self.inner()
            .find_entries(FindEntriesRequest {
                filter: Some(EntryFilter {
                    id: None,
                    name: Some(entry_name.to_string()),
                    entry_kind: entry_kind.map(Into::into),
                }),
            })
            .await
            .map_err(|err| ApiError::tonic(err, "/FindEntries failed"))?,
    );

    inner
        .entries
        .first()
        .and_then(|entry| entry.id)
        .map(|id| {
            EntryId::try_from(id).map_err(|err| {
                // Same context as `find_entries` uses for this parse failure.
                ApiError::deserialization_with_source(
                    trace_id,
                    err,
                    "failed parsing /FindEntries response",
                )
            })
        })
        .transpose()
}
/// Streams the record batches from `stream` into table `table_id` with a
/// client-streaming `/WriteTable` call. Every batch is sent with the same `insert_mode`.
#[tracing::instrument(level = "info", skip_all)]
pub async fn write_table(
    &mut self,
    stream: impl Stream<Item = RecordBatch> + Send + 'static,
    table_id: EntryId,
    insert_mode: TableInsertMode,
) -> ApiResult {
    let proto_insert_mode =
        re_protos::cloud::v1alpha1::TableInsertMode::from(insert_mode).into();
    let requests = stream.map(move |batch| WriteTableRequest {
        dataframe_part: Some(batch.into()),
        insert_mode: proto_insert_mode,
    });
    let request = requests.into_streaming_request().with_entry_id(table_id);

    match self.inner().write_table(request).await {
        Ok(_) => Ok(()),
        Err(err) => Err(ApiError::tonic(err, "/WriteTable failed")),
    }
}
/// Creates a table entry named `name` with `schema` via `/CreateTableEntry`.
///
/// When `url` is given, the table is backed by the Lance table at that location;
/// otherwise no provider details are sent.
///
/// # Errors
/// Returns an internal error if the request cannot be built or the returned table
/// cannot be converted. Returns a deserialization error if the response has no
/// `table`. The gRPC error is propagated otherwise.
#[tracing::instrument(level = "info", skip_all)]
pub async fn create_table_entry(
    &mut self,
    name: EntryName,
    url: Option<Url>,
    schema: SchemaRef,
) -> ApiResult<TableEntry> {
    let provider_details =
        url.map(|url| ProviderDetails::LanceTable(LanceTable { table_url: url }));
    let request = CreateTableEntryRequest {
        name,
        schema: schema.as_ref().clone(),
        provider_details,
    };

    // Error contexts name the actual endpoint (`/CreateTableEntry`), matching the
    // "/<Endpoint> failed" convention used throughout this client.
    let (resp, trace_id) = self
        .inner()
        .create_table_entry(tonic::Request::new(request.try_into().map_err(|err| {
            ApiError::internal_with_source(
                None,
                err,
                "failed building /CreateTableEntry request",
            )
        })?))
        .await
        .map_err(|err| ApiError::tonic(err, "/CreateTableEntry failed"))?
        .into_inner_and_trace_id();

    resp.table
        .ok_or_else(|| {
            ApiError::deserialization(
                trace_id,
                "/CreateTableEntry failed: `table` not set in response",
            )
        })?
        .try_into()
        .map_err(|err| {
            ApiError::internal_with_source(
                trace_id,
                err,
                "failed parsing /CreateTableEntry response",
            )
        })
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a client over a lazily-connecting channel. Nothing is dialled unless
    /// a request is actually sent, and these tests never send one.
    fn offline_client() -> GenericConnectionClient<tonic::transport::Channel> {
        let channel = tonic::transport::Channel::from_static("http://127.0.0.1:1").connect_lazy();
        GenericConnectionClient::new(RerunCloudServiceClient::new(channel))
    }

    #[tokio::test]
    async fn supports_feature_short_circuits_when_cache_is_populated() {
        let mut client = offline_client();
        let advertised = vec!["per_segment_index_values".to_owned(), "future_X".to_owned()];
        client
            .features
            .set(advertised)
            .expect("freshly-constructed cell is empty");

        for (feature, expected) in [
            ("per_segment_index_values", true),
            ("future_X", true),
            ("nonexistent", false),
        ] {
            let supported = client.supports_feature(feature).await.unwrap();
            assert_eq!(supported, expected, "{feature}");
        }
    }

    #[tokio::test]
    async fn features_cache_is_shared_across_clones() {
        let original = offline_client();
        let mut cloned = original.clone();

        // Populate the cache through one handle and observe it through the other.
        original
            .features
            .set(vec!["per_segment_index_values".to_owned()])
            .expect("freshly-constructed cell is empty");

        assert!(cloned
            .supports_feature("per_segment_index_values")
            .await
            .unwrap());
        assert!(!cloned.supports_feature("nonexistent").await.unwrap());
    }

    #[tokio::test]
    async fn supports_feature_returns_false_for_empty_features_list() {
        let mut client = offline_client();
        client
            .features
            .set(Vec::new())
            .expect("freshly-constructed cell is empty");

        let supported = client
            .supports_feature("per_segment_index_values")
            .await
            .unwrap();
        assert!(!supported);
    }
}