use crate::connector::features::composite::stream::StreamKind;
use crate::utils::MatchTarget;
use crate::utils::RouteMatcher;
use crate::utils::StringTokens;
use crate::utils::{Reducer, SharedStr, StateMarker};
use crate::{
connector::features::{
grpc::stream::{GrpcDescriptor, GrpcEvent},
http::stream::{descriptor::HttpDescriptor, event::HttpEvent},
shared::events::StreamEventRaw,
websocket::stream::{WebSocketEvent, WebSocketStreamDescriptor},
},
utils::{NullReducer, NullState},
};
use serde_json::Value;
use std::fmt::Debug;
use std::sync::Arc;
use std::collections::HashMap;
/// Registry of event routes grouped by the name of the stream they apply to.
///
/// Routes belonging to one stream are stored in the order they were added.
/// The registry is assembled with the consuming builder methods
/// [`EventRouteRegistry::add_route`] and [`EventRouteRegistry::add_routes`].
#[derive(Clone, Debug)]
pub struct EventRouteRegistry<E> {
    /// Routes keyed by stream name.
    map: HashMap<SharedStr, Vec<StreamEventRoute<E>>>,
}

impl<E> Default for EventRouteRegistry<E> {
    /// Returns an empty registry, exactly like [`EventRouteRegistry::new`].
    ///
    /// Written by hand rather than derived so that no `E: Default` bound is
    /// imposed on the event type.
    fn default() -> Self {
        Self::new()
    }
}

impl<E> EventRouteRegistry<E> {
    /// Creates a registry with no streams and no routes.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Appends `route` to the routes of `stream_name`, creating the entry if
    /// the stream has not been seen before.
    ///
    /// Consumes and returns the registry so calls can be chained.
    #[must_use]
    pub fn add_route(
        mut self,
        stream_name: impl Into<SharedStr>,
        route: StreamEventRoute<E>,
    ) -> Self {
        self.map.entry(stream_name.into()).or_default().push(route);
        self
    }

    /// Appends all of `routes`, in order, to the routes of `stream_name`.
    ///
    /// The stream entry is created even when `routes` is empty, so a later
    /// [`EventRouteRegistry::get_routes`] for that name returns `Some` with an
    /// empty vector. Consumes and returns the registry so calls can be chained.
    #[must_use]
    pub fn add_routes(
        mut self,
        stream_name: impl Into<SharedStr>,
        routes: Vec<StreamEventRoute<E>>,
    ) -> Self {
        self.map.entry(stream_name.into()).or_default().extend(routes);
        self
    }

    /// Returns the routes registered for `stream_name`, or `None` when the
    /// stream has no entry in the registry.
    pub fn get_routes(&self, stream_name: &str) -> Option<&Vec<StreamEventRoute<E>>> {
        self.map.get(stream_name)
    }

    /// Mutable counterpart of [`EventRouteRegistry::get_routes`].
    pub fn get_routes_mut(&mut self, stream_name: &str) -> Option<&mut Vec<StreamEventRoute<E>>> {
        self.map.get_mut(stream_name)
    }

    /// Iterates over `(stream name, routes)` pairs.
    ///
    /// The pairs come straight from the underlying `HashMap`, so their order
    /// is unspecified.
    pub fn iter(&self) -> impl Iterator<Item = (&SharedStr, &Vec<StreamEventRoute<E>>)> {
        self.map.iter()
    }
}
/// Per-stream context: the stream's name and kind, the routes used to pick a
/// parser for incoming events, the tokens parsed from the stream name, and a
/// free-form metadata map.
#[derive(Debug, Clone)]
pub struct StreamEventContext<E> {
    /// Stream name this context was created for.
    stream: SharedStr,
    /// Stream kind supplied at construction.
    kind: StreamKind,
    /// Registered routes, in insertion order.
    routes: Vec<StreamEventRoute<E>>,
    /// Result of `StringTokens::parse` applied to the stream name.
    tokens: StringTokens,
    /// Arbitrary JSON metadata keyed by string.
    metadata: HashMap<String, Value>,
}

impl<E> StreamEventContext<E> {
    /// Creates a context for `stream` with no routes and empty metadata.
    ///
    /// The tokens are derived from `stream` via `StringTokens::parse`.
    pub fn new(stream: &str, kind: StreamKind) -> Self {
        let tokens = StringTokens::parse(stream);
        Self {
            stream: stream.into(),
            kind,
            routes: Vec::new(),
            tokens,
            metadata: HashMap::new(),
        }
    }

    /// Name of the stream this context belongs to.
    #[inline]
    pub fn stream_name(&self) -> &SharedStr {
        &self.stream
    }

    /// Tokens parsed from the stream name.
    #[inline]
    pub fn tokens(&self) -> &StringTokens {
        &self.tokens
    }

    /// Mutable access to the tokens.
    #[inline]
    pub fn tokens_mut(&mut self) -> &mut StringTokens {
        &mut self.tokens
    }

    /// Kind of the stream.
    #[inline]
    pub fn kind(&self) -> StreamKind {
        self.kind
    }

    /// Metadata currently attached to the stream.
    #[inline]
    pub fn metadata(&self) -> &HashMap<String, Value> {
        &self.metadata
    }

    /// Replaces the whole metadata map with `metadata`.
    #[inline]
    pub fn set_metadata(&mut self, metadata: HashMap<String, Value>) {
        self.metadata = metadata;
    }

    /// Runs `update` against the metadata map so it can be edited in place.
    pub fn update_metadata<F>(&mut self, update: F)
    where
        F: FnOnce(&mut HashMap<String, Value>),
    {
        update(&mut self.metadata);
    }

    /// Iterates over the registered routes in insertion order.
    pub fn routes_iter(&self) -> impl Iterator<Item = &StreamEventRoute<E>> + '_ {
        self.routes.iter()
    }

    /// Registered routes, in insertion order.
    pub fn routes(&self) -> &Vec<StreamEventRoute<E>> {
        &self.routes
    }

    /// Mutable access to the route list.
    ///
    /// Edits made through this reference bypass the de-duplication performed
    /// by [`StreamEventContext::add_route`].
    pub fn routes_mut(&mut self) -> &mut Vec<StreamEventRoute<E>> {
        &mut self.routes
    }

    /// Looks up a route by id, ignoring ASCII case; returns the first match.
    pub fn route(&self, id: &str) -> Option<&StreamEventRoute<E>> {
        self.routes
            .iter()
            .find(|candidate| candidate.id().as_str().eq_ignore_ascii_case(id))
    }

    /// Adds `route`, replacing any route it would collide with.
    ///
    /// An existing route is dropped when its id equals the new id (ASCII
    /// case-insensitive) OR when its matcher equals the new matcher. Lookups
    /// by id and by matcher both return the first hit, so a colliding older
    /// route would otherwise shadow the new one. The new route is then
    /// appended at the end of the list.
    pub fn add_route(&mut self, route: StreamEventRoute<E>) {
        self.routes.retain(|existing| {
            let same_id = existing
                .id()
                .as_str()
                .eq_ignore_ascii_case(route.id().as_str());
            // Keep only routes that differ in both id and matcher.
            !same_id && existing.matcher != route.matcher
        });
        self.routes.push(route);
    }

    /// Returns the first route, in insertion order, whose matcher accepts the
    /// given `label` and `payload`, or `None` when no route matches.
    #[inline(always)]
    pub fn select_route(
        &self,
        label: Option<&str>,
        payload: Option<&Value>,
    ) -> Option<&StreamEventRoute<E>> {
        self.routes
            .iter()
            .find(|candidate| candidate.matches(label, payload))
    }
}
/// A single routing rule: an id, the matcher that decides whether an incoming
/// event belongs to this route, and the parser applied to matching events.
///
/// The parser is held behind an `Arc`, so cloning a route shares the parser
/// instead of copying it.
#[derive(Clone, Debug)]
pub struct StreamEventRoute<E> {
    /// Identifier of the route.
    id: SharedStr,
    /// Predicate evaluated against an event's label and payload.
    matcher: RouteMatcher,
    /// Shared parser used for events that match this route.
    parser: Arc<dyn StreamEventParser<E>>,
}

impl<E> StreamEventRoute<E> {
    /// Builds a route from its id, matcher and parser.
    ///
    /// `parser` is moved into a new `Arc` and stored as a trait object.
    pub fn new(
        id: impl Into<SharedStr>,
        matcher: RouteMatcher,
        parser: impl StreamEventParser<E>,
    ) -> Self {
        Self {
            id: id.into(),
            matcher,
            parser: Arc::new(parser),
        }
    }

    /// Identifier of the route.
    #[inline(always)]
    pub fn id(&self) -> &SharedStr {
        &self.id
    }

    /// Returns whether this route's matcher accepts `label` and `payload`.
    ///
    /// Delegates directly to `RouteMatcher::matches`.
    #[inline(always)]
    pub fn matches(&self, label: Option<&str>, payload: Option<&Value>) -> bool {
        self.matcher.matches(label, payload)
    }

    /// Shared handle to the parser attached to this route.
    #[inline(always)]
    pub fn parser(&self) -> &Arc<dyn StreamEventParser<E>> {
        &self.parser
    }

    /// Returns the result of `RouteMatcher::extract_targets` for this route's
    /// matcher.
    pub fn match_target(&self) -> MatchTarget {
        self.matcher.extract_targets()
    }
}
/// Turns raw transport events into values of the parsed event type `E`.
///
/// There is one hook per transport (HTTP, gRPC, WebSocket). Every hook has a
/// default body that ignores its arguments and returns `None`, so an
/// implementor only needs to override the hooks for the transports it handles.
/// Implementors must be `Debug + Send + Sync + 'static`.
pub trait StreamEventParser<E>: Debug + Send + Sync + 'static {
/// Returns this parser's name.
fn name(&self) -> &str;
/// Parses a raw HTTP stream event for the given route and descriptor.
///
/// The default implementation returns `None`.
fn from_http(
&self,
_event: StreamEventRaw<HttpEvent>,
_route: &StreamEventRoute<E>,
_desc: &HttpDescriptor<StreamEventContext<E>>,
) -> Option<E> {
None
}
/// Parses a raw gRPC stream event for the given route and descriptor.
///
/// The default implementation returns `None`.
fn from_grpc(
&self,
_event: StreamEventRaw<GrpcEvent>,
_route: &StreamEventRoute<E>,
_desc: &GrpcDescriptor<StreamEventContext<E>>,
) -> Option<E> {
None
}
/// Parses a raw WebSocket stream event for the given route and descriptor.
///
/// The default implementation returns `None`.
fn from_ws(
&self,
_event: StreamEventRaw<WebSocketEvent>,
_route: &StreamEventRoute<E>,
_desc: &WebSocketStreamDescriptor<StreamEventContext<E>>,
) -> Option<E> {
None
}
}
/// Associates a parsed event type with one reducer type and one state-marker
/// type per transport (HTTP, gRPC, WebSocket).
///
/// Implementors must be `Debug + Send + Clone + 'static`.
/// NOTE(review): how the reducers and states are consumed is not visible in
/// this file — confirm against the transport stream modules.
pub trait StreamEventParsed: Debug + Send + Clone + 'static {
/// Reducer type used for the HTTP transport.
type HttpReducer: Reducer;
/// Reducer type used for the gRPC transport.
type GrpcReducer: Reducer;
/// Reducer type used for the WebSocket transport.
type WsReducer: Reducer;
/// State-marker type used for the HTTP transport.
type HttpState: StateMarker;
/// State-marker type used for the gRPC transport.
type GrpcState: StateMarker;
/// State-marker type used for the WebSocket transport.
type WsState: StateMarker;
}
/// Unit parsed-event type that carries no data.
///
/// Its `StreamEventParsed` implementation selects `NullReducer` and
/// `NullState` for every transport.
#[derive(Clone, Debug)]
pub struct NullStreamEvent;
// Every transport uses the null reducer and the null state.
impl StreamEventParsed for NullStreamEvent {
type HttpReducer = NullReducer;
type GrpcReducer = NullReducer;
type WsReducer = NullReducer;
type HttpState = NullState;
type GrpcState = NullState;
type WsState = NullState;
}
/// Payload of a stream event: either a raw event from one of the three
/// transports or a value of the caller's parsed type.
#[derive(Debug, Clone)]
pub enum StreamEventUnion<Parsed> {
/// Raw HTTP stream event.
RawHttp(StreamEventRaw<HttpEvent>),
/// Raw gRPC stream event.
RawGrpc(StreamEventRaw<GrpcEvent>),
/// Raw WebSocket stream event.
RawWs(StreamEventRaw<WebSocketEvent>),
/// Event in the caller-defined parsed form.
Parsed(Parsed),
}
/// A stream event paired with a stream identifier.
#[derive(Debug, Clone)]
pub struct StreamEvent<Parsed> {
/// Stream identifier — presumably the name of the originating stream;
/// verify against the code that constructs this type.
pub stream: SharedStr,
/// The event payload, raw or parsed.
pub union: StreamEventUnion<Parsed>,
}