titanrt 0.7.0

Typed reactive runtime for real-time systems
Documentation
use crate::connector::HookArgs;
use crate::connector::features::composite::stream::event::{
    StreamEvent, StreamEventContext, StreamEventParsed, StreamEventRoute, StreamEventUnion,
};
use crate::connector::features::grpc::stream::{GrpcDescriptor, GrpcEvent};
use crate::connector::features::http::stream::descriptor::HttpDescriptor;
use crate::connector::features::http::stream::event::HttpEvent;
use crate::connector::features::shared::events::{StreamEventInner, StreamEventRaw};
use crate::connector::features::websocket::stream::{WebSocketEvent, WebSocketStreamDescriptor};
use crate::io::mpmc::MpmcSender;
use crate::prelude::BaseTx;
use tracing::warn;

#[inline]
fn send_event<E: StreamEventParsed>(
    event_tx: &mut MpmcSender<StreamEvent<E>>,
    event: StreamEvent<E>,
    stream_type: &str,
) where
    E: StreamEventParsed,
{
    if let Err(err) = event_tx.try_send(event) {
        warn!(
            "{} hook failed to forward event from stream {:?}: {}",
            stream_type,
            err.value.as_ref().map(|v| v.stream.as_str()),
            err
        );
    }
}

#[inline]
fn process_event<E, RawEvent, Desc, F>(
    ctx: &StreamEventContext<E>,
    raw: StreamEventRaw<RawEvent>,
    desc: &Desc,
    event_tx: &mut MpmcSender<StreamEvent<E>>,
    stream_type: &str,
    use_parser: F,
) -> Option<StreamEventRaw<RawEvent>>
where
    RawEvent: StreamEventInner,
    E: StreamEventParsed,
    F: FnOnce(StreamEventRaw<RawEvent>, &StreamEventRoute<E>, &Desc) -> Option<E>,
{
    if let Some(route) = ctx.select_route(raw.label(), raw.payload()) {
        if let Some(event) = use_parser(raw, &route, desc) {
            send_event(
                event_tx,
                StreamEvent {
                    stream: ctx.stream_name().clone(),
                    union: StreamEventUnion::Parsed(event),
                },
                stream_type,
            );
        }
        return None;
    }

    return Some(raw);
}

pub fn http_hook<E>(
    args: HookArgs<
        '_,
        StreamEventRaw<HttpEvent>,
        MpmcSender<StreamEvent<E>>,
        E::HttpReducer,
        E::HttpState,
        HttpDescriptor<StreamEventContext<E>>,
        StreamEventContext<E>,
    >,
) where
    E: StreamEventParsed,
{
    let ctx = args
        .desc
        .ctx
        .as_ref()
        .expect("http hook ctx is required for http hook");

    let maybe_raw = process_event(
        ctx,
        args.raw,
        args.desc,
        args.event_tx,
        "http",
        |raw, route, desc| route.parser().from_http(raw, route, desc),
    );

    if let Some(raw) = maybe_raw {
        send_event(
            args.event_tx,
            StreamEvent {
                stream: ctx.stream_name().clone(),
                union: StreamEventUnion::RawHttp(raw),
            },
            "http",
        );
    }
}

pub fn grpc_hook<E>(
    args: HookArgs<
        '_,
        StreamEventRaw<GrpcEvent>,
        MpmcSender<StreamEvent<E>>,
        E::GrpcReducer,
        E::GrpcState,
        GrpcDescriptor<StreamEventContext<E>>,
        StreamEventContext<E>,
    >,
) where
    E: StreamEventParsed,
{
    let ctx = args
        .desc
        .ctx
        .as_ref()
        .expect("grpc hook ctx is required for grpc hook");

    let maybe_raw = process_event(
        ctx,
        args.raw,
        args.desc,
        args.event_tx,
        "grpc",
        |raw, route, desc| route.parser().from_grpc(raw, route, desc),
    );

    if let Some(raw) = maybe_raw {
        send_event(
            args.event_tx,
            StreamEvent {
                stream: ctx.stream_name().clone(),
                union: StreamEventUnion::RawGrpc(raw),
            },
            "grpc",
        );
    }
}

pub fn ws_hook<E>(
    args: HookArgs<
        '_,
        StreamEventRaw<WebSocketEvent>,
        MpmcSender<StreamEvent<E>>,
        E::WsReducer,
        E::WsState,
        WebSocketStreamDescriptor<StreamEventContext<E>>,
        StreamEventContext<E>,
    >,
) where
    E: StreamEventParsed,
{
    let ctx = args
        .desc
        .ctx
        .as_ref()
        .expect("grpc hook ctx is required for grpc hook");

    let maybe_raw = process_event(
        ctx,
        args.raw,
        args.desc,
        args.event_tx,
        "ws",
        |raw, route, desc| route.parser().from_ws(raw, route, desc),
    );

    if let Some(raw) = maybe_raw {
        send_event(
            args.event_tx,
            StreamEvent {
                stream: ctx.stream_name().clone(),
                union: StreamEventUnion::RawWs(raw),
            },
            "ws",
        );
    }
}