use std::fmt::{Display, Formatter, Write};
use std::num::NonZeroU64;
use nom::{
IResult, Offset,
branch::alt,
bytes::complete::tag,
character::complete::{digit1, multispace0},
combinator::{all_consuming, cut, map, map_res, opt, recognize},
error::{VerboseError, VerboseErrorKind, context},
multi::separated_list0,
sequence::{terminated, tuple},
};
use super::{Cursor, Error, KeyRef, KeyValuePair};
/// A garbage-collection policy deciding which versions of each key survive collection.
///
/// Policies have a textual form (see the `TryFrom<&str>` and `Display` impls), e.g.
/// `versions = 3`, `ttl_micros = 1000`, or `any(versions = 1, ttl_micros = 42)`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GarbageCollectionPolicy {
    /// Keep at most `number` versions per key; a value preceded by tombstones counts as two.
    Versions { number: NonZeroU64 },
    /// Keep values whose timestamp is no more than `micros` before the collection time.
    Expires { micros: NonZeroU64 },
    /// Retain a value when at least one nested policy would retain it.
    Any(Vec<GarbageCollectionPolicy>),
    /// Retain a value only when every nested policy would retain it.
    All(Vec<GarbageCollectionPolicy>),
}
impl GarbageCollectionPolicy {
    /// Wraps `cursor` in a [`GarbageCollector`] that yields only the entries this policy keeps.
    ///
    /// `now_micros` is the reference time for `ttl_micros` policies. The collector starts out
    /// tracking the key the cursor is currently positioned on (an empty key if the cursor
    /// has no current entry).
    ///
    /// # Errors
    ///
    /// This implementation never fails; it always returns `Ok`.
    pub fn collector<C: Cursor + 'static>(
        &self,
        cursor: C,
        now_micros: u64,
    ) -> Result<GarbageCollector, Error> {
        let cursor: Box<dyn Cursor> = Box::new(cursor);
        let determiner = self.determiner(now_micros);
        // Seed the "current key" from the cursor position so the first run of entries is
        // recognized as belonging to one key.
        let key_backing = cursor
            .key()
            .map(|first| first.key.to_vec())
            .unwrap_or_default();
        Ok(GarbageCollector {
            cursor,
            determiner,
            key_backing,
            key_return: None,
        })
    }

    /// Builds the (possibly nested) [`Determiner`] tree implementing this policy.
    fn determiner(&self, now_micros: u64) -> Box<dyn Determiner> {
        match self {
            Self::Versions { number } => Box::new(VersionsDeterminer::new(*number)),
            Self::Expires { micros } => {
                // Values older than this instant are past their TTL. Saturating means a TTL
                // longer than `now_micros` yields a threshold of 0, i.e. keep everything.
                Box::new(ExpiresDeterminer::new(now_micros.saturating_sub(micros.get())))
            }
            Self::Any(children) => {
                Box::new(AnyDeterminer::new(Self::child_determiners(children, now_micros)))
            }
            Self::All(children) => {
                Box::new(AllDeterminer::new(Self::child_determiners(children, now_micros)))
            }
        }
    }

    /// Builds one determiner per nested policy, preserving their order.
    fn child_determiners(policies: &[Self], now_micros: u64) -> Vec<Box<dyn Determiner>> {
        policies
            .iter()
            .map(|policy| policy.determiner(now_micros))
            .collect()
    }
}
impl TryFrom<&str> for GarbageCollectionPolicy {
    type Error = ParseError;

    /// Parses a policy such as `versions = 3` or `all(versions = 2, ttl_micros = 10)`.
    ///
    /// The whole input must be consumed; surrounding whitespace is allowed.
    fn try_from(input: &str) -> Result<Self, Self::Error> {
        let parse = parse_all(gc_policy);
        parse(input)
    }
}
impl std::str::FromStr for GarbageCollectionPolicy {
type Err = ParseError;
fn from_str(input: &str) -> Result<GarbageCollectionPolicy, ParseError> {
GarbageCollectionPolicy::try_from(input)
}
}
impl Display for GarbageCollectionPolicy {
    /// Renders the policy in the syntax accepted by the policy parser.
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        let (open, children) = match self {
            Self::Versions { number } => return write!(fmt, "versions = {number}"),
            Self::Expires { micros } => return write!(fmt, "ttl_micros = {micros}"),
            Self::Any(any) => ("any(", any),
            Self::All(all) => ("all(", all),
        };
        fmt.write_str(open)?;
        for (position, child) in children.iter().enumerate() {
            if position > 0 {
                fmt.write_str(", ")?;
            }
            // Nested policies are written straight into the formatter rather than being
            // collected into intermediate strings first.
            write!(fmt, "{child}")?;
        }
        fmt.write_str(")")
    }
}
/// Streams the entries of a [`Cursor`] that a garbage-collection policy decides to keep.
///
/// Built by `GarbageCollectionPolicy::collector`. Each call to [`GarbageCollector::next`]
/// yields the next retained key/timestamp, borrowing the key from the collector.
pub struct GarbageCollector {
    // Source of entries; advanced as each entry is examined.
    cursor: Box<dyn Cursor>,
    // Decides, value by value, whether the value is retained.
    determiner: Box<dyn Determiner>,
    // Owned copy of the key currently being processed; returned `KeyRef`s borrow from it.
    key_backing: Vec<u8>,
    // Timestamp of a retained value that still has to be emitted after its tombstone.
    key_return: Option<u64>,
}
impl GarbageCollector {
    /// Returns the next retained entry, or `Ok(None)` once the cursor is exhausted.
    ///
    /// Consecutive tombstones for the current key are buffered. If a value follows them and
    /// the determiner retains it, the last buffered tombstone (the one seen immediately
    /// before the value) is returned first, and the value itself on the following call.
    /// Tombstones not followed by a value for the same key are dropped. So are tombstones
    /// preceding a value the determiner does not retain.
    ///
    /// # Errors
    ///
    /// Propagates errors from advancing the underlying cursor.
    // The returned `KeyRef` borrows `self.key_backing`, so this cannot be `Iterator::next`.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Result<Option<KeyRef<'_>>, Error> {
        // A value retained on the previous call is still pending after its tombstone.
        if let Some(ts) = self.key_return.take() {
            return Ok(Some(KeyRef {
                key: &self.key_backing,
                timestamp: ts,
            }));
        }
        // Each pass starts with an empty tombstone buffer and re-reads the cursor's current
        // entry (which may be the first entry of a newly adopted key).
        'iterating: loop {
            let mut tombstones = vec![];
            let mut kvp = match self.cursor.key_value() {
                Some(kvr) => KeyValuePair::from(kvr),
                None => {
                    break 'iterating;
                }
            };
            // Walk the entries that belong to the key currently being tracked.
            while self.key_backing == kvp.key {
                if kvp.value.is_some() {
                    // Step past the value first so the next call resumes after it.
                    self.cursor.next()?;
                    if self.determiner.retain(&kvp.key, &tombstones, kvp.timestamp) {
                        return self.return_key(kvp, tombstones);
                    } else {
                        // Not retained: drop the value together with its buffered tombstones.
                        continue 'iterating;
                    }
                }
                // Tombstone: buffer it until we see whether a value follows.
                tombstones.push(kvp.timestamp);
                self.cursor.next()?;
                kvp = match self.cursor.key_value() {
                    Some(kvr) => KeyValuePair::from(kvr),
                    None => {
                        // Cursor exhausted: trailing tombstones are discarded.
                        break 'iterating;
                    }
                };
            }
            // The cursor moved on to a different key: adopt it (reusing the buffer) and
            // process its entries on the next pass. Buffered tombstones are discarded.
            self.key_backing.resize(kvp.key.len(), 0);
            self.key_backing.copy_from_slice(&kvp.key);
        }
        Ok(None)
    }
    /// Emits a retained value. If tombstones preceded it, the last one is returned now and
    /// the value's timestamp is stashed in `key_return` for the next call. Otherwise the
    /// value is returned directly.
    fn return_key(
        &mut self,
        kvp: KeyValuePair,
        tombstones: Vec<u64>,
    ) -> Result<Option<KeyRef<'_>>, Error> {
        if !tombstones.is_empty() {
            self.key_return = Some(kvp.timestamp);
            Ok(Some(KeyRef {
                key: &self.key_backing,
                timestamp: tombstones[tombstones.len() - 1],
            }))
        } else {
            self.key_return = None;
            Ok(Some(KeyRef {
                key: &self.key_backing,
                timestamp: kvp.timestamp,
            }))
        }
    }
}
/// Result type shared by the policy parsers: `&str` input with nom's `VerboseError`, so
/// failures can later be rendered by `interpret_verbose_error`.
type ParseResult<'a, T> = IResult<&'a str, T, VerboseError<&'a str>>;
/// Human-readable error produced when a garbage-collection policy fails to parse.
///
/// Messages built by the parser can span several lines (they quote the offending input
/// and mark it with a caret), so both `Debug` and `Display` emit the message verbatim.
#[derive(Clone, Eq, PartialEq)]
pub struct ParseError {
    string: String,
}

impl std::fmt::Debug for ParseError {
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        // Print the raw message instead of `ParseError { string: "..." }` so that panics
        // from `unwrap()`/`expect()` show the multi-line diagnostic legibly.
        fmt.write_str(&self.string)
    }
}

impl std::fmt::Display for ParseError {
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        // No trailing newline: `Display` output is routinely embedded in larger messages,
        // and line breaks are the caller's choice.
        fmt.write_str(&self.string)
    }
}

/// Lets `ParseError` be boxed as `dyn Error` and propagated with `?`.
impl std::error::Error for ParseError {}

impl From<String> for ParseError {
    fn from(string: String) -> Self {
        Self { string }
    }
}
/// Converts a nom `VerboseError` into a [`ParseError`] with one numbered entry per
/// reportable error frame.
///
/// Each entry gives the 1-based line number of the failure, quotes that line, and places a
/// caret under the failing position:
/// - `Char` frames also state the expected character and what was found (or end of input).
/// - `Context` frames name the parser context that failed.
/// - `Nom` frames (internal combinator kinds) are skipped and do not consume an index.
///
/// Expects `input` to be the string that was parsed: each error position is computed as
/// the offset of the error substring within `input`.
fn interpret_verbose_error(input: &'_ str, err: VerboseError<&'_ str>) -> ParseError {
    let mut result = String::new();
    // Number of entries written so far (skipped `Nom` frames are not counted).
    let mut index = 0;
    for (substring, kind) in err.errors.iter() {
        // Byte offset of the failing position within `input`.
        let offset = input.offset(substring);
        let prefix = &input.as_bytes()[..offset];
        let line_number = prefix.iter().filter(|&&b| b == b'\n').count() + 1;
        // Start of the failing line: `pos` counts back from `offset` to the last newline,
        // so `offset - pos` is the index just past it; 0 when there is no newline.
        let line_begin = prefix
            .iter()
            .rev()
            .position(|&b| b == b'\n')
            .map(|pos| offset - pos)
            .unwrap_or(0);
        // The failing line, without its terminator or trailing whitespace.
        let line = input[line_begin..]
            .lines()
            .next()
            .unwrap_or(&input[line_begin..])
            .trim_end();
        // 1-based column, measured in bytes (not chars) from the start of `line`.
        let column_number = line.offset(substring) + 1;
        match kind {
            VerboseErrorKind::Char(c) => {
                if let Some(actual) = substring.chars().next() {
                    // `{caret:>column$}` right-aligns the caret in a field `column` wide,
                    // placing it under the failing column.
                    write!(
                        &mut result,
                        "{index}: at line {line_number}:\n\
                        {line}\n\
                        {caret:>column$}\n\
                        expected '{expected}', found {actual}\n\n",
                        index = index,
                        line_number = line_number,
                        line = line,
                        caret = '^',
                        column = column_number,
                        expected = c,
                        actual = actual,
                    )
                    .unwrap();
                } else {
                    // The failure is at the very end of the input.
                    write!(
                        &mut result,
                        "{index}: at line {line_number}:\n\
                        {line}\n\
                        {caret:>column$}\n\
                        expected '{expected}', got end of input\n\n",
                        index = index,
                        line_number = line_number,
                        line = line,
                        caret = '^',
                        column = column_number,
                        expected = c,
                    )
                    .unwrap();
                }
                index += 1;
            }
            VerboseErrorKind::Context(s) => {
                write!(
                    &mut result,
                    "{index}: at line {line_number}, in {context}:\n\
                    {line}\n\
                    {caret:>column$}\n\n",
                    index = index,
                    line_number = line_number,
                    context = s,
                    line = line,
                    caret = '^',
                    column = column_number,
                )
                .unwrap();
                index += 1;
            }
            VerboseErrorKind::Nom(_) => {}
        };
    }
    // Trim removes the blank separator lines after the final entry.
    ParseError {
        string: result.trim().to_string(),
    }
}
fn parse_all<T, F: Fn(&str) -> ParseResult<T> + Copy>(
f: F,
) -> impl Fn(&str) -> Result<T, ParseError> {
move |input| {
let (rem, t) = match all_consuming(f)(input) {
Ok((rem, t)) => (rem, t),
Err(err) => match err {
nom::Err::Incomplete(_) => {
panic!("all_consuming combinator should be all consuming");
}
nom::Err::Error(err) | nom::Err::Failure(err) => {
return Err(interpret_verbose_error(input, err));
}
},
};
if rem.is_empty() {
Ok(t)
} else {
panic!("all_consuming combinator should be all consuming");
}
}
}
fn ws0(input: &str) -> ParseResult<'_, ()> {
map(multispace0, |_| ())(input)
}
/// Parses a decimal literal into a strictly positive `u64`.
///
/// Shared by the `versions` and `ttl_micros` parsers, so the zero-value message is worded
/// to fit both. Negative, empty, or out-of-range inputs are reported as invalid numbers.
fn parse_number(input: &str) -> Result<NonZeroU64, &'static str> {
    let value = input.parse::<u64>().map_err(|_| "invalid number")?;
    NonZeroU64::new(value).ok_or("must be non-zero")
}
/// Parses an optionally signed run of decimal digits as a non-zero `u64`.
///
/// The optional leading `-` lets negative literals be recognized here as numbers;
/// `parse_number` then rejects them.
fn number_literal(input: &str) -> ParseResult<'_, NonZeroU64> {
    let signed_digits = recognize(tuple((opt(tag("-")), digit1)));
    let checked = map_res(signed_digits, parse_number);
    context("number literal", checked)(input)
}
/// Parses `versions = <n>` (surrounding whitespace allowed) into
/// [`GarbageCollectionPolicy::Versions`].
///
/// Every step after the `versions` keyword is wrapped in `cut`. Once the keyword matches,
/// a malformed remainder is a hard failure rather than a signal to try other alternatives.
fn versions(input: &str) -> ParseResult<'_, GarbageCollectionPolicy> {
    let assignment = tuple((
        ws0,
        tag("versions"),
        cut(ws0),
        cut(tag("=")),
        cut(ws0),
        cut(number_literal),
        cut(ws0),
    ));
    let policy = map(assignment, |(_, _, _, _, _, number, _)| {
        GarbageCollectionPolicy::Versions { number }
    });
    context("versions", policy)(input)
}
/// Parses `ttl_micros = <n>` (surrounding whitespace allowed) into
/// [`GarbageCollectionPolicy::Expires`].
///
/// Every step after the `ttl_micros` keyword is wrapped in `cut`, so a malformed remainder
/// is a hard failure.
fn expires(input: &str) -> ParseResult<'_, GarbageCollectionPolicy> {
    let assignment = tuple((
        ws0,
        tag("ttl_micros"),
        cut(ws0),
        cut(tag("=")),
        cut(ws0),
        cut(number_literal),
        cut(ws0),
    ));
    let policy = map(assignment, |(_, _, _, _, _, micros, _)| {
        GarbageCollectionPolicy::Expires { micros }
    });
    context("expires", policy)(input)
}
/// Parses `any(<policy>, ...)` into [`GarbageCollectionPolicy::Any`].
///
/// The list may be empty (`any()`) and may end with a single trailing comma.
fn any(input: &str) -> ParseResult<'_, GarbageCollectionPolicy> {
    // Nested policies consume their own surrounding whitespace, so a bare "," separator
    // is enough here.
    let members = terminated(separated_list0(tag(","), gc_policy), opt(tag(",")));
    let call = tuple((
        ws0,
        tag("any"),
        cut(ws0),
        cut(tag("(")),
        cut(ws0),
        members,
        cut(ws0),
        cut(tag(")")),
        cut(ws0),
    ));
    let policy = map(call, |(_, _, _, _, _, children, _, _, _)| {
        GarbageCollectionPolicy::Any(children)
    });
    context("any", policy)(input)
}
/// Parses `all(<policy>, ...)` into [`GarbageCollectionPolicy::All`].
///
/// The list may be empty (`all()`) and may end with a single trailing comma.
fn all(input: &str) -> ParseResult<'_, GarbageCollectionPolicy> {
    // Nested policies consume their own surrounding whitespace, so a bare "," separator
    // is enough here.
    let members = terminated(separated_list0(tag(","), gc_policy), opt(tag(",")));
    let call = tuple((
        ws0,
        tag("all"),
        cut(ws0),
        cut(tag("(")),
        cut(ws0),
        members,
        cut(ws0),
        cut(tag(")")),
        cut(ws0),
    ));
    let policy = map(call, |(_, _, _, _, _, children, _, _, _)| {
        GarbageCollectionPolicy::All(children)
    });
    context("all", policy)(input)
}
/// Parses a single policy expression: `versions = n`, `ttl_micros = n`, `any(...)` or `all(...)`.
fn gc_policy(input: &str) -> ParseResult<'_, GarbageCollectionPolicy> {
    let alternatives = alt((versions, expires, any, all));
    context("garbage collection policy", alternatives)(input)
}
/// Decides, value by value, whether a version survives garbage collection.
///
/// Implementations may be stateful. `GarbageCollector` calls `retain` once for each value
/// it reaches, in cursor order, and the composite determiners forward every call to all of
/// their children.
pub trait Determiner {
    /// Returns `true` to keep the value of `key` written at timestamp `exists`.
    ///
    /// `tombstones` holds the timestamps of the tombstones seen for `key` immediately
    /// before this value (empty when there were none).
    fn retain(&mut self, key: &[u8], tombstones: &[u64], exists: u64) -> bool;
}
/// Keeps the first `number` versions of each key, in the order `retain` is called.
///
/// A value preceded by tombstones counts as two versions: the value plus the tombstone
/// the collector emits in front of it. The tally restarts whenever `retain` is called
/// with a key different from the previous call's.
#[derive(Debug)]
struct VersionsDeterminer {
    number: NonZeroU64,
    // Key whose versions are currently being tallied.
    key: Vec<u8>,
    // Versions of `key` counted so far, including the current one.
    count: u64,
}

impl VersionsDeterminer {
    fn new(number: NonZeroU64) -> Self {
        VersionsDeterminer {
            number,
            key: Vec::new(),
            count: 0,
        }
    }
}

impl Determiner for VersionsDeterminer {
    fn retain(&mut self, key: &[u8], tombstones: &[u64], _: u64) -> bool {
        let weight = if tombstones.is_empty() { 1 } else { 2 };
        if self.key == key {
            self.count += weight;
        } else {
            // New key: remember it (reusing the buffer) and restart the tally.
            self.key.clear();
            self.key.extend_from_slice(key);
            self.count = weight;
        }
        self.count <= self.number.get()
    }
}
/// Keeps values whose timestamp is at or after a fixed cutoff.
struct ExpiresDeterminer {
    // Oldest timestamp (inclusive) that is still retained.
    threshold: u64,
}

impl ExpiresDeterminer {
    fn new(threshold: u64) -> Self {
        ExpiresDeterminer { threshold }
    }
}

impl Determiner for ExpiresDeterminer {
    fn retain(&mut self, _key: &[u8], _tombstones: &[u64], exists: u64) -> bool {
        self.threshold <= exists
    }
}
/// Retains a value when at least one child determiner retains it.
struct AnyDeterminer {
    any: Vec<Box<dyn Determiner>>,
}

impl AnyDeterminer {
    fn new(any: Vec<Box<dyn Determiner>>) -> Self {
        AnyDeterminer { any }
    }
}

impl Determiner for AnyDeterminer {
    fn retain(&mut self, key: &[u8], tombstones: &[u64], exists: u64) -> bool {
        // Every child must observe every value (e.g. `VersionsDeterminer` counts them), so
        // results are merged with the non-short-circuiting `|` instead of `Iterator::any`.
        // With no children, nothing is retained.
        self.any.iter_mut().fold(false, |retained, child| {
            child.retain(key, tombstones, exists) | retained
        })
    }
}
/// Retains a value only when every child determiner retains it.
struct AllDeterminer {
    all: Vec<Box<dyn Determiner>>,
}

impl AllDeterminer {
    fn new(all: Vec<Box<dyn Determiner>>) -> Self {
        AllDeterminer { all }
    }
}

impl Determiner for AllDeterminer {
    fn retain(&mut self, key: &[u8], tombstones: &[u64], exists: u64) -> bool {
        // Every child must observe every value (e.g. `VersionsDeterminer` counts them), so
        // results are merged with the non-short-circuiting `&` instead of `Iterator::all`.
        // With no children, everything is retained.
        self.all.iter_mut().fold(true, |retained, child| {
            child.retain(key, tombstones, exists) & retained
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parsing and rendering of policy strings.
    mod policy {
        use super::*;

        #[test]
        fn versions0() {
            assert!(GarbageCollectionPolicy::try_from("versions = 0").is_err());
        }

        #[test]
        fn versions1() {
            const POLICY: &str = "versions = 1";
            assert_eq!(
                POLICY,
                GarbageCollectionPolicy::try_from(POLICY)
                    .unwrap()
                    .to_string()
            );
            assert_eq!(
                GarbageCollectionPolicy::Versions {
                    number: NonZeroU64::new(1).unwrap()
                },
                GarbageCollectionPolicy::try_from(POLICY).unwrap()
            );
        }

        #[test]
        fn versions42() {
            const POLICY: &str = "versions = 42";
            assert_eq!(
                GarbageCollectionPolicy::Versions {
                    number: NonZeroU64::new(42).unwrap()
                },
                GarbageCollectionPolicy::try_from(POLICY).unwrap()
            );
        }

        #[test]
        fn expires0() {
            assert!(GarbageCollectionPolicy::try_from("ttl_micros = 0").is_err());
        }

        #[test]
        fn expires1() {
            const POLICY: &str = "ttl_micros = 1";
            assert_eq!(
                POLICY,
                GarbageCollectionPolicy::try_from(POLICY)
                    .unwrap()
                    .to_string()
            );
            assert_eq!(
                GarbageCollectionPolicy::Expires {
                    micros: NonZeroU64::new(1).unwrap()
                },
                GarbageCollectionPolicy::try_from(POLICY).unwrap()
            );
        }

        #[test]
        fn expires42() {
            const POLICY: &str = "ttl_micros = 42";
            assert_eq!(
                POLICY,
                GarbageCollectionPolicy::try_from(POLICY)
                    .unwrap()
                    .to_string()
            );
            assert_eq!(
                GarbageCollectionPolicy::Expires {
                    micros: NonZeroU64::new(42).unwrap()
                },
                GarbageCollectionPolicy::try_from(POLICY).unwrap()
            );
        }

        #[test]
        fn any() {
            const POLICY: &str = "any(versions = 1, ttl_micros = 42)";
            assert_eq!(
                POLICY,
                GarbageCollectionPolicy::try_from(POLICY)
                    .unwrap()
                    .to_string()
            );
            let policy = GarbageCollectionPolicy::Any(vec![
                GarbageCollectionPolicy::Versions {
                    number: NonZeroU64::new(1).unwrap(),
                },
                GarbageCollectionPolicy::Expires {
                    micros: NonZeroU64::new(42).unwrap(),
                },
            ]);
            assert_eq!(policy, GarbageCollectionPolicy::try_from(POLICY).unwrap());
        }

        #[test]
        fn all() {
            const POLICY: &str = "all(versions = 1, ttl_micros = 42)";
            assert_eq!(
                POLICY,
                GarbageCollectionPolicy::try_from(POLICY)
                    .unwrap()
                    .to_string()
            );
            let policy = GarbageCollectionPolicy::All(vec![
                GarbageCollectionPolicy::Versions {
                    number: NonZeroU64::new(1).unwrap(),
                },
                GarbageCollectionPolicy::Expires {
                    micros: NonZeroU64::new(42).unwrap(),
                },
            ]);
            assert_eq!(policy, GarbageCollectionPolicy::try_from(POLICY).unwrap());
        }

        #[test]
        fn nested_round_trip() {
            const POLICY: &str = "all(any(versions = 1, ttl_micros = 42), versions = 3)";
            let parsed = GarbageCollectionPolicy::try_from(POLICY).unwrap();
            assert_eq!(POLICY, parsed.to_string());
            let expected = GarbageCollectionPolicy::All(vec![
                GarbageCollectionPolicy::Any(vec![
                    GarbageCollectionPolicy::Versions {
                        number: NonZeroU64::new(1).unwrap(),
                    },
                    GarbageCollectionPolicy::Expires {
                        micros: NonZeroU64::new(42).unwrap(),
                    },
                ]),
                GarbageCollectionPolicy::Versions {
                    number: NonZeroU64::new(3).unwrap(),
                },
            ]);
            assert_eq!(expected, parsed);
        }

        #[test]
        fn whitespace_and_trailing_comma() {
            let parsed: GarbageCollectionPolicy = "  any( versions  =  1 ,ttl_micros=42, )  "
                .parse()
                .unwrap();
            assert_eq!("any(versions = 1, ttl_micros = 42)", parsed.to_string());
        }

        #[test]
        fn empty_any_round_trips() {
            let parsed = GarbageCollectionPolicy::try_from("any()").unwrap();
            assert_eq!(GarbageCollectionPolicy::Any(vec![]), parsed);
            assert_eq!("any()", parsed.to_string());
        }

        #[test]
        fn rejects_malformed_input() {
            for bad in [
                "",
                "versions",
                "versions = ",
                "versions = -1",
                "versions = x",
                "ttl_micros = 18446744073709551616",
                "bogus = 1",
                "any(versions = 1",
                "versions = 1 versions = 2",
                "all(versions = 0)",
            ] {
                assert!(
                    GarbageCollectionPolicy::try_from(bad).is_err(),
                    "accepted {bad:?}"
                );
            }
        }
    }

    /// In-memory cursor over a fixed list of entries, used to drive the collector.
    #[derive(Debug, Default)]
    struct SampleCursor {
        entries: Vec<KeyValuePair>,
        index: usize,
    }

    impl Cursor for SampleCursor {
        fn next(&mut self) -> Result<(), Error> {
            if self.index < self.entries.len() {
                self.index += 1;
            }
            Ok(())
        }
        fn key(&self) -> Option<KeyRef<'_>> {
            if self.index < self.entries.len() {
                Some(KeyRef::from(&self.entries[self.index]))
            } else {
                None
            }
        }
        fn value(&self) -> Option<&[u8]> {
            if self.index < self.entries.len() {
                self.entries[self.index].value.as_deref()
            } else {
                None
            }
        }
        fn seek_to_first(&mut self) -> Result<(), Error> {
            unimplemented!()
        }
        fn seek_to_last(&mut self) -> Result<(), Error> {
            unimplemented!()
        }
        fn seek(&mut self, _: &[u8]) -> Result<(), Error> {
            unimplemented!()
        }
        fn prev(&mut self) -> Result<(), Error> {
            unimplemented!()
        }
    }

    /// Builds a `SampleCursor` from `key @ timestamp => Option<value>,` entries, in order.
    macro_rules! sample_cursor {
        () => {
            SampleCursor::default()
        };
        ($($key:literal @ $ts:literal => $val:expr,)*) => {
            {
                let mut cursor = SampleCursor::default();
                $(
                    let v: Option::<&[u8]> = $val;
                    cursor.entries.push(KeyValuePair {
                        key: $key.to_vec(),
                        timestamp: $ts,
                        value: v.map(|v| v.to_vec()),
                    });
                )*
                cursor
            }
        };
    }

    /// Runs `policy` over `keys` and asserts the collector yields exactly `expect`'s entries.
    fn test_expectation(
        keys: SampleCursor,
        mut expect: SampleCursor,
        policy: &str,
        now_micros: u64,
    ) {
        let policy = GarbageCollectionPolicy::try_from(policy).unwrap();
        let mut collector = policy.collector(keys, now_micros).unwrap();
        loop {
            let exp = expect.key();
            let got = collector.next().unwrap();
            match (&exp, &got) {
                (Some(exp), Some(got)) => {
                    assert_eq!(exp, got);
                }
                (None, None) => {
                    break;
                }
                (Some(exp), None) => {
                    panic!("dropped too much data: {exp:?}");
                }
                (None, Some(got)) => {
                    panic!("retained too much data: {got:?}");
                }
            }
            expect.next().unwrap();
        }
    }

    #[test]
    fn versions_example1() {
        let cursor = sample_cursor! {
            b"key" @ 4 => None,
            b"key" @ 3 => None,
            b"key" @ 2 => None,
            b"key" @ 1 => Some(b"value"),
        };
        let expectation = sample_cursor! {
            b"key" @ 2 => None,
            b"key" @ 1 => Some(b"value"),
        };
        let policy = "versions = 2";
        test_expectation(cursor, expectation, policy, 4);
    }

    #[test]
    fn versions_example2() {
        let cursor = sample_cursor! {
            b"key" @ 4 => Some(b"value"),
            b"key" @ 3 => None,
            b"key" @ 2 => None,
            b"key" @ 1 => None,
        };
        let expectation = sample_cursor! {
            b"key" @ 4 => Some(b"value"),
        };
        let policy = "versions = 2";
        test_expectation(cursor, expectation, policy, 4);
    }

    #[test]
    fn expires_example1() {
        let cursor = sample_cursor! {
            b"key" @ 4 => Some(b"value"),
            b"key" @ 3 => None,
            b"key" @ 2 => None,
            b"key" @ 1 => Some(b"drop"),
        };
        let expectation = sample_cursor! {
            b"key" @ 4 => Some(b"value"),
        };
        let policy = "ttl_micros = 2";
        test_expectation(cursor, expectation, policy, 4);
    }

    #[test]
    fn expires_example2() {
        let cursor = sample_cursor! {
            b"key" @ 4 => None,
            b"key" @ 3 => None,
            b"key" @ 2 => None,
            b"key" @ 1 => Some(b"drop"),
        };
        let expectation = sample_cursor! {};
        let policy = "ttl_micros = 2";
        test_expectation(cursor, expectation, policy, 4);
    }

    #[test]
    fn empty_cursor_yields_nothing() {
        let cursor = sample_cursor! {};
        let expectation = sample_cursor! {};
        test_expectation(cursor, expectation, "versions = 1", 0);
    }

    #[test]
    fn versions_multiple_keys() {
        // The version tally restarts per key; b's tombstone+value pair counts as two.
        let cursor = sample_cursor! {
            b"a" @ 3 => Some(b"v3"),
            b"a" @ 2 => Some(b"v2"),
            b"b" @ 5 => None,
            b"b" @ 4 => Some(b"v4"),
            b"b" @ 1 => Some(b"v1"),
        };
        let expectation = sample_cursor! {
            b"a" @ 3 => Some(b"v3"),
            b"a" @ 2 => Some(b"v2"),
            b"b" @ 5 => None,
            b"b" @ 4 => Some(b"v4"),
        };
        test_expectation(cursor, expectation, "versions = 2", 10);
    }

    #[test]
    fn any_retains_if_either_policy_retains() {
        // @3 is beyond `versions = 1` but still within the TTL, so `any` keeps it.
        let cursor = sample_cursor! {
            b"key" @ 4 => Some(b"newest"),
            b"key" @ 3 => Some(b"recent"),
            b"key" @ 1 => Some(b"old"),
        };
        let expectation = sample_cursor! {
            b"key" @ 4 => Some(b"newest"),
            b"key" @ 3 => Some(b"recent"),
        };
        test_expectation(cursor, expectation, "any(versions = 1, ttl_micros = 2)", 4);
    }

    #[test]
    fn all_retains_only_if_both_policies_retain() {
        // @1 is within `versions = 2` but past the TTL, so `all` drops it.
        let cursor = sample_cursor! {
            b"key" @ 4 => Some(b"newest"),
            b"key" @ 1 => Some(b"old"),
        };
        let expectation = sample_cursor! {
            b"key" @ 4 => Some(b"newest"),
        };
        test_expectation(cursor, expectation, "all(versions = 2, ttl_micros = 2)", 4);
    }
}