eventsrc_client/
oneshot.rs1use std::{
2 error::Error as StdError,
3 fmt,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use eventsrc::{Event, EventStream};
9use futures_core::stream::{LocalBoxStream, Stream};
10
11use crate::error::Error;
12
13struct OneshotStream<S> {
14 inner: Pin<Box<EventStream<S>>>,
15}
16
17impl<S> OneshotStream<S> {
18 fn new<B, E>(body: S) -> Self
19 where
20 S: Stream<Item = Result<B, E>>,
21 B: AsRef<[u8]>,
22 {
23 Self { inner: Box::pin(EventStream::new(body)) }
24 }
25}
26
27impl<S, B, E> Stream for OneshotStream<S>
28where
29 S: Stream<Item = Result<B, E>>,
30 B: AsRef<[u8]>,
31 E: StdError + Send + Sync + 'static,
32{
33 type Item = Result<Event, Error>;
34
35 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36 let this = self.get_mut();
37
38 match this.inner.as_mut().poll_next(cx) {
39 Poll::Ready(Some(Ok(event))) => Poll::Ready(Some(Ok(event))),
40 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error.into()))),
41 Poll::Ready(None) => Poll::Ready(None),
42 Poll::Pending => Poll::Pending,
43 }
44 }
45}
46
47type BoxEventStream = LocalBoxStream<'static, Result<Event, Error>>;
48
49pub struct EventSource {
51 inner: BoxEventStream,
52}
53
54impl fmt::Debug for EventSource {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 f.write_str("oneshot::EventSource { .. }")
57 }
58}
59
60impl EventSource {
61 pub fn new<S, B, E>(body: S) -> Self
63 where
64 S: Stream<Item = Result<B, E>> + 'static,
65 B: AsRef<[u8]>,
66 E: StdError + Send + Sync + 'static,
67 {
68 let inner = Box::pin(OneshotStream::new(body));
69 Self { inner }
70 }
71}
72
73impl Stream for EventSource {
74 type Item = Result<Event, Error>;
75
76 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77 self.get_mut().inner.as_mut().poll_next(cx)
78 }
79}
80
81pub trait EventSourceExt: Sized {
83 fn event_source(self) -> Result<EventSource, Error>;
85}