1use std::time::Duration;
25
26use ff_core::backend::ScannerFilter;
27use ff_core::keys::{ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys};
28use ff_core::partition::{
29 execution_partition, Partition, PartitionConfig, PartitionFamily,
30};
31use ff_core::types::{AttemptIndex, ExecutionId, FlowId, LaneId, WaitpointId, WorkerInstanceId};
32
33use super::{should_skip_candidate, ScanResult, Scanner};
34
/// Maximum number of due flows pulled from one partition's cancel backlog
/// (`ZRANGEBYSCORE ... LIMIT 0 BATCH_SIZE`) per scan.
const BATCH_SIZE: u32 = 50;
/// Maximum number of pending-cancel members sampled (`SRANDMEMBER` count)
/// from a single flow per scan.
const MAX_MEMBERS_PER_FLOW_PER_CYCLE: usize = 500;
37
38pub struct CancelReconciler {
39 interval: Duration,
40 partition_config: PartitionConfig,
41 filter: ScannerFilter,
42}
43
44impl CancelReconciler {
45 pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
46 Self::with_filter(interval, partition_config, ScannerFilter::default())
47 }
48
49 pub fn with_filter(
53 interval: Duration,
54 partition_config: PartitionConfig,
55 filter: ScannerFilter,
56 ) -> Self {
57 Self {
58 interval,
59 partition_config,
60 filter,
61 }
62 }
63}
64
65impl Scanner for CancelReconciler {
66 fn name(&self) -> &'static str {
67 "cancel_reconciler"
68 }
69
70 fn interval(&self) -> Duration {
71 self.interval
72 }
73
74 fn filter(&self) -> &ScannerFilter {
75 &self.filter
76 }
77
78 async fn sample_backlog_depth(
87 &self,
88 client: &ferriskey::Client,
89 partition: u16,
90 ) -> Option<u64> {
91 let p = Partition {
92 family: PartitionFamily::Flow,
93 index: partition,
94 };
95 let fidx = FlowIndexKeys::new(&p);
96 let backlog_key = fidx.cancel_backlog();
97 let card: Result<Option<u64>, _> = client
98 .cmd("ZCARD")
99 .arg(&backlog_key)
100 .execute()
101 .await;
102 card.ok().flatten()
103 }
104
105 async fn scan_partition(
106 &self,
107 client: &ferriskey::Client,
108 partition: u16,
109 ) -> ScanResult {
110 let p = Partition {
111 family: PartitionFamily::Flow,
112 index: partition,
113 };
114 let fidx = FlowIndexKeys::new(&p);
115 let backlog_key = fidx.cancel_backlog();
116
117 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
118 Ok(t) => t,
119 Err(e) => {
120 tracing::warn!(partition, error = %e, "cancel_reconciler: TIME failed");
121 return ScanResult { processed: 0, errors: 1 };
122 }
123 };
124
125 let flow_ids: Vec<String> = match client
131 .cmd("ZRANGEBYSCORE")
132 .arg(&backlog_key)
133 .arg("-inf")
134 .arg(now_ms.to_string().as_str())
135 .arg("LIMIT")
136 .arg("0")
137 .arg(BATCH_SIZE.to_string().as_str())
138 .execute()
139 .await
140 {
141 Ok(ids) => ids,
142 Err(e) => {
143 tracing::warn!(
144 partition,
145 error = %e,
146 "cancel_reconciler: ZRANGEBYSCORE cancel_backlog failed"
147 );
148 return ScanResult { processed: 0, errors: 1 };
149 }
150 };
151
152 if flow_ids.is_empty() {
153 return ScanResult { processed: 0, errors: 0 };
154 }
155
156 let mut processed: u32 = 0;
157 let mut errors: u32 = 0;
158
159 for flow_id_str in flow_ids {
160 let flow_id = match FlowId::parse(&flow_id_str) {
161 Ok(id) => id,
162 Err(e) => {
163 tracing::warn!(
164 partition,
165 raw = %flow_id_str,
166 error = %e,
167 "cancel_reconciler: malformed flow_id in cancel_backlog; ZREM"
168 );
169 let _: Result<i64, _> = client
170 .cmd("ZREM")
171 .arg(&backlog_key)
172 .arg(flow_id_str.as_str())
173 .execute()
174 .await;
175 errors += 1;
176 continue;
177 }
178 };
179 let fctx = FlowKeyContext::new(&p, &flow_id);
180 let pending_key = fctx.pending_cancels();
181
182 let core_exists: bool = match client
186 .cmd("EXISTS")
187 .arg(fctx.core().as_str())
188 .execute()
189 .await
190 {
191 Ok(v) => v,
192 Err(e) => {
193 tracing::warn!(
194 flow_id = %flow_id,
195 error = %e,
196 "cancel_reconciler: EXISTS flow_core failed"
197 );
198 errors += 1;
199 continue;
200 }
201 };
202 if !core_exists {
203 let _: Result<i64, _> = client
204 .cmd("DEL")
205 .arg(pending_key.as_str())
206 .execute()
207 .await;
208 let _: Result<i64, _> = client
209 .cmd("ZREM")
210 .arg(&backlog_key)
211 .arg(flow_id.to_string().as_str())
212 .execute()
213 .await;
214 continue;
215 }
216
217 let member_strs: Vec<String> = match client
218 .cmd("SRANDMEMBER")
219 .arg(pending_key.as_str())
220 .arg(MAX_MEMBERS_PER_FLOW_PER_CYCLE.to_string().as_str())
221 .execute()
222 .await
223 {
224 Ok(m) => m,
225 Err(e) => {
226 tracing::warn!(
227 flow_id = %flow_id,
228 error = %e,
229 "cancel_reconciler: SRANDMEMBER pending_cancels failed"
230 );
231 errors += 1;
232 continue;
233 }
234 };
235
236 if member_strs.is_empty() {
237 let _: Result<i64, _> = client
241 .cmd("ZREM")
242 .arg(&backlog_key)
243 .arg(flow_id.to_string().as_str())
244 .execute()
245 .await;
246 continue;
247 }
248
249 let reason: String = match client
254 .cmd("HGET")
255 .arg(fctx.core().as_str())
256 .arg("cancel_reason")
257 .execute::<Option<String>>()
258 .await
259 {
260 Ok(Some(s)) => s,
261 Ok(None) => "flow_cancelled".to_owned(),
262 Err(e) => {
263 tracing::warn!(
264 flow_id = %flow_id,
265 error = %e,
266 "cancel_reconciler: HGET cancel_reason failed; retry next cycle"
267 );
268 errors += 1;
269 continue;
270 }
271 };
272
273 for eid_str in &member_strs {
274 let execution_id = match ExecutionId::parse(eid_str) {
275 Ok(id) => id,
276 Err(e) => {
277 tracing::warn!(
278 flow_id = %flow_id,
279 raw = %eid_str,
280 error = %e,
281 "cancel_reconciler: malformed eid in pending_cancels; SREM"
282 );
283 let _: Result<i64, _> = client
284 .cmd("SREM")
285 .arg(pending_key.as_str())
286 .arg(eid_str.as_str())
287 .execute()
288 .await;
289 errors += 1;
290 continue;
291 }
292 };
293
294 let member_part = execution_partition(
298 &execution_id,
299 &self.partition_config,
300 ).index;
301 if should_skip_candidate(
302 client,
303 &self.filter,
304 member_part,
305 eid_str,
306 )
307 .await
308 {
309 continue;
310 }
311
312 if cancel_member(
313 client,
314 &self.partition_config,
315 &execution_id,
316 &reason,
317 )
318 .await
319 {
320 let flow_id_str = flow_id.to_string();
327 let ack_keys = [pending_key.as_str(), backlog_key.as_str()];
328 let ack_args = [eid_str.as_str(), flow_id_str.as_str()];
329 match client
330 .fcall::<ferriskey::Value>("ff_ack_cancel_member", &ack_keys, &ack_args)
331 .await
332 {
333 Ok(_) => processed += 1,
334 Err(e) => {
335 tracing::debug!(
336 flow_id = %flow_id,
337 execution_id = %eid_str,
338 error = %e,
339 "cancel_reconciler: ack failed; retry next cycle"
340 );
341 errors += 1;
342 }
343 }
344 } else {
345 errors += 1;
346 }
347 }
348 }
349
350 ScanResult { processed, errors }
351 }
352}
353
354async fn cancel_member(
368 client: &ferriskey::Client,
369 partition_config: &PartitionConfig,
370 execution_id: &ExecutionId,
371 reason: &str,
372) -> bool {
373 let partition = execution_partition(execution_id, partition_config);
374 let ctx = ExecKeyContext::new(&partition, execution_id);
375 let idx = IndexKeys::new(&partition);
376
377 let lane_str: Option<String> = match client.hget(&ctx.core(), "lane_id").await {
382 Ok(v) => v,
383 Err(e) => {
384 let kind = e.kind();
385 let retryable = is_retryable_kind(kind);
386 if !retryable {
387 tracing::warn!(
388 execution_id = %execution_id,
389 error = %e,
390 "cancel_reconciler: permanent HGET lane_id error; ack to avoid poison"
391 );
392 return true;
393 }
394 tracing::debug!(
395 execution_id = %execution_id,
396 error = %e,
397 "cancel_reconciler: transient HGET lane_id; retry next cycle"
398 );
399 return false;
400 }
401 };
402 let lane = LaneId::new(lane_str.as_deref().unwrap_or("default"));
403
404 let dyn_fields: Vec<Option<String>> = match client
405 .cmd("HMGET")
406 .arg(ctx.core())
407 .arg("current_attempt_index")
408 .arg("current_waitpoint_id")
409 .arg("current_worker_instance_id")
410 .execute()
411 .await
412 {
413 Ok(v) => v,
414 Err(e) => {
415 let kind = e.kind();
416 let retryable = is_retryable_kind(kind);
417 if !retryable {
418 tracing::warn!(
419 execution_id = %execution_id,
420 error = %e,
421 "cancel_reconciler: permanent HMGET error; ack to avoid poison"
422 );
423 return true;
424 }
425 tracing::debug!(
426 execution_id = %execution_id,
427 error = %e,
428 "cancel_reconciler: transient HMGET; retry next cycle"
429 );
430 return false;
431 }
432 };
433
434 let att_idx_val = dyn_fields
435 .first()
436 .and_then(|v| v.as_ref())
437 .and_then(|s| s.parse::<u32>().ok())
438 .unwrap_or(0);
439 let att_idx = AttemptIndex::new(att_idx_val);
440 let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
441 let wp_id = if wp_id_str.is_empty() {
442 WaitpointId::new()
443 } else {
444 WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
445 };
446 let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
447 let wiid = WorkerInstanceId::new(&wiid_str);
448
449 let keys: Vec<String> = vec![
450 ctx.core(),
451 ctx.attempt_hash(att_idx),
452 ctx.stream_meta(att_idx),
453 ctx.lease_current(),
454 ctx.lease_history(),
455 idx.lease_expiry(),
456 idx.worker_leases(&wiid),
457 ctx.suspension_current(),
458 ctx.waitpoint(&wp_id),
459 ctx.waitpoint_condition(&wp_id),
460 idx.suspension_timeout(),
461 idx.lane_terminal(&lane),
462 idx.attempt_timeout(),
463 idx.execution_deadline(),
464 idx.lane_eligible(&lane),
465 idx.lane_delayed(&lane),
466 idx.lane_blocked_dependencies(&lane),
467 idx.lane_blocked_budget(&lane),
468 idx.lane_blocked_quota(&lane),
469 idx.lane_blocked_route(&lane),
470 idx.lane_blocked_operator(&lane),
471 ];
472 let argv: Vec<String> = vec![
473 execution_id.to_string(),
474 reason.to_owned(),
475 "operator_override".to_owned(),
476 String::new(),
477 String::new(),
478 ];
479 let kr: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
480 let ar: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
481
482 match client.fcall::<ferriskey::Value>("ff_cancel_execution", &kr, &ar).await {
483 Ok(ferriskey::Value::Array(arr)) => match arr.first() {
484 Some(Ok(ferriskey::Value::Int(1))) => true,
485 Some(Ok(ferriskey::Value::Int(0))) => {
486 let code = arr
487 .get(1)
488 .and_then(|r| match r {
489 Ok(ferriskey::Value::BulkString(b)) => {
490 Some(String::from_utf8_lossy(b).into_owned())
491 }
492 Ok(ferriskey::Value::SimpleString(s)) => Some(s.clone()),
493 _ => None,
494 })
495 .unwrap_or_default();
496 matches!(code.as_str(), "execution_not_active" | "execution_not_found")
497 }
498 _ => false,
499 },
500 Ok(_) => false,
501 Err(e) => {
502 let retryable = is_retryable_kind(e.kind());
503 if !retryable {
504 tracing::warn!(
505 execution_id = %execution_id,
506 error = %e,
507 "cancel_reconciler: permanent error on FCALL; ack to avoid poison"
508 );
509 return true;
510 }
511 tracing::debug!(
512 execution_id = %execution_id,
513 error = %e,
514 "cancel_reconciler: transient FCALL error; retry next cycle"
515 );
516 false
517 }
518 }
519}
520
521fn is_retryable_kind(kind: ferriskey::ErrorKind) -> bool {
527 use ferriskey::ErrorKind::*;
528 matches!(
529 kind,
530 IoError | FatalSendError | TryAgain | BusyLoadingError | ClusterDown
531 )
532}