titanrt 0.7.0

Typed reactive runtime for real-time systems
Documentation
use anyhow::anyhow;
use bytes::Bytes;
use tonic::{Code, Status, metadata::MetadataMap};

use crate::connector::features::shared::events::StreamEventInner;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GrpcEventKind {
    StreamConnected,
    StreamDisconnected,
    StreamItem,
    UnaryResponse,
    BadCommand,
}

#[derive(Debug, Clone)]
pub struct GrpcEvent {
    kind: GrpcEventKind,
    code: Code,
    err_msg: Option<String>,
    metadata: MetadataMap,
    body: Option<Bytes>,
}

impl GrpcEvent {
    pub fn from_ok() -> Self {
        Self {
            kind: GrpcEventKind::StreamConnected,
            code: Code::Ok,
            err_msg: None,
            metadata: MetadataMap::new(),
            body: None,
        }
    }

    pub fn from_ok_unary(resp: tonic::Response<Bytes>) -> Self {
        let (metadata, body, _trailing) = resp.into_parts();
        Self {
            kind: GrpcEventKind::UnaryResponse,
            code: Code::Ok,
            err_msg: None,
            metadata,
            body: Some(body),
        }
    }

    pub fn from_ok_stream_item(body: Bytes) -> Self {
        Self {
            kind: GrpcEventKind::StreamItem,
            code: Code::Ok,
            err_msg: None,
            metadata: MetadataMap::new(),
            body: Some(body),
        }
    }

    pub fn from_ok_stream_close(metadata: MetadataMap) -> Self {
        let status = Status::aborted("grpc streaming aborted".to_string());
        Self {
            kind: GrpcEventKind::StreamDisconnected,
            code: status.code(),
            err_msg: Some(status.message().to_string()),
            metadata,
            body: None,
        }
    }

    pub fn from_status(kind: GrpcEventKind, st: Status) -> Self {
        Self {
            kind,
            code: st.code(),
            err_msg: Some(st.message().to_string()),
            metadata: st.metadata().clone(),
            body: None,
        }
    }

    pub fn kind(&self) -> GrpcEventKind {
        self.kind
    }

    pub fn metadata(&self) -> &MetadataMap {
        &self.metadata
    }

    pub fn decode_as<T: prost::Message + Default>(&self) -> anyhow::Result<T> {
        if let Some(ref b) = self.body {
            T::decode(b.as_ref()).map_err(|e| anyhow!("Prost decode error: {e}"))
        } else {
            Err(anyhow!("No body to decode"))
        }
    }
}

impl StreamEventInner for GrpcEvent {
    type Body = Bytes;
    type Err = String;
    type Code = Code;

    fn status(&self) -> Option<&Self::Code> {
        Some(&self.code)
    }

    fn is_ok(&self) -> bool {
        self.code == Code::Ok
    }

    fn error(&self) -> Option<&Self::Err> {
        self.err_msg.as_ref()
    }

    fn body(&self) -> Option<&Self::Body> {
        self.body.as_ref()
    }

    fn into_body(self) -> Option<Self::Body> {
        self.body
    }
}