clientix_core/client/asynchronous/
stream.rs

1use std::net::SocketAddr;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use futures_core::Stream;
5use futures_util::{StreamExt, TryStreamExt};
6use http::{HeaderMap, StatusCode, Version};
7use reqwest::Url;
8use crate::client::result::ClientixResult;
9
10pub struct ClientixStream<T> {
11    version: Version,
12    content_length: Option<u64>,
13    status: StatusCode,
14    url: Url,
15    remote_addr: Option<SocketAddr>,
16    headers: HeaderMap,
17    stream: Pin<Box<dyn Stream<Item = ClientixResult<T>>>>,
18}
19
20impl<T> Stream for ClientixStream<T> {
21    type Item = ClientixResult<T>;
22
23    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
24        self.stream.as_mut().poll_next(cx)
25    }
26}
27
28impl<T> ClientixStream<T> {
29
30    pub fn new(
31        version: Version,
32        content_length: Option<u64>,
33        status: StatusCode,
34        url: Url,
35        remote_addr: Option<SocketAddr>,
36        headers: HeaderMap,
37        stream: impl Stream<Item = ClientixResult<T>> + 'static
38    ) -> Self {
39        Self {
40            version,
41            content_length,
42            status,
43            url,
44            remote_addr,
45            headers,
46            stream: Box::pin(stream)
47        }
48    }
49    
50    pub fn version(&self) -> Version {
51        self.version
52    }
53    
54    pub fn content_length(&self) -> Option<u64> {
55        self.content_length
56    }
57
58    pub fn status(&self) -> StatusCode {
59        self.status
60    }
61    
62    pub fn url(&self) -> &Url {
63        &self.url
64    }
65    
66    pub fn remote_addr(&self) -> Option<SocketAddr> {
67        self.remote_addr
68    }
69    
70    pub fn headers(&self) -> &HeaderMap {
71        &self.headers
72    }
73
74    pub async fn execute<F>(mut self, mut handle: F) where F: FnMut(ClientixResult<T>) {
75        while let Some(result) = self.stream.next().await {
76            handle(result);
77        }
78    }
79
80    pub async fn collect(self) -> ClientixResult<Vec<T>> {
81        self.stream.try_collect().await
82    }
83
84}