Skip to main content

exoware_server/
prune.rs

//! Prune execution: apply prune policies against the store.
//!
//! Each policy's `scope` selects which keyspace is pruned:
//! - `PolicyScope::Keys` (user keys) — scan the keys in the key family described by
//!   `match_key`, partition them by the `group_by` capture groups, order the entries
//!   within each group, and delete those that don't survive `retain`.
//! - `PolicyScope::Sequence` (batch log) — translate `retain` into a cutoff sequence
//!   number and call `StoreEngine::prune_batch_log`. No key scan; no grouping.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::sync::Arc;
13
14use bytes::Bytes;
15use exoware_sdk::keys::KeyCodec;
16use exoware_sdk::match_key::compile_payload_regex;
17use exoware_sdk::prune_policy::{
18    KeysScope, OrderEncoding, PolicyScope, PrunePolicyDocument, RetainPolicy,
19};
20use regex::bytes::Regex;
21
22use crate::StoreEngine;
23
/// Failure modes surfaced while executing a prune policy document.
#[derive(Debug)]
pub enum PruneError {
    /// The store engine reported a failure; carries the engine's message.
    Engine(String),
    /// The policy itself could not be applied; carries a description of why.
    Policy(String),
}

impl std::fmt::Display for PruneError {
    /// Renders the error as `<category>: <message>`, where the category is
    /// `engine` or `policy`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, message) = match self {
            Self::Engine(message) => ("engine", message),
            Self::Policy(message) => ("policy", message),
        };
        write!(f, "{category}: {message}")
    }
}

impl std::error::Error for PruneError {}
40
41fn extract_order_value(payload: &[u8], regex: &Regex, scope: &KeysScope) -> Option<Vec<u8>> {
42    let order_by = scope.order_by.as_ref()?;
43    let captures = regex.captures(payload)?;
44    let matched = captures.name(&order_by.capture_group)?;
45    let raw = matched.as_bytes();
46    match order_by.encoding {
47        OrderEncoding::BytesAsc => Some(raw.to_vec()),
48        OrderEncoding::U64Be | OrderEncoding::I64Be => {
49            if raw.len() == 8 {
50                Some(raw.to_vec())
51            } else {
52                None
53            }
54        }
55    }
56}
57
58fn extract_group_key(payload: &[u8], regex: &Regex, scope: &KeysScope) -> Option<Vec<u8>> {
59    if scope.group_by.capture_groups.is_empty() {
60        return Some(Vec::new());
61    }
62    let captures = regex.captures(payload)?;
63    let mut group_key = Vec::new();
64    for group_name in &scope.group_by.capture_groups {
65        let matched = captures.name(group_name)?;
66        let bytes = matched.as_bytes();
67        group_key.extend_from_slice(&(bytes.len() as u32).to_be_bytes());
68        group_key.extend_from_slice(bytes);
69    }
70    Some(group_key)
71}
72
73struct KeyEntry {
74    key: Bytes,
75    order_value: Vec<u8>,
76}
77
78fn compare_order_values(a: &[u8], b: &[u8], scope: &KeysScope) -> Ordering {
79    match scope.order_by.as_ref().map(|o| &o.encoding) {
80        Some(OrderEncoding::U64Be) => {
81            let a_val = a.try_into().map(u64::from_be_bytes).unwrap_or(0);
82            let b_val = b.try_into().map(u64::from_be_bytes).unwrap_or(0);
83            a_val.cmp(&b_val)
84        }
85        Some(OrderEncoding::I64Be) => {
86            let a_val = a.try_into().map(i64::from_be_bytes).unwrap_or(0);
87            let b_val = b.try_into().map(i64::from_be_bytes).unwrap_or(0);
88            a_val.cmp(&b_val)
89        }
90        Some(OrderEncoding::BytesAsc) | None => a.cmp(b),
91    }
92}
93
94fn keys_to_delete(
95    mut entries: Vec<KeyEntry>,
96    scope: &KeysScope,
97    retain: &RetainPolicy,
98) -> Vec<Bytes> {
99    entries.sort_by(|a, b| compare_order_values(&a.order_value, &b.order_value, scope));
100
101    match retain {
102        RetainPolicy::KeepLatest { count } => {
103            if entries.len() <= *count {
104                return Vec::new();
105            }
106            entries[..entries.len() - count]
107                .iter()
108                .map(|e| e.key.clone())
109                .collect()
110        }
111        RetainPolicy::GreaterThan { threshold } => {
112            let threshold = threshold.to_be_bytes();
113            entries
114                .iter()
115                .filter(|e| {
116                    compare_order_values(&e.order_value, &threshold, scope) != Ordering::Greater
117                })
118                .map(|e| e.key.clone())
119                .collect()
120        }
121        RetainPolicy::GreaterThanOrEqual { threshold } => {
122            let threshold = threshold.to_be_bytes();
123            entries
124                .iter()
125                .filter(|e| {
126                    compare_order_values(&e.order_value, &threshold, scope) == Ordering::Less
127                })
128                .map(|e| e.key.clone())
129                .collect()
130        }
131        RetainPolicy::DropAll => entries.iter().map(|e| e.key.clone()).collect(),
132    }
133}
134
135pub fn execute_prune(
136    engine: &Arc<dyn StoreEngine>,
137    document: &PrunePolicyDocument,
138) -> Result<(), PruneError> {
139    for policy in &document.policies {
140        match &policy.scope {
141            PolicyScope::Keys(scope) => {
142                execute_user_keys_policy(engine, scope, &policy.retain)?;
143            }
144            PolicyScope::Sequence => {
145                execute_batch_log_policy(engine, &policy.retain)?;
146            }
147        }
148    }
149    Ok(())
150}
151
152fn execute_user_keys_policy(
153    engine: &Arc<dyn StoreEngine>,
154    scope: &KeysScope,
155    retain: &RetainPolicy,
156) -> Result<(), PruneError> {
157    let codec = KeyCodec::new(scope.match_key.reserved_bits, scope.match_key.prefix);
158    let regex: Regex = compile_payload_regex(&scope.match_key.payload_regex)
159        .map_err(|e| PruneError::Policy(e.to_string()))?;
160
161    let (start, end) = codec.prefix_bounds();
162    let rows = engine
163        .range_scan(start.as_ref(), end.as_ref(), usize::MAX, true)
164        .map_err(PruneError::Engine)?;
165
166    let mut groups: BTreeMap<Vec<u8>, Vec<KeyEntry>> = BTreeMap::new();
167
168    for (key, _value) in &rows {
169        if !codec.matches(key) {
170            continue;
171        }
172        let payload_len = codec.payload_capacity_bytes_for_key_len(key.len());
173        let payload = match codec.read_payload(key, 0, payload_len) {
174            Ok(p) => p,
175            Err(_) => continue,
176        };
177        if !regex.is_match(&payload) {
178            continue;
179        }
180
181        let group_key = match extract_group_key(&payload, &regex, scope) {
182            Some(gk) => gk,
183            None => continue,
184        };
185
186        let order_value = extract_order_value(&payload, &regex, scope).unwrap_or_default();
187
188        groups.entry(group_key).or_default().push(KeyEntry {
189            key: key.clone(),
190            order_value,
191        });
192    }
193
194    let mut all_deletes = Vec::new();
195    for (_group_key, entries) in groups {
196        all_deletes.extend(keys_to_delete(entries, scope, retain));
197    }
198
199    if !all_deletes.is_empty() {
200        let refs: Vec<&[u8]> = all_deletes.iter().map(|k| k.as_ref()).collect();
201        engine.delete_batch(&refs).map_err(PruneError::Engine)?;
202    }
203
204    Ok(())
205}
206
207fn execute_batch_log_policy(
208    engine: &Arc<dyn StoreEngine>,
209    retain: &RetainPolicy,
210) -> Result<(), PruneError> {
211    let current = engine.current_sequence();
212    let cutoff_exclusive = match retain {
213        RetainPolicy::KeepLatest { count } => {
214            // Keep the last N batches: cutoff = current + 1 - N (saturating).
215            let count = *count as u64;
216            current.saturating_add(1).saturating_sub(count)
217        }
218        RetainPolicy::GreaterThan { threshold } => threshold.saturating_add(1),
219        RetainPolicy::GreaterThanOrEqual { threshold } => *threshold,
220        RetainPolicy::DropAll => current.saturating_add(1),
221    };
222
223    engine
224        .prune_batch_log(cutoff_exclusive)
225        .map_err(PruneError::Engine)?;
226    Ok(())
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use exoware_sdk::kv_codec::Utf8;
    use exoware_sdk::match_key::MatchKey;
    use exoware_sdk::prune_policy::{GroupBy, OrderBy};

    /// Scope with a single `logical` group and a `version` capture ordered by
    /// the given encoding.
    fn make_scope_with(encoding: OrderEncoding) -> KeysScope {
        KeysScope {
            match_key: MatchKey {
                reserved_bits: 4,
                prefix: 1,
                payload_regex: Utf8::from(
                    "(?s-u)^(?P<logical>(?:\\x00\\xFF|[^\\x00])*)\\x00\\x00(?P<version>.{8})$",
                ),
            },
            group_by: GroupBy {
                capture_groups: vec![Utf8::from("logical")],
            },
            order_by: Some(OrderBy {
                capture_group: Utf8::from("version"),
                encoding,
            }),
        }
    }

    /// Default scope used by most tests: unsigned big-endian ordering.
    fn make_scope() -> KeysScope {
        make_scope_with(OrderEncoding::U64Be)
    }

    /// Entry whose one-byte key mirrors its order value, so assertions can
    /// identify exactly which entries were selected.
    fn make_entry(order: u64) -> KeyEntry {
        KeyEntry {
            key: Bytes::from(vec![order as u8]),
            order_value: order.to_be_bytes().to_vec(),
        }
    }

    /// Copies the returned keys into plain vectors for exact comparison.
    fn deleted_keys(deletes: &[Bytes]) -> Vec<Vec<u8>> {
        deletes.iter().map(|key| key.to_vec()).collect()
    }

    #[test]
    fn keep_latest_retains_newest() {
        let scope = make_scope();
        let retain = RetainPolicy::KeepLatest { count: 2 };
        let entries = vec![make_entry(1), make_entry(2), make_entry(3)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(deleted_keys(&deletes), vec![vec![1u8]]);
    }

    #[test]
    fn keep_latest_sorts_before_trimming() {
        // Input deliberately out of order: the oldest entry must still be the
        // one that is dropped.
        let scope = make_scope();
        let retain = RetainPolicy::KeepLatest { count: 2 };
        let entries = vec![make_entry(3), make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(deleted_keys(&deletes), vec![vec![1u8]]);
    }

    #[test]
    fn keep_latest_no_delete_when_under_count() {
        let scope = make_scope();
        let retain = RetainPolicy::KeepLatest { count: 5 };
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert!(deletes.is_empty());
    }

    #[test]
    fn keep_latest_zero_deletes_everything() {
        let scope = make_scope();
        let retain = RetainPolicy::KeepLatest { count: 0 };
        let entries = vec![make_entry(2), make_entry(1)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(deleted_keys(&deletes), vec![vec![1u8], vec![2u8]]);
    }

    #[test]
    fn drop_all_deletes_everything() {
        let scope = make_scope();
        let retain = RetainPolicy::DropAll;
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(deleted_keys(&deletes), vec![vec![1u8], vec![2u8]]);
    }

    #[test]
    fn greater_than_threshold() {
        // The entry equal to the threshold is NOT strictly greater, so it goes.
        let scope = make_scope();
        let retain = RetainPolicy::GreaterThan { threshold: 5 };
        let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(deleted_keys(&deletes), vec![vec![3u8], vec![5u8]]);
    }

    #[test]
    fn greater_than_or_equal_threshold() {
        // The entry equal to the threshold survives.
        let scope = make_scope();
        let retain = RetainPolicy::GreaterThanOrEqual { threshold: 5 };
        let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(deleted_keys(&deletes), vec![vec![3u8]]);
    }

    #[test]
    fn i64_encoding_orders_negatives_first() {
        // As raw bytes -1 (0xFF..FF) would sort after 1 and 2; decoded as a
        // signed integer it is the oldest entry and must be the one dropped.
        let scope = make_scope_with(OrderEncoding::I64Be);
        let retain = RetainPolicy::KeepLatest { count: 2 };
        let entries = vec![
            KeyEntry {
                key: Bytes::from_static(b"one"),
                order_value: 1i64.to_be_bytes().to_vec(),
            },
            KeyEntry {
                key: Bytes::from_static(b"two"),
                order_value: 2i64.to_be_bytes().to_vec(),
            },
            KeyEntry {
                key: Bytes::from_static(b"neg"),
                order_value: (-1i64).to_be_bytes().to_vec(),
            },
        ];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(deleted_keys(&deletes), vec![b"neg".to_vec()]);
    }
}