Skip to main content

ff_core/
stream_subscribe.rs

1//! RFC-019 Stage A — cross-backend stream-cursor subscription surface.
2//!
3//! `EngineBackend::subscribe_{lease_history,completion,signal_delivery,
4//! instance_tags}` return a [`StreamSubscription`] — a pinned
5//! `tokio_stream::Stream` of `Result<StreamEvent, EngineError>`. The
6//! cursor is an opaque byte blob the consumer persists across crashes
7//! and hands back on resume; the first byte identifies the backend
8//! family + version so cursors stay stable across backend upgrades.
9//!
10//! This module defines the wire types only — no trait default is here;
11//! defaults live on the `EngineBackend` trait in
12//! [`crate::engine_backend`]. Real impls live in `ff-backend-valkey`
13//! (Valkey `XREAD BLOCK`-backed lease_history) and `ff-backend-postgres`
14//! (`LISTEN/NOTIFY`-backed completion) per RFC-019 §Implementation Plan.
15//!
16//! Four-family allow-list (RFC-019 §Open Questions #5, owner-
17//! adjudicated 2026-04-24): new families require an RFC amendment.
18
19use std::pin::Pin;
20
21use bytes::Bytes;
22use tokio_stream::Stream;
23
24use crate::engine_error::EngineError;
25use crate::types::{ExecutionId, TimestampMs};
26
27/// Opaque, backend-versioned cursor. Consumers persist the bytes, hand
28/// them back on resume. The first byte MUST encode a backend-family +
29/// version prefix so cursors stay stable across backend upgrades
30/// (Valkey: `0x01`, Postgres: `0x02`, …). The remainder is backend-
31/// specific.
32///
33/// [`StreamCursor::empty`] is the "start from tail / now" sentinel
34/// every backend accepts. Cursor byte order is owner-adjudicated
35/// opaque — consumers MUST NOT parse the bytes.
36#[derive(Clone, Debug, Eq, PartialEq)]
37pub struct StreamCursor(pub Bytes);
38
39impl StreamCursor {
40    /// Wrap arbitrary cursor bytes — typically only called by backend
41    /// impls that just encoded a fresh cursor from a backend-native
42    /// position (Valkey stream id, Postgres event_id).
43    pub fn new(raw: impl Into<Bytes>) -> Self {
44        Self(raw.into())
45    }
46
47    /// Borrow the raw cursor bytes for persistence. Consumers treat
48    /// the slice as opaque.
49    pub fn as_bytes(&self) -> &[u8] {
50        &self.0
51    }
52
53    /// Sentinel "start from tail" cursor. Backends interpret an empty
54    /// cursor as "subscribe from now" (Valkey: XREAD `$`; Postgres:
55    /// LISTEN from the current `max(event_id)`).
56    pub fn empty() -> Self {
57        Self(Bytes::new())
58    }
59}
60
/// Event families covered by the v0.9 allow-list (RFC-019 §Open
/// Questions #5). `#[non_exhaustive]` so v0.10+ families land without
/// breaking consumer match blocks, but the owner-adjudicated stance is
/// that new families require an RFC amendment — this is not a generic
/// escape hatch.
///
/// The variant names mirror the `EngineBackend::subscribe_*` methods
/// listed in the module docs; the per-variant notes below assume that
/// one-to-one mapping (confirm against the trait definition).
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamFamily {
    /// Lease-history events (presumably `subscribe_lease_history`).
    LeaseHistory,
    /// Completion events (presumably `subscribe_completion`).
    Completion,
    /// Signal-delivery events (presumably `subscribe_signal_delivery`).
    SignalDelivery,
    /// Instance-tag events (presumably `subscribe_instance_tags`).
    InstanceTags,
}
74
75/// Per-event payload.
76///
77/// - `family` identifies the event shape.
78/// - `cursor` is the position this event occupies in the stream;
79///   consumers persist it + hand it back on reconnect so replay begins
80///   strictly after this event.
81/// - `execution_id`, `attempt_index`, `timestamp` are inline hot fields
82///   (RFC-019 §Open Questions #4, owner-adjudicated inline) so common
83///   consumers do not need a follow-up `describe_execution` RPC.
84/// - `payload` is the family-specific binary event body. Schema is
85///   the backend's (not stabilised in Stage A — consumers parse via
86///   family-specific helpers Stage B will ship).
87///
88/// `#[non_exhaustive]` so future inline metadata (e.g. `namespace`)
89/// adds without a breaking change.
90#[non_exhaustive]
91#[derive(Clone, Debug)]
92pub struct StreamEvent {
93    pub family: StreamFamily,
94    pub cursor: StreamCursor,
95    pub execution_id: Option<ExecutionId>,
96    pub attempt_index: Option<u32>,
97    pub timestamp: TimestampMs,
98    pub payload: Bytes,
99}
100
101impl StreamEvent {
102    /// Construct a minimal `StreamEvent`. The struct is
103    /// `#[non_exhaustive]` so external crates cannot build via a
104    /// literal — backends go through this constructor + builder.
105    /// Optional inline metadata is added via `with_execution_id` /
106    /// `with_attempt_index` so call-site shape stays stable across
107    /// future field additions.
108    pub fn new(
109        family: StreamFamily,
110        cursor: StreamCursor,
111        timestamp: TimestampMs,
112        payload: Bytes,
113    ) -> Self {
114        Self {
115            family,
116            cursor,
117            execution_id: None,
118            attempt_index: None,
119            timestamp,
120            payload,
121        }
122    }
123
124    #[must_use]
125    pub fn with_execution_id(mut self, id: ExecutionId) -> Self {
126        self.execution_id = Some(id);
127        self
128    }
129
130    #[must_use]
131    pub fn with_attempt_index(mut self, idx: u32) -> Self {
132        self.attempt_index = Some(idx);
133        self
134    }
135}
136
/// Shape of a subscription stream: a pinned, boxed, `Send` trait
/// object implementing `tokio_stream::Stream`, whose items are
/// `Result<StreamEvent, EngineError>`.
///
/// Errors surface as `Err` items (not panics / stream-end); the
/// terminal `Err(EngineError::StreamDisconnected { cursor })` is the
/// owner-adjudicated disconnect contract (RFC-019 §Open Questions #2):
/// the consumer reconnects by re-calling the relevant `subscribe_*`
/// method with the cursor they observed. Non-terminal errors may be
/// followed by further events.
pub type StreamSubscription =
    Pin<Box<dyn Stream<Item = Result<StreamEvent, EngineError>> + Send>>;
147
148// ─── Valkey cursor codec ───
149//
150// Layout: `0x01` ++ ms_be(8) ++ seq_be(8). 17 bytes total. Empty
151// `StreamCursor` (zero-length) means "tail from `$`" (current end).
152//
153// These helpers live in ff-core rather than ff-backend-valkey because
154// the wire shape is part of the RFC-019 contract — a Postgres cursor
155// can never decode as a Valkey cursor (different first byte) and
156// consumers migrating between backends discard + restart cleanly.
157
/// Valkey family prefix byte (RFC-019 §Backend Semantics Appendix).
///
/// Written as the first byte by [`encode_valkey_cursor`] and required
/// as the first byte by [`decode_valkey_cursor`].
pub const VALKEY_CURSOR_PREFIX: u8 = 0x01;

/// Postgres family prefix byte.
///
/// Written as the first byte by [`encode_postgres_event_cursor`] and
/// required as the first byte by [`decode_postgres_event_cursor`].
pub const POSTGRES_CURSOR_PREFIX: u8 = 0x02;
163
164/// Encode a Valkey stream id `(ms, seq)` into a cursor.
165pub fn encode_valkey_cursor(ms: u64, seq: u64) -> StreamCursor {
166    let mut buf = Vec::with_capacity(17);
167    buf.push(VALKEY_CURSOR_PREFIX);
168    buf.extend_from_slice(&ms.to_be_bytes());
169    buf.extend_from_slice(&seq.to_be_bytes());
170    StreamCursor::new(buf)
171}
172
173/// Decode a Valkey cursor back to `(ms, seq)`. Returns `None` for the
174/// empty / tail-from-now cursor. Returns `Err` for malformed bytes
175/// (wrong prefix, truncated).
176pub fn decode_valkey_cursor(cursor: &StreamCursor) -> Result<Option<(u64, u64)>, &'static str> {
177    let bytes = cursor.as_bytes();
178    if bytes.is_empty() {
179        return Ok(None);
180    }
181    if bytes.len() != 17 || bytes[0] != VALKEY_CURSOR_PREFIX {
182        return Err("stream_subscribe: cursor does not belong to the Valkey backend");
183    }
184    let mut ms = [0u8; 8];
185    let mut seq = [0u8; 8];
186    ms.copy_from_slice(&bytes[1..9]);
187    seq.copy_from_slice(&bytes[9..17]);
188    Ok(Some((u64::from_be_bytes(ms), u64::from_be_bytes(seq))))
189}
190
191/// Encode a Postgres `event_id` (i64, from `ff_completion_event`) into
192/// a cursor.
193pub fn encode_postgres_event_cursor(event_id: i64) -> StreamCursor {
194    let mut buf = Vec::with_capacity(9);
195    buf.push(POSTGRES_CURSOR_PREFIX);
196    buf.extend_from_slice(&event_id.to_be_bytes());
197    StreamCursor::new(buf)
198}
199
200/// Decode a Postgres event cursor back to `event_id`. `None` =
201/// start-from-tail.
202pub fn decode_postgres_event_cursor(cursor: &StreamCursor) -> Result<Option<i64>, &'static str> {
203    let bytes = cursor.as_bytes();
204    if bytes.is_empty() {
205        return Ok(None);
206    }
207    if bytes.len() != 9 || bytes[0] != POSTGRES_CURSOR_PREFIX {
208        return Err("stream_subscribe: cursor does not belong to the Postgres backend");
209    }
210    let mut event_id = [0u8; 8];
211    event_id.copy_from_slice(&bytes[1..9]);
212    Ok(Some(i64::from_be_bytes(event_id)))
213}
214
#[cfg(test)]
mod tests {
    //! Unit tests for the Valkey / Postgres cursor codecs: round-trips,
    //! the empty "tail" sentinel, and rejection of malformed cursors
    //! (wrong prefix and wrong length, each checked independently).

    use super::*;

    #[test]
    fn valkey_cursor_roundtrip() {
        let c = encode_valkey_cursor(1_700_000_000_000, 42);
        assert_eq!(c.as_bytes().len(), 17);
        assert_eq!(c.as_bytes()[0], VALKEY_CURSOR_PREFIX);
        let (ms, seq) = decode_valkey_cursor(&c).unwrap().unwrap();
        assert_eq!(ms, 1_700_000_000_000);
        assert_eq!(seq, 42);
    }

    #[test]
    fn valkey_cursor_roundtrip_extremes() {
        // Boundary values must survive the big-endian encoding untouched.
        for &(ms, seq) in &[(0u64, 0u64), (u64::MAX, u64::MAX), (0, u64::MAX), (u64::MAX, 0)] {
            let c = encode_valkey_cursor(ms, seq);
            assert_eq!(decode_valkey_cursor(&c).unwrap(), Some((ms, seq)));
        }
    }

    #[test]
    fn valkey_empty_is_tail() {
        let c = StreamCursor::empty();
        assert!(decode_valkey_cursor(&c).unwrap().is_none());
    }

    #[test]
    fn valkey_rejects_postgres_cursor() {
        let c = encode_postgres_event_cursor(7);
        assert!(decode_valkey_cursor(&c).is_err());
    }

    #[test]
    fn valkey_rejects_wrong_prefix_with_valid_length() {
        // A real Postgres cursor also differs in length, so isolate the
        // prefix check by keeping the 17-byte Valkey length.
        let mut raw = encode_valkey_cursor(1, 2).as_bytes().to_vec();
        raw[0] = POSTGRES_CURSOR_PREFIX;
        assert!(decode_valkey_cursor(&StreamCursor::new(raw)).is_err());
    }

    #[test]
    fn valkey_rejects_wrong_length() {
        // Correct prefix, but truncated (16) or over-long (18) bodies.
        let truncated = StreamCursor::new(vec![VALKEY_CURSOR_PREFIX; 16]);
        let overlong = StreamCursor::new(vec![VALKEY_CURSOR_PREFIX; 18]);
        let prefix_only = StreamCursor::new(vec![VALKEY_CURSOR_PREFIX]);
        assert!(decode_valkey_cursor(&truncated).is_err());
        assert!(decode_valkey_cursor(&overlong).is_err());
        assert!(decode_valkey_cursor(&prefix_only).is_err());
    }

    #[test]
    fn postgres_cursor_roundtrip() {
        let c = encode_postgres_event_cursor(12345);
        assert_eq!(c.as_bytes().len(), 9);
        assert_eq!(c.as_bytes()[0], POSTGRES_CURSOR_PREFIX);
        assert_eq!(decode_postgres_event_cursor(&c).unwrap(), Some(12345));
    }

    #[test]
    fn postgres_cursor_roundtrip_extremes() {
        // `event_id` is signed; negative and boundary ids must round-trip.
        for &event_id in &[0i64, -1, i64::MIN, i64::MAX] {
            let c = encode_postgres_event_cursor(event_id);
            assert_eq!(decode_postgres_event_cursor(&c).unwrap(), Some(event_id));
        }
    }

    #[test]
    fn postgres_empty_is_tail() {
        let c = StreamCursor::empty();
        assert!(decode_postgres_event_cursor(&c).unwrap().is_none());
    }

    #[test]
    fn postgres_rejects_valkey_cursor() {
        let c = encode_valkey_cursor(1, 1);
        assert!(decode_postgres_event_cursor(&c).is_err());
    }

    #[test]
    fn postgres_rejects_wrong_prefix_with_valid_length() {
        // Keep the 9-byte Postgres length so only the prefix is wrong.
        let mut raw = encode_postgres_event_cursor(7).as_bytes().to_vec();
        raw[0] = VALKEY_CURSOR_PREFIX;
        assert!(decode_postgres_event_cursor(&StreamCursor::new(raw)).is_err());
    }

    #[test]
    fn postgres_rejects_wrong_length() {
        // Correct prefix, but truncated (8) or over-long (10) bodies.
        let truncated = StreamCursor::new(vec![POSTGRES_CURSOR_PREFIX; 8]);
        let overlong = StreamCursor::new(vec![POSTGRES_CURSOR_PREFIX; 10]);
        let prefix_only = StreamCursor::new(vec![POSTGRES_CURSOR_PREFIX]);
        assert!(decode_postgres_event_cursor(&truncated).is_err());
        assert!(decode_postgres_event_cursor(&overlong).is_err());
        assert!(decode_postgres_event_cursor(&prefix_only).is_err());
    }
}
252}