smcp_computer/mcp_clients/
subscription_manager.rs1use crate::mcp_clients::model::Resource;
23use std::collections::HashSet;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27#[derive(Debug, Clone)]
29pub struct Subscription {
30 pub uri: String,
32 pub subscribed_at: std::time::Instant,
34 pub resource: Resource,
36}
37
38impl Subscription {
39 pub fn new(resource: Resource) -> Self {
41 Self {
42 uri: resource.uri.clone(),
43 subscribed_at: std::time::Instant::now(),
44 resource,
45 }
46 }
47
48 pub fn is_expired(&self, ttl: std::time::Duration) -> bool {
50 self.subscribed_at.elapsed() > ttl
51 }
52}
53
54#[derive(Debug, Clone)]
58pub struct SubscriptionManager {
59 subscriptions: Arc<RwLock<HashSet<String>>>,
61}
62
63impl SubscriptionManager {
64 pub fn new() -> Self {
66 Self {
67 subscriptions: Arc::new(RwLock::new(HashSet::new())),
68 }
69 }
70
71 pub async fn add_subscription(&self, uri: String) -> Result<bool, String> {
80 let mut subs = self.subscriptions.write().await;
81 let is_new = subs.insert(uri.clone());
82 Ok(is_new)
83 }
84
85 pub async fn remove_subscription(&self, uri: &str) -> Result<bool, String> {
94 let mut subs = self.subscriptions.write().await;
95 let removed = subs.remove(uri);
96 Ok(removed)
97 }
98
99 pub async fn is_subscribed(&self, uri: &str) -> bool {
108 let subs = self.subscriptions.read().await;
109 subs.contains(uri)
110 }
111
112 pub async fn get_subscriptions(&self) -> Vec<String> {
117 let subs = self.subscriptions.read().await;
118 subs.iter().cloned().collect()
119 }
120
121 pub async fn subscription_count(&self) -> usize {
126 let subs = self.subscriptions.read().await;
127 subs.len()
128 }
129
130 pub async fn clear(&self) {
132 let mut subs = self.subscriptions.write().await;
133 subs.clear();
134 }
135
136 pub async fn add_subscriptions_batch(&self, uris: Vec<String>) -> usize {
144 let mut subs = self.subscriptions.write().await;
145 let mut added = 0;
146 for uri in uris {
147 if subs.insert(uri) {
148 added += 1;
149 }
150 }
151 added
152 }
153}
154
155impl Default for SubscriptionManager {
156 fn default() -> Self {
157 Self::new()
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// A first insert reports `true`; inserting the same URI again reports `false`.
    #[tokio::test]
    async fn test_add_and_check_subscription() {
        let manager = SubscriptionManager::new();
        let uri = "window://test";

        let first = manager.add_subscription(uri.to_string()).await;
        assert_eq!(first, Ok(true));

        assert!(manager.is_subscribed(uri).await);

        let second = manager.add_subscription(uri.to_string()).await;
        assert_eq!(second, Ok(false));
    }

    /// Removal reports `true` for a present URI and `false` once it is gone.
    #[tokio::test]
    async fn test_remove_subscription() {
        let manager = SubscriptionManager::new();
        let uri = "window://test";

        manager.add_subscription(uri.to_string()).await.unwrap();
        assert!(manager.is_subscribed(uri).await);

        let first_removal = manager.remove_subscription(uri).await.unwrap();
        assert!(first_removal);
        assert!(!manager.is_subscribed(uri).await);

        let second_removal = manager.remove_subscription(uri).await.unwrap();
        assert!(!second_removal);
    }

    /// The snapshot contains exactly the URIs that were added.
    #[tokio::test]
    async fn test_get_subscriptions() {
        let manager = SubscriptionManager::new();

        for uri in ["window://test1", "window://test2"] {
            manager.add_subscription(uri.to_string()).await.unwrap();
        }

        let snapshot = manager.get_subscriptions().await;
        assert_eq!(snapshot.len(), 2);
        assert!(snapshot.iter().any(|s| s == "window://test1"));
        assert!(snapshot.iter().any(|s| s == "window://test2"));
    }

    /// Clearing drops the count back to zero.
    #[tokio::test]
    async fn test_clear_subscriptions() {
        let manager = SubscriptionManager::new();

        for uri in ["window://test1", "window://test2"] {
            manager.add_subscription(uri.to_string()).await.unwrap();
        }

        assert_eq!(manager.subscription_count().await, 2);

        manager.clear().await;
        assert_eq!(manager.subscription_count().await, 0);
    }
}