1use std::sync::Arc;
19use std::time::Duration;
20
21use ff_core::backend::ScannerFilter;
22use ff_core::engine_backend::EngineBackend;
23use ff_core::engine_error::EngineError;
24use ff_core::keys::IndexKeys;
25use ff_core::partition::{Partition, PartitionFamily};
26use ff_core::types::{LaneId, TimestampMs};
27
28use super::{should_skip_candidate, ScanResult, Scanner};
29
/// Upper bound on terminal executions fetched per lane per trim call
/// (handed to the trim routines as their `batch_size`).
const BATCH_SIZE: u32 = 20;

/// Retention window handed to the trim routines when nothing more specific
/// applies: 24 hours, expressed in milliseconds.
const DEFAULT_RETENTION_MS: u64 = 24 * 60 * 60 * 1_000;
34
35pub struct RetentionTrimmer {
36 interval: Duration,
37 lanes: Vec<LaneId>,
38 default_retention_ms: u64,
40 filter: ScannerFilter,
41 backend: Option<Arc<dyn EngineBackend>>,
42}
43
44impl RetentionTrimmer {
45 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
46 Self::with_filter(interval, lanes, ScannerFilter::default())
47 }
48
49 pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
52 Self {
53 interval,
54 lanes,
55 default_retention_ms: DEFAULT_RETENTION_MS,
56 filter,
57 backend: None,
58 }
59 }
60
61 pub fn with_filter_and_backend(
64 interval: Duration,
65 lanes: Vec<LaneId>,
66 filter: ScannerFilter,
67 backend: Arc<dyn EngineBackend>,
68 ) -> Self {
69 Self {
70 interval,
71 lanes,
72 default_retention_ms: DEFAULT_RETENTION_MS,
73 filter,
74 backend: Some(backend),
75 }
76 }
77}
78
79impl Scanner for RetentionTrimmer {
80 fn name(&self) -> &'static str {
81 "retention_trimmer"
82 }
83
84 fn interval(&self) -> Duration {
85 self.interval
86 }
87
88 fn filter(&self) -> &ScannerFilter {
89 &self.filter
90 }
91
92 async fn scan_partition(
93 &self,
94 client: &ferriskey::Client,
95 partition: u16,
96 ) -> ScanResult {
97 let p = Partition {
98 family: PartitionFamily::Execution,
99 index: partition,
100 };
101
102 let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
103 b.server_time_ms().await.map_err(|e| e.to_string())
104 } else {
105 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
106 };
107 let now_ms = match now_ms_res {
108 Ok(t) => t,
109 Err(e) => {
110 tracing::warn!(partition, error = %e, "retention_trimmer: failed to get server time");
111 return ScanResult { processed: 0, errors: 1 };
112 }
113 };
114 let now_ts = TimestampMs::from_millis(now_ms as i64);
115
116 let mut total_processed: u32 = 0;
117 let mut total_errors: u32 = 0;
118
119 for lane in &self.lanes {
120 let res = if let Some(backend) = self.backend.as_ref() {
121 backend
122 .trim_retention(
123 p,
124 lane,
125 self.default_retention_ms,
126 now_ts,
127 BATCH_SIZE,
128 &self.filter,
129 )
130 .await
131 .map_err(|e: EngineError| e.to_string())
132 } else {
133 trim_retention_direct_fallback(
141 client,
142 self.backend.as_ref(),
143 &self.filter,
144 &p,
145 lane,
146 self.default_retention_ms,
147 now_ms,
148 BATCH_SIZE,
149 )
150 .await
151 .map_err(|e| e.to_string())
152 };
153
154 match res {
155 Ok(n) => total_processed += n,
156 Err(e) => {
157 tracing::warn!(
158 partition,
159 lane = lane.as_str(),
160 error = %e,
161 "retention_trimmer: trim_retention failed"
162 );
163 total_errors += 1;
164 }
165 }
166 }
167
168 ScanResult { processed: total_processed, errors: total_errors }
169 }
170}
171
172#[allow(clippy::too_many_arguments)]
176async fn trim_retention_direct_fallback(
177 client: &ferriskey::Client,
178 backend: Option<&Arc<dyn EngineBackend>>,
179 filter: &ScannerFilter,
180 partition: &Partition,
181 lane: &LaneId,
182 default_retention_ms: u64,
183 now_ms: u64,
184 batch_size: u32,
185) -> Result<u32, ferriskey::Error> {
186 let idx = IndexKeys::new(partition);
187 let terminal_key = idx.lane_terminal(lane);
188 let cutoff = now_ms.saturating_sub(default_retention_ms);
189
190 let expired: Vec<String> = client
191 .cmd("ZRANGEBYSCORE")
192 .arg(&terminal_key)
193 .arg("-inf")
194 .arg(cutoff.to_string().as_str())
195 .arg("LIMIT")
196 .arg("0")
197 .arg(batch_size.to_string().as_str())
198 .execute()
199 .await?;
200
201 if expired.is_empty() {
202 return Ok(0);
203 }
204
205 let mut processed = 0u32;
206 for eid_str in &expired {
207 if should_skip_candidate(backend, filter, partition.index, eid_str).await {
208 continue;
209 }
210 if purge_execution_fallback(
211 client,
212 partition,
213 &idx,
214 eid_str,
215 &terminal_key,
216 now_ms,
217 default_retention_ms,
218 )
219 .await?
220 {
221 processed += 1;
222 }
223 }
224 Ok(processed)
225}
226
227#[allow(clippy::too_many_arguments)]
228async fn purge_execution_fallback(
229 client: &ferriskey::Client,
230 partition: &Partition,
231 idx: &IndexKeys,
232 eid_str: &str,
233 terminal_key: &str,
234 now_ms: u64,
235 default_retention_ms: u64,
236) -> Result<bool, ferriskey::Error> {
237 let tag = partition.hash_tag();
238 let exec_core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
239
240 let fields: Vec<Option<String>> = client
241 .cmd("HMGET")
242 .arg(&exec_core_key)
243 .arg("completed_at")
244 .arg("total_attempt_count")
245 .execute()
246 .await?;
247
248 let completed_at: u64 = fields.first()
249 .and_then(|v| v.as_ref())
250 .and_then(|s| s.parse().ok())
251 .unwrap_or(0);
252 let total_attempts: u32 = fields.get(1)
253 .and_then(|v| v.as_ref())
254 .and_then(|s| s.parse().ok())
255 .unwrap_or(0);
256
257 if completed_at == 0 {
258 let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
259 return Ok(true);
260 }
261
262 let policy_key = format!("ff:exec:{}:{}:policy", tag, eid_str);
263 let retention_ms = read_retention_ms_fallback(client, &policy_key, default_retention_ms).await;
264 if now_ms < completed_at + retention_ms {
265 return Ok(false);
266 }
267
268 let mut del_keys: Vec<String> = Vec::with_capacity(16 + total_attempts as usize * 5);
269 del_keys.push(format!("ff:exec:{}:{}:payload", tag, eid_str));
270 del_keys.push(format!("ff:exec:{}:{}:result", tag, eid_str));
271 del_keys.push(format!("ff:exec:{}:{}:tags", tag, eid_str));
272 del_keys.push(format!("ff:exec:{}:{}:lease:current", tag, eid_str));
273 del_keys.push(format!("ff:exec:{}:{}:lease:history", tag, eid_str));
274 del_keys.push(format!("ff:exec:{}:{}:claim_grant", tag, eid_str));
275 del_keys.push(format!("ff:exec:{}:{}:attempts", tag, eid_str));
276 for i in 0..total_attempts {
277 del_keys.push(format!("ff:attempt:{}:{}:{}", tag, eid_str, i));
278 del_keys.push(format!("ff:attempt:{}:{}:{}:usage", tag, eid_str, i));
279 del_keys.push(format!("ff:attempt:{}:{}:{}:policy", tag, eid_str, i));
280 del_keys.push(format!("ff:stream:{}:{}:{}", tag, eid_str, i));
281 del_keys.push(format!("ff:stream:{}:{}:{}:meta", tag, eid_str, i));
282 }
283 del_keys.push(format!("ff:exec:{}:{}:suspension:current", tag, eid_str));
284
285 let deps_all_edges_key = format!("ff:exec:{}:{}:deps:all_edges", tag, eid_str);
286 let dep_edge_ids: Vec<String> = client
287 .cmd("SMEMBERS")
288 .arg(&deps_all_edges_key)
289 .execute()
290 .await
291 .unwrap_or_default();
292 del_keys.push(format!("ff:exec:{}:{}:deps:meta", tag, eid_str));
293 del_keys.push(format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str));
294 del_keys.push(deps_all_edges_key);
295 for edge_id in &dep_edge_ids {
296 del_keys.push(format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id));
297 }
298
299 let waitpoints_key = format!("ff:exec:{}:{}:waitpoints", tag, eid_str);
300 let wp_ids: Vec<String> = client
301 .cmd("SMEMBERS")
302 .arg(&waitpoints_key)
303 .execute()
304 .await
305 .unwrap_or_default();
306 del_keys.push(waitpoints_key);
307 for wp_id_str in &wp_ids {
308 del_keys.push(format!("ff:wp:{}:{}", tag, wp_id_str));
309 del_keys.push(format!("ff:wp:{}:{}:signals", tag, wp_id_str));
310 del_keys.push(format!("ff:wp:{}:{}:condition", tag, wp_id_str));
311 }
312
313 let signal_key = format!("ff:exec:{}:{}:signals", tag, eid_str);
314 let sig_ids: Vec<String> = client
315 .cmd("ZRANGE")
316 .arg(&signal_key)
317 .arg("0")
318 .arg("-1")
319 .execute()
320 .await
321 .unwrap_or_default();
322 del_keys.push(signal_key);
323 for sig_id_str in &sig_ids {
324 del_keys.push(format!("ff:signal:{}:{}", tag, sig_id_str));
325 del_keys.push(format!("ff:signal:{}:{}:payload", tag, sig_id_str));
326 }
327
328 for chunk in del_keys.chunks(500) {
329 let key_refs: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
330 let _: u32 = client.cmd("DEL").arg(&key_refs).execute().await?;
331 }
332
333 let _: u32 = client
334 .cmd("DEL")
335 .arg(&[exec_core_key.as_str(), policy_key.as_str()][..])
336 .execute()
337 .await?;
338
339 let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
340 let all_exec_key = idx.all_executions();
341 let _: u32 = client.cmd("SREM").arg(&all_exec_key).arg(eid_str).execute().await?;
342
343 Ok(true)
344}
345
346async fn read_retention_ms_fallback(
347 client: &ferriskey::Client,
348 policy_key: &str,
349 default_retention_ms: u64,
350) -> u64 {
351 let policy_json: Option<String> = match client.cmd("GET").arg(policy_key).execute().await {
352 Ok(v) => v,
353 Err(_) => return default_retention_ms,
354 };
355 let json_str = match policy_json {
356 Some(s) if !s.is_empty() => s,
357 _ => return default_retention_ms,
358 };
359 let parsed: serde_json::Value = match serde_json::from_str(&json_str) {
360 Ok(v) => v,
361 Err(_) => return default_retention_ms,
362 };
363 parsed
364 .get("stream_policy")
365 .and_then(|sp| sp.get("retention_ttl_ms"))
366 .and_then(|v| v.as_u64())
367 .unwrap_or(default_retention_ms)
368}