use super::service::{ObjectInfo, Service};
use crate::errors::{Error, Result};
use crate::server::encryption::{Cryptor, Sealed, Unsealed};
use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
VersionId,
};
use async_trait::async_trait;
use ring::rand;
use std::collections::{HashMap, HashSet};
#[cfg(test)]
use std::future::Future;
#[cfg(test)]
use std::pin::Pin;
#[cfg(not(test))]
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
#[cfg(test)]
type InterceptFn<S> =
Box<dyn for<'a> FnOnce(&'a mut S) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> + Send>;
pub(in crate::server) struct CloudServer<SVC: Service> {
service: SVC,
cryptor: Cryptor,
cleanup_probability: u8,
#[cfg(test)]
add_version_intercept: Option<InterceptFn<SVC>>,
}
const LATEST: &str = "latest";
const DEFAULT_CLEANUP_PROBABILITY: u8 = 13;
#[cfg(not(test))]
const MAX_VERSION_AGE_SECS: u64 = 3600 * 24 * 180;
fn version_to_bytes(v: VersionId) -> Vec<u8> {
v.as_simple().to_string().into_bytes()
}
impl<SVC: Service> CloudServer<SVC> {
pub(in crate::server) async fn new(
mut service: SVC,
encryption_secret: Vec<u8>,
) -> Result<Self> {
let salt = Self::get_salt(&mut service).await?;
let cryptor = Cryptor::new(salt, &encryption_secret.into())?;
Ok(Self {
service,
cryptor,
cleanup_probability: DEFAULT_CLEANUP_PROBABILITY,
#[cfg(test)]
add_version_intercept: None,
})
}
async fn get_salt(service: &mut SVC) -> Result<Vec<u8>> {
const SALT_NAME: &str = "salt";
loop {
if let Some(salt) = service.get(SALT_NAME).await? {
return Ok(salt);
}
service
.compare_and_swap(SALT_NAME, None, Cryptor::gen_salt()?)
.await?;
}
}
fn version_name(parent_version_id: &VersionId, child_version_id: &VersionId) -> String {
format!(
"v-{}-{}",
parent_version_id.as_simple(),
child_version_id.as_simple()
)
}
fn parse_version_name(name: &str) -> Option<(VersionId, VersionId)> {
debug_assert!(name.is_ascii());
let dash = 2 + 32;
if name.len() != 2 + 32 + 1 + 32 || !name.starts_with("v-") || &name[dash..dash + 1] != "-"
{
return None;
}
let Ok(parent_version_id) = VersionId::try_parse(&name[2..2 + 32]) else {
return None;
};
let Ok(child_version_id) = VersionId::try_parse(&name[2 + 32 + 1..]) else {
return None;
};
Some((parent_version_id, child_version_id))
}
fn snapshot_name(version_id: &VersionId) -> String {
format!("s-{}", version_id.as_simple())
}
fn parse_snapshot_name(name: &str) -> Option<VersionId> {
if name.len() != 2 + 32 || !name.starts_with("s-") {
return None;
}
let Ok(version_id) = VersionId::try_parse(&name[2..2 + 32]) else {
return None;
};
Some(version_id)
}
fn randint(&self) -> Result<u8> {
use rand::SecureRandom;
let mut randint = [0u8];
rand::SystemRandom::new()
.fill(&mut randint)
.map_err(|_| Error::Server("Random number generator failure".into()))?;
Ok(randint[0])
}
async fn get_latest(&mut self) -> Result<Option<VersionId>> {
let Some(latest) = self.service.get(LATEST).await? else {
return Ok(None);
};
let latest = VersionId::try_parse_ascii(&latest)
.map_err(|_| Error::Server("'latest' object contains invalid data".into()))?;
Ok(Some(latest))
}
async fn get_child_versions(
&mut self,
parent_version_id: &VersionId,
) -> Result<Vec<VersionId>> {
let mut versions = Vec::new();
let prefix = &format!("v-{}-", parent_version_id.as_simple());
let mut iterator = self.service.list(prefix).await;
while let Some(res) = iterator.next().await {
match res {
Ok(ObjectInfo { name, .. }) => {
if let Some((_, c)) = Self::parse_version_name(&name) {
versions.push(c);
}
}
Err(e) => {
return Err(e);
}
}
}
Ok(versions)
}
fn snapshot_urgency(&self) -> Result<SnapshotUrgency> {
let r = self.randint()?;
if r < 2 {
Ok(SnapshotUrgency::High)
} else if r < 25 {
Ok(SnapshotUrgency::Low)
} else {
Ok(SnapshotUrgency::None)
}
}
async fn maybe_cleanup(&mut self) -> Result<()> {
if self.randint()? < self.cleanup_probability {
self.cleanup_probability = DEFAULT_CLEANUP_PROBABILITY;
self.cleanup().await
} else {
Ok(())
}
}
async fn cleanup(&mut self) -> Result<()> {
let mut versions = {
let mut versions = Vec::new();
let mut iterator = self.service.list("v-").await;
while let Some(res) = iterator.next().await {
match res {
Ok(ObjectInfo { name, creation }) => {
if let Some((p, c)) = Self::parse_version_name(&name) {
versions.push((c, p, creation));
}
}
Err(e) => return Err(e),
}
}
versions
};
versions.sort();
let parent_of = |c| match versions.binary_search_by_key(&c, |tup| tup.0) {
Ok(idx) => Some(versions[idx].1),
Err(_) => None,
};
let mut rev_chain = HashMap::new();
let mut iterations = versions.len() + 1; let latest = self.get_latest().await?;
if let Some(mut c) = latest {
while let Some(p) = parent_of(c) {
rev_chain.insert(c, p);
c = p;
iterations -= 1;
if iterations == 0 {
return Err(Error::Server("Version cycle detected".into()));
}
}
}
#[cfg(not(test))]
let age_threshold = {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|t| t.as_secs())
.unwrap_or(0);
now.saturating_sub(MAX_VERSION_AGE_SECS)
};
#[cfg(test)]
let age_threshold = 1000;
let old_versions: HashSet<Uuid> = versions
.iter()
.filter_map(|(c, _, creation)| {
if *creation < age_threshold {
Some(*c)
} else {
None
}
})
.collect();
for (c, p, _) in versions {
if rev_chain.get(&c) != Some(&p) && Some(p) != latest {
self.service.del(&Self::version_name(&p, &c)).await?;
}
}
let snapshots = {
let mut snapshots = HashSet::new();
let mut iterator = self.service.list("s-").await;
while let Some(res) = iterator.next().await {
match res {
Ok(ObjectInfo { name, .. }) => {
if let Some(parsed_name) = Self::parse_snapshot_name(&name) {
snapshots.insert(parsed_name);
}
}
Err(e) => return Err(e),
}
}
snapshots
};
let mut latest_snapshot = None;
if let Some(mut version) = latest {
loop {
if snapshots.contains(&version) {
latest_snapshot = Some(version);
break;
}
if let Some(v) = rev_chain.get(&version) {
version = *v;
} else {
break;
}
}
}
let Some(latest_snapshot) = latest_snapshot else {
return Ok(());
};
for version in snapshots {
if version != latest_snapshot {
self.service.del(&Self::snapshot_name(&version)).await?;
}
}
let mut version = latest_snapshot;
while let Some(parent) = rev_chain.get(&version) {
if old_versions.contains(&version) {
self.service
.del(&Self::version_name(parent, &version))
.await?;
}
version = *parent;
}
Ok(())
}
}
#[async_trait(?Send)]
impl<SVC: Service + Send> Server for CloudServer<SVC> {
async fn add_version(
&mut self,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> Result<(AddVersionResult, SnapshotUrgency)> {
let latest = self.get_latest().await?;
if let Some(l) = latest {
if l != parent_version_id {
return Ok((
AddVersionResult::ExpectedParentVersion(l),
self.snapshot_urgency()?,
));
}
}
let version_id = VersionId::new_v4();
let new_name = Self::version_name(&parent_version_id, &version_id);
let sealed = self.cryptor.seal(Unsealed {
version_id,
payload: history_segment,
})?;
self.service.put(&new_name, sealed.as_ref()).await?;
#[cfg(test)]
if let Some(f) = self.add_version_intercept.take() {
f(&mut self.service).await;
}
let old_value = latest.map(version_to_bytes);
let new_value = version_to_bytes(version_id);
if !self
.service
.compare_and_swap(LATEST, old_value, new_value)
.await?
{
self.service.del(&new_name).await?;
let latest = self.get_latest().await?;
let latest = latest.unwrap_or(Uuid::nil());
return Ok((
AddVersionResult::ExpectedParentVersion(latest),
self.snapshot_urgency()?,
));
}
let _ = self.maybe_cleanup().await;
Ok((AddVersionResult::Ok(version_id), self.snapshot_urgency()?))
}
async fn get_child_version(
&mut self,
parent_version_id: VersionId,
) -> Result<GetVersionResult> {
let version_id = match &(self.get_child_versions(&parent_version_id).await?)[..] {
[] => return Ok(GetVersionResult::NoSuchVersion),
children => {
self.cleanup_probability = 255;
let latest = self.get_latest().await?;
let mut true_child = None;
for child in children {
if Some(*child) == latest {
true_child = Some(*child);
break;
}
}
if true_child.is_none() {
for child in children {
if !self.get_child_versions(child).await?.is_empty() {
true_child = Some(*child)
}
}
}
match true_child {
Some(true_child) => true_child,
None => return Ok(GetVersionResult::NoSuchVersion),
}
}
};
let Some(sealed) = self
.service
.get(&Self::version_name(&parent_version_id, &version_id))
.await?
else {
return Ok(GetVersionResult::NoSuchVersion);
};
let unsealed = self.cryptor.unseal(Sealed {
version_id,
payload: sealed,
})?;
Ok(GetVersionResult::Version {
version_id,
parent_version_id,
history_segment: unsealed.into(),
})
}
async fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> Result<()> {
let name = Self::snapshot_name(&version_id);
let sealed = self.cryptor.seal(Unsealed {
version_id,
payload: snapshot,
})?;
self.service.put(&name, sealed.as_ref()).await?;
Ok(())
}
async fn get_snapshot(&mut self) -> Result<Option<(VersionId, Snapshot)>> {
let Some(name) = self.service.list("s-").await.next().await else {
return Ok(None);
};
let ObjectInfo { name, .. } = name?;
let Some(version_id) = Self::parse_snapshot_name(&name) else {
return Ok(None);
};
let Some(payload) = self.service.get(&name).await? else {
return Ok(None);
};
let unsealed = self.cryptor.unseal(Sealed {
version_id,
payload,
})?;
Ok(Some((version_id, unsealed.payload)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::server::{
cloud::iter::{AsyncObjectIterator, SyncIteratorWrapper},
NIL_VERSION_ID,
};
#[derive(Clone)]
struct MockService(HashMap<String, (u64, Vec<u8>)>);
const INSERTION_TIME: u64 = 9999999999;
impl MockService {
fn new() -> Self {
let mut map = HashMap::new();
map.insert("salt".into(), (0, "abcdefghabcdefgh".into()));
Self(map)
}
}
#[async_trait]
impl Service for MockService {
async fn put(&mut self, name: &str, value: &[u8]) -> Result<()> {
self.0.insert(name.into(), (INSERTION_TIME, value.into()));
Ok(())
}
async fn get(&mut self, name: &str) -> Result<Option<Vec<u8>>> {
Ok(self.0.get(name).map(|(_, data)| data.clone()))
}
async fn del(&mut self, name: &str) -> Result<()> {
self.0.remove(name);
Ok(())
}
async fn compare_and_swap(
&mut self,
name: &str,
existing_value: Option<Vec<u8>>,
new_value: Vec<u8>,
) -> Result<bool> {
if self.0.get(name).map(|(_, d)| d) == existing_value.as_ref() {
self.0.insert(name.into(), (INSERTION_TIME, new_value));
return Ok(true);
}
Ok(false)
}
async fn list<'a>(
&'a mut self,
prefix: &'a str,
) -> Box<dyn AsyncObjectIterator + Send + 'a> {
let inner = self
.0
.iter()
.filter(move |(k, _)| k.starts_with(prefix))
.map(|(k, (t, _))| {
Ok(ObjectInfo {
name: k.to_string(),
creation: *t,
})
});
Box::new(SyncIteratorWrapper { inner })
}
}
impl std::fmt::Debug for MockService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_map().entries(self.0.iter()).finish()
}
}
impl CloudServer<MockService> {
fn mock_add_version(
&mut self,
parent: VersionId,
child: VersionId,
creation: u64,
data: &[u8],
) {
let name = Self::version_name(&parent, &child);
let sealed = self
.cryptor
.seal(Unsealed {
version_id: child,
payload: data.into(),
})
.unwrap();
self.service.0.insert(name, (creation, sealed.into()));
}
fn mock_add_snapshot(&mut self, version: VersionId, creation: u64, snapshot: &[u8]) {
let name = Self::snapshot_name(&version);
let sealed = self
.cryptor
.seal(Unsealed {
version_id: version,
payload: snapshot.into(),
})
.unwrap();
self.service.0.insert(name, (creation, sealed.into()));
}
fn mock_set_latest(&mut self, latest: VersionId) {
let latest = version_to_bytes(latest);
self.service
.0
.insert(LATEST.into(), (INSERTION_TIME, latest));
}
fn empty_clone(&self) -> Self {
Self {
cryptor: self.cryptor.clone(),
cleanup_probability: 0,
service: MockService::new(),
add_version_intercept: None,
}
}
fn unencrypted(&self) -> HashMap<String, (u64, String)> {
self.service
.0
.iter()
.map(|(k, v)| {
let k = k.clone();
if k == "latest" {
return (k, (v.0, String::from_utf8(v.1.to_vec()).unwrap()));
}
let version_id;
if let Some((_, v)) = Self::parse_version_name(&k) {
version_id = v;
} else if let Some(v) = Self::parse_snapshot_name(&k) {
version_id = v;
} else {
return (k, (v.0, format!("{:?}", v.1)));
}
let unsealed = self
.cryptor
.unseal(Sealed {
version_id,
payload: v.1.to_vec(),
})
.unwrap();
let vstr = String::from_utf8(unsealed.into()).unwrap();
(k, (v.0, vstr))
})
.collect()
}
}
impl Clone for CloudServer<MockService> {
fn clone(&self) -> Self {
Self {
cryptor: self.cryptor.clone(),
cleanup_probability: self.cleanup_probability,
service: self.service.clone(),
add_version_intercept: None,
}
}
}
const SECRET: &[u8] = b"testing";
async fn make_server() -> CloudServer<MockService> {
let mut server = CloudServer::new(MockService::new(), SECRET.into())
.await
.unwrap();
server.cleanup_probability = 0;
server
}
#[test]
fn version_name() {
let p = Uuid::parse_str("a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8").unwrap();
let c = Uuid::parse_str("adcf4e350fa54e4aaf9d3f20f3ba5a32").unwrap();
assert_eq!(
CloudServer::<MockService>::version_name(&p, &c),
"v-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8-adcf4e350fa54e4aaf9d3f20f3ba5a32"
);
}
#[test]
fn version_name_round_trip() {
let p = Uuid::new_v4();
let c = Uuid::new_v4();
assert_eq!(
CloudServer::<MockService>::parse_version_name(
&CloudServer::<MockService>::version_name(&p, &c)
),
Some((p, c))
);
}
#[test]
fn parse_version_name_bad_prefix() {
assert_eq!(
CloudServer::<MockService>::parse_version_name(
"X-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8-adcf4e350fa54e4aaf9d3f20f3ba5a32"
),
None
);
}
#[test]
fn parse_version_name_bad_separator() {
assert_eq!(
CloudServer::<MockService>::parse_version_name(
"v-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8xadcf4e350fa54e4aaf9d3f20f3ba5a32"
),
None
);
}
#[test]
fn parse_version_name_too_short() {
assert_eq!(
CloudServer::<MockService>::parse_version_name(
"v-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8-adcf4e350fa54e4aaf9d3f20f3ba5a3"
),
None
);
}
#[test]
fn parse_version_name_too_long() {
assert_eq!(
CloudServer::<MockService>::parse_version_name(
"v-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8-adcf4e350fa54e4aaf9d3f20f3ba5a320"
),
None
);
}
#[test]
fn snapshot_name_round_trip() {
let v = Uuid::new_v4();
assert_eq!(
CloudServer::<MockService>::parse_snapshot_name(
&CloudServer::<MockService>::snapshot_name(&v)
),
Some(v)
);
}
#[test]
fn parse_snapshot_name_invalid() {
assert_eq!(
CloudServer::<MockService>::parse_snapshot_name("s-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"),
None
);
}
#[test]
fn parse_snapshot_name_bad_prefix() {
assert_eq!(
CloudServer::<MockService>::parse_snapshot_name("s:a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8"),
None
);
}
#[test]
fn parse_snapshot_name_too_short() {
assert_eq!(
CloudServer::<MockService>::parse_snapshot_name("s-a1a2a3a4b1b2c1c2d1d2d3d4d5d6"),
None
);
}
#[test]
fn parse_snapshot_name_too_long() {
assert_eq!(
CloudServer::<MockService>::parse_snapshot_name(
"s-a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8000"
),
None
);
}
#[tokio::test]
async fn get_salt_existing() {
let mut service = MockService::new();
assert_eq!(
CloudServer::<MockService>::get_salt(&mut service)
.await
.unwrap(),
b"abcdefghabcdefgh".to_vec()
);
}
#[tokio::test]
async fn get_salt_create() {
let mut service = MockService::new();
service.del("salt").await.unwrap();
let got_salt = CloudServer::<MockService>::get_salt(&mut service)
.await
.unwrap();
let salt_obj = service.get("salt").await.unwrap().unwrap();
assert_eq!(got_salt, salt_obj);
}
#[tokio::test]
async fn get_latest_empty() {
let mut server = make_server().await;
assert_eq!(server.get_latest().await.unwrap(), None);
}
#[tokio::test]
async fn get_latest_exists() {
let mut server = make_server().await;
let latest = Uuid::new_v4();
server.mock_set_latest(latest);
assert_eq!(server.get_latest().await.unwrap(), Some(latest));
}
#[tokio::test]
async fn get_latest_invalid() {
let mut server = make_server().await;
server
.service
.0
.insert(LATEST.into(), (999, b"not-a-uuid".to_vec()));
assert!(server.get_latest().await.is_err());
}
#[tokio::test]
async fn get_child_versions_empty() {
let mut server = make_server().await;
assert_eq!(
server
.get_child_versions(&Uuid::new_v4())
.await
.unwrap()
.len(),
0
);
}
#[tokio::test]
async fn get_child_versions_single() {
let mut server = make_server().await;
let (v1, v2) = (Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v2, v1, 1000, b"first");
assert_eq!(server.get_child_versions(&v1).await.unwrap().len(), 0);
assert_eq!(server.get_child_versions(&v2).await.unwrap(), vec![v1]);
}
#[tokio::test]
async fn get_child_versions_multiple() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v3, v1, 1000, b"first");
server.mock_add_version(v3, v2, 1000, b"second");
assert_eq!(server.get_child_versions(&v1).await.unwrap().len(), 0);
assert_eq!(server.get_child_versions(&v2).await.unwrap().len(), 0);
let versions = server.get_child_versions(&v3).await.unwrap();
assert!(versions == vec![v1, v2] || versions == vec![v2, v1]);
}
#[tokio::test]
async fn add_version_empty() {
let mut server = make_server().await;
let parent = Uuid::new_v4();
let (res, _) = server
.add_version(parent, b"history".to_vec())
.await
.unwrap();
assert!(matches!(res, AddVersionResult::Ok(_)));
}
#[tokio::test]
async fn add_version_good() {
let mut server = make_server().await;
let (v1, v2) = (Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 1000, b"first");
server.mock_set_latest(v2);
let (res, _) = server.add_version(v2, b"history".to_vec()).await.unwrap();
let AddVersionResult::Ok(new_version) = res else {
panic!("expected OK");
};
let mut expected = server.empty_clone();
expected.mock_add_version(v1, v2, 1000, b"first");
expected.mock_add_version(v2, new_version, INSERTION_TIME, b"history");
expected.mock_set_latest(new_version);
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn add_version_not_latest() {
let mut server = make_server().await;
let (v1, v2) = (Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 1000, b"first");
server.mock_set_latest(v2);
let expected = server.clone();
let (res, _) = server.add_version(v1, b"history".to_vec()).await.unwrap();
assert_eq!(res, AddVersionResult::ExpectedParentVersion(v2));
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn add_version_not_latest_race() {
let mut server: CloudServer<MockService> = make_server().await;
let (v1, v2) = (Uuid::new_v4(), Uuid::new_v4());
const V3: Uuid = Uuid::max();
server.mock_add_version(v1, v2, 1000, b"first");
server.mock_add_version(v2, V3, 1000, b"second");
server.mock_set_latest(v2);
server.add_version_intercept = Some(Box::new(|service| {
Box::pin(async move {
service.put(LATEST, &version_to_bytes(V3)).await.unwrap();
})
}));
let mut expected = server.empty_clone();
expected.mock_add_version(v1, v2, 1000, b"first");
expected.mock_add_version(v2, V3, 1000, b"second");
expected.mock_set_latest(V3);
assert_ne!(server.unencrypted(), expected.unencrypted());
let (res, _) = server.add_version(v2, b"history".to_vec()).await.unwrap();
assert_eq!(res, AddVersionResult::ExpectedParentVersion(V3));
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn add_version_unknown() {
let mut server = make_server().await;
let (v1, v2) = (Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 1000, b"first");
server.mock_set_latest(v2);
let expected = server.clone();
let (res, _) = server
.add_version(Uuid::new_v4(), b"history".to_vec())
.await
.unwrap();
assert_eq!(res, AddVersionResult::ExpectedParentVersion(v2));
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn get_child_version_empty() {
let mut server = make_server().await;
assert_eq!(
server.get_child_version(Uuid::new_v4()).await.unwrap(),
GetVersionResult::NoSuchVersion
);
}
#[tokio::test]
async fn get_child_version_single() {
let mut server = make_server().await;
let (v1, v2) = (Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v2, v1, 1000, b"first");
server.mock_set_latest(v1);
assert_eq!(
server.get_child_version(v1).await.unwrap(),
GetVersionResult::NoSuchVersion
);
assert_eq!(
server.get_child_version(v2).await.unwrap(),
GetVersionResult::Version {
version_id: v1,
parent_version_id: v2,
history_segment: b"first".to_vec(),
}
);
}
#[tokio::test]
async fn get_child_version_single_invalid() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 1000, b"second");
server.mock_add_version(v2, v3, 1000, b"third");
server.mock_set_latest(v2);
assert_eq!(
server.get_child_version(v1).await.unwrap(),
GetVersionResult::Version {
version_id: v2,
parent_version_id: v1,
history_segment: b"second".to_vec(),
}
);
assert_eq!(
server.get_child_version(v2).await.unwrap(),
GetVersionResult::NoSuchVersion
);
}
#[tokio::test]
async fn get_child_version_multiple() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
let (vx, vy, vz) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 1000, b"second");
server.mock_add_version(v1, vx, 1000, b"false start x");
server.mock_add_version(v1, vy, 1000, b"false start y");
server.mock_add_version(v2, v3, 1000, b"third");
server.mock_add_version(v2, vz, 1000, b"false start z");
server.mock_set_latest(v3);
assert_eq!(
server.get_child_version(v1).await.unwrap(),
GetVersionResult::Version {
version_id: v2,
parent_version_id: v1,
history_segment: b"second".to_vec(),
}
);
assert_eq!(
server.get_child_version(v2).await.unwrap(),
GetVersionResult::Version {
version_id: v3,
parent_version_id: v2,
history_segment: b"third".to_vec(),
}
);
assert_eq!(
server.get_child_version(v3).await.unwrap(),
GetVersionResult::NoSuchVersion
);
}
#[tokio::test]
async fn cleanup_empty() {
let mut server = make_server().await;
server.cleanup().await.unwrap();
}
#[tokio::test]
async fn cleanup_linear() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(NIL_VERSION_ID, v1, 1000, b"first");
server.mock_add_version(v1, v2, 1000, b"second");
server.mock_add_version(v2, v3, 1000, b"third");
server.mock_add_snapshot(v1, 1000, b"snap 1");
server.mock_set_latest(v3);
let expected = server.clone();
server.cleanup().await.unwrap();
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn cleanup_cycle() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v3, v1, 1000, b"first");
server.mock_add_version(v1, v2, 1000, b"second");
server.mock_add_version(v2, v3, 1000, b"third");
server.mock_set_latest(v3);
let expected = server.clone();
assert!(server.cleanup().await.is_err());
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn cleanup_extra_branches() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
let (vx, vy) = (Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 1000, b"second");
server.mock_add_version(v1, vx, 1000, b"false start x");
server.mock_add_version(v2, v3, 1000, b"third");
server.mock_add_version(v2, vy, 1000, b"false start y");
server.mock_set_latest(v3);
let mut expected = server.empty_clone();
expected.mock_add_version(v1, v2, 1000, b"second");
expected.mock_add_version(v2, v3, 1000, b"third");
expected.mock_set_latest(v3);
assert_ne!(server.unencrypted(), expected.unencrypted());
server.cleanup().await.unwrap();
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn cleanup_extra_snapshots() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
let vy = Uuid::new_v4();
server.mock_add_version(v1, v2, 1000, b"second");
server.mock_add_version(v2, v3, 1000, b"third");
server.mock_add_version(v2, vy, 1000, b"false start y");
server.mock_add_snapshot(v1, 1000, b"snap 1");
server.mock_add_snapshot(v2, 1000, b"snap 2");
server.mock_add_snapshot(vy, 1000, b"snap y");
server.mock_set_latest(v3);
let mut expected = server.empty_clone();
expected.mock_add_version(v1, v2, 1000, b"second");
expected.mock_add_version(v2, v3, 1000, b"third");
expected.mock_add_snapshot(v2, 1000, b"snap 2");
expected.mock_set_latest(v3);
assert_ne!(server.unencrypted(), expected.unencrypted());
server.cleanup().await.unwrap();
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn cleanup_old_versions_no_snapshot() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 200, b"second");
server.mock_add_version(v2, v3, 300, b"third");
server.mock_set_latest(v3);
let expected = server.clone();
server.cleanup().await.unwrap();
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn cleanup_old_versions_with_snapshot() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
let (v4, v5, v6) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 200, b"second");
server.mock_add_version(v2, v3, 300, b"third");
server.mock_add_version(v3, v4, 1400, b"fourth");
server.mock_add_version(v4, v5, 1500, b"fifth");
server.mock_add_snapshot(v5, 1501, b"snap 1");
server.mock_add_version(v5, v6, 1600, b"sixth");
server.mock_set_latest(v6);
let mut expected = server.empty_clone();
expected.mock_add_version(v3, v4, 1400, b"fourth"); expected.mock_add_version(v4, v5, 1500, b"fifth");
expected.mock_add_snapshot(v5, 1501, b"snap 1");
expected.mock_add_version(v5, v6, 1600, b"sixth");
expected.mock_set_latest(v6);
assert_ne!(server.unencrypted(), expected.unencrypted());
server.cleanup().await.unwrap();
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn cleanup_old_versions_newer_than_snapshot() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
let (v4, v5, v6) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 200, b"second");
server.mock_add_version(v2, v3, 300, b"third");
server.mock_add_snapshot(v3, 301, b"snap 1");
server.mock_add_version(v3, v4, 400, b"fourth");
server.mock_add_version(v4, v5, 500, b"fifth");
server.mock_add_version(v5, v6, 600, b"sixth");
server.mock_set_latest(v6);
let mut expected = server.empty_clone();
expected.mock_add_snapshot(v3, 301, b"snap 1");
expected.mock_add_version(v3, v4, 400, b"fourth");
expected.mock_add_version(v4, v5, 500, b"fifth");
expected.mock_add_version(v5, v6, 600, b"sixth");
expected.mock_set_latest(v6);
assert_ne!(server.unencrypted(), expected.unencrypted());
server.cleanup().await.unwrap();
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn cleanup_children_of_latest() {
let mut server = make_server().await;
let (v1, v2, v3) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
let (vnew1, vnew2) = (Uuid::new_v4(), Uuid::new_v4());
server.mock_add_version(v1, v2, 1000, b"second");
server.mock_add_version(v2, v3, 1000, b"third");
server.mock_add_version(v3, vnew1, 1000, b"new 1");
server.mock_add_version(v3, vnew2, 1000, b"new 2");
server.mock_set_latest(v3);
let expected = server.clone();
server.cleanup().await.unwrap();
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn add_snapshot() {
let mut server = make_server().await;
let v = Uuid::new_v4();
let mut expected = server.empty_clone();
expected.mock_add_snapshot(v, INSERTION_TIME, b"SNAP");
assert_ne!(server.unencrypted(), expected.unencrypted());
server.add_snapshot(v, b"SNAP".to_vec()).await.unwrap();
assert_eq!(server.unencrypted(), expected.unencrypted());
}
#[tokio::test]
async fn get_snapshot_missing() {
let mut server = make_server().await;
assert_eq!(server.get_snapshot().await.unwrap(), None);
}
#[tokio::test]
async fn get_snapshot_present() {
let mut server = make_server().await;
let v = Uuid::new_v4();
server.mock_add_snapshot(v, 1000, b"SNAP");
assert_eq!(
server.get_snapshot().await.unwrap(),
Some((v, b"SNAP".to_vec()))
);
}
}