rust_rcs_core/http/
mod.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15extern crate cookie;
16extern crate httparse;
17extern crate rustls;
18extern crate tokio;
19extern crate tokio_stream;
20extern crate url;
21extern crate walkdir;
22
23pub mod decode;
24pub mod decompress;
25pub mod request;
26pub mod response;
27
28use std::fmt;
29use std::io;
30use std::io::Read;
31use std::marker::Unpin;
32use std::net::IpAddr;
33use std::ops::Add;
34use std::sync::{Arc, Mutex};
35use std::time::Duration;
36
37use futures::io::AsyncBufRead;
38use futures::stream::TryStreamExt;
39use futures::StreamExt;
40
41use rustls::ClientConfig;
42
43use tokio::io::copy;
44use tokio::io::AsyncWriteExt;
45use tokio::runtime::Runtime;
46use tokio::sync::mpsc;
47use tokio::sync::oneshot;
48
49use tokio::time::sleep_until;
50use tokio::time::Instant;
51use tokio_stream::wrappers::ReceiverStream;
52
53use tokio_util::compat::FuturesAsyncReadCompatExt;
54use url::{Host, Url};
55use uuid::Uuid;
56
57use crate::dns::{DnsClient, DnsConfig};
58
59use crate::ffi::log::platform_log;
60use crate::ffi::net_ctrl::get_active_dns_servers;
61
62use crate::internet::header;
63use crate::internet::Header;
64use crate::io::network::stream;
65use crate::io::network::stream::ClientStream;
66use crate::io::DynamicChain;
67use crate::io::ProgressReportingReader;
68use crate::io::Serializable;
69
70use request::Request;
71use response::{Response, ResponseOrAgain, ResponseStream};
72
73const LOG_TAG: &str = "http_client";
74
75const MAX_CONCURRENT_CONNECTIONS: usize = 16;
76
77const MAX_CONCURRENT_REQUEST_IN_CONNECTIONS: usize = 16;
78
79enum ConnectMessage {
80    Request(
81        bool,
82        Host,
83        u16,
84        bool,
85        oneshot::Sender<Result<HttpConnectionHandle>>,
86    ),
87    // DnsResolved(SocketAddr, Host, u16, bool, oneshot::Sender<HttpConnectionHandle>),
88    // ConnectSuccess(ClientStream, Option<(u8, u8)>, Host, u16, bool, oneshot::Sender<HttpConnectionHandle>),
89}
90
91fn setup_idle_timeout(
92    conn: &Arc<HttpConnection>,
93    connections: &Arc<
94        Mutex<
95            Vec<(
96                Arc<HttpConnection>,
97                mpsc::Sender<(
98                    Request,
99                    mpsc::Sender<usize>,
100                    oneshot::Sender<
101                        Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
102                    >,
103                )>,
104            )>,
105        >,
106    >,
107    now: Instant,
108) {
109    let conn = Arc::clone(&conn);
110    let connections = Arc::clone(&connections);
111
112    tokio::spawn(async move {
113        let deadline = now.add(Duration::from_secs(16));
114        sleep_until(deadline).await;
115        platform_log(
116            LOG_TAG,
117            format!("on idle timer for http connection {}", conn.debug_name),
118        );
119        let mut guard = connections.lock().unwrap();
120        match *conn.state.lock().unwrap() {
121            ConnectionState::Idle(scheduled) => {
122                if scheduled == now {
123                    let mut i = 0;
124                    for (conn_, _) in &mut *guard {
125                        if Arc::ptr_eq(&conn, conn_) {
126                            let _ = (*guard).remove(i);
127                            platform_log(
128                                LOG_TAG,
129                                format!("http connection {} removed", conn.debug_name),
130                            );
131                            break;
132                        }
133                        i = i + 1;
134                    }
135                }
136            }
137            ConnectionState::Working(_) => {}
138        }
139    });
140}
141
142pub struct HttpClient {
143    // connections: Arc<Mutex<HashMap<Url, ConnectionState>>>,
144    // cookie_jar: CookieJar,
145    tx: mpsc::Sender<ConnectMessage>,
146}
147
148impl HttpClient {
149    pub fn new(
150        client_config: Arc<ClientConfig>,
151        dns_client: Arc<DnsClient>,
152        rt: Arc<Runtime>,
153    ) -> HttpClient {
154        let (tx, mut rx) = mpsc::channel::<ConnectMessage>(MAX_CONCURRENT_CONNECTIONS);
155
156        let connections: Arc<
157            Mutex<
158                Vec<(
159                    Arc<HttpConnection>,
160                    mpsc::Sender<(
161                        Request,
162                        mpsc::Sender<usize>,
163                        oneshot::Sender<
164                            Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
165                        >,
166                    )>,
167                )>,
168            >,
169        > = Arc::new(Mutex::new(Vec::new()));
170
171        let mut stdout = tokio::io::stdout();
172
173        rt.spawn(async move {
174            'next: loop {
175                match rx.recv().await {
176                    Some(cm) => {
177                        match cm {
178                            ConnectMessage::Request(tls, host, port, can_pipeline, tx) => {
179                                stdout
180                                    .write(b"getting http connect request\r\n")
181                                    .await
182                                    .unwrap();
183
184                                {
185                                    let mut guard = connections.lock().unwrap();
186
187                                    for (conn, req_tx) in &mut *guard {
188                                        if conn.host == host && conn.port == port {
189                                            let mut conn_guard = conn.state.lock().unwrap();
190                                            match &mut *conn_guard {
191                                                ConnectionState::Idle(_) if conn.can_pipeline == can_pipeline => {
192                                                    *conn_guard = ConnectionState::Working(0);
193
194                                                    match tx.send(Ok(HttpConnectionHandle {
195                                                        cipher_id: conn.cipher_id,
196                                                        tx: req_tx.clone(),
197                                                    })) {
198                                                        Ok(()) => {
199                                                            continue 'next;
200                                                        }
201
202                                                        Err(_) => {
203                                                            panic!("")
204                                                        }
205                                                    }
206                                                }
207
208                                                ConnectionState::Working(_) if conn.can_pipeline && can_pipeline => {
209                                                    match tx.send(Ok(HttpConnectionHandle {
210                                                        cipher_id: conn.cipher_id,
211                                                        tx: req_tx.clone(),
212                                                    })) {
213                                                        Ok(()) => {
214                                                            continue 'next;
215                                                        }
216
217                                                        Err(_) => {
218                                                            panic!("")
219                                                        }
220                                                    }
221                                                }
222
223                                                _ => {}
224                                            }
225                                        }
226                                    }
227                                }
228
229                                match host {
230                                    Host::Domain(domain) => {
231                                        let client_config = Arc::clone(&client_config);
232                                        let dns_client = Arc::clone(&dns_client);
233                                        let connections = Arc::clone(&connections);
234
235                                        stdout.write(b"resolve dns for ").await.unwrap();
236                                        stdout.write(domain.as_bytes()).await.unwrap();
237                                        stdout.write(b"\r\n").await.unwrap();
238
239                                        tokio::spawn(async move {
240                                            let dns_servers = get_active_dns_servers();
241
242                                            let dns_config = DnsConfig { server_addrs: dns_servers };
243
244                                            match dns_client.resolve(dns_config, domain.clone()).await {
245                                                Ok(mut stream) => {
246                                                    while let Some(ip) = stream.next().await {
247                                                        if let IpAddr::V6(_) = ip {
248                                                            continue; // to-do: IPv6 not supported?
249                                                        }
250                                                        let client_config = Arc::clone(&client_config);
251                                                        match Self::connect_inner(
252                                                            tls,
253                                                            Host::Domain(domain.clone()),
254                                                            ip,
255                                                            port,
256                                                            client_config,
257                                                            &domain,
258                                                            can_pipeline,
259                                                            &connections,
260                                                        )
261                                                        .await
262                                                        {
263                                                            Ok(handle) => match tx.send(Ok(handle)) {
264                                                                Ok(()) => {
265                                                                    return;
266                                                                }
267
268                                                                Err(_) => {
269                                                                    panic!("")
270                                                                }
271                                                            },
272
273                                                            Err(e) => {
274                                                                platform_log(
275                                                                    LOG_TAG,
276                                                                    format!(
277                                                                        "connect_inner failed with error {:?}",
278                                                                        e
279                                                                    ),
280                                                                );
281                                                            }
282                                                        }
283                                                    }
284
285                                                    match tx.send(Err(ErrorKind::Dns)) {
286                                                        Ok(()) => {
287                                                            return;
288                                                        }
289
290                                                        Err(_) => {
291                                                            panic!("")
292                                                        }
293                                                    }
294                                                }
295
296                                                Err(_) => match tx.send(Err(ErrorKind::Dns)) {
297                                                    Ok(()) => {
298                                                        return;
299                                                    }
300
301                                                    Err(_) => {
302                                                        panic!("")
303                                                    }
304                                                },
305                                            }
306                                        });
307                                    }
308
309                                    Host::Ipv4(ip) => {
310                                        let client_config = Arc::clone(&client_config);
311                                        let connections = Arc::clone(&connections);
312
313                                        tokio::spawn(async move {
314                                            if let Ok(handle) = Self::connect_inner(
315                                                tls,
316                                                host,
317                                                IpAddr::V4(ip),
318                                                port,
319                                                client_config,
320                                                &ip.to_string(),
321                                                can_pipeline,
322                                                &connections,
323                                            )
324                                            .await
325                                            {
326                                                match tx.send(Ok(handle)) {
327                                                    Ok(()) => {
328                                                        return;
329                                                    }
330
331                                                    Err(_) => {
332                                                        panic!("")
333                                                    }
334                                                }
335                                            }
336                                        });
337                                    }
338
339                                    Host::Ipv6(ip) => {
340                                        let client_config = Arc::clone(&client_config);
341                                        let connections = Arc::clone(&connections);
342
343                                        tokio::spawn(async move {
344                                            if let Ok(handle) = Self::connect_inner(
345                                                tls,
346                                                host,
347                                                IpAddr::V6(ip),
348                                                port,
349                                                client_config,
350                                                &ip.to_string(),
351                                                can_pipeline,
352                                                &connections,
353                                            )
354                                            .await
355                                            {
356                                                match tx.send(Ok(handle)) {
357                                                    Ok(()) => {
358                                                        return;
359                                                    }
360
361                                                    Err(_) => {
362                                                        panic!("")
363                                                    }
364                                                }
365                                            }
366                                        });
367                                    }
368                                }
369                            }
370
371                        }
372                    },
373
374                    None => break,
375                }
376            }
377        });
378
379        HttpClient {
380            // connections: Arc::new(Mutex::new(HashMap::new())),
381            // cookie_jar: CookieJar::new(),
382            tx,
383        }
384    }
385
386    async fn connect_inner(
387        tls: bool,
388        host: Host,
389        addr: IpAddr,
390        port: u16,
391        client_config: Arc<ClientConfig>,
392        server_name: &str,
393        can_pipeline: bool,
394        connections: &Arc<
395            Mutex<
396                Vec<(
397                    Arc<HttpConnection>,
398                    mpsc::Sender<(
399                        Request,
400                        mpsc::Sender<usize>,
401                        oneshot::Sender<
402                            Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
403                        >,
404                    )>,
405                )>,
406            >,
407        >,
408    ) -> Result<HttpConnectionHandle> {
409        platform_log(
410            LOG_TAG,
411            format!(
412                "connect_inner: {}/{}:{}",
413                host.to_string(),
414                addr.to_string(),
415                &port
416            ),
417        );
418
419        if tls {
420            let (stream, cipher_id) =
421                Self::connect_tls(addr, port, client_config, server_name).await?;
422            let now = Instant::now();
423            let conn = HttpConnection::new(host, port, cipher_id, can_pipeline, now);
424            let conn = Arc::new(conn);
425            let conn_ = Arc::clone(&conn);
426            let req_tx = setup_http_connection(connections, &conn, stream);
427
428            let handle = HttpConnectionHandle {
429                cipher_id: conn.cipher_id,
430                tx: req_tx.clone(),
431            };
432
433            let mut guard = connections.lock().unwrap();
434
435            guard.push((conn, req_tx));
436
437            setup_idle_timeout(&conn_, &connections, now);
438
439            return Ok(handle);
440        } else {
441            let stream = Self::connect_tcp(addr, port).await?;
442            let now = Instant::now();
443            let conn = HttpConnection::new(host, port, None, can_pipeline, now);
444            let conn = Arc::new(conn);
445            let req_tx = setup_http_connection(connections, &conn, stream);
446            let conn_ = Arc::clone(&conn);
447
448            let handle = HttpConnectionHandle {
449                cipher_id: conn.cipher_id,
450                tx: req_tx.clone(),
451            };
452
453            let mut guard = connections.lock().unwrap();
454
455            guard.push((conn, req_tx));
456
457            setup_idle_timeout(&conn_, &connections, now);
458
459            return Ok(handle);
460        }
461    }
462
463    async fn connect_tcp(ip: IpAddr, port: u16) -> Result<ClientStream> {
464        #[cfg(all(feature = "android", target_os = "android"))]
465        match ClientStream::new_android(ip, port).await {
466            Ok(client_stream) => Ok(client_stream),
467
468            Err(e) => Err(ErrorKind::Stream(e)),
469        }
470
471        #[cfg(all(feature = "ohos", all(target_os = "linux", target_env = "ohos")))]
472        match ClientStream::new_ohos(ip, port).await {
473            Ok(client_stream) => Ok(client_stream),
474
475            Err(e) => Err(ErrorKind::Stream(e)),
476        }
477
478        #[cfg(not(any(
479            all(feature = "android", target_os = "android"),
480            all(feature = "ohos", all(target_os = "linux", target_env = "ohos"))
481        )))]
482        match ClientStream::new_tokio(ip, port).await {
483            Ok(client_stream) => Ok(client_stream),
484
485            Err(e) => Err(ErrorKind::Stream(e)),
486        }
487    }
488
489    async fn connect_tls(
490        ip: IpAddr,
491        port: u16,
492        client_config: Arc<ClientConfig>,
493        server_name: &str,
494    ) -> Result<(ClientStream, Option<(u8, u8)>)> {
495        #[cfg(all(feature = "android", target_os = "android"))]
496        match ClientStream::new_android_ssl(ip, port, server_name).await {
497            Ok(client_stream) => match client_stream.do_handshake().await {
498                Ok((client_stream, cipher_id)) => {
499                    platform_log(LOG_TAG, format!("ssl do_handshake success"));
500                    Ok((client_stream, cipher_id))
501                }
502                Err(e) => {
503                    platform_log(
504                        LOG_TAG,
505                        format!("ssl do_handshake failed with error {:?}", e),
506                    );
507                    Err(ErrorKind::Stream(e))
508                }
509            },
510
511            Err(e) => {
512                platform_log(LOG_TAG, format!("ssl new failed with error {:?}", e));
513                Err(ErrorKind::Stream(e))
514            }
515        }
516
517        #[cfg(all(feature = "ohos", all(target_os = "linux", target_env = "ohos")))]
518        match ClientStream::new_ohos_ssl(client_config, ip, port, server_name).await {
519            Ok(client_stream) => match client_stream.do_handshake().await {
520                Ok((client_stream, cipher_id)) => {
521                    platform_log(LOG_TAG, format!("ssl do_handshake success"));
522                    Ok((client_stream, cipher_id))
523                }
524                Err(e) => {
525                    platform_log(
526                        LOG_TAG,
527                        format!("ssl do_handshake failed with error {:?}", e),
528                    );
529                    Err(ErrorKind::Stream(e))
530                }
531            },
532
533            Err(e) => {
534                platform_log(LOG_TAG, format!("ssl new failed with error {:?}", e));
535                Err(ErrorKind::Stream(e))
536            }
537        }
538
539        #[cfg(not(any(
540            all(feature = "android", target_os = "android"),
541            all(feature = "ohos", all(target_os = "linux", target_env = "ohos"))
542        )))]
543        match ClientStream::new_tokio_ssl(client_config, ip, port, server_name).await {
544            Ok(client_stream) => match client_stream.do_handshake().await {
545                Ok((client_stream, cipher_id)) => {
546                    platform_log(LOG_TAG, format!("ssl do_handshake success"));
547                    Ok((client_stream, cipher_id))
548                }
549                Err(e) => {
550                    platform_log(
551                        LOG_TAG,
552                        format!("ssl do_handshake failed with error {:?}", e),
553                    );
554                    Err(ErrorKind::Stream(e))
555                }
556            },
557
558            Err(e) => {
559                platform_log(LOG_TAG, format!("ssl new failed with error {:?}", e));
560                Err(ErrorKind::Stream(e))
561            }
562        }
563    }
564
565    pub async fn connect(&self, url: &Url, can_pipeline: bool) -> Result<HttpConnectionHandle> {
566        if let Some(host) = url.host() {
567            let tls = if url.scheme() == "https" { true } else { false };
568
569            let port = if let Some(port_) = url.port() {
570                port_
571            } else {
572                if tls {
573                    443
574                } else {
575                    80
576                }
577            };
578
579            match host {
580                Host::Domain(domain) => {
581                    let (request_tx, request_rx) =
582                        oneshot::channel::<Result<HttpConnectionHandle>>();
583
584                    match self
585                        .tx
586                        .send(ConnectMessage::Request(
587                            tls,
588                            Host::Domain(String::from(domain)),
589                            port,
590                            can_pipeline,
591                            request_tx,
592                        ))
593                        .await
594                    {
595                        Ok(()) => request_rx.await.unwrap(),
596
597                        Err(_) => Err(ErrorKind::BrokenPipe),
598                    }
599                }
600
601                Host::Ipv4(ip) => {
602                    let (request_tx, request_rx) =
603                        oneshot::channel::<Result<HttpConnectionHandle>>();
604
605                    match self
606                        .tx
607                        .send(ConnectMessage::Request(
608                            tls,
609                            Host::Ipv4(ip),
610                            port,
611                            can_pipeline,
612                            request_tx,
613                        ))
614                        .await
615                    {
616                        Ok(()) => request_rx.await.unwrap(),
617
618                        Err(_) => Err(ErrorKind::BrokenPipe),
619                    }
620                }
621
622                Host::Ipv6(ip) => {
623                    let (request_tx, request_rx) =
624                        oneshot::channel::<Result<HttpConnectionHandle>>();
625
626                    match self
627                        .tx
628                        .send(ConnectMessage::Request(
629                            tls,
630                            Host::Ipv6(ip),
631                            port,
632                            can_pipeline,
633                            request_tx,
634                        ))
635                        .await
636                    {
637                        Ok(()) => request_rx.await.unwrap(),
638
639                        Err(_) => Err(ErrorKind::BrokenPipe),
640                    }
641                }
642            }
643        } else {
644            Err(ErrorKind::BadFormat)
645        }
646    }
647}
648
649pub enum ErrorKind {
650    BadFormat,
651    Dns,
652    Stream(stream::ErrorKind),
653    BrokenPipe,
654    ConnectionLost,
655    ProtocolError,
656}
657
658impl Copy for ErrorKind {}
659
660impl Clone for ErrorKind {
661    fn clone(&self) -> ErrorKind {
662        *self
663    }
664}
665
666impl fmt::Debug for ErrorKind {
667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668        match self {
669            ErrorKind::BadFormat => {
670                write!(f, "BadFormat")
671            }
672
673            ErrorKind::Dns => {
674                write!(f, "Dns")
675            }
676
677            ErrorKind::Stream(e) => {
678                write!(f, "Stream {:?}", e)
679            }
680
681            ErrorKind::BrokenPipe => {
682                write!(f, "BrokenPipe")
683            }
684
685            ErrorKind::ConnectionLost => {
686                write!(f, "ConnectionLost")
687            }
688
689            ErrorKind::ProtocolError => {
690                write!(f, "ProtocolError")
691            }
692        }
693    }
694}
695
696pub type Result<T> = std::result::Result<T, ErrorKind>;
697
698enum ConnectionState {
699    Idle(Instant),
700    Working(usize), // Working(transaction_count)
701}
702
703pub struct HttpConnection {
704    debug_name: String,
705    state: Arc<Mutex<ConnectionState>>,
706    can_pipeline: bool,
707    host: Host,
708    port: u16,
709    cipher_id: Option<(u8, u8)>,
710}
711
712impl HttpConnection {
713    pub fn new(
714        host: Host,
715        port: u16,
716        cipher_id: Option<(u8, u8)>,
717        can_pipeline: bool,
718        now: Instant,
719    ) -> HttpConnection {
720        let state = Arc::new(Mutex::new(ConnectionState::Idle(now)));
721        HttpConnection {
722            debug_name: Uuid::new_v4().to_string(),
723            state,
724            can_pipeline,
725            host,
726            port,
727            cipher_id,
728        }
729    }
730}
731
732fn on_transaction_complete(
733    state: &Arc<Mutex<ConnectionState>>,
734    conn: &Arc<HttpConnection>,
735    connections: &Arc<
736        Mutex<
737            Vec<(
738                Arc<HttpConnection>,
739                mpsc::Sender<(
740                    Request,
741                    mpsc::Sender<usize>,
742                    oneshot::Sender<
743                        Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
744                    >,
745                )>,
746            )>,
747        >,
748    >,
749) {
750    platform_log(
751        LOG_TAG,
752        format!(
753            "on_transaction_complete for http connection {}",
754            conn.debug_name
755        ),
756    );
757    let mut state = state.lock().unwrap();
758    match &mut *state {
759        ConnectionState::Idle(_) => panic!(""),
760        ConnectionState::Working(transaction_count) => {
761            if *transaction_count > 0 {
762                *transaction_count -= 1;
763                if *transaction_count == 0 {
764                    platform_log(
765                        LOG_TAG,
766                        format!("http connection {} going idle", conn.debug_name),
767                    );
768
769                    let now = Instant::now();
770                    *state = ConnectionState::Idle(now);
771
772                    setup_idle_timeout(&conn, &connections, now);
773                }
774            } else {
775                panic!("")
776            }
777        }
778    }
779}
780
781fn setup_http_connection(
782    connections: &Arc<
783        Mutex<
784            Vec<(
785                Arc<HttpConnection>,
786                mpsc::Sender<(
787                    Request,
788                    mpsc::Sender<usize>,
789                    oneshot::Sender<
790                        Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>,
791                    >,
792                )>,
793            )>,
794        >,
795    >,
796    conn: &Arc<HttpConnection>,
797    stream: ClientStream,
798) -> mpsc::Sender<(
799    Request,
800    mpsc::Sender<usize>,
801    oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
802)> {
803    let debug_name_1 = String::from(&conn.debug_name);
804    let debug_name_2 = String::from(&conn.debug_name);
805
806    let connections_1 = Arc::clone(connections);
807    let connections_2 = Arc::clone(connections);
808    let conn_1 = Arc::clone(conn);
809    let conn_2 = Arc::clone(conn);
810
811    let state_1 = Arc::clone(&conn.state);
812    let state_2 = Arc::clone(&conn.state);
813
814    let (transaction_tx, mut transaction_rx) = mpsc::channel::<(
815        &'static [u8],
816        oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
817    )>(MAX_CONCURRENT_REQUEST_IN_CONNECTIONS);
818
819    let (rh, mut wh) = tokio::io::split(stream);
820
821    tokio::spawn(async move {
822        let mut resp_stream = ResponseStream::new(rh);
823
824        loop {
825            match resp_stream.next().await {
826                Some(r) => match r {
827                    ResponseOrAgain::Response(resp) => {
828                        platform_log(LOG_TAG, format!("http connection {} on resp", debug_name_1));
829
830                        let (req_method, resp_tx) = transaction_rx.recv().await.unwrap();
831
832                        match resp_stream.setup_body_reading(req_method, &resp) {
833                            Ok(data_rx) => {
834                                if data_rx.is_none() {
835                                    on_transaction_complete(&state_1, &conn_1, &connections_1);
836                                }
837
838                                match resp_tx.send(Ok((resp, data_rx))) {
839                                    Ok(()) => {}
840
841                                    Err(_) => {
842                                        platform_log(
843                                            LOG_TAG,
844                                            "body reader dropped due to pipe error",
845                                        );
846                                        close_transaction_rx_and_quit(
847                                            transaction_rx,
848                                            ErrorKind::BrokenPipe,
849                                        )
850                                        .await;
851                                        break;
852                                    }
853                                }
854                            }
855
856                            Err(e) => {
857                                platform_log(
858                                    LOG_TAG,
859                                    format!("could not setup body reader due to error: {:?}", e),
860                                );
861                                close_transaction_rx_and_quit(
862                                    transaction_rx,
863                                    ErrorKind::BrokenPipe,
864                                )
865                                .await;
866                                break;
867                            }
868                        }
869                    }
870
871                    ResponseOrAgain::Again(transaction_completed) => {
872                        if transaction_completed {
873                            on_transaction_complete(&state_1, &conn_1, &connections_1);
874                        }
875                    }
876                },
877
878                None => {
879                    platform_log(LOG_TAG, format!("http connection {} closed", debug_name_1));
880                    close_transaction_rx_and_quit(transaction_rx, ErrorKind::BrokenPipe).await;
881
882                    let mut guard = connections_1.lock().unwrap();
883                    let mut i = 0;
884                    for (conn, _) in &mut *guard {
885                        if Arc::ptr_eq(conn, &conn_1) {
886                            let _ = (*guard).remove(i);
887                            platform_log(
888                                LOG_TAG,
889                                format!("http connection {} removed", debug_name_1),
890                            );
891                            break;
892                        }
893                        i = i + 1;
894                    }
895
896                    break;
897                }
898            }
899        }
900    });
901
902    let (req_tx, mut req_rx) = mpsc::channel::<(
903        Request,
904        mpsc::Sender<usize>,
905        oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
906    )>(MAX_CONCURRENT_REQUEST_IN_CONNECTIONS);
907
908    tokio::spawn(async move {
909        'next: loop {
910            match req_rx.recv().await {
911                Some((request, prog_tx, resp_tx)) => {
912                    platform_log(
913                        LOG_TAG,
914                        format!(
915                            "on_transaction_received over http connection {}",
916                            debug_name_2
917                        ),
918                    );
919
920                    {
921                        let mut guard = state_2.lock().unwrap();
922
923                        match &mut *guard {
924                            ConnectionState::Idle(_) => {
925                                *guard = ConnectionState::Working(1);
926                            }
927                            ConnectionState::Working(transaction_count) => {
928                                *transaction_count += 1;
929                            }
930                        }
931                    }
932
933                    let data_size = request.estimated_size();
934                    let mut data = Vec::with_capacity(data_size);
935                    {
936                        let mut readers = Vec::new();
937                        request.get_readers(&mut readers);
938                        let mut chain = DynamicChain::new(readers);
939                        match chain.read_to_end(&mut data) {
940                            Ok(_) => {}
941                            Err(_) => {} // to-do: early failure
942                        }
943                    }
944
945                    platform_log(
946                        LOG_TAG,
947                        format!("sending request {}", String::from_utf8_lossy(&data)),
948                    );
949
950                    let mut written = 0;
951                    while written < data.len() {
952                        match wh.write(&data[written..]).await {
953                            Ok(size) => {
954                                written = written + size;
955                            }
956
957                            Err(e) => {
958                                platform_log(
959                                    LOG_TAG,
960                                    format!("socket write failed with error: {}", e),
961                                );
962
963                                match resp_tx.send(Err(ErrorKind::BrokenPipe)) {
964                                    Ok(()) => {}
965                                    Err(_) => {}
966                                }
967                                continue 'next;
968                            }
969                        }
970                    }
971
972                    if let Some(body) = request.body {
973                        platform_log(LOG_TAG, "sending request body");
974
975                        loop {
976                            if let Ok(reader) = body.reader() {
977                                let reader =
978                                    ProgressReportingReader::new(reader, move |read| match prog_tx
979                                        .blocking_send(read)
980                                    {
981                                        Ok(()) => {}
982                                        Err(e) => {}
983                                    });
984
985                                let mut reader = reader.compat();
986                                match copy(&mut reader, &mut wh).await {
987                                    Ok(i) => {
988                                        platform_log(
989                                            LOG_TAG,
990                                            format!("written {} bytes of http body", i),
991                                        );
992
993                                        match wh.flush().await {
994                                            Ok(()) => {
995                                                platform_log(LOG_TAG, "wh flushed");
996                                            }
997                                            Err(e) => {
998                                                platform_log(LOG_TAG, format!("wh error: {:?}", e));
999                                            }
1000                                        }
1001
1002                                        break;
1003                                    }
1004
1005                                    Err(e) => {
1006                                        platform_log(
1007                                            LOG_TAG,
1008                                            format!("socket write failed with error: {}", e),
1009                                        );
1010                                    }
1011                                }
1012                            } else {
1013                                platform_log(LOG_TAG, "cannot create body reader");
1014                            }
1015
1016                            match resp_tx.send(Err(ErrorKind::BrokenPipe)) {
1017                                Ok(()) => {}
1018                                Err(_) => {
1019                                    platform_log(LOG_TAG, "response dropped");
1020                                }
1021                            }
1022
1023                            continue 'next;
1024                        }
1025                    }
1026
1027                    match transaction_tx.send((request.method, resp_tx)).await {
1028                        Ok(()) => {}
1029
1030                        Err(mpsc::error::SendError((_, tx))) => {
1031                            match tx.send(Err(ErrorKind::BrokenPipe)) {
1032                                Ok(()) => {}
1033                                Err(_) => {
1034                                    platform_log(LOG_TAG, "transaction not processed");
1035                                }
1036                            }
1037                        }
1038                    }
1039                }
1040
1041                None => {
1042                    platform_log(
1043                        LOG_TAG,
1044                        format!("no more request on http connection {}", debug_name_2),
1045                    );
1046                    match wh.shutdown().await {
1047                        Ok(()) => platform_log(LOG_TAG, "write handle shutdown"),
1048                        Err(e) => {
1049                            platform_log(LOG_TAG, format!("write handle shutdown failed: {}", e))
1050                        }
1051                    }
1052                    break;
1053                }
1054            }
1055        }
1056    });
1057
1058    req_tx
1059}
1060
1061async fn close_transaction_rx_and_quit(
1062    mut transaction_rx: mpsc::Receiver<(
1063        &'static [u8],
1064        oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
1065    )>,
1066    error_kind: ErrorKind,
1067) {
1068    platform_log(LOG_TAG, "close_transaction_rx_and_quit()");
1069
1070    transaction_rx.close();
1071
1072    while let Some((_, response_tx)) = transaction_rx.recv().await {
1073        match response_tx.send(Err(error_kind)) {
1074            Ok(()) => {}
1075
1076            Err(_) => {
1077                platform_log(LOG_TAG, "response dropped");
1078            }
1079        }
1080    }
1081
1082    platform_log(LOG_TAG, "transactions cleared");
1083}
1084
1085pub struct HttpConnectionHandle {
1086    cipher_id: Option<(u8, u8)>,
1087    tx: mpsc::Sender<(
1088        Request,
1089        mpsc::Sender<usize>,
1090        oneshot::Sender<Result<(Response, Option<mpsc::Receiver<io::Result<Vec<u8>>>>)>>,
1091    )>,
1092}
1093
1094impl HttpConnectionHandle {
1095    pub fn cipher_id(&self) -> Option<(u8, u8)> {
1096        self.cipher_id
1097    }
1098
1099    pub async fn send<F>(
1100        &self,
1101        mut request: Request,
1102        io_callback: F,
1103    ) -> Result<(Response, Option<Box<dyn AsyncBufRead + Send + Unpin>>)>
1104    where
1105        F: Fn(usize) + Send + Sync + 'static,
1106    {
1107        let (prog_tx, mut prog_rx) = mpsc::channel(1);
1108        let (resp_tx, resp_rx) = oneshot::channel();
1109
1110        if request.body.is_none() {
1111            if let None = header::search(&request.headers, b"Content-Length", true) {
1112                request.headers.push(Header::new("Content-Length", "0"));
1113            }
1114        }
1115
1116        tokio::spawn(async move {
1117            while let Some(written) = prog_rx.recv().await {
1118                io_callback(written);
1119            }
1120        });
1121
1122        match self.tx.send((request, prog_tx, resp_tx)).await {
1123            Ok(()) => match resp_rx.await {
1124                Ok(result) => {
1125                    let (resp, data_rx) = result?;
1126
1127                    platform_log(LOG_TAG, "resp transfered");
1128
1129                    if let Some(data_rx) = data_rx {
1130                        let stream = ReceiverStream::new(data_rx);
1131
1132                        platform_log(LOG_TAG, "converting data stream into async reader");
1133
1134                        let reader = stream.into_async_read();
1135
1136                        Ok((resp, Some(Box::new(reader))))
1137                    } else {
1138                        Ok((resp, None))
1139                    }
1140                }
1141
1142                Err(_) => Err(ErrorKind::BrokenPipe),
1143            },
1144
1145            Err(_) => Err(ErrorKind::BrokenPipe),
1146        }
1147    }
1148}