clientix_core/client/asynchronous/
response.rs

1use futures_util::StreamExt;
2use bytes::Bytes;
3use encoding_rs::UTF_8;
4use futures_util::TryStreamExt;
5use reqwest::Response;
6use serde::de::DeserializeOwned;
7use crate::client::asynchronous::stream::ClientixStream;
8use crate::client::response::{ClientixError, ClientixResponse, ClientixResult};
9
10pub struct AsyncResponseHandler {
11    result: ClientixResult<Response>
12}
13
14impl AsyncResponseHandler {
15
16    pub fn new(result: ClientixResult<Response>) -> AsyncResponseHandler {
17        AsyncResponseHandler { result }
18    }
19
20    pub async fn text(self) -> ClientixResult<ClientixResponse<String>> {
21        match self.result {
22            Ok(response) => {
23                Ok(ClientixResponse::new(
24                    response.version(),
25                    response.content_length(),
26                    response.status(),
27                    response.url().clone(),
28                    response.remote_addr(),
29                    response.headers().clone(),
30                    response.text().await?
31                ))
32            },
33            Err(error) => Err(error),
34        }
35    }
36
37    pub async fn text_with_encoding(self, encoding: &str) -> ClientixResult<ClientixResponse<String>> {
38        match self.result {
39            Ok(response) => {
40                Ok(ClientixResponse::new(
41                    response.version(),
42                    response.content_length(),
43                    response.status(),
44                    response.url().clone(),
45                    response.remote_addr(),
46                    response.headers().clone(),
47                    response.text_with_charset(encoding).await?
48                ))
49            },
50            Err(error) => Err(error),
51        }
52    }
53
54    pub fn text_stream(self) -> ClientixResult<ClientixStream<String>> {
55        match self.bytes_stream() {
56            Ok(stream) => {
57                let version = stream.version();
58                let content_length = stream.content_length();
59                let status = stream.status();
60                let url = stream.url().clone();
61                let remote_addr = stream.remote_addr();
62                let headers = stream.headers().clone();
63                let stream = stream
64                    .map(|chunk| match chunk {
65                        Ok(chunk) => {
66                            let (text, _, _) = UTF_8.decode(&chunk);
67                            Ok(text.to_string())
68                        },
69                        Err(error) => Err(error),
70                    })
71                    .flat_map(|text| {
72                        match text {
73                            Ok(text) => {
74                                let lines: Vec<ClientixResult<String>> = text
75                                    .split("\n")
76                                    .map(str::trim)
77                                    .flat_map(|line| line.strip_prefix("data:"))
78                                    .map(str::trim)
79                                    .map(str::to_string)
80                                    .map(|value| Ok(value))
81                                    .collect::<>();
82
83                                futures_util::stream::iter(lines)
84                            }
85                            Err(error) => {
86                                let lines: Vec<ClientixResult<String>> = vec![Err(error)];
87                                futures_util::stream::iter(lines)
88                            }
89                        }
90                    });
91
92                Ok(ClientixStream::new(version, content_length, status, url, remote_addr, headers, stream))
93            },
94            Err(error) => Err(error),
95        }
96    }
97
98    pub async fn bytes(self) -> ClientixResult<ClientixResponse<Bytes>> {
99        match self.result {
100            Ok(response) => {
101                Ok(ClientixResponse::new(
102                    response.version(),
103                    response.content_length(),
104                    response.status(),
105                    response.url().clone(),
106                    response.remote_addr(),
107                    response.headers().clone(),
108                    response.bytes().await?
109                ))
110            },
111            Err(error) => Err(error),
112        }
113    }
114
115    pub fn bytes_stream(self) -> ClientixResult<ClientixStream<Bytes>> {
116        match self.result {
117            Ok(response) => {
118                Ok(ClientixStream::new(
119                    response.version(),
120                    response.content_length(),
121                    response.status(),
122                    response.url().clone(),
123                    response.remote_addr(),
124                    response.headers().clone(),
125                    response.bytes_stream().map_err(ClientixError::from)
126                ))
127            },
128            Err(error) => Err(error)
129        }
130    }
131
132    pub async fn json<T>(self) -> ClientixResult<ClientixResponse<T>> where T: DeserializeOwned + Clone {
133        match self.result {
134            Ok(response) => {
135                Ok(ClientixResponse::new(
136                    response.version(),
137                    response.content_length(),
138                    response.status(),
139                    response.url().clone(),
140                    response.remote_addr(),
141                    response.headers().clone(),
142                    serde_json::from_str::<T>(response.text().await?.as_str())?
143                ))
144            },
145            Err(error) => Err(error),
146        }
147    }
148
149    pub fn json_stream<T>(self) -> ClientixResult<ClientixStream<T>> where T: DeserializeOwned + Clone {
150        match self.text_stream() {
151            Ok(stream) => {
152                let version = stream.version();
153                let content_length = stream.content_length();
154                let status = stream.status();
155                let url = stream.url().clone();
156                let remote_addr = stream.remote_addr();
157                let headers = stream.headers().clone();
158                let stream = stream
159                    .filter(|line| match line {
160                        Ok(line) if !line.contains("[DONE]") => futures_util::future::ready(true),
161                        _ => futures_util::future::ready(false)
162                    })
163                    .map(|line| match line {
164                        Ok(line) => Ok(serde_json::from_str::<T>(line.as_str())?),
165                        Err(err) => Err(err),
166                    });
167
168                Ok(ClientixStream::new(version, content_length, status, url, remote_addr, headers, stream))
169            }
170            Err(error) => Err(error),
171        }
172    }
173
174    pub async fn xml<T>(self) -> ClientixResult<ClientixResponse<T>> where T: DeserializeOwned + Clone {
175        match self.result {
176            Ok(response) => {
177                Ok(ClientixResponse::new(
178                    response.version(),
179                    response.content_length(),
180                    response.status(),
181                    response.url().clone(),
182                    response.remote_addr(),
183                    response.headers().clone(),
184                    serde_xml_rs::from_str::<T>(response.text().await?.as_str())?
185                ))
186            },
187            Err(error) => Err(error),
188        }
189    }
190    
191    pub async fn urlencoded<T>(self) -> ClientixResult<ClientixResponse<T>> where T: DeserializeOwned + Clone {
192        match self.result {
193            Ok(response) => {
194                Ok(ClientixResponse::new(
195                    response.version(),
196                    response.content_length(),
197                    response.status(),
198                    response.url().clone(),
199                    response.remote_addr(),
200                    response.headers().clone(),
201                    serde_urlencoded::from_str::<T>(response.text().await?.as_str())?
202                ))
203            },
204            Err(error) => Err(error),
205        }
206    }
207    
208}