1use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::{ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys};
6use ff_core::state::PublicState;
7
8use crate::result::{FcallResult, FromFcallResult};
9
/// Borrowed key builders handed to the flow-structure functions in this file
/// (`ff_create_flow`, `ff_add_execution_to_flow`, `ff_cancel_flow`,
/// `ff_stage_dependency_edge`).
pub struct FlowStructOpKeys<'a> {
    /// Per-flow key context. The functions above call `core()`, `members()`,
    /// `edge(..)`, `outgoing(..)`, `incoming(..)` and `grant(..)` on it.
    pub fctx: &'a FlowKeyContext,
    /// Flow index keys; in this file only `flow_index()` is read from it.
    pub fidx: &'a FlowIndexKeys,
}
15
/// Borrowed key builders for the per-execution functions in this file
/// (`ff_evaluate_flow_eligibility`, `ff_apply_dependency_to_child`,
/// `ff_promote_blocked_to_eligible`, `ff_replay_execution`).
pub struct DepOpKeys<'a> {
    /// Execution key context; source of `core()`, `deps_meta()`,
    /// `deps_unresolved()`, `dep_edge(..)`, `deps_all_edges()` and
    /// `lease_history()` in the functions above.
    pub ctx: &'a ExecKeyContext,
    /// Index keys; queried together with `lane_id` for `lane_eligible(..)`,
    /// `lane_blocked_dependencies(..)` and `lane_terminal(..)`.
    pub idx: &'a IndexKeys,
    /// Lane identifier passed to the `idx.lane_*` lookups.
    pub lane_id: &'a ff_core::types::LaneId,
}
22
/// Borrowed key builders for `ff_resolve_dependency`.
///
/// Same shape as `DepOpKeys` plus a second execution key context.
pub struct ResolveDependencyKeys<'a> {
    /// Execution key context; source of `core()`, `deps_meta()`,
    /// `deps_unresolved()`, `dep_edge(..)`, `attempt_hash(..)`,
    /// `stream_meta(..)` and `payload()`.
    pub ctx: &'a ExecKeyContext,
    /// Index keys; queried together with `lane_id` for `lane_eligible(..)`,
    /// `lane_terminal(..)` and `lane_blocked_dependencies(..)`.
    pub idx: &'a IndexKeys,
    /// Lane identifier passed to the `idx.lane_*` lookups.
    pub lane_id: &'a ff_core::types::LaneId,
    /// Second execution key context; only `result()` is read from it.
    /// NOTE(review): presumably the upstream execution of the edge being
    /// resolved — confirm against the caller.
    pub upstream_ctx: &'a ExecKeyContext,
}
40
// Declares `ff_create_flow` through the `ff_function!` macro (defined outside
// this file). The invocation lists three keys and four arguments, in the
// order written below.
// NOTE(review): the order is presumably positional for the server-side
// function — confirm before reordering any entry.
ff_function! {
    pub ff_create_flow(args: CreateFlowArgs) -> CreateFlowResult {
        // Keys: flow core, flow members, flow index.
        keys(k: &FlowStructOpKeys<'_>) {
            k.fctx.core(),
            k.fctx.members(),
            k.fidx.flow_index(),
        }
        // Arguments: flow id, flow kind, namespace, `now`.
        argv {
            args.flow_id.to_string(),
            args.flow_kind.clone(),
            args.namespace.to_string(),
            args.now.to_string(),
        }
    }
}
60
61impl FromFcallResult for CreateFlowResult {
62 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
63 let r = FcallResult::parse(raw)?.into_success()?;
64 let fid_str = r.field_str(0);
65 let fid = ff_core::types::FlowId::parse(&fid_str)
66 .map_err(|e| ScriptError::Parse(format!("bad flow_id: {e}")))?;
67 match r.status.as_str() {
68 "OK" => Ok(CreateFlowResult::Created { flow_id: fid }),
69 "ALREADY_SATISFIED" => Ok(CreateFlowResult::AlreadySatisfied { flow_id: fid }),
70 _ => Err(ScriptError::Parse(format!("unexpected status: {}", r.status))),
71 }
72 }
73}
74
// Declares `ff_add_execution_to_flow` through the `ff_function!` macro
// (defined outside this file). The invocation lists three keys and three
// arguments, in the order written below.
// NOTE(review): the order is presumably positional for the server-side
// function — confirm before reordering any entry.
ff_function! {
    pub ff_add_execution_to_flow(args: AddExecutionToFlowArgs) -> AddExecutionToFlowResult {
        // Keys: flow core, flow members, flow index.
        keys(k: &FlowStructOpKeys<'_>) {
            k.fctx.core(),
            k.fctx.members(),
            k.fidx.flow_index(),
        }
        // Arguments: flow id, execution id, `now`.
        argv {
            args.flow_id.to_string(),
            args.execution_id.to_string(),
            args.now.to_string(),
        }
    }
}
93
94impl FromFcallResult for AddExecutionToFlowResult {
95 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
96 let r = FcallResult::parse(raw)?.into_success()?;
97 let eid_str = r.field_str(0);
98 let nc_str = r.field_str(1);
99 match r.status.as_str() {
100 "ALREADY_SATISFIED" => {
101 let eid = ff_core::types::ExecutionId::parse(&eid_str)
102 .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
103 let nc: u32 = nc_str.parse().unwrap_or(0);
104 Ok(AddExecutionToFlowResult::AlreadyMember {
105 execution_id: eid,
106 node_count: nc,
107 })
108 }
109 "OK" => {
110 let eid = ff_core::types::ExecutionId::parse(&eid_str)
111 .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
112 let nc: u32 = nc_str.parse().unwrap_or(0);
113 Ok(AddExecutionToFlowResult::Added {
114 execution_id: eid,
115 new_node_count: nc,
116 })
117 }
118 _ => Err(ScriptError::Parse(format!("unexpected status: {}", r.status))),
119 }
120 }
121}
122
// Declares `ff_cancel_flow` through the `ff_function!` macro (defined outside
// this file). The invocation lists three keys and four arguments, in the
// order written below.
// NOTE(review): the order is presumably positional for the server-side
// function — confirm before reordering any entry.
ff_function! {
    pub ff_cancel_flow(args: CancelFlowArgs) -> CancelFlowResult {
        // Keys: flow core, flow members, flow index.
        keys(k: &FlowStructOpKeys<'_>) {
            k.fctx.core(),
            k.fctx.members(),
            k.fidx.flow_index(),
        }
        // Arguments: flow id, reason, cancellation policy, `now`.
        argv {
            args.flow_id.to_string(),
            args.reason.clone(),
            args.cancellation_policy.clone(),
            args.now.to_string(),
        }
    }
}
142
143impl FromFcallResult for CancelFlowResult {
144 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
145 let r = FcallResult::parse(raw)?.into_success()?;
146 let policy = r.field_str(0);
147 let mut members = Vec::new();
148 let mut i = 1;
149 loop {
150 let s = r.field_str(i);
151 if s.is_empty() {
152 break;
153 }
154 members.push(s);
155 i += 1;
156 }
157 Ok(CancelFlowResult::Cancelled {
158 cancellation_policy: policy,
159 member_execution_ids: members,
160 })
161 }
162}
163
// Declares `ff_evaluate_flow_eligibility` through the `ff_function!` macro
// (defined outside this file). The invocation lists two keys and no
// arguments.
ff_function! {
    // `args` is referenced by neither the key list nor the (empty) argument
    // list below; presumably this attribute silences the resulting
    // unused-variable warning in the generated code.
    #[allow(unused_variables)]
    pub ff_evaluate_flow_eligibility(args: EvaluateFlowEligibilityArgs) -> EvaluateFlowEligibilityResult {
        // Keys: execution core, dependency metadata.
        keys(k: &DepOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.deps_meta(),
        }
        // Intentionally empty: no arguments are sent.
        argv {
        }
    }
}
179
180impl FromFcallResult for EvaluateFlowEligibilityResult {
181 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
182 let r = FcallResult::parse(raw)?.into_success()?;
183 Ok(EvaluateFlowEligibilityResult::Status {
184 status: r.field_str(0),
185 })
186 }
187}
188
// Declares `ff_apply_dependency_to_child` through the `ff_function!` macro
// (defined outside this file). The invocation lists seven keys and seven
// arguments, in the order written below.
// NOTE(review): the order is presumably positional for the server-side
// function — confirm before reordering any entry.
ff_function! {
    pub ff_apply_dependency_to_child(args: ApplyDependencyToChildArgs) -> ApplyDependencyToChildResult {
        // Keys: execution core, dependency metadata, unresolved dependencies,
        // the edge named by `args.edge_id`, the lane's eligible and
        // blocked-on-dependencies indexes, and the all-edges key.
        keys(k: &DepOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.deps_meta(),
            k.ctx.deps_unresolved(),
            k.ctx.dep_edge(&args.edge_id),
            k.idx.lane_eligible(k.lane_id),
            k.idx.lane_blocked_dependencies(k.lane_id),
            k.ctx.deps_all_edges(),
        }
        // Arguments: flow id, edge id, upstream execution id, graph revision,
        // dependency kind, data-passing reference, `now`.
        argv {
            args.flow_id.to_string(),
            args.edge_id.to_string(),
            args.upstream_execution_id.to_string(),
            args.graph_revision.to_string(),
            args.dependency_kind.clone(),
            // An absent reference is sent as the type's default value.
            args.data_passing_ref.clone().unwrap_or_default(),
            args.now.to_string(),
        }
    }
}
217
218impl FromFcallResult for ApplyDependencyToChildResult {
219 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
220 let r = FcallResult::parse(raw)?.into_success()?;
221 let sub = r.field_str(0);
222 if sub == "already_applied" {
223 Ok(ApplyDependencyToChildResult::AlreadyApplied)
224 } else {
225 let count: u32 = sub.parse().unwrap_or(0);
226 Ok(ApplyDependencyToChildResult::Applied {
227 unsatisfied_count: count,
228 })
229 }
230 }
231}
232
// Declares `ff_resolve_dependency` through the `ff_function!` macro (defined
// outside this file). The invocation lists eleven keys and three arguments,
// in the order written below.
// NOTE(review): the order is presumably positional for the server-side
// function — confirm before reordering any entry.
ff_function! {
    pub ff_resolve_dependency(args: ResolveDependencyArgs) -> ResolveDependencyResult {
        keys(k: &ResolveDependencyKeys<'_>) {
            k.ctx.core(),
            k.ctx.deps_meta(),
            k.ctx.deps_unresolved(),
            k.ctx.dep_edge(&args.edge_id),
            k.idx.lane_eligible(k.lane_id),
            k.idx.lane_terminal(k.lane_id),
            k.idx.lane_blocked_dependencies(k.lane_id),
            // Three keys on one line: attempt hash, stream metadata, payload.
            // NOTE(review): the attempt index is hard-coded to 0 for both the
            // attempt hash and the stream metadata — confirm this is intended.
            k.ctx.attempt_hash(ff_core::types::AttemptIndex::new(0)), k.ctx.stream_meta(ff_core::types::AttemptIndex::new(0)), k.ctx.payload(),
            // The only key taken from the second (`upstream_ctx`) context.
            k.upstream_ctx.result(),
        }
        // Arguments: edge id, upstream outcome, `now`.
        argv {
            args.edge_id.to_string(),
            args.upstream_outcome.clone(),
            args.now.to_string(),
        }
    }
}
267
268impl FromFcallResult for ResolveDependencyResult {
269 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
270 let r = FcallResult::parse(raw)?.into_success()?;
271 match r.field_str(0).as_str() {
272 "satisfied" => Ok(ResolveDependencyResult::Satisfied),
273 "impossible" => Ok(ResolveDependencyResult::Impossible),
274 "already_resolved" => Ok(ResolveDependencyResult::AlreadyResolved),
275 other => Err(ScriptError::Parse(format!("unknown resolve status: {other}"))),
276 }
277 }
278}
279
// Declares `ff_promote_blocked_to_eligible` through the `ff_function!` macro
// (defined outside this file). The invocation lists five keys and two
// arguments, in the order written below.
// NOTE(review): the order is presumably positional for the server-side
// function — confirm before reordering any entry.
ff_function! {
    pub ff_promote_blocked_to_eligible(args: PromoteBlockedToEligibleArgs) -> PromoteBlockedToEligibleResult {
        // Keys: execution core, the lane's blocked-on-dependencies and
        // eligible indexes, dependency metadata, unresolved dependencies.
        keys(k: &DepOpKeys<'_>) {
            k.ctx.core(),
            k.idx.lane_blocked_dependencies(k.lane_id),
            k.idx.lane_eligible(k.lane_id),
            k.ctx.deps_meta(),
            k.ctx.deps_unresolved(),
        }
        // Arguments: execution id, `now`.
        argv {
            args.execution_id.to_string(),
            args.now.to_string(),
        }
    }
}
300
301impl FromFcallResult for PromoteBlockedToEligibleResult {
302 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
303 let _r = FcallResult::parse(raw)?.into_success()?;
304 Ok(PromoteBlockedToEligibleResult::Promoted)
305 }
306}
307
// Declares `ff_replay_execution` through the `ff_function!` macro (defined
// outside this file). The invocation lists four keys and two arguments, in
// the order written below.
// NOTE(review): the order is presumably positional for the server-side
// function — confirm before reordering any entry.
ff_function! {
    pub ff_replay_execution(args: ReplayExecutionArgs) -> ReplayExecutionResult {
        // Keys: execution core, the lane's terminal and eligible indexes,
        // lease history.
        keys(k: &DepOpKeys<'_>) {
            k.ctx.core(),
            k.idx.lane_terminal(k.lane_id),
            k.idx.lane_eligible(k.lane_id),
            k.ctx.lease_history(),
        }
        // Arguments: execution id, `now`.
        argv {
            args.execution_id.to_string(),
            args.now.to_string(),
        }
    }
}
332
333impl FromFcallResult for ReplayExecutionResult {
334 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
335 let r = FcallResult::parse(raw)?.into_success()?;
336 let unsatisfied = r.field_str(0);
338 let ps = if unsatisfied == "0" {
339 PublicState::Waiting
340 } else {
341 PublicState::WaitingChildren
342 };
343 Ok(ReplayExecutionResult::Replayed { public_state: ps })
344 }
345}
346
// Declares `ff_stage_dependency_edge` through the `ff_function!` macro
// (defined outside this file). The invocation lists six keys and eight
// arguments, in the order written below.
// NOTE(review): the order is presumably positional for the server-side
// function — confirm before reordering any entry.
ff_function! {
    pub ff_stage_dependency_edge(args: StageDependencyEdgeArgs) -> StageDependencyEdgeResult {
        // Keys: flow core, flow members, the edge itself, the upstream
        // execution's outgoing key, the downstream execution's incoming key,
        // and a grant key derived from the edge id's string form.
        keys(k: &FlowStructOpKeys<'_>) {
            k.fctx.core(),
            k.fctx.members(),
            k.fctx.edge(&args.edge_id),
            k.fctx.outgoing(&args.upstream_execution_id),
            k.fctx.incoming(&args.downstream_execution_id),
            k.fctx.grant(&args.edge_id.to_string()),
        }
        // Arguments: flow id, edge id, upstream execution id, downstream
        // execution id, dependency kind, data-passing reference, expected
        // graph revision, `now`.
        argv {
            args.flow_id.to_string(),
            args.edge_id.to_string(),
            args.upstream_execution_id.to_string(),
            args.downstream_execution_id.to_string(),
            args.dependency_kind.clone(),
            // An absent reference is sent as the type's default value.
            args.data_passing_ref.clone().unwrap_or_default(),
            args.expected_graph_revision.to_string(),
            args.now.to_string(),
        }
    }
}
376
377impl FromFcallResult for StageDependencyEdgeResult {
378 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
379 let r = FcallResult::parse(raw)?.into_success()?;
380 let eid = ff_core::types::EdgeId::parse(&r.field_str(0))
381 .map_err(|e| ScriptError::Parse(format!("bad edge_id: {e}")))?;
382 let rev: u64 = r.field_str(1).parse()
383 .map_err(|e| ScriptError::Parse(format!("bad graph_revision: {e}")))?;
384 Ok(StageDependencyEdgeResult::Staged {
385 edge_id: eid,
386 new_graph_revision: rev,
387 })
388 }
389}