Skip to main content

deribit_websocket/model/
subscription.rs

1//! Subscription management for WebSocket client
2
3use pretty_simple_display::DisplaySimple;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6
7/// Re-export subscription channel from deribit-base
8use crate::model::SubscriptionChannel;
9
10/// Subscription information
11#[derive(Clone, Serialize, Deserialize, DisplaySimple)]
12pub struct Subscription {
13    /// Channel name
14    pub channel: String,
15    /// Channel type
16    pub channel_type: SubscriptionChannel,
17    /// Instrument name (if applicable)
18    pub instrument: Option<String>,
19    /// Whether subscription is active
20    pub active: bool,
21}
22
23/// Subscription manager
24#[derive(Debug, Default)]
25pub struct SubscriptionManager {
26    subscriptions: HashMap<String, Subscription>,
27}
28
29impl SubscriptionManager {
30    /// Create a new subscription manager
31    pub fn new() -> Self {
32        Self {
33            subscriptions: HashMap::new(),
34        }
35    }
36
37    /// Add a subscription
38    pub fn add_subscription(
39        &mut self,
40        channel: String,
41        channel_type: SubscriptionChannel,
42        instrument: Option<String>,
43    ) {
44        let subscription = Subscription {
45            channel: channel.clone(),
46            channel_type,
47            instrument,
48            active: true,
49        };
50        self.subscriptions.insert(channel, subscription);
51    }
52
53    /// Add a subscription from channel type
54    pub fn add_subscription_from_channel(&mut self, channel_type: SubscriptionChannel) {
55        let channel = channel_type.to_string();
56        let instrument = match &channel_type {
57            SubscriptionChannel::Ticker(inst)
58            | SubscriptionChannel::OrderBook(inst)
59            | SubscriptionChannel::Trades(inst)
60            | SubscriptionChannel::IncrementalTicker(inst) => Some(inst.clone()),
61            SubscriptionChannel::ChartTrades { instrument, .. }
62            | SubscriptionChannel::GroupedOrderBook { instrument, .. } => Some(instrument.clone()),
63            SubscriptionChannel::UserChanges { instrument, .. } => Some(instrument.clone()),
64            SubscriptionChannel::TradesByKind { currency, .. } => Some(currency.clone()),
65            SubscriptionChannel::PriceIndex(index_name)
66            | SubscriptionChannel::PriceRanking(index_name)
67            | SubscriptionChannel::PriceStatistics(index_name)
68            | SubscriptionChannel::VolatilityIndex(index_name) => Some(index_name.clone()),
69            SubscriptionChannel::EstimatedExpirationPrice(inst)
70            | SubscriptionChannel::MarkPrice(inst)
71            | SubscriptionChannel::Funding(inst)
72            | SubscriptionChannel::Quote(inst) => Some(inst.clone()),
73            SubscriptionChannel::Perpetual { instrument, .. } => Some(instrument.clone()),
74            SubscriptionChannel::InstrumentState { currency, .. } => Some(currency.clone()),
75            SubscriptionChannel::BlockRfqTrades(currency)
76            | SubscriptionChannel::BlockTradeConfirmationsByCurrency(currency) => {
77                Some(currency.clone())
78            }
79            SubscriptionChannel::UserMmpTrigger(index_name) => Some(index_name.clone()),
80            SubscriptionChannel::UserOrders
81            | SubscriptionChannel::UserTrades
82            | SubscriptionChannel::UserPortfolio
83            | SubscriptionChannel::PlatformState
84            | SubscriptionChannel::PlatformStatePublicMethods
85            | SubscriptionChannel::BlockTradeConfirmations
86            | SubscriptionChannel::UserAccessLog
87            | SubscriptionChannel::UserLock
88            | SubscriptionChannel::Unknown(_) => None,
89        };
90        self.add_subscription(channel, channel_type, instrument);
91    }
92
93    /// Remove a subscription
94    pub fn remove_subscription(&mut self, channel: &str) -> Option<Subscription> {
95        self.subscriptions.remove(channel)
96    }
97
98    /// Get all active subscriptions
99    pub fn active_subscriptions(&self) -> Vec<&Subscription> {
100        self.subscriptions.values().filter(|s| s.active).collect()
101    }
102
103    /// Get subscription by channel
104    pub fn get_subscription(&self, channel: &str) -> Option<&Subscription> {
105        self.subscriptions.get(channel)
106    }
107
108    /// Mark subscription as inactive
109    pub fn deactivate_subscription(&mut self, channel: &str) {
110        if let Some(subscription) = self.subscriptions.get_mut(channel) {
111            subscription.active = false;
112        }
113    }
114
115    /// Reactivate all subscriptions
116    pub fn reactivate_all(&mut self) {
117        for subscription in self.subscriptions.values_mut() {
118            subscription.active = true;
119        }
120    }
121
122    /// Clear all subscriptions
123    pub fn clear(&mut self) {
124        self.subscriptions.clear();
125    }
126
127    /// Get all channel names
128    pub fn get_all_channels(&self) -> Vec<String> {
129        self.subscriptions.keys().cloned().collect()
130    }
131
132    /// Get active channel names
133    pub fn get_active_channels(&self) -> Vec<String> {
134        self.subscriptions
135            .iter()
136            .filter(|(_, sub)| sub.active)
137            .map(|(channel, _)| channel.clone())
138            .collect()
139    }
140}
141
142// Custom Debug implementation for Subscription
143impl std::fmt::Debug for Subscription {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        f.debug_struct("Subscription")
146            .field("channel", &self.channel)
147            .field("channel_type", &self.channel_type)
148            .field("instrument", &self.instrument)
149            .field("active", &self.active)
150            .finish()
151    }
152}