1use std::collections::HashMap;
16use std::time::Duration;
17
18use ff_core::keys::IndexKeys;
19use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, execution_partition};
20use ff_core::types::{ExecutionId, LaneId};
21
22use super::{ScanResult, Scanner};
23
/// Maximum number of blocked execution ids fetched per lane in one partition
/// scan; passed as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const BATCH_SIZE: u32 = 50;

/// Maximum number of unresolved dependency edges examined for a single
/// execution in one reconcile call; edges beyond this count are left for a
/// later call.
const MAX_EDGES_PER_EXEC: usize = 20;
27
28pub struct DependencyReconciler {
29 interval: Duration,
30 lanes: Vec<LaneId>,
31 partition_config: PartitionConfig,
32}
33
34impl DependencyReconciler {
35 pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
36 Self { interval, lanes, partition_config }
37 }
38}
39
40impl Scanner for DependencyReconciler {
41 fn name(&self) -> &'static str {
42 "dependency_reconciler"
43 }
44
45 fn interval(&self) -> Duration {
46 self.interval
47 }
48
49 async fn scan_partition(
50 &self,
51 client: &ferriskey::Client,
52 partition: u16,
53 ) -> ScanResult {
54 let p = Partition {
55 family: PartitionFamily::Execution,
56 index: partition,
57 };
58 let idx = IndexKeys::new(&p);
59 let tag = p.hash_tag();
60
61 let mut total_processed: u32 = 0;
62 let mut total_errors: u32 = 0;
63
64 let mut upstream_cache: HashMap<String, String> = HashMap::new();
66
67 for lane in &self.lanes {
68 let blocked_key = idx.lane_blocked_dependencies(lane);
69
70 let blocked: Vec<String> = match client
71 .cmd("ZRANGEBYSCORE")
72 .arg(&blocked_key)
73 .arg("-inf")
74 .arg("+inf")
75 .arg("LIMIT")
76 .arg("0")
77 .arg(BATCH_SIZE.to_string().as_str())
78 .execute()
79 .await
80 {
81 Ok(ids) => ids,
82 Err(e) => {
83 tracing::warn!(
84 partition, error = %e,
85 "dependency_reconciler: ZRANGEBYSCORE blocked:deps failed"
86 );
87 total_errors += 1;
88 continue;
89 }
90 };
91
92 for eid_str in &blocked {
93 match reconcile_one_execution(
94 client, &tag, &idx, lane, eid_str,
95 &mut upstream_cache, &self.partition_config,
96 ).await {
97 Ok(n) => total_processed += n,
98 Err(e) => {
99 tracing::warn!(
100 partition,
101 execution_id = eid_str.as_str(),
102 error = %e,
103 "dependency_reconciler: reconcile failed"
104 );
105 total_errors += 1;
106 }
107 }
108 }
109 }
110
111 ScanResult { processed: total_processed, errors: total_errors }
112 }
113}
114
115async fn reconcile_one_execution(
117 client: &ferriskey::Client,
118 tag: &str,
119 idx: &IndexKeys,
120 lane: &LaneId,
121 eid_str: &str,
122 upstream_cache: &mut HashMap<String, String>,
123 config: &PartitionConfig,
124) -> Result<u32, ferriskey::Error> {
125 let deps_unresolved_key = format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str);
126
127 let edge_ids: Vec<String> = client
129 .cmd("SMEMBERS")
130 .arg(&deps_unresolved_key)
131 .execute()
132 .await
133 .unwrap_or_default();
134
135 if edge_ids.is_empty() {
136 return Ok(0);
137 }
138
139 let mut resolved: u32 = 0;
140
141 for (i, edge_id) in edge_ids.iter().enumerate() {
142 if i >= MAX_EDGES_PER_EXEC {
143 break; }
145
146 let dep_key = format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id);
148 let upstream_id: Option<String> = client
149 .cmd("HGET")
150 .arg(&dep_key)
151 .arg("upstream_execution_id")
152 .execute()
153 .await?;
154
155 let upstream_id = match upstream_id {
156 Some(s) if !s.is_empty() => s,
157 _ => continue,
158 };
159
160 let terminal_outcome = get_upstream_outcome(
162 client, &upstream_id, upstream_cache, config,
163 ).await;
164
165 if terminal_outcome.is_empty() {
166 continue; }
168
169 let resolution = terminal_outcome.as_str();
175
176 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
177 Ok(t) => t,
178 Err(_) => continue,
179 };
180
181 let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
183 let deps_meta = format!("ff:exec:{}:{}:deps:meta", tag, eid_str);
184 let eligible_key = idx.lane_eligible(lane);
185 let blocked_deps_key = idx.lane_blocked_dependencies(lane);
186 let terminal_key = idx.lane_terminal(lane);
187
188 let att_idx_str: Option<String> = client
190 .cmd("HGET")
191 .arg(&exec_core)
192 .arg("current_attempt_index")
193 .execute()
194 .await?;
195 let att_idx = att_idx_str.as_deref().unwrap_or("0");
196 let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
197 let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
198
199 let downstream_payload = format!("ff:exec:{}:{}:payload", tag, eid_str);
208 let upstream_result = format!("ff:exec:{}:{}:result", tag, upstream_id);
209 let keys: [&str; 11] = [
210 &exec_core, &deps_meta, &deps_unresolved_key, &dep_key, &eligible_key, &terminal_key, &blocked_deps_key, &attempt_hash, &stream_meta, &downstream_payload, &upstream_result, ];
222 let now_s = now_ms.to_string();
223 let argv: [&str; 3] = [edge_id.as_str(), resolution, &now_s];
224
225 match client
226 .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
227 .await
228 {
229 Ok(_) => {
230 resolved += 1;
231 tracing::debug!(
232 execution_id = eid_str,
233 edge_id = edge_id.as_str(),
234 upstream_id = upstream_id.as_str(),
235 resolution,
236 "dependency_reconciler: resolved stale dependency"
237 );
238 }
239 Err(e) => {
240 tracing::warn!(
241 execution_id = eid_str,
242 edge_id = edge_id.as_str(),
243 error = %e,
244 "dependency_reconciler: ff_resolve_dependency failed"
245 );
246 }
247 }
248 }
249
250 Ok(resolved)
251}
252
253async fn get_upstream_outcome(
256 client: &ferriskey::Client,
257 upstream_id: &str,
258 cache: &mut HashMap<String, String>,
259 config: &PartitionConfig,
260) -> String {
261 if let Some(outcome) = cache.get(upstream_id) {
262 return outcome.clone();
263 }
264
265 let eid = match ExecutionId::parse(upstream_id) {
267 Ok(id) => id,
268 Err(_) => {
269 cache.insert(upstream_id.to_owned(), String::new());
270 return String::new();
271 }
272 };
273 let partition = execution_partition(&eid, config);
274 let upstream_tag = partition.hash_tag();
275 let upstream_core = format!("ff:exec:{}:{}:core", upstream_tag, upstream_id);
276
277 let fields: Vec<Option<String>> = client
279 .cmd("HMGET")
280 .arg(&upstream_core)
281 .arg("lifecycle_phase")
282 .arg("terminal_outcome")
283 .execute()
284 .await
285 .unwrap_or_default();
286
287 let lifecycle = fields.first().and_then(|v| v.clone()).unwrap_or_default();
288 let outcome = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
289
290 let result = if lifecycle == "terminal" && !outcome.is_empty() && outcome != "none" {
291 outcome
292 } else {
293 String::new()
294 };
295
296 cache.insert(upstream_id.to_owned(), result.clone());
297 result
298}