Skip to main content

ff_engine/scanner/
retention_trimmer.rs

1//! Terminal execution retention scanner.
2//!
3//! Scans `ff:idx:{p:N}:lane:<lane>:terminal` for each partition+lane,
4//! finding terminal executions older than the configured retention
//! period. For each batch, the per-execution cascade delete runs
//! through [`EngineBackend::trim_retention`]: the Valkey backend takes
//! over the pre-PR-7b direct-client path, Postgres cascades DELETEs
//! across every sibling table inside one transaction, and SQLite
//! reports `Unavailable` per RFC-023 Phase 3.5.
10//!
11//! Retention trimming is inherently a scan-and-delete loop over time.
12//! The trait exists to remove engine-side Valkey coupling, NOT to
13//! atomise the operation into a single round-trip — implementations
14//! may still issue multiple round-trips per batch.
15//!
16//! Reference: RFC-010 §6.12
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use ff_core::backend::ScannerFilter;
22use ff_core::engine_backend::EngineBackend;
23use ff_core::engine_error::EngineError;
24use ff_core::keys::IndexKeys;
25use ff_core::partition::{Partition, PartitionFamily};
26use ff_core::types::{LaneId, TimestampMs};
27
28use super::{should_skip_candidate, ScanResult, Scanner};
29
/// Batch size handed to each per-lane retention trim call.
const BATCH_SIZE: u32 = 20;

/// Default retention period (24 hours), expressed in milliseconds.
const DEFAULT_RETENTION_MS: u64 = Duration::from_secs(24 * 60 * 60).as_millis() as u64;
34
35pub struct RetentionTrimmer {
36    interval: Duration,
37    lanes: Vec<LaneId>,
38    /// Default retention period in ms (used when execution has no policy).
39    default_retention_ms: u64,
40    filter: ScannerFilter,
41    backend: Option<Arc<dyn EngineBackend>>,
42}
43
44impl RetentionTrimmer {
45    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
46        Self::with_filter(interval, lanes, ScannerFilter::default())
47    }
48
49    /// Construct with a [`ScannerFilter`] applied per candidate
50    /// (issue #122).
51    pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
52        Self {
53            interval,
54            lanes,
55            default_retention_ms: DEFAULT_RETENTION_MS,
56            filter,
57            backend: None,
58        }
59    }
60
61    /// PR-7b Cluster 2b-B: wire an `EngineBackend` so the per-batch
62    /// retention cascade runs through the trait.
63    pub fn with_filter_and_backend(
64        interval: Duration,
65        lanes: Vec<LaneId>,
66        filter: ScannerFilter,
67        backend: Arc<dyn EngineBackend>,
68    ) -> Self {
69        Self {
70            interval,
71            lanes,
72            default_retention_ms: DEFAULT_RETENTION_MS,
73            filter,
74            backend: Some(backend),
75        }
76    }
77}
78
79impl Scanner for RetentionTrimmer {
80    fn name(&self) -> &'static str {
81        "retention_trimmer"
82    }
83
84    fn interval(&self) -> Duration {
85        self.interval
86    }
87
88    fn filter(&self) -> &ScannerFilter {
89        &self.filter
90    }
91
92    async fn scan_partition(
93        &self,
94        client: &ferriskey::Client,
95        partition: u16,
96    ) -> ScanResult {
97        let p = Partition {
98            family: PartitionFamily::Execution,
99            index: partition,
100        };
101
102        let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
103            b.server_time_ms().await.map_err(|e| e.to_string())
104        } else {
105            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
106        };
107        let now_ms = match now_ms_res {
108            Ok(t) => t,
109            Err(e) => {
110                tracing::warn!(partition, error = %e, "retention_trimmer: failed to get server time");
111                return ScanResult { processed: 0, errors: 1 };
112            }
113        };
114        let now_ts = TimestampMs::from_millis(now_ms as i64);
115
116        let mut total_processed: u32 = 0;
117        let mut total_errors: u32 = 0;
118
119        for lane in &self.lanes {
120            let res = if let Some(backend) = self.backend.as_ref() {
121                backend
122                    .trim_retention(
123                        p,
124                        lane,
125                        self.default_retention_ms,
126                        now_ts,
127                        BATCH_SIZE,
128                        &self.filter,
129                    )
130                    .await
131                    .map_err(|e: EngineError| e.to_string())
132            } else {
133                // Test-only fallback when the scanner is instantiated
134                // without a backend. Mirrors the pre-PR-7b ZRANGEBYSCORE
135                // + per-exec cascade loop behaviour for test fixtures.
136                // Filter is honoured via `should_skip_candidate` in the
137                // per-eid loop; with no backend plumbed a non-noop filter
138                // skips everything (conservative), matching the trait-
139                // routed semantic.
140                trim_retention_direct_fallback(
141                    client,
142                    self.backend.as_ref(),
143                    &self.filter,
144                    &p,
145                    lane,
146                    self.default_retention_ms,
147                    now_ms,
148                    BATCH_SIZE,
149                )
150                .await
151                .map_err(|e| e.to_string())
152            };
153
154            match res {
155                Ok(n) => total_processed += n,
156                Err(e) => {
157                    tracing::warn!(
158                        partition,
159                        lane = lane.as_str(),
160                        error = %e,
161                        "retention_trimmer: trim_retention failed"
162                    );
163                    total_errors += 1;
164                }
165            }
166        }
167
168        ScanResult { processed: total_processed, errors: total_errors }
169    }
170}
171
172/// Test-only direct-client retention loop for scanner instantiations
173/// without an `EngineBackend`. Returns the number of executions purged
174/// in this call.
175#[allow(clippy::too_many_arguments)]
176async fn trim_retention_direct_fallback(
177    client: &ferriskey::Client,
178    backend: Option<&Arc<dyn EngineBackend>>,
179    filter: &ScannerFilter,
180    partition: &Partition,
181    lane: &LaneId,
182    default_retention_ms: u64,
183    now_ms: u64,
184    batch_size: u32,
185) -> Result<u32, ferriskey::Error> {
186    let idx = IndexKeys::new(partition);
187    let terminal_key = idx.lane_terminal(lane);
188    let cutoff = now_ms.saturating_sub(default_retention_ms);
189
190    let expired: Vec<String> = client
191        .cmd("ZRANGEBYSCORE")
192        .arg(&terminal_key)
193        .arg("-inf")
194        .arg(cutoff.to_string().as_str())
195        .arg("LIMIT")
196        .arg("0")
197        .arg(batch_size.to_string().as_str())
198        .execute()
199        .await?;
200
201    if expired.is_empty() {
202        return Ok(0);
203    }
204
205    let mut processed = 0u32;
206    for eid_str in &expired {
207        if should_skip_candidate(backend, filter, partition.index, eid_str).await {
208            continue;
209        }
210        if purge_execution_fallback(
211            client,
212            partition,
213            &idx,
214            eid_str,
215            &terminal_key,
216            now_ms,
217            default_retention_ms,
218        )
219        .await?
220        {
221            processed += 1;
222        }
223    }
224    Ok(processed)
225}
226
227#[allow(clippy::too_many_arguments)]
228async fn purge_execution_fallback(
229    client: &ferriskey::Client,
230    partition: &Partition,
231    idx: &IndexKeys,
232    eid_str: &str,
233    terminal_key: &str,
234    now_ms: u64,
235    default_retention_ms: u64,
236) -> Result<bool, ferriskey::Error> {
237    let tag = partition.hash_tag();
238    let exec_core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
239
240    let fields: Vec<Option<String>> = client
241        .cmd("HMGET")
242        .arg(&exec_core_key)
243        .arg("completed_at")
244        .arg("total_attempt_count")
245        .execute()
246        .await?;
247
248    let completed_at: u64 = fields.first()
249        .and_then(|v| v.as_ref())
250        .and_then(|s| s.parse().ok())
251        .unwrap_or(0);
252    let total_attempts: u32 = fields.get(1)
253        .and_then(|v| v.as_ref())
254        .and_then(|s| s.parse().ok())
255        .unwrap_or(0);
256
257    if completed_at == 0 {
258        let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
259        return Ok(true);
260    }
261
262    let policy_key = format!("ff:exec:{}:{}:policy", tag, eid_str);
263    let retention_ms = read_retention_ms_fallback(client, &policy_key, default_retention_ms).await;
264    if now_ms < completed_at + retention_ms {
265        return Ok(false);
266    }
267
268    let mut del_keys: Vec<String> = Vec::with_capacity(16 + total_attempts as usize * 5);
269    del_keys.push(format!("ff:exec:{}:{}:payload", tag, eid_str));
270    del_keys.push(format!("ff:exec:{}:{}:result", tag, eid_str));
271    del_keys.push(format!("ff:exec:{}:{}:tags", tag, eid_str));
272    del_keys.push(format!("ff:exec:{}:{}:lease:current", tag, eid_str));
273    del_keys.push(format!("ff:exec:{}:{}:lease:history", tag, eid_str));
274    del_keys.push(format!("ff:exec:{}:{}:claim_grant", tag, eid_str));
275    del_keys.push(format!("ff:exec:{}:{}:attempts", tag, eid_str));
276    for i in 0..total_attempts {
277        del_keys.push(format!("ff:attempt:{}:{}:{}", tag, eid_str, i));
278        del_keys.push(format!("ff:attempt:{}:{}:{}:usage", tag, eid_str, i));
279        del_keys.push(format!("ff:attempt:{}:{}:{}:policy", tag, eid_str, i));
280        del_keys.push(format!("ff:stream:{}:{}:{}", tag, eid_str, i));
281        del_keys.push(format!("ff:stream:{}:{}:{}:meta", tag, eid_str, i));
282    }
283    del_keys.push(format!("ff:exec:{}:{}:suspension:current", tag, eid_str));
284
285    let deps_all_edges_key = format!("ff:exec:{}:{}:deps:all_edges", tag, eid_str);
286    let dep_edge_ids: Vec<String> = client
287        .cmd("SMEMBERS")
288        .arg(&deps_all_edges_key)
289        .execute()
290        .await
291        .unwrap_or_default();
292    del_keys.push(format!("ff:exec:{}:{}:deps:meta", tag, eid_str));
293    del_keys.push(format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str));
294    del_keys.push(deps_all_edges_key);
295    for edge_id in &dep_edge_ids {
296        del_keys.push(format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id));
297    }
298
299    let waitpoints_key = format!("ff:exec:{}:{}:waitpoints", tag, eid_str);
300    let wp_ids: Vec<String> = client
301        .cmd("SMEMBERS")
302        .arg(&waitpoints_key)
303        .execute()
304        .await
305        .unwrap_or_default();
306    del_keys.push(waitpoints_key);
307    for wp_id_str in &wp_ids {
308        del_keys.push(format!("ff:wp:{}:{}", tag, wp_id_str));
309        del_keys.push(format!("ff:wp:{}:{}:signals", tag, wp_id_str));
310        del_keys.push(format!("ff:wp:{}:{}:condition", tag, wp_id_str));
311    }
312
313    let signal_key = format!("ff:exec:{}:{}:signals", tag, eid_str);
314    let sig_ids: Vec<String> = client
315        .cmd("ZRANGE")
316        .arg(&signal_key)
317        .arg("0")
318        .arg("-1")
319        .execute()
320        .await
321        .unwrap_or_default();
322    del_keys.push(signal_key);
323    for sig_id_str in &sig_ids {
324        del_keys.push(format!("ff:signal:{}:{}", tag, sig_id_str));
325        del_keys.push(format!("ff:signal:{}:{}:payload", tag, sig_id_str));
326    }
327
328    for chunk in del_keys.chunks(500) {
329        let key_refs: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
330        let _: u32 = client.cmd("DEL").arg(&key_refs).execute().await?;
331    }
332
333    let _: u32 = client
334        .cmd("DEL")
335        .arg(&[exec_core_key.as_str(), policy_key.as_str()][..])
336        .execute()
337        .await?;
338
339    let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
340    let all_exec_key = idx.all_executions();
341    let _: u32 = client.cmd("SREM").arg(&all_exec_key).arg(eid_str).execute().await?;
342
343    Ok(true)
344}
345
346async fn read_retention_ms_fallback(
347    client: &ferriskey::Client,
348    policy_key: &str,
349    default_retention_ms: u64,
350) -> u64 {
351    let policy_json: Option<String> = match client.cmd("GET").arg(policy_key).execute().await {
352        Ok(v) => v,
353        Err(_) => return default_retention_ms,
354    };
355    let json_str = match policy_json {
356        Some(s) if !s.is_empty() => s,
357        _ => return default_retention_ms,
358    };
359    let parsed: serde_json::Value = match serde_json::from_str(&json_str) {
360        Ok(v) => v,
361        Err(_) => return default_retention_ms,
362    };
363    parsed
364        .get("stream_policy")
365        .and_then(|sp| sp.get("retention_ttl_ms"))
366        .and_then(|v| v.as_u64())
367        .unwrap_or(default_retention_ms)
368}