Skip to main content

ff_sdk/
signal_bridge.rs

1//! External-callback "signal bridge" helpers.
2//!
3//! A signal-bridge is the control-plane glue that sits between an
4//! external actor (webhook callback, email reply, third-party API,
5//! human approver) and FlowFabric's signal delivery surface. The
6//! external actor presents a waitpoint HMAC token it was handed at
7//! suspend time; the bridge authenticates the token against what the
8//! engine has stored for the waitpoint, and on match forwards through
9//! [`FlowFabricWorker::deliver_signal`].
10//!
11//! This module exposes the two consumer-facing pieces of that shape —
12//! the error enum and the verify-and-deliver helper — so callers don't
13//! rebuild them. The helper was extracted from the v0.13
14//! `examples/external-callback/` example after cairn's signal-bridge
15//! code review surfaced it as rough-edge API (every consumer ends up
16//! reinventing `UnknownWaitpoint` / `TokenMismatch` / `Backend`).
17//!
18//! # Security notes
19//!
20//! * Token comparison is constant-time via [`subtle::ConstantTimeEq`].
21//!   This guards against byte-timing oracles on the HMAC digest; the
22//!   token length is not a secret (all valid tokens share the
23//!   `<kid>:<hex-digest>` shape) and an early-return on length mismatch
24//!   is acceptable.
25//! * The bridge's token check is **defense-in-depth**: the engine
26//!   re-verifies the HMAC server-side in [`EngineBackend::deliver_signal`]
27//!   (via the `waitpoint_token` field), so a bridge bug that
28//!   mis-routes a bad token does not bypass authentication. The bridge
29//!   check exists to fail fast at the network edge and produce a
30//!   structured error surface for operators.
31//! * The helper stamps `waitpoint_token` on the outgoing [`Signal`]
32//!   from the `presented` parameter — any `waitpoint_token` the caller
33//!   set on the [`Signal`] they passed is overwritten so the engine
34//!   re-verifies against the same bytes the bridge checked, not
35//!   something else.
36
37use std::sync::Arc;
38
39use ff_core::engine_backend::EngineBackend;
40use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
41use ff_core::types::{ExecutionId, WaitpointId, WaitpointToken};
42use subtle::ConstantTimeEq;
43
44use crate::task::{Signal, SignalOutcome};
45use crate::worker::FlowFabricWorker;
46use crate::SdkError;
47
48/// Error returned by a signal-bridge authentication + dispatch
49/// attempt. On success, [`verify_and_deliver`] returns the
50/// [`SignalOutcome`] from the underlying
51/// [`FlowFabricWorker::deliver_signal`] call; this enum only
52/// enumerates the rejection + fault cases.
53#[derive(Debug)]
54pub enum SignalBridgeError {
55    /// The waitpoint has no stored token — it was consumed, expired,
56    /// or never existed. Real bridges typically map this to HTTP 404.
57    UnknownWaitpoint,
58    /// The token presented by the external actor did not match the
59    /// stored token. Real bridges typically map this to HTTP 401.
60    TokenMismatch,
61    /// Underlying SDK / engine failure (read-waitpoint-token backend
62    /// error, network fault, deliver-signal rejection).
63    Backend(SdkError),
64}
65
66impl std::fmt::Display for SignalBridgeError {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            Self::UnknownWaitpoint => {
70                f.write_str("unknown waitpoint (consumed, expired, or unknown id)")
71            }
72            Self::TokenMismatch => f.write_str("token mismatch (presented HMAC does not verify)"),
73            Self::Backend(e) => write!(f, "backend: {e}"),
74        }
75    }
76}
77
78impl std::error::Error for SignalBridgeError {
79    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
80        match self {
81            Self::Backend(e) => Some(e),
82            _ => None,
83        }
84    }
85}
86
87impl From<SdkError> for SignalBridgeError {
88    fn from(e: SdkError) -> Self {
89        Self::Backend(e)
90    }
91}
92
93/// Authenticate an external-actor callback against the stored
94/// waitpoint token and, on match, deliver `signal` through the worker.
95///
96/// The helper:
97///
98/// 1. Reads the stored token via
99///    [`EngineBackend::read_waitpoint_token`]. No stored token (the
100///    backend returns `Ok(None)`) → [`SignalBridgeError::UnknownWaitpoint`].
101/// 2. Compares the stored token to `presented` in constant time. No
102///    match → [`SignalBridgeError::TokenMismatch`].
103/// 3. Stamps `signal.waitpoint_token = presented.clone()` (so the
104///    engine's server-side re-verification uses the same bytes the
105///    bridge already verified) and dispatches via
106///    [`FlowFabricWorker::deliver_signal`].
107///
108/// # Arguments
109///
110/// * `backend` — the `EngineBackend` trait object the bridge reads the
111///   stored token through. Typically the same `Arc<dyn EngineBackend>`
112///   the caller constructed their `FlowFabricWorker` with; accepts a
113///   reference so callers can pass it without cloning.
114/// * `worker` — the [`FlowFabricWorker`] that fronts the engine-side
115///   signal delivery. Must be configured against the same backend the
116///   stored token lives on.
117/// * `execution_id` — the suspended execution the waitpoint belongs
118///   to. The partition key is derived from the exec id's `{fp:N}:`
119///   prefix.
120/// * `waitpoint_id` — the waitpoint to authenticate.
121/// * `presented` — the HMAC token the external actor presented.
122/// * `signal` — caller-owned [`Signal`] payload (name, category,
123///   source, body). The helper overwrites `signal.waitpoint_token` so
124///   the caller's value for that field is ignored.
125///
126/// # Errors
127///
128/// See [`SignalBridgeError`].
129pub async fn verify_and_deliver(
130    backend: &dyn EngineBackend,
131    worker: &FlowFabricWorker,
132    execution_id: &ExecutionId,
133    waitpoint_id: &WaitpointId,
134    presented: &WaitpointToken,
135    mut signal: Signal,
136) -> Result<SignalOutcome, SignalBridgeError> {
137    let partition = PartitionKey::from(&Partition {
138        family: PartitionFamily::Flow,
139        index: execution_id.partition(),
140    });
141
142    let stored = backend
143        .read_waitpoint_token(partition, waitpoint_id)
144        .await
145        .map_err(|e| SignalBridgeError::Backend(SdkError::from(e)))?
146        .ok_or(SignalBridgeError::UnknownWaitpoint)?;
147
148    // Constant-time compare. `ConstantTimeEq::ct_eq` returns `Choice`;
149    // `bool::from(Choice)` is the idiomatic conversion.
150    let stored_bytes = stored.as_bytes();
151    let presented_bytes = presented.as_str().as_bytes();
152    if !bool::from(stored_bytes.ct_eq(presented_bytes)) {
153        return Err(SignalBridgeError::TokenMismatch);
154    }
155
156    signal.waitpoint_token = presented.clone();
157    worker
158        .deliver_signal(execution_id, waitpoint_id, signal)
159        .await
160        .map_err(SignalBridgeError::Backend)
161}
162
163/// Convenience wrapper for consumers that hold an `Arc<dyn EngineBackend>`
164/// rather than a bare trait reference — the more common shape in code
165/// that already follows the [`FlowFabricWorker::connect_with`] pattern.
166/// Dispatches to [`verify_and_deliver`]; no behavioural difference.
167pub async fn verify_and_deliver_arc(
168    backend: &Arc<dyn EngineBackend>,
169    worker: &FlowFabricWorker,
170    execution_id: &ExecutionId,
171    waitpoint_id: &WaitpointId,
172    presented: &WaitpointToken,
173    signal: Signal,
174) -> Result<SignalOutcome, SignalBridgeError> {
175    verify_and_deliver(
176        backend.as_ref(),
177        worker,
178        execution_id,
179        waitpoint_id,
180        presented,
181        signal,
182    )
183    .await
184}
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189
190    #[test]
191    fn display_each_variant() {
192        assert!(SignalBridgeError::UnknownWaitpoint
193            .to_string()
194            .contains("unknown waitpoint"));
195        assert!(SignalBridgeError::TokenMismatch
196            .to_string()
197            .contains("token mismatch"));
198        let wrapped = SignalBridgeError::Backend(SdkError::Config {
199            context: "test".into(),
200            field: None,
201            message: "boom".into(),
202        });
203        let msg = wrapped.to_string();
204        assert!(msg.starts_with("backend: "), "got: {msg}");
205    }
206
207    #[test]
208    fn error_source_wraps_backend() {
209        let inner = SdkError::Config {
210            context: "test".into(),
211            field: None,
212            message: "boom".into(),
213        };
214        let e = SignalBridgeError::Backend(inner);
215        assert!(
216            std::error::Error::source(&e).is_some(),
217            "Backend variant must expose source"
218        );
219        assert!(
220            std::error::Error::source(&SignalBridgeError::UnknownWaitpoint).is_none(),
221            "UnknownWaitpoint has no underlying source"
222        );
223    }
224
225    #[test]
226    fn from_sdk_error() {
227        let inner = SdkError::Config {
228            context: "test".into(),
229            field: None,
230            message: "boom".into(),
231        };
232        match SignalBridgeError::from(inner) {
233            SignalBridgeError::Backend(_) => {}
234            other => panic!("expected Backend, got {other:?}"),
235        }
236    }
237}