Skip to main content

exoware_sql/
prune.rs

1use exoware_sdk::kv_codec::Utf8;
2use exoware_sdk::match_key::MatchKey;
3use exoware_sdk::prune_policy::{
4    GroupBy, KeysScope, OrderBy, OrderEncoding, PolicyScope, PrunePolicy, RetainPolicy,
5};
6
7use crate::codec::primary_key_codec;
8use crate::types::PRIMARY_RESERVED_BITS;
9
/// Width, in bytes, of the trailing version field matched by the `version`
/// capture group. The policies built in this file order that capture with
/// `OrderEncoding::U64Be`.
const VERSION_WIDTH_BYTES: usize = 8;
/// Regex fragment matching one ordered UTF-8 entity key, terminator included.
///
/// Read byte-wise, it accepts zero or more units, each being either `0x01`
/// followed by a byte in `0x00..=0x02` (presumably an escape pair emitted by
/// the key encoder; confirm against the codec) or a single byte other than
/// `0x00`, `0x01`, and `0xFF`, and then requires a closing `0x00`.
const ORDERED_UTF8_REGEX: &str = r"(?:\x01[\x00-\x02]|[^\x00\x01\xFF])*\x00";
12
13fn keep_latest_versions_with_regex(
14    table_prefix: u8,
15    min_entity_bytes: usize,
16    payload_regex: impl Into<Utf8>,
17    count: usize,
18) -> Result<PrunePolicy, String> {
19    let payload_regex = payload_regex.into();
20    if count == 0 {
21        return Err("keep_latest_versions count must be > 0".to_string());
22    }
23    let codec = primary_key_codec(table_prefix)?;
24    let required_bytes = min_entity_bytes
25        .checked_add(VERSION_WIDTH_BYTES)
26        .ok_or_else(|| "entity width overflowed when adding version width".to_string())?;
27    if required_bytes > codec.payload_capacity_bytes() {
28        return Err(format!(
29            "entity width {min_entity_bytes} plus version width {VERSION_WIDTH_BYTES} exceeds primary key payload capacity {}",
30            codec.payload_capacity_bytes()
31        ));
32    }
33
34    Ok(PrunePolicy {
35        scope: PolicyScope::Keys(KeysScope {
36            match_key: MatchKey {
37                reserved_bits: PRIMARY_RESERVED_BITS,
38                prefix: codec.prefix(),
39                payload_regex,
40            },
41            group_by: GroupBy {
42                capture_groups: vec![Utf8::from("entity")],
43            },
44            order_by: Some(OrderBy {
45                capture_group: Utf8::from("version"),
46                encoding: OrderEncoding::U64Be,
47            }),
48        }),
49        retain: RetainPolicy::KeepLatest { count },
50    })
51}
52
53/// Build a prune policy that keeps the latest `count` versions for each entity
54/// in a `exoware-sql` versioned primary-key family with a fixed-width entity key.
55///
56/// The policy assumes the key layout created by `KvSchema::table_versioned`:
57/// `[entity bytes][u64_be version]` under the table's primary key codec family.
58pub fn keep_latest_versions(
59    table_prefix: u8,
60    entity_key_width: usize,
61    count: usize,
62) -> Result<PrunePolicy, String> {
63    keep_latest_versions_with_regex(
64        table_prefix,
65        entity_key_width,
66        format!(
67            r"(?s-u)^(?P<entity>.{{{entity_key_width}}})(?P<version>.{{{VERSION_WIDTH_BYTES}}})$"
68        ),
69        count,
70    )
71}
72
73/// Build a prune policy that keeps the latest `count` versions for each entity
74/// in a `exoware-sql` versioned primary-key family whose entity column is `Utf8`.
75///
76/// `table_versioned()` encodes ordered UTF-8 keys as an escape-aware byte stream
77/// terminated by `0x00`, so the entity capture must be length-delimited by that
78/// terminator rather than by a caller-provided fixed width.
79pub fn keep_latest_versions_utf8(table_prefix: u8, count: usize) -> Result<PrunePolicy, String> {
80    keep_latest_versions_with_regex(
81        table_prefix,
82        1,
83        format!(r"(?s-u)^(?P<entity>{ORDERED_UTF8_REGEX})(?P<version>.{{{VERSION_WIDTH_BYTES}}})$"),
84        count,
85    )
86}
87
#[cfg(test)]
mod tests {
    use super::{keep_latest_versions, keep_latest_versions_utf8, ORDERED_UTF8_REGEX};
    use crate::codec::encode_primary_key;
    use crate::types::{KvTableConfig, TableColumnConfig, TableModel};
    use crate::CellValue;
    use datafusion::arrow::datatypes::DataType;
    use exoware_sdk::kv_codec::Utf8;
    use exoware_sdk::match_key::compile_payload_regex;
    use exoware_sdk::prune_policy::{validate_policy, OrderEncoding, PolicyScope, RetainPolicy};

    /// Unwraps the keys scope of `policy`, panicking on any other scope kind.
    fn keys_scope(policy: &super::PrunePolicy) -> &super::KeysScope {
        match &policy.scope {
            PolicyScope::Sequence => panic!("expected Keys scope"),
            PolicyScope::Keys(keys) => keys,
        }
    }

    /// Checks the grouping, ordering, retention, and validity expectations that
    /// both keep-latest-one policy tests share.
    fn assert_entity_grouping_and_version_order(policy: &super::PrunePolicy) {
        let scope = keys_scope(policy);
        assert_eq!(scope.group_by.capture_groups, vec![Utf8::from("entity")]);
        assert_eq!(
            &*scope.order_by.as_ref().expect("order_by").capture_group,
            "version"
        );
        assert_eq!(
            scope.order_by.as_ref().expect("order_by").encoding,
            OrderEncoding::U64Be
        );
        assert_eq!(policy.retain, RetainPolicy::KeepLatest { count: 1 });
        validate_policy(policy).expect("policy should validate");
    }

    #[test]
    fn keep_latest_versions_builds_expected_policy_for_fixed_width_entity() {
        let built = keep_latest_versions(3, 32, 1).expect("policy");
        let match_key = &keys_scope(&built).match_key;
        assert_eq!(match_key.reserved_bits, 5);
        assert_eq!(match_key.prefix, 6);
        assert_eq!(
            match_key.payload_regex,
            r"(?s-u)^(?P<entity>.{32})(?P<version>.{8})$"
        );
        assert_entity_grouping_and_version_order(&built);
    }

    #[test]
    fn keep_latest_versions_rejects_zero_count() {
        let message = keep_latest_versions(3, 32, 0).expect_err("zero count should fail");
        assert!(message.contains("count must be > 0"));
    }

    #[test]
    fn keep_latest_versions_rejects_oversized_entity_width() {
        let message = keep_latest_versions(3, 1000, 1).expect_err("oversized entity should fail");
        assert!(message.contains("exceeds primary key payload capacity"));
    }

    #[test]
    fn keep_latest_versions_utf8_builds_expected_policy() {
        let built = keep_latest_versions_utf8(3, 1).expect("policy");
        let match_key = &keys_scope(&built).match_key;
        assert_eq!(match_key.reserved_bits, 5);
        assert_eq!(match_key.prefix, 6);
        assert_eq!(
            match_key.payload_regex,
            format!(r"(?s-u)^(?P<entity>{ORDERED_UTF8_REGEX})(?P<version>.{{8}})$")
        );
        assert_entity_grouping_and_version_order(&built);
    }

    #[test]
    fn keep_latest_versions_utf8_matches_variable_length_entity_payloads() {
        let policy = keep_latest_versions_utf8(3, 1).expect("policy");
        let regex =
            compile_payload_regex(&keys_scope(&policy).match_key.payload_regex).expect("regex");

        // Two-column table whose primary key is (Utf8 entity, UInt64 version).
        let columns = vec![
            TableColumnConfig::new("entity", DataType::Utf8, false),
            TableColumnConfig::new("version", DataType::UInt64, false),
        ];
        let primary_key = vec!["entity".to_string(), "version".to_string()];
        let config = KvTableConfig::new(3, columns, primary_key, vec![]).expect("config");
        let model = TableModel::from_config(&config).expect("model");

        // One single-character entity and one longer entity with an embedded NUL.
        let tiny_entity = CellValue::Utf8("a".to_string());
        let nul_entity = CellValue::Utf8("alpha\x00beta".to_string());
        let tiny_key =
            encode_primary_key(3, &[&tiny_entity, &CellValue::UInt64(1)], &model).expect("key");
        let nul_key =
            encode_primary_key(3, &[&nul_entity, &CellValue::UInt64(2)], &model).expect("key");
        let codec = model.primary_key_codec;

        for encoded_key in [&tiny_key, &nul_key] {
            let payload_len = codec.payload_capacity_bytes_for_key_len(encoded_key.len());
            let payload = codec
                .read_payload(encoded_key, 0, payload_len)
                .expect("payload");
            let captures = regex.captures(&payload).expect("captures");

            // The anchored pattern has to consume the payload in full.
            let whole = captures.get(0).expect("full match");
            assert_eq!(whole.as_bytes(), payload.as_slice());

            let version = captures.name("version").expect("version");
            assert_eq!(version.as_bytes().len(), 8);

            let entity = captures.name("entity").expect("entity");
            assert!(
                entity.as_bytes().ends_with(&[0x00]),
                "ordered UTF-8 entity encoding should include the terminator"
            );
        }
    }
}