Skip to main content

ff_engine/scanner/
dependency_reconciler.rs

//! Dependency resolution reconciler.
//!
//! Safety net for cross-partition dependency resolution. When an upstream
//! execution completes, ff-engine::partition_router normally calls ff_resolve_dependency
//! on each downstream child. If the engine crashes between the upstream's
//! completion and the child resolution dispatch, children remain stuck in
//! blocked_by_dependencies. This reconciler detects and closes that gap.
//!
//! For each blocked execution it reads the deps:unresolved SET, reads each
//! upstream's exec_core (possibly on another partition) to check whether the
//! upstream is terminal, and calls ff_resolve_dependency for every edge whose
//! upstream is terminal.
//!
//! Reference: RFC-007 §Resolve dependency, RFC-010 §6.14
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18
19use ff_core::backend::ScannerFilter;
20use ff_core::contracts::ResolveDependencyArgs;
21use ff_core::engine_backend::EngineBackend;
22use ff_core::keys::IndexKeys;
23use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, execution_partition};
24use ff_core::types::{AttemptIndex, EdgeId, ExecutionId, FlowId, LaneId, TimestampMs};
25
26use super::{should_skip_candidate, ScanResult, Scanner};
27
/// Maximum number of blocked execution ids fetched (`ZRANGEBYSCORE ... LIMIT 0 N`)
/// from each lane's blocked-by-dependencies index per partition scan.
const BATCH_SIZE: u32 = 50;
/// Maximum number of unresolved dependency edges *examined* per execution
/// per cycle. Every inspected edge counts toward the cap, including edges
/// whose upstream is not yet terminal (and is therefore skipped), so this
/// is an upper bound on resolutions, not a target. Edges beyond the cap are
/// not looked at in that cycle.
const MAX_EDGES_PER_EXEC: usize = 20;
31
32pub struct DependencyReconciler {
33    interval: Duration,
34    lanes: Vec<LaneId>,
35    partition_config: PartitionConfig,
36    filter: ScannerFilter,
37    backend: Option<Arc<dyn EngineBackend>>,
38}
39
40impl DependencyReconciler {
41    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
42        Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
43    }
44
45    /// Construct with a [`ScannerFilter`] applied per candidate
46    /// (issue #122).
47    pub fn with_filter(
48        interval: Duration,
49        lanes: Vec<LaneId>,
50        partition_config: PartitionConfig,
51        filter: ScannerFilter,
52    ) -> Self {
53        Self {
54            interval,
55            lanes,
56            partition_config,
57            filter,
58            backend: None,
59        }
60    }
61
62    /// PR-7b Cluster 1: wire an `EngineBackend` for filter-resolution
63    /// reads. FCALL routing is cluster 2 scope.
64    pub fn with_filter_and_backend(
65        interval: Duration,
66        lanes: Vec<LaneId>,
67        partition_config: PartitionConfig,
68        filter: ScannerFilter,
69        backend: Arc<dyn EngineBackend>,
70    ) -> Self {
71        Self {
72            interval,
73            lanes,
74            partition_config,
75            filter,
76            backend: Some(backend),
77        }
78    }
79}
80
81impl Scanner for DependencyReconciler {
82    fn name(&self) -> &'static str {
83        "dependency_reconciler"
84    }
85
86    fn interval(&self) -> Duration {
87        self.interval
88    }
89
90    fn filter(&self) -> &ScannerFilter {
91        &self.filter
92    }
93
94    async fn scan_partition(
95        &self,
96        client: &ferriskey::Client,
97        partition: u16,
98    ) -> ScanResult {
99        let p = Partition {
100            family: PartitionFamily::Execution,
101            index: partition,
102        };
103        let idx = IndexKeys::new(&p);
104        let tag = p.hash_tag();
105
106        let mut total_processed: u32 = 0;
107        let mut total_errors: u32 = 0;
108
109        // Cross-partition cache: upstream_eid → terminal_outcome (or empty if not terminal)
110        let mut upstream_cache: HashMap<String, String> = HashMap::new();
111
112        for lane in &self.lanes {
113            let blocked_key = idx.lane_blocked_dependencies(lane);
114
115            let blocked: Vec<String> = match client
116                .cmd("ZRANGEBYSCORE")
117                .arg(&blocked_key)
118                .arg("-inf")
119                .arg("+inf")
120                .arg("LIMIT")
121                .arg("0")
122                .arg(BATCH_SIZE.to_string().as_str())
123                .execute()
124                .await
125            {
126                Ok(ids) => ids,
127                Err(e) => {
128                    tracing::warn!(
129                        partition, error = %e,
130                        "dependency_reconciler: ZRANGEBYSCORE blocked:deps failed"
131                    );
132                    total_errors += 1;
133                    continue;
134                }
135            };
136
137            for eid_str in &blocked {
138                if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, eid_str).await {
139                    continue;
140                }
141                match reconcile_one_execution(
142                    client, self.backend.as_ref(), &p, &tag, &idx, lane, eid_str,
143                    &mut upstream_cache, &self.partition_config,
144                ).await {
145                    Ok(n) => total_processed += n,
146                    Err(e) => {
147                        tracing::warn!(
148                            partition,
149                            execution_id = eid_str.as_str(),
150                            error = %e,
151                            "dependency_reconciler: reconcile failed"
152                        );
153                        total_errors += 1;
154                    }
155                }
156            }
157        }
158
159        ScanResult { processed: total_processed, errors: total_errors }
160    }
161}
162
163/// Reconcile one blocked execution. Returns count of edges resolved.
164#[allow(clippy::too_many_arguments)]
165async fn reconcile_one_execution(
166    client: &ferriskey::Client,
167    backend: Option<&Arc<dyn EngineBackend>>,
168    partition: &Partition,
169    tag: &str,
170    idx: &IndexKeys,
171    lane: &LaneId,
172    eid_str: &str,
173    upstream_cache: &mut HashMap<String, String>,
174    config: &PartitionConfig,
175) -> Result<u32, ferriskey::Error> {
176    let deps_unresolved_key = format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str);
177
178    // Read unresolved dep edge IDs
179    let edge_ids: Vec<String> = client
180        .cmd("SMEMBERS")
181        .arg(&deps_unresolved_key)
182        .execute()
183        .await
184        .unwrap_or_default();
185
186    if edge_ids.is_empty() {
187        return Ok(0);
188    }
189
190    let mut resolved: u32 = 0;
191
192    for (i, edge_id) in edge_ids.iter().enumerate() {
193        if i >= MAX_EDGES_PER_EXEC {
194            break; // limit per cycle
195        }
196
197        // Read the dep edge to find upstream_execution_id
198        let dep_key = format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id);
199        let upstream_id: Option<String> = client
200            .cmd("HGET")
201            .arg(&dep_key)
202            .arg("upstream_execution_id")
203            .execute()
204            .await?;
205
206        let upstream_id = match upstream_id {
207            Some(s) if !s.is_empty() => s,
208            _ => continue,
209        };
210
211        // Check upstream terminal state (cross-partition, cached)
212        let terminal_outcome = get_upstream_outcome(
213            client, &upstream_id, upstream_cache, config,
214        ).await;
215
216        if terminal_outcome.is_empty() {
217            continue; // upstream not terminal yet
218        }
219
220        // Upstream is terminal — resolve the dependency.
221        // Pass the actual terminal_outcome as upstream_outcome ARGV.
222        // The Lua checks == "success" for the satisfaction path; anything
223        // else triggers the impossible path. Using the real outcome
224        // maintains semantic correctness for audit and future extensions.
225        let resolution = terminal_outcome.as_str();
226
227        let now_ms_res: Result<u64, String> = if let Some(b) = backend {
228            b.server_time_ms().await.map_err(|e| e.to_string())
229        } else {
230            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
231        };
232        let now_ms = match now_ms_res {
233            Ok(t) => t,
234            Err(_) => continue,
235        };
236
237        // Read current_attempt_index + flow_id (needed for both the
238        // trait-routed and the fallback paths — Valkey's impl reads
239        // these itself from `ResolveDependencyArgs`).
240        let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
241        let att_idx_str: Option<String> = client
242            .cmd("HGET")
243            .arg(&exec_core)
244            .arg("current_attempt_index")
245            .execute()
246            .await?;
247        let att_idx_n: u32 = att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
248        let flow_id_str: Option<String> = client
249            .cmd("HGET")
250            .arg(&exec_core)
251            .arg("flow_id")
252            .execute()
253            .await
254            .unwrap_or_default();
255
256        if let Some(backend_arc) = backend {
257            // PR-7b Cluster 2: trait-routed resolve. The Valkey impl
258            // wraps `ff_resolve_dependency` with identical KEYS[14] /
259            // ARGV[5]; Postgres returns `Unavailable` (structural —
260            // PG's dispatch uses `dispatch_completion(event_id)` per
261            // `resolve_dependency` rustdoc).
262            let Ok(downstream_eid) = ExecutionId::parse(eid_str) else {
263                tracing::warn!(execution_id = eid_str, "malformed eid; skipping");
264                continue;
265            };
266            let Ok(upstream_eid) = ExecutionId::parse(&upstream_id) else {
267                tracing::warn!(upstream_id = %upstream_id, "malformed upstream eid; skipping");
268                continue;
269            };
270            // `resolve_dependency` requires a typed `FlowId`. Standalone
271            // executions (`flow_id` absent / empty) cannot have dep
272            // edges, so an entry in `deps:unresolved` for such an
273            // execution indicates data corruption or a partial write —
274            // skip with a warn rather than fabricating a sentinel flow
275            // id. The legacy FCALL path used a `_nil_` sentinel key,
276            // but that only worked because the Lua body's `EXISTS`
277            // guard silently no-ops on nil-shaped keys; the trait
278            // surface has no sentinel and forces us to diagnose the
279            // invariant violation instead.
280            let flow_id = match flow_id_str.as_deref() {
281                Some(s) if !s.is_empty() => match FlowId::parse(s) {
282                    Ok(fid) => fid,
283                    Err(_) => {
284                        tracing::warn!(
285                            execution_id = eid_str,
286                            flow_id = s,
287                            "dependency_reconciler: malformed flow_id; skipping"
288                        );
289                        continue;
290                    }
291                },
292                _ => {
293                    tracing::warn!(
294                        execution_id = eid_str,
295                        edge_id = edge_id.as_str(),
296                        "dependency_reconciler: unresolved dep edge on standalone execution \
297                         (missing flow_id) — likely data corruption; skipping"
298                    );
299                    continue;
300                }
301            };
302            let Ok(edge_id_parsed) = EdgeId::parse(edge_id) else {
303                tracing::warn!(edge_id = edge_id.as_str(), "malformed edge_id; skipping");
304                continue;
305            };
306            let args = ResolveDependencyArgs::new(
307                *partition,
308                flow_id,
309                downstream_eid,
310                upstream_eid,
311                edge_id_parsed,
312                lane.clone(),
313                AttemptIndex::new(att_idx_n),
314                terminal_outcome.clone(),
315                TimestampMs::from_millis(now_ms as i64),
316            );
317            match backend_arc.resolve_dependency(args).await {
318                Ok(outcome) => {
319                    resolved += 1;
320                    tracing::debug!(
321                        execution_id = eid_str,
322                        edge_id = edge_id.as_str(),
323                        upstream_id = upstream_id.as_str(),
324                        resolution = resolution,
325                        ?outcome,
326                        "dependency_reconciler: resolved stale dependency"
327                    );
328                }
329                Err(e) => {
330                    tracing::warn!(
331                        execution_id = eid_str,
332                        edge_id = edge_id.as_str(),
333                        error = %e,
334                        "dependency_reconciler: resolve_dependency failed"
335                    );
336                }
337            }
338            continue;
339        }
340
341        // ── Test-only fallback (no backend wired) ──
342        // Preserves the pre-trait-routing FCALL path for unit tests that
343        // construct the scanner without a backend. Mirrors cluster-1's
344        // lease_expiry pattern.
345        let deps_meta = format!("ff:exec:{}:{}:deps:meta", tag, eid_str);
346        let eligible_key = idx.lane_eligible(lane);
347        let blocked_deps_key = idx.lane_blocked_dependencies(lane);
348        let terminal_key = idx.lane_terminal(lane);
349
350        let att_idx = att_idx_str.as_deref().unwrap_or("0");
351        let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
352        let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
353
354        let downstream_payload = format!("ff:exec:{}:{}:payload", tag, eid_str);
355        let upstream_result = format!("ff:exec:{}:{}:result", tag, upstream_id);
356        let edgegroup_key = match flow_id_str.as_deref() {
357            Some(fid) if !fid.is_empty() => {
358                format!("ff:flow:{}:{}:edgegroup:{}", tag, fid, eid_str)
359            }
360            _ => format!("ff:flow:{}:_nil_:edgegroup:_nil_", tag),
361        };
362        let incoming_set_key = match flow_id_str.as_deref() {
363            Some(fid) if !fid.is_empty() => {
364                format!("ff:flow:{}:{}:in:{}", tag, fid, eid_str)
365            }
366            _ => format!("ff:flow:{}:_nil_:in:_nil_", tag),
367        };
368        let pending_cancel_groups_key =
369            format!("ff:idx:{}:pending_cancel_groups", tag);
370        let flow_id_owned = flow_id_str.clone().unwrap_or_default();
371        let keys: [&str; 14] = [
372            &exec_core,
373            &deps_meta,
374            &deps_unresolved_key,
375            &dep_key,
376            &eligible_key,
377            &terminal_key,
378            &blocked_deps_key,
379            &attempt_hash,
380            &stream_meta,
381            &downstream_payload,
382            &upstream_result,
383            &edgegroup_key,
384            &incoming_set_key,
385            &pending_cancel_groups_key,
386        ];
387        let now_s = now_ms.to_string();
388        let argv: [&str; 5] = [
389            edge_id.as_str(),
390            resolution,
391            &now_s,
392            flow_id_owned.as_str(),
393            eid_str,
394        ];
395
396        match client
397            .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
398            .await
399        {
400            Ok(_) => {
401                resolved += 1;
402                tracing::debug!(
403                    execution_id = eid_str,
404                    edge_id = edge_id.as_str(),
405                    upstream_id = upstream_id.as_str(),
406                    resolution,
407                    "dependency_reconciler: resolved stale dependency (fallback)"
408                );
409            }
410            Err(e) => {
411                tracing::warn!(
412                    execution_id = eid_str,
413                    edge_id = edge_id.as_str(),
414                    error = %e,
415                    "dependency_reconciler: ff_resolve_dependency failed (fallback)"
416                );
417            }
418        }
419    }
420
421    Ok(resolved)
422}
423
424/// Get the terminal outcome of an upstream execution (cross-partition, cached).
425/// Returns empty string if not terminal.
426async fn get_upstream_outcome(
427    client: &ferriskey::Client,
428    upstream_id: &str,
429    cache: &mut HashMap<String, String>,
430    config: &PartitionConfig,
431) -> String {
432    if let Some(outcome) = cache.get(upstream_id) {
433        return outcome.clone();
434    }
435
436    // Compute upstream's partition
437    let eid = match ExecutionId::parse(upstream_id) {
438        Ok(id) => id,
439        Err(_) => {
440            cache.insert(upstream_id.to_owned(), String::new());
441            return String::new();
442        }
443    };
444    let partition = execution_partition(&eid, config);
445    let upstream_tag = partition.hash_tag();
446    let upstream_core = format!("ff:exec:{}:{}:core", upstream_tag, upstream_id);
447
448    // Read lifecycle_phase + terminal_outcome
449    let fields: Vec<Option<String>> = client
450        .cmd("HMGET")
451        .arg(&upstream_core)
452        .arg("lifecycle_phase")
453        .arg("terminal_outcome")
454        .execute()
455        .await
456        .unwrap_or_default();
457
458    let lifecycle = fields.first().and_then(|v| v.clone()).unwrap_or_default();
459    let outcome = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
460
461    let result = if lifecycle == "terminal" && !outcome.is_empty() && outcome != "none" {
462        outcome
463    } else {
464        String::new()
465    };
466
467    cache.insert(upstream_id.to_owned(), result.clone());
468    result
469}
470
// ── Postgres branch (Wave 6a, RFC-v0.7) ─────────────────────────────────

/// Postgres parity for [`DependencyReconciler`].
///
/// Thin wrapper over [`ff_backend_postgres::reconcilers::dependency::reconcile_tick`],
/// the cascade backstop for the per-hop-tx dispatch chain in Wave 5a. That
/// module's docs hold the transitive-descendant sweep proof.
///
/// The engine's Postgres scanner task calls this on a fixed interval,
/// mirroring the Valkey `ScannerRunner` contract. It passes its configured
/// `ScannerFilter` and the `stale_threshold_ms` from `BackendConfig`. When
/// `stale_threshold_ms` is `None`,
/// [`ff_backend_postgres::reconcilers::dependency::DEFAULT_STALE_THRESHOLD_MS`]
/// is used.
///
/// # Errors
///
/// Propagates whatever `EngineError` `reconcile_tick` returns.
#[cfg(feature = "postgres")]
pub async fn reconcile_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    filter: &ScannerFilter,
    stale_threshold_ms: Option<i64>,
) -> Result<
    ff_backend_postgres::reconcilers::dependency::ReconcileReport,
    ff_core::engine_error::EngineError,
> {
    use ff_backend_postgres::reconcilers::dependency as pg_dependency;

    let threshold_ms = match stale_threshold_ms {
        Some(ms) => ms,
        None => pg_dependency::DEFAULT_STALE_THRESHOLD_MS,
    };
    pg_dependency::reconcile_tick(pool, filter, threshold_ms).await
}