use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use futures::StreamExt;
use futures::channel::mpsc::{Sender, channel};
use tokio::sync::Notify;
use crate::runtime::spawn;
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, DataFile, Struct};
/// Index of delete files that can be queried for the deletes applying to a
/// given data file.
///
/// The index is filled in by a background task started in
/// `DeleteFileIndex::new`. Cloning is cheap: every clone shares the same
/// underlying state through an `Arc`.
#[derive(Debug, Clone)]
pub(crate) struct DeleteFileIndex {
    // Shared lifecycle state: starts as `Populating` and is replaced by
    // `Populated` once the background task has collected all delete files.
    state: Arc<RwLock<DeleteFileIndexState>>,
}
/// Lifecycle state of a `DeleteFileIndex`.
#[derive(Debug)]
enum DeleteFileIndexState {
    /// Delete files are still being received. The `Notify` is signalled with
    /// `notify_waiters` right after the state is switched to `Populated`.
    Populating(Arc<Notify>),
    /// All delete files have been received and indexed.
    Populated(PopulatedDeleteFileIndex),
}
/// Fully built delete file index, bucketed by delete type and partition.
#[derive(Debug)]
struct PopulatedDeleteFileIndex {
    // Equality deletes whose partition has no fields (unpartitioned). These are
    // matched against every data file regardless of partition or spec id.
    global_equality_deletes: Vec<Arc<DeleteFileContext>>,
    // Partitioned equality deletes, keyed by the delete file's partition value.
    eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
    // Position deletes keyed by the delete file's partition value; unpartitioned
    // position deletes live here under the empty partition key.
    pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
}
impl DeleteFileIndex {
    /// Creates an empty, still-populating index plus the `Sender` used to feed
    /// it delete files.
    ///
    /// A background task (started with `spawn`) collects every
    /// `DeleteFileContext` received on the channel. When the stream ends (all
    /// senders dropped), it builds a `PopulatedDeleteFileIndex`, stores it in
    /// the shared state and wakes every task waiting in
    /// `get_deletes_for_data_file`.
    ///
    /// Callers must drop the returned `Sender` (and any clones) once every
    /// delete file has been sent. Until then, lookups keep waiting.
    pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
        // Bounded channel so producers get backpressure.
        let (tx, rx) = channel(10);
        let notify = Arc::new(Notify::new());
        let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
            notify.clone(),
        )));

        spawn({
            let state = state.clone();
            async move {
                // The receiver is already a `Stream`; no need to box it.
                let delete_files: Vec<DeleteFileContext> = rx.collect::<Vec<_>>().await;
                let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);

                {
                    let mut guard = state.write().unwrap();
                    *guard = DeleteFileIndexState::Populated(populated_delete_file_index);
                }

                // Only wakes `Notified` futures that already exist; see the
                // registration ordering in `get_deletes_for_data_file`.
                notify.notify_waiters();
            }
        });

        (DeleteFileIndex { state }, tx)
    }

    /// Returns the delete files that must be applied to `data_file`, whose data
    /// sequence number is `seq_num`.
    ///
    /// If the index is still being populated, this waits until the background
    /// task has finished building it.
    ///
    /// # Panics
    ///
    /// Panics if the state lock is poisoned.
    pub(crate) async fn get_deletes_for_data_file(
        &self,
        data_file: &DataFile,
        seq_num: Option<i64>,
    ) -> Vec<FileScanTaskDeleteFile> {
        // Fast path: the index is already populated.
        let notifier = {
            let guard = self.state.read().unwrap();
            match *guard {
                DeleteFileIndexState::Populating(ref notifier) => notifier.clone(),
                DeleteFileIndexState::Populated(ref index) => {
                    return index.get_deletes_for_data_file(data_file, seq_num);
                }
            }
        };

        // Register for the wake-up *before* re-checking the state.
        // `notify_waiters` stores no permit. Without this ordering, a
        // notification fired between releasing the lock above and creating
        // the `Notified` would be lost, and this call would wait forever. A
        // `Notified` receives `notify_waiters` wake-ups from the moment it is
        // created, even before it is first polled.
        let notified = notifier.notified();

        {
            let guard = self.state.read().unwrap();
            if let DeleteFileIndexState::Populated(index) = guard.deref() {
                return index.get_deletes_for_data_file(data_file, seq_num);
            }
            // Still populating: the writer has not stored the new state yet, so
            // its `notify_waiters` call will happen after `notified` exists.
        }

        notified.await;

        let guard = self.state.read().unwrap();
        match guard.deref() {
            DeleteFileIndexState::Populated(index) => {
                index.get_deletes_for_data_file(data_file, seq_num)
            }
            DeleteFileIndexState::Populating(_) => {
                unreachable!("Cannot be any other state than loaded")
            }
        }
    }
}
impl PopulatedDeleteFileIndex {
    /// Builds the index from the complete set of delete files.
    ///
    /// Files are bucketed as follows; insertion order is preserved within each bucket:
    /// * equality deletes whose partition has no fields go to
    ///   `global_equality_deletes`;
    /// * all other equality deletes go to `eq_deletes_by_partition`, keyed by
    ///   partition value;
    /// * position deletes (partitioned or not) go to `pos_deletes_by_partition`,
    ///   keyed by partition value.
    ///
    /// # Panics
    ///
    /// Panics if an entry is neither an equality nor a position delete file.
    fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
        let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
            HashMap::default();
        let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
            HashMap::default();
        let mut global_equality_deletes: Vec<Arc<DeleteFileContext>> = vec![];

        for ctx in files {
            let ctx = Arc::new(ctx);
            let partition = ctx.manifest_entry.data_file().partition();

            // A single match keeps the treatment of unexpected content types
            // consistent. Previously an unpartitioned non-delete file was
            // silently filed as a global equality delete.
            let destination_map = match ctx.manifest_entry.content_type() {
                DataContentType::EqualityDeletes if partition.fields().is_empty() => {
                    global_equality_deletes.push(ctx);
                    continue;
                }
                DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
                DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
                _ => unreachable!(
                    "delete file index only accepts equality or position delete files"
                ),
            };

            // `or_default` only allocates when the partition is new, instead of
            // eagerly building a throwaway Vec for every insertion.
            destination_map
                .entry(partition.clone())
                .or_default()
                .push(ctx);
        }

        PopulatedDeleteFileIndex {
            global_equality_deletes,
            eq_deletes_by_partition,
            pos_deletes_by_partition,
        }
    }

    /// Returns the delete files that apply to `data_file`.
    ///
    /// `seq_num` is the data file's data sequence number. When it is `None`,
    /// the sequence-number check is skipped.
    ///
    /// The result is ordered as follows:
    /// 1. global equality deletes;
    /// 2. equality deletes from the same partition and spec;
    /// 3. position deletes from the same partition and spec.
    fn get_deletes_for_data_file(
        &self,
        data_file: &DataFile,
        seq_num: Option<i64>,
    ) -> Vec<FileScanTaskDeleteFile> {
        // Global equality deletes ignore partition and spec id.
        let global_eq_deletes = self
            .global_equality_deletes
            .iter()
            .filter(|delete| Self::eq_delete_applies(delete, seq_num));

        let partition_eq_deletes = self
            .eq_deletes_by_partition
            .get(data_file.partition())
            .into_iter()
            .flatten()
            .filter(|delete| {
                Self::eq_delete_applies(delete, seq_num)
                    && data_file.partition_spec_id == delete.partition_spec_id
            });

        let partition_pos_deletes = self
            .pos_deletes_by_partition
            .get(data_file.partition())
            .into_iter()
            .flatten()
            .filter(|delete| {
                Self::pos_delete_applies(delete, seq_num)
                    && data_file.partition_spec_id == delete.partition_spec_id
            });

        global_eq_deletes
            .chain(partition_eq_deletes)
            .chain(partition_pos_deletes)
            .map(|delete| -> FileScanTaskDeleteFile { delete.as_ref().into() })
            .collect()
    }

    /// Equality deletes apply only when their sequence number is strictly
    /// greater than the data file's. They always apply if the data sequence
    /// number is `None`. A delete with no sequence number never applies when the
    /// data sequence number is known (`None < Some(_)`).
    fn eq_delete_applies(delete: &DeleteFileContext, data_seq_num: Option<i64>) -> bool {
        match data_seq_num {
            Some(data_seq_num) => delete.manifest_entry.sequence_number() > Some(data_seq_num),
            None => true,
        }
    }

    /// Position deletes apply when their sequence number is greater than or
    /// equal to the data file's. They always apply if the data sequence number
    /// is `None`.
    fn pos_delete_applies(delete: &DeleteFileContext, data_seq_num: Option<i64>) -> bool {
        match data_seq_num {
            Some(data_seq_num) => delete.manifest_entry.sequence_number() >= Some(data_seq_num),
            None => true,
        }
    }
}
#[cfg(test)]
mod tests {
    use uuid::Uuid;
    use super::*;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, ManifestStatus,
        Struct,
    };

    /// Unpartitioned delete files: equality deletes become global and apply to
    /// every data file, while position deletes only reach unpartitioned data files.
    #[test]
    fn test_delete_file_index_unpartitioned() {
        // Insertion order (and thus `delete_file_paths` order):
        // [eq@4, eq@6, pos@5, pos@6].
        let deletes: Vec<ManifestEntry> = vec![
            build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
            build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
            build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
            build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
        ];
        let delete_file_paths: Vec<String> = deletes
            .iter()
            .map(|file| file.file_path().to_string())
            .collect();
        let delete_contexts: Vec<DeleteFileContext> = deletes
            .into_iter()
            .map(|entry| DeleteFileContext {
                manifest_entry: entry.into(),
                partition_spec_id: 0,
            })
            .collect();
        let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
        let data_file = build_unpartitioned_data_file();
        // Every delete is newer than data sequence numbers 0 and 3.
        let delete_files_to_apply_for_seq_0 =
            delete_file_index.get_deletes_for_data_file(&data_file, Some(0));
        assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
        let delete_files_to_apply_for_seq_3 =
            delete_file_index.get_deletes_for_data_file(&data_file, Some(3));
        assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
        // At seq 4, eq@4 is excluded (equality deletes need a strictly greater
        // sequence number), leaving [eq@6, pos@5, pos@6].
        let delete_files_to_apply_for_seq_4 =
            delete_file_index.get_deletes_for_data_file(&data_file, Some(4));
        let actual_paths_to_apply_for_seq_4: Vec<String> = delete_files_to_apply_for_seq_4
            .into_iter()
            .map(|file| file.file_path)
            .collect();
        assert_eq!(
            actual_paths_to_apply_for_seq_4,
            delete_file_paths[delete_file_paths.len() - 3..]
        );
        // At seq 5, pos@5 still applies because position deletes use `>=`.
        let delete_files_to_apply_for_seq_5 =
            delete_file_index.get_deletes_for_data_file(&data_file, Some(5));
        let actual_paths_to_apply_for_seq_5: Vec<String> = delete_files_to_apply_for_seq_5
            .into_iter()
            .map(|file| file.file_path)
            .collect();
        assert_eq!(
            actual_paths_to_apply_for_seq_5,
            delete_file_paths[delete_file_paths.len() - 3..]
        );
        // At seq 6, only pos@6 remains: eq@6 is not strictly greater.
        let delete_files_to_apply_for_seq_6 =
            delete_file_index.get_deletes_for_data_file(&data_file, Some(6));
        let actual_paths_to_apply_for_seq_6: Vec<String> = delete_files_to_apply_for_seq_6
            .into_iter()
            .map(|file| file.file_path)
            .collect();
        assert_eq!(
            actual_paths_to_apply_for_seq_6,
            delete_file_paths[delete_file_paths.len() - 1..]
        );
        // A partitioned data file only receives the two global equality deletes.
        let partitioned_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 1);
        let delete_files_to_apply_for_partitioned_file =
            delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(0));
        let actual_paths_to_apply_for_partitioned_file: Vec<String> =
            delete_files_to_apply_for_partitioned_file
                .into_iter()
                .map(|file| file.file_path)
                .collect();
        assert_eq!(
            actual_paths_to_apply_for_partitioned_file,
            delete_file_paths[..2]
        );
    }

    /// Partitioned delete files only apply to data files with the same
    /// partition value and partition spec id.
    #[test]
    fn test_delete_file_index_partitioned() {
        let partition_one = Struct::from_iter([Some(Literal::long(100))]);
        let spec_id = 1;
        // Insertion order: [eq@4, eq@6, pos@5, pos@6], all in `partition_one`.
        let deletes: Vec<ManifestEntry> = vec![
            build_added_manifest_entry(4, &build_partitioned_eq_delete(&partition_one, spec_id)),
            build_added_manifest_entry(6, &build_partitioned_eq_delete(&partition_one, spec_id)),
            build_added_manifest_entry(5, &build_partitioned_pos_delete(&partition_one, spec_id)),
            build_added_manifest_entry(6, &build_partitioned_pos_delete(&partition_one, spec_id)),
        ];
        let delete_file_paths: Vec<String> = deletes
            .iter()
            .map(|file| file.file_path().to_string())
            .collect();
        let delete_contexts: Vec<DeleteFileContext> = deletes
            .into_iter()
            .map(|entry| DeleteFileContext {
                manifest_entry: entry.into(),
                partition_spec_id: spec_id,
            })
            .collect();
        let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
        let partitioned_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), spec_id);
        // Same sequence-number expectations as the unpartitioned test.
        let delete_files_to_apply_for_seq_0 =
            delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(0));
        assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
        let delete_files_to_apply_for_seq_3 =
            delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(3));
        assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
        let delete_files_to_apply_for_seq_4 =
            delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(4));
        let actual_paths_to_apply_for_seq_4: Vec<String> = delete_files_to_apply_for_seq_4
            .into_iter()
            .map(|file| file.file_path)
            .collect();
        assert_eq!(
            actual_paths_to_apply_for_seq_4,
            delete_file_paths[delete_file_paths.len() - 3..]
        );
        let delete_files_to_apply_for_seq_5 =
            delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(5));
        let actual_paths_to_apply_for_seq_5: Vec<String> = delete_files_to_apply_for_seq_5
            .into_iter()
            .map(|file| file.file_path)
            .collect();
        assert_eq!(
            actual_paths_to_apply_for_seq_5,
            delete_file_paths[delete_file_paths.len() - 3..]
        );
        let delete_files_to_apply_for_seq_6 =
            delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(6));
        let actual_paths_to_apply_for_seq_6: Vec<String> = delete_files_to_apply_for_seq_6
            .into_iter()
            .map(|file| file.file_path)
            .collect();
        assert_eq!(
            actual_paths_to_apply_for_seq_6,
            delete_file_paths[delete_file_paths.len() - 1..]
        );
        // A different partition value matches nothing.
        let partitioned_second_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(200))]), 1);
        let delete_files_to_apply_for_different_partition =
            delete_file_index.get_deletes_for_data_file(&partitioned_second_file, Some(0));
        let actual_paths_to_apply_for_different_partition: Vec<String> =
            delete_files_to_apply_for_different_partition
                .into_iter()
                .map(|file| file.file_path)
                .collect();
        assert!(actual_paths_to_apply_for_different_partition.is_empty());
        // The same partition value under a different spec id also matches nothing.
        let partitioned_different_spec = build_partitioned_data_file(&partition_one, 2);
        let delete_files_to_apply_for_different_spec =
            delete_file_index.get_deletes_for_data_file(&partitioned_different_spec, Some(0));
        let actual_paths_to_apply_for_different_spec: Vec<String> =
            delete_files_to_apply_for_different_spec
                .into_iter()
                .map(|file| file.file_path)
                .collect();
        assert!(actual_paths_to_apply_for_different_spec.is_empty());
    }

    // Equality delete with an empty partition and spec id 0.
    fn build_unpartitioned_eq_delete() -> DataFile {
        build_partitioned_eq_delete(&Struct::empty(), 0)
    }

    // Equality delete (equality id 1) in the given partition/spec, with a unique path.
    fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) -> DataFile {
        DataFileBuilder::default()
            .file_path(format!("{}_equality_delete.parquet", Uuid::new_v4()))
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::EqualityDeletes)
            .equality_ids(Some(vec![1]))
            .record_count(1)
            .partition(partition.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    // Position delete with an empty partition and spec id 0.
    fn build_unpartitioned_pos_delete() -> DataFile {
        build_partitioned_pos_delete(&Struct::empty(), 0)
    }

    // Position delete in the given partition/spec, with a unique path.
    fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) -> DataFile {
        DataFileBuilder::default()
            .file_path(format!("{}-pos-delete.parquet", Uuid::new_v4()))
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::PositionDeletes)
            .record_count(1)
            .referenced_data_file(Some("/some-data-file.parquet".to_string()))
            .partition(partition.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    // Data file with an empty partition and spec id 0.
    fn build_unpartitioned_data_file() -> DataFile {
        DataFileBuilder::default()
            .file_path(format!("{}-data.parquet", Uuid::new_v4()))
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::Data)
            .record_count(100)
            .partition(Struct::empty())
            .partition_spec_id(0)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    // Data file in the given partition/spec.
    fn build_partitioned_data_file(partition_value: &Struct, spec_id: i32) -> DataFile {
        DataFileBuilder::default()
            .file_path(format!("{}-data.parquet", Uuid::new_v4()))
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::Data)
            .record_count(100)
            .partition(partition_value.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    // Wraps `file` in an `Added` manifest entry with the given sequence number.
    fn build_added_manifest_entry(data_seq_number: i64, file: &DataFile) -> ManifestEntry {
        ManifestEntry::builder()
            .status(ManifestStatus::Added)
            .sequence_number(data_seq_number)
            .data_file(file.clone())
            .build()
    }
}