Skip to main content

ff_script/functions/
flow.rs

1//! Typed FCALL wrappers for flow coordination functions (lua/flow.lua).
2
3use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::{ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys};
6use ff_core::state::PublicState;
7
8use crate::result::{FcallResult, FromFcallResult};
9
10/// Key context for flow-structural operations on {fp:N}.
11pub struct FlowStructOpKeys<'a> {
12    pub fctx: &'a FlowKeyContext,
13    pub fidx: &'a FlowIndexKeys,
14}
15
16/// Key context for child-local dependency operations on {p:N}.
17pub struct DepOpKeys<'a> {
18    pub ctx: &'a ExecKeyContext,
19    pub idx: &'a IndexKeys,
20    pub lane_id: &'a ff_core::types::LaneId,
21}
22
23/// Extended key context for [`ff_resolve_dependency`], which needs
24/// access to the upstream execution's result key for server-side
25/// `data_passing_ref` resolution (Batch C item 3). Separate from
26/// [`DepOpKeys`] so the other dependency wrappers —
27/// `ff_apply_dependency_to_child`, `ff_evaluate_flow_eligibility`,
28/// `ff_promote_blocked_to_eligible`, `ff_replay_execution` — don't
29/// have to carry an upstream context they never use.
30///
31/// Upstream and downstream are co-located on the same `{fp:N}` slot
32/// via flow membership (RFC-011 §7.3), so `upstream_ctx` builds the
33/// upstream key on the child's partition.
34pub struct ResolveDependencyKeys<'a> {
35    pub ctx: &'a ExecKeyContext,
36    pub idx: &'a IndexKeys,
37    pub lane_id: &'a ff_core::types::LaneId,
38    pub upstream_ctx: &'a ExecKeyContext,
39}
40
41// ─── ff_create_flow ──────────────────────────────────────────────────
42// KEYS (3): flow_core, members_set, flow_index
43// ARGV (4): flow_id, flow_kind, namespace, now_ms
44
45ff_function! {
46    pub ff_create_flow(args: CreateFlowArgs) -> CreateFlowResult {
47        keys(k: &FlowStructOpKeys<'_>) {
48            k.fctx.core(),
49            k.fctx.members(),
50            k.fidx.flow_index(),
51        }
52        argv {
53            args.flow_id.to_string(),
54            args.flow_kind.clone(),
55            args.namespace.to_string(),
56            args.now.to_string(),
57        }
58    }
59}
60
61impl FromFcallResult for CreateFlowResult {
62    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
63        let r = FcallResult::parse(raw)?.into_success()?;
64        let fid_str = r.field_str(0);
65        let fid = ff_core::types::FlowId::parse(&fid_str)
66            .map_err(|e| ScriptError::Parse {
67                fcall: "ff_create_flow".into(),
68                execution_id: None,
69                message: format!("bad flow_id: {e}"),
70            })?;
71        match r.status.as_str() {
72            "OK" => Ok(CreateFlowResult::Created { flow_id: fid }),
73            "ALREADY_SATISFIED" => Ok(CreateFlowResult::AlreadySatisfied { flow_id: fid }),
74            _ => Err(ScriptError::Parse {
75                fcall: "ff_create_flow".into(),
76                execution_id: None,
77                message: format!("unexpected status: {}", r.status),
78            }),
79        }
80    }
81}
82
83// ─── ff_add_execution_to_flow ────────────────────────────────────────
84// KEYS (3): flow_core, members_set, flow_index
85// ARGV (3): flow_id, execution_id, now_ms
86
87ff_function! {
88    pub ff_add_execution_to_flow(args: AddExecutionToFlowArgs) -> AddExecutionToFlowResult {
89        keys(k: &FlowStructOpKeys<'_>) {
90            k.fctx.core(),
91            k.fctx.members(),
92            k.fidx.flow_index(),
93        }
94        argv {
95            args.flow_id.to_string(),
96            args.execution_id.to_string(),
97            args.now.to_string(),
98        }
99    }
100}
101
102impl FromFcallResult for AddExecutionToFlowResult {
103    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
104        let r = FcallResult::parse(raw)?.into_success()?;
105        let eid_str = r.field_str(0);
106        let nc_str = r.field_str(1);
107        match r.status.as_str() {
108            "ALREADY_SATISFIED" => {
109                let eid = ff_core::types::ExecutionId::parse(&eid_str)
110                    .map_err(|e| ScriptError::Parse {
111                        fcall: "ff_add_execution_to_flow".into(),
112                        execution_id: None,
113                        message: format!("bad execution_id: {e}"),
114                    })?;
115                let nc: u32 = nc_str.parse().unwrap_or(0);
116                Ok(AddExecutionToFlowResult::AlreadyMember {
117                    execution_id: eid,
118                    node_count: nc,
119                })
120            }
121            "OK" => {
122                let eid = ff_core::types::ExecutionId::parse(&eid_str)
123                    .map_err(|e| ScriptError::Parse {
124                        fcall: "ff_add_execution_to_flow".into(),
125                        execution_id: None,
126                        message: format!("bad execution_id: {e}"),
127                    })?;
128                let nc: u32 = nc_str.parse().unwrap_or(0);
129                Ok(AddExecutionToFlowResult::Added {
130                    execution_id: eid,
131                    new_node_count: nc,
132                })
133            }
134            _ => Err(ScriptError::Parse {
135                fcall: "ff_add_execution_to_flow".into(),
136                execution_id: None,
137                message: format!("unexpected status: {}", r.status),
138            }),
139        }
140    }
141}
142
143// ─── ff_cancel_flow ──────────────────────────────────────────────────
144// KEYS (5): flow_core, members_set, flow_index, pending_cancels, cancel_backlog
145// ARGV (5): flow_id, reason, cancellation_policy, now_ms, grace_ms
146//
147// pending_cancels + cancel_backlog are populated only on policy=cancel_all
148// with members > 0. The cancel_reconciler scanner drains any entries
149// live dispatch leaves behind. Passing grace_ms as "" accepts the Lua
150// default (30s).
151
152ff_function! {
153    pub ff_cancel_flow(args: CancelFlowArgs) -> CancelFlowResult {
154        keys(k: &FlowStructOpKeys<'_>) {
155            k.fctx.core(),
156            k.fctx.members(),
157            k.fidx.flow_index(),
158            k.fctx.pending_cancels(),
159            k.fidx.cancel_backlog(),
160        }
161        argv {
162            args.flow_id.to_string(),
163            args.reason.clone(),
164            args.cancellation_policy.clone(),
165            args.now.to_string(),
166            String::new(),
167        }
168    }
169}
170
171impl FromFcallResult for CancelFlowResult {
172    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
173        let r = FcallResult::parse(raw)?.into_success()?;
174        let policy = r.field_str(0);
175        let mut members = Vec::new();
176        let mut i = 1;
177        loop {
178            let s = r.field_str(i);
179            if s.is_empty() {
180                break;
181            }
182            members.push(s);
183            i += 1;
184        }
185        Ok(CancelFlowResult::Cancelled {
186            cancellation_policy: policy,
187            member_execution_ids: members,
188        })
189    }
190}
191
192// ─── ff_evaluate_flow_eligibility ─────────────────────────────────────
193// KEYS (2): exec_core, deps_meta
194// ARGV (0)
195
196ff_function! {
197    #[allow(unused_variables)]
198    pub ff_evaluate_flow_eligibility(args: EvaluateFlowEligibilityArgs) -> EvaluateFlowEligibilityResult {
199        keys(k: &DepOpKeys<'_>) {
200            k.ctx.core(),
201            k.ctx.deps_meta(),
202        }
203        argv {
204        }
205    }
206}
207
208impl FromFcallResult for EvaluateFlowEligibilityResult {
209    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
210        let r = FcallResult::parse(raw)?.into_success()?;
211        Ok(EvaluateFlowEligibilityResult::Status {
212            status: r.field_str(0),
213        })
214    }
215}
216
217// ─── ff_apply_dependency_to_child ─────────────────────────────────────
218// KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,
219//           eligible_zset, blocked_deps_zset, deps_all_edges
220// ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
221//           dependency_kind, data_passing_ref, now_ms
222
223ff_function! {
224    pub ff_apply_dependency_to_child(args: ApplyDependencyToChildArgs) -> ApplyDependencyToChildResult {
225        keys(k: &DepOpKeys<'_>) {
226            k.ctx.core(),
227            k.ctx.deps_meta(),
228            k.ctx.deps_unresolved(),
229            k.ctx.dep_edge(&args.edge_id),
230            k.idx.lane_eligible(k.lane_id),
231            k.idx.lane_blocked_dependencies(k.lane_id),
232            k.ctx.deps_all_edges(),
233        }
234        argv {
235            args.flow_id.to_string(),
236            args.edge_id.to_string(),
237            args.upstream_execution_id.to_string(),
238            args.graph_revision.to_string(),
239            args.dependency_kind.clone(),
240            args.data_passing_ref.clone().unwrap_or_default(),
241            args.now.to_string(),
242        }
243    }
244}
245
246impl FromFcallResult for ApplyDependencyToChildResult {
247    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
248        let r = FcallResult::parse(raw)?.into_success()?;
249        let sub = r.field_str(0);
250        if sub == "already_applied" {
251            Ok(ApplyDependencyToChildResult::AlreadyApplied)
252        } else {
253            let count: u32 = sub.parse().unwrap_or(0);
254            Ok(ApplyDependencyToChildResult::Applied {
255                unsatisfied_count: count,
256            })
257        }
258    }
259}
260
261// ─── ff_resolve_dependency ────────────────────────────────────────────
262// KEYS (11): exec_core, deps_meta, unresolved_set, dep_hash,
263//            eligible_zset, terminal_zset, blocked_deps_zset,
264//            attempt_hash, stream_meta, downstream_payload,
265//            upstream_result
266// ARGV (3): edge_id, upstream_outcome, now_ms
267//
// KEYS[10]/[11] added in Batch C item 3 for server-side
// data_passing_ref resolution. Upstream and downstream are co-located
// on the same {fp:N} slot via flow membership — the `upstream_ctx`
// field on ResolveDependencyKeys builds the upstream key on that
// shared partition.
272
273ff_function! {
274    pub ff_resolve_dependency(args: ResolveDependencyArgs) -> ResolveDependencyResult {
275        keys(k: &ResolveDependencyKeys<'_>) {
276            k.ctx.core(),
277            k.ctx.deps_meta(),
278            k.ctx.deps_unresolved(),
279            k.ctx.dep_edge(&args.edge_id),
280            k.idx.lane_eligible(k.lane_id),
281            k.idx.lane_terminal(k.lane_id),
282            k.idx.lane_blocked_dependencies(k.lane_id),
283            k.ctx.attempt_hash(ff_core::types::AttemptIndex::new(0)), // placeholder
284            k.ctx.stream_meta(ff_core::types::AttemptIndex::new(0)),  // placeholder
285            k.ctx.payload(),
286            k.upstream_ctx.result(),
287        }
288        argv {
289            args.edge_id.to_string(),
290            args.upstream_outcome.clone(),
291            args.now.to_string(),
292        }
293    }
294}
295
296impl FromFcallResult for ResolveDependencyResult {
297    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
298        let r = FcallResult::parse(raw)?.into_success()?;
299        match r.field_str(0).as_str() {
300            "satisfied" => Ok(ResolveDependencyResult::Satisfied),
301            "impossible" => Ok(ResolveDependencyResult::Impossible),
302            "already_resolved" => Ok(ResolveDependencyResult::AlreadyResolved),
303            other => Err(ScriptError::Parse {
304                fcall: "ff_resolve_dependency".into(),
305                execution_id: None,
306                message: format!("unknown resolve status: {other}"),
307            }),
308        }
309    }
310}
311
312// ─── ff_promote_blocked_to_eligible ───────────────────────────────────
313// KEYS (5): exec_core, blocked_deps_zset, eligible_zset, deps_meta,
314//           deps_unresolved
315// ARGV (2): execution_id, now_ms
316
317ff_function! {
318    pub ff_promote_blocked_to_eligible(args: PromoteBlockedToEligibleArgs) -> PromoteBlockedToEligibleResult {
319        keys(k: &DepOpKeys<'_>) {
320            k.ctx.core(),
321            k.idx.lane_blocked_dependencies(k.lane_id),
322            k.idx.lane_eligible(k.lane_id),
323            k.ctx.deps_meta(),
324            k.ctx.deps_unresolved(),
325        }
326        argv {
327            args.execution_id.to_string(),
328            args.now.to_string(),
329        }
330    }
331}
332
333impl FromFcallResult for PromoteBlockedToEligibleResult {
334    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
335        let _r = FcallResult::parse(raw)?.into_success()?;
336        Ok(PromoteBlockedToEligibleResult::Promoted)
337    }
338}
339
340// ─── ff_replay_execution ──────────────────────────────────────────────
341// KEYS (4+N): exec_core, terminal_zset, eligible_zset, lease_history,
342//             [blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N]
343// ARGV (2+N): execution_id, now_ms, [edge_id_0..N]
344//
345// NOTE: Variable KEYS/ARGV. The ff_function! macro generates a fixed-size
346// Vec, but this function needs dynamic N edges. For skipped flow member
347// replay, use the manual FCALL path instead of this wrapper.
348// This wrapper handles the common non-flow replay case (base 4 KEYS).
349
350ff_function! {
351    pub ff_replay_execution(args: ReplayExecutionArgs) -> ReplayExecutionResult {
352        keys(k: &DepOpKeys<'_>) {
353            k.ctx.core(),
354            k.idx.lane_terminal(k.lane_id),
355            k.idx.lane_eligible(k.lane_id),
356            k.ctx.lease_history(),
357        }
358        argv {
359            args.execution_id.to_string(),
360            args.now.to_string(),
361        }
362    }
363}
364
365impl FromFcallResult for ReplayExecutionResult {
366    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
367        let r = FcallResult::parse(raw)?.into_success()?;
368        // ok("0") for normal, ok(N) for skipped flow member
369        let unsatisfied = r.field_str(0);
370        let ps = if unsatisfied == "0" {
371            PublicState::Waiting
372        } else {
373            PublicState::WaitingChildren
374        };
375        Ok(ReplayExecutionResult::Replayed { public_state: ps })
376    }
377}
378
379// ─── ff_stage_dependency_edge ─────────────────────────────────────────
380// KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set,
381//           grant_hash
382// ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
383//           dependency_kind, data_passing_ref, expected_graph_revision,
384//           now_ms
385
386ff_function! {
387    pub ff_stage_dependency_edge(args: StageDependencyEdgeArgs) -> StageDependencyEdgeResult {
388        keys(k: &FlowStructOpKeys<'_>) {
389            k.fctx.core(),
390            k.fctx.members(),
391            k.fctx.edge(&args.edge_id),
392            k.fctx.outgoing(&args.upstream_execution_id),
393            k.fctx.incoming(&args.downstream_execution_id),
394            k.fctx.grant(&args.edge_id.to_string()),
395        }
396        argv {
397            args.flow_id.to_string(),
398            args.edge_id.to_string(),
399            args.upstream_execution_id.to_string(),
400            args.downstream_execution_id.to_string(),
401            args.dependency_kind.clone(),
402            args.data_passing_ref.clone().unwrap_or_default(),
403            args.expected_graph_revision.to_string(),
404            args.now.to_string(),
405        }
406    }
407}
408
409impl FromFcallResult for StageDependencyEdgeResult {
410    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
411        let r = FcallResult::parse(raw)?.into_success()?;
412        let eid = ff_core::types::EdgeId::parse(&r.field_str(0))
413            .map_err(|e| ScriptError::Parse {
414                fcall: "ff_stage_dependency_edge".into(),
415                execution_id: None,
416                message: format!("bad edge_id: {e}"),
417            })?;
418        let rev: u64 = r.field_str(1).parse()
419            .map_err(|e| ScriptError::Parse {
420                fcall: "ff_stage_dependency_edge".into(),
421                execution_id: None,
422                message: format!("bad graph_revision: {e}"),
423            })?;
424        Ok(StageDependencyEdgeResult::Staged {
425            edge_id: eid,
426            new_graph_revision: rev,
427        })
428    }
429}
430
431// ─── ff_set_flow_tags (issue #58.4) ─────────────────────────────────────
432//
433// Variadic-ARGV FCALL mirroring `ff_set_execution_tags`. Hand-rolled
434// since the `ff_function!` macro assumes a fixed ARGV vector.
435//
436// KEYS (2): flow_core, tags_key
437// ARGV (>=2, even): k1, v1, k2, v2, ...
438
439/// Call `ff_set_flow_tags`: write caller-supplied tag fields to the
440/// flow's separate tags key. Returns the number of pairs applied. Tag
441/// keys must match `^[a-z][a-z0-9_]*\.`. The Lua function also
442/// lazy-migrates any pre-58.4 reserved-namespace fields stashed
443/// inline on `flow_core` into the new tags key.
444pub async fn ff_set_flow_tags(
445    conn: &ferriskey::Client,
446    fctx: &FlowKeyContext,
447    args: &SetFlowTagsArgs,
448) -> Result<SetFlowTagsResult, ScriptError> {
449    let keys = [fctx.core(), fctx.tags()];
450    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
451
452    let mut argv: Vec<String> = Vec::with_capacity(args.tags.len() * 2);
453    for (k, v) in &args.tags {
454        argv.push(k.clone());
455        argv.push(v.clone());
456    }
457    let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
458
459    let raw = conn
460        .fcall::<ferriskey::Value>("ff_set_flow_tags", &key_refs, &argv_refs)
461        .await
462        .map_err(ScriptError::Valkey)?;
463    SetFlowTagsResult::from_fcall_result(&raw)
464}
465
466impl FromFcallResult for SetFlowTagsResult {
467    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
468        let r = FcallResult::parse(raw)?.into_success()?;
469        let count: u32 = r
470            .field_str(0)
471            .parse()
472            .map_err(|e| ScriptError::Parse {
473                fcall: "ff_set_flow_tags".into(),
474                execution_id: None,
475                message: format!("bad tag count: {e}"),
476            })?;
477        Ok(SetFlowTagsResult::Ok { count })
478    }
479}