use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::path::PathBuf;
use std::sync::Arc;
use arrow::array::BinaryArray;
use arrow::record_batch::RecordBatch;
use cfg_if::cfg_if;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::prelude::SessionContext;
use nohash_hasher::{IntMap, IntSet};
use tokio_stream::StreamExt as _;
use tonic::{Code, Request, Response, Status};
use re_arrow_util::RecordBatchExt as _;
use re_chunk_store::{
Chunk, ChunkId, ChunkStore, ChunkStoreHandle, ChunkTrackingMode, LatestAtQuery, RangeQuery,
};
use re_log_encoding::ToTransport as _;
use re_log_types::{AbsoluteTimeRange, EntityPath, EntryId, StoreId, StoreKind, Timeline};
use re_protos::cloud::v1alpha1::rerun_cloud_service_server::RerunCloudService;
use re_protos::cloud::v1alpha1::{
CancelTasksRequest, CancelTasksResponse, DeleteEntryResponse, EntryDetails, EntryKind,
FetchChunksRequest, GetDatasetManifestSchemaRequest, GetDatasetManifestSchemaResponse,
GetDatasetSchemaResponse, GetRrdManifestResponse, GetSegmentTableSchemaResponse,
QueryDatasetResponse, QueryTasksOnCompletionRequest, QueryTasksOnCompletionResponse,
QueryTasksRequest, QueryTasksResponse, RegisterTableRequest, RegisterTableResponse,
RegisterWithDatasetResponse, ScanDatasetManifestRequest, ScanDatasetManifestResponse,
ScanSegmentTableResponse, ScanTableResponse,
};
use re_protos::common::v1alpha1::TaskId;
use re_protos::common::v1alpha1::ext::{IfDuplicateBehavior, SegmentId};
use re_protos::headers::RerunHeadersExtractorExt as _;
use re_protos::missing_field;
use re_protos::{
EntryName,
cloud::v1alpha1::ext::{
self, CreateDatasetEntryRequest, CreateDatasetEntryResponse, CreateTableEntryRequest,
CreateTableEntryResponse, DataSource, EntryDetailsUpdate, LanceTable, ProviderDetails,
QueryDatasetRequest, ReadDatasetEntryResponse, ReadTableEntryResponse, TableInsertMode,
UpdateDatasetEntryRequest, UpdateDatasetEntryResponse, UpdateEntryRequest,
UpdateEntryResponse,
},
};
use re_tuid::Tuid;
use crate::OnError;
use crate::entrypoint::NamedPath;
use crate::store::ResolvedStore;
use crate::store::{
ChunkKey, Dataset, Error, InMemoryStore, StoreSlotId, TASK_ID_SUCCESS, Table, TaskResult,
};
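/// Settings for [`RerunCloudHandler`]. The temporary storage directory backs
/// newly created Lance tables and is cleaned up when the settings are dropped.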
#[derive(Debug)]
pub struct RerunCloudHandlerSettings {
storage_dir: tempfile::TempDir,
}
impl Default for RerunCloudHandlerSettings {
fn default() -> Self {
Self {
storage_dir: create_data_dir().expect("Failed to create data directory"),
}
}
}
fn create_data_dir() -> Result<tempfile::TempDir, crate::store::Error> {
Ok(tempfile::Builder::new().prefix("rerun-data-").tempdir()?)
}
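/// Builder for [`RerunCloudHandler`].
///
/// A minimal usage sketch (assuming an async context, that `EntryName::new`
/// accepts a `String`, and that the RRD path exists):
///
/// ```ignore
/// let handler = RerunCloudHandlerBuilder::new()
///     .with_rrds_as_dataset(
///         EntryName::new("my_dataset".to_owned())?,
///         vec![std::path::PathBuf::from("recording.rrd")],
///         IfDuplicateBehavior::Error,
///         OnError::Abort,
///     )
///     .await?
///     .build();
/// ```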
#[derive(Default)]
pub struct RerunCloudHandlerBuilder {
settings: RerunCloudHandlerSettings,
store: InMemoryStore,
}
impl RerunCloudHandlerBuilder {
pub fn new() -> Self {
Self::default()
}
pub async fn with_directory_as_dataset(
mut self,
directory: &NamedPath,
on_duplicate: IfDuplicateBehavior,
on_error: crate::OnError,
) -> Result<Self, crate::store::Error> {
self.store
.load_directory_as_dataset(directory, on_duplicate, on_error)
.await?;
Ok(self)
}
pub async fn with_rrds_as_dataset(
mut self,
dataset_name: EntryName,
rrd_paths: Vec<PathBuf>,
on_duplicate: IfDuplicateBehavior,
on_error: crate::OnError,
) -> Result<Self, crate::store::Error> {
let dataset_id = self.store.create_dataset(dataset_name, None)?;
for rrd_path in rrd_paths {
let load_result = self
.store
.register_rrd_to_dataset(
dataset_id,
&rrd_path,
None,
on_duplicate,
StoreKind::Recording,
)
.await;
match load_result {
Ok(_segment_ids) => {}
Err(err) => match on_error {
OnError::Continue => {
re_log::warn!("Failed loading file {}: {err}", rrd_path.display());
}
OnError::Abort => {
return Err(err);
}
},
}
}
Ok(self)
}
#[cfg(feature = "lance")]
pub async fn with_directory_as_table(
mut self,
path: &NamedPath,
on_duplicate: IfDuplicateBehavior,
) -> Result<Self, crate::store::Error> {
self.store
.load_directory_as_table(path, on_duplicate)
.await?;
Ok(self)
}
pub fn with_eager_chunk_store_config(
mut self,
config: re_chunk_store::ChunkStoreConfig,
) -> Self {
self.store.set_eager_chunk_store_config(config);
self
}
pub fn build(self) -> RerunCloudHandler {
RerunCloudHandler::new(self.settings, self.store)
}
}
pub struct RerunCloudHandler {
settings: RerunCloudHandlerSettings,
eager_chunk_store_config: re_chunk_store::ChunkStoreConfig,
store: tokio::sync::RwLock<InMemoryStore>,
}
impl RerunCloudHandler {
pub fn new(settings: RerunCloudHandlerSettings, store: InMemoryStore) -> Self {
let eager_chunk_store_config = store.eager_chunk_store_config();
Self {
settings,
eager_chunk_store_config,
store: tokio::sync::RwLock::new(store),
}
}
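    /// Collects the resolved chunk store of every layer of the requested
    /// segments, as `(segment id, layer name, store slot, store)` tuples.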
async fn get_chunk_stores(
&self,
dataset_id: EntryId,
segment_ids: &[SegmentId],
) -> tonic::Result<Vec<(SegmentId, String, StoreSlotId, ResolvedStore)>> {
let store = self.store.read().await;
let dataset = store.dataset(dataset_id)?;
Ok(dataset
.segments_from_ids(segment_ids)?
.flat_map(|(segment_id, segment)| {
segment.iter_layers().map(|(layer_name, layer)| {
(
segment_id.clone(),
layer_name.to_owned(),
layer.store_slot_id(),
layer.resolved_store().clone(),
)
})
})
.collect())
}
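    /// Expands `is_prefix` data sources into one concrete source per `.rrd`
    /// file found by recursively walking the directory; non-prefix sources are
    /// passed through unchanged. `memory://` URLs cannot be used as prefixes.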
fn resolve_data_sources(data_sources: &[DataSource]) -> tonic::Result<Vec<DataSource>> {
let mut resolved = Vec::<DataSource>::with_capacity(data_sources.len());
for source in data_sources {
if source.is_prefix {
if source.storage_url.scheme() == "memory" {
return Err(tonic::Status::invalid_argument(
"memory:// URLs cannot be used as prefix data sources",
));
}
let path = source.storage_url.to_file_path().map_err(|_err| {
tonic::Status::invalid_argument(format!(
"getting file path from {:?}",
source.storage_url
))
})?;
let meta = std::fs::metadata(&path).map_err(|err| match err.kind() {
std::io::ErrorKind::NotFound => {
tonic::Status::invalid_argument(format!("Directory not found: {:?}", &path))
}
_ => tonic::Status::invalid_argument(format!(
"Failed to read directory metadata {path:?}: {err:#}"
)),
})?;
if !meta.is_dir() {
return Err(tonic::Status::invalid_argument(format!(
"expected prefix / directory but got an object ({path:?})"
)));
}
let mut dirs_to_visit = vec![path];
let mut files = Vec::new();
while let Some(current_dir) = dirs_to_visit.pop() {
                let entries = std::fs::read_dir(&current_dir).map_err(|err| {
tonic::Status::internal(format!(
"Failed to read directory {current_dir:?}: {err:#}"
))
})?;
for entry in entries {
let entry = entry.map_err(|err| {
tonic::Status::internal(format!(
"Failed to read directory entry: {err:#}"
))
})?;
let entry_path = entry.path();
if entry_path.is_dir() {
dirs_to_visit.push(entry_path);
} else if let Some(extension) = entry_path.extension()
&& extension == "rrd"
{
files.push(entry_path);
}
}
}
if files.is_empty() {
return Err(tonic::Status::invalid_argument(format!(
"no rrd files found in {:?}",
source.storage_url
)));
}
for file_path in files {
let mut file_url = source.storage_url.clone();
file_url.set_path(&file_path.to_string_lossy());
resolved.push(DataSource {
storage_url: file_url,
is_prefix: false,
..source.clone()
});
}
} else {
resolved.push(source.clone());
}
}
Ok(resolved)
}
}
impl std::fmt::Debug for RerunCloudHandler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RerunCloudHandler").finish()
}
}
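/// Declares the boxed, pinned, `Send` response-stream type aliases required by
/// tonic's generated service trait. All arms expand identically; the
/// `manifest:` / `rerun_cloud:` / `tasks:` tags merely document which part of
/// the proto API a response type belongs to.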
macro_rules! decl_stream {
($stream:ident<manifest:$resp:ident>) => {
pub type $stream = std::pin::Pin<
Box<
dyn futures::Stream<Item = tonic::Result<re_protos::cloud::v1alpha1::$resp>> + Send,
>,
>;
};
($stream:ident<rerun_cloud:$resp:ident>) => {
pub type $stream = std::pin::Pin<
Box<
dyn futures::Stream<Item = tonic::Result<re_protos::cloud::v1alpha1::$resp>> + Send,
>,
>;
};
($stream:ident<tasks:$resp:ident>) => {
pub type $stream = std::pin::Pin<
Box<
dyn futures::Stream<Item = tonic::Result<re_protos::cloud::v1alpha1::$resp>> + Send,
>,
>;
};
}
decl_stream!(FetchChunksResponseStream<manifest:FetchChunksResponse>);
decl_stream!(GetRrdManifestResponseStream<manifest:GetRrdManifestResponse>);
decl_stream!(QueryDatasetResponseStream<manifest:QueryDatasetResponse>);
decl_stream!(QueryTasksOnCompletionResponseStream<tasks:QueryTasksOnCompletionResponse>);
decl_stream!(ScanDatasetManifestResponseStream<manifest:ScanDatasetManifestResponse>);
decl_stream!(ScanSegmentTableResponseStream<manifest:ScanSegmentTableResponse>);
decl_stream!(ScanTableResponseStream<rerun_cloud:ScanTableResponse>);
decl_stream!(SearchDatasetResponseStream<manifest:SearchDatasetResponse>);
decl_stream!(UnregisterFromDatasetResponseStream<manifest:UnregisterFromDatasetResponse>);
impl RerunCloudHandler {
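    /// Looks up datasets by optional ID and/or name. When both are provided,
    /// the name is resolved first and the ID is cross-checked against it; with
    /// neither, all datasets (optionally filtered by store kind) are returned.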
async fn find_datasets(
&self,
entry_id: Option<EntryId>,
name: Option<EntryName>,
store_kind: Option<StoreKind>,
) -> tonic::Result<Vec<EntryDetails>> {
let store = self.store.read().await;
let dataset = match (entry_id, name) {
(None, None) => None,
(Some(entry_id), None) => Some(store.dataset(entry_id)?),
(None, Some(name)) => Some(store.dataset_by_name(&name)?),
(Some(entry_id), Some(name)) => {
let dataset = store.dataset_by_name(&name)?;
if dataset.id() != entry_id {
return Err(tonic::Status::not_found(format!(
"Dataset with ID {entry_id} not found"
)));
}
Some(dataset)
}
};
let dataset_iter = if let Some(dataset) = dataset {
itertools::Either::Left(std::iter::once(dataset))
} else {
itertools::Either::Right(store.iter_datasets())
};
Ok(dataset_iter
.filter(|dataset| {
store_kind.is_none_or(|store_kind| dataset.store_kind() == store_kind)
})
.map(Dataset::as_entry_details)
.map(Into::into)
.collect())
}
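    /// Looks up tables by optional ID and/or name, mirroring [`Self::find_datasets`].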
async fn find_tables(
&self,
entry_id: Option<EntryId>,
name: Option<EntryName>,
) -> tonic::Result<Vec<EntryDetails>> {
let store = self.store.read().await;
let table = match (entry_id, name) {
(None, None) => None,
(Some(entry_id), None) => {
let Some(table) = store.table(entry_id) else {
return Err(tonic::Status::not_found(format!(
"Table with ID {entry_id} not found"
)));
};
Some(table)
}
(None, Some(name)) => {
let Some(table) = store.table_by_name(&name) else {
return Err(tonic::Status::not_found(format!(
"Table with name {name} not found"
)));
};
Some(table)
}
(Some(entry_id), Some(name)) => {
let Some(table) = store.table_by_name(&name) else {
return Err(tonic::Status::not_found(format!(
"Table with name {name} not found"
)));
};
if table.id() != entry_id {
return Err(tonic::Status::not_found(format!(
"Table with ID {entry_id} not found"
)));
}
Some(table)
}
};
let table_iter = if let Some(table) = table {
itertools::Either::Left(std::iter::once(table))
} else {
itertools::Either::Right(store.iter_tables())
};
Ok(table_iter
.map(Table::as_entry_details)
.map(Into::into)
.collect())
}
}
#[tonic::async_trait]
impl RerunCloudService for RerunCloudHandler {
async fn version(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::VersionRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::VersionResponse>> {
let re_protos::cloud::v1alpha1::VersionRequest {} = request.into_inner();
let build_info = re_build_info::build_info!();
Ok(tonic::Response::new(
re_protos::cloud::v1alpha1::VersionResponse {
build_info: Some(build_info.into()),
version: re_build_info::exposed_version!().to_owned(),
cloud_provider: None,
cloud_region: None,
},
))
}
async fn who_am_i(
&self,
_request: tonic::Request<re_protos::cloud::v1alpha1::WhoAmIRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::WhoAmIResponse>> {
Ok(tonic::Response::new(
re_protos::cloud::v1alpha1::WhoAmIResponse {
user_id: None,
can_read: true,
can_write: true,
},
))
}
async fn find_entries(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::FindEntriesRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::FindEntriesResponse>> {
let filter = request.into_inner().filter;
let entry_id = filter
.as_ref()
.and_then(|filter| filter.id)
.map(TryInto::try_into)
.transpose()?;
let name = filter
.as_ref()
.and_then(|filter| filter.name.clone())
.map(EntryName::new)
.transpose()
.map_err(|err| Status::invalid_argument(err.to_string()))?;
let kind = filter
.and_then(|filter| filter.entry_kind)
.map(EntryKind::try_from)
.transpose()
.map_err(|err| {
Status::invalid_argument(format!("find_entries: invalid entry kind {err}"))
})?;
let entries = match kind {
Some(EntryKind::Dataset) => {
self.find_datasets(entry_id, name, Some(StoreKind::Recording))
.await?
}
Some(EntryKind::BlueprintDataset) => {
self.find_datasets(entry_id, name, Some(StoreKind::Blueprint))
.await?
}
Some(EntryKind::Table) => self.find_tables(entry_id, name).await?,
Some(EntryKind::DatasetView | EntryKind::TableView) => {
return Err(Status::unimplemented(
"find_entries: dataset and table views are not supported",
));
}
Some(EntryKind::Unspecified) => {
return Err(Status::invalid_argument(
"find_entries: entry kind unspecified",
));
}
None => {
let mut datasets = match self.find_datasets(entry_id, name.clone(), None).await {
Ok(datasets) => datasets,
Err(err) => {
if err.code() == Code::NotFound {
vec![]
} else {
return Err(err);
}
}
};
let tables = match self.find_tables(entry_id, name).await {
Ok(tables) => tables,
Err(err) => {
if err.code() == Code::NotFound {
vec![]
} else {
return Err(err);
}
}
};
datasets.extend(tables);
datasets
}
};
let response = re_protos::cloud::v1alpha1::FindEntriesResponse { entries };
Ok(tonic::Response::new(response))
}
async fn create_dataset_entry(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::CreateDatasetEntryRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::CreateDatasetEntryResponse>>
{
let CreateDatasetEntryRequest {
name: dataset_name,
id: dataset_id,
} = request.into_inner().try_into()?;
let mut store = self.store.write().await;
let dataset_id = store.create_dataset(dataset_name, dataset_id)?;
let dataset = store.dataset(dataset_id)?;
Ok(tonic::Response::new(
CreateDatasetEntryResponse {
dataset: dataset.as_dataset_entry(),
}
.into(),
))
}
async fn read_dataset_entry(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::ReadDatasetEntryRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::ReadDatasetEntryResponse>> {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let dataset = store.dataset(entry_id)?;
Ok(tonic::Response::new(
ReadDatasetEntryResponse {
dataset_entry: dataset.as_dataset_entry(),
}
.into(),
))
}
async fn update_dataset_entry(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::UpdateDatasetEntryRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::UpdateDatasetEntryResponse>>
{
let request: UpdateDatasetEntryRequest = request.into_inner().try_into()?;
let mut store = self.store.write().await;
let dataset = store.dataset_mut(request.id)?;
dataset.set_dataset_details(request.dataset_details);
Ok(tonic::Response::new(
UpdateDatasetEntryResponse {
dataset_entry: dataset.as_dataset_entry(),
}
.into(),
))
}
async fn read_table_entry(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::ReadTableEntryRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::ReadTableEntryResponse>> {
let store = self.store.read().await;
let id = request
.into_inner()
.id
.ok_or_else(|| Status::invalid_argument("No table entry ID provided"))?
.try_into()?;
let table = store.table(id).ok_or_else(|| {
tonic::Status::not_found(format!("table with entry ID '{id}' not found"))
})?;
Ok(tonic::Response::new(
ReadTableEntryResponse {
table_entry: table.as_table_entry(),
}
.try_into()?,
))
}
async fn delete_entry(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::DeleteEntryRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::DeleteEntryResponse>> {
let entry_id = request.into_inner().try_into()?;
self.store.write().await.delete_entry(entry_id)?;
Ok(tonic::Response::new(DeleteEntryResponse {}))
}
async fn update_entry(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::UpdateEntryRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::UpdateEntryResponse>> {
let UpdateEntryRequest {
id: entry_id,
entry_details_update: EntryDetailsUpdate { name },
} = request.into_inner().try_into()?;
let mut store = self.store.write().await;
if let Some(name) = name {
store.rename_entry(entry_id, name)?;
}
Ok(tonic::Response::new(
UpdateEntryResponse {
entry_details: store.entry_details(entry_id)?,
}
.into(),
))
}
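    /// Registers new data sources with a dataset, in phases: resolve prefix
    /// sources into concrete files, validate each source and reject duplicate
    /// (segment, layer) pairs, load RRDs / resolve memory stores, then attach
    /// every layer to the dataset. Per-source schema conflicts are reported as
    /// failed tasks in the response instead of failing the whole call.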
async fn register_with_dataset(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::RegisterWithDatasetRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::RegisterWithDatasetResponse>>
{
let mut store = self.store.write().await;
let dataset_id = get_entry_id_from_headers(&store, &request)?;
let ext::RegisterWithDatasetRequest {
data_sources,
on_duplicate,
} = request.into_inner().try_into()?;
let data_sources = Self::resolve_data_sources(&data_sources)?;
if data_sources.is_empty() {
return Err(tonic::Status::invalid_argument(
"no data sources to register",
));
}
enum ValidatedSource {
File {
rrd_path: PathBuf,
layer_name: String,
storage_url: url::Url,
},
Memory {
store_slot_id: StoreSlotId,
resolved: ResolvedStore,
segment_id: SegmentId,
layer_name: String,
},
}
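        // Track which storage URLs map to each (segment id, layer name) pair,
        // so we can reject requests that register the same segment layer twice.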
let mut seen: BTreeMap<(String, String), Vec<url::Url>> = BTreeMap::new();
let mut validated_sources: Vec<ValidatedSource> = Vec::new();
let store_kind = store.dataset(dataset_id)?.store_kind();
for source in data_sources {
let ext::DataSource {
storage_url,
is_prefix,
layer,
kind,
} = source;
if is_prefix {
return Err(tonic::Status::internal(
"register_with_dataset: prefix data sources should have been resolved already",
));
}
if kind != ext::DataSourceKind::Rrd {
return Err(tonic::Status::unimplemented(
"register_with_dataset: only RRD data sources are implemented",
));
}
let layer = if layer.is_empty() {
DataSource::DEFAULT_LAYER.to_owned()
} else {
layer
};
if storage_url.scheme() == "memory" {
let store_slot_id = parse_memory_url(&storage_url)?;
let resolved = store.resolve_store(&store_slot_id).ok_or_else(|| {
tonic::Status::not_found(format!(
"store not found for memory URL: {storage_url}"
))
})?;
let store_id = resolved.store_id();
if store_id.kind() != store_kind {
continue;
}
let segment_id = SegmentId::new(store_id.recording_id().to_string());
let key = (segment_id.id.clone(), layer.clone());
seen.entry(key).or_default().push(storage_url.clone());
validated_sources.push(ValidatedSource::Memory {
store_slot_id,
resolved,
segment_id,
layer_name: layer,
});
continue;
}
let Ok(rrd_path) = storage_url.to_file_path() else {
return if storage_url.scheme() == "file" && storage_url.host().is_some() {
Err(tonic::Status::not_found(format!(
"RRD file not found, file URI should not have a host: {storage_url} (this may be caused by invalid relative-path URI)"
)))
} else {
Err(tonic::Status::not_found(format!(
"RRD file not found, could not load URI: {storage_url}"
)))
};
};
if !rrd_path.exists() {
return Err(tonic::Status::not_found(format!(
"RRD file not found, file does not exists: {rrd_path:?}"
)));
}
if !rrd_path.is_file() {
return Err(tonic::Status::not_found(format!(
"RRD file not found, path is not a file: {rrd_path:?}"
)));
}
let store_ids = load_store_ids(&rrd_path)?;
for store_id in store_ids {
if store_id.kind() != store_kind {
continue;
}
let segment_id_str = store_id.recording_id().to_string();
let key = (segment_id_str, layer.clone());
seen.entry(key).or_default().push(storage_url.clone());
}
validated_sources.push(ValidatedSource::File {
rrd_path,
layer_name: layer,
storage_url,
});
}
let duplicates: Vec<_> = seen.iter().filter(|(_, urls)| urls.len() > 1).collect();
if !duplicates.is_empty() {
let details: Vec<String> = duplicates
.iter()
.map(|((segment_id, layer), urls)| {
let uri_lines = urls
.iter()
.map(|u| format!(" {u}"))
.collect::<Vec<_>>()
.join("\n");
format!(" segment id: {segment_id}, layer name: {layer}\n{uri_lines}")
})
.collect();
return Err(tonic::Status::invalid_argument(format!(
"duplicate segment layers in request:\n{}",
details.join("\n")
)));
}
struct ReadySource {
store_slot_id: StoreSlotId,
resolved: ResolvedStore,
segment_id: SegmentId,
layer_name: String,
storage_url: String,
}
let mut ready_sources: Vec<ReadySource> = Vec::new();
for source in validated_sources {
match source {
ValidatedSource::Memory {
store_slot_id,
resolved,
segment_id,
layer_name,
} => {
ready_sources.push(ReadySource {
storage_url: format!("memory:///store/{store_slot_id}"),
store_slot_id,
resolved,
segment_id,
layer_name,
});
}
ValidatedSource::File {
rrd_path,
layer_name,
storage_url,
} => {
re_log::info!("Loading RRD: {}", rrd_path.display());
for (store_id, resolved) in ResolvedStore::load_rrd_file(&rrd_path, store_kind)?
{
ready_sources.push(ReadySource {
store_slot_id: StoreSlotId::new(),
resolved,
segment_id: SegmentId::new(store_id.recording_id().to_string()),
layer_name: layer_name.clone(),
storage_url: storage_url.to_string(),
});
}
}
}
}
let mut segment_ids: Vec<String> = vec![];
let mut segment_layers: Vec<String> = vec![];
let mut segment_types: Vec<String> = vec![];
let mut storage_urls: Vec<String> = vec![];
let mut task_ids: Vec<String> = vec![];
let mut failed_task_results: Vec<(TaskId, TaskResult)> = vec![];
for source in &ready_sources {
store.register_store_with_id(source.store_slot_id, &source.resolved);
}
{
let dataset = store.dataset_mut(dataset_id)?;
for source in ready_sources {
let add_result = dataset
.add_layer(
source.segment_id.clone(),
source.layer_name.clone(),
source.store_slot_id,
source.resolved,
on_duplicate,
)
.await;
match add_result {
Ok(()) => {
segment_ids.push(source.segment_id.to_string());
segment_layers.push(source.layer_name);
segment_types.push("rrd".to_owned());
storage_urls.push(source.storage_url);
task_ids.push(TASK_ID_SUCCESS.to_owned());
}
Err(Error::SchemaConflict(msg)) => {
segment_ids.push(String::new());
segment_layers.push(source.layer_name);
segment_types.push("rrd".to_owned());
storage_urls.push(source.storage_url);
let task_id = TaskId::new();
task_ids.push(task_id.id.clone());
failed_task_results.push((task_id, TaskResult::failed(&msg)));
}
Err(other_err) => {
return Err(other_err.into());
}
}
}
}
for (task_id, result) in failed_task_results {
store.task_registry().register_failure(task_id, result);
}
let record_batch = RegisterWithDatasetResponse::create_dataframe(
segment_ids,
segment_layers,
segment_types,
storage_urls,
task_ids,
)
.map_err(|err| tonic::Status::internal(format!("Failed to create dataframe: {err:#}")))?;
Ok(tonic::Response::new(
re_protos::cloud::v1alpha1::RegisterWithDatasetResponse {
data: Some(record_batch.into()),
},
))
}
type UnregisterFromDatasetStream = UnregisterFromDatasetResponseStream;
async fn unregister_from_dataset(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::UnregisterFromDatasetRequest>,
) -> tonic::Result<Response<Self::UnregisterFromDatasetStream>> {
let mut store = self.store.write().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
request.get_ref().sanity_check()?;
let dataset = store.dataset_mut(entry_id)?;
let ext::UnregisterFromDatasetRequest {
segments_to_drop,
layers_to_drop,
            force: _,
        } = request.into_inner().try_into()?;
let segments_to_drop = segments_to_drop.iter().collect();
let layers_to_drop = layers_to_drop.iter().map(|s| s.as_str()).collect();
let dataset_manifest_removed =
dataset.dataset_manifest_filtered(&segments_to_drop, &layers_to_drop)?;
_ = dataset
.remove_layers(&segments_to_drop, &layers_to_drop)
.await?;
store.cleanup_store_pool();
let stream = futures::stream::once(async move {
Ok(re_protos::cloud::v1alpha1::UnregisterFromDatasetResponse {
data: Some(dataset_manifest_removed.into()),
})
});
Ok(tonic::Response::new(
Box::pin(stream) as Self::UnregisterFromDatasetStream
))
}
async fn write_chunks(
&self,
request: tonic::Request<tonic::Streaming<re_protos::cloud::v1alpha1::WriteChunksRequest>>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::WriteChunksResponse>> {
let entry_id = get_entry_id_from_headers(&*self.store.read().await, &request)?;
let mut request = request.into_inner();
let mut chunk_stores: HashMap<_, _> = HashMap::default();
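        // Chunks are grouped into one fresh eager store per segment id; the
        // stores are only attached to the dataset once the request stream has
        // been fully drained.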
while let Some(chunk_msg) = request.next().await {
let chunk_msg = chunk_msg?;
let chunk_batch: RecordBatch = chunk_msg
.chunk
.ok_or_else(|| tonic::Status::invalid_argument("no chunk in WriteChunksRequest"))?
.try_into()
.map_err(|err| {
tonic::Status::internal(format!("Could not decode chunk: {err:#}"))
})?;
let schema = chunk_batch.schema();
let metadata = schema.metadata();
let segment_id: SegmentId = metadata
.get("rerun:segment_id")
.or_else(|| metadata.get("rerun:partition_id"))
.ok_or_else(|| {
tonic::Status::invalid_argument(
"Received chunk without 'rerun:segment_id' metadata",
)
})?
.clone()
.into();
let chunk = Arc::new(Chunk::from_record_batch(&chunk_batch).map_err(|err| {
tonic::Status::internal(format!("error decoding chunk from record batch: {err:#}"))
})?);
chunk_stores
.entry(segment_id.clone())
.or_insert_with(|| {
ChunkStore::new(
StoreId::new(
StoreKind::Recording,
entry_id.to_string(),
segment_id.id.clone(),
),
self.eager_chunk_store_config.clone(),
)
})
.insert_chunk(&chunk)
.map_err(|err| {
tonic::Status::internal(format!("error adding chunk to store: {err:#}"))
})?;
}
let mut store = self.store.write().await;
let handles: Vec<_> = chunk_stores
.into_iter()
.map(|(segment_id, chunk_store)| {
let resolved = ResolvedStore::Eager(ChunkStoreHandle::new(chunk_store));
let store_slot_id = store.register_store(&resolved);
(segment_id, store_slot_id, resolved)
})
.collect();
let dataset = store.dataset_mut(entry_id)?;
        for (segment_id, store_slot_id, resolved) in handles {
            dataset
                .add_layer(
                    segment_id,
DataSource::DEFAULT_LAYER.to_owned(),
store_slot_id,
resolved,
IfDuplicateBehavior::Error,
)
.await?;
}
Ok(tonic::Response::new(
re_protos::cloud::v1alpha1::WriteChunksResponse {},
))
}
async fn write_table(
&self,
request: tonic::Request<tonic::Streaming<re_protos::cloud::v1alpha1::WriteTableRequest>>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::WriteTableResponse>> {
let entry_id = {
let store = self.store.read().await;
get_entry_id_from_headers(&store, &request)?
};
let mut request = request.into_inner();
while let Some(write_msg) = request.next().await {
let write_msg = write_msg?;
let rb = write_msg
.dataframe_part
.ok_or_else(|| {
tonic::Status::invalid_argument("no data frame in WriteTableRequest")
})?
.try_into()
.map_err(|err| {
tonic::Status::internal(format!("Could not decode chunk: {err:#}"))
})?;
let mut store = self.store.write().await;
let Some(table) = store.table_mut(entry_id) else {
return Err(tonic::Status::not_found("table not found"));
};
let insert_op = match TableInsertMode::try_from(write_msg.insert_mode)
.map_err(|err| Status::invalid_argument(err.to_string()))?
{
TableInsertMode::Append => InsertOp::Append,
TableInsertMode::Overwrite => InsertOp::Overwrite,
TableInsertMode::Replace => InsertOp::Replace,
};
table.write_table(rb, insert_op).await.map_err(|err| {
tonic::Status::internal(format!("error writing to table: {err:#}"))
})?;
}
Ok(tonic::Response::new(
re_protos::cloud::v1alpha1::WriteTableResponse {},
))
}
async fn get_segment_table_schema(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::GetSegmentTableSchemaRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::GetSegmentTableSchemaResponse>>
{
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let dataset = store.dataset(entry_id)?;
let record_batch = dataset.segment_table().map_err(|err| {
tonic::Status::internal(format!("Unable to read segment table: {err:#}"))
})?;
Ok(tonic::Response::new(GetSegmentTableSchemaResponse {
schema: Some(
record_batch
.schema_ref()
.as_ref()
.try_into()
.map_err(|err| {
tonic::Status::internal(format!(
"unable to serialize Arrow schema: {err:#}"
))
})?,
),
}))
}
type ScanSegmentTableStream = ScanSegmentTableResponseStream;
async fn scan_segment_table(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::ScanSegmentTableRequest>,
) -> tonic::Result<tonic::Response<Self::ScanSegmentTableStream>> {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let request = request.into_inner();
let dataset = store.dataset(entry_id)?;
let mut record_batch = dataset.segment_table().map_err(|err| {
tonic::Status::internal(format!("Unable to read segment table: {err:#}"))
})?;
if !request.columns.is_empty() {
record_batch = record_batch
.project_columns(request.columns.iter().map(|s| s.as_str()))
.map_err(|err| {
tonic::Status::invalid_argument(format!("Unable to project columns: {err:#}"))
})?;
}
let stream = futures::stream::once(async move {
Ok(ScanSegmentTableResponse {
data: Some(record_batch.into()),
})
});
Ok(tonic::Response::new(
Box::pin(stream) as Self::ScanSegmentTableStream
))
}
async fn get_dataset_manifest_schema(
&self,
request: Request<GetDatasetManifestSchemaRequest>,
) -> tonic::Result<Response<GetDatasetManifestSchemaResponse>> {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let dataset = store.dataset(entry_id)?;
let record_batch = dataset.dataset_manifest()?;
Ok(tonic::Response::new(GetDatasetManifestSchemaResponse {
schema: Some(
record_batch
.schema_ref()
.as_ref()
.try_into()
.map_err(|err| {
tonic::Status::internal(format!(
"unable to serialize Arrow schema: {err:#}"
))
})?,
),
}))
}
type ScanDatasetManifestStream = ScanDatasetManifestResponseStream;
async fn scan_dataset_manifest(
&self,
request: Request<ScanDatasetManifestRequest>,
) -> tonic::Result<Response<Self::ScanDatasetManifestStream>> {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let request = request.into_inner();
let dataset = store.dataset(entry_id)?;
let mut record_batch = dataset.dataset_manifest()?;
if !request.columns.is_empty() {
record_batch = record_batch
.project_columns(request.columns.iter().map(|s| s.as_str()))
.map_err(|err| {
tonic::Status::invalid_argument(format!("Unable to project columns: {err:#}"))
})?;
}
let stream = futures::stream::once(async move {
Ok(ScanDatasetManifestResponse {
data: Some(record_batch.into()),
})
});
Ok(tonic::Response::new(
Box::pin(stream) as Self::ScanDatasetManifestStream
))
}
async fn get_dataset_schema(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::GetDatasetSchemaRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::GetDatasetSchemaResponse>> {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let dataset = store.dataset(entry_id)?;
let schema = dataset.schema().map_err(|err| {
tonic::Status::internal(format!("Unable to read dataset schema: {err:#}"))
})?;
Ok(tonic::Response::new(GetDatasetSchemaResponse {
schema: Some((&schema).try_into().map_err(|err| {
tonic::Status::internal(format!("Unable to serialize Arrow schema: {err:#}"))
})?),
}))
}
type GetRrdManifestStream = GetRrdManifestResponseStream;
async fn get_rrd_manifest(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::GetRrdManifestRequest>,
) -> tonic::Result<tonic::Response<Self::GetRrdManifestStream>> {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let request = request.into_inner();
let segment_id = request
.segment_id
.ok_or_else(|| {
missing_field!(
re_protos::cloud::v1alpha1::GetRrdManifestRequest,
"segment_id"
)
})?
.try_into()?;
let dataset = store.dataset(entry_id)?;
let rrd_manifest = dataset.rrd_manifest(&segment_id)?;
let rrd_manifest_stream =
futures::stream::once(futures::future::ok(GetRrdManifestResponse {
rrd_manifest: Some(rrd_manifest.to_transport(()).map_err(|err| {
tonic::Status::internal(format!("Unable to compute RRD manifest: {err:#}"))
})?),
}));
Ok(tonic::Response::new(
Box::pin(rrd_manifest_stream) as Self::GetRrdManifestStream
))
}
async fn create_index(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::CreateIndexRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::CreateIndexResponse>> {
cfg_if! {
if #[cfg(feature = "lance")] {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let dataset = store.dataset(entry_id)?;
dataset.indexes().create_index(dataset, request.into_inner().try_into()?).await
} else {
let _ = request;
Err(tonic::Status::unimplemented("create_index requires the `lance` feature"))
}
}
}
async fn list_indexes(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::ListIndexesRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::ListIndexesResponse>> {
cfg_if! {
if #[cfg(feature = "lance")] {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let dataset = store.dataset(entry_id)?;
dataset.indexes().list_indexes(request.into_inner()).await
} else {
let _ = request;
Err(tonic::Status::unimplemented("list_indexes requires the `lance` feature"))
}
}
}
async fn delete_indexes(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::DeleteIndexesRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::DeleteIndexesResponse>> {
cfg_if! {
if #[cfg(feature = "lance")] {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let dataset = store.dataset(entry_id)?;
let request = request.into_inner();
let column = request.column.ok_or_else(|| {
missing_field!(re_protos::cloud::v1alpha1::DeleteIndexesRequest, "column")
})?;
dataset.indexes().delete_indexes(column.try_into()?).await
} else {
let _ = request;
Err(tonic::Status::unimplemented("delete_indexes requires the `lance` feature"))
}
}
}
type SearchDatasetStream = SearchDatasetResponseStream;
async fn search_dataset(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::SearchDatasetRequest>,
) -> tonic::Result<tonic::Response<Self::SearchDatasetStream>> {
cfg_if! {
if #[cfg(feature = "lance")] {
let store = self.store.read().await;
let entry_id = get_entry_id_from_headers(&store, &request)?;
let dataset = store.dataset(entry_id)?;
Ok(crate::chunk_index::DatasetChunkIndexes::search_dataset(dataset, request.into_inner().try_into()?).await?)
} else {
let _ = request;
Err(tonic::Status::unimplemented("search_dataset requires the `lance` feature"))
}
}
}
type QueryDatasetStream = QueryDatasetResponseStream;
async fn query_dataset(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::QueryDatasetRequest>,
) -> tonic::Result<tonic::Response<Self::QueryDatasetStream>> {
if !request.get_ref().chunk_ids.is_empty() {
return Err(tonic::Status::unimplemented(
"query_dataset: querying specific chunk ids is not implemented",
));
}
let entry_id = get_entry_id_from_headers(&*self.store.read().await, &request)?;
let QueryDatasetRequest {
segment_ids,
entity_paths,
select_all_entity_paths,
chunk_ids: requested_chunk_ids,
fuzzy_descriptors: _,
exclude_static_data,
exclude_temporal_data,
scan_parameters,
query,
generate_direct_urls: _,
} = request.into_inner().try_into()?;
if scan_parameters.is_some() {
re_log::debug_once!(" scan_parameters are not yet implemented and will be ignored");
}
let entity_paths: IntSet<EntityPath> = entity_paths.into_iter().collect();
if select_all_entity_paths && !entity_paths.is_empty() {
return Err(tonic::Status::invalid_argument(
"cannot specify entity paths if `select_all_entity_paths` is true",
));
}
let chunk_stores = self.get_chunk_stores(entry_id, &segment_ids).await?;
if chunk_stores.is_empty() {
let stream = futures::stream::iter([{
let batch = QueryDatasetResponse::create_empty_dataframe();
let data = Some(batch.into());
Ok(QueryDatasetResponse { data })
}]);
return Ok(tonic::Response::new(
Box::pin(stream) as Self::QueryDatasetStream
));
}
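        // Union of all timelines across the selected stores, so every response
        // batch carries the same set of timeline columns.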
let all_timelines: BTreeMap<String, arrow::datatypes::DataType> = chunk_stores
.iter()
.flat_map(|(_, _, _, resolved)| {
resolved
.schema()
.timelines()
.into_values()
.map(|tl| (tl.name().as_str().to_owned(), tl.datatype()))
.collect::<Vec<_>>()
})
.collect();
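        // One response batch per (segment, layer) store: each row describes a
        // chunk and carries an opaque chunk key for a later `fetch_chunks` call.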
let stream = futures::stream::iter(chunk_stores.into_iter().map(
move |(segment_id, layer_name, store_slot_id, resolved)| {
let metadata_vec: Vec<ChunkMetadata> = if let Some(query) = &query {
let (chunks, missing_virtual) =
get_chunks_for_query_results(&resolved, &entity_paths, query);
let mut metas: Vec<_> = chunks
.iter()
                        .map(ChunkMetadata::from_chunk)
.collect();
if let ResolvedStore::Lazy(lazy) = &resolved {
for chunk_id in &missing_virtual {
if let Some(idx) = lazy.chunk_row_index(chunk_id) {
metas.push(ChunkMetadata::from_manifest(
lazy.manifest(),
*chunk_id,
idx,
lazy.timeline_ranges().get(chunk_id),
));
}
}
}
metas
} else {
match &resolved {
ResolvedStore::Eager(h) => h
.read()
.iter_physical_chunks()
                            .map(ChunkMetadata::from_chunk)
.collect(),
ResolvedStore::Lazy(lazy) => lazy
.manifest()
.col_chunk_ids()
.iter()
.enumerate()
.map(|(idx, &chunk_id)| {
ChunkMetadata::from_manifest(
lazy.manifest(),
chunk_id,
idx,
lazy.timeline_ranges().get(&chunk_id),
)
})
.collect(),
}
};
let num_chunks = metadata_vec.len();
let mut chunk_ids = Vec::with_capacity(num_chunks);
let mut chunk_segment_ids = Vec::with_capacity(num_chunks);
let mut chunk_keys = Vec::with_capacity(num_chunks);
let mut chunk_entity_path = Vec::with_capacity(num_chunks);
let mut chunk_is_static = Vec::with_capacity(num_chunks);
let mut chunk_byte_sizes = Vec::with_capacity(num_chunks);
let mut chunk_byte_sizes_uncompressed = Vec::with_capacity(num_chunks);
let mut chunk_direct_urls = Vec::with_capacity(num_chunks);
let mut chunk_direct_url_expiry = Vec::with_capacity(num_chunks);
let mut timelines: BTreeMap<
String,
(arrow::datatypes::DataType, Vec<Option<i64>>),
> = all_timelines
.iter()
.map(|(name, dtype)| {
(
name.clone(),
(dtype.clone(), Vec::with_capacity(num_chunks)),
)
})
.collect();
for meta in &metadata_vec {
if !entity_paths.is_empty()
&& !entity_paths.contains(&EntityPath::from(meta.entity_path.as_str()))
{
continue;
}
if !requested_chunk_ids.is_empty()
&& !requested_chunk_ids.contains(&meta.chunk_id)
{
continue;
}
if exclude_static_data && meta.is_static {
continue;
}
if exclude_temporal_data && !meta.is_static {
continue;
}
let mut missing_timelines: BTreeSet<String> =
timelines.keys().cloned().collect();
for (timeline, range) in &meta.timelines {
let timeline_name = timeline.name().as_str();
missing_timelines.remove(timeline_name);
let timeline_data = timelines
.get_mut(timeline_name)
.expect("timeline was pre-seeded from chunk stores");
timeline_data.1.push(Some(range.min().as_i64()));
}
for timeline_name in missing_timelines {
let timeline_data = timelines
.get_mut(&timeline_name)
.expect("timeline_names already checked");
timeline_data.1.push(None);
}
chunk_segment_ids.push(segment_id.id.clone());
chunk_ids.push(meta.chunk_id);
chunk_entity_path.push(meta.entity_path.clone());
chunk_is_static.push(meta.is_static);
chunk_byte_sizes.push(meta.byte_size);
chunk_byte_sizes_uncompressed.push(Some(meta.byte_size));
chunk_keys.push(
ChunkKey {
chunk_id: meta.chunk_id,
store_slot_id,
}
.encode()?,
);
chunk_direct_urls.push(None);
chunk_direct_url_expiry.push(None);
}
let chunk_layer_names = vec![layer_name.clone(); chunk_ids.len()];
let chunk_key_refs = chunk_keys.iter().map(|v| v.as_slice()).collect();
let batch = QueryDatasetResponse::create_dataframe_with_timelines(
chunk_ids,
chunk_segment_ids,
chunk_layer_names,
chunk_key_refs,
chunk_entity_path,
chunk_is_static,
chunk_byte_sizes,
chunk_byte_sizes_uncompressed,
chunk_direct_urls,
chunk_direct_url_expiry,
&timelines,
)
.map_err(|err| {
tonic::Status::internal(format!("Failed to create dataframe: {err:#}"))
})?;
let data = Some(batch.into());
Ok(QueryDatasetResponse { data })
},
));
Ok(tonic::Response::new(
Box::pin(stream) as Self::QueryDatasetStream
))
}
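    /// Streams back the raw chunks identified by the opaque [`ChunkKey`]s that
    /// `query_dataset` previously handed out in its chunk-key column.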
type FetchChunksStream = FetchChunksResponseStream;
async fn fetch_chunks(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::FetchChunksRequest>,
) -> tonic::Result<tonic::Response<Self::FetchChunksStream>> {
let request = request.into_inner();
let mut chunk_keys = vec![];
for chunk_info_data in request.chunk_infos {
let chunk_info_batch: RecordBatch = chunk_info_data.try_into().map_err(|err| {
tonic::Status::internal(format!("Failed to decode chunk_info: {err:#}"))
})?;
let schema = chunk_info_batch.schema();
let chunk_key_col_idx = schema
.column_with_name(FetchChunksRequest::FIELD_CHUNK_KEY)
.ok_or_else(|| {
tonic::Status::invalid_argument(format!(
"Missing {} column",
FetchChunksRequest::FIELD_CHUNK_KEY
))
})?
.0;
let chunk_keys_arr = chunk_info_batch
.column(chunk_key_col_idx)
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| {
tonic::Status::invalid_argument(format!(
"{} must be binary array",
FetchChunksRequest::FIELD_CHUNK_KEY
))
})?;
for chunk_key in chunk_keys_arr {
let chunk_key = chunk_key.ok_or_else(|| {
tonic::Status::invalid_argument(format!(
"{} must not be null",
FetchChunksRequest::FIELD_CHUNK_KEY
))
})?;
let chunk_key = ChunkKey::decode(chunk_key)?;
chunk_keys.push(chunk_key);
}
}
let chunks = self
.store
.read()
.await
.chunks_from_chunk_keys(&chunk_keys)?;
let stream = futures::stream::iter(chunks).map(|(store_id, chunk)| {
let arrow_msg = re_log_types::ArrowMsg {
chunk_id: *chunk.id(),
batch: chunk.to_record_batch().map_err(|err| {
tonic::Status::internal(format!(
"failed to convert chunk to record batch: {err:#}"
))
})?,
on_release: None,
};
let compression = re_log_encoding::Compression::Off;
let encoded_chunk = arrow_msg
.to_transport((store_id, compression))
.map_err(|err| tonic::Status::internal(format!("encoding failed: {err:#}")))?;
Ok(re_protos::cloud::v1alpha1::FetchChunksResponse {
chunks: vec![encoded_chunk],
})
});
Ok(tonic::Response::new(
Box::pin(stream) as Self::FetchChunksStream
))
}
async fn register_table(
&self,
request: tonic::Request<RegisterTableRequest>,
) -> tonic::Result<tonic::Response<RegisterTableResponse>> {
#[cfg_attr(not(feature = "lance"), expect(unused_mut))]
let mut store = self.store.write().await;
let request = request.into_inner();
let Some(provider_details) = request.provider_details else {
return Err(tonic::Status::invalid_argument("Missing provider details"));
};
#[cfg_attr(not(feature = "lance"), expect(unused_variables))]
let lance_table = match ProviderDetails::try_from(&provider_details) {
Ok(ProviderDetails::LanceTable(lance_table)) => lance_table.table_url,
Ok(ProviderDetails::SystemTable(_)) => Err(Status::invalid_argument(
"System tables cannot be registered",
))?,
Err(err) => return Err(err.into()),
}
.to_file_path()
.map_err(|()| tonic::Status::invalid_argument("Invalid lance table path"))?;
#[cfg(feature = "lance")]
let entry_id = {
let named_path = NamedPath {
name: Some(request.name.clone()),
path: lance_table,
};
store
.load_directory_as_table(&named_path, IfDuplicateBehavior::Error)
.await?
};
#[cfg(not(feature = "lance"))]
let entry_id = EntryId::new();
let table_entry = store
.table(entry_id)
.ok_or_else(|| Status::internal("table missing that was just registered"))?
.as_table_entry();
let response = RegisterTableResponse {
table_entry: Some(table_entry.try_into()?),
};
Ok(response.into())
}
async fn get_table_schema(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::GetTableSchemaRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::GetTableSchemaResponse>> {
let store = self.store.read().await;
let Some(entry_id) = request.into_inner().table_id else {
return Err(Status::not_found("Table ID not specified in request"));
};
let entry_id = entry_id.try_into()?;
let table = store
.table(entry_id)
.ok_or_else(|| Status::not_found(format!("Entry with ID {entry_id} not found")))?;
let schema = table.schema();
Ok(tonic::Response::new(
re_protos::cloud::v1alpha1::GetTableSchemaResponse {
schema: Some(schema.as_ref().try_into().map_err(|err| {
Status::internal(format!("Unable to serialize Arrow schema: {err:#}"))
})?),
},
))
}
type ScanTableStream = ScanTableResponseStream;
async fn scan_table(
&self,
request: tonic::Request<re_protos::cloud::v1alpha1::ScanTableRequest>,
) -> tonic::Result<tonic::Response<Self::ScanTableStream>> {
let store = self.store.read().await;
let Some(entry_id) = request.into_inner().table_id else {
return Err(Status::not_found("Table ID not specified in request"));
};
let entry_id = entry_id.try_into()?;
let table = store
.table(entry_id)
.ok_or_else(|| Status::not_found(format!("Entry with ID {entry_id} not found")))?;
let ctx = SessionContext::default();
let plan = table
.provider()
.scan(&ctx.state(), None, &[], None)
.await
.map_err(|err| Status::internal(format!("failed to scan table: {err:#}")))?;
let stream = plan
.execute(0, ctx.task_ctx())
.map_err(|err| tonic::Status::from_error(Box::new(err)))?;
let resp_stream = stream.map(|batch| {
let batch = batch.map_err(|err| tonic::Status::from_error(Box::new(err)))?;
Ok(ScanTableResponse {
dataframe_part: Some(batch.into()),
})
});
Ok(tonic::Response::new(
Box::pin(resp_stream) as Self::ScanTableStream
))
}
async fn query_tasks(
&self,
request: tonic::Request<QueryTasksRequest>,
) -> tonic::Result<tonic::Response<QueryTasksResponse>> {
let task_ids = request.into_inner().ids;
let store = self.store.read().await;
let mut ids = Vec::with_capacity(task_ids.len());
let mut exec_statuses = Vec::with_capacity(task_ids.len());
let mut msgs = Vec::with_capacity(task_ids.len());
for task_id in task_ids {
let result = store
.task_registry()
.get(&task_id)
.unwrap_or_else(TaskResult::success);
ids.push(task_id.id);
exec_statuses.push(result.exec_status);
msgs.push(if result.msgs.is_empty() {
None
} else {
Some(result.msgs)
});
}
let num_tasks = ids.len();
let rb = QueryTasksResponse::create_dataframe(
ids,
            vec![None; num_tasks],
            vec![None; num_tasks],
            exec_statuses,
            msgs,
            vec![None; num_tasks],
            vec![None; num_tasks],
            vec![None; num_tasks],
            vec![1; num_tasks],
            vec![None; num_tasks],
            vec![None; num_tasks],
        )
.map_err(|err| tonic::Status::internal(format!("Failed to create dataframe: {err:#}")))?;
Ok(tonic::Response::new(QueryTasksResponse {
data: Some(rb.into()),
}))
}
type QueryTasksOnCompletionStream = QueryTasksOnCompletionResponseStream;
async fn query_tasks_on_completion(
&self,
request: tonic::Request<QueryTasksOnCompletionRequest>,
) -> tonic::Result<tonic::Response<Self::QueryTasksOnCompletionStream>> {
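        // In this in-memory implementation every task is already terminal, so
        // "on completion" reduces to a single immediate `query_tasks` call.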
let task_ids = request.into_inner().ids;
let response_data = self
.query_tasks(tonic::Request::new(QueryTasksRequest { ids: task_ids }))
.await?
.into_inner()
.data;
Ok(tonic::Response::new(
Box::pin(futures::stream::once(async move {
Ok(QueryTasksOnCompletionResponse {
data: response_data,
})
})) as Self::QueryTasksOnCompletionStream,
))
}
async fn cancel_tasks(
&self,
_request: tonic::Request<CancelTasksRequest>,
) -> tonic::Result<tonic::Response<CancelTasksResponse>> {
Ok(tonic::Response::new(CancelTasksResponse {}))
}
async fn do_maintenance(
&self,
_request: tonic::Request<re_protos::cloud::v1alpha1::DoMaintenanceRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::DoMaintenanceResponse>> {
Err(tonic::Status::unimplemented(
"do_maintenance not implemented",
))
}
async fn do_global_maintenance(
&self,
_request: tonic::Request<re_protos::cloud::v1alpha1::DoGlobalMaintenanceRequest>,
) -> tonic::Result<tonic::Response<re_protos::cloud::v1alpha1::DoGlobalMaintenanceResponse>>
{
Err(tonic::Status::unimplemented(
"do_global_maintenance not implemented",
))
}
async fn create_table_entry(
&self,
request: Request<re_protos::cloud::v1alpha1::CreateTableEntryRequest>,
) -> tonic::Result<Response<re_protos::cloud::v1alpha1::CreateTableEntryResponse>> {
let mut store = self.store.write().await;
let request: CreateTableEntryRequest = request.into_inner().try_into()?;
let table_name = request.name;
let schema = Arc::new(request.schema);
let details = if let Some(details) = request.provider_details {
details
} else {
let table_path = self
.settings
.storage_dir
.path()
.join(format!("lance-{}", Tuid::new()));
ProviderDetails::LanceTable(LanceTable {
table_url: url::Url::from_directory_path(table_path).map_err(|_err| {
Status::internal(format!(
"Failed to create table directory in {:?}",
self.settings.storage_dir.path()
))
})?,
})
};
let table = match details {
ProviderDetails::LanceTable(table) => {
store
.create_table_entry(table_name, &table.table_url, schema)
.await?
}
ProviderDetails::SystemTable(_) => {
return Err(tonic::Status::invalid_argument(
"Creating system tables is not supported",
));
}
};
Ok(Response::new(
CreateTableEntryResponse { table }.try_into()?,
))
}
}
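/// Lazily decodes an RRD file and collects the [`StoreId`] of every
/// `SetStoreInfo` message it contains.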
fn load_store_ids(rrd_path: &std::path::Path) -> tonic::Result<BTreeSet<StoreId>> {
let reader = std::io::BufReader::new(
std::fs::File::open(rrd_path)
.map_err(|err| tonic::Status::internal(format!("Failed to open RRD file: {err:#}")))?,
);
let decoder = re_log_encoding::DecoderApp::decode_lazy(reader);
let mut store_ids = BTreeSet::new();
for msg_result in decoder {
let msg = msg_result.map_err(|err| {
tonic::Status::internal(format!("Failed to decode RRD message: {err:#}"))
})?;
if let re_log_types::LogMsg::SetStoreInfo(info) = msg {
store_ids.insert(info.info.store_id);
}
}
Ok(store_ids)
}
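/// Parses a `memory:///store/{store_slot_id}` URL into a [`StoreSlotId`].
///
/// A minimal sketch (assuming `StoreSlotId` round-trips through its
/// `Display`/`FromStr` impls, which is also what `register_with_dataset`
/// relies on when it formats `memory:///store/{store_slot_id}` URLs):
///
/// ```ignore
/// let url = url::Url::parse("memory:///store/some-slot-id")?;
/// let slot_id = parse_memory_url(&url)?;
/// ```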
fn parse_memory_url(url: &url::Url) -> tonic::Result<StoreSlotId> {
let path = url.path();
let slot_id_str = path.strip_prefix("/store/").ok_or_else(|| {
tonic::Status::invalid_argument(format!(
"invalid memory URL format, expected memory:///store/{{store_slot_id}}: {url}"
))
})?;
slot_id_str.parse::<StoreSlotId>().map_err(|err| {
tonic::Status::invalid_argument(format!(
"invalid store slot ID in memory URL '{url}': {err}"
))
})
}
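/// Resolves the target entry from request headers: an explicit entry-ID header
/// wins; otherwise the entry-name header is looked up as a dataset name.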
fn get_entry_id_from_headers<T>(
store: &InMemoryStore,
req: &tonic::Request<T>,
) -> tonic::Result<EntryId> {
if let Some(entry_id) = req.entry_id()? {
Ok(entry_id)
} else if let Some(dataset_name) = req.entry_name()? {
Ok(store.dataset_by_name(&dataset_name)?.id())
} else {
const HEADERS: &[&str] = &[
re_protos::headers::RERUN_HTTP_HEADER_ENTRY_ID,
re_protos::headers::RERUN_HTTP_HEADER_ENTRY_NAME,
];
Err(tonic::Status::invalid_argument(format!(
"missing mandatory {HEADERS:?} HTTP headers"
)))
}
}
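/// Builds a [`LatestAtQuery`] from a `QueryLatestAt`. A request without an
/// index means "static data only", encoded here as a query on an unnamed
/// timeline at `TimeInt::MIN`.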
fn latest_at_or_static(latest_at: &ext::QueryLatestAt) -> LatestAtQuery {
match &latest_at.index {
Some(index) => LatestAtQuery::new(index.clone().into(), latest_at.at),
None => {
LatestAtQuery::new("".into(), re_log_types::TimeInt::MIN)
}
}
}
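/// Per-chunk summary used by `query_dataset`, built either from an in-memory
/// [`Chunk`] or from a row of a lazy store's RRD manifest.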
struct ChunkMetadata {
chunk_id: ChunkId,
entity_path: String,
is_static: bool,
byte_size: u64,
timelines: IntMap<Timeline, AbsoluteTimeRange>,
}
impl ChunkMetadata {
fn from_chunk(chunk: &Chunk) -> Self {
let timelines = chunk
.timelines()
.values()
.map(|col| (*col.timeline(), col.time_range()))
.collect();
Self {
chunk_id: chunk.id(),
entity_path: chunk.entity_path().to_string(),
is_static: chunk.is_static(),
byte_size: re_byte_size::SizeBytes::total_size_bytes(chunk),
timelines,
}
}
fn from_manifest(
manifest: &re_log_encoding::RrdManifest,
chunk_id: ChunkId,
row_idx: usize,
chunk_timelines: Option<&IntMap<Timeline, AbsoluteTimeRange>>,
) -> Self {
Self {
chunk_id,
entity_path: manifest
.col_chunk_entity_path_raw()
.value(row_idx)
.to_owned(),
is_static: manifest.col_chunk_is_static_raw().value(row_idx),
byte_size: manifest.col_chunk_byte_size_uncompressed()[row_idx],
timelines: chunk_timelines.cloned().unwrap_or_default(),
}
}
}
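/// Returns the physical chunks relevant to `query`, plus the IDs of "virtual"
/// chunks that are only known through a lazy store's manifest. With no
/// latest-at or range query, every chunk of the store is returned.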
fn get_chunks_for_query_results(
resolved: &ResolvedStore,
entity_paths: &IntSet<EntityPath>,
query: &ext::Query,
) -> (Vec<Arc<Chunk>>, Vec<ChunkId>) {
if query.latest_at.is_none() && query.range.is_none() {
return match resolved {
ResolvedStore::Eager(h) => (h.read().iter_physical_chunks().cloned().collect(), vec![]),
ResolvedStore::Lazy(lazy) => (vec![], lazy.manifest().col_chunk_ids().to_vec()),
};
}
let paths = if entity_paths.is_empty() {
resolved.all_entities()
} else {
entity_paths.clone()
};
let mut all_chunks: Vec<Arc<Chunk>> = vec![];
let mut all_missing: BTreeSet<ChunkId> = BTreeSet::new();
let mut seen_physical: BTreeSet<ChunkId> = BTreeSet::new();
for entity_path in &paths {
if let Some(latest_at) = &query.latest_at {
let latest_at_q = latest_at_or_static(latest_at);
let results = resolved.latest_at_relevant_chunks_for_all_components(
ChunkTrackingMode::Report,
&latest_at_q,
entity_path,
true,
);
for chunk in results.chunks {
if seen_physical.insert(chunk.id()) {
all_chunks.push(chunk);
}
}
all_missing.extend(results.missing_virtual);
}
if let Some(range) = &query.range {
let range_q = RangeQuery::new(range.index.clone().into(), range.index_range);
let results = resolved.range_relevant_chunks_for_all_components(
ChunkTrackingMode::Report,
&range_q,
entity_path,
true,
);
for chunk in results.chunks {
if seen_physical.insert(chunk.id()) {
all_chunks.push(chunk);
}
}
all_missing.extend(results.missing_virtual);
}
}
for id in &seen_physical {
all_missing.remove(id);
}
(all_chunks, all_missing.into_iter().collect())
}