1use std::time::Duration;
2
3use chrono::{NaiveDate, Utc};
4
5use crate::state::SharedS3State;
6use crate::xml_util::extract_tag;
7
/// Periodic worker that applies S3 bucket lifecycle configurations
/// (expiration, storage-class transitions, and aborting incomplete multipart
/// uploads) to the buckets held in the shared state.
pub struct LifecycleProcessor {
    /// Shared S3 state handle: read-locked to snapshot bucket lifecycle
    /// configurations and write-locked while a rule is being applied.
    state: SharedS3State,
}
17
18impl LifecycleProcessor {
19 pub fn new(state: SharedS3State) -> Self {
20 Self { state }
21 }
22
23 pub async fn run(self) {
24 let mut interval = tokio::time::interval(Duration::from_secs(60));
25
26 loop {
27 interval.tick().await;
28 self.tick();
29 }
30 }
31
32 pub fn tick(&self) {
33 let now = Utc::now();
34 let today = now.date_naive();
35
36 let bucket_configs: Vec<(String, String, String)> = {
38 let __mas = self.state.read();
39 __mas
40 .iter()
41 .flat_map(|(acct_id, state)| {
42 state.buckets.values().filter_map(move |b| {
43 b.lifecycle_config
44 .as_ref()
45 .map(|cfg| (b.name.clone(), cfg.clone(), acct_id.to_string()))
46 })
47 })
48 .collect()
49 };
50
51 for (bucket_name, config_xml, account_id) in bucket_configs {
52 let rules = match parse_lifecycle_rules(&config_xml) {
53 Some(r) => r,
54 None => continue,
55 };
56
57 for rule in &rules {
58 if rule.status != "Enabled" {
59 continue;
60 }
61
62 self.process_rule(&account_id, &bucket_name, rule, today);
63 }
64 }
65 }
66
67 fn process_rule(
68 &self,
69 account_id: &str,
70 bucket_name: &str,
71 rule: &LifecycleRule,
72 today: NaiveDate,
73 ) {
74 let mut __mas = self.state.write();
75 let state = match __mas.get_mut(account_id) {
76 Some(s) => s,
77 None => return,
78 };
79 let bucket = match state.buckets.get_mut(bucket_name) {
80 Some(b) => b,
81 None => return,
82 };
83
84 let mut keys_to_delete: Vec<String> = Vec::new();
86 let mut keys_to_transition: Vec<(String, String)> = Vec::new();
88
89 for (key, obj) in bucket.objects.iter() {
90 if let Some(ref prefix) = rule.prefix {
92 if !prefix.is_empty() && !key.starts_with(prefix) {
93 continue;
94 }
95 }
96
97 if let Some(ref tag_filter) = rule.tag_filter {
99 let matches = obj
100 .tags
101 .get(&tag_filter.key)
102 .map(|v| v == &tag_filter.value)
103 .unwrap_or(false);
104 if !matches {
105 continue;
106 }
107 }
108
109 if let Some(days) = rule.expiration_days {
111 let age = today
112 .signed_duration_since(obj.last_modified.date_naive())
113 .num_days();
114 if age >= days as i64 {
115 keys_to_delete.push(key.clone());
116 continue;
117 }
118 }
119
120 if let Some(ref date) = rule.expiration_date {
122 if &today >= date {
123 keys_to_delete.push(key.clone());
124 continue;
125 }
126 }
127
128 for transition in &rule.transitions {
130 let should_transition = if let Some(days) = transition.days {
131 let age = today
132 .signed_duration_since(obj.last_modified.date_naive())
133 .num_days();
134 age >= days as i64
135 } else if let Some(ref date) = transition.date {
136 &today >= date
137 } else {
138 false
139 };
140
141 if should_transition && obj.storage_class != transition.storage_class {
142 keys_to_transition.push((key.clone(), transition.storage_class.clone()));
143 break; }
145 }
146 }
147
148 if !keys_to_delete.is_empty() {
150 tracing::info!(
151 bucket = %bucket_name,
152 count = keys_to_delete.len(),
153 "S3 lifecycle: expiring objects"
154 );
155 for key in &keys_to_delete {
156 bucket.objects.remove(key);
157 }
158 }
159
160 if !keys_to_transition.is_empty() {
162 tracing::info!(
163 bucket = %bucket_name,
164 count = keys_to_transition.len(),
165 "S3 lifecycle: transitioning object storage classes"
166 );
167 for (key, new_class) in &keys_to_transition {
168 if let Some(obj) = bucket.objects.get_mut(key) {
169 obj.storage_class = new_class.clone();
170 }
171 }
172 }
173
174 if let Some(days) = rule.abort_incomplete_mpu_days {
178 let stale: Vec<String> = bucket
179 .multipart_uploads
180 .iter()
181 .filter(|(_, mpu)| {
182 rule.prefix
184 .as_ref()
185 .map(|p| p.is_empty() || mpu.key.starts_with(p))
186 .unwrap_or(true)
187 && today
188 .signed_duration_since(mpu.initiated.date_naive())
189 .num_days()
190 >= days as i64
191 })
192 .map(|(id, _)| id.clone())
193 .collect();
194 if !stale.is_empty() {
195 tracing::info!(
196 bucket = %bucket_name,
197 count = stale.len(),
198 "S3 lifecycle: aborting incomplete multipart uploads"
199 );
200 for id in &stale {
201 bucket.multipart_uploads.remove(id);
202 }
203 }
204 }
205 }
206}
207
/// One `<Rule>` element of a lifecycle configuration, as produced by
/// `parse_lifecycle_rules`.
struct LifecycleRule {
    /// Text of the `<Status>` element (empty when the element is missing).
    /// Only rules whose status is exactly `"Enabled"` are applied.
    status: String,
    /// Key prefix taken from `<Filter>` (directly or inside `<And>`), or from
    /// a rule-level `<Prefix>` when the rule has no `<Filter>`. `None` or an
    /// empty string matches every key.
    prefix: Option<String>,
    /// Single tag an object must carry with an equal value, if the filter
    /// specifies one.
    tag_filter: Option<TagFilter>,
    /// `<Expiration><Days>`: objects at least this many days old are removed.
    expiration_days: Option<u32>,
    /// `<Expiration><Date>`: objects are removed once today reaches this date.
    expiration_date: Option<NaiveDate>,
    /// All `<Transition>` elements of the rule, in document order.
    transitions: Vec<Transition>,
    /// `<AbortIncompleteMultipartUpload><DaysAfterInitiation>`: multipart
    /// uploads initiated at least this many days ago are removed.
    abort_incomplete_mpu_days: Option<u32>,
}
221
/// A single key/value tag condition from a rule's `<Filter>`.
struct TagFilter {
    /// Tag key the object must have (`<Key>`).
    key: String,
    /// Value the tag must equal (`<Value>`; empty when the element is missing).
    value: String,
}
226
/// One `<Transition>` element: when an object should change storage class and
/// which class it should move to.
struct Transition {
    /// `<Days>`: minimum object age in days; checked before `date`.
    days: Option<u32>,
    /// `<Date>`: absolute date from which the transition applies; only
    /// consulted when `days` is `None`.
    date: Option<NaiveDate>,
    /// `<StorageClass>` to assign (the parser falls back to `"GLACIER"` when
    /// the element is missing).
    storage_class: String,
}
232
233fn parse_lifecycle_rules(xml: &str) -> Option<Vec<LifecycleRule>> {
235 let mut rules = Vec::new();
236 let mut remaining = xml;
237
238 while let Some(rule_start) = remaining.find("<Rule>") {
239 let after = &remaining[rule_start + 6..];
240 let rule_end = after.find("</Rule>")?;
241 let rule_body = &after[..rule_end];
242
243 let status = extract_tag(rule_body, "Status").unwrap_or_default();
244
245 let prefix = if let Some(filter_body) = extract_block(rule_body, "Filter") {
247 let filter_prefix = extract_tag(filter_body, "Prefix");
249 if filter_prefix.is_some() {
251 filter_prefix
252 } else if let Some(and_body) = extract_block(filter_body, "And") {
253 extract_tag(and_body, "Prefix")
254 } else {
255 None
256 }
257 } else {
258 extract_tag(rule_body, "Prefix")
259 };
260
261 let tag_filter = if let Some(filter_body) = extract_block(rule_body, "Filter") {
263 parse_tag_filter(filter_body)
264 } else {
265 None
266 };
267
268 let (expiration_days, expiration_date) =
270 if let Some(exp_body) = extract_block(rule_body, "Expiration") {
271 let days = extract_tag(exp_body, "Days").and_then(|s| s.parse::<u32>().ok());
272 let date = extract_tag(exp_body, "Date").and_then(|s| parse_date(&s));
273 (days, date)
274 } else {
275 (None, None)
276 };
277
278 let mut transitions = Vec::new();
280 let mut trans_remaining = rule_body;
281 while let Some(t_start) = trans_remaining.find("<Transition>") {
282 let t_after = &trans_remaining[t_start + 12..];
283 if let Some(t_end) = t_after.find("</Transition>") {
284 let t_body = &t_after[..t_end];
285 let days = extract_tag(t_body, "Days").and_then(|s| s.parse::<u32>().ok());
286 let date = extract_tag(t_body, "Date").and_then(|s| parse_date(&s));
287 let storage_class =
288 extract_tag(t_body, "StorageClass").unwrap_or_else(|| "GLACIER".to_string());
289 transitions.push(Transition {
290 days,
291 date,
292 storage_class,
293 });
294 trans_remaining = &t_after[t_end + 13..];
295 } else {
296 break;
297 }
298 }
299
300 let abort_incomplete_mpu_days = extract_block(rule_body, "AbortIncompleteMultipartUpload")
301 .and_then(|b| extract_tag(b, "DaysAfterInitiation"))
302 .and_then(|s| s.parse::<u32>().ok());
303
304 rules.push(LifecycleRule {
305 status,
306 prefix,
307 tag_filter,
308 expiration_days,
309 expiration_date,
310 transitions,
311 abort_incomplete_mpu_days,
312 });
313
314 remaining = &after[rule_end + 7..];
315 }
316
317 Some(rules)
318}
319
/// Returns the text between the first `<tag>` in `body` and the first
/// `</tag>` that follows it, borrowed from `body`.
///
/// Matching is purely textual: the opening tag must appear exactly as
/// `<tag>` (no attributes), and nesting of same-named elements is not
/// tracked. Yields `None` when either delimiter is missing.
fn extract_block<'a>(body: &'a str, tag: &str) -> Option<&'a str> {
    let opening = format!("<{tag}>");
    let closing = format!("</{tag}>");
    let (_, after_open) = body.split_once(opening.as_str())?;
    let (inner, _) = after_open.split_once(closing.as_str())?;
    Some(inner)
}
329
330fn parse_tag_filter(filter_body: &str) -> Option<TagFilter> {
331 if let Some(tag_body) = extract_block(filter_body, "Tag") {
333 let key = extract_tag(tag_body, "Key")?;
334 let value = extract_tag(tag_body, "Value").unwrap_or_default();
335 return Some(TagFilter { key, value });
336 }
337 if let Some(and_body) = extract_block(filter_body, "And") {
339 if let Some(tag_body) = extract_block(and_body, "Tag") {
340 let key = extract_tag(tag_body, "Key")?;
341 let value = extract_tag(tag_body, "Value").unwrap_or_default();
342 return Some(TagFilter { key, value });
343 }
344 }
345 None
346}
347
348fn parse_date(s: &str) -> Option<NaiveDate> {
350 if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
352 return Some(d);
353 }
354 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
356 return Some(dt.date_naive());
357 }
358 if let Some(date_part) = s.split('T').next() {
360 if let Ok(d) = NaiveDate::parse_from_str(date_part, "%Y-%m-%d") {
361 return Some(d);
362 }
363 }
364 None
365}
366
#[cfg(test)]
mod tests {
    //! Unit tests for the lifecycle XML parsing helpers.

    use super::*;

    /// A `<Filter><Prefix>` plus `<Expiration><Days>` rule is parsed field by field.
    #[test]
    fn parse_expiration_days_rule() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <Filter><Prefix>logs/</Prefix></Filter>
                <Status>Enabled</Status>
                <Expiration><Days>30</Days></Expiration>
            </Rule>
        </LifecycleConfiguration>"#;

        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].status, "Enabled");
        assert_eq!(rules[0].prefix.as_deref(), Some("logs/"));
        assert_eq!(rules[0].expiration_days, Some(30));
    }

    /// `DaysAfterInitiation` is read from `AbortIncompleteMultipartUpload`.
    #[test]
    fn parse_abort_incomplete_mpu_rule() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <Filter><Prefix>uploads/</Prefix></Filter>
                <Status>Enabled</Status>
                <AbortIncompleteMultipartUpload><DaysAfterInitiation>7</DaysAfterInitiation></AbortIncompleteMultipartUpload>
            </Rule>
        </LifecycleConfiguration>"#;

        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].abort_incomplete_mpu_days, Some(7));
    }

    /// An absolute `<Expiration><Date>` is parsed into a calendar date.
    #[test]
    fn parse_expiration_date_rule() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <Filter><Prefix></Prefix></Filter>
                <Status>Enabled</Status>
                <Expiration><Date>2024-06-01</Date></Expiration>
            </Rule>
        </LifecycleConfiguration>"#;

        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 1);
        assert_eq!(
            rules[0].expiration_date,
            Some(NaiveDate::from_ymd_opt(2024, 6, 1).unwrap())
        );
    }

    /// Multiple `<Transition>` elements are kept in document order.
    #[test]
    fn parse_transition_rule() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <Filter><Prefix>archive/</Prefix></Filter>
                <Status>Enabled</Status>
                <Transition>
                    <Days>90</Days>
                    <StorageClass>GLACIER</StorageClass>
                </Transition>
                <Transition>
                    <Days>365</Days>
                    <StorageClass>DEEP_ARCHIVE</StorageClass>
                </Transition>
            </Rule>
        </LifecycleConfiguration>"#;

        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].transitions.len(), 2);
        assert_eq!(rules[0].transitions[0].days, Some(90));
        assert_eq!(rules[0].transitions[0].storage_class, "GLACIER");
        assert_eq!(rules[0].transitions[1].days, Some(365));
        assert_eq!(rules[0].transitions[1].storage_class, "DEEP_ARCHIVE");
    }

    /// Disabled rules are still parsed; filtering by status happens later.
    #[test]
    fn parse_disabled_rule() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <Filter><Prefix></Prefix></Filter>
                <Status>Disabled</Status>
                <Expiration><Days>1</Days></Expiration>
            </Rule>
        </LifecycleConfiguration>"#;

        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].status, "Disabled");
    }

    /// A `<Tag>` directly inside `<Filter>` becomes the rule's tag filter.
    #[test]
    fn parse_tag_filter_rule() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <Filter>
                    <Tag><Key>env</Key><Value>test</Value></Tag>
                </Filter>
                <Status>Enabled</Status>
                <Expiration><Days>7</Days></Expiration>
            </Rule>
        </LifecycleConfiguration>"#;

        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 1);
        let tag = rules[0].tag_filter.as_ref().unwrap();
        assert_eq!(tag.key, "env");
        assert_eq!(tag.value, "test");
    }

    /// Consecutive rules are parsed independently of each other.
    #[test]
    fn parse_multiple_rules() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <Filter><Prefix>a/</Prefix></Filter>
                <Status>Enabled</Status>
                <Expiration><Days>10</Days></Expiration>
            </Rule>
            <Rule>
                <Filter><Prefix>b/</Prefix></Filter>
                <Status>Enabled</Status>
                <Expiration><Days>20</Days></Expiration>
            </Rule>
        </LifecycleConfiguration>"#;

        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 2);
        assert_eq!(rules[0].prefix.as_deref(), Some("a/"));
        assert_eq!(rules[0].expiration_days, Some(10));
        assert_eq!(rules[1].prefix.as_deref(), Some("b/"));
        assert_eq!(rules[1].expiration_days, Some(20));
    }

    /// A configuration without rules parses to an empty list, not `None`.
    #[test]
    fn parse_empty_lifecycle_xml_returns_empty() {
        let xml = "<LifecycleConfiguration></LifecycleConfiguration>";
        let rules = parse_lifecycle_rules(xml);
        assert!(rules.is_some());
        assert!(rules.unwrap().is_empty());
    }

    /// `NoncurrentVersionExpiration` must not be mistaken for `Expiration`,
    /// and a rule-level `<Prefix>` is honoured when there is no `<Filter>`.
    #[test]
    fn parse_rule_with_noncurrent_version_expiration() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <ID>nc-rule</ID>
                <Status>Enabled</Status>
                <Prefix>x/</Prefix>
                <NoncurrentVersionExpiration><NoncurrentDays>30</NoncurrentDays></NoncurrentVersionExpiration>
            </Rule>
        </LifecycleConfiguration>"#;
        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].status, "Enabled");
        assert_eq!(rules[0].prefix.as_deref(), Some("x/"));
        assert!(rules[0].tag_filter.is_none());
        assert_eq!(rules[0].expiration_days, None);
        assert_eq!(rules[0].expiration_date, None);
        assert!(rules[0].transitions.is_empty());
        assert_eq!(rules[0].abort_incomplete_mpu_days, None);
    }

    /// The abort-multipart setting is also read from rules that use the
    /// rule-level `<Prefix>` form instead of `<Filter>`.
    #[test]
    fn parse_rule_with_abort_incomplete_multipart() {
        let xml = r#"<LifecycleConfiguration>
            <Rule>
                <ID>mp-rule</ID>
                <Status>Enabled</Status>
                <Prefix></Prefix>
                <AbortIncompleteMultipartUpload><DaysAfterInitiation>7</DaysAfterInitiation></AbortIncompleteMultipartUpload>
            </Rule>
        </LifecycleConfiguration>"#;
        let rules = parse_lifecycle_rules(xml).unwrap();
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].status, "Enabled");
        assert_eq!(rules[0].abort_incomplete_mpu_days, Some(7));
        assert_eq!(rules[0].expiration_days, None);
        assert!(rules[0].transitions.is_empty());
    }

    /// An RFC 3339 timestamp yields its date component.
    #[test]
    fn parse_date_valid() {
        let d = parse_date("2025-01-15T00:00:00Z");
        assert_eq!(d, Some(NaiveDate::from_ymd_opt(2025, 1, 15).unwrap()));
    }

    /// Text that is not a date in any accepted form is rejected.
    #[test]
    fn parse_date_invalid_returns_none() {
        assert!(parse_date("bogus").is_none());
    }

    /// The inner text of a nested element is returned.
    #[test]
    fn extract_block_finds_tag() {
        let body = "<a><b>content</b></a>";
        let block = extract_block(body, "b");
        assert_eq!(block, Some("content"));
    }

    /// A tag that does not occur yields `None`.
    #[test]
    fn extract_block_missing_tag_returns_none() {
        let body = "<a></a>";
        assert!(extract_block(body, "missing").is_none());
    }
}