use std::{marker, rc::Rc};
use ntex_router::{IntoPattern, Router as PatternRouter};
use ntex_service::{
IntoServiceFactory, Pipeline, Service, ServiceCtx, ServiceFactory, boxed,
fn_factory_with_config,
};
use ntex_util::{HashMap, future::Ready, future::join_all};
use crate::codec::protocol::{DeliveryState, Error, Rejected, Transfer};
use crate::error::LinkError;
use crate::types::{Link, Message, Outcome};
use crate::{Delivery, State, cell::Cell, rcvlink::ReceiverLink};
type Handle<S> = boxed::BoxServiceFactory<Link<S>, Transfer, Outcome, Error, Error>;
type HandleService = boxed::BoxService<Transfer, Outcome, Error>;
pub struct Router<S = ()>(Vec<(Vec<String>, Handle<S>)>);
impl<S: 'static> Default for Router<S> {
fn default() -> Router<S> {
Router::builder()
}
}
impl<S: 'static> Router<S> {
pub fn builder() -> Router<S> {
Router(Vec::new())
}
#[must_use]
#[allow(clippy::needless_pass_by_value)]
pub fn service<T, F, U>(mut self, address: T, service: F) -> Self
where
T: IntoPattern,
F: IntoServiceFactory<U, Transfer, Link<S>>,
U: ServiceFactory<Transfer, Link<S>, Response = Outcome> + 'static,
Error: From<U::Error> + From<U::InitError>,
Outcome: TryFrom<U::Error, Error = Error>,
{
self.0.push((
address.patterns(),
ResourceServiceFactory::create(service.into_factory()),
));
self
}
pub fn build(
self,
) -> impl ServiceFactory<
Message,
State<S>,
Response = (),
Error = Error,
InitError = std::convert::Infallible,
> {
let mut router = PatternRouter::build();
for (addr, hnd) in self.0 {
router.path(addr, hnd);
}
let router = Rc::new(router.finish());
fn_factory_with_config(move |state: State<S>| {
Ready::Ok(RouterService(Cell::new(RouterServiceInner {
state,
router: router.clone(),
handlers: HashMap::default(),
})))
})
}
}
struct RouterService<S>(Cell<RouterServiceInner<S>>);
struct RouterServiceInner<S> {
state: State<S>,
router: Rc<PatternRouter<Handle<S>>>,
handlers: HashMap<ReceiverLink, Option<Pipeline<HandleService>>>,
}
impl<S: 'static> Service<Message> for RouterService<S> {
type Response = ();
type Error = Error;
async fn call(&self, msg: Message, _: ServiceCtx<'_, Self>) -> Result<(), Error> {
match msg {
Message::Attached(frm, link) => {
let path = frm.target().and_then(|target| target.address.clone());
if let Some(path) = path {
let inner = self.0.get_mut();
let mut link = Link::new(frm, link, inner.state.clone(), path);
if let Some((hnd, _info)) = inner.router.recognize(link.path_mut()) {
log::trace!("Create handler service for {}", link.path().get_ref());
let rcv_link = link.link.clone();
inner.handlers.insert(link.receiver().clone(), None);
match hnd.create(link).await {
Ok(srv) => {
log::trace!("Handler service is created for {}", rcv_link.name());
self.0
.get_mut()
.handlers
.insert(rcv_link.clone(), Some(Pipeline::new(srv)));
if let Some((delivery, tr)) = rcv_link.get_delivery() {
service_call(rcv_link, delivery, tr, &self.0).await
} else {
Ok(())
}
}
Err(e) => {
log::error!(
"Failed to create link service for {} err: {e:?}",
rcv_link.name()
);
Err(e)
}
}
} else {
log::trace!(
"Target address is not recognized: {}",
link.path().get_ref()
);
Err(LinkError::force_detach()
.description(format!(
"Target address is not supported: {}",
link.path().get_ref()
))
.into())
}
} else {
Err(LinkError::force_detach()
.description("Target address is required")
.into())
}
}
Message::Detached(link) => {
if let Some(Some(srv)) = self.0.get_mut().handlers.remove(&link) {
log::trace!("Releasing handler service for {}", link.name());
let name = link.name().clone();
ntex_rt::spawn(async move {
srv.shutdown().await;
log::trace!("Handler service for {name} has shutdown");
});
}
Ok(())
}
Message::DetachedAll(links) => {
let links: Vec<_> = links
.into_iter()
.filter_map(|link| {
self.0
.get_mut()
.handlers
.remove(&link)
.and_then(move |srv| srv.map(|srv| (link, srv)))
})
.collect();
log::trace!(
"Shutting down {} handler services (session ended)",
links.len()
);
ntex_rt::spawn(async move {
let futs: Vec<_> = links
.iter()
.map(|(link, srv)| {
log::trace!(
"Releasing handler service for {} (session ended)",
link.name()
);
srv.shutdown()
})
.collect();
let len = futs.len();
let _ = join_all(futs).await;
log::trace!("Handler services for {len} links have shutdown (session ended)");
});
Ok(())
}
Message::Transfer(link) => {
if let Some(Some(_)) = self.0.get_ref().handlers.get(&link)
&& let Some((delivery, tr)) = link.get_delivery()
{
service_call(link, delivery, tr, &self.0).await?;
}
Ok(())
}
}
}
}
async fn service_call<S>(
link: ReceiverLink,
mut delivery: Delivery,
tr: Transfer,
inner: &Cell<RouterServiceInner<S>>,
) -> Result<(), Error> {
if let Some(Some(srv)) = inner.handlers.get(&link) {
if let Err(e) = srv.ready().await {
log::trace!("Service readiness check failed: {e:?}");
let _ =
link.close_with_error(LinkError::force_detach().description(format!("error: {e}")));
return Ok(());
}
if link.credit() == 0 {
link.set_link_credit(50);
}
match srv.call(tr).await {
Ok(outcome) => {
log::trace!("Outcome is ready {outcome:?} for {}", link.name());
delivery.settle(outcome.into_delivery_state());
}
Err(e) => {
log::trace!("Service response error: {e:?}");
delivery.settle(DeliveryState::Rejected(Rejected { error: Some(e) }));
}
}
}
Ok(())
}
struct ResourceServiceFactory<S, T> {
factory: T,
_t: marker::PhantomData<S>,
}
impl<S, T> ResourceServiceFactory<S, T>
where
S: 'static,
T: ServiceFactory<Transfer, Link<S>, Response = Outcome> + 'static,
Error: From<T::Error> + From<T::InitError>,
Outcome: TryFrom<T::Error, Error = Error>,
{
fn create(factory: T) -> Handle<S> {
boxed::factory(ResourceServiceFactory {
factory,
_t: marker::PhantomData,
})
}
}
impl<S, T> ServiceFactory<Transfer, Link<S>> for ResourceServiceFactory<S, T>
where
T: ServiceFactory<Transfer, Link<S>, Response = Outcome>,
Error: From<T::Error> + From<T::InitError>,
Outcome: TryFrom<T::Error, Error = Error>,
{
type Response = Outcome;
type Error = Error;
type InitError = Error;
type Service = ResourceService<S, T::Service>;
async fn create(&self, cfg: Link<S>) -> Result<Self::Service, Self::InitError> {
let service = self.factory.create(cfg).await?;
Ok(ResourceService {
service,
_t: marker::PhantomData,
})
}
}
struct ResourceService<S, T> {
service: T,
_t: marker::PhantomData<S>,
}
impl<S, T> Service<Transfer> for ResourceService<S, T>
where
T: Service<Transfer, Response = Outcome>,
Error: From<T::Error>,
Outcome: TryFrom<T::Error, Error = Error>,
{
type Response = Outcome;
type Error = Error;
ntex_service::forward_ready!(service);
ntex_service::forward_shutdown!(service);
async fn call(
&self,
req: Transfer,
ctx: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
match ctx.call(&self.service, req).await {
Ok(v) => Ok(v),
Err(err) => Outcome::try_from(err),
}
}
}