snops_checkpoint/
retention.rs

1use std::{collections::BinaryHeap, fmt::Write, num::NonZeroU8, str::FromStr};
2
3use chrono::{DateTime, TimeDelta, Utc};
4
5/// A comma separated list of retention rules ordered by duration,
6/// with the first rule being the shortest
7///
8/// eg. 4h:1h,1W:U,4W:1D,6M:1W,1Y:1M,U:6M
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct RetentionPolicy {
11    pub rules: Vec<RetentionRule>,
12}
13
14/// An individual rule in a retention policy
15/// - 4h:1h - for 4 hours, keep a checkpoint every hour
16/// - 1W:U - for 1 week, keep every checkpoint
17/// - 4W:1D - for 4 weeks, keep a checkpoint every day
18/// - 6M:1W - for 6 months, keep a checkpoint every week
19/// - 1Y:1M - for 1 year, keep a checkpoint every month
20/// - U:6M - for all time, keep a checkpoint every 6 months
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub struct RetentionRule {
23    /// For checkpoints created in this duration...
24    pub duration: RetentionSpan,
25    /// keep this many
26    pub keep: RetentionSpan,
27}
28
29impl RetentionPolicy {
30    pub fn new(rules: Vec<RetentionRule>) -> Self {
31        Self { rules }
32    }
33
34    /// Returns true if the policy is ready to be applied based on a given time
35    pub fn is_ready_with_time(&self, new_time: &DateTime<Utc>, last_time: &DateTime<Utc>) -> bool {
36        // if there are no rules, the policy does not apply
37        let Some(rule) = self.rules.first() else {
38            return false;
39        };
40
41        // if the first rule is unlimited, the policy is always ready
42        let Some(keep) = rule.keep.as_delta() else {
43            return true;
44        };
45
46        // amount of time since the last checkpoint
47        let delta = new_time.signed_duration_since(last_time);
48
49        // if the time since the last checkpoint is greater than the minimum delta
50        // the policy indicates a new checkpoint should be created
51        delta >= keep
52    }
53
54    /// Receives a list of checkpoint times, and returns a list of times to
55    /// reject
56    pub fn reject(&self, times: Vec<&DateTime<Utc>>) -> Vec<DateTime<Utc>> {
57        self.reject_with_time(Utc::now(), times)
58    }
59    /// Receives a list of checkpoint times, and returns a list of times to
60    /// reject given a time
61    pub fn reject_with_time(
62        &self,
63        now: DateTime<Utc>,
64        times: Vec<&DateTime<Utc>>,
65    ) -> Vec<DateTime<Utc>> {
66        // if the policy is empty, we should technically reject ALL checkpoints but
67        // for safety we will not reject any
68        if self.rules.is_empty() || times.is_empty() {
69            return Vec::new();
70        }
71
72        let mut rejected = Vec::new();
73
74        // ALGORITHM
75        // 1. walk backwards through rules and times
76        // 2. keep track of the last kept time for each rule
77        // 3. if the last kept time is outside the duration of the current rule reject
78        //    the last kept time and promote the next time to the last kept time
79        // 4. if the current rule does not encompass both times move to the next rule
80        // 5. if difference between last kept time and current time is smaller than the
81        //    keep time, reject it
82        // 6. if the time was not rejected, it becomes the new last kept time
83
84        // TODO: this is bugged atm - it's keeping the last checkpoint
85
86        // step 1 - walk backwards through rules and times
87        let mut rules = self.rules.iter().rev().peekable();
88
89        let mut times = times
90            .into_iter()
91            .collect::<BinaryHeap<_>>()
92            .into_sorted_vec()
93            .into_iter()
94            .rev()
95            .peekable();
96
97        // step 2 - keep track of the last kept time
98        let mut last_kept = times.next().unwrap(); // is_empty checked at the beginning of the fn
99        let mut curr_rule = rules.next().unwrap(); // is_empty checked at the beginning of the fn
100
101        'outer: while let Some(time) = times.peek().cloned() {
102            let delta = now.signed_duration_since(time);
103            let last_delta = now.signed_duration_since(last_kept);
104
105            // step 3 - if the last time is outside the duration of the current rule, reject
106            // it
107            match curr_rule.duration.as_delta() {
108                Some(duration) if last_delta > duration => {
109                    /* println!(
110                        "STEP 3 {curr_rule}: {last_kept} is older than ({}) > {}",
111                        last_delta.num_seconds() / 60,
112                        duration.num_seconds() / 60
113                    ); */
114                    rejected.push(*last_kept);
115                    // promote the next time to the last kept time
116                    last_kept = time;
117                    times.next();
118                    continue;
119                }
120                _ => {}
121            }
122
123            // check if we should move to the next rule
124            while let Some(RetentionRule { duration, .. }) = rules.peek() {
125                // if both rules have the same duration, continue to the next rule
126                // this is another case of configuration mishaps
127                //
128                // additionallly, if the second to last rule is unlimited, continue to the next
129                // rule you should not be writing policies with multiple
130                // unlimited rules
131                if &curr_rule.duration == duration || duration == &RetentionSpan::Unlimited {
132                    curr_rule = rules.next().unwrap();
133                    continue;
134                }
135
136                if let Some(next_duration) = duration.as_delta() {
137                    // step 4 - if the current rule does not encompass both times, move to the next
138                    // rule
139
140                    // continue because both times are within the current rule
141                    if delta >= next_duration && last_delta >= next_duration {
142                        break;
143                    }
144
145                    // update the last step time if the current time is within the next duration
146                    if delta < next_duration {
147                        last_kept = time;
148                        times.next();
149                    }
150
151                    curr_rule = rules.next().unwrap();
152                    continue 'outer;
153                }
154            }
155
156            // keep the current time if the current rule is unlimited
157            let Some(keep) = curr_rule.keep.as_delta() else {
158                last_kept = time;
159                times.next();
160                continue;
161            };
162
163            // step 5 - if the difference between the last kept time and the
164            // current time is smaller than the keep time, reject it
165            if last_kept.signed_duration_since(time) < keep {
166                /*  println!(
167                    "STEP 5 {curr_rule}: {last_kept} - {time} ({}) < {}",
168                    last_kept.signed_duration_since(time).num_seconds() / 60,
169                    keep.num_seconds() / 60
170                ); */
171                rejected.push(*time);
172                times.next();
173                continue;
174            }
175
176            // step 6 - if the time was not rejected, it becomes the new last kept time
177            /* println!(
178                "OK {curr_rule}: {last_kept} - {time} ({}) >= {}",
179                last_kept.signed_duration_since(time).num_seconds() / 60,
180                keep.num_seconds() / 60
181            ); */
182            last_kept = time;
183            times.next();
184        }
185
186        rejected
187    }
188}
189
190impl Default for RetentionPolicy {
191    /// The default policy is intended to align with the test cases provided by
192    /// Aleo.
193    fn default() -> Self {
194        Self {
195            rules: [
196                "4h:1h", // for 4 hours, keep a checkpoint every hour
197                "1D:8h", // for 1 day, keep a checkpoint every 8 hours
198                "1W:1D", // for 1 week, keep a checkpoint every day
199                "4W:1W", // for 4 weeks, keep a checkpoint every week
200                "4M:1M", // for 4 months, keep a checkpoint every month
201                "U:1Y",  // for all time, keep a checkpoint every year
202            ]
203            .into_iter()
204            .map(RetentionRule::from_str)
205            .collect::<Result<_, _>>()
206            .unwrap(),
207        }
208    }
209}
210
/// A length of time used by retention rules.
///
/// Written as a count followed by a unit letter (e.g. `30m`, `4h`, `1D`), or as
/// `U` for a span with no limit. Counts are non-zero and fit in a `u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionSpan {
    /// No limit; written `U`.
    Unlimited,
    /// Minutes; written e.g. `1m`.
    Minute(NonZeroU8),
    /// Hours; written e.g. `1h`.
    Hour(NonZeroU8),
    /// Days; written e.g. `1D`.
    Day(NonZeroU8),
    /// Weeks; written e.g. `1W`.
    Week(NonZeroU8),
    /// Months (treated as 30 days each); written e.g. `1M`.
    Month(NonZeroU8),
    /// Years (treated as 365 days each); written e.g. `1Y`.
    Year(NonZeroU8),
}
228
229impl RetentionSpan {
230    pub fn as_delta(&self) -> Option<TimeDelta> {
231        match self {
232            RetentionSpan::Unlimited => None,
233            RetentionSpan::Minute(value) => TimeDelta::try_minutes(value.get() as i64),
234            RetentionSpan::Hour(value) => TimeDelta::try_hours(value.get() as i64),
235            RetentionSpan::Day(value) => TimeDelta::try_days(value.get() as i64),
236            RetentionSpan::Week(value) => TimeDelta::try_weeks(value.get() as i64),
237            RetentionSpan::Month(value) => TimeDelta::try_days(value.get() as i64 * 30),
238            RetentionSpan::Year(value) => TimeDelta::try_days(value.get() as i64 * 365),
239        }
240    }
241
242    // get the timestamp for the start of the retention span
243    pub fn as_timestamp(&self) -> Option<i64> {
244        Utc::now().timestamp().checked_sub(match self {
245            RetentionSpan::Unlimited => return None,
246            RetentionSpan::Minute(value) => value.get() as i64 * 60,
247            RetentionSpan::Hour(value) => value.get() as i64 * 3600,
248            RetentionSpan::Day(value) => value.get() as i64 * 3600 * 24,
249            RetentionSpan::Week(value) => value.get() as i64 * 3600 * 24 * 7,
250            RetentionSpan::Month(value) => value.get() as i64 * 3600 * 24 * 30,
251            RetentionSpan::Year(value) => value.get() as i64 * 3600 * 24 * 365,
252        })
253    }
254}
255
256impl FromStr for RetentionPolicy {
257    type Err = String;
258
259    fn from_str(s: &str) -> Result<Self, Self::Err> {
260        let rules = s
261            .split(',')
262            .enumerate()
263            .filter(|(_, s)| !s.is_empty())
264            .map(|(i, rule)| {
265                rule.parse()
266                    .map_err(|e| format!("parse error in rule {} ({rule}): {e}", i + 1))
267            })
268            .collect::<Result<Vec<_>, _>>()?;
269        Ok(RetentionPolicy::new(rules))
270    }
271}
272
273impl std::fmt::Display for RetentionPolicy {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        for (i, rule) in self.rules.iter().enumerate() {
276            if i > 0 {
277                f.write_char(',')?;
278            }
279            rule.fmt(f)?;
280        }
281        Ok(())
282    }
283}
284
285impl FromStr for RetentionRule {
286    type Err = String;
287
288    fn from_str(s: &str) -> Result<Self, Self::Err> {
289        let (duration, keep) = s.split_at(s.find(':').ok_or("missing ':'".to_owned())?);
290        Ok(RetentionRule {
291            duration: duration.parse().map_err(|e| format!("duration: {e}"))?,
292            keep: keep[1..].parse().map_err(|e| format!("keep: {e}"))?,
293        })
294    }
295}
296
297impl std::fmt::Display for RetentionRule {
298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299        write!(f, "{}:{}", self.duration, self.keep)
300    }
301}
302
303impl FromStr for RetentionSpan {
304    type Err = String;
305
306    fn from_str(s: &str) -> Result<Self, Self::Err> {
307        let unit = s.chars().last().ok_or("missing unit")?;
308        if unit == 'U' {
309            if s.len() != 1 {
310                return Err("invalid value for unlimited".to_owned());
311            }
312            return Ok(RetentionSpan::Unlimited);
313        }
314        let value = s[..s.len() - 1]
315            .parse()
316            .map_err(|e| format!("invalid value '{}': {e}", &s[..s.len() - 1]))?;
317
318        match unit {
319            'm' => Ok(RetentionSpan::Minute(value)),
320            'h' => Ok(RetentionSpan::Hour(value)),
321            'D' => Ok(RetentionSpan::Day(value)),
322            'W' => Ok(RetentionSpan::Week(value)),
323            'M' => Ok(RetentionSpan::Month(value)),
324            'Y' => Ok(RetentionSpan::Year(value)),
325            _ => Err("invalid unit".to_owned()),
326        }
327    }
328}
329
330impl std::fmt::Display for RetentionSpan {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        match self {
333            RetentionSpan::Unlimited => write!(f, "U"),
334            RetentionSpan::Minute(value) => write!(f, "{}m", value),
335            RetentionSpan::Hour(value) => write!(f, "{}h", value),
336            RetentionSpan::Day(value) => write!(f, "{}D", value),
337            RetentionSpan::Week(value) => write!(f, "{}W", value),
338            RetentionSpan::Month(value) => write!(f, "{}M", value),
339            RetentionSpan::Year(value) => write!(f, "{}Y", value),
340        }
341    }
342}
343
// Implements `serde::Serialize` / `serde::Deserialize` for types that round-trip
// through their `Display` and `FromStr` string forms. A trailing comma in the
// type list is accepted.
#[cfg(feature = "serde")]
macro_rules! impl_serde {
    ($($ty:ty),* $(,)?) => {
        $(
            impl serde::Serialize for $ty {
                fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
                    // `collect_str` lets the serializer consume the `Display` output
                    // directly instead of always building an intermediate `String`
                    serializer.collect_str(self)
                }
            }

            impl<'de> serde::Deserialize<'de> for $ty {
                fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
                    String::deserialize(deserializer)?
                        .parse()
                        .map_err(serde::de::Error::custom)
                }
            }
        )*
    };
}

#[cfg(feature = "serde")]
impl_serde!(RetentionSpan, RetentionRule);
367
#[cfg(feature = "serde")]
impl serde::Serialize for RetentionPolicy {
    /// Serializes the policy as its `Display` string, e.g. `4h:1h,1D:8h,U:1Y`.
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // `collect_str` lets the serializer consume the `Display` output directly,
        // avoiding an intermediate `String` when the serializer supports it
        serializer.collect_str(self)
    }
}
374
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for RetentionPolicy {
    /// Accepts either the literal string `default` (yielding
    /// [`RetentionPolicy::default`]) or a comma-separated rule list parsed via
    /// `FromStr`.
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        match String::deserialize(deserializer)?.as_str() {
            "default" => Ok(Self::default()),
            spec => spec.parse().map_err(serde::de::Error::custom),
        }
    }
}