Skip to main content

ff_core/
stream_subscribe.rs

1//! RFC-019 Stage A/B/C — cross-backend stream-cursor subscription
2//! surface (shared primitives).
3//!
4//! This module defines the [`StreamCursor`] every `subscribe_*`
5//! method accepts, plus the codec helpers backends use to
6//! encode/decode their native stream positions (Valkey `(ms, seq)`,
7//! Postgres `event_id`). Family-specific typed event enums + per-
8//! family subscription aliases live in [`crate::stream_events`].
9//!
10//! Four-family allow-list (RFC-019 §Open Questions #5, owner-
11//! adjudicated 2026-04-24): new families require an RFC amendment.
12
13use bytes::Bytes;
14
15/// Opaque, backend-versioned cursor. Consumers persist the bytes, hand
16/// them back on resume. The first byte MUST encode a backend-family +
17/// version prefix so cursors stay stable across backend upgrades
18/// (Valkey: `0x01`, Postgres: `0x02`, …). The remainder is backend-
19/// specific.
20///
21/// [`StreamCursor::empty`] is the "start from tail / now" sentinel
22/// every backend accepts. Cursor byte order is owner-adjudicated
23/// opaque — consumers MUST NOT parse the bytes.
24#[derive(Clone, Debug, Eq, PartialEq)]
25pub struct StreamCursor(pub Bytes);
26
27impl StreamCursor {
28    /// Wrap arbitrary cursor bytes — typically only called by backend
29    /// impls that just encoded a fresh cursor from a backend-native
30    /// position (Valkey stream id, Postgres event_id).
31    pub fn new(raw: impl Into<Bytes>) -> Self {
32        Self(raw.into())
33    }
34
35    /// Borrow the raw cursor bytes for persistence. Consumers treat
36    /// the slice as opaque.
37    pub fn as_bytes(&self) -> &[u8] {
38        &self.0
39    }
40
41    /// Sentinel "start from tail" cursor. Backends interpret an empty
42    /// cursor as "subscribe from now" (Valkey: XREAD `$`; Postgres:
43    /// LISTEN from the current `max(event_id)`).
44    pub fn empty() -> Self {
45        Self(Bytes::new())
46    }
47}
48
49// ─── Valkey cursor codec ───
50//
51// Layout: `0x01` ++ ms_be(8) ++ seq_be(8). 17 bytes total. Empty
52// `StreamCursor` (zero-length) means "tail from `$`" (current end).
53//
54// These helpers live in ff-core rather than ff-backend-valkey because
55// the wire shape is part of the RFC-019 contract — a Postgres cursor
56// can never decode as a Valkey cursor (different first byte) and
57// consumers migrating between backends discard + restart cleanly.
58
/// First byte of every Valkey cursor (RFC-019 §Backend Semantics
/// Appendix).
pub const VALKEY_CURSOR_PREFIX: u8 = 0x01;

/// First byte of every Postgres cursor.
pub const POSTGRES_CURSOR_PREFIX: u8 = 0x02;
64
65/// Encode a Valkey stream id `(ms, seq)` into a cursor.
66pub fn encode_valkey_cursor(ms: u64, seq: u64) -> StreamCursor {
67    let mut buf = Vec::with_capacity(17);
68    buf.push(VALKEY_CURSOR_PREFIX);
69    buf.extend_from_slice(&ms.to_be_bytes());
70    buf.extend_from_slice(&seq.to_be_bytes());
71    StreamCursor::new(buf)
72}
73
74/// Decode a Valkey cursor back to `(ms, seq)`. Returns `None` for the
75/// empty / tail-from-now cursor. Returns `Err` for malformed bytes
76/// (wrong prefix, truncated).
77pub fn decode_valkey_cursor(cursor: &StreamCursor) -> Result<Option<(u64, u64)>, &'static str> {
78    let bytes = cursor.as_bytes();
79    if bytes.is_empty() {
80        return Ok(None);
81    }
82    if bytes.len() != 17 || bytes[0] != VALKEY_CURSOR_PREFIX {
83        return Err("stream_subscribe: cursor does not belong to the Valkey backend");
84    }
85    let mut ms = [0u8; 8];
86    let mut seq = [0u8; 8];
87    ms.copy_from_slice(&bytes[1..9]);
88    seq.copy_from_slice(&bytes[9..17]);
89    Ok(Some((u64::from_be_bytes(ms), u64::from_be_bytes(seq))))
90}
91
92/// Encode a Postgres `event_id` (i64, from `ff_completion_event`) into
93/// a cursor.
94pub fn encode_postgres_event_cursor(event_id: i64) -> StreamCursor {
95    let mut buf = Vec::with_capacity(9);
96    buf.push(POSTGRES_CURSOR_PREFIX);
97    buf.extend_from_slice(&event_id.to_be_bytes());
98    StreamCursor::new(buf)
99}
100
101/// Decode a Postgres event cursor back to `event_id`. `None` =
102/// start-from-tail.
103pub fn decode_postgres_event_cursor(cursor: &StreamCursor) -> Result<Option<i64>, &'static str> {
104    let bytes = cursor.as_bytes();
105    if bytes.is_empty() {
106        return Ok(None);
107    }
108    if bytes.len() != 9 || bytes[0] != POSTGRES_CURSOR_PREFIX {
109        return Err("stream_subscribe: cursor does not belong to the Postgres backend");
110    }
111    let mut event_id = [0u8; 8];
112    event_id.copy_from_slice(&bytes[1..9]);
113    Ok(Some(i64::from_be_bytes(event_id)))
114}
115
// Unit tests for the cursor codecs: round-trips, the empty-cursor
// sentinel, cross-backend rejection, and malformed-length rejection.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn valkey_cursor_roundtrip() {
        let c = encode_valkey_cursor(1_700_000_000_000, 42);
        assert_eq!(c.as_bytes()[0], VALKEY_CURSOR_PREFIX);
        let (ms, seq) = decode_valkey_cursor(&c).unwrap().unwrap();
        assert_eq!(ms, 1_700_000_000_000);
        assert_eq!(seq, 42);
    }

    #[test]
    fn valkey_cursor_roundtrip_extremes() {
        for &(ms, seq) in &[(0u64, 0u64), (u64::MAX, u64::MAX)] {
            let c = encode_valkey_cursor(ms, seq);
            assert_eq!(c.as_bytes().len(), 17);
            assert_eq!(decode_valkey_cursor(&c), Ok(Some((ms, seq))));
        }
    }

    #[test]
    fn valkey_empty_is_tail() {
        let c = StreamCursor::empty();
        assert!(decode_valkey_cursor(&c).unwrap().is_none());
    }

    #[test]
    fn valkey_rejects_postgres_cursor() {
        let c = encode_postgres_event_cursor(7);
        assert!(decode_valkey_cursor(&c).is_err());
    }

    #[test]
    fn valkey_rejects_wrong_length() {
        // Correct prefix, but too short to hold (ms, seq).
        let truncated = StreamCursor::new(vec![VALKEY_CURSOR_PREFIX, 0, 0]);
        assert!(decode_valkey_cursor(&truncated).is_err());

        // Correct prefix, one trailing byte too many.
        let mut long = encode_valkey_cursor(1, 1).as_bytes().to_vec();
        long.push(0);
        assert!(decode_valkey_cursor(&StreamCursor::new(long)).is_err());
    }

    #[test]
    fn postgres_cursor_roundtrip() {
        let c = encode_postgres_event_cursor(12345);
        assert_eq!(c.as_bytes()[0], POSTGRES_CURSOR_PREFIX);
        assert_eq!(decode_postgres_event_cursor(&c).unwrap(), Some(12345));
    }

    #[test]
    fn postgres_cursor_roundtrip_extremes() {
        for &id in &[i64::MIN, -1, 0, i64::MAX] {
            let c = encode_postgres_event_cursor(id);
            assert_eq!(c.as_bytes().len(), 9);
            assert_eq!(decode_postgres_event_cursor(&c), Ok(Some(id)));
        }
    }

    #[test]
    fn postgres_empty_is_tail() {
        let c = StreamCursor::empty();
        assert!(decode_postgres_event_cursor(&c).unwrap().is_none());
    }

    #[test]
    fn postgres_rejects_valkey_cursor() {
        let c = encode_valkey_cursor(1, 1);
        assert!(decode_postgres_event_cursor(&c).is_err());
    }

    #[test]
    fn postgres_rejects_wrong_length() {
        // Correct prefix, but too short to hold an event_id.
        let truncated = StreamCursor::new(vec![POSTGRES_CURSOR_PREFIX, 0, 0]);
        assert!(decode_postgres_event_cursor(&truncated).is_err());

        // Correct prefix, one trailing byte too many.
        let mut long = encode_postgres_event_cursor(1).as_bytes().to_vec();
        long.push(0);
        assert!(decode_postgres_event_cursor(&StreamCursor::new(long)).is_err());
    }
}