use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::keys::{
    ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys,
};
use ff_core::partition::{
    execution_partition, Partition, PartitionConfig, PartitionFamily,
};
use ff_core::types::{
    AttemptIndex, ExecutionId, FlowId, LaneId, WaitpointId,
    WorkerInstanceId,
};

use super::{ScanResult, Scanner};

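/// Maximum number of group tuples sampled from `pending_cancel_groups`
/// on each scan tick (passed to SRANDMEMBER as the count argument).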
const BATCH_SIZE: u32 = 50;

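/// Upper bound on sibling execution ids dispatched per group; the
/// pipe-separated member list is truncated at this many entries.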
const MAX_SIBLINGS_PER_GROUP: usize = 1024;

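/// Scanner that drains pending sibling-cancel groups for a flow
/// partition: it samples group tuples from the `pending_cancel_groups`
/// set, cancels every sibling execution recorded on the edge group, and
/// then drains the tuple via the `ff_drain_sibling_cancel_group`
/// server-side function.
///
/// A minimal construction sketch (values are hypothetical, and it
/// assumes `PartitionConfig` implements `Default`, which this file does
/// not show):
///
/// ```ignore
/// use std::time::Duration;
///
/// let dispatcher = EdgeCancelDispatcher::new(
///     Duration::from_secs(30),
///     PartitionConfig::default(),
/// );
/// ```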
pub struct EdgeCancelDispatcher {
    interval: Duration,
    partition_config: PartitionConfig,
    filter: ScannerFilter,
    metrics: std::sync::Arc<ff_observability::Metrics>,
}

impl EdgeCancelDispatcher {
    pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
        Self::with_filter(interval, partition_config, ScannerFilter::default())
    }

    pub fn with_filter(
        interval: Duration,
        partition_config: PartitionConfig,
        filter: ScannerFilter,
    ) -> Self {
        Self::with_filter_and_metrics(
            interval,
            partition_config,
            filter,
            std::sync::Arc::new(ff_observability::Metrics::new()),
        )
    }

    pub fn with_filter_and_metrics(
        interval: Duration,
        partition_config: PartitionConfig,
        filter: ScannerFilter,
        metrics: std::sync::Arc<ff_observability::Metrics>,
    ) -> Self {
        Self {
            interval,
            partition_config,
            filter,
            metrics,
        }
    }
}

impl Scanner for EdgeCancelDispatcher {
    fn name(&self) -> &'static str {
        "edge_cancel_dispatcher"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

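    /// Scans one flow partition: samples up to `BATCH_SIZE` random
    /// members from the partition's `pending_cancel_groups` set and
    /// dispatches each group. Drained groups count toward `processed`,
    /// malformed members toward `errors`, and transiently skipped
    /// groups toward neither.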
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Flow,
            index: partition,
        };
        let fidx = FlowIndexKeys::new(&p);
        let pending_key = fidx.pending_cancel_groups();

        let members: Vec<String> = match client
            .cmd("SRANDMEMBER")
            .arg(&pending_key)
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(
                    partition,
                    error = %e,
                    "edge_cancel_dispatcher: SRANDMEMBER pending_cancel_groups failed"
                );
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        if members.is_empty() {
            return ScanResult { processed: 0, errors: 0 };
        }

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;

        for member in &members {
            match self
                .dispatch_one_group(client, &p, &pending_key, member)
                .await
            {
                GroupOutcome::Drained => processed += 1,
                GroupOutcome::SkippedRetry => {}
                GroupOutcome::Error => errors += 1,
            }
        }

        ScanResult { processed, errors }
    }
}

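/// Per-group outcome of a single dispatch attempt.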
enum GroupOutcome {
    /// The group was fully handled and its tuple removed from the
    /// pending set.
    Drained,
    /// A transient backend failure; the tuple is left in place so a
    /// later scan cycle can retry it.
    SkippedRetry,
    /// The member (or one of its ids) was malformed and was removed
    /// from the pending set to avoid poisoning future scans.
    Error,
}

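/// Postgres-backed dispatch path: runs a single dispatcher tick through
/// the `ff_backend_postgres` reconciler and returns its report.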
#[cfg(feature = "postgres")]
pub async fn dispatch_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<
    ff_backend_postgres::reconcilers::edge_cancel_dispatcher::DispatchReport,
    ff_core::engine_error::EngineError,
> {
    ff_backend_postgres::reconcilers::edge_cancel_dispatcher::dispatcher_tick(pool, filter).await
}

impl EdgeCancelDispatcher {
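    /// Handles one `"<flow_id>|<downstream_execution_id>"` member: loads
    /// the edge group's cancel fields, cancels each listed sibling, and
    /// drains the tuple. Malformed members are SREM'd and reported as
    /// `Error`; transient backend failures leave the tuple in place and
    /// return `SkippedRetry` so a later cycle can retry.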
    async fn dispatch_one_group(
        &self,
        client: &ferriskey::Client,
        flow_p: &Partition,
        pending_key: &str,
        member: &str,
    ) -> GroupOutcome {
        // Members are encoded as "<flow_id>|<downstream_execution_id>".
        let (flow_id_str, downstream_eid_str) = match member.split_once('|') {
            Some((f, d)) if !f.is_empty() && !d.is_empty() => (f, d),
            _ => {
                tracing::warn!(
                    raw = member,
                    "edge_cancel_dispatcher: malformed pending_cancel_groups \
                     member; SREM-ing to avoid poison"
                );
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };

        let flow_id = match FlowId::parse(flow_id_str) {
            Ok(id) => id,
            Err(_) => {
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };

        let downstream_eid = match ExecutionId::parse(downstream_eid_str) {
            Ok(id) => id,
            Err(_) => {
                let _: Result<i64, _> = client
                    .cmd("SREM")
                    .arg(pending_key)
                    .arg(member)
                    .execute()
                    .await;
                return GroupOutcome::Error;
            }
        };

        let fctx = FlowKeyContext::new(flow_p, &flow_id);
        let edgegroup_key = fctx.edgegroup(&downstream_eid);

        let fields: Vec<Option<String>> = match client
            .cmd("HMGET")
            .arg(&edgegroup_key)
            .arg("cancel_siblings_reason")
            .arg("cancel_siblings_pending_members")
            .arg("cancel_siblings_pending_flag")
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::debug!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    error = %e,
                    "edge_cancel_dispatcher: HMGET edgegroup failed; retry next cycle"
                );
                return GroupOutcome::SkippedRetry;
            }
        };

        let reason = fields.first().and_then(|v| v.clone()).unwrap_or_default();
        let members_raw = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
        let flag = fields.get(2).and_then(|v| v.clone()).unwrap_or_default();

        if flag.is_empty() && members_raw.is_empty() {
            tracing::debug!(
                flow_id = %flow_id,
                downstream = %downstream_eid,
                "edge_cancel_dispatcher: group has no pending flag / members; \
                 draining tuple (likely already drained or racing retention)"
            );
            return self
                .drain_group(client, pending_key, &edgegroup_key, &flow_id, &downstream_eid)
                .await;
        }

        let reason_str = if reason.is_empty() {
            "sibling_quorum_satisfied"
        } else {
            reason.as_str()
        };

        // Sibling execution ids are pipe-separated; cap how many we
        // dispatch for a single group.
        let sibling_eids: Vec<&str> = members_raw
            .split('|')
            .filter(|s| !s.is_empty())
            .take(MAX_SIBLINGS_PER_GROUP)
            .collect();

        // Pin the reason to a 'static label for the metrics API.
        let static_reason: &'static str = match reason_str {
            "sibling_quorum_impossible" => "sibling_quorum_impossible",
            _ => "sibling_quorum_satisfied",
        };

        // Disposition tallies: [cancelled, already_terminal, not_found].
        let mut cancel_dispositions: [u64; 3] = [0, 0, 0];
        for sib_str in &sibling_eids {
            let sib_eid = match ExecutionId::parse(sib_str) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        flow_id = %flow_id,
                        raw = %sib_str,
                        error = %e,
                        "edge_cancel_dispatcher: malformed sibling eid; counting as not_found"
                    );
                    cancel_dispositions[2] += 1;
                    continue;
                }
            };

            self.metrics.inc_sibling_cancel_dispatched(static_reason);
            match cancel_sibling(
                client,
                &self.partition_config,
                &sib_eid,
                reason_str,
            )
            .await
            {
                SiblingDisposition::Cancelled => {
                    cancel_dispositions[0] += 1;
                    self.metrics.inc_sibling_cancel_disposition("cancelled");
                }
                SiblingDisposition::AlreadyTerminal => {
                    cancel_dispositions[1] += 1;
                    self.metrics
                        .inc_sibling_cancel_disposition("already_terminal");
                }
                SiblingDisposition::NotFound => {
                    cancel_dispositions[2] += 1;
                    self.metrics.inc_sibling_cancel_disposition("not_found");
                }
                SiblingDisposition::TransientError => {
                    tracing::debug!(
                        flow_id = %flow_id,
                        sibling = %sib_eid,
                        "edge_cancel_dispatcher: transient cancel error; retry group next cycle"
                    );
                    return GroupOutcome::SkippedRetry;
                }
            }
        }

        for (i, label) in ["cancelled", "already_terminal", "not_found"].iter().enumerate() {
            if cancel_dispositions[i] > 0 {
                tracing::debug!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    reason = %static_reason,
                    disposition = label,
                    count = cancel_dispositions[i],
                    "edge_cancel_dispatcher: sibling cancel disposition"
                );
            }
        }

        self.drain_group(client, pending_key, &edgegroup_key, &flow_id, &downstream_eid)
            .await
    }

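    /// Drains one group tuple by invoking `ff_drain_sibling_cancel_group`
    /// over the pending-set and edge-group keys; on failure the tuple is
    /// left in place and retried on a later cycle.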
    async fn drain_group(
        &self,
        client: &ferriskey::Client,
        pending_key: &str,
        edgegroup_key: &str,
        flow_id: &FlowId,
        downstream_eid: &ExecutionId,
    ) -> GroupOutcome {
        let flow_id_str = flow_id.to_string();
        let downstream_eid_str = downstream_eid.to_string();
        let keys = [pending_key, edgegroup_key];
        let argv = [flow_id_str.as_str(), downstream_eid_str.as_str()];
        match client
            .fcall::<ferriskey::Value>(
                "ff_drain_sibling_cancel_group",
                &keys,
                &argv,
            )
            .await
        {
            Ok(_) => GroupOutcome::Drained,
            Err(e) => {
                tracing::warn!(
                    flow_id = %flow_id,
                    downstream = %downstream_eid,
                    error = %e,
                    "edge_cancel_dispatcher: drain FCALL failed; retry next cycle"
                );
                GroupOutcome::SkippedRetry
            }
        }
    }
}

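/// Outcome of one sibling cancellation attempt, derived from the
/// `ff_cancel_execution` reply.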
#[derive(Debug, Clone, Copy)]
enum SiblingDisposition {
    Cancelled,
    AlreadyTerminal,
    NotFound,
    TransientError,
}

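/// Cancels a single sibling execution: resolves its partition, reads the
/// lane, attempt, waitpoint, and worker fields needed to address every
/// affected key, then invokes the `ff_cancel_execution` server-side
/// function over those keys.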
async fn cancel_sibling(
    client: &ferriskey::Client,
    partition_config: &PartitionConfig,
    sib_eid: &ExecutionId,
    reason: &str,
) -> SiblingDisposition {
    let partition = execution_partition(sib_eid, partition_config);
    let ctx = ExecKeyContext::new(&partition, sib_eid);
    let idx = IndexKeys::new(&partition);

    // The lane id is needed to address the per-lane index keys below;
    // executions without one fall back to "default".
    let lane_str: Option<String> = match client.hget(&ctx.core(), "lane_id").await {
        Ok(v) => v,
        Err(_) => return SiblingDisposition::TransientError,
    };
    let lane = LaneId::new(lane_str.as_deref().unwrap_or("default"));

    let dyn_fields: Vec<Option<String>> = match client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(_) => return SiblingDisposition::TransientError,
    };

    let att_idx_val = dyn_fields
        .first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let att_idx = AttemptIndex::new(att_idx_val);
    let wp_id_str = dyn_fields
        .get(1)
        .and_then(|v| v.as_ref())
        .cloned()
        .unwrap_or_default();
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid_str = dyn_fields
        .get(2)
        .and_then(|v| v.as_ref())
        .cloned()
        .unwrap_or_default();
    let wiid = WorkerInstanceId::new(&wiid_str);

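    // KEYS order below must line up with the positional layout expected
    // by the ff_cancel_execution server-side function.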
    let keys: Vec<String> = vec![
        ctx.core(),
        ctx.attempt_hash(att_idx),
        ctx.stream_meta(att_idx),
        ctx.lease_current(),
        ctx.lease_history(),
        idx.lease_expiry(),
        idx.worker_leases(&wiid),
        ctx.suspension_current(),
        ctx.waitpoint(&wp_id),
        ctx.waitpoint_condition(&wp_id),
        idx.suspension_timeout(),
        idx.lane_terminal(&lane),
        idx.attempt_timeout(),
        idx.execution_deadline(),
        idx.lane_eligible(&lane),
        idx.lane_delayed(&lane),
        idx.lane_blocked_dependencies(&lane),
        idx.lane_blocked_budget(&lane),
        idx.lane_blocked_quota(&lane),
        idx.lane_blocked_route(&lane),
        idx.lane_blocked_operator(&lane),
    ];
    let argv: Vec<String> = vec![
        sib_eid.to_string(),
        reason.to_owned(),
        "operator_override".to_owned(),
        String::new(),
        String::new(),
    ];
    let kr: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let ar: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();

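    // The reply is an array: [1] on success, or [0, <error-code>] for a
    // recognized failure; any other shape is treated as transient.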
    match client
        .fcall::<ferriskey::Value>("ff_cancel_execution", &kr, &ar)
        .await
    {
        Ok(ferriskey::Value::Array(arr)) => match arr.first() {
            Some(Ok(ferriskey::Value::Int(1))) => SiblingDisposition::Cancelled,
            Some(Ok(ferriskey::Value::Int(0))) => {
                let code = arr
                    .get(1)
                    .and_then(|r| match r {
                        Ok(ferriskey::Value::BulkString(b)) => {
                            Some(String::from_utf8_lossy(b).into_owned())
                        }
                        Ok(ferriskey::Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default();
                match code.as_str() {
                    "execution_not_active" => SiblingDisposition::AlreadyTerminal,
                    "execution_not_found" => SiblingDisposition::NotFound,
                    _ => SiblingDisposition::TransientError,
                }
            }
            _ => SiblingDisposition::TransientError,
        },
        Ok(_) => SiblingDisposition::TransientError,
        Err(_) => SiblingDisposition::TransientError,
    }
}