rust_rcs_core/http/
response.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15extern crate futures;
16extern crate httparse;
17extern crate tokio;
18extern crate tokio_util;
19
20use std::fmt;
21use std::io;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use futures::future::Future;
26use futures::AsyncReadExt;
27
28use httparse::Response as HttpResponse;
29
30use libc::c_void;
31use tokio::io::{AsyncRead, ReadBuf, ReadHalf};
32use tokio::sync::mpsc;
33
34use tokio_util::sync::PollSender;
35
36use crate::ffi::log::platform_log;
37
38use crate::internet::header::Header;
39use crate::internet::syntax;
40use crate::internet::AsHeaderField;
41
42use crate::io::network::stream::ClientStream;
43
44use crate::util::raw_string::{StrEq, ToInt};
45
46use super::decode::{
47    ChunkDecodeResult, ChunkDecoder, ErrorKind, HeaderPartDecodeStatus, HeaderPartDecoder, Result,
48};
49
50use super::decompress::Decompressor;
51
52const LOG_TAG: &str = "http_client_decode";
53
54pub struct Response {
55    pub status_code: u16,
56    pub reason_phrase: Vec<u8>,
57    pub headers: Vec<Header>,
58}
59
60impl Response {
61    pub fn from(resp: &HttpResponse) -> Option<Response> {
62        if let (Some(code), Some(reason)) = (resp.code, resp.reason) {
63            platform_log(LOG_TAG, format!("{} {} HTTP/1.1", code, reason));
64            let mut headers = Vec::new();
65
66            for h in &*resp.headers {
67                platform_log(
68                    LOG_TAG,
69                    format!("{}: {}", h.name, String::from_utf8_lossy(h.value)),
70                );
71                headers.push(Header::new(String::from(h.name), h.value.to_vec()));
72            }
73
74            return Some(Response {
75                status_code: code,
76                reason_phrase: reason.as_bytes().to_vec(),
77                headers,
78            });
79        }
80
81        None
82    }
83
84    pub fn get_response_transfer_method(
85        &self,
86        req_method: &[u8],
87    ) -> Result<Option<TransferMethod>> {
88        if req_method == b"HEAD"
89            || self.status_code < 200
90            || self.status_code == 204
91            || self.status_code == 304
92        {
93            return Ok(None);
94        }
95
96        if req_method == b"CONNECT" && self.status_code >= 200 && self.status_code < 300 {
97            return Ok(None);
98        }
99
100        let mut method = TransferMethod::Unbounded;
101
102        for header in &self.headers {
103            if header.get_name().equals_bytes(b"Content-Length", true) {
104                if let Ok(i) = header.get_value().to_int() {
105                    match method {
106                        TransferMethod::Sized(_) | TransferMethod::Chunked(_) => {
107                            return Err(ErrorKind::Parse);
108                        }
109                        TransferMethod::Unbounded => {
110                            method = TransferMethod::Sized(SizedTransferEncoding::Plain(i));
111                        }
112                    }
113                }
114            } else if header.get_name().equals_bytes(b"Transfer-Encoding", true) {
115                let header_field = header.get_value().as_header_field();
116                let mut iter = header_field.value.split(|c| *c == b',');
117                while let Some(value) = iter.next() {
118                    let value = syntax::trim(value);
119                    if value.equals_bytes(b"chunked", true) {
120                        match &mut method {
121                            TransferMethod::Sized(encoding) => match encoding {
122                                SizedTransferEncoding::Plain(_) => {
123                                    return Err(ErrorKind::Parse);
124                                }
125
126                                SizedTransferEncoding::Encoded(ref mut encodings) => {
127                                    let mut v = Vec::new();
128                                    v.append(encodings);
129                                    method = TransferMethod::Chunked(
130                                        ChunkedTransferEncoding::Encoded(v),
131                                    );
132                                }
133                            },
134
135                            TransferMethod::Chunked(_) => {
136                                return Err(ErrorKind::Parse);
137                            }
138
139                            TransferMethod::Unbounded => {
140                                method = TransferMethod::Chunked(ChunkedTransferEncoding::Plain);
141                            }
142                        }
143                    } else if value.equals_bytes(b"br", true)
144                        || value.equals_bytes(b"compress", true)
145                        || value.equals_bytes(b"deflate", true)
146                        || value.equals_bytes(b"gzip", true)
147                    {
148                        match &mut method {
149                            TransferMethod::Sized(encoding) => match encoding {
150                                SizedTransferEncoding::Plain(_) => {
151                                    return Err(ErrorKind::Parse);
152                                }
153
154                                SizedTransferEncoding::Encoded(ref mut encodings) => {
155                                    if value.equals_bytes(b"compress", true) {
156                                        encodings.push(Encoding::Compress);
157                                    } else if value.equals_bytes(b"deflate", true) {
158                                        encodings.push(Encoding::Deflate);
159                                    } else if value.equals_bytes(b"gzip", true) {
160                                        encodings.push(Encoding::Gzip);
161                                    } else {
162                                        panic!("Impossible Condition");
163                                    }
164                                }
165                            },
166
167                            TransferMethod::Chunked(_) => {
168                                return Err(ErrorKind::Parse);
169                            }
170
171                            TransferMethod::Unbounded => {
172                                let mut v = Vec::new();
173                                if value.equals_bytes(b"compress", true) {
174                                    v.push(Encoding::Compress);
175                                } else if value.equals_bytes(b"deflate", true) {
176                                    v.push(Encoding::Deflate);
177                                } else if value.equals_bytes(b"gzip", true) {
178                                    v.push(Encoding::Gzip);
179                                } else {
180                                    panic!("Impossible Condition");
181                                }
182                                method = TransferMethod::Sized(SizedTransferEncoding::Encoded(v));
183                            }
184                        }
185                    }
186                }
187            }
188        }
189
190        Ok(Some(method))
191    }
192}
193
194pub enum Encoding {
195    Brotli,
196    Compress,
197    Deflate,
198    Gzip,
199}
200
201impl Copy for Encoding {}
202
203impl Clone for Encoding {
204    fn clone(&self) -> Encoding {
205        *self
206    }
207}
208
209impl fmt::Debug for Encoding {
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        match self {
212            Encoding::Brotli => {
213                write!(f, "Brotli")
214            }
215
216            Encoding::Compress => {
217                write!(f, "Compress")
218            }
219
220            Encoding::Deflate => {
221                write!(f, "Deflate")
222            }
223
224            Encoding::Gzip => {
225                write!(f, "Gzip")
226            }
227        }
228    }
229}
230
231pub enum SizedTransferEncoding {
232    Plain(usize),
233    Encoded(Vec<Encoding>),
234}
235
236impl Clone for SizedTransferEncoding {
237    fn clone(&self) -> SizedTransferEncoding {
238        match self {
239            SizedTransferEncoding::Plain(size) => SizedTransferEncoding::Plain(*size),
240
241            SizedTransferEncoding::Encoded(encodings) => {
242                SizedTransferEncoding::Encoded(encodings.to_vec())
243            }
244        }
245    }
246}
247
248impl fmt::Debug for SizedTransferEncoding {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        match self {
251            SizedTransferEncoding::Plain(size) => {
252                write!(f, "Plain {}", size)
253            }
254
255            SizedTransferEncoding::Encoded(encodings) => {
256                write!(f, "Encoded with {:?}", encodings)
257            }
258        }
259    }
260}
261
262pub enum ChunkedTransferEncoding {
263    Plain,
264    Encoded(Vec<Encoding>),
265}
266
267impl Clone for ChunkedTransferEncoding {
268    fn clone(&self) -> ChunkedTransferEncoding {
269        match self {
270            ChunkedTransferEncoding::Plain => ChunkedTransferEncoding::Plain,
271
272            ChunkedTransferEncoding::Encoded(encodings) => {
273                ChunkedTransferEncoding::Encoded(encodings.to_vec())
274            }
275        }
276    }
277}
278
279impl fmt::Debug for ChunkedTransferEncoding {
280    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
281        match self {
282            ChunkedTransferEncoding::Plain => {
283                write!(f, "Plain")
284            }
285
286            ChunkedTransferEncoding::Encoded(encodings) => {
287                write!(f, "Encoded with {:?}", encodings)
288            }
289        }
290    }
291}
292
293pub enum TransferMethod {
294    Sized(SizedTransferEncoding),
295    Chunked(ChunkedTransferEncoding),
296    Unbounded,
297}
298
299impl Clone for TransferMethod {
300    fn clone(&self) -> TransferMethod {
301        match self {
302            TransferMethod::Sized(encoding) => TransferMethod::Sized(encoding.clone()),
303
304            TransferMethod::Chunked(encoding) => TransferMethod::Chunked(encoding.clone()),
305
306            TransferMethod::Unbounded => TransferMethod::Unbounded,
307        }
308    }
309}
310
311impl fmt::Debug for TransferMethod {
312    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313        match self {
314            TransferMethod::Sized(encoding) => {
315                write!(f, "Sized {:?}", encoding)
316            }
317
318            TransferMethod::Chunked(encoding) => {
319                write!(f, "Chunked {:?}", encoding)
320            }
321
322            TransferMethod::Unbounded => {
323                write!(f, "Unbounded")
324            }
325        }
326    }
327}
328
329pub enum StreamState {
330    DecodeHeader,
331    DecodeBody(
332        TransferMethod,
333        PollSender<io::Result<Vec<u8>>>,
334        usize,
335        Option<io::Result<Vec<u8>>>,
336        bool,
337        bool,
338    ),
339}
340
341pub enum ResponseOrAgain {
342    Response(Response),
343    Again(bool), // Again(transaction_completed)
344}
345
346pub struct ResponseStream<'a> {
347    state: StreamState,
348    buf: ReadBuf<'a>,
349    consumed: usize,
350    p: *mut u8,
351    rh: ReadHalf<ClientStream>,
352    pub method: &'static [u8],
353}
354
355impl ResponseStream<'_> {
356    pub fn new(rh: ReadHalf<ClientStream>) -> ResponseStream<'static> {
357        let buf;
358        let p;
359
360        unsafe {
361            p = libc::calloc(1, 4 * 1024) as *mut u8;
362            let s = std::slice::from_raw_parts_mut(p, 4 * 1024);
363            buf = ReadBuf::new(&mut *s);
364        }
365
366        ResponseStream {
367            state: StreamState::DecodeHeader,
368            buf,
369            consumed: 0,
370            p,
371            rh,
372            method: &[],
373        }
374    }
375
376    pub fn setup_body_reading(
377        &mut self,
378        req_method: &[u8],
379        r: &Response,
380    ) -> io::Result<Option<mpsc::Receiver<io::Result<Vec<u8>>>>> {
381        match r.get_response_transfer_method(req_method) {
382            Ok(Some(ref method)) => {
383                platform_log(LOG_TAG, format!("setting up transfer {:?}", method));
384
385                let (data_tx, data_rx) = mpsc::channel::<io::Result<Vec<u8>>>(8);
386
387                match method {
388                    TransferMethod::Sized(encoding) => match encoding {
389                        SizedTransferEncoding::Plain(size) => {
390                            if *size == 0 {
391                                platform_log(LOG_TAG, format!("no data transfer needed"));
392                                return Ok(None);
393                            } else {
394                                self.state = StreamState::DecodeBody(
395                                    method.clone(),
396                                    PollSender::new(data_tx),
397                                    0,
398                                    None,
399                                    false,
400                                    false,
401                                );
402                            }
403                        }
404
405                        SizedTransferEncoding::Encoded(encodings) => {
406                            let (codec_tx, codec_rx) = mpsc::channel::<io::Result<Vec<u8>>>(8);
407                            let encodings = encodings.to_vec();
408
409                            tokio::spawn(async move {
410                                let mut decompressor = Decompressor::new(&encodings, codec_rx);
411                                let reader = decompressor.reader();
412                                loop {
413                                    let mut buf = vec![0; 4 * 1024];
414                                    match reader.read(&mut buf).await {
415                                        Ok(size) => {
416                                            if size == 0 {
417                                                break;
418                                            } else {
419                                                buf.truncate(size);
420
421                                                match data_tx.send(Ok(buf)).await {
422                                                    Ok(_) => {}
423                                                    Err(_) => {
424                                                        break;
425                                                    }
426                                                }
427                                            }
428                                        }
429
430                                        Err(e) => {
431                                            match data_tx.send(Err(e)).await {
432                                                Ok(_) => {}
433                                                Err(_) => {}
434                                            }
435                                            break;
436                                        }
437                                    }
438                                }
439                            });
440
441                            self.state = StreamState::DecodeBody(
442                                method.clone(),
443                                PollSender::new(codec_tx),
444                                0,
445                                None,
446                                false,
447                                false,
448                            );
449                        }
450                    },
451
452                    TransferMethod::Chunked(encoding) => match encoding {
453                        ChunkedTransferEncoding::Plain => {
454                            self.state = StreamState::DecodeBody(
455                                method.clone(),
456                                PollSender::new(data_tx),
457                                0,
458                                None,
459                                false,
460                                false,
461                            );
462                        }
463
464                        ChunkedTransferEncoding::Encoded(encodings) => {
465                            let (codec_tx, codec_rx) = mpsc::channel::<io::Result<Vec<u8>>>(1);
466                            let encodings = encodings.to_vec();
467
468                            tokio::spawn(async move {
469                                let mut decompressor = Decompressor::new(&encodings, codec_rx);
470                                let reader = decompressor.reader();
471                                loop {
472                                    let mut buf = vec![0; 4 * 1024];
473                                    match reader.read(&mut buf).await {
474                                        Ok(size) => {
475                                            if size == 0 {
476                                                break;
477                                            } else {
478                                                buf.truncate(size);
479                                                match data_tx.send(Ok(buf)).await {
480                                                    Ok(_) => {}
481                                                    Err(_) => {
482                                                        break;
483                                                    }
484                                                }
485                                            }
486                                        }
487
488                                        Err(e) => {
489                                            match data_tx.send(Err(e)).await {
490                                                Ok(_) => {}
491                                                Err(_) => {}
492                                            }
493                                            break;
494                                        }
495                                    }
496                                }
497                            });
498
499                            self.state = StreamState::DecodeBody(
500                                method.clone(),
501                                PollSender::new(codec_tx),
502                                0,
503                                None,
504                                false,
505                                false,
506                            );
507                        }
508                    },
509
510                    TransferMethod::Unbounded => {
511                        self.state = StreamState::DecodeBody(
512                            method.clone(),
513                            PollSender::new(data_tx),
514                            0,
515                            None,
516                            false,
517                            false,
518                        );
519                    }
520                }
521
522                Ok(Some(data_rx))
523            }
524
525            Ok(None) => {
526                platform_log(LOG_TAG, format!("no data transfer found"));
527                Ok(None)
528            }
529
530            Err(e) => {
531                platform_log(
532                    LOG_TAG,
533                    format!("get_response_transfer_method failed with error {:?}", e),
534                );
535                Err(io::Error::from(io::ErrorKind::BrokenPipe))
536            }
537        }
538    }
539}
540
541impl futures::Stream for ResponseStream<'_> {
542    type Item = ResponseOrAgain;
543    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
544        let stream = self.get_mut();
545        match stream.state {
546            StreamState::DecodeHeader => {
547                platform_log(LOG_TAG, "decoding header");
548
549                let mut decoder =
550                    HeaderPartDecoder::new(&mut stream.buf, &mut stream.consumed, &mut stream.rh);
551                match Pin::new(&mut decoder).poll(cx) {
552                    Poll::Ready(Ok(status)) => match status {
553                        HeaderPartDecodeStatus::Success(r) => {
554                            Poll::Ready(Some(ResponseOrAgain::Response(r)))
555                        }
556
557                        HeaderPartDecodeStatus::Again => {
558                            Poll::Ready(Some(ResponseOrAgain::Again(false)))
559                        }
560
561                        HeaderPartDecodeStatus::BufferTooSmall => {
562                            if stream.consumed > 0 {
563                                let filled = stream.buf.filled_mut();
564                                let len = filled.len();
565
566                                for i in stream.consumed..len {
567                                    filled[i - stream.consumed] = filled[i];
568                                }
569
570                                stream.buf.set_filled(len - stream.consumed);
571                                stream.consumed = 0;
572
573                                Poll::Ready(Some(ResponseOrAgain::Again(false)))
574                            } else {
575                                let capacity = stream.buf.capacity() * 2;
576                                if capacity > 4 * 1024 * 1024 {
577                                    platform_log(LOG_TAG, "buffer too large");
578                                    Poll::Ready(None)
579                                } else {
580                                    let filled = stream.buf.filled().to_vec();
581
582                                    unsafe {
583                                        let p = libc::calloc(1, capacity) as *mut u8;
584                                        let s = std::slice::from_raw_parts_mut(p, capacity);
585                                        stream.buf = ReadBuf::new(&mut *s);
586                                        libc::free(stream.p as *mut c_void);
587                                        stream.p = p;
588                                    }
589
590                                    stream.buf.put_slice(&filled);
591
592                                    Poll::Ready(Some(ResponseOrAgain::Again(false)))
593                                }
594                            }
595                        }
596
597                        HeaderPartDecodeStatus::EOF => Poll::Ready(None),
598                    },
599
600                    Poll::Ready(Err(e)) => {
601                        platform_log(LOG_TAG, format!("encountering error {:?}", e));
602                        Poll::Ready(None)
603                    }
604
605                    Poll::Pending => Poll::Pending,
606                }
607            }
608
609            StreamState::DecodeBody(
610                ref method,
611                ref mut data_tx,
612                ref mut sent,
613                ref mut pending_event,
614                ref mut is_last_data_chunk,
615                ref mut is_eof,
616            ) => {
617                match pending_event.take() {
618                    Some(event) => {
619                        platform_log(LOG_TAG, "decode result waiting to be processed");
620
621                        match data_tx.poll_reserve(cx) {
622                            Poll::Ready(Ok(())) => match event {
623                                Ok(ref data) => {
624                                    platform_log(
625                                        LOG_TAG,
626                                        format!(
627                                            "transfer content body: {}",
628                                            String::from_utf8_lossy(&data)
629                                        ),
630                                    );
631
632                                    let len = data.len();
633
634                                    match data_tx.send_item(event) {
635                                        Ok(()) => {
636                                            platform_log(LOG_TAG, "content body transfered");
637
638                                            *sent = *sent + len;
639
640                                            if *is_last_data_chunk {
641                                                if *is_eof {
642                                                    Poll::Ready(None)
643                                                } else {
644                                                    stream.state = StreamState::DecodeHeader;
645                                                    Poll::Ready(Some(ResponseOrAgain::Again(true)))
646                                                }
647                                            } else {
648                                                Poll::Ready(Some(ResponseOrAgain::Again(false)))
649                                            }
650                                        }
651
652                                        Err(e) => {
653                                            platform_log(
654                                                LOG_TAG,
655                                                format!("content body cannot be transfered due to pipe error: {:?}", e),
656                                            );
657
658                                            // treat it like we've successfully send the response data
659                                            *sent = *sent + len;
660
661                                            if *is_last_data_chunk {
662                                                if *is_eof {
663                                                    Poll::Ready(None)
664                                                } else {
665                                                    stream.state = StreamState::DecodeHeader;
666                                                    Poll::Ready(Some(ResponseOrAgain::Again(true)))
667                                                }
668                                            } else {
669                                                Poll::Ready(Some(ResponseOrAgain::Again(false)))
670                                            }
671                                        }
672                                    }
673                                }
674
675                                Err(ref e) => {
676                                    platform_log(LOG_TAG, format!("transfer error {:?}", e));
677
678                                    match data_tx.send_item(event) {
679                                        Ok(()) => {
680                                            platform_log(LOG_TAG, "error transfered");
681
682                                            Poll::Ready(None)
683                                        }
684
685                                        Err(e) => {
686                                            platform_log(LOG_TAG, format!("error cannot be transfered due to pipe error: {:?}", e));
687
688                                            Poll::Ready(None)
689                                        }
690                                    }
691                                }
692                            },
693
694                            Poll::Ready(Err(e)) => {
695                                platform_log(LOG_TAG, format!("channel reserve failed: {:?}", e));
696
697                                match event {
698                                    Ok(ref data) => {
699                                        platform_log(
700                                            LOG_TAG,
701                                            format!(
702                                                "content body: {:?} cannot be transfered due to receiving end closed",
703                                                String::from_utf8_lossy(&data)
704                                            ),
705                                        );
706
707                                        let len = data.len();
708
709                                        // treat it like we've successfully send the response data
710                                        *sent = *sent + len;
711
712                                        if *is_last_data_chunk {
713                                            if *is_eof {
714                                                Poll::Ready(None)
715                                            } else {
716                                                stream.state = StreamState::DecodeHeader;
717                                                Poll::Ready(Some(ResponseOrAgain::Again(true)))
718                                            }
719                                        } else {
720                                            Poll::Ready(Some(ResponseOrAgain::Again(false)))
721                                        }
722                                    }
723
724                                    Err(e) => {
725                                        platform_log(LOG_TAG, format!("error {:?} cannot be transfered due to receiving end closed", e));
726
727                                        Poll::Ready(None)
728                                    }
729                                }
730                            }
731
732                            Poll::Pending => {
733                                pending_event.replace(event);
734                                Poll::Pending
735                            }
736                        }
737                    }
738
739                    None => {
740                        if *is_eof {
741                            return Poll::Ready(None);
742                        }
743
744                        platform_log(
745                            LOG_TAG,
746                            format!("decoding body with transfer method {:?}", method),
747                        );
748
749                        if stream.buf.filled().len() == stream.consumed && stream.consumed != 0 {
750                            platform_log(LOG_TAG, "making space for poll_read");
751                            stream.buf.clear();
752                            stream.consumed = 0;
753                        }
754
755                        match method {
756                            TransferMethod::Sized(encoding) => {
757                                let pending_data = &stream.buf.filled()[stream.consumed..];
758                                if pending_data.len() > 0 {
759                                    match encoding {
760                                        SizedTransferEncoding::Plain(size) => {
761                                            let remains = size - *sent;
762
763                                            if pending_data.len() >= remains {
764                                                let data = pending_data[..remains].to_vec();
765
766                                                stream.consumed += data.len();
767
768                                                pending_event.replace(Ok(data));
769
770                                                *is_last_data_chunk = true;
771                                            } else {
772                                                let data = pending_data.to_vec();
773
774                                                stream.consumed += data.len();
775
776                                                pending_event.replace(Ok(data));
777
778                                                *is_last_data_chunk = false;
779                                            }
780
781                                            Poll::Ready(Some(ResponseOrAgain::Again(false)))
782                                        }
783
784                                        SizedTransferEncoding::Encoded(_) => {
785                                            let data = pending_data.to_vec();
786
787                                            stream.consumed += data.len();
788
789                                            pending_event.replace(Ok(data));
790
791                                            *is_last_data_chunk = false;
792
793                                            Poll::Ready(Some(ResponseOrAgain::Again(false)))
794                                        }
795                                    }
796                                } else {
797                                    let before_read = stream.buf.filled().len();
798
799                                    match Pin::new(&mut stream.rh).poll_read(cx, &mut stream.buf) {
800                                        Poll::Ready(Ok(())) => {
801                                            platform_log(
802                                                LOG_TAG,
803                                                format!(
804                                                    "poll_read success with new data range {}-{}",
805                                                    stream.consumed,
806                                                    stream.buf.filled().len()
807                                                ),
808                                            );
809
810                                            let after_read = stream.buf.filled().len();
811
812                                            if before_read == after_read {
813                                                platform_log(LOG_TAG, "no more data");
814                                                *pending_event = Some(Err(io::Error::new(
815                                                    io::ErrorKind::UnexpectedEof,
816                                                    "stream closed before reading last chunk",
817                                                )));
818                                                *is_eof = true;
819                                            }
820
821                                            Poll::Ready(Some(ResponseOrAgain::Again(false)))
822                                        }
823
824                                        Poll::Ready(Err(e)) => {
825                                            pending_event.replace(Err(e));
826
827                                            Poll::Ready(Some(ResponseOrAgain::Again(false)))
828                                        }
829
830                                        Poll::Pending => Poll::Pending,
831                                    }
832                                }
833                            }
834
835                            TransferMethod::Chunked(_) => {
836                                let mut decoder = ChunkDecoder::new(
837                                    &mut stream.buf,
838                                    &mut stream.consumed,
839                                    &mut stream.rh,
840                                );
841
842                                match Pin::new(&mut decoder).poll(cx) {
843                                    Poll::Ready(Ok(result)) => match result {
844                                        ChunkDecodeResult::Part(data) => {
845                                            pending_event.replace(Ok(data));
846
847                                            *is_last_data_chunk = false;
848
849                                            Poll::Ready(Some(ResponseOrAgain::Again(false)))
850                                        }
851
852                                        ChunkDecodeResult::Again => {
853                                            Poll::Ready(Some(ResponseOrAgain::Again(false)))
854                                        }
855
856                                        ChunkDecodeResult::BufferTooSmall => {
857                                            if stream.consumed > 0 {
858                                                let filled = stream.buf.filled_mut();
859                                                let len = filled.len();
860
861                                                for i in stream.consumed..len {
862                                                    filled[i - stream.consumed] = filled[i];
863                                                }
864
865                                                stream.buf.set_filled(len - stream.consumed);
866                                                stream.consumed = 0;
867
868                                                Poll::Ready(Some(ResponseOrAgain::Again(false)))
869                                            } else {
870                                                let capacity = stream.buf.capacity() * 2;
871                                                if capacity > 4 * 1024 * 1024 {
872                                                    platform_log(LOG_TAG, "buffer too large");
873                                                    Poll::Ready(None)
874                                                } else {
875                                                    let filled = stream.buf.filled().to_vec();
876
877                                                    unsafe {
878                                                        let p =
879                                                            libc::calloc(1, capacity) as *mut u8;
880                                                        let s = std::slice::from_raw_parts_mut(
881                                                            p, capacity,
882                                                        );
883                                                        stream.buf = ReadBuf::new(&mut *s);
884                                                        libc::free(stream.p as *mut c_void);
885                                                        stream.p = p;
886                                                    }
887
888                                                    stream.buf.put_slice(&filled);
889
890                                                    Poll::Ready(Some(ResponseOrAgain::Again(false)))
891                                                }
892                                            }
893                                        }
894
895                                        ChunkDecodeResult::EOF => {
896                                            platform_log(LOG_TAG, "on chunk decode EOF");
897                                            Poll::Ready(None)
898                                        }
899                                    },
900
901                                    Poll::Ready(Err(e)) => {
902                                        platform_log(
903                                            LOG_TAG,
904                                            format!("on chunk decode error {:?}", e),
905                                        );
906
907                                        match e {
908                                            ErrorKind::Io(e) => {
909                                                pending_event.replace(Err(e));
910                                            }
911
912                                            ErrorKind::Parse => {
913                                                pending_event.replace(Err(io::Error::from(
914                                                    io::ErrorKind::BrokenPipe,
915                                                )));
916                                            }
917                                        }
918
919                                        Poll::Ready(Some(ResponseOrAgain::Again(false)))
920                                    }
921
922                                    Poll::Pending => Poll::Pending,
923                                }
924                            }
925
926                            TransferMethod::Unbounded => {
927                                let pending_data = &stream.buf.filled()[stream.consumed..];
928                                if pending_data.len() > 0 {
929                                    let data = pending_data.to_vec();
930
931                                    stream.consumed += data.len();
932
933                                    pending_event.replace(Ok(data));
934
935                                    Poll::Ready(Some(ResponseOrAgain::Again(false)))
936                                } else {
937                                    match Pin::new(&mut stream.rh).poll_read(cx, &mut stream.buf) {
938                                        Poll::Ready(Ok(())) => {
939                                            platform_log(
940                                                LOG_TAG,
941                                                format!(
942                                                    "poll_read success with new data range {}-{}",
943                                                    stream.consumed,
944                                                    stream.buf.filled().len()
945                                                ),
946                                            );
947
948                                            let filled = &stream.buf.filled()[stream.consumed..];
949                                            if filled.len() > 0 {
950                                                let data = filled.to_vec();
951
952                                                stream.consumed += data.len();
953
954                                                pending_event.replace(Ok(data));
955
956                                                *is_last_data_chunk = false;
957
958                                                Poll::Ready(Some(ResponseOrAgain::Again(false)))
959                                            } else {
960                                                platform_log(LOG_TAG, "no more data");
961
962                                                *is_last_data_chunk = true;
963                                                *is_eof = true;
964
965                                                Poll::Ready(Some(ResponseOrAgain::Again(false)))
966                                            }
967                                        }
968
969                                        Poll::Ready(Err(e)) => {
970                                            platform_log(LOG_TAG, format!("on error {:?}", e));
971
972                                            pending_event.replace(Err(e));
973
974                                            Poll::Ready(None)
975                                        }
976
977                                        Poll::Pending => Poll::Pending,
978                                    }
979                                }
980                            }
981                        }
982                    }
983                }
984            }
985        }
986    }
987}
988
989unsafe impl Send for ResponseStream<'_> {}