1use ff_core::contracts::*;
18use crate::error::ScriptError;
19use ff_core::keys::{ExecKeyContext, IndexKeys};
20use ff_core::state::{AttemptType, PublicState};
21use ff_core::types::*;
22
23use crate::result::{FcallResult, FromFcallResult};
24
/// Lease and attempt details of a successful claim, without the execution id.
///
/// `ClaimExecutionResultPartial::complete` copies every field into a
/// `ClaimedExecution` and adds the caller-supplied `ExecutionId`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimedExecutionPartial {
    /// Lease id, parsed from reply field 0.
    pub lease_id: LeaseId,
    /// Lease epoch, parsed from reply field 1 as a `u64`.
    pub lease_epoch: LeaseEpoch,
    /// Attempt index, parsed from reply field 4 as a `u32`.
    pub attempt_index: AttemptIndex,
    /// Attempt id, parsed from reply field 3.
    pub attempt_id: AttemptId,
    /// Attempt type, decoded from reply field 5 by `parse_attempt_type`.
    pub attempt_type: AttemptType,
    /// Lease expiry, built with `TimestampMs::from_millis` from reply field 2.
    pub lease_expires_at: TimestampMs,
}
38
39#[derive(Clone, Debug, PartialEq, Eq)]
41pub enum ClaimExecutionResultPartial {
42 Claimed(ClaimedExecutionPartial),
43}
44
45impl ClaimExecutionResultPartial {
46 pub fn complete(self, execution_id: ExecutionId) -> ClaimExecutionResult {
49 match self {
50 Self::Claimed(p) => ClaimExecutionResult::Claimed(ClaimedExecution {
51 execution_id,
52 lease_id: p.lease_id,
53 lease_epoch: p.lease_epoch,
54 attempt_index: p.attempt_index,
55 attempt_id: p.attempt_id,
56 attempt_type: p.attempt_type,
57 lease_expires_at: p.lease_expires_at,
58 }),
59 }
60 }
61}
62
63#[derive(Clone, Debug, PartialEq, Eq)]
65pub enum CompleteExecutionResultPartial {
66 Completed { public_state: PublicState },
67}
68
69impl CompleteExecutionResultPartial {
70 pub fn complete(self, execution_id: ExecutionId) -> CompleteExecutionResult {
71 match self {
72 Self::Completed { public_state } => CompleteExecutionResult::Completed {
73 execution_id,
74 public_state,
75 },
76 }
77 }
78}
79
80#[derive(Clone, Debug, PartialEq, Eq)]
82pub enum CancelExecutionResultPartial {
83 Cancelled { public_state: PublicState },
84}
85
86impl CancelExecutionResultPartial {
87 pub fn complete(self, execution_id: ExecutionId) -> CancelExecutionResult {
88 match self {
89 Self::Cancelled { public_state } => CancelExecutionResult::Cancelled {
90 execution_id,
91 public_state,
92 },
93 }
94 }
95}
96
97#[derive(Clone, Debug, PartialEq, Eq)]
99pub enum DelayExecutionResultPartial {
100 Delayed { public_state: PublicState },
101}
102
103impl DelayExecutionResultPartial {
104 pub fn complete(self, execution_id: ExecutionId) -> DelayExecutionResult {
105 match self {
106 Self::Delayed { public_state } => DelayExecutionResult::Delayed {
107 execution_id,
108 public_state,
109 },
110 }
111 }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq)]
116pub enum MoveToWaitingChildrenResultPartial {
117 Moved { public_state: PublicState },
118}
119
120impl MoveToWaitingChildrenResultPartial {
121 pub fn complete(self, execution_id: ExecutionId) -> MoveToWaitingChildrenResult {
122 match self {
123 Self::Moved { public_state } => MoveToWaitingChildrenResult::Moved {
124 execution_id,
125 public_state,
126 },
127 }
128 }
129}
130
131#[derive(Clone, Debug, PartialEq, Eq)]
136pub enum ExpireExecutionResultPartial {
137 Expired,
138 AlreadyTerminal,
139}
140
141impl ExpireExecutionResultPartial {
142 pub fn complete(self, execution_id: ExecutionId) -> ExpireExecutionResult {
143 match self {
144 Self::Expired => ExpireExecutionResult::Expired { execution_id },
145 Self::AlreadyTerminal => ExpireExecutionResult::AlreadyTerminal,
146 }
147 }
148}
149
/// Borrowed inputs that the `keys(...)` sections of the `ff_function!`
/// invocations in this file use to build their key lists.
pub struct ExecOpKeys<'a> {
    /// Key builder for `core()`, `payload()`, `lease_current()`,
    /// `attempt_hash(..)` and the other `k.ctx.*` keys.
    pub ctx: &'a ExecKeyContext,
    /// Key builder for the `k.idx.*` keys (`lane_*`, `lease_expiry()`,
    /// `worker_leases(..)`, `attempt_timeout()`, ...).
    pub idx: &'a IndexKeys,
    /// Lane passed to every `idx.lane_*` key builder.
    pub lane_id: &'a LaneId,
    /// Worker instance passed to `idx.worker_leases(..)`.
    pub worker_instance_id: &'a WorkerInstanceId,
}
158
159ff_function! {
170 pub ff_create_execution(args: CreateExecutionArgs) -> CreateExecutionResult {
171 keys(k: &ExecOpKeys<'_>) {
172 k.ctx.core(),
173 k.ctx.payload(),
174 k.ctx.policy(),
175 k.ctx.tags(),
176 if args.delay_until.is_some() {
179 k.idx.lane_delayed(k.lane_id)
180 } else {
181 k.idx.lane_eligible(k.lane_id)
182 },
183 args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
184 ff_core::keys::idempotency_key(k.ctx.hash_tag(), args.namespace.as_str(), ik)
185 }).unwrap_or_else(|| k.ctx.noop()),
186 k.idx.execution_deadline(),
187 k.idx.all_executions(),
188 }
189 argv {
190 args.execution_id.to_string(),
191 args.namespace.to_string(),
192 args.lane_id.to_string(),
193 args.execution_kind.clone(),
194 args.priority.to_string(),
195 args.creator_identity.clone(),
196 args.policy.as_ref().map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".into())).unwrap_or_else(|| "{}".into()),
197 String::from_utf8_lossy(&args.input_payload).into_owned(),
198 args.delay_until.map(|t| t.to_string()).unwrap_or_default(),
199 args.idempotency_key.as_ref().map(|_| "86400000".to_string()).unwrap_or_default(),
200 serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".into()),
201 String::new(),
202 args.partition_id.to_string(),
203 }
204 }
205}
206
207impl FromFcallResult for CreateExecutionResult {
208 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
209 let r = FcallResult::parse(raw)?;
210 if r.status == "DUPLICATE" {
212 let eid_str = r.field_str(0);
213 let eid = ExecutionId::parse(&eid_str)
214 .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
215 return Ok(CreateExecutionResult::Duplicate { execution_id: eid });
216 }
217 let r = r.into_success()?;
218 let eid_str = r.field_str(0);
219 let ps_str = r.field_str(1);
220 let eid = ExecutionId::parse(&eid_str)
221 .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
222 let public_state = parse_public_state(&ps_str)?;
223 Ok(CreateExecutionResult::Created {
224 execution_id: eid,
225 public_state,
226 })
227 }
228}
229
// Declares `ff_claim_execution` through the project `ff_function!` macro.
// The reply is decoded by `ClaimExecutionResultPartial::from_fcall_result`.
//
// KEYS (14) and ARGV (12) are positional; keep the order in sync with the
// script that consumes them.
ff_function! {
    pub ff_claim_execution(args: ClaimExecutionArgs) -> ClaimExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.claim_grant(),
            k.idx.lane_eligible(k.lane_id),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            // The three attempt keys are built for the caller's expected
            // attempt index.
            k.ctx.attempt_hash(args.expected_attempt_index),
            k.ctx.attempt_usage(args.expected_attempt_index),
            k.ctx.attempt_policy(args.expected_attempt_index),
            k.ctx.attempts(),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lane_active(k.lane_id),
            k.idx.attempt_timeout(),
            k.idx.execution_deadline(),
        }
        argv {
            args.execution_id.to_string(),
            args.worker_id.to_string(),
            args.worker_instance_id.to_string(),
            args.lane_id.to_string(),
            // ARGV[5] is always empty here — NOTE(review): confirm which
            // script argument this slot represents.
            String::new(),
            args.lease_id.to_string(),
            args.lease_ttl_ms.to_string(),
            // Two thirds of the lease TTL (integer arithmetic); presumably a
            // renew-before hint — TODO confirm against the script.
            (args.lease_ttl_ms * 2 / 3).to_string(),
            args.attempt_id.to_string(),
            args.attempt_policy_json.clone(),
            // Optional values are sent as an empty string when absent.
            args.attempt_timeout_ms.map(|t| t.to_string()).unwrap_or_default(),
            args.execution_deadline_at.map(|t| t.to_string()).unwrap_or_default(),
        }
    }
}
275
276impl FromFcallResult for ClaimExecutionResultPartial {
277 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
278 let r = FcallResult::parse(raw)?.into_success()?;
279 let lease_id = LeaseId::parse(&r.field_str(0))
281 .map_err(|e| ScriptError::Parse(format!("bad lease_id: {e}")))?;
282 let epoch = r.field_str(1).parse::<u64>()
283 .map_err(|e| ScriptError::Parse(format!("bad epoch: {e}")))?;
284 let expires_at = r.field_str(2).parse::<i64>()
285 .map_err(|e| ScriptError::Parse(format!("bad expires_at: {e}")))?;
286 let attempt_id = AttemptId::parse(&r.field_str(3))
287 .map_err(|e| ScriptError::Parse(format!("bad attempt_id: {e}")))?;
288 let attempt_index = r.field_str(4).parse::<u32>()
289 .map_err(|e| ScriptError::Parse(format!("bad attempt_index: {e}")))?;
290 let attempt_type = parse_attempt_type(&r.field_str(5))?;
291
292 Ok(Self::Claimed(ClaimedExecutionPartial {
293 lease_id,
294 lease_epoch: LeaseEpoch::new(epoch),
295 attempt_index: AttemptIndex::new(attempt_index),
296 attempt_id,
297 attempt_type,
298 lease_expires_at: TimestampMs::from_millis(expires_at),
299 }))
300 }
301}
302
// Declares `ff_complete_execution` through the project `ff_function!` macro.
// The reply is decoded by `CompleteExecutionResultPartial::from_fcall_result`.
//
// KEYS (12) and ARGV (5) are positional; keep the order in sync with the
// script that consumes them.
ff_function! {
    pub ff_complete_execution(args: CompleteExecutionArgs) -> CompleteExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            k.idx.lane_terminal(k.lane_id),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lane_active(k.lane_id),
            k.ctx.stream_meta(args.attempt_index),
            k.ctx.result(),
            k.idx.attempt_timeout(),
            k.idx.execution_deadline(),
        }
        argv {
            args.execution_id.to_string(),
            args.lease_id.to_string(),
            args.lease_epoch.to_string(),
            args.attempt_id.to_string(),
            // Result payload as text; absent payload is sent as an empty
            // string. The conversion is lossy: invalid UTF-8 sequences are
            // replaced with U+FFFD.
            args.result_payload.as_ref()
                .map(|p| String::from_utf8_lossy(p).into_owned())
                .unwrap_or_default(),
        }
    }
}
338
339impl FromFcallResult for CompleteExecutionResultPartial {
340 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
341 let _r = FcallResult::parse(raw)?.into_success()?;
342 Ok(Self::Completed { public_state: PublicState::Completed })
344 }
345}
346
347ff_function! {
369 pub ff_cancel_execution(args: CancelExecutionArgs) -> CancelExecutionResultPartial {
370 keys(k: &ExecOpKeys<'_>) {
371 k.ctx.core(), k.ctx.attempt_hash(AttemptIndex::new(0)), k.ctx.stream_meta(AttemptIndex::new(0)), k.ctx.lease_current(), k.ctx.lease_history(), k.idx.lease_expiry(), k.idx.worker_leases(k.worker_instance_id), k.ctx.suspension_current(), k.ctx.suspension_current(), k.ctx.suspension_current(), k.idx.suspension_timeout(), k.idx.lane_terminal(k.lane_id), k.idx.attempt_timeout(), k.idx.execution_deadline(), k.idx.lane_eligible(k.lane_id), k.idx.lane_delayed(k.lane_id), k.idx.lane_blocked_dependencies(k.lane_id), k.idx.lane_blocked_budget(k.lane_id), k.idx.lane_blocked_quota(k.lane_id), k.idx.lane_blocked_route(k.lane_id), k.idx.lane_blocked_operator(k.lane_id), }
393 argv {
394 args.execution_id.to_string(),
395 args.reason.clone(),
396 args.source.to_string(),
397 args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
398 args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
399 }
400 }
401}
402
403impl FromFcallResult for CancelExecutionResultPartial {
404 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
405 let _r = FcallResult::parse(raw)?.into_success()?;
406 Ok(Self::Cancelled { public_state: PublicState::Cancelled })
408 }
409}
410
// Declares `ff_delay_execution` through the project `ff_function!` macro.
// The reply is decoded by `DelayExecutionResultPartial::from_fcall_result`.
//
// KEYS (9) and ARGV (5) are positional; keep the order in sync with the
// script that consumes them.
ff_function! {
    pub ff_delay_execution(args: DelayExecutionArgs) -> DelayExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            // Source and destination lane indexes for the move.
            k.idx.lane_active(k.lane_id),
            k.idx.lane_delayed(k.lane_id),
            k.idx.attempt_timeout(),
        }
        argv {
            args.execution_id.to_string(),
            args.lease_id.to_string(),
            args.lease_epoch.to_string(),
            args.attempt_id.to_string(),
            args.delay_until.to_string(),
        }
    }
}
440
441impl FromFcallResult for DelayExecutionResultPartial {
442 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
443 let _r = FcallResult::parse(raw)?.into_success()?;
444 Ok(Self::Delayed { public_state: PublicState::Delayed })
446 }
447}
448
// Declares `ff_move_to_waiting_children` through the project `ff_function!`
// macro. The reply is decoded by
// `MoveToWaitingChildrenResultPartial::from_fcall_result`.
//
// KEYS (9) and ARGV (4) are positional; keep the order in sync with the
// script that consumes them.
ff_function! {
    pub ff_move_to_waiting_children(args: MoveToWaitingChildrenArgs) -> MoveToWaitingChildrenResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            // Source and destination lane indexes for the move.
            k.idx.lane_active(k.lane_id),
            k.idx.lane_blocked_dependencies(k.lane_id),
            k.idx.attempt_timeout(),
        }
        argv {
            args.execution_id.to_string(),
            args.lease_id.to_string(),
            args.lease_epoch.to_string(),
            args.attempt_id.to_string(),
        }
    }
}
477
478impl FromFcallResult for MoveToWaitingChildrenResultPartial {
479 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
480 let _r = FcallResult::parse(raw)?.into_success()?;
481 Ok(Self::Moved { public_state: PublicState::WaitingChildren })
483 }
484}
485
// Declares `ff_fail_execution` through the project `ff_function!` macro.
// The reply is decoded by `FailExecutionResult::from_fcall_result`.
//
// KEYS (12) and ARGV (7) are positional; keep the order in sync with the
// script that consumes them.
ff_function! {
    pub ff_fail_execution(args: FailExecutionArgs) -> FailExecutionResult {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            // Both the terminal and the delayed lane index are passed; the
            // reply distinguishes `terminal_failed` from `retry_scheduled`.
            k.idx.lane_terminal(k.lane_id),
            k.idx.lane_delayed(k.lane_id),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lane_active(k.lane_id),
            k.ctx.stream_meta(args.attempt_index),
            k.idx.attempt_timeout(),
            k.idx.execution_deadline(),
        }
        argv {
            args.execution_id.to_string(),
            args.lease_id.to_string(),
            args.lease_epoch.to_string(),
            args.attempt_id.to_string(),
            args.failure_reason.clone(),
            args.failure_category.clone(),
            args.retry_policy_json.clone(),
        }
    }
}
522
523impl FromFcallResult for FailExecutionResult {
524 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
525 let r = FcallResult::parse(raw)?.into_success()?;
526 let sub_status = r.field_str(0);
528 match sub_status.as_str() {
529 "retry_scheduled" => {
530 let delay_str = r.field_str(1);
531 let delay_ms: i64 = delay_str
532 .parse()
533 .map_err(|e| ScriptError::Parse(format!("bad delay_until: {e}")))?;
534 Ok(FailExecutionResult::RetryScheduled {
535 delay_until: TimestampMs::from_millis(delay_ms),
536 next_attempt_index: AttemptIndex::new(0), })
538 }
539 "terminal_failed" => Ok(FailExecutionResult::TerminalFailed),
540 _ => Err(ScriptError::Parse(format!(
541 "unexpected fail sub-status: {sub_status}"
542 ))),
543 }
544 }
545}
546
547ff_function! {
557 pub ff_expire_execution(args: ExpireExecutionArgs) -> ExpireExecutionResultPartial {
558 keys(k: &ExecOpKeys<'_>) {
559 k.ctx.core(),
560 k.ctx.attempt_hash(AttemptIndex::new(0)), k.ctx.stream_meta(AttemptIndex::new(0)), k.ctx.lease_current(),
563 k.ctx.lease_history(),
564 k.idx.lease_expiry(),
565 k.idx.worker_leases(k.worker_instance_id),
566 k.idx.lane_active(k.lane_id),
567 k.idx.lane_terminal(k.lane_id),
568 k.idx.attempt_timeout(),
569 k.idx.execution_deadline(),
570 k.idx.lane_suspended(k.lane_id),
571 k.idx.suspension_timeout(),
572 k.ctx.suspension_current(),
573 }
574 argv {
575 args.execution_id.to_string(),
576 args.expire_reason.clone(),
577 }
578 }
579}
580
581impl FromFcallResult for ExpireExecutionResultPartial {
582 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
583 let r = FcallResult::parse(raw)?.into_success()?;
584 let sub = r.field_str(0);
586 match sub.as_str() {
587 "already_terminal" | "not_found_cleaned" => Ok(Self::AlreadyTerminal),
588 "expired" => Ok(Self::Expired),
589 _ => Ok(Self::Expired),
590 }
591 }
592}
593
594fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
597 match s {
598 "waiting" => Ok(PublicState::Waiting),
599 "delayed" => Ok(PublicState::Delayed),
600 "rate_limited" => Ok(PublicState::RateLimited),
601 "waiting_children" => Ok(PublicState::WaitingChildren),
602 "active" => Ok(PublicState::Active),
603 "suspended" => Ok(PublicState::Suspended),
604 "completed" => Ok(PublicState::Completed),
605 "failed" => Ok(PublicState::Failed),
606 "cancelled" => Ok(PublicState::Cancelled),
607 "expired" => Ok(PublicState::Expired),
608 "skipped" => Ok(PublicState::Skipped),
609 _ => Err(ScriptError::Parse(format!("unknown public_state: {s}"))),
610 }
611}
612
613fn parse_attempt_type(s: &str) -> Result<AttemptType, ScriptError> {
614 match s {
615 "initial" => Ok(AttemptType::Initial),
616 "retry" => Ok(AttemptType::Retry),
617 "reclaim" => Ok(AttemptType::Reclaim),
618 "replay" => Ok(AttemptType::Replay),
619 "fallback" => Ok(AttemptType::Fallback),
620 _ => Err(ScriptError::Parse(format!("unknown attempt_type: {s}"))),
621 }
622}
623
#[cfg(test)]
mod partial_tests {
    //! Checks that each `*Partial::complete` attaches the supplied execution
    //! id to the full result type.

    use super::*;
    use ff_core::partition::PartitionConfig;

    /// Builds a fresh execution id for a new flow with the default
    /// partition configuration.
    fn test_eid() -> ExecutionId {
        let flow = FlowId::new();
        let config = PartitionConfig::default();
        ExecutionId::for_flow(&flow, &config)
    }

    #[test]
    fn claim_partial_complete_attaches_execution_id() {
        let details = ClaimedExecutionPartial {
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        };
        let expected = test_eid();

        let ClaimExecutionResult::Claimed(claimed) =
            ClaimExecutionResultPartial::Claimed(details).complete(expected.clone());

        assert_eq!(claimed.execution_id, expected);
    }

    #[test]
    fn complete_partial_complete_attaches_execution_id() {
        let partial = CompleteExecutionResultPartial::Completed {
            public_state: PublicState::Completed,
        };
        let expected = test_eid();

        let CompleteExecutionResult::Completed { execution_id, .. } =
            partial.complete(expected.clone());

        assert_eq!(execution_id, expected);
    }

    #[test]
    fn cancel_partial_complete_attaches_execution_id() {
        let partial = CancelExecutionResultPartial::Cancelled {
            public_state: PublicState::Cancelled,
        };
        let expected = test_eid();

        let CancelExecutionResult::Cancelled { execution_id, .. } =
            partial.complete(expected.clone());

        assert_eq!(execution_id, expected);
    }

    #[test]
    fn delay_partial_complete_attaches_execution_id() {
        let partial = DelayExecutionResultPartial::Delayed {
            public_state: PublicState::Delayed,
        };
        let expected = test_eid();

        let DelayExecutionResult::Delayed { execution_id, .. } =
            partial.complete(expected.clone());

        assert_eq!(execution_id, expected);
    }

    #[test]
    fn move_to_waiting_children_partial_complete_attaches_execution_id() {
        let partial = MoveToWaitingChildrenResultPartial::Moved {
            public_state: PublicState::WaitingChildren,
        };
        let expected = test_eid();

        let MoveToWaitingChildrenResult::Moved { execution_id, .. } =
            partial.complete(expected.clone());

        assert_eq!(execution_id, expected);
    }

    #[test]
    fn expire_partial_expired_variant_attaches_execution_id() {
        let expected = test_eid();
        let full = ExpireExecutionResultPartial::Expired.complete(expected.clone());

        if let ExpireExecutionResult::Expired { execution_id } = full {
            assert_eq!(execution_id, expected);
        } else {
            panic!("expected Expired variant");
        }
    }

    #[test]
    fn expire_partial_already_terminal_variant_ignores_execution_id() {
        // The id is consumed but must not appear in the result.
        let full = ExpireExecutionResultPartial::AlreadyTerminal.complete(test_eid());
        assert!(matches!(full, ExpireExecutionResult::AlreadyTerminal));
    }
}