1use exoware_sdk::kv_codec::Utf8;
2use exoware_sdk::match_key::MatchKey;
3use exoware_sdk::prune_policy::{
4 GroupBy, KeysScope, OrderBy, OrderEncoding, PolicyScope, PrunePolicy, RetainPolicy,
5};
6
7use crate::codec::primary_key_codec;
8use crate::types::PRIMARY_RESERVED_BITS;
9
/// Width in bytes of the trailing `version` capture in every generated payload
/// regex; the policies built in this file order that capture as `U64Be`.
const VERSION_WIDTH_BYTES: usize = 8;
/// Regex fragment for the variable-length entity portion of a payload.
///
/// Read under the `(?s-u)` flags used by the patterns in this file, it matches
/// zero or more units, where each unit is either byte `0x01` followed by a byte
/// in `0x00..=0x02`, or a single byte other than `0x00`, `0x01` and `0xFF`, and
/// then a closing `0x00` byte.
// NOTE(review): presumably mirrors the primary-key codec's ordered UTF-8
// escaping scheme; confirm against the encoder.
const ORDERED_UTF8_REGEX: &str = r"(?:\x01[\x00-\x02]|[^\x00\x01\xFF])*\x00";
12
13fn keep_latest_versions_with_regex(
14 table_prefix: u8,
15 min_entity_bytes: usize,
16 payload_regex: impl Into<Utf8>,
17 count: usize,
18) -> Result<PrunePolicy, String> {
19 let payload_regex = payload_regex.into();
20 if count == 0 {
21 return Err("keep_latest_versions count must be > 0".to_string());
22 }
23 let codec = primary_key_codec(table_prefix)?;
24 let required_bytes = min_entity_bytes
25 .checked_add(VERSION_WIDTH_BYTES)
26 .ok_or_else(|| "entity width overflowed when adding version width".to_string())?;
27 if required_bytes > codec.payload_capacity_bytes() {
28 return Err(format!(
29 "entity width {min_entity_bytes} plus version width {VERSION_WIDTH_BYTES} exceeds primary key payload capacity {}",
30 codec.payload_capacity_bytes()
31 ));
32 }
33
34 Ok(PrunePolicy {
35 scope: PolicyScope::Keys(KeysScope {
36 match_key: MatchKey {
37 reserved_bits: PRIMARY_RESERVED_BITS,
38 prefix: codec.prefix(),
39 payload_regex,
40 },
41 group_by: GroupBy {
42 capture_groups: vec![Utf8::from("entity")],
43 },
44 order_by: Some(OrderBy {
45 capture_group: Utf8::from("version"),
46 encoding: OrderEncoding::U64Be,
47 }),
48 }),
49 retain: RetainPolicy::KeepLatest { count },
50 })
51}
52
53pub fn keep_latest_versions(
59 table_prefix: u8,
60 entity_key_width: usize,
61 count: usize,
62) -> Result<PrunePolicy, String> {
63 keep_latest_versions_with_regex(
64 table_prefix,
65 entity_key_width,
66 format!(
67 r"(?s-u)^(?P<entity>.{{{entity_key_width}}})(?P<version>.{{{VERSION_WIDTH_BYTES}}})$"
68 ),
69 count,
70 )
71}
72
73pub fn keep_latest_versions_utf8(table_prefix: u8, count: usize) -> Result<PrunePolicy, String> {
80 keep_latest_versions_with_regex(
81 table_prefix,
82 1,
83 format!(r"(?s-u)^(?P<entity>{ORDERED_UTF8_REGEX})(?P<version>.{{{VERSION_WIDTH_BYTES}}})$"),
84 count,
85 )
86}
87
#[cfg(test)]
mod tests {
    use super::{keep_latest_versions, keep_latest_versions_utf8, ORDERED_UTF8_REGEX};
    use crate::codec::encode_primary_key;
    use crate::types::{KvTableConfig, TableColumnConfig, TableModel};
    use crate::CellValue;
    use datafusion::arrow::datatypes::DataType;
    use exoware_sdk::kv_codec::Utf8;
    use exoware_sdk::match_key::compile_payload_regex;
    use exoware_sdk::prune_policy::{validate_policy, OrderEncoding, PolicyScope, RetainPolicy};

    /// Returns the `Keys` scope of `policy`, panicking for any other scope.
    fn keys_scope(policy: &super::PrunePolicy) -> &super::KeysScope {
        if let PolicyScope::Keys(keys) = &policy.scope {
            keys
        } else {
            panic!("expected Keys scope")
        }
    }

    /// Checks the parts shared by every policy built in this file with
    /// `count == 1`: grouping by `entity`, ordering by a `U64Be` `version`,
    /// keep-latest retention, and successful validation.
    fn assert_entity_version_policy(policy: &super::PrunePolicy) {
        let scope = keys_scope(policy);
        assert_eq!(scope.group_by.capture_groups, vec![Utf8::from("entity")]);
        assert_eq!(
            &*scope.order_by.as_ref().expect("order_by").capture_group,
            "version"
        );
        assert_eq!(
            scope.order_by.as_ref().expect("order_by").encoding,
            OrderEncoding::U64Be
        );
        assert_eq!(policy.retain, RetainPolicy::KeepLatest { count: 1 });
        validate_policy(policy).expect("policy should validate");
    }

    #[test]
    fn keep_latest_versions_builds_expected_policy_for_fixed_width_entity() {
        let built = keep_latest_versions(3, 32, 1).expect("policy");
        let match_key = &keys_scope(&built).match_key;
        assert_eq!(match_key.reserved_bits, 5);
        assert_eq!(match_key.prefix, 6);
        assert_eq!(
            match_key.payload_regex,
            r"(?s-u)^(?P<entity>.{32})(?P<version>.{8})$"
        );
        assert_entity_version_policy(&built);
    }

    #[test]
    fn keep_latest_versions_rejects_zero_count() {
        let message = keep_latest_versions(3, 32, 0).expect_err("zero count should fail");
        assert!(message.contains("count must be > 0"));
    }

    #[test]
    fn keep_latest_versions_rejects_oversized_entity_width() {
        let message = keep_latest_versions(3, 1000, 1).expect_err("oversized entity should fail");
        assert!(message.contains("exceeds primary key payload capacity"));
    }

    #[test]
    fn keep_latest_versions_utf8_builds_expected_policy() {
        let built = keep_latest_versions_utf8(3, 1).expect("policy");
        let match_key = &keys_scope(&built).match_key;
        assert_eq!(match_key.reserved_bits, 5);
        assert_eq!(match_key.prefix, 6);
        assert_eq!(
            match_key.payload_regex,
            format!(r"(?s-u)^(?P<entity>{ORDERED_UTF8_REGEX})(?P<version>.{{8}})$")
        );
        assert_entity_version_policy(&built);
    }

    #[test]
    fn keep_latest_versions_utf8_matches_variable_length_entity_payloads() {
        let built = keep_latest_versions_utf8(3, 1).expect("policy");
        let payload_pattern =
            compile_payload_regex(&keys_scope(&built).match_key.payload_regex).expect("regex");

        // Two-column table whose primary key is (entity: Utf8, version: UInt64).
        let columns = vec![
            TableColumnConfig::new("entity", DataType::Utf8, false),
            TableColumnConfig::new("version", DataType::UInt64, false),
        ];
        let primary_key = vec!["entity".to_string(), "version".to_string()];
        let config = KvTableConfig::new(3, columns, primary_key, vec![]).expect("config");
        let model = TableModel::from_config(&config).expect("model");

        // One short entity and one longer entity containing an embedded NUL.
        let encoded_keys: Vec<_> = [("a", 1u64), ("alpha\x00beta", 2u64)]
            .iter()
            .map(|&(entity_text, version)| {
                let entity = CellValue::Utf8(entity_text.to_string());
                encode_primary_key(3, &[&entity, &CellValue::UInt64(version)], &model)
                    .expect("key")
            })
            .collect();
        let codec = model.primary_key_codec;

        for key in &encoded_keys {
            let capacity = codec.payload_capacity_bytes_for_key_len(key.len());
            let payload = codec.read_payload(key, 0, capacity).expect("payload");
            let captures = payload_pattern.captures(&payload).expect("captures");

            // The pattern is anchored, so the match must span the whole payload.
            let whole = captures.get(0).expect("full match");
            assert_eq!(whole.as_bytes(), payload.as_slice());

            let version = captures.name("version").expect("version");
            assert_eq!(version.as_bytes().len(), 8);

            let entity = captures.name("entity").expect("entity");
            assert!(
                entity.as_bytes().ends_with(&[0x00]),
                "ordered UTF-8 entity encoding should include the terminator"
            );
        }
    }
}