use std::sync::Arc;
use datafusion::catalog::Session;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use futures::future::BoxFuture;
use super::CustomExecuteHandler;
use crate::DeltaTable;
use crate::delta_datafusion::engine::AsObjectStoreUrl as _;
use crate::delta_datafusion::{DataFusionMixins as _, create_session};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::transaction::PROTOCOL;
use crate::kernel::{EagerSnapshot, resolve_snapshot};
use crate::logstore::{LogStoreExt, LogStoreRef};
use crate::table::state::DeltaTableState;
/// Builder for an operation that scans a Delta table and yields its data as a
/// record batch stream.
///
/// Awaiting the builder (via its `IntoFuture` implementation) produces the
/// loaded `DeltaTable` together with a `SendableRecordBatchStream`.
#[derive(Clone)]
pub struct LoadBuilder {
/// Optional table snapshot; it is handed to `resolve_snapshot` when the builder is awaited.
snapshot: Option<EagerSnapshot>,
/// Log store backing the table that is read.
log_store: LogStoreRef,
/// Names of the columns to project; `None` means no projection is applied.
columns: Option<Vec<String>>,
/// Optional caller-provided session used for the scan; a default one is created when absent.
session: Option<Arc<dyn Session>>,
}
/// Manual `Debug` implementation for [`LoadBuilder`].
///
/// Prints the snapshot, the log store and the requested column projection.
/// The `session` field is not printed (presumably because `dyn Session` does
/// not implement `Debug` — confirm); `finish_non_exhaustive` makes that elision
/// visible in the output instead of silently hiding it.
impl std::fmt::Debug for LoadBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LoadBuilder")
            .field("snapshot", &self.snapshot)
            .field("log_store", &self.log_store)
            .field("columns", &self.columns)
            .finish_non_exhaustive()
    }
}
impl super::Operation for LoadBuilder {
    /// Returns the log store this builder reads from.
    fn log_store(&self) -> &LogStoreRef {
        &self.log_store
    }

    /// Returns the custom execute handler for this operation.
    ///
    /// `LoadBuilder` stores no handler, so this always yields `None`. Returning
    /// `None` (rather than panicking) keeps any generic code that queries the
    /// handler through the `Operation` trait from aborting the process.
    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
        None
    }
}
impl LoadBuilder {
    /// Creates a builder for the given log store and optional snapshot.
    ///
    /// No column projection and no session are configured initially.
    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
        let columns = None;
        let session = None;
        Self {
            snapshot,
            log_store,
            columns,
            session,
        }
    }

    /// Restricts the load to the named columns.
    ///
    /// Accepts anything iterable whose items convert into `String`; a previously
    /// configured projection is replaced.
    pub fn with_columns(self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
        let selected: Vec<String> = columns.into_iter().map(Into::into).collect();
        Self {
            columns: Some(selected),
            ..self
        }
    }

    /// Sets the session that will be used to plan and execute the scan.
    pub fn with_session_state(self, session: Arc<dyn Session>) -> Self {
        Self {
            session: Some(session),
            ..self
        }
    }
}
impl std::future::IntoFuture for LoadBuilder {
type Output = DeltaResult<(DeltaTable, SendableRecordBatchStream)>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
let this = self;
Box::pin(async move {
let snapshot = resolve_snapshot(&this.log_store, this.snapshot, true, None).await?;
PROTOCOL.can_read_from(&snapshot)?;
let schema = snapshot.read_schema();
let projection = this
.columns
.map(|cols| {
cols.iter()
.map(|col| {
schema.column_with_name(col).map(|(idx, _)| idx).ok_or(
DeltaTableError::SchemaMismatch {
msg: format!("Column '{col}' does not exist in table schema."),
},
)
})
.collect::<Result<_, _>>()
})
.transpose()?;
let session = if let Some(session) = this.session {
session
} else {
let session = Arc::new(create_session().into_inner().state());
let url = this.log_store.log_root_url();
let store_url = url.as_object_store_url();
if session.runtime_env().object_store(&store_url).is_err() {
session
.runtime_env()
.register_object_store(&url, this.log_store.root_object_store(None));
}
session
};
let table = DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot));
let provider = table.table_provider().await?;
let scan_plan = provider
.scan(session.as_ref(), projection.as_ref(), &[], None)
.await?;
let plan = CoalescePartitionsExec::new(scan_plan);
let stream = plan.execute(0, session.task_ctx())?;
Ok((table, stream))
})
}
}
// Tests for the table load/scan operation, driven through `scan_table()`.
#[cfg(test)]
mod tests {
use crate::operations::collect_sendable_stream;
use crate::writer::test_utils::{TestResult, get_record_batch};
use crate::{DeltaTable, DeltaTableBuilder};
use datafusion::assert_batches_sorted_eq;
use datafusion::common::DFSchema;
use std::path::Path;
use url::Url;
// Loads the on-disk `delta-0.8.0` fixture table and checks the scanned `value` column.
#[tokio::test]
async fn test_load_local() -> TestResult {
let table_path = Path::new("../test/tests/data/delta-0.8.0");
// The relative fixture path is canonicalized so it can be turned into a directory URL.
let table_uri =
Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
let table = DeltaTableBuilder::from_url(table_uri)?
.load()
.await
.unwrap();
let (_table, stream) = table.scan_table().await?;
let data = collect_sendable_stream(stream).await?;
let expected = vec![
"+-------+",
"| value |",
"+-------+",
"| 0 |",
"| 1 |",
"| 2 |",
"| 4 |",
"+-------+",
];
// Row order is not asserted: the macro compares sorted output.
assert_batches_sorted_eq!(&expected, &data);
Ok(())
}
// Writes a record batch to an in-memory table, scans it back, and checks both the
// rows and that the returned schema matches the written batch's schema.
#[tokio::test]
async fn test_write_load() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaTable::new_in_memory()
.write(vec![batch.clone()])
.await?;
let (_table, stream) = table.scan_table().await?;
let data = collect_sendable_stream(stream).await?;
let expected = vec![
"+----+-------+------------+",
"| id | value | modified |",
"+----+-------+------------+",
"| A | 1 | 2021-02-02 |",
"| B | 2 | 2021-02-02 |",
"| A | 3 | 2021-02-02 |",
"| B | 4 | 2021-02-01 |",
"| A | 5 | 2021-02-01 |",
"| A | 6 | 2021-02-01 |",
"| A | 7 | 2021-02-01 |",
"| B | 8 | 2021-02-01 |",
"| B | 9 | 2021-02-01 |",
"| A | 10 | 2021-02-01 |",
"| A | 11 | 2021-02-01 |",
"+----+-------+------------+",
];
assert_batches_sorted_eq!(&expected, &data);
// Compare column names and types of the written batch against the first scanned batch.
let df_schema = DFSchema::try_from(batch.schema())?;
let data_df_schema = DFSchema::try_from(data[0].schema())?;
assert!(df_schema.logically_equivalent_names_and_types(&data_df_schema));
Ok(())
}
// Scans an in-memory table with a projection and checks that only the
// requested `id` and `value` columns are returned.
#[tokio::test]
async fn test_load_with_columns() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaTable::new_in_memory()
.write(vec![batch.clone()])
.await?;
let (_table, stream) = table.scan_table().with_columns(["id", "value"]).await?;
let data = collect_sendable_stream(stream).await?;
let expected = vec![
"+----+-------+",
"| id | value |",
"+----+-------+",
"| A | 1 |",
"| B | 2 |",
"| A | 3 |",
"| B | 4 |",
"| A | 5 |",
"| A | 6 |",
"| A | 7 |",
"| B | 8 |",
"| B | 9 |",
"| A | 10 |",
"| A | 11 |",
"+----+-------+",
];
assert_batches_sorted_eq!(&expected, &data);
Ok(())
}
}