use crate::storage::{Client, Snapshot, StorageTxn};
use chrono::Utc;
use uuid::Uuid;
/// The nil (all-zero) UUID, used in this module as the "no version" marker:
/// a client whose `latest_version_id` equals this value has no latest version yet.
pub const NIL_VERSION_ID: VersionId = Uuid::nil();
/// Bounds how far back `add_snapshot` walks from the client's latest version
/// (following parent links) while looking for the version a snapshot refers to.
const SNAPSHOT_SEARCH_LEN: i32 = 5;
/// Raw bytes of a history segment; this module stores and returns them without
/// inspecting their content.
pub type HistorySegment = Vec<u8>;
/// Identifier of a client.
pub type ClientId = Uuid;
/// Identifier of a version.
pub type VersionId = Uuid;
/// Tunables that control when the server asks clients for a new snapshot.
pub struct ServerConfig {
/// Age of the latest snapshot, in days, at which snapshot urgency becomes `Low`;
/// at one and a half times this value it becomes `High`.
pub snapshot_days: i64,
/// Number of versions since the latest snapshot at which snapshot urgency becomes
/// `Low`; at one and a half times this value it becomes `High`.
pub snapshot_versions: u32,
}
impl Default for ServerConfig {
fn default() -> Self {
ServerConfig {
snapshot_days: 14,
snapshot_versions: 100,
}
}
}
impl ServerConfig {
    /// Builds a configuration from explicitly supplied values.
    ///
    /// The values are stored as given, without validation; the current
    /// implementation always returns `Ok`.
    pub fn from_args(snapshot_days: i64, snapshot_versions: u32) -> anyhow::Result<ServerConfig> {
        let config = ServerConfig {
            snapshot_days,
            snapshot_versions,
        };
        Ok(config)
    }
}
/// Outcome of `get_child_version`.
#[derive(Clone, PartialEq, Debug)]
pub enum GetVersionResult {
/// No child version is stored, and the requested parent is either the client's
/// latest version or the client has no latest version at all.
NotFound,
/// No child version is stored, and the requested parent is not the client's
/// latest version.
Gone,
/// A child version was found; its fields are copied from storage.
Success {
version_id: Uuid,
parent_version_id: Uuid,
history_segment: HistorySegment,
},
}
/// Looks up the version whose parent is `parent_version_id` for the given client.
///
/// Returns `Success` with the stored version when one exists. Otherwise the
/// result is `NotFound` if `parent_version_id` is the client's latest version
/// (or the client has no latest version, i.e. it is `NIL_VERSION_ID`), and
/// `Gone` in every other case.
///
/// Storage errors from the lookup are propagated.
pub fn get_child_version<'a>(
    mut txn: Box<dyn StorageTxn + 'a>,
    _config: &ServerConfig,
    client_id: ClientId,
    client: Client,
    parent_version_id: VersionId,
) -> anyhow::Result<GetVersionResult> {
    match txn.get_version_by_parent(client_id, parent_version_id)? {
        Some(child) => Ok(GetVersionResult::Success {
            version_id: child.version_id,
            parent_version_id: child.parent_version_id,
            history_segment: child.history_segment,
        }),
        None => {
            let latest = client.latest_version_id;
            // Nothing newer exists if the caller is at the head of the chain,
            // or if the chain is empty.
            let nothing_newer = latest == parent_version_id || latest == NIL_VERSION_ID;
            if nothing_newer {
                Ok(GetVersionResult::NotFound)
            } else {
                Ok(GetVersionResult::Gone)
            }
        }
    }
}
/// Outcome of `add_version`.
#[derive(Clone, PartialEq, Debug)]
pub enum AddVersionResult {
/// The version was stored; carries the newly generated version ID.
Ok(VersionId),
/// The request was rejected because its parent did not match the client's
/// latest version; carries that latest version ID.
ExpectedParentVersion(VersionId),
}
/// How urgently the server would like the client to upload a new snapshot.
///
/// Variants are declared in increasing order of urgency, so the derived `Ord`
/// lets callers combine two urgencies with `std::cmp::max`.
#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)]
pub enum SnapshotUrgency {
    /// No snapshot is needed.
    None,
    /// The configured threshold has been reached.
    Low,
    /// One and a half times the configured threshold has been reached.
    High,
}

impl SnapshotUrgency {
    /// Urgency based on the age of the latest snapshot, in days.
    ///
    /// `Low` once `days >= config.snapshot_days`, `High` once
    /// `days >= config.snapshot_days * 3 / 2` (integer division).
    fn for_days(config: &ServerConfig, days: i64) -> Self {
        // Widen before multiplying so that very large configured values cannot
        // overflow (which would panic in debug builds and wrap in release).
        let low_threshold = i128::from(config.snapshot_days);
        let high_threshold = low_threshold * 3 / 2;
        let days = i128::from(days);
        if days >= high_threshold {
            SnapshotUrgency::High
        } else if days >= low_threshold {
            SnapshotUrgency::Low
        } else {
            SnapshotUrgency::None
        }
    }

    /// Urgency based on the number of versions added since the latest snapshot.
    ///
    /// `Low` once `versions_since >= config.snapshot_versions`, `High` once
    /// `versions_since >= config.snapshot_versions * 3 / 2` (integer division).
    fn for_versions_since(config: &ServerConfig, versions_since: u32) -> Self {
        // Widen before multiplying: `u32::MAX * 3` does not fit in a `u32`.
        let low_threshold = u64::from(config.snapshot_versions);
        let high_threshold = low_threshold * 3 / 2;
        let versions_since = u64::from(versions_since);
        if versions_since >= high_threshold {
            SnapshotUrgency::High
        } else if versions_since >= low_threshold {
            SnapshotUrgency::Low
        } else {
            SnapshotUrgency::None
        }
    }
}
/// Appends a new version to the client's history.
///
/// The request is rejected with `ExpectedParentVersion` (and urgency `None`)
/// when the client already has a latest version and `parent_version_id` does
/// not match it. Otherwise a fresh random version ID is generated, the version
/// is stored, and the transaction is committed.
///
/// The returned urgency is the greater of the age-based and the count-based
/// urgency, both computed from the `client` value passed in; a client without
/// any snapshot always yields `High`.
///
/// Storage errors are propagated.
pub fn add_version<'a>(
    mut txn: Box<dyn StorageTxn + 'a>,
    config: &ServerConfig,
    client_id: ClientId,
    client: Client,
    parent_version_id: VersionId,
    history_segment: HistorySegment,
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
    log::debug!(
        "add_version(client_id: {}, parent_version_id: {})",
        client_id,
        parent_version_id,
    );

    // A client with no history accepts any parent; otherwise the parent must
    // be the current head of the chain.
    let expected_parent = client.latest_version_id;
    let has_history = expected_parent != NIL_VERSION_ID;
    if has_history && parent_version_id != expected_parent {
        log::debug!("add_version request rejected: mismatched latest_version_id");
        let rejection = AddVersionResult::ExpectedParentVersion(expected_parent);
        return Ok((rejection, SnapshotUrgency::None));
    }

    let new_version_id = Uuid::new_v4();
    log::debug!(
        "add_version request accepted: new version_id: {}",
        new_version_id
    );

    txn.add_version(client_id, new_version_id, parent_version_id, history_segment)?;
    txn.commit()?;

    let urgency = match client.snapshot {
        None => SnapshotUrgency::High,
        Some(Snapshot {
            timestamp,
            versions_since,
            ..
        }) => {
            let age_in_days = (Utc::now() - timestamp).num_days();
            let by_age = SnapshotUrgency::for_days(config, age_in_days);
            let by_count = SnapshotUrgency::for_versions_since(config, versions_since);
            std::cmp::max(by_age, by_count)
        }
    };

    Ok((AddVersionResult::Ok(new_version_id), urgency))
}
/// Stores a snapshot for `version_id`, provided that version is recent enough.
///
/// Starting from the client's latest version, the parent chain is followed
/// for a bounded number of steps (`SNAPSHOT_SEARCH_LEN`). The snapshot is
/// accepted only if `version_id` is found on that walk before reaching the
/// version of the existing snapshot, the nil version, or a version missing
/// from storage. A nil `version_id` is never accepted.
///
/// A rejected snapshot is logged and dropped; the function still returns
/// `Ok(())`. Only storage errors are reported as `Err`.
pub fn add_snapshot<'a>(
    mut txn: Box<dyn StorageTxn + 'a>,
    _config: &ServerConfig,
    client_id: ClientId,
    client: Client,
    version_id: VersionId,
    data: Vec<u8>,
) -> anyhow::Result<()> {
    log::debug!(
        "add_snapshot(client_id: {}, version_id: {})",
        client_id,
        version_id,
    );

    let existing_snapshot_version = client.snapshot.map(|snap| snap.version_id);
    if existing_snapshot_version == Some(version_id) {
        log::debug!(
            "rejecting snapshot for version {}: already exists",
            version_id
        );
        return Ok(());
    }

    let mut steps_left = SNAPSHOT_SEARCH_LEN;
    let mut cursor = client.latest_version_id;
    // Keep walking until the cursor lands on the requested (non-nil) version.
    while cursor != version_id || version_id == NIL_VERSION_ID {
        if existing_snapshot_version == Some(cursor) {
            log::debug!(
                "rejecting snapshot for version {}: newer snapshot already exists or no such version",
                version_id
            );
            return Ok(());
        }

        steps_left -= 1;
        if steps_left <= 0 || cursor == NIL_VERSION_ID {
            log::warn!(
                "rejecting snapshot for version {}: version is too old or no such version",
                version_id
            );
            return Ok(());
        }

        match txn.get_version(client_id, cursor)? {
            Some(version) => cursor = version.parent_version_id,
            None => {
                log::warn!(
                    "rejecting snapshot for version {}: newer versions have already been deleted",
                    version_id
                );
                return Ok(());
            }
        }
    }

    log::debug!("accepting snapshot for version {}", version_id);
    let snapshot = Snapshot {
        version_id,
        timestamp: Utc::now(),
        versions_since: 0,
    };
    txn.set_snapshot(client_id, snapshot, data)?;
    txn.commit()?;
    Ok(())
}
/// Fetches the client's current snapshot, if any.
///
/// Returns `None` when the client record carries no snapshot, or when storage
/// has no data for the recorded snapshot version. Otherwise returns the
/// snapshot's version ID together with its data. Storage errors are propagated.
pub fn get_snapshot<'a>(
    mut txn: Box<dyn StorageTxn + 'a>,
    _config: &ServerConfig,
    client_id: ClientId,
    client: Client,
) -> anyhow::Result<Option<(Uuid, Vec<u8>)>> {
    let snapshot_version = match client.snapshot {
        Some(snap) => snap.version_id,
        None => return Ok(None),
    };
    let stored = txn.get_snapshot_data(client_id, snapshot_version)?;
    Ok(stored.map(|bytes| (snapshot_version, bytes)))
}
#[cfg(test)]
mod test {
use super::*;
use crate::inmemory::InMemoryStorage;
use crate::storage::{Snapshot, Storage};
use chrono::{Duration, TimeZone, Utc};
use pretty_assertions::assert_eq;
/// Sets up `env_logger` in test mode. The result of `try_init` is discarded,
/// so calling this from every test is harmless.
fn init_logging() {
    let mut builder = env_logger::builder();
    builder.is_test(true);
    let _ = builder.try_init();
}
/// `std::cmp::max` over every pair of urgencies picks the more urgent one.
#[test]
fn snapshot_urgency_max() {
    use SnapshotUrgency as U;
    // (left, right, expected maximum)
    let cases = [
        (U::None, U::None, U::None),
        (U::None, U::Low, U::Low),
        (U::None, U::High, U::High),
        (U::Low, U::None, U::Low),
        (U::Low, U::Low, U::Low),
        (U::Low, U::High, U::High),
        (U::High, U::None, U::High),
        (U::High, U::Low, U::High),
        (U::High, U::High, U::High),
    ];
    for &(left, right, expected) in cases.iter() {
        assert_eq!(std::cmp::max(left, right), expected);
    }
}
/// Age-based urgency: none at zero days, low at the configured threshold,
/// high at twice the threshold.
#[test]
fn snapshot_urgency_for_days() {
    let config = ServerConfig::default();
    let threshold = config.snapshot_days;

    let at_zero = SnapshotUrgency::for_days(&config, 0);
    assert_eq!(at_zero, SnapshotUrgency::None);

    let at_threshold = SnapshotUrgency::for_days(&config, threshold);
    assert_eq!(at_threshold, SnapshotUrgency::Low);

    let at_double = SnapshotUrgency::for_days(&config, threshold * 2);
    assert_eq!(at_double, SnapshotUrgency::High);
}
/// Count-based urgency: none at zero versions, low at the configured
/// threshold, high at twice the threshold.
#[test]
fn snapshot_urgency_for_versions_since() {
    let config = ServerConfig::default();
    let threshold = config.snapshot_versions;

    let at_zero = SnapshotUrgency::for_versions_since(&config, 0);
    assert_eq!(at_zero, SnapshotUrgency::None);

    let at_threshold = SnapshotUrgency::for_versions_since(&config, threshold);
    assert_eq!(at_threshold, SnapshotUrgency::Low);

    let at_double = SnapshotUrgency::for_versions_since(&config, threshold * 2);
    assert_eq!(at_double, SnapshotUrgency::High);
}
/// A client with no versions, asking for the child of the nil version,
/// gets `NotFound`.
#[test]
fn get_child_version_not_found_initial_nil() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    txn.new_client(client_id, NIL_VERSION_ID)?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    let outcome = get_child_version(txn, &config, client_id, client, NIL_VERSION_ID)?;
    assert_eq!(outcome, GetVersionResult::NotFound);
    Ok(())
}
/// A client with no versions, asking for the child of some arbitrary
/// non-nil version, still gets `NotFound` rather than `Gone`.
#[test]
fn get_child_version_not_found_initial_continuing() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    txn.new_client(client_id, NIL_VERSION_ID)?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    let arbitrary_parent = Uuid::new_v4();
    let outcome = get_child_version(txn, &config, client_id, client, arbitrary_parent)?;
    assert_eq!(outcome, GetVersionResult::NotFound);
    Ok(())
}
/// Asking for the child of the client's latest version, when no such child
/// is stored, gets `NotFound`.
#[test]
fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    let latest_version_id = Uuid::new_v4();
    txn.new_client(client_id, latest_version_id)?;
    txn.add_version(client_id, latest_version_id, NIL_VERSION_ID, vec![])?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    let outcome = get_child_version(txn, &config, client_id, client, latest_version_id)?;
    assert_eq!(outcome, GetVersionResult::NotFound);
    Ok(())
}
/// Asking for the child of a version that is neither stored as a parent nor
/// the client's latest version gets `Gone`.
#[test]
fn get_child_version_gone_not_latest() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    let latest_version_id = Uuid::new_v4();
    txn.new_client(client_id, latest_version_id)?;
    txn.add_version(client_id, latest_version_id, NIL_VERSION_ID, vec![])?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    let unrelated_parent = Uuid::new_v4();
    let outcome = get_child_version(txn, &config, client_id, client, unrelated_parent)?;
    assert_eq!(outcome, GetVersionResult::Gone);
    Ok(())
}
/// When a version with the requested parent is stored, it is returned in
/// full as `Success`.
#[test]
fn get_child_version_found() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    let version_id = Uuid::new_v4();
    let parent_version_id = Uuid::new_v4();
    let history_segment = b"abcd".to_vec();

    txn.new_client(client_id, version_id)?;
    let stored_segment = history_segment.clone();
    txn.add_version(client_id, version_id, parent_version_id, stored_segment)?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    let outcome = get_child_version(txn, &config, client_id, client, parent_version_id)?;
    let expected = GetVersionResult::Success {
        version_id,
        parent_version_id,
        history_segment,
    };
    assert_eq!(outcome, expected);
    Ok(())
}
/// Shared fixture for the `add_version` tests.
///
/// Creates a new client and adds a chain of `num_versions` versions, each
/// the child of the previous one (the first is a child of the nil version).
/// If `snapshot_version` names one of those versions by index, a snapshot is
/// recorded for it, dated `snapshot_days_ago` days in the past (default 0).
///
/// Returns the client ID and the version IDs in creation order.
fn av_setup(
    storage: &InMemoryStorage,
    num_versions: u32,
    snapshot_version: Option<u32>,
    snapshot_days_ago: Option<i64>,
) -> anyhow::Result<(Uuid, Vec<Uuid>)> {
    init_logging();

    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    txn.new_client(client_id, Uuid::nil())?;

    let mut versions = vec![];
    let mut previous = Uuid::nil();
    for vnum in 0..num_versions {
        let current = Uuid::new_v4();
        versions.push(current);
        let segment = vec![0, 0, vnum as u8];
        txn.add_version(client_id, current, previous, segment)?;

        if snapshot_version == Some(vnum) {
            let age = Duration::days(snapshot_days_ago.unwrap_or(0));
            let snapshot = Snapshot {
                version_id: current,
                versions_since: 0,
                timestamp: Utc::now() - age,
            };
            txn.set_snapshot(client_id, snapshot, vec![vnum as u8])?;
        }

        previous = current;
    }

    Ok((client_id, versions))
}
fn av_success_check(
storage: &InMemoryStorage,
client_id: Uuid,
existing_versions: &[Uuid],
result: (AddVersionResult, SnapshotUrgency),
expected_history: Vec<u8>,
expected_urgency: SnapshotUrgency,
) -> anyhow::Result<()> {
if let AddVersionResult::Ok(new_version_id) = result.0 {
for v in existing_versions {
assert_ne!(&new_version_id, v);
}
let mut txn = storage.txn()?;
let client = txn.get_client(client_id)?.unwrap();
assert_eq!(client.latest_version_id, new_version_id);
let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil);
let version = txn.get_version(client_id, new_version_id)?.unwrap();
assert_eq!(version.version_id, new_version_id);
assert_eq!(version.parent_version_id, parent_version_id);
assert_eq!(version.history_segment, expected_history);
} else {
panic!("did not get Ok from add_version: {:?}", result);
}
assert_eq!(result.1, expected_urgency);
Ok(())
}
/// Adding a version on top of a stale parent is rejected with the expected
/// parent, and leaves the stored history untouched.
#[test]
fn add_version_conflict() -> anyhow::Result<()> {
    let storage = InMemoryStorage::new();
    let (client_id, versions) = av_setup(&storage, 3, None, None)?;
    let stale_parent = versions[1];
    let latest = versions[2];

    let mut txn = storage.txn()?;
    let client = txn.get_client(client_id)?.unwrap();
    let config = ServerConfig::default();
    let (outcome, _urgency) =
        add_version(txn, &config, client_id, client, stale_parent, vec![3, 6, 9])?;
    assert_eq!(outcome, AddVersionResult::ExpectedParentVersion(latest));

    // Nothing was added: the head is unchanged and has no child.
    let mut txn = storage.txn()?;
    let head = txn.get_client(client_id)?.unwrap().latest_version_id;
    assert_eq!(head, latest);
    assert_eq!(txn.get_version_by_parent(client_id, latest)?, None);
    Ok(())
}
/// Adding a version on top of the single existing version succeeds; with no
/// snapshot on record the urgency is `High`.
#[test]
fn add_version_with_existing_history() -> anyhow::Result<()> {
    let storage = InMemoryStorage::new();
    let (client_id, versions) = av_setup(&storage, 1, None, None)?;

    let mut txn = storage.txn()?;
    let client = txn.get_client(client_id)?.unwrap();
    let config = ServerConfig::default();
    let history = vec![3, 6, 9];
    let result = add_version(txn, &config, client_id, client, versions[0], history.clone())?;

    av_success_check(
        &storage,
        client_id,
        &versions,
        result,
        history,
        SnapshotUrgency::High,
    )?;
    Ok(())
}
/// Adding the very first version (parent is nil) succeeds; with no snapshot
/// on record the urgency is `High`.
#[test]
fn add_version_with_no_history() -> anyhow::Result<()> {
    let storage = InMemoryStorage::new();
    let (client_id, versions) = av_setup(&storage, 0, None, None)?;

    let mut txn = storage.txn()?;
    let client = txn.get_client(client_id)?.unwrap();
    let config = ServerConfig::default();
    let history = vec![3, 6, 9];
    let result = add_version(txn, &config, client_id, client, Uuid::nil(), history.clone())?;

    av_success_check(
        &storage,
        client_id,
        &versions,
        result,
        history,
        SnapshotUrgency::High,
    )?;
    Ok(())
}
/// With a snapshot taken just now on the only existing version, adding a
/// version reports urgency `None`.
#[test]
fn add_version_success_recent_snapshot() -> anyhow::Result<()> {
    let storage = InMemoryStorage::new();
    let (client_id, versions) = av_setup(&storage, 1, Some(0), None)?;

    let mut txn = storage.txn()?;
    let client = txn.get_client(client_id)?.unwrap();
    let config = ServerConfig::default();
    let history = vec![1, 2, 3];
    let result = add_version(txn, &config, client_id, client, versions[0], history.clone())?;

    av_success_check(
        &storage,
        client_id,
        &versions,
        result,
        history,
        SnapshotUrgency::None,
    )?;
    Ok(())
}
/// With a snapshot dated 50 days ago (default threshold is 14 days), adding
/// a version reports urgency `High`.
#[test]
fn add_version_success_aged_snapshot() -> anyhow::Result<()> {
    let storage = InMemoryStorage::new();
    let (client_id, versions) = av_setup(&storage, 1, Some(0), Some(50))?;

    let mut txn = storage.txn()?;
    let client = txn.get_client(client_id)?.unwrap();
    let config = ServerConfig::default();
    let history = vec![1, 2, 3];
    let result = add_version(txn, &config, client_id, client, versions[0], history.clone())?;

    av_success_check(
        &storage,
        client_id,
        &versions,
        result,
        history,
        SnapshotUrgency::High,
    )?;
    Ok(())
}
/// With a snapshot on the first of 50 versions and a version threshold of 30,
/// adding a version is expected to report urgency `High`.
#[test]
fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> {
    let storage = InMemoryStorage::new();
    let (client_id, versions) = av_setup(&storage, 50, Some(0), None)?;

    let mut txn = storage.txn()?;
    let client = txn.get_client(client_id)?.unwrap();
    let config = ServerConfig {
        snapshot_versions: 30,
        ..ServerConfig::default()
    };
    let history = vec![1, 2, 3];
    let result = add_version(txn, &config, client_id, client, versions[49], history.clone())?;

    av_success_check(
        &storage,
        client_id,
        &versions,
        result,
        history,
        SnapshotUrgency::High,
    )?;
    Ok(())
}
/// A snapshot for the client's latest version is accepted and stored.
#[test]
fn add_snapshot_success_latest() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    let version_id = Uuid::new_v4();
    txn.new_client(client_id, version_id)?;
    txn.add_version(client_id, version_id, NIL_VERSION_ID, vec![])?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    add_snapshot(txn, &config, client_id, client, version_id, vec![1, 2, 3])?;

    // Read the result back through a fresh transaction.
    let mut txn = storage.txn()?;
    let stored = txn.get_client(client_id)?.unwrap().snapshot.unwrap();
    assert_eq!(stored.version_id, version_id);
    assert_eq!(stored.versions_since, 0);
    let stored_data = txn.get_snapshot_data(client_id, version_id).unwrap();
    assert_eq!(stored_data, Some(vec![1, 2, 3]));
    Ok(())
}
/// A snapshot for the parent of the client's latest version is accepted and
/// stored.
#[test]
fn add_snapshot_success_older() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    let older = Uuid::new_v4();
    let newer = Uuid::new_v4();
    txn.new_client(client_id, newer)?;
    txn.add_version(client_id, older, NIL_VERSION_ID, vec![])?;
    txn.add_version(client_id, newer, older, vec![])?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    add_snapshot(txn, &config, client_id, client, older, vec![1, 2, 3])?;

    // Read the result back through a fresh transaction.
    let mut txn = storage.txn()?;
    let stored = txn.get_client(client_id)?.unwrap().snapshot.unwrap();
    assert_eq!(stored.version_id, older);
    assert_eq!(stored.versions_since, 0);
    let stored_data = txn.get_snapshot_data(client_id, older).unwrap();
    assert_eq!(stored_data, Some(vec![1, 2, 3]));
    Ok(())
}
/// A snapshot for a version ID that is not in the client's history is
/// dropped: the call succeeds but no snapshot is recorded.
#[test]
fn add_snapshot_fails_no_such() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    let older = Uuid::new_v4();
    let newer = Uuid::new_v4();
    txn.new_client(client_id, newer)?;
    txn.add_version(client_id, older, NIL_VERSION_ID, vec![])?;
    txn.add_version(client_id, newer, older, vec![])?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    let unknown_version = Uuid::new_v4();
    add_snapshot(txn, &config, client_id, client, unknown_version, vec![1, 2, 3])?;

    let mut txn = storage.txn()?;
    let snapshot = txn.get_client(client_id)?.unwrap().snapshot;
    assert!(snapshot.is_none());
    Ok(())
}
/// A snapshot for the oldest of ten chained versions is dropped: the call
/// succeeds but no snapshot is recorded.
#[test]
fn add_snapshot_fails_too_old() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    txn.new_client(client_id, Uuid::nil())?;

    // Build a chain of ten versions, each the child of the previous one.
    let mut version_ids = vec![];
    let mut previous = Uuid::nil();
    for _ in 0..10 {
        let current = Uuid::new_v4();
        txn.add_version(client_id, current, previous, vec![])?;
        version_ids.push(current);
        previous = current;
    }
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    add_snapshot(txn, &config, client_id, client, version_ids[0], vec![1, 2, 3])?;

    let mut txn = storage.txn()?;
    let snapshot = txn.get_client(client_id)?.unwrap().snapshot;
    assert!(snapshot.is_none());
    Ok(())
}
/// With a snapshot already recorded for the third of five versions, a
/// snapshot for the first version is dropped and the existing snapshot is
/// left exactly as it was.
#[test]
fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    txn.new_client(client_id, Uuid::nil())?;

    // Build a chain of five versions, each the child of the previous one.
    let mut version_ids = vec![];
    let mut previous = Uuid::nil();
    for _ in 0..5 {
        let current = Uuid::new_v4();
        txn.add_version(client_id, current, previous, vec![])?;
        version_ids.push(current);
        previous = current;
    }

    let existing = Snapshot {
        version_id: version_ids[2],
        versions_since: 2,
        timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(),
    };
    txn.set_snapshot(client_id, existing, vec![1, 2, 3])?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    add_snapshot(txn, &config, client_id, client, version_ids[0], vec![9, 9, 9])?;

    // The earlier snapshot and its data must be untouched.
    let mut txn = storage.txn()?;
    let stored = txn.get_client(client_id)?.unwrap().snapshot.unwrap();
    assert_eq!(stored.version_id, version_ids[2]);
    assert_eq!(stored.versions_since, 2);
    let stored_data = txn.get_snapshot_data(client_id, version_ids[2]).unwrap();
    assert_eq!(stored_data, Some(vec![1, 2, 3]));
    Ok(())
}
/// A snapshot for the nil version is dropped even when the client's latest
/// version is nil: the call succeeds but no snapshot is recorded.
#[test]
fn add_snapshot_fails_nil_version() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    txn.new_client(client_id, NIL_VERSION_ID)?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    add_snapshot(txn, &config, client_id, client, NIL_VERSION_ID, vec![9, 9, 9])?;

    let mut txn = storage.txn()?;
    let snapshot = txn.get_client(client_id)?.unwrap().snapshot;
    assert!(snapshot.is_none());
    Ok(())
}
/// When the client has a snapshot, `get_snapshot` returns its version ID and
/// data.
#[test]
fn get_snapshot_found() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    let snapshot_version_id = Uuid::new_v4();
    let data = vec![1, 2, 3];

    txn.new_client(client_id, snapshot_version_id)?;
    let snapshot = Snapshot {
        version_id: snapshot_version_id,
        versions_since: 3,
        timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(),
    };
    txn.set_snapshot(client_id, snapshot, data.clone())?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    let fetched = get_snapshot(txn, &config, client_id, client)?;
    assert_eq!(fetched, Some((snapshot_version_id, data)));
    Ok(())
}
/// When the client has no snapshot, `get_snapshot` returns `None`.
#[test]
fn get_snapshot_not_found() -> anyhow::Result<()> {
    init_logging();

    let storage = InMemoryStorage::new();
    let mut txn = storage.txn()?;
    let client_id = Uuid::new_v4();
    txn.new_client(client_id, NIL_VERSION_ID)?;
    let client = txn.get_client(client_id)?.unwrap();

    let config = ServerConfig::default();
    let fetched = get_snapshot(txn, &config, client_id, client)?;
    assert_eq!(fetched, None);
    Ok(())
}
}