1use std::fmt;
8
9use object_store::{Attribute, Attributes, PutOptions, TagSet};
10
/// Storage class that an object can be written with.
///
/// Each variant corresponds to exactly one identifier string, available
/// through [`StorageClass::as_s3_str`] and the `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageClass {
    /// Rendered as `STANDARD`.
    Standard,
    /// Rendered as `EXPRESS_ONEZONE`.
    ExpressOneZone,
    /// Rendered as `INTELLIGENT_TIERING`.
    IntelligentTiering,
    /// Rendered as `GLACIER_IR`.
    GlacierInstantRetrieval,
}

impl StorageClass {
    /// Returns the identifier string for this storage class.
    #[must_use]
    pub fn as_s3_str(self) -> &'static str {
        match self {
            Self::Standard => "STANDARD",
            Self::ExpressOneZone => "EXPRESS_ONEZONE",
            Self::IntelligentTiering => "INTELLIGENT_TIERING",
            Self::GlacierInstantRetrieval => "GLACIER_IR",
        }
    }

    /// Parses a storage class from a configuration value.
    ///
    /// Matching ignores ASCII case, treats `-` and `_` as the same character,
    /// and ignores leading/trailing whitespace. Besides the identifiers
    /// produced by [`StorageClass::as_s3_str`], the long-form aliases
    /// `EXPRESS_ONE_ZONE` and `GLACIER_INSTANT_RETRIEVAL` are accepted.
    ///
    /// Returns `None` when the value (after trimming) is empty or does not
    /// name a known storage class.
    #[must_use]
    pub fn from_config(s: &str) -> Option<Self> {
        // Trim first: a padded value such as " STANDARD " would otherwise be
        // rejected, and callers that fall back to a default on `None` would
        // silently ignore the configured class.
        // Normalize in a single pass (one allocation) instead of building an
        // upper-cased copy and then a second copy with dashes replaced.
        let normalized: String = s
            .trim()
            .chars()
            .map(|c| if c == '-' { '_' } else { c.to_ascii_uppercase() })
            .collect();

        match normalized.as_str() {
            "STANDARD" => Some(Self::Standard),
            "EXPRESS_ONEZONE" | "EXPRESS_ONE_ZONE" => Some(Self::ExpressOneZone),
            "INTELLIGENT_TIERING" => Some(Self::IntelligentTiering),
            "GLACIER_IR" | "GLACIER_INSTANT_RETRIEVAL" => Some(Self::GlacierInstantRetrieval),
            _ => None,
        }
    }
}

impl fmt::Display for StorageClass {
    /// Writes the same string as [`StorageClass::as_s3_str`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_s3_str())
    }
}
61
/// Logical tier an object is written under.
///
/// Every tier has a lower-case label, available through
/// [`StorageTier::tag_value`] and the `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageTier {
    /// Labelled `hot`.
    Hot,
    /// Labelled `warm`.
    Warm,
    /// Labelled `cold`.
    Cold,
}

impl StorageTier {
    /// Returns the lower-case label for this tier.
    #[must_use]
    pub fn tag_value(self) -> &'static str {
        match self {
            StorageTier::Hot => "hot",
            StorageTier::Warm => "warm",
            StorageTier::Cold => "cold",
        }
    }
}

impl fmt::Display for StorageTier {
    /// Writes the same label as [`StorageTier::tag_value`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = self.tag_value();
        f.write_str(label)
    }
}
98
99#[derive(Debug, Clone)]
108#[allow(clippy::struct_field_names)]
109pub struct TieringPolicy {
110 hot_class: StorageClass,
111 warm_class: StorageClass,
112 cold_class: Option<StorageClass>,
113}
114
115impl TieringPolicy {
116 #[must_use]
120 pub fn new(hot_class: &str, warm_class: &str, cold_class: &str) -> Self {
121 Self {
122 hot_class: StorageClass::from_config(hot_class).unwrap_or(StorageClass::Standard),
123 warm_class: StorageClass::from_config(warm_class).unwrap_or(StorageClass::Standard),
124 cold_class: if cold_class.is_empty() {
125 None
126 } else {
127 StorageClass::from_config(cold_class)
128 },
129 }
130 }
131
132 #[must_use]
134 pub fn standard() -> Self {
135 Self {
136 hot_class: StorageClass::Standard,
137 warm_class: StorageClass::Standard,
138 cold_class: None,
139 }
140 }
141
142 #[must_use]
144 pub fn storage_class(&self, tier: StorageTier) -> StorageClass {
145 match tier {
146 StorageTier::Hot => self.hot_class,
147 StorageTier::Warm => self.warm_class,
148 StorageTier::Cold => self.cold_class.unwrap_or(self.warm_class),
149 }
150 }
151
152 #[must_use]
154 pub fn has_cold_tier(&self) -> bool {
155 self.cold_class.is_some()
156 }
157
158 #[must_use]
164 pub fn put_options(&self, tier: StorageTier) -> PutOptions {
165 let class = self.storage_class(tier);
166
167 let mut attrs = Attributes::new();
168 attrs.insert(Attribute::StorageClass, class.as_s3_str().into());
169
170 let mut tags = TagSet::default();
171 tags.push("laminardb-tier", tier.tag_value());
172
173 PutOptions {
174 attributes: attrs,
175 tags,
176 ..PutOptions::default()
177 }
178 }
179
180 #[must_use]
182 pub fn put_options_create(&self, tier: StorageTier) -> PutOptions {
183 let mut opts = self.put_options(tier);
184 opts.mode = object_store::PutMode::Create;
185 opts
186 }
187}
188
189impl Default for TieringPolicy {
190 fn default() -> Self {
191 Self::standard()
192 }
193}
194
/// Zstd level passed to `zstd::encode_all` for warm-tier data.
#[cfg(test)]
const ZSTD_WARM_LEVEL: i32 = 3;

/// Zstd level passed to `zstd::encode_all` for cold-tier data.
#[cfg(test)]
const ZSTD_COLD_LEVEL: i32 = 19;

/// Compresses `data` with the codec associated with `tier`.
///
/// Hot data is LZ4-compressed with the uncompressed size prepended; warm and
/// cold data are zstd-compressed at [`ZSTD_WARM_LEVEL`] and
/// [`ZSTD_COLD_LEVEL`] respectively.
///
/// # Panics
///
/// Panics if zstd encoding returns an error.
#[cfg(test)]
fn compress_for_tier(data: &[u8], tier: StorageTier) -> Vec<u8> {
    let zstd_level = match tier {
        // LZ4 takes no level, so the hot tier is handled right here.
        StorageTier::Hot => return lz4_flex::compress_prepend_size(data),
        StorageTier::Warm => ZSTD_WARM_LEVEL,
        StorageTier::Cold => ZSTD_COLD_LEVEL,
    };
    zstd::encode_all(data, zstd_level).unwrap()
}
215
/// Reverses [`compress_for_tier`] for data that was compressed for `tier`.
///
/// Hot data is decoded as size-prepended LZ4; warm and cold data are decoded
/// as zstd.
///
/// # Errors
///
/// Returns a [`DecompressionError`] describing the codec failure when the
/// input cannot be decoded.
#[cfg(test)]
fn decompress_for_tier(
    compressed: &[u8],
    tier: StorageTier,
) -> Result<Vec<u8>, DecompressionError> {
    let decoded = match tier {
        StorageTier::Hot => lz4_flex::decompress_size_prepended(compressed)
            .map_err(|err| format!("LZ4 decompression failed: {err}")),
        StorageTier::Warm | StorageTier::Cold => zstd::decode_all(compressed)
            .map_err(|err| format!("Zstd decompression failed: {err}")),
    };
    // Both codecs report failures as a message; wrap it in the error type once.
    decoded.map_err(DecompressionError)
}

/// Error produced by [`decompress_for_tier`]; displays the wrapped message.
#[cfg(test)]
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct DecompressionError(String);
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Policy with a distinct storage class for every tier.
    fn three_tier_policy() -> TieringPolicy {
        TieringPolicy::new("EXPRESS_ONE_ZONE", "STANDARD", "GLACIER_IR")
    }

    /// Compresses `payload` for `tier` and checks that decompression restores it.
    fn assert_roundtrip(payload: &[u8], tier: StorageTier) {
        let packed = compress_for_tier(payload, tier);
        let unpacked = decompress_for_tier(&packed, tier).unwrap();
        assert_eq!(unpacked, payload);
    }

    // --- StorageClass ---

    #[test]
    fn test_storage_class_s3_strings() {
        let expected = [
            (StorageClass::Standard, "STANDARD"),
            (StorageClass::ExpressOneZone, "EXPRESS_ONEZONE"),
            (StorageClass::IntelligentTiering, "INTELLIGENT_TIERING"),
            (StorageClass::GlacierInstantRetrieval, "GLACIER_IR"),
        ];
        for (class, s3_name) in expected {
            assert_eq!(class.as_s3_str(), s3_name);
        }
    }

    #[test]
    fn test_storage_class_from_config() {
        let cases = [
            ("STANDARD", Some(StorageClass::Standard)),
            ("express_one_zone", Some(StorageClass::ExpressOneZone)),
            ("EXPRESS-ONEZONE", Some(StorageClass::ExpressOneZone)),
            ("intelligent_tiering", Some(StorageClass::IntelligentTiering)),
            ("GLACIER_IR", Some(StorageClass::GlacierInstantRetrieval)),
            (
                "glacier_instant_retrieval",
                Some(StorageClass::GlacierInstantRetrieval),
            ),
            ("unknown", None),
            ("", None),
        ];
        for (input, parsed) in cases {
            assert_eq!(StorageClass::from_config(input), parsed);
        }
    }

    #[test]
    fn test_storage_class_display() {
        assert_eq!(StorageClass::Standard.to_string(), "STANDARD");
        assert_eq!(
            StorageClass::GlacierInstantRetrieval.to_string(),
            "GLACIER_IR"
        );
    }

    // --- StorageTier ---

    #[test]
    fn test_tier_tag_values() {
        let expected = [
            (StorageTier::Hot, "hot"),
            (StorageTier::Warm, "warm"),
            (StorageTier::Cold, "cold"),
        ];
        for (tier, tag) in expected {
            assert_eq!(tier.tag_value(), tag);
        }
    }

    // --- TieringPolicy ---

    #[test]
    fn test_policy_from_config() {
        let policy = three_tier_policy();
        let expected = [
            (StorageTier::Hot, StorageClass::ExpressOneZone),
            (StorageTier::Warm, StorageClass::Standard),
            (StorageTier::Cold, StorageClass::GlacierInstantRetrieval),
        ];
        for (tier, class) in expected {
            assert_eq!(policy.storage_class(tier), class);
        }
        assert!(policy.has_cold_tier());
    }

    #[test]
    fn test_policy_no_cold_tier() {
        let policy = TieringPolicy::new("STANDARD", "STANDARD", "");
        assert!(!policy.has_cold_tier());
        // Without a cold tier, cold requests resolve to the warm class.
        assert_eq!(
            policy.storage_class(StorageTier::Cold),
            StorageClass::Standard
        );
    }

    #[test]
    fn test_policy_standard_default() {
        let policy = TieringPolicy::standard();
        for tier in [StorageTier::Hot, StorageTier::Warm] {
            assert_eq!(policy.storage_class(tier), StorageClass::Standard);
        }
        assert!(!policy.has_cold_tier());
    }

    #[test]
    fn test_policy_unknown_config_falls_back() {
        let policy = TieringPolicy::new("NONEXISTENT", "ALSO_BAD", "");
        for tier in [StorageTier::Hot, StorageTier::Warm] {
            assert_eq!(policy.storage_class(tier), StorageClass::Standard);
        }
    }

    #[test]
    fn test_put_options_has_storage_class() {
        let opts = three_tier_policy().put_options(StorageTier::Hot);

        let attr = opts.attributes.get(&Attribute::StorageClass);
        assert!(attr.is_some());
        assert_eq!(attr.unwrap().as_ref(), "EXPRESS_ONEZONE");
    }

    #[test]
    fn test_put_options_has_tier_tag() {
        let opts = TieringPolicy::new("STANDARD", "STANDARD", "").put_options(StorageTier::Warm);

        let encoded = opts.tags.encoded();
        assert!(
            encoded.contains("laminardb-tier=warm"),
            "tag encoding: {encoded}"
        );
    }

    #[test]
    fn test_put_options_create_mode() {
        let opts = TieringPolicy::standard().put_options_create(StorageTier::Hot);
        assert!(matches!(opts.mode, object_store::PutMode::Create));
    }

    // --- Compression helpers ---

    #[test]
    fn test_lz4_roundtrip_hot() {
        assert_roundtrip(
            b"checkpoint state data for hot tier recovery",
            StorageTier::Hot,
        );
    }

    #[test]
    fn test_zstd_roundtrip_warm() {
        assert_roundtrip(
            b"checkpoint state data for warm tier archival",
            StorageTier::Warm,
        );
    }

    #[test]
    fn test_zstd_roundtrip_cold() {
        assert_roundtrip(
            b"checkpoint state data for cold tier archive compliance",
            StorageTier::Cold,
        );
    }

    #[test]
    fn test_cold_compresses_better_than_warm() {
        // Repeating byte ramp: highly compressible input for both levels.
        let data: Vec<u8> = (0u8..=255).cycle().take(10_000).collect();
        let warm_len = compress_for_tier(&data, StorageTier::Warm).len();
        let cold_len = compress_for_tier(&data, StorageTier::Cold).len();
        assert!(
            cold_len <= warm_len,
            "cold ({}) should be <= warm ({})",
            cold_len,
            warm_len
        );
    }

    #[test]
    fn test_decompress_corrupt_data() {
        let bad = b"not valid compressed data";
        for tier in [StorageTier::Hot, StorageTier::Warm, StorageTier::Cold] {
            assert!(decompress_for_tier(bad, tier).is_err());
        }
    }
}