1use ff_core::contracts::*;
8use crate::error::ScriptError;
9use ff_core::keys::{ExecKeyContext, IndexKeys};
10use ff_core::types::*;
11
12use crate::result::{FcallResult, FromFcallResult};
13
14use super::execution::ExecOpKeys;
16
/// Lease details of a claimed resumed execution, without the `execution_id`.
///
/// Produced by the `FromFcallResult` impl for
/// [`ClaimResumedExecutionResultPartial`] from reply fields 0 through 4; the
/// execution id is attached later by
/// [`ClaimResumedExecutionResultPartial::complete`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimedResumedExecutionPartial {
    /// Lease identifier (reply field 0).
    pub lease_id: LeaseId,
    /// Lease epoch, parsed as `u64` from reply field 1.
    pub lease_epoch: LeaseEpoch,
    /// Attempt index, parsed as `u32` from reply field 4.
    pub attempt_index: AttemptIndex,
    /// Attempt identifier (reply field 3).
    pub attempt_id: AttemptId,
    /// Lease expiry, parsed as `i64` milliseconds from reply field 2.
    pub lease_expires_at: TimestampMs,
}
26
/// Parsed reply of `ff_claim_resumed_execution` before the execution id is
/// attached; convert it with [`ClaimResumedExecutionResultPartial::complete`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClaimResumedExecutionResultPartial {
    /// Built from a successful reply; carries the parsed lease details.
    Claimed(ClaimedResumedExecutionPartial),
}
32
33impl ClaimResumedExecutionResultPartial {
34 pub fn complete(self, execution_id: ExecutionId) -> ClaimResumedExecutionResult {
35 match self {
36 Self::Claimed(p) => ClaimResumedExecutionResult::Claimed(ClaimedResumedExecution {
37 execution_id,
38 lease_id: p.lease_id,
39 lease_epoch: p.lease_epoch,
40 attempt_index: p.attempt_index,
41 attempt_id: p.attempt_id,
42 lease_expires_at: p.lease_expires_at,
43 }),
44 }
45 }
46}
47
/// Borrowed key builders handed to the `keys(...)` sections of the signal
/// functions in this file (`ff_deliver_signal`,
/// `ff_buffer_signal_for_pending_waitpoint`).
pub struct SignalOpKeys<'a> {
    /// Source of the `k.ctx.*` keys (core, waitpoint, signal, dedup, ...).
    pub ctx: &'a ExecKeyContext,
    /// Source of the `k.idx.*` index keys (lane sets, suspension timeout,
    /// waitpoint HMAC secrets).
    pub idx: &'a IndexKeys,
    /// Lane passed to the `k.idx.lane_*` key builders.
    pub lane_id: &'a LaneId,
}
55
56ff_function! {
71 pub ff_deliver_signal(args: DeliverSignalArgs) -> DeliverSignalResult {
72 keys(k: &SignalOpKeys<'_>) {
73 k.ctx.core(), k.ctx.waitpoint_condition(&args.waitpoint_id), k.ctx.waitpoint_signals(&args.waitpoint_id), k.ctx.exec_signals(), k.ctx.signal(&args.signal_id), k.ctx.signal_payload(&args.signal_id), args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
80 k.ctx.signal_dedup(&args.waitpoint_id, ik)
81 }).unwrap_or_else(|| k.ctx.noop()), k.ctx.waitpoint(&args.waitpoint_id), k.ctx.suspension_current(), k.idx.lane_eligible(k.lane_id), k.idx.lane_suspended(k.lane_id), k.idx.lane_delayed(k.lane_id), k.idx.suspension_timeout(), k.idx.waitpoint_hmac_secrets(), }
90 argv {
91 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
99 .map(|p| String::from_utf8_lossy(p).into_owned())
100 .unwrap_or_default(), args.payload_encoding.clone().unwrap_or_else(|| "json".into()), args.idempotency_key.clone().unwrap_or_default(), args.correlation_id.clone().unwrap_or_default(), args.target_scope.clone(), args.created_at.map(|t| t.to_string()).unwrap_or_default(), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution.unwrap_or(10_000).to_string(), args.waitpoint_token.as_str().to_owned(), }
112 }
113}
114
115impl FromFcallResult for DeliverSignalResult {
116 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
117 let r = FcallResult::parse(raw)?;
118 if r.status == "DUPLICATE" {
120 let sid_str = r.field_str(0);
121 let sid = SignalId::parse(&sid_str)
122 .map_err(|e| ScriptError::Parse {
123 fcall: "ff_deliver_signal".into(),
124 execution_id: None,
125 message: format!("bad signal_id: {e}"),
126 })?;
127 return Ok(DeliverSignalResult::Duplicate {
128 existing_signal_id: sid,
129 });
130 }
131 let r = r.into_success()?;
132 let sid_str = r.field_str(0);
134 let effect = r.field_str(1);
135 let sid = SignalId::parse(&sid_str)
136 .map_err(|e| ScriptError::Parse {
137 fcall: "ff_deliver_signal".into(),
138 execution_id: None,
139 message: format!("bad signal_id: {e}"),
140 })?;
141 Ok(DeliverSignalResult::Accepted {
142 signal_id: sid,
143 effect,
144 })
145 }
146}
147
148ff_function! {
156 pub ff_buffer_signal_for_pending_waitpoint(args: BufferSignalArgs) -> BufferSignalResult {
157 keys(k: &SignalOpKeys<'_>) {
158 k.ctx.core(), k.ctx.waitpoint_condition(&args.waitpoint_id), k.ctx.waitpoint_signals(&args.waitpoint_id), k.ctx.exec_signals(), k.ctx.signal(&args.signal_id), k.ctx.signal_payload(&args.signal_id), args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
165 k.ctx.signal_dedup(&args.waitpoint_id, ik)
166 }).unwrap_or_else(|| k.ctx.noop()), k.ctx.waitpoint(&args.waitpoint_id), k.idx.waitpoint_hmac_secrets(), }
170 argv {
171 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
179 .map(|p| String::from_utf8_lossy(p).into_owned())
180 .unwrap_or_default(), args.payload_encoding.clone().unwrap_or_else(|| "json".into()), args.idempotency_key.clone().unwrap_or_default(), String::new(), args.target_scope.clone(), String::new(), String::new(), String::new(), String::new(), String::new(), args.waitpoint_token.as_str().to_owned(), }
192 }
193}
194
195impl FromFcallResult for BufferSignalResult {
196 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
197 let r = FcallResult::parse(raw)?;
198 if r.status == "DUPLICATE" {
200 let sid_str = r.field_str(0);
201 let sid = SignalId::parse(&sid_str)
202 .map_err(|e| ScriptError::Parse {
203 fcall: "ff_buffer_signal".into(),
204 execution_id: None,
205 message: format!("bad signal_id: {e}"),
206 })?;
207 return Ok(BufferSignalResult::Duplicate {
208 existing_signal_id: sid,
209 });
210 }
211 let r = r.into_success()?;
212 let sid_str = r.field_str(0);
214 let sid = SignalId::parse(&sid_str)
215 .map_err(|e| ScriptError::Parse {
216 fcall: "ff_buffer_signal".into(),
217 execution_id: None,
218 message: format!("bad signal_id: {e}"),
219 })?;
220 Ok(BufferSignalResult::Buffered { signal_id: sid })
221 }
222}
223
224ff_function! {
235 pub ff_claim_resumed_execution(args: ClaimResumedExecutionArgs) -> ClaimResumedExecutionResultPartial {
236 keys(k: &ExecOpKeys<'_>) {
237 k.ctx.core(), k.ctx.claim_grant(), k.idx.lane_eligible(k.lane_id), k.idx.lease_expiry(), k.idx.worker_leases(k.worker_instance_id), k.ctx.attempt_hash(args.current_attempt_index), k.ctx.lease_current(), k.ctx.lease_history(), k.idx.lane_active(k.lane_id), k.idx.attempt_timeout(), k.idx.execution_deadline(), }
249 argv {
250 args.execution_id.to_string(), args.worker_id.to_string(), args.worker_instance_id.to_string(), args.lane_id.to_string(), String::new(), args.lease_id.to_string(), args.lease_ttl_ms.to_string(), args.remaining_attempt_timeout_ms
258 .map(|t| t.to_string())
259 .unwrap_or_default(), }
261 }
262}
263
264impl FromFcallResult for ClaimResumedExecutionResultPartial {
265 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
266 let r = FcallResult::parse(raw)?.into_success()?;
267 let lease_id = LeaseId::parse(&r.field_str(0))
269 .map_err(|e| ScriptError::Parse {
270 fcall: "ff_claim_resumed_execution_result_partial".into(),
271 execution_id: None,
272 message: format!("bad lease_id: {e}"),
273 })?;
274 let epoch = r.field_str(1).parse::<u64>()
275 .map_err(|e| ScriptError::Parse {
276 fcall: "ff_claim_resumed_execution_result_partial".into(),
277 execution_id: None,
278 message: format!("bad epoch: {e}"),
279 })?;
280 let expires_at = r.field_str(2).parse::<i64>()
281 .map_err(|e| ScriptError::Parse {
282 fcall: "ff_claim_resumed_execution_result_partial".into(),
283 execution_id: None,
284 message: format!("bad expires_at: {e}"),
285 })?;
286 let attempt_id = AttemptId::parse(&r.field_str(3))
287 .map_err(|e| ScriptError::Parse {
288 fcall: "ff_claim_resumed_execution_result_partial".into(),
289 execution_id: None,
290 message: format!("bad attempt_id: {e}"),
291 })?;
292 let attempt_index = r.field_str(4).parse::<u32>()
293 .map_err(|e| ScriptError::Parse {
294 fcall: "ff_claim_resumed_execution_result_partial".into(),
295 execution_id: None,
296 message: format!("bad attempt_index: {e}"),
297 })?;
298
299 Ok(Self::Claimed(ClaimedResumedExecutionPartial {
300 lease_id,
301 lease_epoch: LeaseEpoch::new(epoch),
302 attempt_index: AttemptIndex::new(attempt_index),
303 attempt_id,
304 lease_expires_at: TimestampMs::from_millis(expires_at),
305 }))
306 }
307}
308
#[cfg(test)]
mod partial_tests {
    use super::*;
    use ff_core::partition::PartitionConfig;

    /// `complete` must carry the supplied execution id into the full result.
    #[test]
    fn claim_resumed_partial_complete_attaches_execution_id() {
        let eid = ExecutionId::for_flow(&FlowId::new(), &PartitionConfig::default());
        let details = ClaimedResumedExecutionPartial {
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(2),
            attempt_index: AttemptIndex::new(1),
            attempt_id: AttemptId::new(),
            lease_expires_at: TimestampMs::from_millis(2000),
        };

        // Single-variant result: the pattern is irrefutable.
        let ClaimResumedExecutionResult::Claimed(claimed) =
            ClaimResumedExecutionResultPartial::Claimed(details).complete(eid.clone());

        assert_eq!(claimed.execution_id, eid);
    }
}