1use super::{Encoding, FromReq, FromRes, IntoReq};
2use crate::{
3 error::{FromServerFnError, ServerFnErrorErr},
4 request::{ClientReq, Req},
5 response::{ClientRes, TryRes},
6 ContentType, IntoRes, ServerFnError,
7};
8use bytes::Bytes;
9use futures::{Stream, StreamExt, TryStreamExt};
10use http::Method;
11use std::{fmt::Debug, pin::Pin};
12
/// Marker type selecting the raw byte-stream encoding.
///
/// It carries no data; it is only used as a type parameter on the
/// request/response conversion traits implemented in this module.
pub struct Streaming;
26
/// Declares the content type associated with the [`Streaming`] encoding.
impl ContentType for Streaming {
    /// Byte streams are labelled as opaque binary data.
    const CONTENT_TYPE: &'static str = "application/octet-stream";
}
30
/// Declares the HTTP method associated with the [`Streaming`] encoding.
impl Encoding for Streaming {
    /// Streaming requests are sent with `POST`.
    const METHOD: Method = Method::POST;
}
34
35impl<E, T, Request> IntoReq<Streaming, Request, E> for T
36where
37 Request: ClientReq<E>,
38 T: Stream<Item = Bytes> + Send + 'static,
39 E: FromServerFnError,
40{
41 fn into_req(self, path: &str, accepts: &str) -> Result<Request, E> {
42 Request::try_new_post_streaming(
43 path,
44 accepts,
45 Streaming::CONTENT_TYPE,
46 self,
47 )
48 }
49}
50
51impl<E, T, Request> FromReq<Streaming, Request, E> for T
52where
53 Request: Req<E> + Send + 'static,
54 T: From<ByteStream<E>> + 'static,
55 E: FromServerFnError,
56{
57 async fn from_req(req: Request) -> Result<Self, E> {
58 let data = req.try_into_stream()?;
59 let s = ByteStream::new(data.map_err(|e| E::de(e)));
60 Ok(s.into())
61 }
62}
63
/// A boxed, pinned, `Send` stream whose items are `Result<Bytes, E>`.
///
/// The error type `E` defaults to [`ServerFnError`].
pub struct ByteStream<E = ServerFnError>(
    Pin<Box<dyn Stream<Item = Result<Bytes, E>> + Send>>,
);
79
80impl<E> ByteStream<E> {
81 pub fn into_inner(self) -> impl Stream<Item = Result<Bytes, E>> + Send {
83 self.0
84 }
85}
86
87impl<E> Debug for ByteStream<E> {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_tuple("ByteStream").finish()
90 }
91}
92
93impl<E> ByteStream<E> {
94 pub fn new<T>(
96 value: impl Stream<Item = Result<T, E>> + Send + 'static,
97 ) -> Self
98 where
99 T: Into<Bytes>,
100 {
101 Self(Box::pin(value.map(|value| value.map(Into::into))))
102 }
103}
104
105impl<E, S, T> From<S> for ByteStream<E>
106where
107 S: Stream<Item = T> + Send + 'static,
108 T: Into<Bytes>,
109{
110 fn from(value: S) -> Self {
111 Self(Box::pin(value.map(|data| Ok(data.into()))))
112 }
113}
114
115impl<E, Response> IntoRes<Streaming, Response, E> for ByteStream<E>
116where
117 Response: TryRes<E>,
118 E: FromServerFnError,
119{
120 async fn into_res(self) -> Result<Response, E> {
121 Response::try_from_stream(
122 Streaming::CONTENT_TYPE,
123 self.into_inner().map_err(|e| e.ser()),
124 )
125 }
126}
127
128impl<E, Response> FromRes<Streaming, Response, E> for ByteStream<E>
129where
130 Response: ClientRes<E> + Send,
131 E: FromServerFnError,
132{
133 async fn from_res(res: Response) -> Result<Self, E> {
134 let stream = res.try_into_stream()?;
135 Ok(ByteStream::new(stream.map_err(|e| E::de(e))))
136 }
137}
138
/// Marker type selecting the text-stream encoding.
///
/// It carries no data; it is only used as a type parameter on the
/// request/response conversion traits implemented in this module.
pub struct StreamingText;
152
/// Declares the content type associated with the [`StreamingText`] encoding.
impl ContentType for StreamingText {
    /// Text streams are labelled as plain text.
    const CONTENT_TYPE: &'static str = "text/plain";
}
156
/// Declares the HTTP method associated with the [`StreamingText`] encoding.
impl Encoding for StreamingText {
    /// Streaming text requests are sent with `POST`.
    const METHOD: Method = Method::POST;
}
160
/// A boxed, pinned, `Send` stream whose items are `Result<String, E>`.
///
/// The error type `E` defaults to [`ServerFnError`].
pub struct TextStream<E = ServerFnError>(
    Pin<Box<dyn Stream<Item = Result<String, E>> + Send>>,
);
176
177impl<E> Debug for TextStream<E> {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 f.debug_tuple("TextStream").finish()
180 }
181}
182
183impl<E> TextStream<E> {
184 pub fn new(
186 value: impl Stream<Item = Result<String, E>> + Send + 'static,
187 ) -> Self {
188 Self(Box::pin(value.map(|value| value)))
189 }
190}
191
192impl<E> TextStream<E> {
193 pub fn into_inner(self) -> impl Stream<Item = Result<String, E>> + Send {
195 self.0
196 }
197}
198
199impl<E, S, T> From<S> for TextStream<E>
200where
201 S: Stream<Item = T> + Send + 'static,
202 T: Into<String>,
203{
204 fn from(value: S) -> Self {
205 Self(Box::pin(value.map(|data| Ok(data.into()))))
206 }
207}
208
209impl<E, T, Request> IntoReq<StreamingText, Request, E> for T
210where
211 Request: ClientReq<E>,
212 T: Into<TextStream<E>>,
213 E: FromServerFnError,
214{
215 fn into_req(self, path: &str, accepts: &str) -> Result<Request, E> {
216 let data = self.into();
217 Request::try_new_post_streaming(
218 path,
219 accepts,
220 Streaming::CONTENT_TYPE,
221 data.0.map(|chunk| chunk.unwrap_or_default().into()),
222 )
223 }
224}
225
226impl<E, T, Request> FromReq<StreamingText, Request, E> for T
227where
228 Request: Req<E> + Send + 'static,
229 T: From<TextStream<E>> + 'static,
230 E: FromServerFnError,
231{
232 async fn from_req(req: Request) -> Result<Self, E> {
233 let data = req.try_into_stream()?;
234 let s = TextStream::new(data.map(|chunk| match chunk {
235 Ok(bytes) => {
236 let de = String::from_utf8(bytes.to_vec()).map_err(|e| {
237 E::from_server_fn_error(ServerFnErrorErr::Deserialization(
238 e.to_string(),
239 ))
240 })?;
241 Ok(de)
242 }
243 Err(bytes) => Err(E::de(bytes)),
244 }));
245 Ok(s.into())
246 }
247}
248
249impl<E, Response> IntoRes<StreamingText, Response, E> for TextStream<E>
250where
251 Response: TryRes<E>,
252 E: FromServerFnError,
253{
254 async fn into_res(self) -> Result<Response, E> {
255 Response::try_from_stream(
256 Streaming::CONTENT_TYPE,
257 self.into_inner()
258 .map(|stream| stream.map(Into::into).map_err(|e| e.ser())),
259 )
260 }
261}
262
263impl<E, Response> FromRes<StreamingText, Response, E> for TextStream<E>
264where
265 Response: ClientRes<E> + Send,
266 E: FromServerFnError,
267{
268 async fn from_res(res: Response) -> Result<Self, E> {
269 let stream = res.try_into_stream()?;
270 Ok(TextStream(Box::pin(stream.map(|chunk| match chunk {
271 Ok(bytes) => {
272 let de = String::from_utf8(bytes.into()).map_err(|e| {
273 E::from_server_fn_error(ServerFnErrorErr::Deserialization(
274 e.to_string(),
275 ))
276 })?;
277 Ok(de)
278 }
279 Err(bytes) => Err(E::de(bytes)),
280 }))))
281 }
282}