clientix_core/client/asynchronous/
stream.rs1use std::net::SocketAddr;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use futures_core::Stream;
5use futures_util::{StreamExt, TryStreamExt};
6use http::{HeaderMap, StatusCode, Version};
7use reqwest::Url;
8use crate::client::response::ClientixResult;
9
10pub struct ClientixStream<T> {
11 version: Version,
12 content_length: Option<u64>,
13 status: StatusCode,
14 url: Url,
15 remote_addr: Option<SocketAddr>,
16 headers: HeaderMap,
17 stream: Pin<Box<dyn Stream<Item = ClientixResult<T>>>>,
18}
19
20impl<T> Stream for ClientixStream<T> {
21 type Item = ClientixResult<T>;
22
23 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
24 self.stream.as_mut().poll_next(cx)
25 }
26}
27
28impl<T> ClientixStream<T> {
29
30 pub fn new(
31 version: Version,
32 content_length: Option<u64>,
33 status: StatusCode,
34 url: Url,
35 remote_addr: Option<SocketAddr>,
36 headers: HeaderMap,
37 stream: impl Stream<Item = ClientixResult<T>> + 'static
38 ) -> Self {
39 Self {
40 version,
41 content_length,
42 status,
43 url,
44 remote_addr,
45 headers,
46 stream: Box::pin(stream)
47 }
48 }
49
50 pub fn version(&self) -> Version {
51 self.version
52 }
53
54 pub fn content_length(&self) -> Option<u64> {
55 self.content_length
56 }
57
58 pub fn status(&self) -> StatusCode {
59 self.status
60 }
61
62 pub fn url(&self) -> &Url {
63 &self.url
64 }
65
66 pub fn remote_addr(&self) -> Option<SocketAddr> {
67 self.remote_addr
68 }
69
70 pub fn headers(&self) -> &HeaderMap {
71 &self.headers
72 }
73
74 pub async fn execute<F>(mut self, mut handle: F) where F: FnMut(ClientixResult<T>) {
75 while let Some(result) = self.stream.next().await {
76 handle(result);
77 }
78 }
79
80 pub async fn collect(self) -> ClientixResult<Vec<T>> {
81 self.stream.try_collect().await
82 }
83
84}