// ff_engine/scanner/dependency_reconciler.rs
1//! Dependency resolution reconciler.
2//!
3//! Safety net for cross-partition dependency resolution. When an upstream
4//! execution completes, ff-engine::partition_router normally calls ff_resolve_dependency
5//! on each downstream child. If the engine crashes between the upstream's
6//! completion and the child resolution dispatch, children remain stuck in
7//! blocked_by_dependencies. This reconciler detects and resolves that gap.
8//!
//! For each blocked execution: read the deps:unresolved SET, perform a
//! cross-partition read of the upstream exec_core to check whether the
//! upstream is terminal, and call ff_resolve_dependency if it is.
12//!
13//! Reference: RFC-007 §Resolve dependency, RFC-010 §6.14
14
15use std::collections::HashMap;
16use std::time::Duration;
17
18use ff_core::backend::ScannerFilter;
19use ff_core::keys::IndexKeys;
20use ff_core::partition::{Partition, PartitionConfig, PartitionFamily, execution_partition};
21use ff_core::types::{ExecutionId, LaneId};
22
23use super::{should_skip_candidate, ScanResult, Scanner};
24
/// Max blocked executions pulled per lane per scan cycle
/// (used as the ZRANGEBYSCORE LIMIT count in `scan_partition`).
const BATCH_SIZE: u32 = 50;
/// Max dep edges to resolve per execution per cycle.
const MAX_EDGES_PER_EXEC: usize = 20;
28
/// Periodic scanner that detects and resolves dependency edges left
/// dangling when the engine crashed between an upstream's completion
/// and the downstream resolution dispatch (see module docs).
pub struct DependencyReconciler {
    // How often the scanner runs; exposed via `Scanner::interval`.
    interval: Duration,
    // Lanes whose blocked-dependencies indexes are swept each cycle.
    lanes: Vec<LaneId>,
    // Needed to compute an upstream execution's partition (the upstream
    // may live on a different partition than the blocked downstream).
    partition_config: PartitionConfig,
    // Per-candidate skip filter, consulted via `should_skip_candidate`.
    filter: ScannerFilter,
}
35
36impl DependencyReconciler {
37    pub fn new(interval: Duration, lanes: Vec<LaneId>, partition_config: PartitionConfig) -> Self {
38        Self::with_filter(interval, lanes, partition_config, ScannerFilter::default())
39    }
40
41    /// Construct with a [`ScannerFilter`] applied per candidate
42    /// (issue #122).
43    pub fn with_filter(
44        interval: Duration,
45        lanes: Vec<LaneId>,
46        partition_config: PartitionConfig,
47        filter: ScannerFilter,
48    ) -> Self {
49        Self {
50            interval,
51            lanes,
52            partition_config,
53            filter,
54        }
55    }
56}
57
impl Scanner for DependencyReconciler {
    /// Stable identifier for logs and metrics.
    fn name(&self) -> &'static str {
        "dependency_reconciler"
    }

    /// How often the engine should run this scanner.
    fn interval(&self) -> Duration {
        self.interval
    }

    /// Per-candidate filter, consulted via `should_skip_candidate`.
    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    /// Sweep one execution partition.
    ///
    /// For each configured lane, pulls up to `BATCH_SIZE` executions
    /// (ascending score) from the lane's blocked-dependencies ZSET and
    /// attempts to reconcile each one. An index read failure counts as
    /// one error and skips that lane for this cycle.
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);
        let tag = p.hash_tag();

        let mut total_processed: u32 = 0;
        let mut total_errors: u32 = 0;

        // Cross-partition cache: upstream_eid → terminal_outcome (or empty if not terminal).
        // Scoped to this single scan so it can never serve stale results across cycles.
        let mut upstream_cache: HashMap<String, String> = HashMap::new();

        for lane in &self.lanes {
            let blocked_key = idx.lane_blocked_dependencies(lane);

            // One page of blocked execution ids for this lane.
            let blocked: Vec<String> = match client
                .cmd("ZRANGEBYSCORE")
                .arg(&blocked_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg(BATCH_SIZE.to_string().as_str())
                .execute()
                .await
            {
                Ok(ids) => ids,
                Err(e) => {
                    tracing::warn!(
                        partition, error = %e,
                        "dependency_reconciler: ZRANGEBYSCORE blocked:deps failed"
                    );
                    total_errors += 1;
                    continue;
                }
            };

            for eid_str in &blocked {
                // Honor the scanner filter (issue #122) per candidate.
                if should_skip_candidate(client, &self.filter, partition, eid_str).await {
                    continue;
                }
                match reconcile_one_execution(
                    client, &tag, &idx, lane, eid_str,
                    &mut upstream_cache, &self.partition_config,
                ).await {
                    // `n` = number of dep edges resolved for this execution.
                    Ok(n) => total_processed += n,
                    Err(e) => {
                        tracing::warn!(
                            partition,
                            execution_id = eid_str.as_str(),
                            error = %e,
                            "dependency_reconciler: reconcile failed"
                        );
                        total_errors += 1;
                    }
                }
            }
        }

        ScanResult { processed: total_processed, errors: total_errors }
    }
}
139
140/// Reconcile one blocked execution. Returns count of edges resolved.
141async fn reconcile_one_execution(
142    client: &ferriskey::Client,
143    tag: &str,
144    idx: &IndexKeys,
145    lane: &LaneId,
146    eid_str: &str,
147    upstream_cache: &mut HashMap<String, String>,
148    config: &PartitionConfig,
149) -> Result<u32, ferriskey::Error> {
150    let deps_unresolved_key = format!("ff:exec:{}:{}:deps:unresolved", tag, eid_str);
151
152    // Read unresolved dep edge IDs
153    let edge_ids: Vec<String> = client
154        .cmd("SMEMBERS")
155        .arg(&deps_unresolved_key)
156        .execute()
157        .await
158        .unwrap_or_default();
159
160    if edge_ids.is_empty() {
161        return Ok(0);
162    }
163
164    let mut resolved: u32 = 0;
165
166    for (i, edge_id) in edge_ids.iter().enumerate() {
167        if i >= MAX_EDGES_PER_EXEC {
168            break; // limit per cycle
169        }
170
171        // Read the dep edge to find upstream_execution_id
172        let dep_key = format!("ff:exec:{}:{}:dep:{}", tag, eid_str, edge_id);
173        let upstream_id: Option<String> = client
174            .cmd("HGET")
175            .arg(&dep_key)
176            .arg("upstream_execution_id")
177            .execute()
178            .await?;
179
180        let upstream_id = match upstream_id {
181            Some(s) if !s.is_empty() => s,
182            _ => continue,
183        };
184
185        // Check upstream terminal state (cross-partition, cached)
186        let terminal_outcome = get_upstream_outcome(
187            client, &upstream_id, upstream_cache, config,
188        ).await;
189
190        if terminal_outcome.is_empty() {
191            continue; // upstream not terminal yet
192        }
193
194        // Upstream is terminal — resolve the dependency.
195        // Pass the actual terminal_outcome as upstream_outcome ARGV.
196        // The Lua checks == "success" for the satisfaction path; anything
197        // else triggers the impossible path. Using the real outcome
198        // maintains semantic correctness for audit and future extensions.
199        let resolution = terminal_outcome.as_str();
200
201        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
202            Ok(t) => t,
203            Err(_) => continue,
204        };
205
206        // Build KEYS for ff_resolve_dependency
207        let exec_core = format!("ff:exec:{}:{}:core", tag, eid_str);
208        let deps_meta = format!("ff:exec:{}:{}:deps:meta", tag, eid_str);
209        let eligible_key = idx.lane_eligible(lane);
210        let blocked_deps_key = idx.lane_blocked_dependencies(lane);
211        let terminal_key = idx.lane_terminal(lane);
212
213        // For attempt_hash and stream_meta, read current_attempt_index
214        let att_idx_str: Option<String> = client
215            .cmd("HGET")
216            .arg(&exec_core)
217            .arg("current_attempt_index")
218            .execute()
219            .await?;
220        let att_idx = att_idx_str.as_deref().unwrap_or("0");
221        let attempt_hash = format!("ff:attempt:{}:{}:{}", tag, eid_str, att_idx);
222        let stream_meta = format!("ff:stream:{}:{}:{}:meta", tag, eid_str, att_idx);
223
224        // KEYS must match Lua positional order:
225        // [1] exec_core, [2] deps_meta, [3] unresolved_set, [4] dep_hash,
226        // [5] eligible_zset, [6] terminal_zset, [7] blocked_deps_zset,
227        // [8] attempt_hash, [9] stream_meta, [10] downstream_payload,
228        // [11] upstream_result, [12] edgegroup (RFC-016 Stage A)
229        // [10]/[11] added for Batch C item 3. [12] added for RFC-016
230        // Stage A: the per-downstream edgegroup hash lives under the
231        // flow partition, which is the same `{fp:N}` slot via RFC-011
232        // flow-affinity co-location.
233        let downstream_payload = format!("ff:exec:{}:{}:payload", tag, eid_str);
234        let upstream_result = format!("ff:exec:{}:{}:result", tag, upstream_id);
235        // Read the downstream's flow_id so we can construct the
236        // edgegroup key. Absent / empty flow_id (standalones) means
237        // there is no edge group; the key is harmless — Lua's
238        // `is_set`+`EXISTS` guards skip edgegroup writes.
239        let flow_id_str: Option<String> = client
240            .cmd("HGET")
241            .arg(&exec_core)
242            .arg("flow_id")
243            .execute()
244            .await
245            .unwrap_or_default();
246        let edgegroup_key = match flow_id_str.as_deref() {
247            Some(fid) if !fid.is_empty() => {
248                format!("ff:flow:{}:{}:edgegroup:{}", tag, fid, eid_str)
249            }
250            // Standalone execution — no dep edges possible, but we
251            // still need a cluster-slot-valid KEYS[12]. Use a
252            // well-formed sentinel key on the same {fp:N} tag; Lua's
253            // EXISTS guard skips the write.
254            _ => format!("ff:flow:{}:_nil_:edgegroup:_nil_", tag),
255        };
256        // RFC-016 Stage C: incoming_set + pending_cancel_groups SET
257        // for the sibling-enumeration + dispatcher-index path. Both
258        // live on the same {fp:N} slot as the edgegroup.
259        let incoming_set_key = match flow_id_str.as_deref() {
260            Some(fid) if !fid.is_empty() => {
261                format!("ff:flow:{}:{}:in:{}", tag, fid, eid_str)
262            }
263            _ => format!("ff:flow:{}:_nil_:in:_nil_", tag),
264        };
265        let pending_cancel_groups_key =
266            format!("ff:idx:{}:pending_cancel_groups", tag);
267        let flow_id_owned = flow_id_str.clone().unwrap_or_default();
268        let keys: [&str; 14] = [
269            &exec_core,                   // 1
270            &deps_meta,                   // 2
271            &deps_unresolved_key,         // 3
272            &dep_key,                     // 4
273            &eligible_key,                // 5
274            &terminal_key,                // 6
275            &blocked_deps_key,            // 7
276            &attempt_hash,                // 8
277            &stream_meta,                 // 9
278            &downstream_payload,          // 10
279            &upstream_result,             // 11
280            &edgegroup_key,               // 12
281            &incoming_set_key,            // 13 (RFC-016 Stage C)
282            &pending_cancel_groups_key,   // 14 (RFC-016 Stage C)
283        ];
284        let now_s = now_ms.to_string();
285        let argv: [&str; 5] = [
286            edge_id.as_str(),
287            resolution,
288            &now_s,
289            flow_id_owned.as_str(),       // 4 (RFC-016 Stage C)
290            eid_str,                      // 5 (RFC-016 Stage C)
291        ];
292
293        match client
294            .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
295            .await
296        {
297            Ok(_) => {
298                resolved += 1;
299                tracing::debug!(
300                    execution_id = eid_str,
301                    edge_id = edge_id.as_str(),
302                    upstream_id = upstream_id.as_str(),
303                    resolution,
304                    "dependency_reconciler: resolved stale dependency"
305                );
306            }
307            Err(e) => {
308                tracing::warn!(
309                    execution_id = eid_str,
310                    edge_id = edge_id.as_str(),
311                    error = %e,
312                    "dependency_reconciler: ff_resolve_dependency failed"
313                );
314            }
315        }
316    }
317
318    Ok(resolved)
319}
320
321/// Get the terminal outcome of an upstream execution (cross-partition, cached).
322/// Returns empty string if not terminal.
323async fn get_upstream_outcome(
324    client: &ferriskey::Client,
325    upstream_id: &str,
326    cache: &mut HashMap<String, String>,
327    config: &PartitionConfig,
328) -> String {
329    if let Some(outcome) = cache.get(upstream_id) {
330        return outcome.clone();
331    }
332
333    // Compute upstream's partition
334    let eid = match ExecutionId::parse(upstream_id) {
335        Ok(id) => id,
336        Err(_) => {
337            cache.insert(upstream_id.to_owned(), String::new());
338            return String::new();
339        }
340    };
341    let partition = execution_partition(&eid, config);
342    let upstream_tag = partition.hash_tag();
343    let upstream_core = format!("ff:exec:{}:{}:core", upstream_tag, upstream_id);
344
345    // Read lifecycle_phase + terminal_outcome
346    let fields: Vec<Option<String>> = client
347        .cmd("HMGET")
348        .arg(&upstream_core)
349        .arg("lifecycle_phase")
350        .arg("terminal_outcome")
351        .execute()
352        .await
353        .unwrap_or_default();
354
355    let lifecycle = fields.first().and_then(|v| v.clone()).unwrap_or_default();
356    let outcome = fields.get(1).and_then(|v| v.clone()).unwrap_or_default();
357
358    let result = if lifecycle == "terminal" && !outcome.is_empty() && outcome != "none" {
359        outcome
360    } else {
361        String::new()
362    };
363
364    cache.insert(upstream_id.to_owned(), result.clone());
365    result
366}
367
368// ── Postgres branch (Wave 6a, RFC-v0.7) ─────────────────────────────────
369
/// Postgres parity for [`DependencyReconciler`].
///
/// Delegates to [`ff_backend_postgres::reconcilers::dependency::reconcile_tick`]
/// — the cascade backstop for the per-hop-tx dispatch chain in
/// Wave 5a. See that module's docs for the transitive-descendant
/// sweep proof.
///
/// The engine's Postgres scanner task drives this on a fixed
/// interval (mirrors the Valkey `ScannerRunner` contract), passing
/// its configured `ScannerFilter` and the `stale_threshold_ms`
/// from `BackendConfig`. A `None` threshold folds to
/// [`ff_backend_postgres::reconcilers::dependency::DEFAULT_STALE_THRESHOLD_MS`].
#[cfg(feature = "postgres")]
pub async fn reconcile_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    filter: &ScannerFilter,
    stale_threshold_ms: Option<i64>,
) -> Result<
    ff_backend_postgres::reconcilers::dependency::ReconcileReport,
    ff_core::engine_error::EngineError,
> {
    use ff_backend_postgres::reconcilers::dependency as dep;

    let threshold = match stale_threshold_ms {
        Some(ms) => ms,
        None => dep::DEFAULT_STALE_THRESHOLD_MS,
    };
    dep::reconcile_tick(pool, filter, threshold).await
}