re_datafusion/
partition_table.rs

1use std::sync::Arc;
2
3use arrow::{array::RecordBatch, datatypes::SchemaRef};
4use async_trait::async_trait;
5use datafusion::{
6    catalog::TableProvider,
7    error::{DataFusionError, Result as DataFusionResult},
8};
9use tracing::instrument;
10
11use re_log_encoding::codec::wire::decoder::Decode as _;
12use re_log_types::EntryId;
13use re_protos::{
14    cloud::v1alpha1::{ScanPartitionTableRequest, ScanPartitionTableResponse},
15    headers::RerunHeadersInjectorExt as _,
16};
17use re_redap_client::ConnectionClient;
18
19use crate::grpc_streaming_provider::{GrpcStreamProvider, GrpcStreamToTable};
20use crate::wasm_compat::make_future_send;
21
22//TODO(ab): deduplicate from DatasetManifestProvider
23#[derive(Clone)]
24pub struct PartitionTableProvider {
25    client: ConnectionClient,
26    dataset_id: EntryId,
27}
28
29impl std::fmt::Debug for PartitionTableProvider {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("PartitionTableProvider")
32            .field("dataset_id", &self.dataset_id)
33            .finish()
34    }
35}
36
37impl PartitionTableProvider {
38    pub fn new(client: ConnectionClient, dataset_id: EntryId) -> Self {
39        Self { client, dataset_id }
40    }
41
42    /// This is a convenience function
43    pub async fn into_provider(self) -> DataFusionResult<Arc<dyn TableProvider>> {
44        Ok(GrpcStreamProvider::prepare(self).await?)
45    }
46}
47
48#[async_trait]
49impl GrpcStreamToTable for PartitionTableProvider {
50    type GrpcStreamData = ScanPartitionTableResponse;
51
52    #[instrument(skip(self), err)]
53    async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
54        let mut client = self.client.clone();
55
56        let dataset_id = self.dataset_id;
57
58        Ok(Arc::new(
59            make_future_send(async move {
60                client
61                    .get_partition_table_schema(dataset_id)
62                    .await
63                    .map_err(|err| {
64                        DataFusionError::External(
65                            format!("Couldn't get partition table schema: {err}").into(),
66                        )
67                    })
68            })
69            .await?,
70        ))
71    }
72
73    // TODO(ab): what `GrpcStreamToTable` attempts to simplify should probably be handled by
74    // `ConnectionClient`
75    #[instrument(skip(self), err)]
76    async fn send_streaming_request(
77        &mut self,
78    ) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
79        let request = tonic::Request::new(ScanPartitionTableRequest {
80            columns: vec![], // all of them
81        })
82        .with_entry_id(self.dataset_id)
83        .map_err(|err| DataFusionError::External(Box::new(err)))?;
84
85        let mut client = self.client.clone();
86
87        make_future_send(async move { Ok(client.inner().scan_partition_table(request).await) })
88            .await?
89            .map_err(|err| DataFusionError::External(Box::new(err)))
90    }
91
92    fn process_response(
93        &mut self,
94        response: Self::GrpcStreamData,
95    ) -> DataFusionResult<RecordBatch> {
96        response
97            .data
98            .ok_or(DataFusionError::Execution(
99                "DataFrame missing from PartitionList response".to_owned(),
100            ))?
101            .decode()
102            .map_err(|err| DataFusionError::External(Box::new(err)))
103    }
104}