use std::time::Instant;
use wasmbus_rpc::{core::InvocationResponse, provider::prelude::*};
use wasmcloud_provider_httpserver::wasmcloud_interface_httpserver::{HttpRequest, HttpResponse};
use wasmcloud_test_util::{
check, cli::print_test_results, provider_test::test_provider, run_selected_spawn,
testing::TestOptions,
};
/// Base URL that the HTTP tests in this file send their requests to.
const SERVER_UNDER_TEST: &str = "http://localhost:9000";
/// Number of requests the mock actor answers before it stops. `run_all` also
/// asserts that exactly this many were completed; it matches the requests
/// issued by the tests: two in `send_http`, two in `send_http_body` and one in
/// `test_timeout`.
const NUM_RPC: u32 = 5;
/// Entry point for the suite: starts the mock actor, runs the selected test
/// cases, verifies that all of them passed and that the actor answered the
/// expected number of requests, then shuts the provider down.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn run_all() -> std::result::Result<(), Box<dyn std::error::Error>> {
    let opts = TestOptions::default();

    // The actor task must be started before any test case issues a request.
    let actor_task = mock_echo_actor(NUM_RPC).await;

    let results = run_selected_spawn!(opts, health_check, send_http, send_http_body, test_timeout);
    print_test_results(&results);

    let total = results.len();
    let passed = results.iter().filter(|outcome| outcome.passed).count();
    assert_eq!(passed, total, "{} passed out of {}", passed, total);

    // The first `?` surfaces a failure to join the task, the second one an
    // error returned by the actor itself.
    let actor_outcome = actor_task.await?;
    let completed = actor_outcome?;
    assert_eq!(completed, NUM_RPC);

    // Shutdown is best effort; its result is intentionally ignored.
    let provider = test_provider().await;
    let _ = provider.shutdown().await;

    Ok(())
}
/// Test case: requests a health check from the provider under test and fails
/// when the call does not return `Ok`.
async fn health_check(_opt: &TestOptions) -> RpcResult<()> {
    let provider = test_provider().await;
    let hc = provider.health_check().await;
    check!(hc.is_ok())?;
    Ok(())
}
/// Spawns a background task that plays the role of an actor: it subscribes to
/// the provider's mock-actor RPC topic and answers each
/// `HttpServer.HandleRequest` invocation with a JSON document describing the
/// request it received (method, path, query string, body length and body
/// hash).
///
/// The task stops after `num_requests` invocations have been answered, or as
/// soon as an invocation for any other operation arrives.
///
/// # Returns
///
/// The join handle of the spawned task. The task yields the number of
/// invocations it completed.
///
/// # Errors
///
/// The task yields an `RpcError` when subscribing, (de)serializing or
/// publishing a reply fails. The error is also logged to stderr, and the
/// subscription is released before the task ends.
async fn mock_echo_actor(num_requests: u32) -> tokio::task::JoinHandle<RpcResult<u32>> {
    use futures::StreamExt as _;
    use wasmbus_rpc::{
        common::{deserialize, serialize},
        core::Invocation,
    };

    let handle = tokio::runtime::Handle::current();
    handle.spawn(async move {
        let mut completed = 0u32;

        // The work runs inside inner `async` blocks so that `?` yields an
        // error *value* here. Inside a plain block, `?` would leave the whole
        // spawned task at once, skipping both the log line and the
        // unsubscribe below.
        let outcome: RpcResult<()> = async {
            let prov = test_provider().await;
            let topic = prov.mock_actor_rpc_topic();
            let mut sub = prov
                .nats_client
                .subscribe(topic)
                .await
                .map_err(|e| RpcError::Nats(e.to_string()))?;

            let served: RpcResult<()> = async {
                while let Some(msg) = sub.next().await {
                    let inv: Invocation = deserialize(&msg.payload)?;
                    if inv.operation != "HttpServer.HandleRequest" {
                        eprintln!("Unexpected method received by actor: {}", &inv.operation);
                        break;
                    }
                    let http_req: HttpRequest = deserialize(&inv.msg)?;
                    if http_req.path.contains("sleep") {
                        // Delay the reply so the `/sleep` test can observe how
                        // the server reacts to a slow actor.
                        eprintln!("httpserver /sleep test: expect timeouts in log");
                        tokio::time::sleep(std::time::Duration::from_millis(4000)).await;
                    }
                    let body = serde_json::to_vec(&serde_json::json!({
                        "msg_id": completed,
                        "method": http_req.method,
                        "path": http_req.path,
                        "query": http_req.query_string,
                        "body_len": http_req.body.len(),
                        "body_hash": hash(&http_req.body),
                    }))
                    .map_err(|e| RpcError::Ser(e.to_string()))?;
                    let http_resp = HttpResponse {
                        body,
                        status_code: 200,
                        header: Default::default(),
                    };
                    let buf = serialize(&http_resp)?;
                    // Messages without a reply subject are counted but not answered.
                    if let Some(ref reply_to) = msg.reply {
                        let mut ir = InvocationResponse::default();
                        ir.invocation_id = inv.id;
                        ir.msg = buf;
                        prov.rpc_client
                            .publish(reply_to.to_string(), serialize(&ir)?)
                            .await
                            .map_err(|e| RpcError::Nats(e.to_string()))?;
                    }
                    completed += 1;
                    if completed >= num_requests {
                        break;
                    }
                }
                Ok::<(), RpcError>(())
            }
            .await;

            // Release the subscription on success and on failure alike.
            let _ = sub.unsubscribe().await;
            served
        }
        .await;

        if let Err(e) = outcome {
            eprintln!("mock_actor got error: {}. quitting actor thread", e);
            // Still hand the error to whoever joins the task.
            return Err(e);
        }
        Ok(completed)
    })
}
/// Test case: issues two GET requests, one plain and one with a query string,
/// and checks that the echoed method, path and query match what was sent.
///
/// # Errors
///
/// Returns `RpcError::Other` when a request cannot be sent and
/// `RpcError::Deser` when a response body is not the expected JSON object.
///
/// # Panics
///
/// Panics when a status code or an echoed field differs from the expectation.
async fn send_http(_: &TestOptions) -> RpcResult<()> {
    type JsonData = std::collections::HashMap<String, serde_json::Value>;

    // First request: path only.
    {
        let http = reqwest::Client::new();
        let started = Instant::now();
        let url = format!("{}/abc", SERVER_UNDER_TEST);
        let response = http
            .get(&url)
            .send()
            .await
            .map_err(|e| RpcError::Other(e.to_string()))?;
        let took = started.elapsed();
        eprintln!("GET /abc returned in {} ms", &took.as_millis());
        assert_eq!(response.status().as_u16(), 200);

        let echoed = response
            .json::<JsonData>()
            .await
            .map_err(|e| RpcError::Deser(e.to_string()))?;
        assert_eq!(echoed.get("method").unwrap().as_str(), Some("GET"));
        assert_eq!(echoed.get("path").unwrap().as_str(), Some("/abc"));
    }

    // Second request: path plus query string.
    {
        let http = reqwest::Client::new();
        let started = Instant::now();
        let url = format!("{}/def?name=Carol&thing=one", SERVER_UNDER_TEST);
        let response = http
            .get(&url)
            .send()
            .await
            .map_err(|e| RpcError::Other(e.to_string()))?;
        let took = started.elapsed();
        eprintln!("GET /def returned in {} ms", &took.as_millis());
        assert_eq!(response.status().as_u16(), 200);

        let echoed = response
            .json::<JsonData>()
            .await
            .map_err(|e| RpcError::Deser(e.to_string()))?;
        assert_eq!(echoed.get("method").unwrap().as_str(), Some("GET"));
        assert_eq!(echoed.get("path").unwrap().as_str(), Some("/def"));
        assert_eq!(
            echoed.get("query").unwrap().as_str(),
            Some("name=Carol&thing=one")
        );
    }

    Ok(())
}
/// Test case: sends a POST without a body and a PUT with a 700-byte body, and
/// checks that the echoed method, path, body length and body hash match what
/// was sent.
///
/// # Errors
///
/// Returns `RpcError::Other` when a request cannot be sent and
/// `RpcError::Deser` when a response body is not the expected JSON object.
///
/// # Panics
///
/// Panics when a status code or an echoed field differs from the expectation.
async fn send_http_body(_: &TestOptions) -> RpcResult<()> {
    type JsonData = std::collections::HashMap<String, serde_json::Value>;

    // POST with an empty body.
    let client = reqwest::Client::new();
    let start_time = Instant::now();
    let resp = client
        .post(&format!("{}/1", SERVER_UNDER_TEST))
        .send()
        .await
        .map_err(|e| RpcError::Other(e.to_string()))?;
    let elapsed = start_time.elapsed();
    eprintln!("POST /1 returned in {} ms", elapsed.as_millis());
    assert_eq!(resp.status().as_u16(), 200);
    let body = resp
        .json::<JsonData>()
        .await
        .map_err(|e| RpcError::Deser(e.to_string()))?;
    assert_eq!(body.get("method").unwrap().as_str(), Some("POST"));
    assert_eq!(body.get("path").unwrap().as_str(), Some("/1"));
    assert_eq!(body.get("body_len").unwrap().as_u64(), Some(0));

    // PUT with a payload whose byte at index `i` is `i % 256`.
    let blob: Vec<u8> = (0..=u8::MAX).cycle().take(700).collect();
    // Capture the expectations first so the buffer can be moved into the request.
    let expected_len = blob.len() as u64;
    let expected_hash = hash(&blob);

    let client = reqwest::Client::new();
    let start_time = Instant::now();
    let resp = client
        .put(&format!("{}/2", SERVER_UNDER_TEST))
        .body(blob)
        .send()
        .await
        .map_err(|e| RpcError::Other(e.to_string()))?;
    let elapsed = start_time.elapsed();
    // The log label names the method that was actually used (PUT).
    eprintln!("PUT /2 returned in {} ms", elapsed.as_millis());
    assert_eq!(resp.status().as_u16(), 200);
    let body = resp
        .json::<JsonData>()
        .await
        .map_err(|e| RpcError::Deser(e.to_string()))?;
    assert_eq!(body.get("path").unwrap().as_str(), Some("/2"));
    assert_eq!(body.get("method").unwrap().as_str(), Some("PUT"));
    assert_eq!(body.get("body_len").unwrap().as_u64(), Some(expected_len));
    assert_eq!(
        body.get("body_hash").unwrap().as_str(),
        Some(expected_hash.as_str())
    );
    Ok(())
}
/// Test case: requests `/sleep` and expects the server to answer with HTTP
/// status 503 rather than failing the request.
///
/// # Panics
///
/// Panics when the request itself fails or when the status is not 503.
async fn test_timeout(_: &TestOptions) -> RpcResult<()> {
    let http = reqwest::Client::new();
    let started = Instant::now();
    let outcome = http
        .get(&format!("{}/sleep", SERVER_UNDER_TEST))
        .send()
        .await;
    let took = started.elapsed();
    eprintln!("GET /sleep returned in {} ms", took.as_millis());

    // A transport-level failure is a test failure: a response must arrive.
    let response = match outcome {
        Ok(response) => response,
        Err(_) => panic!("expect ok response"),
    };
    assert_eq!(
        response.status().as_u16(),
        503,
        "expected 503 timeout error"
    );
    Ok(())
}
/// Returns the BLAKE2s-256 digest of `buf` as a lowercase hexadecimal string.
fn hash(buf: &[u8]) -> String {
    use blake2::{Blake2s256, Digest};

    // One-shot digest: equivalent to creating a hasher, feeding it `buf`
    // once and finalizing it.
    let digest = Blake2s256::digest(buf);
    format!("{:x}", digest)
}