hypershell_reqwest_components/providers/
streaming_request.rs1use core::marker::PhantomData;
2use core::pin::Pin;
3use std::io::ErrorKind;
4
5use cgp::extra::handler::{CanHandle, Handler, HandlerComponent};
6use cgp::prelude::*;
7use futures::{AsyncRead as FutAsyncRead, TryStreamExt};
8use hypershell_components::dsl::StreamingHttpRequest;
9use reqwest::Response;
10
11use crate::dsl::CoreHttpRequest;
12use crate::providers::ErrorResponse;
13
14#[cgp_new_provider]
15impl<Context, MethodArg, UrlArg, Headers, Input>
16 Handler<Context, StreamingHttpRequest<MethodArg, UrlArg, Headers>, Input>
17 for HandleStreamingHttpRequest
18where
19 Context: CanHandle<CoreHttpRequest<MethodArg, UrlArg, Headers>, Input, Output = Response>
20 + CanRaiseAsyncError<reqwest::Error>
21 + CanRaiseAsyncError<ErrorResponse>,
22 MethodArg: Send,
23 UrlArg: Send,
24 Headers: Send,
25 Input: Send,
26{
27 type Output = Pin<Box<dyn FutAsyncRead + Send>>;
28
29 async fn handle(
30 context: &Context,
31 _tag: PhantomData<StreamingHttpRequest<MethodArg, UrlArg, Headers>>,
32 body: Input,
33 ) -> Result<Pin<Box<dyn FutAsyncRead + Send>>, Context::Error> {
34 let response = context.handle(PhantomData, body).await?;
35
36 let status_code = response.status();
37
38 if !status_code.is_success() {
39 return Err(Context::raise_error(ErrorResponse { response }));
40 }
41
42 let response_stream = response
43 .bytes_stream()
44 .map_err(|e| std::io::Error::new(ErrorKind::Other, e))
45 .into_async_read();
46
47 Ok(Box::pin(response_stream))
48 }
49}