Skip to main content

exoware_sql/
prune.rs

1use exoware_sdk_rs::kv_codec::Utf8;
2use exoware_sdk_rs::prune_policy::{
3    GroupBy, MatchKey, OrderBy, OrderEncoding, PrunePolicy, RetainPolicy,
4};
5
6use crate::codec::primary_key_codec;
7use crate::types::PRIMARY_RESERVED_BITS;
8
/// Number of bytes occupied by the trailing version in a versioned primary-key payload.
///
/// The version is ordered as a big-endian `u64`, so its width is the size of a `u64`.
const VERSION_WIDTH_BYTES: usize = std::mem::size_of::<u64>();

/// Byte-level regex fragment matching one ordered UTF-8 key component.
///
/// It accepts any number of units, each being either the byte `0x01` followed by a
/// byte in `0x00..=0x02`, or a single byte other than `0x00`, `0x01` and `0xFF`,
/// and then requires the closing `0x00` terminator.
const ORDERED_UTF8_REGEX: &str = r"(?:\x01[\x00-\x02]|[^\x00\x01\xFF])*\x00";
11
12fn keep_latest_versions_with_regex(
13    table_prefix: u8,
14    min_entity_bytes: usize,
15    payload_regex: impl Into<Utf8>,
16    count: usize,
17) -> Result<PrunePolicy, String> {
18    let payload_regex = payload_regex.into();
19    if count == 0 {
20        return Err("keep_latest_versions count must be > 0".to_string());
21    }
22    let codec = primary_key_codec(table_prefix)?;
23    let required_bytes = min_entity_bytes
24        .checked_add(VERSION_WIDTH_BYTES)
25        .ok_or_else(|| "entity width overflowed when adding version width".to_string())?;
26    if required_bytes > codec.payload_capacity_bytes() {
27        return Err(format!(
28            "entity width {min_entity_bytes} plus version width {VERSION_WIDTH_BYTES} exceeds primary key payload capacity {}",
29            codec.payload_capacity_bytes()
30        ));
31    }
32
33    Ok(PrunePolicy {
34        match_key: MatchKey {
35            reserved_bits: PRIMARY_RESERVED_BITS,
36            prefix: codec.prefix(),
37            payload_regex,
38        },
39        group_by: GroupBy {
40            capture_groups: vec![Utf8::from("entity")],
41        },
42        order_by: Some(OrderBy {
43            capture_group: Utf8::from("version"),
44            encoding: OrderEncoding::U64Be,
45        }),
46        retain: RetainPolicy::KeepLatest { count },
47    })
48}
49
50/// Build a prune policy that keeps the latest `count` versions for each entity
51/// in a `exoware-sql` versioned primary-key family with a fixed-width entity key.
52///
53/// The policy assumes the key layout created by `KvSchema::table_versioned`:
54/// `[entity bytes][u64_be version]` under the table's primary key codec family.
55pub fn keep_latest_versions(
56    table_prefix: u8,
57    entity_key_width: usize,
58    count: usize,
59) -> Result<PrunePolicy, String> {
60    keep_latest_versions_with_regex(
61        table_prefix,
62        entity_key_width,
63        format!(
64            r"(?s-u)^(?P<entity>.{{{entity_key_width}}})(?P<version>.{{{VERSION_WIDTH_BYTES}}})$"
65        ),
66        count,
67    )
68}
69
70/// Build a prune policy that keeps the latest `count` versions for each entity
71/// in a `exoware-sql` versioned primary-key family whose entity column is `Utf8`.
72///
73/// `table_versioned()` encodes ordered UTF-8 keys as an escape-aware byte stream
74/// terminated by `0x00`, so the entity capture must be length-delimited by that
75/// terminator rather than by a caller-provided fixed width.
76pub fn keep_latest_versions_utf8(table_prefix: u8, count: usize) -> Result<PrunePolicy, String> {
77    keep_latest_versions_with_regex(
78        table_prefix,
79        1,
80        format!(r"(?s-u)^(?P<entity>{ORDERED_UTF8_REGEX})(?P<version>.{{{VERSION_WIDTH_BYTES}}})$"),
81        count,
82    )
83}
84
#[cfg(test)]
mod tests {
    use super::{keep_latest_versions, keep_latest_versions_utf8, ORDERED_UTF8_REGEX};
    use crate::codec::encode_primary_key;
    use crate::types::{KvTableConfig, TableColumnConfig, TableModel};
    use crate::CellValue;
    use datafusion::arrow::datatypes::DataType;
    use exoware_sdk_rs::kv_codec::Utf8;
    use exoware_sdk_rs::prune_policy::{
        compile_payload_regex, validate_policy, OrderEncoding, RetainPolicy,
    };

    /// A fixed-width policy carries the expected match key, grouping, ordering and
    /// retention settings, and passes SDK validation.
    #[test]
    fn keep_latest_versions_builds_expected_policy_for_fixed_width_entity() {
        let policy = keep_latest_versions(3, 32, 1).expect("policy");

        let match_key = &policy.match_key;
        assert_eq!(match_key.reserved_bits, 5);
        assert_eq!(match_key.prefix, 6);
        assert_eq!(
            match_key.payload_regex,
            r"(?s-u)^(?P<entity>.{32})(?P<version>.{8})$"
        );

        assert_eq!(policy.group_by.capture_groups, vec![Utf8::from("entity")]);

        let order_by = policy.order_by.as_ref().expect("order_by");
        assert_eq!(&*order_by.capture_group, "version");
        assert_eq!(order_by.encoding, OrderEncoding::U64Be);

        assert_eq!(policy.retain, RetainPolicy::KeepLatest { count: 1 });
        validate_policy(&policy).expect("policy should validate");
    }

    /// A retention count of zero is refused.
    #[test]
    fn keep_latest_versions_rejects_zero_count() {
        let message = keep_latest_versions(3, 32, 0).expect_err("zero count should fail");
        assert!(message.contains("count must be > 0"));
    }

    /// An entity width that cannot fit in the key payload is refused.
    #[test]
    fn keep_latest_versions_rejects_oversized_entity_width() {
        let message = keep_latest_versions(3, 1000, 1).expect_err("oversized entity should fail");
        assert!(message.contains("exceeds primary key payload capacity"));
    }

    /// The UTF-8 policy embeds the ordered UTF-8 fragment as its entity capture and
    /// otherwise mirrors the fixed-width policy.
    #[test]
    fn keep_latest_versions_utf8_builds_expected_policy() {
        let policy = keep_latest_versions_utf8(3, 1).expect("policy");

        let match_key = &policy.match_key;
        assert_eq!(match_key.reserved_bits, 5);
        assert_eq!(match_key.prefix, 6);
        assert_eq!(
            match_key.payload_regex,
            format!(r"(?s-u)^(?P<entity>{ORDERED_UTF8_REGEX})(?P<version>.{{8}})$")
        );

        assert_eq!(policy.group_by.capture_groups, vec![Utf8::from("entity")]);

        let order_by = policy.order_by.as_ref().expect("order_by");
        assert_eq!(&*order_by.capture_group, "version");
        assert_eq!(order_by.encoding, OrderEncoding::U64Be);

        assert_eq!(policy.retain, RetainPolicy::KeepLatest { count: 1 });
        validate_policy(&policy).expect("policy should validate");
    }

    /// Keys encoded for entities of different lengths (one containing an embedded NUL)
    /// are matched in full, with an 8-byte version and a terminator-ended entity.
    #[test]
    fn keep_latest_versions_utf8_matches_variable_length_entity_payloads() {
        let policy = keep_latest_versions_utf8(3, 1).expect("policy");
        let regex = compile_payload_regex(&policy.match_key.payload_regex).expect("regex");

        let columns = vec![
            TableColumnConfig::new("entity", DataType::Utf8, false),
            TableColumnConfig::new("version", DataType::UInt64, false),
        ];
        let primary_key = vec!["entity".to_string(), "version".to_string()];
        let config = KvTableConfig::new(3, columns, primary_key, vec![]).expect("config");
        let model = TableModel::from_config(&config).expect("model");

        let short_entity = CellValue::Utf8("a".to_string());
        let long_entity = CellValue::Utf8("alpha\x00beta".to_string());
        let short_key =
            encode_primary_key(3, &[&short_entity, &CellValue::UInt64(1)], &model).expect("key");
        let long_key =
            encode_primary_key(3, &[&long_entity, &CellValue::UInt64(2)], &model).expect("key");
        let codec = model.primary_key_codec;

        for encoded in [&short_key, &long_key] {
            let payload_len = codec.payload_capacity_bytes_for_key_len(encoded.len());
            let payload = codec
                .read_payload(encoded, 0, payload_len)
                .expect("payload");
            let captures = regex.captures(&payload).expect("captures");

            // The pattern is anchored, so the whole payload must be consumed.
            let whole = captures.get(0).expect("full match");
            assert_eq!(whole.as_bytes(), payload.as_slice());

            let version = captures.name("version").expect("version");
            assert_eq!(version.as_bytes().len(), 8);

            let entity = captures.name("entity").expect("entity");
            assert!(
                entity.as_bytes().ends_with(&[0x00]),
                "ordered UTF-8 entity encoding should include the terminator"
            );
        }
    }
}