1use chunked_transfer;
5use serde_json;
6
7use std::convert::TryFrom;
8use std::fmt;
9#[cfg(not(feature = "tokio"))]
10use std::io::Write;
11use std::net::{SocketAddr, ToSocketAddrs};
12use std::time::Duration;
13
14#[cfg(feature = "tokio")]
15use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
16#[cfg(feature = "tokio")]
17use tokio::net::TcpStream;
18
19#[cfg(not(feature = "tokio"))]
20use std::io::BufRead;
21use std::io::Read;
22#[cfg(not(feature = "tokio"))]
23use std::net::TcpStream;
24
/// Timeout used when connecting the TCP stream and applied to its reads and writes.
const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);

/// Longer timeout for the start of a response. Divided by `TCP_STREAM_TIMEOUT`, it gives the
/// number of would-block reads that are retried while waiting for the status line.
const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(300);

/// Maximum number of bytes that may be read while parsing the status line and headers.
const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;

/// Maximum HTTP message body size in bytes that the client accepts.
const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
41
/// Location of an HTTP resource: a host, an optional port and a path.
#[derive(Debug)]
pub struct HttpEndpoint {
    host: String,
    port: Option<u16>,
    path: String,
}

impl HttpEndpoint {
    /// Creates an endpoint for `host` with no explicit port and the root path `/`.
    pub fn for_host(host: String) -> Self {
        let path = "/".to_string();
        Self { host, port: None, path }
    }

    /// Returns the endpoint with its port replaced by `port`.
    pub fn with_port(self, port: u16) -> Self {
        Self { port: Some(port), ..self }
    }

    /// Returns the endpoint with its path replaced by `path`.
    pub fn with_path(self, path: String) -> Self {
        Self { path, ..self }
    }

    /// Returns the endpoint's host.
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Returns the endpoint's port, falling back to 80 when none was set.
    pub fn port(&self) -> u16 {
        self.port.unwrap_or(80)
    }

    /// Returns the endpoint's path.
    pub fn path(&self) -> &str {
        self.path.as_str()
    }
}
86
87impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
88 type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;
89
90 fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
91 (self.host(), self.port()).to_socket_addrs()
92 }
93}
94
/// Client for making HTTP requests over a TCP stream.
pub(crate) struct HttpClient {
    /// Resolved address of the peer; reused to reconnect when a request is retried.
    address: SocketAddr,
    /// Stream connected to `address`.
    stream: TcpStream,
}
100
101impl HttpClient {
102 pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
104 let address = match endpoint.to_socket_addrs()?.next() {
105 None => {
106 return Err(std::io::Error::new(
107 std::io::ErrorKind::InvalidInput,
108 "could not resolve to any addresses",
109 ));
110 },
111 Some(address) => address,
112 };
113 let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
114 stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
115 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
116
117 #[cfg(feature = "tokio")]
118 let stream = {
119 stream.set_nonblocking(true)?;
120 TcpStream::from_std(stream)?
121 };
122
123 Ok(Self { address, stream })
124 }
125
126 #[allow(dead_code)]
130 pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
131 where
132 F: TryFrom<Vec<u8>, Error = std::io::Error>,
133 {
134 let request = format!(
135 "GET {} HTTP/1.1\r\n\
136 Host: {}\r\n\
137 Connection: keep-alive\r\n\
138 \r\n",
139 uri, host
140 );
141 let response_body = self.send_request_with_retry(&request).await?;
142 F::try_from(response_body)
143 }
144
145 #[allow(dead_code)]
151 pub async fn post<F>(
152 &mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value,
153 ) -> std::io::Result<F>
154 where
155 F: TryFrom<Vec<u8>, Error = std::io::Error>,
156 {
157 let content = content.to_string();
158 let request = format!(
159 "POST {} HTTP/1.1\r\n\
160 Host: {}\r\n\
161 Authorization: {}\r\n\
162 Connection: keep-alive\r\n\
163 Content-Type: application/json\r\n\
164 Content-Length: {}\r\n\
165 \r\n\
166 {}",
167 uri,
168 host,
169 auth,
170 content.len(),
171 content
172 );
173 let response_body = self.send_request_with_retry(&request).await?;
174 F::try_from(response_body)
175 }
176
177 async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
180 match self.send_request(request).await {
181 Ok(bytes) => Ok(bytes),
182 Err(_) => {
183 #[cfg(feature = "tokio")]
190 tokio::time::sleep(Duration::from_millis(100)).await;
191 #[cfg(not(feature = "tokio"))]
192 std::thread::sleep(Duration::from_millis(100));
193 *self = Self::connect(self.address)?;
194 self.send_request(request).await
195 },
196 }
197 }
198
199 async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
201 self.write_request(request).await?;
202 self.read_response().await
203 }
204
205 async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
207 #[cfg(feature = "tokio")]
208 {
209 self.stream.write_all(request.as_bytes()).await?;
210 self.stream.flush().await
211 }
212 #[cfg(not(feature = "tokio"))]
213 {
214 self.stream.write_all(request.as_bytes())?;
215 self.stream.flush()
216 }
217 }
218
219 async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
221 #[cfg(feature = "tokio")]
222 let stream = self.stream.split().0;
223 #[cfg(not(feature = "tokio"))]
224 let stream = std::io::Read::by_ref(&mut self.stream);
225
226 let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
227
228 #[cfg(feature = "tokio")]
229 let mut reader = tokio::io::BufReader::new(limited_stream);
230 #[cfg(not(feature = "tokio"))]
231 let mut reader = std::io::BufReader::new(limited_stream);
232
233 macro_rules! read_line {
234 () => {
235 read_line!(0)
236 };
237 ($retry_count: expr) => {{
238 let mut line = String::new();
239 let mut timeout_count: u64 = 0;
240 let bytes_read = loop {
241 #[cfg(feature = "tokio")]
242 let read_res = reader.read_line(&mut line).await;
243 #[cfg(not(feature = "tokio"))]
244 let read_res = reader.read_line(&mut line);
245 match read_res {
246 Ok(bytes_read) => break bytes_read,
247 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
248 timeout_count += 1;
249 if timeout_count > $retry_count {
250 return Err(e);
251 } else {
252 continue;
253 }
254 },
255 Err(e) => return Err(e),
256 }
257 };
258
259 match bytes_read {
260 0 => None,
261 _ => {
262 if line.ends_with('\n') {
264 line.pop();
265 if line.ends_with('\r') {
266 line.pop();
267 }
268 }
269 Some(line)
270 },
271 }
272 }};
273 }
274
275 let status_line =
278 read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
279 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
280 let status = HttpStatus::parse(&status_line)?;
281
282 let mut message_length = HttpMessageLength::Empty;
284 loop {
285 let line = read_line!()
286 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
287 if line.is_empty() {
288 break;
289 }
290
291 let header = HttpHeader::parse(&line)?;
292 if header.has_name("Content-Length") {
293 let length = header
294 .value
295 .parse()
296 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
297 if let HttpMessageLength::Empty = message_length {
298 message_length = HttpMessageLength::ContentLength(length);
299 }
300 continue;
301 }
302
303 if header.has_name("Transfer-Encoding") {
304 message_length = HttpMessageLength::TransferEncoding(header.value.into());
305 continue;
306 }
307 }
308
309 let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
311 reader.get_mut().set_limit(read_limit as u64);
312 let contents = match message_length {
313 HttpMessageLength::Empty => Vec::new(),
314 HttpMessageLength::ContentLength(length) => {
315 if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
316 return Err(std::io::Error::new(
317 std::io::ErrorKind::InvalidData,
318 format!("invalid response length: {} bytes", length),
319 ));
320 } else {
321 let mut content = vec![0; length];
322 #[cfg(feature = "tokio")]
323 reader.read_exact(&mut content[..]).await?;
324 #[cfg(not(feature = "tokio"))]
325 reader.read_exact(&mut content[..])?;
326 content
327 }
328 },
329 HttpMessageLength::TransferEncoding(coding) => {
330 if !coding.eq_ignore_ascii_case("chunked") {
331 return Err(std::io::Error::new(
332 std::io::ErrorKind::InvalidInput,
333 "unsupported transfer coding",
334 ));
335 } else {
336 let mut content = Vec::new();
337 #[cfg(feature = "tokio")]
338 {
339 loop {
345 let mut chunk_header = String::new();
347 reader.read_line(&mut chunk_header).await?;
348 if chunk_header == "0\r\n" {
349 reader.read_line(&mut chunk_header).await?;
352 }
353
354 let mut buffer = Vec::new();
356 let mut decoder =
357 chunked_transfer::Decoder::new(chunk_header.as_bytes());
358 decoder.read_to_end(&mut buffer)?;
359
360 let chunk_size = match decoder.remaining_chunks_size() {
362 None => break,
363 Some(chunk_size) => chunk_size,
364 };
365 let chunk_offset = content.len();
366 content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
367 reader.read_exact(&mut content[chunk_offset..]).await?;
368 content.resize(chunk_offset + chunk_size, 0);
369 }
370 content
371 }
372 #[cfg(not(feature = "tokio"))]
373 {
374 let mut decoder = chunked_transfer::Decoder::new(reader);
375 decoder.read_to_end(&mut content)?;
376 content
377 }
378 }
379 },
380 };
381
382 if !status.is_ok() {
383 let error = HttpError { status_code: status.code.to_string(), contents };
385 return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
386 }
387
388 Ok(contents)
389 }
390}
391
/// Error carrying the status code and body of an unsuccessful HTTP response.
#[derive(Debug)]
pub(crate) struct HttpError {
    pub(crate) status_code: String,
    pub(crate) contents: Vec<u8>,
}

impl std::error::Error for HttpError {}

impl fmt::Display for HttpError {
    /// Formats the status code followed by the body, decoded lossily as UTF-8.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!(
            "status_code: {}, contents: {}",
            self.status_code,
            String::from_utf8_lossy(&self.contents)
        ))
    }
}
407
/// Status line of an HTTP response, reduced to its status code.
struct HttpStatus<'a> {
    /// Three-digit status code, borrowed from the parsed line.
    code: &'a str,
}

impl<'a> HttpStatus<'a> {
    /// Parses a status line of the form `HTTP-Version SP Status-Code SP Reason-Phrase`.
    ///
    /// Accepts any string slice (a `&String` still coerces). Only `HTTP/1.1` and `HTTP/1.0` are
    /// accepted as versions, the status code must be exactly three ASCII digits, and a reason
    /// phrase (possibly empty) must follow the code.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidData` if any part is missing or invalid.
    fn parse(line: &'a str) -> std::io::Result<HttpStatus<'a>> {
        // Build errors lazily so that nothing is allocated on the success path.
        let invalid_data =
            |reason: &'static str| std::io::Error::new(std::io::ErrorKind::InvalidData, reason);

        let mut tokens = line.splitn(3, ' ');

        let http_version = tokens.next().ok_or_else(|| invalid_data("no HTTP-Version"))?;
        let is_supported_version = http_version.eq_ignore_ascii_case("HTTP/1.1")
            || http_version.eq_ignore_ascii_case("HTTP/1.0");
        if !is_supported_version {
            return Err(invalid_data("invalid HTTP-Version"));
        }

        let code = tokens.next().ok_or_else(|| invalid_data("no Status-Code"))?;
        if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
            return Err(invalid_data("invalid Status-Code"));
        }

        // The reason phrase is required to be present but its content is not used.
        let _reason = tokens.next().ok_or_else(|| invalid_data("no Reason-Phrase"))?;

        Ok(Self { code })
    }

    /// Returns whether the status code is in the 2xx range.
    fn is_ok(&self) -> bool {
        self.code.starts_with('2')
    }
}
456
/// A single HTTP header field, borrowed from the line it was parsed from.
struct HttpHeader<'a> {
    /// Field name: everything before the first `:`.
    name: &'a str,
    /// Field value with surrounding whitespace removed.
    value: &'a str,
}

impl<'a> HttpHeader<'a> {
    /// Parses a header line of the form `name: value`.
    ///
    /// Accepts any string slice (a `&String` still coerces). Whitespace on both sides of the
    /// value is stripped, so optional trailing whitespace does not end up in the value (which
    /// would otherwise break, for instance, parsing a `Content-Length`).
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidData` if the line contains no `:` separator.
    fn parse(line: &'a str) -> std::io::Result<HttpHeader<'a>> {
        let invalid_data =
            |reason: &'static str| std::io::Error::new(std::io::ErrorKind::InvalidData, reason);

        let mut tokens = line.splitn(2, ':');
        let name = tokens.next().ok_or_else(|| invalid_data("no header name"))?;
        let value = tokens.next().ok_or_else(|| invalid_data("no header value"))?.trim();
        Ok(Self { name, value })
    }

    /// Returns whether the header's name equals `name`, ignoring ASCII case.
    fn has_name(&self, name: &str) -> bool {
        self.name.eq_ignore_ascii_case(name)
    }
}
486
/// How the length of a response body is determined from the response headers.
enum HttpMessageLength {
    /// No length-determining header was seen; the body is treated as empty.
    Empty,
    /// Body length in bytes, taken from a `Content-Length` header.
    ContentLength(usize),
    /// Value of a `Transfer-Encoding` header, naming the transfer coding of the body.
    TransferEncoding(String),
}
495
496pub struct BinaryResponse(pub Vec<u8>);
498
499pub struct JsonResponse(pub serde_json::Value);
501
502impl TryFrom<Vec<u8>> for BinaryResponse {
504 type Error = std::io::Error;
505
506 fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
507 Ok(BinaryResponse(bytes))
508 }
509}
510
511impl TryFrom<Vec<u8>> for JsonResponse {
513 type Error = std::io::Error;
514
515 fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
516 Ok(JsonResponse(serde_json::from_slice(&bytes)?))
517 }
518}
519
/// Tests for building `HttpEndpoint` values and resolving them to socket addresses.
#[cfg(test)]
mod endpoint_tests {
    use super::HttpEndpoint;

    /// Without an explicit port, the endpoint reports port 80.
    #[test]
    fn with_default_port() {
        let endpoint = HttpEndpoint::for_host(String::from("foo.com"));
        assert_eq!(endpoint.host(), "foo.com");
        assert_eq!(endpoint.port(), 80);
    }

    /// An explicitly set port is reported back.
    #[test]
    fn with_custom_port() {
        let endpoint = HttpEndpoint::for_host(String::from("foo.com")).with_port(8080);
        assert_eq!(endpoint.host(), "foo.com");
        assert_eq!(endpoint.port(), 8080);
    }

    /// An explicitly set path is reported back.
    #[test]
    fn with_uri_path() {
        let endpoint =
            HttpEndpoint::for_host(String::from("foo.com")).with_path(String::from("/path"));
        assert_eq!(endpoint.host(), "foo.com");
        assert_eq!(endpoint.path(), "/path");
    }

    /// Without an explicit path, the endpoint reports the root path.
    #[test]
    fn without_uri_path() {
        let endpoint = HttpEndpoint::for_host(String::from("foo.com"));
        assert_eq!(endpoint.host(), "foo.com");
        assert_eq!(endpoint.path(), "/");
    }

    /// Resolving an endpoint yields the same addresses as resolving its `(host, port)` pair.
    #[test]
    fn convert_to_socket_addrs() {
        use std::net::ToSocketAddrs;

        let endpoint = HttpEndpoint::for_host(String::from("localhost"));
        let host = endpoint.host();
        let port = endpoint.port();

        let socket_addrs = (&endpoint)
            .to_socket_addrs()
            .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
        let mut std_addrs = (host, port).to_socket_addrs().unwrap();
        for addr in socket_addrs {
            assert_eq!(addr, std_addrs.next().unwrap());
        }
        assert!(std_addrs.next().is_none());
    }
}
571
572#[cfg(test)]
573pub(crate) mod client_tests {
574 use super::*;
575 use std::io::BufRead;
576 use std::io::Write;
577
/// Test server that answers requests with a fixed response from a background thread.
pub struct HttpServer {
    /// Local address the server's listener is bound to.
    address: std::net::SocketAddr,
    /// Handle of the thread that accepts connections and writes the response.
    handler: std::thread::JoinHandle<()>,
    /// Flag telling the handler thread to stop; it is checked while a response is being written.
    shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
584
/// Body of the response served by `HttpServer`.
pub enum MessageBody<T: ToString> {
    /// No body and no length-determining header.
    Empty,
    /// Body sent with a `Content-Length` header.
    Content(T),
    /// Body sent with `Transfer-Encoding: chunked`.
    ChunkedContent(T),
}
591
592 impl HttpServer {
593 fn responding_with_body<T: ToString>(status: &str, body: MessageBody<T>) -> Self {
594 let response = match body {
595 MessageBody::Empty => format!("{}\r\n\r\n", status),
596 MessageBody::Content(body) => {
597 let body = body.to_string();
598 format!(
599 "{}\r\n\
600 Content-Length: {}\r\n\
601 \r\n\
602 {}",
603 status,
604 body.len(),
605 body
606 )
607 },
608 MessageBody::ChunkedContent(body) => {
609 let mut chuncked_body = Vec::new();
610 {
611 use chunked_transfer::Encoder;
612 let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
613 encoder.write_all(body.to_string().as_bytes()).unwrap();
614 }
615 format!(
616 "{}\r\n\
617 Transfer-Encoding: chunked\r\n\
618 \r\n\
619 {}",
620 status,
621 String::from_utf8(chuncked_body).unwrap()
622 )
623 },
624 };
625 HttpServer::responding_with(response)
626 }
627
628 pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
629 HttpServer::responding_with_body("HTTP/1.1 200 OK", body)
630 }
631
632 pub fn responding_with_not_found() -> Self {
633 HttpServer::responding_with_body::<String>("HTTP/1.1 404 Not Found", MessageBody::Empty)
634 }
635
636 pub fn responding_with_server_error<T: ToString>(content: T) -> Self {
637 let body = MessageBody::Content(content);
638 HttpServer::responding_with_body("HTTP/1.1 500 Internal Server Error", body)
639 }
640
641 fn responding_with(response: String) -> Self {
642 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
643 let address = listener.local_addr().unwrap();
644
645 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
646 let shutdown_signaled = std::sync::Arc::clone(&shutdown);
647 let handler = std::thread::spawn(move || {
648 for stream in listener.incoming() {
649 let mut stream = stream.unwrap();
650 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
651
652 let lines_read = std::io::BufReader::new(&stream)
653 .lines()
654 .take_while(|line| !line.as_ref().unwrap().is_empty())
655 .count();
656 if lines_read == 0 {
657 continue;
658 }
659
660 for chunk in response.as_bytes().chunks(16) {
661 if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
662 return;
663 } else {
664 if let Err(_) = stream.write(chunk) {
665 break;
666 }
667 if let Err(_) = stream.flush() {
668 break;
669 }
670 }
671 }
672 }
673 });
674
675 Self { address, handler, shutdown }
676 }
677
678 fn shutdown(self) {
679 self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
680 self.handler.join().unwrap();
681 }
682
683 pub fn endpoint(&self) -> HttpEndpoint {
684 HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
685 }
686 }
687
/// Connecting to a host name that cannot be resolved fails with a lookup error.
#[test]
fn connect_to_unresolvable_host() {
    let e = match HttpClient::connect(("example.invalid", 80)) {
        Ok(_) => panic!("Expected error"),
        Err(e) => e,
    };
    let lookup_failed = e.to_string().contains("failed to lookup address information")
        || e.to_string().contains("No such host");
    assert!(lookup_failed, "{:?}", e);
}
702
/// Connecting with an empty list of socket addresses fails with `InvalidInput`.
#[test]
fn connect_with_no_socket_address() {
    let no_addresses: &[std::net::SocketAddr] = &[];
    let result = HttpClient::connect(no_addresses);
    match result {
        Ok(_) => panic!("Expected error"),
        Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
    }
}
710
/// Connecting to a port whose listener has already been dropped fails.
#[test]
fn connect_with_unknown_server() {
    // Obtain an ephemeral port by binding a listener, then drop the listener right away.
    let listener = std::net::TcpListener::bind(("127.0.0.1", 0)).unwrap();
    let port = listener.local_addr().unwrap().port();
    drop(listener);

    #[cfg(target_os = "windows")]
    let expected_kind = std::io::ErrorKind::AddrNotAvailable;
    #[cfg(not(target_os = "windows"))]
    let expected_kind = std::io::ErrorKind::ConnectionRefused;

    match HttpClient::connect(("::", port)) {
        Ok(_) => panic!("Expected error"),
        Err(e) => assert_eq!(e.kind(), expected_kind),
    }
}
727
728 #[tokio::test]
729 async fn connect_with_valid_endpoint() {
730 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
731
732 match HttpClient::connect(&server.endpoint()) {
733 Err(e) => panic!("Unexpected error: {:?}", e),
734 Ok(_) => {},
735 }
736 }
737
738 #[tokio::test]
739 async fn read_empty_message() {
740 let server = HttpServer::responding_with("".to_string());
741
742 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
743 match client.get::<BinaryResponse>("/foo", "foo.com").await {
744 Err(e) => {
745 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
746 assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
747 },
748 Ok(_) => panic!("Expected error"),
749 }
750 }
751
752 #[tokio::test]
753 async fn read_incomplete_message() {
754 let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());
755
756 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
757 match client.get::<BinaryResponse>("/foo", "foo.com").await {
758 Err(e) => {
759 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
760 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
761 },
762 Ok(_) => panic!("Expected error"),
763 }
764 }
765
766 #[tokio::test]
767 async fn read_too_large_message_headers() {
768 let response = format!(
769 "HTTP/1.1 302 Found\r\n\
770 Location: {}\r\n\
771 \r\n",
772 "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE)
773 );
774 let server = HttpServer::responding_with(response);
775
776 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
777 match client.get::<BinaryResponse>("/foo", "foo.com").await {
778 Err(e) => {
779 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
780 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
781 },
782 Ok(_) => panic!("Expected error"),
783 }
784 }
785
786 #[tokio::test]
787 async fn read_too_large_message_body() {
788 let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
789 let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));
790
791 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
792 match client.get::<BinaryResponse>("/foo", "foo.com").await {
793 Err(e) => {
794 assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
795 assert_eq!(
796 e.get_ref().unwrap().to_string(),
797 "invalid response length: 8032001 bytes"
798 );
799 },
800 Ok(_) => panic!("Expected error"),
801 }
802 server.shutdown();
803 }
804
805 #[tokio::test]
806 async fn read_message_with_unsupported_transfer_coding() {
807 let response = String::from(
808 "HTTP/1.1 200 OK\r\n\
809 Transfer-Encoding: gzip\r\n\
810 \r\n\
811 foobar",
812 );
813 let server = HttpServer::responding_with(response);
814
815 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
816 match client.get::<BinaryResponse>("/foo", "foo.com").await {
817 Err(e) => {
818 assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
819 assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
820 },
821 Ok(_) => panic!("Expected error"),
822 }
823 }
824
825 #[tokio::test]
826 async fn read_error() {
827 let server = HttpServer::responding_with_server_error("foo");
828
829 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
830 match client.get::<JsonResponse>("/foo", "foo.com").await {
831 Err(e) => {
832 assert_eq!(e.kind(), std::io::ErrorKind::Other);
833 let http_error = e.into_inner().unwrap().downcast::<HttpError>().unwrap();
834 assert_eq!(http_error.status_code, "500");
835 assert_eq!(http_error.contents, "foo".as_bytes());
836 },
837 Ok(_) => panic!("Expected error"),
838 }
839 }
840
841 #[tokio::test]
842 async fn read_empty_message_body() {
843 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
844
845 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
846 match client.get::<BinaryResponse>("/foo", "foo.com").await {
847 Err(e) => panic!("Unexpected error: {:?}", e),
848 Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
849 }
850 }
851
852 #[tokio::test]
853 async fn read_message_body_with_length() {
854 let body = "foo bar baz qux".repeat(32);
855 let content = MessageBody::Content(body.clone());
856 let server = HttpServer::responding_with_ok::<String>(content);
857
858 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
859 match client.get::<BinaryResponse>("/foo", "foo.com").await {
860 Err(e) => panic!("Unexpected error: {:?}", e),
861 Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
862 }
863 }
864
865 #[tokio::test]
866 async fn read_chunked_message_body() {
867 let body = "foo bar baz qux".repeat(32);
868 let chunked_content = MessageBody::ChunkedContent(body.clone());
869 let server = HttpServer::responding_with_ok::<String>(chunked_content);
870
871 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
872 match client.get::<BinaryResponse>("/foo", "foo.com").await {
873 Err(e) => panic!("Unexpected error: {:?}", e),
874 Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
875 }
876 }
877
878 #[tokio::test]
879 async fn reconnect_closed_connection() {
880 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
881
882 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
883 assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
884 match client.get::<BinaryResponse>("/foo", "foo.com").await {
885 Err(e) => panic!("Unexpected error: {:?}", e),
886 Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
887 }
888 }
889
/// Any bytes convert into a `BinaryResponse` unchanged.
#[test]
fn from_bytes_into_binary_response() {
    let bytes = b"foo";
    let response = BinaryResponse::try_from(bytes.to_vec())
        .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
    assert_eq!(&response.0, bytes);
}
898
/// Truncated JSON fails to convert into a `JsonResponse`.
#[test]
fn from_invalid_bytes_into_json_response() {
    let json = serde_json::json!({ "result": 42 });
    let truncated = json.to_string().as_bytes()[..5].to_vec();
    if JsonResponse::try_from(truncated).is_ok() {
        panic!("Expected error");
    }
}
907
/// Serialized JSON converts into a `JsonResponse` holding the same value.
#[test]
fn from_valid_bytes_into_json_response() {
    let json = serde_json::json!({ "result": 42 });
    let serialized = json.to_string().into_bytes();
    let response = JsonResponse::try_from(serialized)
        .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
    assert_eq!(response.0, json);
}
916}