use crate::error::{ComponentInfo, ErrorAction, ErrorContext, ErrorStrategy, StreamError};
use crate::http_server::types::HttpServerRequest;
use crate::message::Message;
use crate::{Input, Output, Transformer, TransformerConfig};
use async_trait::async_trait;
use futures::Stream;
use futures::StreamExt;
use std::pin::Pin;
#[derive(Debug, Clone)]
pub struct RoutePattern {
pub pattern: String,
pub port: usize,
}
#[derive(Debug, Clone, Default)]
pub struct PathRouterConfig {
pub routes: Vec<RoutePattern>,
pub default_port: Option<usize>,
}
#[derive(Debug, Clone)]
pub struct PathBasedRouterTransformer {
config: TransformerConfig<Message<HttpServerRequest>>,
router_config: PathRouterConfig,
}
impl PathBasedRouterTransformer {
pub fn new(router_config: PathRouterConfig) -> Self {
Self {
config: TransformerConfig::default(),
router_config,
}
}
pub fn with_routes(routes: Vec<RoutePattern>, default_port: Option<usize>) -> Self {
Self::new(PathRouterConfig {
routes,
default_port,
})
}
pub fn add_route(&mut self, pattern: String, port: usize) {
self
.router_config
.routes
.push(RoutePattern { pattern, port });
}
pub fn set_default_port(&mut self, port: Option<usize>) {
self.router_config.default_port = port;
}
#[must_use]
pub fn router_config(&self) -> &PathRouterConfig {
&self.router_config
}
pub fn router_config_mut(&mut self) -> &mut PathRouterConfig {
&mut self.router_config
}
fn match_route(&self, path: &str) -> Option<usize> {
for route in &self.router_config.routes {
if self.path_matches(&route.pattern, path) {
return Some(route.port);
}
}
self.router_config.default_port
}
fn path_matches(&self, pattern: &str, path: &str) -> bool {
if let Some(prefix) = pattern.strip_suffix("/*") {
path.starts_with(prefix)
} else {
pattern == path
}
}
}
impl Input for PathBasedRouterTransformer {
type Input = Message<HttpServerRequest>;
type InputStream = Pin<Box<dyn Stream<Item = Self::Input> + Send>>;
}
impl Output for PathBasedRouterTransformer {
type Output = (
Option<Message<HttpServerRequest>>,
Option<Message<HttpServerRequest>>,
Option<Message<HttpServerRequest>>,
Option<Message<HttpServerRequest>>,
Option<Message<HttpServerRequest>>,
);
type OutputStream = Pin<Box<dyn Stream<Item = Self::Output> + Send>>;
}
#[async_trait]
impl Transformer for PathBasedRouterTransformer {
type InputPorts = (Message<HttpServerRequest>,);
type OutputPorts = (
Option<Message<HttpServerRequest>>,
Option<Message<HttpServerRequest>>,
Option<Message<HttpServerRequest>>,
Option<Message<HttpServerRequest>>,
Option<Message<HttpServerRequest>>,
);
async fn transform(&mut self, input: Self::InputStream) -> Self::OutputStream {
let router_config = self.router_config.clone();
Box::pin(input.map(move |item| {
let path = item.payload().path.clone();
let port = Self {
config: TransformerConfig::default(),
router_config: router_config.clone(),
}
.match_route(&path);
match port {
Some(0) => (Some(item.clone()), None, None, None, None),
Some(1) => (None, Some(item.clone()), None, None, None),
Some(2) => (None, None, Some(item.clone()), None, None),
Some(3) => (None, None, None, Some(item.clone()), None),
Some(4) => (None, None, None, None, Some(item.clone())),
Some(_) => (None, None, None, None, Some(item)), None => (None, None, None, None, None), }
}))
}
fn set_config_impl(&mut self, config: TransformerConfig<Message<HttpServerRequest>>) {
self.config = config;
}
fn get_config_impl(&self) -> &TransformerConfig<Message<HttpServerRequest>> {
&self.config
}
fn get_config_mut_impl(&mut self) -> &mut TransformerConfig<Message<HttpServerRequest>> {
&mut self.config
}
fn handle_error(&self, error: &StreamError<Message<HttpServerRequest>>) -> ErrorAction {
match &self.config.error_strategy {
ErrorStrategy::Stop => ErrorAction::Stop,
ErrorStrategy::Skip => ErrorAction::Skip,
ErrorStrategy::Retry(n) if error.retries < *n => ErrorAction::Retry,
_ => ErrorAction::Stop,
}
}
fn create_error_context(
&self,
item: Option<Message<HttpServerRequest>>,
) -> ErrorContext<Message<HttpServerRequest>> {
ErrorContext {
timestamp: chrono::Utc::now(),
item,
component_name: self.component_info().name,
component_type: std::any::type_name::<Self>().to_string(),
}
}
fn component_info(&self) -> ComponentInfo {
ComponentInfo {
name: self
.config
.name
.clone()
.unwrap_or_else(|| "path_router".to_string()),
type_name: std::any::type_name::<Self>().to_string(),
}
}
}