use std::{collections::HashSet, sync::Arc};
use lance_core::{
datatypes::Schema,
format::{
pb::{self, IndexMetadata},
Fragment, Index, Manifest,
},
io::{object_store::ObjectStore, reader::read_manifest, reader::read_manifest_indexes},
Error, Result,
};
use object_store::path::Path;
use roaring::RoaringBitmap;
use snafu::{location, Location};
use uuid::Uuid;
use super::{feature_flags::apply_feature_flags, ManifestWriteConfig};
use crate::utils::temporal::timestamp_to_nanos;
/// A proposed change to a dataset: an [`Operation`] plus the bookkeeping
/// needed to detect conflicts and build the resulting manifest.
#[derive(Debug, Clone)]
pub struct Transaction {
/// Dataset version this transaction is based on.
/// NOTE(review): presumably the version that was read when the transaction
/// was created — confirm against callers.
pub read_version: u64,
/// Identifier of the transaction. `Transaction::new` fills this with a
/// hyphenated, randomly generated (v4) UUID string.
pub uuid: String,
/// The change this transaction wants to apply.
pub operation: Operation,
/// Optional tag; `build_manifest` copies it onto the manifest it produces.
pub tag: Option<String>,
}
/// The kinds of change a [`Transaction`] can carry.
///
/// The per-variant notes describe how `Transaction::build_manifest` applies
/// each variant.
#[derive(Debug, Clone)]
pub enum Operation {
/// Keeps the existing fragments and adds `fragments` after them.
Append { fragments: Vec<Fragment> },
/// Drops the fragments listed in `deleted_fragment_ids` and replaces any
/// remaining fragment whose id matches one in `updated_fragments`.
Delete {
updated_fragments: Vec<Fragment>,
deleted_fragment_ids: Vec<u64>,
// NOTE(review): presumably the filter expression that selected the
// deleted rows; it is not interpreted anywhere in this file.
predicate: String,
},
/// Replaces the whole fragment list and schema, and clears all indices.
Overwrite {
fragments: Vec<Fragment>,
schema: Schema,
},
/// Adds `new_indices`, removing existing indices that share a name with a
/// new index or share a UUID with an entry of `removed_indices`.
CreateIndex {
new_indices: Vec<Index>,
removed_indices: Vec<Index>,
},
/// Replaces each group's old fragments with its new fragments and remaps
/// the indices listed in `rewritten_indices` to their new UUIDs.
Rewrite {
groups: Vec<RewriteGroup>,
rewritten_indices: Vec<RewrittenIndex>,
},
/// Uses `fragments` as the complete fragment list together with a new
/// `schema`.
Merge {
fragments: Vec<Fragment>,
schema: Schema,
},
/// Restores a previous version.
/// NOTE(review): `build_manifest` treats this variant as unreachable;
/// presumably it is served by `restore_old_manifest` instead — confirm.
Restore { version: u64 },
/// Keeps the fragments unchanged but advances the manifest's
/// `max_fragment_id` by `num_fragments`.
ReserveFragments { num_fragments: u32 },
}
/// Records that an index was rewritten: the index identified by `old_id`
/// is given the UUID `new_id` when a rewrite is applied.
#[derive(Debug, Clone)]
pub struct RewrittenIndex {
/// UUID of the index before the rewrite.
pub old_id: Uuid,
/// UUID assigned to the index after the rewrite.
pub new_id: Uuid,
}
/// One unit of a rewrite: a set of existing fragments and the fragments
/// that replace them.
#[derive(Debug, Clone)]
pub struct RewriteGroup {
/// Fragments that are removed from the dataset by the rewrite.
pub old_fragments: Vec<Fragment>,
/// Fragments that take the place of `old_fragments`.
pub new_fragments: Vec<Fragment>,
}
impl Operation {
    /// Yields the ids of the pre-existing fragments this operation touches.
    ///
    /// Delete reports both updated and deleted fragments, Rewrite reports
    /// the old fragments of every group, and Merge reports all of its
    /// fragments. Every other operation reports nothing.
    fn modified_fragment_ids(&self) -> Box<dyn Iterator<Item = u64> + '_> {
        match self {
            Self::Delete {
                updated_fragments,
                deleted_fragment_ids,
                ..
            } => {
                let updated = updated_fragments.iter().map(|fragment| fragment.id);
                let deleted = deleted_fragment_ids.iter().copied();
                Box::new(updated.chain(deleted))
            }
            Self::Rewrite { groups, .. } => {
                let replaced = groups
                    .iter()
                    .flat_map(|group| group.old_fragments.iter())
                    .map(|fragment| fragment.id);
                Box::new(replaced)
            }
            Self::Merge { fragments, .. } => {
                Box::new(fragments.iter().map(|fragment| fragment.id))
            }
            Self::Append { .. }
            | Self::Overwrite { .. }
            | Self::CreateIndex { .. }
            | Self::ReserveFragments { .. }
            | Self::Restore { .. } => Box::new(std::iter::empty()),
        }
    }

    /// Returns `true` when `self` and `other` modify at least one common
    /// fragment id.
    fn modifies_same_ids(&self, other: &Self) -> bool {
        let mine: HashSet<u64> = self.modified_fragment_ids().collect();
        let mut theirs = other.modified_fragment_ids();
        theirs.any(|id| mine.contains(&id))
    }

    /// Returns the variant name of this operation as a string.
    pub fn name(&self) -> &str {
        match self {
            Self::Append { .. } => "Append",
            Self::CreateIndex { .. } => "CreateIndex",
            Self::Delete { .. } => "Delete",
            Self::Merge { .. } => "Merge",
            Self::Overwrite { .. } => "Overwrite",
            Self::ReserveFragments { .. } => "ReserveFragments",
            Self::Restore { .. } => "Restore",
            Self::Rewrite { .. } => "Rewrite",
        }
    }
}
impl Transaction {
/// Creates a transaction for `operation` based on `read_version`, giving it
/// a freshly generated hyphenated v4 UUID as its identifier.
pub fn new(read_version: u64, operation: Operation, tag: Option<String>) -> Self {
    Self {
        read_version,
        uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
        operation,
        tag,
    }
}
/// Returns `true` if this transaction cannot be applied alongside `other`.
///
/// The decision depends on the pair of operation kinds; for Delete/Rewrite
/// pairs it additionally depends on whether both touch a common fragment.
pub fn conflicts_with(&self, other: &Self) -> bool {
    let theirs = &other.operation;
    // Only evaluated for the Delete/Rewrite combinations below.
    let touches_same_fragments = || self.operation.modifies_same_ids(theirs);

    match &self.operation {
        // Overwrite and Restore never report a conflict.
        Operation::Overwrite { .. } | Operation::Restore { .. } => false,
        Operation::Append { .. } => !matches!(
            theirs,
            Operation::Append { .. }
                | Operation::Rewrite { .. }
                | Operation::CreateIndex { .. }
                | Operation::Delete { .. }
                | Operation::ReserveFragments { .. }
        ),
        Operation::Rewrite { .. } => match theirs {
            Operation::Append { .. } | Operation::ReserveFragments { .. } => false,
            Operation::Delete { .. } | Operation::Rewrite { .. } => touches_same_fragments(),
            _ => true,
        },
        Operation::Delete { .. } => match theirs {
            Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } => false,
            Operation::Delete { .. } | Operation::Rewrite { .. } => touches_same_fragments(),
            _ => true,
        },
        Operation::CreateIndex { .. } => !matches!(
            theirs,
            Operation::Append { .. }
                | Operation::CreateIndex { .. }
                | Operation::Delete { .. }
                | Operation::Merge { .. }
                | Operation::ReserveFragments { .. }
        ),
        Operation::ReserveFragments { .. } => matches!(
            theirs,
            Operation::Overwrite { .. } | Operation::Restore { .. }
        ),
        Operation::Merge { .. } => !matches!(
            theirs,
            Operation::CreateIndex { .. } | Operation::ReserveFragments { .. }
        ),
    }
}
/// Lazily assigns ids to fragments that do not have one yet.
///
/// A fragment whose id is `0` receives the current value of `fragment_id`,
/// which is then incremented; fragments with any other id pass through
/// unchanged. Nothing happens until the returned iterator is consumed.
fn fragments_with_ids<'a, T>(
    new_fragments: T,
    fragment_id: &'a mut u64,
) -> impl Iterator<Item = Fragment> + 'a
where
    T: IntoIterator<Item = Fragment> + 'a,
{
    let assign_id = move |mut fragment: Fragment| {
        if fragment.id != 0 {
            return fragment;
        }
        let next_id = *fragment_id;
        *fragment_id += 1;
        fragment.id = next_id;
        fragment
    };
    new_fragments.into_iter().map(assign_id)
}
/// Loads the manifest and indices stored for `version` so they can be
/// committed again.
///
/// The loaded manifest gets its timestamp replaced with the one from
/// `config` and its transaction file set to `tx_path` before the indices
/// are read.
///
/// # Errors
/// Propagates any error from resolving the version path, reading the
/// manifest, or reading its indices.
pub(crate) async fn restore_old_manifest(
    object_store: &ObjectStore,
    base_path: &Path,
    version: u64,
    config: &ManifestWriteConfig,
    tx_path: &str,
) -> Result<(Manifest, Vec<Index>)> {
    let manifest_path = object_store
        .commit_handler
        .resolve_version(base_path, version, &object_store.inner)
        .await?;

    let mut restored = read_manifest(object_store, &manifest_path).await?;
    restored.set_timestamp(timestamp_to_nanos(config.timestamp));
    restored.transaction_file = Some(tx_path.to_string());

    let restored_indices = read_manifest_indexes(object_store, &manifest_path, &restored).await?;
    Ok((restored, restored_indices))
}
/// Builds the manifest and index list that result from applying this
/// transaction's operation on top of `current_manifest`.
///
/// `current_manifest` may be `None` only for operations that do not need
/// existing fragments (Overwrite and Merge in the code below); every other
/// operation returns an internal error without it.
///
/// # Errors
/// Returns `Error::Internal` when no schema or no current manifest is
/// available for an operation that needs one, and propagates errors from
/// the rewrite helpers.
///
/// # Panics
/// Panics (via `unreachable!`) if the operation is `Operation::Restore`.
pub(crate) fn build_manifest(
&self,
current_manifest: Option<&Manifest>,
current_indices: Vec<Index>,
transaction_file_path: &str,
config: &ManifestWriteConfig,
) -> Result<(Manifest, Vec<Index>)> {
// Overwrite and Merge carry their own schema; everything else inherits
// the schema of the current manifest.
let schema = match self.operation {
Operation::Overwrite { ref schema, .. } => schema.clone(),
Operation::Merge { ref schema, .. } => schema.clone(),
_ => {
if let Some(current_manifest) = current_manifest {
current_manifest.schema.clone()
} else {
return Err(Error::Internal {
message: "Cannot create a new dataset without a schema".to_string(),
location: location!(),
});
}
}
};
// Next fragment id to hand out: Overwrite restarts at 0, otherwise
// continue after the largest id recorded in the current manifest.
let mut fragment_id = if matches!(self.operation, Operation::Overwrite { .. }) {
0
} else {
current_manifest
.and_then(|m| m.max_fragment_id())
.map(|id| id + 1)
.unwrap_or(0)
};
let mut final_fragments = Vec::new();
let mut final_indices = current_indices;
// Kept as a `Result` so the "missing manifest" error only surfaces in the
// arms below that actually apply `?` to it.
let maybe_existing_fragments =
current_manifest
.map(|m| m.fragments.as_ref())
.ok_or_else(|| Error::Internal {
message: format!(
"No current manifest was provided while building manifest for operation {}",
self.operation.name()
),
location: location!(),
});
match &self.operation {
Operation::Append { ref fragments } => {
// Existing fragments first, then the new ones (ids assigned as needed).
final_fragments.extend(maybe_existing_fragments?.clone());
final_fragments.extend(Self::fragments_with_ids(
fragments.clone(),
&mut fragment_id,
));
}
Operation::Delete {
ref updated_fragments,
ref deleted_fragment_ids,
..
} => {
final_fragments.extend(maybe_existing_fragments?.clone());
// Drop fully deleted fragments, then swap in the updated versions of
// the ones that remain.
final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id));
final_fragments.iter_mut().for_each(|f| {
for updated in updated_fragments {
if updated.id == f.id {
*f = updated.clone();
}
}
});
}
Operation::Overwrite { ref fragments, .. } => {
final_fragments.extend(Self::fragments_with_ids(
fragments.clone(),
&mut fragment_id,
));
// Existing indices are discarded on overwrite.
final_indices = Vec::new();
}
Operation::Rewrite {
ref groups,
ref rewritten_indices,
} => {
final_fragments.extend(maybe_existing_fragments?.clone());
// The version is only used for error reporting by the helper.
let current_version = current_manifest.map(|m| m.version).unwrap_or_default();
Self::handle_rewrite_fragments(
&mut final_fragments,
groups,
&mut fragment_id,
current_version,
)?;
Self::handle_rewrite_indices(&mut final_indices, rewritten_indices, groups)?;
}
Operation::CreateIndex {
new_indices,
removed_indices,
} => {
final_fragments.extend(maybe_existing_fragments?.clone());
// Keep an existing index only if no new index reuses its name and it
// is not explicitly listed for removal (matched by UUID).
final_indices.retain(|existing_index| {
!new_indices
.iter()
.any(|new_index| new_index.name == existing_index.name)
&& !removed_indices
.iter()
.any(|old_index| old_index.uuid == existing_index.uuid)
});
final_indices.extend(new_indices.clone());
}
Operation::ReserveFragments { .. } => {
// Fragments are unchanged; the id reservation happens further below.
final_fragments.extend(maybe_existing_fragments?.clone());
}
Operation::Merge { ref fragments, .. } => {
// Merge supplies the complete fragment list itself.
final_fragments.extend(fragments.clone());
}
Operation::Restore { .. } => {
// NOTE(review): Restore is presumably served by `restore_old_manifest`
// before this function is ever called — confirm in callers.
unreachable!()
}
};
let mut manifest = if let Some(current_manifest) = current_manifest {
Manifest::new_from_previous(current_manifest, &schema, Arc::new(final_fragments))
} else {
Manifest::new(&schema, Arc::new(final_fragments))
};
manifest.tag = self.tag.clone();
if config.auto_set_feature_flags {
apply_feature_flags(&mut manifest);
}
manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
manifest.update_max_fragment_id();
// Reserving fragments bumps the id counter on top of the value that was
// just recomputed.
if let Operation::ReserveFragments { num_fragments } = self.operation {
manifest.max_fragment_id += num_fragments;
}
manifest.transaction_file = Some(transaction_file_path.to_string());
Ok((manifest, final_indices))
}
fn recalculate_fragment_bitmap(
old: &RoaringBitmap,
groups: &[RewriteGroup],
) -> Result<RoaringBitmap> {
let mut new_bitmap = old.clone();
for group in groups {
let any_in_index = group
.old_fragments
.iter()
.any(|frag| old.contains(frag.id as u32));
let all_in_index = group
.old_fragments
.iter()
.all(|frag| old.contains(frag.id as u32));
if any_in_index {
if all_in_index {
for frag_id in group.old_fragments.iter().map(|frag| frag.id as u32) {
new_bitmap.remove(frag_id);
}
new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32));
} else {
return Err(Error::invalid_input("The compaction plan included a rewrite group that was a split of indexed and non-indexed data", location!()));
}
}
}
Ok(new_bitmap)
}
fn handle_rewrite_indices(
indices: &mut [Index],
rewritten_indices: &[RewrittenIndex],
groups: &[RewriteGroup],
) -> Result<()> {
let mut modified_indices = HashSet::new();
for rewritten_index in rewritten_indices {
if !modified_indices.insert(rewritten_index.old_id) {
return Err(Error::invalid_input(format!("An invalid compaction plan must have been generated because multiple tasks modified the same index: {}", rewritten_index.old_id), location!()));
}
let index = indices
.iter_mut()
.find(|idx| idx.uuid == rewritten_index.old_id)
.ok_or_else(|| {
Error::invalid_input(
format!(
"Invalid compaction plan refers to index {} which does not exist",
rewritten_index.old_id
),
location!(),
)
})?;
index.fragment_bitmap = Some(Self::recalculate_fragment_bitmap(
index.fragment_bitmap.as_ref().ok_or_else(|| {
Error::invalid_input(
format!(
"Cannot rewrite index {} which did not store fragment bitmap",
index.uuid
),
location!(),
)
})?,
groups,
)?);
index.uuid = rewritten_index.new_id;
}
Ok(())
}
fn handle_rewrite_fragments(
final_fragments: &mut Vec<Fragment>,
groups: &[RewriteGroup],
fragment_id: &mut u64,
version: u64,
) -> Result<()> {
for group in groups {
let replace_range = {
let start = final_fragments.iter().enumerate().find(|(_, f)| f.id == group.old_fragments[0].id)
.ok_or_else(|| Error::CommitConflict { version, source:
format!("dataset does not contain a fragment a rewrite operation wants to replace: id={}", group.old_fragments[0].id).into() , location:location!()})?.0;
let mut i = 1;
loop {
if i == group.old_fragments.len() {
break Some(start..start + i);
}
if final_fragments[start + i].id != group.old_fragments[i].id {
break None;
}
i += 1;
}
};
let new_fragments = Self::fragments_with_ids(group.new_fragments.clone(), fragment_id);
if let Some(replace_range) = replace_range {
final_fragments.splice(replace_range, new_fragments);
} else {
for fragment in group.old_fragments.iter() {
final_fragments.retain(|f| f.id != fragment.id);
}
final_fragments.extend(new_fragments);
}
}
Ok(())
}
}
impl TryFrom<&pb::Transaction> for Transaction {
type Error = Error;
fn try_from(message: &pb::Transaction) -> Result<Self> {
let operation = match &message.operation {
Some(pb::transaction::Operation::Append(pb::transaction::Append { fragments })) => {
Operation::Append {
fragments: fragments.iter().map(Fragment::from).collect(),
}
}
Some(pb::transaction::Operation::Delete(pb::transaction::Delete {
updated_fragments,
deleted_fragment_ids,
predicate,
})) => Operation::Delete {
updated_fragments: updated_fragments.iter().map(Fragment::from).collect(),
deleted_fragment_ids: deleted_fragment_ids.clone(),
predicate: predicate.clone(),
},
Some(pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
fragments,
schema,
schema_metadata: _schema_metadata, })) => Operation::Overwrite {
fragments: fragments.iter().map(Fragment::from).collect(),
schema: Schema::from(schema),
},
Some(pb::transaction::Operation::ReserveFragments(
pb::transaction::ReserveFragments { num_fragments },
)) => Operation::ReserveFragments {
num_fragments: *num_fragments,
},
Some(pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
old_fragments,
new_fragments,
groups,
rewritten_indices,
})) => {
let groups = if !groups.is_empty() {
groups
.iter()
.map(RewriteGroup::try_from)
.collect::<Result<_>>()?
} else {
vec![RewriteGroup {
old_fragments: old_fragments.iter().map(Fragment::from).collect(),
new_fragments: new_fragments.iter().map(Fragment::from).collect(),
}]
};
let rewritten_indices = rewritten_indices
.iter()
.map(RewrittenIndex::try_from)
.collect::<Result<_>>()?;
Operation::Rewrite {
groups,
rewritten_indices,
}
}
Some(pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
new_indices,
removed_indices,
})) => Operation::CreateIndex {
new_indices: new_indices
.iter()
.map(Index::try_from)
.collect::<Result<_>>()?,
removed_indices: removed_indices
.iter()
.map(Index::try_from)
.collect::<Result<_>>()?,
},
Some(pb::transaction::Operation::Merge(pb::transaction::Merge {
fragments,
schema,
schema_metadata: _schema_metadata, })) => Operation::Merge {
fragments: fragments.iter().map(Fragment::from).collect(),
schema: Schema::from(schema),
},
Some(pb::transaction::Operation::Restore(pb::transaction::Restore { version })) => {
Operation::Restore { version: *version }
}
None => {
return Err(Error::Internal {
message: "Transaction message did not contain an operation".to_string(),
location: location!(),
});
}
};
Ok(Self {
read_version: message.read_version,
uuid: message.uuid.clone(),
operation,
tag: if message.tag.is_empty() {
None
} else {
Some(message.tag.clone())
},
})
}
}
impl TryFrom<&pb::transaction::rewrite::RewrittenIndex> for RewrittenIndex {
type Error = Error;
fn try_from(message: &pb::transaction::rewrite::RewrittenIndex) -> Result<Self> {
Ok(Self {
old_id: message
.old_id
.as_ref()
.map(Uuid::try_from)
.ok_or_else(|| Error::IO {
message: "required field (old_id) missing from message".to_string(),
location: location!(),
})??,
new_id: message
.new_id
.as_ref()
.map(Uuid::try_from)
.ok_or_else(|| Error::IO {
message: "required field (new_id) missing from message".to_string(),
location: location!(),
})??,
})
}
}
impl TryFrom<&pb::transaction::rewrite::RewriteGroup> for RewriteGroup {
type Error = Error;
fn try_from(message: &pb::transaction::rewrite::RewriteGroup) -> Result<Self> {
Ok(Self {
old_fragments: message.old_fragments.iter().map(Fragment::from).collect(),
new_fragments: message.new_fragments.iter().map(Fragment::from).collect(),
})
}
}
impl From<&Transaction> for pb::Transaction {
    /// Encodes a transaction as its protobuf message.
    ///
    /// Rewrite operations are written using groups only (the remaining
    /// Rewrite fields keep their defaults), schema metadata is left at its
    /// default, and a missing tag is encoded as an empty string.
    fn from(value: &Transaction) -> Self {
        let pb_operation = match &value.operation {
            Operation::Append { fragments } => {
                let fragments = fragments.iter().map(pb::DataFragment::from).collect();
                pb::transaction::Operation::Append(pb::transaction::Append { fragments })
            }
            Operation::Delete {
                updated_fragments,
                deleted_fragment_ids,
                predicate,
            } => {
                let updated_fragments = updated_fragments
                    .iter()
                    .map(pb::DataFragment::from)
                    .collect();
                pb::transaction::Operation::Delete(pb::transaction::Delete {
                    updated_fragments,
                    deleted_fragment_ids: deleted_fragment_ids.clone(),
                    predicate: predicate.clone(),
                })
            }
            Operation::Overwrite { fragments, schema } => {
                pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
                    fragments: fragments.iter().map(pb::DataFragment::from).collect(),
                    schema: schema.into(),
                    schema_metadata: Default::default(),
                })
            }
            Operation::ReserveFragments { num_fragments } => {
                pb::transaction::Operation::ReserveFragments(pb::transaction::ReserveFragments {
                    num_fragments: *num_fragments,
                })
            }
            Operation::Rewrite {
                groups,
                rewritten_indices,
            } => {
                let groups = groups
                    .iter()
                    .map(pb::transaction::rewrite::RewriteGroup::from)
                    .collect();
                let rewritten_indices = rewritten_indices
                    .iter()
                    .map(|rewritten| rewritten.into())
                    .collect();
                pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
                    groups,
                    rewritten_indices,
                    ..Default::default()
                })
            }
            Operation::CreateIndex {
                new_indices,
                removed_indices,
            } => pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
                new_indices: new_indices.iter().map(IndexMetadata::from).collect(),
                removed_indices: removed_indices.iter().map(IndexMetadata::from).collect(),
            }),
            Operation::Merge { fragments, schema } => {
                pb::transaction::Operation::Merge(pb::transaction::Merge {
                    fragments: fragments.iter().map(pb::DataFragment::from).collect(),
                    schema: schema.into(),
                    schema_metadata: Default::default(),
                })
            }
            Operation::Restore { version } => {
                pb::transaction::Operation::Restore(pb::transaction::Restore { version: *version })
            }
        };

        Self {
            read_version: value.read_version,
            uuid: value.uuid.clone(),
            operation: Some(pb_operation),
            // `None` is written as the empty string.
            tag: value.tag.clone().unwrap_or_default(),
        }
    }
}
impl From<&RewrittenIndex> for pb::transaction::rewrite::RewrittenIndex {
fn from(value: &RewrittenIndex) -> Self {
Self {
old_id: Some((&value.old_id).into()),
new_id: Some((&value.new_id).into()),
}
}
}
impl From<&RewriteGroup> for pb::transaction::rewrite::RewriteGroup {
    /// Encodes a rewrite group by converting both fragment lists.
    fn from(value: &RewriteGroup) -> Self {
        let old_fragments: Vec<pb::DataFragment> = value
            .old_fragments
            .iter()
            .map(pb::DataFragment::from)
            .collect();
        let new_fragments: Vec<pb::DataFragment> = value
            .new_fragments
            .iter()
            .map(pb::DataFragment::from)
            .collect();
        Self {
            old_fragments,
            new_fragments,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Checks `Transaction::conflicts_with` against a table of expected results
// for every (operation, other operation) pair listed below.
#[test]
fn test_conflicts() {
let index0 = Index {
uuid: uuid::Uuid::new_v4(),
name: "test".to_string(),
fields: vec![0],
dataset_version: 1,
fragment_bitmap: None,
};
let fragment0 = Fragment::new(0);
let fragment1 = Fragment::new(1);
let fragment2 = Fragment::new(2);
// The "other" side of each comparison. The order here defines the column
// order of the expectation arrays in `cases`:
// Append, CreateIndex, Delete, Merge, Overwrite, Rewrite, ReserveFragments.
let other_operations = [
Operation::Append {
fragments: vec![fragment0.clone()],
},
Operation::CreateIndex {
new_indices: vec![index0.clone()],
removed_indices: vec![index0.clone()],
},
Operation::Delete {
updated_fragments: vec![fragment0.clone()],
deleted_fragment_ids: vec![2],
predicate: "x > 2".to_string(),
},
Operation::Merge {
fragments: vec![fragment0.clone(), fragment2.clone()],
schema: Schema::default(),
},
Operation::Overwrite {
fragments: vec![fragment0.clone(), fragment2.clone()],
schema: Schema::default(),
},
Operation::Rewrite {
groups: vec![RewriteGroup {
old_fragments: vec![fragment0.clone()],
new_fragments: vec![fragment1.clone()],
}],
rewritten_indices: vec![],
},
Operation::ReserveFragments { num_fragments: 3 },
];
let other_transactions = other_operations
.iter()
.map(|op| Transaction::new(0, op.clone(), None))
.collect::<Vec<_>>();
// Each case pairs an operation with the expected `conflicts_with` result
// against every entry of `other_transactions`, in the same order.
let cases = [
(
Operation::Append {
fragments: vec![fragment0.clone()],
},
[false, false, false, true, true, false, false],
),
(
// Delete touching only fragment 1, which no other operation modifies.
Operation::Delete {
updated_fragments: vec![fragment1.clone()],
deleted_fragment_ids: vec![],
predicate: "x > 2".to_string(),
},
[true, false, false, true, true, false, false],
),
(
// Delete touching fragments 0 and 2, overlapping the other Delete and
// the other Rewrite.
Operation::Delete {
updated_fragments: vec![fragment0.clone(), fragment2.clone()],
deleted_fragment_ids: vec![],
predicate: "x > 2".to_string(),
},
[true, false, true, true, true, true, false],
),
(
Operation::Overwrite {
fragments: vec![fragment0.clone(), fragment2.clone()],
schema: Schema::default(),
},
[false, false, false, false, false, false, false],
),
(
Operation::CreateIndex {
new_indices: vec![index0.clone()],
removed_indices: vec![index0.clone()],
},
[false, false, false, false, true, true, false],
),
(
// Rewrite of fragment 1 only: no overlap with the other Delete/Rewrite.
Operation::Rewrite {
groups: vec![RewriteGroup {
old_fragments: vec![fragment1.clone()],
new_fragments: vec![fragment0.clone()],
}],
rewritten_indices: Vec::new(),
},
[false, true, false, true, true, false, false],
),
(
// Rewrite of fragments 0 and 2: overlaps the other Delete and Rewrite.
Operation::Rewrite {
groups: vec![RewriteGroup {
old_fragments: vec![fragment0.clone(), fragment2.clone()],
new_fragments: vec![fragment0.clone()],
}],
rewritten_indices: Vec::new(),
},
[false, true, true, true, true, true, false],
),
(
Operation::Merge {
fragments: vec![fragment0.clone(), fragment2.clone()],
schema: Schema::default(),
},
[true, false, true, true, true, true, false],
),
(
Operation::ReserveFragments { num_fragments: 2 },
[false, false, false, false, true, false, false],
),
];
for (operation, expected_conflicts) in &cases {
let transaction = Transaction::new(0, operation.clone(), None);
for (other, expected_conflict) in other_transactions.iter().zip(expected_conflicts) {
assert_eq!(
transaction.conflicts_with(other),
*expected_conflict,
"Transaction {:?} should {} with {:?}",
transaction,
if *expected_conflict {
"conflict"
} else {
"not conflict"
},
other
);
}
}
}
// Checks `Transaction::handle_rewrite_fragments` for one contiguous and one
// non-contiguous rewrite group.
#[test]
fn test_rewrite_fragments() {
let existing_fragments: Vec<Fragment> = (0..10).map(Fragment::new).collect();
let mut final_fragments = existing_fragments.clone();
let rewrite_groups = vec![
// Fragments 1 and 2 are adjacent, so the replacements take their place.
RewriteGroup {
old_fragments: vec![Fragment::new(1), Fragment::new(2)],
new_fragments: vec![Fragment::new(15), Fragment::new(16)],
},
// Fragments 5 and 8 are not adjacent, so they are removed and the
// replacement (id 0, i.e. not yet assigned) is appended at the end.
RewriteGroup {
old_fragments: vec![Fragment::new(5), Fragment::new(8)],
new_fragments: vec![Fragment::new(0)],
},
];
let mut fragment_id = 20;
let version = 0;
Transaction::handle_rewrite_fragments(
&mut final_fragments,
&rewrite_groups,
&mut fragment_id,
version,
)
.unwrap();
// Exactly one id (20) was handed out, for the fragment created with id 0.
assert_eq!(fragment_id, 21);
let expected_fragments: Vec<Fragment> = vec![
Fragment::new(0),
Fragment::new(15),
Fragment::new(16),
Fragment::new(3),
Fragment::new(4),
Fragment::new(6),
Fragment::new(7),
Fragment::new(9),
Fragment::new(20),
];
assert_eq!(final_fragments, expected_fragments);
}
}