live-binance 0.1.0

Binance public market-data WebSocket streams. Plugs into the live-feed publisher SDK; data flows through lightstream's WebSocket-JSON pipeline.
Documentation
use std::borrow::Cow;

use minarrow::Field;

/// Metadata key on a [`Field`] naming the upstream JSON key that the
/// decoder reads this column from.
///
/// If a field carries no such entry, the decoder looks the column up by
/// the field's own name instead.
pub const FIELD_META_SOURCE_KEY: &str = "json_source_key";

/// Metadata key on a [`Field`] that opts the column in to having a JSON
/// string value parsed into the column's Arrow type.
///
/// Recognised values are `"number"`, `"bool"` and `"datetime"`;
/// `FromStringHint::from_metadata` turns the metadata value into a hint.
pub const FIELD_META_FROM_STRING: &str = "json_from_string";

/// Per-field opt-in for parsing JSON strings into non-string column
/// types.
///
/// The non-`None` variants correspond one-to-one with the recognised
/// values of the [`FIELD_META_FROM_STRING`] metadata entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum FromStringHint {
    /// No opt-in; also the result for any unrecognised metadata value.
    #[default]
    None,
    /// Metadata value `"number"`.
    Number,
    /// Metadata value `"bool"`.
    Bool,
    /// Metadata value `"datetime"`.
    Datetime,
}

impl FromStringHint {
    /// Converts a [`FIELD_META_FROM_STRING`] metadata value into a hint.
    ///
    /// Matching is exact and case-sensitive. Any value outside the
    /// recognised set (including the empty string) yields
    /// [`FromStringHint::None`] rather than an error.
    pub fn from_metadata(value: &str) -> Self {
        const RECOGNISED: [(&str, FromStringHint); 3] = [
            ("number", FromStringHint::Number),
            ("bool", FromStringHint::Bool),
            ("datetime", FromStringHint::Datetime),
        ];

        RECOGNISED
            .iter()
            .find(|(tag, _)| *tag == value)
            .map_or(Self::None, |&(_, hint)| hint)
    }
}

/// How a publisher batches decoded rows before forwarding them
/// downstream.
///
/// The default is [`EmitMode::Immediate`].
// `Default` is derived via `#[default]` (as `FromStringHint` does) instead
// of a hand-written impl, so the default is declared next to the variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum EmitMode {
    /// Emit each decoded row immediately.
    #[default]
    Immediate,
    /// Emit a batch every `n` rows.
    EveryNRows(u32),
    /// Emit a batch every `micros` microseconds.
    EveryMicros(u64),
    /// Emit when either bound is hit.
    Adaptive {
        /// Row-count bound for a batch.
        max_rows: u32,
        /// Time bound for a batch, in microseconds.
        max_micros: u64,
    },
}

/// Connector source specification for a WebSocket-JSON live feed.
///
/// Construct with [`WsJsonSourceSpec::new`] and refine with the chained
/// setters [`schema`](WsJsonSourceSpec::schema),
/// [`subscribe_message`](WsJsonSourceSpec::subscribe_message) and
/// [`emit_mode`](WsJsonSourceSpec::emit_mode). Each setter consumes and
/// returns the spec, so its result must be kept. Call
/// [`validate`](WsJsonSourceSpec::validate) to check that the event-time
/// column is present in the schema.
#[derive(Debug, Clone)]
pub struct WsJsonSourceSpec {
    /// Feed name; included in validation error messages.
    pub name: Cow<'static, str>,
    /// WebSocket endpoint URL.
    pub url: Cow<'static, str>,
    /// Optional raw subscribe payload; `None` unless set via
    /// [`subscribe_message`](WsJsonSourceSpec::subscribe_message).
    /// NOTE(review): presumably sent after the socket connects — confirm
    /// against the publisher SDK.
    pub subscribe_message: Option<Cow<'static, [u8]>>,
    /// Column definitions; empty until set via
    /// [`schema`](WsJsonSourceSpec::schema).
    pub schema: Vec<Field>,
    /// Name of the schema field holding the event time. `validate`
    /// requires a field with exactly this name to be present in `schema`.
    pub event_time_key: Cow<'static, str>,
    /// Batching policy; [`EmitMode::default()`] when built via `new`.
    pub emit_mode: EmitMode,
}

impl WsJsonSourceSpec {
    /// Creates a spec with the given feed name, URL and event-time key.
    ///
    /// The schema starts empty, no subscribe message is set, and the emit
    /// mode is [`EmitMode::default()`]. A freshly created spec therefore
    /// fails [`validate`](Self::validate) until a schema containing
    /// `event_time_key` is supplied.
    #[must_use]
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        url: impl Into<Cow<'static, str>>,
        event_time_key: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            url: url.into(),
            subscribe_message: None,
            schema: Vec::new(),
            event_time_key: event_time_key.into(),
            emit_mode: EmitMode::default(),
        }
    }

    /// Replaces the column schema.
    #[must_use = "builder methods return the updated spec; discarding it drops the change"]
    pub fn schema(mut self, schema: Vec<Field>) -> Self {
        self.schema = schema;
        self
    }

    /// Sets the raw subscribe payload, replacing any previous one.
    #[must_use = "builder methods return the updated spec; discarding it drops the change"]
    pub fn subscribe_message(mut self, msg: impl Into<Cow<'static, [u8]>>) -> Self {
        self.subscribe_message = Some(msg.into());
        self
    }

    /// Sets the batching policy.
    #[must_use = "builder methods return the updated spec; discarding it drops the change"]
    pub fn emit_mode(mut self, mode: EmitMode) -> Self {
        self.emit_mode = mode;
        self
    }

    /// Checks that `event_time_key` names a field in `schema`.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message naming the key and the feed when
    /// no field in `schema` has a name equal to `event_time_key`. This
    /// includes the case where the schema is empty.
    pub fn validate(&self) -> std::result::Result<(), String> {
        if !self.schema.iter().any(|f| f.name == self.event_time_key.as_ref()) {
            return Err(format!(
                "event_time_key '{}' not found in schema of feed '{}'",
                self.event_time_key, self.name,
            ));
        }
        Ok(())
    }
}