use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::array::Int32Array;
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::default::executor::tokio::TokioMultiThreadExecutor;
use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::transaction::create_table::create_table;
use delta_kernel::{DeltaResult, Snapshot};
use test_utils::table_builder::{LogState, TestTableBuilder};
use test_utils::{insert_data, test_table_setup_mt, CountingReporter};
use url::Url;
mod scan;
mod snapshot_load;
fn measuring_engine(
store: Arc<dyn delta_kernel::object_store::ObjectStore>,
) -> (
DefaultEngine<delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor>,
Arc<CountingReporter>,
) {
let reporter = Arc::new(CountingReporter::default());
let engine = DefaultEngineBuilder::new(store)
.with_metrics_reporter(reporter.clone())
.build();
(engine, reporter)
}
fn simple_schema() -> Arc<StructType> {
Arc::new(
StructType::try_new(vec![StructField::nullable("id", DataType::INTEGER)])
.expect("valid schema"),
)
}
async fn insert_rows(
table_url: &Url,
engine: &Arc<
DefaultEngine<delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor>,
>,
start_val: i32,
count: i32,
) -> DeltaResult<()> {
let mut snap = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
for val in start_val..(start_val + count) {
let committed = insert_data(snap, engine, vec![Arc::new(Int32Array::from(vec![val]))])
.await?
.unwrap_committed();
snap = committed
.post_commit_snapshot()
.expect("post-commit snapshot")
.clone();
}
Ok(())
}
async fn setup_table_with_v1_checkpoint() -> DeltaResult<(
Url,
Arc<DefaultEngine<TokioMultiThreadExecutor>>,
tempfile::TempDir,
)> {
let (temp_dir, table_path, setup_engine) = test_table_setup_mt()?;
let table_url = delta_kernel::try_parse_uri(&table_path)?;
let _ = create_table(&table_path, simple_schema(), "Test/1.0")
.build(setup_engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(setup_engine.as_ref())?;
let snap0 = Snapshot::builder_for(table_url.clone()).build(setup_engine.as_ref())?;
let committed = insert_data(
snap0,
&setup_engine,
vec![Arc::new(Int32Array::from(vec![1]))],
)
.await?
.unwrap_committed();
committed
.post_commit_snapshot()
.expect("post-commit snapshot")
.clone()
.checkpoint(setup_engine.as_ref())?;
Ok((table_url, setup_engine, temp_dir))
}