Skip to main content

ff_engine/scanner/
dependency_reconciler.rs

1//! Dependency resolution reconciler.
2//!
3//! Safety net for cross-partition dependency resolution. When an upstream
4//! execution completes, ff-engine::partition_router normally calls ff_resolve_dependency
5//! on each downstream child. If the engine crashes between the upstream's
6//! completion and the child resolution dispatch, children remain stuck in
7//! blocked_by_dependencies. This reconciler detects and resolves that gap.
8//!
9//! For each blocked execution: reads deps:unresolved SET, cross-partition
10//! reads upstream exec_core to check if terminal, calls ff_resolve_dependency
11//! if upstream is terminal.
12//!
13//! Reference: RFC-007 §Resolve dependency, RFC-010 §6.14
14
15use std::collections::HashMap;
16use std::time::Duration;
17
18use ff_core::backend::ScannerFilter;
19use ff_core::keys::IndexKeys;
20use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, execution_partition};
21use ff_core::types::{ExecutionId, LaneId};
22
23use super::{should_skip_candidate, ScanResult, Scanner};
24
/// Maximum number of blocked execution IDs read per lane in one partition
/// scan; passed as the `LIMIT` count of the `ZRANGEBYSCORE` read.
const BATCH_SIZE: u32 = 50;
/// Max dep edges to resolve per execution per cycle.
///
/// The cap is applied to the edges *examined*: an edge whose upstream is not
/// yet terminal still counts against it.
const MAX_EDGES_PER_EXEC: usize = 20;
28
/// Scanner that re-drives dependency resolution for executions listed in a
/// lane's blocked-on-dependencies index.
pub struct DependencyReconciler {
    /// Value returned by `Scanner::interval`.
    interval: Duration,
    /// Lanes whose blocked-dependencies index is read on each partition scan.
    lanes: Vec<LaneId>,
    /// Partition layout used to locate an upstream execution's partition.
    partition_config: PartitionConfig,
    /// Per-candidate filter handed to `should_skip_candidate`.
    filter: ScannerFilter,
}
35
36impl DependencyReconciler {
37    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
38        Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
39    }
40
41    /// Construct with a [`ScannerFilter`] applied per candidate
42    /// (issue #122).
43    pub fn with_filter(
44        interval: Duration,
45        lanes: Vec<LaneId>,
46        partition_config: PartitionConfig,
47        filter: ScannerFilter,
48    ) -> Self {
49        Self {
50            interval,
51            lanes,
52            partition_config,
53            filter,
54        }
55    }
56}
57
58impl Scanner for DependencyReconciler {
59    fn name(&self) -> &'static str {
60        "dependency_reconciler"
61    }
62
63    fn interval(&self) -> Duration {
64        self.interval
65    }
66
67    fn filter(&self) -> &ScannerFilter {
68        &self.filter
69    }
70
71    async fn scan_partition(
72        &self,
73        client: &ferriskey::Client,
74        partition: u16,
75    ) -> ScanResult {
76        let p = Partition {
77            family: PartitionFamily::Execution,
78            index: partition,
79        };
80        let idx = IndexKeys::new(&p);
81        let tag = p.hash_tag();
82
83        let mut total_processed: u32 = 0;
84        let mut total_errors: u32 = 0;
85
86        // Cross-partition cache: upstream_eid → terminal_outcome (or empty if not terminal)
87        let mut upstream_cache: HashMap<String, String> = HashMap::new();
88
89        for lane in &self.lanes {
90            let blocked_key = idx.lane_blocked_dependencies(lane);
91
92            let blocked: Vec<String> = match client
93                .cmd("ZRANGEBYSCORE")
94                .arg(&blocked_key)
95                .arg("-inf")
96                .arg("+inf")
97                .arg("LIMIT")
98                .arg("0")
99                .arg(BATCH_SIZE.to_string().as_str())
100                .execute()
101                .await
102            {
103                Ok(ids) => ids,
104                Err(e) => {
105                    tracing::warn!(
106                        partition, error = %e,
107                        "dependency_reconciler: ZRANGEBYSCORE blocked:deps failed"
108                    );
109                    total_errors += 1;
110                    continue;
111                }
112            };
113
114            for eid_str in &blocked {
115                if should_skip_candidate(client, &self.filter, partition, eid_str).await {
116                    continue;
117                }
118                match reconcile_one_execution(
119                    client, &tag, &idx, lane, eid_str,
120                    &mut upstream_cache, &self.partition_config,
121                ).await {
122                    Ok(n) => total_processed += n,
123                    Err(e) => {
124                        tracing::warn!(
125                            partition,
126                            execution_id = eid_str.as_str(),
127                            error = %e,
128                            "dependency_reconciler: reconcile failed"
129                        );
130                        total_errors += 1;
131                    }
132                }
133            }
134        }
135
136        ScanResult { processed: total_processed, errors: total_errors }
137    }
138}
139
140/// Reconcile one blocked execution. Returns count of edges resolved.
141async fn reconcile_one_execution(
142    client: &ferriskey::Client,
143    tag: &str,
144    idx: &IndexKeys,
145    lane: &LaneId,
146    eid_str: &str,
147    upstream_cache: &mut HashMap<String, String>,
148    config: &PartitionConfig,
149) -> Result<u32, ferriskey::Error> {
150    let deps_unresolved_key = format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str);
151
152    // Read unresolved dep edge IDs
153    let edge_ids: Vec<String> = client
154        .cmd("SMEMBERS")
155        .arg(&deps_unresolved_key)
156        .execute()
157        .await
158        .unwrap_or_default();
159
160    if edge_ids.is_empty() {
161        return Ok(0);
162    }
163
164    let mut resolved: u32 = 0;
165
166    for (i, edge_id) in edge_ids.iter().enumerate() {
167        if i >= MAX_EDGES_PER_EXEC {
168            break; // limit per cycle
169        }
170
171        // Read the dep edge to find upstream_execution_id
172        let dep_key = format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id);
173        let upstream_id: Option<String> = client
174            .cmd("HGET")
175            .arg(&dep_key)
176            .arg("upstream_execution_id")
177            .execute()
178            .await?;
179
180        let upstream_id = match upstream_id {
181            Some(s) if !s.is_empty() => s,
182            _ => continue,
183        };
184
185        // Check upstream terminal state (cross-partition, cached)
186        let terminal_outcome = get_upstream_outcome(
187            client, &upstream_id, upstream_cache, config,
188        ).await;
189
190        if terminal_outcome.is_empty() {
191            continue; // upstream not terminal yet
192        }
193
194        // Upstream is terminal — resolve the dependency.
195        // Pass the actual terminal_outcome as upstream_outcome ARGV.
196        // The Lua checks == "success" for the satisfaction path; anything
197        // else triggers the impossible path. Using the real outcome
198        // maintains semantic correctness for audit and future extensions.
199        let resolution = terminal_outcome.as_str();
200
201        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
202            Ok(t) => t,
203            Err(_) => continue,
204        };
205
206        // Build KEYS for ff_resolve_dependency
207        let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
208        let deps_meta = format!("ff:exec:{}:{}:deps:meta", tag, eid_str);
209        let eligible_key = idx.lane_eligible(lane);
210        let blocked_deps_key = idx.lane_blocked_dependencies(lane);
211        let terminal_key = idx.lane_terminal(lane);
212
213        // For attempt_hash and stream_meta, read current_attempt_index
214        let att_idx_str: Option<String> = client
215            .cmd("HGET")
216            .arg(&exec_core)
217            .arg("current_attempt_index")
218            .execute()
219            .await?;
220        let att_idx = att_idx_str.as_deref().unwrap_or("0");
221        let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
222        let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
223
224        // KEYS must match Lua positional order:
225        // [1] exec_core, [2] deps_meta, [3] unresolved_set, [4] dep_hash,
226        // [5] eligible_zset, [6] terminal_zset, [7] blocked_deps_zset,
227        // [8] attempt_hash, [9] stream_meta, [10] downstream_payload,
228        // [11] upstream_result
229        // [10]/[11] added for Batch C item 3. Upstream + downstream are
230        // co-located on the same {fp:N} slot via flow membership;
231        // build both keys with the same partition tag.
232        let downstream_payload = format!("ff:exec:{}:{}:payload", tag, eid_str);
233        let upstream_result = format!("ff:exec:{}:{}:result", tag, upstream_id);
234        let keys: [&str; 11] = [
235            &exec_core,           // 1
236            &deps_meta,           // 2
237            &deps_unresolved_key, // 3
238            &dep_key,             // 4
239            &eligible_key,        // 5
240            &terminal_key,        // 6
241            &blocked_deps_key,    // 7
242            &attempt_hash,        // 8
243            &stream_meta,         // 9
244            &downstream_payload,  // 10
245            &upstream_result,     // 11
246        ];
247        let now_s = now_ms.to_string();
248        let argv: [&str; 3] = [edge_id.as_str(), resolution, &now_s];
249
250        match client
251            .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
252            .await
253        {
254            Ok(_) => {
255                resolved += 1;
256                tracing::debug!(
257                    execution_id = eid_str,
258                    edge_id = edge_id.as_str(),
259                    upstream_id = upstream_id.as_str(),
260                    resolution,
261                    "dependency_reconciler: resolved stale dependency"
262                );
263            }
264            Err(e) => {
265                tracing::warn!(
266                    execution_id = eid_str,
267                    edge_id = edge_id.as_str(),
268                    error = %e,
269                    "dependency_reconciler: ff_resolve_dependency failed"
270                );
271            }
272        }
273    }
274
275    Ok(resolved)
276}
277
278/// Get the terminal outcome of an upstream execution (cross-partition, cached).
279/// Returns empty string if not terminal.
280async fn get_upstream_outcome(
281    client: &ferriskey::Client,
282    upstream_id: &str,
283    cache: &mut HashMap<String, String>,
284    config: &PartitionConfig,
285) -> String {
286    if let Some(outcome) = cache.get(upstream_id) {
287        return outcome.clone();
288    }
289
290    // Compute upstream's partition
291    let eid = match ExecutionId::parse(upstream_id) {
292        Ok(id) => id,
293        Err(_) => {
294            cache.insert(upstream_id.to_owned(), String::new());
295            return String::new();
296        }
297    };
298    let partition = execution_partition(&eid, config);
299    let upstream_tag = partition.hash_tag();
300    let upstream_core = format!("ff:exec:{}:{}:core", upstream_tag, upstream_id);
301
302    // Read lifecycle_phase + terminal_outcome
303    let fields: Vec<Option<String>> = client
304        .cmd("HMGET")
305        .arg(&upstream_core)
306        .arg("lifecycle_phase")
307        .arg("terminal_outcome")
308        .execute()
309        .await
310        .unwrap_or_default();
311
312    let lifecycle = fields.first().and_then(|v| v.clone()).unwrap_or_default();
313    let outcome = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
314
315    let result = if lifecycle == "terminal" && !outcome.is_empty() && outcome != "none" {
316        outcome
317    } else {
318        String::new()
319    };
320
321    cache.insert(upstream_id.to_owned(), result.clone());
322    result
323}