1pub use http::{
5 uri, Extensions, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri, Version,
6};
7pub mod request {
9 pub use http::request::{Builder, Parts};
10}
11pub mod response {
13 pub use http::response::{Builder, Parts};
14}
15pub type Request = http::Request<crate::Bytes>;
18pub type Response = http::Response<crate::Bytes>;
21
22use crate::wit::wasi::http::outgoing_handler;
23use crate::wit::wasi::http::types::{
24 ErrorCode, Headers, IncomingBody, IncomingResponse, OutgoingBody, OutgoingRequest, Scheme,
25};
26use crate::wit::wasi::io::streams::{InputStream, OutputStream, StreamError};
27use futures::SinkExt;
28use std::cell::RefCell;
29use std::future::Future;
30use std::rc::Rc;
31use std::sync::{Arc, Mutex};
32use std::task::{Poll, Wake, Waker};
33
34const READ_SIZE: u64 = 16 * 1024;
35
36#[derive(thiserror::Error, Debug)]
38pub enum SendError {
39 #[error("i/o error: {0}")]
41 GenericIo(String),
42 #[error(transparent)]
44 StreamIo(#[from] crate::wit::wasi::io::streams::StreamError),
45 #[error(transparent)]
47 WasiHttp(#[from] crate::wit::wasi::http::types::ErrorCode),
48 #[error(transparent)]
50 ClientHttp(#[from] http::Error),
51 #[error("could not convert request: {0}")]
53 RequestConversion(String),
54 #[error("could not convert response")]
56 ResponseConversion,
57}
58
59#[cfg_attr(doctest, doc = " ````no_test")] pub fn send(request: Request) -> Result<Response, SendError> {
96 run(send_async(request))
97}
98
99impl OutgoingRequest {
100 pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = StreamError> {
106 outgoing_body(self.body().expect("request body was already taken"))
107 }
108}
109
110impl IncomingResponse {
111 fn take_body_stream(
117 self,
118 ) -> impl futures::Stream<Item = Result<Vec<u8>, crate::wit::wasi::io::streams::Error>> {
119 incoming_body(self.consume().expect("response body was already consumed"))
120 }
121
122 async fn into_body(self) -> Result<Vec<u8>, crate::wit::wasi::io::streams::Error> {
128 use futures::TryStreamExt;
129 let mut stream = self.take_body_stream();
130 let mut body = Vec::new();
131 while let Some(chunk) = stream.try_next().await? {
132 body.extend(chunk);
133 }
134 Ok(body)
135 }
136}
137
138static WAKERS: Mutex<Vec<(crate::wit::wasi::io::poll::Pollable, Waker)>> = Mutex::new(Vec::new());
139
140fn push_waker(pollable: crate::wit::wasi::io::poll::Pollable, waker: Waker) {
141 WAKERS
142 .lock()
143 .expect("poisoned mutex")
144 .push((pollable, waker));
145}
146
147fn run<T>(future: impl Future<Output = T>) -> T {
151 futures::pin_mut!(future);
152 struct DummyWaker;
153
154 impl Wake for DummyWaker {
155 fn wake(self: Arc<Self>) {}
156 }
157
158 let waker = Arc::new(DummyWaker).into();
159
160 loop {
161 match future
162 .as_mut()
163 .poll(&mut std::task::Context::from_waker(&waker))
164 {
165 Poll::Pending => {
166 let mut new_wakers = Vec::new();
167
168 let wakers = std::mem::take::<Vec<_>>(&mut WAKERS.lock().expect("poisoned mutex"));
169
170 assert!(!wakers.is_empty());
171
172 let pollables = wakers
173 .iter()
174 .map(|(pollable, _)| pollable)
175 .collect::<Vec<_>>();
176
177 let mut ready = vec![false; wakers.len()];
178
179 for index in crate::wit::wasi::io::poll::poll(&pollables) {
180 ready[usize::try_from(index).unwrap()] = true;
181 }
182
183 for (ready, (pollable, waker)) in ready.into_iter().zip(wakers) {
184 if ready {
185 waker.wake()
186 } else {
187 new_wakers.push((pollable, waker));
188 }
189 }
190
191 *WAKERS.lock().expect("poisoned mutex") = new_wakers;
192 }
193 Poll::Ready(result) => break result,
194 }
195 }
196}
197
198async fn send_async(request: Request) -> Result<Response, SendError> {
200 let (parts, body) = request.into_parts();
202 let (method, uri, headers) = (parts.method, parts.uri, parts.headers);
203 let is_https = if let Some(scheme) = uri.scheme() {
204 scheme == &http::uri::Scheme::HTTPS
205 } else {
206 false
207 };
208 let headers = headers
209 .iter()
210 .map(|(k, v)| (k.to_string(), v.as_bytes().to_vec()))
211 .collect::<Vec<_>>();
212 let out_req = OutgoingRequest::new(
213 Headers::from_list(&headers)
214 .map_err(|e| SendError::RequestConversion(format!("header error: {}", e)))?,
215 );
216 out_req
217 .set_method(match method {
218 http::Method::GET => &crate::wit::wasi::http::types::Method::Get,
219 http::Method::HEAD => &crate::wit::wasi::http::types::Method::Head,
220 http::Method::POST => &crate::wit::wasi::http::types::Method::Post,
221 http::Method::PUT => &crate::wit::wasi::http::types::Method::Put,
222 http::Method::DELETE => &crate::wit::wasi::http::types::Method::Delete,
223 http::Method::PATCH => &crate::wit::wasi::http::types::Method::Patch,
224 http::Method::CONNECT => &crate::wit::wasi::http::types::Method::Connect,
225 http::Method::OPTIONS => &crate::wit::wasi::http::types::Method::Options,
226 http::Method::TRACE => &crate::wit::wasi::http::types::Method::Trace,
227 _ => {
228 return Err(
229 crate::wit::wasi::http::types::ErrorCode::HttpRequestMethodInvalid.into(),
230 )
231 }
232 })
233 .map_err(|()| {
234 SendError::RequestConversion(format!("could not set method to {}", method))
235 })?;
236 out_req
237 .set_path_with_query(uri.path_and_query().map(|path| path.as_str()))
238 .map_err(|()| {
239 SendError::RequestConversion(format!(
240 "error setting path to {:?}",
241 uri.path_and_query()
242 ))
243 })?;
244 out_req
245 .set_scheme(Some(if is_https {
246 &Scheme::Https
247 } else {
248 &Scheme::Http
249 }))
250 .expect("unexpected scheme");
253 let authority = uri
254 .authority()
255 .map(|authority| authority.as_str())
256 .or(Some(if is_https { ":443" } else { ":80" }));
258 out_req.set_authority(authority).map_err(|()| {
259 SendError::RequestConversion(format!("error setting authority to {authority:?}"))
260 })?;
261
262 let (request, body_buffer) = (out_req, Some(body.to_vec()));
263
264 let response = if let Some(body_buffer) = body_buffer {
265 let mut body_sink = request.take_body();
268 let response = outgoing_request_send(request);
269 body_sink
270 .send(body_buffer)
271 .await
272 .map_err(SendError::StreamIo)?;
273 drop(body_sink);
274 response.await.map_err(SendError::WasiHttp)?
275 } else {
276 outgoing_request_send(request)
277 .await
278 .map_err(SendError::WasiHttp)?
279 };
280
281 let mut new_resp = http::response::Builder::new()
283 .status(response.status())
284 .status(response.status());
285 for (name, value) in response.headers().entries() {
286 new_resp = new_resp.header(name, value);
287 }
288 let body = bytes::Bytes::from(
289 response
290 .into_body()
291 .await
292 .map_err(|e| SendError::GenericIo(e.to_debug_string()))?,
293 );
294 Ok(new_resp.body(body)?)
295
296 }
300
301fn outgoing_body(body: OutgoingBody) -> impl futures::Sink<Vec<u8>, Error = StreamError> {
302 struct Outgoing(Option<(OutputStream, OutgoingBody)>);
303
304 impl Drop for Outgoing {
305 fn drop(&mut self) {
306 if let Some((stream, body)) = self.0.take() {
307 drop(stream);
308 _ = OutgoingBody::finish(body, None);
309 }
310 }
311 }
312
313 let stream = body.write().expect("response body should be writable");
314 let pair = Rc::new(RefCell::new(Outgoing(Some((stream, body)))));
315
316 futures::sink::unfold((), {
317 move |(), chunk: Vec<u8>| {
318 futures::future::poll_fn({
319 let mut offset = 0;
320 let mut flushing = false;
321 let pair = pair.clone();
322
323 move |context| {
324 let pair = pair.borrow();
325 let (stream, _) = &pair.0.as_ref().unwrap();
326 loop {
327 match stream.check_write() {
328 Ok(0) => {
329 push_waker(stream.subscribe(), context.waker().clone());
330 break Poll::Pending;
331 }
332 Ok(count) => {
333 if offset == chunk.len() {
334 if flushing {
335 break Poll::Ready(Ok(()));
336 } else {
337 match stream.flush() {
338 Ok(()) => flushing = true,
339 Err(StreamError::Closed) => break Poll::Ready(Ok(())),
340 Err(e) => break Poll::Ready(Err(e)),
341 }
342 }
343 } else {
344 let count =
345 usize::try_from(count).unwrap().min(chunk.len() - offset);
346
347 match stream.write(&chunk[offset..][..count]) {
348 Ok(()) => {
349 offset += count;
350 }
351 Err(e) => break Poll::Ready(Err(e)),
352 }
353 }
354 }
355 Err(StreamError::Closed) if offset == chunk.len() => {
359 break Poll::Ready(Ok(()))
360 }
361 Err(e) => break Poll::Ready(Err(e)),
362 }
363 }
364 }
365 })
366 }
367 })
368}
369
370fn incoming_body(
371 body: IncomingBody,
372) -> impl futures::Stream<Item = Result<Vec<u8>, crate::wit::wasi::io::streams::Error>> {
373 struct Incoming(Option<(InputStream, IncomingBody)>);
374
375 impl Drop for Incoming {
376 fn drop(&mut self) {
377 if let Some((stream, body)) = self.0.take() {
378 drop(stream);
379 IncomingBody::finish(body);
380 }
381 }
382 }
383
384 futures::stream::poll_fn({
385 let stream = body.stream().expect("response body should be readable");
386 let pair = Incoming(Some((stream, body)));
387
388 move |context| {
389 if let Some((stream, _)) = &pair.0 {
390 match stream.read(READ_SIZE) {
391 Ok(buffer) => {
392 if buffer.is_empty() {
393 push_waker(stream.subscribe(), context.waker().clone());
394 Poll::Pending
395 } else {
396 Poll::Ready(Some(Ok(buffer)))
397 }
398 }
399 Err(StreamError::Closed) => Poll::Ready(None),
400 Err(StreamError::LastOperationFailed(error)) => Poll::Ready(Some(Err(error))),
401 }
402 } else {
403 Poll::Ready(None)
404 }
405 }
406 })
407}
408
409fn outgoing_request_send(
411 request: OutgoingRequest,
412) -> impl Future<Output = Result<IncomingResponse, ErrorCode>> {
413 let response = outgoing_handler::handle(request, None);
414 futures::future::poll_fn({
415 move |context| match &response {
416 Ok(response) => {
417 if let Some(response) = response.get() {
418 Poll::Ready(response.unwrap())
419 } else {
420 push_waker(response.subscribe(), context.waker().clone());
421 Poll::Pending
422 }
423 }
424 Err(error) => Poll::Ready(Err(error.clone())),
425 }
426 })
427}