1extern crate cookie;
16extern crate httparse;
17extern crate rustls;
18extern crate tokio;
19extern crate tokio_stream;
20extern crate url;
21extern crate walkdir;
22
23pub mod decode;
24pub mod decompress;
25pub mod request;
26pub mod response;
27
28use std::fmt;
29use std::io;
30use std::io::Read;
31use std::marker::Unpin;
32use std::net::IpAddr;
33use std::ops::Add;
34use std::sync::{Arc, Mutex};
35use std::time::Duration;
36
37use futures::io::AsyncBufRead;
38use futures::stream::TryStreamExt;
39use futures::StreamExt;
40
41use rustls::ClientConfig;
42
43use tokio::io::copy;
44use tokio::io::AsyncWriteExt;
45use tokio::runtime::Runtime;
46use tokio::sync::mpsc;
47use tokio::sync::oneshot;
48
49use tokio::time::sleep_until;
50use tokio::time::Instant;
51use tokio_stream::wrappers::ReceiverStream;
52
53use tokio_util::compat::FuturesAsyncReadCompatExt;
54use url::{Host, Url};
55use uuid::Uuid;
56
57use crate::dns::{DnsClient, DnsConfig};
58
59use crate::ffi::log::platform_log;
60use crate::ffi::net_ctrl::get_active_dns_servers;
61
62use crate::internet::header;
63use crate::internet::Header;
64use crate::io::network::stream;
65use crate::io::network::stream::ClientStream;
66use crate::io::DynamicChain;
67use crate::io::ProgressReportingReader;
68use crate::io::Serializable;
69
70use request::Request;
71use response::{Response, ResponseOrAgain, ResponseStream};
72
73const LOG_TAG: &str = "http_client";
74
75const MAX_CONCURRENT_CONNECTIONS: usize = 16;
76
77const MAX_CONCURRENT_REQUEST_IN_CONNECTIONS: usize = 16;
78
79enum ConnectMessage {
80 Request(
81 bool,
82 Host,
83 u16,
84 bool,
85 oneshot::Sender<Result<HttpConnectionHandle>>,
86 ),
87 }
90
91fn setup_idle_timeout(
92 conn: &Arc<HttpConnection>,
93 connections: &Arc<
94 Mutex<
95 Vec<(
96 Arc<HttpConnection>,
97 mpsc::Sender<(
98 Request,
99 mpsc::Sender<usize>,
100 oneshot::Sender<
101 Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
102 >,
103 )>,
104 )>,
105 >,
106 >,
107 now: Instant,
108) {
109 let conn = Arc::clone(&conn);
110 let connections = Arc::clone(&connections);
111
112 tokio::spawn(async move {
113 let deadline = now.add(Duration::from_secs(16));
114 sleep_until(deadline).await;
115 platform_log(
116 LOG_TAG,
117 format!("on idle timer for http connection {}", conn.debug_name),
118 );
119 let mut guard = connections.lock().unwrap();
120 match *conn.state.lock().unwrap() {
121 ConnectionState::Idle(scheduled) => {
122 if scheduled == now {
123 let mut i = 0;
124 for (conn_, _) in &mut *guard {
125 if Arc::ptr_eq(&conn, conn_) {
126 let _ = (*guard).remove(i);
127 platform_log(
128 LOG_TAG,
129 format!("http connection {} removed", conn.debug_name),
130 );
131 break;
132 }
133 i = i + 1;
134 }
135 }
136 }
137 ConnectionState::Working(_) => {}
138 }
139 });
140}
141
142pub struct HttpClient {
143 tx: mpsc::Sender<ConnectMessage>,
146}
147
148impl HttpClient {
149 pub fn new(
150 client_config: Arc<ClientConfig>,
151 dns_client: Arc<DnsClient>,
152 rt: Arc<Runtime>,
153 ) -> HttpClient {
154 let (tx, mut rx) = mpsc::channel::<ConnectMessage>(MAX_CONCURRENT_CONNECTIONS);
155
156 let connections: Arc<
157 Mutex<
158 Vec<(
159 Arc<HttpConnection>,
160 mpsc::Sender<(
161 Request,
162 mpsc::Sender<usize>,
163 oneshot::Sender<
164 Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
165 >,
166 )>,
167 )>,
168 >,
169 > = Arc::new(Mutex::new(Vec::new()));
170
171 let mut stdout = tokio::io::stdout();
172
173 rt.spawn(async move {
174 'next: loop {
175 match rx.recv().await {
176 Some(cm) => {
177 match cm {
178 ConnectMessage::Request(tls, host, port, can_pipeline, tx) => {
179 stdout
180 .write(b"getting http connect request\r\n")
181 .await
182 .unwrap();
183
184 {
185 let mut guard = connections.lock().unwrap();
186
187 for (conn, req_tx) in &mut *guard {
188 if conn.host == host && conn.port == port {
189 let mut conn_guard = conn.state.lock().unwrap();
190 match &mut *conn_guard {
191 ConnectionState::Idle(_) if conn.can_pipeline == can_pipeline => {
192 *conn_guard = ConnectionState::Working(0);
193
194 match tx.send(Ok(HttpConnectionHandle {
195 cipher_id: conn.cipher_id,
196 tx: req_tx.clone(),
197 })) {
198 Ok(()) => {
199 continue 'next;
200 }
201
202 Err(_) => {
203 panic!("")
204 }
205 }
206 }
207
208 ConnectionState::Working(_) if conn.can_pipeline && can_pipeline => {
209 match tx.send(Ok(HttpConnectionHandle {
210 cipher_id: conn.cipher_id,
211 tx: req_tx.clone(),
212 })) {
213 Ok(()) => {
214 continue 'next;
215 }
216
217 Err(_) => {
218 panic!("")
219 }
220 }
221 }
222
223 _ => {}
224 }
225 }
226 }
227 }
228
229 match host {
230 Host::Domain(domain) => {
231 let client_config = Arc::clone(&client_config);
232 let dns_client = Arc::clone(&dns_client);
233 let connections = Arc::clone(&connections);
234
235 stdout.write(b"resolve dns for ").await.unwrap();
236 stdout.write(domain.as_bytes()).await.unwrap();
237 stdout.write(b"\r\n").await.unwrap();
238
239 tokio::spawn(async move {
240 let dns_servers = get_active_dns_servers();
241
242 let dns_config = DnsConfig { server_addrs: dns_servers };
243
244 match dns_client.resolve(dns_config, domain.clone()).await {
245 Ok(mut stream) => {
246 while let Some(ip) = stream.next().await {
247 if let IpAddr::V6(_) = ip {
248 continue; }
250 let client_config = Arc::clone(&client_config);
251 match Self::connect_inner(
252 tls,
253 Host::Domain(domain.clone()),
254 ip,
255 port,
256 client_config,
257 &domain,
258 can_pipeline,
259 &connections,
260 )
261 .await
262 {
263 Ok(handle) => match tx.send(Ok(handle)) {
264 Ok(()) => {
265 return;
266 }
267
268 Err(_) => {
269 panic!("")
270 }
271 },
272
273 Err(e) => {
274 platform_log(
275 LOG_TAG,
276 format!(
277 "connect_inner failed with error {:?}",
278 e
279 ),
280 );
281 }
282 }
283 }
284
285 match tx.send(Err(ErrorKind::Dns)) {
286 Ok(()) => {
287 return;
288 }
289
290 Err(_) => {
291 panic!("")
292 }
293 }
294 }
295
296 Err(_) => match tx.send(Err(ErrorKind::Dns)) {
297 Ok(()) => {
298 return;
299 }
300
301 Err(_) => {
302 panic!("")
303 }
304 },
305 }
306 });
307 }
308
309 Host::Ipv4(ip) => {
310 let client_config = Arc::clone(&client_config);
311 let connections = Arc::clone(&connections);
312
313 tokio::spawn(async move {
314 if let Ok(handle) = Self::connect_inner(
315 tls,
316 host,
317 IpAddr::V4(ip),
318 port,
319 client_config,
320 &ip.to_string(),
321 can_pipeline,
322 &connections,
323 )
324 .await
325 {
326 match tx.send(Ok(handle)) {
327 Ok(()) => {
328 return;
329 }
330
331 Err(_) => {
332 panic!("")
333 }
334 }
335 }
336 });
337 }
338
339 Host::Ipv6(ip) => {
340 let client_config = Arc::clone(&client_config);
341 let connections = Arc::clone(&connections);
342
343 tokio::spawn(async move {
344 if let Ok(handle) = Self::connect_inner(
345 tls,
346 host,
347 IpAddr::V6(ip),
348 port,
349 client_config,
350 &ip.to_string(),
351 can_pipeline,
352 &connections,
353 )
354 .await
355 {
356 match tx.send(Ok(handle)) {
357 Ok(()) => {
358 return;
359 }
360
361 Err(_) => {
362 panic!("")
363 }
364 }
365 }
366 });
367 }
368 }
369 }
370
371 }
372 },
373
374 None => break,
375 }
376 }
377 });
378
379 HttpClient {
380 tx,
383 }
384 }
385
386 async fn connect_inner(
387 tls: bool,
388 host: Host,
389 addr: IpAddr,
390 port: u16,
391 client_config: Arc<ClientConfig>,
392 server_name: &str,
393 can_pipeline: bool,
394 connections: &Arc<
395 Mutex<
396 Vec<(
397 Arc<HttpConnection>,
398 mpsc::Sender<(
399 Request,
400 mpsc::Sender<usize>,
401 oneshot::Sender<
402 Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
403 >,
404 )>,
405 )>,
406 >,
407 >,
408 ) -> Result<HttpConnectionHandle> {
409 platform_log(
410 LOG_TAG,
411 format!(
412 "connect_inner: {}/{}:{}",
413 host.to_string(),
414 addr.to_string(),
415 &port
416 ),
417 );
418
419 if tls {
420 let (stream, cipher_id) =
421 Self::connect_tls(addr, port, client_config, server_name).await?;
422 let now = Instant::now();
423 let conn = HttpConnection::new(host, port, cipher_id, can_pipeline, now);
424 let conn = Arc::new(conn);
425 let conn_ = Arc::clone(&conn);
426 let req_tx = setup_http_connection(connections, &conn, stream);
427
428 let handle = HttpConnectionHandle {
429 cipher_id: conn.cipher_id,
430 tx: req_tx.clone(),
431 };
432
433 let mut guard = connections.lock().unwrap();
434
435 guard.push((conn, req_tx));
436
437 setup_idle_timeout(&conn_, &connections, now);
438
439 return Ok(handle);
440 } else {
441 let stream = Self::connect_tcp(addr, port).await?;
442 let now = Instant::now();
443 let conn = HttpConnection::new(host, port, None, can_pipeline, now);
444 let conn = Arc::new(conn);
445 let req_tx = setup_http_connection(connections, &conn, stream);
446 let conn_ = Arc::clone(&conn);
447
448 let handle = HttpConnectionHandle {
449 cipher_id: conn.cipher_id,
450 tx: req_tx.clone(),
451 };
452
453 let mut guard = connections.lock().unwrap();
454
455 guard.push((conn, req_tx));
456
457 setup_idle_timeout(&conn_, &connections, now);
458
459 return Ok(handle);
460 }
461 }
462
463 async fn connect_tcp(ip: IpAddr, port: u16) -> Result<ClientStream> {
464 #[cfg(all(feature = "android", target_os = "android"))]
465 match ClientStream::new_android(ip, port).await {
466 Ok(client_stream) => Ok(client_stream),
467
468 Err(e) => Err(ErrorKind::Stream(e)),
469 }
470
471 #[cfg(all(feature = "ohos", all(target_os = "linux", target_env = "ohos")))]
472 match ClientStream::new_ohos(ip, port).await {
473 Ok(client_stream) => Ok(client_stream),
474
475 Err(e) => Err(ErrorKind::Stream(e)),
476 }
477
478 #[cfg(not(any(
479 all(feature = "android", target_os = "android"),
480 all(feature = "ohos", all(target_os = "linux", target_env = "ohos"))
481 )))]
482 match ClientStream::new_tokio(ip, port).await {
483 Ok(client_stream) => Ok(client_stream),
484
485 Err(e) => Err(ErrorKind::Stream(e)),
486 }
487 }
488
489 async fn connect_tls(
490 ip: IpAddr,
491 port: u16,
492 client_config: Arc<ClientConfig>,
493 server_name: &str,
494 ) -> Result<(ClientStream, Option<(u8, u8)>)> {
495 #[cfg(all(feature = "android", target_os = "android"))]
496 match ClientStream::new_android_ssl(ip, port, server_name).await {
497 Ok(client_stream) => match client_stream.do_handshake().await {
498 Ok((client_stream, cipher_id)) => {
499 platform_log(LOG_TAG, format!("ssl do_handshake success"));
500 Ok((client_stream, cipher_id))
501 }
502 Err(e) => {
503 platform_log(
504 LOG_TAG,
505 format!("ssl do_handshake failed with error {:?}", e),
506 );
507 Err(ErrorKind::Stream(e))
508 }
509 },
510
511 Err(e) => {
512 platform_log(LOG_TAG, format!("ssl new failed with error {:?}", e));
513 Err(ErrorKind::Stream(e))
514 }
515 }
516
517 #[cfg(all(feature = "ohos", all(target_os = "linux", target_env = "ohos")))]
518 match ClientStream::new_ohos_ssl(client_config, ip, port, server_name).await {
519 Ok(client_stream) => match client_stream.do_handshake().await {
520 Ok((client_stream, cipher_id)) => {
521 platform_log(LOG_TAG, format!("ssl do_handshake success"));
522 Ok((client_stream, cipher_id))
523 }
524 Err(e) => {
525 platform_log(
526 LOG_TAG,
527 format!("ssl do_handshake failed with error {:?}", e),
528 );
529 Err(ErrorKind::Stream(e))
530 }
531 },
532
533 Err(e) => {
534 platform_log(LOG_TAG, format!("ssl new failed with error {:?}", e));
535 Err(ErrorKind::Stream(e))
536 }
537 }
538
539 #[cfg(not(any(
540 all(feature = "android", target_os = "android"),
541 all(feature = "ohos", all(target_os = "linux", target_env = "ohos"))
542 )))]
543 match ClientStream::new_tokio_ssl(client_config, ip, port, server_name).await {
544 Ok(client_stream) => match client_stream.do_handshake().await {
545 Ok((client_stream, cipher_id)) => {
546 platform_log(LOG_TAG, format!("ssl do_handshake success"));
547 Ok((client_stream, cipher_id))
548 }
549 Err(e) => {
550 platform_log(
551 LOG_TAG,
552 format!("ssl do_handshake failed with error {:?}", e),
553 );
554 Err(ErrorKind::Stream(e))
555 }
556 },
557
558 Err(e) => {
559 platform_log(LOG_TAG, format!("ssl new failed with error {:?}", e));
560 Err(ErrorKind::Stream(e))
561 }
562 }
563 }
564
565 pub async fn connect(&self, url: &Url, can_pipeline: bool) -> Result<HttpConnectionHandle> {
566 if let Some(host) = url.host() {
567 let tls = if url.scheme() == "https" { true } else { false };
568
569 let port = if let Some(port_) = url.port() {
570 port_
571 } else {
572 if tls {
573 443
574 } else {
575 80
576 }
577 };
578
579 match host {
580 Host::Domain(domain) => {
581 let (request_tx, request_rx) =
582 oneshot::channel::<Result<HttpConnectionHandle>>();
583
584 match self
585 .tx
586 .send(ConnectMessage::Request(
587 tls,
588 Host::Domain(String::from(domain)),
589 port,
590 can_pipeline,
591 request_tx,
592 ))
593 .await
594 {
595 Ok(()) => request_rx.await.unwrap(),
596
597 Err(_) => Err(ErrorKind::BrokenPipe),
598 }
599 }
600
601 Host::Ipv4(ip) => {
602 let (request_tx, request_rx) =
603 oneshot::channel::<Result<HttpConnectionHandle>>();
604
605 match self
606 .tx
607 .send(ConnectMessage::Request(
608 tls,
609 Host::Ipv4(ip),
610 port,
611 can_pipeline,
612 request_tx,
613 ))
614 .await
615 {
616 Ok(()) => request_rx.await.unwrap(),
617
618 Err(_) => Err(ErrorKind::BrokenPipe),
619 }
620 }
621
622 Host::Ipv6(ip) => {
623 let (request_tx, request_rx) =
624 oneshot::channel::<Result<HttpConnectionHandle>>();
625
626 match self
627 .tx
628 .send(ConnectMessage::Request(
629 tls,
630 Host::Ipv6(ip),
631 port,
632 can_pipeline,
633 request_tx,
634 ))
635 .await
636 {
637 Ok(()) => request_rx.await.unwrap(),
638
639 Err(_) => Err(ErrorKind::BrokenPipe),
640 }
641 }
642 }
643 } else {
644 Err(ErrorKind::BadFormat)
645 }
646 }
647}
648
649pub enum ErrorKind {
650 BadFormat,
651 Dns,
652 Stream(stream::ErrorKind),
653 BrokenPipe,
654 ConnectionLost,
655 ProtocolError,
656}
657
658impl Copy for ErrorKind {}
659
660impl Clone for ErrorKind {
661 fn clone(&self) -> ErrorKind {
662 *self
663 }
664}
665
666impl fmt::Debug for ErrorKind {
667 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668 match self {
669 ErrorKind::BadFormat => {
670 write!(f, "BadFormat")
671 }
672
673 ErrorKind::Dns => {
674 write!(f, "Dns")
675 }
676
677 ErrorKind::Stream(e) => {
678 write!(f, "Stream {:?}", e)
679 }
680
681 ErrorKind::BrokenPipe => {
682 write!(f, "BrokenPipe")
683 }
684
685 ErrorKind::ConnectionLost => {
686 write!(f, "ConnectionLost")
687 }
688
689 ErrorKind::ProtocolError => {
690 write!(f, "ProtocolError")
691 }
692 }
693 }
694}
695
696pub type Result<T> = std::result::Result<T, ErrorKind>;
697
698enum ConnectionState {
699 Idle(Instant),
700 Working(usize), }
702
703pub struct HttpConnection {
704 debug_name: String,
705 state: Arc<Mutex<ConnectionState>>,
706 can_pipeline: bool,
707 host: Host,
708 port: u16,
709 cipher_id: Option<(u8, u8)>,
710}
711
712impl HttpConnection {
713 pub fn new(
714 host: Host,
715 port: u16,
716 cipher_id: Option<(u8, u8)>,
717 can_pipeline: bool,
718 now: Instant,
719 ) -> HttpConnection {
720 let state = Arc::new(Mutex::new(ConnectionState::Idle(now)));
721 HttpConnection {
722 debug_name: Uuid::new_v4().to_string(),
723 state,
724 can_pipeline,
725 host,
726 port,
727 cipher_id,
728 }
729 }
730}
731
732fn on_transaction_complete(
733 state: &Arc<Mutex<ConnectionState>>,
734 conn: &Arc<HttpConnection>,
735 connections: &Arc<
736 Mutex<
737 Vec<(
738 Arc<HttpConnection>,
739 mpsc::Sender<(
740 Request,
741 mpsc::Sender<usize>,
742 oneshot::Sender<
743 Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
744 >,
745 )>,
746 )>,
747 >,
748 >,
749) {
750 platform_log(
751 LOG_TAG,
752 format!(
753 "on_transaction_complete for http connection {}",
754 conn.debug_name
755 ),
756 );
757 let mut state = state.lock().unwrap();
758 match &mut *state {
759 ConnectionState::Idle(_) => panic!(""),
760 ConnectionState::Working(transaction_count) => {
761 if *transaction_count > 0 {
762 *transaction_count -= 1;
763 if *transaction_count == 0 {
764 platform_log(
765 LOG_TAG,
766 format!("http connection {} going idle", conn.debug_name),
767 );
768
769 let now = Instant::now();
770 *state = ConnectionState::Idle(now);
771
772 setup_idle_timeout(&conn, &connections, now);
773 }
774 } else {
775 panic!("")
776 }
777 }
778 }
779}
780
781fn setup_http_connection(
782 connections: &Arc<
783 Mutex<
784 Vec<(
785 Arc<HttpConnection>,
786 mpsc::Sender<(
787 Request,
788 mpsc::Sender<usize>,
789 oneshot::Sender<
790 Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
791 >,
792 )>,
793 )>,
794 >,
795 >,
796 conn: &Arc<HttpConnection>,
797 stream: ClientStream,
798) -> mpsc::Sender<(
799 Request,
800 mpsc::Sender<usize>,
801 oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
802)> {
803 let debug_name_1 = String::from(&conn.debug_name);
804 let debug_name_2 = String::from(&conn.debug_name);
805
806 let connections_1 = Arc::clone(connections);
807 let connections_2 = Arc::clone(connections);
808 let conn_1 = Arc::clone(conn);
809 let conn_2 = Arc::clone(conn);
810
811 let state_1 = Arc::clone(&conn.state);
812 let state_2 = Arc::clone(&conn.state);
813
814 let (transaction_tx, mut transaction_rx) = mpsc::channel::<(
815 &'static [u8],
816 oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
817 )>(MAX_CONCURRENT_REQUEST_IN_CONNECTIONS);
818
819 let (rh, mut wh) = tokio::io::split(stream);
820
821 tokio::spawn(async move {
822 let mut resp_stream = ResponseStream::new(rh);
823
824 loop {
825 match resp_stream.next().await {
826 Some(r) => match r {
827 ResponseOrAgain::Response(resp) => {
828 platform_log(LOG_TAG, format!("http connection {} on resp", debug_name_1));
829
830 let (req_method, resp_tx) = transaction_rx.recv().await.unwrap();
831
832 match resp_stream.setup_body_reading(req_method, &resp) {
833 Ok(data_rx) => {
834 if data_rx.is_none() {
835 on_transaction_complete(&state_1, &conn_1, &connections_1);
836 }
837
838 match resp_tx.send(Ok((resp, data_rx))) {
839 Ok(()) => {}
840
841 Err(_) => {
842 platform_log(
843 LOG_TAG,
844 "body reader dropped due to pipe error",
845 );
846 close_transaction_rx_and_quit(
847 transaction_rx,
848 ErrorKind::BrokenPipe,
849 )
850 .await;
851 break;
852 }
853 }
854 }
855
856 Err(e) => {
857 platform_log(
858 LOG_TAG,
859 format!("could not setup body reader due to error: {:?}", e),
860 );
861 close_transaction_rx_and_quit(
862 transaction_rx,
863 ErrorKind::BrokenPipe,
864 )
865 .await;
866 break;
867 }
868 }
869 }
870
871 ResponseOrAgain::Again(transaction_completed) => {
872 if transaction_completed {
873 on_transaction_complete(&state_1, &conn_1, &connections_1);
874 }
875 }
876 },
877
878 None => {
879 platform_log(LOG_TAG, format!("http connection {} closed", debug_name_1));
880 close_transaction_rx_and_quit(transaction_rx, ErrorKind::BrokenPipe).await;
881
882 let mut guard = connections_1.lock().unwrap();
883 let mut i = 0;
884 for (conn, _) in &mut *guard {
885 if Arc::ptr_eq(conn, &conn_1) {
886 let _ = (*guard).remove(i);
887 platform_log(
888 LOG_TAG,
889 format!("http connection {} removed", debug_name_1),
890 );
891 break;
892 }
893 i = i + 1;
894 }
895
896 break;
897 }
898 }
899 }
900 });
901
902 let (req_tx, mut req_rx) = mpsc::channel::<(
903 Request,
904 mpsc::Sender<usize>,
905 oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
906 )>(MAX_CONCURRENT_REQUEST_IN_CONNECTIONS);
907
908 tokio::spawn(async move {
909 'next: loop {
910 match req_rx.recv().await {
911 Some((request, prog_tx, resp_tx)) => {
912 platform_log(
913 LOG_TAG,
914 format!(
915 "on_transaction_received over http connection {}",
916 debug_name_2
917 ),
918 );
919
920 {
921 let mut guard = state_2.lock().unwrap();
922
923 match &mut *guard {
924 ConnectionState::Idle(_) => {
925 *guard = ConnectionState::Working(1);
926 }
927 ConnectionState::Working(transaction_count) => {
928 *transaction_count += 1;
929 }
930 }
931 }
932
933 let data_size = request.estimated_size();
934 let mut data = Vec::with_capacity(data_size);
935 {
936 let mut readers = Vec::new();
937 request.get_readers(&mut readers);
938 let mut chain = DynamicChain::new(readers);
939 match chain.read_to_end(&mut data) {
940 Ok(_) => {}
941 Err(_) => {} }
943 }
944
945 platform_log(
946 LOG_TAG,
947 format!("sending request {}", String::from_utf8_lossy(&data)),
948 );
949
950 let mut written = 0;
951 while written < data.len() {
952 match wh.write(&data[written..]).await {
953 Ok(size) => {
954 written = written + size;
955 }
956
957 Err(e) => {
958 platform_log(
959 LOG_TAG,
960 format!("socket write failed with error: {}", e),
961 );
962
963 match resp_tx.send(Err(ErrorKind::BrokenPipe)) {
964 Ok(()) => {}
965 Err(_) => {}
966 }
967 continue 'next;
968 }
969 }
970 }
971
972 if let Some(body) = request.body {
973 platform_log(LOG_TAG, "sending request body");
974
975 loop {
976 if let Ok(reader) = body.reader() {
977 let reader =
978 ProgressReportingReader::new(reader, move |read| match prog_tx
979 .blocking_send(read)
980 {
981 Ok(()) => {}
982 Err(e) => {}
983 });
984
985 let mut reader = reader.compat();
986 match copy(&mut reader, &mut wh).await {
987 Ok(i) => {
988 platform_log(
989 LOG_TAG,
990 format!("written {} bytes of http body", i),
991 );
992
993 match wh.flush().await {
994 Ok(()) => {
995 platform_log(LOG_TAG, "wh flushed");
996 }
997 Err(e) => {
998 platform_log(LOG_TAG, format!("wh error: {:?}", e));
999 }
1000 }
1001
1002 break;
1003 }
1004
1005 Err(e) => {
1006 platform_log(
1007 LOG_TAG,
1008 format!("socket write failed with error: {}", e),
1009 );
1010 }
1011 }
1012 } else {
1013 platform_log(LOG_TAG, "cannot create body reader");
1014 }
1015
1016 match resp_tx.send(Err(ErrorKind::BrokenPipe)) {
1017 Ok(()) => {}
1018 Err(_) => {
1019 platform_log(LOG_TAG, "response dropped");
1020 }
1021 }
1022
1023 continue 'next;
1024 }
1025 }
1026
1027 match transaction_tx.send((request.method, resp_tx)).await {
1028 Ok(()) => {}
1029
1030 Err(mpsc::error::SendError((_, tx))) => {
1031 match tx.send(Err(ErrorKind::BrokenPipe)) {
1032 Ok(()) => {}
1033 Err(_) => {
1034 platform_log(LOG_TAG, "transaction not processed");
1035 }
1036 }
1037 }
1038 }
1039 }
1040
1041 None => {
1042 platform_log(
1043 LOG_TAG,
1044 format!("no more request on http connection {}", debug_name_2),
1045 );
1046 match wh.shutdown().await {
1047 Ok(()) => platform_log(LOG_TAG, "write handle shutdown"),
1048 Err(e) => {
1049 platform_log(LOG_TAG, format!("write handle shutdown failed: {}", e))
1050 }
1051 }
1052 break;
1053 }
1054 }
1055 }
1056 });
1057
1058 req_tx
1059}
1060
1061async fn close_transaction_rx_and_quit(
1062 mut transaction_rx: mpsc::Receiver<(
1063 &'static [u8],
1064 oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
1065 )>,
1066 error_kind: ErrorKind,
1067) {
1068 platform_log(LOG_TAG, "close_transaction_rx_and_quit()");
1069
1070 transaction_rx.close();
1071
1072 while let Some((_, response_tx)) = transaction_rx.recv().await {
1073 match response_tx.send(Err(error_kind)) {
1074 Ok(()) => {}
1075
1076 Err(_) => {
1077 platform_log(LOG_TAG, "response dropped");
1078 }
1079 }
1080 }
1081
1082 platform_log(LOG_TAG, "transactions cleared");
1083}
1084
1085pub struct HttpConnectionHandle {
1086 cipher_id: Option<(u8, u8)>,
1087 tx: mpsc::Sender<(
1088 Request,
1089 mpsc::Sender<usize>,
1090 oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
1091 )>,
1092}
1093
1094impl HttpConnectionHandle {
1095 pub fn cipher_id(&self) -> Option<(u8, u8)> {
1096 self.cipher_id
1097 }
1098
1099 pub async fn send<F>(
1100 &self,
1101 mut request: Request,
1102 io_callback: F,
1103 ) -> Result<(Response, Option<Box<dyn AsyncBufRead + Send + Unpin>>)>
1104 where
1105 F: Fn(usize) + Send + Sync + 'static,
1106 {
1107 let (prog_tx, mut prog_rx) = mpsc::channel(1);
1108 let (resp_tx, resp_rx) = oneshot::channel();
1109
1110 if request.body.is_none() {
1111 if let None = header::search(&request.headers, b"Content-Length", true) {
1112 request.headers.push(Header::new("Content-Length", "0"));
1113 }
1114 }
1115
1116 tokio::spawn(async move {
1117 while let Some(written) = prog_rx.recv().await {
1118 io_callback(written);
1119 }
1120 });
1121
1122 match self.tx.send((request, prog_tx, resp_tx)).await {
1123 Ok(()) => match resp_rx.await {
1124 Ok(result) => {
1125 let (resp, data_rx) = result?;
1126
1127 platform_log(LOG_TAG, "resp transfered");
1128
1129 if let Some(data_rx) = data_rx {
1130 let stream = ReceiverStream::new(data_rx);
1131
1132 platform_log(LOG_TAG, "converting data stream into async reader");
1133
1134 let reader = stream.into_async_read();
1135
1136 Ok((resp, Some(Box::new(reader))))
1137 } else {
1138 Ok((resp, None))
1139 }
1140 }
1141
1142 Err(_) => Err(ErrorKind::BrokenPipe),
1143 },
1144
1145 Err(_) => Err(ErrorKind::BrokenPipe),
1146 }
1147 }
1148}