ff_core/
stream_subscribe.rs1use bytes::Bytes;
14
15#[derive(Clone, Debug, Eq, PartialEq)]
25pub struct StreamCursor(pub Bytes);
26
27impl StreamCursor {
28 pub fn new(raw: impl Into<Bytes>) -> Self {
32 Self(raw.into())
33 }
34
35 pub fn as_bytes(&self) -> &[u8] {
38 &self.0
39 }
40
41 pub fn empty() -> Self {
45 Self(Bytes::new())
46 }
47}
48
49pub const VALKEY_CURSOR_PREFIX: u8 = 0x01;
61
62pub const POSTGRES_CURSOR_PREFIX: u8 = 0x02;
64
65pub fn encode_valkey_cursor(ms: u64, seq: u64) -> StreamCursor {
67 let mut buf = Vec::with_capacity(17);
68 buf.push(VALKEY_CURSOR_PREFIX);
69 buf.extend_from_slice(&ms.to_be_bytes());
70 buf.extend_from_slice(&seq.to_be_bytes());
71 StreamCursor::new(buf)
72}
73
74pub fn decode_valkey_cursor(cursor: &StreamCursor) -> Result<Option<(u64, u64)>, &'static str> {
78 let bytes = cursor.as_bytes();
79 if bytes.is_empty() {
80 return Ok(None);
81 }
82 if bytes.len() != 17 || bytes[0] != VALKEY_CURSOR_PREFIX {
83 return Err("stream_subscribe: cursor does not belong to the Valkey backend");
84 }
85 let mut ms = [0u8; 8];
86 let mut seq = [0u8; 8];
87 ms.copy_from_slice(&bytes[1..9]);
88 seq.copy_from_slice(&bytes[9..17]);
89 Ok(Some((u64::from_be_bytes(ms), u64::from_be_bytes(seq))))
90}
91
92pub fn encode_postgres_event_cursor(event_id: i64) -> StreamCursor {
95 let mut buf = Vec::with_capacity(9);
96 buf.push(POSTGRES_CURSOR_PREFIX);
97 buf.extend_from_slice(&event_id.to_be_bytes());
98 StreamCursor::new(buf)
99}
100
101pub fn decode_postgres_event_cursor(cursor: &StreamCursor) -> Result<Option<i64>, &'static str> {
104 let bytes = cursor.as_bytes();
105 if bytes.is_empty() {
106 return Ok(None);
107 }
108 if bytes.len() != 9 || bytes[0] != POSTGRES_CURSOR_PREFIX {
109 return Err("stream_subscribe: cursor does not belong to the Postgres backend");
110 }
111 let mut event_id = [0u8; 8];
112 event_id.copy_from_slice(&bytes[1..9]);
113 Ok(Some(i64::from_be_bytes(event_id)))
114}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119
120 #[test]
121 fn valkey_cursor_roundtrip() {
122 let c = encode_valkey_cursor(1_700_000_000_000, 42);
123 assert_eq!(c.as_bytes()[0], VALKEY_CURSOR_PREFIX);
124 let (ms, seq) = decode_valkey_cursor(&c).unwrap().unwrap();
125 assert_eq!(ms, 1_700_000_000_000);
126 assert_eq!(seq, 42);
127 }
128
129 #[test]
130 fn valkey_empty_is_tail() {
131 let c = StreamCursor::empty();
132 assert!(decode_valkey_cursor(&c).unwrap().is_none());
133 }
134
135 #[test]
136 fn valkey_rejects_postgres_cursor() {
137 let c = encode_postgres_event_cursor(7);
138 assert!(decode_valkey_cursor(&c).is_err());
139 }
140
141 #[test]
142 fn postgres_cursor_roundtrip() {
143 let c = encode_postgres_event_cursor(12345);
144 assert_eq!(c.as_bytes()[0], POSTGRES_CURSOR_PREFIX);
145 assert_eq!(decode_postgres_event_cursor(&c).unwrap(), Some(12345));
146 }
147
148 #[test]
149 fn postgres_rejects_valkey_cursor() {
150 let c = encode_valkey_cursor(1, 1);
151 assert!(decode_postgres_event_cursor(&c).is_err());
152 }
153}