1use std::time::Duration;
15
16use ff_core::keys::IndexKeys;
17use ff_core::partition::{Partition, PartitionFamily};
18use ff_core::types::LaneId;
19
20use super::{ScanResult, Scanner};
21
/// Maximum number of expired executions pulled per lane in one scan pass;
/// keeps each pass bounded so the scanner never blocks on a huge backlog.
const BATCH_SIZE: u32 = 20;

/// Fallback retention window (24 hours, in milliseconds) applied when an
/// execution carries no retention policy of its own.
const DEFAULT_RETENTION_MS: u64 = 24 * 60 * 60 * 1000;
26
/// Background scanner that purges terminal executions whose retention
/// window has elapsed, deleting their payload/result/attempt/stream,
/// dependency, waitpoint and signal keys and removing them from the
/// partition indexes.
pub struct RetentionTrimmer {
    // How often the scanner fires (exposed via `Scanner::interval`).
    interval: Duration,
    // Lanes whose terminal ZSETs are scanned on every pass.
    lanes: Vec<LaneId>,
    // Retention used when an execution's policy is absent or unreadable.
    default_retention_ms: u64,
}
33
impl RetentionTrimmer {
    /// Creates a trimmer that scans `lanes` every `interval`, falling back
    /// to [`DEFAULT_RETENTION_MS`] for executions without an explicit
    /// retention policy.
    pub fn new(interval: Duration, lanes: Vec<LaneId>) -> Self {
        Self {
            interval,
            lanes,
            default_retention_ms: DEFAULT_RETENTION_MS,
        }
    }
}
43
44impl Scanner for RetentionTrimmer {
45 fn name(&self) -> &'static str {
46 "retention_trimmer"
47 }
48
49 fn interval(&self) -> Duration {
50 self.interval
51 }
52
53 async fn scan_partition(
54 &self,
55 client: &ferriskey::Client,
56 partition: u16,
57 ) -> ScanResult {
58 let p = Partition {
59 family: PartitionFamily::Execution,
60 index: partition,
61 };
62 let idx = IndexKeys::new(&p);
63
64 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
65 Ok(t) => t,
66 Err(e) => {
67 tracing::warn!(partition, error = %e, "retention_trimmer: failed to get server time");
68 return ScanResult { processed: 0, errors: 1 };
69 }
70 };
71
72 let mut total_processed: u32 = 0;
73 let mut total_errors: u32 = 0;
74
75 for lane in &self.lanes {
76 let terminal_key = idx.lane_terminal(lane);
77
78 let cutoff = now_ms.saturating_sub(self.default_retention_ms);
82
83 let expired: Vec<String> = match client
84 .cmd("ZRANGEBYSCORE")
85 .arg(&terminal_key)
86 .arg("-inf")
87 .arg(cutoff.to_string().as_str())
88 .arg("LIMIT")
89 .arg("0")
90 .arg(BATCH_SIZE.to_string().as_str())
91 .execute()
92 .await
93 {
94 Ok(ids) => ids,
95 Err(e) => {
96 tracing::warn!(
97 partition, lane = lane.as_str(), error = %e,
98 "retention_trimmer: ZRANGEBYSCORE failed"
99 );
100 total_errors += 1;
101 continue;
102 }
103 };
104
105 if expired.is_empty() {
106 continue;
107 }
108
109 for eid_str in &expired {
110 match purge_execution(
111 client, &p, &idx, lane, eid_str, &terminal_key, now_ms,
112 self.default_retention_ms,
113 ).await {
114 Ok(true) => total_processed += 1,
115 Ok(false) => {} Err(e) => {
117 tracing::warn!(
118 partition,
119 execution_id = eid_str.as_str(),
120 error = %e,
121 "retention_trimmer: purge failed"
122 );
123 total_errors += 1;
124 }
125 }
126 }
127 }
128
129 ScanResult { processed: total_processed, errors: total_errors }
130 }
131}
132
133#[allow(clippy::too_many_arguments)]
136async fn purge_execution(
137 client: &ferriskey::Client,
138 partition: &Partition,
139 idx: &IndexKeys,
140 _lane: &LaneId,
141 eid_str: &str,
142 terminal_key: &str,
143 now_ms: u64,
144 default_retention_ms: u64,
145) -> Result<bool, ferriskey::Error> {
146 let tag = partition.hash_tag();
147 let exec_core_key = format!("ff:exec:{}:{}:core", tag, eid_str);
148
149 let fields: Vec<Option<String>> = client
151 .cmd("HMGET")
152 .arg(&exec_core_key)
153 .arg("completed_at")
154 .arg("total_attempt_count")
155 .execute()
156 .await?;
157
158 let completed_at: u64 = fields.first()
159 .and_then(|v| v.as_ref())
160 .and_then(|s| s.parse().ok())
161 .unwrap_or(0);
162 let total_attempts: u32 = fields.get(1)
163 .and_then(|v| v.as_ref())
164 .and_then(|s| s.parse().ok())
165 .unwrap_or(0);
166
167 if completed_at == 0 {
168 let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
170 return Ok(true);
171 }
172
173 let policy_key = format!("ff:exec:{}:{}:policy", tag, eid_str);
175 let retention_ms = read_retention_ms(client, &policy_key, default_retention_ms).await;
176
177 if now_ms < completed_at + retention_ms {
178 return Ok(false); }
180
181 let mut del_keys: Vec<String> = Vec::with_capacity(16 + total_attempts as usize * 5);
191
192 del_keys.push(format!("ff:exec:{}:{}:payload", tag, eid_str));
194 del_keys.push(format!("ff:exec:{}:{}:result", tag, eid_str));
195 del_keys.push(format!("ff:exec:{}:{}:tags", tag, eid_str));
196
197 del_keys.push(format!("ff:exec:{}:{}:lease:current", tag, eid_str));
199 del_keys.push(format!("ff:exec:{}:{}:lease:history", tag, eid_str));
200 del_keys.push(format!("ff:exec:{}:{}:claim_grant", tag, eid_str));
201
202 del_keys.push(format!("ff:exec:{}:{}:attempts", tag, eid_str));
204 for i in 0..total_attempts {
205 del_keys.push(format!("ff:attempt:{}:{}:{}", tag, eid_str, i));
206 del_keys.push(format!("ff:attempt:{}:{}:{}:usage", tag, eid_str, i));
207 del_keys.push(format!("ff:attempt:{}:{}:{}:policy", tag, eid_str, i));
208 del_keys.push(format!("ff:stream:{}:{}:{}", tag, eid_str, i));
209 del_keys.push(format!("ff:stream:{}:{}:{}:meta", tag, eid_str, i));
210 }
211
212 del_keys.push(format!("ff:exec:{}:{}:suspension:current", tag, eid_str));
214
215 let deps_all_edges_key = format!("ff:exec:{}:{}:deps:all_edges", tag, eid_str);
220 let dep_edge_ids: Vec<String> = client
221 .cmd("SMEMBERS")
222 .arg(&deps_all_edges_key)
223 .execute()
224 .await
225 .unwrap_or_default();
226
227 del_keys.push(format!("ff:exec:{}:{}:deps:meta", tag, eid_str));
228 del_keys.push(format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str));
229 del_keys.push(deps_all_edges_key);
230 for edge_id in &dep_edge_ids {
231 del_keys.push(format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id));
232 }
233
234 let waitpoints_key = format!("ff:exec:{}:{}:waitpoints", tag, eid_str);
236 let wp_ids: Vec<String> = client
237 .cmd("SMEMBERS")
238 .arg(&waitpoints_key)
239 .execute()
240 .await
241 .unwrap_or_default();
242
243 del_keys.push(waitpoints_key);
244
245 for wp_id_str in &wp_ids {
246 del_keys.push(format!("ff:wp:{}:{}", tag, wp_id_str));
247 del_keys.push(format!("ff:wp:{}:{}:signals", tag, wp_id_str));
248 del_keys.push(format!("ff:wp:{}:{}:condition", tag, wp_id_str));
249 }
250
251 let signal_key = format!("ff:exec:{}:{}:signals", tag, eid_str);
253 let sig_ids: Vec<String> = client
254 .cmd("ZRANGE")
255 .arg(&signal_key)
256 .arg("0")
257 .arg("-1")
258 .execute()
259 .await
260 .unwrap_or_default();
261
262 del_keys.push(signal_key);
263
264 for sig_id_str in &sig_ids {
265 del_keys.push(format!("ff:signal:{}:{}", tag, sig_id_str));
266 del_keys.push(format!("ff:signal:{}:{}:payload", tag, sig_id_str));
267 }
268
269 for chunk in del_keys.chunks(500) {
275 let key_refs: Vec<&str> = chunk.iter().map(|s| s.as_str()).collect();
276 let _: u32 = client
277 .cmd("DEL")
278 .arg(&key_refs)
279 .execute()
280 .await?;
281 }
282
283 let _: u32 = client
289 .cmd("DEL")
290 .arg(&[exec_core_key.as_str(), policy_key.as_str()][..])
291 .execute()
292 .await?;
293
294 let _: u32 = client.cmd("ZREM").arg(terminal_key).arg(eid_str).execute().await?;
295 let all_exec_key = idx.all_executions();
296 let _: u32 = client.cmd("SREM").arg(&all_exec_key).arg(eid_str).execute().await?;
297
298 tracing::debug!(
299 execution_id = eid_str,
300 attempts = total_attempts,
301 waitpoints = wp_ids.len(),
302 signals = sig_ids.len(),
303 "retention_trimmer: purged execution"
304 );
305
306 Ok(true)
307}
308
309async fn read_retention_ms(
312 client: &ferriskey::Client,
313 policy_key: &str,
314 default_retention_ms: u64,
315) -> u64 {
316 let policy_json: Option<String> = match client
317 .cmd("GET")
318 .arg(policy_key)
319 .execute()
320 .await
321 {
322 Ok(v) => v,
323 Err(_) => return default_retention_ms,
324 };
325
326 let json_str = match policy_json {
327 Some(s) if !s.is_empty() => s,
328 _ => return default_retention_ms,
329 };
330
331 let parsed: serde_json::Value = match serde_json::from_str(&json_str) {
333 Ok(v) => v,
334 Err(_) => return default_retention_ms,
335 };
336
337 parsed
338 .get("stream_policy")
339 .and_then(|sp| sp.get("retention_ttl_ms"))
340 .and_then(|v| v.as_u64())
341 .unwrap_or(default_retention_ms)
342}
343