use arrow::datatypes::Schema as ArrowSchema;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType as ArrowDataType, Field};
use chrono::DateTime;
use deltalake_core::kernel::{DataType, PrimitiveType, StructField};
use deltalake_core::logstore::commit_uri_from_version;
use deltalake_core::logstore::object_store::ObjectStoreExt as _;
use deltalake_core::protocol::SaveMode;
use deltalake_core::{DeltaTable, ensure_table_uri};
use futures::TryStreamExt;
use rand::{Rng, RngExt};
use std::error::Error;
use std::fs;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
/// Test fixture returned by `setup_test`.
#[derive(Debug)]
struct Context {
/// Table handle as left by the final write performed in `setup_test`.
pub table: DeltaTable,
}
/// Creates a Delta table at `table_uri` and gives it a short commit history.
///
/// The table is created with two nullable integer columns, `id` and `value`.
/// One random batch from `get_record_batch` is then written three times, in
/// order: append, overwrite, append. The restore tests in this file rely on
/// that sequence of commits.
///
/// # Errors
/// Returns the error produced by the table-creation step. Failures while
/// resolving/opening the URI or while writing panic via `unwrap` instead.
async fn setup_test(table_uri: &str) -> Result<Context, Box<dyn Error>> {
    // Both columns share the same shape: nullable 32-bit integer.
    let nullable_int = |name: &str| {
        StructField::new(
            name.to_string(),
            DataType::Primitive(PrimitiveType::Integer),
            true,
        )
    };
    let schema_fields = vec![nullable_int("id"), nullable_int("value")];

    let table_url = ensure_table_uri(table_uri).unwrap();
    let mut table = DeltaTable::try_from_url(table_url)
        .await
        .unwrap()
        .create()
        .with_columns(schema_fields)
        .await?;

    let batch = get_record_batch();
    for save_mode in [SaveMode::Append, SaveMode::Overwrite, SaveMode::Append] {
        // `thread::sleep` blocks the calling thread for a full second.
        // NOTE(review): presumably this keeps consecutive commits at least one
        // second apart so the datetime-based restore test can tell them apart.
        thread::sleep(Duration::from_secs(1));
        table = table
            .write(vec![batch.clone()])
            .with_save_mode(save_mode)
            .await
            .unwrap();
    }

    Ok(Context { table })
}
fn get_record_batch() -> RecordBatch {
let mut id_vec: Vec<i32> = Vec::with_capacity(10);
let mut value_vec: Vec<i32> = Vec::with_capacity(10);
let mut rng = rand::rng();
for _ in 0..10 {
id_vec.push(rng.random());
value_vec.push(rng.random());
}
let schema = ArrowSchema::new(vec![
Field::new("id", ArrowDataType::Int32, true),
Field::new("value", ArrowDataType::Int32, true),
]);
let id_array = Int32Array::from(id_vec);
let value_array = Int32Array::from(value_vec);
RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(id_array), Arc::new(value_array)],
)
.unwrap()
}
#[tokio::test]
async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let result = table.restore().with_version_to_restore(1).await?;
assert_eq!(result.1.num_restored_file, 1);
assert_eq!(result.1.num_removed_file, 2);
assert_eq!(result.0.snapshot()?.version(), 4);
let mut table = DeltaTable::try_from_url(ensure_table_uri(table_uri).unwrap())
.await
.unwrap();
table.load_version(1).await?;
let curr_files = table.get_files_by_partitions(&[]).await?;
let result_files = result.0.get_files_by_partitions(&[]).await?;
assert_eq!(curr_files, result_files);
let result = result.0.restore().with_version_to_restore(0).await?;
assert_eq!(result.0.snapshot().unwrap().log_data().num_files(), 0);
Ok(())
}
#[tokio::test]
async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let version = 1;
let meta = table
.object_store()
.head(&commit_uri_from_version(Some(version)))
.await?;
let timestamp = meta.last_modified.timestamp_millis();
let datetime = DateTime::from_timestamp_millis(timestamp).unwrap();
let result = table.restore().with_datetime_to_restore(datetime).await?;
assert_eq!(result.1.num_restored_file, 1);
assert_eq!(result.1.num_removed_file, 2);
assert_eq!(result.0.snapshot()?.version(), 4);
Ok(())
}
#[tokio::test]
async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let table = context.table;
let history: Vec<_> = table.history(Some(10)).await?.collect();
let timestamp = history.get(1).unwrap().timestamp.unwrap();
let datetime = DateTime::from_timestamp_millis(timestamp).unwrap();
let result = table
.restore()
.with_version_to_restore(1)
.with_datetime_to_restore(datetime)
.await;
assert!(result.is_err());
let ops = DeltaTable::try_from_url(ensure_table_uri(table_uri).unwrap())
.await
.unwrap();
let result = ops.restore().with_version_to_restore(5).await;
assert!(result.is_err());
Ok(())
}
/// Deletes every data file the table references, live or tombstoned, and
/// expects a default restore to version 1 to fail.
#[tokio::test]
async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
    let tmp_dir = tempfile::tempdir().unwrap();
    let table_uri = tmp_dir.path().to_str().unwrap();
    let context = setup_test(table_uri).await?;
    let table_root = tmp_dir.path();

    // Remove the files listed by the current snapshot.
    for active in context.table.snapshot()?.log_data() {
        fs::remove_file(table_root.join(active.path().as_ref())).unwrap();
    }

    // Remove the files listed as tombstones.
    for tombstone in context
        .table
        .snapshot()?
        .all_tombstones(&context.table.log_store())
        .try_collect::<Vec<_>>()
        .await?
    {
        fs::remove_file(table_root.join(tombstone.path().to_string())).unwrap();
    }

    let outcome = context.table.restore().with_version_to_restore(1).await;
    assert!(outcome.is_err());
    Ok(())
}
/// Deletes every data file the table references, live or tombstoned, and
/// expects restore to version 1 to succeed when missing files are ignored.
#[tokio::test]
async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
    let tmp_dir = tempfile::tempdir().unwrap();
    let table_uri = tmp_dir.path().to_str().unwrap();
    let context = setup_test(table_uri).await?;
    let table_root = tmp_dir.path();

    // Remove the files listed by the current snapshot.
    for active in context.table.snapshot()?.log_data() {
        fs::remove_file(table_root.join(active.path().as_ref())).unwrap();
    }

    // Remove the files listed as tombstones.
    for tombstone in context
        .table
        .snapshot()?
        .all_tombstones(&context.table.log_store())
        .try_collect::<Vec<_>>()
        .await?
    {
        fs::remove_file(table_root.join(tombstone.path().to_string())).unwrap();
    }

    // Same restore as the failing case, but with missing files tolerated.
    let outcome = context
        .table
        .restore()
        .with_ignore_missing_files(true)
        .with_version_to_restore(1)
        .await;
    assert!(outcome.is_ok());
    Ok(())
}
#[tokio::test]
async fn test_restore_transaction_conflict() -> Result<(), Box<dyn Error>> {
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let context = setup_test(table_uri).await?;
let mut table = context.table;
table.load_version(2).await?;
let result = table.restore().with_version_to_restore(1).await;
assert!(result.is_err());
Ok(())
}