use bytes::{BufMut, BytesMut};
use futures::StreamExt as _;
use futures::future::BoxFuture;
use std::collections::HashMap;
use std::sync::Arc;
use object_store::ObjectStoreExt as _;
use object_store::path::{Path, PathPart};
use tracing::log::*;
use super::{CustomExecuteHandler, Operation};
use crate::kernel::{EagerSnapshot, resolve_snapshot};
use crate::logstore::LogStoreRef;
use crate::logstore::object_store::PutPayload;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
/// Builder for the operation that writes `_symlink_format_manifest` files for a Delta table.
///
/// Awaiting the builder performs the generation and resolves to a `DeltaTable`.
#[derive(Clone)]
pub struct GenerateBuilder {
    /// Store used both to read the table and to write the generated manifests.
    log_store: LogStoreRef,
    /// Snapshot to generate manifests for; handed to `resolve_snapshot` when the builder runs.
    snapshot: Option<EagerSnapshot>,
    /// Hook reported through `Operation::get_custom_execute_handler`; `new` leaves it unset.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl GenerateBuilder {
    /// Creates a builder targeting `log_store`, optionally pinned to an already-loaded `snapshot`.
    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
        let custom_execute_handler = None;
        Self {
            log_store,
            snapshot,
            custom_execute_handler,
        }
    }
}
impl super::Operation for GenerateBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}
impl std::future::IntoFuture for GenerateBuilder {
type Output = DeltaResult<DeltaTable>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
let this = self;
Box::pin(async move {
let snapshot =
resolve_snapshot(this.log_store(), this.snapshot.clone(), true, None).await?;
let mut payloads = HashMap::new();
let manifest_part = PathPart::parse("manifest").expect("This is not possible");
let mut file_stream = snapshot.file_views(&this.log_store, None);
while let Some(add) = file_stream.next().await {
let add = add?;
let path = add.object_store_path();
let output_path = Path::from_iter(
std::iter::once(PathPart::parse("_symlink_format_manifest").map_err(|e| {
DeltaTableError::GenericError {
source: Box::new(e),
}
})?)
.chain(path.parts().filter(|p| path.filename() != Some(p.as_ref())))
.chain(std::iter::once(manifest_part.clone())),
);
trace!("Computed output path for add action: {output_path:?}");
if !payloads.contains_key(&output_path) {
payloads.insert(output_path.clone(), BytesMut::new());
}
if let Some(payload) = payloads.get_mut(&output_path) {
let uri = this.log_store().to_uri(&path);
trace!("Prepare {uri} for the symlink_format_manifest");
payload.put(uri.as_bytes());
payload.put_u8(b'\n');
}
}
debug!("Total of {} manifest files prepared", payloads.len());
for (path, payload) in payloads.drain() {
debug!(
"Generated manifest for {:?} is {} bytes",
path,
payload.len()
);
let payload = PutPayload::from(payload.freeze());
this.log_store()
.object_store(None)
.put(&path, payload)
.await?;
}
Ok(DeltaTable::new_with_state(
this.log_store().clone(),
DeltaTableState::new(snapshot.clone()),
))
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    use crate::DeltaTable;
    use crate::kernel::schema::{DataType, PrimitiveType};
    use crate::kernel::{Action, Add};

    /// Lists every object location visible through `table`'s object store.
    ///
    /// Each entry is printed to ease debugging failed assertions. The full listing is
    /// collected (no early exit) so callers can assert both presence and absence of paths.
    async fn list_locations(table: &DeltaTable) -> DeltaResult<Vec<Path>> {
        let store = table.log_store().object_store(None);
        let mut stream = store.list(None);
        let mut locations = Vec::new();
        while let Some(meta) = stream.next().await.transpose()? {
            println!("Name: {}, size: {}", meta.location, meta.size);
            locations.push(meta.location);
        }
        Ok(locations)
    }

    #[tokio::test]
    async fn test_generate() -> DeltaResult<()> {
        let actions = vec![Action::Add(Add {
            path: "some-files.parquet".into(),
            ..Default::default()
        })];
        let table = DeltaTable::new_in_memory()
            .create()
            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
            .with_actions(actions)
            .await?;
        let generate = GenerateBuilder::new(table.log_store(), table.state.map(|s| s.snapshot));
        let table = generate.await?;

        let locations = list_locations(&table).await?;
        assert!(
            locations.contains(&Path::from("_symlink_format_manifest/manifest")),
            "The _symlink_format_manifest/manifest was not found in the Delta table's object store prefix"
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_generate_with_partitions() -> DeltaResult<()> {
        let actions = vec![Action::Add(Add {
            path: "locale=us/some-files.parquet".into(),
            partition_values: HashMap::from([("locale".to_string(), Some("us".to_string()))]),
            ..Default::default()
        })];
        let table = DeltaTable::new_in_memory()
            .create()
            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
            .with_column(
                "locale",
                DataType::Primitive(PrimitiveType::String),
                true,
                None,
            )
            .with_partition_columns(vec!["locale"])
            .with_actions(actions)
            .await?;
        let generate = GenerateBuilder::new(table.log_store(), table.state.map(|s| s.snapshot));
        let table = generate.await?;

        // Check the whole listing: the root-manifest check must not depend on the order in
        // which the store happens to list objects.
        let locations = list_locations(&table).await?;
        assert!(
            !locations.contains(&Path::from("_symlink_format_manifest/manifest")),
            "The 'root' manifest file is not expected in a partitioned table!"
        );
        assert!(
            locations.contains(&Path::from("_symlink_format_manifest/locale=us/manifest")),
            "The _symlink_format_manifest/locale=us/manifest was not found in the Delta table's object store prefix"
        );
        Ok(())
    }
}