clientix_core/client/asynchronous/stream/
mod.rs1pub mod sse;
2
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use bytes::Bytes;
7use futures_core::Stream;
8use futures_util::{StreamExt, TryStreamExt};
9use http::{HeaderMap, StatusCode, Version};
10use reqwest::Url;
11use crate::client::asynchronous::stream::sse::ClientixSSEStream;
12use crate::client::response::ClientixResult;
13
14pub trait ClientixStreamInterface<T>: Stream {
15
16 fn version(&self) -> Version;
17
18 fn content_length(&self) -> Option<u64>;
19
20 fn status(&self) -> StatusCode;
21
22 fn url(&self) -> &Url;
23
24 fn remote_addr(&self) -> Option<SocketAddr>;
25
26 fn headers(&self) -> &HeaderMap;
27
28 #[allow(async_fn_in_trait)]
29 async fn execute<F>(self, handle: F) where F: FnMut(ClientixResult<T>);
30
31 #[allow(async_fn_in_trait)]
32 async fn collect(self) -> ClientixResult<Vec<T>>;
33
34}
35
36pub struct ClientixStream {
37 version: Version,
38 content_length: Option<u64>,
39 status: StatusCode,
40 url: Url,
41 remote_addr: Option<SocketAddr>,
42 headers: HeaderMap,
43 stream: Pin<Box<dyn Stream<Item = ClientixResult<Bytes>>>>,
44}
45
46impl ClientixStream {
47
48 pub fn new(
49 version: Version,
50 content_length: Option<u64>,
51 status: StatusCode,
52 url: Url,
53 remote_addr: Option<SocketAddr>,
54 headers: HeaderMap,
55 stream: impl Stream<Item = ClientixResult<Bytes>> + 'static
56 ) -> Self {
57 Self {
58 version,
59 content_length,
60 status,
61 url,
62 remote_addr,
63 headers,
64 stream: Box::pin(stream)
65 }
66 }
67
68 pub fn sse(self) -> ClientixSSEStream<String> {
69 self.into()
70 }
71
72}
73
74impl Stream for ClientixStream {
75 type Item = ClientixResult<Bytes>;
76
77 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78 self.stream.as_mut().poll_next(cx)
79 }
80}
81
82impl ClientixStreamInterface<Bytes> for ClientixStream {
83
84 fn version(&self) -> Version {
85 self.version
86 }
87
88 fn content_length(&self) -> Option<u64> {
89 self.content_length
90 }
91
92 fn status(&self) -> StatusCode {
93 self.status
94 }
95
96 fn url(&self) -> &Url {
97 &self.url
98 }
99
100 fn remote_addr(&self) -> Option<SocketAddr> {
101 self.remote_addr
102 }
103
104 fn headers(&self) -> &HeaderMap {
105 &self.headers
106 }
107
108 async fn execute<F>(mut self, mut handle: F) where F: FnMut(ClientixResult<Bytes>) {
109 while let Some(result) = self.stream.next().await {
110 handle(result);
111 }
112 }
113
114 async fn collect(self) -> ClientixResult<Vec<Bytes>> {
115 self.stream.try_collect().await
116 }
117
118}