braid_core/core/server/subscription.rs
1//! Server-side subscription utilities for Braid protocol.
2
3use axum::body::Bytes;
4use futures::{Stream, StreamExt};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::time::{interval, Duration, Interval};
8
9/// A stream wrapper that injects heartbeat blank lines into a Braid subscription.
10///
11/// Matches the Braid-HTTP specification (Section 4.1), which recommends sending
12/// blank lines (\r\n) at regular intervals to prevent intermediate proxies from
13/// timing out the connection.
14pub struct HeartbeatStream<S> {
15 /// The underlying update stream
16 inner: S,
17 /// Interval for heartbeats
18 heartbeat: Interval,
19 /// Whether the last yielded item was a heartbeat
20 _pending_heartbeat: bool,
21}
22
23impl<S> HeartbeatStream<S> {
24 /// Create a new heartbeat stream.
25 ///
26 /// # Arguments
27 ///
28 /// * `inner` - The stream of updates/data to wrap
29 /// * `delay` - The interval between heartbeats
30 pub fn new(inner: S, delay: Duration) -> Self {
31 let mut heartbeat = interval(delay);
32 // The first tick happens immediately, we skip it
33 heartbeat.reset();
34
35 Self {
36 inner,
37 heartbeat,
38 _pending_heartbeat: false,
39 }
40 }
41}
42
43impl<S, T, E> Stream for HeartbeatStream<S>
44where
45 S: Stream<Item = Result<T, E>> + Unpin,
46 T: From<Bytes>,
47{
48 type Item = Result<T, E>;
49
50 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 // 1. Check if the inner stream has data
52 match self.inner.poll_next_unpin(cx) {
53 Poll::Ready(Some(item)) => {
54 // We got data, reset the heartbeat timer
55 self.heartbeat.reset();
56 return Poll::Ready(Some(item));
57 }
58 Poll::Ready(None) => return Poll::Ready(None),
59 Poll::Pending => {}
60 }
61
62 // 2. Check if it's time for a heartbeat
63 match self.heartbeat.poll_tick(cx) {
64 Poll::Ready(_) => {
65 // It's time! Yield a blank line (\r\n)
66 // In Braid, blank lines are ignored as whitespace but keep the connection alive
67 Poll::Ready(Some(Ok(T::from(Bytes::from("\r\n")))))
68 }
69 Poll::Pending => Poll::Pending,
70 }
71 }
72}