titanrt 0.7.0

Typed reactive runtime for real-time systems
Documentation
use std::{net::IpAddr, str::FromStr, sync::Arc, time::Duration};

use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tokio::runtime::Runtime;
use tokio_tungstenite::tungstenite::http::{HeaderName, HeaderValue};
use url::Url;

use crate::connector::features::shared::clients_map::{ClientInitializer, SpecificClient};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WebSocketClientSpec {
    pub endpoint: String,
    #[serde(default)]
    pub protocols: Vec<String>,
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    #[serde(default)]
    pub connect_timeout_ms: Option<u64>,
    #[serde(default)]
    pub max_message_size: Option<usize>,
    #[serde(default)]
    pub tcp_nodelay: Option<bool>,
}

#[derive(Clone)]
pub struct WebSocketClient {
    pub url: Url,
    pub protocols: Vec<String>,
    pub headers: Vec<(HeaderName, HeaderValue)>,
    pub connect_timeout: Option<Duration>,
    pub max_message_size: Option<usize>,
    pub tcp_nodelay: bool,
    pub local_ip: Option<IpAddr>,
}

impl ClientInitializer<WebSocketClientSpec> for WebSocketClient {
    fn init(cfg: &SpecificClient<WebSocketClientSpec>, _rt: Option<Arc<Runtime>>) -> Result<Self> {
        let url = Url::parse(&cfg.spec.endpoint)
            .with_context(|| format!("invalid websocket endpoint: {}", cfg.spec.endpoint))?;

        let mut headers = Vec::with_capacity(cfg.spec.headers.len());
        for (name, value) in &cfg.spec.headers {
            let header_name = HeaderName::from_str(name)
                .with_context(|| format!("invalid header name: {name}"))?;
            let header_value = HeaderValue::from_str(value)
                .with_context(|| format!("invalid header value for {name}"))?;
            headers.push((header_name, header_value));
        }

        let local_ip = match cfg.ip.as_deref() {
            Some(raw) => Some(
                IpAddr::from_str(raw)
                    .with_context(|| format!("invalid local ip address: {raw}"))?,
            ),
            None => None,
        };

        Ok(Self {
            url,
            protocols: cfg.spec.protocols.clone(),
            headers,
            connect_timeout: cfg
                .spec
                .connect_timeout_ms
                .map(|ms| Duration::from_millis(ms)),
            max_message_size: cfg.spec.max_message_size,
            tcp_nodelay: cfg.spec.tcp_nodelay.unwrap_or(true),
            local_ip,
        })
    }
}