1use crate::error::ScriptError;
18use ff_core::contracts::*;
19use ff_core::keys::{ExecKeyContext, IndexKeys};
20use ff_core::state::{AttemptType, PublicState};
21use ff_core::types::*;
22
23use crate::result::{FcallResult, FromFcallResult};
24
/// Lease and attempt details of a claimed execution, without the execution id.
///
/// [`ClaimExecutionResultPartial::complete`] moves every field into a
/// `ClaimedExecution` together with a caller-supplied `ExecutionId`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimedExecutionPartial {
    /// Identifier of the lease reported by the claim reply.
    pub lease_id: LeaseId,
    /// Epoch of that lease.
    pub lease_epoch: LeaseEpoch,
    /// Index of the attempt reported by the claim reply.
    pub attempt_index: AttemptIndex,
    /// Identifier of that attempt.
    pub attempt_id: AttemptId,
    /// Kind of attempt (initial, retry, reclaim, replay or fallback).
    pub attempt_type: AttemptType,
    /// Expiry timestamp of the lease, in milliseconds.
    pub lease_expires_at: TimestampMs,
}
38
/// Outcome of `ff_claim_execution` before an execution id has been attached;
/// turn it into a `ClaimExecutionResult` with
/// [`ClaimExecutionResultPartial::complete`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClaimExecutionResultPartial {
    /// The claim succeeded; carries the lease and attempt fields.
    Claimed(ClaimedExecutionPartial),
}
44
45impl ClaimExecutionResultPartial {
46 pub fn complete(self, execution_id: ExecutionId) -> ClaimExecutionResult {
49 match self {
50 Self::Claimed(p) => ClaimExecutionResult::Claimed(ClaimedExecution {
51 execution_id,
52 lease_id: p.lease_id,
53 lease_epoch: p.lease_epoch,
54 attempt_index: p.attempt_index,
55 attempt_id: p.attempt_id,
56 attempt_type: p.attempt_type,
57 lease_expires_at: p.lease_expires_at,
58 }),
59 }
60 }
61}
62
/// Outcome of `ff_complete_execution` before an execution id has been
/// attached; see [`CompleteExecutionResultPartial::complete`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompleteExecutionResultPartial {
    /// The call succeeded; `public_state` is the state to report.
    Completed { public_state: PublicState },
}
68
69impl CompleteExecutionResultPartial {
70 pub fn complete(self, execution_id: ExecutionId) -> CompleteExecutionResult {
71 match self {
72 Self::Completed { public_state } => CompleteExecutionResult::Completed {
73 execution_id,
74 public_state,
75 },
76 }
77 }
78}
79
/// Outcome of `ff_cancel_execution` before an execution id has been attached;
/// see [`CancelExecutionResultPartial::complete`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CancelExecutionResultPartial {
    /// The call succeeded; `public_state` is the state to report.
    Cancelled { public_state: PublicState },
}
85
86impl CancelExecutionResultPartial {
87 pub fn complete(self, execution_id: ExecutionId) -> CancelExecutionResult {
88 match self {
89 Self::Cancelled { public_state } => CancelExecutionResult::Cancelled {
90 execution_id,
91 public_state,
92 },
93 }
94 }
95}
96
/// Outcome of `ff_delay_execution` before an execution id has been attached;
/// see [`DelayExecutionResultPartial::complete`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DelayExecutionResultPartial {
    /// The call succeeded; `public_state` is the state to report.
    Delayed { public_state: PublicState },
}
102
103impl DelayExecutionResultPartial {
104 pub fn complete(self, execution_id: ExecutionId) -> DelayExecutionResult {
105 match self {
106 Self::Delayed { public_state } => DelayExecutionResult::Delayed {
107 execution_id,
108 public_state,
109 },
110 }
111 }
112}
113
/// Outcome of `ff_move_to_waiting_children` before an execution id has been
/// attached; see [`MoveToWaitingChildrenResultPartial::complete`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MoveToWaitingChildrenResultPartial {
    /// The call succeeded; `public_state` is the state to report.
    Moved { public_state: PublicState },
}
119
120impl MoveToWaitingChildrenResultPartial {
121 pub fn complete(self, execution_id: ExecutionId) -> MoveToWaitingChildrenResult {
122 match self {
123 Self::Moved { public_state } => MoveToWaitingChildrenResult::Moved {
124 execution_id,
125 public_state,
126 },
127 }
128 }
129}
130
/// Outcome of `ff_expire_execution` before an execution id has been attached;
/// see [`ExpireExecutionResultPartial::complete`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ExpireExecutionResultPartial {
    /// The execution was expired; `complete` attaches the execution id.
    Expired,
    /// Reported for the `already_terminal` and `not_found_cleaned` reply
    /// sub-statuses; carries no execution id even after `complete`.
    AlreadyTerminal,
}
140
141impl ExpireExecutionResultPartial {
142 pub fn complete(self, execution_id: ExecutionId) -> ExpireExecutionResult {
143 match self {
144 Self::Expired => ExpireExecutionResult::Expired { execution_id },
145 Self::AlreadyTerminal => ExpireExecutionResult::AlreadyTerminal,
146 }
147 }
148}
149
/// Borrowed inputs that the `keys(...)` sections of the `ff_function!`
/// declarations in this file use to build their key lists.
pub struct ExecOpKeys<'a> {
    /// Builder for per-execution keys (`core()`, `lease_current()`, ...).
    pub ctx: &'a ExecKeyContext,
    /// Builder for index keys (`lane_eligible(..)`, `lease_expiry()`, ...).
    pub idx: &'a IndexKeys,
    /// Lane handed to the lane-scoped index key builders.
    pub lane_id: &'a LaneId,
    /// Worker instance handed to `idx.worker_leases(..)`.
    pub worker_instance_id: &'a WorkerInstanceId,
}
158
// Declares `ff_create_execution` through the `ff_function!` macro (defined
// outside this file): it takes `CreateExecutionArgs` and yields
// `CreateExecutionResult`. The `keys` list is built from an `ExecOpKeys`, the
// `argv` list from `args`, with every argument rendered as a `String`.
// NOTE(review): presumably both lists are consumed positionally by the
// server-side function of the same name — confirm before reordering entries.
ff_function! {
    pub ff_create_execution(args: CreateExecutionArgs) -> CreateExecutionResult {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.payload(),
            k.ctx.policy(),
            k.ctx.tags(),
            // Scheduling index: the delayed lane when `delay_until` is set,
            // otherwise the eligible lane.
            if args.delay_until.is_some() {
                k.idx.lane_delayed(k.lane_id)
            } else {
                k.idx.lane_eligible(k.lane_id)
            },
            // Idempotency key only for a non-empty `idempotency_key`; an absent
            // or empty one falls back to `k.ctx.noop()`.
            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
                ff_core::keys::idempotency_key(k.ctx.hash_tag(), args.namespace.as_str(), ik)
            }).unwrap_or_else(|| k.ctx.noop()),
            k.idx.execution_deadline(),
            k.idx.all_executions(),
        }
        argv {
            args.execution_id.to_string(),
            args.namespace.to_string(),
            args.lane_id.to_string(),
            args.execution_kind.clone(),
            args.priority.to_string(),
            args.creator_identity.clone(),
            // Policy as JSON; a missing policy and a serialization failure both
            // become "{}" (the failure is swallowed silently).
            args.policy.as_ref().map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".into())).unwrap_or_else(|| "{}".into()),
            // Lossy conversion: payload bytes that are not valid UTF-8 are
            // replaced with U+FFFD, so such payloads do not round-trip.
            String::from_utf8_lossy(&args.input_payload).into_owned(),
            // Empty string when no delay is requested.
            args.delay_until.map(|t| t.to_string()).unwrap_or_default(),
            // Fixed "86400000" whenever an idempotency key is present, else "".
            // NOTE(review): presumably a 24 h dedup TTL in milliseconds. Unlike
            // the key slot above, an empty idempotency key is NOT filtered out
            // here — confirm the script tolerates that combination.
            args.idempotency_key.as_ref().map(|_| "86400000".to_string()).unwrap_or_default(),
            // Tags as JSON; a serialization failure silently becomes "{}".
            serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".into()),
            // Empty string when no execution deadline is set.
            args.execution_deadline_at.map(|t| t.to_string()).unwrap_or_default(),
            args.partition_id.to_string(),
        }
    }
}
206
207impl FromFcallResult for CreateExecutionResult {
208 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
209 let r = FcallResult::parse(raw)?;
210 if r.status == "DUPLICATE" {
212 let eid_str = r.field_str(0);
213 let eid = ExecutionId::parse(&eid_str)
214 .map_err(|e| ScriptError::Parse {
215 fcall: "ff_create_execution".into(),
216 execution_id: None,
217 message: format!("bad execution_id: {e}"),
218 })?;
219 return Ok(CreateExecutionResult::Duplicate { execution_id: eid });
220 }
221 let r = r.into_success()?;
222 let eid_str = r.field_str(0);
223 let ps_str = r.field_str(1);
224 let eid = ExecutionId::parse(&eid_str)
225 .map_err(|e| ScriptError::Parse {
226 fcall: "ff_create_execution".into(),
227 execution_id: None,
228 message: format!("bad execution_id: {e}"),
229 })?;
230 let public_state = parse_public_state(&ps_str)?;
231 Ok(CreateExecutionResult::Created {
232 execution_id: eid,
233 public_state,
234 })
235 }
236}
237
// Declares `ff_claim_execution` through the `ff_function!` macro (defined
// outside this file): it takes `ClaimExecutionArgs` and yields a
// `ClaimExecutionResultPartial`, which the caller finishes with `complete`.
// NOTE(review): presumably `keys` and `argv` are consumed positionally by the
// server-side function of the same name — confirm before reordering entries.
ff_function! {
    pub ff_claim_execution(args: ClaimExecutionArgs) -> ClaimExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.claim_grant(),
            k.idx.lane_eligible(k.lane_id),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            // The three attempt-scoped keys are derived from the attempt index
            // the caller expects to be claimed.
            k.ctx.attempt_hash(args.expected_attempt_index),
            k.ctx.attempt_usage(args.expected_attempt_index),
            k.ctx.attempt_policy(args.expected_attempt_index),
            k.ctx.attempts(),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lane_active(k.lane_id),
            k.idx.attempt_timeout(),
            k.idx.execution_deadline(),
        }
        argv {
            args.execution_id.to_string(),
            args.worker_id.to_string(),
            args.worker_instance_id.to_string(),
            args.lane_id.to_string(),
            // Always-empty argument. NOTE(review): its meaning is not visible
            // in this file — confirm against the script's argument list.
            String::new(),
            args.lease_id.to_string(),
            args.lease_ttl_ms.to_string(),
            // Two thirds of the lease TTL (multiplied before dividing).
            // NOTE(review): presumably the renew-before threshold — confirm.
            (args.lease_ttl_ms * 2 / 3).to_string(),
            args.attempt_id.to_string(),
            args.attempt_policy_json.clone(),
            // Empty string when the optional value is absent.
            args.attempt_timeout_ms.map(|t| t.to_string()).unwrap_or_default(),
            args.execution_deadline_at.map(|t| t.to_string()).unwrap_or_default(),
        }
    }
}
283
284impl FromFcallResult for ClaimExecutionResultPartial {
285 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
286 let r = FcallResult::parse(raw)?.into_success()?;
287 let lease_id = LeaseId::parse(&r.field_str(0))
289 .map_err(|e| ScriptError::Parse {
290 fcall: "ff_claim_execution_result_partial".into(),
291 execution_id: None,
292 message: format!("bad lease_id: {e}"),
293 })?;
294 let epoch = r
295 .field_str(1)
296 .parse::<u64>()
297 .map_err(|e| ScriptError::Parse {
298 fcall: "ff_claim_execution_result_partial".into(),
299 execution_id: None,
300 message: format!("bad epoch: {e}"),
301 })?;
302 let expires_at = r
303 .field_str(2)
304 .parse::<i64>()
305 .map_err(|e| ScriptError::Parse {
306 fcall: "ff_claim_execution_result_partial".into(),
307 execution_id: None,
308 message: format!("bad expires_at: {e}"),
309 })?;
310 let attempt_id = AttemptId::parse(&r.field_str(3))
311 .map_err(|e| ScriptError::Parse {
312 fcall: "ff_claim_execution_result_partial".into(),
313 execution_id: None,
314 message: format!("bad attempt_id: {e}"),
315 })?;
316 let attempt_index = r
317 .field_str(4)
318 .parse::<u32>()
319 .map_err(|e| ScriptError::Parse {
320 fcall: "ff_claim_execution_result_partial".into(),
321 execution_id: None,
322 message: format!("bad attempt_index: {e}"),
323 })?;
324 let attempt_type = parse_attempt_type(&r.field_str(5))?;
325
326 Ok(Self::Claimed(ClaimedExecutionPartial {
327 lease_id,
328 lease_epoch: LeaseEpoch::new(epoch),
329 attempt_index: AttemptIndex::new(attempt_index),
330 attempt_id,
331 attempt_type,
332 lease_expires_at: TimestampMs::from_millis(expires_at),
333 }))
334 }
335}
336
// Declares `ff_complete_execution` through the `ff_function!` macro (defined
// outside this file): it takes `CompleteExecutionArgs` and yields a
// `CompleteExecutionResultPartial`, which the caller finishes with `complete`.
// NOTE(review): presumably `keys` and `argv` are consumed positionally by the
// server-side function of the same name — confirm before reordering entries.
ff_function! {
    pub ff_complete_execution(args: CompleteExecutionArgs) -> CompleteExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            k.idx.lane_terminal(k.lane_id),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lane_active(k.lane_id),
            k.ctx.stream_meta(args.attempt_index),
            k.ctx.result(),
            k.idx.attempt_timeout(),
            k.idx.execution_deadline(),
        }
        argv {
            args.execution_id.to_string(),
            // The three fence components are sent as empty strings when
            // `args.fence` is `None`.
            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),
            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(),
            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),
            // Lossy conversion: bytes that are not valid UTF-8 are replaced
            // with U+FFFD; an absent payload becomes the empty string.
            args.result_payload.as_ref()
                .map(|p| String::from_utf8_lossy(p).into_owned())
                .unwrap_or_default(),
            args.source.to_string(),
        }
    }
}
378
379impl FromFcallResult for CompleteExecutionResultPartial {
380 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
381 let _r = FcallResult::parse(raw)?.into_success()?;
382 Ok(Self::Completed {
384 public_state: PublicState::Completed,
385 })
386 }
387}
388
// Declares `ff_cancel_execution` through the `ff_function!` macro (defined
// outside this file): it takes `CancelExecutionArgs` and yields a
// `CancelExecutionResultPartial`, which the caller finishes with `complete`.
// NOTE(review): presumably `keys` and `argv` are consumed positionally by the
// server-side function of the same name — confirm before reordering entries.
ff_function! {
    pub ff_cancel_execution(args: CancelExecutionArgs) -> CancelExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            // 21 keys in total. `attempt_hash` and `stream_meta` are built for
            // the fixed attempt index 0 rather than an index taken from `args`,
            // and `suspension_current()` is listed three times in a row.
            // NOTE(review): these look like placeholders for keys the script
            // resolves itself — confirm against the script before touching them.
            k.ctx.core(), k.ctx.attempt_hash(AttemptIndex::new(0)), k.ctx.stream_meta(AttemptIndex::new(0)), k.ctx.lease_current(), k.ctx.lease_history(), k.idx.lease_expiry(), k.idx.worker_leases(k.worker_instance_id), k.ctx.suspension_current(), k.ctx.suspension_current(), k.ctx.suspension_current(), k.idx.suspension_timeout(), k.idx.lane_terminal(k.lane_id), k.idx.attempt_timeout(), k.idx.execution_deadline(), k.idx.lane_eligible(k.lane_id), k.idx.lane_delayed(k.lane_id), k.idx.lane_blocked_dependencies(k.lane_id), k.idx.lane_blocked_budget(k.lane_id), k.idx.lane_blocked_quota(k.lane_id), k.idx.lane_blocked_route(k.lane_id), k.idx.lane_blocked_operator(k.lane_id), }
        argv {
            args.execution_id.to_string(),
            args.reason.clone(),
            args.source.to_string(),
            // Lease id and epoch are optional; absent values are sent as "".
            args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
            args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
        }
    }
}
444
445impl FromFcallResult for CancelExecutionResultPartial {
446 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
447 let _r = FcallResult::parse(raw)?.into_success()?;
448 Ok(Self::Cancelled {
450 public_state: PublicState::Cancelled,
451 })
452 }
453}
454
// Declares `ff_delay_execution` through the `ff_function!` macro (defined
// outside this file): it takes `DelayExecutionArgs` and yields a
// `DelayExecutionResultPartial`, which the caller finishes with `complete`.
// NOTE(review): presumably `keys` and `argv` are consumed positionally by the
// server-side function of the same name — confirm before reordering entries.
ff_function! {
    pub ff_delay_execution(args: DelayExecutionArgs) -> DelayExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            k.idx.lane_active(k.lane_id),
            k.idx.lane_delayed(k.lane_id),
            k.idx.attempt_timeout(),
        }
        argv {
            args.execution_id.to_string(),
            // The three fence components are sent as empty strings when
            // `args.fence` is `None`.
            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),
            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(),
            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),
            // Unlike in `ff_create_execution`, `delay_until` is mandatory here.
            args.delay_until.to_string(),
            args.source.to_string(),
        }
    }
}
488
489impl FromFcallResult for DelayExecutionResultPartial {
490 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
491 let _r = FcallResult::parse(raw)?.into_success()?;
492 Ok(Self::Delayed {
494 public_state: PublicState::Delayed,
495 })
496 }
497}
498
// Declares `ff_move_to_waiting_children` through the `ff_function!` macro
// (defined outside this file): it takes `MoveToWaitingChildrenArgs` and yields
// a `MoveToWaitingChildrenResultPartial`, which the caller finishes with
// `complete`.
// NOTE(review): presumably `keys` and `argv` are consumed positionally by the
// server-side function of the same name — confirm before reordering entries.
ff_function! {
    pub ff_move_to_waiting_children(args: MoveToWaitingChildrenArgs) -> MoveToWaitingChildrenResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            k.idx.lane_active(k.lane_id),
            k.idx.lane_blocked_dependencies(k.lane_id),
            k.idx.attempt_timeout(),
        }
        argv {
            args.execution_id.to_string(),
            // The three fence components are sent as empty strings when
            // `args.fence` is `None`.
            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),
            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(),
            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),
            args.source.to_string(),
        }
    }
}
530
531impl FromFcallResult for MoveToWaitingChildrenResultPartial {
532 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
533 let _r = FcallResult::parse(raw)?.into_success()?;
534 Ok(Self::Moved {
536 public_state: PublicState::WaitingChildren,
537 })
538 }
539}
540
// Declares `ff_fail_execution` through the `ff_function!` macro (defined
// outside this file): it takes `FailExecutionArgs` and yields
// `FailExecutionResult` directly (there is no partial type for this call).
// NOTE(review): presumably `keys` and `argv` are consumed positionally by the
// server-side function of the same name — confirm before reordering entries.
ff_function! {
    pub ff_fail_execution(args: FailExecutionArgs) -> FailExecutionResult {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            // Both the terminal and the delayed lane index are passed; the
            // reply decoder distinguishes `terminal_failed` from
            // `retry_scheduled`.
            k.idx.lane_terminal(k.lane_id),
            k.idx.lane_delayed(k.lane_id),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lane_active(k.lane_id),
            k.ctx.stream_meta(args.attempt_index),
            k.idx.attempt_timeout(),
            k.idx.execution_deadline(),
        }
        argv {
            args.execution_id.to_string(),
            // The three fence components are sent as empty strings when
            // `args.fence` is `None`.
            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),
            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(),
            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),
            args.failure_reason.clone(),
            args.failure_category.clone(),
            args.retry_policy_json.clone(),
            args.source.to_string(),
        }
    }
}
580
581impl FromFcallResult for FailExecutionResult {
582 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
583 let r = FcallResult::parse(raw)?.into_success()?;
584 let sub_status = r.field_str(0);
586 match sub_status.as_str() {
587 "retry_scheduled" => {
588 let delay_str = r.field_str(1);
589 let delay_ms: i64 = delay_str
590 .parse()
591 .map_err(|e| ScriptError::Parse {
592 fcall: "ff_fail_execution".into(),
593 execution_id: None,
594 message: format!("bad delay_until: {e}"),
595 })?;
596 Ok(FailExecutionResult::RetryScheduled {
597 delay_until: TimestampMs::from_millis(delay_ms),
598 next_attempt_index: AttemptIndex::new(0), })
600 }
601 "terminal_failed" => Ok(FailExecutionResult::TerminalFailed),
602 _ => Err(ScriptError::Parse {
603 fcall: "ff_fail_execution".into(),
604 execution_id: None,
605 message: format!(
606 "unexpected fail sub-status: {sub_status}"
607 ),
608 }),
609 }
610 }
611}
612
// Declares `ff_expire_execution` through the `ff_function!` macro (defined
// outside this file): it takes `ExpireExecutionArgs` and yields an
// `ExpireExecutionResultPartial`, which the caller finishes with `complete`.
// NOTE(review): presumably `keys` and `argv` are consumed positionally by the
// server-side function of the same name — confirm before reordering entries.
ff_function! {
    pub ff_expire_execution(args: ExpireExecutionArgs) -> ExpireExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),
            // `attempt_hash` and `stream_meta` are built for the fixed attempt
            // index 0 rather than an index taken from `args`.
            // NOTE(review): these look like placeholders — confirm against the
            // script before changing them.
            k.ctx.attempt_hash(AttemptIndex::new(0)), k.ctx.stream_meta(AttemptIndex::new(0)), k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            k.idx.lane_active(k.lane_id),
            k.idx.lane_terminal(k.lane_id),
            k.idx.attempt_timeout(),
            k.idx.execution_deadline(),
            k.idx.lane_suspended(k.lane_id),
            k.idx.suspension_timeout(),
            k.ctx.suspension_current(),
        }
        argv {
            args.execution_id.to_string(),
            args.expire_reason.clone(),
        }
    }
}
646
647impl FromFcallResult for ExpireExecutionResultPartial {
648 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
649 let r = FcallResult::parse(raw)?.into_success()?;
650 let sub = r.field_str(0);
652 match sub.as_str() {
653 "already_terminal" | "not_found_cleaned" => Ok(Self::AlreadyTerminal),
654 "expired" => Ok(Self::Expired),
655 _ => Ok(Self::Expired),
656 }
657 }
658}
659
660pub async fn ff_set_execution_tags(
679 conn: &ferriskey::Client,
680 ctx: &ExecKeyContext,
681 args: &SetExecutionTagsArgs,
682) -> Result<SetExecutionTagsResult, ScriptError> {
683 let keys = [ctx.core(), ctx.tags()];
684 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
685
686 let mut argv: Vec<String> = Vec::with_capacity(args.tags.len() * 2);
690 for (k, v) in &args.tags {
691 argv.push(k.clone());
692 argv.push(v.clone());
693 }
694 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
695
696 let raw = conn
697 .fcall::<ferriskey::Value>("ff_set_execution_tags", &key_refs, &argv_refs)
698 .await
699 .map_err(ScriptError::Valkey)?;
700 SetExecutionTagsResult::from_fcall_result(&raw)
701}
702
703impl FromFcallResult for SetExecutionTagsResult {
704 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
705 let r = FcallResult::parse(raw)?.into_success()?;
706 let count: u32 = r
707 .field_str(0)
708 .parse()
709 .map_err(|e| ScriptError::Parse {
710 fcall: "ff_set_execution_tags".into(),
711 execution_id: None,
712 message: format!("bad tag count: {e}"),
713 })?;
714 Ok(SetExecutionTagsResult::Ok { count })
715 }
716}
717
718pub fn validate_tag_key(key: &str) -> Result<(), ScriptError> {
734 let mut chars = key.chars();
735 let first = match chars.next() {
736 Some(c) => c,
737 None => return Err(ScriptError::InvalidTagKey(key.to_owned())),
738 };
739 if !first.is_ascii_lowercase() {
740 return Err(ScriptError::InvalidTagKey(key.to_owned()));
741 }
742 let mut saw_dot = false;
743 for c in chars.by_ref() {
744 if c == '.' {
745 saw_dot = true;
746 break;
747 }
748 if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
749 return Err(ScriptError::InvalidTagKey(key.to_owned()));
750 }
751 }
752 if !saw_dot {
753 return Err(ScriptError::InvalidTagKey(key.to_owned()));
754 }
755 match chars.next() {
757 Some(c) if c != '.' => Ok(()),
758 _ => Err(ScriptError::InvalidTagKey(key.to_owned())),
759 }
760}
761
762fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
765 match s {
766 "waiting" => Ok(PublicState::Waiting),
767 "delayed" => Ok(PublicState::Delayed),
768 "rate_limited" => Ok(PublicState::RateLimited),
769 "waiting_children" => Ok(PublicState::WaitingChildren),
770 "active" => Ok(PublicState::Active),
771 "suspended" => Ok(PublicState::Suspended),
772 "completed" => Ok(PublicState::Completed),
773 "failed" => Ok(PublicState::Failed),
774 "cancelled" => Ok(PublicState::Cancelled),
775 "expired" => Ok(PublicState::Expired),
776 "skipped" => Ok(PublicState::Skipped),
777 _ => Err(ScriptError::Parse {
778 fcall: "parse_public_state".into(),
779 execution_id: None,
780 message: format!("unknown public_state: {s}"),
781 }),
782 }
783}
784
785fn parse_attempt_type(s: &str) -> Result<AttemptType, ScriptError> {
786 match s {
787 "initial" => Ok(AttemptType::Initial),
788 "retry" => Ok(AttemptType::Retry),
789 "reclaim" => Ok(AttemptType::Reclaim),
790 "replay" => Ok(AttemptType::Replay),
791 "fallback" => Ok(AttemptType::Fallback),
792 _ => Err(ScriptError::Parse {
793 fcall: "parse_attempt_type".into(),
794 execution_id: None,
795 message: format!("unknown attempt_type: {s}"),
796 }),
797 }
798}
799
/// Checks that every `*Partial::complete` hands the caller-supplied execution
/// id through to the full result type.
#[cfg(test)]
mod partial_tests {
    use super::*;
    use ff_core::partition::PartitionConfig;

    /// Mints a fresh execution id for a new flow under the default partition
    /// configuration.
    fn test_eid() -> ExecutionId {
        let flow = FlowId::new();
        ExecutionId::for_flow(&flow, &PartitionConfig::default())
    }

    #[test]
    fn claim_partial_complete_attaches_execution_id() {
        let eid = test_eid();
        let claimed = ClaimedExecutionPartial {
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        };

        let ClaimExecutionResult::Claimed(full) =
            ClaimExecutionResultPartial::Claimed(claimed).complete(eid.clone());
        assert_eq!(full.execution_id, eid);
    }

    #[test]
    fn complete_partial_complete_attaches_execution_id() {
        let eid = test_eid();
        let partial = CompleteExecutionResultPartial::Completed {
            public_state: PublicState::Completed,
        };

        let CompleteExecutionResult::Completed { execution_id, .. } =
            partial.complete(eid.clone());
        assert_eq!(execution_id, eid);
    }

    #[test]
    fn cancel_partial_complete_attaches_execution_id() {
        let eid = test_eid();
        let partial = CancelExecutionResultPartial::Cancelled {
            public_state: PublicState::Cancelled,
        };

        let CancelExecutionResult::Cancelled { execution_id, .. } = partial.complete(eid.clone());
        assert_eq!(execution_id, eid);
    }

    #[test]
    fn delay_partial_complete_attaches_execution_id() {
        let eid = test_eid();
        let partial = DelayExecutionResultPartial::Delayed {
            public_state: PublicState::Delayed,
        };

        let DelayExecutionResult::Delayed { execution_id, .. } = partial.complete(eid.clone());
        assert_eq!(execution_id, eid);
    }

    #[test]
    fn move_to_waiting_children_partial_complete_attaches_execution_id() {
        let eid = test_eid();
        let partial = MoveToWaitingChildrenResultPartial::Moved {
            public_state: PublicState::WaitingChildren,
        };

        let MoveToWaitingChildrenResult::Moved { execution_id, .. } =
            partial.complete(eid.clone());
        assert_eq!(execution_id, eid);
    }

    #[test]
    fn expire_partial_expired_variant_attaches_execution_id() {
        let eid = test_eid();
        let full = ExpireExecutionResultPartial::Expired.complete(eid.clone());

        if let ExpireExecutionResult::Expired { execution_id } = full {
            assert_eq!(execution_id, eid);
        } else {
            panic!("expected Expired variant");
        }
    }

    #[test]
    fn expire_partial_already_terminal_variant_ignores_execution_id() {
        // The id is accepted but has nowhere to go in this variant.
        let full = ExpireExecutionResultPartial::AlreadyTerminal.complete(test_eid());
        assert!(matches!(full, ExpireExecutionResult::AlreadyTerminal));
    }
}
901
/// Exercises `validate_tag_key` and the `invalid_tag_key` error-code mapping.
#[cfg(test)]
mod tag_key_tests {
    use super::*;

    /// True when `key` is rejected with `ScriptError::InvalidTagKey`.
    fn is_rejected(key: &str) -> bool {
        matches!(validate_tag_key(key), Err(ScriptError::InvalidTagKey(_)))
    }

    #[test]
    fn validate_accepts_caller_namespaced_key() {
        for key in ["cairn.task_id", "my_app.run_123", "app2.x.y"] {
            assert!(validate_tag_key(key).is_ok());
        }
    }

    #[test]
    fn validate_rejects_key_without_dot() {
        match validate_tag_key("no_dot_here") {
            Err(ScriptError::InvalidTagKey(k)) => assert_eq!(k, "no_dot_here"),
            other => panic!("expected InvalidTagKey, got {other:?}"),
        }
    }

    #[test]
    fn validate_rejects_empty_key() {
        match validate_tag_key("") {
            Err(ScriptError::InvalidTagKey(k)) => assert!(k.is_empty()),
            other => panic!("expected InvalidTagKey, got {other:?}"),
        }
    }

    #[test]
    fn validate_rejects_uppercase_first_char() {
        assert!(is_rejected("Cairn.task_id"));
    }

    #[test]
    fn validate_rejects_leading_digit() {
        assert!(is_rejected("1cairn.task_id"));
    }

    #[test]
    fn validate_rejects_dash_in_prefix() {
        assert!(is_rejected("my-app.run"));
    }

    #[test]
    fn validate_accepts_single_char_prefix() {
        assert!(validate_tag_key("a.x").is_ok());
    }

    #[test]
    fn validate_rejects_trailing_dot() {
        // Nothing follows the namespace separator.
        assert!(is_rejected("cairn."));
    }

    #[test]
    fn validate_rejects_double_dot_after_prefix() {
        // The part after the separator may not itself start with a dot.
        assert!(is_rejected("cairn..x"));
    }

    #[test]
    fn validate_accepts_dots_in_suffix() {
        // Only the first dot separates the namespace; later dots are free-form.
        assert!(validate_tag_key("app.sub.field").is_ok());
    }

    #[test]
    fn from_code_invalid_tag_key_roundtrip() {
        let err = ScriptError::from_code_with_detail("invalid_tag_key", "BadKey").unwrap();
        if let ScriptError::InvalidTagKey(k) = err {
            assert_eq!(k, "BadKey");
        } else {
            panic!("expected InvalidTagKey, got {err:?}");
        }
    }
}