ff_sdk/signal_bridge.rs
1//! External-callback "signal bridge" helpers.
2//!
3//! A signal-bridge is the control-plane glue that sits between an
4//! external actor (webhook callback, email reply, third-party API,
5//! human approver) and FlowFabric's signal delivery surface. The
6//! external actor presents a waitpoint HMAC token it was handed at
7//! suspend time; the bridge authenticates the token against what the
8//! engine has stored for the waitpoint, and on match forwards through
9//! [`FlowFabricWorker::deliver_signal`].
10//!
11//! This module exposes the two consumer-facing pieces of that shape —
12//! the error enum and the verify-and-deliver helper — so callers don't
13//! rebuild them. The helper was extracted from the v0.13
14//! `examples/external-callback/` example after cairn's signal-bridge
15//! code review surfaced it as rough-edge API (every consumer ends up
16//! reinventing `UnknownWaitpoint` / `TokenMismatch` / `Backend`).
17//!
18//! # Security notes
19//!
20//! * Token comparison is constant-time via [`subtle::ConstantTimeEq`].
21//! This guards against byte-timing oracles on the HMAC digest; the
22//! token length is not a secret (all valid tokens share the
23//! `<kid>:<hex-digest>` shape) and an early-return on length mismatch
24//! is acceptable.
25//! * The bridge's token check is **defense-in-depth**: the engine
26//! re-verifies the HMAC server-side in [`EngineBackend::deliver_signal`]
27//! (via the `waitpoint_token` field), so a bridge bug that
28//! mis-routes a bad token does not bypass authentication. The bridge
29//! check exists to fail fast at the network edge and produce a
30//! structured error surface for operators.
31//! * The helper stamps `waitpoint_token` on the outgoing [`Signal`]
32//! from the `presented` parameter — any `waitpoint_token` the caller
33//! set on the [`Signal`] they passed is overwritten so the engine
34//! re-verifies against the same bytes the bridge checked, not
35//! something else.
36
37use std::sync::Arc;
38
39use ff_core::engine_backend::EngineBackend;
40use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
41use ff_core::types::{ExecutionId, WaitpointId, WaitpointToken};
42use subtle::ConstantTimeEq;
43
44use crate::task::{Signal, SignalOutcome};
45use crate::worker::FlowFabricWorker;
46use crate::SdkError;
47
48/// Error returned by a signal-bridge authentication + dispatch
49/// attempt. On success, [`verify_and_deliver`] returns the
50/// [`SignalOutcome`] from the underlying
51/// [`FlowFabricWorker::deliver_signal`] call; this enum only
52/// enumerates the rejection + fault cases.
53#[derive(Debug)]
54pub enum SignalBridgeError {
55 /// The waitpoint has no stored token — it was consumed, expired,
56 /// or never existed. Real bridges typically map this to HTTP 404.
57 UnknownWaitpoint,
58 /// The token presented by the external actor did not match the
59 /// stored token. Real bridges typically map this to HTTP 401.
60 TokenMismatch,
61 /// Underlying SDK / engine failure (read-waitpoint-token backend
62 /// error, network fault, deliver-signal rejection).
63 Backend(SdkError),
64}
65
66impl std::fmt::Display for SignalBridgeError {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 Self::UnknownWaitpoint => {
70 f.write_str("unknown waitpoint (consumed, expired, or unknown id)")
71 }
72 Self::TokenMismatch => f.write_str("token mismatch (presented HMAC does not verify)"),
73 Self::Backend(e) => write!(f, "backend: {e}"),
74 }
75 }
76}
77
78impl std::error::Error for SignalBridgeError {
79 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80 match self {
81 Self::Backend(e) => Some(e),
82 _ => None,
83 }
84 }
85}
86
87impl From<SdkError> for SignalBridgeError {
88 fn from(e: SdkError) -> Self {
89 Self::Backend(e)
90 }
91}
92
93/// Authenticate an external-actor callback against the stored
94/// waitpoint token and, on match, deliver `signal` through the worker.
95///
96/// The helper:
97///
98/// 1. Reads the stored token via
99/// [`EngineBackend::read_waitpoint_token`]. No stored token (the
100/// backend returns `Ok(None)`) → [`SignalBridgeError::UnknownWaitpoint`].
101/// 2. Compares the stored token to `presented` in constant time. No
102/// match → [`SignalBridgeError::TokenMismatch`].
103/// 3. Stamps `signal.waitpoint_token = presented.clone()` (so the
104/// engine's server-side re-verification uses the same bytes the
105/// bridge already verified) and dispatches via
106/// [`FlowFabricWorker::deliver_signal`].
107///
108/// # Arguments
109///
110/// * `backend` — the `EngineBackend` trait object the bridge reads the
111/// stored token through. Typically the same `Arc<dyn EngineBackend>`
112/// the caller constructed their `FlowFabricWorker` with; accepts a
113/// reference so callers can pass it without cloning.
114/// * `worker` — the [`FlowFabricWorker`] that fronts the engine-side
115/// signal delivery. Must be configured against the same backend the
116/// stored token lives on.
117/// * `execution_id` — the suspended execution the waitpoint belongs
118/// to. The partition key is derived from the exec id's `{fp:N}:`
119/// prefix.
120/// * `waitpoint_id` — the waitpoint to authenticate.
121/// * `presented` — the HMAC token the external actor presented.
122/// * `signal` — caller-owned [`Signal`] payload (name, category,
123/// source, body). The helper overwrites `signal.waitpoint_token` so
124/// the caller's value for that field is ignored.
125///
126/// # Errors
127///
128/// See [`SignalBridgeError`].
129pub async fn verify_and_deliver(
130 backend: &dyn EngineBackend,
131 worker: &FlowFabricWorker,
132 execution_id: &ExecutionId,
133 waitpoint_id: &WaitpointId,
134 presented: &WaitpointToken,
135 mut signal: Signal,
136) -> Result<SignalOutcome, SignalBridgeError> {
137 let partition = PartitionKey::from(&Partition {
138 family: PartitionFamily::Flow,
139 index: execution_id.partition(),
140 });
141
142 let stored = backend
143 .read_waitpoint_token(partition, waitpoint_id)
144 .await
145 .map_err(|e| SignalBridgeError::Backend(SdkError::from(e)))?
146 .ok_or(SignalBridgeError::UnknownWaitpoint)?;
147
148 // Constant-time compare. `ConstantTimeEq::ct_eq` returns `Choice`;
149 // `bool::from(Choice)` is the idiomatic conversion.
150 let stored_bytes = stored.as_bytes();
151 let presented_bytes = presented.as_str().as_bytes();
152 if !bool::from(stored_bytes.ct_eq(presented_bytes)) {
153 return Err(SignalBridgeError::TokenMismatch);
154 }
155
156 signal.waitpoint_token = presented.clone();
157 worker
158 .deliver_signal(execution_id, waitpoint_id, signal)
159 .await
160 .map_err(SignalBridgeError::Backend)
161}
162
163/// Convenience wrapper for consumers that hold an `Arc<dyn EngineBackend>`
164/// rather than a bare trait reference — the more common shape in code
165/// that already follows the [`FlowFabricWorker::connect_with`] pattern.
166/// Dispatches to [`verify_and_deliver`]; no behavioural difference.
167pub async fn verify_and_deliver_arc(
168 backend: &Arc<dyn EngineBackend>,
169 worker: &FlowFabricWorker,
170 execution_id: &ExecutionId,
171 waitpoint_id: &WaitpointId,
172 presented: &WaitpointToken,
173 signal: Signal,
174) -> Result<SignalOutcome, SignalBridgeError> {
175 verify_and_deliver(
176 backend.as_ref(),
177 worker,
178 execution_id,
179 waitpoint_id,
180 presented,
181 signal,
182 )
183 .await
184}
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189
190 #[test]
191 fn display_each_variant() {
192 assert!(SignalBridgeError::UnknownWaitpoint
193 .to_string()
194 .contains("unknown waitpoint"));
195 assert!(SignalBridgeError::TokenMismatch
196 .to_string()
197 .contains("token mismatch"));
198 let wrapped = SignalBridgeError::Backend(SdkError::Config {
199 context: "test".into(),
200 field: None,
201 message: "boom".into(),
202 });
203 let msg = wrapped.to_string();
204 assert!(msg.starts_with("backend: "), "got: {msg}");
205 }
206
207 #[test]
208 fn error_source_wraps_backend() {
209 let inner = SdkError::Config {
210 context: "test".into(),
211 field: None,
212 message: "boom".into(),
213 };
214 let e = SignalBridgeError::Backend(inner);
215 assert!(
216 std::error::Error::source(&e).is_some(),
217 "Backend variant must expose source"
218 );
219 assert!(
220 std::error::Error::source(&SignalBridgeError::UnknownWaitpoint).is_none(),
221 "UnknownWaitpoint has no underlying source"
222 );
223 }
224
225 #[test]
226 fn from_sdk_error() {
227 let inner = SdkError::Config {
228 context: "test".into(),
229 field: None,
230 message: "boom".into(),
231 };
232 match SignalBridgeError::from(inner) {
233 SignalBridgeError::Backend(_) => {}
234 other => panic!("expected Backend, got {other:?}"),
235 }
236 }
237}