1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
//! # 📡 HTTP API
//!
//! API with low-level building blocks for HTTP-from-WASM, providing a gRPC-compatible
//! transport and a basic HTTP client.
//!
//! This makes it possible to access and communicate with local or internet services.
//!
//! ## Security note
//!
//! Currently there are no limitations on which domains and services can be accessed, or on what kind of authentication is supported.
//! This will change as more of Ark's capability-based security is put into place, at which point modules
//! will have to opt in and request access to the minimal set of specific domains and URLs they need.
//!
//! To prepare for this, avoid connecting to arbitrary URLs, or letting user input decide what to connect to, whenever possible.
//! Prefer domains and URLs that the module knows statically at compile time, as it may later need to declare them in its
//! manifest to get access to them.
//!
//! ## Example usage
//!
//! ```rust,no_run
//! require_http_client_api!();
//!
//! let channel = ConnectionBuilder::for_host("http://my-grpc-server.localhost")
//!     .with_authentication()
//!     .with_user_agent("ark-hive")
//!     .build_grpc()
//!     .await?;
//! let mut client = MyProtoClient::new(channel);
//!
//! let response = client
//!     .say_hello(HelloMessage { message: "World".to_owned() }).await;
//! ```

use crate::{ffi::http_client_v0 as ffi, ErrorCode};
use bytes::Buf;
use http::{Method, Request};
use http_body::combinators::UnsyncBoxBody;
use http_body::Body;
use http_body::Empty;
use std::error::Error as StdError;
use std::future::Future;
use std::pin::Pin;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
mod pollable;
use http::header::HeaderName;
use http::header::HeaderValue;
use pollable::PollableResponse;
mod error;
pub use error::Error;

#[doc(hidden)]
pub use ffi::API as FFI_API;

/// A builder for a connection to a single host.
///
/// Configure it with the `with_*` methods, then finish it with
/// [`ConnectionBuilder::build_grpc`] to get a [`GrpcChannel`] or with
/// [`ConnectionBuilder::build_http`] to get an [`HttpClient`].
pub struct ConnectionBuilder {
    /// Host to connect to, stored as the string form of the value given to `for_host`.
    host: String,
    /// Whether requests should authenticate as the logged in user; defaults to `false`.
    auth: bool,
    /// Optional user-agent value; when `None`, an empty string is handed to the FFI call.
    user_agent: Option<String>,
}

impl ConnectionBuilder {
    /// Starts a builder that will connect to `host`.
    ///
    /// Authentication is off and no user-agent is set until the corresponding
    /// `with_*` method is called.
    pub fn for_host(host: impl ToString) -> Self {
        let host = host.to_string();
        Self {
            host,
            auth: false,
            user_agent: None,
        }
    }

    /// Makes the resulting channel always authenticate as the logged in user.
    pub fn with_authentication(self) -> Self {
        Self { auth: true, ..self }
    }

    /// Uses `user_agent` as the user-agent header of all subsequent requests.
    pub fn with_user_agent(self, user_agent: impl ToString) -> Self {
        Self {
            user_agent: Some(user_agent.to_string()),
            ..self
        }
    }

    /// Asks the host to create the channel and waits until the creation has finished.
    ///
    /// Shared by [`Self::build_grpc`] and [`Self::build_http`]; returns the raw
    /// channel handle on success.
    async fn build_internal(self) -> Result<ffi::ChannelHandle, Error> {
        // An unset user-agent is communicated to the host as an empty string.
        let agent = self.user_agent.as_deref().unwrap_or("");
        let pending = ffi::channel_create(&self.host, agent, u32::from(self.auth))?;

        let channel = PollableResponse::<_, _> {
            handle: pending,
            poll_fn: |h: ffi::ChannelBuildHandle| {
                let outcome = ffi::channel_create_poll(h);
                match outcome {
                    // `Unavailable` means "not finished yet", so keep polling.
                    Err(e) if e == ErrorCode::Unavailable => Poll::Pending,
                    finished => Poll::Ready(finished),
                }
            },
            drop_fn: ffi::channel_create_drop,
        }
        .await?;

        Ok(channel)
    }

    /// Finishes the builder and returns a gRPC channel for the configured host.
    pub async fn build_grpc(self) -> Result<GrpcChannel, Error> {
        self.build_internal().await.map(GrpcChannel::new)
    }

    /// Finishes the builder and returns an HTTP client for the configured host.
    pub async fn build_http(self) -> Result<HttpClient, Error> {
        self.build_internal().await.map(HttpClient::new)
    }
}

/// Owning wrapper around a raw `ffi::ChannelHandle`.
///
/// When the wrapper is dropped, the handle is passed to `ffi::channel_drop`.
struct Channel(ffi::ChannelHandle);

impl Drop for Channel {
    fn drop(&mut self) {
        let handle = self.0;
        ffi::channel_drop(handle);
    }
}

/// A gRPC channel that can be used for tonic-based clients. Created from [`ConnectionBuilder`].
/// Like regular tonic clients, this is cheap to clone to help avoid lifetime and ownership issues.
///
/// Cloning only clones the inner [`Arc`]; all clones share the same underlying channel handle.
#[derive(Clone)]
pub struct GrpcChannel {
    /// Shared owner of the raw channel handle.
    native: Arc<Channel>,
}

impl GrpcChannel {
    /// Takes ownership of a raw channel handle and wraps it for shared use.
    fn new(native: ffi::ChannelHandle) -> Self {
        let native = Arc::new(Channel(native));
        Self { native }
    }
}

/// An HTTP client that can be used for requests. Created from [`ConnectionBuilder`].
/// This is cheap to clone to help avoid lifetime and ownership issues.
///
/// Cloning only clones the inner [`Arc`]; all clones share the same underlying channel handle.
#[derive(Clone)]
pub struct HttpClient {
    /// Shared owner of the raw channel handle.
    native: Arc<Channel>,
}

/// The response of a [`HttpClient`] call.
///
/// Only the status code and the fully collected body are exposed.
pub struct HttpResponse {
    /// The HTTP status code of the response.
    pub code: http::StatusCode,

    /// The raw bytes of the response body.
    pub body: Vec<u8>,
}

impl HttpResponse {
    /// Consumes the response and interprets its body as UTF-8 text.
    ///
    /// # Errors
    ///
    /// Returns a [`FromUtf8Error`] if the body is not valid UTF-8.
    pub fn body_string(self) -> Result<String, FromUtf8Error> {
        let Self { body, .. } = self;
        String::from_utf8(body)
    }
}

impl HttpClient {
    /// Wraps a raw channel handle in a shared [`Channel`].
    fn new(native: ffi::ChannelHandle) -> Self {
        Self {
            native: Arc::new(Channel(native)),
        }
    }

    /// Sends `request` over this client's channel and returns the complete response.
    ///
    /// Runs the three steps of a call in order: start it (`initiate_call`), wait for
    /// it to finish (`poll_call`), and assemble the `http::Response` (`build_response`).
    async fn internal_call<D, E, B>(
        &mut self,
        request: http::Request<B>,
    ) -> Result<http::Response<UnsyncBoxBody<D, E>>, Error>
    where
        D: Buf + IntoIterator<Item = u8> + Send + From<Vec<u8>> + Unpin + 'static,
        E: StdError + Send + Sync + 'static,
        B: http_body::Body<Data = D, Error = E> + Send + Unpin + 'static,
        http::Error: From<E>,
    {
        // Copy the raw handle out of the shared wrapper; the wrapper keeps ownership.
        let channel = self.native.0;
        let handle = initiate_call(channel, request).await?;
        let (handle, status, version) = poll_call(handle).await?;
        build_response(handle, status, version)
    }

    /// Get the requested resource.
    ///
    /// Issues a `GET` request with an empty body for `path` and collects the whole
    /// response body into memory. Only the status code and body are returned; the
    /// response headers are not part of [`HttpResponse`].
    ///
    /// # Errors
    ///
    /// Returns an error if `path` cannot be converted into a path-and-query, if the
    /// request cannot be built, if the call itself fails, or if reading the response
    /// body fails.
    pub async fn get<E>(
        &mut self,
        path: impl TryInto<http::uri::PathAndQuery, Error = E>,
    ) -> Result<HttpResponse, Error>
    where
        E: StdError + Send + Sync + 'static,
        http::Error: From<E>,
    {
        let path: http::uri::PathAndQuery = path.try_into().map_err(Error::from_specific)?;

        // `Empty` can never fail, so its error type is mapped away with an empty match.
        let req = Request::builder()
            .method(Method::GET)
            .uri(path)
            .body(Empty::<bytes::Bytes>::new().map_err(|e| match e {}))
            .map_err(Error::Error)?;

        let res = self.internal_call(req).await?;
        let (head, body) = res.into_parts();
        let response_bytes = to_bytes(body).await.map_err(Error::from_specific)?;

        Ok(HttpResponse {
            code: head.status,
            body: response_bytes,
        })
    }
}

/// Lets a [`GrpcChannel`] be used as the transport of tonic-generated clients.
impl<D, E, B> tower::Service<http::Request<B>> for GrpcChannel
where
    D: Buf + IntoIterator<Item = u8> + Send + From<Vec<u8>> + Unpin + 'static,
    E: Into<Box<dyn StdError + Send + Sync>> + 'static,
    B: http_body::Body<Data = D, Error = E> + Send + Unpin + 'static,
{
    type Response = http::Response<UnsyncBoxBody<D, E>>;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Always reports readiness; no back-pressure is applied at this level.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Starts the request and returns a boxed future resolving to the full response.
    fn call(&mut self, request: http::Request<B>) -> Self::Future {
        // Only the raw handle is moved into the future, so it does not borrow `self`.
        let channel = self.native.0;

        Box::pin(async move {
            let call = initiate_call(channel, request).await?;
            let (response, status, version) = poll_call(call).await?;
            build_response(response, status, version)
        })
    }
}

async fn poll_call(
    handle: ffi::AsyncCallHandle,
) -> Result<(ffi::HttpResponseHandle, http::StatusCode, http::Version), Error> {
    fn poll_call(
        handle: ffi::AsyncCallHandle,
    ) -> Poll<Result<(ffi::HttpResponseHandle, u16, ffi::Version), ErrorCode>> {
        let ready = ffi::channel_call_poll(handle);
        match ready {
            Ok(_) => {
                let mut status = 0;
                let mut version = ffi::Version::V1_0;
                let response_handle =
                    ffi::channel_call_retrieve_response(handle, &mut status, &mut version)?;

                Poll::Ready(Ok((response_handle, status, version)))
            }
            Err(e) if e == ErrorCode::Unavailable => Poll::Pending,
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    let (handle, status, version) = PollableResponse {
        handle,
        poll_fn: poll_call,
        drop_fn: ffi::channel_call_drop,
    }
    .await?;

    let status = http::StatusCode::from_u16(status).map_err(|e| Error::Other(e.into()))?;

    let version = match version {
        ffi::Version::V0_9 => http::Version::HTTP_09,
        ffi::Version::V1_0 => http::Version::HTTP_10,
        ffi::Version::V1_1 => http::Version::HTTP_11,
        ffi::Version::V2_0 => http::Version::HTTP_2,
        ffi::Version::V3_0 => http::Version::HTTP_3,
        version => return Err(Error::InvalidHttpVersion(version as u32)),
    };

    Ok((handle, status, version))
}

/// Collects the request body and starts the call on `channel`.
///
/// The request's version, method, URI, headers and fully collected body are
/// translated to their FFI representations and passed to `ffi::channel_call`.
/// The returned handle identifies the in-flight call.
///
/// # Errors
///
/// * [`Error::Other`] if reading the request body fails, or if the request uses an
///   HTTP method that has no FFI counterpart (for example an extension method).
/// * [`Error::FFIError`] if `ffi::channel_call` fails.
///
/// # Panics
///
/// Panics if the request carries an `http::Version` other than the five known
/// constants, or if the body exceeds the size limit enforced by `to_bytes`.
async fn initiate_call<D, E, B>(
    channel: ffi::ChannelHandle,
    request: http::Request<B>,
) -> Result<ffi::AsyncCallHandle, Error>
where
    D: Buf + IntoIterator<Item = u8>,
    E: Into<Box<dyn StdError + Send + Sync>> + 'static,
    B: http_body::Body<Data = D, Error = E> + Send + Unpin,
{
    let (head, body) = request.into_parts();

    let body = to_bytes(body).await.map_err(|e| Error::Other(e.into()))?;

    let ffi_version = match head.version {
        http::Version::HTTP_09 => ffi::Version::V0_9,
        http::Version::HTTP_10 => ffi::Version::V1_0,
        http::Version::HTTP_11 => ffi::Version::V1_1,
        http::Version::HTTP_2 => ffi::Version::V2_0,
        http::Version::HTTP_3 => ffi::Version::V3_0,
        unhandled => {
            unreachable!("unhandled HTTP version: {unhandled:?}",)
        }
    };

    let ffi_method = match head.method {
        http::Method::GET => ffi::Method::Get,
        http::Method::POST => ffi::Method::Post,
        http::Method::PUT => ffi::Method::Put,
        http::Method::DELETE => ffi::Method::Delete,
        http::Method::PATCH => ffi::Method::Patch,
        http::Method::HEAD => ffi::Method::Head,
        http::Method::OPTIONS => ffi::Method::Options,
        http::Method::TRACE => ffi::Method::Trace,
        http::Method::CONNECT => ffi::Method::Connect,
        // `http::Method` also allows extension methods, which callers can supply
        // through the public `tower::Service` impl. Report them as an error
        // instead of panicking.
        unsupported => {
            return Err(Error::Other(
                format!("unsupported HTTP method: {unsupported}").into(),
            ));
        }
    };

    #[cfg(not(target_pointer_width = "32"))]
    compile_error!("the below code is only valid with 32-bit pointers and usize");

    // The entries hold raw pointers into `head.headers`, which stays alive until
    // after `ffi::channel_call` below has returned.
    let headers: Vec<_> = head
        .headers
        .iter()
        .map(|(key, value)| ffi::HttpHeaderKeyValue {
            key_str_ptr: key.as_str().as_ptr() as u32,
            key_str_len: key.as_str().len() as u32,

            value_ptr: value.as_ref().as_ptr() as u32,
            value_len: value.as_ref().len() as u32,
        })
        .collect();

    ffi::channel_call(
        channel,
        &head.uri.to_string(),
        ffi_version,
        ffi_method,
        &headers,
        &body,
    )
    .map_err(|e| Error::FFIError(e.into()))
}

fn build_response<D, E>(
    handle: ffi::HttpResponseHandle,
    status: http::StatusCode,
    version: http::Version,
) -> Result<http::Response<UnsyncBoxBody<D, E>>, Error>
where
    D: Buf + Unpin + Send + From<Vec<u8>> + 'static,
    E: Into<Box<dyn StdError + Send + Sync>> + 'static,
{
    let raw_body = ffi::http_response_body(handle);

    let body = http_body::Full::from(raw_body)
        .map_err(|e| match e {})
        .boxed_unsync();

    let header_count = ffi::http_response_header_count(handle);

    let mut headers = http::HeaderMap::<HeaderValue>::default();
    for index in 0..header_count {
        let key = ffi::http_response_header_key(handle, index);
        let value = ffi::http_response_header_value(handle, index);

        headers.insert(
            HeaderName::from_lowercase(key.as_bytes()).map_err(Error::from_specific)?,
            value.try_into().map_err(Error::from_specific)?,
        );
    }

    ffi::http_response_drop(handle);

    let mut response = http::Response::new(body);
    *response.status_mut() = status;
    *response.version_mut() = version;
    *response.headers_mut() = headers;

    Ok(response)
}

/// Drains `body` and returns all of its data as one contiguous byte vector.
///
/// # Errors
///
/// Returns the body's own error as soon as reading a chunk fails.
///
/// # Panics
///
/// Panics with "too long message" once more than 2 GiB have been accumulated.
async fn to_bytes<D, E, B>(mut body: B) -> Result<Vec<u8>, E>
where
    D: Buf + IntoIterator<Item = u8>,
    E: Into<Box<dyn StdError + Send + Sync>> + 'static,
    B: http_body::Body<Data = D, Error = E> + Unpin,
{
    // 2 gigabyte max for safety
    const MAX_LEN: usize = 1usize << 31;

    let mut collected = Vec::new();

    loop {
        let chunk = match body.data().await {
            Some(chunk) => chunk?,
            None => break,
        };
        collected.extend(chunk);

        if collected.len() > MAX_LEN {
            panic!("too long message");
        }
    }

    Ok(collected)
}