1use std::time::Duration;
35
36use ff_core::backend::ScannerFilter;
37use ff_core::keys::{
38 ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys,
39};
40use ff_core::partition::{
41 execution_partition, Partition, PartitionConfig, PartitionFamily,
42};
43use ff_core::types::{
44 AttemptIndex, ExecutionId, FlowId, LaneId, WaitpointId,
45 WorkerInstanceId,
46};
47
48use super::{ScanResult, Scanner};
49
/// Maximum number of pending cancel-group members pulled per scan cycle
/// (passed as the COUNT argument to SRANDMEMBER in `scan_partition`).
const BATCH_SIZE: u32 = 50;

/// Cap on sibling execution ids taken from a single edge group's member
/// list in one dispatch (`take` in `dispatch_one_group`).
const MAX_SIBLINGS_PER_GROUP: usize = 1024;
61
/// Periodic scanner that drains per-flow "pending sibling cancel" groups:
/// for each `flow_id|downstream_eid` member it cancels the recorded sibling
/// executions, then removes the group's bookkeeping via an atomic
/// server-side drain function.
pub struct EdgeCancelDispatcher {
    // How often the scan loop fires (surfaced via `Scanner::interval`).
    interval: Duration,
    // Maps a sibling ExecutionId onto its execution partition when
    // building cancel keys (see `cancel_sibling`).
    partition_config: PartitionConfig,
    // Scanner scope filter (surfaced via `Scanner::filter`); semantics are
    // defined by ff_core's ScannerFilter.
    filter: ScannerFilter,
    // Counters for dispatched sibling cancels and their dispositions.
    metrics: std::sync::Arc<ff_observability::Metrics>,
}
68
69impl EdgeCancelDispatcher {
70 pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
71 Self::with_filter(interval, partition_config, ScannerFilter::default())
72 }
73
74 pub fn with_filter(
75 interval: Duration,
76 partition_config: PartitionConfig,
77 filter: ScannerFilter,
78 ) -> Self {
79 Self::with_filter_and_metrics(
80 interval,
81 partition_config,
82 filter,
83 std::sync::Arc::new(ff_observability::Metrics::new()),
84 )
85 }
86
87 pub fn with_filter_and_metrics(
88 interval: Duration,
89 partition_config: PartitionConfig,
90 filter: ScannerFilter,
91 metrics: std::sync::Arc<ff_observability::Metrics>,
92 ) -> Self {
93 Self {
94 interval,
95 partition_config,
96 filter,
97 metrics,
98 }
99 }
100}
101
102impl Scanner for EdgeCancelDispatcher {
103 fn name(&self) -> &'static str {
104 "edge_cancel_dispatcher"
105 }
106
107 fn interval(&self) -> Duration {
108 self.interval
109 }
110
111 fn filter(&self) -> &ScannerFilter {
112 &self.filter
113 }
114
115 async fn scan_partition(
116 &self,
117 client: &ferriskey::Client,
118 partition: u16,
119 ) -> ScanResult {
120 let p = Partition {
121 family: PartitionFamily::Flow,
122 index: partition,
123 };
124 let fidx = FlowIndexKeys::new(&p);
125 let pending_key = fidx.pending_cancel_groups();
126
127 let members: Vec<String> = match client
132 .cmd("SRANDMEMBER")
133 .arg(&pending_key)
134 .arg(BATCH_SIZE.to_string().as_str())
135 .execute()
136 .await
137 {
138 Ok(m) => m,
139 Err(e) => {
140 tracing::warn!(
141 partition,
142 error = %e,
143 "edge_cancel_dispatcher: SRANDMEMBER pending_cancel_groups failed"
144 );
145 return ScanResult { processed: 0, errors: 1 };
146 }
147 };
148
149 if members.is_empty() {
150 return ScanResult { processed: 0, errors: 0 };
151 }
152
153 let mut processed: u32 = 0;
154 let mut errors: u32 = 0;
155
156 for member in &members {
157 match self
158 .dispatch_one_group(client, &p, &pending_key, member)
159 .await
160 {
161 GroupOutcome::Drained => processed += 1,
162 GroupOutcome::SkippedRetry => { }
163 GroupOutcome::Error => errors += 1,
164 }
165 }
166
167 ScanResult { processed, errors }
168 }
169}
170
/// Result of attempting to dispatch one pending cancel group.
enum GroupOutcome {
    /// Sibling cancels were issued (or nothing was pending) and the group
    /// tuple was drained from the pending set.
    Drained,
    /// Transient backend failure; the member stays in the pending set and
    /// is retried on the next scan cycle.
    SkippedRetry,
    /// Unrecoverable per-member failure (malformed member or unparseable
    /// ids); the member has been SREM-ed to avoid poisoning the scanner.
    Error,
}
180
181impl EdgeCancelDispatcher {
182 async fn dispatch_one_group(
183 &self,
184 client: &ferriskey::Client,
185 flow_p: &Partition,
186 pending_key: &str,
187 member: &str,
188 ) -> GroupOutcome {
189 let (flow_id_str, downstream_eid_str) = match member.split_once('|') {
191 Some((f, d)) if !f.is_empty() && !d.is_empty() => (f, d),
192 _ => {
193 tracing::warn!(
194 raw = member,
195 "edge_cancel_dispatcher: malformed pending_cancel_groups \
196 member; SREM-ing to avoid poison"
197 );
198 let _: Result<i64, _> = client
199 .cmd("SREM")
200 .arg(pending_key)
201 .arg(member)
202 .execute()
203 .await;
204 return GroupOutcome::Error;
205 }
206 };
207
208 let flow_id = match FlowId::parse(flow_id_str) {
209 Ok(id) => id,
210 Err(_) => {
211 let _: Result<i64, _> = client
212 .cmd("SREM")
213 .arg(pending_key)
214 .arg(member)
215 .execute()
216 .await;
217 return GroupOutcome::Error;
218 }
219 };
220
221 let downstream_eid = match ExecutionId::parse(downstream_eid_str) {
222 Ok(id) => id,
223 Err(_) => {
224 let _: Result<i64, _> = client
225 .cmd("SREM")
226 .arg(pending_key)
227 .arg(member)
228 .execute()
229 .await;
230 return GroupOutcome::Error;
231 }
232 };
233
234 let fctx = FlowKeyContext::new(flow_p, &flow_id);
235 let edgegroup_key = fctx.edgegroup(&downstream_eid);
236
237 let fields: Vec<Option<String>> = match client
241 .cmd("HMGET")
242 .arg(&edgegroup_key)
243 .arg("cancel_siblings_reason")
244 .arg("cancel_siblings_pending_members")
245 .arg("cancel_siblings_pending_flag")
246 .execute()
247 .await
248 {
249 Ok(v) => v,
250 Err(e) => {
251 tracing::debug!(
252 flow_id = %flow_id,
253 downstream = %downstream_eid,
254 error = %e,
255 "edge_cancel_dispatcher: HMGET edgegroup failed; retry next cycle"
256 );
257 return GroupOutcome::SkippedRetry;
258 }
259 };
260
261 let reason = fields.first().and_then(|v| v.clone()).unwrap_or_default();
262 let members_raw = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
263 let flag = fields.get(2).and_then(|v| v.clone()).unwrap_or_default();
264
265 if flag.is_empty() && members_raw.is_empty() {
269 tracing::debug!(
270 flow_id = %flow_id,
271 downstream = %downstream_eid,
272 "edge_cancel_dispatcher: group has no pending flag / members; \
273 draining tuple (likely already drained or racing retention)"
274 );
275 return self
276 .drain_group(client, pending_key, &edgegroup_key, &flow_id, &downstream_eid)
277 .await;
278 }
279
280 let reason_str = if reason.is_empty() {
285 "sibling_quorum_satisfied"
286 } else {
287 reason.as_str()
288 };
289
290 let sibling_eids: Vec<&str> = members_raw
292 .split('|')
293 .filter(|s| !s.is_empty())
294 .take(MAX_SIBLINGS_PER_GROUP)
295 .collect();
296
297 let static_reason: &'static str = match reason_str {
302 "sibling_quorum_impossible" => "sibling_quorum_impossible",
303 _ => "sibling_quorum_satisfied",
304 };
305
306 let mut cancel_dispositions: [u64; 3] = [0, 0, 0]; for sib_str in &sibling_eids {
308 let sib_eid = match ExecutionId::parse(sib_str) {
309 Ok(id) => id,
310 Err(e) => {
311 tracing::warn!(
312 flow_id = %flow_id,
313 raw = %sib_str,
314 error = %e,
315 "edge_cancel_dispatcher: malformed sibling eid; counting as not_found"
316 );
317 cancel_dispositions[2] += 1;
318 continue;
319 }
320 };
321
322 self.metrics.inc_sibling_cancel_dispatched(static_reason);
323 match cancel_sibling(
324 client,
325 &self.partition_config,
326 &sib_eid,
327 reason_str,
328 )
329 .await
330 {
331 SiblingDisposition::Cancelled => {
332 cancel_dispositions[0] += 1;
333 self.metrics.inc_sibling_cancel_disposition("cancelled");
334 }
335 SiblingDisposition::AlreadyTerminal => {
336 cancel_dispositions[1] += 1;
337 self.metrics
338 .inc_sibling_cancel_disposition("already_terminal");
339 }
340 SiblingDisposition::NotFound => {
341 cancel_dispositions[2] += 1;
342 self.metrics.inc_sibling_cancel_disposition("not_found");
343 }
344 SiblingDisposition::TransientError => {
345 tracing::debug!(
349 flow_id = %flow_id,
350 sibling = %sib_eid,
351 "edge_cancel_dispatcher: transient cancel error; retry group next cycle"
352 );
353 return GroupOutcome::SkippedRetry;
354 }
355 }
356 }
357
358 for (i, label) in ["cancelled", "already_terminal", "not_found"].iter().enumerate() {
359 if cancel_dispositions[i] > 0 {
360 tracing::debug!(
361 flow_id = %flow_id,
362 downstream = %downstream_eid,
363 reason = %static_reason,
364 disposition = label,
365 count = cancel_dispositions[i],
366 "edge_cancel_dispatcher: sibling cancel disposition"
367 );
368 }
369 }
370
371 self.drain_group(client, pending_key, &edgegroup_key, &flow_id, &downstream_eid)
373 .await
374 }
375
376 async fn drain_group(
377 &self,
378 client: &ferriskey::Client,
379 pending_key: &str,
380 edgegroup_key: &str,
381 flow_id: &FlowId,
382 downstream_eid: &ExecutionId,
383 ) -> GroupOutcome {
384 let flow_id_str = flow_id.to_string();
385 let downstream_eid_str = downstream_eid.to_string();
386 let keys = [pending_key, edgegroup_key];
387 let argv = [flow_id_str.as_str(), downstream_eid_str.as_str()];
388 match client
389 .fcall::<ferriskey::Value>(
390 "ff_drain_sibling_cancel_group",
391 &keys,
392 &argv,
393 )
394 .await
395 {
396 Ok(_) => GroupOutcome::Drained,
397 Err(e) => {
398 tracing::warn!(
399 flow_id = %flow_id,
400 downstream = %downstream_eid,
401 error = %e,
402 "edge_cancel_dispatcher: drain FCALL failed; retry next cycle"
403 );
404 GroupOutcome::SkippedRetry
405 }
406 }
407 }
408}
409
/// Outcome of a single sibling-cancel FCALL, derived from the reply of the
/// `ff_cancel_execution` server-side function (see `cancel_sibling`).
#[derive(Debug, Clone, Copy)]
enum SiblingDisposition {
    /// The execution was active and has now been cancelled (reply `[1]`).
    Cancelled,
    /// The execution already reached a terminal state
    /// (reply code "execution_not_active").
    AlreadyTerminal,
    /// No such execution (reply code "execution_not_found").
    NotFound,
    /// Backend read/call error or unrecognized reply; the caller retries
    /// the whole group on the next cycle.
    TransientError,
}
417
/// Cancels one sibling execution by invoking the `ff_cancel_execution`
/// server-side function and maps its reply onto a `SiblingDisposition`.
///
/// First recovers the execution's lane and current attempt/waitpoint/worker
/// coordinates from its core hash to build the key list the function
/// expects. Any backend read error is reported as `TransientError` so the
/// caller can retry the whole group next cycle.
async fn cancel_sibling(
    client: &ferriskey::Client,
    partition_config: &PartitionConfig,
    sib_eid: &ExecutionId,
    reason: &str,
) -> SiblingDisposition {
    let partition = execution_partition(sib_eid, partition_config);
    let ctx = ExecKeyContext::new(&partition, sib_eid);
    let idx = IndexKeys::new(&partition);

    // Lane first: several of the index keys below are lane-scoped.
    let lane_str: Option<String> = match client.hget(&ctx.core(), "lane_id").await {
        Ok(v) => v,
        Err(_) => return SiblingDisposition::TransientError,
    };
    // Missing lane_id falls back to the "default" lane.
    let lane = LaneId::new(lane_str.as_deref().unwrap_or("default"));

    // Fetch the execution's current coordinates in one round trip.
    let dyn_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(_) => return SiblingDisposition::TransientError,
    };

    // Missing/unparseable attempt index falls back to 0.
    let att_idx_val = dyn_fields
        .first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let att_idx = AttemptIndex::new(att_idx_val);
    let wp_id_str = dyn_fields
        .get(1)
        .and_then(|v| v.as_ref())
        .cloned()
        .unwrap_or_default();
    // NOTE(review): a missing/unparseable waitpoint id falls back to a
    // freshly generated WaitpointId, so the waitpoint keys below then
    // address a nonexistent entry — presumably harmless to the server-side
    // function; confirm.
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid_str = dyn_fields
        .get(2)
        .and_then(|v| v.as_ref())
        .cloned()
        .unwrap_or_default();
    let wiid = WorkerInstanceId::new(&wiid_str);

    // KEYS for ff_cancel_execution. The order is part of the function's
    // contract — do not reorder.
    let keys: Vec<String> = vec![
        ctx.core(),
        ctx.attempt_hash(att_idx),
        ctx.stream_meta(att_idx),
        ctx.lease_current(),
        ctx.lease_history(),
        idx.lease_expiry(),
        idx.worker_leases(&wiid),
        ctx.suspension_current(),
        ctx.waitpoint(&wp_id),
        ctx.waitpoint_condition(&wp_id),
        idx.suspension_timeout(),
        idx.lane_terminal(&lane),
        idx.attempt_timeout(),
        idx.execution_deadline(),
        idx.lane_eligible(&lane),
        idx.lane_delayed(&lane),
        idx.lane_blocked_dependencies(&lane),
        idx.lane_blocked_budget(&lane),
        idx.lane_blocked_quota(&lane),
        idx.lane_blocked_route(&lane),
        idx.lane_blocked_operator(&lane),
    ];
    // ARGV: execution id, reason, then "operator_override" (presumably a
    // cancellation-source tag) and two empty slots — meaning defined by the
    // ff_cancel_execution function; confirm against its definition.
    let argv: Vec<String> = vec![
        sib_eid.to_string(),
        reason.to_owned(),
        "operator_override".to_owned(),
        String::new(),
        String::new(),
    ];
    let kr: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let ar: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();

    // Reply shape: [1] => cancelled; [0, <code>] => refused, with the code
    // distinguishing not-active vs not-found; anything else is transient.
    match client
        .fcall::<ferriskey::Value>("ff_cancel_execution", &kr, &ar)
        .await
    {
        Ok(ferriskey::Value::Array(arr)) => match arr.first() {
            Some(Ok(ferriskey::Value::Int(1))) => SiblingDisposition::Cancelled,
            Some(Ok(ferriskey::Value::Int(0))) => {
                // Refusal code may arrive as bulk or simple string.
                let code = arr
                    .get(1)
                    .and_then(|r| match r {
                        Ok(ferriskey::Value::BulkString(b)) => {
                            Some(String::from_utf8_lossy(b).into_owned())
                        }
                        Ok(ferriskey::Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default();
                match code.as_str() {
                    "execution_not_active" => SiblingDisposition::AlreadyTerminal,
                    "execution_not_found" => SiblingDisposition::NotFound,
                    _ => SiblingDisposition::TransientError,
                }
            }
            _ => SiblingDisposition::TransientError,
        },
        Ok(_) => SiblingDisposition::TransientError,
        Err(_) => SiblingDisposition::TransientError,
    }
}