mmflow 0.1.1

Rust SDK for the mmflow market-data API — REST candles + a WebSocket client.
Documentation
//! Rust SDK for the mmflow market-data API: a blocking REST client (OHLCV
//! candles) and a WebSocket client implementing the documented JSON protocol
//! (auth/subscribe -> typed data frames).

use std::{error::Error, thread, time::Duration};
use tungstenite::Message;

pub const DEFAULT_REST_BASE: &str = "https://mmflow.ai";
pub const DEFAULT_WS_URL: &str = "wss://ws.mmflow.ai/v1";

/// One normalized OHLCV bar (time in epoch ms).
///
/// Produced by `Client::candles`, which reads the `t`, `o`, `h`, `l`, `c` and
/// `v` keys of each response row. A price or volume field that is missing or
/// cannot be parsed is stored as `f64::NAN`.
#[derive(Debug, Clone, PartialEq)]
pub struct Candle {
    /// Bar timestamp, taken from the row's `t` key.
    pub time: f64,
    /// Opening price, taken from the row's `o` key.
    pub open: f64,
    /// Highest price, taken from the row's `h` key.
    pub high: f64,
    /// Lowest price, taken from the row's `l` key.
    pub low: f64,
    /// Closing price, taken from the row's `c` key.
    pub close: f64,
    /// Traded volume, taken from the row's `v` key.
    pub volume: f64,
}

/// Blocking REST client.
///
/// Holds only the base URL that request paths are appended to.
pub struct Client {
    /// Scheme + host prefix for every request (e.g. `https://mmflow.ai`).
    base_url: String,
}

impl Client {
    /// Create a client for `base_url`.
    ///
    /// An empty string selects [`DEFAULT_REST_BASE`].
    pub fn new(base_url: impl Into<String>) -> Self {
        let b = base_url.into();
        Self {
            base_url: if b.is_empty() { DEFAULT_REST_BASE.to_string() } else { b },
        }
    }

    /// Fetch OHLCV history for a coin, normalized to `Vec<Candle>`.
    ///
    /// Issues `GET {base_url}/api/hl/candles` with `coin`, `interval` and
    /// `hours` as query parameters and reads the `candles` array of the JSON
    /// response. A response without a `candles` array yields an empty vector.
    ///
    /// Rows whose `t` is missing or not a finite number are skipped. Any of
    /// `o`/`h`/`l`/`c`/`v` that is missing or unparsable becomes `f64::NAN`
    /// in the returned [`Candle`]. Values may be JSON numbers or numeric
    /// strings.
    ///
    /// # Errors
    ///
    /// Returns the underlying `ureq` error when the request fails or the body
    /// is not valid JSON.
    pub fn candles(
        &self,
        coin: &str,
        interval: &str,
        hours: u32,
    ) -> Result<Vec<Candle>, Box<dyn Error>> {
        // Tolerate a base URL configured with a trailing slash so the path
        // does not start with `//`.
        let endpoint = format!("{}/api/hl/candles", self.base_url.trim_end_matches('/'));

        // Let ureq percent-encode the values: symbols may contain characters
        // such as `/`, `@`, `&` or spaces that would otherwise corrupt the
        // query string.
        let body: serde_json::Value = ureq::get(&endpoint)
            .query("coin", coin)
            .query("interval", interval)
            .query("hours", &hours.to_string())
            .call()?
            .into_json()?;

        // Borrow the rows instead of deep-cloning the whole array.
        let rows = body
            .get("candles")
            .and_then(|v| v.as_array())
            .map(Vec::as_slice)
            .unwrap_or_default();

        // Read a numeric field that the API may encode either as a JSON
        // number or as a decimal string; anything else maps to NaN.
        let number = |row: &serde_json::Value, key: &str| -> f64 {
            match row.get(key) {
                Some(serde_json::Value::String(s)) => s.parse::<f64>().unwrap_or(f64::NAN),
                Some(other) => other.as_f64().unwrap_or(f64::NAN),
                None => f64::NAN,
            }
        };

        let candles = rows
            .iter()
            .filter_map(|row| {
                let time = number(row, "t");
                // A bar without a usable timestamp cannot be placed on the
                // time axis, so it is dropped rather than reported.
                time.is_finite().then(|| Candle {
                    time,
                    open: number(row, "o"),
                    high: number(row, "h"),
                    low: number(row, "l"),
                    close: number(row, "c"),
                    volume: number(row, "v"),
                })
            })
            .collect();
        Ok(candles)
    }
}

/// Metadata accompanying each data frame.
///
/// Filled from the top-level keys of a frame whose `type` is `"data"`; the
/// frame's `payload` is delivered separately to the `on_data` callback.
#[derive(Debug, Clone)]
pub struct MessageMeta {
    /// The frame's `channel` string; empty when absent or not a string.
    pub channel: String,
    /// The frame's `symbol` string; empty when absent or not a string.
    pub symbol: String,
    /// The frame's `seq` value, when present and representable as `i64`.
    pub seq: Option<i64>,
    /// The frame's `ts` value, when present and representable as `i64`.
    pub ts: Option<i64>,
}

/// WebSocket client options.
pub struct SocketOptions {
    /// WebSocket endpoint to connect to.
    pub url: String,
    /// When set, an `auth` frame carrying this key is sent right after
    /// connecting and the client waits for the matching ack.
    pub api_key: Option<String>,
    /// When `true`, connection and protocol failures lead to a reconnect
    /// instead of being returned from `Socket::run`.
    pub auto_reconnect: bool,
    /// Starting delay between reconnect attempts, in milliseconds.
    pub reconnect_initial_ms: u64,
    /// Upper bound for the doubling reconnect delay, in milliseconds.
    pub reconnect_max_ms: u64,
}

impl Default for SocketOptions {
    fn default() -> Self {
        Self {
            url: DEFAULT_WS_URL.to_string(),
            api_key: None,
            auto_reconnect: true,
            reconnect_initial_ms: 500,
            reconnect_max_ms: 30_000,
        }
    }
}

/// Blocking WebSocket client. Register subscriptions, then `run` the receive
/// loop; `run` authenticates, sends every subscription, invokes `on_data` for
/// each `data` frame, and replays subscriptions after reconnect.
pub struct Socket {
    /// Connection and reconnect settings.
    opts: SocketOptions,
    /// Queued `(channel, symbol)` pairs, sent in insertion order on every
    /// (re)connect.
    subs: Vec<(String, String)>,
    /// Counter used as the `id` of subscribe frames; incremented before each
    /// frame is built, so ids keep growing across reconnects.
    next_id: i64,
}

impl Socket {
    /// Create a client with the given options and no queued subscriptions.
    pub fn new(opts: SocketOptions) -> Self {
        Self { opts, subs: Vec::new(), next_id: 0 }
    }

    /// Queue a (channel, symbol) subscription. Call before `run`.
    ///
    /// Nothing is sent here; the pair is stored and transmitted by `run`
    /// each time a connection is set up.
    pub fn subscribe(&mut self, channel: &str, symbol: &str) {
        self.subs.push((channel.to_string(), symbol.to_string()));
    }

    /// Connect, authenticate, subscribe and dispatch `data` frames.
    ///
    /// `on_data` is called with the frame's `payload` (or `Null` when the
    /// frame has none) and its [`MessageMeta`] for every text frame whose
    /// `type` is `"data"`. Other frames are ignored.
    ///
    /// With `auto_reconnect` enabled this method does not return: every
    /// failure (connect, send, read, auth rejection, close) is followed by a
    /// sleep and a new attempt. The delay starts at `reconnect_initial_ms`,
    /// doubles after each wait up to `reconnect_max_ms`, and is reset only
    /// after a connection has authenticated and sent all its subscriptions.
    /// A server that accepts the socket but immediately rejects the client
    /// is therefore retried with growing delays rather than at full rate.
    ///
    /// # Errors
    ///
    /// With `auto_reconnect` disabled, returns the first connect/send/read
    /// error, or a `PermissionDenied` I/O error when the server answers the
    /// auth frame with an `error` frame. A close frame from the server ends
    /// the call with `Ok(())`.
    pub fn run<F>(&mut self, mut on_data: F) -> Result<(), Box<dyn Error>>
    where
        F: FnMut(serde_json::Value, MessageMeta),
    {
        let initial_backoff = Duration::from_millis(self.opts.reconnect_initial_ms);
        let max_backoff = Duration::from_millis(self.opts.reconnect_max_ms);
        let mut backoff = initial_backoff;

        loop {
            let mut established = false;
            let outcome = self.run_session(&mut on_data, &mut established);

            if !self.opts.auto_reconnect {
                return outcome;
            }

            // Only a session that got through setup proves the endpoint is
            // healthy; a bare successful handshake does not.
            if established {
                backoff = initial_backoff;
            }
            thread::sleep(backoff);
            backoff = std::cmp::min(backoff.saturating_mul(2), max_backoff);
        }
    }

    /// Run one connection from handshake until it ends.
    ///
    /// Returns `Ok(())` when the server sends a close frame and `Err` for
    /// any other termination. `established` is set to `true` once auth (if
    /// configured) was acknowledged and every subscription was sent.
    fn run_session<F>(
        &mut self,
        on_data: &mut F,
        established: &mut bool,
    ) -> Result<(), Box<dyn Error>>
    where
        F: FnMut(serde_json::Value, MessageMeta),
    {
        let (mut socket, _response) = tungstenite::connect(&self.opts.url)?;

        if let Some(key) = &self.opts.api_key {
            let frame = serde_json::json!({ "op": "auth", "apiKey": key }).to_string();
            socket.send(Message::Text(frame.into()))?;

            // Wait for the auth ack; unrelated or unparsable frames that
            // arrive first are skipped.
            loop {
                match socket.read()? {
                    Message::Text(text) => {
                        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text) {
                            let kind = v.get("type").and_then(|t| t.as_str());
                            let op = v.get("op").and_then(|o| o.as_str());
                            if kind == Some("ack") && op == Some("auth") {
                                break;
                            }
                            if kind == Some("error") {
                                let code =
                                    v.get("code").and_then(|x| x.as_str()).unwrap_or("error");
                                let message = v
                                    .get("message")
                                    .and_then(|x| x.as_str())
                                    .unwrap_or("websocket auth failed");
                                let err = std::io::Error::new(
                                    std::io::ErrorKind::PermissionDenied,
                                    format!("mmflow ws auth failed: {code}: {message}"),
                                );
                                return Err(Box::new(err));
                            }
                        }
                    }
                    Message::Close(_) => return Ok(()),
                    _ => {}
                }
            }
        }

        // Replay every queued subscription on this connection.
        for (channel, symbol) in &self.subs {
            self.next_id += 1;
            let frame = serde_json::json!({
                "op": "subscribe",
                "channel": channel,
                "symbol": symbol,
                "id": self.next_id,
            })
            .to_string();
            socket.send(Message::Text(frame.into()))?;
        }

        *established = true;

        loop {
            match socket.read()? {
                Message::Text(text) => {
                    if let Ok(mut v) = serde_json::from_str::<serde_json::Value>(&text) {
                        if v.get("type").and_then(|t| t.as_str()) == Some("data") {
                            let meta = MessageMeta {
                                channel: v
                                    .get("channel")
                                    .and_then(|x| x.as_str())
                                    .unwrap_or("")
                                    .to_string(),
                                symbol: v
                                    .get("symbol")
                                    .and_then(|x| x.as_str())
                                    .unwrap_or("")
                                    .to_string(),
                                seq: v.get("seq").and_then(|x| x.as_i64()),
                                ts: v.get("ts").and_then(|x| x.as_i64()),
                            };
                            // The frame is owned, so move the payload out
                            // instead of cloning it.
                            let payload = v
                                .get_mut("payload")
                                .map(|p| p.take())
                                .unwrap_or(serde_json::Value::Null);
                            on_data(payload, meta);
                        }
                    }
                }
                Message::Close(_) => return Ok(()),
                _ => {}
            }
        }
    }
}