atelier_data 0.0.15

Data Artifacts and I/O for the atelier-rs engine
//! Coinbase Advanced Trade WSS decoder.
//!
//! Routes incoming JSON messages by the `"channel"` field to produce
//! [`CoinbaseWssEvent`] variants.

use crate::{
    clients::wss::WssDecoder,
    errors::ExchangeError,
    sources::coinbase::{
        events::CoinbaseWssEvent,
        responses::{
            orderbooks::CoinbaseOrderbookResponse,
            trades::CoinbaseTradeResponse,
        },
    },
};
use serde_json::Value;

/// [`WssDecoder`] implementation for the Coinbase Advanced Trade WebSocket API.
///
/// `CoinbaseDecoder` is a zero-sized type — all state lives in the
/// frame payload itself.  Decoding proceeds as follows:
///
/// 1. Parse the raw text into a [`serde_json::Value`].
/// 2. Check for `"type": "subscriptions"` or `"error"` control messages
///    (subscription acks / errors) — consumed as `Ok(None)`.
/// 3. Dispatch on the `"channel"` field:
///    - `"l2_data"` → [`CoinbaseWssEvent::OrderbookData`]
///    - `"market_trades"` → [`CoinbaseWssEvent::TradeData`]
///    - anything else (heartbeats, unknown) → `Ok(None)`
///
/// Parse failures on data frames are logged at `WARN` and swallowed as
/// `Ok(None)` to avoid tearing down the WebSocket pump.
pub struct CoinbaseDecoder;

impl WssDecoder for CoinbaseDecoder {
    type Event = CoinbaseWssEvent;

    fn decode(text: &str) -> Result<Option<Self::Event>, Box<ExchangeError>> {
        let json: Value =
            serde_json::from_str(text).map_err(|e| Box::new(ExchangeError::from(e)))?;

        // ── Subscription confirmation / error ──────────────────────────
        // Coinbase returns `{"type": "subscriptions", "channels": [...]}` on
        // success and `{"type": "error", "message": "..."}` on failure.
        if let Some(ty) = json.get("type") {
            let ty_str = ty.as_str().unwrap_or("");
            match ty_str {
                "subscriptions" => {
                    let channels = json
                        .get("channels")
                        .and_then(|v| v.as_array())
                        .map(|arr| {
                            arr.iter()
                                .filter_map(|c| c.get("name").and_then(|n| n.as_str()))
                                .collect::<Vec<_>>()
                                .join(", ")
                        })
                        .unwrap_or_default();
                    tracing::info!(
                        channels = channels.as_str(),
                        "coinbase.subscription_confirmed"
                    );
                    return Ok(None);
                }
                "error" => {
                    let message = json
                        .get("message")
                        .and_then(|v| v.as_str())
                        .unwrap_or("unknown error");
                    tracing::error!(
                        message = message,
                        raw = text,
                        "coinbase.subscription_failed"
                    );
                    return Ok(None);
                }
                _ => {} // fall through to channel dispatch
            }
        }

        // Data messages carry a `"channel"` field
        let channel = json
            .get("channel")
            .and_then(|c| c.as_str())
            .unwrap_or("");

        match channel {
            // level2 / l2_data channel — orderbook snapshots and updates
            "l2_data" => {
                match serde_json::from_str::<CoinbaseOrderbookResponse>(text) {
                    Ok(resp) => Ok(Some(CoinbaseWssEvent::OrderbookData(resp))),
                    Err(e) => {
                        tracing::warn!("Failed to parse Coinbase level2 data: {}", e);
                        Ok(None)
                    }
                }
            }

            // market_trades channel
            "market_trades" => {
                match serde_json::from_str::<CoinbaseTradeResponse>(text) {
                    Ok(resp) => {
                        // Flatten: each event may contain multiple trades
                        if let Some(event) = resp.events.into_iter().next() {
                            if let Some(trade) = event.trades.into_iter().next() {
                                return Ok(Some(CoinbaseWssEvent::TradeData(trade)));
                            }
                        }
                        Ok(None)
                    }
                    Err(e) => {
                        tracing::warn!("Failed to parse Coinbase trade data: {}", e);
                        Ok(None)
                    }
                }
            }

            // Heartbeat, subscriptions, or unknown channels
            _ => {
                tracing::debug!("Coinbase: ignoring channel={}", channel);
                Ok(None)
            }
        }
    }
}