Skip to main content

ff_backend_postgres/
suspend.rs

1//! Suspend + resume plumbing for the Postgres backend.
2//!
3//! **Wave 4 Agent D (RFC-v0.7 v0.7 migration-master).**
4//!
//! This module is the landing pad for the 4 suspend-family trait
//! methods (`suspend`, `observe_signals`, `list_suspended`,
//! `claim_resumed_execution`). The current tranche ships the
//! composite-condition evaluator as a pure-Rust helper ([`evaluate`])
//! and the partition-slot helper ([`slot_of`]); the SQL bodies for
//! the trait methods land together with the Wave-4 claim/lease
//! plumbing being built in parallel (Wave 4 Agents A/B/C).
12//!
13//! # Composite evaluator placement
14//!
15//! Q11 pinned composite-condition evaluation at one of the three
16//! SERIALIZABLE sites; we run the evaluator Rust-side inside the
17//! SERIALIZABLE transaction (not as a plpgsql stored proc) because:
18//!
19//! 1. The declarative `ResumeCondition` + `CompositeBody` types
20//!    already live in `ff-core::contracts`; replicating the logic in
21//!    plpgsql would double-maintain a non-trivial tree walker.
22//! 2. SERIALIZABLE already serializes reads + writes on the
23//!    `satisfied_set` / `member_map` JSONB columns the evaluator
24//!    touches, so moving the eval into pg buys no isolation.
25//! 3. Rust-side eval keeps the evaluator unit-testable without a
26//!    live Postgres (see [`evaluate`] tests below).
27//!
28//! The Valkey backend runs the equivalent logic in Lua; this is the
29//! only evaluator divergence and is deliberate per the rationale
30//! above.
31
32use std::collections::{HashMap, HashSet};
33
34use ff_core::backend::ResumeSignal;
35use ff_core::contracts::{CompositeBody, CountKind, ResumeCondition, SignalMatcher};
36use ff_core::partition::{PartitionConfig, PartitionKey};
37use ff_core::types::ExecutionId;
38
/// Map from `waitpoint_key` → slice of signals delivered to it. This
/// is the shape `ff_suspension_current.member_map` deserialises into
/// on the Postgres side — signals are keyed by their target
/// waitpoint, so the evaluator can answer "did this waitpoint fire?"
/// with a single map lookup instead of a per-signal waitpoint lookup.
///
/// Both the keys and the signal slices are borrowed for `'a`; the map
/// owns neither.
pub type SignalsByWaitpoint<'a> = HashMap<&'a str, &'a [ResumeSignal]>;
45
46/// Map an `ExecutionId` to its partition slot under the deployment's
47/// [`PartitionConfig`]. The `i16` return matches the column type on
48/// every partitioned table in `migrations/0001_initial.sql`.
49pub fn slot_of(exec_id: &ExecutionId, _cfg: &PartitionConfig) -> i16 {
50    // `ExecutionId::partition()` is infallible on any validly-minted
51    // id (it reads the `{fp:N}` hash-tag prefix). The partition
52    // number is a u16 but our schema uses smallint (i16), which
53    // accommodates the full 256-partition range with room to spare.
54    exec_id.partition() as i16
55}
56
57/// Convert a `PartitionKey` to its slot index for schema-partition
58/// keying. Returns `None` if the key is malformed.
59pub fn slot_of_partition_key(pk: &PartitionKey) -> Option<i16> {
60    pk.as_partition().ok().map(|p| p.index as i16)
61}
62
63/// Evaluate a [`ResumeCondition`] against the set of signals
64/// delivered so far. Returns `true` iff the condition is satisfied.
65///
66/// `signals` is the full ordered list of [`ResumeSignal`]s recorded
67/// against this suspension's `member_map`. The function walks the
68/// declarative tree; it does NOT mutate inputs. Caller is responsible
69/// for reading the satisfied-signals view from
70/// `ff_suspension_current.member_map` under a SERIALIZABLE txn.
71///
72/// # Semantics (RFC-013 §2.4 + RFC-014 §2.1)
73///
74/// * `Single { waitpoint_key, matcher }` — `true` iff at least one
75///   signal targeting `waitpoint_key` matches `matcher`.
76/// * `OperatorOnly` — `false` (only an explicit operator resume
77///   satisfies this condition; not reachable through this evaluator).
78/// * `TimeoutOnly` — `false` (timeout-only suspensions resolve via
79///   `timeout_behavior` at `timeout_at`, not via signals).
80/// * `Composite(AllOf { members })` — `true` iff every `member`
81///   evaluates `true`.
82/// * `Composite(Count { n, count_kind, matcher, waitpoints })` —
83///   `true` iff at least `n` distinct satisfiers (by `count_kind`)
84///   match across signals targeting `waitpoints` and (optionally)
85///   matching `matcher`.
86pub fn evaluate(condition: &ResumeCondition, by_wp: &SignalsByWaitpoint<'_>) -> bool {
87    match condition {
88        ResumeCondition::Single {
89            waitpoint_key,
90            matcher,
91        } => by_wp
92            .get(waitpoint_key.as_str())
93            .map(|sigs| sigs.iter().any(|s| matcher_matches(matcher, s)))
94            .unwrap_or(false),
95        ResumeCondition::OperatorOnly | ResumeCondition::TimeoutOnly => false,
96        ResumeCondition::Composite(body) => evaluate_composite(body, by_wp),
97        _ => false,
98    }
99}
100
101fn evaluate_composite(body: &CompositeBody, by_wp: &SignalsByWaitpoint<'_>) -> bool {
102    match body {
103        CompositeBody::AllOf { members } => {
104            !members.is_empty() && members.iter().all(|m| evaluate(m, by_wp))
105        }
106        CompositeBody::Count {
107            n,
108            count_kind,
109            matcher,
110            waitpoints,
111        } => {
112            // Gather all signals whose waitpoint is in `waitpoints` and
113            // (optionally) match `matcher`. Collect alongside their
114            // wp-key so `DistinctWaitpoints` can count distinct wp sets.
115            let mut candidates: Vec<(&str, &ResumeSignal)> = Vec::new();
116            for wpk in waitpoints {
117                let Some(sigs) = by_wp.get(wpk.as_str()) else {
118                    continue;
119                };
120                for s in sigs.iter() {
121                    if matcher
122                        .as_ref()
123                        .map(|m| matcher_matches(m, s))
124                        .unwrap_or(true)
125                    {
126                        candidates.push((wpk.as_str(), s));
127                    }
128                }
129            }
130
131            let distinct_count = match count_kind {
132                CountKind::DistinctWaitpoints => {
133                    let mut set: HashSet<&str> = HashSet::new();
134                    for (wpk, _) in &candidates {
135                        set.insert(wpk);
136                    }
137                    set.len() as u32
138                }
139                CountKind::DistinctSignals => {
140                    let mut set: HashSet<String> = HashSet::new();
141                    for (_, s) in &candidates {
142                        set.insert(s.signal_id.0.to_string());
143                    }
144                    set.len() as u32
145                }
146                CountKind::DistinctSources => {
147                    let mut set: HashSet<(&str, &str)> = HashSet::new();
148                    for (_, s) in &candidates {
149                        set.insert((s.source_type.as_str(), s.source_identity.as_str()));
150                    }
151                    set.len() as u32
152                }
153                // Forward-compat: unknown `CountKind` variants count
154                // nothing (never satisfies) so a pinned evaluator
155                // cannot silently resume under a future wire shape.
156                _ => 0,
157            };
158            distinct_count >= *n
159        }
160        // Forward-compat: unknown `CompositeBody` variants never
161        // satisfy. Same rationale as the `CountKind` arm.
162        _ => false,
163    }
164}
165
166fn matcher_matches(matcher: &SignalMatcher, signal: &ResumeSignal) -> bool {
167    match matcher {
168        SignalMatcher::ByName(name) => signal.signal_name.as_str() == name.as_str(),
169        SignalMatcher::Wildcard => true,
170        _ => false,
171    }
172}
173
#[cfg(test)]
mod evaluator_tests {
    use super::*;
    use ff_core::contracts::{CompositeBody, CountKind, ResumeCondition, SignalMatcher};
    use ff_core::types::{SignalId, TimestampMs};

    /// Build a minimal `ResumeSignal` fixture with a fresh id.
    ///
    /// `wp_key` is not stored on the signal; it is accepted only so
    /// call sites show which waitpoint the caller's map files the
    /// signal under.
    fn sig(wp_key: &str, name: &str, source_type: &str, source_identity: &str) -> ResumeSignal {
        let _ = wp_key;
        ResumeSignal {
            signal_id: SignalId::new(),
            signal_name: name.to_string(),
            signal_category: String::from("external"),
            source_type: source_type.to_string(),
            source_identity: source_identity.to_string(),
            correlation_id: String::new(),
            accepted_at: TimestampMs(0),
            payload: None,
        }
    }

    /// Build a lookup map holding `signals` under the single key `key`.
    fn on_waitpoint<'a>(key: &'a str, signals: &'a [ResumeSignal]) -> SignalsByWaitpoint<'a> {
        HashMap::from([(key, signals)])
    }

    /// `Single` condition: waitpoint `wpk:a` must receive a signal
    /// named `ready`.
    fn ready_on_a() -> ResumeCondition {
        ResumeCondition::Single {
            waitpoint_key: "wpk:a".into(),
            matcher: SignalMatcher::ByName("ready".into()),
        }
    }

    #[test]
    fn single_byname_matches() {
        let delivered = [sig("wpk:a", "ready", "worker", "w1")];
        let by_wp = on_waitpoint("wpk:a", &delivered);
        assert!(evaluate(&ready_on_a(), &by_wp));
    }

    #[test]
    fn single_byname_rejects_wrong_name() {
        let delivered = [sig("wpk:a", "other", "worker", "w1")];
        let by_wp = on_waitpoint("wpk:a", &delivered);
        assert!(!evaluate(&ready_on_a(), &by_wp));
    }

    #[test]
    fn count_distinct_sources_requires_two_distinct() {
        let cond = ResumeCondition::Composite(CompositeBody::Count {
            n: 2,
            count_kind: CountKind::DistinctSources,
            matcher: None,
            waitpoints: vec!["wpk:a".into()],
        });

        // Same source fires twice — must NOT satisfy.
        let repeated = [
            sig("wpk:a", "x", "worker", "w1"),
            sig("wpk:a", "x", "worker", "w1"),
        ];
        assert!(!evaluate(&cond, &on_waitpoint("wpk:a", &repeated)));

        // Two different source identities — satisfies.
        let varied = [
            sig("wpk:a", "x", "worker", "w1"),
            sig("wpk:a", "x", "worker", "w2"),
        ];
        assert!(evaluate(&cond, &on_waitpoint("wpk:a", &varied)));
    }

    #[test]
    fn allof_two_waitpoints() {
        let on_a = [sig("wpk:a", "x", "src", "id")];
        let on_b = [sig("wpk:b", "x", "src", "id")];
        let cond = ResumeCondition::all_of_waitpoints(["wpk:a", "wpk:b"]);

        let both: SignalsByWaitpoint<'_> =
            HashMap::from([("wpk:a", &on_a[..]), ("wpk:b", &on_b[..])]);
        assert!(evaluate(&cond, &both));

        // Missing wpk:b ⇒ not satisfied
        assert!(!evaluate(&cond, &on_waitpoint("wpk:a", &on_a)));
    }
}