1use std::error::Error as StdError;
6use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
7use std::future::Future;
8use std::io::Read;
9use std::pin::Pin;
10use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
11use std::task::{Context, Poll, Waker};
12use std::thread::JoinHandle;
13use std::{io, thread};
14
15pub use ureq;
16
17use http_adapter::async_trait::async_trait;
18use http_adapter::http::HeaderValue;
19use http_adapter::{http, HttpClientAdapter};
20use http_adapter::{Request, Response};
21
22#[derive(Clone, Debug)]
23pub struct UreqAdapter {
24 agent: ureq::Agent,
25}
26
27impl UreqAdapter {
28 pub fn new(agent: ureq::Agent) -> Self {
29 Self { agent }
30 }
31}
32
33impl Default for UreqAdapter {
34 #[inline]
35 fn default() -> Self {
36 Self {
37 agent: ureq::Agent::new_with_defaults(),
38 }
39 }
40}
41
42#[derive(Debug)]
43pub enum Error {
44 Http(http::Error),
45 Ureq(ureq::Error),
46 Io(io::Error),
47 InvalidHeaderValue(HeaderValue),
48 InvalidHttpVersion(String),
49 InvalidStatusCode(u16),
50 InternalCommunicationError(String),
51}
52
53impl Display for Error {
54 fn fmt(&self, f: &mut Formatter) -> FmtResult {
55 match self {
56 Error::Http(e) => Display::fmt(e, f),
57 Error::Ureq(e) => Display::fmt(e, f),
58 Error::Io(e) => Display::fmt(e, f),
59 Error::InvalidHeaderValue(header_value) => {
60 write!(f, "Invalid header value: {header_value:?}")
61 }
62 Error::InvalidHttpVersion(version) => {
63 write!(f, "Invalid HTTP version: {version}")
64 }
65 Error::InvalidStatusCode(code) => {
66 write!(f, "Invalid status code: {code}")
67 }
68 Error::InternalCommunicationError(e) => {
69 write!(f, "Internal communication error: {e}")
70 }
71 }
72 }
73}
74
75impl StdError for Error {}
76
77#[inline]
78fn to_response(res: Response<ureq::Body>) -> Result<Response<Vec<u8>>, Error> {
79 let mut body_read_err = None;
80 let new_resp = res.map(|body| {
81 let mut out = Vec::with_capacity(usize::try_from(body.content_length().unwrap_or(4 * 1024)).unwrap_or(4 * 1024));
82 let res = body.into_reader().read_to_end(&mut out);
83 match res {
84 Ok(_) => out,
85 Err(err) => {
86 body_read_err = Some(err);
87 Vec::new()
88 }
89 }
90 });
91 match body_read_err {
92 None => Ok(new_resp),
93 Some(body_read_err) => Err(Error::Io(body_read_err)),
94 }
95}
96
97#[async_trait(?Send)]
98impl HttpClientAdapter for UreqAdapter {
99 type Error = Error;
100
101 async fn execute(&self, request: Request<Vec<u8>>) -> Result<Response<Vec<u8>>, Self::Error> {
102 let agent = self.agent.clone();
103 let res = ThreadFuture::new(|send_result, recv_waker| {
104 move || {
105 let waker = recv_waker
106 .recv()
107 .map_err(|_| Error::InternalCommunicationError("Waker receive channel is closed".to_string()))?;
108 match agent.run(request).map_err(Error::Ureq) {
109 Ok(res) => send_result
110 .send(to_response(res))
111 .map_err(|_| Error::InternalCommunicationError("Result send channel is closed for Ok result".to_string()))?,
112 Err(e) => send_result
113 .send(Err(e))
114 .map_err(|_| Error::InternalCommunicationError("Result send channel is closed for Err result".to_string()))?,
115 }
116 waker.wake();
117 Ok(())
118 }
119 })
120 .await;
121 match res {
122 FutureResult::CommunicationError(e) => Err(Error::InternalCommunicationError(e)),
123 FutureResult::Result(r) => r,
124 }
125 }
126}
127
128struct ThreadFuture<Res> {
129 thread: Option<JoinHandle<Result<(), Error>>>,
130 recv_result: Receiver<Res>,
131 send_waker: Sender<Waker>,
132 waker_sent: bool,
133}
134
135impl<Res: Send + 'static> ThreadFuture<Res> {
136 pub fn new<Factory, Body>(factory: Factory) -> ThreadFuture<Res>
137 where
138 Factory: FnOnce(Sender<Res>, Receiver<Waker>) -> Body,
139 Body: FnOnce() -> Result<(), Error> + Send + 'static,
140 {
141 let (send_result, recv_result) = channel();
142 let (send_waker, recv_waker) = channel();
143 let body = factory(send_result, recv_waker);
144 let thread = thread::spawn(body);
145 ThreadFuture {
146 thread: Some(thread),
147 recv_result,
148 send_waker,
149 waker_sent: false,
150 }
151 }
152}
153
154impl<Res> Drop for ThreadFuture<Res> {
155 fn drop(&mut self) {
156 if let Some(thread) = self.thread.take() {
157 let _ = thread.join().expect("Can't join thread");
158 }
159 }
160}
161
162impl<Res> Future for ThreadFuture<Res> {
163 type Output = FutureResult<Res>;
164
165 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
166 if !self.waker_sent {
167 if self.send_waker.send(cx.waker().clone()).is_err() {
168 return Poll::Ready(FutureResult::CommunicationError("Waker send channel is closed".to_string()));
169 }
170 self.waker_sent = true;
171 }
172 match self.recv_result.try_recv() {
173 Ok(res) => Poll::Ready(FutureResult::Result(res)),
174 Err(TryRecvError::Disconnected) => Poll::Ready(FutureResult::CommunicationError(
175 "Result receive channel is closed".to_string(),
176 )),
177 Err(TryRecvError::Empty) => Poll::Pending,
178 }
179 }
180}
181
/// Outcome of awaiting a [`ThreadFuture`].
enum FutureResult<Res> {
    /// A channel to the worker thread was closed before a result could be
    /// exchanged; carries a description of which one.
    CommunicationError(String),
    /// The value delivered by the worker thread.
    Result(Res),
}