titanrt 0.7.0

Typed reactive runtime for real-time systems
Documentation
use crate::connector::Stream;
use crate::connector::features::composite::stream::event::{StreamEvent, StreamEventParsed};
use crate::connector::features::grpc::stream::GrpcCommand;
use crate::connector::features::http::stream::actions::HttpAction;
use crate::connector::features::shared::actions::StreamActionRaw;
use crate::connector::features::websocket::stream::WebSocketCommand;
use crate::io::mpmc::MpmcReceiver;
use crate::io::ringbuffer::RingSender;
use crate::utils::StateCell;

/// Transport-tagged holder for a single connector [`Stream`].
///
/// Every stream-carrying variant wraps a `Stream` with three type parameters:
/// a `RingSender` of `StreamActionRaw<_>` specialised for that transport's
/// action/command type, an optional `MpmcReceiver` of `StreamEvent<E>`, and
/// the per-transport state type taken from `E`'s associated types.
pub enum StreamWrapper<E>
where
    E: StreamEventParsed,
{
    /// Stream parameterised with `HttpAction` and `E::HttpState`.
    Http {
        stream: Stream<
            RingSender<StreamActionRaw<HttpAction>>,
            Option<MpmcReceiver<StreamEvent<E>>>,
            E::HttpState,
        >,
    },
    /// Stream parameterised with `GrpcCommand` and `E::GrpcState`.
    Grpc {
        stream: Stream<
            RingSender<StreamActionRaw<GrpcCommand>>,
            Option<MpmcReceiver<StreamEvent<E>>>,
            E::GrpcState,
        >,
    },
    /// Stream parameterised with `WebSocketCommand` and `E::WsState`.
    Ws {
        stream: Stream<
            RingSender<StreamActionRaw<WebSocketCommand>>,
            Option<MpmcReceiver<StreamEvent<E>>>,
            E::WsState,
        >,
    },
    /// Marker variant carrying only `PhantomData<E>`; it holds no stream.
    ///
    /// NOTE(review): the leading underscore suggests this variant is not meant
    /// to be constructed by callers — confirm before relying on it.
    _Phantom(std::marker::PhantomData<E>),
}

impl<E> StreamWrapper<E>
where
    E: StreamEventParsed,
{
    pub fn get_http(
        &mut self,
    ) -> Option<
        &mut Stream<
            RingSender<StreamActionRaw<HttpAction>>,
            Option<MpmcReceiver<StreamEvent<E>>>,
            E::HttpState,
        >,
    > {
        match self {
            StreamWrapper::Http { stream } => Some(stream),
            _ => None,
        }
    }

    pub fn get_grpc(
        &mut self,
    ) -> Option<
        &mut Stream<
            RingSender<StreamActionRaw<GrpcCommand>>,
            Option<MpmcReceiver<StreamEvent<E>>>,
            E::GrpcState,
        >,
    > {
        match self {
            StreamWrapper::Grpc { stream } => Some(stream),
            _ => None,
        }
    }

    pub fn get_ws(
        &mut self,
    ) -> Option<
        &mut Stream<
            RingSender<StreamActionRaw<WebSocketCommand>>,
            Option<MpmcReceiver<StreamEvent<E>>>,
            E::WsState,
        >,
    > {
        match self {
            StreamWrapper::Ws { stream } => Some(stream),
            _ => None,
        }
    }

    pub fn cancel(&mut self) {
        match self {
            StreamWrapper::Http { stream } => stream.cancel(),
            StreamWrapper::Grpc { stream } => stream.cancel(),
            StreamWrapper::Ws { stream } => stream.cancel(),
            StreamWrapper::_Phantom(_) => {}
        }
    }

    pub fn is_alive(&self) -> bool {
        match self {
            StreamWrapper::Http { stream } => stream.is_healthy() && !stream.is_cancelled(),
            StreamWrapper::Grpc { stream } => stream.is_healthy() && !stream.is_cancelled(),
            StreamWrapper::Ws { stream } => stream.is_healthy() && !stream.is_cancelled(),
            StreamWrapper::_Phantom(_) => false,
        }
    }

    pub fn http_state(&self) -> Option<&StateCell<E::HttpState>> {
        match self {
            StreamWrapper::Http { stream } => Some(stream.state()),
            _ => None,
        }
    }
    pub fn grpc_state(&self) -> Option<&StateCell<E::GrpcState>> {
        match self {
            StreamWrapper::Grpc { stream } => Some(stream.state()),
            _ => None,
        }
    }
    pub fn ws_state(&self) -> Option<&StateCell<E::WsState>> {
        match self {
            StreamWrapper::Ws { stream } => Some(stream.state()),
            _ => None,
        }
    }
}