lightning_block_sync/
http.rs

1//! Simple HTTP implementation which supports both async and traditional execution environments
2//! with minimal dependencies. This is used as the basis for REST and RPC clients.
3
4use chunked_transfer;
5use serde_json;
6
7use std::convert::TryFrom;
8use std::fmt;
9#[cfg(not(feature = "tokio"))]
10use std::io::Write;
11use std::net::{SocketAddr, ToSocketAddrs};
12use std::time::Duration;
13
14#[cfg(feature = "tokio")]
15use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
16#[cfg(feature = "tokio")]
17use tokio::net::TcpStream;
18
19#[cfg(not(feature = "tokio"))]
20use std::io::BufRead;
21use std::io::Read;
22#[cfg(not(feature = "tokio"))]
23use std::net::TcpStream;
24
25/// Timeout for operations on TCP streams.
26const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
27
28/// Timeout for reading the first byte of a response. This is separate from the general read
29/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for
30/// upwards of 10 minutes on slow devices (e.g. RPis with SSDs over USB). Note that we always retry
31/// once when we time out, so the maximum time we allow Bitcoin Core to block for is twice this
32/// value.
33const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(300);
34
35/// Maximum HTTP message header size in bytes.
36const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
37
38/// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any
39/// overhead for HTTP chunked transfer encoding.
40const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
41
42/// Endpoint for interacting with an HTTP-based API.
43#[derive(Debug)]
44pub struct HttpEndpoint {
45	host: String,
46	port: Option<u16>,
47	path: String,
48}
49
50impl HttpEndpoint {
51	/// Creates an endpoint for the given host and default HTTP port.
52	pub fn for_host(host: String) -> Self {
53		Self { host, port: None, path: String::from("/") }
54	}
55
56	/// Specifies a port to use with the endpoint.
57	pub fn with_port(mut self, port: u16) -> Self {
58		self.port = Some(port);
59		self
60	}
61
62	/// Specifies a path to use with the endpoint.
63	pub fn with_path(mut self, path: String) -> Self {
64		self.path = path;
65		self
66	}
67
68	/// Returns the endpoint host.
69	pub fn host(&self) -> &str {
70		&self.host
71	}
72
73	/// Returns the endpoint port.
74	pub fn port(&self) -> u16 {
75		match self.port {
76			None => 80,
77			Some(port) => port,
78		}
79	}
80
81	/// Returns the endpoint path.
82	pub fn path(&self) -> &str {
83		&self.path
84	}
85}
86
87impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
88	type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;
89
90	fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
91		(self.host(), self.port()).to_socket_addrs()
92	}
93}
94
95/// Client for making HTTP requests.
96pub(crate) struct HttpClient {
97	address: SocketAddr,
98	stream: TcpStream,
99}
100
101impl HttpClient {
102	/// Opens a connection to an HTTP endpoint.
103	pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
104		let address = match endpoint.to_socket_addrs()?.next() {
105			None => {
106				return Err(std::io::Error::new(
107					std::io::ErrorKind::InvalidInput,
108					"could not resolve to any addresses",
109				));
110			},
111			Some(address) => address,
112		};
113		let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
114		stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
115		stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
116
117		#[cfg(feature = "tokio")]
118		let stream = {
119			stream.set_nonblocking(true)?;
120			TcpStream::from_std(stream)?
121		};
122
123		Ok(Self { address, stream })
124	}
125
126	/// Sends a `GET` request for a resource identified by `uri` at the `host`.
127	///
128	/// Returns the response body in `F` format.
129	#[allow(dead_code)]
130	pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
131	where
132		F: TryFrom<Vec<u8>, Error = std::io::Error>,
133	{
134		let request = format!(
135			"GET {} HTTP/1.1\r\n\
136			 Host: {}\r\n\
137			 Connection: keep-alive\r\n\
138			 \r\n",
139			uri, host
140		);
141		let response_body = self.send_request_with_retry(&request).await?;
142		F::try_from(response_body)
143	}
144
145	/// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP
146	/// authentication credentials.
147	///
148	/// The request body consists of the provided JSON `content`. Returns the response body in `F`
149	/// format.
150	#[allow(dead_code)]
151	pub async fn post<F>(
152		&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value,
153	) -> std::io::Result<F>
154	where
155		F: TryFrom<Vec<u8>, Error = std::io::Error>,
156	{
157		let content = content.to_string();
158		let request = format!(
159			"POST {} HTTP/1.1\r\n\
160			 Host: {}\r\n\
161			 Authorization: {}\r\n\
162			 Connection: keep-alive\r\n\
163			 Content-Type: application/json\r\n\
164			 Content-Length: {}\r\n\
165			 \r\n\
166			 {}",
167			uri,
168			host,
169			auth,
170			content.len(),
171			content
172		);
173		let response_body = self.send_request_with_retry(&request).await?;
174		F::try_from(response_body)
175	}
176
177	/// Sends an HTTP request message and reads the response, returning its body. Attempts to
178	/// reconnect and retry if the connection has been closed.
179	async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
180		match self.send_request(request).await {
181			Ok(bytes) => Ok(bytes),
182			Err(_) => {
183				// Reconnect and retry on fail. This can happen if the connection was closed after
184				// the keep-alive limits are reached, or generally if the request timed out due to
185				// Bitcoin Core being stuck on a long-running operation or its RPC queue being
186				// full.
187				// Block 100ms before retrying the request as in many cases the source of the error
188				// may be persistent for some time.
189				#[cfg(feature = "tokio")]
190				tokio::time::sleep(Duration::from_millis(100)).await;
191				#[cfg(not(feature = "tokio"))]
192				std::thread::sleep(Duration::from_millis(100));
193				*self = Self::connect(self.address)?;
194				self.send_request(request).await
195			},
196		}
197	}
198
199	/// Sends an HTTP request message and reads the response, returning its body.
200	async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
201		self.write_request(request).await?;
202		self.read_response().await
203	}
204
205	/// Writes an HTTP request message.
206	async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
207		#[cfg(feature = "tokio")]
208		{
209			self.stream.write_all(request.as_bytes()).await?;
210			self.stream.flush().await
211		}
212		#[cfg(not(feature = "tokio"))]
213		{
214			self.stream.write_all(request.as_bytes())?;
215			self.stream.flush()
216		}
217	}
218
219	/// Reads an HTTP response message.
220	async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
221		#[cfg(feature = "tokio")]
222		let stream = self.stream.split().0;
223		#[cfg(not(feature = "tokio"))]
224		let stream = std::io::Read::by_ref(&mut self.stream);
225
226		let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
227
228		#[cfg(feature = "tokio")]
229		let mut reader = tokio::io::BufReader::new(limited_stream);
230		#[cfg(not(feature = "tokio"))]
231		let mut reader = std::io::BufReader::new(limited_stream);
232
233		macro_rules! read_line {
234			() => {
235				read_line!(0)
236			};
237			($retry_count: expr) => {{
238				let mut line = String::new();
239				let mut timeout_count: u64 = 0;
240				let bytes_read = loop {
241					#[cfg(feature = "tokio")]
242					let read_res = reader.read_line(&mut line).await;
243					#[cfg(not(feature = "tokio"))]
244					let read_res = reader.read_line(&mut line);
245					match read_res {
246						Ok(bytes_read) => break bytes_read,
247						Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
248							timeout_count += 1;
249							if timeout_count > $retry_count {
250								return Err(e);
251							} else {
252								continue;
253							}
254						},
255						Err(e) => return Err(e),
256					}
257				};
258
259				match bytes_read {
260					0 => None,
261					_ => {
262						// Remove trailing CRLF
263						if line.ends_with('\n') {
264							line.pop();
265							if line.ends_with('\r') {
266								line.pop();
267							}
268						}
269						Some(line)
270					},
271				}
272			}};
273		}
274
275		// Read and parse status line
276		// Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT.
277		let status_line =
278			read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
279				.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
280		let status = HttpStatus::parse(&status_line)?;
281
282		// Read and parse relevant headers
283		let mut message_length = HttpMessageLength::Empty;
284		loop {
285			let line = read_line!()
286				.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
287			if line.is_empty() {
288				break;
289			}
290
291			let header = HttpHeader::parse(&line)?;
292			if header.has_name("Content-Length") {
293				let length = header
294					.value
295					.parse()
296					.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
297				if let HttpMessageLength::Empty = message_length {
298					message_length = HttpMessageLength::ContentLength(length);
299				}
300				continue;
301			}
302
303			if header.has_name("Transfer-Encoding") {
304				message_length = HttpMessageLength::TransferEncoding(header.value.into());
305				continue;
306			}
307		}
308
309		// Read message body
310		let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
311		reader.get_mut().set_limit(read_limit as u64);
312		let contents = match message_length {
313			HttpMessageLength::Empty => Vec::new(),
314			HttpMessageLength::ContentLength(length) => {
315				if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
316					return Err(std::io::Error::new(
317						std::io::ErrorKind::InvalidData,
318						format!("invalid response length: {} bytes", length),
319					));
320				} else {
321					let mut content = vec![0; length];
322					#[cfg(feature = "tokio")]
323					reader.read_exact(&mut content[..]).await?;
324					#[cfg(not(feature = "tokio"))]
325					reader.read_exact(&mut content[..])?;
326					content
327				}
328			},
329			HttpMessageLength::TransferEncoding(coding) => {
330				if !coding.eq_ignore_ascii_case("chunked") {
331					return Err(std::io::Error::new(
332						std::io::ErrorKind::InvalidInput,
333						"unsupported transfer coding",
334					));
335				} else {
336					let mut content = Vec::new();
337					#[cfg(feature = "tokio")]
338					{
339						// Since chunked_transfer doesn't have an async interface, only use it to
340						// determine the size of each chunk to read.
341						//
342						// TODO: Replace with an async interface when available.
343						// https://github.com/frewsxcv/rust-chunked-transfer/issues/7
344						loop {
345							// Read the chunk header which contains the chunk size.
346							let mut chunk_header = String::new();
347							reader.read_line(&mut chunk_header).await?;
348							if chunk_header == "0\r\n" {
349								// Read the terminator chunk since the decoder consumes the CRLF
350								// immediately when this chunk is encountered.
351								reader.read_line(&mut chunk_header).await?;
352							}
353
354							// Decode the chunk header to obtain the chunk size.
355							let mut buffer = Vec::new();
356							let mut decoder =
357								chunked_transfer::Decoder::new(chunk_header.as_bytes());
358							decoder.read_to_end(&mut buffer)?;
359
360							// Read the chunk body.
361							let chunk_size = match decoder.remaining_chunks_size() {
362								None => break,
363								Some(chunk_size) => chunk_size,
364							};
365							let chunk_offset = content.len();
366							content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
367							reader.read_exact(&mut content[chunk_offset..]).await?;
368							content.resize(chunk_offset + chunk_size, 0);
369						}
370						content
371					}
372					#[cfg(not(feature = "tokio"))]
373					{
374						let mut decoder = chunked_transfer::Decoder::new(reader);
375						decoder.read_to_end(&mut content)?;
376						content
377					}
378				}
379			},
380		};
381
382		if !status.is_ok() {
383			// TODO: Handle 3xx redirection responses.
384			let error = HttpError { status_code: status.code.to_string(), contents };
385			return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
386		}
387
388		Ok(contents)
389	}
390}
391
392/// HTTP error consisting of a status code and body contents.
393#[derive(Debug)]
394pub(crate) struct HttpError {
395	pub(crate) status_code: String,
396	pub(crate) contents: Vec<u8>,
397}
398
399impl std::error::Error for HttpError {}
400
401impl fmt::Display for HttpError {
402	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
403		let contents = String::from_utf8_lossy(&self.contents);
404		write!(f, "status_code: {}, contents: {}", self.status_code, contents)
405	}
406}
407
408/// HTTP response status code as defined by [RFC 7231].
409///
410/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6
411struct HttpStatus<'a> {
412	code: &'a str,
413}
414
415impl<'a> HttpStatus<'a> {
416	/// Parses an HTTP status line as defined by [RFC 7230].
417	///
418	/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2
419	fn parse(line: &'a String) -> std::io::Result<HttpStatus<'a>> {
420		let mut tokens = line.splitn(3, ' ');
421
422		let http_version = tokens
423			.next()
424			.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?;
425		if !http_version.eq_ignore_ascii_case("HTTP/1.1")
426			&& !http_version.eq_ignore_ascii_case("HTTP/1.0")
427		{
428			return Err(std::io::Error::new(
429				std::io::ErrorKind::InvalidData,
430				"invalid HTTP-Version",
431			));
432		}
433
434		let code = tokens
435			.next()
436			.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?;
437		if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
438			return Err(std::io::Error::new(
439				std::io::ErrorKind::InvalidData,
440				"invalid Status-Code",
441			));
442		}
443
444		let _reason = tokens
445			.next()
446			.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?;
447
448		Ok(Self { code })
449	}
450
451	/// Returns whether the status is successful (i.e., 2xx status class).
452	fn is_ok(&self) -> bool {
453		self.code.starts_with('2')
454	}
455}
456
457/// HTTP response header as defined by [RFC 7231].
458///
459/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7
460struct HttpHeader<'a> {
461	name: &'a str,
462	value: &'a str,
463}
464
465impl<'a> HttpHeader<'a> {
466	/// Parses an HTTP header field as defined by [RFC 7230].
467	///
468	/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2
469	fn parse(line: &'a String) -> std::io::Result<HttpHeader<'a>> {
470		let mut tokens = line.splitn(2, ':');
471		let name = tokens
472			.next()
473			.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?;
474		let value = tokens
475			.next()
476			.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))?
477			.trim_start();
478		Ok(Self { name, value })
479	}
480
481	/// Returns whether the header field has the given name.
482	fn has_name(&self, name: &str) -> bool {
483		self.name.eq_ignore_ascii_case(name)
484	}
485}
486
487/// HTTP message body length as defined by [RFC 7230].
488///
489/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3
490enum HttpMessageLength {
491	Empty,
492	ContentLength(usize),
493	TransferEncoding(String),
494}
495
496/// An HTTP response body in binary format.
497pub struct BinaryResponse(pub Vec<u8>);
498
499/// An HTTP response body in JSON format.
500pub struct JsonResponse(pub serde_json::Value);
501
502/// Interprets bytes from an HTTP response body as binary data.
503impl TryFrom<Vec<u8>> for BinaryResponse {
504	type Error = std::io::Error;
505
506	fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
507		Ok(BinaryResponse(bytes))
508	}
509}
510
511/// Interprets bytes from an HTTP response body as a JSON value.
512impl TryFrom<Vec<u8>> for JsonResponse {
513	type Error = std::io::Error;
514
515	fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
516		Ok(JsonResponse(serde_json::from_slice(&bytes)?))
517	}
518}
519
520#[cfg(test)]
521mod endpoint_tests {
522	use super::HttpEndpoint;
523
524	#[test]
525	fn with_default_port() {
526		let endpoint = HttpEndpoint::for_host("foo.com".into());
527		assert_eq!(endpoint.host(), "foo.com");
528		assert_eq!(endpoint.port(), 80);
529	}
530
531	#[test]
532	fn with_custom_port() {
533		let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080);
534		assert_eq!(endpoint.host(), "foo.com");
535		assert_eq!(endpoint.port(), 8080);
536	}
537
538	#[test]
539	fn with_uri_path() {
540		let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into());
541		assert_eq!(endpoint.host(), "foo.com");
542		assert_eq!(endpoint.path(), "/path");
543	}
544
545	#[test]
546	fn without_uri_path() {
547		let endpoint = HttpEndpoint::for_host("foo.com".into());
548		assert_eq!(endpoint.host(), "foo.com");
549		assert_eq!(endpoint.path(), "/");
550	}
551
552	#[test]
553	fn convert_to_socket_addrs() {
554		let endpoint = HttpEndpoint::for_host("localhost".into());
555		let host = endpoint.host();
556		let port = endpoint.port();
557
558		use std::net::ToSocketAddrs;
559		match (&endpoint).to_socket_addrs() {
560			Err(e) => panic!("Unexpected error: {:?}", e),
561			Ok(socket_addrs) => {
562				let mut std_addrs = (host, port).to_socket_addrs().unwrap();
563				for addr in socket_addrs {
564					assert_eq!(addr, std_addrs.next().unwrap());
565				}
566				assert!(std_addrs.next().is_none());
567			},
568		}
569	}
570}
571
572#[cfg(test)]
573pub(crate) mod client_tests {
574	use super::*;
575	use std::io::BufRead;
576	use std::io::Write;
577
578	/// Server for handling HTTP client requests with a stock response.
579	pub struct HttpServer {
580		address: std::net::SocketAddr,
581		handler: std::thread::JoinHandle<()>,
582		shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
583	}
584
585	/// Body of HTTP response messages.
586	pub enum MessageBody<T: ToString> {
587		Empty,
588		Content(T),
589		ChunkedContent(T),
590	}
591
592	impl HttpServer {
593		fn responding_with_body<T: ToString>(status: &str, body: MessageBody<T>) -> Self {
594			let response = match body {
595				MessageBody::Empty => format!("{}\r\n\r\n", status),
596				MessageBody::Content(body) => {
597					let body = body.to_string();
598					format!(
599						"{}\r\n\
600						 Content-Length: {}\r\n\
601						 \r\n\
602						 {}",
603						status,
604						body.len(),
605						body
606					)
607				},
608				MessageBody::ChunkedContent(body) => {
609					let mut chuncked_body = Vec::new();
610					{
611						use chunked_transfer::Encoder;
612						let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
613						encoder.write_all(body.to_string().as_bytes()).unwrap();
614					}
615					format!(
616						"{}\r\n\
617						 Transfer-Encoding: chunked\r\n\
618						 \r\n\
619						 {}",
620						status,
621						String::from_utf8(chuncked_body).unwrap()
622					)
623				},
624			};
625			HttpServer::responding_with(response)
626		}
627
628		pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
629			HttpServer::responding_with_body("HTTP/1.1 200 OK", body)
630		}
631
632		pub fn responding_with_not_found() -> Self {
633			HttpServer::responding_with_body::<String>("HTTP/1.1 404 Not Found", MessageBody::Empty)
634		}
635
636		pub fn responding_with_server_error<T: ToString>(content: T) -> Self {
637			let body = MessageBody::Content(content);
638			HttpServer::responding_with_body("HTTP/1.1 500 Internal Server Error", body)
639		}
640
641		fn responding_with(response: String) -> Self {
642			let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
643			let address = listener.local_addr().unwrap();
644
645			let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
646			let shutdown_signaled = std::sync::Arc::clone(&shutdown);
647			let handler = std::thread::spawn(move || {
648				for stream in listener.incoming() {
649					let mut stream = stream.unwrap();
650					stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
651
652					let lines_read = std::io::BufReader::new(&stream)
653						.lines()
654						.take_while(|line| !line.as_ref().unwrap().is_empty())
655						.count();
656					if lines_read == 0 {
657						continue;
658					}
659
660					for chunk in response.as_bytes().chunks(16) {
661						if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
662							return;
663						} else {
664							if let Err(_) = stream.write(chunk) {
665								break;
666							}
667							if let Err(_) = stream.flush() {
668								break;
669							}
670						}
671					}
672				}
673			});
674
675			Self { address, handler, shutdown }
676		}
677
678		fn shutdown(self) {
679			self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
680			self.handler.join().unwrap();
681		}
682
683		pub fn endpoint(&self) -> HttpEndpoint {
684			HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
685		}
686	}
687
688	#[test]
689	fn connect_to_unresolvable_host() {
690		match HttpClient::connect(("example.invalid", 80)) {
691			Err(e) => {
692				assert!(
693					e.to_string().contains("failed to lookup address information")
694						|| e.to_string().contains("No such host"),
695					"{:?}",
696					e
697				);
698			},
699			Ok(_) => panic!("Expected error"),
700		}
701	}
702
703	#[test]
704	fn connect_with_no_socket_address() {
705		match HttpClient::connect(&vec![][..]) {
706			Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
707			Ok(_) => panic!("Expected error"),
708		}
709	}
710
711	#[test]
712	fn connect_with_unknown_server() {
713		match HttpClient::connect(("::", 80)) {
714			#[cfg(target_os = "windows")]
715			Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable),
716			#[cfg(not(target_os = "windows"))]
717			Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused),
718			Ok(_) => panic!("Expected error"),
719		}
720	}
721
722	#[tokio::test]
723	async fn connect_with_valid_endpoint() {
724		let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
725
726		match HttpClient::connect(&server.endpoint()) {
727			Err(e) => panic!("Unexpected error: {:?}", e),
728			Ok(_) => {},
729		}
730	}
731
732	#[tokio::test]
733	async fn read_empty_message() {
734		let server = HttpServer::responding_with("".to_string());
735
736		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
737		match client.get::<BinaryResponse>("/foo", "foo.com").await {
738			Err(e) => {
739				assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
740				assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
741			},
742			Ok(_) => panic!("Expected error"),
743		}
744	}
745
746	#[tokio::test]
747	async fn read_incomplete_message() {
748		let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());
749
750		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
751		match client.get::<BinaryResponse>("/foo", "foo.com").await {
752			Err(e) => {
753				assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
754				assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
755			},
756			Ok(_) => panic!("Expected error"),
757		}
758	}
759
760	#[tokio::test]
761	async fn read_too_large_message_headers() {
762		let response = format!(
763			"HTTP/1.1 302 Found\r\n\
764			 Location: {}\r\n\
765			 \r\n",
766			"Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE)
767		);
768		let server = HttpServer::responding_with(response);
769
770		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
771		match client.get::<BinaryResponse>("/foo", "foo.com").await {
772			Err(e) => {
773				assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
774				assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
775			},
776			Ok(_) => panic!("Expected error"),
777		}
778	}
779
780	#[tokio::test]
781	async fn read_too_large_message_body() {
782		let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
783		let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));
784
785		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
786		match client.get::<BinaryResponse>("/foo", "foo.com").await {
787			Err(e) => {
788				assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
789				assert_eq!(
790					e.get_ref().unwrap().to_string(),
791					"invalid response length: 8032001 bytes"
792				);
793			},
794			Ok(_) => panic!("Expected error"),
795		}
796		server.shutdown();
797	}
798
799	#[tokio::test]
800	async fn read_message_with_unsupported_transfer_coding() {
801		let response = String::from(
802			"HTTP/1.1 200 OK\r\n\
803			 Transfer-Encoding: gzip\r\n\
804			 \r\n\
805			 foobar",
806		);
807		let server = HttpServer::responding_with(response);
808
809		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
810		match client.get::<BinaryResponse>("/foo", "foo.com").await {
811			Err(e) => {
812				assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
813				assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
814			},
815			Ok(_) => panic!("Expected error"),
816		}
817	}
818
819	#[tokio::test]
820	async fn read_error() {
821		let server = HttpServer::responding_with_server_error("foo");
822
823		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
824		match client.get::<JsonResponse>("/foo", "foo.com").await {
825			Err(e) => {
826				assert_eq!(e.kind(), std::io::ErrorKind::Other);
827				let http_error = e.into_inner().unwrap().downcast::<HttpError>().unwrap();
828				assert_eq!(http_error.status_code, "500");
829				assert_eq!(http_error.contents, "foo".as_bytes());
830			},
831			Ok(_) => panic!("Expected error"),
832		}
833	}
834
835	#[tokio::test]
836	async fn read_empty_message_body() {
837		let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
838
839		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
840		match client.get::<BinaryResponse>("/foo", "foo.com").await {
841			Err(e) => panic!("Unexpected error: {:?}", e),
842			Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
843		}
844	}
845
846	#[tokio::test]
847	async fn read_message_body_with_length() {
848		let body = "foo bar baz qux".repeat(32);
849		let content = MessageBody::Content(body.clone());
850		let server = HttpServer::responding_with_ok::<String>(content);
851
852		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
853		match client.get::<BinaryResponse>("/foo", "foo.com").await {
854			Err(e) => panic!("Unexpected error: {:?}", e),
855			Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
856		}
857	}
858
859	#[tokio::test]
860	async fn read_chunked_message_body() {
861		let body = "foo bar baz qux".repeat(32);
862		let chunked_content = MessageBody::ChunkedContent(body.clone());
863		let server = HttpServer::responding_with_ok::<String>(chunked_content);
864
865		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
866		match client.get::<BinaryResponse>("/foo", "foo.com").await {
867			Err(e) => panic!("Unexpected error: {:?}", e),
868			Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
869		}
870	}
871
872	#[tokio::test]
873	async fn reconnect_closed_connection() {
874		let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
875
876		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
877		assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
878		match client.get::<BinaryResponse>("/foo", "foo.com").await {
879			Err(e) => panic!("Unexpected error: {:?}", e),
880			Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
881		}
882	}
883
884	#[test]
885	fn from_bytes_into_binary_response() {
886		let bytes = b"foo";
887		match BinaryResponse::try_from(bytes.to_vec()) {
888			Err(e) => panic!("Unexpected error: {:?}", e),
889			Ok(response) => assert_eq!(&response.0, bytes),
890		}
891	}
892
893	#[test]
894	fn from_invalid_bytes_into_json_response() {
895		let json = serde_json::json!({ "result": 42 });
896		match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) {
897			Err(_) => {},
898			Ok(_) => panic!("Expected error"),
899		}
900	}
901
902	#[test]
903	fn from_valid_bytes_into_json_response() {
904		let json = serde_json::json!({ "result": 42 });
905		match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) {
906			Err(e) => panic!("Unexpected error: {:?}", e),
907			Ok(response) => assert_eq!(response.0, json),
908		}
909	}
910}