ff_engine/scanner/
edge_cancel_reconciler.rs1use std::time::Duration;
26
27use ff_core::backend::ScannerFilter;
28use ff_core::keys::{FlowIndexKeys, FlowKeyContext};
29use ff_core::partition::{Partition, PartitionFamily};
30use ff_core::types::{ExecutionId, FlowId};
31
32use super::{ScanResult, Scanner};
33
/// Maximum number of `pending_cancel_groups` members requested from a
/// partition in a single scan; passed as the `SRANDMEMBER` count.
const BATCH_SIZE: usize = 50;
38
39pub struct EdgeCancelReconciler {
40 interval: Duration,
41 filter: ScannerFilter,
42 metrics: std::sync::Arc<ff_observability::Metrics>,
43}
44
45impl EdgeCancelReconciler {
46 pub fn new(interval: Duration) -> Self {
47 Self::with_filter(interval, ScannerFilter::default())
48 }
49
50 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
51 Self::with_filter_and_metrics(
52 interval,
53 filter,
54 std::sync::Arc::new(ff_observability::Metrics::new()),
55 )
56 }
57
58 pub fn with_filter_and_metrics(
59 interval: Duration,
60 filter: ScannerFilter,
61 metrics: std::sync::Arc<ff_observability::Metrics>,
62 ) -> Self {
63 Self {
64 interval,
65 filter,
66 metrics,
67 }
68 }
69}
70
71impl Scanner for EdgeCancelReconciler {
72 fn name(&self) -> &'static str {
73 "edge_cancel_reconciler"
74 }
75
76 fn interval(&self) -> Duration {
77 self.interval
78 }
79
80 fn filter(&self) -> &ScannerFilter {
81 &self.filter
82 }
83
84 async fn scan_partition(
85 &self,
86 client: &ferriskey::Client,
87 partition: u16,
88 ) -> ScanResult {
89 let p = Partition {
90 family: PartitionFamily::Flow,
91 index: partition,
92 };
93 let fidx = FlowIndexKeys::new(&p);
94 let pending_key = fidx.pending_cancel_groups();
95
96 let members: Vec<String> = match client
101 .cmd("SRANDMEMBER")
102 .arg(&pending_key)
103 .arg(BATCH_SIZE.to_string().as_str())
104 .execute()
105 .await
106 {
107 Ok(m) => m,
108 Err(e) => {
109 tracing::warn!(
110 partition,
111 error = %e,
112 "edge_cancel_reconciler: SRANDMEMBER pending_cancel_groups failed"
113 );
114 return ScanResult { processed: 0, errors: 1 };
115 }
116 };
117
118 if members.is_empty() {
119 return ScanResult { processed: 0, errors: 0 };
120 }
121
122 let mut processed: u32 = 0;
123 let mut errors: u32 = 0;
124
125 for member in &members {
126 match self
127 .reconcile_one_group(client, &p, &pending_key, member)
128 .await
129 {
130 ReconcileOutcome::Acted => processed += 1,
131 ReconcileOutcome::NoOp => { }
132 ReconcileOutcome::Error => errors += 1,
133 }
134 }
135
136 ScanResult { processed, errors }
137 }
138}
139
/// Result of reconciling one `pending_cancel_groups` member.
enum ReconcileOutcome {
    /// The server function reported `sremmed_stale` or `completed_drain`;
    /// `scan_partition` counts this as processed.
    Acted,
    /// The server function reported a recognised action that changed
    /// nothing (`no_op`); not counted as processed or as an error.
    NoOp,
    /// The member was malformed, the FCALL failed, or its reply could not be
    /// parsed; `scan_partition` counts this as an error.
    Error,
}
149
150impl EdgeCancelReconciler {
151 async fn reconcile_one_group(
152 &self,
153 client: &ferriskey::Client,
154 flow_p: &Partition,
155 pending_key: &str,
156 member: &str,
157 ) -> ReconcileOutcome {
158 let (flow_id_str, downstream_eid_str) = match member.split_once('|') {
159 Some((f, d)) if !f.is_empty() && !d.is_empty() => (f, d),
160 _ => {
161 tracing::warn!(
162 raw = member,
163 "edge_cancel_reconciler: malformed pending_cancel_groups \
164 member; SREM-ing to avoid poison"
165 );
166 let _: Result<i64, _> = client
167 .cmd("SREM")
168 .arg(pending_key)
169 .arg(member)
170 .execute()
171 .await;
172 return ReconcileOutcome::Error;
173 }
174 };
175
176 let flow_id = match FlowId::parse(flow_id_str) {
177 Ok(id) => id,
178 Err(_) => {
179 let _: Result<i64, _> = client
180 .cmd("SREM")
181 .arg(pending_key)
182 .arg(member)
183 .execute()
184 .await;
185 return ReconcileOutcome::Error;
186 }
187 };
188
189 let downstream_eid = match ExecutionId::parse(downstream_eid_str) {
190 Ok(id) => id,
191 Err(_) => {
192 let _: Result<i64, _> = client
193 .cmd("SREM")
194 .arg(pending_key)
195 .arg(member)
196 .execute()
197 .await;
198 return ReconcileOutcome::Error;
199 }
200 };
201
202 let fctx = FlowKeyContext::new(flow_p, &flow_id);
203 let edgegroup_key = fctx.edgegroup(&downstream_eid);
204 let flow_id_s = flow_id.to_string();
205 let downstream_s = downstream_eid.to_string();
206 let keys = [pending_key, edgegroup_key.as_str()];
207 let argv = [flow_id_s.as_str(), downstream_s.as_str()];
208
209 let reply: Result<ferriskey::Value, _> = client
210 .fcall(
211 "ff_reconcile_sibling_cancel_group",
212 &keys,
213 &argv,
214 )
215 .await;
216
217 match reply {
218 Ok(val) => match extract_action(&val) {
219 Some(action) => {
220 self.metrics.inc_sibling_cancel_reconcile(action);
221 match action {
222 "sremmed_stale" | "completed_drain" => {
223 tracing::debug!(
224 flow_id = %flow_id,
225 downstream = %downstream_eid,
226 action,
227 "edge_cancel_reconciler: action applied"
228 );
229 ReconcileOutcome::Acted
230 }
231 _ => ReconcileOutcome::NoOp,
233 }
234 }
235 None => {
236 tracing::warn!(
237 flow_id = %flow_id,
238 downstream = %downstream_eid,
239 "edge_cancel_reconciler: unparsable FCALL reply"
240 );
241 ReconcileOutcome::Error
242 }
243 },
244 Err(e) => {
245 tracing::warn!(
246 flow_id = %flow_id,
247 downstream = %downstream_eid,
248 error = %e,
249 "edge_cancel_reconciler: FCALL failed; retry next cycle"
250 );
251 ReconcileOutcome::Error
252 }
253 }
254 }
255}
256
257fn extract_action(val: &ferriskey::Value) -> Option<&'static str> {
261 let arr = match val {
262 ferriskey::Value::Array(a) => a,
263 _ => return None,
264 };
265 let action_result = arr.get(2)?;
266 let action_val = action_result.as_ref().ok()?;
267 let action = match action_val {
268 ferriskey::Value::BulkString(b) => String::from_utf8_lossy(b).into_owned(),
269 ferriskey::Value::SimpleString(s) => s.clone(),
270 _ => return None,
271 };
272 match action.as_str() {
273 "sremmed_stale" => Some("sremmed_stale"),
274 "completed_drain" => Some("completed_drain"),
275 "no_op" => Some("no_op"),
276 _ => None,
277 }
278}