// reddb_server/storage/timeseries/retention.rs
//! Retention policy for time-series data — declarative specs, a
//! registry that survives restart, and a background daemon that
//! sweeps expired chunks without the operator having to script
//! anything.
//!
//! Timescale parity note: the daemon mirrors Timescale's
//! `add_retention_policy` / `show_retention_policies` surface — you
//! tell the engine "keep last 90 days of `metrics`" and it does the
//! rest. The daemon is cooperative: it polls, acquires the chunk
//! lifecycle lock only when it has real work, and never blocks
//! writes.
/// Retention policy configuration
#[derive(Debug, Clone)]
pub struct RetentionPolicy {
    /// Maximum age in nanoseconds. Data older than this is eligible for deletion.
    pub max_age_ns: u64,
    /// Optional: only apply to a specific resolution tier
    pub resolution_tier: Option<String>,
}

impl RetentionPolicy {
    /// Create a retention policy with a duration in seconds.
    ///
    /// Saturates at `u64::MAX` ns instead of overflowing: the previous
    /// unchecked `secs * 1_000_000_000` panicked in debug builds (and
    /// silently wrapped in release) for durations beyond ~584 years.
    pub fn from_secs(secs: u64) -> Self {
        Self {
            max_age_ns: secs.saturating_mul(1_000_000_000),
            resolution_tier: None,
        }
    }

    /// Create a retention policy with a duration in days (saturating,
    /// same rationale as [`RetentionPolicy::from_secs`]).
    pub fn from_days(days: u64) -> Self {
        Self::from_secs(days.saturating_mul(86400))
    }

    /// Check if a timestamp is expired given the current time.
    /// Timestamps in the future (`timestamp_ns > now_ns`) are never
    /// expired because of the saturating subtraction.
    pub fn is_expired(&self, timestamp_ns: u64, now_ns: u64) -> bool {
        now_ns.saturating_sub(timestamp_ns) > self.max_age_ns
    }

    /// Get the cutoff timestamp (anything older should be deleted).
    /// Returns 0 when `max_age_ns >= now_ns`, i.e. nothing has aged out.
    pub fn cutoff_ns(&self, now_ns: u64) -> u64 {
        now_ns.saturating_sub(self.max_age_ns)
    }
}
46
47/// Downsample policy definition
48#[derive(Debug, Clone)]
49pub struct DownsamplePolicy {
50    /// Source resolution label (e.g., "raw", "1m", "5m")
51    pub source: String,
52    /// Target resolution label (e.g., "5m", "1h")
53    pub target: String,
54    /// Aggregation function to use (e.g., "avg", "max")
55    pub aggregation: String,
56    /// Target bucket size in nanoseconds
57    pub bucket_ns: u64,
58}
59
60impl DownsamplePolicy {
61    /// Parse a policy string like "1h:5m:avg"
62    /// Format: target_resolution:source_resolution:aggregation
63    pub fn parse(spec: &str) -> Option<Self> {
64        let parts: Vec<&str> = spec.split(':').collect();
65        if parts.len() < 2 {
66            return None;
67        }
68        let target = parts[0].to_string();
69        let source = parts[1].to_string();
70        let aggregation = if parts.len() > 2 {
71            parts[2].to_string()
72        } else {
73            "avg".to_string()
74        };
75        let bucket_ns = parse_duration_ns(&target)?;
76
77        Some(Self {
78            source,
79            target,
80            aggregation,
81            bucket_ns,
82        })
83    }
84}
85
/// Parse a duration string into nanoseconds.
///
/// Accepts both the compact suffix form (`"5m"`, `"1h"`, `"30s"`,
/// `"100ms"`) and the long, TimescaleDB-compatible form (`"1 day"`,
/// `"2 hours"`, `"30 minutes"`, `"90 days"`). The number and unit may
/// be separated by any run of ASCII whitespace; unit comparison is
/// case-insensitive for the long form. `"raw"` maps to `Some(0)`.
///
/// Returns `None` for unrecognised input and for durations whose
/// nanosecond value would overflow `u64`.
pub fn parse_duration_ns(s: &str) -> Option<u64> {
    let s = s.trim();
    if s == "raw" {
        return Some(0);
    }

    // Try the long form first: a leading integer, optional whitespace,
    // then a word-style unit. If that splits cleanly we are done.
    let split = s.find(|c: char| !c.is_ascii_digit()).map(|i| s.split_at(i));
    if let Some((num_part, rest)) = split {
        if !num_part.is_empty() {
            let unit_word = rest.trim_start();
            // If the unit slot contains whitespace before the suffix
            // it is the long form; if it is glued to the digits it
            // is the short form and we fall through.
            if rest.starts_with(|c: char| c.is_ascii_whitespace()) {
                if let Some(mult) = long_form_multiplier(unit_word) {
                    let num: u64 = num_part.parse().ok()?;
                    return num.checked_mul(mult);
                }
                return None;
            }
        }
    }

    // Compact suffix form: `"5m"`, `"1h"`, `"30s"`, `"100ms"`.
    // `"ms"` must be tested before the bare `'s'` suffix.
    let (num_str, unit) = if let Some(stripped) = s.strip_suffix("ms") {
        (stripped, "ms")
    } else if let Some(stripped) = s.strip_suffix('s') {
        (stripped, "s")
    } else if let Some(stripped) = s.strip_suffix('m') {
        (stripped, "m")
    } else if let Some(stripped) = s.strip_suffix('h') {
        (stripped, "h")
    } else if let Some(stripped) = s.strip_suffix('d') {
        (stripped, "d")
    } else {
        return None;
    };

    let num: u64 = num_str.parse().ok()?;
    let multiplier = match unit {
        "ms" => 1_000_000,
        "s" => 1_000_000_000,
        "m" => 60_000_000_000,
        "h" => 3_600_000_000_000,
        "d" => 86_400_000_000_000,
        _ => return None,
    };

    // Fix: the long form above already used `checked_mul`, but the
    // compact path multiplied unchecked — panicking in debug builds
    // (and wrapping in release) on inputs like `"999999999999d"`.
    // Be consistent and report overflow as a parse failure.
    num.checked_mul(multiplier)
}

/// Long-form duration unit (e.g. `"day"`, `"hours"`, `"minutes"`) →
/// nanosecond multiplier. Returns `None` for unrecognised words; the
/// caller treats that as a parse failure.
fn long_form_multiplier(unit: &str) -> Option<u64> {
    match unit.to_ascii_lowercase().as_str() {
        "ms" | "msec" | "msecs" | "millisecond" | "milliseconds" => Some(1_000_000),
        "s" | "sec" | "secs" | "second" | "seconds" => Some(1_000_000_000),
        "m" | "min" | "mins" | "minute" | "minutes" => Some(60_000_000_000),
        "h" | "hr" | "hrs" | "hour" | "hours" => Some(3_600_000_000_000),
        "d" | "day" | "days" => Some(86_400_000_000_000),
        _ => None,
    }
}
159
160// =============================================================================
161// Retention registry + daemon (Timescale-parity Track A3)
162// =============================================================================
163
164use std::collections::HashMap;
165use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
166use std::sync::{Arc, Mutex};
167use std::thread;
168use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
169
/// Backend contract the daemon uses to discover collections and drop
/// chunks. Decouples the daemon from the storage service: the
/// registry receives the trait object at startup, tests supply a
/// mock, production wires the real store.
///
/// `Send + Sync` is required because the daemon calls these methods
/// from its own background thread (see `RetentionRegistry::start`).
pub trait RetentionBackend: Send + Sync {
    /// Enumerate the time-series collections this backend owns.
    fn time_series_collections(&self) -> Vec<String>;

    /// Drop every chunk in `collection` whose max timestamp is at or
    /// below `cutoff_ns`. Returns the number of chunks dropped.
    ///
    /// NOTE(review): implementations presumably must tolerate being
    /// called concurrently with writes, since the daemon invokes this
    /// from its sweep thread — confirm against the real store.
    fn drop_chunks_older_than(&self, collection: &str, cutoff_ns: u64) -> u64;
}
182
/// Current wall-clock time in nanoseconds since the Unix epoch.
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
189
/// Per-collection retention + daemon statistics.
///
/// All counters are cumulative over the registry's lifetime; they are
/// updated once at the end of every `sweep_once` call.
#[derive(Debug, Default, Clone)]
pub struct RetentionStats {
    /// Number of sweep cycles completed.
    pub cycles: u64,
    /// Total `(collection, policy)` pairs evaluated across all sweeps
    /// (only policies whose collection still exists are counted).
    pub policies_evaluated: u64,
    /// Total chunks the backend reported dropped across all sweeps.
    pub chunks_dropped: u64,
    /// Wall-clock time (ns since Unix epoch) of the most recent sweep.
    pub last_sweep_unix_ns: u64,
}
198
/// Registry of `{collection → RetentionPolicy}` with a cooperative
/// daemon that sweeps expired chunks on a configurable interval.
///
/// Cheap to clone: all state lives behind a shared `Arc`, so every
/// clone observes the same policies, stats, and daemon flag.
#[derive(Clone)]
pub struct RetentionRegistry {
    // Shared state; cloned handles all point at the same `Inner`.
    inner: Arc<Inner>,
}
205
/// Shared state behind `RetentionRegistry` (and the daemon handle).
struct Inner {
    /// `{collection name → policy}` map guarded by a mutex.
    policies: Mutex<HashMap<String, RetentionPolicy>>,
    /// Cumulative sweep statistics (see `RetentionStats`).
    stats: Mutex<RetentionStats>,
    /// True while the background daemon should keep sweeping.
    running: AtomicBool,
    /// Sweep interval in milliseconds; `set_interval_ms` clamps to >= 100.
    interval_ms: AtomicU64,
}
212
213impl Default for RetentionRegistry {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219impl std::fmt::Debug for RetentionRegistry {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("RetentionRegistry")
222            .field(
223                "policies",
224                &self.inner.policies.lock().map(|m| m.len()).unwrap_or(0),
225            )
226            .field("running", &self.inner.running.load(Ordering::SeqCst))
227            .finish()
228    }
229}
230
231impl RetentionRegistry {
232    pub fn new() -> Self {
233        Self {
234            inner: Arc::new(Inner {
235                policies: Mutex::new(HashMap::new()),
236                stats: Mutex::new(RetentionStats::default()),
237                running: AtomicBool::new(false),
238                interval_ms: AtomicU64::new(60_000),
239            }),
240        }
241    }
242
243    /// Install / replace the policy for `collection`.
244    pub fn set_policy(&self, collection: impl Into<String>, policy: RetentionPolicy) {
245        let mut guard = match self.inner.policies.lock() {
246            Ok(g) => g,
247            Err(p) => p.into_inner(),
248        };
249        guard.insert(collection.into(), policy);
250    }
251
252    /// Drop the policy if any. Returns the removed policy.
253    pub fn remove_policy(&self, collection: &str) -> Option<RetentionPolicy> {
254        let mut guard = match self.inner.policies.lock() {
255            Ok(g) => g,
256            Err(p) => p.into_inner(),
257        };
258        guard.remove(collection)
259    }
260
261    pub fn list_policies(&self) -> Vec<(String, RetentionPolicy)> {
262        let guard = match self.inner.policies.lock() {
263            Ok(g) => g,
264            Err(p) => p.into_inner(),
265        };
266        let mut out: Vec<(String, RetentionPolicy)> =
267            guard.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
268        out.sort_by(|a, b| a.0.cmp(&b.0));
269        out
270    }
271
272    pub fn get_policy(&self, collection: &str) -> Option<RetentionPolicy> {
273        let guard = match self.inner.policies.lock() {
274            Ok(g) => g,
275            Err(p) => p.into_inner(),
276        };
277        guard.get(collection).cloned()
278    }
279
280    pub fn stats(&self) -> RetentionStats {
281        let guard = match self.inner.stats.lock() {
282            Ok(g) => g,
283            Err(p) => p.into_inner(),
284        };
285        guard.clone()
286    }
287
288    pub fn set_interval_ms(&self, ms: u64) {
289        self.inner.interval_ms.store(ms.max(100), Ordering::SeqCst);
290    }
291
292    /// Run one sweep cycle against `backend`. Returns the number of
293    /// chunks dropped in this cycle. Exposed for tests — the daemon
294    /// calls this in a loop.
295    pub fn sweep_once(&self, backend: &dyn RetentionBackend) -> u64 {
296        let now = now_ns();
297        let policies: Vec<(String, RetentionPolicy)> = self.list_policies();
298        let available: std::collections::HashSet<String> =
299            backend.time_series_collections().into_iter().collect();
300
301        let mut evaluated = 0u64;
302        let mut dropped_total = 0u64;
303        for (collection, policy) in &policies {
304            if !available.contains(collection) {
305                continue; // collection dropped since policy was set
306            }
307            evaluated += 1;
308            let cutoff = policy.cutoff_ns(now);
309            if cutoff == 0 {
310                continue; // unbounded retention, skip
311            }
312            let dropped = backend.drop_chunks_older_than(collection, cutoff);
313            dropped_total += dropped;
314        }
315
316        let mut stats = match self.inner.stats.lock() {
317            Ok(g) => g,
318            Err(p) => p.into_inner(),
319        };
320        stats.cycles += 1;
321        stats.policies_evaluated += evaluated;
322        stats.chunks_dropped += dropped_total;
323        stats.last_sweep_unix_ns = now;
324        dropped_total
325    }
326
327    /// Start a background thread that calls `sweep_once` on the
328    /// configured interval. Idempotent — a second call while running
329    /// is a no-op. The returned handle keeps the daemon alive; drop
330    /// it (or call `stop`) to wind down.
331    pub fn start(&self, backend: Arc<dyn RetentionBackend>) -> RetentionDaemonHandle {
332        if self
333            .inner
334            .running
335            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
336            .is_err()
337        {
338            return RetentionDaemonHandle {
339                inner: Arc::clone(&self.inner),
340                join: None,
341            };
342        }
343        let inner = Arc::clone(&self.inner);
344        let registry = self.clone();
345        let handle = thread::spawn(move || {
346            while inner.running.load(Ordering::SeqCst) {
347                let _ = registry.sweep_once(backend.as_ref());
348                let interval_ms = inner.interval_ms.load(Ordering::SeqCst);
349                let deadline = Instant::now() + Duration::from_millis(interval_ms);
350                while Instant::now() < deadline && inner.running.load(Ordering::SeqCst) {
351                    thread::sleep(Duration::from_millis(50.min(interval_ms)));
352                }
353            }
354        });
355        RetentionDaemonHandle {
356            inner: Arc::clone(&self.inner),
357            join: Some(handle),
358        }
359    }
360
361    pub fn is_running(&self) -> bool {
362        self.inner.running.load(Ordering::SeqCst)
363    }
364
365    pub fn stop(&self) {
366        self.inner.running.store(false, Ordering::SeqCst);
367    }
368}
369
/// RAII-ish handle returned by `RetentionRegistry::start`. Dropping
/// it stops the daemon and waits for the thread to exit. Call
/// `detach` to let the daemon outlive the handle (tests prefer the
/// default, which is deterministic shutdown).
pub struct RetentionDaemonHandle {
    /// Shared daemon state; used to clear the `running` flag.
    inner: Arc<Inner>,
    /// Join handle for the sweep thread. `None` when this handle came
    /// from a redundant `start` call (a daemon was already running).
    join: Option<thread::JoinHandle<()>>,
}
378
379impl RetentionDaemonHandle {
380    pub fn stop(mut self) {
381        self.inner.running.store(false, Ordering::SeqCst);
382        if let Some(handle) = self.join.take() {
383            let _ = handle.join();
384        }
385    }
386
387    pub fn detach(mut self) {
388        self.join.take();
389    }
390}
391
392impl Drop for RetentionDaemonHandle {
393    fn drop(&mut self) {
394        self.inner.running.store(false, Ordering::SeqCst);
395        if let Some(handle) = self.join.take() {
396            let _ = handle.join();
397        }
398    }
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_retention_policy() {
        let policy = RetentionPolicy::from_days(30);
        let now = 5_000_000_000_000_000u64; // ~58 days in ns
        let old = now - 31 * 86_400_000_000_000; // 31 days ago
        let recent = now - 1_000_000_000; // 1 second ago

        assert!(policy.is_expired(old, now));
        assert!(!policy.is_expired(recent, now));
    }

    #[test]
    fn test_parse_duration() {
        assert_eq!(parse_duration_ns("5m"), Some(300_000_000_000));
        assert_eq!(parse_duration_ns("1h"), Some(3_600_000_000_000));
        assert_eq!(parse_duration_ns("30s"), Some(30_000_000_000));
        assert_eq!(parse_duration_ns("1d"), Some(86_400_000_000_000));
        assert_eq!(parse_duration_ns("100ms"), Some(100_000_000));
        assert_eq!(parse_duration_ns("raw"), Some(0));
        assert_eq!(parse_duration_ns("invalid"), None);
    }

    #[test]
    fn test_downsample_policy_parse() {
        // "1h:5m:avg" = target:source:aggregation; bucket derives from target.
        let policy = DownsamplePolicy::parse("1h:5m:avg").unwrap();
        assert_eq!(policy.target, "1h");
        assert_eq!(policy.source, "5m");
        assert_eq!(policy.aggregation, "avg");
        assert_eq!(policy.bucket_ns, 3_600_000_000_000);
    }

    // =====================================================================
    // Retention registry + daemon — Timescale-parity surface
    // =====================================================================

    use std::sync::atomic::{AtomicU64, Ordering};

    /// Test backend: records every `drop_chunks_older_than` call and
    /// lets the test drive both the collection list and the drop
    /// count it returns.
    struct MockBackend {
        // Collections reported to the daemon.
        collections: Mutex<Vec<String>>,
        // Every (collection, cutoff) pair the daemon asked us to drop.
        drops: Mutex<Vec<(String, u64)>>,
        // Value `drop_chunks_older_than` returns.
        drop_return: AtomicU64,
    }

    impl MockBackend {
        fn new(collections: Vec<&str>) -> Arc<Self> {
            Arc::new(Self {
                collections: Mutex::new(collections.into_iter().map(String::from).collect()),
                drops: Mutex::new(Vec::new()),
                drop_return: AtomicU64::new(0),
            })
        }

        fn set_drop_return(&self, n: u64) {
            self.drop_return.store(n, Ordering::SeqCst);
        }

        fn drops(&self) -> Vec<(String, u64)> {
            self.drops.lock().unwrap().clone()
        }
    }

    impl RetentionBackend for MockBackend {
        fn time_series_collections(&self) -> Vec<String> {
            self.collections.lock().unwrap().clone()
        }

        fn drop_chunks_older_than(&self, collection: &str, cutoff_ns: u64) -> u64 {
            self.drops
                .lock()
                .unwrap()
                .push((collection.to_string(), cutoff_ns));
            self.drop_return.load(Ordering::SeqCst)
        }
    }

    #[test]
    fn registry_set_and_get_policy_round_trips() {
        let reg = RetentionRegistry::new();
        reg.set_policy("metrics", RetentionPolicy::from_days(30));
        let fetched = reg.get_policy("metrics").unwrap();
        assert_eq!(fetched.max_age_ns, 30 * 86_400_000_000_000);
    }

    #[test]
    fn registry_list_is_sorted_by_collection() {
        let reg = RetentionRegistry::new();
        reg.set_policy("z", RetentionPolicy::from_days(1));
        reg.set_policy("a", RetentionPolicy::from_days(1));
        reg.set_policy("m", RetentionPolicy::from_days(1));
        let names: Vec<_> = reg.list_policies().into_iter().map(|(k, _)| k).collect();
        assert_eq!(names, vec!["a", "m", "z"]);
    }

    #[test]
    fn registry_remove_policy_returns_previous_value() {
        let reg = RetentionRegistry::new();
        reg.set_policy("metrics", RetentionPolicy::from_days(7));
        let removed = reg.remove_policy("metrics").unwrap();
        assert_eq!(removed.max_age_ns, 7 * 86_400_000_000_000);
        assert!(reg.get_policy("metrics").is_none());
    }

    #[test]
    fn sweep_skips_collections_without_backend_presence() {
        // Policy exists but the backend no longer has the collection:
        // the sweep must not call drop_chunks_older_than for it.
        let reg = RetentionRegistry::new();
        reg.set_policy("gone", RetentionPolicy::from_days(1));
        let backend = MockBackend::new(vec![]);
        reg.sweep_once(backend.as_ref());
        assert!(backend.drops().is_empty());
    }

    #[test]
    fn sweep_calls_backend_with_policy_cutoff() {
        let reg = RetentionRegistry::new();
        reg.set_policy("metrics", RetentionPolicy::from_days(1));
        let backend = MockBackend::new(vec!["metrics"]);
        backend.set_drop_return(3);
        let dropped = reg.sweep_once(backend.as_ref());
        assert_eq!(dropped, 3);
        let drops = backend.drops();
        assert_eq!(drops.len(), 1);
        assert_eq!(drops[0].0, "metrics");
        assert!(drops[0].1 > 0);
        let stats = reg.stats();
        assert_eq!(stats.cycles, 1);
        assert_eq!(stats.policies_evaluated, 1);
        assert_eq!(stats.chunks_dropped, 3);
    }

    #[test]
    fn sweep_evaluates_every_matching_collection() {
        // "c" has no policy, so only "a" and "b" are swept.
        let reg = RetentionRegistry::new();
        reg.set_policy("a", RetentionPolicy::from_days(1));
        reg.set_policy("b", RetentionPolicy::from_days(1));
        let backend = MockBackend::new(vec!["a", "b", "c"]);
        backend.set_drop_return(1);
        let dropped = reg.sweep_once(backend.as_ref());
        assert_eq!(dropped, 2);
        assert_eq!(backend.drops().len(), 2);
    }

    #[test]
    fn daemon_sweeps_repeatedly_and_stops_on_drop() {
        let reg = RetentionRegistry::new();
        reg.set_policy("metrics", RetentionPolicy::from_days(1));
        reg.set_interval_ms(100);
        let backend = MockBackend::new(vec!["metrics"]);
        backend.set_drop_return(0);
        let handle = reg.start(backend.clone());
        // Give it ~350ms to run at least 2 cycles (first fires immediately).
        std::thread::sleep(std::time::Duration::from_millis(350));
        assert!(reg.is_running());
        drop(handle); // stops the daemon
        assert!(!reg.is_running());
        let drops = backend.drops();
        assert!(
            drops.len() >= 2,
            "expected >= 2 sweep cycles, got {}",
            drops.len()
        );
    }

    #[test]
    fn start_is_idempotent() {
        let reg = RetentionRegistry::new();
        reg.set_interval_ms(500);
        let backend = MockBackend::new(vec![]);
        let h1 = reg.start(backend.clone());
        let h2 = reg.start(backend.clone());
        // Second handle has no join — stopping it is cheap.
        h2.stop();
        // First handle still owns the thread; dropping shuts down.
        drop(h1);
        assert!(!reg.is_running());
    }
}