use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
use async_trait::async_trait;
use futures_util::stream::SplitSink;
use futures_util::{SinkExt, Stream, StreamExt};
use serde_json::{json, Value};
use tokio::sync::{broadcast, Mutex};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use crate::core::types::{
Kline, PublicTrade, StreamType, Ticker, TradeSide, WebSocketError, WebSocketResult,
};
use crate::core::{AccountType, ConnectionStatus, StreamEvent, SubscriptionRequest};
use crate::core::traits::WebSocketConnector;
use super::auth::CryptoCompareAuth;
const WS_BASE_URL: &str = "wss://streamer.cryptocompare.com/v2";
const DEFAULT_EXCHANGE: &str = "CCCAGG";
type WsStream = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
type WsWriter = SplitSink<WsStream, Message>;
pub struct CryptoCompareWebSocket {
auth: CryptoCompareAuth,
status: Arc<Mutex<ConnectionStatus>>,
subscriptions: Arc<Mutex<HashSet<SubscriptionRequest>>>,
event_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
ws_writer: Arc<Mutex<Option<WsWriter>>>,
use_streamer_format: bool,
}
impl Default for CryptoCompareWebSocket {
fn default() -> Self {
Self::with_auth(CryptoCompareAuth::public())
}
}
impl CryptoCompareWebSocket {
pub fn new() -> Self {
Self::default()
}
pub fn with_auth(auth: CryptoCompareAuth) -> Self {
let use_streamer_format = auth.api_key.is_none();
Self {
auth,
status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
subscriptions: Arc::new(Mutex::new(HashSet::new())),
event_tx: Arc::new(StdMutex::new(None)),
ws_writer: Arc::new(Mutex::new(None)),
use_streamer_format,
}
}
fn ws_url(&self) -> String {
match &self.auth.api_key {
Some(key) => format!("{}?api_key={}", WS_BASE_URL, key),
None => format!("{}?format=streamer", WS_BASE_URL),
}
}
fn build_sub_string(request: &SubscriptionRequest) -> Result<String, WebSocketError> {
let fsym = request.symbol.base.to_uppercase();
let tsym = request.symbol.quote.to_uppercase();
match &request.stream_type {
StreamType::Ticker => {
Ok(format!("5~{}~{}~{}", DEFAULT_EXCHANGE, fsym, tsym))
}
StreamType::Trade => {
Ok(format!("0~{}~{}~{}", DEFAULT_EXCHANGE, fsym, tsym))
}
StreamType::Kline { interval } => {
Ok(format!("17~{}~{}~{}~{}", DEFAULT_EXCHANGE, fsym, tsym, interval))
}
StreamType::Orderbook | StreamType::OrderbookDelta => {
Err(WebSocketError::Subscription(
"CryptoCompare orderbook WebSocket requires paid tier".to_string(),
))
}
other => Err(WebSocketError::Subscription(format!(
"Stream type {:?} not supported for CryptoCompare WebSocket",
other
))),
}
}
async fn send_action(
ws_writer: &Arc<Mutex<Option<WsWriter>>>,
action: &str,
subs: Vec<String>,
) -> WebSocketResult<()> {
let msg = json!({
"action": action,
"subs": subs,
});
let json_str = msg.to_string();
let mut writer_guard = ws_writer.lock().await;
let writer = writer_guard
.as_mut()
.ok_or(WebSocketError::NotConnected)?;
writer
.send(Message::Text(json_str))
.await
.map_err(|e| WebSocketError::SendError(format!("Failed to send message: {}", e)))?;
Ok(())
}
fn start_message_handler(
mut reader: futures_util::stream::SplitStream<WsStream>,
ws_writer: Arc<Mutex<Option<WsWriter>>>,
event_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
status: Arc<Mutex<ConnectionStatus>>,
use_streamer_format: bool,
) {
tokio::spawn(async move {
loop {
match reader.next().await {
Some(Ok(Message::Text(text))) => {
Self::handle_message(&text, &event_tx, use_streamer_format);
}
Some(Ok(Message::Ping(data))) => {
let mut writer_guard = ws_writer.lock().await;
if let Some(writer) = writer_guard.as_mut() {
let _ = writer.send(Message::Pong(data)).await;
}
}
Some(Ok(Message::Pong(_))) => {
}
Some(Ok(Message::Binary(_))) => {
}
Some(Ok(Message::Close(_))) => {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
Some(Ok(Message::Frame(_))) => {
}
Some(Err(_e)) => {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Err(WebSocketError::ConnectionError(
"WebSocket read error".to_string(),
)));
}
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
None => {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
}
}
let _ = event_tx.lock().unwrap().take();
});
}
fn start_ping_task(
ws_writer: Arc<Mutex<Option<WsWriter>>>,
status: Arc<Mutex<ConnectionStatus>>,
) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
let current_status = *status.lock().await;
if current_status != ConnectionStatus::Connected {
break;
}
let mut writer_guard = ws_writer.lock().await;
if let Some(writer) = writer_guard.as_mut() {
if writer.send(Message::Ping(vec![])).await.is_err() {
break;
}
} else {
break;
}
}
});
}
fn handle_message(
text: &str,
event_tx: &Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
use_streamer_format: bool,
) {
if use_streamer_format {
Self::handle_streamer_message(text, event_tx);
} else {
Self::handle_json_message(text, event_tx);
}
}
fn handle_streamer_message(
text: &str,
event_tx: &Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
) {
for msg in text.split('|') {
let parts: Vec<&str> = msg.split('~').collect();
match parts.first().copied() {
Some("0") => {
if let Some(event) = Self::parse_trade_streamer(&parts) {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Ok(event));
}
}
}
Some("5") => {
if let Some(event) = Self::parse_ticker_streamer(&parts) {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Ok(event));
}
}
}
Some("500") => {
let message = parts.get(1).unwrap_or(&"Unknown error");
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Err(WebSocketError::ProtocolError(
format!("CryptoCompare error: {}", message),
)));
}
}
Some("999") => {
}
Some("20") => {
}
Some("16") => {
}
Some("3") => {
}
_ => {
}
}
}
}
fn handle_json_message(
text: &str,
event_tx: &Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
) {
let json: Value = match serde_json::from_str(text) {
Ok(v) => v,
Err(_) => return, };
let msg_type = match json.get("TYPE").and_then(|t| t.as_str()) {
Some(t) => t,
None => {
match json.get("TYPE").and_then(|t| t.as_i64()) {
Some(n) => {
match n {
0 => {
if let Some(event) = Self::parse_trade(&json) {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Ok(event));
}
}
}
2 | 5 => {
if let Some(event) = Self::parse_ticker(&json) {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Ok(event));
}
}
}
17 => {
if let Some(event) = Self::parse_ohlc(&json) {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Ok(event));
}
}
}
500 => {
let message = json
.get("MESSAGE")
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Err(WebSocketError::ProtocolError(
format!("CryptoCompare error: {}", message),
)));
}
}
_ => {
}
}
return;
}
None => return,
}
}
};
match msg_type {
"0" => {
if let Some(event) = Self::parse_trade(&json) {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Ok(event));
}
}
}
"2" | "5" => {
if let Some(event) = Self::parse_ticker(&json) {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Ok(event));
}
}
}
"17" => {
if let Some(event) = Self::parse_ohlc(&json) {
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Ok(event));
}
}
}
"500" => {
let message = json
.get("MESSAGE")
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
if let Some(tx) = event_tx.lock().unwrap().as_ref() {
let _ = tx.send(Err(WebSocketError::ProtocolError(format!(
"CryptoCompare error: {}",
message
))));
}
}
_ => {
}
}
}
fn parse_trade(json: &Value) -> Option<StreamEvent> {
let fsym = json.get("FSYM").and_then(|v| v.as_str())?;
let tsym = json.get("TSYM").and_then(|v| v.as_str())?;
let price = Self::extract_f64(json, "P")?;
let quantity = Self::extract_f64(json, "Q").unwrap_or(0.0);
let timestamp = json.get("TS").and_then(|v| v.as_i64()).unwrap_or(0);
let trade_id = json
.get("ID")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let flags = json
.get("F")
.and_then(|v| v.as_str())
.and_then(|s| {
if let Some(hex) = s.strip_prefix("0x") {
u64::from_str_radix(hex, 16).ok()
} else {
s.parse::<u64>().ok()
}
})
.unwrap_or(0);
let side = if flags & 0x2 != 0 {
TradeSide::Sell
} else {
TradeSide::Buy
};
Some(StreamEvent::Trade(PublicTrade {
id: trade_id,
symbol: format!("{}{}", fsym, tsym),
price,
quantity,
side,
timestamp: timestamp * 1000, }))
}
fn parse_ticker(json: &Value) -> Option<StreamEvent> {
let fsym = json
.get("FROMSYMBOL")
.or_else(|| json.get("FSYM"))
.and_then(|v| v.as_str())?;
let tsym = json
.get("TOSYMBOL")
.or_else(|| json.get("TSYM"))
.and_then(|v| v.as_str())?;
let price = Self::extract_f64(json, "PRICE")?;
let timestamp = json.get("LASTUPDATE").and_then(|v| v.as_i64()).unwrap_or(0);
Some(StreamEvent::Ticker(Ticker {
symbol: format!("{}{}", fsym, tsym),
last_price: price,
bid_price: Self::extract_f64(json, "BID"),
ask_price: Self::extract_f64(json, "OFFER"),
high_24h: Self::extract_f64(json, "HIGH24HOUR")
.or_else(|| Self::extract_f64(json, "HIGHDAY")),
low_24h: Self::extract_f64(json, "LOW24HOUR")
.or_else(|| Self::extract_f64(json, "LOWDAY")),
volume_24h: Self::extract_f64(json, "VOLUME24HOUR")
.or_else(|| Self::extract_f64(json, "VOLUMEDAY")),
quote_volume_24h: Self::extract_f64(json, "VOLUME24HOURTO")
.or_else(|| Self::extract_f64(json, "VOLUMEDAYTO")),
price_change_24h: {
let open = Self::extract_f64(json, "OPEN24HOUR")
.or_else(|| Self::extract_f64(json, "OPENDAY"));
open.map(|o| price - o)
},
price_change_percent_24h: {
let open = Self::extract_f64(json, "OPEN24HOUR")
.or_else(|| Self::extract_f64(json, "OPENDAY"));
open.filter(|&o| o > 0.0).map(|o| ((price - o) / o) * 100.0)
},
timestamp: timestamp * 1000, }))
}
fn parse_ohlc(json: &Value) -> Option<StreamEvent> {
let open = Self::extract_f64(json, "OPEN")?;
let high = Self::extract_f64(json, "HIGH")?;
let low = Self::extract_f64(json, "LOW")?;
let close = Self::extract_f64(json, "CLOSE")?;
let volume = Self::extract_f64(json, "VOLUME").unwrap_or(0.0);
let timestamp = json.get("TS").and_then(|v| v.as_i64()).unwrap_or(0);
Some(StreamEvent::Kline(Kline {
open_time: timestamp * 1000,
open,
high,
low,
close,
volume,
close_time: None,
quote_volume: Self::extract_f64(json, "VOLUMETO"),
trades: None,
}))
}
fn extract_f64(json: &Value, key: &str) -> Option<f64> {
json.get(key).and_then(|v| {
v.as_f64()
.or_else(|| v.as_str().and_then(|s| s.parse().ok()))
})
}
fn parse_trade_streamer(parts: &[&str]) -> Option<StreamEvent> {
if parts.len() < 9 {
return None;
}
let _exchange = parts.get(1)?; let fsym = parts.get(2)?;
let tsym = parts.get(3)?;
let flags = parts.get(4)?.parse::<u32>().ok()?;
let trade_id = parts.get(5)?.to_string();
let timestamp = parts.get(6)?.parse::<i64>().ok()?;
let quantity = parts.get(7)?.parse::<f64>().ok()?;
let price = parts.get(8)?.parse::<f64>().ok()?;
let side = if flags & 0x2 != 0 {
TradeSide::Sell
} else {
TradeSide::Buy
};
Some(StreamEvent::Trade(PublicTrade {
id: trade_id,
symbol: format!("{}{}", fsym, tsym),
price,
quantity,
side,
timestamp: timestamp * 1000, }))
}
fn parse_ticker_streamer(parts: &[&str]) -> Option<StreamEvent> {
if parts.len() < 7 {
return None;
}
let fsym = parts.get(2)?;
let tsym = parts.get(3)?;
let price = parts.get(5)?.parse::<f64>().ok()?;
let timestamp = parts.get(6)?.parse::<i64>().ok()?;
let volume_24h = parts.get(11).and_then(|s| s.parse::<f64>().ok());
let volume_24h_to = parts.get(12).and_then(|s| s.parse::<f64>().ok());
let open_24h = parts.get(15).and_then(|s| s.parse::<f64>().ok());
let high_24h = parts.get(16).and_then(|s| s.parse::<f64>().ok());
let low_24h = parts.get(17).and_then(|s| s.parse::<f64>().ok());
Some(StreamEvent::Ticker(Ticker {
symbol: format!("{}{}", fsym, tsym),
last_price: price,
bid_price: None, ask_price: None, high_24h,
low_24h,
volume_24h,
quote_volume_24h: volume_24h_to,
price_change_24h: open_24h.map(|o| price - o),
price_change_percent_24h: open_24h
.filter(|&o| o > 0.0)
.map(|o| ((price - o) / o) * 100.0),
timestamp: timestamp * 1000, }))
}
}
#[async_trait]
impl WebSocketConnector for CryptoCompareWebSocket {
async fn connect(&mut self, _account_type: AccountType) -> WebSocketResult<()> {
*self.status.lock().await = ConnectionStatus::Connecting;
let url = self.ws_url();
let (ws_stream, _) = connect_async(&url)
.await
.map_err(|e| WebSocketError::ConnectionError(format!("Failed to connect: {}", e)))?;
let (writer, reader) = ws_stream.split();
*self.ws_writer.lock().await = Some(writer);
*self.status.lock().await = ConnectionStatus::Connected;
let (tx, _) = broadcast::channel(1000);
*self.event_tx.lock().unwrap() = Some(tx);
Self::start_message_handler(
reader,
self.ws_writer.clone(),
self.event_tx.clone(),
self.status.clone(),
self.use_streamer_format,
);
Self::start_ping_task(self.ws_writer.clone(), self.status.clone());
Ok(())
}
async fn disconnect(&mut self) -> WebSocketResult<()> {
*self.status.lock().await = ConnectionStatus::Disconnected;
let mut writer_guard = self.ws_writer.lock().await;
if let Some(mut writer) = writer_guard.take() {
let _ = writer.close().await;
}
let _ = self.event_tx.lock().unwrap().take();
self.subscriptions.lock().await.clear();
Ok(())
}
fn connection_status(&self) -> ConnectionStatus {
match self.status.try_lock() {
Ok(guard) => *guard,
Err(_) => ConnectionStatus::Disconnected,
}
}
async fn subscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
let sub_string = Self::build_sub_string(&request)?;
Self::send_action(&self.ws_writer, "SubAdd", vec![sub_string]).await?;
self.subscriptions.lock().await.insert(request);
Ok(())
}
async fn unsubscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
let sub_string = Self::build_sub_string(&request)?;
Self::send_action(&self.ws_writer, "SubRemove", vec![sub_string]).await?;
self.subscriptions.lock().await.remove(&request);
Ok(())
}
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = WebSocketResult<StreamEvent>> + Send>> {
let tx_guard = self.event_tx.lock().unwrap();
if let Some(ref tx) = *tx_guard {
let rx = tx.subscribe();
Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).map(|r| {
r.map_err(|e| WebSocketError::ConnectionError(format!("Broadcast error: {}", e)))
.and_then(|x| x)
}))
} else {
Box::pin(futures_util::stream::empty())
}
}
fn active_subscriptions(&self) -> Vec<SubscriptionRequest> {
match self.subscriptions.try_lock() {
Ok(guard) => guard.iter().cloned().collect(),
Err(_) => Vec::new(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::types::Symbol;
#[test]
fn test_websocket_creation() {
let ws = CryptoCompareWebSocket::new();
assert_eq!(ws.connection_status(), ConnectionStatus::Disconnected);
}
#[test]
fn test_ws_url_public() {
let ws = CryptoCompareWebSocket::new();
assert_eq!(ws.ws_url(), "wss://streamer.cryptocompare.com/v2?format=streamer");
}
#[test]
fn test_ws_url_with_key() {
let auth = CryptoCompareAuth::new("test_key_123");
let ws = CryptoCompareWebSocket::with_auth(auth);
assert_eq!(
ws.ws_url(),
"wss://streamer.cryptocompare.com/v2?api_key=test_key_123"
);
}
#[test]
fn test_build_sub_string_ticker() {
let req = SubscriptionRequest::ticker(Symbol::new("BTC", "USD"));
let sub = CryptoCompareWebSocket::build_sub_string(&req).unwrap();
assert_eq!(sub, "5~CCCAGG~BTC~USD");
}
#[test]
fn test_build_sub_string_trade() {
let req = SubscriptionRequest::trade(Symbol::new("ETH", "USDT"));
let sub = CryptoCompareWebSocket::build_sub_string(&req).unwrap();
assert_eq!(sub, "0~CCCAGG~ETH~USDT");
}
#[test]
fn test_build_sub_string_kline() {
let req = SubscriptionRequest::kline(Symbol::new("BTC", "USD"), "1h");
let sub = CryptoCompareWebSocket::build_sub_string(&req).unwrap();
assert_eq!(sub, "17~CCCAGG~BTC~USD~1h");
}
#[test]
fn test_build_sub_string_orderbook_fails() {
let req = SubscriptionRequest::orderbook(Symbol::new("BTC", "USD"));
let result = CryptoCompareWebSocket::build_sub_string(&req);
assert!(result.is_err());
}
#[test]
fn test_parse_ticker_channel5() {
let json: Value = serde_json::from_str(
r#"{
"TYPE": "5",
"FROMSYMBOL": "BTC",
"TOSYMBOL": "USD",
"PRICE": 84023.50,
"LASTUPDATE": 1706280000,
"HIGH24HOUR": 85000.0,
"LOW24HOUR": 83000.0,
"VOLUME24HOUR": 1500.5,
"VOLUME24HOURTO": 126000000.0,
"OPEN24HOUR": 83500.0
}"#,
)
.unwrap();
let event = CryptoCompareWebSocket::parse_ticker(&json);
assert!(event.is_some());
if let Some(StreamEvent::Ticker(ticker)) = event {
assert_eq!(ticker.symbol, "BTCUSD");
assert_eq!(ticker.last_price, 84023.50);
assert_eq!(ticker.high_24h, Some(85000.0));
assert_eq!(ticker.low_24h, Some(83000.0));
assert_eq!(ticker.volume_24h, Some(1500.5));
} else {
panic!("Expected StreamEvent::Ticker");
}
}
#[test]
fn test_parse_trade_channel0() {
let json: Value = serde_json::from_str(
r#"{
"TYPE": "0",
"FSYM": "BTC",
"TSYM": "USDT",
"P": 45000.50,
"Q": 0.5,
"TS": 1706280000,
"ID": "123456",
"F": "0x1"
}"#,
)
.unwrap();
let event = CryptoCompareWebSocket::parse_trade(&json);
assert!(event.is_some());
if let Some(StreamEvent::Trade(trade)) = event {
assert_eq!(trade.symbol, "BTCUSDT");
assert_eq!(trade.price, 45000.50);
assert_eq!(trade.quantity, 0.5);
assert_eq!(trade.id, "123456");
} else {
panic!("Expected StreamEvent::Trade");
}
}
#[test]
fn test_parse_ohlc_channel17() {
let json: Value = serde_json::from_str(
r#"{
"TYPE": "17",
"OPEN": 45000.0,
"HIGH": 45100.0,
"LOW": 44950.0,
"CLOSE": 45050.0,
"VOLUME": 125.5,
"VOLUMETO": 5650000.0,
"TS": 1706280000
}"#,
)
.unwrap();
let event = CryptoCompareWebSocket::parse_ohlc(&json);
assert!(event.is_some());
if let Some(StreamEvent::Kline(kline)) = event {
assert_eq!(kline.open, 45000.0);
assert_eq!(kline.high, 45100.0);
assert_eq!(kline.low, 44950.0);
assert_eq!(kline.close, 45050.0);
assert_eq!(kline.volume, 125.5);
assert_eq!(kline.quote_volume, Some(5650000.0));
} else {
panic!("Expected StreamEvent::Kline");
}
}
#[test]
fn test_parse_trade_streamer() {
let parts: Vec<&str> = vec![
"0",
"Coinbase",
"BTC",
"USD",
"2", "947952988",
"1769917571",
"0.00023",
"78706.05",
"18.1023915",
];
let event = CryptoCompareWebSocket::parse_trade_streamer(&parts);
assert!(event.is_some());
if let Some(StreamEvent::Trade(trade)) = event {
assert_eq!(trade.symbol, "BTCUSD");
assert_eq!(trade.price, 78706.05);
assert_eq!(trade.quantity, 0.00023);
assert_eq!(trade.id, "947952988");
assert_eq!(trade.side, TradeSide::Sell);
assert_eq!(trade.timestamp, 1769917571000); } else {
panic!("Expected StreamEvent::Trade");
}
}
#[test]
fn test_parse_ticker_streamer() {
let parts: Vec<&str> = vec![
"5",
"CCCAGG",
"BTC",
"USD",
"1",
"78716.20", "1769917542", "78716.20", "0.00023", "18.10", "947952988", "1500.5", "118074300.0", "1200.0", "94459440.0", "78000.0", "79000.0", "77500.0", ];
let event = CryptoCompareWebSocket::parse_ticker_streamer(&parts);
assert!(event.is_some());
if let Some(StreamEvent::Ticker(ticker)) = event {
assert_eq!(ticker.symbol, "BTCUSD");
assert_eq!(ticker.last_price, 78716.20);
assert_eq!(ticker.high_24h, Some(79000.0));
assert_eq!(ticker.low_24h, Some(77500.0));
assert_eq!(ticker.volume_24h, Some(1500.5));
assert_eq!(ticker.quote_volume_24h, Some(118074300.0));
assert_eq!(ticker.price_change_24h, Some(716.20));
assert!(ticker.bid_price.is_none()); assert!(ticker.ask_price.is_none()); assert_eq!(ticker.timestamp, 1769917542000); } else {
panic!("Expected StreamEvent::Ticker");
}
}
#[test]
fn test_parse_ticker_streamer_partial_update() {
let parts: Vec<&str> = vec![
"5",
"CCCAGG",
"ETH",
"USDT",
"1",
"2850.50", "1769917600", ];
let event = CryptoCompareWebSocket::parse_ticker_streamer(&parts);
assert!(event.is_some());
if let Some(StreamEvent::Ticker(ticker)) = event {
assert_eq!(ticker.symbol, "ETHUSDT");
assert_eq!(ticker.last_price, 2850.50);
assert!(ticker.high_24h.is_none());
assert!(ticker.low_24h.is_none());
assert!(ticker.volume_24h.is_none());
} else {
panic!("Expected StreamEvent::Ticker");
}
}
}