clientix_core/client/asynchronous/stream/
mod.rs

1pub mod sse;
2
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use bytes::Bytes;
7use futures_core::Stream;
8use futures_util::{StreamExt, TryStreamExt};
9use http::{HeaderMap, StatusCode, Version};
10use reqwest::Url;
11use crate::client::asynchronous::stream::sse::ClientixSSEStream;
12use crate::client::response::ClientixResult;
13
14pub trait ClientixStreamInterface<T>: Stream {
15
16    fn version(&self) -> Version;
17
18    fn content_length(&self) -> Option<u64>;
19
20    fn status(&self) -> StatusCode;
21
22    fn url(&self) -> &Url;
23
24    fn remote_addr(&self) -> Option<SocketAddr>;
25
26    fn headers(&self) -> &HeaderMap;
27
28    #[allow(async_fn_in_trait)]
29    async fn execute<F>(self, handle: F) where F: FnMut(ClientixResult<T>);
30
31    #[allow(async_fn_in_trait)]
32    async fn collect(self) -> ClientixResult<Vec<T>>;
33
34}
35
36pub struct ClientixStream {
37    version: Version,
38    content_length: Option<u64>,
39    status: StatusCode,
40    url: Url,
41    remote_addr: Option<SocketAddr>,
42    headers: HeaderMap,
43    stream: Pin<Box<dyn Stream<Item = ClientixResult<Bytes>>>>,
44}
45
46impl ClientixStream {
47
48    pub fn new(
49        version: Version,
50        content_length: Option<u64>,
51        status: StatusCode,
52        url: Url,
53        remote_addr: Option<SocketAddr>,
54        headers: HeaderMap,
55        stream: impl Stream<Item = ClientixResult<Bytes>> + 'static
56    ) -> Self {
57        Self {
58            version,
59            content_length,
60            status,
61            url,
62            remote_addr,
63            headers,
64            stream: Box::pin(stream)
65        }
66    }
67    
68    pub fn sse(self) -> ClientixSSEStream<String> {
69        self.into()
70    }
71
72}
73
74impl Stream for ClientixStream {
75    type Item = ClientixResult<Bytes>;
76
77    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78        self.stream.as_mut().poll_next(cx)
79    }
80}
81
82impl ClientixStreamInterface<Bytes> for ClientixStream {
83
84    fn version(&self) -> Version {
85        self.version
86    }
87
88    fn content_length(&self) -> Option<u64> {
89        self.content_length
90    }
91
92    fn status(&self) -> StatusCode {
93        self.status
94    }
95
96    fn url(&self) -> &Url {
97        &self.url
98    }
99
100    fn remote_addr(&self) -> Option<SocketAddr> {
101        self.remote_addr
102    }
103
104    fn headers(&self) -> &HeaderMap {
105        &self.headers
106    }
107
108    async fn execute<F>(mut self, mut handle: F) where F: FnMut(ClientixResult<Bytes>) {
109        while let Some(result) = self.stream.next().await {
110            handle(result);
111        }
112    }
113
114    async fn collect(self) -> ClientixResult<Vec<Bytes>> {
115        self.stream.try_collect().await
116    }
117
118}