
ff_engine/scanner/retention_trimmer.rs

//! Terminal execution retention scanner.
//!
//! Scans `ff:idx:{p:N}:lane:<lane>:terminal` for each partition+lane,
//! finding terminal executions whose `completed_at` score is older than
//! the configured retention period. For each, it cascade-deletes all
//! sub-objects: streams, attempt hashes, usage, policy, lease, suspension,
//! dependencies, waitpoints, signals, and finally the exec core + index
//! entries.
//!
//! This is a Rust-only scanner — no FCALL needed. It uses direct Valkey
//! commands (ZRANGEBYSCORE + HMGET + DEL, plus SMEMBERS/ZRANGE to discover
//! sub-object keys).
//!
//! Reference: RFC-010 §6.12
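//!
//! # Example (sketch)
//!
//! A minimal construction sketch. `engine_config` and the 300-second
//! interval below are assumptions for illustration, not part of this
//! module:
//!
//! ```ignore
//! // Assumed: the lane list comes from engine configuration.
//! let lanes: Vec<LaneId> = engine_config.lanes.clone();
//! let trimmer = RetentionTrimmer::new(Duration::from_secs(300), lanes);
//! assert_eq!(trimmer.name(), "retention_trimmer"); // Scanner trait method
//! ```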

use std::time::Duration;

use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use ff_core::types::LaneId;

use super::{ScanResult, Scanner};

/// Max terminal executions examined per lane per scan pass.
const BATCH_SIZE: u32 = 20;

/// Default retention period: 24 hours.
const DEFAULT_RETENTION_MS: u64 = 24 * 60 * 60 * 1000;

/// Scanner that purges terminal executions once their retention period
/// has elapsed.
pub struct RetentionTrimmer {
    /// How often the scanner runs.
    interval: Duration,
    /// Lanes whose terminal indexes are scanned.
    lanes: Vec<LaneId>,
    /// Default retention period in ms (used when execution has no policy).
    default_retention_ms: u64,
}

impl RetentionTrimmer {
    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
        Self {
            interval,
            lanes,
            default_retention_ms: DEFAULT_RETENTION_MS,
        }
    }
}

impl Scanner for RetentionTrimmer {
    fn name(&self) -> &'static str {
        "retention_trimmer"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);

        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "retention_trimmer: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        let mut total_processed: u32 = 0;
        let mut total_errors: u32 = 0;

        for lane in &self.lanes {
            let terminal_key = idx.lane_terminal(lane);

            // Find executions that completed before the retention cutoff.
            // Score = completed_at ms. We look for score <= (now - default_retention).
            // Per-execution retention override is checked after fetching.
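            // Worked example (illustrative numbers): with the 24 h default,
            // now_ms = 1_700_000_000_000 yields cutoff = 1_699_913_600_000.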
            let cutoff = now_ms.saturating_sub(self.default_retention_ms);

            let expired: Vec<String> = match client
                .cmd("ZRANGEBYSCORE")
                .arg(&terminal_key)
                .arg("-inf")
                .arg(cutoff.to_string().as_str())
                .arg("LIMIT")
                .arg("0")
                .arg(BATCH_SIZE.to_string().as_str())
                .execute()
                .await
            {
                Ok(ids) => ids,
                Err(e) => {
                    tracing::warn!(
                        partition, lane = lane.as_str(), error = %e,
                        "retention_trimmer: ZRANGEBYSCORE failed"
                    );
                    total_errors += 1;
                    continue;
                }
            };

            if expired.is_empty() {
                continue;
            }

            for eid_str in &expired {
                match purge_execution(
                    client, &p, &idx, lane, eid_str, &terminal_key, now_ms,
                    self.default_retention_ms,
                ).await {
                    Ok(true) => total_processed += 1,
                    Ok(false) => {} // skipped (custom retention not yet expired)
                    Err(e) => {
                        tracing::warn!(
                            partition,
                            execution_id = eid_str.as_str(),
                            error = %e,
                            "retention_trimmer: purge failed"
                        );
                        total_errors += 1;
                    }
                }
            }
        }

        ScanResult { processed: total_processed, errors: total_errors }
    }
}

/// Purge all keys for one terminal execution. Returns Ok(true) if purged,
/// Ok(false) if skipped (custom retention not yet due).
#[allow(clippy::too_many_arguments)]
async fn purge_execution(
    client: &ferriskey::Client,
    partition: &Partition,
    idx: &IndexKeys,
    _lane: &LaneId,
    eid_str: &str,
    terminal_key: &str,
    now_ms: u64,
    default_retention_ms: u64,
) -> Result<bool, ferriskey::Error> {
    let tag = partition.hash_tag();
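    // Illustrative key shape (execution id hypothetical), assuming the hash
    // tag is "{p:N}" as in the index keys above: ff:exec:{p:3}:exec_123:core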
    let exec_core_key = format!("ff:exec:{}:{}:core", tag, eid_str);

    // Read completed_at and total_attempt_count from exec_core
    let fields: Vec<Option<String>> = client
        .cmd("HMGET")
        .arg(&exec_core_key)
        .arg("completed_at")
        .arg("total_attempt_count")
        .execute()
        .await?;

    let completed_at: u64 = fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    let total_attempts: u32 = fields.get(1)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);

    if completed_at == 0 {
        // No completed_at — exec_core may already be gone. Clean index entry.
        let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
        return Ok(true);
    }

    // Check per-execution retention override from policy key
    let policy_key = format!("ff:exec:{}:{}:policy", tag, eid_str);
    let retention_ms = read_retention_ms(client, &policy_key, default_retention_ms).await;

    if now_ms < completed_at + retention_ms {
        return Ok(false); // Not yet expired under custom retention
    }

    // ── Cascading delete ──
    // Collect subordinate keys first. exec_core and the policy key are
    // held out of this list and DELeted LAST, after every other chunk
    // succeeds. Rationale: if an intermediate chunk fails with a
    // transient error, the next retention pass needs to re-read
    // `exec_core.total_attempt_count` and `policy.retention_ttl_ms` to
    // rebuild the full del_keys list — so we must not destroy those two
    // keys until the rest of the cascade has committed. ZREM on the
    // terminal_zset entry also happens last (existing invariant).
    let mut del_keys: Vec<String> = Vec::with_capacity(16 + total_attempts as usize * 5);

    // Execution-level keys (safe to delete before exec_core)
    del_keys.push(format!("ff:exec:{}:{}:payload", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:result", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:tags", tag, eid_str));

    // Lease keys
    del_keys.push(format!("ff:exec:{}:{}:lease:current", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:lease:history", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:claim_grant", tag, eid_str));

    // Attempt-level keys (for each attempt 0..total_attempts)
    del_keys.push(format!("ff:exec:{}:{}:attempts", tag, eid_str));
    for i in 0..total_attempts {
        del_keys.push(format!("ff:attempt:{}:{}:{}", tag, eid_str, i));
        del_keys.push(format!("ff:attempt:{}:{}:{}:usage", tag, eid_str, i));
        del_keys.push(format!("ff:attempt:{}:{}:{}:policy", tag, eid_str, i));
        del_keys.push(format!("ff:stream:{}:{}:{}", tag, eid_str, i));
        del_keys.push(format!("ff:stream:{}:{}:{}:meta", tag, eid_str, i));
    }

    // Suspension/waitpoint keys
    del_keys.push(format!("ff:exec:{}:{}:suspension:current", tag, eid_str));

    // Dependency keys (RFC-007)
    // deps:all_edges holds every edge ID ever applied to this exec (never
    // pruned on resolve), so SMEMBERS gives us the full set of dep hashes to
    // drop — cluster-safe, no SCAN.
    let deps_all_edges_key = format!("ff:exec:{}:{}:deps:all_edges", tag, eid_str);
    let dep_edge_ids: Vec<String> = client
        .cmd("SMEMBERS")
        .arg(&deps_all_edges_key)
        .execute()
        .await
        .unwrap_or_default();

    del_keys.push(format!("ff:exec:{}:{}:deps:meta", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str));
    del_keys.push(deps_all_edges_key);
    for edge_id in &dep_edge_ids {
        del_keys.push(format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id));
    }

    // Read waitpoints set to discover all waitpoint-related keys
    let waitpoints_key = format!("ff:exec:{}:{}:waitpoints", tag, eid_str);
    let wp_ids: Vec<String> = client
        .cmd("SMEMBERS")
        .arg(&waitpoints_key)
        .execute()
        .await
        .unwrap_or_default();

    del_keys.push(waitpoints_key);

    for wp_id_str in &wp_ids {
        del_keys.push(format!("ff:wp:{}:{}", tag, wp_id_str));
        del_keys.push(format!("ff:wp:{}:{}:signals", tag, wp_id_str));
        del_keys.push(format!("ff:wp:{}:{}:condition", tag, wp_id_str));
    }

    // Signal keys — read from per-execution signal index
    let signal_key = format!("ff:exec:{}:{}:signals", tag, eid_str);
    let sig_ids: Vec<String> = client
        .cmd("ZRANGE")
        .arg(&signal_key)
        .arg("0")
        .arg("-1")
        .execute()
        .await
        .unwrap_or_default();

    del_keys.push(signal_key);

    for sig_id_str in &sig_ids {
        del_keys.push(format!("ff:signal:{}:{}", tag, sig_id_str));
        del_keys.push(format!("ff:signal:{}:{}:payload", tag, sig_id_str));
    }

    // Batch DEL in chunks of 500 to avoid oversized commands on
    // executions with many attempts/signals/waitpoints/deps. Subordinate
    // keys go first; if any chunk errors with `?`, exec_core and
    // policy_key remain untouched so the next retention pass can
    // re-read them and rebuild the full del_keys list.
    for chunk in del_keys.chunks(500) {
        let key_refs: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
        let _: u32 = client
            .cmd("DEL")
            .arg(&key_refs)
            .execute()
            .await?;
    }

    // Finalize: drop exec_core and policy together (both on {p:N}), then
    // sweep the terminal-zset entry and the partition-wide all_executions
    // index. Doing this after the subordinate chunks guarantees that a
    // partial retention failure is idempotently retriable without
    // orphaning keys that retention discovers by reading exec_core.
    let _: u32 = client
        .cmd("DEL")
        .arg(&[exec_core_key.as_str(), policy_key.as_str()][..])
        .execute()
        .await?;

    let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
    let all_exec_key = idx.all_executions();
    let _: u32 = client.cmd("SREM").arg(&all_exec_key).arg(eid_str).execute().await?;

    tracing::debug!(
        execution_id = eid_str,
        attempts = total_attempts,
        waitpoints = wp_ids.len(),
        signals = sig_ids.len(),
        "retention_trimmer: purged execution"
    );

    Ok(true)
}

/// Read `retention_ttl_ms` from the execution's policy JSON.
/// Returns `default_retention_ms` if the policy doesn't exist or doesn't specify retention.
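///
/// Expected policy shape (a sketch, showing only the field read here):
/// `{"stream_policy":{"retention_ttl_ms":60000}}`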
async fn read_retention_ms(
    client: &ferriskey::Client,
    policy_key: &str,
    default_retention_ms: u64,
) -> u64 {
    let policy_json: Option<String> = match client
        .cmd("GET")
        .arg(policy_key)
        .execute()
        .await
    {
        Ok(v) => v,
        Err(_) => return default_retention_ms,
    };

    let json_str = match policy_json {
        Some(s) if !s.is_empty() => s,
        _ => return default_retention_ms,
    };

    // Parse JSON to extract stream_policy.retention_ttl_ms
    let parsed: serde_json::Value = match serde_json::from_str(&json_str) {
        Ok(v) => v,
        Err(_) => return default_retention_ms,
    };

    parsed
        .get("stream_policy")
        .and_then(|sp| sp.get("retention_ttl_ms"))
        .and_then(|v| v.as_u64())
        .unwrap_or(default_retention_ms)
}
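
// A minimal test sketch, assuming `serde_json` is available to this crate
// (as `read_retention_ms` above already uses it). It documents the policy
// JSON shape and the extraction path; it does not exercise the Valkey client.
#[cfg(test)]
mod tests {
    #[test]
    fn extracts_retention_ttl_from_policy_json() {
        let json = r#"{"stream_policy":{"retention_ttl_ms":60000}}"#;
        let parsed: serde_json::Value = serde_json::from_str(json).unwrap();
        let ttl = parsed
            .get("stream_policy")
            .and_then(|sp| sp.get("retention_ttl_ms"))
            .and_then(|v| v.as_u64());
        assert_eq!(ttl, Some(60_000));
    }
}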