re_datafusion/
partition_table.rs1use std::sync::Arc;
2
3use arrow::{array::RecordBatch, datatypes::SchemaRef};
4use async_trait::async_trait;
5use datafusion::{
6 catalog::TableProvider,
7 error::{DataFusionError, Result as DataFusionResult},
8};
9use tracing::instrument;
10
11use re_log_encoding::codec::wire::decoder::Decode as _;
12use re_log_types::EntryId;
13use re_protos::{
14 cloud::v1alpha1::{ScanPartitionTableRequest, ScanPartitionTableResponse},
15 headers::RerunHeadersInjectorExt as _,
16};
17use re_redap_client::ConnectionClient;
18
19use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
20use crate::wasm_compat::make_future_send;
21
22#[derive(Clone)]
24pub struct PartitionTableProvider {
25 client: ConnectionClient,
26 dataset_id: EntryId,
27}
28
29impl std::fmt::Debug for PartitionTableProvider {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("PartitionTableProvider")
32 .field("dataset_id", &self.dataset_id)
33 .finish()
34 }
35}
36
37impl PartitionTableProvider {
38 pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
39 Self { client, dataset_id }
40 }
41
42 pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
44 Ok(GrpcStreamProvider::prepare(self).await?)
45 }
46}
47
48#[async_trait]
49impl GrpcStreamToTable for PartitionTableProvider {
50 type GrpcStreamData = ScanPartitionTableResponse;
51
52 #[instrument(skip(self), err)]
53 async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
54 let mut client = self.client.clone();
55
56 let dataset_id = self.dataset_id;
57
58 Ok(Arc::new(
59 make_future_send(async move {
60 client
61 .get_partition_table_schema(dataset_id)
62 .await
63 .map_err(|err| {
64 DataFusionError::External(
65 format!("Couldn't get partition table schema: {err}").into(),
66 )
67 })
68 })
69 .await?,
70 ))
71 }
72
73 #[instrument(skip(self), err)]
76 async fn send_streaming_request(
77 &mut self,
78 ) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
79 let request = tonic::Request::new(ScanPartitionTableRequest {
80 columns: vec![], })
82 .with_entry_id(self.dataset_id)
83 .map_err(|err| DataFusionError::External(Box::new(err)))?;
84
85 let mut client = self.client.clone();
86
87 make_future_send(async move { Ok(client.inner().scan_partition_table(request).await) })
88 .await?
89 .map_err(|err| DataFusionError::External(Box::new(err)))
90 }
91
92 fn process_response(
93 &mut self,
94 response: Self::GrpcStreamData,
95 ) -> DataFusionResult<RecordBatch> {
96 response
97 .data
98 .ok_or(DataFusionError::Execution(
99 "DataFrame missing from PartitionList response".to_owned(),
100 ))?
101 .decode()
102 .map_err(|err| DataFusionError::External(Box::new(err)))
103 }
104}