1use chunked_transfer;
5use serde_json;
6
7use std::convert::TryFrom;
8use std::fmt;
9#[cfg(not(feature = "tokio"))]
10use std::io::Write;
11use std::net::{SocketAddr, ToSocketAddrs};
12use std::time::Duration;
13
14#[cfg(feature = "tokio")]
15use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
16#[cfg(feature = "tokio")]
17use tokio::net::TcpStream;
18
19#[cfg(not(feature = "tokio"))]
20use std::io::BufRead;
21use std::io::Read;
22#[cfg(not(feature = "tokio"))]
23use std::net::TcpStream;
24
25const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
27
28const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(300);
34
35const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
37
38const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
41
42#[derive(Debug)]
44pub struct HttpEndpoint {
45 host: String,
46 port: Option<u16>,
47 path: String,
48}
49
50impl HttpEndpoint {
51 pub fn for_host(host: String) -> Self {
53 Self { host, port: None, path: String::from("/") }
54 }
55
56 pub fn with_port(mut self, port: u16) -> Self {
58 self.port = Some(port);
59 self
60 }
61
62 pub fn with_path(mut self, path: String) -> Self {
64 self.path = path;
65 self
66 }
67
68 pub fn host(&self) -> &str {
70 &self.host
71 }
72
73 pub fn port(&self) -> u16 {
75 match self.port {
76 None => 80,
77 Some(port) => port,
78 }
79 }
80
81 pub fn path(&self) -> &str {
83 &self.path
84 }
85}
86
87impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
88 type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;
89
90 fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
91 (self.host(), self.port()).to_socket_addrs()
92 }
93}
94
95pub(crate) struct HttpClient {
97 address: SocketAddr,
98 stream: TcpStream,
99}
100
101impl HttpClient {
102 pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
104 let address = match endpoint.to_socket_addrs()?.next() {
105 None => {
106 return Err(std::io::Error::new(
107 std::io::ErrorKind::InvalidInput,
108 "could not resolve to any addresses",
109 ));
110 },
111 Some(address) => address,
112 };
113 let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
114 stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
115 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
116
117 #[cfg(feature = "tokio")]
118 let stream = {
119 stream.set_nonblocking(true)?;
120 TcpStream::from_std(stream)?
121 };
122
123 Ok(Self { address, stream })
124 }
125
126 #[allow(dead_code)]
130 pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
131 where
132 F: TryFrom<Vec<u8>, Error = std::io::Error>,
133 {
134 let request = format!(
135 "GET {} HTTP/1.1\r\n\
136 Host: {}\r\n\
137 Connection: keep-alive\r\n\
138 \r\n",
139 uri, host
140 );
141 let response_body = self.send_request_with_retry(&request).await?;
142 F::try_from(response_body)
143 }
144
145 #[allow(dead_code)]
151 pub async fn post<F>(
152 &mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value,
153 ) -> std::io::Result<F>
154 where
155 F: TryFrom<Vec<u8>, Error = std::io::Error>,
156 {
157 let content = content.to_string();
158 let request = format!(
159 "POST {} HTTP/1.1\r\n\
160 Host: {}\r\n\
161 Authorization: {}\r\n\
162 Connection: keep-alive\r\n\
163 Content-Type: application/json\r\n\
164 Content-Length: {}\r\n\
165 \r\n\
166 {}",
167 uri,
168 host,
169 auth,
170 content.len(),
171 content
172 );
173 let response_body = self.send_request_with_retry(&request).await?;
174 F::try_from(response_body)
175 }
176
177 async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
180 match self.send_request(request).await {
181 Ok(bytes) => Ok(bytes),
182 Err(_) => {
183 #[cfg(feature = "tokio")]
190 tokio::time::sleep(Duration::from_millis(100)).await;
191 #[cfg(not(feature = "tokio"))]
192 std::thread::sleep(Duration::from_millis(100));
193 *self = Self::connect(self.address)?;
194 self.send_request(request).await
195 },
196 }
197 }
198
199 async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
201 self.write_request(request).await?;
202 self.read_response().await
203 }
204
205 async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
207 #[cfg(feature = "tokio")]
208 {
209 self.stream.write_all(request.as_bytes()).await?;
210 self.stream.flush().await
211 }
212 #[cfg(not(feature = "tokio"))]
213 {
214 self.stream.write_all(request.as_bytes())?;
215 self.stream.flush()
216 }
217 }
218
219 async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
221 #[cfg(feature = "tokio")]
222 let stream = self.stream.split().0;
223 #[cfg(not(feature = "tokio"))]
224 let stream = std::io::Read::by_ref(&mut self.stream);
225
226 let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
227
228 #[cfg(feature = "tokio")]
229 let mut reader = tokio::io::BufReader::new(limited_stream);
230 #[cfg(not(feature = "tokio"))]
231 let mut reader = std::io::BufReader::new(limited_stream);
232
233 macro_rules! read_line {
234 () => {
235 read_line!(0)
236 };
237 ($retry_count: expr) => {{
238 let mut line = String::new();
239 let mut timeout_count: u64 = 0;
240 let bytes_read = loop {
241 #[cfg(feature = "tokio")]
242 let read_res = reader.read_line(&mut line).await;
243 #[cfg(not(feature = "tokio"))]
244 let read_res = reader.read_line(&mut line);
245 match read_res {
246 Ok(bytes_read) => break bytes_read,
247 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
248 timeout_count += 1;
249 if timeout_count > $retry_count {
250 return Err(e);
251 } else {
252 continue;
253 }
254 },
255 Err(e) => return Err(e),
256 }
257 };
258
259 match bytes_read {
260 0 => None,
261 _ => {
262 if line.ends_with('\n') {
264 line.pop();
265 if line.ends_with('\r') {
266 line.pop();
267 }
268 }
269 Some(line)
270 },
271 }
272 }};
273 }
274
275 let status_line =
278 read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
279 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
280 let status = HttpStatus::parse(&status_line)?;
281
282 let mut message_length = HttpMessageLength::Empty;
284 loop {
285 let line = read_line!()
286 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
287 if line.is_empty() {
288 break;
289 }
290
291 let header = HttpHeader::parse(&line)?;
292 if header.has_name("Content-Length") {
293 let length = header
294 .value
295 .parse()
296 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
297 if let HttpMessageLength::Empty = message_length {
298 message_length = HttpMessageLength::ContentLength(length);
299 }
300 continue;
301 }
302
303 if header.has_name("Transfer-Encoding") {
304 message_length = HttpMessageLength::TransferEncoding(header.value.into());
305 continue;
306 }
307 }
308
309 let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
311 reader.get_mut().set_limit(read_limit as u64);
312 let contents = match message_length {
313 HttpMessageLength::Empty => Vec::new(),
314 HttpMessageLength::ContentLength(length) => {
315 if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
316 return Err(std::io::Error::new(
317 std::io::ErrorKind::InvalidData,
318 format!("invalid response length: {} bytes", length),
319 ));
320 } else {
321 let mut content = vec![0; length];
322 #[cfg(feature = "tokio")]
323 reader.read_exact(&mut content[..]).await?;
324 #[cfg(not(feature = "tokio"))]
325 reader.read_exact(&mut content[..])?;
326 content
327 }
328 },
329 HttpMessageLength::TransferEncoding(coding) => {
330 if !coding.eq_ignore_ascii_case("chunked") {
331 return Err(std::io::Error::new(
332 std::io::ErrorKind::InvalidInput,
333 "unsupported transfer coding",
334 ));
335 } else {
336 let mut content = Vec::new();
337 #[cfg(feature = "tokio")]
338 {
339 loop {
345 let mut chunk_header = String::new();
347 reader.read_line(&mut chunk_header).await?;
348 if chunk_header == "0\r\n" {
349 reader.read_line(&mut chunk_header).await?;
352 }
353
354 let mut buffer = Vec::new();
356 let mut decoder =
357 chunked_transfer::Decoder::new(chunk_header.as_bytes());
358 decoder.read_to_end(&mut buffer)?;
359
360 let chunk_size = match decoder.remaining_chunks_size() {
362 None => break,
363 Some(chunk_size) => chunk_size,
364 };
365 let chunk_offset = content.len();
366 content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
367 reader.read_exact(&mut content[chunk_offset..]).await?;
368 content.resize(chunk_offset + chunk_size, 0);
369 }
370 content
371 }
372 #[cfg(not(feature = "tokio"))]
373 {
374 let mut decoder = chunked_transfer::Decoder::new(reader);
375 decoder.read_to_end(&mut content)?;
376 content
377 }
378 }
379 },
380 };
381
382 if !status.is_ok() {
383 let error = HttpError { status_code: status.code.to_string(), contents };
385 return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
386 }
387
388 Ok(contents)
389 }
390}
391
392#[derive(Debug)]
394pub(crate) struct HttpError {
395 pub(crate) status_code: String,
396 pub(crate) contents: Vec<u8>,
397}
398
399impl std::error::Error for HttpError {}
400
401impl fmt::Display for HttpError {
402 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
403 let contents = String::from_utf8_lossy(&self.contents);
404 write!(f, "status_code: {}, contents: {}", self.status_code, contents)
405 }
406}
407
408struct HttpStatus<'a> {
412 code: &'a str,
413}
414
415impl<'a> HttpStatus<'a> {
416 fn parse(line: &'a String) -> std::io::Result<HttpStatus<'a>> {
420 let mut tokens = line.splitn(3, ' ');
421
422 let http_version = tokens
423 .next()
424 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?;
425 if !http_version.eq_ignore_ascii_case("HTTP/1.1")
426 && !http_version.eq_ignore_ascii_case("HTTP/1.0")
427 {
428 return Err(std::io::Error::new(
429 std::io::ErrorKind::InvalidData,
430 "invalid HTTP-Version",
431 ));
432 }
433
434 let code = tokens
435 .next()
436 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?;
437 if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
438 return Err(std::io::Error::new(
439 std::io::ErrorKind::InvalidData,
440 "invalid Status-Code",
441 ));
442 }
443
444 let _reason = tokens
445 .next()
446 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?;
447
448 Ok(Self { code })
449 }
450
451 fn is_ok(&self) -> bool {
453 self.code.starts_with('2')
454 }
455}
456
457struct HttpHeader<'a> {
461 name: &'a str,
462 value: &'a str,
463}
464
465impl<'a> HttpHeader<'a> {
466 fn parse(line: &'a String) -> std::io::Result<HttpHeader<'a>> {
470 let mut tokens = line.splitn(2, ':');
471 let name = tokens
472 .next()
473 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?;
474 let value = tokens
475 .next()
476 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))?
477 .trim_start();
478 Ok(Self { name, value })
479 }
480
481 fn has_name(&self, name: &str) -> bool {
483 self.name.eq_ignore_ascii_case(name)
484 }
485}
486
487enum HttpMessageLength {
491 Empty,
492 ContentLength(usize),
493 TransferEncoding(String),
494}
495
496pub struct BinaryResponse(pub Vec<u8>);
498
499pub struct JsonResponse(pub serde_json::Value);
501
502impl TryFrom<Vec<u8>> for BinaryResponse {
504 type Error = std::io::Error;
505
506 fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
507 Ok(BinaryResponse(bytes))
508 }
509}
510
511impl TryFrom<Vec<u8>> for JsonResponse {
513 type Error = std::io::Error;
514
515 fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
516 Ok(JsonResponse(serde_json::from_slice(&bytes)?))
517 }
518}
519
520#[cfg(test)]
521mod endpoint_tests {
522 use super::HttpEndpoint;
523
524 #[test]
525 fn with_default_port() {
526 let endpoint = HttpEndpoint::for_host("foo.com".into());
527 assert_eq!(endpoint.host(), "foo.com");
528 assert_eq!(endpoint.port(), 80);
529 }
530
531 #[test]
532 fn with_custom_port() {
533 let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080);
534 assert_eq!(endpoint.host(), "foo.com");
535 assert_eq!(endpoint.port(), 8080);
536 }
537
538 #[test]
539 fn with_uri_path() {
540 let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into());
541 assert_eq!(endpoint.host(), "foo.com");
542 assert_eq!(endpoint.path(), "/path");
543 }
544
545 #[test]
546 fn without_uri_path() {
547 let endpoint = HttpEndpoint::for_host("foo.com".into());
548 assert_eq!(endpoint.host(), "foo.com");
549 assert_eq!(endpoint.path(), "/");
550 }
551
552 #[test]
553 fn convert_to_socket_addrs() {
554 let endpoint = HttpEndpoint::for_host("localhost".into());
555 let host = endpoint.host();
556 let port = endpoint.port();
557
558 use std::net::ToSocketAddrs;
559 match (&endpoint).to_socket_addrs() {
560 Err(e) => panic!("Unexpected error: {:?}", e),
561 Ok(socket_addrs) => {
562 let mut std_addrs = (host, port).to_socket_addrs().unwrap();
563 for addr in socket_addrs {
564 assert_eq!(addr, std_addrs.next().unwrap());
565 }
566 assert!(std_addrs.next().is_none());
567 },
568 }
569 }
570}
571
572#[cfg(test)]
573pub(crate) mod client_tests {
574 use super::*;
575 use std::io::BufRead;
576 use std::io::Write;
577
578 pub struct HttpServer {
580 address: std::net::SocketAddr,
581 handler: std::thread::JoinHandle<()>,
582 shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
583 }
584
585 pub enum MessageBody<T: ToString> {
587 Empty,
588 Content(T),
589 ChunkedContent(T),
590 }
591
592 impl HttpServer {
593 fn responding_with_body<T: ToString>(status: &str, body: MessageBody<T>) -> Self {
594 let response = match body {
595 MessageBody::Empty => format!("{}\r\n\r\n", status),
596 MessageBody::Content(body) => {
597 let body = body.to_string();
598 format!(
599 "{}\r\n\
600 Content-Length: {}\r\n\
601 \r\n\
602 {}",
603 status,
604 body.len(),
605 body
606 )
607 },
608 MessageBody::ChunkedContent(body) => {
609 let mut chuncked_body = Vec::new();
610 {
611 use chunked_transfer::Encoder;
612 let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
613 encoder.write_all(body.to_string().as_bytes()).unwrap();
614 }
615 format!(
616 "{}\r\n\
617 Transfer-Encoding: chunked\r\n\
618 \r\n\
619 {}",
620 status,
621 String::from_utf8(chuncked_body).unwrap()
622 )
623 },
624 };
625 HttpServer::responding_with(response)
626 }
627
628 pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
629 HttpServer::responding_with_body("HTTP/1.1 200 OK", body)
630 }
631
632 pub fn responding_with_not_found() -> Self {
633 HttpServer::responding_with_body::<String>("HTTP/1.1 404 Not Found", MessageBody::Empty)
634 }
635
636 pub fn responding_with_server_error<T: ToString>(content: T) -> Self {
637 let body = MessageBody::Content(content);
638 HttpServer::responding_with_body("HTTP/1.1 500 Internal Server Error", body)
639 }
640
641 fn responding_with(response: String) -> Self {
642 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
643 let address = listener.local_addr().unwrap();
644
645 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
646 let shutdown_signaled = std::sync::Arc::clone(&shutdown);
647 let handler = std::thread::spawn(move || {
648 for stream in listener.incoming() {
649 let mut stream = stream.unwrap();
650 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
651
652 let lines_read = std::io::BufReader::new(&stream)
653 .lines()
654 .take_while(|line| !line.as_ref().unwrap().is_empty())
655 .count();
656 if lines_read == 0 {
657 continue;
658 }
659
660 for chunk in response.as_bytes().chunks(16) {
661 if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
662 return;
663 } else {
664 if let Err(_) = stream.write(chunk) {
665 break;
666 }
667 if let Err(_) = stream.flush() {
668 break;
669 }
670 }
671 }
672 }
673 });
674
675 Self { address, handler, shutdown }
676 }
677
678 fn shutdown(self) {
679 self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
680 self.handler.join().unwrap();
681 }
682
683 pub fn endpoint(&self) -> HttpEndpoint {
684 HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
685 }
686 }
687
688 #[test]
689 fn connect_to_unresolvable_host() {
690 match HttpClient::connect(("example.invalid", 80)) {
691 Err(e) => {
692 assert!(
693 e.to_string().contains("failed to lookup address information")
694 || e.to_string().contains("No such host"),
695 "{:?}",
696 e
697 );
698 },
699 Ok(_) => panic!("Expected error"),
700 }
701 }
702
703 #[test]
704 fn connect_with_no_socket_address() {
705 match HttpClient::connect(&vec![][..]) {
706 Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
707 Ok(_) => panic!("Expected error"),
708 }
709 }
710
711 #[test]
712 fn connect_with_unknown_server() {
713 match HttpClient::connect(("::", 80)) {
714 #[cfg(target_os = "windows")]
715 Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable),
716 #[cfg(not(target_os = "windows"))]
717 Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused),
718 Ok(_) => panic!("Expected error"),
719 }
720 }
721
722 #[tokio::test]
723 async fn connect_with_valid_endpoint() {
724 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
725
726 match HttpClient::connect(&server.endpoint()) {
727 Err(e) => panic!("Unexpected error: {:?}", e),
728 Ok(_) => {},
729 }
730 }
731
732 #[tokio::test]
733 async fn read_empty_message() {
734 let server = HttpServer::responding_with("".to_string());
735
736 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
737 match client.get::<BinaryResponse>("/foo", "foo.com").await {
738 Err(e) => {
739 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
740 assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
741 },
742 Ok(_) => panic!("Expected error"),
743 }
744 }
745
746 #[tokio::test]
747 async fn read_incomplete_message() {
748 let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());
749
750 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
751 match client.get::<BinaryResponse>("/foo", "foo.com").await {
752 Err(e) => {
753 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
754 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
755 },
756 Ok(_) => panic!("Expected error"),
757 }
758 }
759
760 #[tokio::test]
761 async fn read_too_large_message_headers() {
762 let response = format!(
763 "HTTP/1.1 302 Found\r\n\
764 Location: {}\r\n\
765 \r\n",
766 "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE)
767 );
768 let server = HttpServer::responding_with(response);
769
770 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
771 match client.get::<BinaryResponse>("/foo", "foo.com").await {
772 Err(e) => {
773 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
774 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
775 },
776 Ok(_) => panic!("Expected error"),
777 }
778 }
779
780 #[tokio::test]
781 async fn read_too_large_message_body() {
782 let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
783 let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));
784
785 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
786 match client.get::<BinaryResponse>("/foo", "foo.com").await {
787 Err(e) => {
788 assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
789 assert_eq!(
790 e.get_ref().unwrap().to_string(),
791 "invalid response length: 8032001 bytes"
792 );
793 },
794 Ok(_) => panic!("Expected error"),
795 }
796 server.shutdown();
797 }
798
799 #[tokio::test]
800 async fn read_message_with_unsupported_transfer_coding() {
801 let response = String::from(
802 "HTTP/1.1 200 OK\r\n\
803 Transfer-Encoding: gzip\r\n\
804 \r\n\
805 foobar",
806 );
807 let server = HttpServer::responding_with(response);
808
809 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
810 match client.get::<BinaryResponse>("/foo", "foo.com").await {
811 Err(e) => {
812 assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
813 assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
814 },
815 Ok(_) => panic!("Expected error"),
816 }
817 }
818
819 #[tokio::test]
820 async fn read_error() {
821 let server = HttpServer::responding_with_server_error("foo");
822
823 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
824 match client.get::<JsonResponse>("/foo", "foo.com").await {
825 Err(e) => {
826 assert_eq!(e.kind(), std::io::ErrorKind::Other);
827 let http_error = e.into_inner().unwrap().downcast::<HttpError>().unwrap();
828 assert_eq!(http_error.status_code, "500");
829 assert_eq!(http_error.contents, "foo".as_bytes());
830 },
831 Ok(_) => panic!("Expected error"),
832 }
833 }
834
835 #[tokio::test]
836 async fn read_empty_message_body() {
837 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
838
839 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
840 match client.get::<BinaryResponse>("/foo", "foo.com").await {
841 Err(e) => panic!("Unexpected error: {:?}", e),
842 Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
843 }
844 }
845
846 #[tokio::test]
847 async fn read_message_body_with_length() {
848 let body = "foo bar baz qux".repeat(32);
849 let content = MessageBody::Content(body.clone());
850 let server = HttpServer::responding_with_ok::<String>(content);
851
852 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
853 match client.get::<BinaryResponse>("/foo", "foo.com").await {
854 Err(e) => panic!("Unexpected error: {:?}", e),
855 Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
856 }
857 }
858
859 #[tokio::test]
860 async fn read_chunked_message_body() {
861 let body = "foo bar baz qux".repeat(32);
862 let chunked_content = MessageBody::ChunkedContent(body.clone());
863 let server = HttpServer::responding_with_ok::<String>(chunked_content);
864
865 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
866 match client.get::<BinaryResponse>("/foo", "foo.com").await {
867 Err(e) => panic!("Unexpected error: {:?}", e),
868 Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
869 }
870 }
871
872 #[tokio::test]
873 async fn reconnect_closed_connection() {
874 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
875
876 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
877 assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
878 match client.get::<BinaryResponse>("/foo", "foo.com").await {
879 Err(e) => panic!("Unexpected error: {:?}", e),
880 Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
881 }
882 }
883
884 #[test]
885 fn from_bytes_into_binary_response() {
886 let bytes = b"foo";
887 match BinaryResponse::try_from(bytes.to_vec()) {
888 Err(e) => panic!("Unexpected error: {:?}", e),
889 Ok(response) => assert_eq!(&response.0, bytes),
890 }
891 }
892
893 #[test]
894 fn from_invalid_bytes_into_json_response() {
895 let json = serde_json::json!({ "result": 42 });
896 match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) {
897 Err(_) => {},
898 Ok(_) => panic!("Expected error"),
899 }
900 }
901
902 #[test]
903 fn from_valid_bytes_into_json_response() {
904 let json = serde_json::json!({ "result": 42 });
905 match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) {
906 Err(e) => panic!("Unexpected error: {:?}", e),
907 Ok(response) => assert_eq!(response.0, json),
908 }
909 }
910}