1use ff_core::contracts::*;
8use crate::error::ScriptError;
9use ff_core::keys::{ExecKeyContext, IndexKeys};
10use ff_core::types::*;
11
12use crate::result::{FcallResult, FromFcallResult};
13
14use super::execution::ExecOpKeys;
16
17#[derive(Clone, Debug, PartialEq, Eq)]
19pub struct ClaimedResumedExecutionPartial {
20 pub lease_id: LeaseId,
21 pub lease_epoch: LeaseEpoch,
22 pub attempt_index: AttemptIndex,
23 pub attempt_id: AttemptId,
24 pub lease_expires_at: TimestampMs,
25}
26
27#[derive(Clone, Debug, PartialEq, Eq)]
29pub enum ClaimResumedExecutionResultPartial {
30 Claimed(ClaimedResumedExecutionPartial),
31}
32
33impl ClaimResumedExecutionResultPartial {
34 pub fn complete(self, execution_id: ExecutionId) -> ClaimResumedExecutionResult {
35 match self {
36 Self::Claimed(p) => ClaimResumedExecutionResult::Claimed(ClaimedResumedExecution {
37 execution_id,
38 lease_id: p.lease_id,
39 lease_epoch: p.lease_epoch,
40 attempt_index: p.attempt_index,
41 attempt_id: p.attempt_id,
42 lease_expires_at: p.lease_expires_at,
43 }),
44 }
45 }
46}
47
48pub struct SignalOpKeys<'a> {
51 pub ctx: &'a ExecKeyContext,
52 pub idx: &'a IndexKeys,
53 pub lane_id: &'a LaneId,
54}
55
56ff_function! {
71 pub ff_deliver_signal(args: DeliverSignalArgs) -> DeliverSignalResult {
72 keys(k: &SignalOpKeys<'_>) {
73 k.ctx.core(), k.ctx.waitpoint_condition(&args.waitpoint_id), k.ctx.waitpoint_signals(&args.waitpoint_id), k.ctx.exec_signals(), k.ctx.signal(&args.signal_id), k.ctx.signal_payload(&args.signal_id), args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
80 k.ctx.signal_dedup(&args.waitpoint_id, ik)
81 }).unwrap_or_else(|| k.ctx.noop()), k.ctx.waitpoint(&args.waitpoint_id), k.ctx.suspension_current(), k.idx.lane_eligible(k.lane_id), k.idx.lane_suspended(k.lane_id), k.idx.lane_delayed(k.lane_id), k.idx.suspension_timeout(), k.idx.waitpoint_hmac_secrets(), }
90 argv {
91 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
99 .map(|p| String::from_utf8_lossy(p).into_owned())
100 .unwrap_or_default(), args.payload_encoding.clone().unwrap_or_else(|| "json".into()), args.idempotency_key.clone().unwrap_or_default(), args.correlation_id.clone().unwrap_or_default(), args.target_scope.clone(), args.created_at.map(|t| t.to_string()).unwrap_or_default(), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution.unwrap_or(10_000).to_string(), args.waitpoint_token.as_str().to_owned(), }
112 }
113}
114
115impl FromFcallResult for DeliverSignalResult {
116 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
117 let r = FcallResult::parse(raw)?;
118 if r.status == "DUPLICATE" {
120 let sid_str = r.field_str(0);
121 let sid = SignalId::parse(&sid_str)
122 .map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
123 return Ok(DeliverSignalResult::Duplicate {
124 existing_signal_id: sid,
125 });
126 }
127 let r = r.into_success()?;
128 let sid_str = r.field_str(0);
130 let effect = r.field_str(1);
131 let sid = SignalId::parse(&sid_str)
132 .map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
133 Ok(DeliverSignalResult::Accepted {
134 signal_id: sid,
135 effect,
136 })
137 }
138}
139
140ff_function! {
148 pub ff_buffer_signal_for_pending_waitpoint(args: BufferSignalArgs) -> BufferSignalResult {
149 keys(k: &SignalOpKeys<'_>) {
150 k.ctx.core(), k.ctx.waitpoint_condition(&args.waitpoint_id), k.ctx.waitpoint_signals(&args.waitpoint_id), k.ctx.exec_signals(), k.ctx.signal(&args.signal_id), k.ctx.signal_payload(&args.signal_id), args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
157 k.ctx.signal_dedup(&args.waitpoint_id, ik)
158 }).unwrap_or_else(|| k.ctx.noop()), k.ctx.waitpoint(&args.waitpoint_id), k.idx.waitpoint_hmac_secrets(), }
162 argv {
163 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
171 .map(|p| String::from_utf8_lossy(p).into_owned())
172 .unwrap_or_default(), args.payload_encoding.clone().unwrap_or_else(|| "json".into()), args.idempotency_key.clone().unwrap_or_default(), String::new(), args.target_scope.clone(), String::new(), String::new(), String::new(), String::new(), String::new(), args.waitpoint_token.as_str().to_owned(), }
184 }
185}
186
187impl FromFcallResult for BufferSignalResult {
188 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
189 let r = FcallResult::parse(raw)?;
190 if r.status == "DUPLICATE" {
192 let sid_str = r.field_str(0);
193 let sid = SignalId::parse(&sid_str)
194 .map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
195 return Ok(BufferSignalResult::Duplicate {
196 existing_signal_id: sid,
197 });
198 }
199 let r = r.into_success()?;
200 let sid_str = r.field_str(0);
202 let sid = SignalId::parse(&sid_str)
203 .map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
204 Ok(BufferSignalResult::Buffered { signal_id: sid })
205 }
206}
207
208ff_function! {
219 pub ff_claim_resumed_execution(args: ClaimResumedExecutionArgs) -> ClaimResumedExecutionResultPartial {
220 keys(k: &ExecOpKeys<'_>) {
221 k.ctx.core(), k.ctx.claim_grant(), k.idx.lane_eligible(k.lane_id), k.idx.lease_expiry(), k.idx.worker_leases(k.worker_instance_id), k.ctx.attempt_hash(args.current_attempt_index), k.ctx.lease_current(), k.ctx.lease_history(), k.idx.lane_active(k.lane_id), k.idx.attempt_timeout(), k.idx.execution_deadline(), }
233 argv {
234 args.execution_id.to_string(), args.worker_id.to_string(), args.worker_instance_id.to_string(), args.lane_id.to_string(), String::new(), args.lease_id.to_string(), args.lease_ttl_ms.to_string(), args.remaining_attempt_timeout_ms
242 .map(|t| t.to_string())
243 .unwrap_or_default(), }
245 }
246}
247
248impl FromFcallResult for ClaimResumedExecutionResultPartial {
249 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
250 let r = FcallResult::parse(raw)?.into_success()?;
251 let lease_id = LeaseId::parse(&r.field_str(0))
253 .map_err(|e| ScriptError::Parse(format!("bad lease_id: {e}")))?;
254 let epoch = r.field_str(1).parse::<u64>()
255 .map_err(|e| ScriptError::Parse(format!("bad epoch: {e}")))?;
256 let expires_at = r.field_str(2).parse::<i64>()
257 .map_err(|e| ScriptError::Parse(format!("bad expires_at: {e}")))?;
258 let attempt_id = AttemptId::parse(&r.field_str(3))
259 .map_err(|e| ScriptError::Parse(format!("bad attempt_id: {e}")))?;
260 let attempt_index = r.field_str(4).parse::<u32>()
261 .map_err(|e| ScriptError::Parse(format!("bad attempt_index: {e}")))?;
262
263 Ok(Self::Claimed(ClaimedResumedExecutionPartial {
264 lease_id,
265 lease_epoch: LeaseEpoch::new(epoch),
266 attempt_index: AttemptIndex::new(attempt_index),
267 attempt_id,
268 lease_expires_at: TimestampMs::from_millis(expires_at),
269 }))
270 }
271}
272
#[cfg(test)]
mod partial_tests {
    use super::*;
    use ff_core::partition::PartitionConfig;

    /// `complete` must put the supplied execution id into the full result.
    #[test]
    fn claim_resumed_partial_complete_attaches_execution_id() {
        let claimed = ClaimedResumedExecutionPartial {
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(2),
            attempt_index: AttemptIndex::new(1),
            attempt_id: AttemptId::new(),
            lease_expires_at: TimestampMs::from_millis(2000),
        };
        let partial = ClaimResumedExecutionResultPartial::Claimed(claimed);
        let eid = ExecutionId::for_flow(&FlowId::new(), &PartitionConfig::default());

        // Single-variant enum, so the pattern is irrefutable.
        let ClaimResumedExecutionResult::Claimed(c) = partial.complete(eid.clone());
        assert_eq!(c.execution_id, eid);
    }
}