Skip to main content

ff_engine/scanner/
dependency_reconciler.rs

//! Dependency resolution reconciler.
//!
//! Safety net for cross-partition dependency resolution. When an upstream
//! execution completes, ff-engine::partition_router normally calls
//! ff_resolve_dependency on each downstream child. If the engine crashes
//! between the upstream's completion and the child resolution dispatch,
//! children remain stuck in blocked_by_dependencies. This reconciler detects
//! and closes that gap.
//!
//! For each blocked execution, the reconciler reads the deps:unresolved SET,
//! performs a cross-partition read of each upstream's exec_core to check
//! whether it is terminal, and calls ff_resolve_dependency for every edge
//! whose upstream is terminal.
//!
//! Reference: RFC-007 §Resolve dependency, RFC-010 §6.14
14
15use std::collections::HashMap;
16use std::time::Duration;
17
18use ff_core::keys::IndexKeys;
19use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, execution_partition};
20use ff_core::types::{ExecutionId, LaneId};
21
22use super::{ScanResult, Scanner};
23
/// Max blocked executions fetched per lane per scan (the `LIMIT` count of the
/// `ZRANGEBYSCORE` read).
const BATCH_SIZE: u32 = 50;
/// Max dep edges examined per execution per cycle.
const MAX_EDGES_PER_EXEC: usize = 20;
27
28pub struct DependencyReconciler {
29    interval: Duration,
30    lanes: Vec<LaneId>,
31    partition_config: PartitionConfig,
32}
33
34impl DependencyReconciler {
35    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
36        Self { interval, lanes, partition_config }
37    }
38}
39
40impl Scanner for DependencyReconciler {
41    fn name(&self) -> &'static str {
42        "dependency_reconciler"
43    }
44
45    fn interval(&self) -> Duration {
46        self.interval
47    }
48
49    async fn scan_partition(
50        &self,
51        client: &ferriskey::Client,
52        partition: u16,
53    ) -> ScanResult {
54        let p = Partition {
55            family: PartitionFamily::Execution,
56            index: partition,
57        };
58        let idx = IndexKeys::new(&p);
59        let tag = p.hash_tag();
60
61        let mut total_processed: u32 = 0;
62        let mut total_errors: u32 = 0;
63
64        // Cross-partition cache: upstream_eid → terminal_outcome (or empty if not terminal)
65        let mut upstream_cache: HashMap<String, String> = HashMap::new();
66
67        for lane in &self.lanes {
68            let blocked_key = idx.lane_blocked_dependencies(lane);
69
70            let blocked: Vec<String> = match client
71                .cmd("ZRANGEBYSCORE")
72                .arg(&blocked_key)
73                .arg("-inf")
74                .arg("+inf")
75                .arg("LIMIT")
76                .arg("0")
77                .arg(BATCH_SIZE.to_string().as_str())
78                .execute()
79                .await
80            {
81                Ok(ids) => ids,
82                Err(e) => {
83                    tracing::warn!(
84                        partition, error = %e,
85                        "dependency_reconciler: ZRANGEBYSCORE blocked:deps failed"
86                    );
87                    total_errors += 1;
88                    continue;
89                }
90            };
91
92            for eid_str in &blocked {
93                match reconcile_one_execution(
94                    client, &tag, &idx, lane, eid_str,
95                    &mut upstream_cache, &self.partition_config,
96                ).await {
97                    Ok(n) => total_processed += n,
98                    Err(e) => {
99                        tracing::warn!(
100                            partition,
101                            execution_id = eid_str.as_str(),
102                            error = %e,
103                            "dependency_reconciler: reconcile failed"
104                        );
105                        total_errors += 1;
106                    }
107                }
108            }
109        }
110
111        ScanResult { processed: total_processed, errors: total_errors }
112    }
113}
114
115/// Reconcile one blocked execution. Returns count of edges resolved.
116async fn reconcile_one_execution(
117    client: &ferriskey::Client,
118    tag: &str,
119    idx: &IndexKeys,
120    lane: &LaneId,
121    eid_str: &str,
122    upstream_cache: &mut HashMap<String, String>,
123    config: &PartitionConfig,
124) -> Result<u32, ferriskey::Error> {
125    let deps_unresolved_key = format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str);
126
127    // Read unresolved dep edge IDs
128    let edge_ids: Vec<String> = client
129        .cmd("SMEMBERS")
130        .arg(&deps_unresolved_key)
131        .execute()
132        .await
133        .unwrap_or_default();
134
135    if edge_ids.is_empty() {
136        return Ok(0);
137    }
138
139    let mut resolved: u32 = 0;
140
141    for (i, edge_id) in edge_ids.iter().enumerate() {
142        if i >= MAX_EDGES_PER_EXEC {
143            break; // limit per cycle
144        }
145
146        // Read the dep edge to find upstream_execution_id
147        let dep_key = format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id);
148        let upstream_id: Option<String> = client
149            .cmd("HGET")
150            .arg(&dep_key)
151            .arg("upstream_execution_id")
152            .execute()
153            .await?;
154
155        let upstream_id = match upstream_id {
156            Some(s) if !s.is_empty() => s,
157            _ => continue,
158        };
159
160        // Check upstream terminal state (cross-partition, cached)
161        let terminal_outcome = get_upstream_outcome(
162            client, &upstream_id, upstream_cache, config,
163        ).await;
164
165        if terminal_outcome.is_empty() {
166            continue; // upstream not terminal yet
167        }
168
169        // Upstream is terminal — resolve the dependency.
170        // Pass the actual terminal_outcome as upstream_outcome ARGV.
171        // The Lua checks == "success" for the satisfaction path; anything
172        // else triggers the impossible path. Using the real outcome
173        // maintains semantic correctness for audit and future extensions.
174        let resolution = terminal_outcome.as_str();
175
176        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
177            Ok(t) => t,
178            Err(_) => continue,
179        };
180
181        // Build KEYS for ff_resolve_dependency
182        let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
183        let deps_meta = format!("ff:exec:{}:{}:deps:meta", tag, eid_str);
184        let eligible_key = idx.lane_eligible(lane);
185        let blocked_deps_key = idx.lane_blocked_dependencies(lane);
186        let terminal_key = idx.lane_terminal(lane);
187
188        // For attempt_hash and stream_meta, read current_attempt_index
189        let att_idx_str: Option<String> = client
190            .cmd("HGET")
191            .arg(&exec_core)
192            .arg("current_attempt_index")
193            .execute()
194            .await?;
195        let att_idx = att_idx_str.as_deref().unwrap_or("0");
196        let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
197        let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
198
199        // KEYS must match Lua positional order:
200        // [1] exec_core, [2] deps_meta, [3] unresolved_set, [4] dep_hash,
201        // [5] eligible_zset, [6] terminal_zset, [7] blocked_deps_zset,
202        // [8] attempt_hash, [9] stream_meta, [10] downstream_payload,
203        // [11] upstream_result
204        // [10]/[11] added for Batch C item 3. Upstream + downstream are
205        // co-located on the same {fp:N} slot via flow membership;
206        // build both keys with the same partition tag.
207        let downstream_payload = format!("ff:exec:{}:{}:payload", tag, eid_str);
208        let upstream_result = format!("ff:exec:{}:{}:result", tag, upstream_id);
209        let keys: [&str; 11] = [
210            &exec_core,           // 1
211            &deps_meta,           // 2
212            &deps_unresolved_key, // 3
213            &dep_key,             // 4
214            &eligible_key,        // 5
215            &terminal_key,        // 6
216            &blocked_deps_key,    // 7
217            &attempt_hash,        // 8
218            &stream_meta,         // 9
219            &downstream_payload,  // 10
220            &upstream_result,     // 11
221        ];
222        let now_s = now_ms.to_string();
223        let argv: [&str; 3] = [edge_id.as_str(), resolution, &now_s];
224
225        match client
226            .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
227            .await
228        {
229            Ok(_) => {
230                resolved += 1;
231                tracing::debug!(
232                    execution_id = eid_str,
233                    edge_id = edge_id.as_str(),
234                    upstream_id = upstream_id.as_str(),
235                    resolution,
236                    "dependency_reconciler: resolved stale dependency"
237                );
238            }
239            Err(e) => {
240                tracing::warn!(
241                    execution_id = eid_str,
242                    edge_id = edge_id.as_str(),
243                    error = %e,
244                    "dependency_reconciler: ff_resolve_dependency failed"
245                );
246            }
247        }
248    }
249
250    Ok(resolved)
251}
252
253/// Get the terminal outcome of an upstream execution (cross-partition, cached).
254/// Returns empty string if not terminal.
255async fn get_upstream_outcome(
256    client: &ferriskey::Client,
257    upstream_id: &str,
258    cache: &mut HashMap<String, String>,
259    config: &PartitionConfig,
260) -> String {
261    if let Some(outcome) = cache.get(upstream_id) {
262        return outcome.clone();
263    }
264
265    // Compute upstream's partition
266    let eid = match ExecutionId::parse(upstream_id) {
267        Ok(id) => id,
268        Err(_) => {
269            cache.insert(upstream_id.to_owned(), String::new());
270            return String::new();
271        }
272    };
273    let partition = execution_partition(&eid, config);
274    let upstream_tag = partition.hash_tag();
275    let upstream_core = format!("ff:exec:{}:{}:core", upstream_tag, upstream_id);
276
277    // Read lifecycle_phase + terminal_outcome
278    let fields: Vec<Option<String>> = client
279        .cmd("HMGET")
280        .arg(&upstream_core)
281        .arg("lifecycle_phase")
282        .arg("terminal_outcome")
283        .execute()
284        .await
285        .unwrap_or_default();
286
287    let lifecycle = fields.first().and_then(|v| v.clone()).unwrap_or_default();
288    let outcome = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
289
290    let result = if lifecycle == "terminal" && !outcome.is_empty() && outcome != "none" {
291        outcome
292    } else {
293        String::new()
294    };
295
296    cache.insert(upstream_id.to_owned(), result.clone());
297    result
298}