use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use ahash::HashMap;
use arrow::array::BinaryArray;
use arrow::record_batch::RecordBatch;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::prelude::SessionContext;
use nohash_hasher::IntSet;
use tokio_stream::StreamExt as _;
use tonic::{Code, Request, Response, Status};

use re_arrow_util::RecordBatchExt as _;
use re_chunk_store::{Chunk, ChunkStore, ChunkStoreHandle};
use re_log_encoding::ToTransport as _;
use re_log_types::{EntityPath, EntryId, StoreId, StoreKind};
use re_protos::{
    cloud::v1alpha1::{
        DeleteEntryResponse, EntryDetails, EntryKind, FetchChunksRequest,
        GetDatasetManifestSchemaRequest, GetDatasetManifestSchemaResponse,
        GetDatasetSchemaResponse, GetPartitionTableSchemaResponse, QueryDatasetResponse,
        QueryTasksOnCompletionRequest, QueryTasksOnCompletionResponse, QueryTasksRequest,
        QueryTasksResponse, RegisterTableRequest, RegisterTableResponse,
        RegisterWithDatasetResponse, ScanDatasetManifestRequest, ScanDatasetManifestResponse,
        ScanPartitionTableResponse, ScanTableResponse,
        ext::{
            self, CreateDatasetEntryRequest, CreateDatasetEntryResponse, CreateTableEntryRequest,
            CreateTableEntryResponse, DataSource, DatasetDetails, EntryDetailsUpdate,
            ProviderDetails, ReadDatasetEntryResponse, ReadTableEntryResponse, TableInsertMode,
            UpdateDatasetEntryRequest, UpdateDatasetEntryResponse, UpdateEntryRequest,
            UpdateEntryResponse,
        },
        rerun_cloud_service_server::RerunCloudService,
    },
    common::v1alpha1::{
        TaskId,
        ext::{IfDuplicateBehavior, PartitionId},
    },
    headers::RerunHeadersExtractorExt as _,
};

use crate::entrypoint::NamedPath;
use crate::store::{ChunkKey, Dataset, InMemoryStore, Table};

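/// Settings for [`RerunCloudHandler`]. Currently empty.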
#[derive(Debug, Default)]
pub struct RerunCloudHandlerSettings {}

#[derive(Default)]
pub struct RerunCloudHandlerBuilder {
    settings: RerunCloudHandlerSettings,

    store: InMemoryStore,
}

impl RerunCloudHandlerBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_directory_as_dataset(
        mut self,
        directory: &NamedPath,
        on_duplicate: IfDuplicateBehavior,
    ) -> Result<Self, crate::store::Error> {
        self.store
            .load_directory_as_dataset(directory, on_duplicate)?;

        Ok(self)
    }

    #[cfg(feature = "lance")]
    pub async fn with_directory_as_table(
        mut self,
        path: &NamedPath,
        on_duplicate: IfDuplicateBehavior,
    ) -> Result<Self, crate::store::Error> {
        self.store
            .load_directory_as_table(path, on_duplicate)
            .await?;

        Ok(self)
    }

    pub fn build(self) -> RerunCloudHandler {
        RerunCloudHandler::new(self.settings, self.store)
    }
}

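/// Placeholder task ID handed out by `register_with_dataset`: this in-memory server performs
/// registrations synchronously, so the corresponding "task" is always already complete (see
/// `query_tasks` below, which reports it as succeeded).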
const DUMMY_TASK_ID: &str = "task_00000000DEADBEEF";

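/// In-memory implementation of the `RerunCloudService` gRPC API, backed by an
/// [`InMemoryStore`] behind a read-write lock.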
pub struct RerunCloudHandler {
    #[expect(dead_code)]
    settings: RerunCloudHandlerSettings,

    store: tokio::sync::RwLock<InMemoryStore>,
}

impl RerunCloudHandler {
    pub fn new(settings: RerunCloudHandlerSettings, store: InMemoryStore) -> Self {
        Self {
            settings,
            store: tokio::sync::RwLock::new(store),
        }
    }

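    /// Collects the chunk store handle for every layer of each of the requested partitions of a
    /// dataset, tagged with its partition ID and layer name.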
    async fn get_chunk_stores(
        &self,
        dataset_id: EntryId,
        partition_ids: &[PartitionId],
    ) -> Result<Vec<(PartitionId, String, ChunkStoreHandle)>, tonic::Status> {
        let store = self.store.read().await;
        let dataset = store.dataset(dataset_id)?;

        Ok(dataset
            .partitions_from_ids(partition_ids)?
            .flat_map(|(partition_id, partition)| {
                partition.iter_layers().map(|(layer_name, layer)| {
                    (
                        partition_id.clone(),
                        layer_name.to_owned(),
                        layer.store_handle().clone(),
                    )
                })
            })
            .collect())
    }

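    /// Expands prefix (directory) data sources into one concrete data source per `.rrd` file
    /// found by recursively walking the directory; non-prefix sources are passed through as-is.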
    fn resolve_data_sources(data_sources: &[DataSource]) -> Result<Vec<DataSource>, tonic::Status> {
        let mut resolved = Vec::<DataSource>::with_capacity(data_sources.len());
        for source in data_sources {
            if source.is_prefix {
                let path = source.storage_url.to_file_path().map_err(|_err| {
                    tonic::Status::invalid_argument(format!(
                        "getting file path from {:?}",
                        source.storage_url
                    ))
                })?;
                let meta = std::fs::metadata(&path).map_err(|err| match err.kind() {
                    std::io::ErrorKind::NotFound => {
                        tonic::Status::invalid_argument(format!("Directory not found: {:?}", &path))
                    }
                    _ => tonic::Status::invalid_argument(format!(
                        "Failed to read directory metadata {path:?}: {err:#}"
                    )),
                })?;
                if !meta.is_dir() {
                    return Err(tonic::Status::invalid_argument(format!(
                        "Expected directory, got file: {path:?}"
                    )));
                }

                let mut dirs_to_visit = vec![path];
                let mut files = Vec::new();

                while let Some(current_dir) = dirs_to_visit.pop() {
                    let entries = std::fs::read_dir(&current_dir).map_err(|err| {
                        tonic::Status::internal(format!(
                            "Failed to read directory {current_dir:?}: {err:#}"
                        ))
                    })?;

                    for entry in entries {
                        let entry = entry.map_err(|err| {
                            tonic::Status::internal(format!(
                                "Failed to read directory entry: {err:#}"
                            ))
                        })?;
                        let entry_path = entry.path();

                        if entry_path.is_dir() {
                            dirs_to_visit.push(entry_path);
                        } else if let Some(extension) = entry_path.extension()
                            && extension == "rrd"
                        {
                            files.push(entry_path);
                        }
                    }
                }

                for file_path in files {
                    let mut file_url = source.storage_url.clone();
                    file_url.set_path(&file_path.to_string_lossy());
                    resolved.push(DataSource {
                        storage_url: file_url,
                        is_prefix: false,
                        ..source.clone()
                    });
                }
            } else {
                resolved.push(source.clone());
            }
        }

        Ok(resolved)
    }
}

impl std::fmt::Debug for RerunCloudHandler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RerunCloudHandler").finish()
    }
}

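/// Declares the boxed response-stream type aliases required by the server-streaming methods of
/// the generated `RerunCloudService` trait.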
macro_rules! decl_stream {
    ($stream:ident<manifest:$resp:ident>) => {
        pub type $stream = std::pin::Pin<
            Box<
                dyn futures::Stream<Item = Result<re_protos::cloud::v1alpha1::$resp, tonic::Status>>
                    + Send,
            >,
        >;
    };

    ($stream:ident<rerun_cloud:$resp:ident>) => {
        pub type $stream = std::pin::Pin<
            Box<
                dyn futures::Stream<Item = Result<re_protos::cloud::v1alpha1::$resp, tonic::Status>>
                    + Send,
            >,
        >;
    };

    ($stream:ident<tasks:$resp:ident>) => {
        pub type $stream = std::pin::Pin<
            Box<
                dyn futures::Stream<Item = Result<re_protos::cloud::v1alpha1::$resp, tonic::Status>>
                    + Send,
            >,
        >;
    };
}

decl_stream!(FetchChunksResponseStream<manifest:FetchChunksResponse>);
decl_stream!(QueryDatasetResponseStream<manifest:QueryDatasetResponse>);
decl_stream!(ScanPartitionTableResponseStream<manifest:ScanPartitionTableResponse>);
decl_stream!(ScanDatasetManifestResponseStream<manifest:ScanDatasetManifestResponse>);
decl_stream!(SearchDatasetResponseStream<manifest:SearchDatasetResponse>);
decl_stream!(ScanTableResponseStream<rerun_cloud:ScanTableResponse>);
decl_stream!(QueryTasksOnCompletionResponseStream<tasks:QueryTasksOnCompletionResponse>);

impl RerunCloudHandler {
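    /// Looks up datasets by ID and/or name, optionally filtered by store kind; with neither an
    /// ID nor a name, all datasets are listed.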
    async fn find_datasets(
        &self,
        entry_id: Option<EntryId>,
        name: Option<String>,
        store_kind: Option<StoreKind>,
    ) -> Result<Vec<EntryDetails>, Status> {
        let store = self.store.read().await;

        let dataset = match (entry_id, name) {
            (None, None) => None,

            (Some(entry_id), None) => Some(store.dataset(entry_id)?),

            (None, Some(name)) => Some(store.dataset_by_name(&name)?),

            (Some(entry_id), Some(name)) => {
                let dataset = store.dataset_by_name(&name)?;
                if dataset.id() != entry_id {
                    return Err(tonic::Status::not_found(format!(
                        "Dataset with ID {entry_id} not found"
                    )));
                }
                Some(dataset)
            }
        };

        let dataset_iter = if let Some(dataset) = dataset {
            itertools::Either::Left(std::iter::once(dataset))
        } else {
            itertools::Either::Right(store.iter_datasets())
        };

        Ok(dataset_iter
            .filter(|dataset| {
                store_kind.is_none_or(|store_kind| dataset.store_kind() == store_kind)
            })
            .map(Dataset::as_entry_details)
            .map(Into::into)
            .collect())
    }

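    /// Looks up tables by ID and/or name; with neither an ID nor a name, all tables are listed.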
    async fn find_tables(
        &self,
        entry_id: Option<EntryId>,
        name: Option<String>,
    ) -> Result<Vec<EntryDetails>, Status> {
        let store = self.store.read().await;

        let table = match (entry_id, name) {
            (None, None) => None,

            (Some(entry_id), None) => {
                let Some(table) = store.table(entry_id) else {
                    return Err(tonic::Status::not_found(format!(
                        "Table with ID {entry_id} not found"
                    )));
                };
                Some(table)
            }

            (None, Some(name)) => {
                let Some(table) = store.table_by_name(&name) else {
                    return Err(tonic::Status::not_found(format!(
                        "Table with name {name} not found"
                    )));
                };
                Some(table)
            }

            (Some(entry_id), Some(name)) => {
                let Some(table) = store.table_by_name(&name) else {
                    return Err(tonic::Status::not_found(format!(
                        "Table with name {name} not found"
                    )));
                };
                if table.id() != entry_id {
                    return Err(tonic::Status::not_found(format!(
                        "Table with ID {entry_id} not found"
                    )));
                }
                Some(table)
            }
        };

        let table_iter = if let Some(table) = table {
            itertools::Either::Left(std::iter::once(table))
        } else {
            itertools::Either::Right(store.iter_tables())
        };

        Ok(table_iter
            .map(Table::as_entry_details)
            .map(Into::into)
            .collect())
    }
}

#[tonic::async_trait]
impl RerunCloudService for RerunCloudHandler {
    async fn version(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::VersionRequest>,
    ) -> std::result::Result<
        tonic::Response<re_protos::cloud::v1alpha1::VersionResponse>,
        tonic::Status,
    > {
        let re_protos::cloud::v1alpha1::VersionRequest {} = request.into_inner();

        let build_info = re_build_info::build_info!();

        Ok(tonic::Response::new(
            re_protos::cloud::v1alpha1::VersionResponse {
                build_info: Some(build_info.into()),
            },
        ))
    }

    async fn find_entries(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::FindEntriesRequest>,
    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::FindEntriesResponse>, tonic::Status>
    {
        let filter = request.into_inner().filter;
        let entry_id = filter
            .as_ref()
            .and_then(|filter| filter.id)
            .map(TryInto::try_into)
            .transpose()?;
        let name = filter.as_ref().and_then(|filter| filter.name.clone());
        let kind = filter
            .and_then(|filter| filter.entry_kind)
            .map(EntryKind::try_from)
            .transpose()
            .map_err(|err| {
                Status::invalid_argument(format!("find_entries: invalid entry kind {err}"))
            })?;

        let entries = match kind {
            Some(EntryKind::Dataset) => {
                self.find_datasets(entry_id, name, Some(StoreKind::Recording))
                    .await?
            }

            Some(EntryKind::BlueprintDataset) => {
                self.find_datasets(entry_id, name, Some(StoreKind::Blueprint))
                    .await?
            }

            Some(EntryKind::Table) => self.find_tables(entry_id, name).await?,

            Some(EntryKind::DatasetView | EntryKind::TableView) => {
                return Err(Status::unimplemented(
                    "find_entries: dataset and table views are not supported",
                ));
            }

            Some(EntryKind::Unspecified) => {
                return Err(Status::invalid_argument(
                    "find_entries: entry kind unspecified",
                ));
            }

            None => {
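                // No kind filter: gather both datasets and tables, treating a NotFound from
                // either lookup as an empty contribution rather than an error.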
                let mut datasets = match self.find_datasets(entry_id, name.clone(), None).await {
                    Ok(datasets) => datasets,
                    Err(err) => {
                        if err.code() == Code::NotFound {
                            vec![]
                        } else {
                            return Err(err);
                        }
                    }
                };
                let tables = match self.find_tables(entry_id, name).await {
                    Ok(tables) => tables,
                    Err(err) => {
                        if err.code() == Code::NotFound {
                            vec![]
                        } else {
                            return Err(err);
                        }
                    }
                };
                datasets.extend(tables);
                datasets
            }
        };

        let response = re_protos::cloud::v1alpha1::FindEntriesResponse { entries };

        Ok(tonic::Response::new(response))
    }

    async fn create_dataset_entry(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::CreateDatasetEntryRequest>,
    ) -> Result<
        tonic::Response<re_protos::cloud::v1alpha1::CreateDatasetEntryResponse>,
        tonic::Status,
    > {
        let CreateDatasetEntryRequest {
            name: dataset_name,
            id: dataset_id,
        } = request.into_inner().try_into()?;

        let mut store = self.store.write().await;

        let dataset_id = dataset_id.unwrap_or_else(EntryId::new);
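        // Every dataset gets a companion blueprint dataset, created first so the new dataset's
        // details can link to it.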
        let blueprint_dataset_id = EntryId::new();
        let blueprint_dataset_name = format!("__bp_{dataset_id}");

        store.create_dataset(
            &blueprint_dataset_name,
            Some(blueprint_dataset_id),
            StoreKind::Blueprint,
            None,
        )?;

        let dataset_details = DatasetDetails {
            blueprint_dataset: Some(blueprint_dataset_id),
            default_blueprint: None,
        };

        let dataset = store.create_dataset(
            &dataset_name,
            Some(dataset_id),
            StoreKind::Recording,
            Some(dataset_details),
        )?;

        let dataset_entry = dataset.as_dataset_entry();

        Ok(tonic::Response::new(
            CreateDatasetEntryResponse {
                dataset: dataset_entry,
            }
            .into(),
        ))
    }

    async fn read_dataset_entry(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::ReadDatasetEntryRequest>,
    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::ReadDatasetEntryResponse>, tonic::Status>
    {
        let store = self.store.read().await;
        let entry_id = get_entry_id_from_headers(&store, &request)?;
        let dataset = store.dataset(entry_id)?;

        Ok(tonic::Response::new(
            ReadDatasetEntryResponse {
                dataset_entry: dataset.as_dataset_entry(),
            }
            .into(),
        ))
    }

    async fn update_dataset_entry(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::UpdateDatasetEntryRequest>,
    ) -> Result<
        tonic::Response<re_protos::cloud::v1alpha1::UpdateDatasetEntryResponse>,
        tonic::Status,
    > {
        let request: UpdateDatasetEntryRequest = request.into_inner().try_into()?;

        let mut store = self.store.write().await;
        let dataset = store.dataset_mut(request.id)?;

        dataset.set_dataset_details(request.dataset_details);

        Ok(tonic::Response::new(
            UpdateDatasetEntryResponse {
                dataset_entry: dataset.as_dataset_entry(),
            }
            .into(),
        ))
    }

    async fn read_table_entry(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::ReadTableEntryRequest>,
    ) -> std::result::Result<
        tonic::Response<re_protos::cloud::v1alpha1::ReadTableEntryResponse>,
        tonic::Status,
    > {
        let store = self.store.read().await;

        let id = request
            .into_inner()
            .id
            .ok_or(Status::invalid_argument("No table entry ID provided"))?
            .try_into()?;

        let table = store.table(id).ok_or_else(|| {
            tonic::Status::not_found(format!("table with entry ID '{id}' not found"))
        })?;

        Ok(tonic::Response::new(
            ReadTableEntryResponse {
                table_entry: table.as_table_entry(),
            }
            .try_into()?,
        ))
    }

    async fn delete_entry(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::DeleteEntryRequest>,
    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::DeleteEntryResponse>, tonic::Status>
    {
        let entry_id = request.into_inner().try_into()?;

        self.store.write().await.delete_entry(entry_id)?;

        Ok(tonic::Response::new(DeleteEntryResponse {}))
    }

    async fn update_entry(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::UpdateEntryRequest>,
    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::UpdateEntryResponse>, tonic::Status>
    {
        let UpdateEntryRequest {
            id: entry_id,
            entry_details_update: EntryDetailsUpdate { name },
        } = request.into_inner().try_into()?;

        let mut store = self.store.write().await;

        if let Some(name) = name {
            store.rename_entry(entry_id, name)?;
        }

        Ok(tonic::Response::new(
            UpdateEntryResponse {
                entry_details: store.entry_details(entry_id)?,
            }
            .into(),
        ))
    }

    async fn register_with_dataset(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::RegisterWithDatasetRequest>,
    ) -> Result<
        tonic::Response<re_protos::cloud::v1alpha1::RegisterWithDatasetResponse>,
        tonic::Status,
    > {
        let mut store = self.store.write().await;
        let dataset_id = get_entry_id_from_headers(&store, &request)?;
        let dataset = store.dataset_mut(dataset_id)?;

        let re_protos::cloud::v1alpha1::ext::RegisterWithDatasetRequest {
            data_sources,
            on_duplicate,
        } = request.into_inner().try_into()?;

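        // Column vectors for the response dataframe: one row per newly registered partition.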
        let mut partition_ids: Vec<String> = vec![];
        let mut partition_layers: Vec<String> = vec![];
        let mut partition_types: Vec<String> = vec![];
        let mut storage_urls: Vec<String> = vec![];
        let mut task_ids: Vec<String> = vec![];

        let data_sources = Self::resolve_data_sources(&data_sources)?;

        for source in data_sources {
            let ext::DataSource {
                storage_url,
                is_prefix,
                layer,
                kind,
            } = source;

            if is_prefix {
                return Err(tonic::Status::internal(
                    "register_with_dataset: prefix data sources should have been resolved already",
                ));
            }

            if kind != ext::DataSourceKind::Rrd {
                return Err(tonic::Status::unimplemented(
                    "register_with_dataset: only RRD data sources are implemented",
                ));
            }

            if let Ok(rrd_path) = storage_url.to_file_path() {
                let new_partition_ids = dataset.load_rrd(
                    &rrd_path,
                    Some(&layer),
                    on_duplicate,
                    dataset.store_kind(),
                )?;

                for partition_id in new_partition_ids {
                    partition_ids.push(partition_id.to_string());
                    partition_layers.push(layer.clone());
                    partition_types.push("rrd".to_owned());
                    storage_urls.push(storage_url.to_string());
                    task_ids.push(DUMMY_TASK_ID.to_owned());
                }
            }
        }

        let record_batch = RegisterWithDatasetResponse::create_dataframe(
            partition_ids,
            partition_layers,
            partition_types,
            storage_urls,
            task_ids,
        )
        .map_err(|err| tonic::Status::internal(format!("Failed to create dataframe: {err:#}")))?;
        Ok(tonic::Response::new(
            re_protos::cloud::v1alpha1::RegisterWithDatasetResponse {
                data: Some(record_batch.into()),
            },
        ))
    }

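    /// Ingests a client-streamed sequence of chunks, grouping them by partition ID and then
    /// installing each group as the default layer of its partition.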
    async fn write_chunks(
        &self,
        request: tonic::Request<tonic::Streaming<re_protos::cloud::v1alpha1::WriteChunksRequest>>,
    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::WriteChunksResponse>, tonic::Status>
    {
        let entry_id = get_entry_id_from_headers(&*self.store.read().await, &request)?;

        let mut request = request.into_inner();

        let mut chunk_stores = HashMap::default();
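        // One scratch chunk store per partition; chunks accumulate here while the stream is
        // drained, then are handed over to the dataset below.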

        while let Some(chunk_msg) = request.next().await {
            let chunk_msg = chunk_msg?;

            let chunk_batch: RecordBatch = chunk_msg
                .chunk
                .ok_or_else(|| tonic::Status::invalid_argument("no chunk in WriteChunksRequest"))?
                .try_into()
                .map_err(|err| {
                    tonic::Status::internal(format!("Could not decode chunk: {err:#}"))
                })?;

            let partition_id: PartitionId = chunk_batch
                .schema()
                .metadata()
                .get("rerun:partition_id")
                .ok_or_else(|| {
                    tonic::Status::invalid_argument(
                        "Received chunk without 'rerun:partition_id' metadata",
                    )
                })?
                .clone()
                .into();

            let chunk = Arc::new(Chunk::from_record_batch(&chunk_batch).map_err(|err| {
                tonic::Status::internal(format!("error decoding chunk from record batch: {err:#}"))
            })?);

            chunk_stores
                .entry(partition_id.clone())
                .or_insert_with(|| {
                    ChunkStore::new(
                        StoreId::new(StoreKind::Recording, entry_id.to_string(), partition_id.id),
                        InMemoryStore::chunk_store_config(),
                    )
                })
                .insert_chunk(&chunk)
                .map_err(|err| {
                    tonic::Status::internal(format!("error adding chunk to store: {err:#}"))
                })?;
        }

        let mut store = self.store.write().await;
        let dataset = store.dataset_mut(entry_id)?;

        #[expect(clippy::iter_over_hash_type)]
        for (partition_id, chunk_store) in chunk_stores {
            dataset.add_layer(
                partition_id,
                DataSource::DEFAULT_LAYER.to_owned(),
                ChunkStoreHandle::new(chunk_store),
                IfDuplicateBehavior::Error,
            )?;
        }

        Ok(tonic::Response::new(
            re_protos::cloud::v1alpha1::WriteChunksResponse {},
        ))
    }

    async fn write_table(
        &self,
        request: tonic::Request<tonic::Streaming<re_protos::cloud::v1alpha1::WriteTableRequest>>,
    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::WriteTableResponse>, tonic::Status>
    {
        let entry_id = {
            let store = self.store.read().await;
            get_entry_id_from_headers(&store, &request)?
        };

        let mut request = request.into_inner();

        while let Some(write_msg) = request.next().await {
            let write_msg = write_msg?;

            let rb = write_msg
                .dataframe_part
                .ok_or_else(|| {
                    tonic::Status::invalid_argument("no data frame in WriteTableRequest")
                })?
                .try_into()
                .map_err(|err| {
                    tonic::Status::internal(format!("Could not decode dataframe part: {err:#}"))
                })?;

            let mut store = self.store.write().await;
            let Some(table) = store.table_mut(entry_id) else {
                return Err(tonic::Status::not_found("table not found"));
            };
            let insert_op = match TableInsertMode::try_from(write_msg.insert_mode)
                .map_err(|err| Status::invalid_argument(err.to_string()))?
            {
                TableInsertMode::Append => InsertOp::Append,
                TableInsertMode::Overwrite => InsertOp::Overwrite,
            };

            table.write_table(rb, insert_op).await.map_err(|err| {
                tonic::Status::internal(format!("error writing to table: {err:#}"))
            })?;
        }

        Ok(tonic::Response::new(
            re_protos::cloud::v1alpha1::WriteTableResponse {},
        ))
    }

    async fn get_partition_table_schema(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::GetPartitionTableSchemaRequest>,
    ) -> std::result::Result<
        tonic::Response<re_protos::cloud::v1alpha1::GetPartitionTableSchemaResponse>,
        tonic::Status,
    > {
        let store = self.store.read().await;

        let entry_id = get_entry_id_from_headers(&store, &request)?;
        let dataset = store.dataset(entry_id)?;
        let record_batch = dataset.partition_table().map_err(|err| {
            tonic::Status::internal(format!("Unable to read partition table: {err:#}"))
        })?;

        Ok(tonic::Response::new(GetPartitionTableSchemaResponse {
            schema: Some(
                record_batch
                    .schema_ref()
                    .as_ref()
                    .try_into()
                    .map_err(|err| {
                        tonic::Status::internal(format!(
                            "unable to serialize Arrow schema: {err:#}"
                        ))
                    })?,
            ),
        }))
    }

    type ScanPartitionTableStream = ScanPartitionTableResponseStream;

    async fn scan_partition_table(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::ScanPartitionTableRequest>,
    ) -> Result<tonic::Response<Self::ScanPartitionTableStream>, tonic::Status> {
        let store = self.store.read().await;
        let entry_id = get_entry_id_from_headers(&store, &request)?;

        let request = request.into_inner();

        let dataset = store.dataset(entry_id)?;
        let mut record_batch = dataset.partition_table().map_err(|err| {
            tonic::Status::internal(format!("Unable to read partition table: {err:#}"))
        })?;

        if !request.columns.is_empty() {
            record_batch = record_batch
                .project_columns(request.columns.iter().map(|s| s.as_str()))
                .map_err(|err| {
                    tonic::Status::invalid_argument(format!("Unable to project columns: {err:#}"))
                })?;
        }

        let stream = futures::stream::once(async move {
            Ok(ScanPartitionTableResponse {
                data: Some(record_batch.into()),
            })
        });

        Ok(tonic::Response::new(
            Box::pin(stream) as Self::ScanPartitionTableStream
        ))
    }

    async fn get_dataset_manifest_schema(
        &self,
        request: Request<GetDatasetManifestSchemaRequest>,
    ) -> Result<Response<GetDatasetManifestSchemaResponse>, Status> {
        let store = self.store.read().await;

        let entry_id = get_entry_id_from_headers(&store, &request)?;
        let dataset = store.dataset(entry_id)?;
        let record_batch = dataset.dataset_manifest()?;

        Ok(tonic::Response::new(GetDatasetManifestSchemaResponse {
            schema: Some(
                record_batch
                    .schema_ref()
                    .as_ref()
                    .try_into()
                    .map_err(|err| {
                        tonic::Status::internal(format!(
                            "unable to serialize Arrow schema: {err:#}"
                        ))
                    })?,
            ),
        }))
    }

    type ScanDatasetManifestStream = ScanDatasetManifestResponseStream;

    async fn scan_dataset_manifest(
        &self,
        request: Request<ScanDatasetManifestRequest>,
    ) -> Result<Response<Self::ScanDatasetManifestStream>, Status> {
        let store = self.store.read().await;
        let entry_id = get_entry_id_from_headers(&store, &request)?;

        let request = request.into_inner();

        let dataset = store.dataset(entry_id)?;
        let mut record_batch = dataset.dataset_manifest()?;

        if !request.columns.is_empty() {
            record_batch = record_batch
                .project_columns(request.columns.iter().map(|s| s.as_str()))
                .map_err(|err| {
                    tonic::Status::invalid_argument(format!("Unable to project columns: {err:#}"))
                })?;
        }

        let stream = futures::stream::once(async move {
            Ok(ScanDatasetManifestResponse {
                data: Some(record_batch.into()),
            })
        });

        Ok(tonic::Response::new(
            Box::pin(stream) as Self::ScanDatasetManifestStream
        ))
    }

    async fn get_dataset_schema(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::GetDatasetSchemaRequest>,
    ) -> std::result::Result<
        tonic::Response<re_protos::cloud::v1alpha1::GetDatasetSchemaResponse>,
        tonic::Status,
    > {
        let store = self.store.read().await;
        let entry_id = get_entry_id_from_headers(&store, &request)?;

        let dataset = store.dataset(entry_id)?;
        let schema = dataset.schema().map_err(|err| {
            tonic::Status::internal(format!("Unable to read dataset schema: {err:#}"))
        })?;

        Ok(tonic::Response::new(GetDatasetSchemaResponse {
            schema: Some((&schema).try_into().map_err(|err| {
                tonic::Status::internal(format!("Unable to serialize Arrow schema: {err:#}"))
            })?),
        }))
    }

    async fn create_index(
        &self,
        _request: tonic::Request<re_protos::cloud::v1alpha1::CreateIndexRequest>,
    ) -> std::result::Result<
        tonic::Response<re_protos::cloud::v1alpha1::CreateIndexResponse>,
        tonic::Status,
    > {
        Err(tonic::Status::unimplemented("create_index not implemented"))
    }

    async fn list_indexes(
        &self,
        _request: tonic::Request<re_protos::cloud::v1alpha1::ListIndexesRequest>,
    ) -> std::result::Result<
        tonic::Response<re_protos::cloud::v1alpha1::ListIndexesResponse>,
        tonic::Status,
    > {
        Err(tonic::Status::unimplemented("list_indexes not implemented"))
    }

    async fn delete_indexes(
        &self,
        _request: tonic::Request<re_protos::cloud::v1alpha1::DeleteIndexesRequest>,
    ) -> std::result::Result<
        tonic::Response<re_protos::cloud::v1alpha1::DeleteIndexesResponse>,
        tonic::Status,
    > {
        Err(tonic::Status::unimplemented(
            "delete_indexes not implemented",
        ))
    }

    type SearchDatasetStream = SearchDatasetResponseStream;

    async fn search_dataset(
        &self,
        _request: tonic::Request<re_protos::cloud::v1alpha1::SearchDatasetRequest>,
    ) -> std::result::Result<tonic::Response<Self::SearchDatasetStream>, tonic::Status> {
        Err(tonic::Status::unimplemented(
            "search_dataset not implemented",
        ))
    }

    type QueryDatasetStream = QueryDatasetResponseStream;

    async fn query_dataset(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::QueryDatasetRequest>,
    ) -> std::result::Result<tonic::Response<Self::QueryDatasetStream>, tonic::Status> {
        if !request.get_ref().chunk_ids.is_empty() {
            return Err(tonic::Status::unimplemented(
                "query_dataset: querying specific chunk ids is not implemented",
            ));
        }

        let entry_id = get_entry_id_from_headers(&*self.store.read().await, &request)?;

        let re_protos::cloud::v1alpha1::QueryDatasetRequest {
            partition_ids,
            entity_paths,
            select_all_entity_paths,

            chunk_ids: _,
            fuzzy_descriptors: _,
            exclude_static_data: _,
            exclude_temporal_data: _,
            scan_parameters: _,
            query: _,
        } = request.into_inner();

        let entity_paths: IntSet<EntityPath> = entity_paths
            .into_iter()
            .map(EntityPath::try_from)
            .collect::<Result<IntSet<EntityPath>, _>>()?;
        if select_all_entity_paths && !entity_paths.is_empty() {
            return Err(tonic::Status::invalid_argument(
                "cannot specify entity paths if `select_all_entity_paths` is true",
            ));
        }

        let partition_ids = partition_ids
            .into_iter()
            .map(PartitionId::try_from)
            .collect::<Result<Vec<_>, _>>()?;

        let chunk_stores = self.get_chunk_stores(entry_id, &partition_ids).await?;

        if chunk_stores.is_empty() {
            let stream = futures::stream::iter([{
                let batch = QueryDatasetResponse::create_empty_dataframe();
                let data = Some(batch.into());
                Ok(QueryDatasetResponse { data })
            }]);

            return Ok(tonic::Response::new(
                Box::pin(stream) as Self::QueryDatasetStream
            ));
        }

        let stream = futures::stream::iter(chunk_stores.into_iter().map(
            move |(partition_id, layer_name, store_handle)| {
                let num_chunks = store_handle.read().num_chunks();

                let mut chunk_ids = Vec::with_capacity(num_chunks);
                let mut chunk_partition_ids = Vec::with_capacity(num_chunks);
                let mut chunk_keys = Vec::with_capacity(num_chunks);
                let mut chunk_entity_path = Vec::with_capacity(num_chunks);
                let mut chunk_is_static = Vec::with_capacity(num_chunks);

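                // Per-timeline columns aligned with the chunk rows: timeline name ->
                // (data type, per-chunk min, per-chunk max). Columns are back-filled with `None`
                // for chunks that do not have a given timeline.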
                let mut timelines = BTreeMap::new();

                for chunk in store_handle.read().iter_chunks() {
                    if !entity_paths.is_empty() && !entity_paths.contains(chunk.entity_path()) {
                        continue;
                    }

                    let mut missing_timelines: BTreeSet<_> = timelines.keys().copied().collect();
                    for (timeline_name, timeline_col) in chunk.timelines() {
                        let range = timeline_col.time_range();
                        let time_min = range.min();
                        let time_max = range.max();

                        let timeline_name = timeline_name.as_str();
                        missing_timelines.remove(timeline_name);
                        let timeline_data_type = timeline_col.times_array().data_type().to_owned();

                        let timeline_data = timelines.entry(timeline_name).or_insert((
                            timeline_data_type,
                            vec![None; chunk_partition_ids.len()],
                            vec![None; chunk_partition_ids.len()],
                        ));

                        timeline_data.1.push(Some(time_min.as_i64()));
                        timeline_data.2.push(Some(time_max.as_i64()));
                    }
                    for timeline_name in missing_timelines {
                        let timeline_data = timelines
                            .get_mut(timeline_name)
                            .expect("timeline_names already checked");
                        timeline_data.1.push(None);
                        timeline_data.2.push(None);
                    }

                    chunk_partition_ids.push(partition_id.id.clone());
                    chunk_ids.push(chunk.id());
                    chunk_entity_path.push(chunk.entity_path().to_string());
                    chunk_is_static.push(chunk.is_static());
                    chunk_keys.push(
                        ChunkKey {
                            chunk_id: chunk.id(),
                            partition_id: partition_id.clone(),
                            layer_name: layer_name.clone(),
                            dataset_id: entry_id,
                        }
                        .encode()?,
                    );
                }

                let chunk_layer_names = vec![layer_name.clone(); chunk_ids.len()];
                let chunk_key_refs = chunk_keys.iter().map(|v| v.as_slice()).collect();
                let batch = QueryDatasetResponse::create_dataframe(
                    chunk_ids,
                    chunk_partition_ids,
                    chunk_layer_names,
                    chunk_key_refs,
                    chunk_entity_path,
                    chunk_is_static,
                )
                .map_err(|err| {
                    tonic::Status::internal(format!("Failed to create dataframe: {err:#}"))
                })?;

                let data = Some(batch.into());

                Ok(QueryDatasetResponse { data })
            },
        ));

        Ok(tonic::Response::new(
            Box::pin(stream) as Self::QueryDatasetStream
        ))
    }

    type FetchChunksStream = FetchChunksResponseStream;

    async fn fetch_chunks(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::FetchChunksRequest>,
    ) -> std::result::Result<tonic::Response<Self::FetchChunksStream>, tonic::Status> {
        let request = request.into_inner();

        let mut chunk_keys = vec![];
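        // Each request row carries an opaque, binary-encoded `ChunkKey`; decode all of them
        // before touching the store.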
        for chunk_info_data in request.chunk_infos {
            let chunk_info_batch: RecordBatch = chunk_info_data.try_into().map_err(|err| {
                tonic::Status::internal(format!("Failed to decode chunk_info: {err:#}"))
            })?;

            let schema = chunk_info_batch.schema();

            let chunk_key_col_idx = schema
                .column_with_name(FetchChunksRequest::FIELD_CHUNK_KEY)
                .ok_or_else(|| {
                    tonic::Status::invalid_argument(format!(
                        "Missing {} column",
                        FetchChunksRequest::FIELD_CHUNK_KEY
                    ))
                })?
                .0;

            let chunk_keys_arr = chunk_info_batch
                .column(chunk_key_col_idx)
                .as_any()
                .downcast_ref::<BinaryArray>()
                .ok_or_else(|| {
                    tonic::Status::invalid_argument(format!(
                        "{} must be binary array",
                        FetchChunksRequest::FIELD_CHUNK_KEY
                    ))
                })?;

            for chunk_key in chunk_keys_arr {
                let chunk_key = chunk_key.ok_or_else(|| {
                    tonic::Status::invalid_argument(format!(
                        "{} must not be null",
                        FetchChunksRequest::FIELD_CHUNK_KEY
                    ))
                })?;

                let chunk_key = ChunkKey::decode(chunk_key)?;
                chunk_keys.push(chunk_key);
            }
        }

        let chunks = self
            .store
            .read()
            .await
            .chunks_from_chunk_keys(&chunk_keys)?;

        let compression = re_log_encoding::Compression::Off;

        let encoded_chunks = chunks
            .into_iter()
            .map(|(store_id, chunk)| {
                let arrow_msg = re_log_types::ArrowMsg {
                    chunk_id: *chunk.id(),
                    batch: chunk.to_record_batch().map_err(|err| {
                        tonic::Status::internal(format!(
                            "failed to convert chunk to record batch: {err:#}"
                        ))
                    })?,
                    on_release: None,
                };

                arrow_msg
                    .to_transport((store_id, compression))
                    .map_err(|err| tonic::Status::internal(format!("encoding failed: {err:#}")))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let response = re_protos::cloud::v1alpha1::FetchChunksResponse {
            chunks: encoded_chunks,
        };

        let stream = futures::stream::once(async move { Ok(response) });

        Ok(tonic::Response::new(
            Box::pin(stream) as Self::FetchChunksStream
        ))
    }

    async fn register_table(
        &self,
        request: tonic::Request<RegisterTableRequest>,
    ) -> Result<tonic::Response<RegisterTableResponse>, tonic::Status> {
        #[cfg_attr(not(feature = "lance"), expect(unused_mut))]
        let mut store = self.store.write().await;
        let request = request.into_inner();
        let Some(provider_details) = request.provider_details else {
            return Err(tonic::Status::invalid_argument("Missing provider details"));
        };
        #[cfg_attr(not(feature = "lance"), expect(unused_variables))]
        let lance_table = match ProviderDetails::try_from(&provider_details) {
            Ok(ProviderDetails::LanceTable(lance_table)) => lance_table.table_url,
            Ok(ProviderDetails::SystemTable(_)) => Err(Status::invalid_argument(
                "System tables cannot be registered",
            ))?,
            Err(err) => return Err(err.into()),
        }
        .to_file_path()
        .map_err(|()| tonic::Status::invalid_argument("Invalid lance table path"))?;

        #[cfg(feature = "lance")]
        let entry_id = {
            let named_path = NamedPath {
                name: Some(request.name.clone()),
                path: lance_table,
            };

            store
                .load_directory_as_table(&named_path, IfDuplicateBehavior::Error)
                .await?
        };

        #[cfg(not(feature = "lance"))]
        let entry_id = EntryId::new();

        let table_entry = store
            .table(entry_id)
            .ok_or(Status::internal("table missing that was just registered"))?
            .as_table_entry();

        let response = RegisterTableResponse {
            table_entry: Some(table_entry.try_into()?),
        };

        Ok(response.into())
    }

    async fn get_table_schema(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::GetTableSchemaRequest>,
    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::GetTableSchemaResponse>, Status> {
        let store = self.store.read().await;
        let Some(entry_id) = request.into_inner().table_id else {
            return Err(Status::not_found("Table ID not specified in request"));
        };
        let entry_id = entry_id.try_into()?;

        let table = store
            .table(entry_id)
            .ok_or_else(|| Status::not_found(format!("Entry with ID {entry_id} not found")))?;

        let schema = table.schema();

        Ok(tonic::Response::new(
            re_protos::cloud::v1alpha1::GetTableSchemaResponse {
                schema: Some(schema.as_ref().try_into().map_err(|err| {
                    Status::internal(format!("Unable to serialize Arrow schema: {err:#}"))
                })?),
            },
        ))
    }

    type ScanTableStream = ScanTableResponseStream;

    async fn scan_table(
        &self,
        request: tonic::Request<re_protos::cloud::v1alpha1::ScanTableRequest>,
    ) -> Result<tonic::Response<Self::ScanTableStream>, Status> {
        let store = self.store.read().await;
        let Some(entry_id) = request.into_inner().table_id else {
            return Err(Status::not_found("Table ID not specified in request"));
        };
        let entry_id = entry_id.try_into()?;

        let table = store
            .table(entry_id)
            .ok_or_else(|| Status::not_found(format!("Entry with ID {entry_id} not found")))?;

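        // Full, unfiltered scan through DataFusion; the resulting record batches are streamed
        // straight back to the client.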
        let ctx = SessionContext::default();
        let plan = table
            .provider()
            .scan(&ctx.state(), None, &[], None)
            .await
            .map_err(|err| Status::internal(format!("failed to scan table: {err:#}")))?;

        let stream = plan
            .execute(0, ctx.task_ctx())
            .map_err(|err| tonic::Status::from_error(Box::new(err)))?;

        let resp_stream = stream.map(|batch| {
            let batch = batch.map_err(|err| tonic::Status::from_error(Box::new(err)))?;
            Ok(ScanTableResponse {
                dataframe_part: Some(batch.into()),
            })
        });

        Ok(tonic::Response::new(
            Box::pin(resp_stream) as Self::ScanTableStream
        ))
    }

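    /// Tasks are synthetic in this in-memory server: the only task that ever exists is the
    /// dummy registration task, and it is always reported as succeeded.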
    async fn query_tasks(
        &self,
        request: tonic::Request<QueryTasksRequest>,
    ) -> Result<tonic::Response<QueryTasksResponse>, tonic::Status> {
        let task_ids = request.into_inner().ids;

        let dummy_task_id = TaskId {
            id: DUMMY_TASK_ID.to_owned(),
        };

        for task_id in &task_ids {
            if task_id != &dummy_task_id {
                return Err(tonic::Status::not_found(format!(
                    "task {} not found",
                    task_id.id
                )));
            }
        }

        let rb = QueryTasksResponse::create_dataframe(
            vec![DUMMY_TASK_ID.to_owned()],
            vec![None],
            vec![None],
            vec!["success".to_owned()],
            vec![None],
            vec![None],
            vec![None],
            vec![None],
            vec![1],
            vec![None],
            vec![None],
        )
        .expect("constant content that should always succeed");

        Ok(tonic::Response::new(QueryTasksResponse {
            data: Some(rb.into()),
        }))
    }

    type QueryTasksOnCompletionStream = QueryTasksOnCompletionResponseStream;

    async fn query_tasks_on_completion(
        &self,
        request: tonic::Request<QueryTasksOnCompletionRequest>,
    ) -> Result<tonic::Response<Self::QueryTasksOnCompletionStream>, tonic::Status> {
        let task_ids = request.into_inner().ids;

        let response_data = self
            .query_tasks(tonic::Request::new(QueryTasksRequest { ids: task_ids }))
            .await?
            .into_inner()
            .data;

        Ok(tonic::Response::new(
            Box::pin(futures::stream::once(async move {
                Ok(QueryTasksOnCompletionResponse {
                    data: response_data,
                })
            })) as Self::QueryTasksOnCompletionStream,
        ))
    }

    async fn do_maintenance(
        &self,
        _request: tonic::Request<re_protos::cloud::v1alpha1::DoMaintenanceRequest>,
    ) -> Result<tonic::Response<re_protos::cloud::v1alpha1::DoMaintenanceResponse>, tonic::Status>
    {
        Err(tonic::Status::unimplemented(
            "do_maintenance not implemented",
        ))
    }

    async fn do_global_maintenance(
        &self,
        _request: tonic::Request<re_protos::cloud::v1alpha1::DoGlobalMaintenanceRequest>,
    ) -> Result<
        tonic::Response<re_protos::cloud::v1alpha1::DoGlobalMaintenanceResponse>,
        tonic::Status,
    > {
        Err(tonic::Status::unimplemented(
            "do_global_maintenance not implemented",
        ))
    }

    async fn create_table_entry(
        &self,
        request: Request<re_protos::cloud::v1alpha1::CreateTableEntryRequest>,
    ) -> Result<Response<re_protos::cloud::v1alpha1::CreateTableEntryResponse>, Status> {
        let mut store = self.store.write().await;

        let request: CreateTableEntryRequest = request.into_inner().try_into()?;
        let table_name = &request.name;

        let schema = Arc::new(request.schema);

        let table = match &request.provider_details {
            ProviderDetails::LanceTable(table) => {
                store
                    .create_table_entry(table_name, &table.table_url, schema)
                    .await?
            }
            ProviderDetails::SystemTable(_) => {
                return Err(tonic::Status::invalid_argument(
                    "Creating system tables is not supported",
                ));
            }
        };

        Ok(Response::new(
            CreateTableEntryResponse { table }.try_into()?,
        ))
    }
}

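/// Resolves the entry targeted by a request from its Rerun HTTP headers: an explicit entry ID
/// header wins; otherwise the entry name header is resolved as a dataset name.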
fn get_entry_id_from_headers<T>(
    store: &InMemoryStore,
    req: &tonic::Request<T>,
) -> Result<EntryId, tonic::Status> {
    if let Some(entry_id) = req.entry_id()? {
        Ok(entry_id)
    } else if let Some(dataset_name) = req.entry_name()? {
        Ok(store.dataset_by_name(&dataset_name)?.id())
    } else {
        const HEADERS: &[&str] = &[
            re_protos::headers::RERUN_HTTP_HEADER_ENTRY_ID,
            re_protos::headers::RERUN_HTTP_HEADER_ENTRY_NAME,
        ];
        Err(tonic::Status::invalid_argument(format!(
            "missing mandatory HTTP header: expected one of {HEADERS:?}"
        )))
    }
}