use std::{error::Error, thread, time::Duration};
use tungstenite::Message;
/// Base URL that [`Client::new`] falls back to when it is given an empty string.
pub const DEFAULT_REST_BASE: &str = "https://mmflow.ai";
/// WebSocket endpoint used as the `url` of [`SocketOptions::default`].
pub const DEFAULT_WS_URL: &str = "wss://mmflowai-production.up.railway.app/v1";
/// One candle (open/high/low/close/volume) row produced by [`Client::candles`].
///
/// Every price/volume field is `f64::NAN` when the corresponding key is
/// missing from the response row or its value cannot be parsed.
#[derive(Debug, Clone, PartialEq)]
pub struct Candle {
/// Value of the row's numeric `t` key.
/// NOTE(review): presumably a Unix timestamp; the unit is not visible here — confirm against the API.
pub time: f64,
/// Parsed from the row's `o` key.
pub open: f64,
/// Parsed from the row's `h` key.
pub high: f64,
/// Parsed from the row's `l` key.
pub low: f64,
/// Parsed from the row's `c` key.
pub close: f64,
/// Parsed from the row's `v` key.
pub volume: f64,
}
/// Blocking REST client; see [`Client::candles`].
pub struct Client {
// Prefix prepended to every endpoint path (e.g. `/api/hl/candles`).
base_url: String,
}
impl Client {
    /// Creates a client for the REST API rooted at `base_url`.
    ///
    /// An empty `base_url` selects [`DEFAULT_REST_BASE`].
    pub fn new(base_url: impl Into<String>) -> Self {
        let base_url = base_url.into();
        Self {
            base_url: if base_url.is_empty() {
                DEFAULT_REST_BASE.to_string()
            } else {
                base_url
            },
        }
    }

    /// Fetches candles for `coin` from `GET {base_url}/api/hl/candles`.
    ///
    /// `coin`, `interval` and `hours` are sent as the query parameters of the
    /// same names. They are attached through the HTTP client's query builder so
    /// that reserved characters (`&`, `#`, spaces, ...) are percent-encoded
    /// instead of corrupting the query string.
    ///
    /// Rows are read from the `candles` array of the JSON response; a response
    /// without that array yields an empty vector. Rows whose `t` key is not a
    /// finite number are skipped. The `o`/`h`/`l`/`c`/`v` keys may be either
    /// decimal strings or JSON numbers; anything else becomes `f64::NAN`.
    ///
    /// # Errors
    ///
    /// Returns the underlying error when the HTTP request fails or the body is
    /// not valid JSON.
    pub fn candles(
        &self,
        coin: &str,
        interval: &str,
        hours: u32,
    ) -> Result<Vec<Candle>, Box<dyn Error>> {
        // Tolerate a caller-supplied base URL that ends in '/', which would
        // otherwise produce a "//api/..." path.
        let url = format!("{}/api/hl/candles", self.base_url.trim_end_matches('/'));
        let body: serde_json::Value = ureq::get(&url)
            .query("coin", coin)
            .query("interval", interval)
            .query("hours", &hours.to_string())
            .call()?
            .into_json()?;

        // Borrow the rows in place; there is no need to deep-copy the array.
        let rows = body
            .get("candles")
            .and_then(|v| v.as_array())
            .map(Vec::as_slice)
            .unwrap_or_default();

        let field = |row: &serde_json::Value, key: &str| -> f64 {
            row.get(key)
                .and_then(|v| match v.as_str() {
                    Some(text) => text.parse::<f64>().ok(),
                    None => v.as_f64(),
                })
                .unwrap_or(f64::NAN)
        };

        let candles: Vec<Candle> = rows
            .iter()
            .filter_map(|row| {
                let time = row
                    .get("t")
                    .and_then(|v| v.as_f64())
                    .filter(|t| t.is_finite())?;
                Some(Candle {
                    time,
                    open: field(row, "o"),
                    high: field(row, "h"),
                    low: field(row, "l"),
                    close: field(row, "c"),
                    volume: field(row, "v"),
                })
            })
            .collect();
        Ok(candles)
    }
}
/// Envelope fields of a `data` message, passed to the [`Socket::run`] callback
/// alongside the message payload.
#[derive(Debug, Clone)]
pub struct MessageMeta {
/// The message's `channel` string; empty when absent or not a string.
pub channel: String,
/// The message's `symbol` string; empty when absent or not a string.
pub symbol: String,
/// The message's `seq` value when it is present and fits an `i64`.
pub seq: Option<i64>,
/// The message's `ts` value when it is present and fits an `i64`.
/// NOTE(review): presumably a server timestamp; unit not visible here — confirm.
pub ts: Option<i64>,
}
/// Connection settings for [`Socket`].
pub struct SocketOptions {
/// WebSocket URL to connect to.
pub url: String,
/// When `Some`, [`Socket::run`] sends it in an `auth` frame right after
/// connecting and waits for the server's acknowledgement before subscribing.
pub api_key: Option<String>,
/// When `false`, [`Socket::run`] returns on the first close or error instead
/// of sleeping and reconnecting.
pub auto_reconnect: bool,
/// Starting delay, in milliseconds, between reconnect attempts. The delay is
/// doubled after each sleep.
pub reconnect_initial_ms: u64,
/// Upper bound, in milliseconds, applied to the doubled reconnect delay.
pub reconnect_max_ms: u64,
}
impl Default for SocketOptions {
fn default() -> Self {
Self {
url: DEFAULT_WS_URL.to_string(),
api_key: None,
auto_reconnect: true,
reconnect_initial_ms: 500,
reconnect_max_ms: 30_000,
}
}
}
/// Blocking WebSocket subscriber; register channels with [`Socket::subscribe`]
/// and then drive the connection with [`Socket::run`].
pub struct Socket {
// Connection settings supplied at construction.
opts: SocketOptions,
// `(channel, symbol)` pairs; a subscribe frame is sent for each one on every connection.
subs: Vec<(String, String)>,
// Incremented before each subscribe frame and sent as that frame's `id`.
next_id: i64,
}
impl Socket {
pub fn new(opts: SocketOptions) -> Self {
Self { opts, subs: Vec::new(), next_id: 0 }
}
pub fn subscribe(&mut self, channel: &str, symbol: &str) {
self.subs.push((channel.to_string(), symbol.to_string()));
}
pub fn run<F>(&mut self, mut on_data: F) -> Result<(), Box<dyn Error>>
where
F: FnMut(serde_json::Value, MessageMeta),
{
let mut backoff = Duration::from_millis(self.opts.reconnect_initial_ms);
let max_backoff = Duration::from_millis(self.opts.reconnect_max_ms);
loop {
match tungstenite::connect(&self.opts.url) {
Ok((mut socket, _resp)) => {
backoff = Duration::from_millis(self.opts.reconnect_initial_ms);
let mut setup_failed = false;
let mut authed = self.opts.api_key.is_none();
if let Some(key) = &self.opts.api_key {
let frame =
serde_json::json!({ "op": "auth", "apiKey": key }).to_string();
if let Err(err) = socket.send(Message::Text(frame.into())) {
if !self.opts.auto_reconnect {
return Err(Box::new(err));
}
setup_failed = true;
}
}
if !setup_failed && !authed {
loop {
match socket.read() {
Ok(Message::Text(text)) => {
let text = text.to_string();
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text) {
let kind = v.get("type").and_then(|t| t.as_str());
let op = v.get("op").and_then(|o| o.as_str());
if kind == Some("ack") && op == Some("auth") {
authed = true;
break;
}
if kind == Some("error") {
let code = v
.get("code")
.and_then(|x| x.as_str())
.unwrap_or("error");
let message = v
.get("message")
.and_then(|x| x.as_str())
.unwrap_or("websocket auth failed");
setup_failed = true;
if !self.opts.auto_reconnect {
let err = std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
format!("mmflow ws auth failed: {code}: {message}"),
);
return Err(Box::new(err));
}
break;
}
}
}
Ok(Message::Close(_)) => {
if !self.opts.auto_reconnect {
return Ok(());
}
setup_failed = true;
break;
}
Ok(_) => {}
Err(err) => {
if !self.opts.auto_reconnect {
return Err(Box::new(err));
}
setup_failed = true;
break;
}
}
}
}
if !setup_failed && authed {
for (channel, symbol) in &self.subs {
self.next_id += 1;
let frame = serde_json::json!({
"op": "subscribe",
"channel": channel,
"symbol": symbol,
"id": self.next_id,
})
.to_string();
if let Err(err) = socket.send(Message::Text(frame.into())) {
if !self.opts.auto_reconnect {
return Err(Box::new(err));
}
setup_failed = true;
break;
}
}
}
if !setup_failed {
loop {
match socket.read() {
Ok(Message::Text(text)) => {
let text = text.to_string();
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text) {
if v.get("type").and_then(|t| t.as_str()) == Some("data") {
let meta = MessageMeta {
channel: v
.get("channel")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_string(),
symbol: v
.get("symbol")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_string(),
seq: v.get("seq").and_then(|x| x.as_i64()),
ts: v.get("ts").and_then(|x| x.as_i64()),
};
let payload = v
.get("payload")
.cloned()
.unwrap_or(serde_json::Value::Null);
on_data(payload, meta);
}
}
}
Ok(Message::Close(_)) => {
if !self.opts.auto_reconnect {
return Ok(());
}
break;
}
Ok(_) => {}
Err(err) => {
if !self.opts.auto_reconnect {
return Err(Box::new(err));
}
break;
}
}
}
}
}
Err(err) => {
if !self.opts.auto_reconnect {
return Err(Box::new(err));
}
}
}
thread::sleep(backoff);
backoff = std::cmp::min(backoff.saturating_mul(2), max_backoff);
}
}
}