1use std::time::Duration;
15
16use ff_core::backend::ScannerFilter;
17use ff_core::keys::IndexKeys;
18use ff_core::partition::{Partition, PartitionFamily};
19use ff_core::types::LaneId;
20
21use super::{should_skip_candidate, ScanResult, Scanner};
22
/// Upper bound on the number of candidate execution ids fetched per lane in a
/// single `ZRANGEBYSCORE ... LIMIT 0 <n>` call.
const BATCH_SIZE: u32 = 20;

/// Retention window used when nothing more specific is available:
/// 24 hours, expressed in milliseconds.
const DEFAULT_RETENTION_MS: u64 = 86_400_000;
27
28pub struct RetentionTrimmer {
29 interval: Duration,
30 lanes: Vec<LaneId>,
31 default_retention_ms: u64,
33 filter: ScannerFilter,
34}
35
36impl RetentionTrimmer {
37 pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
38 Self::with_filter(interval, lanes, ScannerFilter::default())
39 }
40
41 pub fn with_filter(interval: Duration, lanes: Vec<LaneId>, filter: ScannerFilter) -> Self {
44 Self {
45 interval,
46 lanes,
47 default_retention_ms: DEFAULT_RETENTION_MS,
48 filter,
49 }
50 }
51}
52
53impl Scanner for RetentionTrimmer {
54 fn name(&self) -> &'static str {
55 "retention_trimmer"
56 }
57
58 fn interval(&self) -> Duration {
59 self.interval
60 }
61
62 fn filter(&self) -> &ScannerFilter {
63 &self.filter
64 }
65
66 async fn scan_partition(
67 &self,
68 client: &ferriskey::Client,
69 partition: u16,
70 ) -> ScanResult {
71 let p = Partition {
72 family: PartitionFamily::Execution,
73 index: partition,
74 };
75 let idx = IndexKeys::new(&p);
76
77 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
78 Ok(t) => t,
79 Err(e) => {
80 tracing::warn!(partition, error = %e, "retention_trimmer: failed to get server time");
81 return ScanResult { processed: 0, errors: 1 };
82 }
83 };
84
85 let mut total_processed: u32 = 0;
86 let mut total_errors: u32 = 0;
87
88 for lane in &self.lanes {
89 let terminal_key = idx.lane_terminal(lane);
90
91 let cutoff = now_ms.saturating_sub(self.default_retention_ms);
95
96 let expired: Vec<String> = match client
97 .cmd("ZRANGEBYSCORE")
98 .arg(&terminal_key)
99 .arg("-inf")
100 .arg(cutoff.to_string().as_str())
101 .arg("LIMIT")
102 .arg("0")
103 .arg(BATCH_SIZE.to_string().as_str())
104 .execute()
105 .await
106 {
107 Ok(ids) => ids,
108 Err(e) => {
109 tracing::warn!(
110 partition, lane = lane.as_str(), error = %e,
111 "retention_trimmer: ZRANGEBYSCORE failed"
112 );
113 total_errors += 1;
114 continue;
115 }
116 };
117
118 if expired.is_empty() {
119 continue;
120 }
121
122 for eid_str in &expired {
123 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
124 continue;
125 }
126 match purge_execution(
127 client, &p, &idx, lane, eid_str, &terminal_key, now_ms,
128 self.default_retention_ms,
129 ).await {
130 Ok(true) => total_processed += 1,
131 Ok(false) => {} Err(e) => {
133 tracing::warn!(
134 partition,
135 execution_id = eid_str.as_str(),
136 error = %e,
137 "retention_trimmer: purge failed"
138 );
139 total_errors += 1;
140 }
141 }
142 }
143 }
144
145 ScanResult { processed: total_processed, errors: total_errors }
146 }
147}
148
149#[allow(clippy::too_many_arguments)]
152async fn purge_execution(
153 client: &ferriskey::Client,
154 partition: &Partition,
155 idx: &IndexKeys,
156 _lane: &LaneId,
157 eid_str: &str,
158 terminal_key: &str,
159 now_ms: u64,
160 default_retention_ms: u64,
161) -> Result<bool, ferriskey::Error> {
162 let tag = partition.hash_tag();
163 let exec_core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
164
165 let fields: Vec<Option<String>> = client
167 .cmd("HMGET")
168 .arg(&exec_core_key)
169 .arg("completed_at")
170 .arg("total_attempt_count")
171 .execute()
172 .await?;
173
174 let completed_at: u64 = fields.first()
175 .and_then(|v| v.as_ref())
176 .and_then(|s| s.parse().ok())
177 .unwrap_or(0);
178 let total_attempts: u32 = fields.get(1)
179 .and_then(|v| v.as_ref())
180 .and_then(|s| s.parse().ok())
181 .unwrap_or(0);
182
183 if completed_at == 0 {
184 let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
186 return Ok(true);
187 }
188
189 let policy_key = format!("ff:exec:{}:{}:policy", tag, eid_str);
191 let retention_ms = read_retention_ms(client, &policy_key, default_retention_ms).await;
192
193 if now_ms < completed_at + retention_ms {
194 return Ok(false); }
196
197 let mut del_keys: Vec<String> = Vec::with_capacity(16 + total_attempts as usize * 5);
207
208 del_keys.push(format!("ff:exec:{}:{}:payload", tag, eid_str));
210 del_keys.push(format!("ff:exec:{}:{}:result", tag, eid_str));
211 del_keys.push(format!("ff:exec:{}:{}:tags", tag, eid_str));
212
213 del_keys.push(format!("ff:exec:{}:{}:lease:current", tag, eid_str));
215 del_keys.push(format!("ff:exec:{}:{}:lease:history", tag, eid_str));
216 del_keys.push(format!("ff:exec:{}:{}:claim_grant", tag, eid_str));
217
218 del_keys.push(format!("ff:exec:{}:{}:attempts", tag, eid_str));
220 for i in 0..total_attempts {
221 del_keys.push(format!("ff:attempt:{}:{}:{}", tag, eid_str, i));
222 del_keys.push(format!("ff:attempt:{}:{}:{}:usage", tag, eid_str, i));
223 del_keys.push(format!("ff:attempt:{}:{}:{}:policy", tag, eid_str, i));
224 del_keys.push(format!("ff:stream:{}:{}:{}", tag, eid_str, i));
225 del_keys.push(format!("ff:stream:{}:{}:{}:meta", tag, eid_str, i));
226 }
227
228 del_keys.push(format!("ff:exec:{}:{}:suspension:current", tag, eid_str));
230
231 let deps_all_edges_key = format!("ff:exec:{}:{}:deps:all_edges", tag, eid_str);
236 let dep_edge_ids: Vec<String> = client
237 .cmd("SMEMBERS")
238 .arg(&deps_all_edges_key)
239 .execute()
240 .await
241 .unwrap_or_default();
242
243 del_keys.push(format!("ff:exec:{}:{}:deps:meta", tag, eid_str));
244 del_keys.push(format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str));
245 del_keys.push(deps_all_edges_key);
246 for edge_id in &dep_edge_ids {
247 del_keys.push(format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id));
248 }
249
250 let waitpoints_key = format!("ff:exec:{}:{}:waitpoints", tag, eid_str);
252 let wp_ids: Vec<String> = client
253 .cmd("SMEMBERS")
254 .arg(&waitpoints_key)
255 .execute()
256 .await
257 .unwrap_or_default();
258
259 del_keys.push(waitpoints_key);
260
261 for wp_id_str in &wp_ids {
262 del_keys.push(format!("ff:wp:{}:{}", tag, wp_id_str));
263 del_keys.push(format!("ff:wp:{}:{}:signals", tag, wp_id_str));
264 del_keys.push(format!("ff:wp:{}:{}:condition", tag, wp_id_str));
265 }
266
267 let signal_key = format!("ff:exec:{}:{}:signals", tag, eid_str);
269 let sig_ids: Vec<String> = client
270 .cmd("ZRANGE")
271 .arg(&signal_key)
272 .arg("0")
273 .arg("-1")
274 .execute()
275 .await
276 .unwrap_or_default();
277
278 del_keys.push(signal_key);
279
280 for sig_id_str in &sig_ids {
281 del_keys.push(format!("ff:signal:{}:{}", tag, sig_id_str));
282 del_keys.push(format!("ff:signal:{}:{}:payload", tag, sig_id_str));
283 }
284
285 for chunk in del_keys.chunks(500) {
291 let key_refs: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
292 let _: u32 = client
293 .cmd("DEL")
294 .arg(&key_refs)
295 .execute()
296 .await?;
297 }
298
299 let _: u32 = client
305 .cmd("DEL")
306 .arg(&[exec_core_key.as_str(), policy_key.as_str()][..])
307 .execute()
308 .await?;
309
310 let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
311 let all_exec_key = idx.all_executions();
312 let _: u32 = client.cmd("SREM").arg(&all_exec_key).arg(eid_str).execute().await?;
313
314 tracing::debug!(
315 execution_id = eid_str,
316 attempts = total_attempts,
317 waitpoints = wp_ids.len(),
318 signals = sig_ids.len(),
319 "retention_trimmer: purged execution"
320 );
321
322 Ok(true)
323}
324
325async fn read_retention_ms(
328 client: &ferriskey::Client,
329 policy_key: &str,
330 default_retention_ms: u64,
331) -> u64 {
332 let policy_json: Option<String> = match client
333 .cmd("GET")
334 .arg(policy_key)
335 .execute()
336 .await
337 {
338 Ok(v) => v,
339 Err(_) => return default_retention_ms,
340 };
341
342 let json_str = match policy_json {
343 Some(s) if !s.is_empty() => s,
344 _ => return default_retention_ms,
345 };
346
347 let parsed: serde_json::Value = match serde_json::from_str(&json_str) {
349 Ok(v) => v,
350 Err(_) => return default_retention_ms,
351 };
352
353 parsed
354 .get("stream_policy")
355 .and_then(|sp| sp.get("retention_ttl_ms"))
356 .and_then(|v| v.as_u64())
357 .unwrap_or(default_retention_ms)
358}
359