1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use ferriskey::{Client, Value};
7use ff_core::contracts::ReportUsageResult;
8use ff_script::error::ScriptError;
9use ff_core::keys::{usage_dedup_key, BudgetKeyContext, ExecKeyContext, IndexKeys};
10use ff_core::partition::{budget_partition, execution_partition, PartitionConfig};
11use ff_core::types::*;
12use tokio::sync::{Notify, OwnedSemaphorePermit};
13use tokio::task::JoinHandle;
14
15use crate::SdkError;
16
/// Action requested for a suspension when its timeout elapses.
///
/// The wire name from [`TimeoutBehavior::as_str`] is sent to `ff_suspend_execution`,
/// both inside the resume-condition JSON and as a separate argument. How each behaviour
/// is actually carried out is decided by the server-side script and is not visible here.
///
/// The enum is fieldless, so it is `Copy` and can be used as a hash-map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TimeoutBehavior {
    /// Wire name `"fail"`.
    Fail,
    /// Wire name `"cancel"`.
    Cancel,
    /// Wire name `"expire"`.
    Expire,
    /// Wire name `"auto_resume_with_timeout_signal"`; `"auto_resume"` is also accepted
    /// when parsing.
    AutoResume,
    /// Wire name `"escalate"`.
    Escalate,
}
28
29impl TimeoutBehavior {
30 pub fn as_str(&self) -> &str {
31 match self {
32 Self::Fail => "fail",
33 Self::Cancel => "cancel",
34 Self::Expire => "expire",
35 Self::AutoResume => "auto_resume_with_timeout_signal",
36 Self::Escalate => "escalate",
37 }
38 }
39}
40
41impl std::fmt::Display for TimeoutBehavior {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.write_str(self.as_str())
44 }
45}
46
47impl std::str::FromStr for TimeoutBehavior {
48 type Err = String;
49
50 fn from_str(s: &str) -> Result<Self, Self::Err> {
51 match s {
52 "fail" => Ok(Self::Fail),
53 "cancel" => Ok(Self::Cancel),
54 "expire" => Ok(Self::Expire),
55 "auto_resume_with_timeout_signal" | "auto_resume" => Ok(Self::AutoResume),
56 "escalate" => Ok(Self::Escalate),
57 other => Err(format!("unknown timeout behavior: {other}")),
58 }
59 }
60}
61
/// One entry of a suspension's resume condition.
///
/// `ClaimedTask::suspend` collects the `signal_name` of every matcher into the
/// `required_signal_names` list of the resume-condition JSON.
#[derive(Debug, Clone)]
pub struct ConditionMatcher {
    /// Name of the signal this matcher waits for.
    pub signal_name: String,
}
68
69#[derive(Clone, Debug, PartialEq, Eq)]
71pub enum SuspendOutcome {
72 Suspended {
74 suspension_id: SuspensionId,
75 waitpoint_id: WaitpointId,
76 waitpoint_key: String,
77 waitpoint_token: WaitpointToken,
79 },
80 AlreadySatisfied {
83 suspension_id: SuspensionId,
84 waitpoint_id: WaitpointId,
85 waitpoint_key: String,
86 waitpoint_token: WaitpointToken,
87 },
88}
89
90#[derive(Clone, Debug)]
92pub struct Signal {
93 pub signal_name: String,
94 pub signal_category: String,
95 pub payload: Option<Vec<u8>>,
96 pub source_type: String,
97 pub source_identity: String,
98 pub idempotency_key: Option<String>,
99 pub waitpoint_token: WaitpointToken,
102}
103
104#[derive(Clone, Debug, PartialEq, Eq)]
106pub enum SignalOutcome {
107 Accepted { signal_id: SignalId, effect: String },
109 TriggeredResume { signal_id: SignalId },
111 Duplicate { existing_signal_id: String },
113}
114
115impl SignalOutcome {
116 pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
121 parse_signal_result(raw)
122 }
123}
124
125#[derive(Clone, Debug, PartialEq, Eq)]
137pub struct ResumeSignal {
138 pub signal_id: SignalId,
139 pub signal_name: String,
140 pub signal_category: String,
141 pub source_type: String,
142 pub source_identity: String,
143 pub correlation_id: String,
144 pub accepted_at: TimestampMs,
149 pub payload: Option<Vec<u8>>,
156}
157
/// Result of `ClaimedTask::append_frame`, as parsed by `parse_append_frame_result`.
#[derive(Debug, Clone)]
pub struct AppendFrameOutcome {
    /// Stream entry id assigned to the appended frame.
    pub stream_id: String,
    /// Frame count reported by the server after the append.
    pub frame_count: u64,
}
166
167#[derive(Clone, Debug, PartialEq, Eq)]
169pub enum FailOutcome {
170 RetryScheduled {
172 delay_until: TimestampMs,
173 },
174 TerminalFailed,
176}
177
/// A claimed execution attempt owned by this worker, together with its lease.
///
/// While the value lives, a background task (started in `ClaimedTask::new`) renews the
/// lease. Terminal operations consume the value and stop renewal. Dropping it without
/// one logs a warning and aborts renewal.
///
/// Fields are dropped in declaration order, so `_concurrency_permit` is released last.
/// NOTE(review): confirm that ordering is intentional before reordering fields.
pub struct ClaimedTask {
    // Valkey client used for every function call and direct read.
    client: Client,
    // Used to compute the execution (and budget) partition for key construction.
    partition_config: PartitionConfig,
    // Identity of the claimed execution and attempt; sent as ARGV to the scripts.
    execution_id: ExecutionId,
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    // Lease identity checked by the scripts for every mutating call.
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    // Lease TTL handed to renewal calls; also determines the renewal period.
    lease_ttl_ms: u64,
    // Lane the execution belongs to; used to build lane index keys.
    lane_id: LaneId,
    // This worker's id; used to build the worker-leases index key.
    worker_instance_id: WorkerInstanceId,
    // Claim data exposed through getters.
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    // Handle of the spawned renewal loop; aborted in `Drop`.
    renewal_handle: JoinHandle<()>,
    // Notified by `stop_renewal` to end the renewal loop.
    renewal_stop: Arc<Notify>,
    // Consecutive renewal failures, written by the renewal loop and read by the health getters.
    renewal_failures: Arc<AtomicU32>,
    // Set by `stop_renewal`; `Drop` warns when it is still false.
    terminal_op_called: AtomicBool,
    // Optional semaphore permit held for the lifetime of this value.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
237
238impl ClaimedTask {
239 #[allow(clippy::too_many_arguments)]
292 pub(crate) fn new(
293 client: Client,
294 partition_config: PartitionConfig,
295 execution_id: ExecutionId,
296 attempt_index: AttemptIndex,
297 attempt_id: AttemptId,
298 lease_id: LeaseId,
299 lease_epoch: LeaseEpoch,
300 lease_ttl_ms: u64,
301 lane_id: LaneId,
302 worker_instance_id: WorkerInstanceId,
303 input_payload: Vec<u8>,
304 execution_kind: String,
305 tags: HashMap<String, String>,
306 ) -> Self {
307 let renewal_stop = Arc::new(Notify::new());
308 let renewal_failures = Arc::new(AtomicU32::new(0));
309
310 let renewal_handle = spawn_renewal_task(
311 client.clone(),
312 partition_config,
313 execution_id.clone(),
314 attempt_index,
315 attempt_id.clone(),
316 lease_id.clone(),
317 lease_epoch,
318 lease_ttl_ms,
319 renewal_stop.clone(),
320 renewal_failures.clone(),
321 );
322
323 Self {
324 client,
325 partition_config,
326 execution_id,
327 attempt_index,
328 attempt_id,
329 lease_id,
330 lease_epoch,
331 lease_ttl_ms,
332 lane_id,
333 worker_instance_id,
334 input_payload,
335 execution_kind,
336 tags,
337 renewal_handle,
338 renewal_stop,
339 renewal_failures,
340 terminal_op_called: AtomicBool::new(false),
341 _concurrency_permit: None,
342 }
343 }
344
345 #[allow(dead_code)]
348 pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
349 self._concurrency_permit = Some(permit);
350 }
351
352 pub fn execution_id(&self) -> &ExecutionId {
355 &self.execution_id
356 }
357
358 pub fn attempt_index(&self) -> AttemptIndex {
359 self.attempt_index
360 }
361
362 pub fn attempt_id(&self) -> &AttemptId {
363 &self.attempt_id
364 }
365
366 pub fn lease_id(&self) -> &LeaseId {
367 &self.lease_id
368 }
369
370 pub fn lease_epoch(&self) -> LeaseEpoch {
371 self.lease_epoch
372 }
373
374 pub fn input_payload(&self) -> &[u8] {
375 &self.input_payload
376 }
377
378 pub fn execution_kind(&self) -> &str {
379 &self.execution_kind
380 }
381
382 pub fn tags(&self) -> &HashMap<String, String> {
383 &self.tags
384 }
385
386 pub fn lane_id(&self) -> &LaneId {
387 &self.lane_id
388 }
389
390 pub fn is_lease_healthy(&self) -> bool {
397 self.renewal_failures.load(Ordering::Relaxed) < 3
398 }
399
400 pub fn consecutive_renewal_failures(&self) -> u32 {
405 self.renewal_failures.load(Ordering::Relaxed)
406 }
407
408 pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
415 let partition = execution_partition(&self.execution_id, &self.partition_config);
416 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
417 let idx = IndexKeys::new(&partition);
418
419 let keys: Vec<String> = vec![
423 ctx.core(),
424 ctx.attempt_hash(self.attempt_index),
425 ctx.lease_current(),
426 ctx.lease_history(),
427 idx.lease_expiry(),
428 idx.worker_leases(&self.worker_instance_id),
429 idx.lane_active(&self.lane_id),
430 idx.lane_delayed(&self.lane_id),
431 idx.attempt_timeout(),
432 ];
433
434 let args: Vec<String> = vec![
436 self.execution_id.to_string(),
437 self.lease_id.to_string(),
438 self.lease_epoch.to_string(),
439 self.attempt_id.to_string(),
440 delay_until.to_string(),
441 ];
442
443 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
444 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
445
446 let raw: Value = self
447 .client
448 .fcall("ff_delay_execution", &key_refs, &arg_refs)
449 .await
450 .map_err(SdkError::Valkey)?;
451
452 self.stop_renewal();
453 parse_success_result(&raw, "ff_delay_execution")
454 }
455
456 pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
461 let partition = execution_partition(&self.execution_id, &self.partition_config);
462 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
463 let idx = IndexKeys::new(&partition);
464
465 let keys: Vec<String> = vec![
469 ctx.core(),
470 ctx.attempt_hash(self.attempt_index),
471 ctx.lease_current(),
472 ctx.lease_history(),
473 idx.lease_expiry(),
474 idx.worker_leases(&self.worker_instance_id),
475 idx.lane_active(&self.lane_id),
476 idx.lane_blocked_dependencies(&self.lane_id),
477 idx.attempt_timeout(),
478 ];
479
480 let args: Vec<String> = vec![
482 self.execution_id.to_string(),
483 self.lease_id.to_string(),
484 self.lease_epoch.to_string(),
485 self.attempt_id.to_string(),
486 ];
487
488 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
489 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
490
491 let raw: Value = self
492 .client
493 .fcall("ff_move_to_waiting_children", &key_refs, &arg_refs)
494 .await
495 .map_err(SdkError::Valkey)?;
496
497 self.stop_renewal();
498 parse_success_result(&raw, "ff_move_to_waiting_children")
499 }
500
501 pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
516 let partition = execution_partition(&self.execution_id, &self.partition_config);
517 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
518 let idx = IndexKeys::new(&partition);
519
520 let keys: Vec<String> = vec![
525 ctx.core(), ctx.attempt_hash(self.attempt_index), idx.lease_expiry(), idx.worker_leases(&self.worker_instance_id), idx.lane_terminal(&self.lane_id), ctx.lease_current(), ctx.lease_history(), idx.lane_active(&self.lane_id), ctx.stream_meta(self.attempt_index), ctx.result(), idx.attempt_timeout(), idx.execution_deadline(), ];
538
539 let result_bytes = result_payload.unwrap_or_default();
540 let result_str = String::from_utf8_lossy(&result_bytes);
541
542 let args: Vec<String> = vec![
545 self.execution_id.to_string(),
546 self.lease_id.to_string(),
547 self.lease_epoch.to_string(),
548 self.attempt_id.to_string(),
549 result_str.into_owned(),
550 ];
551
552 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
553 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
554
555 let raw: Value = self
556 .client
557 .fcall("ff_complete_execution", &key_refs, &arg_refs)
558 .await
559 .map_err(SdkError::Valkey)?;
560
561 self.stop_renewal();
562 parse_success_result(&raw, "ff_complete_execution")
563 }
564
565 pub async fn fail(
579 self,
580 reason: &str,
581 error_category: &str,
582 ) -> Result<FailOutcome, SdkError> {
583 let partition = execution_partition(&self.execution_id, &self.partition_config);
584 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
585 let idx = IndexKeys::new(&partition);
586
587 let keys: Vec<String> = vec![
591 ctx.core(),
592 ctx.attempt_hash(self.attempt_index),
593 idx.lease_expiry(),
594 idx.worker_leases(&self.worker_instance_id),
595 idx.lane_terminal(&self.lane_id),
596 idx.lane_delayed(&self.lane_id),
597 ctx.lease_current(),
598 ctx.lease_history(),
599 idx.lane_active(&self.lane_id),
600 ctx.stream_meta(self.attempt_index),
601 idx.attempt_timeout(),
602 idx.execution_deadline(),
603 ];
604
605 let retry_policy_json = self.read_retry_policy_json(&ctx).await?;
608
609 let args: Vec<String> = vec![
610 self.execution_id.to_string(),
611 self.lease_id.to_string(),
612 self.lease_epoch.to_string(),
613 self.attempt_id.to_string(),
614 reason.to_owned(),
615 error_category.to_owned(),
616 retry_policy_json,
617 ];
618
619 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
620 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
621
622 let raw: Value = self
623 .client
624 .fcall("ff_fail_execution", &key_refs, &arg_refs)
625 .await
626 .map_err(SdkError::Valkey)?;
627
628 self.stop_renewal();
629 parse_fail_result(&raw)
630 }
631
632 pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
645 self.cancel_inner(reason).await
646 }
647
648 async fn cancel_inner(self, reason: &str) -> Result<(), SdkError> {
650 let partition = execution_partition(&self.execution_id, &self.partition_config);
651 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
652 let idx = IndexKeys::new(&partition);
653
654 let wp_id_str: Option<String> = self
659 .client
660 .hget(&ctx.core(), "current_waitpoint_id")
661 .await
662 .map_err(|e| SdkError::ValkeyContext { source: e, context: "read current_waitpoint_id".into() })?;
663 let wp_id = match wp_id_str.as_deref().filter(|s| !s.is_empty()) {
664 Some(s) => match WaitpointId::parse(s) {
665 Ok(id) => id,
666 Err(e) => {
667 tracing::warn!(
668 execution_id = %self.execution_id,
669 raw = %s,
670 error = %e,
671 "corrupt waitpoint_id in exec_core, using placeholder"
672 );
673 WaitpointId::new()
674 }
675 },
676 None => WaitpointId::default(),
677 };
678 let keys: Vec<String> = vec![
679 ctx.core(), ctx.attempt_hash(self.attempt_index), ctx.stream_meta(self.attempt_index), ctx.lease_current(), ctx.lease_history(), idx.lease_expiry(), idx.worker_leases(&self.worker_instance_id), ctx.suspension_current(), ctx.waitpoint(&wp_id), ctx.waitpoint_condition(&wp_id), idx.suspension_timeout(), idx.lane_terminal(&self.lane_id), idx.attempt_timeout(), idx.execution_deadline(), idx.lane_eligible(&self.lane_id), idx.lane_delayed(&self.lane_id), idx.lane_blocked_dependencies(&self.lane_id), idx.lane_blocked_budget(&self.lane_id), idx.lane_blocked_quota(&self.lane_id), idx.lane_blocked_route(&self.lane_id), idx.lane_blocked_operator(&self.lane_id), ];
701
702 let args: Vec<String> = vec![
705 self.execution_id.to_string(), reason.to_owned(), "worker".to_owned(), self.lease_id.to_string(), self.lease_epoch.to_string(), ];
711
712 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
713 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
714
715 let raw: Value = self
716 .client
717 .fcall("ff_cancel_execution", &key_refs, &arg_refs)
718 .await
719 .map_err(SdkError::Valkey)?;
720
721 self.stop_renewal();
722 parse_success_result(&raw, "ff_cancel_execution")
723 }
724
725 pub async fn renew_lease(&self) -> Result<(), SdkError> {
729 renew_lease_inner(
730 &self.client,
731 &self.partition_config,
732 &self.execution_id,
733 self.attempt_index,
734 &self.attempt_id,
735 &self.lease_id,
736 self.lease_epoch,
737 self.lease_ttl_ms,
738 )
739 .await
740 }
741
742 pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
744 let partition = execution_partition(&self.execution_id, &self.partition_config);
745 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
746
747 let keys: Vec<String> = vec![ctx.core()];
749
750 let args: Vec<String> = vec![
753 self.execution_id.to_string(),
754 self.lease_id.to_string(),
755 self.lease_epoch.to_string(),
756 pct.to_string(),
757 message.to_owned(),
758 ];
759
760 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
761 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
762
763 let raw: Value = self
764 .client
765 .fcall("ff_update_progress", &key_refs, &arg_refs)
766 .await
767 .map_err(SdkError::Valkey)?;
768
769 parse_success_result(&raw, "ff_update_progress")
770 }
771
772 pub async fn report_usage(
778 &self,
779 budget_id: &BudgetId,
780 dimensions: &[(&str, u64)],
781 dedup_key: Option<&str>,
782 ) -> Result<ReportUsageResult, SdkError> {
783 let partition = budget_partition(budget_id, &self.partition_config);
784 let bctx = BudgetKeyContext::new(&partition, budget_id);
785
786 let keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
788
789 let now = TimestampMs::now();
791 let dim_count = dimensions.len();
792 let mut argv: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
793 argv.push(dim_count.to_string());
794 for (dim, _) in dimensions {
795 argv.push((*dim).to_string());
796 }
797 for (_, delta) in dimensions {
798 argv.push(delta.to_string());
799 }
800 argv.push(now.to_string());
801 let dedup_key_val = dedup_key
802 .filter(|k| !k.is_empty())
803 .map(|k| usage_dedup_key(bctx.hash_tag(), k))
804 .unwrap_or_default();
805 argv.push(dedup_key_val);
806
807 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
808 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
809
810 let raw: Value = self
811 .client
812 .fcall("ff_report_usage_and_check", &key_refs, &argv_refs)
813 .await
814 .map_err(SdkError::Valkey)?;
815
816 parse_report_usage_result(&raw)
817 }
818
819 pub async fn create_pending_waitpoint(
830 &self,
831 waitpoint_key: &str,
832 expires_in_ms: u64,
833 ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
834 let partition = execution_partition(&self.execution_id, &self.partition_config);
835 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
836 let idx = IndexKeys::new(&partition);
837
838 let waitpoint_id = WaitpointId::new();
839 let expires_at = TimestampMs::from_millis(TimestampMs::now().0 + expires_in_ms as i64);
840
841 let keys: Vec<String> = vec![
843 ctx.core(),
844 ctx.waitpoint(&waitpoint_id),
845 idx.pending_waitpoint_expiry(),
846 idx.waitpoint_hmac_secrets(),
847 ];
848
849 let args: Vec<String> = vec![
851 self.execution_id.to_string(),
852 self.attempt_index.to_string(),
853 waitpoint_id.to_string(),
854 waitpoint_key.to_owned(),
855 expires_at.to_string(),
856 ];
857
858 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
859 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
860
861 let raw: Value = self
862 .client
863 .fcall("ff_create_pending_waitpoint", &key_refs, &arg_refs)
864 .await
865 .map_err(SdkError::Valkey)?;
866
867 let token = extract_pending_waitpoint_token(&raw)?;
870 Ok((waitpoint_id, token))
871 }
872
873 pub async fn append_frame(
880 &self,
881 frame_type: &str,
882 payload: &[u8],
883 metadata: Option<&str>,
884 ) -> Result<AppendFrameOutcome, SdkError> {
885 let partition = execution_partition(&self.execution_id, &self.partition_config);
886 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
887
888 let now = TimestampMs::now();
889
890 let keys: Vec<String> = vec![
892 ctx.core(),
893 ctx.stream(self.attempt_index),
894 ctx.stream_meta(self.attempt_index),
895 ];
896
897 let payload_str = String::from_utf8_lossy(payload);
898
899 let args: Vec<String> = vec![
903 self.execution_id.to_string(), self.attempt_index.to_string(), self.lease_id.to_string(), self.lease_epoch.to_string(), frame_type.to_owned(), now.to_string(), payload_str.into_owned(), "utf8".to_owned(), metadata.unwrap_or("").to_owned(), "worker".to_owned(), "10000".to_owned(), self.attempt_id.to_string(), "65536".to_owned(), ];
917
918 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
919 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
920
921 let raw: Value = self
922 .client
923 .fcall("ff_append_frame", &key_refs, &arg_refs)
924 .await
925 .map_err(SdkError::Valkey)?;
926
927 parse_append_frame_result(&raw)
928 }
929
930 pub async fn suspend(
946 self,
947 reason_code: &str,
948 condition_matchers: &[ConditionMatcher],
949 timeout_ms: Option<u64>,
950 timeout_behavior: TimeoutBehavior,
951 ) -> Result<SuspendOutcome, SdkError> {
952 let partition = execution_partition(&self.execution_id, &self.partition_config);
953 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
954 let idx = IndexKeys::new(&partition);
955
956 let suspension_id = SuspensionId::new();
957 let waitpoint_id = WaitpointId::new();
958 let waitpoint_key = format!("wpk:{}", waitpoint_id);
960
961 let timeout_at = timeout_ms.map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));
962
963 let required_signal_names: Vec<&str> = condition_matchers
965 .iter()
966 .map(|m| m.signal_name.as_str())
967 .collect();
968 let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
969 let resume_condition_json = serde_json::json!({
970 "condition_type": "signal_set",
971 "required_signal_names": required_signal_names,
972 "signal_match_mode": match_mode,
973 "minimum_signal_count": 1,
974 "timeout_behavior": timeout_behavior.as_str(),
975 "allow_operator_override": true,
976 }).to_string();
977
978 let resume_policy_json = serde_json::json!({
979 "resume_target": "runnable",
980 "close_waitpoint_on_resume": true,
981 "consume_matched_signals": true,
982 "retain_signal_buffer_until_closed": true,
983 }).to_string();
984
985 let keys: Vec<String> = vec![
991 ctx.core(), ctx.attempt_hash(self.attempt_index), ctx.lease_current(), ctx.lease_history(), idx.lease_expiry(), idx.worker_leases(&self.worker_instance_id), ctx.suspension_current(), ctx.waitpoint(&waitpoint_id), ctx.waitpoint_signals(&waitpoint_id), idx.suspension_timeout(), idx.pending_waitpoint_expiry(), idx.lane_active(&self.lane_id), idx.lane_suspended(&self.lane_id), ctx.waitpoints(), ctx.waitpoint_condition(&waitpoint_id), idx.attempt_timeout(), idx.waitpoint_hmac_secrets(), ];
1009
1010 let args: Vec<String> = vec![
1016 self.execution_id.to_string(), self.attempt_index.to_string(), self.attempt_id.to_string(), self.lease_id.to_string(), self.lease_epoch.to_string(), suspension_id.to_string(), waitpoint_id.to_string(), waitpoint_key.clone(), reason_code.to_owned(), "worker".to_owned(), timeout_at.map_or(String::new(), |t| t.to_string()), resume_condition_json, resume_policy_json, String::new(), String::new(), timeout_behavior.as_str().to_owned(), "1000".to_owned(), ];
1034
1035 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1036 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1037
1038 let raw: Value = self
1039 .client
1040 .fcall("ff_suspend_execution", &key_refs, &arg_refs)
1041 .await
1042 .map_err(SdkError::Valkey)?;
1043
1044 self.stop_renewal();
1045 parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
1046 }
1047
1048 pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
1068 let partition = execution_partition(&self.execution_id, &self.partition_config);
1069 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
1070
1071 let susp: HashMap<String, String> = self
1072 .client
1073 .hgetall(&ctx.suspension_current())
1074 .await
1075 .map_err(|e| SdkError::ValkeyContext {
1076 source: e,
1077 context: "HGETALL suspension_current".into(),
1078 })?;
1079
1080 let Some(waitpoint_id) =
1081 resume_waitpoint_id_from_suspension(&susp, self.attempt_index)?
1082 else {
1083 return Ok(Vec::new());
1084 };
1085
1086 let wp_cond_key = ctx.waitpoint_condition(&waitpoint_id);
1090 let total_str: Option<String> = self
1091 .client
1092 .hget(&wp_cond_key, "total_matchers")
1093 .await
1094 .map_err(|e| SdkError::ValkeyContext {
1095 source: e,
1096 context: "HGET total_matchers".into(),
1097 })?;
1098 let total: usize = total_str
1099 .as_deref()
1100 .and_then(|s| s.parse().ok())
1101 .unwrap_or(0);
1102
1103 let mut signal_ids: Vec<SignalId> = Vec::new();
1104 for i in 0..total {
1105 let fields: Vec<Option<String>> = self
1106 .client
1107 .cmd("HMGET")
1108 .arg(&wp_cond_key)
1109 .arg(format!("matcher:{i}:satisfied"))
1110 .arg(format!("matcher:{i}:signal_id"))
1111 .execute()
1112 .await
1113 .map_err(|e| SdkError::ValkeyContext {
1114 source: e,
1115 context: "HMGET matcher slot".into(),
1116 })?;
1117 let satisfied = fields.first().and_then(|o| o.as_deref());
1118 if satisfied != Some("1") {
1119 continue;
1120 }
1121 let Some(raw) = fields.get(1).and_then(|o| o.as_deref()).filter(|s| !s.is_empty())
1122 else {
1123 continue;
1124 };
1125 match SignalId::parse(raw) {
1126 Ok(sid) => signal_ids.push(sid),
1127 Err(e) => {
1128 tracing::warn!(
1129 execution_id = %self.execution_id,
1130 waitpoint_id = %waitpoint_id,
1131 raw = %raw,
1132 error = %e,
1133 "resume_signals: matcher signal_id failed to parse, skipping"
1134 );
1135 }
1136 }
1137 }
1138
1139 let mut out: Vec<ResumeSignal> = Vec::with_capacity(signal_ids.len());
1140 for signal_id in signal_ids {
1141 let sig: HashMap<String, String> = self
1142 .client
1143 .hgetall(&ctx.signal(&signal_id))
1144 .await
1145 .map_err(|e| SdkError::ValkeyContext {
1146 source: e,
1147 context: "HGETALL signal_hash".into(),
1148 })?;
1149 if sig.is_empty() {
1150 continue;
1153 }
1154
1155 let payload_raw: Option<Value> = self
1161 .client
1162 .cmd("GET")
1163 .arg(ctx.signal_payload(&signal_id))
1164 .execute()
1165 .await
1166 .map_err(|e| SdkError::ValkeyContext {
1167 source: e,
1168 context: "GET signal_payload".into(),
1169 })?;
1170 let payload: Option<Vec<u8>> = match payload_raw {
1171 Some(Value::BulkString(b)) => Some(b.to_vec()),
1172 Some(Value::SimpleString(s)) => Some(s.into_bytes()),
1173 _ => None,
1174 };
1175
1176 let accepted_at = sig
1177 .get("accepted_at")
1178 .and_then(|s| s.parse::<i64>().ok())
1179 .map(TimestampMs::from_millis)
1180 .unwrap_or_else(|| TimestampMs::from_millis(0));
1181
1182 out.push(ResumeSignal {
1183 signal_id,
1184 signal_name: sig.get("signal_name").cloned().unwrap_or_default(),
1185 signal_category: sig.get("signal_category").cloned().unwrap_or_default(),
1186 source_type: sig.get("source_type").cloned().unwrap_or_default(),
1187 source_identity: sig.get("source_identity").cloned().unwrap_or_default(),
1188 correlation_id: sig.get("correlation_id").cloned().unwrap_or_default(),
1189 accepted_at,
1190 payload,
1191 });
1192 }
1193
1194 Ok(out)
1195 }
1196
1197 fn stop_renewal(&self) {
1203 self.terminal_op_called.store(true, Ordering::Release);
1204 self.renewal_stop.notify_one();
1205 }
1206
1207 async fn read_retry_policy_json(&self, ctx: &ExecKeyContext) -> Result<String, SdkError> {
1209 let policy_str: Option<String> = self
1210 .client
1211 .get(&ctx.policy())
1212 .await
1213 .map_err(|e| SdkError::ValkeyContext { source: e, context: "read retry policy".into() })?;
1214
1215 match policy_str {
1216 Some(json) => {
1217 match serde_json::from_str::<serde_json::Value>(&json) {
1218 Ok(policy) => {
1219 if let Some(retry) = policy.get("retry_policy") {
1220 return Ok(serde_json::to_string(retry).unwrap_or_default());
1221 }
1222 Ok(String::new())
1223 }
1224 Err(e) => {
1225 tracing::warn!(
1226 execution_id = %self.execution_id,
1227 error = %e,
1228 "malformed retry policy JSON, treating as no policy"
1229 );
1230 Ok(String::new())
1231 }
1232 }
1233 }
1234 None => Ok(String::new()),
1235 }
1236 }
1237}
1238
1239impl Drop for ClaimedTask {
1240 fn drop(&mut self) {
1241 if !self.terminal_op_called.load(Ordering::Acquire) {
1257 tracing::warn!(
1258 execution_id = %self.execution_id,
1259 "ClaimedTask dropped without terminal operation — lease will expire"
1260 );
1261 }
1262 self.renewal_handle.abort();
1263 }
1264}
1265
1266#[allow(clippy::too_many_arguments)]
1276#[tracing::instrument(
1277 name = "renew_lease",
1278 skip_all,
1279 fields(execution_id = %execution_id)
1280)]
1281async fn renew_lease_inner(
1282 client: &Client,
1283 partition_config: &PartitionConfig,
1284 execution_id: &ExecutionId,
1285 attempt_index: AttemptIndex,
1286 attempt_id: &AttemptId,
1287 lease_id: &LeaseId,
1288 lease_epoch: LeaseEpoch,
1289 lease_ttl_ms: u64,
1290) -> Result<(), SdkError> {
1291 let partition = execution_partition(execution_id, partition_config);
1292 let ctx = ExecKeyContext::new(&partition, execution_id);
1293 let idx = IndexKeys::new(&partition);
1294
1295 let keys: Vec<String> = vec![
1297 ctx.core(),
1298 ctx.lease_current(),
1299 ctx.lease_history(),
1300 idx.lease_expiry(),
1301 ];
1302
1303 let lease_history_grace_ms = 5000_u64; let args: Vec<String> = vec![
1307 execution_id.to_string(),
1308 attempt_index.to_string(),
1309 attempt_id.to_string(),
1310 lease_id.to_string(),
1311 lease_epoch.to_string(),
1312 lease_ttl_ms.to_string(),
1313 lease_history_grace_ms.to_string(),
1314 ];
1315
1316 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1317 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1318
1319 let raw: Value = client
1320 .fcall("ff_renew_lease", &key_refs, &arg_refs)
1321 .await
1322 .map_err(SdkError::Valkey)?;
1323
1324 parse_success_result(&raw, "ff_renew_lease")
1325}
1326
1327#[allow(clippy::too_many_arguments, dead_code)]
1334fn spawn_renewal_task(
1335 client: Client,
1336 partition_config: PartitionConfig,
1337 execution_id: ExecutionId,
1338 attempt_index: AttemptIndex,
1339 attempt_id: AttemptId,
1340 lease_id: LeaseId,
1341 lease_epoch: LeaseEpoch,
1342 lease_ttl_ms: u64,
1343 stop_signal: Arc<Notify>,
1344 failure_counter: Arc<AtomicU32>,
1345) -> JoinHandle<()> {
1346 let interval = Duration::from_millis(lease_ttl_ms / 3);
1347
1348 tokio::spawn(async move {
1349 let mut tick = tokio::time::interval(interval);
1350 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1351 tick.tick().await;
1353
1354 loop {
1355 tokio::select! {
1356 _ = stop_signal.notified() => {
1357 tracing::debug!(
1358 execution_id = %execution_id,
1359 "lease renewal stopped by signal"
1360 );
1361 return;
1362 }
1363 _ = tick.tick() => {
1364 match renew_lease_inner(
1365 &client,
1366 &partition_config,
1367 &execution_id,
1368 attempt_index,
1369 &attempt_id,
1370 &lease_id,
1371 lease_epoch,
1372 lease_ttl_ms,
1373 )
1374 .await
1375 {
1376 Ok(()) => {
1377 failure_counter.store(0, Ordering::Relaxed);
1378 tracing::trace!(
1379 execution_id = %execution_id,
1380 "lease renewed"
1381 );
1382 }
1383 Err(SdkError::Script(ref e)) if is_terminal_renewal_error(e) => {
1384 failure_counter.fetch_add(1, Ordering::Relaxed);
1385 tracing::warn!(
1386 execution_id = %execution_id,
1387 error = %e,
1388 "lease renewal failed with terminal error, stopping renewal"
1389 );
1390 return;
1391 }
1392 Err(e) => {
1393 let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
1394 tracing::warn!(
1395 execution_id = %execution_id,
1396 error = %e,
1397 consecutive_failures = count,
1398 "lease renewal failed (will retry next interval)"
1399 );
1400 }
1401 }
1402 }
1403 }
1404 }
1405 })
1406}
1407
1408#[allow(dead_code)]
1410fn is_terminal_renewal_error(err: &ScriptError) -> bool {
1411 matches!(
1412 err,
1413 ScriptError::StaleLease
1414 | ScriptError::LeaseExpired
1415 | ScriptError::LeaseRevoked
1416 | ScriptError::ExecutionNotActive
1417 | ScriptError::ExecutionNotFound
1418 )
1419}
1420
1421pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1438 let arr = match raw {
1439 Value::Array(arr) => arr,
1440 _ => {
1441 return Err(SdkError::Script(ScriptError::Parse(
1442 "ff_report_usage_and_check: expected Array".into(),
1443 )));
1444 }
1445 };
1446 let status_code = match arr.first() {
1447 Some(Ok(Value::Int(n))) => *n,
1448 _ => {
1449 return Err(SdkError::Script(ScriptError::Parse(
1450 "ff_report_usage_and_check: expected Int status code".into(),
1451 )));
1452 }
1453 };
1454 if status_code != 1 {
1455 let error_code = usage_field_str(arr, 1);
1456 let detail = usage_field_str(arr, 2);
1457 return Err(SdkError::Script(
1458 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1459 ScriptError::Parse(format!("ff_report_usage_and_check: {error_code}"))
1460 }),
1461 ));
1462 }
1463 let sub_status = usage_field_str(arr, 1);
1464 match sub_status.as_str() {
1465 "OK" => Ok(ReportUsageResult::Ok),
1466 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1467 "SOFT_BREACH" => {
1468 let dim = usage_field_str(arr, 2);
1469 let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1470 let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1471 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1472 }
1473 "HARD_BREACH" => {
1474 let dim = usage_field_str(arr, 2);
1475 let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1476 let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1477 Ok(ReportUsageResult::HardBreach {
1478 dimension: dim,
1479 current_usage: current,
1480 hard_limit: limit,
1481 })
1482 }
1483 _ => Err(SdkError::Script(ScriptError::Parse(format!(
1484 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1485 )))),
1486 }
1487}
1488
1489fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1490 match arr.get(index) {
1491 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1492 Some(Ok(Value::SimpleString(s))) => s.clone(),
1493 Some(Ok(Value::Int(n))) => n.to_string(),
1494 _ => String::new(),
1495 }
1496}
1497
1498fn parse_usage_u64(
1512 arr: &[Result<Value, ferriskey::Error>],
1513 index: usize,
1514 sub_status: &str,
1515 field_name: &str,
1516) -> Result<u64, SdkError> {
1517 match arr.get(index) {
1518 Some(Ok(Value::Int(n))) => {
1519 u64::try_from(*n).map_err(|_| {
1520 SdkError::Script(ScriptError::Parse(format!(
1521 "ff_report_usage_and_check {sub_status}: {field_name} \
1522 (index {index}) negative int {n} cannot be u64"
1523 )))
1524 })
1525 }
1526 Some(Ok(Value::BulkString(b))) => {
1527 let s = String::from_utf8_lossy(b);
1528 s.parse::<u64>().map_err(|_| {
1529 SdkError::Script(ScriptError::Parse(format!(
1530 "ff_report_usage_and_check {sub_status}: {field_name} \
1531 (index {index}) not a u64 string: {s:?}"
1532 )))
1533 })
1534 }
1535 Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1536 SdkError::Script(ScriptError::Parse(format!(
1537 "ff_report_usage_and_check {sub_status}: {field_name} \
1538 (index {index}) not a u64 string: {s:?}"
1539 )))
1540 }),
1541 Some(_) => Err(SdkError::Script(ScriptError::Parse(format!(
1542 "ff_report_usage_and_check {sub_status}: {field_name} \
1543 (index {index}) wrong wire type (expected Int or String)"
1544 )))),
1545 None => Err(SdkError::Script(ScriptError::Parse(format!(
1546 "ff_report_usage_and_check {sub_status}: {field_name} \
1547 (index {index}) missing from response"
1548 )))),
1549 }
1550}
1551
1552fn extract_pending_waitpoint_token(raw: &Value) -> Result<WaitpointToken, SdkError> {
1557 parse_success_result(raw, "ff_create_pending_waitpoint")?;
1558 let arr = match raw {
1559 Value::Array(arr) => arr,
1560 _ => unreachable!("parse_success_result would have rejected non-array"),
1561 };
1562 let token_str = arr
1564 .get(4)
1565 .and_then(|v| match v {
1566 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1567 Ok(Value::SimpleString(s)) => Some(s.clone()),
1568 _ => None,
1569 })
1570 .ok_or_else(|| {
1571 SdkError::Script(ScriptError::Parse(
1572 "ff_create_pending_waitpoint: missing waitpoint_token in response".into(),
1573 ))
1574 })?;
1575 Ok(WaitpointToken::new(token_str))
1576}
1577
1578fn resume_waitpoint_id_from_suspension(
1585 susp: &HashMap<String, String>,
1586 claimed_attempt: AttemptIndex,
1587) -> Result<Option<WaitpointId>, SdkError> {
1588 if susp.is_empty() {
1589 return Ok(None);
1590 }
1591 let susp_att: u32 = susp
1592 .get("attempt_index")
1593 .and_then(|s| s.parse().ok())
1594 .unwrap_or(u32::MAX);
1595 if susp_att != claimed_attempt.0 {
1596 return Ok(None);
1597 }
1598 let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1599 if close_reason != "resumed" {
1600 return Ok(None);
1601 }
1602 let wp_id_str = susp
1603 .get("waitpoint_id")
1604 .map(String::as_str)
1605 .unwrap_or_default();
1606 if wp_id_str.is_empty() {
1607 return Ok(None);
1608 }
1609 let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1610 SdkError::Script(ScriptError::Parse(format!(
1611 "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1612 )))
1613 })?;
1614 Ok(Some(waitpoint_id))
1615}
1616
1617pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
1618 let arr = match raw {
1619 Value::Array(arr) => arr,
1620 _ => {
1621 return Err(SdkError::Script(ScriptError::Parse(format!(
1622 "{function_name}: expected Array, got non-array"
1623 ))));
1624 }
1625 };
1626
1627 if arr.is_empty() {
1628 return Err(SdkError::Script(ScriptError::Parse(format!(
1629 "{function_name}: empty result array"
1630 ))));
1631 }
1632
1633 let status_code = match arr.first() {
1634 Some(Ok(Value::Int(n))) => *n,
1635 _ => {
1636 return Err(SdkError::Script(ScriptError::Parse(format!(
1637 "{function_name}: expected Int at index 0"
1638 ))));
1639 }
1640 };
1641
1642 if status_code == 1 {
1643 Ok(())
1644 } else {
1645 let field_str = |idx: usize| -> String {
1650 arr.get(idx)
1651 .and_then(|v| match v {
1652 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1653 Ok(Value::SimpleString(s)) => Some(s.clone()),
1654 _ => None,
1655 })
1656 .unwrap_or_default()
1657 };
1658 let error_code = {
1659 let s = field_str(1);
1660 if s.is_empty() { "unknown".to_owned() } else { s }
1661 };
1662 let detail = field_str(2);
1663
1664 let script_err = ScriptError::from_code_with_detail(&error_code, &detail)
1665 .unwrap_or_else(|| {
1666 ScriptError::Parse(format!("{function_name}: unknown error: {error_code}"))
1667 });
1668
1669 Err(SdkError::Script(script_err))
1670 }
1671}
1672
1673fn parse_suspend_result(
1677 raw: &Value,
1678 suspension_id: SuspensionId,
1679 waitpoint_id: WaitpointId,
1680 waitpoint_key: String,
1681) -> Result<SuspendOutcome, SdkError> {
1682 let arr = match raw {
1683 Value::Array(arr) => arr,
1684 _ => {
1685 return Err(SdkError::Script(ScriptError::Parse(
1686 "ff_suspend_execution: expected Array".into(),
1687 )));
1688 }
1689 };
1690
1691 let status_code = match arr.first() {
1692 Some(Ok(Value::Int(n))) => *n,
1693 _ => {
1694 return Err(SdkError::Script(ScriptError::Parse(
1695 "ff_suspend_execution: bad status code".into(),
1696 )));
1697 }
1698 };
1699
1700 if status_code != 1 {
1701 let err_field_str = |idx: usize| -> String {
1702 arr.get(idx)
1703 .and_then(|v| match v {
1704 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1705 Ok(Value::SimpleString(s)) => Some(s.clone()),
1706 _ => None,
1707 })
1708 .unwrap_or_default()
1709 };
1710 let error_code = {
1711 let s = err_field_str(1);
1712 if s.is_empty() { "unknown".to_owned() } else { s }
1713 };
1714 let detail = err_field_str(2);
1715 return Err(SdkError::Script(
1716 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1717 ScriptError::Parse(format!("ff_suspend_execution: {error_code}"))
1718 }),
1719 ));
1720 }
1721
1722 let sub_status = arr
1724 .get(1)
1725 .and_then(|v| match v {
1726 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1727 Ok(Value::SimpleString(s)) => Some(s.clone()),
1728 _ => None,
1729 })
1730 .unwrap_or_default();
1731
1732 let waitpoint_token = arr
1737 .get(5)
1738 .and_then(|v| match v {
1739 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1740 Ok(Value::SimpleString(s)) => Some(s.clone()),
1741 _ => None,
1742 })
1743 .map(WaitpointToken::new)
1744 .ok_or_else(|| {
1745 SdkError::Script(ScriptError::Parse(
1746 "ff_suspend_execution: missing waitpoint_token in response".into(),
1747 ))
1748 })?;
1749
1750 if sub_status == "ALREADY_SATISFIED" {
1751 Ok(SuspendOutcome::AlreadySatisfied {
1752 suspension_id,
1753 waitpoint_id,
1754 waitpoint_key,
1755 waitpoint_token,
1756 })
1757 } else {
1758 Ok(SuspendOutcome::Suspended {
1759 suspension_id,
1760 waitpoint_id,
1761 waitpoint_key,
1762 waitpoint_token,
1763 })
1764 }
1765}
1766
1767pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
1771 let arr = match raw {
1772 Value::Array(arr) => arr,
1773 _ => {
1774 return Err(SdkError::Script(ScriptError::Parse(
1775 "ff_deliver_signal: expected Array".into(),
1776 )));
1777 }
1778 };
1779
1780 let status_code = match arr.first() {
1781 Some(Ok(Value::Int(n))) => *n,
1782 _ => {
1783 return Err(SdkError::Script(ScriptError::Parse(
1784 "ff_deliver_signal: bad status code".into(),
1785 )));
1786 }
1787 };
1788
1789 if status_code != 1 {
1790 let err_field_str = |idx: usize| -> String {
1791 arr.get(idx)
1792 .and_then(|v| match v {
1793 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1794 Ok(Value::SimpleString(s)) => Some(s.clone()),
1795 _ => None,
1796 })
1797 .unwrap_or_default()
1798 };
1799 let error_code = {
1800 let s = err_field_str(1);
1801 if s.is_empty() { "unknown".to_owned() } else { s }
1802 };
1803 let detail = err_field_str(2);
1804 return Err(SdkError::Script(
1805 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1806 ScriptError::Parse(format!("ff_deliver_signal: {error_code}"))
1807 }),
1808 ));
1809 }
1810
1811 let sub_status = arr
1812 .get(1)
1813 .and_then(|v| match v {
1814 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1815 Ok(Value::SimpleString(s)) => Some(s.clone()),
1816 _ => None,
1817 })
1818 .unwrap_or_default();
1819
1820 if sub_status == "DUPLICATE" {
1821 let existing_id = arr
1822 .get(2)
1823 .and_then(|v| match v {
1824 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1825 Ok(Value::SimpleString(s)) => Some(s.clone()),
1826 _ => None,
1827 })
1828 .unwrap_or_default();
1829 return Ok(SignalOutcome::Duplicate {
1830 existing_signal_id: existing_id,
1831 });
1832 }
1833
1834 let signal_id_str = arr
1836 .get(2)
1837 .and_then(|v| match v {
1838 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1839 Ok(Value::SimpleString(s)) => Some(s.clone()),
1840 _ => None,
1841 })
1842 .unwrap_or_default();
1843
1844 let effect = arr
1845 .get(3)
1846 .and_then(|v| match v {
1847 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1848 Ok(Value::SimpleString(s)) => Some(s.clone()),
1849 _ => None,
1850 })
1851 .unwrap_or_default();
1852
1853 let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
1854 SdkError::Script(ScriptError::Parse(format!(
1855 "ff_deliver_signal: invalid signal_id from Lua: {e}"
1856 )))
1857 })?;
1858
1859 if effect == "resume_condition_satisfied" {
1860 Ok(SignalOutcome::TriggeredResume { signal_id })
1861 } else {
1862 Ok(SignalOutcome::Accepted { signal_id, effect })
1863 }
1864}
1865
1866fn parse_append_frame_result(raw: &Value) -> Result<AppendFrameOutcome, SdkError> {
1868 let arr = match raw {
1869 Value::Array(arr) => arr,
1870 _ => {
1871 return Err(SdkError::Script(ScriptError::Parse(
1872 "ff_append_frame: expected Array".into(),
1873 )));
1874 }
1875 };
1876
1877 let status_code = match arr.first() {
1878 Some(Ok(Value::Int(n))) => *n,
1879 _ => {
1880 return Err(SdkError::Script(ScriptError::Parse(
1881 "ff_append_frame: bad status code".into(),
1882 )));
1883 }
1884 };
1885
1886 if status_code != 1 {
1887 let err_field_str = |idx: usize| -> String {
1888 arr.get(idx)
1889 .and_then(|v| match v {
1890 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1891 Ok(Value::SimpleString(s)) => Some(s.clone()),
1892 _ => None,
1893 })
1894 .unwrap_or_default()
1895 };
1896 let error_code = {
1897 let s = err_field_str(1);
1898 if s.is_empty() { "unknown".to_owned() } else { s }
1899 };
1900 let detail = err_field_str(2);
1901 return Err(SdkError::Script(
1902 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1903 ScriptError::Parse(format!("ff_append_frame: {error_code}"))
1904 }),
1905 ));
1906 }
1907
1908 let stream_id = arr
1910 .get(2)
1911 .and_then(|v| match v {
1912 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1913 Ok(Value::SimpleString(s)) => Some(s.clone()),
1914 _ => None,
1915 })
1916 .unwrap_or_default();
1917
1918 let frame_count = arr
1919 .get(3)
1920 .and_then(|v| match v {
1921 Ok(Value::BulkString(b)) => String::from_utf8_lossy(b).parse::<u64>().ok(),
1922 Ok(Value::SimpleString(s)) => s.parse::<u64>().ok(),
1923 Ok(Value::Int(n)) => Some(*n as u64),
1924 _ => None,
1925 })
1926 .unwrap_or(0);
1927
1928 Ok(AppendFrameOutcome {
1929 stream_id,
1930 frame_count,
1931 })
1932}
1933
1934fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
1938 let arr = match raw {
1939 Value::Array(arr) => arr,
1940 _ => {
1941 return Err(SdkError::Script(ScriptError::Parse(
1942 "ff_fail_execution: expected Array".into(),
1943 )));
1944 }
1945 };
1946
1947 let status_code = match arr.first() {
1948 Some(Ok(Value::Int(n))) => *n,
1949 _ => {
1950 return Err(SdkError::Script(ScriptError::Parse(
1951 "ff_fail_execution: bad status code".into(),
1952 )));
1953 }
1954 };
1955
1956 if status_code != 1 {
1957 let err_field_str = |idx: usize| -> String {
1958 arr.get(idx)
1959 .and_then(|v| match v {
1960 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1961 Ok(Value::SimpleString(s)) => Some(s.clone()),
1962 _ => None,
1963 })
1964 .unwrap_or_default()
1965 };
1966 let error_code = {
1967 let s = err_field_str(1);
1968 if s.is_empty() { "unknown".to_owned() } else { s }
1969 };
1970 let detail = err_field_str(2);
1971 return Err(SdkError::Script(
1972 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1973 ScriptError::Parse(format!("ff_fail_execution: {error_code}"))
1974 }),
1975 ));
1976 }
1977
1978 let sub_status = arr
1980 .get(2)
1981 .and_then(|v| match v {
1982 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1983 Ok(Value::SimpleString(s)) => Some(s.clone()),
1984 _ => None,
1985 })
1986 .unwrap_or_default();
1987
1988 match sub_status.as_str() {
1989 "retry_scheduled" => {
1990 let delay_str = arr
1993 .get(3)
1994 .and_then(|v| match v {
1995 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1996 Ok(Value::Int(n)) => Some(n.to_string()),
1997 _ => None,
1998 })
1999 .unwrap_or_default();
2000 let delay_until = delay_str.parse::<i64>().unwrap_or(0);
2001
2002 Ok(FailOutcome::RetryScheduled {
2003 delay_until: TimestampMs::from_millis(delay_until),
2004 })
2005 }
2006 "terminal_failed" => Ok(FailOutcome::TerminalFailed),
2007 _ => Err(SdkError::Script(ScriptError::Parse(format!(
2008 "ff_fail_execution: unexpected sub-status: {sub_status}"
2009 )))),
2010 }
2011}
2012
/// Largest `block_ms` (in milliseconds) that [`tail_stream`] accepts; larger
/// values are rejected with `SdkError::Config`.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;
2019
2020pub use ff_core::contracts::STREAM_READ_HARD_CAP;
2024
2025pub use ff_core::contracts::StreamFrames;
2030
2031fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
2032 if count_limit == 0 {
2033 return Err(SdkError::Config("count_limit must be >= 1".to_owned()));
2034 }
2035 if count_limit > STREAM_READ_HARD_CAP {
2036 return Err(SdkError::Config(format!(
2037 "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
2038 )));
2039 }
2040 Ok(())
2041}
2042
2043pub async fn read_stream(
2067 client: &Client,
2068 partition_config: &PartitionConfig,
2069 execution_id: &ExecutionId,
2070 attempt_index: AttemptIndex,
2071 from_id: &str,
2072 to_id: &str,
2073 count_limit: u64,
2074) -> Result<StreamFrames, SdkError> {
2075 use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2076 validate_stream_read_count(count_limit)?;
2077
2078 let partition = execution_partition(execution_id, partition_config);
2079 let ctx = ExecKeyContext::new(&partition, execution_id);
2080 let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2081
2082 let args = ReadFramesArgs {
2083 execution_id: execution_id.clone(),
2084 attempt_index,
2085 from_id: from_id.to_owned(),
2086 to_id: to_id.to_owned(),
2087 count_limit,
2088 };
2089
2090 let ReadFramesResult::Frames(f) =
2091 ff_script::functions::stream::ff_read_attempt_stream(client, &keys, &args)
2092 .await
2093 .map_err(SdkError::Script)?;
2094 Ok(f)
2095}
2096
2097pub async fn tail_stream(
2163 client: &Client,
2164 partition_config: &PartitionConfig,
2165 execution_id: &ExecutionId,
2166 attempt_index: AttemptIndex,
2167 last_id: &str,
2168 block_ms: u64,
2169 count_limit: u64,
2170) -> Result<StreamFrames, SdkError> {
2171 if block_ms > MAX_TAIL_BLOCK_MS {
2172 return Err(SdkError::Config(format!(
2173 "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
2174 )));
2175 }
2176 validate_stream_read_count(count_limit)?;
2177
2178 let partition = execution_partition(execution_id, partition_config);
2179 let ctx = ExecKeyContext::new(&partition, execution_id);
2180 let stream_key = ctx.stream(attempt_index);
2181 let stream_meta_key = ctx.stream_meta(attempt_index);
2182
2183 ff_script::stream_tail::xread_block(
2184 client,
2185 &stream_key,
2186 &stream_meta_key,
2187 last_id,
2188 block_ms,
2189 count_limit,
2190 )
2191 .await
2192 .map_err(SdkError::Script)
2193}
2194
#[cfg(test)]
mod parse_report_usage_result_tests {
    use super::*;

    /// Simple-string reply element.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }

    /// Integer reply element.
    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }

    /// Wraps elements into an array reply.
    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }

    #[test]
    fn ok_status() {
        let raw = arr(vec![int(1), s("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }

    #[test]
    fn soft_breach_status() {
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }

    #[test]
    fn hard_breach_status() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    /// Breach values may arrive as Int elements rather than strings.
    #[test]
    fn breach_values_accept_int_wire_type() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), int(7), int(5)]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 7);
                assert_eq!(hard_limit, 5);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    #[test]
    fn non_array_input_is_parse_error() {
        let raw = Value::SimpleString("OK".to_owned());
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = arr(vec![s("not_an_int"), s("OK")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    /// A non-1 status code is surfaced as an error rather than a usage result.
    #[test]
    fn non_success_status_is_error() {
        let raw = arr(vec![int(0), s("definitely_not_a_known_error_code"), s("")]);
        assert!(parse_report_usage_result(&raw).is_err());
    }

    #[test]
    fn unknown_sub_status_is_parse_error() {
        let raw = arr(vec![int(1), s("NOT_A_STATUS")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("NOT_A_STATUS"),
            "error should echo the unknown sub-status, got: {msg}"
        );
    }

    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"),
            s("100"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    /// A negative Int cannot be represented as u64 and must not wrap.
    #[test]
    fn soft_breach_negative_int_current_is_parse_error() {
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), int(-5), int(100)]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("current_usage") && msg.contains("negative"),
            "error should flag the negative field, got: {msg}"
        );
    }

    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}
2338
2339
#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    /// Builds a suspension field map from string pairs.
    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(k, v)| ((*k).to_owned(), (*v).to_owned())).collect()
    }

    #[test]
    fn empty_suspension_returns_none() {
        let susp = m(&[]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn missing_attempt_index_returns_none() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "record without attempt_index must not match");
    }

    #[test]
    fn unparseable_attempt_index_returns_none() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "not-a-number"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "unparseable attempt_index must not match");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = m(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp.to_string()),
            ]);
            let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
        assert_eq!(out, Some(wp));
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        assert!(
            format!("{err}").contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }
}