// eventsrc_client/oneshot.rs

1use std::{
2    error::Error as StdError,
3    fmt,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use eventsrc::{Event, EventStream};
9use futures_core::stream::{LocalBoxStream, Stream};
10
11use crate::error::Error;
12
13struct OneshotStream<S> {
14    inner: Pin<Box<EventStream<S>>>,
15}
16
17impl<S> OneshotStream<S> {
18    fn new<B, E>(body: S) -> Self
19    where
20        S: Stream<Item = Result<B, E>>,
21        B: AsRef<[u8]>,
22    {
23        Self { inner: Box::pin(EventStream::new(body)) }
24    }
25}
26
27impl<S, B, E> Stream for OneshotStream<S>
28where
29    S: Stream<Item = Result<B, E>>,
30    B: AsRef<[u8]>,
31    E: StdError + Send + Sync + 'static,
32{
33    type Item = Result<Event, Error>;
34
35    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36        let this = self.get_mut();
37
38        match this.inner.as_mut().poll_next(cx) {
39            Poll::Ready(Some(Ok(event))) => Poll::Ready(Some(Ok(event))),
40            Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error.into()))),
41            Poll::Ready(None) => Poll::Ready(None),
42            Poll::Pending => Poll::Pending,
43        }
44    }
45}
46
47type BoxEventStream = LocalBoxStream<'static, Result<Event, Error>>;
48
49/// One-shot SSE event source backed by a single body stream.
50pub struct EventSource {
51    inner: BoxEventStream,
52}
53
54impl fmt::Debug for EventSource {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        f.write_str("oneshot::EventSource { .. }")
57    }
58}
59
60impl EventSource {
61    /// Creates a one-shot SSE event source from a byte stream body.
62    pub fn new<S, B, E>(body: S) -> Self
63    where
64        S: Stream<Item = Result<B, E>> + 'static,
65        B: AsRef<[u8]>,
66        E: StdError + Send + Sync + 'static,
67    {
68        let inner = Box::pin(OneshotStream::new(body));
69        Self { inner }
70    }
71}
72
73impl Stream for EventSource {
74    type Item = Result<Event, Error>;
75
76    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77        self.get_mut().inner.as_mut().poll_next(cx)
78    }
79}
80
81/// Extension methods for building one-shot SSE event sources from backend-specific responses.
82pub trait EventSourceExt: Sized {
83    /// Validates the backend response and converts its body into a one-shot [`EventSource`].
84    fn event_source(self) -> Result<EventSource, Error>;
85}