Skip to main content

exoware_server/
prune.rs

1//! Prune execution: apply prune policies against the store.
2//!
3//! For each policy, scan keys matching the policy's prefix family, filter by payload regex,
4//! group by capture groups, order within groups, and delete keys that do not survive the
5//! retain policy.
6
7use std::cmp::Ordering;
8use std::collections::BTreeMap;
9use std::sync::Arc;
10
11use bytes::Bytes;
12use exoware_sdk_rs::keys::KeyCodec;
13use exoware_sdk_rs::prune_policy::{
14    compile_payload_regex, OrderEncoding, PrunePolicy, PrunePolicyDocument, RetainPolicy,
15};
16use regex::bytes::Regex;
17
18use crate::StoreEngine;
19
/// Failure raised while applying prune policies.
///
/// Each variant wraps a human-readable message. The `Display` output prefixes that message
/// with the variant's origin: `engine: …` or `policy: …`.
#[derive(Debug)]
pub enum PruneError {
    /// Message describing a failed store-engine call.
    Engine(String),
    /// Message describing a policy that could not be used.
    Policy(String),
}

impl std::fmt::Display for PruneError {
    /// Writes `<origin>: <message>`, where `<origin>` is `engine` or `policy`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (origin, message) = match self {
            Self::Engine(message) => ("engine", message),
            Self::Policy(message) => ("policy", message),
        };
        write!(f, "{origin}: {message}")
    }
}

// No `source()` override: the wrapped messages are plain strings with no underlying error.
impl std::error::Error for PruneError {}
36
37fn extract_order_value(payload: &[u8], regex: &Regex, policy: &PrunePolicy) -> Option<Vec<u8>> {
38    let order_by = policy.order_by.as_ref()?;
39    let captures = regex.captures(payload)?;
40    let matched = captures.name(&order_by.capture_group)?;
41    let raw = matched.as_bytes();
42    match order_by.encoding {
43        OrderEncoding::BytesAsc => Some(raw.to_vec()),
44        OrderEncoding::U64Be => {
45            if raw.len() == 8 {
46                Some(raw.to_vec())
47            } else {
48                None
49            }
50        }
51        OrderEncoding::I64Be => {
52            if raw.len() == 8 {
53                Some(raw.to_vec())
54            } else {
55                None
56            }
57        }
58    }
59}
60
61fn extract_group_key(payload: &[u8], regex: &Regex, policy: &PrunePolicy) -> Option<Vec<u8>> {
62    if policy.group_by.capture_groups.is_empty() {
63        return Some(Vec::new());
64    }
65    let captures = regex.captures(payload)?;
66    let mut group_key = Vec::new();
67    for group_name in &policy.group_by.capture_groups {
68        let matched = captures.name(group_name)?;
69        let bytes = matched.as_bytes();
70        group_key.extend_from_slice(&(bytes.len() as u32).to_be_bytes());
71        group_key.extend_from_slice(bytes);
72    }
73    Some(group_key)
74}
75
76struct KeyEntry {
77    key: Bytes,
78    order_value: Vec<u8>,
79}
80
81fn compare_order_values(a: &[u8], b: &[u8], policy: &PrunePolicy) -> Ordering {
82    match policy.order_by.as_ref().map(|o| &o.encoding) {
83        Some(OrderEncoding::U64Be) => {
84            let a_val = a.try_into().map(u64::from_be_bytes).unwrap_or(0);
85            let b_val = b.try_into().map(u64::from_be_bytes).unwrap_or(0);
86            a_val.cmp(&b_val)
87        }
88        Some(OrderEncoding::I64Be) => {
89            let a_val = a.try_into().map(i64::from_be_bytes).unwrap_or(0);
90            let b_val = b.try_into().map(i64::from_be_bytes).unwrap_or(0);
91            a_val.cmp(&b_val)
92        }
93        Some(OrderEncoding::BytesAsc) | None => a.cmp(b),
94    }
95}
96
97fn keys_to_delete(mut entries: Vec<KeyEntry>, policy: &PrunePolicy) -> Vec<Bytes> {
98    entries.sort_by(|a, b| compare_order_values(&a.order_value, &b.order_value, policy));
99
100    match &policy.retain {
101        RetainPolicy::KeepLatest { count } => {
102            if entries.len() <= *count {
103                return Vec::new();
104            }
105            entries[..entries.len() - count]
106                .iter()
107                .map(|e| e.key.clone())
108                .collect()
109        }
110        RetainPolicy::GreaterThan { threshold } => {
111            let threshold = threshold.to_be_bytes();
112            entries
113                .iter()
114                .filter(|e| {
115                    compare_order_values(&e.order_value, &threshold, policy) != Ordering::Greater
116                })
117                .map(|e| e.key.clone())
118                .collect()
119        }
120        RetainPolicy::GreaterThanOrEqual { threshold } => {
121            let threshold = threshold.to_be_bytes();
122            entries
123                .iter()
124                .filter(|e| {
125                    compare_order_values(&e.order_value, &threshold, policy) == Ordering::Less
126                })
127                .map(|e| e.key.clone())
128                .collect()
129        }
130        RetainPolicy::DropAll => entries.iter().map(|e| e.key.clone()).collect(),
131    }
132}
133
134pub fn execute_prune(
135    engine: &Arc<dyn StoreEngine>,
136    document: &PrunePolicyDocument,
137) -> Result<(), PruneError> {
138    for policy in &document.policies {
139        execute_single_policy(engine, policy)?;
140    }
141    Ok(())
142}
143
144fn execute_single_policy(
145    engine: &Arc<dyn StoreEngine>,
146    policy: &PrunePolicy,
147) -> Result<(), PruneError> {
148    let codec = KeyCodec::new(policy.match_key.reserved_bits, policy.match_key.prefix);
149    let regex = compile_payload_regex(&policy.match_key.payload_regex)
150        .map_err(|e| PruneError::Policy(e.to_string()))?;
151
152    let (start, end) = codec.prefix_bounds();
153    let rows = engine
154        .range_scan(start.as_ref(), end.as_ref(), usize::MAX, true)
155        .map_err(PruneError::Engine)?;
156
157    let mut groups: BTreeMap<Vec<u8>, Vec<KeyEntry>> = BTreeMap::new();
158
159    for (key, _value) in &rows {
160        if !codec.matches(key) {
161            continue;
162        }
163        let payload_len = codec.payload_capacity_bytes_for_key_len(key.len());
164        let payload = match codec.read_payload(key, 0, payload_len) {
165            Ok(p) => p,
166            Err(_) => continue,
167        };
168        if !regex.is_match(&payload) {
169            continue;
170        }
171
172        let group_key = match extract_group_key(&payload, &regex, policy) {
173            Some(gk) => gk,
174            None => continue,
175        };
176
177        let order_value = extract_order_value(&payload, &regex, policy).unwrap_or_default();
178
179        groups.entry(group_key).or_default().push(KeyEntry {
180            key: key.clone(),
181            order_value,
182        });
183    }
184
185    let mut all_deletes = Vec::new();
186    for (_group_key, entries) in groups {
187        all_deletes.extend(keys_to_delete(entries, policy));
188    }
189
190    if !all_deletes.is_empty() {
191        let refs: Vec<&[u8]> = all_deletes.iter().map(|k| k.as_ref()).collect();
192        engine.delete_batch(&refs).map_err(PruneError::Engine)?;
193    }
194
195    Ok(())
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use exoware_sdk_rs::kv_codec::Utf8;
    use exoware_sdk_rs::prune_policy::{GroupBy, MatchKey, OrderBy};

    /// Builds a policy that groups by the `logical` capture, orders by the 8-byte `version`
    /// capture decoded as `U64Be`, and applies the given retain rule.
    fn make_policy(retain: RetainPolicy) -> PrunePolicy {
        PrunePolicy {
            match_key: MatchKey {
                reserved_bits: 4,
                prefix: 1,
                payload_regex: Utf8::from(
                    "(?s-u)^(?P<logical>(?:\\x00\\xFF|[^\\x00])*)\\x00\\x00(?P<version>.{8})$",
                ),
            },
            group_by: GroupBy {
                capture_groups: vec![Utf8::from("logical")],
            },
            order_by: Some(OrderBy {
                capture_group: Utf8::from("version"),
                encoding: OrderEncoding::U64Be,
            }),
            retain,
        }
    }

    /// Builds an entry whose one-byte key mirrors its order value, so assertions can identify
    /// exactly which entries were selected for deletion.
    fn make_entry(order: u64) -> KeyEntry {
        KeyEntry {
            key: Bytes::from(vec![order as u8]),
            order_value: order.to_be_bytes().to_vec(),
        }
    }

    /// Flattens the deleted keys into plain byte vectors for whole-list comparison.
    fn deleted_keys(deletes: &[Bytes]) -> Vec<Vec<u8>> {
        deletes.iter().map(|key| key.to_vec()).collect()
    }

    #[test]
    fn keep_latest_retains_newest() {
        let policy = make_policy(RetainPolicy::KeepLatest { count: 2 });
        let entries = vec![make_entry(1), make_entry(2), make_entry(3)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted_keys(&deletes), vec![vec![1u8]]);
    }

    #[test]
    fn keep_latest_sorts_unordered_input() {
        // The oldest entry must be chosen regardless of the order entries arrive in.
        let policy = make_policy(RetainPolicy::KeepLatest { count: 2 });
        let entries = vec![make_entry(3), make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted_keys(&deletes), vec![vec![1u8]]);
    }

    #[test]
    fn keep_latest_no_delete_when_under_count() {
        let policy = make_policy(RetainPolicy::KeepLatest { count: 5 });
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &policy);
        assert!(deletes.is_empty());
    }

    #[test]
    fn keep_latest_zero_deletes_everything() {
        let policy = make_policy(RetainPolicy::KeepLatest { count: 0 });
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted_keys(&deletes), vec![vec![1u8], vec![2u8]]);
    }

    #[test]
    fn drop_all_deletes_everything() {
        let policy = make_policy(RetainPolicy::DropAll);
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted_keys(&deletes), vec![vec![1u8], vec![2u8]]);
    }

    #[test]
    fn greater_than_threshold() {
        // Strictly-greater retention: the entry equal to the threshold is deleted too.
        let policy = make_policy(RetainPolicy::GreaterThan { threshold: 5 });
        let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted_keys(&deletes), vec![vec![3u8], vec![5u8]]);
    }

    #[test]
    fn greater_than_or_equal_threshold() {
        // Inclusive retention: the entry equal to the threshold survives.
        let policy = make_policy(RetainPolicy::GreaterThanOrEqual { threshold: 5 });
        let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted_keys(&deletes), vec![vec![3u8]]);
    }
}