1use exoware_sdk_rs::kv_codec::Utf8;
2use exoware_sdk_rs::prune_policy::{
3 GroupBy, MatchKey, OrderBy, OrderEncoding, PrunePolicy, RetainPolicy,
4};
5
6use crate::codec::primary_key_codec;
7use crate::types::PRIMARY_RESERVED_BITS;
8
/// Width in bytes of the version field that the payload regexes in this file
/// anchor at the end of the primary key payload. Eight bytes matches the
/// `OrderEncoding::U64Be` ordering used for the `version` capture group.
const VERSION_WIDTH_BYTES: usize = 8;
/// Regex fragment used as the `entity` capture for variable-length UTF-8
/// entity keys. It matches zero or more units, each being either the byte
/// `0x01` followed by one byte in `0x00..=0x02`, or any single byte other than
/// `0x00`, `0x01` and `0xFF`, and then a closing `0x00` byte that is part of
/// the match.
// NOTE(review): `0x01` looks like the ordered-UTF-8 escape byte and `0x00` the
// terminator — confirm against the primary key codec's string encoding.
const ORDERED_UTF8_REGEX: &str = r"(?:\x01[\x00-\x02]|[^\x00\x01\xFF])*\x00";
11
12fn keep_latest_versions_with_regex(
13 table_prefix: u8,
14 min_entity_bytes: usize,
15 payload_regex: impl Into<Utf8>,
16 count: usize,
17) -> Result<PrunePolicy, String> {
18 let payload_regex = payload_regex.into();
19 if count == 0 {
20 return Err("keep_latest_versions count must be > 0".to_string());
21 }
22 let codec = primary_key_codec(table_prefix)?;
23 let required_bytes = min_entity_bytes
24 .checked_add(VERSION_WIDTH_BYTES)
25 .ok_or_else(|| "entity width overflowed when adding version width".to_string())?;
26 if required_bytes > codec.payload_capacity_bytes() {
27 return Err(format!(
28 "entity width {min_entity_bytes} plus version width {VERSION_WIDTH_BYTES} exceeds primary key payload capacity {}",
29 codec.payload_capacity_bytes()
30 ));
31 }
32
33 Ok(PrunePolicy {
34 match_key: MatchKey {
35 reserved_bits: PRIMARY_RESERVED_BITS,
36 prefix: codec.prefix(),
37 payload_regex,
38 },
39 group_by: GroupBy {
40 capture_groups: vec![Utf8::from("entity")],
41 },
42 order_by: Some(OrderBy {
43 capture_group: Utf8::from("version"),
44 encoding: OrderEncoding::U64Be,
45 }),
46 retain: RetainPolicy::KeepLatest { count },
47 })
48}
49
50pub fn keep_latest_versions(
56 table_prefix: u8,
57 entity_key_width: usize,
58 count: usize,
59) -> Result<PrunePolicy, String> {
60 keep_latest_versions_with_regex(
61 table_prefix,
62 entity_key_width,
63 format!(
64 r"(?s-u)^(?P<entity>.{{{entity_key_width}}})(?P<version>.{{{VERSION_WIDTH_BYTES}}})$"
65 ),
66 count,
67 )
68}
69
70pub fn keep_latest_versions_utf8(table_prefix: u8, count: usize) -> Result<PrunePolicy, String> {
77 keep_latest_versions_with_regex(
78 table_prefix,
79 1,
80 format!(r"(?s-u)^(?P<entity>{ORDERED_UTF8_REGEX})(?P<version>.{{{VERSION_WIDTH_BYTES}}})$"),
81 count,
82 )
83}
84
#[cfg(test)]
mod tests {
    use super::{keep_latest_versions, keep_latest_versions_utf8, ORDERED_UTF8_REGEX};
    use crate::codec::encode_primary_key;
    use crate::types::{KvTableConfig, TableColumnConfig, TableModel};
    use crate::CellValue;
    use datafusion::arrow::datatypes::DataType;
    use exoware_sdk_rs::kv_codec::Utf8;
    use exoware_sdk_rs::prune_policy::{
        compile_payload_regex, validate_policy, OrderEncoding, RetainPolicy,
    };

    // A fixed-width entity yields the literal `.{32}` / `.{8}` payload regex
    // and a policy that passes validation.
    #[test]
    fn keep_latest_versions_builds_expected_policy_for_fixed_width_entity() {
        let policy = keep_latest_versions(3, 32, 1).expect("policy");

        let match_key = &policy.match_key;
        assert_eq!(match_key.reserved_bits, 5);
        assert_eq!(match_key.prefix, 6);
        assert_eq!(
            match_key.payload_regex,
            r"(?s-u)^(?P<entity>.{32})(?P<version>.{8})$"
        );

        assert_eq!(policy.group_by.capture_groups, vec![Utf8::from("entity")]);

        let order_by = policy.order_by.as_ref().expect("order_by");
        assert_eq!(&*order_by.capture_group, "version");
        assert_eq!(order_by.encoding, OrderEncoding::U64Be);

        assert_eq!(policy.retain, RetainPolicy::KeepLatest { count: 1 });
        validate_policy(&policy).expect("policy should validate");
    }

    // A retention count of zero is refused.
    #[test]
    fn keep_latest_versions_rejects_zero_count() {
        let err = keep_latest_versions(3, 32, 0).expect_err("zero count should fail");
        assert!(err.contains("count must be > 0"));
    }

    // An entity wider than the key payload is refused.
    #[test]
    fn keep_latest_versions_rejects_oversized_entity_width() {
        let err = keep_latest_versions(3, 1000, 1).expect_err("oversized entity should fail");
        assert!(err.contains("exceeds primary key payload capacity"));
    }

    // The UTF-8 variant embeds `ORDERED_UTF8_REGEX` as the entity capture and
    // otherwise produces the same policy shape.
    #[test]
    fn keep_latest_versions_utf8_builds_expected_policy() {
        let policy = keep_latest_versions_utf8(3, 1).expect("policy");

        let match_key = &policy.match_key;
        assert_eq!(match_key.reserved_bits, 5);
        assert_eq!(match_key.prefix, 6);
        assert_eq!(
            match_key.payload_regex,
            format!(r"(?s-u)^(?P<entity>{ORDERED_UTF8_REGEX})(?P<version>.{{8}})$")
        );

        assert_eq!(policy.group_by.capture_groups, vec![Utf8::from("entity")]);

        let order_by = policy.order_by.as_ref().expect("order_by");
        assert_eq!(&*order_by.capture_group, "version");
        assert_eq!(order_by.encoding, OrderEncoding::U64Be);

        assert_eq!(policy.retain, RetainPolicy::KeepLatest { count: 1 });
        validate_policy(&policy).expect("policy should validate");
    }

    // Keys encoded by the real primary key codec must be matched in full by the
    // UTF-8 policy regex, for both a short entity and one containing a NUL.
    #[test]
    fn keep_latest_versions_utf8_matches_variable_length_entity_payloads() {
        let policy = keep_latest_versions_utf8(3, 1).expect("policy");
        let regex = compile_payload_regex(&policy.match_key.payload_regex).expect("regex");

        let columns = vec![
            TableColumnConfig::new("entity", DataType::Utf8, false),
            TableColumnConfig::new("version", DataType::UInt64, false),
        ];
        let primary_key = vec!["entity".to_string(), "version".to_string()];
        let config = KvTableConfig::new(3, columns, primary_key, vec![]).expect("config");
        let model = TableModel::from_config(&config).expect("model");

        // Encode the short key first, then the long one.
        let samples = [("a", 1u64), ("alpha\x00beta", 2u64)];
        let keys: Vec<_> = samples
            .iter()
            .map(|&(entity, version)| {
                let entity_cell = CellValue::Utf8(entity.to_string());
                let version_cell = CellValue::UInt64(version);
                encode_primary_key(3, &[&entity_cell, &version_cell], &model).expect("key")
            })
            .collect();
        let codec = model.primary_key_codec;

        for key in &keys {
            let payload_len = codec.payload_capacity_bytes_for_key_len(key.len());
            let payload = codec.read_payload(key, 0, payload_len).expect("payload");
            let captures = regex.captures(&payload).expect("captures");

            let full_match = captures.get(0).expect("full match");
            assert_eq!(full_match.as_bytes(), payload.as_slice());

            let version = captures.name("version").expect("version");
            assert_eq!(version.as_bytes().len(), 8);

            let entity = captures.name("entity").expect("entity");
            assert!(
                entity.as_bytes().ends_with(&[0x00]),
                "ordered UTF-8 entity encoding should include the terminator"
            );
        }
    }
}