use crate::brokers::Broker;
use crate::brokers::base::HeaderMap;
use crate::errors::CrabbyError;
use crate::event::Event;
use crate::extract::RuntimeState;
use crate::handler::IntoHandler;
use crate::publish::Publisher;
use crate::response::HandlerOutcome;
use crate::service::{CrabbyService, MessageStreamFactory};
use std::sync::Arc;
use tower::Layer;
use tower::Service;
use tower::util::BoxService;
pub struct Router<S = ()> {
routes: Vec<RouteDefinition>,
state: S,
error_topic: Option<String>,
error_headers: Option<HeaderMap>,
layers: Vec<Arc<dyn RouteLayer>>,
}
trait RouteBuilder: Send {
fn build(
self: Box<Self>,
publisher: Publisher,
) -> BoxService<Event, HandlerOutcome, CrabbyError>;
}
trait RouteLayer: Send + Sync {
fn layer(
&self,
service: BoxService<Event, HandlerOutcome, CrabbyError>,
) -> BoxService<Event, HandlerOutcome, CrabbyError>;
}
struct TowerRouteLayer<L> {
layer: L,
}
impl<L> TowerRouteLayer<L> {
fn new(layer: L) -> Self {
Self { layer }
}
}
impl<L> RouteLayer for TowerRouteLayer<L>
where
L: Layer<BoxService<Event, HandlerOutcome, CrabbyError>> + Send + Sync + 'static,
L::Service: Service<Event, Response = HandlerOutcome, Error = CrabbyError> + Send + 'static,
<L::Service as Service<Event>>::Future: Send + 'static,
{
fn layer(
&self,
service: BoxService<Event, HandlerOutcome, CrabbyError>,
) -> BoxService<Event, HandlerOutcome, CrabbyError> {
BoxService::new(self.layer.layer(service))
}
}
struct RouteDefinition {
subject: String,
error_topic: Option<String>,
error_headers: Option<HeaderMap>,
layers: Vec<Arc<dyn RouteLayer>>,
stream_factory: Option<Box<dyn MessageStreamFactory>>,
builder: Box<dyn RouteBuilder>,
}
struct HandlerRoute<H, S> {
handler: H,
state: S,
_marker: std::marker::PhantomData<fn()>,
}
impl<H, S> HandlerRoute<H, S> {
fn new(handler: H, state: S) -> Self {
Self {
handler,
state,
_marker: std::marker::PhantomData,
}
}
}
struct TypedHandlerRoute<H, S, T> {
inner: HandlerRoute<H, S>,
_marker: std::marker::PhantomData<fn() -> T>,
}
impl<H, S, T> TypedHandlerRoute<H, S, T> {
fn new(handler: H, state: S) -> Self {
Self {
inner: HandlerRoute::new(handler, state),
_marker: std::marker::PhantomData,
}
}
}
impl<H, S, T> RouteBuilder for TypedHandlerRoute<H, S, T>
where
H: IntoHandler<RuntimeState<S>, T> + Send + 'static,
S: Clone + Send + Sync + 'static,
{
fn build(
self: Box<Self>,
publisher: Publisher,
) -> BoxService<Event, HandlerOutcome, CrabbyError> {
let Self { inner, .. } = *self;
let HandlerRoute { handler, state, .. } = inner;
handler.into_handler(RuntimeState::new(state, publisher))
}
}
struct ServiceRoute<T> {
service: T,
}
impl<T> ServiceRoute<T> {
fn new(service: T) -> Self {
Self { service }
}
}
impl<T> RouteBuilder for ServiceRoute<T>
where
T: Service<Event, Response = HandlerOutcome, Error = CrabbyError> + Send + 'static,
T::Future: Send + 'static,
{
fn build(
self: Box<Self>,
_publisher: Publisher,
) -> BoxService<Event, HandlerOutcome, CrabbyError> {
BoxService::new(self.service)
}
}
impl Router<()> {
pub fn new() -> Self {
Self {
routes: Vec::new(),
state: (),
error_topic: None,
error_headers: None,
layers: Vec::new(),
}
}
pub fn set_state<S: Clone + Send + Sync + 'static>(self, state: S) -> Router<S> {
Router {
routes: self.routes,
state,
error_topic: self.error_topic,
error_headers: self.error_headers,
layers: self.layers,
}
}
}
impl<S: Clone + Send + Sync + 'static> Router<S> {
pub fn route<H, T>(self, subject: &str, handler: H) -> Self
where
H: IntoHandler<RuntimeState<S>, T> + Send + 'static,
T: 'static,
{
self.route_handler(subject, handler)
}
pub fn routes<I, H, T>(mut self, subjects: I, handler: H) -> Self
where
I: IntoIterator,
I::Item: AsRef<str>,
H: IntoHandler<RuntimeState<S>, T> + Clone + Send + 'static,
T: 'static,
{
for subject in subjects {
self = self.route_handler(subject.as_ref(), handler.clone());
}
self
}
pub fn route_service<T>(mut self, subject: &str, service: T) -> Self
where
T: Service<Event, Response = HandlerOutcome, Error = CrabbyError> + Send + 'static,
T::Future: Send + 'static,
{
self.assert_route_available(subject);
self.routes.push(RouteDefinition {
subject: subject.to_string(),
error_topic: self.error_topic.clone(),
error_headers: self.error_headers.clone(),
layers: self.layers.clone(),
stream_factory: None,
builder: Box::new(ServiceRoute::new(service)),
});
self
}
pub fn layer<L>(mut self, layer: L) -> Self
where
L: Layer<BoxService<Event, HandlerOutcome, CrabbyError>> + Send + Sync + 'static,
L::Service: Service<Event, Response = HandlerOutcome, Error = CrabbyError> + Send + 'static,
<L::Service as Service<Event>>::Future: Send + 'static,
{
let layer = Arc::new(TowerRouteLayer::new(layer)) as Arc<dyn RouteLayer>;
self.layers.push(layer.clone());
for route in &mut self.routes {
route.layers.push(layer.clone());
}
self
}
pub fn on_error(mut self, topic: &str) -> Self {
let topic = topic.to_string();
self.error_topic = Some(topic.clone());
for route in &mut self.routes {
route.error_topic = Some(topic.clone());
}
self
}
pub fn error_headers(mut self, headers: HeaderMap) -> Self {
self.error_headers = Some(headers.clone());
for route in &mut self.routes {
route.error_headers = Some(headers.clone());
}
self
}
pub fn include<OtherState>(mut self, other: Router<OtherState>) -> Self {
for route in other.routes {
self.assert_route_available(&route.subject);
self.routes.push(route);
}
self
}
fn route_handler<H, T>(mut self, subject: &str, handler: H) -> Self
where
H: IntoHandler<RuntimeState<S>, T> + Send + 'static,
T: 'static,
{
self.assert_route_available(subject);
self.routes.push(RouteDefinition {
subject: subject.to_string(),
error_topic: self.error_topic.clone(),
error_headers: self.error_headers.clone(),
layers: self.layers.clone(),
stream_factory: None,
builder: Box::new(TypedHandlerRoute::<H, S, T>::new(
handler,
self.state.clone(),
)),
});
self
}
#[cfg(feature = "nats")]
pub(crate) fn route_with_stream_factory<H, T, F>(
mut self,
subject: &str,
handler: H,
stream_factory: F,
) -> Self
where
H: IntoHandler<RuntimeState<S>, T> + Send + 'static,
T: 'static,
F: MessageStreamFactory + 'static,
{
self.assert_route_available(subject);
self.routes.push(RouteDefinition {
subject: subject.to_string(),
error_topic: self.error_topic.clone(),
error_headers: self.error_headers.clone(),
layers: self.layers.clone(),
stream_factory: Some(Box::new(stream_factory)),
builder: Box::new(TypedHandlerRoute::<H, S, T>::new(
handler,
self.state.clone(),
)),
});
self
}
fn assert_route_available(&self, subject: &str) {
assert!(
!self.routes.iter().any(|route| route.subject == subject),
"duplicate route key '{subject}' is already registered"
);
}
pub fn into_service<B: Broker + Clone>(self, broker: B) -> CrabbyService<B> {
let publisher = Publisher::new(broker.clone());
let routes = self
.routes
.into_iter()
.map(|route| {
let mut service = route.builder.build(publisher.clone());
for layer in route.layers {
service = layer.layer(service);
}
crate::service::ServiceRoute {
subject: route.subject,
error_topic: route.error_topic,
error_headers: route.error_headers,
stream_factory: route.stream_factory,
service,
}
})
.collect();
CrabbyService::new(routes, broker)
}
}