rustolio_utils/http/
outgoing.rs1#[cfg(target_arch = "wasm32")]
12pub use wasm::Outgoing;
13
14#[cfg(not(target_arch = "wasm32"))]
15pub use non_wasm::Outgoing;
16
#[cfg(target_arch = "wasm32")]
mod wasm {
    use std::ops::{Deref, DerefMut};

    use wasm_bindgen::JsValue;

    /// Outgoing payload for `wasm32` builds: a newtype wrapper around a [`JsValue`].
    ///
    /// The wrapped value is reachable through [`Deref`] / [`DerefMut`].
    pub struct Outgoing(JsValue);

    impl Outgoing {
        /// Builds an empty payload, backed by `JsValue::null()`.
        pub const fn empty() -> Self {
            Outgoing(JsValue::null())
        }
    }

    impl From<JsValue> for Outgoing {
        /// Wraps the given [`JsValue`] as-is; the value is not inspected.
        fn from(js: JsValue) -> Self {
            Outgoing(js)
        }
    }

    impl Deref for Outgoing {
        type Target = JsValue;

        /// Borrows the wrapped [`JsValue`].
        fn deref(&self) -> &JsValue {
            let Outgoing(inner) = self;
            inner
        }
    }

    impl DerefMut for Outgoing {
        /// Mutably borrows the wrapped [`JsValue`].
        fn deref_mut(&mut self) -> &mut JsValue {
            let Outgoing(inner) = self;
            inner
        }
    }
}
48
49#[cfg(not(target_arch = "wasm32"))]
50mod non_wasm {
51 use std::{pin::Pin, task::Poll};
52
53 use crate::bytes::{Buf, Bytes};
54 use futures::Stream;
55 use hyper::body::{Body, Frame};
56
57 pub struct Outgoing(OutgoingInner);
58
59 enum OutgoingInner {
60 Bytes(Bytes),
61 Body(Box<dyn Body<Data = Bytes, Error = std::io::Error> + Send + Sync + Unpin>),
62 Stream(Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin>),
63 }
64
65 impl Outgoing {
66 pub const fn empty() -> Self {
67 Self::from_bytes(Bytes::new())
68 }
69
70 pub const fn from_bytes(bytes: Bytes) -> Self {
71 Self(OutgoingInner::Bytes(bytes))
72 }
73
74 pub fn from_body<B>(body: B) -> Self
75 where
76 B: Body<Data = Bytes, Error = std::io::Error> + Send + Sync + Unpin + 'static,
77 {
78 Self(OutgoingInner::Body(Box::new(body)))
79 }
80
81 pub fn from_stream<S>(stream: S) -> Self
82 where
83 S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin + 'static,
84 {
85 Self(OutgoingInner::Stream(Box::new(stream)))
86 }
87 }
88
89 impl Body for Outgoing {
90 type Data = Bytes;
91 type Error = std::io::Error;
92
93 fn poll_frame(
94 mut self: Pin<&mut Self>,
95 cx: &mut std::task::Context<'_>,
96 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
97 match &mut self.0 {
98 OutgoingInner::Bytes(b) => {
99 if !b.has_remaining() {
100 return Poll::Ready(None);
101 }
102 let take = std::cmp::min(8_192, b.remaining());
103 let b = b.copy_to_bytes(take);
104 Poll::Ready(Some(Ok(Frame::data(b))))
105 }
106 OutgoingInner::Body(b) => Pin::new(b).poll_frame(cx),
107 OutgoingInner::Stream(s) => Pin::new(s).poll_next(cx).map_ok(Frame::data),
108 }
109 }
110
111 fn size_hint(&self) -> hyper::body::SizeHint {
112 match &self.0 {
113 OutgoingInner::Bytes(b) => hyper::body::SizeHint::with_exact(b.remaining() as u64),
114 OutgoingInner::Body(b) => b.size_hint(),
115 OutgoingInner::Stream(s) => {
116 let size_hint = s.size_hint();
117 let mut hyper_size_hint = hyper::body::SizeHint::new();
118 hyper_size_hint.set_lower(size_hint.0 as u64);
119 if let Some(upper) = size_hint.1 {
120 hyper_size_hint.set_upper(upper as u64);
121 }
122 hyper_size_hint
123 }
124 }
125 }
126
127 fn is_end_stream(&self) -> bool {
128 match &self.0 {
129 OutgoingInner::Bytes(b) => !b.has_remaining(),
130 OutgoingInner::Body(s) => s.is_end_stream(),
131 OutgoingInner::Stream(s) => s.size_hint().1.map(|u| u == 0).unwrap_or(false),
132 }
133 }
134 }
135}