1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//! Http utilities for the [`bevy_defer`] crate, based on the [`hyper`] crate.
//! 
//! # Runtime
//! 
//! * The executor is the `futures` single threaded `LocalExecutor`.
//! * `async_io` is used as the reactor.
//! 
//! # Features
//! 
//! - [x] Http client.
//! - [ ] Https client.
//! - [ ] Server.
//! - [ ] WASM support.

use std::{future::Future, net::TcpStream};
use async_io::Async;
use bevy_defer::spawn;
pub use smol_hyper::rt::FuturesIo;

/// [`hyper`] executor for [`bevy_defer`].
pub struct BevyDeferExecutor;

use hyper::{body::Body, header::HOST, Request};
use http_body_util::BodyExt;

use hyper::client::conn::http1::handshake;

pub trait HyperHttpClientExt {
    fn http_get(&self, uri: &str) -> impl Future<Output = Result<Vec<u8>, HttpError>> + Send;
    fn http_request<T: Body + 'static>(&self, request: hyper::Request<T>) -> impl Future<Output = Result<Vec<u8>, HttpError>> + Send 
        where T: Send, T::Data: Send, T::Error: std::error::Error + Send + Sync;
}

impl<F> hyper::rt::Executor<F> for BevyDeferExecutor where
        F: Future + Send + 'static,
        F::Output: Send + 'static,{
    fn execute(&self, fut: F) {
        bevy_defer::spawn_and_forget(fut);
    }
}

#[derive(Debug, thiserror::Error)]
pub enum HttpError {
    #[error("{0}")]
    IoError(#[from] std::io::Error),
    #[error("{0}")]
    HyperError(#[from] hyper::Error),
    #[error("{0}")]
    HttpError(#[from] hyper::http::Error),
    #[error("{0}")]
    InvalidUri(#[from] hyper::http::uri::InvalidUri),
}

impl HyperHttpClientExt for bevy_defer::AsyncWorldMut {

    async fn http_get(&self, uri: &str) -> Result<Vec<u8>, HttpError> {
        let uri = uri.parse::<hyper::Uri>()?;
        let host = uri.host().expect("uri has no host");
        let port = uri.port_u16().unwrap_or(80);
        let address = format!("{}:{}", host, port);
        let stream = Async::<TcpStream>::new(TcpStream::connect(address)?)?;
            let auth = uri.authority().cloned();
            let (mut sender, conn) = handshake::<_, String>(FuturesIo::new(stream)).await.unwrap();
            let _conn = spawn(async move {
                if let Err(err) = conn.await {
                    println!("Connection failed: {:?}", err);
                }
        });
        let req = if let Some(auth) = auth {
            Request::builder()
                .uri(uri)
                .header(HOST, auth.as_str())
                .body(String::new())?
        } else {
            Request::builder()
                .uri(uri)
                .body(String::new())?
        };
        
        let mut resp = sender.send_request(req).await?;
        let mut buffer = Vec::new();
        while let Some(next) = resp.frame().await {
            let frame = next?;
            if let Some(chunk) = frame.data_ref() {
                buffer.extend(chunk);
            }
        }
        Ok(buffer)
    }

    async fn http_request<T: Body + 'static>(&self, request: hyper::Request<T>) -> Result<Vec<u8>, HttpError>
        where T: Send, T::Data: Send, T::Error: std::error::Error + Send + Sync
    {
        let host = request.uri().host().expect("uri has no host");
        let port = request.uri().port_u16().unwrap_or(80);
        let address = format!("{}:{}", host, port);
        let stream = Async::<TcpStream>::new(TcpStream::connect(address)?)?;
        let (mut sender, conn) = handshake::<_, T>(FuturesIo::new(stream)).await?;
        let _conn = spawn(async move {
            if let Err(err) = conn.await {
                println!("Connection failed: {:?}", err);
            }
        });
        let mut resp = sender.send_request(request).await?;
        let mut buffer = Vec::new();
        while let Some(next) = resp.frame().await {
            let frame = next?;
            if let Some(chunk) = frame.data_ref() {
                buffer.extend(chunk);
            }
        }
        Ok(buffer)
    }
}

#[cfg(test)]
mod test {
    use std::sync::atomic::AtomicBool;

    use bevy::{app::App, MinimalPlugins};
    use bevy_defer::{world, AsyncExtension, DefaultAsyncPlugin};

    use crate::HyperHttpClientExt;

    #[test]
    fn test() {
        static LOCK: AtomicBool = AtomicBool::new(false);
        let mut app = App::new();
        app.add_plugins(MinimalPlugins);
        app.add_plugins(DefaultAsyncPlugin);
        app.spawn_task(async {
            world().http_get("http://httpbin.org/ip").await.unwrap();
            LOCK.store(true, std::sync::atomic::Ordering::Relaxed);
            world().quit().await;
            Ok(())
        });
        app.run();
        assert!(LOCK.load(std::sync::atomic::Ordering::Relaxed))
    }
}