Skip to main content

fakecloud_s3/
lifecycle.rs

1use std::time::Duration;
2
3use chrono::{NaiveDate, Utc};
4
5use crate::state::SharedS3State;
6use crate::xml_util::extract_tag;
7
8/// Background task that processes S3 lifecycle rules.
9///
10/// Every 60 seconds, iterates all buckets with lifecycle configurations,
11/// parses the lifecycle XML, and:
12/// - Deletes objects matching expiration rules (by Days or Date)
13/// - Updates storage class for objects matching transition rules
14pub struct LifecycleProcessor {
15    state: SharedS3State,
16}
17
18impl LifecycleProcessor {
19    pub fn new(state: SharedS3State) -> Self {
20        Self { state }
21    }
22
23    pub async fn run(self) {
24        let mut interval = tokio::time::interval(Duration::from_secs(60));
25
26        loop {
27            interval.tick().await;
28            self.tick();
29        }
30    }
31
32    pub fn tick(&self) {
33        let now = Utc::now();
34        let today = now.date_naive();
35
36        // Collect bucket names, their lifecycle configs, and owning account id
37        let bucket_configs: Vec<(String, String, String)> = {
38            let __mas = self.state.read();
39            __mas
40                .iter()
41                .flat_map(|(acct_id, state)| {
42                    state.buckets.values().filter_map(move |b| {
43                        b.lifecycle_config
44                            .as_ref()
45                            .map(|cfg| (b.name.clone(), cfg.clone(), acct_id.to_string()))
46                    })
47                })
48                .collect()
49        };
50
51        for (bucket_name, config_xml, account_id) in bucket_configs {
52            let rules = match parse_lifecycle_rules(&config_xml) {
53                Some(r) => r,
54                None => continue,
55            };
56
57            for rule in &rules {
58                if rule.status != "Enabled" {
59                    continue;
60                }
61
62                self.process_rule(&account_id, &bucket_name, rule, today);
63            }
64        }
65    }
66
67    fn process_rule(
68        &self,
69        account_id: &str,
70        bucket_name: &str,
71        rule: &LifecycleRule,
72        today: NaiveDate,
73    ) {
74        let mut __mas = self.state.write();
75        let state = match __mas.get_mut(account_id) {
76            Some(s) => s,
77            None => return,
78        };
79        let bucket = match state.buckets.get_mut(bucket_name) {
80            Some(b) => b,
81            None => return,
82        };
83
84        // Collect keys to expire
85        let mut keys_to_delete: Vec<String> = Vec::new();
86        // Collect keys to transition (key, new_storage_class)
87        let mut keys_to_transition: Vec<(String, String)> = Vec::new();
88
89        for (key, obj) in bucket.objects.iter() {
90            // Check prefix filter
91            if let Some(ref prefix) = rule.prefix {
92                if !prefix.is_empty() && !key.starts_with(prefix) {
93                    continue;
94                }
95            }
96
97            // Check tag filter
98            if let Some(ref tag_filter) = rule.tag_filter {
99                let matches = obj
100                    .tags
101                    .get(&tag_filter.key)
102                    .map(|v| v == &tag_filter.value)
103                    .unwrap_or(false);
104                if !matches {
105                    continue;
106                }
107            }
108
109            // Check expiration by Days
110            if let Some(days) = rule.expiration_days {
111                let age = today
112                    .signed_duration_since(obj.last_modified.date_naive())
113                    .num_days();
114                if age >= days as i64 {
115                    keys_to_delete.push(key.clone());
116                    continue;
117                }
118            }
119
120            // Check expiration by Date
121            if let Some(ref date) = rule.expiration_date {
122                if &today >= date {
123                    keys_to_delete.push(key.clone());
124                    continue;
125                }
126            }
127
128            // Check transition by Days
129            for transition in &rule.transitions {
130                let should_transition = if let Some(days) = transition.days {
131                    let age = today
132                        .signed_duration_since(obj.last_modified.date_naive())
133                        .num_days();
134                    age >= days as i64
135                } else if let Some(ref date) = transition.date {
136                    &today >= date
137                } else {
138                    false
139                };
140
141                if should_transition && obj.storage_class != transition.storage_class {
142                    keys_to_transition.push((key.clone(), transition.storage_class.clone()));
143                    break; // Only apply first matching transition
144                }
145            }
146        }
147
148        // Apply deletions
149        if !keys_to_delete.is_empty() {
150            tracing::info!(
151                bucket = %bucket_name,
152                count = keys_to_delete.len(),
153                "S3 lifecycle: expiring objects"
154            );
155            for key in &keys_to_delete {
156                bucket.objects.remove(key);
157            }
158        }
159
160        // Apply transitions
161        if !keys_to_transition.is_empty() {
162            tracing::info!(
163                bucket = %bucket_name,
164                count = keys_to_transition.len(),
165                "S3 lifecycle: transitioning object storage classes"
166            );
167            for (key, new_class) in &keys_to_transition {
168                if let Some(obj) = bucket.objects.get_mut(key) {
169                    obj.storage_class = new_class.clone();
170                }
171            }
172        }
173
174        // Abort multipart uploads older than DaysAfterInitiation. Without this
175        // sweep, abandoned uploads (started but never completed/aborted) leaked
176        // forever in memory + on disk, and configuring the rule had no effect.
177        if let Some(days) = rule.abort_incomplete_mpu_days {
178            let stale: Vec<String> = bucket
179                .multipart_uploads
180                .iter()
181                .filter(|(_, mpu)| {
182                    // The prefix filter (if any) scopes the rule to matching keys.
183                    rule.prefix
184                        .as_ref()
185                        .map(|p| p.is_empty() || mpu.key.starts_with(p))
186                        .unwrap_or(true)
187                        && today
188                            .signed_duration_since(mpu.initiated.date_naive())
189                            .num_days()
190                            >= days as i64
191                })
192                .map(|(id, _)| id.clone())
193                .collect();
194            if !stale.is_empty() {
195                tracing::info!(
196                    bucket = %bucket_name,
197                    count = stale.len(),
198                    "S3 lifecycle: aborting incomplete multipart uploads"
199                );
200                for id in &stale {
201                    bucket.multipart_uploads.remove(id);
202                }
203            }
204        }
205    }
206}
207
208/// A parsed lifecycle rule.
209struct LifecycleRule {
210    status: String,
211    prefix: Option<String>,
212    tag_filter: Option<TagFilter>,
213    expiration_days: Option<u32>,
214    expiration_date: Option<NaiveDate>,
215    transitions: Vec<Transition>,
216    /// `<AbortIncompleteMultipartUpload><DaysAfterInitiation>` — abort
217    /// multipart uploads older than this many days (the only AWS mechanism to
218    /// auto-clean abandoned MPUs).
219    abort_incomplete_mpu_days: Option<u32>,
220}
221
222struct TagFilter {
223    key: String,
224    value: String,
225}
226
227struct Transition {
228    days: Option<u32>,
229    date: Option<NaiveDate>,
230    storage_class: String,
231}
232
233/// Parse lifecycle configuration XML into rules.
234fn parse_lifecycle_rules(xml: &str) -> Option<Vec<LifecycleRule>> {
235    let mut rules = Vec::new();
236    let mut remaining = xml;
237
238    while let Some(rule_start) = remaining.find("<Rule>") {
239        let after = &remaining[rule_start + 6..];
240        let rule_end = after.find("</Rule>")?;
241        let rule_body = &after[..rule_end];
242
243        let status = extract_tag(rule_body, "Status").unwrap_or_default();
244
245        // Parse prefix — can be at rule level or inside <Filter>
246        let prefix = if let Some(filter_body) = extract_block(rule_body, "Filter") {
247            // Check for <Prefix> inside Filter
248            let filter_prefix = extract_tag(filter_body, "Prefix");
249            // Also check for <And><Prefix> pattern
250            if filter_prefix.is_some() {
251                filter_prefix
252            } else if let Some(and_body) = extract_block(filter_body, "And") {
253                extract_tag(and_body, "Prefix")
254            } else {
255                None
256            }
257        } else {
258            extract_tag(rule_body, "Prefix")
259        };
260
261        // Parse tag filter from <Filter><Tag> or <Filter><And><Tag>
262        let tag_filter = if let Some(filter_body) = extract_block(rule_body, "Filter") {
263            parse_tag_filter(filter_body)
264        } else {
265            None
266        };
267
268        // Parse expiration
269        let (expiration_days, expiration_date) =
270            if let Some(exp_body) = extract_block(rule_body, "Expiration") {
271                let days = extract_tag(exp_body, "Days").and_then(|s| s.parse::<u32>().ok());
272                let date = extract_tag(exp_body, "Date").and_then(|s| parse_date(&s));
273                (days, date)
274            } else {
275                (None, None)
276            };
277
278        // Parse transitions
279        let mut transitions = Vec::new();
280        let mut trans_remaining = rule_body;
281        while let Some(t_start) = trans_remaining.find("<Transition>") {
282            let t_after = &trans_remaining[t_start + 12..];
283            if let Some(t_end) = t_after.find("</Transition>") {
284                let t_body = &t_after[..t_end];
285                let days = extract_tag(t_body, "Days").and_then(|s| s.parse::<u32>().ok());
286                let date = extract_tag(t_body, "Date").and_then(|s| parse_date(&s));
287                let storage_class =
288                    extract_tag(t_body, "StorageClass").unwrap_or_else(|| "GLACIER".to_string());
289                transitions.push(Transition {
290                    days,
291                    date,
292                    storage_class,
293                });
294                trans_remaining = &t_after[t_end + 13..];
295            } else {
296                break;
297            }
298        }
299
300        let abort_incomplete_mpu_days = extract_block(rule_body, "AbortIncompleteMultipartUpload")
301            .and_then(|b| extract_tag(b, "DaysAfterInitiation"))
302            .and_then(|s| s.parse::<u32>().ok());
303
304        rules.push(LifecycleRule {
305            status,
306            prefix,
307            tag_filter,
308            expiration_days,
309            expiration_date,
310            transitions,
311            abort_incomplete_mpu_days,
312        });
313
314        remaining = &after[rule_end + 7..];
315    }
316
317    Some(rules)
318}
319
320/// Extract the body of a block element, e.g. `<Filter>...</Filter>` -> "...".
321fn extract_block<'a>(body: &'a str, tag: &str) -> Option<&'a str> {
322    let open = format!("<{tag}>");
323    let close = format!("</{tag}>");
324    let start = body.find(&open)?;
325    let content_start = start + open.len();
326    let end = body[content_start..].find(&close)?;
327    Some(&body[content_start..content_start + end])
328}
329
330fn parse_tag_filter(filter_body: &str) -> Option<TagFilter> {
331    // Try direct <Tag> inside <Filter>
332    if let Some(tag_body) = extract_block(filter_body, "Tag") {
333        let key = extract_tag(tag_body, "Key")?;
334        let value = extract_tag(tag_body, "Value").unwrap_or_default();
335        return Some(TagFilter { key, value });
336    }
337    // Try <And><Tag> inside <Filter>
338    if let Some(and_body) = extract_block(filter_body, "And") {
339        if let Some(tag_body) = extract_block(and_body, "Tag") {
340            let key = extract_tag(tag_body, "Key")?;
341            let value = extract_tag(tag_body, "Value").unwrap_or_default();
342            return Some(TagFilter { key, value });
343        }
344    }
345    None
346}
347
348/// Parse a date string like "2024-01-01" or "2024-01-01T00:00:00.000Z".
349fn parse_date(s: &str) -> Option<NaiveDate> {
350    // Try YYYY-MM-DD first
351    if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
352        return Some(d);
353    }
354    // Try ISO 8601 with time
355    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
356        return Some(dt.date_naive());
357    }
358    // Try with T and Z suffix
359    if let Some(date_part) = s.split('T').next() {
360        if let Ok(d) = NaiveDate::parse_from_str(date_part, "%Y-%m-%d") {
361            return Some(d);
362        }
363    }
364    None
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    #[test]
372    fn parse_expiration_days_rule() {
373        let xml = r#"<LifecycleConfiguration>
374            <Rule>
375                <Filter><Prefix>logs/</Prefix></Filter>
376                <Status>Enabled</Status>
377                <Expiration><Days>30</Days></Expiration>
378            </Rule>
379        </LifecycleConfiguration>"#;
380
381        let rules = parse_lifecycle_rules(xml).unwrap();
382        assert_eq!(rules.len(), 1);
383        assert_eq!(rules[0].status, "Enabled");
384        assert_eq!(rules[0].prefix.as_deref(), Some("logs/"));
385        assert_eq!(rules[0].expiration_days, Some(30));
386    }
387
388    #[test]
389    fn parse_abort_incomplete_mpu_rule() {
390        let xml = r#"<LifecycleConfiguration>
391            <Rule>
392                <Filter><Prefix>uploads/</Prefix></Filter>
393                <Status>Enabled</Status>
394                <AbortIncompleteMultipartUpload><DaysAfterInitiation>7</DaysAfterInitiation></AbortIncompleteMultipartUpload>
395            </Rule>
396        </LifecycleConfiguration>"#;
397
398        let rules = parse_lifecycle_rules(xml).unwrap();
399        assert_eq!(rules.len(), 1);
400        assert_eq!(rules[0].abort_incomplete_mpu_days, Some(7));
401    }
402
403    #[test]
404    fn parse_expiration_date_rule() {
405        let xml = r#"<LifecycleConfiguration>
406            <Rule>
407                <Filter><Prefix></Prefix></Filter>
408                <Status>Enabled</Status>
409                <Expiration><Date>2024-06-01</Date></Expiration>
410            </Rule>
411        </LifecycleConfiguration>"#;
412
413        let rules = parse_lifecycle_rules(xml).unwrap();
414        assert_eq!(rules.len(), 1);
415        assert_eq!(
416            rules[0].expiration_date,
417            Some(NaiveDate::from_ymd_opt(2024, 6, 1).unwrap())
418        );
419    }
420
421    #[test]
422    fn parse_transition_rule() {
423        let xml = r#"<LifecycleConfiguration>
424            <Rule>
425                <Filter><Prefix>archive/</Prefix></Filter>
426                <Status>Enabled</Status>
427                <Transition>
428                    <Days>90</Days>
429                    <StorageClass>GLACIER</StorageClass>
430                </Transition>
431                <Transition>
432                    <Days>365</Days>
433                    <StorageClass>DEEP_ARCHIVE</StorageClass>
434                </Transition>
435            </Rule>
436        </LifecycleConfiguration>"#;
437
438        let rules = parse_lifecycle_rules(xml).unwrap();
439        assert_eq!(rules.len(), 1);
440        assert_eq!(rules[0].transitions.len(), 2);
441        assert_eq!(rules[0].transitions[0].days, Some(90));
442        assert_eq!(rules[0].transitions[0].storage_class, "GLACIER");
443        assert_eq!(rules[0].transitions[1].days, Some(365));
444        assert_eq!(rules[0].transitions[1].storage_class, "DEEP_ARCHIVE");
445    }
446
447    #[test]
448    fn parse_disabled_rule() {
449        let xml = r#"<LifecycleConfiguration>
450            <Rule>
451                <Filter><Prefix></Prefix></Filter>
452                <Status>Disabled</Status>
453                <Expiration><Days>1</Days></Expiration>
454            </Rule>
455        </LifecycleConfiguration>"#;
456
457        let rules = parse_lifecycle_rules(xml).unwrap();
458        assert_eq!(rules.len(), 1);
459        assert_eq!(rules[0].status, "Disabled");
460    }
461
462    #[test]
463    fn parse_tag_filter_rule() {
464        let xml = r#"<LifecycleConfiguration>
465            <Rule>
466                <Filter>
467                    <Tag><Key>env</Key><Value>test</Value></Tag>
468                </Filter>
469                <Status>Enabled</Status>
470                <Expiration><Days>7</Days></Expiration>
471            </Rule>
472        </LifecycleConfiguration>"#;
473
474        let rules = parse_lifecycle_rules(xml).unwrap();
475        assert_eq!(rules.len(), 1);
476        let tag = rules[0].tag_filter.as_ref().unwrap();
477        assert_eq!(tag.key, "env");
478        assert_eq!(tag.value, "test");
479    }
480
481    #[test]
482    fn parse_multiple_rules() {
483        let xml = r#"<LifecycleConfiguration>
484            <Rule>
485                <Filter><Prefix>a/</Prefix></Filter>
486                <Status>Enabled</Status>
487                <Expiration><Days>10</Days></Expiration>
488            </Rule>
489            <Rule>
490                <Filter><Prefix>b/</Prefix></Filter>
491                <Status>Enabled</Status>
492                <Expiration><Days>20</Days></Expiration>
493            </Rule>
494        </LifecycleConfiguration>"#;
495
496        let rules = parse_lifecycle_rules(xml).unwrap();
497        assert_eq!(rules.len(), 2);
498        assert_eq!(rules[0].prefix.as_deref(), Some("a/"));
499        assert_eq!(rules[0].expiration_days, Some(10));
500        assert_eq!(rules[1].prefix.as_deref(), Some("b/"));
501        assert_eq!(rules[1].expiration_days, Some(20));
502    }
503
504    #[test]
505    fn parse_empty_lifecycle_xml_returns_empty() {
506        let xml = "<LifecycleConfiguration></LifecycleConfiguration>";
507        let rules = parse_lifecycle_rules(xml);
508        assert!(rules.is_some());
509        assert!(rules.unwrap().is_empty());
510    }
511
512    #[test]
513    fn parse_rule_with_noncurrent_version_expiration() {
514        let xml = r#"<LifecycleConfiguration>
515            <Rule>
516                <ID>nc-rule</ID>
517                <Status>Enabled</Status>
518                <Prefix>x/</Prefix>
519                <NoncurrentVersionExpiration><NoncurrentDays>30</NoncurrentDays></NoncurrentVersionExpiration>
520            </Rule>
521        </LifecycleConfiguration>"#;
522        let rules = parse_lifecycle_rules(xml).unwrap();
523        assert_eq!(rules.len(), 1);
524    }
525
526    #[test]
527    fn parse_rule_with_abort_incomplete_multipart() {
528        let xml = r#"<LifecycleConfiguration>
529            <Rule>
530                <ID>mp-rule</ID>
531                <Status>Enabled</Status>
532                <Prefix></Prefix>
533                <AbortIncompleteMultipartUpload><DaysAfterInitiation>7</DaysAfterInitiation></AbortIncompleteMultipartUpload>
534            </Rule>
535        </LifecycleConfiguration>"#;
536        let rules = parse_lifecycle_rules(xml).unwrap();
537        assert_eq!(rules.len(), 1);
538    }
539
540    #[test]
541    fn parse_date_valid() {
542        let d = parse_date("2025-01-15T00:00:00Z");
543        assert!(d.is_some());
544    }
545
546    #[test]
547    fn parse_date_invalid_returns_none() {
548        assert!(parse_date("bogus").is_none());
549    }
550
551    #[test]
552    fn extract_block_finds_tag() {
553        let body = "<a><b>content</b></a>";
554        let block = extract_block(body, "b");
555        assert_eq!(block, Some("content"));
556    }
557
558    #[test]
559    fn extract_block_missing_tag_returns_none() {
560        let body = "<a></a>";
561        assert!(extract_block(body, "missing").is_none());
562    }
563}