Skip to main content

ff_engine/
partition_router.rs

1//! Partition-aware dispatch for cross-partition operations.
2//!
3//! The PartitionRouter resolves an ExecutionId to its partition and provides
4//! key contexts for Valkey operations. For Phase 1, a single ferriskey::Client
5//! connection is shared across all partitions (the client handles cluster routing
6//! internally via hash tags).
7
8use ff_core::keys::{ExecKeyContext, IndexKeys};
9use ff_core::partition::{
10    Partition, PartitionConfig, PartitionFamily, execution_partition,
11};
12use ff_core::types::ExecutionId;
13
14/// Routes execution operations to the correct partition.
15///
16/// In a Valkey Cluster deployment, the ferriskey client handles slot-level
17/// routing transparently — all keys for a partition share the same `{p:N}`
18/// hash tag, so they land on the same shard. The router's job is partition
19/// computation and key context construction, not connection selection.
20pub struct PartitionRouter {
21    config: PartitionConfig,
22}
23
24impl PartitionRouter {
25    pub fn new(config: PartitionConfig) -> Self {
26        Self { config }
27    }
28
29    /// Resolve an execution ID to its partition.
30    pub fn partition_for(&self, eid: &ExecutionId) -> Partition {
31        execution_partition(eid, &self.config)
32    }
33
34    /// Build an ExecKeyContext for the given execution.
35    pub fn exec_keys(&self, eid: &ExecutionId) -> ExecKeyContext {
36        let partition = self.partition_for(eid);
37        ExecKeyContext::new(&partition, eid)
38    }
39
40    /// Build IndexKeys for a given partition index.
41    pub fn index_keys(&self, partition_index: u16) -> IndexKeys {
42        let partition = Partition {
43            family: PartitionFamily::Execution,
44            index: partition_index,
45        };
46        IndexKeys::new(&partition)
47    }
48
49    /// The partition config.
50    pub fn config(&self) -> &PartitionConfig {
51        &self.config
52    }
53
54    /// Total number of flow partitions.
55    ///
56    /// Post-RFC-011: exec keys co-locate with their parent flow's partition
57    /// under hash-tag routing, so this count governs exec routing too.
58    /// There is no separate `num_execution_partitions`.
59    pub fn num_flow_partitions(&self) -> u16 {
60        self.config.num_flow_partitions
61    }
62}
63
64/// Post-completion dependency dispatch.
65///
66/// After an execution completes/fails/cancels, the engine dispatches
67/// `resolve_dependency` to each downstream child's `{p:N}` partition.
68/// Reads outgoing edges from the flow partition, then for each downstream
69/// child, calls FCALL ff_resolve_dependency on the child's partition.
70///
71/// If a child is transitioned to terminal (skipped) by the resolution,
72/// cascades dispatch for the skipped child's outgoing edges so that
73/// grandchildren don't wait for the reconciler (up to 15s/level).
74pub async fn dispatch_dependency_resolution(
75    client: &ferriskey::Client,
76    router: &PartitionRouter,
77    eid: &ExecutionId,
78    flow_id: Option<&str>,
79) {
80    dispatch_dependency_resolution_inner(client, router, eid, flow_id, 0).await;
81}
82
83/// Max cascade depth to prevent runaway recursion on degenerate graphs.
84const MAX_CASCADE_DEPTH: u32 = 50;
85
86async fn dispatch_dependency_resolution_inner(
87    client: &ferriskey::Client,
88    router: &PartitionRouter,
89    eid: &ExecutionId,
90    flow_id: Option<&str>,
91    cascade_depth: u32,
92) {
93    if cascade_depth > MAX_CASCADE_DEPTH {
94        tracing::warn!(
95            execution_id = %eid,
96            cascade_depth,
97            "dispatch_dep: cascade depth limit reached, reconciler will catch remaining"
98        );
99        return;
100    }
101
102    let flow_id_str = match flow_id {
103        Some(fid) if !fid.is_empty() => fid,
104        _ => return, // not in a flow
105    };
106
107    // Read terminal_outcome from exec_core to determine resolution type
108    let exec_ctx = router.exec_keys(eid);
109    let core_key = exec_ctx.core();
110    let outcome: Option<String> = match client
111        .cmd("HGET")
112        .arg(&core_key)
113        .arg("terminal_outcome")
114        .execute()
115        .await
116    {
117        Ok(v) => v,
118        Err(e) => {
119            tracing::warn!(
120                execution_id = %eid,
121                error = %e,
122                "dispatch_dep: failed to read terminal_outcome"
123            );
124            return;
125        }
126    };
127
128    let outcome_str = outcome.unwrap_or_default();
129    // Pass the actual terminal_outcome as upstream_outcome ARGV.
130    // The Lua checks == "success" for the satisfaction path; anything
131    // else (failed, cancelled, expired, skipped) triggers the impossible path.
132    let upstream_outcome = outcome_str.as_str();
133
134    // Read outgoing edges from flow partition.
135    // First, compute flow partition.
136    let fid = match ff_core::types::FlowId::parse(flow_id_str) {
137        Ok(id) => id,
138        Err(e) => {
139            tracing::warn!(
140                flow_id = flow_id_str,
141                error = %e,
142                "dispatch_dep: invalid flow_id"
143            );
144            return;
145        }
146    };
147
148    let flow_partition = ff_core::partition::flow_partition(&fid, router.config());
149    let flow_ctx = ff_core::keys::FlowKeyContext::new(&flow_partition, &fid);
150
151    // Read outgoing adjacency set: ff:flow:{fp:N}:<flow_id>:out:<execution_id>
152    let out_key = flow_ctx.outgoing(eid);
153    let edge_ids: Vec<String> = match client
154        .cmd("SMEMBERS")
155        .arg(&out_key)
156        .execute()
157        .await
158    {
159        Ok(ids) => ids,
160        Err(e) => {
161            tracing::warn!(
162                execution_id = %eid,
163                flow_id = flow_id_str,
164                error = %e,
165                "dispatch_dep: SMEMBERS outgoing failed"
166            );
167            return;
168        }
169    };
170
171    if edge_ids.is_empty() {
172        return;
173    }
174
175    let now_ms = ff_core::types::TimestampMs::now().0.to_string();
176    let mut resolved: u32 = 0;
177    let mut skipped_children: Vec<(ExecutionId, String)> = Vec::new();
178
179    for edge_id in &edge_ids {
180        // Parse the edge id once up front — an adjacency-set entry that
181        // doesn't round-trip through EdgeId::parse is either corruption
182        // or a legacy marker. Defaulting to a fresh random UUID (the
183        // pre-fix behaviour of `unwrap_or_default()` via the uuid_id!
184        // macro) would point at a non-existent edge and mask the issue.
185        let parsed_edge_id = match ff_core::types::EdgeId::parse(edge_id) {
186            Ok(id) => id,
187            Err(e) => {
188                tracing::warn!(
189                    edge_id = edge_id.as_str(),
190                    flow_id = flow_id_str,
191                    error = %e,
192                    "dispatch_dep: invalid edge_id in outgoing adjacency set, skipping"
193                );
194                continue;
195            }
196        };
197
198        // Read the edge record from flow partition to find downstream_execution_id
199        let edge_key = flow_ctx.edge(&parsed_edge_id);
200        let downstream_eid_str: Option<String> = match client
201            .cmd("HGET")
202            .arg(&edge_key)
203            .arg("downstream_execution_id")
204            .execute()
205            .await
206        {
207            Ok(v) => v,
208            Err(_) => continue,
209        };
210
211        let downstream_eid_str = match downstream_eid_str {
212            Some(s) if !s.is_empty() => s,
213            _ => continue,
214        };
215
216        let downstream_eid = match ExecutionId::parse(&downstream_eid_str) {
217            Ok(id) => id,
218            Err(_) => continue,
219        };
220
221        // Compute child's partition and build keys for ff_resolve_dependency
222        let child_partition = router.partition_for(&downstream_eid);
223        let child_ctx = ExecKeyContext::new(&child_partition, &downstream_eid);
224        let child_idx = IndexKeys::new(&child_partition);
225
226        // Read child's lane_id for lane-scoped index keys. Fail-closed:
227        // a transport error must NOT silently default to "default",
228        // which would mutate the wrong lane's eligible/terminal/blocked
229        // ZSETs on the ff_resolve_dependency FCALL below. Skip this
230        // edge; the dependency_reconciler will retry on its next pass.
231        let child_core_key = child_ctx.core();
232        let lane_str: Option<String> = match client
233            .cmd("HGET")
234            .arg(&child_core_key)
235            .arg("lane_id")
236            .execute()
237            .await
238        {
239            Ok(v) => v,
240            Err(e) => {
241                tracing::warn!(
242                    edge_id = edge_id.as_str(),
243                    downstream = downstream_eid_str.as_str(),
244                    error = %e,
245                    "dispatch_dep: HGET lane_id failed, skipping (reconciler will retry)"
246                );
247                continue;
248            }
249        };
250        let lane_id = ff_core::types::LaneId::new(
251            lane_str.as_deref().unwrap_or("default"),
252        );
253
254        // current_attempt_index: same treatment as lane_id. Absence
255        // (None) legitimately means the child is at attempt 0; only a
256        // read failure skips.
257        let att_idx_str: Option<String> = match client
258            .cmd("HGET")
259            .arg(&child_core_key)
260            .arg("current_attempt_index")
261            .execute()
262            .await
263        {
264            Ok(v) => v,
265            Err(e) => {
266                tracing::warn!(
267                    edge_id = edge_id.as_str(),
268                    downstream = downstream_eid_str.as_str(),
269                    error = %e,
270                    "dispatch_dep: HGET current_attempt_index failed, \
271                     skipping (reconciler will retry)"
272                );
273                continue;
274            }
275        };
276        let att_idx = ff_core::types::AttemptIndex::new(
277            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
278        );
279
280        let dep_hash = child_ctx.dep_edge(&parsed_edge_id);
281
282        // KEYS (11): exec_core, deps_meta, unresolved_set, dep_hash,
283        //            eligible_zset, terminal_zset, blocked_deps_zset,
284        //            attempt_hash, stream_meta, downstream_payload,
285        //            upstream_result
286        // KEYS[10]/[11] added for Batch C item 3: server-side
287        // data_passing_ref resolution. Upstream and downstream are
288        // co-located via flow membership (RFC-011 §7.3), so we build
289        // the upstream key on the child's partition using the same
290        // ExecKeyContext shape.
291        let deps_meta = child_ctx.deps_meta();
292        let unresolved = child_ctx.deps_unresolved();
293        let eligible = child_idx.lane_eligible(&lane_id);
294        let terminal = child_idx.lane_terminal(&lane_id);
295        let blocked_deps = child_idx.lane_blocked_dependencies(&lane_id);
296        let attempt_hash = child_ctx.attempt_hash(att_idx);
297        let stream_meta = child_ctx.stream_meta(att_idx);
298        let downstream_payload = child_ctx.payload();
299        let upstream_ctx = ExecKeyContext::new(&child_partition, eid);
300        let upstream_result = upstream_ctx.result();
301
302        let edgegroup = flow_ctx.edgegroup(&downstream_eid);
303        let incoming_set = flow_ctx.incoming(&downstream_eid);
304        let pending_cancel_groups = ff_core::keys::FlowIndexKeys::new(&flow_partition)
305            .pending_cancel_groups();
306        let downstream_eid_full = downstream_eid.to_string();
307        let keys: [&str; 14] = [
308            &child_core_key,         // 1
309            &deps_meta,              // 2
310            &unresolved,             // 3
311            &dep_hash,               // 4
312            &eligible,               // 5
313            &terminal,               // 6
314            &blocked_deps,           // 7
315            &attempt_hash,           // 8
316            &stream_meta,            // 9
317            &downstream_payload,     // 10
318            &upstream_result,        // 11
319            &edgegroup,              // 12 (RFC-016 Stage A)
320            &incoming_set,           // 13 (RFC-016 Stage C)
321            &pending_cancel_groups,  // 14 (RFC-016 Stage C)
322        ];
323        let argv: [&str; 5] = [
324            edge_id,
325            upstream_outcome,
326            &now_ms,
327            flow_id_str,             // 4 (RFC-016 Stage C)
328            &downstream_eid_full,    // 5 (RFC-016 Stage C)
329        ];
330
331        match client
332            .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
333            .await
334        {
335            Ok(val) => {
336                resolved += 1;
337                tracing::debug!(
338                    edge_id = edge_id.as_str(),
339                    downstream = downstream_eid_str.as_str(),
340                    outcome = upstream_outcome,
341                    "dispatch_dep: resolved dependency"
342                );
343                // Check if child was skipped (transitioned to terminal).
344                // If so, cascade dispatch for the child's outgoing edges.
345                if is_child_skipped_result(&val) {
346                    skipped_children.push((
347                        downstream_eid.clone(),
348                        flow_id_str.to_string(),
349                    ));
350                }
351            }
352            Err(e) => {
353                tracing::warn!(
354                    edge_id = edge_id.as_str(),
355                    downstream = downstream_eid_str.as_str(),
356                    error = %e,
357                    "dispatch_dep: ff_resolve_dependency failed"
358                );
359            }
360        }
361    }
362
363    if resolved > 0 {
364        tracing::info!(
365            execution_id = %eid,
366            flow_id = flow_id_str,
367            resolved,
368            total_edges = edge_ids.len(),
369            skipped_cascade = skipped_children.len(),
370            "dispatch_dep: dependency resolution complete"
371        );
372    }
373
374    // Cascade: dispatch for children that were skipped by dependency
375    // impossibility. Without this, grandchildren stay blocked until the
376    // dependency_reconciler picks them up (up to reconciler_interval per level).
377    for (child_eid, child_flow_id) in &skipped_children {
378        Box::pin(dispatch_dependency_resolution_inner(
379            client, router, child_eid, Some(child_flow_id.as_str()),
380            cascade_depth + 1,
381        )).await;
382    }
383}
384
/// Postgres-backend counterpart of [`dispatch_dependency_resolution`].
///
/// This is a pure delegation to
/// [`ff_backend_postgres::dispatch::dispatch_completion`] (Wave 5a,
/// RFC-v0.7 migration-master Part 3 hotspot #1).
///
/// Per the delegate's design, it mirrors the cascade semantics of the
/// Valkey `ff_resolve_dependency` FCALL under the per-hop-transaction rule
/// from K-2 of the RFC round-2 debate. Each downstream `ff_edge_group`
/// advance runs in its own serializable transaction, so no lock is held
/// across transitive descendants.
///
/// The engine's dispatch loop selects this path when the Postgres backend
/// is configured; the Valkey path is unaffected. Dispatch is keyed on
/// `event_id` (the `ff_completion_event` bigserial primary key), not on
/// `execution_id`. A replayed completion event therefore short-circuits
/// through the dispatcher's `dispatched_at_ms IS NULL` claim.
#[cfg(feature = "postgres")]
pub async fn dispatch_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    event_id: i64,
) -> Result<ff_backend_postgres::dispatch::DispatchOutcome, ff_core::engine_error::EngineError> {
    use ff_backend_postgres::dispatch::dispatch_completion;

    dispatch_completion(pool, event_id).await
}
409
410/// Check if an ff_resolve_dependency result indicates the child was
411/// skipped. Result shapes after Batch C item 3:
412///   `[1, "OK", "already_resolved"]`            — 3 elements total
413///   `[1, "OK", "satisfied", ""|"data_injected"]` — 4 elements total
414///   `[1, "OK", "impossible", ""|"child_skipped"]` — 4 elements total
415///
416/// "Child skipped" lives in slot 3 (0-indexed) on the impossible path
417/// only; the satisfied path's fourth slot is the unrelated
418/// data_injected marker.
419fn is_child_skipped_result(value: &ferriskey::Value) -> bool {
420    match value {
421        ferriskey::Value::Array(arr) => {
422            if arr.len() < 4 {
423                // 3-element responses are normal (already_resolved).
424                // No warning.
425                return false;
426            }
427            arr.get(3)
428                .and_then(|v| match v {
429                    Ok(ferriskey::Value::BulkString(b)) => {
430                        Some(&b[..] == b"child_skipped")
431                    }
432                    Ok(ferriskey::Value::SimpleString(s)) => {
433                        Some(s == "child_skipped")
434                    }
435                    _ => None,
436                })
437                .unwrap_or(false)
438        }
439        _ => {
440            tracing::warn!(
441                "is_child_skipped_result: expected Array, got non-array value"
442            );
443            false
444        }
445    }
446}