use std::{convert::Infallible, sync::Arc};
use async_trait::async_trait;
use tracing::{error, instrument, trace, warn};
use wasmbus_rpc::{
core::LinkDefinition, error::RpcError, provider::prelude::*, provider::ProviderTransport,
};
use wasmcloud_provider_httpserver::{
load_settings,
wasmcloud_interface_httpserver::{HttpRequest, HttpResponse, HttpServer, HttpServerSender},
HttpServerCore,
};
/// Binary entry point for the HTTP server capability provider.
///
/// Builds a default [`HttpServerProvider`], hands it to `provider_main`
/// together with a human-readable provider name, and propagates any error
/// that call returns. Once `provider_main` returns successfully, a short
/// exit notice is written to stderr.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let provider = HttpServerProvider::default();
    let friendly_name = String::from("HttpServer Provider");

    provider_main(provider, Some(friendly_name))?;

    eprintln!("HttpServer provider exiting");
    Ok(())
}
/// Capability provider state: one `HttpServerCore` per linked actor.
///
/// The map lives behind an `Arc`, so clones produced by the derived `Clone`
/// share the same underlying table rather than copying it.
#[derive(Clone, Default, Provider)]
struct HttpServerProvider {
// Keyed by the link definition's `actor_id`; entries are inserted in
// `put_link` and removed in `delete_link` / `shutdown`.
actors: Arc<dashmap::DashMap<String, HttpServerCore>>,
}
// Empty impl: no items are overridden here.
// NOTE(review): presumably the message dispatch itself comes from
// `#[derive(Provider)]` on the struct — confirm against wasmbus_rpc.
impl ProviderDispatch for HttpServerProvider {}
/// Link lifecycle callbacks for the HTTP server provider.
///
/// Each link gets its own `HttpServerCore`, tracked in `self.actors` under
/// the link's actor id.
#[async_trait]
impl ProviderHandler for HttpServerProvider {
    /// Handles a new link: loads settings from the link values, starts an
    /// HTTP server for the actor, and records it in the actor table.
    ///
    /// Returns `Ok(true)` once the server has started and been stored.
    ///
    /// # Errors
    ///
    /// Returns `RpcError::ProviderInit` if the settings cannot be loaded or
    /// if the server fails to start.
    async fn put_link(&self, ld: &LinkDefinition) -> Result<bool, RpcError> {
        let settings = match load_settings(&ld.values) {
            Ok(loaded) => loaded,
            Err(e) => return Err(RpcError::ProviderInit(e.to_string())),
        };

        // The server takes its own copy of the settings; the original is kept
        // so the listen address can be reported if startup fails.
        let server_settings = settings.clone();
        let lattice_prefix = get_host_bridge().lattice_prefix().to_string();
        let http_server = HttpServerCore::new(server_settings, lattice_prefix, call_actor);

        if let Err(e) = http_server.start(ld).await {
            return Err(RpcError::ProviderInit(format!(
                "starting httpserver for {} {:?}: {}",
                &ld.actor_id, &settings.address, e
            )));
        }

        // Only servers that started successfully are tracked.
        self.actors.insert(ld.actor_id.to_string(), http_server);
        Ok(true)
    }

    /// Handles link removal: drops the actor's entry from the table and, if
    /// one existed, asks its server to begin shutting down.
    async fn delete_link(&self, actor_id: &str) {
        if let Some((_, server)) = self.actors.remove(actor_id) {
            tracing::info!(%actor_id, "httpserver stopping listener for actor");
            server.begin_shutdown();
        }
    }

    /// Handles provider shutdown by emptying the actor table.
    ///
    /// NOTE(review): presumably dropping each `HttpServerCore` stops its
    /// listener — confirm against its `Drop` implementation.
    async fn shutdown(&self) -> Result<(), Infallible> {
        self.actors.clear();
        Ok(())
    }
}
#[instrument(level = "debug", skip(_lattice_id, ld, req, timeout), fields(actor_id = %ld.actor_id))]
async fn call_actor(
_lattice_id: String,
ld: Arc<LinkDefinition>,
req: HttpRequest,
timeout: Option<std::time::Duration>,
) -> Result<HttpResponse, RpcError> {
let tx = ProviderTransport::new_with_timeout(ld.as_ref(), Some(get_host_bridge()), timeout);
let ctx = Context::default();
let actor = HttpServerSender::via(tx);
let rc = actor.handle_request(&ctx, &req).await;
match rc {
Err(RpcError::Timeout(_)) => {
error!("actor request timed out: returning 503",);
Ok(HttpResponse {
status_code: 503,
body: Default::default(),
header: Default::default(),
})
}
Ok(resp) => {
trace!(
status_code = %resp.status_code,
"http response received from actor"
);
Ok(resp)
}
Err(e) => {
warn!(
error = %e,
"actor responded with error"
);
Err(e)
}
}
}