hypershell_reqwest_components/providers/
streaming_request.rs

1use core::marker::PhantomData;
2use core::pin::Pin;
3use std::io::ErrorKind;
4
5use cgp::extra::handler::{CanHandle, Handler, HandlerComponent};
6use cgp::prelude::*;
7use futures::{AsyncRead as FutAsyncRead, TryStreamExt};
8use hypershell_components::dsl::StreamingHttpRequest;
9use reqwest::Response;
10
11use crate::dsl::CoreHttpRequest;
12use crate::providers::ErrorResponse;
13
14#[cgp_new_provider]
15impl<Context, MethodArg, UrlArg, Headers, Input>
16    Handler<Context, StreamingHttpRequest<MethodArg, UrlArg, Headers>, Input>
17    for HandleStreamingHttpRequest
18where
19    Context: CanHandle<CoreHttpRequest<MethodArg, UrlArg, Headers>, Input, Output = Response>
20        + CanRaiseAsyncError<reqwest::Error>
21        + CanRaiseAsyncError<ErrorResponse>,
22    MethodArg: Send,
23    UrlArg: Send,
24    Headers: Send,
25    Input: Send,
26{
27    type Output = Pin<Box<dyn FutAsyncRead + Send>>;
28
29    async fn handle(
30        context: &Context,
31        _tag: PhantomData<StreamingHttpRequest<MethodArg, UrlArg, Headers>>,
32        body: Input,
33    ) -> Result<Pin<Box<dyn FutAsyncRead + Send>>, Context::Error> {
34        let response = context.handle(PhantomData, body).await?;
35
36        let status_code = response.status();
37
38        if !status_code.is_success() {
39            return Err(Context::raise_error(ErrorResponse { response }));
40        }
41
42        let response_stream = response
43            .bytes_stream()
44            .map_err(|e| std::io::Error::new(ErrorKind::Other, e))
45            .into_async_read();
46
47        Ok(Box::pin(response_stream))
48    }
49}