ff_engine/scanner/
edge_cancel_reconciler.rs1use std::time::Duration;
26
27use ff_core::backend::ScannerFilter;
28use ff_core::keys::{FlowIndexKeys, FlowKeyContext};
29use ff_core::partition::{Partition, PartitionFamily};
30use ff_core::types::{ExecutionId, FlowId};
31
32use super::{ScanResult, Scanner};
33
/// Maximum number of `pending_cancel_groups` members sampled per
/// `scan_partition` call (sent as the `SRANDMEMBER` count argument).
const BATCH_SIZE: usize = 50;
38
39pub struct EdgeCancelReconciler {
40 interval: Duration,
41 filter: ScannerFilter,
42 metrics: std::sync::Arc<ff_observability::Metrics>,
43}
44
45impl EdgeCancelReconciler {
46 pub fn new(interval: Duration) -> Self {
47 Self::with_filter(interval, ScannerFilter::default())
48 }
49
50 pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
51 Self::with_filter_and_metrics(
52 interval,
53 filter,
54 std::sync::Arc::new(ff_observability::Metrics::new()),
55 )
56 }
57
58 pub fn with_filter_and_metrics(
59 interval: Duration,
60 filter: ScannerFilter,
61 metrics: std::sync::Arc<ff_observability::Metrics>,
62 ) -> Self {
63 Self {
64 interval,
65 filter,
66 metrics,
67 }
68 }
69}
70
71impl Scanner for EdgeCancelReconciler {
72 fn name(&self) -> &'static str {
73 "edge_cancel_reconciler"
74 }
75
76 fn interval(&self) -> Duration {
77 self.interval
78 }
79
80 fn filter(&self) -> &ScannerFilter {
81 &self.filter
82 }
83
84 async fn scan_partition(
85 &self,
86 client: &ferriskey::Client,
87 partition: u16,
88 ) -> ScanResult {
89 let p = Partition {
90 family: PartitionFamily::Flow,
91 index: partition,
92 };
93 let fidx = FlowIndexKeys::new(&p);
94 let pending_key = fidx.pending_cancel_groups();
95
96 let members: Vec<String> = match client
101 .cmd("SRANDMEMBER")
102 .arg(&pending_key)
103 .arg(BATCH_SIZE.to_string().as_str())
104 .execute()
105 .await
106 {
107 Ok(m) => m,
108 Err(e) => {
109 tracing::warn!(
110 partition,
111 error = %e,
112 "edge_cancel_reconciler: SRANDMEMBER pending_cancel_groups failed"
113 );
114 return ScanResult { processed: 0, errors: 1 };
115 }
116 };
117
118 if members.is_empty() {
119 return ScanResult { processed: 0, errors: 0 };
120 }
121
122 let mut processed: u32 = 0;
123 let mut errors: u32 = 0;
124
125 for member in &members {
126 match self
127 .reconcile_one_group(client, &p, &pending_key, member)
128 .await
129 {
130 ReconcileOutcome::Acted => processed += 1,
131 ReconcileOutcome::NoOp => { }
132 ReconcileOutcome::Error => errors += 1,
133 }
134 }
135
136 ScanResult { processed, errors }
137 }
138}
139
/// Result of reconciling one `pending_cancel_groups` member; folded into the
/// `processed` / `errors` counters of a scan.
enum ReconcileOutcome {
    /// The reply action was `sremmed_stale` or `completed_drain`.
    Acted,
    /// The reply action was recognised but is not counted as processed
    /// (e.g. `no_op`).
    NoOp,
    /// The member was malformed, the FCALL failed, or its reply was unparsable.
    Error,
}
149
/// Postgres-backend entry point: delegates to
/// `ff_backend_postgres::reconcilers::edge_cancel_reconciler::reconciler_tick`
/// with `pool` and `filter`, returning its report or error unchanged.
///
/// Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
pub async fn reconcile_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<
    ff_backend_postgres::reconcilers::edge_cancel_reconciler::ReconcileReport,
    ff_core::engine_error::EngineError,
> {
    use ff_backend_postgres::reconcilers::edge_cancel_reconciler::reconciler_tick;

    let tick = reconciler_tick(pool, filter);
    tick.await
}
166
167impl EdgeCancelReconciler {
168 async fn reconcile_one_group(
169 &self,
170 client: &ferriskey::Client,
171 flow_p: &Partition,
172 pending_key: &str,
173 member: &str,
174 ) -> ReconcileOutcome {
175 let (flow_id_str, downstream_eid_str) = match member.split_once('|') {
176 Some((f, d)) if !f.is_empty() && !d.is_empty() => (f, d),
177 _ => {
178 tracing::warn!(
179 raw = member,
180 "edge_cancel_reconciler: malformed pending_cancel_groups \
181 member; SREM-ing to avoid poison"
182 );
183 let _: Result<i64, _> = client
184 .cmd("SREM")
185 .arg(pending_key)
186 .arg(member)
187 .execute()
188 .await;
189 return ReconcileOutcome::Error;
190 }
191 };
192
193 let flow_id = match FlowId::parse(flow_id_str) {
194 Ok(id) => id,
195 Err(_) => {
196 let _: Result<i64, _> = client
197 .cmd("SREM")
198 .arg(pending_key)
199 .arg(member)
200 .execute()
201 .await;
202 return ReconcileOutcome::Error;
203 }
204 };
205
206 let downstream_eid = match ExecutionId::parse(downstream_eid_str) {
207 Ok(id) => id,
208 Err(_) => {
209 let _: Result<i64, _> = client
210 .cmd("SREM")
211 .arg(pending_key)
212 .arg(member)
213 .execute()
214 .await;
215 return ReconcileOutcome::Error;
216 }
217 };
218
219 let fctx = FlowKeyContext::new(flow_p, &flow_id);
220 let edgegroup_key = fctx.edgegroup(&downstream_eid);
221 let flow_id_s = flow_id.to_string();
222 let downstream_s = downstream_eid.to_string();
223 let keys = [pending_key, edgegroup_key.as_str()];
224 let argv = [flow_id_s.as_str(), downstream_s.as_str()];
225
226 let reply: Result<ferriskey::Value, _> = client
227 .fcall(
228 "ff_reconcile_sibling_cancel_group",
229 &keys,
230 &argv,
231 )
232 .await;
233
234 match reply {
235 Ok(val) => match extract_action(&val) {
236 Some(action) => {
237 self.metrics.inc_sibling_cancel_reconcile(action);
238 match action {
239 "sremmed_stale" | "completed_drain" => {
240 tracing::debug!(
241 flow_id = %flow_id,
242 downstream = %downstream_eid,
243 action,
244 "edge_cancel_reconciler: action applied"
245 );
246 ReconcileOutcome::Acted
247 }
248 _ => ReconcileOutcome::NoOp,
250 }
251 }
252 None => {
253 tracing::warn!(
254 flow_id = %flow_id,
255 downstream = %downstream_eid,
256 "edge_cancel_reconciler: unparsable FCALL reply"
257 );
258 ReconcileOutcome::Error
259 }
260 },
261 Err(e) => {
262 tracing::warn!(
263 flow_id = %flow_id,
264 downstream = %downstream_eid,
265 error = %e,
266 "edge_cancel_reconciler: FCALL failed; retry next cycle"
267 );
268 ReconcileOutcome::Error
269 }
270 }
271 }
272}
273
274fn extract_action(val: &ferriskey::Value) -> Option<&'static str> {
278 let arr = match val {
279 ferriskey::Value::Array(a) => a,
280 _ => return None,
281 };
282 let action_result = arr.get(2)?;
283 let action_val = action_result.as_ref().ok()?;
284 let action = match action_val {
285 ferriskey::Value::BulkString(b) => String::from_utf8_lossy(b).into_owned(),
286 ferriskey::Value::SimpleString(s) => s.clone(),
287 _ => return None,
288 };
289 match action.as_str() {
290 "sremmed_stale" => Some("sremmed_stale"),
291 "completed_drain" => Some("completed_drain"),
292 "no_op" => Some("no_op"),
293 _ => None,
294 }
295}