use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::Arc;
use bytes::Bytes;
use exoware_sdk::keys::KeyCodec;
use exoware_sdk::match_key::compile_payload_regex;
use exoware_sdk::prune_policy::{
KeysScope, OrderEncoding, PolicyScope, PrunePolicyDocument, RetainPolicy,
};
use regex::bytes::Regex;
use crate::StoreEngine;
#[derive(Debug)]
pub enum PruneError {
Engine(String),
Policy(String),
}
impl std::fmt::Display for PruneError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PruneError::Engine(s) => write!(f, "engine: {s}"),
PruneError::Policy(s) => write!(f, "policy: {s}"),
}
}
}
impl std::error::Error for PruneError {}
fn extract_order_value(payload: &[u8], regex: &Regex, scope: &KeysScope) -> Option<Vec<u8>> {
let order_by = scope.order_by.as_ref()?;
let captures = regex.captures(payload)?;
let matched = captures.name(&order_by.capture_group)?;
let raw = matched.as_bytes();
match order_by.encoding {
OrderEncoding::BytesAsc => Some(raw.to_vec()),
OrderEncoding::U64Be | OrderEncoding::I64Be => {
if raw.len() == 8 {
Some(raw.to_vec())
} else {
None
}
}
}
}
fn extract_group_key(payload: &[u8], regex: &Regex, scope: &KeysScope) -> Option<Vec<u8>> {
if scope.group_by.capture_groups.is_empty() {
return Some(Vec::new());
}
let captures = regex.captures(payload)?;
let mut group_key = Vec::new();
for group_name in &scope.group_by.capture_groups {
let matched = captures.name(group_name)?;
let bytes = matched.as_bytes();
group_key.extend_from_slice(&(bytes.len() as u32).to_be_bytes());
group_key.extend_from_slice(bytes);
}
Some(group_key)
}
struct KeyEntry {
key: Bytes,
order_value: Vec<u8>,
}
fn compare_order_values(a: &[u8], b: &[u8], scope: &KeysScope) -> Ordering {
match scope.order_by.as_ref().map(|o| &o.encoding) {
Some(OrderEncoding::U64Be) => {
let a_val = a.try_into().map(u64::from_be_bytes).unwrap_or(0);
let b_val = b.try_into().map(u64::from_be_bytes).unwrap_or(0);
a_val.cmp(&b_val)
}
Some(OrderEncoding::I64Be) => {
let a_val = a.try_into().map(i64::from_be_bytes).unwrap_or(0);
let b_val = b.try_into().map(i64::from_be_bytes).unwrap_or(0);
a_val.cmp(&b_val)
}
Some(OrderEncoding::BytesAsc) | None => a.cmp(b),
}
}
fn keys_to_delete(
mut entries: Vec<KeyEntry>,
scope: &KeysScope,
retain: &RetainPolicy,
) -> Vec<Bytes> {
entries.sort_by(|a, b| compare_order_values(&a.order_value, &b.order_value, scope));
match retain {
RetainPolicy::KeepLatest { count } => {
if entries.len() <= *count {
return Vec::new();
}
entries[..entries.len() - count]
.iter()
.map(|e| e.key.clone())
.collect()
}
RetainPolicy::GreaterThan { threshold } => {
let threshold = threshold.to_be_bytes();
entries
.iter()
.filter(|e| {
compare_order_values(&e.order_value, &threshold, scope) != Ordering::Greater
})
.map(|e| e.key.clone())
.collect()
}
RetainPolicy::GreaterThanOrEqual { threshold } => {
let threshold = threshold.to_be_bytes();
entries
.iter()
.filter(|e| {
compare_order_values(&e.order_value, &threshold, scope) == Ordering::Less
})
.map(|e| e.key.clone())
.collect()
}
RetainPolicy::DropAll => entries.iter().map(|e| e.key.clone()).collect(),
}
}
pub fn execute_prune(
engine: &Arc<dyn StoreEngine>,
document: &PrunePolicyDocument,
) -> Result<(), PruneError> {
for policy in &document.policies {
match &policy.scope {
PolicyScope::Keys(scope) => {
execute_user_keys_policy(engine, scope, &policy.retain)?;
}
PolicyScope::Sequence => {
execute_batch_log_policy(engine, &policy.retain)?;
}
}
}
Ok(())
}
fn execute_user_keys_policy(
engine: &Arc<dyn StoreEngine>,
scope: &KeysScope,
retain: &RetainPolicy,
) -> Result<(), PruneError> {
let codec = KeyCodec::new(scope.match_key.reserved_bits, scope.match_key.prefix);
let regex: Regex = compile_payload_regex(&scope.match_key.payload_regex)
.map_err(|e| PruneError::Policy(e.to_string()))?;
let (start, end) = codec.prefix_bounds();
let rows = engine
.range_scan(start.as_ref(), end.as_ref(), usize::MAX, true)
.map_err(PruneError::Engine)?;
let mut groups: BTreeMap<Vec<u8>, Vec<KeyEntry>> = BTreeMap::new();
for (key, _value) in &rows {
if !codec.matches(key) {
continue;
}
let payload_len = codec.payload_capacity_bytes_for_key_len(key.len());
let payload = match codec.read_payload(key, 0, payload_len) {
Ok(p) => p,
Err(_) => continue,
};
if !regex.is_match(&payload) {
continue;
}
let group_key = match extract_group_key(&payload, ®ex, scope) {
Some(gk) => gk,
None => continue,
};
let order_value = extract_order_value(&payload, ®ex, scope).unwrap_or_default();
groups.entry(group_key).or_default().push(KeyEntry {
key: key.clone(),
order_value,
});
}
let mut all_deletes = Vec::new();
for (_group_key, entries) in groups {
all_deletes.extend(keys_to_delete(entries, scope, retain));
}
if !all_deletes.is_empty() {
let refs: Vec<&[u8]> = all_deletes.iter().map(|k| k.as_ref()).collect();
engine.delete_batch(&refs).map_err(PruneError::Engine)?;
}
Ok(())
}
fn execute_batch_log_policy(
engine: &Arc<dyn StoreEngine>,
retain: &RetainPolicy,
) -> Result<(), PruneError> {
let current = engine.current_sequence();
let cutoff_exclusive = match retain {
RetainPolicy::KeepLatest { count } => {
let count = *count as u64;
current.saturating_add(1).saturating_sub(count)
}
RetainPolicy::GreaterThan { threshold } => threshold.saturating_add(1),
RetainPolicy::GreaterThanOrEqual { threshold } => *threshold,
RetainPolicy::DropAll => current.saturating_add(1),
};
engine
.prune_batch_log(cutoff_exclusive)
.map_err(PruneError::Engine)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use exoware_sdk::kv_codec::Utf8;
use exoware_sdk::match_key::MatchKey;
use exoware_sdk::prune_policy::{GroupBy, OrderBy};
fn make_scope() -> KeysScope {
KeysScope {
match_key: MatchKey {
reserved_bits: 4,
prefix: 1,
payload_regex: Utf8::from(
"(?s-u)^(?P<logical>(?:\\x00\\xFF|[^\\x00])*)\\x00\\x00(?P<version>.{8})$",
),
},
group_by: GroupBy {
capture_groups: vec![Utf8::from("logical")],
},
order_by: Some(OrderBy {
capture_group: Utf8::from("version"),
encoding: OrderEncoding::U64Be,
}),
}
}
fn make_entry(order: u64) -> KeyEntry {
KeyEntry {
key: Bytes::from(vec![order as u8]),
order_value: order.to_be_bytes().to_vec(),
}
}
#[test]
fn keep_latest_retains_newest() {
let scope = make_scope();
let retain = RetainPolicy::KeepLatest { count: 2 };
let entries = vec![make_entry(1), make_entry(2), make_entry(3)];
let deletes = keys_to_delete(entries, &scope, &retain);
assert_eq!(deletes.len(), 1);
assert_eq!(deletes[0].as_ref(), &[1u8]);
}
#[test]
fn keep_latest_no_delete_when_under_count() {
let scope = make_scope();
let retain = RetainPolicy::KeepLatest { count: 5 };
let entries = vec![make_entry(1), make_entry(2)];
let deletes = keys_to_delete(entries, &scope, &retain);
assert!(deletes.is_empty());
}
#[test]
fn drop_all_deletes_everything() {
let scope = make_scope();
let retain = RetainPolicy::DropAll;
let entries = vec![make_entry(1), make_entry(2)];
let deletes = keys_to_delete(entries, &scope, &retain);
assert_eq!(deletes.len(), 2);
}
#[test]
fn greater_than_threshold() {
let scope = make_scope();
let retain = RetainPolicy::GreaterThan { threshold: 5 };
let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
let deletes = keys_to_delete(entries, &scope, &retain);
assert_eq!(deletes.len(), 2);
}
#[test]
fn greater_than_or_equal_threshold() {
let scope = make_scope();
let retain = RetainPolicy::GreaterThanOrEqual { threshold: 5 };
let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
let deletes = keys_to_delete(entries, &scope, &retain);
assert_eq!(deletes.len(), 1);
}
}