Skip to main content

server_fn/codec/
stream.rs

1use super::{Encoding, FromReq, FromRes, IntoReq};
2use crate::{
3    error::{FromServerFnError, ServerFnErrorErr},
4    request::{ClientReq, Req},
5    response::{ClientRes, TryRes},
6    ContentType, IntoRes, ServerFnError,
7};
8use bytes::Bytes;
9use futures::{Stream, StreamExt, TryStreamExt};
10use http::Method;
11use std::{fmt::Debug, pin::Pin};
12
13/// An encoding that represents a stream of bytes.
14///
15/// A server function that uses this as its output encoding should return [`ByteStream`].
16///
17/// ## Browser Support for Streaming Input
18///
19/// Browser fetch requests do not currently support full request duplexing, which
20/// means that that they do begin handling responses until the full request has been sent.
21/// This means that if you use a streaming input encoding, the input stream needs to
22/// end before the output will begin.
23///
24/// Streaming requests are only allowed over HTTP2 or HTTP3.
25pub struct Streaming;
26
27impl ContentType for Streaming {
28    const CONTENT_TYPE: &'static str = "application/octet-stream";
29}
30
31impl Encoding for Streaming {
32    const METHOD: Method = Method::POST;
33}
34
35impl<E, T, Request> IntoReq<Streaming, Request, E> for T
36where
37    Request: ClientReq<E>,
38    T: Stream<Item = Bytes> + Send + 'static,
39    E: FromServerFnError,
40{
41    fn into_req(self, path: &str, accepts: &str) -> Result<Request, E> {
42        Request::try_new_post_streaming(
43            path,
44            accepts,
45            Streaming::CONTENT_TYPE,
46            self,
47        )
48    }
49}
50
51impl<E, T, Request> FromReq<Streaming, Request, E> for T
52where
53    Request: Req<E> + Send + 'static,
54    T: From<ByteStream<E>> + 'static,
55    E: FromServerFnError,
56{
57    async fn from_req(req: Request) -> Result<Self, E> {
58        let data = req.try_into_stream()?;
59        let s = ByteStream::new(data.map_err(|e| E::de(e)));
60        Ok(s.into())
61    }
62}
63
64/// A stream of bytes.
65///
66/// A server function can return this type if its output encoding is [`Streaming`].
67///
68/// ## Browser Support for Streaming Input
69///
70/// Browser fetch requests do not currently support full request duplexing, which
71/// means that that they do begin handling responses until the full request has been sent.
72/// This means that if you use a streaming input encoding, the input stream needs to
73/// end before the output will begin.
74///
75/// Streaming requests are only allowed over HTTP2 or HTTP3.
76pub struct ByteStream<E = ServerFnError>(
77    Pin<Box<dyn Stream<Item = Result<Bytes, E>> + Send>>,
78);
79
80impl<E> ByteStream<E> {
81    /// Consumes the wrapper, returning a stream of bytes.
82    pub fn into_inner(self) -> impl Stream<Item = Result<Bytes, E>> + Send {
83        self.0
84    }
85}
86
87impl<E> Debug for ByteStream<E> {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_tuple("ByteStream").finish()
90    }
91}
92
93impl<E> ByteStream<E> {
94    /// Creates a new `ByteStream` from the given stream.
95    pub fn new<T>(
96        value: impl Stream<Item = Result<T, E>> + Send + 'static,
97    ) -> Self
98    where
99        T: Into<Bytes>,
100    {
101        Self(Box::pin(value.map(|value| value.map(Into::into))))
102    }
103}
104
105impl<E, S, T> From<S> for ByteStream<E>
106where
107    S: Stream<Item = T> + Send + 'static,
108    T: Into<Bytes>,
109{
110    fn from(value: S) -> Self {
111        Self(Box::pin(value.map(|data| Ok(data.into()))))
112    }
113}
114
115impl<E, Response> IntoRes<Streaming, Response, E> for ByteStream<E>
116where
117    Response: TryRes<E>,
118    E: FromServerFnError,
119{
120    async fn into_res(self) -> Result<Response, E> {
121        Response::try_from_stream(
122            Streaming::CONTENT_TYPE,
123            self.into_inner().map_err(|e| e.ser()),
124        )
125    }
126}
127
128impl<E, Response> FromRes<Streaming, Response, E> for ByteStream<E>
129where
130    Response: ClientRes<E> + Send,
131    E: FromServerFnError,
132{
133    async fn from_res(res: Response) -> Result<Self, E> {
134        let stream = res.try_into_stream()?;
135        Ok(ByteStream::new(stream.map_err(|e| E::de(e))))
136    }
137}
138
139/// An encoding that represents a stream of text.
140///
141/// A server function that uses this as its output encoding should return [`TextStream`].
142///
143/// ## Browser Support for Streaming Input
144///
145/// Browser fetch requests do not currently support full request duplexing, which
146/// means that that they do begin handling responses until the full request has been sent.
147/// This means that if you use a streaming input encoding, the input stream needs to
148/// end before the output will begin.
149///
150/// Streaming requests are only allowed over HTTP2 or HTTP3.
151pub struct StreamingText;
152
153impl ContentType for StreamingText {
154    const CONTENT_TYPE: &'static str = "text/plain";
155}
156
157impl Encoding for StreamingText {
158    const METHOD: Method = Method::POST;
159}
160
161/// A stream of text.
162///
163/// A server function can return this type if its output encoding is [`StreamingText`].
164///
165/// ## Browser Support for Streaming Input
166///
167/// Browser fetch requests do not currently support full request duplexing, which
168/// means that that they do begin handling responses until the full request has been sent.
169/// This means that if you use a streaming input encoding, the input stream needs to
170/// end before the output will begin.
171///
172/// Streaming requests are only allowed over HTTP2 or HTTP3.
173pub struct TextStream<E = ServerFnError>(
174    Pin<Box<dyn Stream<Item = Result<String, E>> + Send>>,
175);
176
177impl<E> Debug for TextStream<E> {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        f.debug_tuple("TextStream").finish()
180    }
181}
182
183impl<E> TextStream<E> {
184    /// Creates a new `TextStream` from the given stream.
185    pub fn new(
186        value: impl Stream<Item = Result<String, E>> + Send + 'static,
187    ) -> Self {
188        Self(Box::pin(value.map(|value| value)))
189    }
190}
191
192impl<E> TextStream<E> {
193    /// Consumes the wrapper, returning a stream of text.
194    pub fn into_inner(self) -> impl Stream<Item = Result<String, E>> + Send {
195        self.0
196    }
197}
198
199impl<E, S, T> From<S> for TextStream<E>
200where
201    S: Stream<Item = T> + Send + 'static,
202    T: Into<String>,
203{
204    fn from(value: S) -> Self {
205        Self(Box::pin(value.map(|data| Ok(data.into()))))
206    }
207}
208
209impl<E, T, Request> IntoReq<StreamingText, Request, E> for T
210where
211    Request: ClientReq<E>,
212    T: Into<TextStream<E>>,
213    E: FromServerFnError,
214{
215    fn into_req(self, path: &str, accepts: &str) -> Result<Request, E> {
216        let data = self.into();
217        Request::try_new_post_streaming(
218            path,
219            accepts,
220            Streaming::CONTENT_TYPE,
221            data.0.map(|chunk| chunk.unwrap_or_default().into()),
222        )
223    }
224}
225
226impl<E, T, Request> FromReq<StreamingText, Request, E> for T
227where
228    Request: Req<E> + Send + 'static,
229    T: From<TextStream<E>> + 'static,
230    E: FromServerFnError,
231{
232    async fn from_req(req: Request) -> Result<Self, E> {
233        let data = req.try_into_stream()?;
234        let s = TextStream::new(data.map(|chunk| match chunk {
235            Ok(bytes) => {
236                let de = String::from_utf8(bytes.to_vec()).map_err(|e| {
237                    E::from_server_fn_error(ServerFnErrorErr::Deserialization(
238                        e.to_string(),
239                    ))
240                })?;
241                Ok(de)
242            }
243            Err(bytes) => Err(E::de(bytes)),
244        }));
245        Ok(s.into())
246    }
247}
248
249impl<E, Response> IntoRes<StreamingText, Response, E> for TextStream<E>
250where
251    Response: TryRes<E>,
252    E: FromServerFnError,
253{
254    async fn into_res(self) -> Result<Response, E> {
255        Response::try_from_stream(
256            Streaming::CONTENT_TYPE,
257            self.into_inner()
258                .map(|stream| stream.map(Into::into).map_err(|e| e.ser())),
259        )
260    }
261}
262
263impl<E, Response> FromRes<StreamingText, Response, E> for TextStream<E>
264where
265    Response: ClientRes<E> + Send,
266    E: FromServerFnError,
267{
268    async fn from_res(res: Response) -> Result<Self, E> {
269        let stream = res.try_into_stream()?;
270        Ok(TextStream(Box::pin(stream.map(|chunk| match chunk {
271            Ok(bytes) => {
272                let de = String::from_utf8(bytes.into()).map_err(|e| {
273                    E::from_server_fn_error(ServerFnErrorErr::Deserialization(
274                        e.to_string(),
275                    ))
276                })?;
277                Ok(de)
278            }
279            Err(bytes) => Err(E::de(bytes)),
280        }))))
281    }
282}