1use std::{collections::BinaryHeap, fmt::Write, num::NonZeroU8, str::FromStr};
2
3use chrono::{DateTime, TimeDelta, Utc};
4
/// An ordered list of [`RetentionRule`]s describing which timestamps to retain.
///
/// The textual form (see the `FromStr` and `Display` impls) is the rules joined by
/// commas, each rule written as `duration:keep`, e.g. `4h:1h,1D:8h,U:1Y`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetentionPolicy {
    /// The rules of this policy.
    ///
    /// `is_ready_with_time` consults only the first rule, and `reject_with_time`
    /// walks the rules from last to first. The default policy lists them from the
    /// shortest `duration` to the longest.
    pub rules: Vec<RetentionRule>,
}
13
/// A single `duration:keep` entry of a [`RetentionPolicy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetentionRule {
    /// The age range this rule covers; `reject_with_time` compares it against
    /// `now - timestamp`.
    pub duration: RetentionSpan,
    /// The spacing compared against the gap between consecutive retained timestamps.
    /// `RetentionSpan::Unlimited` has no delta, in which case every timestamp
    /// handled by this rule is kept.
    pub keep: RetentionSpan,
}
28
29impl RetentionPolicy {
30 pub fn new(rules: Vec<RetentionRule>) -> Self {
31 Self { rules }
32 }
33
34 pub fn is_ready_with_time(&self, new_time: &DateTime<Utc>, last_time: &DateTime<Utc>) -> bool {
36 let Some(rule) = self.rules.first() else {
38 return false;
39 };
40
41 let Some(keep) = rule.keep.as_delta() else {
43 return true;
44 };
45
46 let delta = new_time.signed_duration_since(last_time);
48
49 delta >= keep
52 }
53
54 pub fn reject(&self, times: Vec<&DateTime<Utc>>) -> Vec<DateTime<Utc>> {
57 self.reject_with_time(Utc::now(), times)
58 }
59 pub fn reject_with_time(
62 &self,
63 now: DateTime<Utc>,
64 times: Vec<&DateTime<Utc>>,
65 ) -> Vec<DateTime<Utc>> {
66 if self.rules.is_empty() || times.is_empty() {
69 return Vec::new();
70 }
71
72 let mut rejected = Vec::new();
73
74 let mut rules = self.rules.iter().rev().peekable();
88
89 let mut times = times
90 .into_iter()
91 .collect::<BinaryHeap<_>>()
92 .into_sorted_vec()
93 .into_iter()
94 .rev()
95 .peekable();
96
97 let mut last_kept = times.next().unwrap(); let mut curr_rule = rules.next().unwrap(); 'outer: while let Some(time) = times.peek().cloned() {
102 let delta = now.signed_duration_since(time);
103 let last_delta = now.signed_duration_since(last_kept);
104
105 match curr_rule.duration.as_delta() {
108 Some(duration) if last_delta > duration => {
109 rejected.push(*last_kept);
115 last_kept = time;
117 times.next();
118 continue;
119 }
120 _ => {}
121 }
122
123 while let Some(RetentionRule { duration, .. }) = rules.peek() {
125 if &curr_rule.duration == duration || duration == &RetentionSpan::Unlimited {
132 curr_rule = rules.next().unwrap();
133 continue;
134 }
135
136 if let Some(next_duration) = duration.as_delta() {
137 if delta >= next_duration && last_delta >= next_duration {
142 break;
143 }
144
145 if delta < next_duration {
147 last_kept = time;
148 times.next();
149 }
150
151 curr_rule = rules.next().unwrap();
152 continue 'outer;
153 }
154 }
155
156 let Some(keep) = curr_rule.keep.as_delta() else {
158 last_kept = time;
159 times.next();
160 continue;
161 };
162
163 if last_kept.signed_duration_since(time) < keep {
166 rejected.push(*time);
172 times.next();
173 continue;
174 }
175
176 last_kept = time;
183 times.next();
184 }
185
186 rejected
187 }
188}
189
190impl Default for RetentionPolicy {
191 fn default() -> Self {
194 Self {
195 rules: [
196 "4h:1h", "1D:8h", "1W:1D", "4W:1W", "4M:1M", "U:1Y", ]
203 .into_iter()
204 .map(RetentionRule::from_str)
205 .collect::<Result<_, _>>()
206 .unwrap(),
207 }
208 }
209}
210
/// A span of time counted in a fixed unit, or no limit at all.
///
/// The textual form is the count followed by a one-letter unit (`m`, `h`, `D`, `W`,
/// `M`, `Y`), or the single letter `U` for [`RetentionSpan::Unlimited`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetentionSpan {
    /// No limit (`U`); has no delta and no timestamp.
    Unlimited,
    /// A number of minutes (`m`).
    Minute(NonZeroU8),
    /// A number of hours (`h`).
    Hour(NonZeroU8),
    /// A number of days (`D`).
    Day(NonZeroU8),
    /// A number of weeks (`W`).
    Week(NonZeroU8),
    /// A number of months (`M`); `as_delta` and `as_timestamp` count a month as 30 days.
    Month(NonZeroU8),
    /// A number of years (`Y`); `as_delta` and `as_timestamp` count a year as 365 days.
    Year(NonZeroU8),
}
228
229impl RetentionSpan {
230 pub fn as_delta(&self) -> Option<TimeDelta> {
231 match self {
232 RetentionSpan::Unlimited => None,
233 RetentionSpan::Minute(value) => TimeDelta::try_minutes(value.get() as i64),
234 RetentionSpan::Hour(value) => TimeDelta::try_hours(value.get() as i64),
235 RetentionSpan::Day(value) => TimeDelta::try_days(value.get() as i64),
236 RetentionSpan::Week(value) => TimeDelta::try_weeks(value.get() as i64),
237 RetentionSpan::Month(value) => TimeDelta::try_days(value.get() as i64 * 30),
238 RetentionSpan::Year(value) => TimeDelta::try_days(value.get() as i64 * 365),
239 }
240 }
241
242 pub fn as_timestamp(&self) -> Option<i64> {
244 Utc::now().timestamp().checked_sub(match self {
245 RetentionSpan::Unlimited => return None,
246 RetentionSpan::Minute(value) => value.get() as i64 * 60,
247 RetentionSpan::Hour(value) => value.get() as i64 * 3600,
248 RetentionSpan::Day(value) => value.get() as i64 * 3600 * 24,
249 RetentionSpan::Week(value) => value.get() as i64 * 3600 * 24 * 7,
250 RetentionSpan::Month(value) => value.get() as i64 * 3600 * 24 * 30,
251 RetentionSpan::Year(value) => value.get() as i64 * 3600 * 24 * 365,
252 })
253 }
254}
255
256impl FromStr for RetentionPolicy {
257 type Err = String;
258
259 fn from_str(s: &str) -> Result<Self, Self::Err> {
260 let rules = s
261 .split(',')
262 .enumerate()
263 .filter(|(_, s)| !s.is_empty())
264 .map(|(i, rule)| {
265 rule.parse()
266 .map_err(|e| format!("parse error in rule {} ({rule}): {e}", i + 1))
267 })
268 .collect::<Result<Vec<_>, _>>()?;
269 Ok(RetentionPolicy::new(rules))
270 }
271}
272
273impl std::fmt::Display for RetentionPolicy {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 for (i, rule) in self.rules.iter().enumerate() {
276 if i > 0 {
277 f.write_char(',')?;
278 }
279 rule.fmt(f)?;
280 }
281 Ok(())
282 }
283}
284
285impl FromStr for RetentionRule {
286 type Err = String;
287
288 fn from_str(s: &str) -> Result<Self, Self::Err> {
289 let (duration, keep) = s.split_at(s.find(':').ok_or("missing ':'".to_owned())?);
290 Ok(RetentionRule {
291 duration: duration.parse().map_err(|e| format!("duration: {e}"))?,
292 keep: keep[1..].parse().map_err(|e| format!("keep: {e}"))?,
293 })
294 }
295}
296
297impl std::fmt::Display for RetentionRule {
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 write!(f, "{}:{}", self.duration, self.keep)
300 }
301}
302
303impl FromStr for RetentionSpan {
304 type Err = String;
305
306 fn from_str(s: &str) -> Result<Self, Self::Err> {
307 let unit = s.chars().last().ok_or("missing unit")?;
308 if unit == 'U' {
309 if s.len() != 1 {
310 return Err("invalid value for unlimited".to_owned());
311 }
312 return Ok(RetentionSpan::Unlimited);
313 }
314 let value = s[..s.len() - 1]
315 .parse()
316 .map_err(|e| format!("invalid value '{}': {e}", &s[..s.len() - 1]))?;
317
318 match unit {
319 'm' => Ok(RetentionSpan::Minute(value)),
320 'h' => Ok(RetentionSpan::Hour(value)),
321 'D' => Ok(RetentionSpan::Day(value)),
322 'W' => Ok(RetentionSpan::Week(value)),
323 'M' => Ok(RetentionSpan::Month(value)),
324 'Y' => Ok(RetentionSpan::Year(value)),
325 _ => Err("invalid unit".to_owned()),
326 }
327 }
328}
329
330impl std::fmt::Display for RetentionSpan {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 match self {
333 RetentionSpan::Unlimited => write!(f, "U"),
334 RetentionSpan::Minute(value) => write!(f, "{}m", value),
335 RetentionSpan::Hour(value) => write!(f, "{}h", value),
336 RetentionSpan::Day(value) => write!(f, "{}D", value),
337 RetentionSpan::Week(value) => write!(f, "{}W", value),
338 RetentionSpan::Month(value) => write!(f, "{}M", value),
339 RetentionSpan::Year(value) => write!(f, "{}Y", value),
340 }
341 }
342}
343
/// Implements `serde::Serialize` and `serde::Deserialize` for each listed type by
/// going through its string form: serialization writes `to_string()`, and
/// deserialization reads a string and parses it, turning a parse failure into a
/// custom serde error.
#[cfg(feature = "serde")]
macro_rules! impl_serde {
    ($($ty:ty),*) => {
        $(
            impl serde::Serialize for $ty {
                fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
                    serializer.serialize_str(&self.to_string())
                }
            }

            impl<'de> serde::Deserialize<'de> for $ty {
                fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
                    let text = <String as serde::Deserialize>::deserialize(deserializer)?;
                    text.parse().map_err(serde::de::Error::custom)
                }
            }
        )*
    };
}

// Spans and rules serialize as their plain string forms.
#[cfg(feature = "serde")]
impl_serde!(RetentionSpan, RetentionRule);
367
#[cfg(feature = "serde")]
impl serde::Serialize for RetentionPolicy {
    /// Serializes the policy as its comma-separated string form.
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let text = self.to_string();
        serializer.serialize_str(&text)
    }
}
374
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for RetentionPolicy {
    /// Deserializes a policy from a string: the literal `default` yields
    /// `RetentionPolicy::default()`, anything else is parsed as a rule list, and a
    /// parse failure becomes a custom serde error.
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        let text = <String as serde::Deserialize>::deserialize(deserializer)?;
        match text.as_str() {
            "default" => Ok(RetentionPolicy::default()),
            rules => rules.parse().map_err(serde::de::Error::custom),
        }
    }
}