re_server/
rerun_cloud.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use ahash::HashMap;
5use arrow::array::BinaryArray;
6use arrow::record_batch::RecordBatch;
7use datafusion::logical_expr::dml::InsertOp;
8use datafusion::prelude::SessionContext;
9use nohash_hasher::IntSet;
10use tokio_stream::StreamExt as _;
11use tonic::{Code, Request, Response, Status};
12
13use re_arrow_util::RecordBatchExt as _;
14use re_chunk_store::{Chunk, ChunkStore, ChunkStoreHandle};
15use re_log_encoding::ToTransport as _;
16use re_log_types::{EntityPath, EntryId, StoreId, StoreKind};
17use re_protos::{
18    cloud::v1alpha1::{
19        DeleteEntryResponse, EntryDetails, EntryKind, FetchChunksRequest,
20        GetDatasetManifestSchemaRequest, GetDatasetManifestSchemaResponse,
21        GetDatasetSchemaResponse, GetPartitionTableSchemaResponse, QueryDatasetResponse,
22        QueryTasksOnCompletionRequest, QueryTasksOnCompletionResponse, QueryTasksRequest,
23        QueryTasksResponse, RegisterTableRequest, RegisterTableResponse,
24        RegisterWithDatasetResponse, ScanDatasetManifestRequest, ScanDatasetManifestResponse,
25        ScanPartitionTableResponse, ScanTableResponse,
26        ext::{
27            self, CreateDatasetEntryRequest, CreateDatasetEntryResponse, CreateTableEntryRequest,
28            CreateTableEntryResponse, DataSource, DatasetDetails, EntryDetailsUpdate,
29            ProviderDetails, ReadDatasetEntryResponse, ReadTableEntryResponse, TableInsertMode,
30            UpdateDatasetEntryRequest, UpdateDatasetEntryResponse, UpdateEntryRequest,
31            UpdateEntryResponse,
32        },
33        rerun_cloud_service_server::RerunCloudService,
34    },
35    common::v1alpha1::{
36        TaskId,
37        ext::{IfDuplicateBehavior, PartitionId},
38    },
39    headers::RerunHeadersExtractorExt as _,
40};
41
42use crate::entrypoint::NamedPath;
43use crate::store::{ChunkKey, Dataset, InMemoryStore, Table};
44
/// Settings for [`RerunCloudHandler`].
///
/// Currently has no fields; the handler stores the value but nothing in this file reads it.
#[derive(Debug, Default)]
pub struct RerunCloudHandlerSettings {}
47
/// Builder that gathers the settings and the in-memory store for a [`RerunCloudHandler`].
#[derive(Default)]
pub struct RerunCloudHandlerBuilder {
    /// Settings passed on to the handler when it is built.
    settings: RerunCloudHandlerSettings,

    /// Store that the `with_*` methods load data into before the handler is built.
    store: InMemoryStore,
}
54
55impl RerunCloudHandlerBuilder {
56    pub fn new() -> Self {
57        Self::default()
58    }
59
60    pub fn with_directory_as_dataset(
61        mut self,
62        directory: &NamedPath,
63        on_duplicate: IfDuplicateBehavior,
64    ) -> Result<Self, crate::store::Error> {
65        self.store
66            .load_directory_as_dataset(directory, on_duplicate)?;
67
68        Ok(self)
69    }
70
71    #[cfg(feature = "lance")]
72    pub async fn with_directory_as_table(
73        mut self,
74        path: &NamedPath,
75        on_duplicate: IfDuplicateBehavior,
76    ) -> Result<Self, crate::store::Error> {
77        self.store
78            .load_directory_as_table(path, on_duplicate)
79            .await?;
80
81        Ok(self)
82    }
83
84    pub fn build(self) -> RerunCloudHandler {
85        RerunCloudHandler::new(self.settings, self.store)
86    }
87}
88
89// ---
90
/// Placeholder task id reported for every partition registered by `register_with_dataset`.
const DUMMY_TASK_ID: &str = "task_00000000DEADBEEF";
92
/// gRPC handler that serves requests out of an [`InMemoryStore`].
pub struct RerunCloudHandler {
    /// Kept for future use; nothing reads it yet, hence the `dead_code` expectation.
    #[expect(dead_code)]
    settings: RerunCloudHandlerSettings,

    /// The store behind an async read/write lock, so handlers can share it across requests.
    store: tokio::sync::RwLock<InMemoryStore>,
}
99
100impl RerunCloudHandler {
101    pub fn new(settings: RerunCloudHandlerSettings, store: InMemoryStore) -> Self {
102        Self {
103            settings,
104            store: tokio::sync::RwLock::new(store),
105        }
106    }
107
108    /// Returns all the chunk stores of the specified dataset and partitions ids. If `partition_ids`
109    /// is empty, return stores of all partitions.
110    ///
111    /// Returns (partition id, layer name, store) tuples.
112    async fn get_chunk_stores(
113        &self,
114        dataset_id: EntryId,
115        partition_ids: &[PartitionId],
116    ) -> Result<Vec<(PartitionId, String, ChunkStoreHandle)>, tonic::Status> {
117        let store = self.store.read().await;
118        let dataset = store.dataset(dataset_id)?;
119
120        Ok(dataset
121            .partitions_from_ids(partition_ids)?
122            .flat_map(|(partition_id, partition)| {
123                partition.iter_layers().map(|(layer_name, layer)| {
124                    (
125                        partition_id.clone(),
126                        layer_name.to_owned(),
127                        layer.store_handle().clone(),
128                    )
129                })
130            })
131            .collect())
132    }
133
134    fn resolve_data_sources(data_sources: &[DataSource]) -> Result<Vec<DataSource>, tonic::Status> {
135        let mut resolved = Vec::<DataSource>::with_capacity(data_sources.len());
136        for source in data_sources {
137            if source.is_prefix {
138                let path = source.storage_url.to_file_path().map_err(|_err| {
139                    tonic::Status::invalid_argument(format!(
140                        "getting file path from {:?}",
141                        source.storage_url
142                    ))
143                })?;
144                let meta = std::fs::metadata(&path).map_err(|err| match err.kind() {
145                    std::io::ErrorKind::NotFound => {
146                        tonic::Status::invalid_argument(format!("Directory not found: {:?}", &path))
147                    }
148                    _ => tonic::Status::invalid_argument(format!(
149                        "Failed to read directory metadata {path:?}: {err:#}"
150                    )),
151                })?;
152                if !meta.is_dir() {
153                    return Err(tonic::Status::invalid_argument(format!(
154                        "Expected directory, got file: {path:?}"
155                    )));
156                }
157
158                // Recursively walk the directory and grab all '.rrd' files
159                let mut dirs_to_visit = vec![path];
160                let mut files = Vec::new();
161
162                while let Some(current_dir) = dirs_to_visit.pop() {
163                    let entries = std::fs::read_dir(&current_dir).map_err(|err| {
164                        tonic::Status::internal(format!(
165                            "Failed to read directory {current_dir:?}: {err:#}"
166                        ))
167                    })?;
168
169                    for entry in entries {
170                        let entry = entry.map_err(|err| {
171                            tonic::Status::internal(format!(
172                                "Failed to read directory entry: {err:#}"
173                            ))
174                        })?;
175                        let entry_path = entry.path();
176
177                        if entry_path.is_dir() {
178                            dirs_to_visit.push(entry_path);
179                        } else if let Some(extension) = entry_path.extension()
180                            && extension == "rrd"
181                        {
182                            files.push(entry_path);
183                        }
184                    }
185                }
186
187                for file_path in files {
188                    let mut file_url = source.storage_url.clone();
189                    file_url.set_path(&file_path.to_string_lossy());
190                    resolved.push(DataSource {
191                        storage_url: file_url,
192                        is_prefix: false,
193                        ..source.clone()
194                    });
195                }
196            } else {
197                resolved.push(source.clone());
198            }
199        }
200
201        Ok(resolved)
202    }
203}
204
205impl std::fmt::Debug for RerunCloudHandler {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        f.debug_struct("RerunCloudHandler").finish()
208    }
209}
210
/// Declares `pub type $stream` as a pinned, boxed, `Send` stream yielding
/// `Result<re_protos::cloud::v1alpha1::$resp, tonic::Status>`.
///
/// The `manifest` / `rerun_cloud` / `tasks` tag is only a label at the call site: every tagged
/// form expands to the same alias.
macro_rules! decl_stream {
    // Internal rule: the single place where the alias is spelled out.
    (@alias $stream:ident, $resp:ident) => {
        pub type $stream = std::pin::Pin<
            Box<
                dyn futures::Stream<Item = Result<re_protos::cloud::v1alpha1::$resp, tonic::Status>>
                    + Send,
            >,
        >;
    };

    ($stream:ident<manifest:$resp:ident>) => {
        decl_stream!(@alias $stream, $resp);
    };

    ($stream:ident<rerun_cloud:$resp:ident>) => {
        decl_stream!(@alias $stream, $resp);
    };

    ($stream:ident<tasks:$resp:ident>) => {
        decl_stream!(@alias $stream, $resp);
    };
}
239
// Response stream aliases used by the streaming endpoints of the service.
// Each alias is a pinned, boxed `futures::Stream` of `Result<Response, tonic::Status>`.
decl_stream!(FetchChunksResponseStream<manifest:FetchChunksResponse>);
decl_stream!(QueryDatasetResponseStream<manifest:QueryDatasetResponse>);
decl_stream!(ScanPartitionTableResponseStream<manifest:ScanPartitionTableResponse>);
decl_stream!(ScanDatasetManifestResponseStream<manifest:ScanDatasetManifestResponse>);
decl_stream!(SearchDatasetResponseStream<manifest:SearchDatasetResponse>);
decl_stream!(ScanTableResponseStream<rerun_cloud:ScanTableResponse>);
decl_stream!(QueryTasksOnCompletionResponseStream<tasks:QueryTasksOnCompletionResponse>);
247
248impl RerunCloudHandler {
249    async fn find_datasets(
250        &self,
251        entry_id: Option<EntryId>,
252        name: Option<String>,
253        store_kind: Option<StoreKind>,
254    ) -> Result<Vec<EntryDetails>, Status> {
255        let store = self.store.read().await;
256
257        let dataset = match (entry_id, name) {
258            (None, None) => None,
259
260            (Some(entry_id), None) => Some(store.dataset(entry_id)?),
261
262            (None, Some(name)) => Some(store.dataset_by_name(&name)?),
263
264            (Some(entry_id), Some(name)) => {
265                let dataset = store.dataset_by_name(&name)?;
266                if dataset.id() != entry_id {
267                    return Err(tonic::Status::not_found(format!(
268                        "Dataset with ID {entry_id} not found"
269                    )));
270                }
271                Some(dataset)
272            }
273        };
274
275        let dataset_iter = if let Some(dataset) = dataset {
276            itertools::Either::Left(std::iter::once(dataset))
277        } else {
278            itertools::Either::Right(store.iter_datasets())
279        };
280
281        Ok(dataset_iter
282            .filter(|dataset| {
283                store_kind.is_none_or(|store_kind| dataset.store_kind() == store_kind)
284            })
285            .map(Dataset::as_entry_details)
286            .map(Into::into)
287            .collect())
288    }
289
290    async fn find_tables(
291        &self,
292        entry_id: Option<EntryId>,
293        name: Option<String>,
294    ) -> Result<Vec<EntryDetails>, Status> {
295        let store = self.store.read().await;
296
297        let table = match (entry_id, name) {
298            (None, None) => None,
299
300            (Some(entry_id), None) => {
301                let Some(table) = store.table(entry_id) else {
302                    return Err(tonic::Status::not_found(format!(
303                        "Table with ID {entry_id} not found"
304                    )));
305                };
306                Some(table)
307            }
308
309            (None, Some(name)) => {
310                let Some(table) = store.table_by_name(&name) else {
311                    return Err(tonic::Status::not_found(format!(
312                        "Table with name {name} not found"
313                    )));
314                };
315                Some(table)
316            }
317
318            (Some(entry_id), Some(name)) => {
319                let Some(table) = store.table_by_name(&name) else {
320                    return Err(tonic::Status::not_found(format!(
321                        "Table with name {name} not found"
322                    )));
323                };
324                if table.id() != entry_id {
325                    return Err(tonic::Status::not_found(format!(
326                        "Table with ID {entry_id} not found"
327                    )));
328                }
329                Some(table)
330            }
331        };
332
333        let table_iter = if let Some(table) = table {
334            itertools::Either::Left(std::iter::once(table))
335        } else {
336            itertools::Either::Right(store.iter_tables())
337        };
338
339        Ok(table_iter
340            .map(Table::as_entry_details)
341            .map(Into::into)
342            .collect())
343    }
344}
345
346#[tonic::async_trait]
347impl RerunCloudService for RerunCloudHandler {
348    async fn version(
349        &self,
350        request: tonic::Request<re_protos::cloud::v1alpha1::VersionRequest>,
351    ) -> std::result::Result<
352        tonic::Response<re_protos::cloud::v1alpha1::VersionResponse>,
353        tonic::Status,
354    > {
355        let re_protos::cloud::v1alpha1::VersionRequest {} = request.into_inner();
356
357        // NOTE: Reminder that this is only fully filled iff CI=1.
358        let build_info = re_build_info::build_info!();
359
360        Ok(tonic::Response::new(
361            re_protos::cloud::v1alpha1::VersionResponse {
362                build_info: Some(build_info.into()),
363            },
364        ))
365    }
366
367    // --- Catalog ---
368
369    async fn find_entries(
370        &self,
371        request: tonic::Request<re_protos::cloud::v1alpha1::FindEntriesRequest>,
372    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::FindEntriesResponse>, tonic::Status>
373    {
374        let filter = request.into_inner().filter;
375        let entry_id = filter
376            .as_ref()
377            .and_then(|filter| filter.id)
378            .map(TryInto::try_into)
379            .transpose()?;
380        let name = filter.as_ref().and_then(|filter| filter.name.clone());
381        let kind = filter
382            .and_then(|filter| filter.entry_kind)
383            .map(EntryKind::try_from)
384            .transpose()
385            .map_err(|err| {
386                Status::invalid_argument(format!("find_entries: invalid entry kind {err}"))
387            })?;
388
389        let entries = match kind {
390            Some(EntryKind::Dataset) => {
391                self.find_datasets(entry_id, name, Some(StoreKind::Recording))
392                    .await?
393            }
394
395            Some(EntryKind::BlueprintDataset) => {
396                self.find_datasets(entry_id, name, Some(StoreKind::Blueprint))
397                    .await?
398            }
399
400            Some(EntryKind::Table) => self.find_tables(entry_id, name).await?,
401
402            Some(EntryKind::DatasetView | EntryKind::TableView) => {
403                return Err(Status::unimplemented(
404                    "find_entries: dataset and table views are not supported",
405                ));
406            }
407
408            Some(EntryKind::Unspecified) => {
409                return Err(Status::invalid_argument(
410                    "find_entries: entry kind unspecified",
411                ));
412            }
413
414            None => {
415                let mut datasets = match self.find_datasets(entry_id, name.clone(), None).await {
416                    Ok(datasets) => datasets,
417                    Err(err) => {
418                        if err.code() == Code::NotFound {
419                            vec![]
420                        } else {
421                            return Err(err);
422                        }
423                    }
424                };
425                let tables = match self.find_tables(entry_id, name).await {
426                    Ok(tables) => tables,
427                    Err(err) => {
428                        if err.code() == Code::NotFound {
429                            vec![]
430                        } else {
431                            return Err(err);
432                        }
433                    }
434                };
435                datasets.extend(tables);
436                datasets
437            }
438        };
439
440        let response = re_protos::cloud::v1alpha1::FindEntriesResponse { entries };
441
442        Ok(tonic::Response::new(response))
443    }
444
445    async fn create_dataset_entry(
446        &self,
447        request: tonic::Request<re_protos::cloud::v1alpha1::CreateDatasetEntryRequest>,
448    ) -> Result<
449        tonic::Response<re_protos::cloud::v1alpha1::CreateDatasetEntryResponse>,
450        tonic::Status,
451    > {
452        let CreateDatasetEntryRequest {
453            name: dataset_name,
454            id: dataset_id,
455        } = request.into_inner().try_into()?;
456
457        let mut store = self.store.write().await;
458
459        let dataset_id = dataset_id.unwrap_or_else(EntryId::new);
460        let blueprint_dataset_id = EntryId::new();
461        let blueprint_dataset_name = format!("__bp_{dataset_id}");
462
463        store.create_dataset(
464            &blueprint_dataset_name,
465            Some(blueprint_dataset_id),
466            StoreKind::Blueprint,
467            None,
468        )?;
469
470        let dataset_details = DatasetDetails {
471            blueprint_dataset: Some(blueprint_dataset_id),
472            default_blueprint: None,
473        };
474
475        let dataset = store.create_dataset(
476            &dataset_name,
477            Some(dataset_id),
478            StoreKind::Recording,
479            Some(dataset_details),
480        )?;
481
482        let dataset_entry = dataset.as_dataset_entry();
483
484        Ok(tonic::Response::new(
485            CreateDatasetEntryResponse {
486                dataset: dataset_entry,
487            }
488            .into(),
489        ))
490    }
491
492    async fn read_dataset_entry(
493        &self,
494        request: tonic::Request<re_protos::cloud::v1alpha1::ReadDatasetEntryRequest>,
495    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::ReadDatasetEntryResponse>, tonic::Status>
496    {
497        let store = self.store.read().await;
498        let entry_id = get_entry_id_from_headers(&store, &request)?;
499        let dataset = store.dataset(entry_id)?;
500
501        Ok(tonic::Response::new(
502            ReadDatasetEntryResponse {
503                dataset_entry: dataset.as_dataset_entry(),
504            }
505            .into(),
506        ))
507    }
508
509    async fn update_dataset_entry(
510        &self,
511        request: tonic::Request<re_protos::cloud::v1alpha1::UpdateDatasetEntryRequest>,
512    ) -> Result<
513        tonic::Response<re_protos::cloud::v1alpha1::UpdateDatasetEntryResponse>,
514        tonic::Status,
515    > {
516        let request: UpdateDatasetEntryRequest = request.into_inner().try_into()?;
517
518        let mut store = self.store.write().await;
519        let dataset = store.dataset_mut(request.id)?;
520
521        dataset.set_dataset_details(request.dataset_details);
522
523        Ok(tonic::Response::new(
524            UpdateDatasetEntryResponse {
525                dataset_entry: dataset.as_dataset_entry(),
526            }
527            .into(),
528        ))
529    }
530
531    async fn read_table_entry(
532        &self,
533        request: tonic::Request<re_protos::cloud::v1alpha1::ReadTableEntryRequest>,
534    ) -> std::result::Result<
535        tonic::Response<re_protos::cloud::v1alpha1::ReadTableEntryResponse>,
536        tonic::Status,
537    > {
538        let store = self.store.read().await;
539
540        let id = request
541            .into_inner()
542            .id
543            .ok_or(Status::invalid_argument("No table entry ID provided"))?
544            .try_into()?;
545
546        let table = store.table(id).ok_or_else(|| {
547            tonic::Status::not_found(format!("table with entry ID '{id}' not found"))
548        })?;
549
550        Ok(tonic::Response::new(
551            ReadTableEntryResponse {
552                table_entry: table.as_table_entry(),
553            }
554            .try_into()?,
555        ))
556    }
557
558    async fn delete_entry(
559        &self,
560        request: tonic::Request<re_protos::cloud::v1alpha1::DeleteEntryRequest>,
561    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::DeleteEntryResponse>, tonic::Status>
562    {
563        let entry_id = request.into_inner().try_into()?;
564
565        self.store.write().await.delete_entry(entry_id)?;
566
567        Ok(tonic::Response::new(DeleteEntryResponse {}))
568    }
569
570    async fn update_entry(
571        &self,
572        request: tonic::Request<re_protos::cloud::v1alpha1::UpdateEntryRequest>,
573    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::UpdateEntryResponse>, tonic::Status>
574    {
575        let UpdateEntryRequest {
576            id: entry_id,
577            entry_details_update: EntryDetailsUpdate { name },
578        } = request.into_inner().try_into()?;
579
580        let mut store = self.store.write().await;
581
582        if let Some(name) = name {
583            store.rename_entry(entry_id, name)?;
584        }
585
586        Ok(tonic::Response::new(
587            UpdateEntryResponse {
588                entry_details: store.entry_details(entry_id)?,
589            }
590            .into(),
591        ))
592    }
593
594    // --- Manifest Registry ---
595
596    async fn register_with_dataset(
597        &self,
598        request: tonic::Request<re_protos::cloud::v1alpha1::RegisterWithDatasetRequest>,
599    ) -> Result<
600        tonic::Response<re_protos::cloud::v1alpha1::RegisterWithDatasetResponse>,
601        tonic::Status,
602    > {
603        let mut store = self.store.write().await;
604        let dataset_id = get_entry_id_from_headers(&store, &request)?;
605        let dataset = store.dataset_mut(dataset_id)?;
606
607        let re_protos::cloud::v1alpha1::ext::RegisterWithDatasetRequest {
608            data_sources,
609            on_duplicate,
610        } = request.into_inner().try_into()?;
611
612        let mut partition_ids: Vec<String> = vec![];
613        let mut partition_layers: Vec<String> = vec![];
614        let mut partition_types: Vec<String> = vec![];
615        let mut storage_urls: Vec<String> = vec![];
616        let mut task_ids: Vec<String> = vec![];
617
618        let data_sources = Self::resolve_data_sources(&data_sources)?;
619
620        for source in data_sources {
621            let ext::DataSource {
622                storage_url,
623                is_prefix,
624                layer,
625                kind,
626            } = source;
627
628            if is_prefix {
629                return Err(tonic::Status::internal(
630                    "register_with_dataset: prefix data sources should have been resolved already",
631                ));
632            }
633
634            if kind != ext::DataSourceKind::Rrd {
635                return Err(tonic::Status::unimplemented(
636                    "register_with_dataset: only RRD data sources are implemented",
637                ));
638            }
639
640            if let Ok(rrd_path) = storage_url.to_file_path() {
641                let new_partition_ids = dataset.load_rrd(
642                    &rrd_path,
643                    Some(&layer),
644                    on_duplicate,
645                    dataset.store_kind(),
646                )?;
647
648                for partition_id in new_partition_ids {
649                    partition_ids.push(partition_id.to_string());
650                    partition_layers.push(layer.clone());
651                    partition_types.push("rrd".to_owned());
652                    // TODO(RR-2289): this should probably be a memory address
653                    storage_urls.push(storage_url.to_string());
654                    task_ids.push(DUMMY_TASK_ID.to_owned());
655                }
656            }
657        }
658
659        let record_batch = RegisterWithDatasetResponse::create_dataframe(
660            partition_ids,
661            partition_layers,
662            partition_types,
663            storage_urls,
664            task_ids,
665        )
666        .map_err(|err| tonic::Status::internal(format!("Failed to create dataframe: {err:#}")))?;
667        Ok(tonic::Response::new(
668            re_protos::cloud::v1alpha1::RegisterWithDatasetResponse {
669                data: Some(record_batch.into()),
670            },
671        ))
672    }
673
674    // TODO(RR-2017): This endpoint is in need of a deep redesign. For now it defaults to
675    // overwriting the "base" layer.
676    async fn write_chunks(
677        &self,
678        request: tonic::Request<tonic::Streaming<re_protos::cloud::v1alpha1::WriteChunksRequest>>,
679    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::WriteChunksResponse>, tonic::Status>
680    {
681        let entry_id = get_entry_id_from_headers(&*self.store.read().await, &request)?;
682
683        let mut request = request.into_inner();
684
685        let mut chunk_stores = HashMap::default();
686
687        while let Some(chunk_msg) = request.next().await {
688            let chunk_msg = chunk_msg?;
689
690            let chunk_batch: RecordBatch = chunk_msg
691                .chunk
692                .ok_or_else(|| tonic::Status::invalid_argument("no chunk in WriteChunksRequest"))?
693                .try_into()
694                .map_err(|err| {
695                    tonic::Status::internal(format!("Could not decode chunk: {err:#}"))
696                })?;
697
698            let partition_id: PartitionId = chunk_batch
699                .schema()
700                .metadata()
701                .get("rerun:partition_id")
702                .ok_or_else(|| {
703                    tonic::Status::invalid_argument(
704                        "Received chunk without 'rerun.partition_id' metadata",
705                    )
706                })?
707                .clone()
708                .into();
709
710            let chunk = Arc::new(Chunk::from_record_batch(&chunk_batch).map_err(|err| {
711                tonic::Status::internal(format!("error decoding chunk from record batch: {err:#}"))
712            })?);
713
714            chunk_stores
715                .entry(partition_id.clone())
716                .or_insert_with(|| {
717                    ChunkStore::new(
718                        StoreId::new(StoreKind::Recording, entry_id.to_string(), partition_id.id),
719                        InMemoryStore::chunk_store_config(),
720                    )
721                })
722                .insert_chunk(&chunk)
723                .map_err(|err| {
724                    tonic::Status::internal(format!("error adding chunk to store: {err:#}"))
725                })?;
726        }
727
728        let mut store = self.store.write().await;
729        let dataset = store.dataset_mut(entry_id)?;
730
731        #[expect(clippy::iter_over_hash_type)]
732        for (entity_path, chunk_store) in chunk_stores {
733            dataset.add_layer(
734                entity_path,
735                DataSource::DEFAULT_LAYER.to_owned(),
736                ChunkStoreHandle::new(chunk_store),
737                IfDuplicateBehavior::Error,
738            )?;
739        }
740
741        Ok(tonic::Response::new(
742            re_protos::cloud::v1alpha1::WriteChunksResponse {},
743        ))
744    }
745
746    async fn write_table(
747        &self,
748        request: tonic::Request<tonic::Streaming<re_protos::cloud::v1alpha1::WriteTableRequest>>,
749    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::WriteTableResponse>, tonic::Status>
750    {
751        // Limit the scope of the lock here to prevent deadlocks
752        // when reading and writing to the same table
753        let entry_id = {
754            let store = self.store.read().await;
755            get_entry_id_from_headers(&store, &request)?
756        };
757
758        let mut request = request.into_inner();
759
760        while let Some(write_msg) = request.next().await {
761            let write_msg = write_msg?;
762
763            let rb = write_msg
764                .dataframe_part
765                .ok_or_else(|| {
766                    tonic::Status::invalid_argument("no data frame in WriteTableRequest")
767                })?
768                .try_into()
769                .map_err(|err| {
770                    tonic::Status::internal(format!("Could not decode chunk: {err:#}"))
771                })?;
772
773            let mut store = self.store.write().await;
774            let Some(table) = store.table_mut(entry_id) else {
775                return Err(tonic::Status::not_found("table not found"));
776            };
777            let insert_op = match TableInsertMode::try_from(write_msg.insert_mode)
778                .map_err(|err| Status::invalid_argument(err.to_string()))?
779            {
780                TableInsertMode::Append => InsertOp::Append,
781                TableInsertMode::Overwrite => InsertOp::Overwrite,
782            };
783
784            table.write_table(rb, insert_op).await.map_err(|err| {
785                tonic::Status::internal(format!("error writing to table: {err:#}"))
786            })?;
787        }
788
789        Ok(tonic::Response::new(
790            re_protos::cloud::v1alpha1::WriteTableResponse {},
791        ))
792    }
793
794    /* Query schemas */
795
796    async fn get_partition_table_schema(
797        &self,
798        request: tonic::Request<re_protos::cloud::v1alpha1::GetPartitionTableSchemaRequest>,
799    ) -> std::result::Result<
800        tonic::Response<re_protos::cloud::v1alpha1::GetPartitionTableSchemaResponse>,
801        tonic::Status,
802    > {
803        let store = self.store.read().await;
804
805        let entry_id = get_entry_id_from_headers(&store, &request)?;
806        let dataset = store.dataset(entry_id)?;
807        let record_batch = dataset.partition_table().map_err(|err| {
808            tonic::Status::internal(format!("Unable to read partition table: {err:#}"))
809        })?;
810
811        Ok(tonic::Response::new(GetPartitionTableSchemaResponse {
812            schema: Some(
813                record_batch
814                    .schema_ref()
815                    .as_ref()
816                    .try_into()
817                    .map_err(|err| {
818                        tonic::Status::internal(format!(
819                            "unable to serialize Arrow schema: {err:#}"
820                        ))
821                    })?,
822            ),
823        }))
824    }
825
    /// Stream type returned by `scan_partition_table`.
    type ScanPartitionTableStream = ScanPartitionTableResponseStream;
827
828    async fn scan_partition_table(
829        &self,
830        request: tonic::Request<re_protos::cloud::v1alpha1::ScanPartitionTableRequest>,
831    ) -> Result<tonic::Response<Self::ScanPartitionTableStream>, tonic::Status> {
832        let store = self.store.read().await;
833        let entry_id = get_entry_id_from_headers(&store, &request)?;
834
835        let request = request.into_inner();
836
837        let dataset = store.dataset(entry_id)?;
838        let mut record_batch = dataset.partition_table().map_err(|err| {
839            tonic::Status::internal(format!("Unable to read partition table: {err:#}"))
840        })?;
841
842        // project columns
843        if !request.columns.is_empty() {
844            record_batch = record_batch
845                .project_columns(request.columns.iter().map(|s| s.as_str()))
846                .map_err(|err| {
847                    tonic::Status::invalid_argument(format!("Unable to project columns: {err:#}"))
848                })?;
849        }
850
851        let stream = futures::stream::once(async move {
852            Ok(ScanPartitionTableResponse {
853                data: Some(record_batch.into()),
854            })
855        });
856
857        Ok(tonic::Response::new(
858            Box::pin(stream) as Self::ScanPartitionTableStream
859        ))
860    }
861
862    async fn get_dataset_manifest_schema(
863        &self,
864        request: Request<GetDatasetManifestSchemaRequest>,
865    ) -> Result<Response<GetDatasetManifestSchemaResponse>, Status> {
866        let store = self.store.read().await;
867
868        let entry_id = get_entry_id_from_headers(&store, &request)?;
869        let dataset = store.dataset(entry_id)?;
870        let record_batch = dataset.dataset_manifest()?;
871
872        Ok(tonic::Response::new(GetDatasetManifestSchemaResponse {
873            schema: Some(
874                record_batch
875                    .schema_ref()
876                    .as_ref()
877                    .try_into()
878                    .map_err(|err| {
879                        tonic::Status::internal(format!(
880                            "unable to serialize Arrow schema: {err:#}"
881                        ))
882                    })?,
883            ),
884        }))
885    }
886
887    type ScanDatasetManifestStream = ScanDatasetManifestResponseStream;
888
889    async fn scan_dataset_manifest(
890        &self,
891        request: Request<ScanDatasetManifestRequest>,
892    ) -> Result<Response<Self::ScanDatasetManifestStream>, Status> {
893        let store = self.store.read().await;
894        let entry_id = get_entry_id_from_headers(&store, &request)?;
895
896        let request = request.into_inner();
897
898        let dataset = store.dataset(entry_id)?;
899        let mut record_batch = dataset.dataset_manifest()?;
900
901        // project columns
902        if !request.columns.is_empty() {
903            record_batch = record_batch
904                .project_columns(request.columns.iter().map(|s| s.as_str()))
905                .map_err(|err| {
906                    tonic::Status::invalid_argument(format!("Unable to project columns: {err:#}"))
907                })?;
908        }
909
910        let stream = futures::stream::once(async move {
911            Ok(ScanDatasetManifestResponse {
912                data: Some(record_batch.into()),
913            })
914        });
915
916        Ok(tonic::Response::new(
917            Box::pin(stream) as Self::ScanDatasetManifestStream
918        ))
919    }
920
921    async fn get_dataset_schema(
922        &self,
923        request: tonic::Request<re_protos::cloud::v1alpha1::GetDatasetSchemaRequest>,
924    ) -> std::result::Result<
925        tonic::Response<re_protos::cloud::v1alpha1::GetDatasetSchemaResponse>,
926        tonic::Status,
927    > {
928        let store = self.store.read().await;
929        let entry_id = get_entry_id_from_headers(&store, &request)?;
930
931        let dataset = store.dataset(entry_id)?;
932        let schema = dataset.schema().map_err(|err| {
933            tonic::Status::internal(format!("Unable to read dataset schema: {err:#}"))
934        })?;
935
936        Ok(tonic::Response::new(GetDatasetSchemaResponse {
937            schema: Some((&schema).try_into().map_err(|err| {
938                tonic::Status::internal(format!("Unable to serialize Arrow schema: {err:#}"))
939            })?),
940        }))
941    }
942
943    /* Indexing */
944
945    async fn create_index(
946        &self,
947        _request: tonic::Request<re_protos::cloud::v1alpha1::CreateIndexRequest>,
948    ) -> std::result::Result<
949        tonic::Response<re_protos::cloud::v1alpha1::CreateIndexResponse>,
950        tonic::Status,
951    > {
952        Err(tonic::Status::unimplemented("create_index not implemented"))
953    }
954
955    async fn list_indexes(
956        &self,
957        _request: tonic::Request<re_protos::cloud::v1alpha1::ListIndexesRequest>,
958    ) -> std::result::Result<
959        tonic::Response<re_protos::cloud::v1alpha1::ListIndexesResponse>,
960        tonic::Status,
961    > {
962        Err(tonic::Status::unimplemented("list_indexes not implemented"))
963    }
964
965    async fn delete_indexes(
966        &self,
967        _request: tonic::Request<re_protos::cloud::v1alpha1::DeleteIndexesRequest>,
968    ) -> std::result::Result<
969        tonic::Response<re_protos::cloud::v1alpha1::DeleteIndexesResponse>,
970        tonic::Status,
971    > {
972        Err(tonic::Status::unimplemented(
973            "delete_indexes not implemented",
974        ))
975    }
976
977    /* Queries */
978
979    type SearchDatasetStream = SearchDatasetResponseStream;
980
981    async fn search_dataset(
982        &self,
983        _request: tonic::Request<re_protos::cloud::v1alpha1::SearchDatasetRequest>,
984    ) -> std::result::Result<tonic::Response<Self::SearchDatasetStream>, tonic::Status> {
985        Err(tonic::Status::unimplemented(
986            "search_dataset not implemented",
987        ))
988    }
989
990    type QueryDatasetStream = QueryDatasetResponseStream;
991
992    async fn query_dataset(
993        &self,
994        request: tonic::Request<re_protos::cloud::v1alpha1::QueryDatasetRequest>,
995    ) -> std::result::Result<tonic::Response<Self::QueryDatasetStream>, tonic::Status> {
996        if !request.get_ref().chunk_ids.is_empty() {
997            return Err(tonic::Status::unimplemented(
998                "query_dataset: querying specific chunk ids is not implemented",
999            ));
1000        }
1001
1002        let entry_id = get_entry_id_from_headers(&*self.store.read().await, &request)?;
1003
1004        let re_protos::cloud::v1alpha1::QueryDatasetRequest {
1005            partition_ids,
1006            entity_paths,
1007            select_all_entity_paths,
1008
1009            //TODO(RR-2613): we must do a much better job at handling these
1010            chunk_ids: _,
1011            fuzzy_descriptors: _,
1012            exclude_static_data: _,
1013            exclude_temporal_data: _,
1014            scan_parameters: _,
1015            query: _,
1016        } = request.into_inner();
1017
1018        let entity_paths: IntSet<EntityPath> = entity_paths
1019            .into_iter()
1020            .map(EntityPath::try_from)
1021            .collect::<Result<IntSet<EntityPath>, _>>()?;
1022        if select_all_entity_paths && !entity_paths.is_empty() {
1023            return Err(tonic::Status::invalid_argument(
1024                "cannot specify entity paths if `select_all_entity_paths` is true",
1025            ));
1026        }
1027
1028        let partition_ids = partition_ids
1029            .into_iter()
1030            .map(PartitionId::try_from)
1031            .collect::<Result<Vec<_>, _>>()?;
1032
1033        let chunk_stores = self.get_chunk_stores(entry_id, &partition_ids).await?;
1034
1035        if chunk_stores.is_empty() {
1036            let stream = futures::stream::iter([{
1037                let batch = QueryDatasetResponse::create_empty_dataframe();
1038                let data = Some(batch.into());
1039                Ok(QueryDatasetResponse { data })
1040            }]);
1041
1042            return Ok(tonic::Response::new(
1043                Box::pin(stream) as Self::QueryDatasetStream
1044            ));
1045        }
1046
1047        let stream = futures::stream::iter(chunk_stores.into_iter().map(
1048            move |(partition_id, layer_name, store_handle)| {
1049                let num_chunks = store_handle.read().num_chunks();
1050
1051                let mut chunk_ids = Vec::with_capacity(num_chunks);
1052                let mut chunk_partition_ids = Vec::with_capacity(num_chunks);
1053                let mut chunk_keys = Vec::with_capacity(num_chunks);
1054                let mut chunk_entity_path = Vec::with_capacity(num_chunks);
1055                let mut chunk_is_static = Vec::with_capacity(num_chunks);
1056
1057                let mut timelines = BTreeMap::new();
1058
1059                for chunk in store_handle.read().iter_chunks() {
1060                    if !entity_paths.is_empty() && !entity_paths.contains(chunk.entity_path()) {
1061                        continue;
1062                    }
1063
1064                    let mut missing_timelines: BTreeSet<_> = timelines.keys().copied().collect();
1065                    for (timeline_name, timeline_col) in chunk.timelines() {
1066                        let range = timeline_col.time_range();
1067                        let time_min = range.min();
1068                        let time_max = range.max();
1069
1070                        let timeline_name = timeline_name.as_str();
1071                        missing_timelines.remove(timeline_name);
1072                        let timeline_data_type = timeline_col.times_array().data_type().to_owned();
1073
1074                        let timeline_data = timelines.entry(timeline_name).or_insert((
1075                            timeline_data_type,
1076                            vec![None; chunk_partition_ids.len()],
1077                            vec![None; chunk_partition_ids.len()],
1078                        ));
1079
1080                        timeline_data.1.push(Some(time_min.as_i64()));
1081                        timeline_data.2.push(Some(time_max.as_i64()));
1082                    }
1083                    for timeline_name in missing_timelines {
1084                        let timeline_data = timelines
1085                            .get_mut(timeline_name)
1086                            .expect("timeline_names already checked"); // Already checked
1087
1088                        timeline_data.1.push(None);
1089                        timeline_data.2.push(None);
1090                    }
1091
1092                    chunk_partition_ids.push(partition_id.id.clone());
1093                    chunk_ids.push(chunk.id());
1094                    chunk_entity_path.push(chunk.entity_path().to_string());
1095                    chunk_is_static.push(chunk.is_static());
1096                    chunk_keys.push(
1097                        ChunkKey {
1098                            chunk_id: chunk.id(),
1099                            partition_id: partition_id.clone(),
1100                            layer_name: layer_name.clone(),
1101                            dataset_id: entry_id,
1102                        }
1103                        .encode()?,
1104                    );
1105                }
1106
1107                let chunk_layer_names = vec![layer_name.clone(); chunk_ids.len()];
1108                let chunk_key_refs = chunk_keys.iter().map(|v| v.as_slice()).collect();
1109                let batch = QueryDatasetResponse::create_dataframe(
1110                    chunk_ids,
1111                    chunk_partition_ids,
1112                    chunk_layer_names,
1113                    chunk_key_refs,
1114                    chunk_entity_path,
1115                    chunk_is_static,
1116                )
1117                .map_err(|err| {
1118                    tonic::Status::internal(format!("Failed to create dataframe: {err:#}"))
1119                })?;
1120
1121                let data = Some(batch.into());
1122
1123                Ok(QueryDatasetResponse { data })
1124            },
1125        ));
1126
1127        Ok(tonic::Response::new(
1128            Box::pin(stream) as Self::QueryDatasetStream
1129        ))
1130    }
1131
1132    type FetchChunksStream = FetchChunksResponseStream;
1133
1134    async fn fetch_chunks(
1135        &self,
1136        request: tonic::Request<re_protos::cloud::v1alpha1::FetchChunksRequest>,
1137    ) -> std::result::Result<tonic::Response<Self::FetchChunksStream>, tonic::Status> {
1138        // worth noting that FetchChunks is not per-dataset request, it simply contains chunk infos
1139        let request = request.into_inner();
1140
1141        let mut chunk_keys = vec![];
1142        for chunk_info_data in request.chunk_infos {
1143            let chunk_info_batch: RecordBatch = chunk_info_data.try_into().map_err(|err| {
1144                tonic::Status::internal(format!("Failed to decode chunk_info: {err:#}"))
1145            })?;
1146
1147            let schema = chunk_info_batch.schema();
1148
1149            let chunk_key_col_idx = schema
1150                .column_with_name(FetchChunksRequest::FIELD_CHUNK_KEY)
1151                .ok_or_else(|| {
1152                    tonic::Status::invalid_argument(format!(
1153                        "Missing {} column",
1154                        FetchChunksRequest::FIELD_CHUNK_KEY
1155                    ))
1156                })?
1157                .0;
1158
1159            let chunk_keys_arr = chunk_info_batch
1160                .column(chunk_key_col_idx)
1161                .as_any()
1162                .downcast_ref::<BinaryArray>()
1163                .ok_or_else(|| {
1164                    tonic::Status::invalid_argument(format!(
1165                        "{} must be binary array",
1166                        FetchChunksRequest::FIELD_CHUNK_KEY
1167                    ))
1168                })?;
1169
1170            for chunk_key in chunk_keys_arr {
1171                let chunk_key = chunk_key.ok_or_else(|| {
1172                    tonic::Status::invalid_argument(format!(
1173                        "{} must not be null",
1174                        FetchChunksRequest::FIELD_CHUNK_KEY
1175                    ))
1176                })?;
1177
1178                let chunk_key = ChunkKey::decode(chunk_key)?;
1179                chunk_keys.push(chunk_key);
1180            }
1181        }
1182
1183        let chunks = self
1184            .store
1185            .read()
1186            .await
1187            .chunks_from_chunk_keys(&chunk_keys)?;
1188
1189        let compression = re_log_encoding::Compression::Off;
1190
1191        let encoded_chunks = chunks
1192            .into_iter()
1193            .map(|(store_id, chunk)| {
1194                let arrow_msg = re_log_types::ArrowMsg {
1195                    chunk_id: *chunk.id(),
1196                    batch: chunk.to_record_batch().map_err(|err| {
1197                        tonic::Status::internal(format!(
1198                            "failed to convert chunk to record batch: {err:#}"
1199                        ))
1200                    })?,
1201                    on_release: None,
1202                };
1203
1204                arrow_msg
1205                    .to_transport((store_id, compression))
1206                    .map_err(|err| tonic::Status::internal(format!("encoding failed: {err:#}")))
1207            })
1208            .collect::<Result<Vec<_>, _>>()?;
1209
1210        let response = re_protos::cloud::v1alpha1::FetchChunksResponse {
1211            chunks: encoded_chunks,
1212        };
1213
1214        let stream = futures::stream::once(async move { Ok(response) });
1215
1216        Ok(tonic::Response::new(
1217            Box::pin(stream) as Self::FetchChunksStream
1218        ))
1219    }
1220
1221    // --- Table APIs ---
1222
1223    async fn register_table(
1224        &self,
1225        request: tonic::Request<RegisterTableRequest>,
1226    ) -> Result<tonic::Response<RegisterTableResponse>, tonic::Status> {
1227        #[cfg_attr(not(feature = "lance"), expect(unused_mut))]
1228        let mut store = self.store.write().await;
1229        let request = request.into_inner();
1230        let Some(provider_details) = request.provider_details else {
1231            return Err(tonic::Status::invalid_argument("Missing provider details"));
1232        };
1233        #[cfg_attr(not(feature = "lance"), expect(unused_variables))]
1234        let lance_table = match ProviderDetails::try_from(&provider_details) {
1235            Ok(ProviderDetails::LanceTable(lance_table)) => lance_table.table_url,
1236            Ok(ProviderDetails::SystemTable(_)) => Err(Status::invalid_argument(
1237                "System tables cannot be registered",
1238            ))?,
1239            Err(err) => return Err(err.into()),
1240        }
1241        .to_file_path()
1242        .map_err(|()| tonic::Status::invalid_argument("Invalid lance table path"))?;
1243
1244        #[cfg(feature = "lance")]
1245        let entry_id = {
1246            let named_path = NamedPath {
1247                name: Some(request.name.clone()),
1248                path: lance_table,
1249            };
1250
1251            store
1252                .load_directory_as_table(&named_path, IfDuplicateBehavior::Error)
1253                .await?
1254        };
1255
1256        #[cfg(not(feature = "lance"))]
1257        let entry_id = EntryId::new();
1258
1259        let table_entry = store
1260            .table(entry_id)
1261            .ok_or(Status::internal("table missing that was just registered"))?
1262            .as_table_entry();
1263
1264        let response = RegisterTableResponse {
1265            table_entry: Some(table_entry.try_into()?),
1266        };
1267
1268        Ok(response.into())
1269    }
1270
1271    async fn get_table_schema(
1272        &self,
1273        request: tonic::Request<re_protos::cloud::v1alpha1::GetTableSchemaRequest>,
1274    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::GetTableSchemaResponse>, Status> {
1275        let store = self.store.read().await;
1276        let Some(entry_id) = request.into_inner().table_id else {
1277            return Err(Status::not_found("Table ID not specified in request"));
1278        };
1279        let entry_id = entry_id.try_into()?;
1280
1281        let table = store
1282            .table(entry_id)
1283            .ok_or_else(|| Status::not_found(format!("Entry with ID {entry_id} not found")))?;
1284
1285        let schema = table.schema();
1286
1287        Ok(tonic::Response::new(
1288            re_protos::cloud::v1alpha1::GetTableSchemaResponse {
1289                schema: Some(schema.as_ref().try_into().map_err(|err| {
1290                    Status::internal(format!("Unable to serialize Arrow schema: {err:#}"))
1291                })?),
1292            },
1293        ))
1294    }
1295
1296    type ScanTableStream = ScanTableResponseStream;
1297
1298    async fn scan_table(
1299        &self,
1300        request: tonic::Request<re_protos::cloud::v1alpha1::ScanTableRequest>,
1301    ) -> Result<tonic::Response<Self::ScanTableStream>, Status> {
1302        let store = self.store.read().await;
1303        let Some(entry_id) = request.into_inner().table_id else {
1304            return Err(Status::not_found("Table ID not specified in request"));
1305        };
1306        let entry_id = entry_id.try_into()?;
1307
1308        let table = store
1309            .table(entry_id)
1310            .ok_or_else(|| Status::not_found(format!("Entry with ID {entry_id} not found")))?;
1311
1312        let ctx = SessionContext::default();
1313        let plan = table
1314            .provider()
1315            .scan(&ctx.state(), None, &[], None)
1316            .await
1317            .map_err(|err| Status::internal(format!("failed to scan table: {err:#}")))?;
1318
1319        let stream = plan
1320            .execute(0, ctx.task_ctx())
1321            .map_err(|err| tonic::Status::from_error(Box::new(err)))?;
1322
1323        let resp_stream = stream.map(|batch| {
1324            let batch = batch.map_err(|err| tonic::Status::from_error(Box::new(err)))?;
1325            Ok(ScanTableResponse {
1326                dataframe_part: Some(batch.into()),
1327            })
1328        });
1329
1330        Ok(tonic::Response::new(
1331            Box::pin(resp_stream) as Self::ScanTableStream
1332        ))
1333    }
1334
1335    // --- Tasks service ---
1336
1337    async fn query_tasks(
1338        &self,
1339        request: tonic::Request<QueryTasksRequest>,
1340    ) -> Result<tonic::Response<QueryTasksResponse>, tonic::Status> {
1341        let tasks_id = request.into_inner().ids;
1342
1343        let dummy_task_id = TaskId {
1344            id: DUMMY_TASK_ID.to_owned(),
1345        };
1346
1347        for task_id in &tasks_id {
1348            if task_id != &dummy_task_id {
1349                return Err(tonic::Status::not_found(format!(
1350                    "task {} not found",
1351                    task_id.id
1352                )));
1353            }
1354        }
1355
1356        let rb = QueryTasksResponse::create_dataframe(
1357            vec![DUMMY_TASK_ID.to_owned()],
1358            vec![None],
1359            vec![None],
1360            vec!["success".to_owned()],
1361            vec![None],
1362            vec![None],
1363            vec![None],
1364            vec![None],
1365            vec![1],
1366            vec![None],
1367            vec![None],
1368        )
1369        .expect("constant content that should always succeed");
1370
1371        // All tasks finish immediately in the OSS server
1372        Ok(tonic::Response::new(QueryTasksResponse {
1373            data: Some(rb.into()),
1374        }))
1375    }
1376
1377    type QueryTasksOnCompletionStream = QueryTasksOnCompletionResponseStream;
1378
1379    async fn query_tasks_on_completion(
1380        &self,
1381        request: tonic::Request<QueryTasksOnCompletionRequest>,
1382    ) -> Result<tonic::Response<Self::QueryTasksOnCompletionStream>, tonic::Status> {
1383        let task_ids = request.into_inner().ids;
1384
1385        // All tasks finish immediately in the OSS server, so we can delegate to `query_tasks
1386        let response_data = self
1387            .query_tasks(tonic::Request::new(QueryTasksRequest { ids: task_ids }))
1388            .await?
1389            .into_inner()
1390            .data;
1391
1392        Ok(tonic::Response::new(
1393            Box::pin(futures::stream::once(async move {
1394                Ok(QueryTasksOnCompletionResponse {
1395                    data: response_data,
1396                })
1397            })) as Self::QueryTasksOnCompletionStream,
1398        ))
1399    }
1400
1401    async fn do_maintenance(
1402        &self,
1403        _request: tonic::Request<re_protos::cloud::v1alpha1::DoMaintenanceRequest>,
1404    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::DoMaintenanceResponse>, tonic::Status>
1405    {
1406        Err(tonic::Status::unimplemented(
1407            "do_maintenance not implemented",
1408        ))
1409    }
1410
1411    async fn do_global_maintenance(
1412        &self,
1413        _request: tonic::Request<re_protos::cloud::v1alpha1::DoGlobalMaintenanceRequest>,
1414    ) -> Result<
1415        tonic::Response<re_protos::cloud::v1alpha1::DoGlobalMaintenanceResponse>,
1416        tonic::Status,
1417    > {
1418        Err(tonic::Status::unimplemented(
1419            "do_global_maintenance not implemented",
1420        ))
1421    }
1422
1423    async fn create_table_entry(
1424        &self,
1425        request: Request<re_protos::cloud::v1alpha1::CreateTableEntryRequest>,
1426    ) -> Result<Response<re_protos::cloud::v1alpha1::CreateTableEntryResponse>, Status> {
1427        let mut store = self.store.write().await;
1428
1429        let request: CreateTableEntryRequest = request.into_inner().try_into()?;
1430        let table_name = &request.name;
1431
1432        let schema = Arc::new(request.schema);
1433
1434        let table = match &request.provider_details {
1435            ProviderDetails::LanceTable(table) => {
1436                store
1437                    .create_table_entry(table_name, &table.table_url, schema)
1438                    .await?
1439            }
1440            ProviderDetails::SystemTable(_) => {
1441                return Err(tonic::Status::invalid_argument(
1442                    "Creating system tables is not supported",
1443                ));
1444            }
1445        };
1446
1447        Ok(Response::new(
1448            CreateTableEntryResponse { table }.try_into()?,
1449        ))
1450    }
1451}
1452
1453/// Retrieves the entry ID based on HTTP headers.
1454fn get_entry_id_from_headers<T>(
1455    store: &InMemoryStore,
1456    req: &tonic::Request<T>,
1457) -> Result<EntryId, tonic::Status> {
1458    if let Some(entry_id) = req.entry_id()? {
1459        Ok(entry_id)
1460    } else if let Some(dataset_name) = req.entry_name()? {
1461        Ok(store.dataset_by_name(&dataset_name)?.id())
1462    } else {
1463        const HEADERS: &[&str] = &[
1464            re_protos::headers::RERUN_HTTP_HEADER_ENTRY_ID,
1465            re_protos::headers::RERUN_HTTP_HEADER_ENTRY_NAME,
1466        ];
1467        Err(tonic::Status::invalid_argument(format!(
1468            "missing mandatory {HEADERS:?} HTTP headers"
1469        )))
1470    }
1471}