1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::sync::Arc;
13
14use bytes::Bytes;
15use exoware_sdk::keys::KeyCodec;
16use exoware_sdk::match_key::compile_payload_regex;
17use exoware_sdk::prune_policy::{
18 KeysScope, OrderEncoding, PolicyScope, PrunePolicyDocument, RetainPolicy,
19};
20use regex::bytes::Regex;
21
22use crate::StoreEngine;
23
/// Error returned by [`execute_prune`].
#[derive(Debug)]
pub enum PruneError {
    /// The storage engine reported a failure (range scan, batch delete, or
    /// batch-log prune).
    Engine(String),
    /// The policy itself is unusable, e.g. its payload regex does not compile.
    Policy(String),
}

impl std::fmt::Display for PruneError {
    /// Renders as `"<kind>: <detail>"`, where `<kind>` is `engine` or `policy`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (kind, detail) = match self {
            Self::Engine(detail) => ("engine", detail),
            Self::Policy(detail) => ("policy", detail),
        };
        write!(f, "{kind}: {detail}")
    }
}

impl std::error::Error for PruneError {}
40
41fn extract_order_value(payload: &[u8], regex: &Regex, scope: &KeysScope) -> Option<Vec<u8>> {
42 let order_by = scope.order_by.as_ref()?;
43 let captures = regex.captures(payload)?;
44 let matched = captures.name(&order_by.capture_group)?;
45 let raw = matched.as_bytes();
46 match order_by.encoding {
47 OrderEncoding::BytesAsc => Some(raw.to_vec()),
48 OrderEncoding::U64Be | OrderEncoding::I64Be => {
49 if raw.len() == 8 {
50 Some(raw.to_vec())
51 } else {
52 None
53 }
54 }
55 }
56}
57
58fn extract_group_key(payload: &[u8], regex: &Regex, scope: &KeysScope) -> Option<Vec<u8>> {
59 if scope.group_by.capture_groups.is_empty() {
60 return Some(Vec::new());
61 }
62 let captures = regex.captures(payload)?;
63 let mut group_key = Vec::new();
64 for group_name in &scope.group_by.capture_groups {
65 let matched = captures.name(group_name)?;
66 let bytes = matched.as_bytes();
67 group_key.extend_from_slice(&(bytes.len() as u32).to_be_bytes());
68 group_key.extend_from_slice(bytes);
69 }
70 Some(group_key)
71}
72
73struct KeyEntry {
74 key: Bytes,
75 order_value: Vec<u8>,
76}
77
78fn compare_order_values(a: &[u8], b: &[u8], scope: &KeysScope) -> Ordering {
79 match scope.order_by.as_ref().map(|o| &o.encoding) {
80 Some(OrderEncoding::U64Be) => {
81 let a_val = a.try_into().map(u64::from_be_bytes).unwrap_or(0);
82 let b_val = b.try_into().map(u64::from_be_bytes).unwrap_or(0);
83 a_val.cmp(&b_val)
84 }
85 Some(OrderEncoding::I64Be) => {
86 let a_val = a.try_into().map(i64::from_be_bytes).unwrap_or(0);
87 let b_val = b.try_into().map(i64::from_be_bytes).unwrap_or(0);
88 a_val.cmp(&b_val)
89 }
90 Some(OrderEncoding::BytesAsc) | None => a.cmp(b),
91 }
92}
93
94fn keys_to_delete(
95 mut entries: Vec<KeyEntry>,
96 scope: &KeysScope,
97 retain: &RetainPolicy,
98) -> Vec<Bytes> {
99 entries.sort_by(|a, b| compare_order_values(&a.order_value, &b.order_value, scope));
100
101 match retain {
102 RetainPolicy::KeepLatest { count } => {
103 if entries.len() <= *count {
104 return Vec::new();
105 }
106 entries[..entries.len() - count]
107 .iter()
108 .map(|e| e.key.clone())
109 .collect()
110 }
111 RetainPolicy::GreaterThan { threshold } => {
112 let threshold = threshold.to_be_bytes();
113 entries
114 .iter()
115 .filter(|e| {
116 compare_order_values(&e.order_value, &threshold, scope) != Ordering::Greater
117 })
118 .map(|e| e.key.clone())
119 .collect()
120 }
121 RetainPolicy::GreaterThanOrEqual { threshold } => {
122 let threshold = threshold.to_be_bytes();
123 entries
124 .iter()
125 .filter(|e| {
126 compare_order_values(&e.order_value, &threshold, scope) == Ordering::Less
127 })
128 .map(|e| e.key.clone())
129 .collect()
130 }
131 RetainPolicy::DropAll => entries.iter().map(|e| e.key.clone()).collect(),
132 }
133}
134
135pub fn execute_prune(
136 engine: &Arc<dyn StoreEngine>,
137 document: &PrunePolicyDocument,
138) -> Result<(), PruneError> {
139 for policy in &document.policies {
140 match &policy.scope {
141 PolicyScope::Keys(scope) => {
142 execute_user_keys_policy(engine, scope, &policy.retain)?;
143 }
144 PolicyScope::Sequence => {
145 execute_batch_log_policy(engine, &policy.retain)?;
146 }
147 }
148 }
149 Ok(())
150}
151
152fn execute_user_keys_policy(
153 engine: &Arc<dyn StoreEngine>,
154 scope: &KeysScope,
155 retain: &RetainPolicy,
156) -> Result<(), PruneError> {
157 let codec = KeyCodec::new(scope.match_key.reserved_bits, scope.match_key.prefix);
158 let regex: Regex = compile_payload_regex(&scope.match_key.payload_regex)
159 .map_err(|e| PruneError::Policy(e.to_string()))?;
160
161 let (start, end) = codec.prefix_bounds();
162 let rows = engine
163 .range_scan(start.as_ref(), end.as_ref(), usize::MAX, true)
164 .map_err(PruneError::Engine)?;
165
166 let mut groups: BTreeMap<Vec<u8>, Vec<KeyEntry>> = BTreeMap::new();
167
168 for (key, _value) in &rows {
169 if !codec.matches(key) {
170 continue;
171 }
172 let payload_len = codec.payload_capacity_bytes_for_key_len(key.len());
173 let payload = match codec.read_payload(key, 0, payload_len) {
174 Ok(p) => p,
175 Err(_) => continue,
176 };
177 if !regex.is_match(&payload) {
178 continue;
179 }
180
181 let group_key = match extract_group_key(&payload, ®ex, scope) {
182 Some(gk) => gk,
183 None => continue,
184 };
185
186 let order_value = extract_order_value(&payload, ®ex, scope).unwrap_or_default();
187
188 groups.entry(group_key).or_default().push(KeyEntry {
189 key: key.clone(),
190 order_value,
191 });
192 }
193
194 let mut all_deletes = Vec::new();
195 for (_group_key, entries) in groups {
196 all_deletes.extend(keys_to_delete(entries, scope, retain));
197 }
198
199 if !all_deletes.is_empty() {
200 let refs: Vec<&[u8]> = all_deletes.iter().map(|k| k.as_ref()).collect();
201 engine.delete_batch(&refs).map_err(PruneError::Engine)?;
202 }
203
204 Ok(())
205}
206
207fn execute_batch_log_policy(
208 engine: &Arc<dyn StoreEngine>,
209 retain: &RetainPolicy,
210) -> Result<(), PruneError> {
211 let current = engine.current_sequence();
212 let cutoff_exclusive = match retain {
213 RetainPolicy::KeepLatest { count } => {
214 let count = *count as u64;
216 current.saturating_add(1).saturating_sub(count)
217 }
218 RetainPolicy::GreaterThan { threshold } => threshold.saturating_add(1),
219 RetainPolicy::GreaterThanOrEqual { threshold } => *threshold,
220 RetainPolicy::DropAll => current.saturating_add(1),
221 };
222
223 engine
224 .prune_batch_log(cutoff_exclusive)
225 .map_err(PruneError::Engine)?;
226 Ok(())
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use exoware_sdk::kv_codec::Utf8;
    use exoware_sdk::match_key::MatchKey;
    use exoware_sdk::prune_policy::{GroupBy, OrderBy};

    /// Scope for payloads shaped `<logical>\x00\x00<8-byte version>`, grouped
    /// by `logical` and ordered by `version` as a big-endian `u64`.
    fn make_scope() -> KeysScope {
        KeysScope {
            match_key: MatchKey {
                reserved_bits: 4,
                prefix: 1,
                payload_regex: Utf8::from(
                    "(?s-u)^(?P<logical>(?:\\x00\\xFF|[^\\x00])*)\\x00\\x00(?P<version>.{8})$",
                ),
            },
            group_by: GroupBy {
                capture_groups: vec![Utf8::from("logical")],
            },
            order_by: Some(OrderBy {
                capture_group: Utf8::from("version"),
                encoding: OrderEncoding::U64Be,
            }),
        }
    }

    /// `make_scope` with a different encoding for the `version` capture.
    fn make_scope_with_encoding(encoding: OrderEncoding) -> KeysScope {
        let mut scope = make_scope();
        scope.order_by = Some(OrderBy {
            capture_group: Utf8::from("version"),
            encoding,
        });
        scope
    }

    /// Entry keyed by the low byte of `order`, ordered by `order` as `u64` BE.
    fn make_entry(order: u64) -> KeyEntry {
        KeyEntry {
            key: Bytes::from(vec![order as u8]),
            order_value: order.to_be_bytes().to_vec(),
        }
    }

    /// Entry keyed by the low byte of `order`, ordered by `order` as `i64` BE.
    fn make_signed_entry(order: i64) -> KeyEntry {
        KeyEntry {
            key: Bytes::from(vec![order as u8]),
            order_value: order.to_be_bytes().to_vec(),
        }
    }

    /// Flattens the single-byte keys produced by the entry helpers.
    fn key_bytes(deletes: &[Bytes]) -> Vec<u8> {
        deletes.iter().map(|k| k[0]).collect()
    }

    /// `logical ++ \x00\x00 ++ version.to_be_bytes()`, matching `make_scope`.
    fn make_payload(logical: &[u8], version: u64) -> Vec<u8> {
        let mut payload = logical.to_vec();
        payload.extend_from_slice(b"\x00\x00");
        payload.extend_from_slice(&version.to_be_bytes());
        payload
    }

    fn compile(scope: &KeysScope) -> Regex {
        compile_payload_regex(&scope.match_key.payload_regex)
            .map_err(|e| e.to_string())
            .expect("test payload regex compiles")
    }

    #[test]
    fn keep_latest_retains_newest() {
        let scope = make_scope();
        let retain = RetainPolicy::KeepLatest { count: 2 };
        let entries = vec![make_entry(1), make_entry(2), make_entry(3)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(deletes.len(), 1);
        assert_eq!(deletes[0].as_ref(), &[1u8]);
    }

    #[test]
    fn keep_latest_sorts_unordered_input() {
        let scope = make_scope();
        let retain = RetainPolicy::KeepLatest { count: 1 };
        let entries = vec![make_entry(3), make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(key_bytes(&deletes), vec![1, 2]);
    }

    #[test]
    fn keep_latest_no_delete_when_under_count() {
        let scope = make_scope();
        let retain = RetainPolicy::KeepLatest { count: 5 };
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert!(deletes.is_empty());
    }

    #[test]
    fn keep_latest_zero_deletes_everything() {
        let scope = make_scope();
        let retain = RetainPolicy::KeepLatest { count: 0 };
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(key_bytes(&deletes), vec![1, 2]);
    }

    #[test]
    fn drop_all_deletes_everything() {
        let scope = make_scope();
        let retain = RetainPolicy::DropAll;
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(key_bytes(&deletes), vec![1, 2]);
    }

    #[test]
    fn greater_than_threshold() {
        let scope = make_scope();
        let retain = RetainPolicy::GreaterThan { threshold: 5 };
        let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(key_bytes(&deletes), vec![3, 5]);
    }

    #[test]
    fn greater_than_or_equal_threshold() {
        let scope = make_scope();
        let retain = RetainPolicy::GreaterThanOrEqual { threshold: 5 };
        let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
        let deletes = keys_to_delete(entries, &scope, &retain);
        assert_eq!(key_bytes(&deletes), vec![3]);
    }

    #[test]
    fn i64_encoding_orders_negative_values_first() {
        let scope = make_scope_with_encoding(OrderEncoding::I64Be);
        let retain = RetainPolicy::KeepLatest { count: 2 };
        let entries = vec![
            make_signed_entry(2),
            make_signed_entry(-1),
            make_signed_entry(1),
        ];
        let deletes = keys_to_delete(entries, &scope, &retain);
        // -1 is the oldest under signed ordering; its key byte is 0xFF.
        assert_eq!(key_bytes(&deletes), vec![u8::MAX]);
    }

    #[test]
    fn extract_helpers_parse_matching_payload() {
        let scope = make_scope();
        let regex = compile(&scope);
        let payload = make_payload(b"abc", 7);
        assert_eq!(
            extract_group_key(&payload, &regex, &scope),
            Some(vec![0, 0, 0, 3, b'a', b'b', b'c'])
        );
        assert_eq!(
            extract_order_value(&payload, &regex, &scope),
            Some(7u64.to_be_bytes().to_vec())
        );
    }

    #[test]
    fn extract_order_value_is_none_without_order_by() {
        let mut scope = make_scope();
        scope.order_by = None;
        let regex = compile(&scope);
        let payload = make_payload(b"abc", 7);
        assert_eq!(extract_order_value(&payload, &regex, &scope), None);
    }

    #[test]
    fn extract_group_key_rejects_non_matching_payload() {
        let scope = make_scope();
        let regex = compile(&scope);
        // No `\x00\x00` separator or version bytes, so the regex cannot match.
        assert_eq!(extract_group_key(b"abc", &regex, &scope), None);
    }
}