use deltalake::arrow::{
array::{Int32Array, StringArray, TimestampMicrosecondArray},
datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit},
record_batch::RecordBatch,
};
use deltalake::kernel::{DataType, PrimitiveType, StructField};
use deltalake::operations::collect_sendable_stream;
use deltalake::parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};
use deltalake::{protocol::SaveMode, DeltaOps};
use std::sync::Arc;
/// Returns the Delta schema for the example table.
///
/// The table has three columns:
/// - `int`: a non-nullable integer.
/// - `string`: a nullable string.
/// - `timestamp`: a nullable timestamp without a timezone (`TimestampNtz`).
fn get_table_columns() -> Vec<StructField> {
    // (column name, primitive type, nullable)
    let column_specs = [
        ("int", PrimitiveType::Integer, false),
        ("string", PrimitiveType::String, true),
        ("timestamp", PrimitiveType::TimestampNtz, true),
    ];

    column_specs
        .into_iter()
        .map(|(column_name, primitive, is_nullable)| {
            StructField::new(
                String::from(column_name),
                DataType::Primitive(primitive),
                is_nullable,
            )
        })
        .collect()
}
/// Builds the sample data written to the example table.
///
/// Returns a single 11-row `RecordBatch`. Its Arrow schema mirrors the Delta
/// columns declared in `get_table_columns` (`int`, `string`, `timestamp`).
/// The `timestamp` column holds only two distinct values. The first four rows
/// share one value and the remaining seven share another, so partitioning by
/// `timestamp` produces two partitions.
///
/// # Panics
///
/// Panics only if the column arrays disagree with the schema. The arrays below
/// are built to match it exactly, in both type and length.
fn get_table_batches() -> RecordBatch {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("int", ArrowDataType::Int32, false),
        Field::new("string", ArrowDataType::Utf8, true),
        // Paired with the `TimestampNtz` Delta column: microsecond precision, no timezone.
        Field::new(
            "timestamp",
            ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        ),
    ]));

    let int_values = Int32Array::from((1..=11).collect::<Vec<i32>>());
    let str_values = StringArray::from(vec!["A", "B", "A", "B", "A", "A", "A", "B", "B", "A", "A"]);
    let ts_values = TimestampMicrosecondArray::from(vec![
        1000000012, 1000000012, 1000000012, 1000000012, 500012305, 500012305, 500012305, 500012305,
        500012305, 500012305, 500012305,
    ]);

    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(int_values),
            Arc::new(str_values),
            Arc::new(ts_values),
        ],
    )
    .expect("every column array matches the schema in type and has 11 rows")
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
let ops = if let Ok(table_uri) = std::env::var("TABLE_URI") {
DeltaOps::try_from_uri(table_uri).await?
} else {
DeltaOps::new_in_memory()
};
let table = ops
.create()
.with_columns(get_table_columns())
.with_partition_columns(["timestamp"])
.with_table_name("my_table")
.with_comment("A table to show how delta-rs works")
.await?;
assert_eq!(table.version(), 0);
let writer_properties = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
.build();
let batch = get_table_batches();
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_writer_properties(writer_properties)
.await?;
assert_eq!(table.version(), 1);
let writer_properties = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
.build();
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Overwrite)
.with_writer_properties(writer_properties)
.await?;
assert_eq!(table.version(), 2);
let (_table, stream) = DeltaOps(table).load().await?;
let data: Vec<RecordBatch> = collect_sendable_stream(stream).await?;
println!("{:?}", data);
Ok(())
}