use super::transaction::Transaction;
use crate::Dataset;
use crate::Result;
use crate::dataset::scanner::DatasetRecordBatchStream;
use chrono::{DateTime, Utc};
use futures::stream::{self, StreamExt, TryStreamExt};
use lance_core::Error;
use lance_core::ROW_CREATED_AT_VERSION;
use lance_core::ROW_ID;
use lance_core::ROW_LAST_UPDATED_AT_VERSION;
use lance_core::WILDCARD;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
/// Builder for a [`DatasetDelta`], describing which range of dataset versions
/// to compare.
///
/// Exactly one of the following configurations must be supplied before
/// calling `build()`:
/// - `compared_against_version` alone (diff current version vs. that version),
/// - both `begin_version` and `end_version`,
/// - `begin_timestamp` (optionally with `end_timestamp`).
#[derive(Clone, Debug)]
pub struct DatasetDeltaBuilder {
    // Dataset whose version history the delta is computed over.
    dataset: Dataset,
    // Single version to compare the dataset's current version against.
    compared_against_version: Option<u64>,
    // Explicit (exclusive) start version of the range.
    begin_version: Option<u64>,
    // Explicit (inclusive) end version of the range.
    end_version: Option<u64>,
    // Timestamp-based start of the range; resolved to a version at query time.
    begin_timestamp: Option<DateTime<Utc>>,
    // Timestamp-based end of the range; resolved to a version at query time.
    end_timestamp: Option<DateTime<Utc>>,
}
impl DatasetDeltaBuilder {
    /// Create a new builder over `dataset` with no range configured yet.
    pub fn new(dataset: Dataset) -> Self {
        Self {
            dataset,
            compared_against_version: None,
            begin_version: None,
            end_version: None,
            begin_timestamp: None,
            end_timestamp: None,
        }
    }

    /// Diff the dataset's current version against `version`.
    ///
    /// Mutually exclusive with the explicit version/timestamp setters.
    pub fn compared_against_version(self, version: u64) -> Self {
        Self {
            compared_against_version: Some(version),
            ..self
        }
    }

    /// Set the (exclusive) start version of the range.
    pub fn with_begin_version(self, version: u64) -> Self {
        Self {
            begin_version: Some(version),
            ..self
        }
    }

    /// Set the (inclusive) end version of the range.
    pub fn with_end_version(self, version: u64) -> Self {
        Self {
            end_version: Some(version),
            ..self
        }
    }

    /// Set the start of the range by timestamp instead of version.
    pub fn with_begin_date(self, timestamp: DateTime<Utc>) -> Self {
        Self {
            begin_timestamp: Some(timestamp),
            ..self
        }
    }

    /// Set the end of the range by timestamp instead of version.
    pub fn with_end_date(self, timestamp: DateTime<Utc>) -> Self {
        Self {
            end_timestamp: Some(timestamp),
            ..self
        }
    }

    /// Validate the configured parameters and produce a [`DatasetDelta`].
    ///
    /// # Errors
    ///
    /// Returns `Error::invalid_input` when the supplied parameters are
    /// incomplete (e.g. only one of begin/end version) or conflicting
    /// (e.g. `compared_against_version` combined with explicit bounds).
    pub fn build(self) -> Result<DatasetDelta> {
        let has_explicit_bounds = self.begin_version.is_some()
            || self.end_version.is_some()
            || self.begin_timestamp.is_some()
            || self.end_timestamp.is_some();
        if self.compared_against_version.is_some() && has_explicit_bounds {
            return Err(Error::invalid_input(
                "Cannot combine compared_against_version with explicit begin/end versions or dates",
            ));
        }

        let (begin_version, end_version, begin_ts, end_ts) =
            if let Some(compared) = self.compared_against_version {
                // The conflict check above guarantees no other bounds are set.
                // Order the pair so begin <= end regardless of which side is newer.
                let current_version = self.dataset.version().version;
                if current_version > compared {
                    (compared, current_version, None, None)
                } else {
                    (current_version, compared, None, None)
                }
            } else {
                match (
                    self.begin_version,
                    self.end_version,
                    self.begin_timestamp,
                    self.end_timestamp,
                ) {
                    (Some(begin), Some(end), None, None) => (begin, end, None, None),
                    (None, None, Some(begin_ts), Some(end_ts)) => {
                        // Timestamp bounds are resolved to versions lazily.
                        (0, 0, Some(begin_ts), Some(end_ts))
                    }
                    (None, None, Some(begin_ts), None) => (0, 0, Some(begin_ts), None),
                    (Some(_), None, None, None) | (None, Some(_), None, None) => {
                        return Err(Error::invalid_input(
                            "Must specify both with_begin_version and with_end_version",
                        ));
                    }
                    (None, None, None, Some(_)) => {
                        return Err(Error::invalid_input(
                            "Must specify with_begin_date when with_end_date is provided",
                        ));
                    }
                    (None, None, None, None) => {
                        return Err(Error::invalid_input(
                            "Must specify either compared_against_version or both with_begin_version and with_end_version",
                        ));
                    }
                    _ => {
                        return Err(Error::invalid_input(
                            "Invalid combination of parameters for DatasetDeltaBuilder",
                        ));
                    }
                }
            };

        Ok(DatasetDelta {
            begin_version,
            end_version,
            base_dataset: self.dataset,
            begin_timestamp: begin_ts,
            end_timestamp: end_ts,
        })
    }
}
/// A resolved delta between two versions of a dataset.
///
/// Produced by [`DatasetDeltaBuilder::build`]; exposes the transactions and
/// inserted/updated rows committed in the half-open range
/// `(begin_version, end_version]`.
pub struct DatasetDelta {
    // Exclusive lower bound of the version range (0 when timestamp-based).
    pub(crate) begin_version: u64,
    // Inclusive upper bound of the version range (0 when timestamp-based).
    pub(crate) end_version: u64,
    // Dataset the delta is computed against.
    pub(crate) base_dataset: Dataset,
    // When set, the version range is resolved from these timestamps instead
    // of the explicit version fields above.
    pub(crate) begin_timestamp: Option<DateTime<Utc>>,
    pub(crate) end_timestamp: Option<DateTime<Utc>>,
}
impl DatasetDelta {
async fn resolve_range(&self) -> Result<(u64, u64)> {
if let (Some(begin_ts), Some(end_ts)) = (self.begin_timestamp, self.end_timestamp) {
let versions = self.base_dataset.versions().await?;
let mut begin_version: u64 = 0;
let mut end_version: u64 = 0;
for v in &versions {
if v.timestamp < begin_ts && v.version > begin_version {
begin_version = v.version;
}
if v.timestamp <= end_ts && v.version > end_version {
end_version = v.version;
}
}
Ok((begin_version, end_version))
} else if let (Some(begin_ts), None) = (self.begin_timestamp, self.end_timestamp) {
let versions = self.base_dataset.versions().await?;
let mut begin_version: u64 = 0;
for v in &versions {
if v.timestamp < begin_ts && v.version > begin_version {
begin_version = v.version;
}
}
let end_version = self.base_dataset.latest_version_id().await?;
Ok((begin_version, end_version))
} else {
Ok((self.begin_version, self.end_version))
}
}
pub async fn list_transactions(&self) -> Result<Vec<Transaction>> {
let (begin_version, end_version) = self.resolve_range().await?;
stream::iter((begin_version + 1)..=end_version)
.map(|version| {
let base_dataset = self.base_dataset.clone();
async move {
let current_ds = match base_dataset.checkout_version(version).await {
Ok(ds) => ds,
Err(err) => {
if matches!(err, Error::DatasetNotFound { .. }) {
return Err(Error::VersionNotFound {
message: format!(
"Can not find version {}, please check if it has been cleanup.",
version
),
});
} else {
return Err(err);
}
}
};
current_ds.read_transaction().await
}
})
.buffered(get_num_compute_intensive_cpus())
.try_filter_map(|result| async move { Ok(result) })
.try_collect()
.await
}
pub async fn get_inserted_rows(&self) -> Result<DatasetRecordBatchStream> {
let mut scanner = self.base_dataset.scan();
scanner.project(&[
WILDCARD,
ROW_ID,
ROW_CREATED_AT_VERSION,
ROW_LAST_UPDATED_AT_VERSION,
])?;
let filter = self.build_inserted_rows_filter().await?;
scanner.filter(&filter)?;
scanner.try_into_stream().await
}
async fn build_inserted_rows_filter(&self) -> Result<String> {
let (begin_version, end_version) = self.resolve_range().await?;
Ok(format!(
"_row_created_at_version > {} AND _row_created_at_version <= {}",
begin_version, end_version
))
}
pub async fn get_updated_rows(&self) -> Result<DatasetRecordBatchStream> {
let mut scanner = self.base_dataset.scan();
scanner.project(&[
WILDCARD,
ROW_ID,
ROW_CREATED_AT_VERSION,
ROW_LAST_UPDATED_AT_VERSION,
])?;
let filter = self.build_updated_rows_batch_filter().await?;
scanner.filter(&filter)?;
scanner.try_into_stream().await
}
async fn build_updated_rows_batch_filter(&self) -> Result<String> {
let (begin_version, end_version) = self.resolve_range().await?;
Ok(format!(
"_row_created_at_version <= {} AND _row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}",
begin_version, begin_version, end_version
))
}
pub async fn get_upserted_rows(&self) -> Result<DatasetRecordBatchStream> {
let mut scanner = self.base_dataset.scan();
scanner.project(&[
WILDCARD,
ROW_ID,
ROW_CREATED_AT_VERSION,
ROW_LAST_UPDATED_AT_VERSION,
])?;
let filter = self.build_upserted_rows_filter().await?;
scanner.filter(&filter)?;
scanner.try_into_stream().await
}
async fn build_upserted_rows_filter(&self) -> Result<String> {
let inserted_row_filter = self.build_inserted_rows_filter().await?;
let updated_rows_filter = self.build_updated_rows_batch_filter().await?;
Ok(format!(
"({}) OR ({})",
inserted_row_filter, updated_rows_filter
))
}
}
#[cfg(test)]
mod tests {
    use crate::dataset::transaction::Operation;
    use crate::dataset::{Dataset, WriteParams};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_array::types::UInt64Type;
    use chrono::Duration;
    use futures::TryStreamExt;
    use lance_core::{ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION};
    use lance_datagen::{BatchCount, RowCount, array};
    use mock_instant::thread_local::MockClock;
    use std::sync::Arc;

    /// Write a fresh in-memory dataset with `batches` batches of `rows` rows:
    /// a stepped "key" column and a "value" column filled with `value`.
    async fn create_test_dataset(
        rows: usize,
        batches: usize,
        value: &str,
        stable_row_ids: bool,
    ) -> Dataset {
        let data = lance_datagen::gen_batch()
            .col("key", array::step::<Int32Type>())
            .col("value", array::fill_utf8(value.to_string()))
            .into_reader_rows(
                RowCount::from(rows as u64),
                BatchCount::from(batches as u32),
            );
        let write_params = WriteParams {
            enable_stable_row_ids: stable_row_ids,
            ..Default::default()
        };
        Dataset::write(data, "memory://", Some(write_params))
            .await
            .unwrap()
    }

    /// Write (create or append, per `append`) rows with keys starting at
    /// `start_key` into a dataset rooted at the given temp directory.
    async fn write_dataset_temp(
        dir: &lance_core::utils::tempfile::TempStrDir,
        start_key: i32,
        rows: usize,
        batches: usize,
        value: &str,
        stable_row_ids: bool,
        append: bool,
    ) -> Dataset {
        let data = lance_datagen::gen_batch()
            .col("key", array::step_custom::<Int32Type>(start_key, 1))
            .col("value", array::fill_utf8(value.to_string()))
            .into_reader_rows(
                RowCount::from(rows as u64),
                BatchCount::from(batches as u32),
            );
        let write_params = WriteParams {
            enable_stable_row_ids: stable_row_ids,
            mode: if append {
                crate::dataset::WriteMode::Append
            } else {
                crate::dataset::WriteMode::Create
            },
            ..Default::default()
        };
        Dataset::write(data, dir, Some(write_params)).await.unwrap()
    }

    /// Run an UPDATE setting "value" to `value` on rows matching `predicate`,
    /// returning the resulting dataset (unwrapped out of its Arc).
    async fn update_where<T: Into<Arc<Dataset>>>(ds: T, predicate: &str, value: &str) -> Dataset {
        let updated = crate::dataset::UpdateBuilder::new(ds.into())
            .update_where(predicate)
            .unwrap()
            .set("value", &format!("'{}'", value))
            .unwrap()
            .build()
            .unwrap()
            .execute()
            .await
            .unwrap();
        Arc::try_unwrap(updated.new_dataset).unwrap_or_else(|arc| arc.as_ref().clone())
    }

    /// Scan `ds` projecting `cols`, with an optional filter, into one batch.
    async fn scan_project_filter(
        ds: &Dataset,
        cols: &[&str],
        filter: Option<&str>,
    ) -> arrow_array::RecordBatch {
        let mut scanner = ds.scan();
        scanner.project(cols).unwrap();
        if let Some(f) = filter {
            scanner.filter(f).unwrap();
        }
        scanner.try_into_batch().await.unwrap()
    }

    /// Collect a record-batch stream into a single concatenated batch.
    async fn collect_stream(
        stream: crate::dataset::scanner::DatasetRecordBatchStream,
    ) -> arrow_array::RecordBatch {
        let batches: Vec<_> = stream.try_collect().await.unwrap();
        arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap()
    }

    // A delta from version 1 to itself contains no transactions.
    #[tokio::test]
    async fn test_list_no_transaction() {
        let ds = create_test_dataset(1_000, 10, "value", false).await;
        let delta = ds.delta().compared_against_version(1).build().unwrap();
        let result = delta.list_transactions().await;
        assert_eq!(result.unwrap().len(), 0);
    }

    // One delete after version 1 yields exactly one Delete transaction.
    #[tokio::test]
    async fn test_list_single_transaction() {
        let mut ds = create_test_dataset(1_000, 10, "value", false).await;
        ds.delete("key = 5").await.unwrap();
        let delta_struct = ds
            .delta()
            .with_begin_version(1)
            .with_end_version(ds.version().version)
            .build()
            .unwrap();
        let txs = delta_struct.list_transactions().await.unwrap();
        assert_eq!(txs.len(), 1);
        assert!(matches!(txs[0].operation, Operation::Delete { .. }));
    }

    // Two deletes after version 1 yield two transactions.
    #[tokio::test]
    async fn test_list_multiple_transactions() {
        let mut ds = create_test_dataset(1_000, 10, "value", false).await;
        ds.delete("key = 5").await.unwrap();
        ds.delete("key = 6").await.unwrap();
        let delta_struct = ds
            .delta()
            .with_begin_version(1)
            .with_end_version(ds.version().version)
            .build()
            .unwrap();
        let txs = delta_struct.list_transactions().await.unwrap();
        assert_eq!(txs.len(), 2);
    }

    // Listing a range whose versions were cleaned up must surface
    // VersionNotFound rather than a generic error.
    #[tokio::test]
    async fn test_list_contains_deleted_transaction() {
        MockClock::set_system_time(std::time::Duration::from_secs(1));
        let mut ds = create_test_dataset(1_000, 10, "value", false).await;
        MockClock::set_system_time(std::time::Duration::from_secs(2));
        ds.delete("key = 5").await.unwrap();
        ds.delete("key = 6").await.unwrap();
        ds.delete("key = 7").await.unwrap();
        MockClock::set_system_time(std::time::Duration::from_secs(3));
        let end_version = ds.version().version;
        let base_dataset = ds.clone();
        MockClock::set_system_time(std::time::Duration::from_secs(4));
        ds.cleanup_old_versions(Duration::seconds(1), Some(true), None)
            .await
            .expect("Cleanup old versions failed");
        MockClock::set_system_time(std::time::Duration::from_secs(5));
        let delta_struct = base_dataset
            .delta()
            .with_begin_version(1)
            .with_end_version(end_version)
            .build()
            .unwrap();
        let result = delta_struct.list_transactions().await;
        match result {
            Err(lance_core::Error::VersionNotFound { message }) => {
                assert!(message.contains("Can not find version"));
            }
            _ => panic!("Expected VersionNotFound error."),
        }
    }

    // Rows written in the first commit all carry created-at version 1.
    #[tokio::test]
    async fn test_row_created_at_version_basic() {
        let ds = create_test_dataset(100, 1, "value", true).await;
        assert_eq!(ds.version().version, 1);
        let result = scan_project_filter(&ds, &["key", ROW_CREATED_AT_VERSION], None).await;
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        assert_eq!(result.num_rows(), 100);
        for version in created_at.iter() {
            assert_eq!(*version, 1);
        }
    }

    // Last-updated version tracks the most recent update touching each row,
    // including overlapping update predicates (v4 overrides part of v2).
    #[tokio::test]
    async fn test_row_last_updated_at_version_basic() {
        let ds = create_test_dataset(100, 1, "value", true).await;
        assert_eq!(ds.version().version, 1);
        let ds = update_where(ds, "key < 30", "updated_v2").await;
        assert_eq!(ds.version().version, 2);
        let ds = update_where(ds, "key >= 30 AND key < 50", "updated_v3").await;
        assert_eq!(ds.version().version, 3);
        let ds = update_where(ds, "key >= 10 AND key < 20", "updated_v4").await;
        assert_eq!(ds.version().version, 4);
        let result = scan_project_filter(&ds, &["key", ROW_LAST_UPDATED_AT_VERSION], None).await;
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        assert_eq!(result.num_rows(), 100);
        for i in 0..result.num_rows() {
            let key = keys[i];
            // Check the most specific (latest) predicate first.
            if (10..20).contains(&key) {
                assert_eq!(updated_at[i], 4);
            } else if key < 30 {
                assert_eq!(updated_at[i], 2);
            } else if (30..50).contains(&key) {
                assert_eq!(updated_at[i], 3);
            } else {
                assert_eq!(updated_at[i], 1);
            }
        }
    }

    // Updates change last-updated version but never created-at version.
    #[tokio::test]
    async fn test_row_version_metadata_after_update() {
        let ds = create_test_dataset(100, 1, "value", true).await;
        assert_eq!(ds.version().version, 1);
        let ds = update_where(ds, "key < 10", "updated_v2").await;
        assert_eq!(ds.version().version, 2);
        let ds = update_where(ds, "key >= 20 AND key < 30", "updated_v3").await;
        assert_eq!(ds.version().version, 3);
        let ds = update_where(ds, "key >= 5 AND key < 15", "updated_v4").await;
        assert_eq!(ds.version().version, 4);
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION],
            None,
        )
        .await;
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        assert_eq!(result.num_rows(), 100);
        for i in 0..result.num_rows() {
            let key = keys[i];
            // All rows originate from the initial commit.
            assert_eq!(created_at[i], 1);
            if (5..15).contains(&key) {
                assert_eq!(updated_at[i], 4);
            } else if key < 10 {
                assert_eq!(updated_at[i], 2);
            } else if (20..30).contains(&key) {
                assert_eq!(updated_at[i], 3);
            } else {
                assert_eq!(updated_at[i], 1);
            }
        }
    }

    // Appended rows get created-at == last-updated == the append version.
    #[tokio::test]
    async fn test_row_version_metadata_after_append() {
        let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
        let ds = write_dataset_temp(&temp_dir, 0, 50, 1, "value", true, false).await;
        assert_eq!(ds.version().version, 1);
        let ds = write_dataset_temp(&temp_dir, 50, 50, 1, "appended", true, true).await;
        assert_eq!(ds.version().version, 2);
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION],
            None,
        )
        .await;
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        assert_eq!(result.num_rows(), 100);
        for i in 0..result.num_rows() {
            let key = keys[i];
            if key < 50 {
                assert_eq!(created_at[i], 1);
                assert_eq!(updated_at[i], 1);
            } else {
                assert_eq!(created_at[i], 2);
                assert_eq!(updated_at[i], 2);
            }
        }
    }

    // Deleting rows leaves the version metadata of surviving rows unchanged.
    #[tokio::test]
    async fn test_row_version_metadata_after_delete() {
        let mut ds = create_test_dataset(100, 1, "value", true).await;
        assert_eq!(ds.version().version, 1);
        ds.delete("key < 10").await.unwrap();
        assert_eq!(ds.version().version, 2);
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION],
            None,
        )
        .await;
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        assert_eq!(result.num_rows(), 90);
        for i in 0..result.num_rows() {
            let key = keys[i];
            assert!(key >= 10);
            assert_eq!(created_at[i], 1);
            assert_eq!(updated_at[i], 1);
        }
    }

    // Combined scenario: two updates then a delete; created-at stays 1 and
    // last-updated reflects whichever update last touched the row.
    #[tokio::test]
    async fn test_row_version_metadata_combined() {
        let data = lance_datagen::gen_batch()
            .col("key", array::step::<Int32Type>())
            .col("value", array::fill_utf8("value".to_string()))
            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
        let write_params = WriteParams {
            enable_stable_row_ids: true,
            ..Default::default()
        };
        let ds = Dataset::write(data, "memory://", Some(write_params))
            .await
            .unwrap();
        assert_eq!(ds.version().version, 1);
        let updated = crate::dataset::UpdateBuilder::new(Arc::new(ds))
            .update_where("key >= 40 AND key < 50")
            .unwrap()
            .set("value", "'updated1'")
            .unwrap()
            .build()
            .unwrap()
            .execute()
            .await
            .unwrap();
        let ds = updated.new_dataset;
        let updated = crate::dataset::UpdateBuilder::new(ds)
            .update_where("key >= 50 AND key < 60")
            .unwrap()
            .set("value", "'updated2'")
            .unwrap()
            .build()
            .unwrap()
            .execute()
            .await
            .unwrap();
        let mut ds = Arc::try_unwrap(updated.new_dataset).expect("no other Arc references");
        ds.delete("key < 10").await.unwrap();
        assert_eq!(ds.version().version, 4);
        let result = ds
            .scan()
            .with_row_id()
            .project(&["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION])
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        let row_ids = result[ROW_ID].as_primitive::<UInt64Type>().values();
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        assert_eq!(result.num_rows(), 90);
        for i in 0..result.num_rows() {
            let key = keys[i];
            let _row_id = row_ids[i];
            assert_eq!(created_at[i], 1);
            if (40..50).contains(&key) {
                assert_eq!(updated_at[i], 2);
            } else if (50..60).contains(&key) {
                assert_eq!(updated_at[i], 3);
            } else {
                assert_eq!(updated_at[i], 1);
            }
        }
    }

    // The _row_created_at_version column can be used in scan filters.
    #[tokio::test]
    async fn test_filter_by_row_created_at_version() {
        let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
        let ds = write_dataset_temp(&temp_dir, 0, 50, 1, "value", true, false).await;
        assert_eq!(ds.version().version, 1);
        let ds = write_dataset_temp(&temp_dir, 50, 50, 1, "appended", true, true).await;
        assert_eq!(ds.version().version, 2);
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION],
            Some("_row_created_at_version = 1"),
        )
        .await;
        assert_eq!(result.num_rows(), 50);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], 1);
            assert!(keys[i] < 50);
        }
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION],
            Some("_row_created_at_version = 2"),
        )
        .await;
        assert_eq!(result.num_rows(), 50);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], 2);
            assert!(keys[i] >= 50);
        }
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION],
            Some("_row_created_at_version >= 2"),
        )
        .await;
        assert_eq!(result.num_rows(), 50);
        for i in 0..result.num_rows() {
            let created_at_val = result[ROW_CREATED_AT_VERSION]
                .as_primitive::<UInt64Type>()
                .value(i);
            assert!(created_at_val >= 2);
        }
    }

    // The _row_last_updated_at_version column can be used in scan filters,
    // with equality and range predicates.
    #[tokio::test]
    async fn test_filter_by_row_last_updated_at_version() {
        let data = lance_datagen::gen_batch()
            .col("key", array::step::<Int32Type>())
            .col("value", array::fill_utf8("value".to_string()))
            .into_reader_rows(RowCount::from(100), BatchCount::from(1));
        let write_params = WriteParams {
            enable_stable_row_ids: true,
            ..Default::default()
        };
        let ds = Dataset::write(data, "memory://", Some(write_params))
            .await
            .unwrap();
        assert_eq!(ds.version().version, 1);
        let updated = crate::dataset::UpdateBuilder::new(Arc::new(ds))
            .update_where("key < 30")
            .unwrap()
            .set("value", "'updated_v2'")
            .unwrap()
            .build()
            .unwrap()
            .execute()
            .await
            .unwrap();
        let ds = updated.new_dataset;
        assert_eq!(ds.version().version, 2);
        let updated = crate::dataset::UpdateBuilder::new(ds)
            .update_where("key >= 30 AND key < 50")
            .unwrap()
            .set("value", "'updated_v3'")
            .unwrap()
            .build()
            .unwrap()
            .execute()
            .await
            .unwrap();
        let ds = updated.new_dataset;
        assert_eq!(ds.version().version, 3);
        let result = ds
            .scan()
            .project(&["key", ROW_LAST_UPDATED_AT_VERSION])
            .unwrap()
            .filter("_row_last_updated_at_version = 1")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert_eq!(result.num_rows(), 50);
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(updated_at[i], 1);
            assert!(keys[i] >= 50);
        }
        let result = ds
            .scan()
            .project(&["key", ROW_LAST_UPDATED_AT_VERSION])
            .unwrap()
            .filter("_row_last_updated_at_version = 2")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert_eq!(result.num_rows(), 30);
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(updated_at[i], 2);
            assert!(keys[i] < 30);
        }
        let result = ds
            .scan()
            .project(&["key", ROW_LAST_UPDATED_AT_VERSION])
            .unwrap()
            .filter("_row_last_updated_at_version = 3")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert_eq!(result.num_rows(), 20);
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(updated_at[i], 3);
            assert!(keys[i] >= 30 && keys[i] < 50);
        }
        let result = ds
            .scan()
            .project(&["key", ROW_LAST_UPDATED_AT_VERSION])
            .unwrap()
            .filter("_row_last_updated_at_version > 1")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        assert_eq!(result.num_rows(), 50);
        for i in 0..result.num_rows() {
            let updated_at_val = result[ROW_LAST_UPDATED_AT_VERSION]
                .as_primitive::<UInt64Type>()
                .value(i);
            assert!(updated_at_val > 1);
        }
    }

    // Filters may combine or compare both version metadata columns.
    #[tokio::test]
    async fn test_filter_by_combined_version_columns() {
        let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
        let ds = write_dataset_temp(&temp_dir, 0, 50, 1, "value", true, false).await;
        assert_eq!(ds.version().version, 1);
        let ds = write_dataset_temp(&temp_dir, 50, 50, 1, "appended", true, true).await;
        assert_eq!(ds.version().version, 2);
        let ds = update_where(ds, "key >= 20 AND key < 30", "updated_v3").await;
        assert_eq!(ds.version().version, 3);
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION],
            Some("_row_created_at_version = 1 AND _row_last_updated_at_version = 1"),
        )
        .await;
        assert_eq!(result.num_rows(), 40);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], 1);
            assert_eq!(updated_at[i], 1);
            assert!(keys[i] < 50);
            assert!(keys[i] < 20 || keys[i] >= 30);
        }
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION],
            Some("_row_created_at_version = 1 AND _row_last_updated_at_version = 3"),
        )
        .await;
        assert_eq!(result.num_rows(), 10);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], 1);
            assert_eq!(updated_at[i], 3);
            assert!(keys[i] >= 20 && keys[i] < 30);
        }
        // Column-to-column comparison: rows never updated since creation.
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION],
            Some("_row_created_at_version = _row_last_updated_at_version"),
        )
        .await;
        assert_eq!(result.num_rows(), 90);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], updated_at[i]);
        }
        // Inverse comparison: rows updated after creation.
        let result = scan_project_filter(
            &ds,
            &["key", ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION],
            Some("_row_created_at_version != _row_last_updated_at_version"),
        )
        .await;
        assert_eq!(result.num_rows(), 10);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_ne!(created_at[i], updated_at[i]);
            assert_eq!(created_at[i], 1);
            assert_eq!(updated_at[i], 3);
            assert!(keys[i] >= 20 && keys[i] < 30);
        }
    }

    // Version-metadata predicates compose with ordinary column predicates.
    #[tokio::test]
    async fn test_filter_version_columns_with_other_columns() {
        let ds = create_test_dataset(100, 1, "value", true).await;
        let ds = update_where(ds, "key >= 30 AND key < 60", "updated").await;
        let result = scan_project_filter(
            &ds,
            &["key", "value", ROW_LAST_UPDATED_AT_VERSION],
            Some("key < 50 AND _row_last_updated_at_version = 2"),
        )
        .await;
        assert_eq!(result.num_rows(), 20);
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(updated_at[i], 2);
            assert!(keys[i] >= 30 && keys[i] < 50);
        }
    }

    // get_inserted_rows returns only rows created within (begin, end].
    #[tokio::test]
    async fn test_get_inserted_rows() {
        let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
        let ds = write_dataset_temp(&temp_dir, 0, 50, 1, "value", true, false).await;
        assert_eq!(ds.version().version, 1);
        let ds = write_dataset_temp(&temp_dir, 50, 30, 1, "appended_v2", true, true).await;
        assert_eq!(ds.version().version, 2);
        let ds = write_dataset_temp(&temp_dir, 80, 20, 1, "appended_v3", true, true).await;
        assert_eq!(ds.version().version, 3);
        // Full range (0, 3]: all 100 rows.
        let delta = ds
            .delta()
            .with_begin_version(0)
            .with_end_version(3)
            .build()
            .unwrap();
        let stream = delta.get_inserted_rows().await.unwrap();
        let result = collect_stream(stream).await;
        assert_eq!(result.num_rows(), 100);
        assert!(result.column_by_name(ROW_ID).is_some());
        assert!(result.column_by_name(ROW_CREATED_AT_VERSION).is_some());
        assert!(result.column_by_name(ROW_LAST_UPDATED_AT_VERSION).is_some());
        // Range (1, 2]: only the 30 rows appended at version 2.
        let delta = ds
            .delta()
            .with_begin_version(1)
            .with_end_version(2)
            .build()
            .unwrap();
        let stream = delta.get_inserted_rows().await.unwrap();
        let result = collect_stream(stream).await;
        assert_eq!(result.num_rows(), 30);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], 2);
            assert!(keys[i] >= 50 && keys[i] < 80);
        }
        // Range (2, 3]: only the 20 rows appended at version 3.
        let delta = ds
            .delta()
            .with_begin_version(2)
            .with_end_version(3)
            .build()
            .unwrap();
        let stream = delta.get_inserted_rows().await.unwrap();
        let result = collect_stream(stream).await;
        assert_eq!(result.num_rows(), 20);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], 3);
            assert!(keys[i] >= 80 && keys[i] < 100);
        }
    }

    // get_updated_rows returns pre-existing rows whose last update falls in
    // (begin, end], excluding rows re-updated later in the range's terms.
    #[tokio::test]
    async fn test_get_updated_rows() {
        let ds = create_test_dataset(100, 1, "value", true).await;
        assert_eq!(ds.version().version, 1);
        let ds = update_where(ds, "key < 30", "updated_v2").await;
        assert_eq!(ds.version().version, 2);
        let ds = update_where(ds, "key >= 50 AND key < 70", "updated_v3").await;
        assert_eq!(ds.version().version, 3);
        let ds = update_where(ds, "key >= 10 AND key < 20", "updated_v4").await;
        assert_eq!(ds.version().version, 4);
        // Range (1, 2]: rows updated at v2 whose last update is still v2
        // (keys 10..20 were later touched by v4, so they drop out).
        let delta = ds
            .delta()
            .with_begin_version(1)
            .with_end_version(2)
            .build()
            .unwrap();
        let stream = delta.get_updated_rows().await.unwrap();
        let result = collect_stream(stream).await;
        assert_eq!(result.num_rows(), 20);
        assert!(result.column_by_name(ROW_ID).is_some());
        assert!(result.column_by_name(ROW_CREATED_AT_VERSION).is_some());
        assert!(result.column_by_name(ROW_LAST_UPDATED_AT_VERSION).is_some());
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], 1);
            assert_eq!(updated_at[i], 2);
            assert!(keys[i] < 30);
            assert!(keys[i] < 10 || keys[i] >= 20);
        }
        // Range (2, 3]: rows updated at v3.
        let delta = ds
            .delta()
            .with_begin_version(2)
            .with_end_version(3)
            .build()
            .unwrap();
        let stream = delta.get_updated_rows().await.unwrap();
        let result = collect_stream(stream).await;
        assert_eq!(result.num_rows(), 20);
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            assert_eq!(updated_at[i], 3);
            assert!(keys[i] >= 50 && keys[i] < 70);
        }
        // Range (1, 4]: every row touched by v2, v3 or v4.
        let delta = ds
            .delta()
            .with_begin_version(1)
            .with_end_version(4)
            .build()
            .unwrap();
        let stream = delta.get_updated_rows().await.unwrap();
        let result = collect_stream(stream).await;
        assert_eq!(result.num_rows(), 50);
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        for i in 0..result.num_rows() {
            assert_eq!(created_at[i], 1);
        }
    }

    // get_upserted_rows returns the union of inserted and updated rows.
    #[tokio::test]
    async fn test_get_upsert_rows() {
        let temp_dir = lance_core::utils::tempfile::TempStrDir::default();
        let ds = write_dataset_temp(&temp_dir, 0, 50, 1, "value", true, false).await;
        assert_eq!(ds.version().version, 1);
        let ds = write_dataset_temp(&temp_dir, 50, 20, 1, "appended_v2", true, true).await;
        assert_eq!(ds.version().version, 2);
        let ds = update_where(ds, "key < 10", "updated_v3").await;
        assert_eq!(ds.version().version, 3);
        let delta = ds
            .delta()
            .with_begin_version(1)
            .with_end_version(3)
            .build()
            .unwrap();
        let stream = delta.get_upserted_rows().await.unwrap();
        let result = collect_stream(stream).await;
        // 20 appended at v2 plus 10 updated at v3.
        assert_eq!(result.num_rows(), 30);
        assert!(result.column_by_name(ROW_ID).is_some());
        assert!(result.column_by_name(ROW_CREATED_AT_VERSION).is_some());
        assert!(result.column_by_name(ROW_LAST_UPDATED_AT_VERSION).is_some());
        let created_at = result[ROW_CREATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let updated_at = result[ROW_LAST_UPDATED_AT_VERSION]
            .as_primitive::<UInt64Type>()
            .values();
        let keys = result["key"].as_primitive::<Int32Type>().values();
        for i in 0..result.num_rows() {
            let key = keys[i];
            if key < 10 {
                assert_eq!(created_at[i], 1);
                assert_eq!(updated_at[i], 3);
            } else {
                assert!((50..70).contains(&key));
                assert_eq!(created_at[i], 2);
                assert_eq!(updated_at[i], 2);
            }
        }
    }

    // Date-window build: versions at t=10/20/30; window [15, 25] covers only
    // the version committed at t=20.
    #[tokio::test]
    async fn test_build_with_date_window_basic() {
        MockClock::set_system_time(std::time::Duration::from_secs(10));
        let ds = create_test_dataset(50, 1, "v1", true).await;
        assert_eq!(ds.version().version, 1);
        MockClock::set_system_time(std::time::Duration::from_secs(20));
        let ds = update_where(ds, "key < 10", "v2").await;
        assert_eq!(ds.version().version, 2);
        MockClock::set_system_time(std::time::Duration::from_secs(30));
        let ds = update_where(ds, "key >= 10 AND key < 20", "v3").await;
        assert_eq!(ds.version().version, 3);
        let begin_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(15, 0).unwrap();
        let end_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(25, 0).unwrap();
        let delta = ds
            .delta()
            .with_begin_date(begin_ts)
            .with_end_date(end_ts)
            .build()
            .unwrap();
        let txs = delta.list_transactions().await.unwrap();
        assert_eq!(txs.len(), 1);
    }

    // Date window wider than the whole history covers all transactions.
    #[tokio::test]
    async fn test_build_with_date_window_edges() {
        MockClock::set_system_time(std::time::Duration::from_secs(100));
        let ds = create_test_dataset(10, 1, "v1", true).await;
        assert_eq!(ds.version().version, 1);
        MockClock::set_system_time(std::time::Duration::from_secs(200));
        let ds = update_where(ds, "key < 5", "v2").await;
        assert_eq!(ds.version().version, 2);
        let begin_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(50, 0).unwrap();
        let end_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(250, 0).unwrap();
        let delta = ds
            .delta()
            .with_begin_date(begin_ts)
            .with_end_date(end_ts)
            .build()
            .unwrap();
        let txs = delta.list_transactions().await.unwrap();
        assert_eq!(txs.len(), 2);
    }

    // Open-ended date window (begin only) extends to the latest version.
    #[tokio::test]
    async fn test_build_with_date_open_end_uses_latest() {
        MockClock::set_system_time(std::time::Duration::from_secs(10));
        let ds = create_test_dataset(20, 1, "v1", true).await;
        assert_eq!(ds.version().version, 1);
        MockClock::set_system_time(std::time::Duration::from_secs(20));
        let ds = update_where(ds, "key < 5", "v2").await;
        assert_eq!(ds.version().version, 2);
        MockClock::set_system_time(std::time::Duration::from_secs(30));
        let ds = update_where(ds, "key >= 5 AND key < 10", "v3").await;
        assert_eq!(ds.version().version, 3);
        let begin_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(15, 0).unwrap();
        let delta = ds.delta().with_begin_date(begin_ts).build().unwrap();
        let txs = delta.list_transactions().await.unwrap();
        assert_eq!(txs.len(), 2);
    }
}