ccxt_core/ws_client/
subscription.rs

1//! WebSocket subscription management.
2
3use crate::error::{Error, Result};
4use dashmap::DashMap;
5use serde_json::Value;
6use std::collections::HashMap;
7
8/// WebSocket subscription metadata.
9#[derive(Debug, Clone)]
10pub struct Subscription {
11    pub(crate) channel: String,
12    pub(crate) symbol: Option<String>,
13    pub(crate) params: Option<HashMap<String, Value>>,
14}
15
16/// Subscription manager with capacity limits.
17#[derive(Debug)]
18pub struct SubscriptionManager {
19    subscriptions: DashMap<String, Subscription>,
20    max_subscriptions: usize,
21}
22
23impl SubscriptionManager {
24    /// Creates a new subscription manager with the specified maximum capacity.
25    pub fn new(max_subscriptions: usize) -> Self {
26        Self {
27            subscriptions: DashMap::new(),
28            max_subscriptions,
29        }
30    }
31
32    /// Creates a new subscription manager with the default maximum capacity (100).
33    pub fn with_default_capacity() -> Self {
34        Self::new(super::config::DEFAULT_MAX_SUBSCRIPTIONS)
35    }
36
37    /// Returns the maximum number of subscriptions allowed.
38    #[inline]
39    #[must_use]
40    pub fn max_subscriptions(&self) -> usize {
41        self.max_subscriptions
42    }
43
44    /// Attempts to add a subscription.
45    pub fn try_add(&self, key: String, subscription: Subscription) -> Result<()> {
46        if self.subscriptions.contains_key(&key) {
47            self.subscriptions.insert(key, subscription);
48            return Ok(());
49        }
50
51        if self.subscriptions.len() >= self.max_subscriptions {
52            return Err(Error::resource_exhausted(format!(
53                "Maximum subscriptions ({}) reached",
54                self.max_subscriptions
55            )));
56        }
57
58        self.subscriptions.insert(key, subscription);
59        Ok(())
60    }
61
62    /// Removes a subscription by key.
63    pub fn remove(&self, key: &str) -> Option<Subscription> {
64        self.subscriptions.remove(key).map(|(_, v)| v)
65    }
66
67    /// Returns the current number of active subscriptions.
68    #[inline]
69    #[must_use]
70    pub fn count(&self) -> usize {
71        self.subscriptions.len()
72    }
73
74    /// Returns the remaining capacity for new subscriptions.
75    #[inline]
76    #[must_use]
77    pub fn remaining_capacity(&self) -> usize {
78        self.max_subscriptions
79            .saturating_sub(self.subscriptions.len())
80    }
81
82    /// Checks if a subscription exists for the given key.
83    #[inline]
84    #[must_use]
85    pub fn contains(&self, key: &str) -> bool {
86        self.subscriptions.contains_key(key)
87    }
88
89    /// Returns a reference to the subscription for the given key, if it exists.
90    #[must_use]
91    pub fn get(&self, key: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Subscription>> {
92        self.subscriptions.get(key)
93    }
94
95    /// Clears all subscriptions.
96    pub fn clear(&self) {
97        self.subscriptions.clear();
98    }
99
100    /// Returns an iterator over all subscriptions.
101    pub fn iter(
102        &self,
103    ) -> impl Iterator<Item = dashmap::mapref::multiple::RefMulti<'_, String, Subscription>> {
104        self.subscriptions.iter()
105    }
106
107    /// Collects all subscriptions into a vector.
108    #[must_use]
109    pub fn collect_subscriptions(&self) -> Vec<Subscription> {
110        self.subscriptions
111            .iter()
112            .map(|entry| entry.value().clone())
113            .collect()
114    }
115
116    /// Checks if the manager is at full capacity.
117    #[inline]
118    #[must_use]
119    pub fn is_full(&self) -> bool {
120        self.subscriptions.len() >= self.max_subscriptions
121    }
122
123    /// Checks if the manager has no subscriptions.
124    #[inline]
125    #[must_use]
126    pub fn is_empty(&self) -> bool {
127        self.subscriptions.is_empty()
128    }
129}
130
131impl Default for SubscriptionManager {
132    fn default() -> Self {
133        Self::with_default_capacity()
134    }
135}