1use std::cmp::Ordering;
8use std::collections::BTreeMap;
9use std::sync::Arc;
10
11use bytes::Bytes;
12use exoware_sdk_rs::keys::KeyCodec;
13use exoware_sdk_rs::prune_policy::{
14 compile_payload_regex, OrderEncoding, PrunePolicy, PrunePolicyDocument, RetainPolicy,
15};
16use regex::bytes::Regex;
17
18use crate::StoreEngine;
19
/// Failure modes reported while executing prune policies.
///
/// Both variants carry a human-readable message rather than a structured cause.
#[derive(Debug)]
pub enum PruneError {
    /// The storage engine rejected an operation (in this file: the range scan or the
    /// batched delete).
    Engine(String),
    /// The policy itself could not be prepared (in this file: its payload regex failed
    /// to compile).
    Policy(String),
}
25
26impl std::fmt::Display for PruneError {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 PruneError::Engine(s) => write!(f, "engine: {s}"),
30 PruneError::Policy(s) => write!(f, "policy: {s}"),
31 }
32 }
33}
34
35impl std::error::Error for PruneError {}
36
37fn extract_order_value(payload: &[u8], regex: &Regex, policy: &PrunePolicy) -> Option<Vec<u8>> {
38 let order_by = policy.order_by.as_ref()?;
39 let captures = regex.captures(payload)?;
40 let matched = captures.name(&order_by.capture_group)?;
41 let raw = matched.as_bytes();
42 match order_by.encoding {
43 OrderEncoding::BytesAsc => Some(raw.to_vec()),
44 OrderEncoding::U64Be => {
45 if raw.len() == 8 {
46 Some(raw.to_vec())
47 } else {
48 None
49 }
50 }
51 OrderEncoding::I64Be => {
52 if raw.len() == 8 {
53 Some(raw.to_vec())
54 } else {
55 None
56 }
57 }
58 }
59}
60
61fn extract_group_key(payload: &[u8], regex: &Regex, policy: &PrunePolicy) -> Option<Vec<u8>> {
62 if policy.group_by.capture_groups.is_empty() {
63 return Some(Vec::new());
64 }
65 let captures = regex.captures(payload)?;
66 let mut group_key = Vec::new();
67 for group_name in &policy.group_by.capture_groups {
68 let matched = captures.name(group_name)?;
69 let bytes = matched.as_bytes();
70 group_key.extend_from_slice(&(bytes.len() as u32).to_be_bytes());
71 group_key.extend_from_slice(bytes);
72 }
73 Some(group_key)
74}
75
76struct KeyEntry {
77 key: Bytes,
78 order_value: Vec<u8>,
79}
80
81fn compare_order_values(a: &[u8], b: &[u8], policy: &PrunePolicy) -> Ordering {
82 match policy.order_by.as_ref().map(|o| &o.encoding) {
83 Some(OrderEncoding::U64Be) => {
84 let a_val = a.try_into().map(u64::from_be_bytes).unwrap_or(0);
85 let b_val = b.try_into().map(u64::from_be_bytes).unwrap_or(0);
86 a_val.cmp(&b_val)
87 }
88 Some(OrderEncoding::I64Be) => {
89 let a_val = a.try_into().map(i64::from_be_bytes).unwrap_or(0);
90 let b_val = b.try_into().map(i64::from_be_bytes).unwrap_or(0);
91 a_val.cmp(&b_val)
92 }
93 Some(OrderEncoding::BytesAsc) | None => a.cmp(b),
94 }
95}
96
97fn keys_to_delete(mut entries: Vec<KeyEntry>, policy: &PrunePolicy) -> Vec<Bytes> {
98 entries.sort_by(|a, b| compare_order_values(&a.order_value, &b.order_value, policy));
99
100 match &policy.retain {
101 RetainPolicy::KeepLatest { count } => {
102 if entries.len() <= *count {
103 return Vec::new();
104 }
105 entries[..entries.len() - count]
106 .iter()
107 .map(|e| e.key.clone())
108 .collect()
109 }
110 RetainPolicy::GreaterThan { threshold } => {
111 let threshold = threshold.to_be_bytes();
112 entries
113 .iter()
114 .filter(|e| {
115 compare_order_values(&e.order_value, &threshold, policy) != Ordering::Greater
116 })
117 .map(|e| e.key.clone())
118 .collect()
119 }
120 RetainPolicy::GreaterThanOrEqual { threshold } => {
121 let threshold = threshold.to_be_bytes();
122 entries
123 .iter()
124 .filter(|e| {
125 compare_order_values(&e.order_value, &threshold, policy) == Ordering::Less
126 })
127 .map(|e| e.key.clone())
128 .collect()
129 }
130 RetainPolicy::DropAll => entries.iter().map(|e| e.key.clone()).collect(),
131 }
132}
133
134pub fn execute_prune(
135 engine: &Arc<dyn StoreEngine>,
136 document: &PrunePolicyDocument,
137) -> Result<(), PruneError> {
138 for policy in &document.policies {
139 execute_single_policy(engine, policy)?;
140 }
141 Ok(())
142}
143
144fn execute_single_policy(
145 engine: &Arc<dyn StoreEngine>,
146 policy: &PrunePolicy,
147) -> Result<(), PruneError> {
148 let codec = KeyCodec::new(policy.match_key.reserved_bits, policy.match_key.prefix);
149 let regex = compile_payload_regex(&policy.match_key.payload_regex)
150 .map_err(|e| PruneError::Policy(e.to_string()))?;
151
152 let (start, end) = codec.prefix_bounds();
153 let rows = engine
154 .range_scan(start.as_ref(), end.as_ref(), usize::MAX, true)
155 .map_err(PruneError::Engine)?;
156
157 let mut groups: BTreeMap<Vec<u8>, Vec<KeyEntry>> = BTreeMap::new();
158
159 for (key, _value) in &rows {
160 if !codec.matches(key) {
161 continue;
162 }
163 let payload_len = codec.payload_capacity_bytes_for_key_len(key.len());
164 let payload = match codec.read_payload(key, 0, payload_len) {
165 Ok(p) => p,
166 Err(_) => continue,
167 };
168 if !regex.is_match(&payload) {
169 continue;
170 }
171
172 let group_key = match extract_group_key(&payload, ®ex, policy) {
173 Some(gk) => gk,
174 None => continue,
175 };
176
177 let order_value = extract_order_value(&payload, ®ex, policy).unwrap_or_default();
178
179 groups.entry(group_key).or_default().push(KeyEntry {
180 key: key.clone(),
181 order_value,
182 });
183 }
184
185 let mut all_deletes = Vec::new();
186 for (_group_key, entries) in groups {
187 all_deletes.extend(keys_to_delete(entries, policy));
188 }
189
190 if !all_deletes.is_empty() {
191 let refs: Vec<&[u8]> = all_deletes.iter().map(|k| k.as_ref()).collect();
192 engine.delete_batch(&refs).map_err(PruneError::Engine)?;
193 }
194
195 Ok(())
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use exoware_sdk_rs::kv_codec::Utf8;
    use exoware_sdk_rs::prune_policy::{GroupBy, MatchKey, OrderBy};

    /// Builds a policy that groups by the `logical` capture, orders by the 8-byte
    /// `version` capture decoded as `U64Be`, and applies the given retain rule.
    fn make_policy(retain: RetainPolicy) -> PrunePolicy {
        PrunePolicy {
            match_key: MatchKey {
                reserved_bits: 4,
                prefix: 1,
                payload_regex: Utf8::from(
                    "(?s-u)^(?P<logical>(?:\\x00\\xFF|[^\\x00])*)\\x00\\x00(?P<version>.{8})$",
                ),
            },
            group_by: GroupBy {
                capture_groups: vec![Utf8::from("logical")],
            },
            order_by: Some(OrderBy {
                capture_group: Utf8::from("version"),
                encoding: OrderEncoding::U64Be,
            }),
            retain,
        }
    }

    /// Builds an entry whose key is the single byte `order` and whose order value is
    /// `order` as big-endian u64. Only meaningful for `order < 256` (the key truncates).
    fn make_entry(order: u64) -> KeyEntry {
        KeyEntry {
            key: Bytes::from(vec![order as u8]),
            order_value: order.to_be_bytes().to_vec(),
        }
    }

    /// Views the deleted keys as byte slices so whole results can be compared at once.
    fn deleted(deletes: &[Bytes]) -> Vec<&[u8]> {
        deletes.iter().map(|key| key.as_ref()).collect()
    }

    #[test]
    fn keep_latest_retains_newest() {
        let policy = make_policy(RetainPolicy::KeepLatest { count: 2 });
        let entries = vec![make_entry(1), make_entry(2), make_entry(3)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted(&deletes), vec![&[1u8][..]]);
    }

    #[test]
    fn keep_latest_sorts_before_trimming() {
        // Input order must not matter: the two lowest versions go, the highest stays.
        let policy = make_policy(RetainPolicy::KeepLatest { count: 1 });
        let entries = vec![make_entry(3), make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted(&deletes), vec![&[1u8][..], &[2u8][..]]);
    }

    #[test]
    fn keep_latest_no_delete_when_under_count() {
        let policy = make_policy(RetainPolicy::KeepLatest { count: 5 });
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &policy);
        assert!(deletes.is_empty());
    }

    #[test]
    fn drop_all_deletes_everything() {
        let policy = make_policy(RetainPolicy::DropAll);
        let entries = vec![make_entry(1), make_entry(2)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted(&deletes), vec![&[1u8][..], &[2u8][..]]);
    }

    #[test]
    fn greater_than_threshold() {
        // Strictly-greater rule: the entry equal to the threshold is deleted too.
        let policy = make_policy(RetainPolicy::GreaterThan { threshold: 5 });
        let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted(&deletes), vec![&[3u8][..], &[5u8][..]]);
    }

    #[test]
    fn greater_than_or_equal_threshold() {
        // Inclusive rule: the entry equal to the threshold survives.
        let policy = make_policy(RetainPolicy::GreaterThanOrEqual { threshold: 5 });
        let entries = vec![make_entry(3), make_entry(5), make_entry(7)];
        let deletes = keys_to_delete(entries, &policy);
        assert_eq!(deleted(&deletes), vec![&[3u8][..]]);
    }
}