1extern crate futures;
16extern crate httparse;
17extern crate tokio;
18extern crate tokio_util;
19
20use std::fmt;
21use std::io;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use futures::future::Future;
26use futures::AsyncReadExt;
27
28use httparse::Response as HttpResponse;
29
30use libc::c_void;
31use tokio::io::{AsyncRead, ReadBuf, ReadHalf};
32use tokio::sync::mpsc;
33
34use tokio_util::sync::PollSender;
35
36use crate::ffi::log::platform_log;
37
38use crate::internet::header::Header;
39use crate::internet::syntax;
40use crate::internet::AsHeaderField;
41
42use crate::io::network::stream::ClientStream;
43
44use crate::util::raw_string::{StrEq, ToInt};
45
46use super::decode::{
47 ChunkDecodeResult, ChunkDecoder, ErrorKind, HeaderPartDecodeStatus, HeaderPartDecoder, Result,
48};
49
50use super::decompress::Decompressor;
51
52const LOG_TAG: &str = "http_client_decode";
53
54pub struct Response {
55 pub status_code: u16,
56 pub reason_phrase: Vec<u8>,
57 pub headers: Vec<Header>,
58}
59
60impl Response {
61 pub fn from(resp: &HttpResponse) -> Option<Response> {
62 if let (Some(code), Some(reason)) = (resp.code, resp.reason) {
63 platform_log(LOG_TAG, format!("{} {} HTTP/1.1", code, reason));
64 let mut headers = Vec::new();
65
66 for h in &*resp.headers {
67 platform_log(
68 LOG_TAG,
69 format!("{}: {}", h.name, String::from_utf8_lossy(h.value)),
70 );
71 headers.push(Header::new(String::from(h.name), h.value.to_vec()));
72 }
73
74 return Some(Response {
75 status_code: code,
76 reason_phrase: reason.as_bytes().to_vec(),
77 headers,
78 });
79 }
80
81 None
82 }
83
84 pub fn get_response_transfer_method(
85 &self,
86 req_method: &[u8],
87 ) -> Result<Option<TransferMethod>> {
88 if req_method == b"HEAD"
89 || self.status_code < 200
90 || self.status_code == 204
91 || self.status_code == 304
92 {
93 return Ok(None);
94 }
95
96 if req_method == b"CONNECT" && self.status_code >= 200 && self.status_code < 300 {
97 return Ok(None);
98 }
99
100 let mut method = TransferMethod::Unbounded;
101
102 for header in &self.headers {
103 if header.get_name().equals_bytes(b"Content-Length", true) {
104 if let Ok(i) = header.get_value().to_int() {
105 match method {
106 TransferMethod::Sized(_) | TransferMethod::Chunked(_) => {
107 return Err(ErrorKind::Parse);
108 }
109 TransferMethod::Unbounded => {
110 method = TransferMethod::Sized(SizedTransferEncoding::Plain(i));
111 }
112 }
113 }
114 } else if header.get_name().equals_bytes(b"Transfer-Encoding", true) {
115 let header_field = header.get_value().as_header_field();
116 let mut iter = header_field.value.split(|c| *c == b',');
117 while let Some(value) = iter.next() {
118 let value = syntax::trim(value);
119 if value.equals_bytes(b"chunked", true) {
120 match &mut method {
121 TransferMethod::Sized(encoding) => match encoding {
122 SizedTransferEncoding::Plain(_) => {
123 return Err(ErrorKind::Parse);
124 }
125
126 SizedTransferEncoding::Encoded(ref mut encodings) => {
127 let mut v = Vec::new();
128 v.append(encodings);
129 method = TransferMethod::Chunked(
130 ChunkedTransferEncoding::Encoded(v),
131 );
132 }
133 },
134
135 TransferMethod::Chunked(_) => {
136 return Err(ErrorKind::Parse);
137 }
138
139 TransferMethod::Unbounded => {
140 method = TransferMethod::Chunked(ChunkedTransferEncoding::Plain);
141 }
142 }
143 } else if value.equals_bytes(b"br", true)
144 || value.equals_bytes(b"compress", true)
145 || value.equals_bytes(b"deflate", true)
146 || value.equals_bytes(b"gzip", true)
147 {
148 match &mut method {
149 TransferMethod::Sized(encoding) => match encoding {
150 SizedTransferEncoding::Plain(_) => {
151 return Err(ErrorKind::Parse);
152 }
153
154 SizedTransferEncoding::Encoded(ref mut encodings) => {
155 if value.equals_bytes(b"compress", true) {
156 encodings.push(Encoding::Compress);
157 } else if value.equals_bytes(b"deflate", true) {
158 encodings.push(Encoding::Deflate);
159 } else if value.equals_bytes(b"gzip", true) {
160 encodings.push(Encoding::Gzip);
161 } else {
162 panic!("Impossible Condition");
163 }
164 }
165 },
166
167 TransferMethod::Chunked(_) => {
168 return Err(ErrorKind::Parse);
169 }
170
171 TransferMethod::Unbounded => {
172 let mut v = Vec::new();
173 if value.equals_bytes(b"compress", true) {
174 v.push(Encoding::Compress);
175 } else if value.equals_bytes(b"deflate", true) {
176 v.push(Encoding::Deflate);
177 } else if value.equals_bytes(b"gzip", true) {
178 v.push(Encoding::Gzip);
179 } else {
180 panic!("Impossible Condition");
181 }
182 method = TransferMethod::Sized(SizedTransferEncoding::Encoded(v));
183 }
184 }
185 }
186 }
187 }
188 }
189
190 Ok(Some(method))
191 }
192}
193
194pub enum Encoding {
195 Brotli,
196 Compress,
197 Deflate,
198 Gzip,
199}
200
201impl Copy for Encoding {}
202
203impl Clone for Encoding {
204 fn clone(&self) -> Encoding {
205 *self
206 }
207}
208
209impl fmt::Debug for Encoding {
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 match self {
212 Encoding::Brotli => {
213 write!(f, "Brotli")
214 }
215
216 Encoding::Compress => {
217 write!(f, "Compress")
218 }
219
220 Encoding::Deflate => {
221 write!(f, "Deflate")
222 }
223
224 Encoding::Gzip => {
225 write!(f, "Gzip")
226 }
227 }
228 }
229}
230
231pub enum SizedTransferEncoding {
232 Plain(usize),
233 Encoded(Vec<Encoding>),
234}
235
236impl Clone for SizedTransferEncoding {
237 fn clone(&self) -> SizedTransferEncoding {
238 match self {
239 SizedTransferEncoding::Plain(size) => SizedTransferEncoding::Plain(*size),
240
241 SizedTransferEncoding::Encoded(encodings) => {
242 SizedTransferEncoding::Encoded(encodings.to_vec())
243 }
244 }
245 }
246}
247
248impl fmt::Debug for SizedTransferEncoding {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 match self {
251 SizedTransferEncoding::Plain(size) => {
252 write!(f, "Plain {}", size)
253 }
254
255 SizedTransferEncoding::Encoded(encodings) => {
256 write!(f, "Encoded with {:?}", encodings)
257 }
258 }
259 }
260}
261
262pub enum ChunkedTransferEncoding {
263 Plain,
264 Encoded(Vec<Encoding>),
265}
266
267impl Clone for ChunkedTransferEncoding {
268 fn clone(&self) -> ChunkedTransferEncoding {
269 match self {
270 ChunkedTransferEncoding::Plain => ChunkedTransferEncoding::Plain,
271
272 ChunkedTransferEncoding::Encoded(encodings) => {
273 ChunkedTransferEncoding::Encoded(encodings.to_vec())
274 }
275 }
276 }
277}
278
279impl fmt::Debug for ChunkedTransferEncoding {
280 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
281 match self {
282 ChunkedTransferEncoding::Plain => {
283 write!(f, "Plain")
284 }
285
286 ChunkedTransferEncoding::Encoded(encodings) => {
287 write!(f, "Encoded with {:?}", encodings)
288 }
289 }
290 }
291}
292
293pub enum TransferMethod {
294 Sized(SizedTransferEncoding),
295 Chunked(ChunkedTransferEncoding),
296 Unbounded,
297}
298
299impl Clone for TransferMethod {
300 fn clone(&self) -> TransferMethod {
301 match self {
302 TransferMethod::Sized(encoding) => TransferMethod::Sized(encoding.clone()),
303
304 TransferMethod::Chunked(encoding) => TransferMethod::Chunked(encoding.clone()),
305
306 TransferMethod::Unbounded => TransferMethod::Unbounded,
307 }
308 }
309}
310
311impl fmt::Debug for TransferMethod {
312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313 match self {
314 TransferMethod::Sized(encoding) => {
315 write!(f, "Sized {:?}", encoding)
316 }
317
318 TransferMethod::Chunked(encoding) => {
319 write!(f, "Chunked {:?}", encoding)
320 }
321
322 TransferMethod::Unbounded => {
323 write!(f, "Unbounded")
324 }
325 }
326 }
327}
328
329pub enum StreamState {
330 DecodeHeader,
331 DecodeBody(
332 TransferMethod,
333 PollSender<io::Result<Vec<u8>>>,
334 usize,
335 Option<io::Result<Vec<u8>>>,
336 bool,
337 bool,
338 ),
339}
340
341pub enum ResponseOrAgain {
342 Response(Response),
343 Again(bool), }
345
346pub struct ResponseStream<'a> {
347 state: StreamState,
348 buf: ReadBuf<'a>,
349 consumed: usize,
350 p: *mut u8,
351 rh: ReadHalf<ClientStream>,
352 pub method: &'static [u8],
353}
354
355impl ResponseStream<'_> {
356 pub fn new(rh: ReadHalf<ClientStream>) -> ResponseStream<'static> {
357 let buf;
358 let p;
359
360 unsafe {
361 p = libc::calloc(1, 4 * 1024) as *mut u8;
362 let s = std::slice::from_raw_parts_mut(p, 4 * 1024);
363 buf = ReadBuf::new(&mut *s);
364 }
365
366 ResponseStream {
367 state: StreamState::DecodeHeader,
368 buf,
369 consumed: 0,
370 p,
371 rh,
372 method: &[],
373 }
374 }
375
376 pub fn setup_body_reading(
377 &mut self,
378 req_method: &[u8],
379 r: &Response,
380 ) -> io::Result<Option<mpsc::Receiver<io::Result<Vec<u8>>>>> {
381 match r.get_response_transfer_method(req_method) {
382 Ok(Some(ref method)) => {
383 platform_log(LOG_TAG, format!("setting up transfer {:?}", method));
384
385 let (data_tx, data_rx) = mpsc::channel::<io::Result<Vec<u8>>>(8);
386
387 match method {
388 TransferMethod::Sized(encoding) => match encoding {
389 SizedTransferEncoding::Plain(size) => {
390 if *size == 0 {
391 platform_log(LOG_TAG, format!("no data transfer needed"));
392 return Ok(None);
393 } else {
394 self.state = StreamState::DecodeBody(
395 method.clone(),
396 PollSender::new(data_tx),
397 0,
398 None,
399 false,
400 false,
401 );
402 }
403 }
404
405 SizedTransferEncoding::Encoded(encodings) => {
406 let (codec_tx, codec_rx) = mpsc::channel::<io::Result<Vec<u8>>>(8);
407 let encodings = encodings.to_vec();
408
409 tokio::spawn(async move {
410 let mut decompressor = Decompressor::new(&encodings, codec_rx);
411 let reader = decompressor.reader();
412 loop {
413 let mut buf = vec![0; 4 * 1024];
414 match reader.read(&mut buf).await {
415 Ok(size) => {
416 if size == 0 {
417 break;
418 } else {
419 buf.truncate(size);
420
421 match data_tx.send(Ok(buf)).await {
422 Ok(_) => {}
423 Err(_) => {
424 break;
425 }
426 }
427 }
428 }
429
430 Err(e) => {
431 match data_tx.send(Err(e)).await {
432 Ok(_) => {}
433 Err(_) => {}
434 }
435 break;
436 }
437 }
438 }
439 });
440
441 self.state = StreamState::DecodeBody(
442 method.clone(),
443 PollSender::new(codec_tx),
444 0,
445 None,
446 false,
447 false,
448 );
449 }
450 },
451
452 TransferMethod::Chunked(encoding) => match encoding {
453 ChunkedTransferEncoding::Plain => {
454 self.state = StreamState::DecodeBody(
455 method.clone(),
456 PollSender::new(data_tx),
457 0,
458 None,
459 false,
460 false,
461 );
462 }
463
464 ChunkedTransferEncoding::Encoded(encodings) => {
465 let (codec_tx, codec_rx) = mpsc::channel::<io::Result<Vec<u8>>>(1);
466 let encodings = encodings.to_vec();
467
468 tokio::spawn(async move {
469 let mut decompressor = Decompressor::new(&encodings, codec_rx);
470 let reader = decompressor.reader();
471 loop {
472 let mut buf = vec![0; 4 * 1024];
473 match reader.read(&mut buf).await {
474 Ok(size) => {
475 if size == 0 {
476 break;
477 } else {
478 buf.truncate(size);
479 match data_tx.send(Ok(buf)).await {
480 Ok(_) => {}
481 Err(_) => {
482 break;
483 }
484 }
485 }
486 }
487
488 Err(e) => {
489 match data_tx.send(Err(e)).await {
490 Ok(_) => {}
491 Err(_) => {}
492 }
493 break;
494 }
495 }
496 }
497 });
498
499 self.state = StreamState::DecodeBody(
500 method.clone(),
501 PollSender::new(codec_tx),
502 0,
503 None,
504 false,
505 false,
506 );
507 }
508 },
509
510 TransferMethod::Unbounded => {
511 self.state = StreamState::DecodeBody(
512 method.clone(),
513 PollSender::new(data_tx),
514 0,
515 None,
516 false,
517 false,
518 );
519 }
520 }
521
522 Ok(Some(data_rx))
523 }
524
525 Ok(None) => {
526 platform_log(LOG_TAG, format!("no data transfer found"));
527 Ok(None)
528 }
529
530 Err(e) => {
531 platform_log(
532 LOG_TAG,
533 format!("get_response_transfer_method failed with error {:?}", e),
534 );
535 Err(io::Error::from(io::ErrorKind::BrokenPipe))
536 }
537 }
538 }
539}
540
541impl futures::Stream for ResponseStream<'_> {
542 type Item = ResponseOrAgain;
543 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
544 let stream = self.get_mut();
545 match stream.state {
546 StreamState::DecodeHeader => {
547 platform_log(LOG_TAG, "decoding header");
548
549 let mut decoder =
550 HeaderPartDecoder::new(&mut stream.buf, &mut stream.consumed, &mut stream.rh);
551 match Pin::new(&mut decoder).poll(cx) {
552 Poll::Ready(Ok(status)) => match status {
553 HeaderPartDecodeStatus::Success(r) => {
554 Poll::Ready(Some(ResponseOrAgain::Response(r)))
555 }
556
557 HeaderPartDecodeStatus::Again => {
558 Poll::Ready(Some(ResponseOrAgain::Again(false)))
559 }
560
561 HeaderPartDecodeStatus::BufferTooSmall => {
562 if stream.consumed > 0 {
563 let filled = stream.buf.filled_mut();
564 let len = filled.len();
565
566 for i in stream.consumed..len {
567 filled[i - stream.consumed] = filled[i];
568 }
569
570 stream.buf.set_filled(len - stream.consumed);
571 stream.consumed = 0;
572
573 Poll::Ready(Some(ResponseOrAgain::Again(false)))
574 } else {
575 let capacity = stream.buf.capacity() * 2;
576 if capacity > 4 * 1024 * 1024 {
577 platform_log(LOG_TAG, "buffer too large");
578 Poll::Ready(None)
579 } else {
580 let filled = stream.buf.filled().to_vec();
581
582 unsafe {
583 let p = libc::calloc(1, capacity) as *mut u8;
584 let s = std::slice::from_raw_parts_mut(p, capacity);
585 stream.buf = ReadBuf::new(&mut *s);
586 libc::free(stream.p as *mut c_void);
587 stream.p = p;
588 }
589
590 stream.buf.put_slice(&filled);
591
592 Poll::Ready(Some(ResponseOrAgain::Again(false)))
593 }
594 }
595 }
596
597 HeaderPartDecodeStatus::EOF => Poll::Ready(None),
598 },
599
600 Poll::Ready(Err(e)) => {
601 platform_log(LOG_TAG, format!("encountering error {:?}", e));
602 Poll::Ready(None)
603 }
604
605 Poll::Pending => Poll::Pending,
606 }
607 }
608
609 StreamState::DecodeBody(
610 ref method,
611 ref mut data_tx,
612 ref mut sent,
613 ref mut pending_event,
614 ref mut is_last_data_chunk,
615 ref mut is_eof,
616 ) => {
617 match pending_event.take() {
618 Some(event) => {
619 platform_log(LOG_TAG, "decode result waiting to be processed");
620
621 match data_tx.poll_reserve(cx) {
622 Poll::Ready(Ok(())) => match event {
623 Ok(ref data) => {
624 platform_log(
625 LOG_TAG,
626 format!(
627 "transfer content body: {}",
628 String::from_utf8_lossy(&data)
629 ),
630 );
631
632 let len = data.len();
633
634 match data_tx.send_item(event) {
635 Ok(()) => {
636 platform_log(LOG_TAG, "content body transfered");
637
638 *sent = *sent + len;
639
640 if *is_last_data_chunk {
641 if *is_eof {
642 Poll::Ready(None)
643 } else {
644 stream.state = StreamState::DecodeHeader;
645 Poll::Ready(Some(ResponseOrAgain::Again(true)))
646 }
647 } else {
648 Poll::Ready(Some(ResponseOrAgain::Again(false)))
649 }
650 }
651
652 Err(e) => {
653 platform_log(
654 LOG_TAG,
655 format!("content body cannot be transfered due to pipe error: {:?}", e),
656 );
657
658 *sent = *sent + len;
660
661 if *is_last_data_chunk {
662 if *is_eof {
663 Poll::Ready(None)
664 } else {
665 stream.state = StreamState::DecodeHeader;
666 Poll::Ready(Some(ResponseOrAgain::Again(true)))
667 }
668 } else {
669 Poll::Ready(Some(ResponseOrAgain::Again(false)))
670 }
671 }
672 }
673 }
674
675 Err(ref e) => {
676 platform_log(LOG_TAG, format!("transfer error {:?}", e));
677
678 match data_tx.send_item(event) {
679 Ok(()) => {
680 platform_log(LOG_TAG, "error transfered");
681
682 Poll::Ready(None)
683 }
684
685 Err(e) => {
686 platform_log(LOG_TAG, format!("error cannot be transfered due to pipe error: {:?}", e));
687
688 Poll::Ready(None)
689 }
690 }
691 }
692 },
693
694 Poll::Ready(Err(e)) => {
695 platform_log(LOG_TAG, format!("channel reserve failed: {:?}", e));
696
697 match event {
698 Ok(ref data) => {
699 platform_log(
700 LOG_TAG,
701 format!(
702 "content body: {:?} cannot be transfered due to receiving end closed",
703 String::from_utf8_lossy(&data)
704 ),
705 );
706
707 let len = data.len();
708
709 *sent = *sent + len;
711
712 if *is_last_data_chunk {
713 if *is_eof {
714 Poll::Ready(None)
715 } else {
716 stream.state = StreamState::DecodeHeader;
717 Poll::Ready(Some(ResponseOrAgain::Again(true)))
718 }
719 } else {
720 Poll::Ready(Some(ResponseOrAgain::Again(false)))
721 }
722 }
723
724 Err(e) => {
725 platform_log(LOG_TAG, format!("error {:?} cannot be transfered due to receiving end closed", e));
726
727 Poll::Ready(None)
728 }
729 }
730 }
731
732 Poll::Pending => {
733 pending_event.replace(event);
734 Poll::Pending
735 }
736 }
737 }
738
739 None => {
740 if *is_eof {
741 return Poll::Ready(None);
742 }
743
744 platform_log(
745 LOG_TAG,
746 format!("decoding body with transfer method {:?}", method),
747 );
748
749 if stream.buf.filled().len() == stream.consumed && stream.consumed != 0 {
750 platform_log(LOG_TAG, "making space for poll_read");
751 stream.buf.clear();
752 stream.consumed = 0;
753 }
754
755 match method {
756 TransferMethod::Sized(encoding) => {
757 let pending_data = &stream.buf.filled()[stream.consumed..];
758 if pending_data.len() > 0 {
759 match encoding {
760 SizedTransferEncoding::Plain(size) => {
761 let remains = size - *sent;
762
763 if pending_data.len() >= remains {
764 let data = pending_data[..remains].to_vec();
765
766 stream.consumed += data.len();
767
768 pending_event.replace(Ok(data));
769
770 *is_last_data_chunk = true;
771 } else {
772 let data = pending_data.to_vec();
773
774 stream.consumed += data.len();
775
776 pending_event.replace(Ok(data));
777
778 *is_last_data_chunk = false;
779 }
780
781 Poll::Ready(Some(ResponseOrAgain::Again(false)))
782 }
783
784 SizedTransferEncoding::Encoded(_) => {
785 let data = pending_data.to_vec();
786
787 stream.consumed += data.len();
788
789 pending_event.replace(Ok(data));
790
791 *is_last_data_chunk = false;
792
793 Poll::Ready(Some(ResponseOrAgain::Again(false)))
794 }
795 }
796 } else {
797 let before_read = stream.buf.filled().len();
798
799 match Pin::new(&mut stream.rh).poll_read(cx, &mut stream.buf) {
800 Poll::Ready(Ok(())) => {
801 platform_log(
802 LOG_TAG,
803 format!(
804 "poll_read success with new data range {}-{}",
805 stream.consumed,
806 stream.buf.filled().len()
807 ),
808 );
809
810 let after_read = stream.buf.filled().len();
811
812 if before_read == after_read {
813 platform_log(LOG_TAG, "no more data");
814 *pending_event = Some(Err(io::Error::new(
815 io::ErrorKind::UnexpectedEof,
816 "stream closed before reading last chunk",
817 )));
818 *is_eof = true;
819 }
820
821 Poll::Ready(Some(ResponseOrAgain::Again(false)))
822 }
823
824 Poll::Ready(Err(e)) => {
825 pending_event.replace(Err(e));
826
827 Poll::Ready(Some(ResponseOrAgain::Again(false)))
828 }
829
830 Poll::Pending => Poll::Pending,
831 }
832 }
833 }
834
835 TransferMethod::Chunked(_) => {
836 let mut decoder = ChunkDecoder::new(
837 &mut stream.buf,
838 &mut stream.consumed,
839 &mut stream.rh,
840 );
841
842 match Pin::new(&mut decoder).poll(cx) {
843 Poll::Ready(Ok(result)) => match result {
844 ChunkDecodeResult::Part(data) => {
845 pending_event.replace(Ok(data));
846
847 *is_last_data_chunk = false;
848
849 Poll::Ready(Some(ResponseOrAgain::Again(false)))
850 }
851
852 ChunkDecodeResult::Again => {
853 Poll::Ready(Some(ResponseOrAgain::Again(false)))
854 }
855
856 ChunkDecodeResult::BufferTooSmall => {
857 if stream.consumed > 0 {
858 let filled = stream.buf.filled_mut();
859 let len = filled.len();
860
861 for i in stream.consumed..len {
862 filled[i - stream.consumed] = filled[i];
863 }
864
865 stream.buf.set_filled(len - stream.consumed);
866 stream.consumed = 0;
867
868 Poll::Ready(Some(ResponseOrAgain::Again(false)))
869 } else {
870 let capacity = stream.buf.capacity() * 2;
871 if capacity > 4 * 1024 * 1024 {
872 platform_log(LOG_TAG, "buffer too large");
873 Poll::Ready(None)
874 } else {
875 let filled = stream.buf.filled().to_vec();
876
877 unsafe {
878 let p =
879 libc::calloc(1, capacity) as *mut u8;
880 let s = std::slice::from_raw_parts_mut(
881 p, capacity,
882 );
883 stream.buf = ReadBuf::new(&mut *s);
884 libc::free(stream.p as *mut c_void);
885 stream.p = p;
886 }
887
888 stream.buf.put_slice(&filled);
889
890 Poll::Ready(Some(ResponseOrAgain::Again(false)))
891 }
892 }
893 }
894
895 ChunkDecodeResult::EOF => {
896 platform_log(LOG_TAG, "on chunk decode EOF");
897 Poll::Ready(None)
898 }
899 },
900
901 Poll::Ready(Err(e)) => {
902 platform_log(
903 LOG_TAG,
904 format!("on chunk decode error {:?}", e),
905 );
906
907 match e {
908 ErrorKind::Io(e) => {
909 pending_event.replace(Err(e));
910 }
911
912 ErrorKind::Parse => {
913 pending_event.replace(Err(io::Error::from(
914 io::ErrorKind::BrokenPipe,
915 )));
916 }
917 }
918
919 Poll::Ready(Some(ResponseOrAgain::Again(false)))
920 }
921
922 Poll::Pending => Poll::Pending,
923 }
924 }
925
926 TransferMethod::Unbounded => {
927 let pending_data = &stream.buf.filled()[stream.consumed..];
928 if pending_data.len() > 0 {
929 let data = pending_data.to_vec();
930
931 stream.consumed += data.len();
932
933 pending_event.replace(Ok(data));
934
935 Poll::Ready(Some(ResponseOrAgain::Again(false)))
936 } else {
937 match Pin::new(&mut stream.rh).poll_read(cx, &mut stream.buf) {
938 Poll::Ready(Ok(())) => {
939 platform_log(
940 LOG_TAG,
941 format!(
942 "poll_read success with new data range {}-{}",
943 stream.consumed,
944 stream.buf.filled().len()
945 ),
946 );
947
948 let filled = &stream.buf.filled()[stream.consumed..];
949 if filled.len() > 0 {
950 let data = filled.to_vec();
951
952 stream.consumed += data.len();
953
954 pending_event.replace(Ok(data));
955
956 *is_last_data_chunk = false;
957
958 Poll::Ready(Some(ResponseOrAgain::Again(false)))
959 } else {
960 platform_log(LOG_TAG, "no more data");
961
962 *is_last_data_chunk = true;
963 *is_eof = true;
964
965 Poll::Ready(Some(ResponseOrAgain::Again(false)))
966 }
967 }
968
969 Poll::Ready(Err(e)) => {
970 platform_log(LOG_TAG, format!("on error {:?}", e));
971
972 pending_event.replace(Err(e));
973
974 Poll::Ready(None)
975 }
976
977 Poll::Pending => Poll::Pending,
978 }
979 }
980 }
981 }
982 }
983 }
984 }
985 }
986 }
987}
988
989unsafe impl Send for ResponseStream<'_> {}