Skip to main content

ff_script/functions/
flow.rs

1//! Typed FCALL wrappers for flow coordination functions (lua/flow.lua).
2
3use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::{ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys};
6use ff_core::state::PublicState;
7
8use crate::result::{FcallResult, FromFcallResult};
9
/// Key context for flow-structural operations on {fp:N}.
///
/// Bundles the two borrowed key builders that the flow-structural wrappers
/// in this file (`ff_create_flow`, `ff_add_execution_to_flow`,
/// `ff_cancel_flow`, `ff_stage_dependency_edge`) read their KEYS from.
pub struct FlowStructOpKeys<'a> {
    /// Per-flow key builder; the wrappers call `core()`, `members()`,
    /// `edge(..)`, `outgoing(..)`, `incoming(..)` and `grant(..)` on it.
    pub fctx: &'a FlowKeyContext,
    /// Flow index key builder; the wrappers call `flow_index()` on it.
    pub fidx: &'a FlowIndexKeys,
}
15
/// Key context for child-local dependency operations on {p:N}.
///
/// Used by the dependency wrappers that only need the child execution's
/// own keys plus its lane-scoped index keys.
pub struct DepOpKeys<'a> {
    /// Per-execution key builder (`core()`, `deps_meta()`,
    /// `deps_unresolved()`, `dep_edge(..)`, `lease_history()`, ...).
    pub ctx: &'a ExecKeyContext,
    /// Index key builder for the lane-scoped sets (`lane_eligible(..)`,
    /// `lane_terminal(..)`, `lane_blocked_dependencies(..)`).
    pub idx: &'a IndexKeys,
    /// Lane passed to every `idx.lane_*` call made by the wrappers.
    pub lane_id: &'a ff_core::types::LaneId,
}
22
/// Extended key context for [`ff_resolve_dependency`], which needs
/// access to the upstream execution's result key for server-side
/// `data_passing_ref` resolution (Batch C item 3). Separate from
/// [`DepOpKeys`] so the other dependency wrappers —
/// `ff_apply_dependency_to_child`, `ff_evaluate_flow_eligibility`,
/// `ff_promote_blocked_to_eligible`, `ff_replay_execution` — don't
/// have to carry an upstream context they never use.
///
/// Upstream and downstream are co-located on the same `{fp:N}` slot
/// via flow membership (RFC-011 §7.3), so `upstream_ctx` builds the
/// upstream key on the child's partition.
pub struct ResolveDependencyKeys<'a> {
    /// Key builder for the downstream (child) execution.
    pub ctx: &'a ExecKeyContext,
    /// Index key builder for the lane-scoped sets.
    pub idx: &'a IndexKeys,
    /// Lane passed to every `idx.lane_*` call made by the wrapper.
    pub lane_id: &'a ff_core::types::LaneId,
    /// Key builder for the upstream execution; the wrapper only calls
    /// `result()` on it.
    pub upstream_ctx: &'a ExecKeyContext,
}
40
// ─── ff_create_flow ──────────────────────────────────────────────────
// Typed wrapper, generated by `ff_function!`, for `ff_create_flow`.
// The key and argument lists below are written in the same order as
// the KEYS/ARGV layout documented here.
//
// KEYS (3): flow_core, members_set, flow_index
// ARGV (4): flow_id, flow_kind, namespace, now_ms

ff_function! {
    pub ff_create_flow(args: CreateFlowArgs) -> CreateFlowResult {
        keys(k: &FlowStructOpKeys<'_>) {
            k.fctx.core(),       // flow_core
            k.fctx.members(),    // members_set
            k.fidx.flow_index(), // flow_index
        }
        argv {
            args.flow_id.to_string(),   // flow_id
            args.flow_kind.clone(),     // flow_kind
            args.namespace.to_string(), // namespace
            args.now.to_string(),       // now_ms
        }
    }
}
60
61impl FromFcallResult for CreateFlowResult {
62    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
63        let r = FcallResult::parse(raw)?.into_success()?;
64        let fid_str = r.field_str(0);
65        let fid = ff_core::types::FlowId::parse(&fid_str)
66            .map_err(|e| ScriptError::Parse(format!("bad flow_id: {e}")))?;
67        match r.status.as_str() {
68            "OK" => Ok(CreateFlowResult::Created { flow_id: fid }),
69            "ALREADY_SATISFIED" => Ok(CreateFlowResult::AlreadySatisfied { flow_id: fid }),
70            _ => Err(ScriptError::Parse(format!("unexpected status: {}", r.status))),
71        }
72    }
73}
74
// ─── ff_add_execution_to_flow ────────────────────────────────────────
// Typed wrapper, generated by `ff_function!`, for
// `ff_add_execution_to_flow`. Uses the same three flow keys as
// `ff_create_flow`; the lists below follow the KEYS/ARGV order given here.
//
// KEYS (3): flow_core, members_set, flow_index
// ARGV (3): flow_id, execution_id, now_ms

ff_function! {
    pub ff_add_execution_to_flow(args: AddExecutionToFlowArgs) -> AddExecutionToFlowResult {
        keys(k: &FlowStructOpKeys<'_>) {
            k.fctx.core(),       // flow_core
            k.fctx.members(),    // members_set
            k.fidx.flow_index(), // flow_index
        }
        argv {
            args.flow_id.to_string(),      // flow_id
            args.execution_id.to_string(), // execution_id
            args.now.to_string(),          // now_ms
        }
    }
}
93
94impl FromFcallResult for AddExecutionToFlowResult {
95    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
96        let r = FcallResult::parse(raw)?.into_success()?;
97        let eid_str = r.field_str(0);
98        let nc_str = r.field_str(1);
99        match r.status.as_str() {
100            "ALREADY_SATISFIED" => {
101                let eid = ff_core::types::ExecutionId::parse(&eid_str)
102                    .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
103                let nc: u32 = nc_str.parse().unwrap_or(0);
104                Ok(AddExecutionToFlowResult::AlreadyMember {
105                    execution_id: eid,
106                    node_count: nc,
107                })
108            }
109            "OK" => {
110                let eid = ff_core::types::ExecutionId::parse(&eid_str)
111                    .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
112                let nc: u32 = nc_str.parse().unwrap_or(0);
113                Ok(AddExecutionToFlowResult::Added {
114                    execution_id: eid,
115                    new_node_count: nc,
116                })
117            }
118            _ => Err(ScriptError::Parse(format!("unexpected status: {}", r.status))),
119        }
120    }
121}
122
// ─── ff_cancel_flow ──────────────────────────────────────────────────
// Typed wrapper, generated by `ff_function!`, for `ff_cancel_flow`.
// The lists below follow the KEYS/ARGV order given here.
//
// KEYS (3): flow_core, members_set, flow_index
// ARGV (4): flow_id, reason, cancellation_policy, now_ms

ff_function! {
    pub ff_cancel_flow(args: CancelFlowArgs) -> CancelFlowResult {
        keys(k: &FlowStructOpKeys<'_>) {
            k.fctx.core(),       // flow_core
            k.fctx.members(),    // members_set
            k.fidx.flow_index(), // flow_index
        }
        argv {
            args.flow_id.to_string(),         // flow_id
            args.reason.clone(),              // reason
            args.cancellation_policy.clone(), // cancellation_policy
            args.now.to_string(),             // now_ms
        }
    }
}
142
143impl FromFcallResult for CancelFlowResult {
144    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
145        let r = FcallResult::parse(raw)?.into_success()?;
146        let policy = r.field_str(0);
147        let mut members = Vec::new();
148        let mut i = 1;
149        loop {
150            let s = r.field_str(i);
151            if s.is_empty() {
152                break;
153            }
154            members.push(s);
155            i += 1;
156        }
157        Ok(CancelFlowResult::Cancelled {
158            cancellation_policy: policy,
159            member_execution_ids: members,
160        })
161    }
162}
163
// ─── ff_evaluate_flow_eligibility ─────────────────────────────────────
// Typed wrapper, generated by `ff_function!`, for
// `ff_evaluate_flow_eligibility`. Only the two child keys are passed;
// there are no arguments.
//
// KEYS (2): exec_core, deps_meta
// ARGV (0)

ff_function! {
    // `args` is never read because the ARGV list below is empty; the
    // allow keeps the generated wrapper free of an unused-variable warning.
    #[allow(unused_variables)]
    pub ff_evaluate_flow_eligibility(args: EvaluateFlowEligibilityArgs) -> EvaluateFlowEligibilityResult {
        keys(k: &DepOpKeys<'_>) {
            k.ctx.core(),      // exec_core
            k.ctx.deps_meta(), // deps_meta
        }
        argv {
        }
    }
}
179
180impl FromFcallResult for EvaluateFlowEligibilityResult {
181    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
182        let r = FcallResult::parse(raw)?.into_success()?;
183        Ok(EvaluateFlowEligibilityResult::Status {
184            status: r.field_str(0),
185        })
186    }
187}
188
// ─── ff_apply_dependency_to_child ─────────────────────────────────────
// Typed wrapper, generated by `ff_function!`, for
// `ff_apply_dependency_to_child`. The lists below follow the KEYS/ARGV
// order given here.
//
// KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,
//           eligible_zset, blocked_deps_zset, deps_all_edges
// ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
//           dependency_kind, data_passing_ref, now_ms

ff_function! {
    pub ff_apply_dependency_to_child(args: ApplyDependencyToChildArgs) -> ApplyDependencyToChildResult {
        keys(k: &DepOpKeys<'_>) {
            k.ctx.core(),                               // exec_core
            k.ctx.deps_meta(),                          // deps_meta
            k.ctx.deps_unresolved(),                    // unresolved_set
            k.ctx.dep_edge(&args.edge_id),              // dep_hash (keyed by edge id)
            k.idx.lane_eligible(k.lane_id),             // eligible_zset
            k.idx.lane_blocked_dependencies(k.lane_id), // blocked_deps_zset
            k.ctx.deps_all_edges(),                     // deps_all_edges
        }
        argv {
            args.flow_id.to_string(),
            args.edge_id.to_string(),
            args.upstream_execution_id.to_string(),
            args.graph_revision.to_string(),
            args.dependency_kind.clone(),
            // An absent ref is sent as the type's default value
            // (presumably an empty string — confirm against the args type).
            args.data_passing_ref.clone().unwrap_or_default(),
            args.now.to_string(),
        }
    }
}
217
218impl FromFcallResult for ApplyDependencyToChildResult {
219    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
220        let r = FcallResult::parse(raw)?.into_success()?;
221        let sub = r.field_str(0);
222        if sub == "already_applied" {
223            Ok(ApplyDependencyToChildResult::AlreadyApplied)
224        } else {
225            let count: u32 = sub.parse().unwrap_or(0);
226            Ok(ApplyDependencyToChildResult::Applied {
227                unsatisfied_count: count,
228            })
229        }
230    }
231}
232
// ─── ff_resolve_dependency ────────────────────────────────────────────
// Typed wrapper, generated by `ff_function!`, for `ff_resolve_dependency`.
// The lists below follow the KEYS/ARGV order given here.
//
// KEYS (11): exec_core, deps_meta, unresolved_set, dep_hash,
//            eligible_zset, terminal_zset, blocked_deps_zset,
//            attempt_hash, stream_meta, downstream_payload,
//            upstream_result
// ARGV (3): edge_id, upstream_outcome, now_ms
//
// KEYS[10]/[11] added in Batch C item 3 for server-side
// data_passing_ref resolution. Upstream and downstream are co-located
// on the same {fp:N} slot via flow membership — the `upstream_ctx`
// field on ResolveDependencyKeys builds the upstream key on that shared
// partition.

ff_function! {
    pub ff_resolve_dependency(args: ResolveDependencyArgs) -> ResolveDependencyResult {
        keys(k: &ResolveDependencyKeys<'_>) {
            k.ctx.core(),                               // exec_core
            k.ctx.deps_meta(),                          // deps_meta
            k.ctx.deps_unresolved(),                    // unresolved_set
            k.ctx.dep_edge(&args.edge_id),              // dep_hash (keyed by edge id)
            k.idx.lane_eligible(k.lane_id),             // eligible_zset
            k.idx.lane_terminal(k.lane_id),             // terminal_zset
            k.idx.lane_blocked_dependencies(k.lane_id), // blocked_deps_zset
            // The next two keys are always built for attempt index 0.
            // NOTE(review): marked as placeholders in the original — confirm
            // how the real attempt index is meant to be resolved.
            k.ctx.attempt_hash(ff_core::types::AttemptIndex::new(0)), // placeholder
            k.ctx.stream_meta(ff_core::types::AttemptIndex::new(0)),  // placeholder
            k.ctx.payload(),                            // downstream_payload (KEYS[10])
            k.upstream_ctx.result(),                    // upstream_result (KEYS[11])
        }
        argv {
            args.edge_id.to_string(),      // edge_id
            args.upstream_outcome.clone(), // upstream_outcome
            args.now.to_string(),          // now_ms
        }
    }
}
267
268impl FromFcallResult for ResolveDependencyResult {
269    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
270        let r = FcallResult::parse(raw)?.into_success()?;
271        match r.field_str(0).as_str() {
272            "satisfied" => Ok(ResolveDependencyResult::Satisfied),
273            "impossible" => Ok(ResolveDependencyResult::Impossible),
274            "already_resolved" => Ok(ResolveDependencyResult::AlreadyResolved),
275            other => Err(ScriptError::Parse(format!("unknown resolve status: {other}"))),
276        }
277    }
278}
279
// ─── ff_promote_blocked_to_eligible ───────────────────────────────────
// Typed wrapper, generated by `ff_function!`, for
// `ff_promote_blocked_to_eligible`. The lists below follow the KEYS/ARGV
// order given here.
//
// KEYS (5): exec_core, blocked_deps_zset, eligible_zset, deps_meta,
//           deps_unresolved
// ARGV (2): execution_id, now_ms

ff_function! {
    pub ff_promote_blocked_to_eligible(args: PromoteBlockedToEligibleArgs) -> PromoteBlockedToEligibleResult {
        keys(k: &DepOpKeys<'_>) {
            k.ctx.core(),                               // exec_core
            k.idx.lane_blocked_dependencies(k.lane_id), // blocked_deps_zset
            k.idx.lane_eligible(k.lane_id),             // eligible_zset
            k.ctx.deps_meta(),                          // deps_meta
            k.ctx.deps_unresolved(),                    // deps_unresolved
        }
        argv {
            args.execution_id.to_string(), // execution_id
            args.now.to_string(),          // now_ms
        }
    }
}
300
301impl FromFcallResult for PromoteBlockedToEligibleResult {
302    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
303        let _r = FcallResult::parse(raw)?.into_success()?;
304        Ok(PromoteBlockedToEligibleResult::Promoted)
305    }
306}
307
// ─── ff_replay_execution ──────────────────────────────────────────────
// KEYS (4+N): exec_core, terminal_zset, eligible_zset, lease_history,
//             [blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N]
// ARGV (2+N): execution_id, now_ms, [edge_id_0..N]
//
// NOTE: Variable KEYS/ARGV. The ff_function! macro generates a fixed-size
// Vec, but this function needs dynamic N edges. For skipped flow member
// replay, use the manual FCALL path instead of this wrapper.
// This wrapper handles the common non-flow replay case (base 4 KEYS):
// it sends only the four base keys and the two base arguments, and never
// the bracketed optional entries listed above.

ff_function! {
    pub ff_replay_execution(args: ReplayExecutionArgs) -> ReplayExecutionResult {
        keys(k: &DepOpKeys<'_>) {
            k.ctx.core(),                   // exec_core
            k.idx.lane_terminal(k.lane_id), // terminal_zset
            k.idx.lane_eligible(k.lane_id), // eligible_zset
            k.ctx.lease_history(),          // lease_history
        }
        argv {
            args.execution_id.to_string(), // execution_id
            args.now.to_string(),          // now_ms
        }
    }
}
332
333impl FromFcallResult for ReplayExecutionResult {
334    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
335        let r = FcallResult::parse(raw)?.into_success()?;
336        // ok("0") for normal, ok(N) for skipped flow member
337        let unsatisfied = r.field_str(0);
338        let ps = if unsatisfied == "0" {
339            PublicState::Waiting
340        } else {
341            PublicState::WaitingChildren
342        };
343        Ok(ReplayExecutionResult::Replayed { public_state: ps })
344    }
345}
346
// ─── ff_stage_dependency_edge ─────────────────────────────────────────
// Typed wrapper, generated by `ff_function!`, for
// `ff_stage_dependency_edge`. The lists below follow the KEYS/ARGV order
// given here.
//
// KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set,
//           grant_hash
// ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
//           dependency_kind, data_passing_ref, expected_graph_revision,
//           now_ms

ff_function! {
    pub ff_stage_dependency_edge(args: StageDependencyEdgeArgs) -> StageDependencyEdgeResult {
        keys(k: &FlowStructOpKeys<'_>) {
            k.fctx.core(),                                  // flow_core
            k.fctx.members(),                               // members_set
            k.fctx.edge(&args.edge_id),                     // edge_hash
            k.fctx.outgoing(&args.upstream_execution_id),   // out_adj_set
            k.fctx.incoming(&args.downstream_execution_id), // in_adj_set
            // grant_hash: built from the edge id rendered as a string.
            k.fctx.grant(&args.edge_id.to_string()),
        }
        argv {
            args.flow_id.to_string(),
            args.edge_id.to_string(),
            args.upstream_execution_id.to_string(),
            args.downstream_execution_id.to_string(),
            args.dependency_kind.clone(),
            // An absent ref is sent as the type's default value
            // (presumably an empty string — confirm against the args type).
            args.data_passing_ref.clone().unwrap_or_default(),
            args.expected_graph_revision.to_string(),
            args.now.to_string(),
        }
    }
}
376
377impl FromFcallResult for StageDependencyEdgeResult {
378    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
379        let r = FcallResult::parse(raw)?.into_success()?;
380        let eid = ff_core::types::EdgeId::parse(&r.field_str(0))
381            .map_err(|e| ScriptError::Parse(format!("bad edge_id: {e}")))?;
382        let rev: u64 = r.field_str(1).parse()
383            .map_err(|e| ScriptError::Parse(format!("bad graph_revision: {e}")))?;
384        Ok(StageDependencyEdgeResult::Staged {
385            edge_id: eid,
386            new_graph_revision: rev,
387        })
388    }
389}