use crate::checkpoint::Checkpoint;
use crate::config::CheckpointOptions;
use crate::db_state::ManifestCore;
use crate::error::SlateDBError;
use crate::error::SlateDBError::{
CheckpointMissing, InvalidDBState, LatestTransactionalObjectVersionMissing, ManifestMissing,
};
use crate::flatbuffer_types::FlatBufferManifestCodec;
use crate::manifest::Manifest;
use crate::rand::DbRand;
use chrono::Utc;
use log::debug;
use object_store::path::Path;
use object_store::ObjectStore;
use serde::Serialize;
use slatedb_common::clock::SystemClock;
use slatedb_txn_obj::object_store::ObjectStoreSequencedStorageProtocol;
use slatedb_txn_obj::{
DirtyObject, FenceableTransactionalObject, MonotonicId, SequencedStorageProtocol,
SimpleTransactionalObject, TransactionalObject, TransactionalStorageProtocol,
};
use std::collections::BTreeMap;
use std::ops::RangeBounds;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// Manifest handle backed by a [`FenceableTransactionalObject`].
///
/// Instances are built by `init_writer` or `init_compactor`, which decide
/// whether the manifest's `writer_epoch` or `compactor_epoch` field is handed
/// to the underlying fenceable object as its epoch.
pub(crate) struct FenceableManifest {
/// Clock used to timestamp checkpoints created through this handle.
clock: Arc<dyn SystemClock>,
/// Fenceable transactional wrapper around the stored [`Manifest`].
inner: FenceableTransactionalObject<Manifest>,
}
impl FenceableManifest {
/// Creates a fenceable manifest whose epoch is the manifest's `writer_epoch`.
///
/// The transactional object inside `stored_manifest` is handed to
/// `FenceableTransactionalObject::init` together with a getter/setter pair
/// for `writer_epoch`; any error from that initialization is propagated.
pub(crate) async fn init_writer(
    stored_manifest: StoredManifest,
    manifest_update_timeout: Duration,
    system_clock: Arc<dyn SystemClock>,
) -> Result<Self, SlateDBError> {
    let inner = FenceableTransactionalObject::init(
        stored_manifest.inner,
        manifest_update_timeout,
        Arc::clone(&system_clock),
        |manifest: &Manifest| manifest.writer_epoch,
        |manifest: &mut Manifest, epoch: u64| manifest.writer_epoch = epoch,
    )
    .await?;
    Ok(Self {
        clock: system_clock,
        inner,
    })
}
/// Creates a fenceable manifest whose epoch is the manifest's `compactor_epoch`.
///
/// The transactional object inside `stored_manifest` is handed to
/// `FenceableTransactionalObject::init` together with a getter/setter pair
/// for `compactor_epoch`; any error from that initialization is propagated.
pub(crate) async fn init_compactor(
    stored_manifest: StoredManifest,
    manifest_update_timeout: Duration,
    system_clock: Arc<dyn SystemClock>,
) -> Result<Self, SlateDBError> {
    let inner = FenceableTransactionalObject::init(
        stored_manifest.inner,
        manifest_update_timeout,
        Arc::clone(&system_clock),
        |manifest: &Manifest| manifest.compactor_epoch,
        |manifest: &mut Manifest, epoch: u64| manifest.compactor_epoch = epoch,
    )
    .await?;
    Ok(Self {
        clock: system_clock,
        inner,
    })
}
/// Returns the value of `local_epoch()` on the wrapped fenceable object.
pub(crate) fn local_epoch(&self) -> u64 {
self.inner.local_epoch()
}
/// Refreshes the wrapped object and returns the resulting manifest.
///
/// Errors from the underlying `refresh()` are converted into
/// [`SlateDBError`]; the tests in this file expect `SlateDBError::Fenced`
/// once another holder has initialized with the same epoch kind.
pub(crate) async fn refresh(&mut self) -> Result<&Manifest, SlateDBError> {
Ok(self.inner.refresh().await?)
}
/// Returns a [`DirtyObject`] from the wrapped object's `prepare_dirty()`,
/// to be modified and later passed to `update`.
pub(crate) fn prepare_dirty(&self) -> Result<DirtyObject<Manifest>, SlateDBError> {
Ok(self.inner.prepare_dirty()?)
}
/// Submits `dirty` to the wrapped object's `update()`, converting any error
/// into [`SlateDBError`].
pub(crate) async fn update(
&mut self,
dirty: DirtyObject<Manifest>,
) -> Result<(), SlateDBError> {
Ok(self.inner.update(dirty).await?)
}
/// Builds a [`Checkpoint`] value from the current in-memory manifest without
/// writing anything; delegates to `make_new_checkpoint` with this handle's
/// clock and wrapped object.
pub(crate) fn new_checkpoint(
&self,
checkpoint_id: Uuid,
options: &CheckpointOptions,
) -> Result<Checkpoint, SlateDBError> {
Self::make_new_checkpoint(self.clock.clone(), &self.inner, checkpoint_id, options)
}
/// Builds (but does not persist) a [`Checkpoint`] for the manifest held by `inner`.
///
/// * When `options.source` names an existing checkpoint, the new checkpoint
///   reuses that checkpoint's `manifest_id`.
/// * Otherwise the new checkpoint points at the id following `inner.id()`,
///   i.e. the manifest version that writing this checkpoint would produce.
///
/// The clock is sampled exactly once so that `expire_time` is precisely
/// `create_time + lifetime`.
///
/// # Errors
/// * `CheckpointMissing` if `options.source` is set but not found.
/// * `InvalidDBState` if no source is given and the manifest core is not
///   marked `initialized`.
fn make_new_checkpoint(
    clock: Arc<dyn SystemClock>,
    inner: &FenceableTransactionalObject<Manifest>,
    checkpoint_id: Uuid,
    options: &CheckpointOptions,
) -> Result<Checkpoint, SlateDBError> {
    let db_state = &inner.object().core;
    let manifest_id = match options.source {
        Some(source_checkpoint_id) => {
            let Some(source_checkpoint) = db_state.find_checkpoint(source_checkpoint_id) else {
                return Err(CheckpointMissing(source_checkpoint_id));
            };
            source_checkpoint.manifest_id
        }
        None => {
            if !db_state.initialized {
                return Err(InvalidDBState);
            }
            inner.id().next().into()
        }
    };
    // Read the clock once: both timestamps must derive from the same instant.
    let create_time = clock.now();
    Ok(Checkpoint {
        id: checkpoint_id,
        manifest_id,
        expire_time: options.lifetime.map(|lifetime| create_time + lifetime),
        create_time,
        name: options.name.clone(),
    })
}
pub(crate) async fn write_checkpoint(
&mut self,
checkpoint_id: Uuid,
options: &CheckpointOptions,
) -> Result<Checkpoint, SlateDBError> {
let clock = self.clock.clone();
self.maybe_apply_update(|fm| {
let checkpoint = Self::make_new_checkpoint(clock.clone(), fm, checkpoint_id, options)?;
let mut dirty = fm.prepare_dirty()?;
dirty.value.core.checkpoints.push(checkpoint);
Ok(Some(dirty))
})
.await?;
let checkpoint = self
.inner
.object()
.core
.find_checkpoint(checkpoint_id)
.expect("update applied but checkpoint not found")
.clone();
Ok(checkpoint)
}
/// Forwards `mutator` to the wrapped object's `maybe_apply_update`.
///
/// `mutator` receives the wrapped fenceable object and returns either
/// `Some(dirty)` with the manifest to write or `None` for no change. It is an
/// `Fn`, so it must be callable more than once.
pub(crate) async fn maybe_apply_update<F>(&mut self, mutator: F) -> Result<(), SlateDBError>
where
F: Fn(
&FenceableTransactionalObject<Manifest>,
) -> Result<Option<DirtyObject<Manifest>>, SlateDBError>
+ Send
+ Sync,
{
Ok(self.inner.maybe_apply_update(mutator).await?)
}
}
/// Manifest handle backed by a [`SimpleTransactionalObject`], i.e. without
/// the epoch handling that `FenceableManifest` layers on top.
pub(crate) struct StoredManifest {
/// Transactional wrapper around the stored [`Manifest`].
inner: SimpleTransactionalObject<Manifest>,
/// Clock used to timestamp checkpoints created or refreshed via this handle.
clock: Arc<dyn SystemClock>,
}
impl StoredManifest {
/// Initializes a transactional object over `store` with `manifest` as its
/// initial value and wraps it in a `StoredManifest`.
///
/// `manifest` is moved into the initializer; it is owned by this function
/// and never used afterwards, so no copy is needed.
///
/// # Errors
/// Propagates any error from `SimpleTransactionalObject::init`.
async fn init(
    store: Arc<ManifestStore>,
    manifest: Manifest,
    clock: Arc<dyn SystemClock>,
) -> Result<Self, SlateDBError> {
    let inner = SimpleTransactionalObject::<Manifest>::init(
        Arc::clone(&store.inner)
            as Arc<dyn TransactionalStorageProtocol<Manifest, MonotonicId>>,
        manifest,
    )
    .await?;
    Ok(Self { inner, clock })
}
/// Writes the initial manifest for a new database, built from `core` via
/// `Manifest::initial`, and returns a handle to it.
pub(crate) async fn create_new_db(
    store: Arc<ManifestStore>,
    core: ManifestCore,
    clock: Arc<dyn SystemClock>,
) -> Result<Self, SlateDBError> {
    Self::init(store, Manifest::initial(core), clock).await
}
/// Writes the first manifest of a clone database into `clone_manifest_store`.
///
/// The manifest is produced by `Manifest::cloned` from the parent manifest,
/// the parent's path, and the source checkpoint id.
pub(crate) async fn create_uninitialized_clone(
    clone_manifest_store: Arc<ManifestStore>,
    parent_manifest: &Manifest,
    parent_path: String,
    source_checkpoint_id: Uuid,
    rand: Arc<DbRand>,
    clock: Arc<dyn SystemClock>,
) -> Result<Self, SlateDBError> {
    let clone_manifest =
        Manifest::cloned(parent_manifest, parent_path, source_checkpoint_id, rand);
    Self::init(clone_manifest_store, clone_manifest, clock).await
}
/// Loads the stored manifest from `store`, if there is one.
///
/// Returns `Ok(None)` when `SimpleTransactionalObject::try_load` finds
/// nothing, and propagates its errors otherwise.
pub(crate) async fn try_load(
    store: Arc<ManifestStore>,
    clock: Arc<dyn SystemClock>,
) -> Result<Option<Self>, SlateDBError> {
    let loaded = SimpleTransactionalObject::<Manifest>::try_load(
        Arc::clone(&store.inner)
            as Arc<dyn TransactionalStorageProtocol<Manifest, MonotonicId>>,
    )
    .await?;
    match loaded {
        Some(inner) => Ok(Some(Self { inner, clock })),
        None => Ok(None),
    }
}
pub(crate) async fn load(
store: Arc<ManifestStore>,
clock: Arc<dyn SystemClock>,
) -> Result<Self, SlateDBError> {
SimpleTransactionalObject::<Manifest>::try_load(Arc::clone(&store.inner)
as Arc<dyn TransactionalStorageProtocol<Manifest, MonotonicId>>)
.await?
.map(|inner| Self { inner, clock })
.ok_or(LatestTransactionalObjectVersionMissing)
}
/// Returns the id of the locally held manifest version as a `u64`.
#[allow(dead_code)]
pub(crate) fn id(&self) -> u64 {
self.inner.id().into()
}
/// Returns the locally held manifest (no I/O is performed here).
pub(crate) fn manifest(&self) -> &Manifest {
self.inner.object()
}
/// Returns a [`DirtyObject`] from the wrapped object's `prepare_dirty()`,
/// to be modified and later passed to `update`.
pub(crate) fn prepare_dirty(&self) -> Result<DirtyObject<Manifest>, SlateDBError> {
Ok(self.inner.prepare_dirty()?)
}
/// Returns the `core` section of the locally held manifest.
pub(crate) fn db_state(&self) -> &ManifestCore {
&self.manifest().core
}
/// Refreshes the wrapped object and returns the resulting manifest; errors
/// from the underlying `refresh()` are converted into [`SlateDBError`].
#[allow(unused)]
pub(crate) async fn refresh(&mut self) -> Result<&Manifest, SlateDBError> {
Ok(self.inner.refresh().await?)
}
/// Builds (but does not persist) a [`Checkpoint`] for `manifest`.
///
/// * When `options.source` names an existing checkpoint, the new checkpoint
///   reuses that checkpoint's `manifest_id`.
/// * Otherwise it points at `current_id + 1`, the manifest version that
///   writing this checkpoint would produce.
///
/// The clock is sampled exactly once so that `expire_time` is precisely
/// `create_time + lifetime`.
///
/// # Errors
/// * `CheckpointMissing` if `options.source` is set but not found.
/// * `InvalidDBState` if no source is given and the manifest core is not
///   marked `initialized`.
fn new_checkpoint(
    manifest: &Manifest,
    current_id: u64,
    clock: &dyn SystemClock,
    checkpoint_id: Uuid,
    options: &CheckpointOptions,
) -> Result<Checkpoint, SlateDBError> {
    let manifest_id = match options.source {
        Some(source_checkpoint_id) => {
            let Some(source_checkpoint) = manifest.core.find_checkpoint(source_checkpoint_id)
            else {
                return Err(CheckpointMissing(source_checkpoint_id));
            };
            source_checkpoint.manifest_id
        }
        None => {
            if !manifest.core.initialized {
                return Err(InvalidDBState);
            }
            current_id + 1
        }
    };
    // Read the clock once: both timestamps must derive from the same instant.
    let create_time = clock.now();
    Ok(Checkpoint {
        id: checkpoint_id,
        manifest_id,
        expire_time: options.lifetime.map(|lifetime| create_time + lifetime),
        create_time,
        name: options.name.clone(),
    })
}
/// Appends a new checkpoint to the manifest and returns the stored copy.
///
/// The mutator builds the checkpoint from the current manifest by reference
/// and pushes it straight onto the dirty manifest returned by
/// `prepare_dirty`, avoiding an extra full copy of the manifest per attempt.
///
/// # Errors
/// Propagates errors from checkpoint construction (`CheckpointMissing`,
/// `InvalidDBState`) and from the underlying update.
///
/// # Panics
/// Panics if the update succeeded but the checkpoint cannot be found.
pub(crate) async fn write_checkpoint(
    &mut self,
    checkpoint_id: Uuid,
    options: &CheckpointOptions,
) -> Result<Checkpoint, SlateDBError> {
    let clock = Arc::clone(&self.clock);
    self.inner
        .maybe_apply_update(|sr| -> Result<Option<DirtyObject<Manifest>>, SlateDBError> {
            // Validate and build the checkpoint before preparing the dirty copy.
            let checkpoint = Self::new_checkpoint(
                sr.object(),
                sr.id().into(),
                clock.as_ref(),
                checkpoint_id,
                options,
            )?;
            let mut dirty = sr.prepare_dirty()?;
            dirty.value.core.checkpoints.push(checkpoint);
            Ok(Some(dirty))
        })
        .await?;
    Ok(self
        .db_state()
        .find_checkpoint(checkpoint_id)
        .expect("update applied but checkpoint not found")
        .clone())
}
/// Removes every checkpoint whose id equals `checkpoint_id`.
///
/// If no such checkpoint exists the mutator returns `None`, so no manifest
/// update is requested and the manifest is not copied at all. Otherwise the
/// checkpoint is removed directly from the dirty manifest returned by
/// `prepare_dirty`.
///
/// # Errors
/// Propagates errors from `prepare_dirty` and from the underlying update.
pub(crate) async fn delete_checkpoint(
    &mut self,
    checkpoint_id: Uuid,
) -> Result<(), SlateDBError> {
    Ok(self
        .inner
        .maybe_apply_update(|sr| -> Result<Option<DirtyObject<Manifest>>, SlateDBError> {
            let is_present = sr
                .object()
                .core
                .checkpoints
                .iter()
                .any(|cp| cp.id == checkpoint_id);
            if !is_present {
                // Nothing to delete: request no update.
                return Ok(None);
            }
            let mut dirty = sr.prepare_dirty()?;
            dirty
                .value
                .core
                .checkpoints
                .retain(|cp| cp.id != checkpoint_id);
            Ok(Some(dirty))
        })
        .await?)
}
/// Atomically swaps one checkpoint for another in a single manifest update.
///
/// The new checkpoint is built from the current manifest *before* the old
/// one is removed, so `new_checkpoint_options.source` may refer to
/// `old_checkpoint_id`. A missing old checkpoint is not an error; the new
/// checkpoint is simply added. The dirty manifest from `prepare_dirty` is
/// edited in place instead of overwriting it with a second manifest copy.
///
/// # Errors
/// Propagates errors from checkpoint construction (`CheckpointMissing`,
/// `InvalidDBState`) and from the underlying update.
///
/// # Panics
/// Panics if the update succeeded but the new checkpoint cannot be found.
pub(crate) async fn replace_checkpoint(
    &mut self,
    old_checkpoint_id: Uuid,
    new_checkpoint_id: Uuid,
    new_checkpoint_options: &CheckpointOptions,
) -> Result<Checkpoint, SlateDBError> {
    let clock = Arc::clone(&self.clock);
    self.inner
        .maybe_apply_update(|sr| -> Result<Option<DirtyObject<Manifest>>, SlateDBError> {
            let checkpoint = Self::new_checkpoint(
                sr.object(),
                sr.id().into(),
                clock.as_ref(),
                new_checkpoint_id,
                new_checkpoint_options,
            )?;
            let mut dirty = sr.prepare_dirty()?;
            let checkpoints = &mut dirty.value.core.checkpoints;
            checkpoints.retain(|cp| cp.id != old_checkpoint_id);
            checkpoints.push(checkpoint);
            Ok(Some(dirty))
        })
        .await?;
    let new_checkpoint = self
        .db_state()
        .find_checkpoint(new_checkpoint_id)
        .expect("update applied but checkpoint not found")
        .clone();
    Ok(new_checkpoint)
}
pub(crate) async fn refresh_checkpoint(
&mut self,
checkpoint_id: Uuid,
new_lifetime: Duration,
) -> Result<Checkpoint, SlateDBError> {
let clock = Arc::clone(&self.clock);
self.inner
.maybe_apply_update(|sr| {
let mut new_val = sr.object().clone();
let Some(cp) = new_val
.core
.checkpoints
.iter_mut()
.find(|c| c.id == checkpoint_id)
else {
return Err(CheckpointMissing(checkpoint_id));
};
cp.expire_time = Some(clock.now() + new_lifetime);
let mut dirty = sr.prepare_dirty()?;
dirty.value = new_val;
Ok(Some(dirty))
})
.await?;
let checkpoint = self
.db_state()
.find_checkpoint(checkpoint_id)
.expect("update applied but checkpoint not found")
.clone();
Ok(checkpoint)
}
/// Submits `dirty` to the wrapped object's `update()`, converting any error
/// into [`SlateDBError`]. The tests in this file expect
/// `TransactionalObjectVersionExists` when `dirty` is based on a stale version.
pub(crate) async fn update(
&mut self,
dirty: DirtyObject<Manifest>,
) -> Result<(), SlateDBError> {
Ok(self.inner.update(dirty).await?)
}
/// Forwards `mutator` to the wrapped object's `maybe_apply_update`.
///
/// `mutator` receives the wrapped transactional object and returns either
/// `Some(dirty)` with the manifest to write or `None` for no change. It is an
/// `Fn`, so it must be callable more than once.
pub(crate) async fn maybe_apply_update<F>(&mut self, mutator: F) -> Result<(), SlateDBError>
where
F: Fn(
&SimpleTransactionalObject<Manifest>,
) -> Result<Option<DirtyObject<Manifest>>, SlateDBError>
+ Send
+ Sync,
{
Ok(self.inner.maybe_apply_update(mutator).await?)
}
}
/// Listing entry describing one stored manifest file, as produced by
/// `ManifestStore::list_manifests`.
#[derive(Serialize, Debug)]
pub(crate) struct ManifestFileMetadata {
/// Manifest id of the file.
pub(crate) id: u64,
/// Object-store location of the file; serialized as a plain string.
#[serde(serialize_with = "serialize_path")]
pub(crate) location: Path,
/// Last-modified timestamp reported by the listing.
pub(crate) last_modified: chrono::DateTime<Utc>,
/// Size reported by the listing (presumably bytes — confirm against the storage protocol).
#[allow(dead_code)]
pub(crate) size: u32,
}
/// Serde helper that writes an object-store [`Path`] as its string form.
fn serialize_path<S>(path: &Path, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    let text: &str = path.as_ref();
    serializer.serialize_str(text)
}
/// Access to the stored sequence of manifest versions: read, list and delete
/// by id, all delegated to a [`SequencedStorageProtocol`].
pub(crate) struct ManifestStore {
/// Storage protocol used for every manifest read, list and delete.
inner: Arc<dyn SequencedStorageProtocol<Manifest>>,
}
impl ManifestStore {
/// Creates a manifest store on top of `object_store`, rooted at `root_path`.
///
/// Manifests are handled by an `ObjectStoreSequencedStorageProtocol`
/// configured with the `"manifest"` names and the FlatBuffer manifest codec.
pub(crate) fn new(root_path: &Path, object_store: Arc<dyn ObjectStore>) -> Self {
    let protocol = ObjectStoreSequencedStorageProtocol::<Manifest>::new(
        root_path,
        object_store,
        "manifest",
        "manifest",
        Box::new(FlatBufferManifestCodec {}),
    );
    let inner = Arc::new(protocol);
    Self { inner }
}
/// Deletes the manifest with the given `id`, unless it is still needed.
///
/// # Errors
/// Returns `InvalidDeletion` when `id` is the latest manifest or is
/// referenced by a checkpoint in the latest manifest. Read and delete errors
/// from the storage layer are propagated.
pub(crate) async fn delete_manifest(&self, id: u64) -> Result<(), SlateDBError> {
    let (latest_id, latest) = self.read_latest_manifest().await?;
    // Both the active manifest and any checkpointed manifest must be kept.
    if latest_id == id || latest.core.checkpoints.iter().any(|ck| ck.manifest_id == id) {
        return Err(SlateDBError::InvalidDeletion);
    }
    debug!("deleting manifest [id={}]", id);
    self.inner.delete(MonotonicId::new(id)).await?;
    Ok(())
}
/// Lists metadata for the stored manifests whose ids fall within `id_range`.
///
/// The range bounds are converted to the storage protocol's id type and each
/// listed file is mapped into a [`ManifestFileMetadata`].
pub(crate) async fn list_manifests<R: RangeBounds<u64>>(
    &self,
    id_range: R,
) -> Result<Vec<ManifestFileMetadata>, SlateDBError> {
    let lower = id_range.start_bound().map(|b| (*b).into());
    let upper = id_range.end_bound().map(|b| (*b).into());
    let files = self.inner.list(lower, upper).await?;
    let mut manifests = Vec::with_capacity(files.len());
    for file in files {
        manifests.push(ManifestFileMetadata {
            id: file.id.into(),
            location: file.location,
            last_modified: file.last_modified,
            size: file.size,
        });
    }
    Ok(manifests)
}
/// Collects `manifest` plus every manifest referenced by its checkpoints.
///
/// The result is keyed by manifest id. `manifest` itself is inserted under
/// `manifest_id`; each distinct checkpoint `manifest_id` not already present
/// is read from the store exactly once.
///
/// # Errors
/// Propagates `read_manifest` errors, including `ManifestMissing` when a
/// checkpoint points at a manifest that no longer exists.
pub(crate) async fn read_referenced_manifests(
    &self,
    manifest_id: u64,
    manifest: &Manifest,
) -> Result<BTreeMap<u64, Manifest>, SlateDBError> {
    // Snapshot the ids up front so nothing borrowed from `manifest` is held
    // across the awaits below.
    let checkpoint_manifest_ids: Vec<u64> = manifest
        .core
        .checkpoints
        .iter()
        .map(|checkpoint| checkpoint.manifest_id)
        .collect();
    let mut referenced_manifests = BTreeMap::new();
    referenced_manifests.insert(manifest_id, manifest.clone());
    for id in checkpoint_manifest_ids {
        if referenced_manifests.contains_key(&id) {
            continue;
        }
        let checkpoint_manifest = self.read_manifest(id).await?;
        referenced_manifests.insert(id, checkpoint_manifest);
    }
    Ok(referenced_manifests)
}
/// Reads the most recent manifest, returning its id (as `u64`) and contents,
/// or `None` when the store holds no manifest.
pub(crate) async fn try_read_latest_manifest(
    &self,
) -> Result<Option<(u64, Manifest)>, SlateDBError> {
    let latest = self.inner.try_read_latest().await?;
    Ok(latest.map(|(id, manifest)| (id.into(), manifest)))
}
/// Reads the most recent manifest and its id.
///
/// # Errors
/// Returns `LatestTransactionalObjectVersionMissing` when the store holds no
/// manifest; read errors are propagated.
pub(crate) async fn read_latest_manifest(&self) -> Result<(u64, Manifest), SlateDBError> {
    match self.try_read_latest_manifest().await? {
        Some(latest) => Ok(latest),
        None => Err(LatestTransactionalObjectVersionMissing),
    }
}
/// Reads the manifest stored under `id`, returning `None` when the storage
/// protocol's `try_read` finds nothing for that id.
pub(crate) async fn try_read_manifest(
&self,
id: u64,
) -> Result<Option<Manifest>, SlateDBError> {
Ok(self.inner.try_read(MonotonicId::new(id)).await?)
}
/// Reads the manifest stored under `id`.
///
/// # Errors
/// Returns `ManifestMissing(id)` when no such manifest exists; read errors
/// are propagated.
pub(crate) async fn read_manifest(&self, id: u64) -> Result<Manifest, SlateDBError> {
    match self.try_read_manifest(id).await? {
        Some(manifest) => Ok(manifest),
        None => Err(ManifestMissing(id)),
    }
}
}
// Test-only helpers shared with other modules in the crate.
#[cfg(test)]
pub(crate) mod test_utils {
use crate::db_state::ManifestCore;
use crate::manifest::Manifest;
use slatedb_txn_obj::test_utils::new_dirty_object;
use slatedb_txn_obj::DirtyObject;
/// Builds a dirty manifest via `new_dirty_object`, passing `1u64` and an
/// initial manifest created from a fresh `ManifestCore`.
pub(crate) fn new_dirty_manifest() -> DirtyObject<Manifest> {
new_dirty_object(1u64, Manifest::initial(ManifestCore::new()))
}
}
#[cfg(test)]
mod tests {
use crate::checkpoint::Checkpoint;
use crate::config::CheckpointOptions;
use crate::db_state::ManifestCore;
use crate::error;
use crate::error::SlateDBError;
use crate::manifest::store::{FenceableManifest, ManifestStore, StoredManifest};
use crate::rand::DbRand;
use crate::retrying_object_store::RetryingObjectStore;
use crate::test_utils::FlakyObjectStore;
use chrono::Timelike;
use object_store::memory::InMemory;
use object_store::path::Path;
use slatedb_common::clock::{DefaultSystemClock, SystemClock};
use slatedb_txn_obj::TransactionalObject;
use std::sync::Arc;
use std::time::Duration;
const ROOT: &str = "/root/path";
/// Two handles at the same version: the first write wins, the second fails.
#[tokio::test]
async fn test_should_fail_write_on_version_conflict() {
    let store = new_memory_manifest_store();
    let mut winner = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let mut loser = StoredManifest::load(Arc::clone(&store), Arc::new(DefaultSystemClock::new()))
        .await
        .unwrap();

    let winner_dirty = winner.prepare_dirty().unwrap();
    winner.update(winner_dirty).await.unwrap();

    let loser_dirty = loser.prepare_dirty().unwrap();
    let err = loser.update(loser_dirty).await.unwrap_err();
    assert!(matches!(
        err,
        error::SlateDBError::TransactionalObjectVersionExists
    ));
}
/// A single update after creation yields manifest version 2.
#[tokio::test]
async fn test_should_write_with_new_version() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    let dirty = stored.prepare_dirty().unwrap();
    stored.update(dirty).await.unwrap();

    let (latest_id, _) = store.read_latest_manifest().await.unwrap();
    assert_eq!(latest_id, 2);
}
/// A successful update is reflected in the handle's local state.
#[tokio::test]
async fn test_should_update_local_state_on_write() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    let mut pending = stored.prepare_dirty().unwrap();
    pending.value.core.next_wal_sst_id = 123;
    stored.update(pending).await.unwrap();

    assert_eq!(stored.db_state().next_wal_sst_id, 123);
}
/// `refresh` on a second handle picks up a write made through the first.
#[tokio::test]
async fn test_should_refresh() {
    let store = new_memory_manifest_store();
    let mut writer = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let mut reader = StoredManifest::load(Arc::clone(&store), Arc::new(DefaultSystemClock::new()))
        .await
        .unwrap();

    let mut pending = writer.prepare_dirty().unwrap();
    pending.value.core.next_wal_sst_id = 123;
    writer.update(pending).await.unwrap();

    let refreshed_wal_id = reader.refresh().await.unwrap().core.next_wal_sst_id;
    assert_eq!(refreshed_wal_id, 123);
    assert_eq!(reader.db_state().next_wal_sst_id, 123);
}
/// Each `init_writer` leaves the stored `writer_epoch` one higher.
#[tokio::test]
async fn test_should_bump_writer_epoch() {
    let store = new_memory_manifest_store();
    StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let timeout = Duration::from_secs(300);

    for expected_epoch in 1..5 {
        let loaded =
            StoredManifest::load(Arc::clone(&store), Arc::new(DefaultSystemClock::new()))
                .await
                .unwrap();
        FenceableManifest::init_writer(loaded, timeout, Arc::new(DefaultSystemClock::new()))
            .await
            .unwrap();
        let (_, latest) = store.read_latest_manifest().await.unwrap();
        assert_eq!(latest.writer_epoch, expected_epoch);
    }
}
#[tokio::test]
async fn test_should_fail_refresh_on_writer_fenced() {
let ms = new_memory_manifest_store();
let state = ManifestCore::new();
let sm = StoredManifest::create_new_db(
ms.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let timeout = Duration::from_secs(300);
let mut writer1 =
FenceableManifest::init_writer(sm, timeout, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let sm2 = StoredManifest::load(ms.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
FenceableManifest::init_writer(sm2, timeout, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let result = writer1.refresh().await;
assert!(matches!(result, Err(error::SlateDBError::Fenced)));
}
/// Each `init_compactor` leaves the stored `compactor_epoch` one higher.
#[tokio::test]
async fn test_should_bump_compactor_epoch() {
    let store = new_memory_manifest_store();
    StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let timeout = Duration::from_secs(300);

    for expected_epoch in 1..5 {
        let loaded =
            StoredManifest::load(Arc::clone(&store), Arc::new(DefaultSystemClock::new()))
                .await
                .unwrap();
        FenceableManifest::init_compactor(loaded, timeout, Arc::new(DefaultSystemClock::new()))
            .await
            .unwrap();
        let (_, latest) = store.read_latest_manifest().await.unwrap();
        assert_eq!(latest.compactor_epoch, expected_epoch);
    }
}
#[tokio::test]
async fn test_should_fail_refresh_on_compactor_fenced() {
let ms = new_memory_manifest_store();
let state = ManifestCore::new();
let sm = StoredManifest::create_new_db(
ms.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let timeout = Duration::from_secs(300);
let mut compactor1 =
FenceableManifest::init_compactor(sm, timeout, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let sm2 = StoredManifest::load(ms.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
FenceableManifest::init_compactor(sm2, timeout, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let result = compactor1.refresh().await;
assert!(matches!(result, Err(error::SlateDBError::Fenced)));
}
/// A dirty manifest prepared before another update cannot be written afterwards.
#[tokio::test]
async fn test_should_fail_manifest_write_of_stale_dirty_manifest() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    let stale_dirty = stored.prepare_dirty().unwrap();
    let fresh_dirty = stored.prepare_dirty().unwrap();
    stored.update(fresh_dirty).await.unwrap();

    let outcome = stored.update(stale_dirty).await;
    assert!(matches!(
        outcome,
        Err(SlateDBError::TransactionalObjectVersionExists)
    ));
}
#[tokio::test]
async fn test_should_fail_write_checkpoint_when_fenced() {
let ms = new_memory_manifest_store();
let sm = StoredManifest::create_new_db(
ms.clone(),
ManifestCore::new(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let timeout = Duration::from_secs(300);
let mut compactor1 =
FenceableManifest::init_compactor(sm, timeout, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let sm2 = StoredManifest::load(ms.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let mut compactor2 =
FenceableManifest::init_compactor(sm2, timeout, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let result = compactor1
.write_checkpoint(uuid::Uuid::new_v4(), &CheckpointOptions::default())
.await;
assert!(matches!(result, Err(error::SlateDBError::Fenced)));
assert_state_not_updated(&mut compactor2).await;
}
/// A fenced writer cannot apply a state update, and the store is left untouched.
#[tokio::test]
async fn test_should_fail_state_update_when_fenced() {
    let store = new_memory_manifest_store();
    let created = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let timeout = Duration::from_secs(300);
    let mut fenced =
        FenceableManifest::init_writer(created, timeout, Arc::new(DefaultSystemClock::new()))
            .await
            .unwrap();
    let loaded = StoredManifest::load(Arc::clone(&store), Arc::new(DefaultSystemClock::new()))
        .await
        .unwrap();
    let mut current =
        FenceableManifest::init_writer(loaded, timeout, Arc::new(DefaultSystemClock::new()))
            .await
            .unwrap();

    let outcome = fenced
        .maybe_apply_update(|object| {
            let mut pending = object.prepare_dirty()?;
            pending.value.core.last_l0_seq += 1;
            Ok(Some(pending))
        })
        .await;

    assert!(matches!(outcome, Err(SlateDBError::Fenced)));
    assert_state_not_updated(&mut current).await;
}
/// Asserts that refreshing `fm` does not change its manifest core.
async fn assert_state_not_updated(fm: &mut FenceableManifest) {
    let before = fm.inner.object().core.clone();
    fm.refresh().await.unwrap();
    let after = fm.inner.object().core.clone();
    assert_eq!(after, before);
}
/// A manifest version can be read back by its explicit id.
#[tokio::test]
async fn test_should_read_specific_manifest() {
    let object_store = Arc::new(InMemory::new());
    let store = Arc::new(ManifestStore::new(&Path::from(ROOT), object_store));
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    let mut pending = stored.prepare_dirty().unwrap();
    let checkpoint = new_checkpoint(stored.inner.id().into());
    pending.value.core.checkpoints.push(checkpoint);
    stored.update(pending).await.unwrap();

    let second_version = store.try_read_manifest(2).await.unwrap().unwrap();
    assert_eq!(1, second_version.core.checkpoints.len());
}
/// With a flaky store behind the retrying wrapper, the initial manifest write
/// is attempted at least twice and still lands intact.
#[tokio::test]
async fn test_retry_write_manifest_on_timeout() {
    let base = Arc::new(InMemory::new());
    let flaky = Arc::new(FlakyObjectStore::new(base, 1));
    let retrying = Arc::new(RetryingObjectStore::new(
        flaky.clone(),
        Arc::new(DbRand::default()),
        Arc::new(DefaultSystemClock::new()),
    ));
    let store = Arc::new(ManifestStore::new(&Path::from(ROOT), retrying));
    let core = ManifestCore::new();

    let _stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        core.clone(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    assert!(flaky.put_attempts() >= 2);
    let expected = super::super::Manifest::initial(core);
    let written = store.try_read_manifest(1).await.unwrap().unwrap();
    assert_eq!(written, expected);
}
/// Listing honours unbounded, half-open and one-sided id ranges.
#[tokio::test]
async fn test_list_manifests_unbounded() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let dirty = stored.prepare_dirty().unwrap();
    stored.update(dirty).await.unwrap();

    let all = store.list_manifests(..).await.unwrap();
    assert_eq!(all.len(), 2);
    assert_eq!(all[0].id, 1);
    assert_eq!(all[1].id, 2);

    let half_open = store.list_manifests(1..2).await.unwrap();
    assert_eq!(half_open.len(), 1);
    assert_eq!(half_open[0].id, 1);

    let from_two = store.list_manifests(2..).await.unwrap();
    assert_eq!(from_two.len(), 1);
    assert_eq!(from_two[0].id, 2);

    let below_two = store.list_manifests(..2).await.unwrap();
    assert_eq!(below_two.len(), 1);
    assert_eq!(below_two[0].id, 1);
}
/// An older, unreferenced manifest version can be deleted.
#[tokio::test]
async fn test_delete_manifest() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let dirty = stored.prepare_dirty().unwrap();
    stored.update(dirty).await.unwrap();

    let before = store.list_manifests(..).await.unwrap();
    assert_eq!(before.len(), 2);
    assert_eq!(before[0].id, 1);
    assert_eq!(before[1].id, 2);

    store.delete_manifest(1).await.unwrap();

    let after = store.list_manifests(..).await.unwrap();
    assert_eq!(after.len(), 1);
    assert_eq!(after[0].id, 2);
}
#[tokio::test]
async fn test_delete_active_manifest_should_fail() {
let ms = new_memory_manifest_store();
let state = ManifestCore::new();
let mut sm = StoredManifest::create_new_db(
ms.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
sm.update(sm.prepare_dirty().unwrap()).await.unwrap();
let manifests = ms.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 2);
assert_eq!(manifests[0].id, 1);
assert_eq!(manifests[1].id, 2);
let result = ms.delete_manifest(2).await;
assert!(matches!(result, Err(error::SlateDBError::InvalidDeletion)));
}
/// Builds a manifest store rooted at `ROOT` over a fresh in-memory object store.
fn new_memory_manifest_store() -> Arc<ManifestStore> {
    let object_store = Arc::new(InMemory::new());
    let root = Path::from(ROOT);
    Arc::new(ManifestStore::new(&root, object_store))
}
/// Builds a checkpoint with a random id that references `manifest_id`, has no
/// expiry or name, and whose creation time is "now" with nanoseconds zeroed.
fn new_checkpoint(manifest_id: u64) -> Checkpoint {
    let clock = DefaultSystemClock::default();
    let create_time = clock.now().with_nanosecond(0).unwrap();
    let id = uuid::Uuid::new_v4();
    Checkpoint {
        id,
        manifest_id,
        expire_time: None,
        create_time,
        name: None,
    }
}
/// The referenced set is the latest manifest plus whatever its checkpoints
/// point at, and shrinks again once the checkpoints are cleared.
#[tokio::test]
async fn test_read_referenced_manifests_includes_checkpointed_manifests() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let first_manifest = stored.manifest().clone();
    let first_id = stored.id();

    // No checkpoints: only the latest manifest is referenced.
    let (latest_id, latest) = store.read_latest_manifest().await.unwrap();
    let referenced = store
        .read_referenced_manifests(latest_id, &latest)
        .await
        .unwrap();
    assert_eq!(1, referenced.len());
    assert_eq!(Some(&first_manifest), referenced.get(&first_id));

    // A checkpoint on the first manifest pulls it into the referenced set.
    let mut pending = stored.prepare_dirty().unwrap();
    pending.value.core.checkpoints.push(new_checkpoint(first_id));
    stored.update(pending).await.unwrap();
    let (latest_id, latest) = store.read_latest_manifest().await.unwrap();
    let referenced = store
        .read_referenced_manifests(latest_id, &latest)
        .await
        .unwrap();
    assert_eq!(2, referenced.len());
    assert_eq!(Some(&first_manifest), referenced.get(&first_id));
    assert_eq!(Some(&latest), referenced.get(&latest_id));

    // Clearing the checkpoints leaves only the latest manifest again.
    let mut pending = stored.prepare_dirty().unwrap();
    pending.value.core.checkpoints.clear();
    stored.update(pending).await.unwrap();
    let (latest_id, latest) = store.read_latest_manifest().await.unwrap();
    let referenced = store
        .read_referenced_manifests(latest_id, &latest)
        .await
        .unwrap();
    assert_eq!(1, referenced.len());
    assert_eq!(Some(&latest), referenced.get(&latest_id));
}
/// Two checkpoints on the same manifest contribute a single referenced entry.
#[tokio::test]
async fn test_read_referenced_manifests_dedupes_checkpoint_ids() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let shared_manifest_id = stored.id();

    let mut pending = stored.prepare_dirty().unwrap();
    for _ in 0..2 {
        pending
            .value
            .core
            .checkpoints
            .push(new_checkpoint(shared_manifest_id));
    }
    stored.update(pending).await.unwrap();

    let (latest_id, latest) = store.read_latest_manifest().await.unwrap();
    let referenced = store
        .read_referenced_manifests(latest_id, &latest)
        .await
        .unwrap();
    assert_eq!(2, referenced.len());
}
/// A checkpoint pointing at a non-existent manifest surfaces `ManifestMissing`.
#[tokio::test]
async fn test_read_referenced_manifests_missing_manifest_returns_error() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let missing_manifest_id = stored.id() + 42;

    let mut pending = stored.prepare_dirty().unwrap();
    let dangling = new_checkpoint(missing_manifest_id);
    pending.value.core.checkpoints.push(dangling);
    stored.update(pending).await.unwrap();

    let (latest_id, latest) = store.read_latest_manifest().await.unwrap();
    let outcome = store.read_referenced_manifests(latest_id, &latest).await;
    assert!(matches!(
        outcome,
        Err(SlateDBError::ManifestMissing(id)) if id == missing_manifest_id
    ));
}
/// A `None` mutator result leaves the version alone; `Some` advances it by one.
#[tokio::test]
async fn test_maybe_apply_state_update() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let initial_id = stored.inner.id();

    stored.maybe_apply_update(|_| Ok(None)).await.unwrap();
    assert_eq!(initial_id, stored.inner.id());

    stored
        .maybe_apply_update(|object| Ok(Some(object.prepare_dirty().unwrap())))
        .await
        .unwrap();
    assert_eq!(initial_id + 1, stored.inner.id().id());
}
/// A manifest that a checkpoint still references cannot be deleted.
#[tokio::test]
async fn test_deletion_of_manifest_with_checkpoint_reference_not_allowed() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    let first_checkpoint = stored
        .write_checkpoint(uuid::Uuid::new_v4(), &CheckpointOptions::default())
        .await
        .unwrap();
    // A second write moves the latest manifest past the first checkpoint's manifest.
    let _ = stored
        .write_checkpoint(uuid::Uuid::new_v4(), &CheckpointOptions::default())
        .await
        .unwrap();

    let outcome = store.delete_manifest(first_checkpoint.manifest_id).await;
    assert!(matches!(outcome, Err(SlateDBError::InvalidDeletion)));
}
/// Refreshing with a longer lifetime pushes the expiry later and is persisted
/// in the local manifest.
#[tokio::test]
async fn should_refresh_checkpoint() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let options = CheckpointOptions {
        lifetime: Some(Duration::from_secs(100)),
        ..CheckpointOptions::default()
    };
    let original = stored
        .write_checkpoint(uuid::Uuid::new_v4(), &options)
        .await
        .unwrap();
    let original_expiry = original.expire_time.unwrap();

    let refreshed = stored
        .refresh_checkpoint(original.id, Duration::from_secs(500))
        .await
        .unwrap();

    let refreshed_expiry = refreshed.expire_time.unwrap();
    assert!(refreshed_expiry > original_expiry);
    assert_eq!(
        Some(&refreshed),
        stored.manifest().core.find_checkpoint(original.id)
    );
}
/// Refreshing an unknown checkpoint reports `CheckpointMissing` with its id.
#[tokio::test]
async fn should_fail_refresh_if_checkpoint_missing() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let checkpoint_id = uuid::Uuid::new_v4();

    let result = stored
        .refresh_checkpoint(checkpoint_id, Duration::from_secs(100))
        .await;

    match result {
        Err(SlateDBError::CheckpointMissing(missing_id)) => {
            assert_eq!(checkpoint_id, missing_id);
        }
        _ => panic!("Unexpected result {result:?}"),
    }
}
/// Replacing a checkpoint removes the old one and stores the new one.
#[tokio::test]
async fn should_replace_checkpoint() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let old = stored
        .write_checkpoint(uuid::Uuid::new_v4(), &CheckpointOptions::default())
        .await
        .unwrap();

    let replacement = stored
        .replace_checkpoint(old.id, uuid::Uuid::new_v4(), &CheckpointOptions::default())
        .await
        .unwrap();

    assert_ne!(old.id, replacement.id);
    assert_eq!(None, stored.manifest().core.find_checkpoint(old.id));
    assert_eq!(
        Some(&replacement),
        stored.manifest().core.find_checkpoint(replacement.id),
    );
}
/// Replacing a checkpoint that does not exist still writes the new checkpoint.
#[tokio::test]
async fn should_ignore_missing_checkpoint_if_replacing() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let new_id = uuid::Uuid::new_v4();
    let absent_old_id = uuid::Uuid::new_v4();

    let replacement = stored
        .replace_checkpoint(absent_old_id, new_id, &CheckpointOptions::default())
        .await
        .unwrap();

    assert_eq!(
        Some(&replacement),
        stored.manifest().core.find_checkpoint(replacement.id),
    );
}
/// A written checkpoint is gone from the manifest after deletion.
#[tokio::test]
async fn should_delete_checkpoint() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let written = stored
        .write_checkpoint(uuid::Uuid::new_v4(), &CheckpointOptions::default())
        .await
        .unwrap();

    stored.delete_checkpoint(written.id).await.unwrap();

    assert_eq!(None, stored.manifest().core.find_checkpoint(written.id));
}
/// Deleting an unknown checkpoint succeeds without writing a new manifest version.
#[tokio::test]
async fn should_ignore_missing_checkpoint_if_deleting() {
    let store = new_memory_manifest_store();
    let mut stored = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let unknown_checkpoint = uuid::Uuid::new_v4();
    let id_before = stored.inner.id().id();

    stored.delete_checkpoint(unknown_checkpoint).await.unwrap();
    stored.refresh().await.unwrap();

    assert_eq!(id_before, stored.id());
}
/// Handle A starts from a stale manifest version (B initialized first), yet
/// its writer initialization still succeeds with the next epoch and fences B.
#[tokio::test]
async fn test_should_cretry_epoch_bump_if_manifest_version_exists() {
    let object_store = Arc::new(InMemory::new());
    let store = Arc::new(ManifestStore::new(&Path::from(ROOT), object_store));
    let stored_a = StoredManifest::create_new_db(
        Arc::clone(&store),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let stored_b = StoredManifest::load(Arc::clone(&store), Arc::new(DefaultSystemClock::new()))
        .await
        .unwrap();
    let timeout = Duration::from_secs(300);

    // B initializes first and takes epoch 1.
    let mut writer_b =
        FenceableManifest::init_writer(stored_b, timeout, Arc::new(DefaultSystemClock::new()))
            .await
            .unwrap();
    assert_eq!(1, writer_b.inner.local_epoch());

    // A, based on the now-stale version, ends up with epoch 2.
    let mut writer_a =
        FenceableManifest::init_writer(stored_a, timeout, Arc::new(DefaultSystemClock::new()))
            .await
            .unwrap();
    assert_eq!(2, writer_a.inner.local_epoch());

    assert!(matches!(
        writer_b.refresh().await,
        Err(SlateDBError::Fenced)
    ));
    assert!(writer_a.refresh().await.is_ok());
}
}