use pretty_simple_display::{DebugPretty, DisplaySimple};
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize, PartialEq, DebugPretty, DisplaySimple)]
pub enum WebSocketMessage {
Request(JsonRpcRequest),
Response(JsonRpcResponse),
Notification(JsonRpcNotification),
}
#[derive(Clone, Serialize, Deserialize, PartialEq, DebugPretty, DisplaySimple)]
pub struct JsonRpcRequest {
pub jsonrpc: String,
pub id: serde_json::Value,
pub method: String,
pub params: Option<serde_json::Value>,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, DebugPretty, DisplaySimple)]
pub struct JsonRpcResponse {
pub jsonrpc: String,
pub id: serde_json::Value,
#[serde(flatten)]
pub result: JsonRpcResult,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, DebugPretty, DisplaySimple)]
#[serde(untagged)]
pub enum JsonRpcResult {
Success {
result: serde_json::Value,
},
Error {
error: JsonRpcError,
},
}
#[derive(Clone, Serialize, Deserialize, PartialEq, DebugPretty, DisplaySimple)]
pub struct JsonRpcError {
pub code: i32,
pub message: String,
pub data: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AuthResponse {
pub access_token: String,
pub token_type: String,
pub expires_in: i64,
pub refresh_token: String,
pub scope: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct HelloResponse {
pub version: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TestResponse {
pub version: String,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, DebugPretty, DisplaySimple)]
pub struct JsonRpcNotification {
pub jsonrpc: String,
pub method: String,
pub params: Option<serde_json::Value>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ConnectionState {
Disconnected,
Connecting,
Connected,
Authenticated,
Reconnecting,
Failed,
}
#[derive(Debug, Clone)]
pub struct HeartbeatStatus {
pub last_ping: Option<std::time::Instant>,
pub last_pong: Option<std::time::Instant>,
pub missed_pongs: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum SubscriptionChannel {
Ticker(String),
OrderBook(String),
Trades(String),
ChartTrades {
instrument: String,
resolution: String,
},
UserOrders,
UserTrades,
UserPortfolio,
UserChanges {
instrument: String,
interval: String,
},
PriceIndex(String),
EstimatedExpirationPrice(String),
MarkPrice(String),
Funding(String),
Perpetual {
instrument: String,
interval: String,
},
Quote(String),
PlatformState,
PlatformStatePublicMethods,
InstrumentState {
kind: String,
currency: String,
},
GroupedOrderBook {
instrument: String,
group: String,
depth: String,
interval: String,
},
IncrementalTicker(String),
TradesByKind {
kind: String,
currency: String,
interval: String,
},
PriceRanking(String),
PriceStatistics(String),
VolatilityIndex(String),
BlockRfqTrades(String),
BlockTradeConfirmations,
BlockTradeConfirmationsByCurrency(String),
UserMmpTrigger(String),
UserAccessLog,
UserLock,
Unknown(String),
}
#[derive(Clone, Serialize, Deserialize, PartialEq, DebugPretty, DisplaySimple)]
pub struct WsRequest {
pub jsonrpc: String,
pub id: serde_json::Value,
pub method: String,
pub params: Option<serde_json::Value>,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, DebugPretty, DisplaySimple)]
pub struct WsResponse {
pub jsonrpc: String,
pub id: Option<serde_json::Value>,
pub result: Option<serde_json::Value>,
pub error: Option<JsonRpcError>,
}
impl JsonRpcRequest {
pub fn new<T: Serialize>(id: serde_json::Value, method: &str, params: Option<T>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id,
method: method.to_string(),
params: params.map(|p| serde_json::to_value(p).unwrap_or(serde_json::Value::Null)),
}
}
}
impl JsonRpcResponse {
pub fn success(id: serde_json::Value, result: serde_json::Value) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id,
result: JsonRpcResult::Success { result },
}
}
pub fn error(id: serde_json::Value, error: JsonRpcError) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id,
result: JsonRpcResult::Error { error },
}
}
}
impl JsonRpcNotification {
pub fn new<T: Serialize>(method: &str, params: Option<T>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
method: method.to_string(),
params: params.map(|p| serde_json::to_value(p).unwrap_or(serde_json::Value::Null)),
}
}
}
impl SubscriptionChannel {
pub fn channel_name(&self) -> String {
match self {
SubscriptionChannel::Ticker(instrument) => format!("ticker.{}", instrument),
SubscriptionChannel::OrderBook(instrument) => format!("book.{}.raw", instrument),
SubscriptionChannel::Trades(instrument) => format!("trades.{}.raw", instrument),
SubscriptionChannel::ChartTrades {
instrument,
resolution,
} => {
format!("chart.trades.{}.{}", instrument, resolution)
}
SubscriptionChannel::UserOrders => "user.orders.any.any.raw".to_string(),
SubscriptionChannel::UserTrades => "user.trades.any.any.raw".to_string(),
SubscriptionChannel::UserPortfolio => "user.portfolio.any".to_string(),
SubscriptionChannel::UserChanges {
instrument,
interval,
} => {
format!("user.changes.{}.{}", instrument, interval)
}
SubscriptionChannel::PriceIndex(currency) => {
format!("deribit_price_index.{}_usd", currency.to_lowercase())
}
SubscriptionChannel::EstimatedExpirationPrice(instrument) => {
format!("estimated_expiration_price.{}", instrument)
}
SubscriptionChannel::MarkPrice(instrument) => {
format!("markprice.options.{}", instrument)
}
SubscriptionChannel::Funding(instrument) => format!("perpetual.{}.raw", instrument),
SubscriptionChannel::Perpetual {
instrument,
interval,
} => {
format!("perpetual.{}.{}", instrument, interval)
}
SubscriptionChannel::Quote(instrument) => format!("quote.{}", instrument),
SubscriptionChannel::PlatformState => "platform_state".to_string(),
SubscriptionChannel::PlatformStatePublicMethods => {
"platform_state.public_methods_state".to_string()
}
SubscriptionChannel::InstrumentState { kind, currency } => {
format!("instrument.state.{}.{}", kind, currency)
}
SubscriptionChannel::GroupedOrderBook {
instrument,
group,
depth,
interval,
} => {
format!("book.{}.{}.{}.{}", instrument, group, depth, interval)
}
SubscriptionChannel::IncrementalTicker(instrument) => {
format!("incremental_ticker.{}", instrument)
}
SubscriptionChannel::TradesByKind {
kind,
currency,
interval,
} => {
format!("trades.{}.{}.{}", kind, currency, interval)
}
SubscriptionChannel::PriceRanking(index_name) => {
format!("deribit_price_ranking.{}", index_name)
}
SubscriptionChannel::PriceStatistics(index_name) => {
format!("deribit_price_statistics.{}", index_name)
}
SubscriptionChannel::VolatilityIndex(index_name) => {
format!("deribit_volatility_index.{}", index_name)
}
SubscriptionChannel::BlockRfqTrades(currency) => {
format!("block_rfq.trades.{}", currency)
}
SubscriptionChannel::BlockTradeConfirmations => "block_trade_confirmations".to_string(),
SubscriptionChannel::BlockTradeConfirmationsByCurrency(currency) => {
format!("block_trade_confirmations.{}", currency)
}
SubscriptionChannel::UserMmpTrigger(index_name) => {
format!("user.mmp_trigger.{}", index_name)
}
SubscriptionChannel::UserAccessLog => "user.access_log".to_string(),
SubscriptionChannel::UserLock => "user.lock".to_string(),
SubscriptionChannel::Unknown(channel) => channel.clone(),
}
}
#[must_use]
pub fn from_string(s: &str) -> Self {
let parts: Vec<&str> = s.split('.').collect();
match parts.as_slice() {
["ticker", instrument] => SubscriptionChannel::Ticker(instrument.to_string()),
["ticker", instrument, _interval] => {
SubscriptionChannel::Ticker(instrument.to_string())
}
["book", instrument, "raw"] => SubscriptionChannel::OrderBook(instrument.to_string()),
["book", instrument, group, depth, interval] => SubscriptionChannel::GroupedOrderBook {
instrument: instrument.to_string(),
group: group.to_string(),
depth: depth.to_string(),
interval: interval.to_string(),
},
["book", instrument, _depth, _interval] => {
SubscriptionChannel::OrderBook(instrument.to_string())
}
["incremental_ticker", instrument] => {
SubscriptionChannel::IncrementalTicker(instrument.to_string())
}
["trades", instrument, "raw"] => SubscriptionChannel::Trades(instrument.to_string()),
["trades", kind, currency, interval] if !Self::looks_like_instrument(kind) => {
SubscriptionChannel::TradesByKind {
kind: kind.to_string(),
currency: currency.to_string(),
interval: interval.to_string(),
}
}
["trades", instrument, _interval] => {
SubscriptionChannel::Trades(instrument.to_string())
}
["chart", "trades", instrument, resolution] => SubscriptionChannel::ChartTrades {
instrument: instrument.to_string(),
resolution: resolution.to_string(),
},
["user", "orders", ..] => SubscriptionChannel::UserOrders,
["user", "trades", ..] => SubscriptionChannel::UserTrades,
["user", "portfolio", ..] => SubscriptionChannel::UserPortfolio,
["user", "changes", instrument, interval] => SubscriptionChannel::UserChanges {
instrument: instrument.to_string(),
interval: interval.to_string(),
},
["deribit_price_index", currency_pair] => {
let currency = currency_pair
.strip_suffix("_usd")
.map(|c| c.to_uppercase())
.unwrap_or_else(|| currency_pair.to_uppercase());
SubscriptionChannel::PriceIndex(currency)
}
["estimated_expiration_price", instrument] => {
SubscriptionChannel::EstimatedExpirationPrice(instrument.to_string())
}
["markprice", "options", instrument] => {
SubscriptionChannel::MarkPrice(instrument.to_string())
}
["perpetual", instrument, interval] => SubscriptionChannel::Perpetual {
instrument: instrument.to_string(),
interval: interval.to_string(),
},
["quote", instrument] => SubscriptionChannel::Quote(instrument.to_string()),
["platform_state"] => SubscriptionChannel::PlatformState,
["platform_state", "public_methods_state"] => {
SubscriptionChannel::PlatformStatePublicMethods
}
["instrument", "state", kind, currency] => SubscriptionChannel::InstrumentState {
kind: kind.to_string(),
currency: currency.to_string(),
},
["deribit_price_ranking", index_name] => {
SubscriptionChannel::PriceRanking(index_name.to_string())
}
["deribit_price_statistics", index_name] => {
SubscriptionChannel::PriceStatistics(index_name.to_string())
}
["deribit_volatility_index", index_name] => {
SubscriptionChannel::VolatilityIndex(index_name.to_string())
}
["block_rfq", "trades", currency] => {
SubscriptionChannel::BlockRfqTrades(currency.to_string())
}
["block_trade_confirmations"] => SubscriptionChannel::BlockTradeConfirmations,
["block_trade_confirmations", currency] => {
SubscriptionChannel::BlockTradeConfirmationsByCurrency(currency.to_string())
}
["user", "mmp_trigger", index_name] => {
SubscriptionChannel::UserMmpTrigger(index_name.to_string())
}
["user", "access_log"] => SubscriptionChannel::UserAccessLog,
["user", "lock"] => SubscriptionChannel::UserLock,
_ => SubscriptionChannel::Unknown(s.to_string()),
}
}
#[must_use]
pub fn is_unknown(&self) -> bool {
matches!(self, SubscriptionChannel::Unknown(_))
}
#[must_use]
fn looks_like_instrument(s: &str) -> bool {
s.contains('-')
}
}
impl std::fmt::Display for SubscriptionChannel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.channel_name())
}
}
impl ConnectionState {
pub fn is_connected(&self) -> bool {
matches!(
self,
ConnectionState::Connected | ConnectionState::Authenticated
)
}
pub fn is_authenticated(&self) -> bool {
matches!(self, ConnectionState::Authenticated)
}
pub fn is_transitional(&self) -> bool {
matches!(
self,
ConnectionState::Connecting | ConnectionState::Reconnecting
)
}
}
impl HeartbeatStatus {
pub fn new() -> Self {
Self {
last_ping: None,
last_pong: None,
missed_pongs: 0,
}
}
pub fn ping_sent(&mut self) {
self.last_ping = Some(std::time::Instant::now());
}
pub fn pong_received(&mut self) {
self.last_pong = Some(std::time::Instant::now());
self.missed_pongs = 0;
}
pub fn missed_pong(&mut self) {
self.missed_pongs += 1;
}
pub fn is_stale(&self, max_missed_pongs: u32) -> bool {
self.missed_pongs >= max_missed_pongs
}
}
impl Default for HeartbeatStatus {
fn default() -> Self {
Self::new()
}
}