1use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::{ExecKeyContext, FlowIndexKeys, FlowKeyContext, IndexKeys};
6use ff_core::state::PublicState;
7
8use crate::result::{FcallResult, FromFcallResult};
9
10pub struct FlowStructOpKeys<'a> {
12 pub fctx: &'a FlowKeyContext,
13 pub fidx: &'a FlowIndexKeys,
14}
15
16pub struct DepOpKeys<'a> {
18 pub ctx: &'a ExecKeyContext,
19 pub idx: &'a IndexKeys,
20 pub lane_id: &'a ff_core::types::LaneId,
21}
22
23pub struct ResolveDependencyKeys<'a> {
35 pub ctx: &'a ExecKeyContext,
36 pub idx: &'a IndexKeys,
37 pub lane_id: &'a ff_core::types::LaneId,
38 pub upstream_ctx: &'a ExecKeyContext,
39}
40
41ff_function! {
46 pub ff_create_flow(args: CreateFlowArgs) -> CreateFlowResult {
47 keys(k: &FlowStructOpKeys<'_>) {
48 k.fctx.core(),
49 k.fctx.members(),
50 k.fidx.flow_index(),
51 }
52 argv {
53 args.flow_id.to_string(),
54 args.flow_kind.clone(),
55 args.namespace.to_string(),
56 args.now.to_string(),
57 }
58 }
59}
60
61impl FromFcallResult for CreateFlowResult {
62 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
63 let r = FcallResult::parse(raw)?.into_success()?;
64 let fid_str = r.field_str(0);
65 let fid = ff_core::types::FlowId::parse(&fid_str)
66 .map_err(|e| ScriptError::Parse {
67 fcall: "ff_create_flow".into(),
68 execution_id: None,
69 message: format!("bad flow_id: {e}"),
70 })?;
71 match r.status.as_str() {
72 "OK" => Ok(CreateFlowResult::Created { flow_id: fid }),
73 "ALREADY_SATISFIED" => Ok(CreateFlowResult::AlreadySatisfied { flow_id: fid }),
74 _ => Err(ScriptError::Parse {
75 fcall: "ff_create_flow".into(),
76 execution_id: None,
77 message: format!("unexpected status: {}", r.status),
78 }),
79 }
80 }
81}
82
83ff_function! {
88 pub ff_add_execution_to_flow(args: AddExecutionToFlowArgs) -> AddExecutionToFlowResult {
89 keys(k: &FlowStructOpKeys<'_>) {
90 k.fctx.core(),
91 k.fctx.members(),
92 k.fidx.flow_index(),
93 }
94 argv {
95 args.flow_id.to_string(),
96 args.execution_id.to_string(),
97 args.now.to_string(),
98 }
99 }
100}
101
102impl FromFcallResult for AddExecutionToFlowResult {
103 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
104 let r = FcallResult::parse(raw)?.into_success()?;
105 let eid_str = r.field_str(0);
106 let nc_str = r.field_str(1);
107 match r.status.as_str() {
108 "ALREADY_SATISFIED" => {
109 let eid = ff_core::types::ExecutionId::parse(&eid_str)
110 .map_err(|e| ScriptError::Parse {
111 fcall: "ff_add_execution_to_flow".into(),
112 execution_id: None,
113 message: format!("bad execution_id: {e}"),
114 })?;
115 let nc: u32 = nc_str.parse().unwrap_or(0);
116 Ok(AddExecutionToFlowResult::AlreadyMember {
117 execution_id: eid,
118 node_count: nc,
119 })
120 }
121 "OK" => {
122 let eid = ff_core::types::ExecutionId::parse(&eid_str)
123 .map_err(|e| ScriptError::Parse {
124 fcall: "ff_add_execution_to_flow".into(),
125 execution_id: None,
126 message: format!("bad execution_id: {e}"),
127 })?;
128 let nc: u32 = nc_str.parse().unwrap_or(0);
129 Ok(AddExecutionToFlowResult::Added {
130 execution_id: eid,
131 new_node_count: nc,
132 })
133 }
134 _ => Err(ScriptError::Parse {
135 fcall: "ff_add_execution_to_flow".into(),
136 execution_id: None,
137 message: format!("unexpected status: {}", r.status),
138 }),
139 }
140 }
141}
142
143ff_function! {
153 pub ff_cancel_flow(args: CancelFlowArgs) -> CancelFlowResult {
154 keys(k: &FlowStructOpKeys<'_>) {
155 k.fctx.core(),
156 k.fctx.members(),
157 k.fidx.flow_index(),
158 k.fctx.pending_cancels(),
159 k.fidx.cancel_backlog(),
160 }
161 argv {
162 args.flow_id.to_string(),
163 args.reason.clone(),
164 args.cancellation_policy.clone(),
165 args.now.to_string(),
166 String::new(),
167 }
168 }
169}
170
171impl FromFcallResult for CancelFlowResult {
172 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
173 let r = FcallResult::parse(raw)?.into_success()?;
174 let policy = r.field_str(0);
175 let mut members = Vec::new();
176 let mut i = 1;
177 loop {
178 let s = r.field_str(i);
179 if s.is_empty() {
180 break;
181 }
182 members.push(s);
183 i += 1;
184 }
185 Ok(CancelFlowResult::Cancelled {
186 cancellation_policy: policy,
187 member_execution_ids: members,
188 })
189 }
190}
191
192ff_function! {
197 #[allow(unused_variables)]
198 pub ff_evaluate_flow_eligibility(args: EvaluateFlowEligibilityArgs) -> EvaluateFlowEligibilityResult {
199 keys(k: &DepOpKeys<'_>) {
200 k.ctx.core(),
201 k.ctx.deps_meta(),
202 }
203 argv {
204 }
205 }
206}
207
208impl FromFcallResult for EvaluateFlowEligibilityResult {
209 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
210 let r = FcallResult::parse(raw)?.into_success()?;
211 Ok(EvaluateFlowEligibilityResult::Status {
212 status: r.field_str(0),
213 })
214 }
215}
216
217ff_function! {
224 pub ff_apply_dependency_to_child(args: ApplyDependencyToChildArgs) -> ApplyDependencyToChildResult {
225 keys(k: &DepOpKeys<'_>) {
226 k.ctx.core(),
227 k.ctx.deps_meta(),
228 k.ctx.deps_unresolved(),
229 k.ctx.dep_edge(&args.edge_id),
230 k.idx.lane_eligible(k.lane_id),
231 k.idx.lane_blocked_dependencies(k.lane_id),
232 k.ctx.deps_all_edges(),
233 }
234 argv {
235 args.flow_id.to_string(),
236 args.edge_id.to_string(),
237 args.upstream_execution_id.to_string(),
238 args.graph_revision.to_string(),
239 args.dependency_kind.clone(),
240 args.data_passing_ref.clone().unwrap_or_default(),
241 args.now.to_string(),
242 }
243 }
244}
245
246impl FromFcallResult for ApplyDependencyToChildResult {
247 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
248 let r = FcallResult::parse(raw)?.into_success()?;
249 let sub = r.field_str(0);
250 if sub == "already_applied" {
251 Ok(ApplyDependencyToChildResult::AlreadyApplied)
252 } else {
253 let count: u32 = sub.parse().unwrap_or(0);
254 Ok(ApplyDependencyToChildResult::Applied {
255 unsatisfied_count: count,
256 })
257 }
258 }
259}
260
261ff_function! {
274 pub ff_resolve_dependency(args: ResolveDependencyArgs) -> ResolveDependencyResult {
275 keys(k: &ResolveDependencyKeys<'_>) {
276 k.ctx.core(),
277 k.ctx.deps_meta(),
278 k.ctx.deps_unresolved(),
279 k.ctx.dep_edge(&args.edge_id),
280 k.idx.lane_eligible(k.lane_id),
281 k.idx.lane_terminal(k.lane_id),
282 k.idx.lane_blocked_dependencies(k.lane_id),
283 k.ctx.attempt_hash(ff_core::types::AttemptIndex::new(0)), k.ctx.stream_meta(ff_core::types::AttemptIndex::new(0)), k.ctx.payload(),
286 k.upstream_ctx.result(),
287 }
288 argv {
289 args.edge_id.to_string(),
290 args.upstream_outcome.clone(),
291 args.now.to_string(),
292 }
293 }
294}
295
296impl FromFcallResult for ResolveDependencyResult {
297 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
298 let r = FcallResult::parse(raw)?.into_success()?;
299 match r.field_str(0).as_str() {
300 "satisfied" => Ok(ResolveDependencyResult::Satisfied),
301 "impossible" => Ok(ResolveDependencyResult::Impossible),
302 "already_resolved" => Ok(ResolveDependencyResult::AlreadyResolved),
303 other => Err(ScriptError::Parse {
304 fcall: "ff_resolve_dependency".into(),
305 execution_id: None,
306 message: format!("unknown resolve status: {other}"),
307 }),
308 }
309 }
310}
311
312ff_function! {
318 pub ff_promote_blocked_to_eligible(args: PromoteBlockedToEligibleArgs) -> PromoteBlockedToEligibleResult {
319 keys(k: &DepOpKeys<'_>) {
320 k.ctx.core(),
321 k.idx.lane_blocked_dependencies(k.lane_id),
322 k.idx.lane_eligible(k.lane_id),
323 k.ctx.deps_meta(),
324 k.ctx.deps_unresolved(),
325 }
326 argv {
327 args.execution_id.to_string(),
328 args.now.to_string(),
329 }
330 }
331}
332
333impl FromFcallResult for PromoteBlockedToEligibleResult {
334 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
335 let _r = FcallResult::parse(raw)?.into_success()?;
336 Ok(PromoteBlockedToEligibleResult::Promoted)
337 }
338}
339
340ff_function! {
351 pub ff_replay_execution(args: ReplayExecutionArgs) -> ReplayExecutionResult {
352 keys(k: &DepOpKeys<'_>) {
353 k.ctx.core(),
354 k.idx.lane_terminal(k.lane_id),
355 k.idx.lane_eligible(k.lane_id),
356 k.ctx.lease_history(),
357 }
358 argv {
359 args.execution_id.to_string(),
360 args.now.to_string(),
361 }
362 }
363}
364
365impl FromFcallResult for ReplayExecutionResult {
366 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
367 let r = FcallResult::parse(raw)?.into_success()?;
368 let unsatisfied = r.field_str(0);
370 let ps = if unsatisfied == "0" {
371 PublicState::Waiting
372 } else {
373 PublicState::WaitingChildren
374 };
375 Ok(ReplayExecutionResult::Replayed { public_state: ps })
376 }
377}
378
379ff_function! {
387 pub ff_stage_dependency_edge(args: StageDependencyEdgeArgs) -> StageDependencyEdgeResult {
388 keys(k: &FlowStructOpKeys<'_>) {
389 k.fctx.core(),
390 k.fctx.members(),
391 k.fctx.edge(&args.edge_id),
392 k.fctx.outgoing(&args.upstream_execution_id),
393 k.fctx.incoming(&args.downstream_execution_id),
394 k.fctx.grant(&args.edge_id.to_string()),
395 }
396 argv {
397 args.flow_id.to_string(),
398 args.edge_id.to_string(),
399 args.upstream_execution_id.to_string(),
400 args.downstream_execution_id.to_string(),
401 args.dependency_kind.clone(),
402 args.data_passing_ref.clone().unwrap_or_default(),
403 args.expected_graph_revision.to_string(),
404 args.now.to_string(),
405 }
406 }
407}
408
409impl FromFcallResult for StageDependencyEdgeResult {
410 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
411 let r = FcallResult::parse(raw)?.into_success()?;
412 let eid = ff_core::types::EdgeId::parse(&r.field_str(0))
413 .map_err(|e| ScriptError::Parse {
414 fcall: "ff_stage_dependency_edge".into(),
415 execution_id: None,
416 message: format!("bad edge_id: {e}"),
417 })?;
418 let rev: u64 = r.field_str(1).parse()
419 .map_err(|e| ScriptError::Parse {
420 fcall: "ff_stage_dependency_edge".into(),
421 execution_id: None,
422 message: format!("bad graph_revision: {e}"),
423 })?;
424 Ok(StageDependencyEdgeResult::Staged {
425 edge_id: eid,
426 new_graph_revision: rev,
427 })
428 }
429}
430
431pub async fn ff_set_flow_tags(
445 conn: &ferriskey::Client,
446 fctx: &FlowKeyContext,
447 args: &SetFlowTagsArgs,
448) -> Result<SetFlowTagsResult, ScriptError> {
449 let keys = [fctx.core(), fctx.tags()];
450 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
451
452 let mut argv: Vec<String> = Vec::with_capacity(args.tags.len() * 2);
453 for (k, v) in &args.tags {
454 argv.push(k.clone());
455 argv.push(v.clone());
456 }
457 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
458
459 let raw = conn
460 .fcall::<ferriskey::Value>("ff_set_flow_tags", &key_refs, &argv_refs)
461 .await
462 .map_err(ScriptError::Valkey)?;
463 SetFlowTagsResult::from_fcall_result(&raw)
464}
465
466impl FromFcallResult for SetFlowTagsResult {
467 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
468 let r = FcallResult::parse(raw)?.into_success()?;
469 let count: u32 = r
470 .field_str(0)
471 .parse()
472 .map_err(|e| ScriptError::Parse {
473 fcall: "ff_set_flow_tags".into(),
474 execution_id: None,
475 message: format!("bad tag count: {e}"),
476 })?;
477 Ok(SetFlowTagsResult::Ok { count })
478 }
479}