use std::{collections::HashSet, sync::Arc};
use object_store::path::Path;
use crate::{
datatypes::Schema,
format::Index,
format::{
pb::{self, IndexMetadata},
Fragment, Manifest,
},
io::{read_manifest, reader::read_manifest_indexes, ObjectStore},
};
use super::{feature_flags::apply_feature_flags, ManifestWriteConfig};
use crate::{Error, Result};
/// A single proposed change to a dataset: one [`Operation`] plus the
/// bookkeeping that travels with it (read version, unique id, optional tag).
///
/// Converts to and from `pb::Transaction` through the `From` / `TryFrom`
/// implementations in this file.
#[derive(Debug, Clone)]
pub struct Transaction {
/// Dataset version recorded for this transaction.
/// NOTE(review): presumably the version that was read when the transaction
/// was started — confirm against the callers that construct it.
pub read_version: u64,
/// Unique identifier. `Transaction::new` fills it with a random (v4) UUID
/// rendered in hyphenated form.
pub uuid: String,
/// The change this transaction wants to apply.
pub operation: Operation,
/// Optional tag. `build_manifest` copies it onto the manifest it produces;
/// in protobuf form `None` is written as an empty string.
pub tag: Option<String>,
}
/// The kind of change a [`Transaction`] carries, together with the data
/// needed to apply it.
#[derive(Debug, Clone)]
pub enum Operation {
/// Adds `fragments` after the existing ones; `build_manifest` assigns them
/// fresh, consecutive ids.
Append { fragments: Vec<Fragment> },
/// Removes and/or replaces existing fragments.
Delete {
/// Replacement versions of existing fragments, matched by id.
updated_fragments: Vec<Fragment>,
/// Ids of fragments that are dropped entirely.
deleted_fragment_ids: Vec<u64>,
/// Predicate text carried with the operation; this file only stores and
/// serializes it.
/// NOTE(review): presumably the filter that selected the deleted rows — confirm.
predicate: String,
},
/// Replaces the dataset contents: `build_manifest` keeps only these
/// fragments (ids renumbered from 0), uses the given schema, and drops all
/// existing indices.
Overwrite {
fragments: Vec<Fragment>,
schema: Schema,
},
/// Adds indices; an existing index with the same name as a new one is replaced.
CreateIndex {
new_indices: Vec<Index>,
},
/// Swaps a set of fragments for newly written ones.
Rewrite {
/// Fragments named as the ones being rewritten; their ids are what
/// conflict detection compares against other transactions.
old_fragments: Vec<Fragment>,
/// Fragments produced by the rewrite; `build_manifest` assigns them
/// fresh ids.
new_fragments: Vec<Fragment>,
},
/// Replaces the fragment list and the schema in one step.
Merge {
/// Complete fragment list for the new manifest; used as given, ids are
/// not reassigned.
fragments: Vec<Fragment>,
/// Schema of the manifest produced by this operation.
schema: Schema,
},
/// Refers to an earlier dataset version by number. `build_manifest` does
/// not handle this variant (it reaches `unreachable!()`);
/// `Transaction::restore_old_manifest` is the function here that loads a
/// manifest by version.
Restore { version: u64 },
}
impl Operation {
    /// Yields the ids of the already-existing fragments this operation touches.
    ///
    /// * `Delete` reports its updated fragments followed by its deleted ids.
    /// * `Rewrite` reports its `old_fragments`.
    /// * `Merge` reports every fragment it carries.
    /// * All other variants report nothing.
    fn modified_fragment_ids(&self) -> Box<dyn Iterator<Item = u64> + '_> {
        match self {
            Self::Delete {
                updated_fragments,
                deleted_fragment_ids,
                ..
            } => {
                let updated = updated_fragments.iter().map(|fragment| fragment.id);
                let deleted = deleted_fragment_ids.iter().copied();
                Box::new(updated.chain(deleted))
            }
            Self::Rewrite { old_fragments, .. } => {
                Box::new(old_fragments.iter().map(|fragment| fragment.id))
            }
            Self::Merge { fragments, .. } => Box::new(fragments.iter().map(|fragment| fragment.id)),
            Self::Append { .. }
            | Self::Overwrite { .. }
            | Self::CreateIndex { .. }
            | Self::Restore { .. } => Box::new(std::iter::empty::<u64>()),
        }
    }

    /// Returns `true` when at least one fragment id is reported as modified
    /// by both `self` and `other`.
    fn modifies_same_ids(&self, other: &Self) -> bool {
        let mine: HashSet<u64> = self.modified_fragment_ids().collect();
        other.modified_fragment_ids().any(|id| mine.contains(&id))
    }

    /// Returns the variant name as a static label (used in error messages).
    pub fn name(&self) -> &str {
        match self {
            Self::Append { .. } => "Append",
            Self::Delete { .. } => "Delete",
            Self::Overwrite { .. } => "Overwrite",
            Self::CreateIndex { .. } => "CreateIndex",
            Self::Rewrite { .. } => "Rewrite",
            Self::Merge { .. } => "Merge",
            Self::Restore { .. } => "Restore",
        }
    }
}
impl Transaction {
pub fn new(read_version: u64, operation: Operation, tag: Option<String>) -> Self {
let uuid = uuid::Uuid::new_v4().hyphenated().to_string();
Self {
read_version,
uuid,
operation,
tag,
}
}
pub fn conflicts_with(&self, other: &Self) -> bool {
match &self.operation {
Operation::Append { .. } => match &other.operation {
Operation::Append { .. } => false,
Operation::Rewrite { .. } => false,
Operation::CreateIndex { .. } => false,
Operation::Delete { .. } => false,
_ => true,
},
Operation::Rewrite { .. } => match &other.operation {
Operation::Append { .. } => false,
Operation::Delete { .. } => {
self.operation.modifies_same_ids(&other.operation)
}
Operation::Rewrite { .. } => {
self.operation.modifies_same_ids(&other.operation)
}
_ => true,
},
Operation::Overwrite { .. } => false,
Operation::Restore { .. } => false,
Operation::CreateIndex { .. } => match &other.operation {
Operation::Append { .. } => false,
Operation::CreateIndex { .. } => false,
Operation::Delete { .. } => false,
Operation::Merge { .. } => false,
Operation::Rewrite { .. } => true,
_ => true,
},
Operation::Delete { .. } => match &other.operation {
Operation::CreateIndex { .. } => false,
Operation::Delete { .. } => {
self.operation.modifies_same_ids(&other.operation)
}
Operation::Rewrite { .. } => {
self.operation.modifies_same_ids(&other.operation)
}
_ => true,
},
Operation::Merge { .. } => !matches!(&other.operation, Operation::CreateIndex { .. }),
}
}
fn fragments_with_ids<'a, T>(
new_fragments: T,
fragment_id: &'a mut u64,
) -> impl Iterator<Item = Fragment> + 'a
where
T: IntoIterator<Item = Fragment> + 'a,
{
new_fragments.into_iter().map(|mut f| {
f.id = *fragment_id;
*fragment_id += 1;
f
})
}
pub(crate) async fn restore_old_manifest(
object_store: &ObjectStore,
base_path: &Path,
version: u64,
config: &ManifestWriteConfig,
tx_path: &str,
) -> Result<(Manifest, Vec<Index>)> {
let path = object_store
.commit_handler
.resolve_version(base_path, version, object_store)
.await?;
let mut manifest = read_manifest(object_store, &path).await?;
manifest.set_timestamp(config.timestamp);
manifest.transaction_file = Some(tx_path.to_string());
let indices = read_manifest_indexes(object_store, &path, &manifest).await?;
Ok((manifest, indices))
}
pub(crate) fn build_manifest(
&self,
current_manifest: Option<&Manifest>,
current_indices: Vec<Index>,
transaction_file_path: &str,
config: &ManifestWriteConfig,
) -> Result<(Manifest, Vec<Index>)> {
let schema = match self.operation {
Operation::Overwrite { ref schema, .. } => schema.clone(),
Operation::Merge { ref schema, .. } => schema.clone(),
_ => {
if let Some(current_manifest) = current_manifest {
current_manifest.schema.clone()
} else {
return Err(Error::Internal {
message: "Cannot create a new dataset without a schema".to_string(),
});
}
}
};
let mut fragment_id = if matches!(self.operation, Operation::Overwrite { .. }) {
0
} else {
current_manifest
.and_then(|m| m.max_fragment_id())
.map(|id| id + 1)
.unwrap_or(0)
};
let mut final_fragments = Vec::new();
let mut final_indices = current_indices;
let maybe_existing_fragments =
current_manifest
.map(|m| m.fragments.as_ref())
.ok_or_else(|| Error::Internal {
message: format!(
"No current manifest was provided while building manifest for operation {}",
self.operation.name()
),
});
match &self.operation {
Operation::Append { ref fragments } => {
final_fragments.extend(maybe_existing_fragments?.clone());
final_fragments.extend(Self::fragments_with_ids(
fragments.clone(),
&mut fragment_id,
));
}
Operation::Delete {
ref updated_fragments,
ref deleted_fragment_ids,
..
} => {
final_fragments.extend(maybe_existing_fragments?.clone());
final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id));
final_fragments.iter_mut().for_each(|f| {
for updated in updated_fragments {
if updated.id == f.id {
*f = updated.clone();
}
}
});
}
Operation::Overwrite { ref fragments, .. } => {
final_fragments.extend(Self::fragments_with_ids(
fragments.clone(),
&mut fragment_id,
));
final_indices = Vec::new();
}
Operation::Rewrite {
ref new_fragments, ..
} => {
final_fragments.extend(Self::fragments_with_ids(
new_fragments.clone(),
&mut fragment_id,
));
}
Operation::CreateIndex { new_indices } => {
final_fragments.extend(maybe_existing_fragments?.clone());
final_indices.retain(|existing_index| {
!new_indices
.iter()
.any(|new_index| new_index.name == existing_index.name)
});
final_indices.extend(new_indices.clone());
}
Operation::Merge { ref fragments, .. } => {
final_fragments.extend(fragments.clone());
}
Operation::Restore { .. } => {
unreachable!()
}
};
let mut manifest = if let Some(current_manifest) = current_manifest {
Manifest::new_from_previous(current_manifest, &schema, Arc::new(final_fragments))
} else {
Manifest::new(&schema, Arc::new(final_fragments))
};
manifest.tag = self.tag.clone();
if config.auto_set_feature_flags {
apply_feature_flags(&mut manifest);
}
manifest.set_timestamp(config.timestamp);
manifest.update_max_fragment_id();
manifest.transaction_file = Some(transaction_file_path.to_string());
Ok((manifest, final_indices))
}
}
impl TryFrom<&pb::Transaction> for Transaction {
type Error = Error;
fn try_from(message: &pb::Transaction) -> Result<Self> {
let operation = match &message.operation {
Some(pb::transaction::Operation::Append(pb::transaction::Append { fragments })) => {
Operation::Append {
fragments: fragments.iter().map(Fragment::from).collect(),
}
}
Some(pb::transaction::Operation::Delete(pb::transaction::Delete {
updated_fragments,
deleted_fragment_ids,
predicate,
})) => Operation::Delete {
updated_fragments: updated_fragments.iter().map(Fragment::from).collect(),
deleted_fragment_ids: deleted_fragment_ids.clone(),
predicate: predicate.clone(),
},
Some(pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
fragments,
schema,
schema_metadata: _schema_metadata, })) => Operation::Overwrite {
fragments: fragments.iter().map(Fragment::from).collect(),
schema: Schema::from(schema),
},
Some(pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
old_fragments,
new_fragments,
})) => Operation::Rewrite {
old_fragments: old_fragments.iter().map(Fragment::from).collect(),
new_fragments: new_fragments.iter().map(Fragment::from).collect(),
},
Some(pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
new_indices,
})) => Operation::CreateIndex {
new_indices: new_indices
.iter()
.map(Index::try_from)
.collect::<Result<_>>()?,
},
Some(pb::transaction::Operation::Merge(pb::transaction::Merge {
fragments,
schema,
schema_metadata: _schema_metadata, })) => Operation::Merge {
fragments: fragments.iter().map(Fragment::from).collect(),
schema: Schema::from(schema),
},
Some(pb::transaction::Operation::Restore(pb::transaction::Restore { version })) => {
Operation::Restore { version: *version }
}
None => {
return Err(Error::Internal {
message: "Transaction message did not contain an operation".to_string(),
});
}
};
Ok(Self {
read_version: message.read_version,
uuid: message.uuid.clone(),
operation,
tag: if message.tag.is_empty() {
None
} else {
Some(message.tag.clone())
},
})
}
}
impl From<&Transaction> for pb::Transaction {
fn from(value: &Transaction) -> Self {
let operation = match &value.operation {
Operation::Append { fragments } => {
pb::transaction::Operation::Append(pb::transaction::Append {
fragments: fragments.iter().map(pb::DataFragment::from).collect(),
})
}
Operation::Delete {
updated_fragments,
deleted_fragment_ids,
predicate,
} => pb::transaction::Operation::Delete(pb::transaction::Delete {
updated_fragments: updated_fragments
.iter()
.map(pb::DataFragment::from)
.collect(),
deleted_fragment_ids: deleted_fragment_ids.clone(),
predicate: predicate.clone(),
}),
Operation::Overwrite { fragments, schema } => {
pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
fragments: fragments.iter().map(pb::DataFragment::from).collect(),
schema: schema.into(),
schema_metadata: Default::default(), })
}
Operation::Rewrite {
old_fragments,
new_fragments,
} => pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
old_fragments: old_fragments.iter().map(pb::DataFragment::from).collect(),
new_fragments: new_fragments.iter().map(pb::DataFragment::from).collect(),
}),
Operation::CreateIndex { new_indices } => {
pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
new_indices: new_indices.iter().map(IndexMetadata::from).collect(),
})
}
Operation::Merge { fragments, schema } => {
pb::transaction::Operation::Merge(pb::transaction::Merge {
fragments: fragments.iter().map(pb::DataFragment::from).collect(),
schema: schema.into(),
schema_metadata: Default::default(), })
}
Operation::Restore { version } => {
pb::transaction::Operation::Restore(pb::transaction::Restore { version: *version })
}
};
Self {
read_version: value.read_version,
uuid: value.uuid.clone(),
operation: Some(operation),
tag: value.tag.clone().unwrap_or("".to_string()),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `Transaction::conflicts_with` against a hand-written matrix.
    ///
    /// Each row of `cases` is a candidate operation; its six booleans line up
    /// with the entries of `committed`, in order: Append, CreateIndex,
    /// Delete, Merge, Overwrite, Rewrite.
    #[test]
    fn test_conflicts() {
        let index0 = Index::new(uuid::Uuid::new_v4(), "test", &[0], 1);
        let fragment0 = Fragment::new(0);
        let fragment1 = Fragment::new(1);
        let fragment2 = Fragment::new(2);

        // Transactions playing the role of `other` in every comparison.
        let committed: Vec<Transaction> = vec![
            Operation::Append {
                fragments: vec![fragment0.clone()],
            },
            Operation::CreateIndex {
                new_indices: vec![index0.clone()],
            },
            Operation::Delete {
                updated_fragments: vec![fragment0.clone()],
                deleted_fragment_ids: vec![2],
                predicate: "x > 2".to_string(),
            },
            Operation::Merge {
                fragments: vec![fragment0.clone(), fragment2.clone()],
                schema: Schema::default(),
            },
            Operation::Overwrite {
                fragments: vec![fragment0.clone(), fragment2.clone()],
                schema: Schema::default(),
            },
            Operation::Rewrite {
                old_fragments: vec![fragment0.clone()],
                new_fragments: vec![fragment2.clone()],
            },
        ]
        .into_iter()
        .map(|operation| Transaction::new(0, operation, None))
        .collect();

        let cases: Vec<(Operation, [bool; 6])> = vec![
            (
                Operation::Append {
                    fragments: vec![fragment0.clone()],
                },
                [false, false, false, true, true, false],
            ),
            (
                // Touches only fragment 1, which no committed operation modifies
                // except Merge (which conflicts regardless).
                Operation::Delete {
                    updated_fragments: vec![fragment1.clone()],
                    deleted_fragment_ids: vec![],
                    predicate: "x > 2".to_string(),
                },
                [true, false, false, true, true, false],
            ),
            (
                // Touches fragments 0 and 2, overlapping the committed Delete and Rewrite.
                Operation::Delete {
                    updated_fragments: vec![fragment0.clone(), fragment2.clone()],
                    deleted_fragment_ids: vec![],
                    predicate: "x > 2".to_string(),
                },
                [true, false, true, true, true, true],
            ),
            (
                Operation::Overwrite {
                    fragments: vec![fragment0.clone(), fragment2.clone()],
                    schema: Schema::default(),
                },
                [false, false, false, false, false, false],
            ),
            (
                Operation::CreateIndex {
                    new_indices: vec![index0],
                },
                [false, false, false, false, true, true],
            ),
            (
                // Rewrites only fragment 1: no overlap with the committed Delete / Rewrite.
                Operation::Rewrite {
                    old_fragments: vec![fragment1.clone()],
                    new_fragments: vec![fragment0.clone()],
                },
                [false, true, false, true, true, false],
            ),
            (
                // Rewrites fragments 0 and 2: overlaps the committed Delete and Rewrite.
                Operation::Rewrite {
                    old_fragments: vec![fragment0.clone(), fragment2.clone()],
                    new_fragments: vec![fragment0.clone()],
                },
                [false, true, true, true, true, true],
            ),
            (
                Operation::Merge {
                    fragments: vec![fragment0.clone(), fragment2.clone()],
                    schema: Schema::default(),
                },
                [true, false, true, true, true, true],
            ),
        ];

        for (operation, expected_conflicts) in cases {
            let transaction = Transaction::new(0, operation, None);
            let expectations = expected_conflicts.iter().copied();
            for (other, expected_conflict) in committed.iter().zip(expectations) {
                let verdict = if expected_conflict {
                    "conflict"
                } else {
                    "not conflict"
                };
                assert_eq!(
                    transaction.conflicts_with(other),
                    expected_conflict,
                    "Transaction {:?} should {} with {:?}",
                    transaction,
                    verdict,
                    other
                );
            }
        }
    }
}