use std::pin::Pin;
use bytes::Bytes;
use tokio_stream::Stream;
use crate::engine_error::EngineError;
use crate::types::{ExecutionId, TimestampMs};
/// Stream position held as raw bytes.
///
/// The encoders in this file produce a one-byte backend prefix followed by
/// big-endian integers. Both decoders in this file map a zero-length cursor
/// to `Ok(None)`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamCursor(pub Bytes);
impl StreamCursor {
pub fn new(raw: impl Into<Bytes>) -> Self {
Self(raw.into())
}
pub fn as_bytes(&self) -> &[u8] {
&self.0
}
pub fn empty() -> Self {
Self(Bytes::new())
}
}
/// Category tag carried in the `family` field of every [`StreamEvent`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamFamily {
/// NOTE(review): presumably lease-related history events — confirm against producers.
LeaseHistory,
/// NOTE(review): presumably completion events — confirm against producers.
Completion,
/// NOTE(review): presumably signal-delivery events — confirm against producers.
SignalDelivery,
/// NOTE(review): presumably instance-tag events — confirm against producers.
InstanceTags,
}
/// A single item yielded by a [`StreamSubscription`].
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; use [`StreamEvent::new`] and the `with_*`
/// builder methods instead.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct StreamEvent {
/// Which [`StreamFamily`] this event is tagged with.
pub family: StreamFamily,
/// Cursor attached to this event.
/// NOTE(review): presumably the position to resume from after this event — confirm.
pub cursor: StreamCursor,
/// Optional execution identifier; `None` unless set via `with_execution_id`.
pub execution_id: Option<ExecutionId>,
/// Optional attempt index; `None` unless set via `with_attempt_index`.
pub attempt_index: Option<u32>,
/// Event timestamp.
/// NOTE(review): presumably milliseconds, judging by the `TimestampMs` type name — confirm.
pub timestamp: TimestampMs,
/// Raw payload bytes; this file does not interpret them.
pub payload: Bytes,
}
impl StreamEvent {
pub fn new(
family: StreamFamily,
cursor: StreamCursor,
timestamp: TimestampMs,
payload: Bytes,
) -> Self {
Self {
family,
cursor,
execution_id: None,
attempt_index: None,
timestamp,
payload,
}
}
#[must_use]
pub fn with_execution_id(mut self, id: ExecutionId) -> Self {
self.execution_id = Some(id);
self
}
#[must_use]
pub fn with_attempt_index(mut self, idx: u32) -> Self {
self.attempt_index = Some(idx);
self
}
}
/// Heap-allocated, pinned, `Send` stream whose items are
/// `Result<StreamEvent, EngineError>`.
///
/// The concrete stream type is erased behind `dyn Stream`.
pub type StreamSubscription =
Pin<Box<dyn Stream<Item = Result<StreamEvent, EngineError>> + Send>>;
/// First byte written by `encode_valkey_cursor` and required by
/// `decode_valkey_cursor`.
pub const VALKEY_CURSOR_PREFIX: u8 = 0x01;
/// First byte written by `encode_postgres_event_cursor` and required by
/// `decode_postgres_event_cursor`.
pub const POSTGRES_CURSOR_PREFIX: u8 = 0x02;
/// Encodes a `(ms, seq)` pair as a 17-byte Valkey cursor.
///
/// Layout: byte 0 is [`VALKEY_CURSOR_PREFIX`], bytes 1..9 are `ms` in
/// big-endian order, and bytes 9..17 are `seq` in big-endian order.
pub fn encode_valkey_cursor(ms: u64, seq: u64) -> StreamCursor {
    // Fill a fixed-size frame so the layout is visible at a glance.
    let mut frame = [0u8; 17];
    frame[0] = VALKEY_CURSOR_PREFIX;
    frame[1..9].copy_from_slice(&ms.to_be_bytes());
    frame[9..17].copy_from_slice(&seq.to_be_bytes());
    StreamCursor::new(frame.to_vec())
}
/// Decodes a cursor produced by [`encode_valkey_cursor`].
///
/// Returns `Ok(None)` for a zero-length cursor and `Ok(Some((ms, seq)))` for
/// a 17-byte cursor that starts with [`VALKEY_CURSOR_PREFIX`].
///
/// # Errors
///
/// Any other non-empty cursor (wrong prefix or wrong length) is rejected with
/// a static message.
pub fn decode_valkey_cursor(cursor: &StreamCursor) -> Result<Option<(u64, u64)>, &'static str> {
    // Reads one big-endian u64 from a slice that is exactly 8 bytes long.
    fn be_u64(chunk: &[u8]) -> u64 {
        let mut word = [0u8; 8];
        word.copy_from_slice(chunk);
        u64::from_be_bytes(word)
    }

    match cursor.as_bytes().split_first() {
        None => Ok(None),
        Some((&tag, body)) if tag == VALKEY_CURSOR_PREFIX && body.len() == 16 => {
            let (ms_raw, seq_raw) = body.split_at(8);
            Ok(Some((be_u64(ms_raw), be_u64(seq_raw))))
        }
        Some(_) => Err("stream_subscribe: cursor does not belong to the Valkey backend"),
    }
}
/// Encodes an event id as a 9-byte Postgres cursor.
///
/// Layout: byte 0 is [`POSTGRES_CURSOR_PREFIX`] and bytes 1..9 are `event_id`
/// in big-endian order.
pub fn encode_postgres_event_cursor(event_id: i64) -> StreamCursor {
    // Fill a fixed-size frame so the layout is visible at a glance.
    let mut frame = [0u8; 9];
    frame[0] = POSTGRES_CURSOR_PREFIX;
    frame[1..9].copy_from_slice(&event_id.to_be_bytes());
    StreamCursor::new(frame.to_vec())
}
/// Decodes a cursor produced by [`encode_postgres_event_cursor`].
///
/// Returns `Ok(None)` for a zero-length cursor and `Ok(Some(event_id))` for a
/// 9-byte cursor that starts with [`POSTGRES_CURSOR_PREFIX`].
///
/// # Errors
///
/// Any other non-empty cursor (wrong prefix or wrong length) is rejected with
/// a static message.
pub fn decode_postgres_event_cursor(cursor: &StreamCursor) -> Result<Option<i64>, &'static str> {
    match cursor.as_bytes().split_first() {
        None => Ok(None),
        Some((&tag, body)) if tag == POSTGRES_CURSOR_PREFIX && body.len() == 8 => {
            let mut word = [0u8; 8];
            word.copy_from_slice(body);
            Ok(Some(i64::from_be_bytes(word)))
        }
        Some(_) => Err("stream_subscribe: cursor does not belong to the Postgres backend"),
    }
}
#[cfg(test)]
mod tests {
    //! Round-trip and rejection tests for the cursor codecs in this file.
    use super::*;

    #[test]
    fn valkey_cursor_roundtrip() {
        let c = encode_valkey_cursor(1_700_000_000_000, 42);
        assert_eq!(c.as_bytes()[0], VALKEY_CURSOR_PREFIX);
        let (ms, seq) = decode_valkey_cursor(&c).unwrap().unwrap();
        assert_eq!(ms, 1_700_000_000_000);
        assert_eq!(seq, 42);
    }

    #[test]
    fn valkey_cursor_roundtrip_extremes() {
        // Boundary values make sure no byte of either field is dropped or swapped.
        for &(ms, seq) in &[(0u64, 0u64), (u64::MAX, 0), (0, u64::MAX), (u64::MAX, u64::MAX)] {
            let c = encode_valkey_cursor(ms, seq);
            assert_eq!(c.as_bytes().len(), 17);
            assert_eq!(decode_valkey_cursor(&c).unwrap(), Some((ms, seq)));
        }
    }

    #[test]
    fn valkey_empty_is_tail() {
        let c = StreamCursor::empty();
        assert!(decode_valkey_cursor(&c).unwrap().is_none());
    }

    #[test]
    fn valkey_rejects_postgres_cursor() {
        let c = encode_postgres_event_cursor(7);
        assert!(decode_valkey_cursor(&c).is_err());
    }

    #[test]
    fn valkey_rejects_wrong_length() {
        // Correct prefix, but truncated / extended / prefix-only payloads.
        let full = encode_valkey_cursor(1, 1).as_bytes().to_vec();

        let truncated = StreamCursor::new(full[..16].to_vec());
        assert!(decode_valkey_cursor(&truncated).is_err());

        let mut extended = full;
        extended.push(0);
        assert!(decode_valkey_cursor(&StreamCursor::new(extended)).is_err());

        let prefix_only = StreamCursor::new(vec![VALKEY_CURSOR_PREFIX]);
        assert!(decode_valkey_cursor(&prefix_only).is_err());
    }

    #[test]
    fn postgres_cursor_roundtrip() {
        let c = encode_postgres_event_cursor(12345);
        assert_eq!(c.as_bytes()[0], POSTGRES_CURSOR_PREFIX);
        assert_eq!(decode_postgres_event_cursor(&c).unwrap(), Some(12345));
    }

    #[test]
    fn postgres_cursor_roundtrip_signed_extremes() {
        // The id is signed, so negative and boundary values must survive the trip.
        for &id in &[0i64, -1, i64::MIN, i64::MAX] {
            let c = encode_postgres_event_cursor(id);
            assert_eq!(c.as_bytes().len(), 9);
            assert_eq!(decode_postgres_event_cursor(&c).unwrap(), Some(id));
        }
    }

    #[test]
    fn postgres_empty_is_tail() {
        let c = StreamCursor::empty();
        assert!(decode_postgres_event_cursor(&c).unwrap().is_none());
    }

    #[test]
    fn postgres_rejects_valkey_cursor() {
        let c = encode_valkey_cursor(1, 1);
        assert!(decode_postgres_event_cursor(&c).is_err());
    }

    #[test]
    fn postgres_rejects_wrong_length() {
        // Correct prefix, but truncated / extended / prefix-only payloads.
        let full = encode_postgres_event_cursor(1).as_bytes().to_vec();

        let truncated = StreamCursor::new(full[..8].to_vec());
        assert!(decode_postgres_event_cursor(&truncated).is_err());

        let mut extended = full;
        extended.push(0);
        assert!(decode_postgres_event_cursor(&StreamCursor::new(extended)).is_err());

        let prefix_only = StreamCursor::new(vec![POSTGRES_CURSOR_PREFIX]);
        assert!(decode_postgres_event_cursor(&prefix_only).is_err());
    }
}