// titanrt 0.7.0
//
// Typed reactive runtime for real-time systems
// Documentation
use std::str::FromStr;

use bytes::Bytes;
use tokio_tungstenite::tungstenite::http::{HeaderName, HeaderValue};
use tracing::warn;
use url::Url;

use crate::connector::features::shared::actions::StreamActionBuilder;

use super::message::WebSocketMessage;

/// A command addressed to a WebSocket stream.
///
/// Values are normally created through the associated constructor functions
/// on this type (`connect`, `send_text`, `close`, ...), not by naming the
/// variants directly.
#[derive(Debug, Clone)]
pub enum WebSocketCommand {
    /// Connection request carrying the options collected by
    /// [`WebSocketConnectBuilder`].
    Connect(WebSocketConnect),
    /// A single outgoing [`WebSocketMessage`].
    Send(WebSocketMessage),
    /// Close request with an optional code and reason (see [`WebSocketClose`]).
    Close(WebSocketClose),
    /// Ping with the given payload bytes.
    Ping(Bytes),
    /// Pong with the given payload bytes.
    Pong(Bytes),
    /// Disconnect request; carries no payload.
    // NOTE(review): how `Disconnect` differs from `Close` at runtime is not
    // visible in this file — confirm against the command handler.
    Disconnect,
}

impl WebSocketCommand {
    pub fn connect() -> WebSocketConnectBuilder {
        WebSocketConnectBuilder::default()
    }

    pub fn send_text<T: Into<String>>(message: T) -> Self {
        Self::Send(WebSocketMessage::text(message))
    }

    pub fn send_binary<B: Into<Bytes>>(message: B) -> Self {
        Self::Send(WebSocketMessage::binary(message))
    }

    pub fn close() -> WebSocketCloseBuilder {
        WebSocketCloseBuilder::default()
    }

    pub fn ping<B: Into<Bytes>>(payload: B) -> Self {
        Self::Ping(payload.into())
    }

    pub fn pong<B: Into<Bytes>>(payload: B) -> Self {
        Self::Pong(payload.into())
    }

    pub fn disconnect() -> Self {
        Self::Disconnect
    }

    pub fn to_builder(self) -> StreamActionBuilder<WebSocketCommand> {
        StreamActionBuilder::new(Some(self))
    }
}

/// Accumulates the options for a [`WebSocketCommand::Connect`] command.
///
/// Every field starts empty (`Default`); the builder methods fill them in and
/// `build()` moves them into a [`WebSocketConnect`].
#[derive(Debug, Clone, Default)]
pub struct WebSocketConnectBuilder {
    /// URL set via `url` / `url_str`; `None` when neither was called.
    override_url: Option<Url>,
    /// Protocol names appended via `protocol` / `protocols`, in call order.
    protocols: Vec<String>,
    /// Header pairs that parsed successfully in `header`, in call order.
    headers: Vec<(HeaderName, HeaderValue)>,
    /// Messages appended via `initial_message` / `initial_text` /
    /// `initial_binary`, in call order.
    initial_messages: Vec<WebSocketMessage>,
}

/// Chainable setters for [`WebSocketConnectBuilder`]; each consumes the
/// builder and hands it back.
impl WebSocketConnectBuilder {
    /// Stores `url` as the override URL, replacing any previously set one.
    pub fn url(self, url: Url) -> Self {
        Self {
            override_url: Some(url),
            ..self
        }
    }

    /// Parses `value` as a URL and stores it as the override URL.
    ///
    /// # Errors
    /// Returns the parse error (converted into `anyhow::Error`) when `value`
    /// is not a valid URL; the builder is dropped in that case.
    pub fn url_str(self, value: &str) -> anyhow::Result<Self> {
        let parsed = Url::parse(value)?;
        Ok(Self {
            override_url: Some(parsed),
            ..self
        })
    }

    /// Appends a single protocol name.
    pub fn protocol<S>(self, protocol: S) -> Self
    where
        S: Into<String>,
    {
        let mut builder = self;
        let name: String = protocol.into();
        builder.protocols.push(name);
        builder
    }

    /// Appends every protocol name yielded by `protocols`, preserving order.
    pub fn protocols<I, S>(self, protocols: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut builder = self;
        let names = protocols.into_iter().map(Into::<String>::into);
        builder.protocols.extend(names);
        builder
    }

    /// Appends a header if both `name` and `value` parse successfully.
    ///
    /// When either part fails to parse the pair is skipped and a warning is
    /// logged; the builder is returned unchanged.
    pub fn header(mut self, name: &str, value: &str) -> Self {
        // Both halves are parsed up front, exactly once each.
        let parsed = (HeaderName::from_str(name), HeaderValue::from_str(value));
        if let (Ok(header_name), Ok(header_value)) = parsed {
            self.headers.push((header_name, header_value));
        } else {
            warn!(%name, %value, "websocket connect header is invalid");
        }
        self
    }

    /// Appends `message` to the list of initial messages.
    pub fn initial_message(self, message: WebSocketMessage) -> Self {
        let mut builder = self;
        builder.initial_messages.push(message);
        builder
    }

    /// Appends a text message built from `text` to the initial messages.
    pub fn initial_text<T>(self, text: T) -> Self
    where
        T: Into<String>,
    {
        let message = WebSocketMessage::text(text);
        self.initial_message(message)
    }

    /// Appends a binary message built from `bytes` to the initial messages.
    pub fn initial_binary<B>(self, bytes: B) -> Self
    where
        B: Into<Bytes>,
    {
        let message = WebSocketMessage::binary(bytes);
        self.initial_message(message)
    }

    /// Consumes the builder and produces a `Connect` command holding the
    /// collected options.
    pub fn build(self) -> WebSocketCommand {
        let Self {
            override_url,
            protocols,
            headers,
            initial_messages,
        } = self;

        let connect = WebSocketConnect {
            override_url,
            extra_protocols: protocols,
            extra_headers: headers,
            initial_messages,
        };
        WebSocketCommand::Connect(connect)
    }

    /// Shorthand for `build()` followed by `WebSocketCommand::to_builder`.
    pub fn to_builder(self) -> StreamActionBuilder<WebSocketCommand> {
        let command = self.build();
        command.to_builder()
    }
}

/// Payload of [`WebSocketCommand::Connect`], produced by
/// [`WebSocketConnectBuilder::build`].
#[derive(Debug, Clone)]
pub struct WebSocketConnect {
    /// URL supplied through the builder, if any.
    // NOTE(review): presumably replaces a configured default URL — confirm
    // against the code that consumes this command.
    pub(crate) override_url: Option<Url>,
    /// Protocol names collected by the builder.
    pub(crate) extra_protocols: Vec<String>,
    /// Parsed header pairs collected by the builder.
    pub(crate) extra_headers: Vec<(HeaderName, HeaderValue)>,
    /// Messages collected by the builder.
    // NOTE(review): presumably sent right after the connection opens — confirm.
    pub(crate) initial_messages: Vec<WebSocketMessage>,
}

/// Accumulates the optional code and reason for a
/// [`WebSocketCommand::Close`] command.
///
/// Both fields default to `None`.
#[derive(Debug, Clone, Default)]
pub struct WebSocketCloseBuilder {
    /// Close code set via `code`, if any.
    code: Option<u16>,
    /// Close reason set via `reason`, if any.
    reason: Option<String>,
}

/// Chainable setters for [`WebSocketCloseBuilder`]; each consumes the builder
/// and hands it back.
impl WebSocketCloseBuilder {
    /// Sets the close code, replacing any previously set one.
    pub fn code(self, code: u16) -> Self {
        Self {
            code: Some(code),
            ..self
        }
    }

    /// Sets the close reason, replacing any previously set one.
    pub fn reason<S>(self, reason: S) -> Self
    where
        S: Into<String>,
    {
        let text: String = reason.into();
        Self {
            reason: Some(text),
            ..self
        }
    }

    /// Consumes the builder and produces a `Close` command with the collected
    /// code and reason.
    pub fn build(self) -> WebSocketCommand {
        let Self { code, reason } = self;
        let close = WebSocketClose { code, reason };
        WebSocketCommand::Close(close)
    }

    /// Shorthand for `build()` followed by `WebSocketCommand::to_builder`.
    pub fn to_builder(self) -> StreamActionBuilder<WebSocketCommand> {
        let command = self.build();
        command.to_builder()
    }
}

/// Payload of [`WebSocketCommand::Close`], produced by
/// [`WebSocketCloseBuilder::build`].
#[derive(Debug, Clone)]
pub struct WebSocketClose {
    /// Close code supplied through the builder; `None` when not set.
    pub(crate) code: Option<u16>,
    /// Close reason supplied through the builder; `None` when not set.
    pub(crate) reason: Option<String>,
}