clientix_core/client/asynchronous/
response.rs1use futures_util::StreamExt;
2use bytes::Bytes;
3use encoding_rs::UTF_8;
4use futures_util::TryStreamExt;
5use reqwest::Response;
6use serde::de::DeserializeOwned;
7use crate::client::asynchronous::stream::ClientixStream;
8use crate::client::result::{ClientixError, ClientixResponse, ClientixResult};
9
10pub struct AsyncResponseHandler {
11 result: ClientixResult<Response>
12}
13
14impl AsyncResponseHandler {
15
16 pub fn new(result: ClientixResult<Response>) -> AsyncResponseHandler {
17 AsyncResponseHandler { result }
18 }
19
20 pub async fn text(self) -> ClientixResult<ClientixResponse<String>> {
21 match self.result {
22 Ok(response) => {
23 Ok(ClientixResponse::new(
24 response.version(),
25 response.content_length(),
26 response.status(),
27 response.url().clone(),
28 response.remote_addr(),
29 response.headers().clone(),
30 response.text().await?
31 ))
32 },
33 Err(error) => Err(error),
34 }
35 }
36
37 pub async fn text_with_encoding(self, encoding: &str) -> ClientixResult<ClientixResponse<String>> {
38 match self.result {
39 Ok(response) => {
40 Ok(ClientixResponse::new(
41 response.version(),
42 response.content_length(),
43 response.status(),
44 response.url().clone(),
45 response.remote_addr(),
46 response.headers().clone(),
47 response.text_with_charset(encoding).await?
48 ))
49 },
50 Err(error) => Err(error),
51 }
52 }
53
54 pub fn text_stream(self) -> ClientixResult<ClientixStream<String>> {
55 match self.bytes_stream() {
56 Ok(stream) => {
57 let version = stream.version();
58 let content_length = stream.content_length();
59 let status = stream.status();
60 let url = stream.url().clone();
61 let remote_addr = stream.remote_addr();
62 let headers = stream.headers().clone();
63 let stream = stream
64 .map(|chunk| match chunk {
65 Ok(chunk) => {
66 let (text, _, _) = UTF_8.decode(&chunk);
67 Ok(text.to_string())
68 },
69 Err(error) => Err(error),
70 })
71 .flat_map(|text| {
72 match text {
73 Ok(text) => {
74 let lines: Vec<ClientixResult<String>> = text
75 .split("\n")
76 .map(str::trim)
77 .flat_map(|line| line.strip_prefix("data:"))
78 .map(str::trim)
79 .map(str::to_string)
80 .map(|value| Ok(value))
81 .collect::<>();
82
83 futures_util::stream::iter(lines)
84 }
85 Err(error) => {
86 let lines: Vec<ClientixResult<String>> = vec![Err(error)];
87 futures_util::stream::iter(lines)
88 }
89 }
90 });
91
92 Ok(ClientixStream::new(version, content_length, status, url, remote_addr, headers, stream))
93 },
94 Err(error) => Err(error),
95 }
96 }
97
98 pub async fn bytes(self) -> ClientixResult<ClientixResponse<Bytes>> {
99 match self.result {
100 Ok(response) => {
101 Ok(ClientixResponse::new(
102 response.version(),
103 response.content_length(),
104 response.status(),
105 response.url().clone(),
106 response.remote_addr(),
107 response.headers().clone(),
108 response.bytes().await?
109 ))
110 },
111 Err(error) => Err(error),
112 }
113 }
114
115 pub fn bytes_stream(self) -> ClientixResult<ClientixStream<Bytes>> {
116 match self.result {
117 Ok(response) => {
118 Ok(ClientixStream::new(
119 response.version(),
120 response.content_length(),
121 response.status(),
122 response.url().clone(),
123 response.remote_addr(),
124 response.headers().clone(),
125 response.bytes_stream().map_err(ClientixError::from)
126 ))
127 },
128 Err(error) => Err(error)
129 }
130 }
131
132 pub async fn json<T>(self) -> ClientixResult<ClientixResponse<T>> where T: DeserializeOwned + Clone {
133 match self.result {
134 Ok(response) => {
135 Ok(ClientixResponse::new(
136 response.version(),
137 response.content_length(),
138 response.status(),
139 response.url().clone(),
140 response.remote_addr(),
141 response.headers().clone(),
142 serde_json::from_str::<T>(response.text().await?.as_str())?
143 ))
144 },
145 Err(error) => Err(error),
146 }
147 }
148
149 pub fn json_stream<T>(self) -> ClientixResult<ClientixStream<T>> where T: DeserializeOwned + Clone {
150 match self.text_stream() {
151 Ok(stream) => {
152 let version = stream.version();
153 let content_length = stream.content_length();
154 let status = stream.status();
155 let url = stream.url().clone();
156 let remote_addr = stream.remote_addr();
157 let headers = stream.headers().clone();
158 let stream = stream
159 .filter(|line| match line {
160 Ok(line) if !line.contains("[DONE]") => futures_util::future::ready(true),
161 _ => futures_util::future::ready(false)
162 })
163 .map(|line| match line {
164 Ok(line) => Ok(serde_json::from_str::<T>(line.as_str())?),
165 Err(err) => Err(err),
166 });
167
168 Ok(ClientixStream::new(version, content_length, status, url, remote_addr, headers, stream))
169 }
170 Err(error) => Err(error),
171 }
172 }
173
174}