use std::sync::Arc;
use anyhow::Context;
use async_nats::HeaderMap;
use bytes::Bytes;
use futures::StreamExt as _;
use http_body_util::BodyExt as _;
use hyper::Uri;
use tokio::select;
use tokio::sync::Notify;
use tracing::info;
use wrpc_interface_http::HttpBody;
use wrpc_interface_http::{InvokeIncomingHandler as _, ServeHttp};
mod common;
use common::{init, start_nats};
#[tokio::test(flavor = "multi_thread")]
async fn rust() -> anyhow::Result<()> {
init().await;
let (_port, nats_client, nats_server, stop_tx) = start_nats().await?;
let client = wrpc_transport_nats::Client::new(nats_client, "test-prefix".to_string(), None)
.await
.context("failed to construct client")?;
let client = Arc::new(client);
{
#[derive(Clone)]
struct Handler;
impl wrpc_interface_http::ServeIncomingHandlerHttp<Option<HeaderMap>> for Handler {
async fn handle(
&self,
cx: Option<HeaderMap>,
req: hyper::Request<HttpBody>,
) -> anyhow::Result<
Result<
hyper::Response<
impl http_body::Body<Data = Bytes, Error = core::convert::Infallible>
+ Send
+ 'static,
>,
wrpc_interface_http::bindings::wrpc::http::types::ErrorCode,
>,
> {
assert_eq!(cx, None);
let (
http::request::Parts {
method,
uri,
headers,
..
},
body,
) = req.into_parts();
let mut headers = headers
.iter()
.map(|(k, v)| (k.as_str(), v.to_str().unwrap()))
.collect::<Vec<_>>();
headers.sort_unstable();
assert_eq!(method, http::Method::POST);
assert_eq!(uri.to_string(), "https://example.com/test?foo=bar");
assert_eq!(headers, [("user-agent", "wrpc/0.2.0")],);
let mut res = hyper::Response::new(body.map_err(|err| {
panic!("body error encountered: {err}");
}));
res.headers_mut().insert(
http::HeaderName::from_static("foo"),
http::HeaderValue::from_static("bar"),
);
Ok(Ok(res))
}
}
let shutdown = Arc::new(Notify::new());
info!("serving incoming handler");
let server = tokio::spawn({
let client = Arc::clone(&client);
let shutdown = Arc::clone(&shutdown);
async move {
let [(instance, name, mut invocations)] = wrpc_interface_http::bindings::exports::wrpc::http::incoming_handler::serve_interface(
client.as_ref(),
ServeHttp(Handler),
)
.await
.context("failed to serve incoming handler")?;
assert_eq!(instance, "wrpc:http/incoming-handler@0.1.0");
assert_eq!(name, "handle");
loop {
select! {
Some(invocation) = invocations.next() => {
let invocation = invocation.expect("failed to accept invocation");
invocation.await.expect("failed to handle invocation")
}
() = shutdown.notified() => {
return anyhow::Ok(())
}
}
}
}
});
let mut req = hyper::Request::new(http_body_util::Full::new(Bytes::from("foobar")));
*req.method_mut() = http::Method::POST;
*req.uri_mut() = Uri::from_static("https://example.com/test?foo=bar");
req.headers_mut().insert(
http::HeaderName::from_static("user-agent"),
http::HeaderValue::from_static("wrpc/0.2.0"),
);
info!("invoking incoming handler");
let (res, err, io) = client.invoke_handle_http(None, req).await?;
let res = res?;
if let Some(io) = io {
io.await?;
}
let err = err.collect::<Vec<_>>().await;
assert!(matches!(err[..], []));
let (
http::response::Parts {
status, headers, ..
},
body,
) = res.into_parts();
let mut headers = headers
.iter()
.map(|(k, v)| (k.as_str(), v.to_str().unwrap()))
.collect::<Vec<_>>();
headers.sort_unstable();
assert_eq!(status, http::StatusCode::OK);
assert_eq!(headers, [("foo", "bar")]);
let body = body.collect().await?;
assert_eq!(body.to_bytes(), "foobar");
shutdown.notify_one();
server.await??;
};
stop_tx.send(()).expect("failed to stop NATS.io server");
nats_server
.await
.context("failed to await NATS.io server stop")?
.context("NATS.io server failed to stop")?;
Ok(())
}