1use std::collections::HashMap;
16use std::time::Duration;
17
18use ff_core::backend::ScannerFilter;
19use ff_core::keys::IndexKeys;
20use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, execution_partition};
21use ff_core::types::{ExecutionId, LaneId};
22
23use super::{should_skip_candidate, ScanResult, Scanner};
24
/// Maximum number of blocked execution ids fetched per lane in one scan pass;
/// passed as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 50;
/// Maximum number of unresolved dependency edges examined for a single
/// execution in one reconcile call; edges beyond this are not touched by
/// that call.
const MAX_EDGES_PER_EXEC: usize = 20;
28
/// Scanner that walks each lane's blocked-dependencies index and tries to
/// resolve dependency edges whose upstream execution is already terminal.
pub struct DependencyReconciler {
    /// Value reported by `Scanner::interval`.
    interval: Duration,
    /// Lanes whose blocked-dependencies index is scanned.
    lanes: Vec<LaneId>,
    /// Used to derive the partition (and hash tag) of upstream executions.
    partition_config: PartitionConfig,
    /// Filter passed to `should_skip_candidate` for every candidate id.
    filter: ScannerFilter,
}
35
36impl DependencyReconciler {
37 pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
38 Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
39 }
40
41 pub fn with_filter(
44 interval: Duration,
45 lanes: Vec<LaneId>,
46 partition_config: PartitionConfig,
47 filter: ScannerFilter,
48 ) -> Self {
49 Self {
50 interval,
51 lanes,
52 partition_config,
53 filter,
54 }
55 }
56}
57
58impl Scanner for DependencyReconciler {
59 fn name(&self) -> &'static str {
60 "dependency_reconciler"
61 }
62
63 fn interval(&self) -> Duration {
64 self.interval
65 }
66
67 fn filter(&self) -> &ScannerFilter {
68 &self.filter
69 }
70
71 async fn scan_partition(
72 &self,
73 client: &ferriskey::Client,
74 partition: u16,
75 ) -> ScanResult {
76 let p = Partition {
77 family: PartitionFamily::Execution,
78 index: partition,
79 };
80 let idx = IndexKeys::new(&p);
81 let tag = p.hash_tag();
82
83 let mut total_processed: u32 = 0;
84 let mut total_errors: u32 = 0;
85
86 let mut upstream_cache: HashMap<String, String> = HashMap::new();
88
89 for lane in &self.lanes {
90 let blocked_key = idx.lane_blocked_dependencies(lane);
91
92 let blocked: Vec<String> = match client
93 .cmd("ZRANGEBYSCORE")
94 .arg(&blocked_key)
95 .arg("-inf")
96 .arg("+inf")
97 .arg("LIMIT")
98 .arg("0")
99 .arg(BATCH_SIZE.to_string().as_str())
100 .execute()
101 .await
102 {
103 Ok(ids) => ids,
104 Err(e) => {
105 tracing::warn!(
106 partition, error = %e,
107 "dependency_reconciler: ZRANGEBYSCORE blocked:deps failed"
108 );
109 total_errors += 1;
110 continue;
111 }
112 };
113
114 for eid_str in &blocked {
115 if should_skip_candidate(client, &self.filter, partition, eid_str).await {
116 continue;
117 }
118 match reconcile_one_execution(
119 client, &tag, &idx, lane, eid_str,
120 &mut upstream_cache, &self.partition_config,
121 ).await {
122 Ok(n) => total_processed += n,
123 Err(e) => {
124 tracing::warn!(
125 partition,
126 execution_id = eid_str.as_str(),
127 error = %e,
128 "dependency_reconciler: reconcile failed"
129 );
130 total_errors += 1;
131 }
132 }
133 }
134 }
135
136 ScanResult { processed: total_processed, errors: total_errors }
137 }
138}
139
140async fn reconcile_one_execution(
142 client: &ferriskey::Client,
143 tag: &str,
144 idx: &IndexKeys,
145 lane: &LaneId,
146 eid_str: &str,
147 upstream_cache: &mut HashMap<String, String>,
148 config: &PartitionConfig,
149) -> Result<u32, ferriskey::Error> {
150 let deps_unresolved_key = format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str);
151
152 let edge_ids: Vec<String> = client
154 .cmd("SMEMBERS")
155 .arg(&deps_unresolved_key)
156 .execute()
157 .await
158 .unwrap_or_default();
159
160 if edge_ids.is_empty() {
161 return Ok(0);
162 }
163
164 let mut resolved: u32 = 0;
165
166 for (i, edge_id) in edge_ids.iter().enumerate() {
167 if i >= MAX_EDGES_PER_EXEC {
168 break; }
170
171 let dep_key = format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id);
173 let upstream_id: Option<String> = client
174 .cmd("HGET")
175 .arg(&dep_key)
176 .arg("upstream_execution_id")
177 .execute()
178 .await?;
179
180 let upstream_id = match upstream_id {
181 Some(s) if !s.is_empty() => s,
182 _ => continue,
183 };
184
185 let terminal_outcome = get_upstream_outcome(
187 client, &upstream_id, upstream_cache, config,
188 ).await;
189
190 if terminal_outcome.is_empty() {
191 continue; }
193
194 let resolution = terminal_outcome.as_str();
200
201 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
202 Ok(t) => t,
203 Err(_) => continue,
204 };
205
206 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
208 let deps_meta = format!("ff:exec:{}:{}:deps:meta", tag, eid_str);
209 let eligible_key = idx.lane_eligible(lane);
210 let blocked_deps_key = idx.lane_blocked_dependencies(lane);
211 let terminal_key = idx.lane_terminal(lane);
212
213 let att_idx_str: Option<String> = client
215 .cmd("HGET")
216 .arg(&exec_core)
217 .arg("current_attempt_index")
218 .execute()
219 .await?;
220 let att_idx = att_idx_str.as_deref().unwrap_or("0");
221 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
222 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
223
224 let downstream_payload = format!("ff:exec:{}:{}:payload", tag, eid_str);
234 let upstream_result = format!("ff:exec:{}:{}:result", tag, upstream_id);
235 let flow_id_str: Option<String> = client
240 .cmd("HGET")
241 .arg(&exec_core)
242 .arg("flow_id")
243 .execute()
244 .await
245 .unwrap_or_default();
246 let edgegroup_key = match flow_id_str.as_deref() {
247 Some(fid) if !fid.is_empty() => {
248 format!("ff:flow:{}:{}:edgegroup:{}", tag, fid, eid_str)
249 }
250 _ => format!("ff:flow:{}:_nil_:edgegroup:_nil_", tag),
255 };
256 let incoming_set_key = match flow_id_str.as_deref() {
260 Some(fid) if !fid.is_empty() => {
261 format!("ff:flow:{}:{}:in:{}", tag, fid, eid_str)
262 }
263 _ => format!("ff:flow:{}:_nil_:in:_nil_", tag),
264 };
265 let pending_cancel_groups_key =
266 format!("ff:idx:{}:pending_cancel_groups", tag);
267 let flow_id_owned = flow_id_str.clone().unwrap_or_default();
268 let keys: [&str; 14] = [
269 &exec_core, &deps_meta, &deps_unresolved_key, &dep_key, &eligible_key, &terminal_key, &blocked_deps_key, &attempt_hash, &stream_meta, &downstream_payload, &upstream_result, &edgegroup_key, &incoming_set_key, &pending_cancel_groups_key, ];
284 let now_s = now_ms.to_string();
285 let argv: [&str; 5] = [
286 edge_id.as_str(),
287 resolution,
288 &now_s,
289 flow_id_owned.as_str(), eid_str, ];
292
293 match client
294 .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
295 .await
296 {
297 Ok(_) => {
298 resolved += 1;
299 tracing::debug!(
300 execution_id = eid_str,
301 edge_id = edge_id.as_str(),
302 upstream_id = upstream_id.as_str(),
303 resolution,
304 "dependency_reconciler: resolved stale dependency"
305 );
306 }
307 Err(e) => {
308 tracing::warn!(
309 execution_id = eid_str,
310 edge_id = edge_id.as_str(),
311 error = %e,
312 "dependency_reconciler: ff_resolve_dependency failed"
313 );
314 }
315 }
316 }
317
318 Ok(resolved)
319}
320
321async fn get_upstream_outcome(
324 client: &ferriskey::Client,
325 upstream_id: &str,
326 cache: &mut HashMap<String, String>,
327 config: &PartitionConfig,
328) -> String {
329 if let Some(outcome) = cache.get(upstream_id) {
330 return outcome.clone();
331 }
332
333 let eid = match ExecutionId::parse(upstream_id) {
335 Ok(id) => id,
336 Err(_) => {
337 cache.insert(upstream_id.to_owned(), String::new());
338 return String::new();
339 }
340 };
341 let partition = execution_partition(&eid, config);
342 let upstream_tag = partition.hash_tag();
343 let upstream_core = format!("ff:exec:{}:{}:core", upstream_tag, upstream_id);
344
345 let fields: Vec<Option<String>> = client
347 .cmd("HMGET")
348 .arg(&upstream_core)
349 .arg("lifecycle_phase")
350 .arg("terminal_outcome")
351 .execute()
352 .await
353 .unwrap_or_default();
354
355 let lifecycle = fields.first().and_then(|v| v.clone()).unwrap_or_default();
356 let outcome = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
357
358 let result = if lifecycle == "terminal" && !outcome.is_empty() && outcome != "none" {
359 outcome
360 } else {
361 String::new()
362 };
363
364 cache.insert(upstream_id.to_owned(), result.clone());
365 result
366}
367
/// Runs one dependency-reconcile tick against the Postgres backend.
///
/// `stale_threshold_ms` falls back to the backend's
/// `DEFAULT_STALE_THRESHOLD_MS` when `None`. The result of the backend's
/// `reconcile_tick` is returned unchanged.
#[cfg(feature = "postgres")]
pub async fn reconcile_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    filter: &ScannerFilter,
    stale_threshold_ms: Option<i64>,
) -> Result<
    ff_backend_postgres::reconcilers::dependency::ReconcileReport,
    ff_core::engine_error::EngineError,
> {
    use ff_backend_postgres::reconcilers::dependency as pg_dependency;

    let threshold = match stale_threshold_ms {
        Some(ms) => ms,
        None => pg_dependency::DEFAULT_STALE_THRESHOLD_MS,
    };
    pg_dependency::reconcile_tick(pool, filter, threshold).await
}