reddb_server/storage/timeseries/
/// How long data in a time-series collection is kept before it is eligible
/// for removal.
#[derive(Debug, Clone)]
pub struct RetentionPolicy {
    /// Maximum age of a data point, in nanoseconds. Anything strictly older
    /// than this is reported as expired by [`RetentionPolicy::is_expired`].
    pub max_age_ns: u64,
    /// Optional resolution-tier label. The constructors in this file always
    /// leave it `None`.
    // NOTE(review): presumably names the downsampled tier the policy applies
    // to — nothing in this file reads it, confirm against callers.
    pub resolution_tier: Option<String>,
}

impl RetentionPolicy {
    /// Builds a policy that keeps data for `secs` seconds.
    ///
    /// The conversion to nanoseconds saturates at `u64::MAX` instead of
    /// overflowing, so an absurdly large input means "keep (practically)
    /// forever" rather than panicking in debug builds or wrapping to a tiny
    /// retention window in release builds.
    pub fn from_secs(secs: u64) -> Self {
        Self {
            max_age_ns: secs.saturating_mul(1_000_000_000),
            resolution_tier: None,
        }
    }

    /// Builds a policy that keeps data for `days` days (86 400 s each).
    ///
    /// Saturates exactly like [`RetentionPolicy::from_secs`].
    pub fn from_days(days: u64) -> Self {
        Self::from_secs(days.saturating_mul(86_400))
    }

    /// Returns `true` when a point stamped `timestamp_ns` is strictly older
    /// than `max_age_ns` as seen from `now_ns`.
    ///
    /// A timestamp in the future (`timestamp_ns > now_ns`) has age zero and
    /// is therefore never expired.
    pub fn is_expired(&self, timestamp_ns: u64, now_ns: u64) -> bool {
        now_ns.saturating_sub(timestamp_ns) > self.max_age_ns
    }

    /// Returns the timestamp `max_age_ns` before `now_ns`, clamped at 0 when
    /// the policy reaches back further than `now_ns`.
    pub fn cutoff_ns(&self, now_ns: u64) -> u64 {
        now_ns.saturating_sub(self.max_age_ns)
    }
}
46
47#[derive(Debug, Clone)]
49pub struct DownsamplePolicy {
50 pub source: String,
52 pub target: String,
54 pub aggregation: String,
56 pub bucket_ns: u64,
58}
59
60impl DownsamplePolicy {
61 pub fn parse(spec: &str) -> Option<Self> {
64 let parts: Vec<&str> = spec.split(':').collect();
65 if parts.len() < 2 {
66 return None;
67 }
68 let target = parts[0].to_string();
69 let source = parts[1].to_string();
70 let aggregation = if parts.len() > 2 {
71 parts[2].to_string()
72 } else {
73 "avg".to_string()
74 };
75 let bucket_ns = parse_duration_ns(&target)?;
76
77 Some(Self {
78 source,
79 target,
80 aggregation,
81 bucket_ns,
82 })
83 }
84}
85
/// Parses a human-written duration into nanoseconds.
///
/// Accepted inputs (surrounding whitespace is trimmed first):
///
/// * `"raw"` — yields `Some(0)`.
/// * Spaced form: ASCII digits, whitespace, then a unit word matched
///   case-insensitively, e.g. `"5 minutes"`, `"2 Hrs"`, `"10 ms"`.
/// * Compact form: a number immediately followed by one of the
///   case-sensitive suffixes `ms`, `s`, `m`, `h`, `d`, e.g. `"100ms"`, `"1d"`.
///   The number is parsed with `u64::from_str`, which tolerates a leading `+`.
///
/// Returns `None` for anything else, and also when the result does not fit
/// in a `u64`. Both forms use checked multiplication, so an oversized value
/// can neither panic nor silently wrap.
pub fn parse_duration_ns(s: &str) -> Option<u64> {
    // Order matters: "ms" must be tried before "s" and "m".
    const COMPACT_UNITS: [(&str, u64); 5] = [
        ("ms", 1_000_000),
        ("s", 1_000_000_000),
        ("m", 60_000_000_000),
        ("h", 3_600_000_000_000),
        ("d", 86_400_000_000_000),
    ];

    let s = s.trim();
    if s == "raw" {
        return Some(0);
    }

    // Spaced form: leading digits followed by whitespace. Once the input has
    // this shape it is never retried as a compact form.
    if let Some(idx) = s.find(|c: char| !c.is_ascii_digit()) {
        let (num_part, rest) = s.split_at(idx);
        if !num_part.is_empty() && rest.starts_with(|c: char| c.is_ascii_whitespace()) {
            let mult = long_form_multiplier(rest.trim_start())?;
            let num: u64 = num_part.parse().ok()?;
            return num.checked_mul(mult);
        }
    }

    // Compact form: the first matching suffix decides the unit.
    let (digits, multiplier) = COMPACT_UNITS
        .iter()
        .find_map(|&(suffix, mult)| s.strip_suffix(suffix).map(|digits| (digits, mult)))?;
    let num: u64 = digits.parse().ok()?;
    num.checked_mul(multiplier)
}

/// Maps a unit word (matched ASCII-case-insensitively) to nanoseconds per
/// unit, or `None` when the word is not recognised.
fn long_form_multiplier(unit: &str) -> Option<u64> {
    match unit.to_ascii_lowercase().as_str() {
        "ms" | "msec" | "msecs" | "millisecond" | "milliseconds" => Some(1_000_000),
        "s" | "sec" | "secs" | "second" | "seconds" => Some(1_000_000_000),
        "m" | "min" | "mins" | "minute" | "minutes" => Some(60_000_000_000),
        "h" | "hr" | "hrs" | "hour" | "hours" => Some(3_600_000_000_000),
        "d" | "day" | "days" => Some(86_400_000_000_000),
        _ => None,
    }
}
159
160use std::collections::HashMap;
165use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
166use std::sync::{Arc, Mutex};
167use std::thread;
168use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
169
/// Storage-side operations the retention sweep relies on.
///
/// The `Send + Sync` bound lets `RetentionRegistry::start` move an
/// `Arc<dyn RetentionBackend>` into its background thread.
pub trait RetentionBackend: Send + Sync {
    /// Returns the collection names the backend currently reports as time
    /// series. `RetentionRegistry::sweep_once` skips every policy whose
    /// collection is missing from this list.
    fn time_series_collections(&self) -> Vec<String>;

    /// Asks the backend to drop the chunks of `collection` that are older
    /// than `cutoff_ns`, which `sweep_once` computes as "now minus the
    /// policy's maximum age" in nanoseconds since the Unix epoch.
    ///
    /// The returned count is added to the sweep's total and to
    /// `RetentionStats::chunks_dropped`.
    // NOTE(review): whether a chunk exactly at `cutoff_ns` is dropped is up
    // to the implementation — confirm against the concrete backends.
    fn drop_chunks_older_than(&self, collection: &str, cutoff_ns: u64) -> u64;
}
182
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Falls back to 0 when the system clock reads earlier than the epoch.
fn now_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_nanos` is a u128; the cast keeps the low 64 bits, which is
        // lossless for any date before the year 2554.
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
189
/// Cumulative counters updated at the end of every
/// `RetentionRegistry::sweep_once` call.
#[derive(Debug, Default, Clone)]
pub struct RetentionStats {
    /// Number of sweeps performed; incremented once per `sweep_once` call.
    pub cycles: u64,
    /// Running total of policies whose collection was present in the backend
    /// when a sweep ran.
    pub policies_evaluated: u64,
    /// Running total of the counts returned by
    /// `RetentionBackend::drop_chunks_older_than`.
    pub chunks_dropped: u64,
    /// Wall-clock time (nanoseconds since the Unix epoch) sampled at the
    /// start of the most recent sweep.
    pub last_sweep_unix_ns: u64,
}
198
/// Registry of per-collection retention policies plus the controls for the
/// background sweep thread.
///
/// Cloning is cheap: every clone holds an `Arc` to the same shared state, so
/// policies, stats and the running flag are common to all clones.
#[derive(Clone)]
pub struct RetentionRegistry {
    /// Shared state; also referenced by the daemon thread and its handles.
    inner: Arc<Inner>,
}
205
/// State shared between all `RetentionRegistry` clones, the daemon thread
/// and every `RetentionDaemonHandle`.
struct Inner {
    /// Retention policy keyed by collection name.
    policies: Mutex<HashMap<String, RetentionPolicy>>,
    /// Counters updated at the end of each sweep.
    stats: Mutex<RetentionStats>,
    /// Set by `start`, cleared to request shutdown; the daemon loop polls it.
    running: AtomicBool,
    /// Delay between sweeps in milliseconds, read by the daemon after each
    /// sweep.
    interval_ms: AtomicU64,
}
212
/// `Default` delegates to `RetentionRegistry::new`.
impl Default for RetentionRegistry {
    /// Returns an empty registry, identical to `RetentionRegistry::new()`.
    fn default() -> Self {
        Self::new()
    }
}
218
219impl std::fmt::Debug for RetentionRegistry {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("RetentionRegistry")
222 .field(
223 "policies",
224 &self.inner.policies.lock().map(|m| m.len()).unwrap_or(0),
225 )
226 .field("running", &self.inner.running.load(Ordering::SeqCst))
227 .finish()
228 }
229}
230
231impl RetentionRegistry {
232 pub fn new() -> Self {
233 Self {
234 inner: Arc::new(Inner {
235 policies: Mutex::new(HashMap::new()),
236 stats: Mutex::new(RetentionStats::default()),
237 running: AtomicBool::new(false),
238 interval_ms: AtomicU64::new(60_000),
239 }),
240 }
241 }
242
243 pub fn set_policy(&self, collection: impl Into<String>, policy: RetentionPolicy) {
245 let mut guard = match self.inner.policies.lock() {
246 Ok(g) => g,
247 Err(p) => p.into_inner(),
248 };
249 guard.insert(collection.into(), policy);
250 }
251
252 pub fn remove_policy(&self, collection: &str) -> Option<RetentionPolicy> {
254 let mut guard = match self.inner.policies.lock() {
255 Ok(g) => g,
256 Err(p) => p.into_inner(),
257 };
258 guard.remove(collection)
259 }
260
261 pub fn list_policies(&self) -> Vec<(String, RetentionPolicy)> {
262 let guard = match self.inner.policies.lock() {
263 Ok(g) => g,
264 Err(p) => p.into_inner(),
265 };
266 let mut out: Vec<(String, RetentionPolicy)> =
267 guard.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
268 out.sort_by(|a, b| a.0.cmp(&b.0));
269 out
270 }
271
272 pub fn get_policy(&self, collection: &str) -> Option<RetentionPolicy> {
273 let guard = match self.inner.policies.lock() {
274 Ok(g) => g,
275 Err(p) => p.into_inner(),
276 };
277 guard.get(collection).cloned()
278 }
279
280 pub fn stats(&self) -> RetentionStats {
281 let guard = match self.inner.stats.lock() {
282 Ok(g) => g,
283 Err(p) => p.into_inner(),
284 };
285 guard.clone()
286 }
287
288 pub fn set_interval_ms(&self, ms: u64) {
289 self.inner.interval_ms.store(ms.max(100), Ordering::SeqCst);
290 }
291
292 pub fn sweep_once(&self, backend: &dyn RetentionBackend) -> u64 {
296 let now = now_ns();
297 let policies: Vec<(String, RetentionPolicy)> = self.list_policies();
298 let available: std::collections::HashSet<String> =
299 backend.time_series_collections().into_iter().collect();
300
301 let mut evaluated = 0u64;
302 let mut dropped_total = 0u64;
303 for (collection, policy) in &policies {
304 if !available.contains(collection) {
305 continue; }
307 evaluated += 1;
308 let cutoff = policy.cutoff_ns(now);
309 if cutoff == 0 {
310 continue; }
312 let dropped = backend.drop_chunks_older_than(collection, cutoff);
313 dropped_total += dropped;
314 }
315
316 let mut stats = match self.inner.stats.lock() {
317 Ok(g) => g,
318 Err(p) => p.into_inner(),
319 };
320 stats.cycles += 1;
321 stats.policies_evaluated += evaluated;
322 stats.chunks_dropped += dropped_total;
323 stats.last_sweep_unix_ns = now;
324 dropped_total
325 }
326
327 pub fn start(&self, backend: Arc<dyn RetentionBackend>) -> RetentionDaemonHandle {
332 if self
333 .inner
334 .running
335 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
336 .is_err()
337 {
338 return RetentionDaemonHandle {
339 inner: Arc::clone(&self.inner),
340 join: None,
341 };
342 }
343 let inner = Arc::clone(&self.inner);
344 let registry = self.clone();
345 let handle = thread::spawn(move || {
346 while inner.running.load(Ordering::SeqCst) {
347 let _ = registry.sweep_once(backend.as_ref());
348 let interval_ms = inner.interval_ms.load(Ordering::SeqCst);
349 let deadline = Instant::now() + Duration::from_millis(interval_ms);
350 while Instant::now() < deadline && inner.running.load(Ordering::SeqCst) {
351 thread::sleep(Duration::from_millis(50.min(interval_ms)));
352 }
353 }
354 });
355 RetentionDaemonHandle {
356 inner: Arc::clone(&self.inner),
357 join: Some(handle),
358 }
359 }
360
361 pub fn is_running(&self) -> bool {
362 self.inner.running.load(Ordering::SeqCst)
363 }
364
365 pub fn stop(&self) {
366 self.inner.running.store(false, Ordering::SeqCst);
367 }
368}
369
370pub struct RetentionDaemonHandle {
375 inner: Arc<Inner>,
376 join: Option<thread::JoinHandle<()>>,
377}
378
379impl RetentionDaemonHandle {
380 pub fn stop(mut self) {
381 self.inner.running.store(false, Ordering::SeqCst);
382 if let Some(handle) = self.join.take() {
383 let _ = handle.join();
384 }
385 }
386
387 pub fn detach(mut self) {
388 self.join.take();
389 }
390}
391
392impl Drop for RetentionDaemonHandle {
393 fn drop(&mut self) {
394 self.inner.running.store(false, Ordering::SeqCst);
395 if let Some(handle) = self.join.take() {
396 let _ = handle.join();
397 }
398 }
399}
400
// Unit tests for policy arithmetic, duration parsing, the registry's CRUD
// surface, single sweeps against a recording mock backend, and the daemon
// thread's start/stop lifecycle.
#[cfg(test)]
mod tests {
    use super::*;

    // A 30-day policy expires a 31-day-old point but not a 1-second-old one.
    #[test]
    fn test_retention_policy() {
        let policy = RetentionPolicy::from_days(30);
        // Arbitrary "now", large enough that the subtractions cannot underflow.
        let now = 5_000_000_000_000_000u64;
        let old = now - 31 * 86_400_000_000_000;
        let recent = now - 1_000_000_000;
        assert!(policy.is_expired(old, now));
        assert!(!policy.is_expired(recent, now));
    }

    // Compact duration suffixes, the "raw" keyword and a rejected input.
    #[test]
    fn test_parse_duration() {
        assert_eq!(parse_duration_ns("5m"), Some(300_000_000_000));
        assert_eq!(parse_duration_ns("1h"), Some(3_600_000_000_000));
        assert_eq!(parse_duration_ns("30s"), Some(30_000_000_000));
        assert_eq!(parse_duration_ns("1d"), Some(86_400_000_000_000));
        assert_eq!(parse_duration_ns("100ms"), Some(100_000_000));
        assert_eq!(parse_duration_ns("raw"), Some(0));
        assert_eq!(parse_duration_ns("invalid"), None);
    }

    // Spec layout is `target:source:aggregation`; the bucket comes from the target.
    #[test]
    fn test_downsample_policy_parse() {
        let policy = DownsamplePolicy::parse("1h:5m:avg").unwrap();
        assert_eq!(policy.target, "1h");
        assert_eq!(policy.source, "5m");
        assert_eq!(policy.aggregation, "avg");
        assert_eq!(policy.bucket_ns, 3_600_000_000_000);
    }

    use std::sync::atomic::{AtomicU64, Ordering};

    // Backend double: reports a fixed collection list, records every drop
    // request, and answers each one with a configurable count.
    struct MockBackend {
        // Collection names returned by `time_series_collections`.
        collections: Mutex<Vec<String>>,
        // Every `(collection, cutoff_ns)` passed to `drop_chunks_older_than`.
        drops: Mutex<Vec<(String, u64)>>,
        // Value returned from each `drop_chunks_older_than` call.
        drop_return: AtomicU64,
    }

    impl MockBackend {
        // Builds a backend exposing `collections`; drop calls return 0 until
        // `set_drop_return` says otherwise.
        fn new(collections: Vec<&str>) -> Arc<Self> {
            Arc::new(Self {
                collections: Mutex::new(collections.into_iter().map(String::from).collect()),
                drops: Mutex::new(Vec::new()),
                drop_return: AtomicU64::new(0),
            })
        }

        // Sets the count reported by subsequent drop calls.
        fn set_drop_return(&self, n: u64) {
            self.drop_return.store(n, Ordering::SeqCst);
        }

        // Snapshot of the drop requests recorded so far.
        fn drops(&self) -> Vec<(String, u64)> {
            self.drops.lock().unwrap().clone()
        }
    }

    impl RetentionBackend for MockBackend {
        fn time_series_collections(&self) -> Vec<String> {
            self.collections.lock().unwrap().clone()
        }

        fn drop_chunks_older_than(&self, collection: &str, cutoff_ns: u64) -> u64 {
            // Record the request, then report the configured count.
            self.drops
                .lock()
                .unwrap()
                .push((collection.to_string(), cutoff_ns));
            self.drop_return.load(Ordering::SeqCst)
        }
    }

    // A stored policy can be read back unchanged.
    #[test]
    fn registry_set_and_get_policy_round_trips() {
        let reg = RetentionRegistry::new();
        reg.set_policy("metrics", RetentionPolicy::from_days(30));
        let fetched = reg.get_policy("metrics").unwrap();
        assert_eq!(fetched.max_age_ns, 30 * 86_400_000_000_000);
    }

    // `list_policies` orders by collection name regardless of insertion order.
    #[test]
    fn registry_list_is_sorted_by_collection() {
        let reg = RetentionRegistry::new();
        reg.set_policy("z", RetentionPolicy::from_days(1));
        reg.set_policy("a", RetentionPolicy::from_days(1));
        reg.set_policy("m", RetentionPolicy::from_days(1));
        let names: Vec<_> = reg.list_policies().into_iter().map(|(k, _)| k).collect();
        assert_eq!(names, vec!["a", "m", "z"]);
    }

    // Removing a policy hands it back and leaves nothing behind.
    #[test]
    fn registry_remove_policy_returns_previous_value() {
        let reg = RetentionRegistry::new();
        reg.set_policy("metrics", RetentionPolicy::from_days(7));
        let removed = reg.remove_policy("metrics").unwrap();
        assert_eq!(removed.max_age_ns, 7 * 86_400_000_000_000);
        assert!(reg.get_policy("metrics").is_none());
    }

    // A policy for a collection the backend does not list triggers no drop call.
    #[test]
    fn sweep_skips_collections_without_backend_presence() {
        let reg = RetentionRegistry::new();
        reg.set_policy("gone", RetentionPolicy::from_days(1));
        let backend = MockBackend::new(vec![]);
        reg.sweep_once(backend.as_ref());
        assert!(backend.drops().is_empty());
    }

    // One matching policy: one drop call with a non-zero cutoff, and the
    // stats reflect the sweep.
    #[test]
    fn sweep_calls_backend_with_policy_cutoff() {
        let reg = RetentionRegistry::new();
        reg.set_policy("metrics", RetentionPolicy::from_days(1));
        let backend = MockBackend::new(vec!["metrics"]);
        backend.set_drop_return(3);
        let dropped = reg.sweep_once(backend.as_ref());
        assert_eq!(dropped, 3);
        let drops = backend.drops();
        assert_eq!(drops.len(), 1);
        assert_eq!(drops[0].0, "metrics");
        assert!(drops[0].1 > 0);
        let stats = reg.stats();
        assert_eq!(stats.cycles, 1);
        assert_eq!(stats.policies_evaluated, 1);
        assert_eq!(stats.chunks_dropped, 3);
    }

    // Collections without a policy ("c") are left alone; each of the two
    // with a policy gets exactly one drop call.
    #[test]
    fn sweep_evaluates_every_matching_collection() {
        let reg = RetentionRegistry::new();
        reg.set_policy("a", RetentionPolicy::from_days(1));
        reg.set_policy("b", RetentionPolicy::from_days(1));
        let backend = MockBackend::new(vec!["a", "b", "c"]);
        backend.set_drop_return(1);
        let dropped = reg.sweep_once(backend.as_ref());
        assert_eq!(dropped, 2);
        assert_eq!(backend.drops().len(), 2);
    }

    // With a 100 ms interval, 350 ms of wall time must fit at least two
    // sweeps; dropping the handle then stops the daemon.
    #[test]
    fn daemon_sweeps_repeatedly_and_stops_on_drop() {
        let reg = RetentionRegistry::new();
        reg.set_policy("metrics", RetentionPolicy::from_days(1));
        reg.set_interval_ms(100);
        let backend = MockBackend::new(vec!["metrics"]);
        backend.set_drop_return(0);
        let handle = reg.start(backend.clone());
        std::thread::sleep(std::time::Duration::from_millis(350));
        assert!(reg.is_running());
        // Dropping the owning handle clears the flag and joins the thread.
        drop(handle);
        assert!(!reg.is_running());
        let drops = backend.drops();
        assert!(
            drops.len() >= 2,
            "expected >= 2 sweep cycles, got {}",
            drops.len()
        );
    }

    // A second `start` while the daemon runs returns a thread-less handle;
    // stopping it clears the shared flag, and dropping the first handle joins.
    #[test]
    fn start_is_idempotent() {
        let reg = RetentionRegistry::new();
        reg.set_interval_ms(500);
        let backend = MockBackend::new(vec![]);
        let h1 = reg.start(backend.clone());
        let h2 = reg.start(backend.clone());
        h2.stop();
        drop(h1);
        assert!(!reg.is_running());
    }
}