titanrt 0.7.0

Typed reactive runtime for real-time systems
Documentation
use std::collections::HashMap;

use crate::connector::features::composite::stream::event::{StreamEventContext, StreamEventParsed};
use crate::connector::features::grpc::stream::GrpcDescriptor;
use crate::connector::features::http::stream::descriptor::HttpDescriptor;
use crate::connector::features::shared::rate_limiter::RateLimitConfig;
use crate::connector::features::websocket::stream::WebSocketStreamDescriptor;
use serde::Deserialize;
use serde_json::Value;

use crate::utils::{CorePickPolicy, SharedStr};

#[derive(Debug, Clone, Deserialize)]
pub struct StreamSpec {
    pub name: SharedStr,
    pub kind: StreamKind,
    #[serde(default)]
    pub ignore_init: Option<bool>,
    #[serde(default = "default_hook_calls_at_once")]
    pub max_hook_calls_at_once: usize,
    #[serde(default = "default_wait_async_task")]
    pub wait_async_tasks_us: u64,
    #[serde(default)]
    pub max_pending_actions: Option<usize>,
    #[serde(default)]
    pub max_pending_events: Option<usize>,
    #[serde(default)]
    pub core_pick_policy: Option<CorePickPolicy>,
    #[serde(default)]
    pub rate_limits: Vec<RateLimitConfig>,
    #[serde(default)]
    pub max_decoding_message_size: Option<usize>,
    #[serde(default)]
    pub max_encoding_message_size: Option<usize>,
    #[serde(default)]
    pub metadata: HashMap<String, Value>,
}

fn default_wait_async_task() -> u64 {
    1000
}

fn default_hook_calls_at_once() -> usize {
    128
}

#[derive(Debug, Clone, Deserialize, Copy)]
#[serde(rename_all = "snake_case")]
pub enum StreamKind {
    Http,
    Grpc,
    Ws,
}

impl AsRef<str> for StreamKind {
    fn as_ref(&self) -> &str {
        match self {
            StreamKind::Http => "http",
            StreamKind::Grpc => "grpc",
            StreamKind::Ws => "ws",
        }
    }
}

impl StreamSpec {
    pub fn get_context<E: StreamEventParsed>(&self) -> StreamEventContext<E> {
        let mut ctx = StreamEventContext::new(self.name.as_str(), self.kind);
        ctx.set_metadata(self.metadata.clone());
        ctx
    }

    pub fn maybe_http<E: StreamEventParsed>(
        &self,
        payload: &StreamEventContext<E>,
    ) -> Option<HttpDescriptor<StreamEventContext<E>>> {
        match self.kind {
            StreamKind::Http => Some(HttpDescriptor::new(
                Some(self.max_hook_calls_at_once),
                Some(self.wait_async_tasks_us),
                self.max_pending_actions,
                self.max_pending_events,
                self.core_pick_policy,
                Some(self.rate_limits.clone()),
                Some(payload.clone()),
            )),
            _ => None,
        }
    }

    pub fn maybe_grpc<E: StreamEventParsed>(
        &self,
        payload: &StreamEventContext<E>,
    ) -> Option<GrpcDescriptor<StreamEventContext<E>>> {
        match self.kind {
            StreamKind::Grpc => Some(GrpcDescriptor::new(
                Some(self.max_hook_calls_at_once),
                Some(self.wait_async_tasks_us),
                self.max_pending_actions,
                self.max_pending_events,
                self.core_pick_policy,
                Some(self.rate_limits.clone()),
                self.max_decoding_message_size,
                self.max_encoding_message_size,
                None,
                Some(payload.clone()),
            )),
            _ => None,
        }
    }

    pub fn maybe_ws<E: StreamEventParsed>(
        &self,
        payload: &StreamEventContext<E>,
    ) -> Option<WebSocketStreamDescriptor<StreamEventContext<E>>> {
        match self.kind {
            StreamKind::Ws => Some(WebSocketStreamDescriptor::new(
                Some(self.max_hook_calls_at_once),
                Some(self.wait_async_tasks_us),
                self.max_pending_actions,
                self.max_pending_events,
                self.core_pick_policy,
                Some(self.rate_limits.clone()),
                Some(payload.clone()),
            )),
            _ => None,
        }
    }
}