1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18
19use ff_core::backend::ScannerFilter;
20use ff_core::contracts::ResolveDependencyArgs;
21use ff_core::engine_backend::EngineBackend;
22use ff_core::keys::IndexKeys;
23use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, execution_partition};
24use ff_core::types::{AttemptIndex, EdgeId, ExecutionId, FlowId, LaneId, TimestampMs};
25
26use super::{should_skip_candidate, ScanResult, Scanner};
27
/// Maximum number of blocked execution ids fetched per lane in one partition
/// scan; passed as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 50;

/// Maximum number of unresolved dependency edges inspected for a single
/// execution in one reconcile pass.
const MAX_EDGES_PER_EXEC: usize = 20;
31
32pub struct DependencyReconciler {
33 interval: Duration,
34 lanes: Vec<LaneId>,
35 partition_config: PartitionConfig,
36 filter: ScannerFilter,
37 backend: Option<Arc<dyn EngineBackend>>,
38}
39
40impl DependencyReconciler {
41 pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
42 Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
43 }
44
45 pub fn with_filter(
48 interval: Duration,
49 lanes: Vec<LaneId>,
50 partition_config: PartitionConfig,
51 filter: ScannerFilter,
52 ) -> Self {
53 Self {
54 interval,
55 lanes,
56 partition_config,
57 filter,
58 backend: None,
59 }
60 }
61
62 pub fn with_filter_and_backend(
65 interval: Duration,
66 lanes: Vec<LaneId>,
67 partition_config: PartitionConfig,
68 filter: ScannerFilter,
69 backend: Arc<dyn EngineBackend>,
70 ) -> Self {
71 Self {
72 interval,
73 lanes,
74 partition_config,
75 filter,
76 backend: Some(backend),
77 }
78 }
79}
80
81impl Scanner for DependencyReconciler {
82 fn name(&self) -> &'static str {
83 "dependency_reconciler"
84 }
85
86 fn interval(&self) -> Duration {
87 self.interval
88 }
89
90 fn filter(&self) -> &ScannerFilter {
91 &self.filter
92 }
93
94 async fn scan_partition(
95 &self,
96 client: &ferriskey::Client,
97 partition: u16,
98 ) -> ScanResult {
99 let p = Partition {
100 family: PartitionFamily::Execution,
101 index: partition,
102 };
103 let idx = IndexKeys::new(&p);
104 let tag = p.hash_tag();
105
106 let mut total_processed: u32 = 0;
107 let mut total_errors: u32 = 0;
108
109 let mut upstream_cache: HashMap<String, String> = HashMap::new();
111
112 for lane in &self.lanes {
113 let blocked_key = idx.lane_blocked_dependencies(lane);
114
115 let blocked: Vec<String> = match client
116 .cmd("ZRANGEBYSCORE")
117 .arg(&blocked_key)
118 .arg("-inf")
119 .arg("+inf")
120 .arg("LIMIT")
121 .arg("0")
122 .arg(BATCH_SIZE.to_string().as_str())
123 .execute()
124 .await
125 {
126 Ok(ids) => ids,
127 Err(e) => {
128 tracing::warn!(
129 partition, error = %e,
130 "dependency_reconciler: ZRANGEBYSCORE blocked:deps failed"
131 );
132 total_errors += 1;
133 continue;
134 }
135 };
136
137 for eid_str in &blocked {
138 if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, eid_str).await {
139 continue;
140 }
141 match reconcile_one_execution(
142 client, self.backend.as_ref(), &p, &tag, &idx, lane, eid_str,
143 &mut upstream_cache, &self.partition_config,
144 ).await {
145 Ok(n) => total_processed += n,
146 Err(e) => {
147 tracing::warn!(
148 partition,
149 execution_id = eid_str.as_str(),
150 error = %e,
151 "dependency_reconciler: reconcile failed"
152 );
153 total_errors += 1;
154 }
155 }
156 }
157 }
158
159 ScanResult { processed: total_processed, errors: total_errors }
160 }
161}
162
163#[allow(clippy::too_many_arguments)]
165async fn reconcile_one_execution(
166 client: &ferriskey::Client,
167 backend: Option<&Arc<dyn EngineBackend>>,
168 partition: &Partition,
169 tag: &str,
170 idx: &IndexKeys,
171 lane: &LaneId,
172 eid_str: &str,
173 upstream_cache: &mut HashMap<String, String>,
174 config: &PartitionConfig,
175) -> Result<u32, ferriskey::Error> {
176 let deps_unresolved_key = format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str);
177
178 let edge_ids: Vec<String> = client
180 .cmd("SMEMBERS")
181 .arg(&deps_unresolved_key)
182 .execute()
183 .await
184 .unwrap_or_default();
185
186 if edge_ids.is_empty() {
187 return Ok(0);
188 }
189
190 let mut resolved: u32 = 0;
191
192 for (i, edge_id) in edge_ids.iter().enumerate() {
193 if i >= MAX_EDGES_PER_EXEC {
194 break; }
196
197 let dep_key = format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id);
199 let upstream_id: Option<String> = client
200 .cmd("HGET")
201 .arg(&dep_key)
202 .arg("upstream_execution_id")
203 .execute()
204 .await?;
205
206 let upstream_id = match upstream_id {
207 Some(s) if !s.is_empty() => s,
208 _ => continue,
209 };
210
211 let terminal_outcome = get_upstream_outcome(
213 client, &upstream_id, upstream_cache, config,
214 ).await;
215
216 if terminal_outcome.is_empty() {
217 continue; }
219
220 let resolution = terminal_outcome.as_str();
226
227 let now_ms_res: Result<u64, String> = if let Some(b) = backend {
228 b.server_time_ms().await.map_err(|e| e.to_string())
229 } else {
230 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
231 };
232 let now_ms = match now_ms_res {
233 Ok(t) => t,
234 Err(_) => continue,
235 };
236
237 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
241 let att_idx_str: Option<String> = client
242 .cmd("HGET")
243 .arg(&exec_core)
244 .arg("current_attempt_index")
245 .execute()
246 .await?;
247 let att_idx_n: u32 = att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
248 let flow_id_str: Option<String> = client
249 .cmd("HGET")
250 .arg(&exec_core)
251 .arg("flow_id")
252 .execute()
253 .await
254 .unwrap_or_default();
255
256 if let Some(backend_arc) = backend {
257 let Ok(downstream_eid) = ExecutionId::parse(eid_str) else {
263 tracing::warn!(execution_id = eid_str, "malformed eid; skipping");
264 continue;
265 };
266 let Ok(upstream_eid) = ExecutionId::parse(&upstream_id) else {
267 tracing::warn!(upstream_id = %upstream_id, "malformed upstream eid; skipping");
268 continue;
269 };
270 let flow_id = match flow_id_str.as_deref() {
281 Some(s) if !s.is_empty() => match FlowId::parse(s) {
282 Ok(fid) => fid,
283 Err(_) => {
284 tracing::warn!(
285 execution_id = eid_str,
286 flow_id = s,
287 "dependency_reconciler: malformed flow_id; skipping"
288 );
289 continue;
290 }
291 },
292 _ => {
293 tracing::warn!(
294 execution_id = eid_str,
295 edge_id = edge_id.as_str(),
296 "dependency_reconciler: unresolved dep edge on standalone execution \
297 (missing flow_id) — likely data corruption; skipping"
298 );
299 continue;
300 }
301 };
302 let Ok(edge_id_parsed) = EdgeId::parse(edge_id) else {
303 tracing::warn!(edge_id = edge_id.as_str(), "malformed edge_id; skipping");
304 continue;
305 };
306 let args = ResolveDependencyArgs::new(
307 *partition,
308 flow_id,
309 downstream_eid,
310 upstream_eid,
311 edge_id_parsed,
312 lane.clone(),
313 AttemptIndex::new(att_idx_n),
314 terminal_outcome.clone(),
315 TimestampMs::from_millis(now_ms as i64),
316 );
317 match backend_arc.resolve_dependency(args).await {
318 Ok(outcome) => {
319 resolved += 1;
320 tracing::debug!(
321 execution_id = eid_str,
322 edge_id = edge_id.as_str(),
323 upstream_id = upstream_id.as_str(),
324 resolution = resolution,
325 ?outcome,
326 "dependency_reconciler: resolved stale dependency"
327 );
328 }
329 Err(e) => {
330 tracing::warn!(
331 execution_id = eid_str,
332 edge_id = edge_id.as_str(),
333 error = %e,
334 "dependency_reconciler: resolve_dependency failed"
335 );
336 }
337 }
338 continue;
339 }
340
341 let deps_meta = format!("ff:exec:{}:{}:deps:meta", tag, eid_str);
346 let eligible_key = idx.lane_eligible(lane);
347 let blocked_deps_key = idx.lane_blocked_dependencies(lane);
348 let terminal_key = idx.lane_terminal(lane);
349
350 let att_idx = att_idx_str.as_deref().unwrap_or("0");
351 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
352 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
353
354 let downstream_payload = format!("ff:exec:{}:{}:payload", tag, eid_str);
355 let upstream_result = format!("ff:exec:{}:{}:result", tag, upstream_id);
356 let edgegroup_key = match flow_id_str.as_deref() {
357 Some(fid) if !fid.is_empty() => {
358 format!("ff:flow:{}:{}:edgegroup:{}", tag, fid, eid_str)
359 }
360 _ => format!("ff:flow:{}:_nil_:edgegroup:_nil_", tag),
361 };
362 let incoming_set_key = match flow_id_str.as_deref() {
363 Some(fid) if !fid.is_empty() => {
364 format!("ff:flow:{}:{}:in:{}", tag, fid, eid_str)
365 }
366 _ => format!("ff:flow:{}:_nil_:in:_nil_", tag),
367 };
368 let pending_cancel_groups_key =
369 format!("ff:idx:{}:pending_cancel_groups", tag);
370 let flow_id_owned = flow_id_str.clone().unwrap_or_default();
371 let keys: [&str; 14] = [
372 &exec_core,
373 &deps_meta,
374 &deps_unresolved_key,
375 &dep_key,
376 &eligible_key,
377 &terminal_key,
378 &blocked_deps_key,
379 &attempt_hash,
380 &stream_meta,
381 &downstream_payload,
382 &upstream_result,
383 &edgegroup_key,
384 &incoming_set_key,
385 &pending_cancel_groups_key,
386 ];
387 let now_s = now_ms.to_string();
388 let argv: [&str; 5] = [
389 edge_id.as_str(),
390 resolution,
391 &now_s,
392 flow_id_owned.as_str(),
393 eid_str,
394 ];
395
396 match client
397 .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
398 .await
399 {
400 Ok(_) => {
401 resolved += 1;
402 tracing::debug!(
403 execution_id = eid_str,
404 edge_id = edge_id.as_str(),
405 upstream_id = upstream_id.as_str(),
406 resolution,
407 "dependency_reconciler: resolved stale dependency (fallback)"
408 );
409 }
410 Err(e) => {
411 tracing::warn!(
412 execution_id = eid_str,
413 edge_id = edge_id.as_str(),
414 error = %e,
415 "dependency_reconciler: ff_resolve_dependency failed (fallback)"
416 );
417 }
418 }
419 }
420
421 Ok(resolved)
422}
423
424async fn get_upstream_outcome(
427 client: &ferriskey::Client,
428 upstream_id: &str,
429 cache: &mut HashMap<String, String>,
430 config: &PartitionConfig,
431) -> String {
432 if let Some(outcome) = cache.get(upstream_id) {
433 return outcome.clone();
434 }
435
436 let eid = match ExecutionId::parse(upstream_id) {
438 Ok(id) => id,
439 Err(_) => {
440 cache.insert(upstream_id.to_owned(), String::new());
441 return String::new();
442 }
443 };
444 let partition = execution_partition(&eid, config);
445 let upstream_tag = partition.hash_tag();
446 let upstream_core = format!("ff:exec:{}:{}:core", upstream_tag, upstream_id);
447
448 let fields: Vec<Option<String>> = client
450 .cmd("HMGET")
451 .arg(&upstream_core)
452 .arg("lifecycle_phase")
453 .arg("terminal_outcome")
454 .execute()
455 .await
456 .unwrap_or_default();
457
458 let lifecycle = fields.first().and_then(|v| v.clone()).unwrap_or_default();
459 let outcome = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
460
461 let result = if lifecycle == "terminal" && !outcome.is_empty() && outcome != "none" {
462 outcome
463 } else {
464 String::new()
465 };
466
467 cache.insert(upstream_id.to_owned(), result.clone());
468 result
469}
470
/// Runs one dependency-reconcile tick against the Postgres backend.
///
/// `stale_threshold_ms` overrides the staleness threshold handed to
/// `reconcile_tick`; `None` selects the backend crate's
/// `DEFAULT_STALE_THRESHOLD_MS`. The report or error produced by
/// `reconcile_tick` is returned unchanged.
#[cfg(feature = "postgres")]
pub async fn reconcile_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    filter: &ScannerFilter,
    stale_threshold_ms: Option<i64>,
) -> Result<
    ff_backend_postgres::reconcilers::dependency::ReconcileReport,
    ff_core::engine_error::EngineError,
> {
    use ff_backend_postgres::reconcilers::dependency as pg_dependency;

    let threshold = match stale_threshold_ms {
        Some(ms) => ms,
        None => pg_dependency::DEFAULT_STALE_THRESHOLD_MS,
    };
    pg_dependency::reconcile_tick(pool, filter, threshold).await
}