// ff_backend_postgres/suspend.rs
//! Suspend + resume plumbing for the Postgres backend.
//!
//! **Wave 4 Agent D (RFC-v0.7 migration-master).**
//!
//! This module is the landing pad for the 4 suspend-family trait
//! methods (`suspend`, `observe_signals`, `list_suspended`,
//! `claim_resumed_execution`). The current tranche ships the
//! composite-condition evaluator as a pure-Rust helper ([`evaluate`])
//! and the partition-slot helper ([`slot_of`]); the SQL bodies for
//! the trait methods land together with the Wave-4 claim/lease
//! plumbing being built in parallel (Wave 4 Agents A/B/C).
//!
//! # Composite evaluator placement
//!
//! Q11 pinned composite-condition evaluation at one of the three
//! SERIALIZABLE sites; we run the evaluator Rust-side inside the
//! SERIALIZABLE transaction (not as a plpgsql stored proc) because:
//!
//! 1. The declarative `ResumeCondition` + `CompositeBody` types
//!    already live in `ff-core::contracts`; replicating the logic in
//!    plpgsql would double-maintain a non-trivial tree walker.
//! 2. SERIALIZABLE already serializes reads + writes on the
//!    `satisfied_set` / `member_map` JSONB columns the evaluator
//!    touches, so moving the eval into pg buys no isolation.
//! 3. Rust-side eval keeps the evaluator unit-testable without a
//!    live Postgres (see [`evaluate`] tests below).
//!
//! The Valkey backend runs the equivalent logic in Lua; this is the
//! only evaluator divergence and is deliberate per the rationale
//! above.

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;

use ff_core::backend::ResumeSignal;
use ff_core::contracts::{CompositeBody, CountKind, ResumeCondition, SignalMatcher};
use ff_core::partition::{PartitionConfig, PartitionKey};
use ff_core::types::ExecutionId;

/// Map from `waitpoint_key` → list of signals delivered to it. This is
/// the shape [`ff_suspension_current.member_map`] deserialises into on
/// the Postgres side — signals are keyed by their target waitpoint,
/// so the evaluator can answer "did this waitpoint fire?" without a
/// per-signal wp lookup.
///
/// Borrowed view only (`&str` keys, `&[ResumeSignal]` values): the
/// evaluator never takes ownership of the deserialised rows, so the
/// caller keeps the backing storage alive for `'a`.
pub type SignalsByWaitpoint<'a> = HashMap<&'a str, &'a [ResumeSignal]>;

46/// Map an `ExecutionId` to its partition slot under the deployment's
47/// [`PartitionConfig`]. The `i16` return matches the column type on
48/// every partitioned table in `migrations/0001_initial.sql`.
49pub fn slot_of(exec_id: &ExecutionId, _cfg: &PartitionConfig) -> i16 {
50 // `ExecutionId::partition()` is infallible on any validly-minted
51 // id (it reads the `{fp:N}` hash-tag prefix). The partition
52 // number is a u16 but our schema uses smallint (i16), which
53 // accommodates the full 256-partition range with room to spare.
54 exec_id.partition() as i16
55}
56
57/// Convert a `PartitionKey` to its slot index for schema-partition
58/// keying. Returns `None` if the key is malformed.
59pub fn slot_of_partition_key(pk: &PartitionKey) -> Option<i16> {
60 pk.as_partition().ok().map(|p| p.index as i16)
61}
62
63/// Evaluate a [`ResumeCondition`] against the set of signals
64/// delivered so far. Returns `true` iff the condition is satisfied.
65///
66/// `signals` is the full ordered list of [`ResumeSignal`]s recorded
67/// against this suspension's `member_map`. The function walks the
68/// declarative tree; it does NOT mutate inputs. Caller is responsible
69/// for reading the satisfied-signals view from
70/// `ff_suspension_current.member_map` under a SERIALIZABLE txn.
71///
72/// # Semantics (RFC-013 §2.4 + RFC-014 §2.1)
73///
74/// * `Single { waitpoint_key, matcher }` — `true` iff at least one
75/// signal targeting `waitpoint_key` matches `matcher`.
76/// * `OperatorOnly` — `false` (only an explicit operator resume
77/// satisfies this condition; not reachable through this evaluator).
78/// * `TimeoutOnly` — `false` (timeout-only suspensions resolve via
79/// `timeout_behavior` at `timeout_at`, not via signals).
80/// * `Composite(AllOf { members })` — `true` iff every `member`
81/// evaluates `true`.
82/// * `Composite(Count { n, count_kind, matcher, waitpoints })` —
83/// `true` iff at least `n` distinct satisfiers (by `count_kind`)
84/// match across signals targeting `waitpoints` and (optionally)
85/// matching `matcher`.
86pub fn evaluate(condition: &ResumeCondition, by_wp: &SignalsByWaitpoint<'_>) -> bool {
87 match condition {
88 ResumeCondition::Single {
89 waitpoint_key,
90 matcher,
91 } => by_wp
92 .get(waitpoint_key.as_str())
93 .map(|sigs| sigs.iter().any(|s| matcher_matches(matcher, s)))
94 .unwrap_or(false),
95 ResumeCondition::OperatorOnly | ResumeCondition::TimeoutOnly => false,
96 ResumeCondition::Composite(body) => evaluate_composite(body, by_wp),
97 _ => false,
98 }
99}
100
101fn evaluate_composite(body: &CompositeBody, by_wp: &SignalsByWaitpoint<'_>) -> bool {
102 match body {
103 CompositeBody::AllOf { members } => {
104 !members.is_empty() && members.iter().all(|m| evaluate(m, by_wp))
105 }
106 CompositeBody::Count {
107 n,
108 count_kind,
109 matcher,
110 waitpoints,
111 } => {
112 // Gather all signals whose waitpoint is in `waitpoints` and
113 // (optionally) match `matcher`. Collect alongside their
114 // wp-key so `DistinctWaitpoints` can count distinct wp sets.
115 let mut candidates: Vec<(&str, &ResumeSignal)> = Vec::new();
116 for wpk in waitpoints {
117 let Some(sigs) = by_wp.get(wpk.as_str()) else {
118 continue;
119 };
120 for s in sigs.iter() {
121 if matcher
122 .as_ref()
123 .map(|m| matcher_matches(m, s))
124 .unwrap_or(true)
125 {
126 candidates.push((wpk.as_str(), s));
127 }
128 }
129 }
130
131 let distinct_count = match count_kind {
132 CountKind::DistinctWaitpoints => {
133 let mut set: HashSet<&str> = HashSet::new();
134 for (wpk, _) in &candidates {
135 set.insert(wpk);
136 }
137 set.len() as u32
138 }
139 CountKind::DistinctSignals => {
140 let mut set: HashSet<String> = HashSet::new();
141 for (_, s) in &candidates {
142 set.insert(s.signal_id.0.to_string());
143 }
144 set.len() as u32
145 }
146 CountKind::DistinctSources => {
147 let mut set: HashSet<(&str, &str)> = HashSet::new();
148 for (_, s) in &candidates {
149 set.insert((s.source_type.as_str(), s.source_identity.as_str()));
150 }
151 set.len() as u32
152 }
153 // Forward-compat: unknown `CountKind` variants count
154 // nothing (never satisfies) so a pinned evaluator
155 // cannot silently resume under a future wire shape.
156 _ => 0,
157 };
158 distinct_count >= *n
159 }
160 // Forward-compat: unknown `CompositeBody` variants never
161 // satisfy. Same rationale as the `CountKind` arm.
162 _ => false,
163 }
164}
165
166fn matcher_matches(matcher: &SignalMatcher, signal: &ResumeSignal) -> bool {
167 match matcher {
168 SignalMatcher::ByName(name) => signal.signal_name.as_str() == name.as_str(),
169 SignalMatcher::Wildcard => true,
170 _ => false,
171 }
172}
173
#[cfg(test)]
mod evaluator_tests {
    use super::*;
    use ff_core::contracts::{CompositeBody, CountKind, ResumeCondition, SignalMatcher};
    use ff_core::types::{SignalId, TimestampMs};

    /// Build a minimal [`ResumeSignal`] fixture. `wp_key` is ignored —
    /// the waitpoint association lives in the caller-built
    /// [`SignalsByWaitpoint`] map, not on the signal — but keeping it
    /// in the signature makes call sites read as "signal delivered to
    /// this waitpoint".
    fn sig(wp_key: &str, name: &str, source_type: &str, source_identity: &str) -> ResumeSignal {
        // wp_key is not on ResumeSignal but used by the caller's map
        let _ = wp_key;
        ResumeSignal {
            signal_id: SignalId::new(),
            signal_name: name.to_owned(),
            signal_category: "external".to_owned(),
            source_type: source_type.to_owned(),
            source_identity: source_identity.to_owned(),
            correlation_id: String::new(),
            accepted_at: TimestampMs(0),
            payload: None,
        }
    }

    /// `Single` + `ByName`: a delivered signal with the matching name
    /// satisfies the condition.
    #[test]
    fn single_byname_matches() {
        let s = sig("wpk:a", "ready", "worker", "w1");
        let v = vec![s];
        let by_wp: SignalsByWaitpoint<'_> = [("wpk:a", v.as_slice())].into_iter().collect();
        let cond = ResumeCondition::Single {
            waitpoint_key: "wpk:a".into(),
            matcher: SignalMatcher::ByName("ready".into()),
        };
        assert!(evaluate(&cond, &by_wp));
    }

    /// `Single` + `ByName`: a signal on the right waitpoint but with
    /// the wrong name must not satisfy.
    #[test]
    fn single_byname_rejects_wrong_name() {
        let s = sig("wpk:a", "other", "worker", "w1");
        let v = vec![s];
        let by_wp: SignalsByWaitpoint<'_> = [("wpk:a", v.as_slice())].into_iter().collect();
        let cond = ResumeCondition::Single {
            waitpoint_key: "wpk:a".into(),
            matcher: SignalMatcher::ByName("ready".into()),
        };
        assert!(!evaluate(&cond, &by_wp));
    }

    /// `Count` with `DistinctSources`: duplicate (source_type,
    /// source_identity) pairs count once; two genuinely distinct
    /// sources reach n = 2.
    #[test]
    fn count_distinct_sources_requires_two_distinct() {
        let dup_source = vec![
            sig("wpk:a", "x", "worker", "w1"),
            sig("wpk:a", "x", "worker", "w1"),
        ];
        let by_wp: SignalsByWaitpoint<'_> =
            [("wpk:a", dup_source.as_slice())].into_iter().collect();
        let cond = ResumeCondition::Composite(CompositeBody::Count {
            n: 2,
            count_kind: CountKind::DistinctSources,
            matcher: None,
            waitpoints: vec!["wpk:a".into()],
        });
        // Same source fires twice — must NOT satisfy.
        assert!(!evaluate(&cond, &by_wp));

        let distinct_sources = vec![
            sig("wpk:a", "x", "worker", "w1"),
            sig("wpk:a", "x", "worker", "w2"),
        ];
        let by_wp_ok: SignalsByWaitpoint<'_> =
            [("wpk:a", distinct_sources.as_slice())].into_iter().collect();
        assert!(evaluate(&cond, &by_wp_ok));
    }

    /// `AllOf` over two waitpoints: satisfied only when both have
    /// fired.
    #[test]
    fn allof_two_waitpoints() {
        let a = sig("wpk:a", "x", "src", "id");
        let b = sig("wpk:b", "x", "src", "id");
        let va = vec![a];
        let vb = vec![b];
        let by_wp: SignalsByWaitpoint<'_> =
            [("wpk:a", va.as_slice()), ("wpk:b", vb.as_slice())]
                .into_iter()
                .collect();
        let cond = ResumeCondition::all_of_waitpoints(["wpk:a", "wpk:b"]);
        assert!(evaluate(&cond, &by_wp));

        // Missing wpk:b ⇒ not satisfied
        let only_a: SignalsByWaitpoint<'_> =
            [("wpk:a", va.as_slice())].into_iter().collect();
        assert!(!evaluate(&cond, &only_a));
    }
}
263}