
ff_engine/scanner/retention_trimmer.rs

//! Terminal execution retention scanner.
//!
//! Scans `ff:idx:{p:N}:lane:<lane>:terminal` for each partition+lane,
//! finding terminal executions whose `completed_at` score is older than
//! the configured retention period. For each, it cascade-deletes all
//! sub-objects: streams, attempt hashes, usage, policy, lease, suspension,
//! waitpoints, signals, dependency edges, and finally the exec core +
//! index entries.
//!
//! This is a Rust-only scanner — no FCALL needed. Uses direct Valkey
//! commands (ZRANGEBYSCORE + HMGET + DEL).
//!
//! Reference: RFC-010 §6.12
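//!
//! For example, with partition 3 and lane `default` (illustrative values),
//! the scanned index key is `ff:idx:{p:3}:lane:default:terminal`.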
use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use ff_core::types::LaneId;

use super::{should_skip_candidate, ScanResult, Scanner};

const BATCH_SIZE: u32 = 20;

/// Default retention period: 24 hours.
const DEFAULT_RETENTION_MS: u64 = 24 * 60 * 60 * 1000;

pub struct RetentionTrimmer {
    interval: Duration,
    lanes: Vec<LaneId>,
    /// Default retention period in ms (used when execution has no policy).
    default_retention_ms: u64,
    filter: ScannerFilter,
}

impl RetentionTrimmer {
    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
        Self::with_filter(interval, lanes, ScannerFilter::default())
    }

    /// Construct with a [`ScannerFilter`] applied per candidate
    /// (issue #122).
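    ///
    /// A minimal construction sketch; the interval is illustrative and
    /// `configured_lanes` is a hypothetical helper standing in for however
    /// the caller assembles its `Vec<LaneId>`:
    ///
    /// ```ignore
    /// let lanes: Vec<LaneId> = configured_lanes();
    /// let trimmer = RetentionTrimmer::with_filter(
    ///     Duration::from_secs(30),
    ///     lanes,
    ///     ScannerFilter::default(),
    /// );
    /// ```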
    pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
        Self {
            interval,
            lanes,
            default_retention_ms: DEFAULT_RETENTION_MS,
            filter,
        }
    }
}

impl Scanner for RetentionTrimmer {
    fn name(&self) -> &'static str {
        "retention_trimmer"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);

        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "retention_trimmer: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        let mut total_processed: u32 = 0;
        let mut total_errors: u32 = 0;

        for lane in &self.lanes {
            let terminal_key = idx.lane_terminal(lane);

            // Find executions that completed before the retention cutoff.
            // Score = completed_at ms. We look for score <= (now - default_retention).
            // Per-execution retention override is checked after fetching.
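            // With the 24 h default this works out to `cutoff = now_ms - 86_400_000`.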
            let cutoff = now_ms.saturating_sub(self.default_retention_ms);

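            // The builder chain below issues the equivalent of:
            //   ZRANGEBYSCORE <terminal_key> -inf <cutoff> LIMIT 0 <BATCH_SIZE>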
            let expired: Vec<String> = match client
                .cmd("ZRANGEBYSCORE")
                .arg(&terminal_key)
                .arg("-inf")
                .arg(cutoff.to_string().as_str())
                .arg("LIMIT")
                .arg("0")
                .arg(BATCH_SIZE.to_string().as_str())
                .execute()
                .await
            {
                Ok(ids) => ids,
                Err(e) => {
                    tracing::warn!(
                        partition, lane = lane.as_str(), error = %e,
                        "retention_trimmer: ZRANGEBYSCORE failed"
                    );
                    total_errors += 1;
                    continue;
                }
            };

            if expired.is_empty() {
                continue;
            }

            for eid_str in &expired {
                if should_skip_candidate(client, &self.filter, partition, eid_str).await {
                    continue;
                }
                match purge_execution(
                    client, &p, &idx, lane, eid_str, &terminal_key, now_ms,
                    self.default_retention_ms,
                ).await {
                    Ok(true) => total_processed += 1,
                    Ok(false) => {} // skipped (custom retention not yet expired)
                    Err(e) => {
                        tracing::warn!(
                            partition,
                            execution_id = eid_str.as_str(),
                            error = %e,
                            "retention_trimmer: purge failed"
                        );
                        total_errors += 1;
                    }
                }
            }
        }

        ScanResult { processed: total_processed, errors: total_errors }
    }
}

/// Purge all keys for one terminal execution. Returns Ok(true) if purged,
/// Ok(false) if skipped (custom retention not yet due).
#[allow(clippy::too_many_arguments)]
async fn purge_execution(
    client: &ferriskey::Client,
    partition: &Partition,
    idx: &IndexKeys,
    _lane: &LaneId,
    eid_str: &str,
    terminal_key: &str,
    now_ms: u64,
    default_retention_ms: u64,
) -> Result<bool, ferriskey::Error> {
    let tag = partition.hash_tag();
    let exec_core_key = format!("ff:exec:{}:{}:core", tag, eid_str);

    // Read completed_at and total_attempt_count from exec_core
    let fields: Vec<Option<String>> = client
        .cmd("HMGET")
        .arg(&exec_core_key)
        .arg("completed_at")
        .arg("total_attempt_count")
        .execute()
        .await?;

    let completed_at: u64 = fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    let total_attempts: u32 = fields.get(1)
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);

    if completed_at == 0 {
        // No completed_at — exec_core may already be gone. Clean index entry.
        let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
        return Ok(true);
    }

    // Check per-execution retention override from policy key
    let policy_key = format!("ff:exec:{}:{}:policy", tag, eid_str);
    let retention_ms = read_retention_ms(client, &policy_key, default_retention_ms).await;

    if now_ms < completed_at + retention_ms {
        return Ok(false); // Not yet expired under custom retention
    }

    // ── Cascading delete ──
    // Collect subordinate keys first. exec_core and the policy key are
    // held out of this list and DELeted LAST, after every other chunk
    // succeeds. Rationale: if an intermediate chunk fails with a
    // transient error, the next retention pass needs to re-read
    // `exec_core.total_attempt_count` and `policy.retention_ttl_ms` to
    // rebuild the full del_keys list — so we must not destroy those two
    // keys until the rest of the cascade has committed. ZREM on the
    // terminal_zset entry also happens last (existing invariant).
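    //
    // Concretely, the purge below runs in three phases:
    //   1. chunked DEL of every subordinate key collected in `del_keys`
    //   2. DEL of exec_core + policy_key
    //   3. ZREM from the terminal index and SREM from all_executions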
    let mut del_keys: Vec<String> = Vec::with_capacity(16 + total_attempts as usize * 5);

    // Execution-level keys (safe to delete before exec_core)
    del_keys.push(format!("ff:exec:{}:{}:payload", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:result", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:tags", tag, eid_str));

    // Lease keys
    del_keys.push(format!("ff:exec:{}:{}:lease:current", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:lease:history", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:claim_grant", tag, eid_str));

    // Attempt-level keys (for each attempt 0..total_attempts)
    del_keys.push(format!("ff:exec:{}:{}:attempts", tag, eid_str));
    for i in 0..total_attempts {
        del_keys.push(format!("ff:attempt:{}:{}:{}", tag, eid_str, i));
        del_keys.push(format!("ff:attempt:{}:{}:{}:usage", tag, eid_str, i));
        del_keys.push(format!("ff:attempt:{}:{}:{}:policy", tag, eid_str, i));
        del_keys.push(format!("ff:stream:{}:{}:{}", tag, eid_str, i));
        del_keys.push(format!("ff:stream:{}:{}:{}:meta", tag, eid_str, i));
    }

    // Suspension/waitpoint keys
    del_keys.push(format!("ff:exec:{}:{}:suspension:current", tag, eid_str));

    // Dependency keys (RFC-007)
    // deps:all_edges holds every edge ID ever applied to this exec (never
    // pruned on resolve), so SMEMBERS gives us the full set of dep hashes to
    // drop — cluster-safe, no SCAN.
    let deps_all_edges_key = format!("ff:exec:{}:{}:deps:all_edges", tag, eid_str);
    let dep_edge_ids: Vec<String> = client
        .cmd("SMEMBERS")
        .arg(&deps_all_edges_key)
        .execute()
        .await
        .unwrap_or_default();

    del_keys.push(format!("ff:exec:{}:{}:deps:meta", tag, eid_str));
    del_keys.push(format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str));
    del_keys.push(deps_all_edges_key);
    for edge_id in &dep_edge_ids {
        del_keys.push(format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id));
    }

    // Read waitpoints set to discover all waitpoint-related keys
    let waitpoints_key = format!("ff:exec:{}:{}:waitpoints", tag, eid_str);
    let wp_ids: Vec<String> = client
        .cmd("SMEMBERS")
        .arg(&waitpoints_key)
        .execute()
        .await
        .unwrap_or_default();

    del_keys.push(waitpoints_key);

    for wp_id_str in &wp_ids {
        del_keys.push(format!("ff:wp:{}:{}", tag, wp_id_str));
        del_keys.push(format!("ff:wp:{}:{}:signals", tag, wp_id_str));
        del_keys.push(format!("ff:wp:{}:{}:condition", tag, wp_id_str));
    }

    // Signal keys — read from per-execution signal index
    let signal_key = format!("ff:exec:{}:{}:signals", tag, eid_str);
    let sig_ids: Vec<String> = client
        .cmd("ZRANGE")
        .arg(&signal_key)
        .arg("0")
        .arg("-1")
        .execute()
        .await
        .unwrap_or_default();

    del_keys.push(signal_key);

    for sig_id_str in &sig_ids {
        del_keys.push(format!("ff:signal:{}:{}", tag, sig_id_str));
        del_keys.push(format!("ff:signal:{}:{}:payload", tag, sig_id_str));
    }

    // Batch DEL in chunks of 500 to avoid oversized commands on
    // executions with many attempts/signals/waitpoints/deps. Subordinate
    // keys go first; if any chunk errors with `?`, exec_core and
    // policy_key remain untouched so the next retention pass can
    // re-read them and rebuild the full del_keys list.
    for chunk in del_keys.chunks(500) {
        let key_refs: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
        let _: u32 = client
            .cmd("DEL")
            .arg(&key_refs)
            .execute()
            .await?;
    }

    // Finalize: drop exec_core and policy together (both on {p:N}), then
    // sweep the terminal-zset entry and the partition-wide all_executions
    // index. Doing this after the subordinate chunks guarantees that a
    // partial retention failure is idempotently retriable without
    // orphaning keys that retention discovers by reading exec_core.
    let _: u32 = client
        .cmd("DEL")
        .arg(&[exec_core_key.as_str(), policy_key.as_str()][..])
        .execute()
        .await?;

    let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
    let all_exec_key = idx.all_executions();
    let _: u32 = client.cmd("SREM").arg(&all_exec_key).arg(eid_str).execute().await?;

    tracing::debug!(
        execution_id = eid_str,
        attempts = total_attempts,
        waitpoints = wp_ids.len(),
        signals = sig_ids.len(),
        "retention_trimmer: purged execution"
    );

    Ok(true)
}

/// Read retention_ttl_ms from the execution's policy JSON.
/// Returns default_retention_ms if policy doesn't exist or doesn't specify retention.
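///
/// For example, a stored policy of
/// `{"stream_policy":{"retention_ttl_ms":3600000}}` (shape inferred from the
/// field lookups below) selects a 1-hour retention.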
async fn read_retention_ms(
    client: &ferriskey::Client,
    policy_key: &str,
    default_retention_ms: u64,
) -> u64 {
    let policy_json: Option<String> = match client
        .cmd("GET")
        .arg(policy_key)
        .execute()
        .await
    {
        Ok(v) => v,
        Err(_) => return default_retention_ms,
    };

    let json_str = match policy_json {
        Some(s) if !s.is_empty() => s,
        _ => return default_retention_ms,
    };

    // Parse JSON to extract stream_policy.retention_ttl_ms
    let parsed: serde_json::Value = match serde_json::from_str(&json_str) {
        Ok(v) => v,
        Err(_) => return default_retention_ms,
    };

    parsed
        .get("stream_policy")
        .and_then(|sp| sp.get("retention_ttl_ms"))
        .and_then(|v| v.as_u64())
        .unwrap_or(default_retention_ms)
}