Skip to main content

live_binance/
spec.rs

1use std::borrow::Cow;
2
3use minarrow::Field;
4
/// Metadata key under which a field records the JSON key the decoder
/// should read for that column on the upstream record. A field that
/// carries no such entry is looked up under its own name instead.
pub const FIELD_META_SOURCE_KEY: &str = "json_source_key";
9
/// Metadata key by which a column opts in to having its value parsed
/// out of a JSON string and into the column's Arrow type. The
/// recognised values are `"number"`, `"bool"` and `"datetime"`.
pub const FIELD_META_FROM_STRING: &str = "json_from_string";
14
/// Opt-in, chosen per field, for parsing a JSON string into a
/// non-string column type. The variants correspond to the values
/// accepted under [`FIELD_META_FROM_STRING`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum FromStringHint {
    /// No string parsing requested; this is the default.
    #[default]
    None,
    /// Selected by the metadata value `"number"`.
    Number,
    /// Selected by the metadata value `"bool"`.
    Bool,
    /// Selected by the metadata value `"datetime"`.
    Datetime,
}

impl FromStringHint {
    /// Translates a metadata value into its hint.
    ///
    /// Matching is exact: only `"number"`, `"bool"` and `"datetime"`
    /// are recognised. Every other input, including the empty string
    /// and differently-cased spellings, yields [`FromStringHint::None`].
    pub fn from_metadata(value: &str) -> Self {
        // Table of every recognised spelling and the hint it selects.
        const RECOGNISED: [(&str, FromStringHint); 3] = [
            ("number", FromStringHint::Number),
            ("bool", FromStringHint::Bool),
            ("datetime", FromStringHint::Datetime),
        ];

        RECOGNISED
            .iter()
            .find_map(|&(tag, hint)| (tag == value).then_some(hint))
            .unwrap_or(Self::None)
    }
}
37
/// How a publisher batches decoded rows before forwarding them
/// downstream.
///
/// The default is [`EmitMode::Immediate`]. It is supplied by
/// `#[derive(Default)]` with a `#[default]` variant, the same way
/// `FromStringHint` declares its default, rather than by a hand-written
/// `impl Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum EmitMode {
    /// Emit each decoded row immediately.
    #[default]
    Immediate,
    /// Emit a batch every `n` rows.
    EveryNRows(u32),
    /// Emit a batch every `micros` microseconds.
    EveryMicros(u64),
    /// Emit when either bound is hit.
    Adaptive {
        /// Row-count bound.
        max_rows: u32,
        /// Elapsed-time bound, in microseconds.
        max_micros: u64,
    },
}
58
59/// Connector source specification for a WebSocket-JSON live feed.
60#[derive(Debug, Clone)]
61pub struct WsJsonSourceSpec {
62    pub name: Cow<'static, str>,
63    pub url: Cow<'static, str>,
64    pub subscribe_message: Option<Cow<'static, [u8]>>,
65    pub schema: Vec<Field>,
66    pub event_time_key: Cow<'static, str>,
67    pub emit_mode: EmitMode,
68}
69
70impl WsJsonSourceSpec {
71    pub fn new(
72        name: impl Into<Cow<'static, str>>,
73        url: impl Into<Cow<'static, str>>,
74        event_time_key: impl Into<Cow<'static, str>>,
75    ) -> Self {
76        Self {
77            name: name.into(),
78            url: url.into(),
79            subscribe_message: None,
80            schema: Vec::new(),
81            event_time_key: event_time_key.into(),
82            emit_mode: EmitMode::default(),
83        }
84    }
85
86    pub fn schema(mut self, schema: Vec<Field>) -> Self {
87        self.schema = schema;
88        self
89    }
90
91    pub fn subscribe_message(mut self, msg: impl Into<Cow<'static, [u8]>>) -> Self {
92        self.subscribe_message = Some(msg.into());
93        self
94    }
95
96    pub fn emit_mode(mut self, mode: EmitMode) -> Self {
97        self.emit_mode = mode;
98        self
99    }
100
101    pub fn validate(&self) -> std::result::Result<(), String> {
102        if !self.schema.iter().any(|f| f.name == self.event_time_key.as_ref()) {
103            return Err(format!(
104                "event_time_key '{}' not found in schema of feed '{}'",
105                self.event_time_key, self.name,
106            ));
107        }
108        Ok(())
109    }
110}