Skip to main content

ff_script/functions/
signal.rs

1//! Typed FCALL wrappers for signal delivery and resume-claim functions
2//! (lua/signal.lua).
3//!
4//! See `execution.rs` module-level rustdoc for the Partial-type pattern
5//! rationale (RFC-011 §2.4).
6
7use ff_core::contracts::*;
8use crate::error::ScriptError;
9use ff_core::keys::{ExecKeyContext, IndexKeys};
10use ff_core::types::*;
11
12use crate::result::{FcallResult, FromFcallResult};
13
14// Re-export ExecOpKeys from execution.rs for ff_claim_resumed_execution.
15use super::execution::ExecOpKeys;
16
17/// Partial form of [`ClaimedResumedExecution`] (omits `execution_id`).
18#[derive(Clone, Debug, PartialEq, Eq)]
19pub struct ClaimedResumedExecutionPartial {
20    pub lease_id: LeaseId,
21    pub lease_epoch: LeaseEpoch,
22    pub attempt_index: AttemptIndex,
23    pub attempt_id: AttemptId,
24    pub lease_expires_at: TimestampMs,
25}
26
27/// Partial form of [`ClaimResumedExecutionResult`].
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub enum ClaimResumedExecutionResultPartial {
30    Claimed(ClaimedResumedExecutionPartial),
31}
32
33impl ClaimResumedExecutionResultPartial {
34    pub fn complete(self, execution_id: ExecutionId) -> ClaimResumedExecutionResult {
35        match self {
36            Self::Claimed(p) => ClaimResumedExecutionResult::Claimed(ClaimedResumedExecution {
37                execution_id,
38                lease_id: p.lease_id,
39                lease_epoch: p.lease_epoch,
40                attempt_index: p.attempt_index,
41                attempt_id: p.attempt_id,
42                lease_expires_at: p.lease_expires_at,
43            }),
44        }
45    }
46}
47
48/// Key context for signal delivery operations.
49/// Needs exec keys + index keys + lane (for eligible/suspended/delayed).
50pub struct SignalOpKeys<'a> {
51    pub ctx: &'a ExecKeyContext,
52    pub idx: &'a IndexKeys,
53    pub lane_id: &'a LaneId,
54}
55
56// ─── ff_deliver_signal ────────────────────────────────────────────────
57//
58// Lua KEYS (14): exec_core, wp_condition, wp_signals_stream,
59//                exec_signals_zset, signal_hash, signal_payload,
60//                idem_key, waitpoint_hash, suspension_current,
61//                eligible_zset, suspended_zset, delayed_zset,
62//                suspension_timeout_zset, hmac_secrets
63// Lua ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
64//                signal_category, source_type, source_identity,
65//                payload, payload_encoding, idempotency_key,
66//                correlation_id, target_scope, created_at,
67//                dedup_ttl_ms, resume_delay_ms, signal_maxlen,
68//                max_signals_per_execution, waitpoint_token
69
70ff_function! {
71    pub ff_deliver_signal(args: DeliverSignalArgs) -> DeliverSignalResult {
72        keys(k: &SignalOpKeys<'_>) {
73            k.ctx.core(),                                              // 1
74            k.ctx.waitpoint_condition(&args.waitpoint_id),             // 2
75            k.ctx.waitpoint_signals(&args.waitpoint_id),               // 3
76            k.ctx.exec_signals(),                                      // 4
77            k.ctx.signal(&args.signal_id),                             // 5
78            k.ctx.signal_payload(&args.signal_id),                     // 6
79            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
80                k.ctx.signal_dedup(&args.waitpoint_id, ik)
81            }).unwrap_or_else(|| k.ctx.noop()),                        // 7
82            k.ctx.waitpoint(&args.waitpoint_id),                       // 8
83            k.ctx.suspension_current(),                                // 9
84            k.idx.lane_eligible(k.lane_id),                            // 10
85            k.idx.lane_suspended(k.lane_id),                           // 11
86            k.idx.lane_delayed(k.lane_id),                             // 12
87            k.idx.suspension_timeout(),                                // 13
88            k.idx.waitpoint_hmac_secrets(),                            // 14
89        }
90        argv {
91            args.signal_id.to_string(),                                // 1
92            args.execution_id.to_string(),                             // 2
93            args.waitpoint_id.to_string(),                             // 3
94            args.signal_name.clone(),                                  // 4
95            args.signal_category.clone(),                              // 5
96            args.source_type.clone(),                                  // 6
97            args.source_identity.clone(),                              // 7
98            args.payload.as_ref()
99                .map(|p| String::from_utf8_lossy(p).into_owned())
100                .unwrap_or_default(),                                  // 8
101            args.payload_encoding.clone().unwrap_or_else(|| "json".into()), // 9
102            args.idempotency_key.clone().unwrap_or_default(),          // 10
103            args.correlation_id.clone().unwrap_or_default(),           // 11
104            args.target_scope.clone(),                                 // 12
105            args.created_at.map(|t| t.to_string()).unwrap_or_default(), // 13
106            args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(),       // 14
107            args.resume_delay_ms.unwrap_or(0).to_string(),             // 15
108            args.signal_maxlen.unwrap_or(1000).to_string(),            // 16
109            args.max_signals_per_execution.unwrap_or(10_000).to_string(), // 17
110            args.waitpoint_token.as_str().to_owned(),                  // 18 (wire: raw, not redacted Display)
111        }
112    }
113}
114
115impl FromFcallResult for DeliverSignalResult {
116    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
117        let r = FcallResult::parse(raw)?;
118        // DUPLICATE status: {1, "DUPLICATE", existing_signal_id}
119        if r.status == "DUPLICATE" {
120            let sid_str = r.field_str(0);
121            let sid = SignalId::parse(&sid_str)
122                .map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
123            return Ok(DeliverSignalResult::Duplicate {
124                existing_signal_id: sid,
125            });
126        }
127        let r = r.into_success()?;
128        // ok(signal_id, effect)
129        let sid_str = r.field_str(0);
130        let effect = r.field_str(1);
131        let sid = SignalId::parse(&sid_str)
132            .map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
133        Ok(DeliverSignalResult::Accepted {
134            signal_id: sid,
135            effect,
136        })
137    }
138}
139
140// ─── ff_buffer_signal_for_pending_waitpoint ───────────────────────────
141//
142// Lua KEYS (9): exec_core, wp_condition, wp_signals_stream,
143//               exec_signals_zset, signal_hash, signal_payload,
144//               idem_key, waitpoint_hash, hmac_secrets
145// Lua ARGV (18): same as ff_deliver_signal (17 + waitpoint_token)
146
147ff_function! {
148    pub ff_buffer_signal_for_pending_waitpoint(args: BufferSignalArgs) -> BufferSignalResult {
149        keys(k: &SignalOpKeys<'_>) {
150            k.ctx.core(),                                              // 1
151            k.ctx.waitpoint_condition(&args.waitpoint_id),             // 2
152            k.ctx.waitpoint_signals(&args.waitpoint_id),               // 3
153            k.ctx.exec_signals(),                                      // 4
154            k.ctx.signal(&args.signal_id),                             // 5
155            k.ctx.signal_payload(&args.signal_id),                     // 6
156            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
157                k.ctx.signal_dedup(&args.waitpoint_id, ik)
158            }).unwrap_or_else(|| k.ctx.noop()),                        // 7
159            k.ctx.waitpoint(&args.waitpoint_id),                       // 8
160            k.idx.waitpoint_hmac_secrets(),                            // 9
161        }
162        argv {
163            args.signal_id.to_string(),                                // 1
164            args.execution_id.to_string(),                             // 2
165            args.waitpoint_id.to_string(),                             // 3
166            args.signal_name.clone(),                                  // 4
167            args.signal_category.clone(),                              // 5
168            args.source_type.clone(),                                  // 6
169            args.source_identity.clone(),                              // 7
170            args.payload.as_ref()
171                .map(|p| String::from_utf8_lossy(p).into_owned())
172                .unwrap_or_default(),                                  // 8
173            args.payload_encoding.clone().unwrap_or_else(|| "json".into()), // 9
174            args.idempotency_key.clone().unwrap_or_default(),          // 10
175            String::new(),                                             // 11 correlation_id (not in BufferSignalArgs)
176            args.target_scope.clone(),                                 // 12
177            String::new(),                                             // 13 created_at
178            String::new(),                                             // 14 dedup_ttl_ms
179            String::new(),                                             // 15 resume_delay_ms (unused)
180            String::new(),                                             // 16 signal_maxlen
181            String::new(),                                             // 17 max_signals
182            args.waitpoint_token.as_str().to_owned(),                  // 18 (wire: raw, not redacted Display)
183        }
184    }
185}
186
187impl FromFcallResult for BufferSignalResult {
188    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
189        let r = FcallResult::parse(raw)?;
190        // DUPLICATE status: {1, "DUPLICATE", existing_signal_id}
191        if r.status == "DUPLICATE" {
192            let sid_str = r.field_str(0);
193            let sid = SignalId::parse(&sid_str)
194                .map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
195            return Ok(BufferSignalResult::Duplicate {
196                existing_signal_id: sid,
197            });
198        }
199        let r = r.into_success()?;
200        // ok(signal_id, "buffered_for_pending_waitpoint")
201        let sid_str = r.field_str(0);
202        let sid = SignalId::parse(&sid_str)
203            .map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
204        Ok(BufferSignalResult::Buffered { signal_id: sid })
205    }
206}
207
208// ─── ff_claim_resumed_execution ───────────────────────────────────────
209//
210// Lua KEYS (11): exec_core, claim_grant, eligible_zset, lease_expiry_zset,
211//                worker_leases, existing_attempt_hash, lease_current,
212//                lease_history, active_index, attempt_timeout_zset,
213//                execution_deadline_zset
214// Lua ARGV (8): execution_id, worker_id, worker_instance_id, lane,
215//               capability_snapshot_hash, lease_id, lease_ttl_ms,
216//               remaining_attempt_timeout_ms
217
218ff_function! {
219    pub ff_claim_resumed_execution(args: ClaimResumedExecutionArgs) -> ClaimResumedExecutionResultPartial {
220        keys(k: &ExecOpKeys<'_>) {
221            k.ctx.core(),                                              // 1
222            k.ctx.claim_grant(),                                       // 2
223            k.idx.lane_eligible(k.lane_id),                            // 3
224            k.idx.lease_expiry(),                                      // 4
225            k.idx.worker_leases(k.worker_instance_id),                 // 5
226            k.ctx.attempt_hash(args.current_attempt_index),            // 6
227            k.ctx.lease_current(),                                     // 7
228            k.ctx.lease_history(),                                     // 8
229            k.idx.lane_active(k.lane_id),                              // 9
230            k.idx.attempt_timeout(),                                   // 10
231            k.idx.execution_deadline(),                                // 11
232        }
233        argv {
234            args.execution_id.to_string(),                             // 1
235            args.worker_id.to_string(),                                // 2
236            args.worker_instance_id.to_string(),                       // 3
237            args.lane_id.to_string(),                                  // 4
238            String::new(),                                             // 5 capability_snapshot_hash
239            args.lease_id.to_string(),                                 // 6
240            args.lease_ttl_ms.to_string(),                             // 7
241            args.remaining_attempt_timeout_ms
242                .map(|t| t.to_string())
243                .unwrap_or_default(),                                  // 8
244        }
245    }
246}
247
248impl FromFcallResult for ClaimResumedExecutionResultPartial {
249    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
250        let r = FcallResult::parse(raw)?.into_success()?;
251        // ok(lease_id, epoch, expires_at, attempt_id, attempt_index, "resumed")
252        let lease_id = LeaseId::parse(&r.field_str(0))
253            .map_err(|e| ScriptError::Parse(format!("bad lease_id: {e}")))?;
254        let epoch = r.field_str(1).parse::<u64>()
255            .map_err(|e| ScriptError::Parse(format!("bad epoch: {e}")))?;
256        let expires_at = r.field_str(2).parse::<i64>()
257            .map_err(|e| ScriptError::Parse(format!("bad expires_at: {e}")))?;
258        let attempt_id = AttemptId::parse(&r.field_str(3))
259            .map_err(|e| ScriptError::Parse(format!("bad attempt_id: {e}")))?;
260        let attempt_index = r.field_str(4).parse::<u32>()
261            .map_err(|e| ScriptError::Parse(format!("bad attempt_index: {e}")))?;
262
263        Ok(Self::Claimed(ClaimedResumedExecutionPartial {
264            lease_id,
265            lease_epoch: LeaseEpoch::new(epoch),
266            attempt_index: AttemptIndex::new(attempt_index),
267            attempt_id,
268            lease_expires_at: TimestampMs::from_millis(expires_at),
269        }))
270    }
271}
272
273// ─── Partial-type tests (RFC-011 §2.4 acceptance) ──────────────────────
274#[cfg(test)]
275mod partial_tests {
276    use super::*;
277    use ff_core::partition::PartitionConfig;
278
279    #[test]
280    fn claim_resumed_partial_complete_attaches_execution_id() {
281        let partial = ClaimResumedExecutionResultPartial::Claimed(ClaimedResumedExecutionPartial {
282            lease_id: LeaseId::new(),
283            lease_epoch: LeaseEpoch::new(2),
284            attempt_index: AttemptIndex::new(1),
285            attempt_id: AttemptId::new(),
286            lease_expires_at: TimestampMs::from_millis(2000),
287        });
288        let eid = ExecutionId::for_flow(&FlowId::new(), &PartitionConfig::default());
289        let full = partial.complete(eid.clone());
290        match full {
291            ClaimResumedExecutionResult::Claimed(c) => assert_eq!(c.execution_id, eid),
292        }
293    }
294}