Skip to main content

ff_script/functions/
signal.rs

1//! Typed FCALL wrappers for signal delivery and resume-claim functions
2//! (lua/signal.lua).
3//!
4//! See `execution.rs` module-level rustdoc for the Partial-type pattern
5//! rationale (RFC-011 §2.4).
6
7use ff_core::contracts::*;
8use crate::error::ScriptError;
9use ff_core::keys::{ExecKeyContext, IndexKeys};
10use ff_core::types::*;
11
12use crate::result::{FcallResult, FromFcallResult};
13
14// Re-export ExecOpKeys from execution.rs for ff_claim_resumed_execution.
15use super::execution::ExecOpKeys;
16
17/// Partial form of [`ClaimedResumedExecution`] (omits `execution_id`).
18#[derive(Clone, Debug, PartialEq, Eq)]
19pub struct ClaimedResumedExecutionPartial {
20    pub lease_id: LeaseId,
21    pub lease_epoch: LeaseEpoch,
22    pub attempt_index: AttemptIndex,
23    pub attempt_id: AttemptId,
24    pub lease_expires_at: TimestampMs,
25}
26
27/// Partial form of [`ClaimResumedExecutionResult`].
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub enum ClaimResumedExecutionResultPartial {
30    Claimed(ClaimedResumedExecutionPartial),
31}
32
33impl ClaimResumedExecutionResultPartial {
34    pub fn complete(self, execution_id: ExecutionId) -> ClaimResumedExecutionResult {
35        match self {
36            Self::Claimed(p) => ClaimResumedExecutionResult::Claimed(ClaimedResumedExecution {
37                execution_id,
38                lease_id: p.lease_id,
39                lease_epoch: p.lease_epoch,
40                attempt_index: p.attempt_index,
41                attempt_id: p.attempt_id,
42                lease_expires_at: p.lease_expires_at,
43            }),
44        }
45    }
46}
47
48/// Key context for signal delivery operations.
49/// Needs exec keys + index keys + lane (for eligible/suspended/delayed).
50pub struct SignalOpKeys<'a> {
51    pub ctx: &'a ExecKeyContext,
52    pub idx: &'a IndexKeys,
53    pub lane_id: &'a LaneId,
54}
55
56// ─── ff_deliver_signal ────────────────────────────────────────────────
57//
58// Lua KEYS (14): exec_core, wp_condition, wp_signals_stream,
59//                exec_signals_zset, signal_hash, signal_payload,
60//                idem_key, waitpoint_hash, suspension_current,
61//                eligible_zset, suspended_zset, delayed_zset,
62//                suspension_timeout_zset, hmac_secrets
63// Lua ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
64//                signal_category, source_type, source_identity,
65//                payload, payload_encoding, idempotency_key,
66//                correlation_id, target_scope, created_at,
67//                dedup_ttl_ms, resume_delay_ms, signal_maxlen,
68//                max_signals_per_execution, waitpoint_token
69
70ff_function! {
71    pub ff_deliver_signal(args: DeliverSignalArgs) -> DeliverSignalResult {
72        keys(k: &SignalOpKeys<'_>) {
73            k.ctx.core(),                                              // 1
74            k.ctx.waitpoint_condition(&args.waitpoint_id),             // 2
75            k.ctx.waitpoint_signals(&args.waitpoint_id),               // 3
76            k.ctx.exec_signals(),                                      // 4
77            k.ctx.signal(&args.signal_id),                             // 5
78            k.ctx.signal_payload(&args.signal_id),                     // 6
79            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
80                k.ctx.signal_dedup(&args.waitpoint_id, ik)
81            }).unwrap_or_else(|| k.ctx.noop()),                        // 7
82            k.ctx.waitpoint(&args.waitpoint_id),                       // 8
83            k.ctx.suspension_current(),                                // 9
84            k.idx.lane_eligible(k.lane_id),                            // 10
85            k.idx.lane_suspended(k.lane_id),                           // 11
86            k.idx.lane_delayed(k.lane_id),                             // 12
87            k.idx.suspension_timeout(),                                // 13
88            k.idx.waitpoint_hmac_secrets(),                            // 14
89        }
90        argv {
91            args.signal_id.to_string(),                                // 1
92            args.execution_id.to_string(),                             // 2
93            args.waitpoint_id.to_string(),                             // 3
94            args.signal_name.clone(),                                  // 4
95            args.signal_category.clone(),                              // 5
96            args.source_type.clone(),                                  // 6
97            args.source_identity.clone(),                              // 7
98            args.payload.as_ref()
99                .map(|p| String::from_utf8_lossy(p).into_owned())
100                .unwrap_or_default(),                                  // 8
101            args.payload_encoding.clone().unwrap_or_else(|| "json".into()), // 9
102            args.idempotency_key.clone().unwrap_or_default(),          // 10
103            args.correlation_id.clone().unwrap_or_default(),           // 11
104            args.target_scope.clone(),                                 // 12
105            args.created_at.map(|t| t.to_string()).unwrap_or_default(), // 13
106            args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(),       // 14
107            args.resume_delay_ms.unwrap_or(0).to_string(),             // 15
108            args.signal_maxlen.unwrap_or(1000).to_string(),            // 16
109            args.max_signals_per_execution.unwrap_or(10_000).to_string(), // 17
110            args.waitpoint_token.as_str().to_owned(),                  // 18 (wire: raw, not redacted Display)
111        }
112    }
113}
114
115impl FromFcallResult for DeliverSignalResult {
116    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
117        let r = FcallResult::parse(raw)?;
118        // DUPLICATE status: {1, "DUPLICATE", existing_signal_id}
119        if r.status == "DUPLICATE" {
120            let sid_str = r.field_str(0);
121            let sid = SignalId::parse(&sid_str)
122                .map_err(|e| ScriptError::Parse {
123                    fcall: "ff_deliver_signal".into(),
124                    execution_id: None,
125                    message: format!("bad signal_id: {e}"),
126                })?;
127            return Ok(DeliverSignalResult::Duplicate {
128                existing_signal_id: sid,
129            });
130        }
131        let r = r.into_success()?;
132        // ok(signal_id, effect)
133        let sid_str = r.field_str(0);
134        let effect = r.field_str(1);
135        let sid = SignalId::parse(&sid_str)
136            .map_err(|e| ScriptError::Parse {
137                fcall: "ff_deliver_signal".into(),
138                execution_id: None,
139                message: format!("bad signal_id: {e}"),
140            })?;
141        Ok(DeliverSignalResult::Accepted {
142            signal_id: sid,
143            effect,
144        })
145    }
146}
147
148// ─── ff_buffer_signal_for_pending_waitpoint ───────────────────────────
149//
150// Lua KEYS (9): exec_core, wp_condition, wp_signals_stream,
151//               exec_signals_zset, signal_hash, signal_payload,
152//               idem_key, waitpoint_hash, hmac_secrets
153// Lua ARGV (18): same as ff_deliver_signal (17 + waitpoint_token)
154
155ff_function! {
156    pub ff_buffer_signal_for_pending_waitpoint(args: BufferSignalArgs) -> BufferSignalResult {
157        keys(k: &SignalOpKeys<'_>) {
158            k.ctx.core(),                                              // 1
159            k.ctx.waitpoint_condition(&args.waitpoint_id),             // 2
160            k.ctx.waitpoint_signals(&args.waitpoint_id),               // 3
161            k.ctx.exec_signals(),                                      // 4
162            k.ctx.signal(&args.signal_id),                             // 5
163            k.ctx.signal_payload(&args.signal_id),                     // 6
164            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
165                k.ctx.signal_dedup(&args.waitpoint_id, ik)
166            }).unwrap_or_else(|| k.ctx.noop()),                        // 7
167            k.ctx.waitpoint(&args.waitpoint_id),                       // 8
168            k.idx.waitpoint_hmac_secrets(),                            // 9
169        }
170        argv {
171            args.signal_id.to_string(),                                // 1
172            args.execution_id.to_string(),                             // 2
173            args.waitpoint_id.to_string(),                             // 3
174            args.signal_name.clone(),                                  // 4
175            args.signal_category.clone(),                              // 5
176            args.source_type.clone(),                                  // 6
177            args.source_identity.clone(),                              // 7
178            args.payload.as_ref()
179                .map(|p| String::from_utf8_lossy(p).into_owned())
180                .unwrap_or_default(),                                  // 8
181            args.payload_encoding.clone().unwrap_or_else(|| "json".into()), // 9
182            args.idempotency_key.clone().unwrap_or_default(),          // 10
183            String::new(),                                             // 11 correlation_id (not in BufferSignalArgs)
184            args.target_scope.clone(),                                 // 12
185            String::new(),                                             // 13 created_at
186            String::new(),                                             // 14 dedup_ttl_ms
187            String::new(),                                             // 15 resume_delay_ms (unused)
188            String::new(),                                             // 16 signal_maxlen
189            String::new(),                                             // 17 max_signals
190            args.waitpoint_token.as_str().to_owned(),                  // 18 (wire: raw, not redacted Display)
191        }
192    }
193}
194
195impl FromFcallResult for BufferSignalResult {
196    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
197        let r = FcallResult::parse(raw)?;
198        // DUPLICATE status: {1, "DUPLICATE", existing_signal_id}
199        if r.status == "DUPLICATE" {
200            let sid_str = r.field_str(0);
201            let sid = SignalId::parse(&sid_str)
202                .map_err(|e| ScriptError::Parse {
203                    fcall: "ff_buffer_signal".into(),
204                    execution_id: None,
205                    message: format!("bad signal_id: {e}"),
206                })?;
207            return Ok(BufferSignalResult::Duplicate {
208                existing_signal_id: sid,
209            });
210        }
211        let r = r.into_success()?;
212        // ok(signal_id, "buffered_for_pending_waitpoint")
213        let sid_str = r.field_str(0);
214        let sid = SignalId::parse(&sid_str)
215            .map_err(|e| ScriptError::Parse {
216                fcall: "ff_buffer_signal".into(),
217                execution_id: None,
218                message: format!("bad signal_id: {e}"),
219            })?;
220        Ok(BufferSignalResult::Buffered { signal_id: sid })
221    }
222}
223
224// ─── ff_claim_resumed_execution ───────────────────────────────────────
225//
226// Lua KEYS (11): exec_core, claim_grant, eligible_zset, lease_expiry_zset,
227//                worker_leases, existing_attempt_hash, lease_current,
228//                lease_history, active_index, attempt_timeout_zset,
229//                execution_deadline_zset
230// Lua ARGV (8): execution_id, worker_id, worker_instance_id, lane,
231//               capability_snapshot_hash, lease_id, lease_ttl_ms,
232//               remaining_attempt_timeout_ms
233
234ff_function! {
235    pub ff_claim_resumed_execution(args: ClaimResumedExecutionArgs) -> ClaimResumedExecutionResultPartial {
236        keys(k: &ExecOpKeys<'_>) {
237            k.ctx.core(),                                              // 1
238            k.ctx.claim_grant(),                                       // 2
239            k.idx.lane_eligible(k.lane_id),                            // 3
240            k.idx.lease_expiry(),                                      // 4
241            k.idx.worker_leases(k.worker_instance_id),                 // 5
242            k.ctx.attempt_hash(args.current_attempt_index),            // 6
243            k.ctx.lease_current(),                                     // 7
244            k.ctx.lease_history(),                                     // 8
245            k.idx.lane_active(k.lane_id),                              // 9
246            k.idx.attempt_timeout(),                                   // 10
247            k.idx.execution_deadline(),                                // 11
248        }
249        argv {
250            args.execution_id.to_string(),                             // 1
251            args.worker_id.to_string(),                                // 2
252            args.worker_instance_id.to_string(),                       // 3
253            args.lane_id.to_string(),                                  // 4
254            String::new(),                                             // 5 capability_snapshot_hash
255            args.lease_id.to_string(),                                 // 6
256            args.lease_ttl_ms.to_string(),                             // 7
257            args.remaining_attempt_timeout_ms
258                .map(|t| t.to_string())
259                .unwrap_or_default(),                                  // 8
260        }
261    }
262}
263
264impl FromFcallResult for ClaimResumedExecutionResultPartial {
265    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
266        let r = FcallResult::parse(raw)?.into_success()?;
267        // ok(lease_id, epoch, expires_at, attempt_id, attempt_index, "resumed")
268        let lease_id = LeaseId::parse(&r.field_str(0))
269            .map_err(|e| ScriptError::Parse {
270                fcall: "ff_claim_resumed_execution_result_partial".into(),
271                execution_id: None,
272                message: format!("bad lease_id: {e}"),
273            })?;
274        let epoch = r.field_str(1).parse::<u64>()
275            .map_err(|e| ScriptError::Parse {
276                fcall: "ff_claim_resumed_execution_result_partial".into(),
277                execution_id: None,
278                message: format!("bad epoch: {e}"),
279            })?;
280        let expires_at = r.field_str(2).parse::<i64>()
281            .map_err(|e| ScriptError::Parse {
282                fcall: "ff_claim_resumed_execution_result_partial".into(),
283                execution_id: None,
284                message: format!("bad expires_at: {e}"),
285            })?;
286        let attempt_id = AttemptId::parse(&r.field_str(3))
287            .map_err(|e| ScriptError::Parse {
288                fcall: "ff_claim_resumed_execution_result_partial".into(),
289                execution_id: None,
290                message: format!("bad attempt_id: {e}"),
291            })?;
292        let attempt_index = r.field_str(4).parse::<u32>()
293            .map_err(|e| ScriptError::Parse {
294                fcall: "ff_claim_resumed_execution_result_partial".into(),
295                execution_id: None,
296                message: format!("bad attempt_index: {e}"),
297            })?;
298
299        Ok(Self::Claimed(ClaimedResumedExecutionPartial {
300            lease_id,
301            lease_epoch: LeaseEpoch::new(epoch),
302            attempt_index: AttemptIndex::new(attempt_index),
303            attempt_id,
304            lease_expires_at: TimestampMs::from_millis(expires_at),
305        }))
306    }
307}
308
309// ─── Partial-type tests (RFC-011 §2.4 acceptance) ──────────────────────
310#[cfg(test)]
311mod partial_tests {
312    use super::*;
313    use ff_core::partition::PartitionConfig;
314
315    #[test]
316    fn claim_resumed_partial_complete_attaches_execution_id() {
317        let partial = ClaimResumedExecutionResultPartial::Claimed(ClaimedResumedExecutionPartial {
318            lease_id: LeaseId::new(),
319            lease_epoch: LeaseEpoch::new(2),
320            attempt_index: AttemptIndex::new(1),
321            attempt_id: AttemptId::new(),
322            lease_expires_at: TimestampMs::from_millis(2000),
323        });
324        let eid = ExecutionId::for_flow(&FlowId::new(), &PartitionConfig::default());
325        let full = partial.complete(eid.clone());
326        match full {
327            ClaimResumedExecutionResult::Claimed(c) => assert_eq!(c.execution_id, eid),
328        }
329    }
330}