Skip to main content

braid_core/core/server/
subscription.rs

1//! Server-side subscription utilities for Braid protocol.
2
3use axum::body::Bytes;
4use futures::{Stream, StreamExt};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::time::{interval, Duration, Interval};
8
9/// A stream wrapper that injects heartbeat blank lines into a Braid subscription.
10///
11/// Matches the Braid-HTTP specification (Section 4.1), which recommends sending
12/// blank lines (\r\n) at regular intervals to prevent intermediate proxies from
13/// timing out the connection.
14pub struct HeartbeatStream<S> {
15    /// The underlying update stream
16    inner: S,
17    /// Interval for heartbeats
18    heartbeat: Interval,
19    /// Whether the last yielded item was a heartbeat
20    _pending_heartbeat: bool,
21}
22
23impl<S> HeartbeatStream<S> {
24    /// Create a new heartbeat stream.
25    ///
26    /// # Arguments
27    ///
28    /// * `inner` - The stream of updates/data to wrap
29    /// * `delay` - The interval between heartbeats
30    pub fn new(inner: S, delay: Duration) -> Self {
31        let mut heartbeat = interval(delay);
32        // The first tick happens immediately, we skip it
33        heartbeat.reset();
34
35        Self {
36            inner,
37            heartbeat,
38            _pending_heartbeat: false,
39        }
40    }
41}
42
43impl<S, T, E> Stream for HeartbeatStream<S>
44where
45    S: Stream<Item = Result<T, E>> + Unpin,
46    T: From<Bytes>,
47{
48    type Item = Result<T, E>;
49
50    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        // 1. Check if the inner stream has data
52        match self.inner.poll_next_unpin(cx) {
53            Poll::Ready(Some(item)) => {
54                // We got data, reset the heartbeat timer
55                self.heartbeat.reset();
56                return Poll::Ready(Some(item));
57            }
58            Poll::Ready(None) => return Poll::Ready(None),
59            Poll::Pending => {}
60        }
61
62        // 2. Check if it's time for a heartbeat
63        match self.heartbeat.poll_tick(cx) {
64            Poll::Ready(_) => {
65                // It's time! Yield a blank line (\r\n)
66                // In Braid, blank lines are ignored as whitespace but keep the connection alive
67                Poll::Ready(Some(Ok(T::from(Bytes::from("\r\n")))))
68            }
69            Poll::Pending => Poll::Pending,
70        }
71    }
72}