http_adapter_ureq/
lib.rs

1//! # HTTP adapter implementation for [ureq](https://crates.io/crates/ureq)
2//!
3//! For more details refer to [http-adapter](https://crates.io/crates/http-adapter)
4
5use std::error::Error as StdError;
6use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
7use std::future::Future;
8use std::io::Read;
9use std::pin::Pin;
10use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
11use std::task::{Context, Poll, Waker};
12use std::thread::JoinHandle;
13use std::{io, thread};
14
15pub use ureq;
16
17use http_adapter::async_trait::async_trait;
18use http_adapter::http::HeaderValue;
19use http_adapter::{http, HttpClientAdapter};
20use http_adapter::{Request, Response};
21
22#[derive(Clone, Debug)]
23pub struct UreqAdapter {
24	agent: ureq::Agent,
25}
26
27impl UreqAdapter {
28	pub fn new(agent: ureq::Agent) -> Self {
29		Self { agent }
30	}
31}
32
33impl Default for UreqAdapter {
34	#[inline]
35	fn default() -> Self {
36		Self {
37			agent: ureq::Agent::new_with_defaults(),
38		}
39	}
40}
41
42#[derive(Debug)]
43pub enum Error {
44	Http(http::Error),
45	Ureq(ureq::Error),
46	Io(io::Error),
47	InvalidHeaderValue(HeaderValue),
48	InvalidHttpVersion(String),
49	InvalidStatusCode(u16),
50	InternalCommunicationError(String),
51}
52
53impl Display for Error {
54	fn fmt(&self, f: &mut Formatter) -> FmtResult {
55		match self {
56			Error::Http(e) => Display::fmt(e, f),
57			Error::Ureq(e) => Display::fmt(e, f),
58			Error::Io(e) => Display::fmt(e, f),
59			Error::InvalidHeaderValue(header_value) => {
60				write!(f, "Invalid header value: {header_value:?}")
61			}
62			Error::InvalidHttpVersion(version) => {
63				write!(f, "Invalid HTTP version: {version}")
64			}
65			Error::InvalidStatusCode(code) => {
66				write!(f, "Invalid status code: {code}")
67			}
68			Error::InternalCommunicationError(e) => {
69				write!(f, "Internal communication error: {e}")
70			}
71		}
72	}
73}
74
75impl StdError for Error {}
76
77#[inline]
78fn to_response(res: Response<ureq::Body>) -> Result<Response<Vec<u8>>, Error> {
79	let mut body_read_err = None;
80	let new_resp = res.map(|body| {
81		let mut out = Vec::with_capacity(usize::try_from(body.content_length().unwrap_or(4 * 1024)).unwrap_or(4 * 1024));
82		let res = body.into_reader().read_to_end(&mut out);
83		match res {
84			Ok(_) => out,
85			Err(err) => {
86				body_read_err = Some(err);
87				Vec::new()
88			}
89		}
90	});
91	match body_read_err {
92		None => Ok(new_resp),
93		Some(body_read_err) => Err(Error::Io(body_read_err)),
94	}
95}
96
97#[async_trait(?Send)]
98impl HttpClientAdapter for UreqAdapter {
99	type Error = Error;
100
101	async fn execute(&self, request: Request<Vec<u8>>) -> Result<Response<Vec<u8>>, Self::Error> {
102		let agent = self.agent.clone();
103		let res = ThreadFuture::new(|send_result, recv_waker| {
104			move || {
105				let waker = recv_waker
106					.recv()
107					.map_err(|_| Error::InternalCommunicationError("Waker receive channel is closed".to_string()))?;
108				match agent.run(request).map_err(Error::Ureq) {
109					Ok(res) => send_result
110						.send(to_response(res))
111						.map_err(|_| Error::InternalCommunicationError("Result send channel is closed for Ok result".to_string()))?,
112					Err(e) => send_result
113						.send(Err(e))
114						.map_err(|_| Error::InternalCommunicationError("Result send channel is closed for Err result".to_string()))?,
115				}
116				waker.wake();
117				Ok(())
118			}
119		})
120		.await;
121		match res {
122			FutureResult::CommunicationError(e) => Err(Error::InternalCommunicationError(e)),
123			FutureResult::Result(r) => r,
124		}
125	}
126}
127
128struct ThreadFuture<Res> {
129	thread: Option<JoinHandle<Result<(), Error>>>,
130	recv_result: Receiver<Res>,
131	send_waker: Sender<Waker>,
132	waker_sent: bool,
133}
134
135impl<Res: Send + 'static> ThreadFuture<Res> {
136	pub fn new<Factory, Body>(factory: Factory) -> ThreadFuture<Res>
137	where
138		Factory: FnOnce(Sender<Res>, Receiver<Waker>) -> Body,
139		Body: FnOnce() -> Result<(), Error> + Send + 'static,
140	{
141		let (send_result, recv_result) = channel();
142		let (send_waker, recv_waker) = channel();
143		let body = factory(send_result, recv_waker);
144		let thread = thread::spawn(body);
145		ThreadFuture {
146			thread: Some(thread),
147			recv_result,
148			send_waker,
149			waker_sent: false,
150		}
151	}
152}
153
154impl<Res> Drop for ThreadFuture<Res> {
155	fn drop(&mut self) {
156		if let Some(thread) = self.thread.take() {
157			let _ = thread.join().expect("Can't join thread");
158		}
159	}
160}
161
162impl<Res> Future for ThreadFuture<Res> {
163	type Output = FutureResult<Res>;
164
165	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
166		if !self.waker_sent {
167			if self.send_waker.send(cx.waker().clone()).is_err() {
168				return Poll::Ready(FutureResult::CommunicationError("Waker send channel is closed".to_string()));
169			}
170			self.waker_sent = true;
171		}
172		match self.recv_result.try_recv() {
173			Ok(res) => Poll::Ready(FutureResult::Result(res)),
174			Err(TryRecvError::Disconnected) => Poll::Ready(FutureResult::CommunicationError(
175				"Result receive channel is closed".to_string(),
176			)),
177			Err(TryRecvError::Empty) => Poll::Pending,
178		}
179	}
180}
181
182enum FutureResult<Res> {
183	CommunicationError(String),
184	Result(Res),
185}