use crate::{
clients::wss::WssDecoder,
errors::ExchangeError,
sources::coinbase::{
events::CoinbaseWssEvent,
responses::{
orderbooks::CoinbaseOrderbookResponse,
trades::CoinbaseTradeResponse,
},
},
};
use serde_json::Value;
pub struct CoinbaseDecoder;
impl WssDecoder for CoinbaseDecoder {
type Event = CoinbaseWssEvent;
fn decode(text: &str) -> Result<Option<Self::Event>, Box<ExchangeError>> {
let json: Value =
serde_json::from_str(text).map_err(|e| Box::new(ExchangeError::from(e)))?;
if let Some(ty) = json.get("type") {
let ty_str = ty.as_str().unwrap_or("");
match ty_str {
"subscriptions" => {
let channels = json
.get("channels")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|c| c.get("name").and_then(|n| n.as_str()))
.collect::<Vec<_>>()
.join(", ")
})
.unwrap_or_default();
tracing::info!(
channels = channels.as_str(),
"coinbase.subscription_confirmed"
);
return Ok(None);
}
"error" => {
let message = json
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("unknown error");
tracing::error!(
message = message,
raw = text,
"coinbase.subscription_failed"
);
return Ok(None);
}
_ => {} }
}
let channel = json
.get("channel")
.and_then(|c| c.as_str())
.unwrap_or("");
match channel {
"l2_data" => {
match serde_json::from_str::<CoinbaseOrderbookResponse>(text) {
Ok(resp) => Ok(Some(CoinbaseWssEvent::OrderbookData(resp))),
Err(e) => {
tracing::warn!("Failed to parse Coinbase level2 data: {}", e);
Ok(None)
}
}
}
"market_trades" => {
match serde_json::from_str::<CoinbaseTradeResponse>(text) {
Ok(resp) => {
if let Some(event) = resp.events.into_iter().next() {
if let Some(trade) = event.trades.into_iter().next() {
return Ok(Some(CoinbaseWssEvent::TradeData(trade)));
}
}
Ok(None)
}
Err(e) => {
tracing::warn!("Failed to parse Coinbase trade data: {}", e);
Ok(None)
}
}
}
_ => {
tracing::debug!("Coinbase: ignoring channel={}", channel);
Ok(None)
}
}
}
}