1use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23use std::time::Duration;
24
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27use tokio::sync::{broadcast, RwLock};
28
29use crate::error::PodError;
30use crate::storage::StorageEvent;
31
32#[cfg(feature = "legacy-notifications")]
35pub mod legacy;
36
37#[cfg(feature = "webhook-signing")]
42pub mod signing;
43
44pub mod as_ns {
46 pub const CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
47 pub const CREATE: &str = "Create";
48 pub const UPDATE: &str = "Update";
49 pub const DELETE: &str = "Delete";
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "PascalCase")]
55pub enum ChannelType {
56 WebSocketChannel2023,
57 WebhookChannel2023,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct Subscription {
63 pub id: String,
65 pub topic: String,
67 pub channel_type: ChannelType,
69 pub receive_from: String,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct ChangeNotification {
78 #[serde(rename = "@context")]
79 pub context: String,
80 pub id: String,
81 #[serde(rename = "type")]
82 pub kind: String,
83 pub object: String,
84 pub published: String,
85}
86
87impl ChangeNotification {
88 pub fn from_storage_event(event: &StorageEvent, pod_base: &str) -> Self {
90 let (kind, path) = match event {
91 StorageEvent::Created(p) => (as_ns::CREATE, p),
92 StorageEvent::Updated(p) => (as_ns::UPDATE, p),
93 StorageEvent::Deleted(p) => (as_ns::DELETE, p),
94 };
95 let object = format!("{}{}", pod_base.trim_end_matches('/'), path);
96 Self {
97 context: as_ns::CONTEXT.to_string(),
98 id: format!("urn:uuid:{}", uuid::Uuid::new_v4()),
99 kind: kind.to_string(),
100 object,
101 published: chrono::Utc::now().to_rfc3339(),
102 }
103 }
104}
105
106#[async_trait]
108pub trait Notifications: Send + Sync {
109 async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError>;
111
112 async fn unsubscribe(&self, id: &str) -> Result<(), PodError>;
114
115 async fn publish(&self, topic: &str, notification: ChangeNotification) -> Result<(), PodError>;
117}
118
119pub const DEFAULT_MAX_SUBSCRIPTIONS: usize = 10_000;
126
127#[derive(Clone)]
128pub struct InMemoryNotifications {
129 inner: Arc<RwLock<InMemoryInner>>,
130}
131
132#[derive(Clone)]
133struct InMemoryInner {
134 topics: HashMap<String, VecDeque<Subscription>>,
135 max_capacity: usize,
137 total_count: usize,
139}
140
141impl Default for InMemoryInner {
142 fn default() -> Self {
143 Self {
144 topics: HashMap::new(),
145 max_capacity: DEFAULT_MAX_SUBSCRIPTIONS,
146 total_count: 0,
147 }
148 }
149}
150
151impl Default for InMemoryNotifications {
152 fn default() -> Self {
153 Self {
154 inner: Arc::new(RwLock::new(InMemoryInner::default())),
155 }
156 }
157}
158
159impl InMemoryNotifications {
160 pub fn new() -> Self {
161 Self::default()
162 }
163
164 pub fn with_capacity(max_capacity: usize) -> Self {
166 Self {
167 inner: Arc::new(RwLock::new(InMemoryInner {
168 topics: HashMap::new(),
169 max_capacity,
170 total_count: 0,
171 })),
172 }
173 }
174}
175
176#[async_trait]
177impl Notifications for InMemoryNotifications {
178 async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
179 let mut guard = self.inner.write().await;
180 if guard.total_count >= guard.max_capacity {
182 let evict_topic = guard
184 .topics
185 .iter()
186 .find(|(_, subs)| !subs.is_empty())
187 .map(|(t, _)| t.clone());
188 if let Some(topic_key) = evict_topic {
189 let now_empty = {
190 let subs = guard.topics.get_mut(&topic_key).unwrap();
191 subs.pop_front();
192 subs.is_empty()
193 };
194 guard.total_count = guard.total_count.saturating_sub(1);
195 if now_empty {
196 guard.topics.remove(&topic_key);
197 }
198 }
199 }
200 guard
201 .topics
202 .entry(subscription.topic.clone())
203 .or_default()
204 .push_back(subscription);
205 guard.total_count += 1;
206 Ok(())
207 }
208
209 async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
210 let mut guard = self.inner.write().await;
211 let mut removed = 0usize;
212 for subs in guard.topics.values_mut() {
213 let before = subs.len();
214 subs.retain(|s| s.id != id);
215 removed += before - subs.len();
216 }
217 guard.total_count = guard.total_count.saturating_sub(removed);
218 guard.topics.retain(|_, subs| !subs.is_empty());
220 Ok(())
221 }
222
223 async fn publish(
224 &self,
225 topic: &str,
226 _notification: ChangeNotification,
227 ) -> Result<(), PodError> {
228 let guard = self.inner.read().await;
229 let _ = guard.topics.get(topic);
230 Ok(())
231 }
232}
233
234#[derive(Clone)]
243pub struct WebSocketChannelManager {
244 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
245 sender: broadcast::Sender<ChangeNotification>,
246 heartbeat_interval: Duration,
247}
248
249impl Default for WebSocketChannelManager {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255impl WebSocketChannelManager {
256 pub fn new() -> Self {
257 let (tx, _) = broadcast::channel(1024);
258 Self {
259 subscriptions: Arc::new(RwLock::new(HashMap::new())),
260 sender: tx,
261 heartbeat_interval: Duration::from_secs(30),
262 }
263 }
264
265 pub fn with_heartbeat(mut self, interval: Duration) -> Self {
267 self.heartbeat_interval = interval;
268 self
269 }
270
271 pub fn heartbeat_interval(&self) -> Duration {
273 self.heartbeat_interval
274 }
275
276 pub async fn subscribe(&self, topic: &str, base_url: &str) -> Subscription {
279 let id = uuid::Uuid::new_v4().to_string();
280 let receive_from = format!(
281 "{}/subscription/{}",
282 base_url.trim_end_matches('/'),
283 urlencoding(topic)
284 );
285 let sub = Subscription {
286 id: id.clone(),
287 topic: topic.to_string(),
288 channel_type: ChannelType::WebSocketChannel2023,
289 receive_from,
290 };
291 self.subscriptions.write().await.insert(id, sub.clone());
292 sub
293 }
294
295 pub async fn unsubscribe(&self, id: &str) {
297 self.subscriptions.write().await.remove(id);
298 }
299
300 pub fn stream(&self) -> broadcast::Receiver<ChangeNotification> {
304 self.sender.subscribe()
305 }
306
307 pub async fn active_subscriptions(&self) -> usize {
309 self.subscriptions.read().await.len()
310 }
311
312 pub async fn pump_from_storage(
317 self,
318 mut rx: tokio::sync::mpsc::Receiver<StorageEvent>,
319 pod_base: String,
320 ) {
321 while let Some(event) = rx.recv().await {
322 let note = ChangeNotification::from_storage_event(&event, &pod_base);
323 let _ = self.sender.send(note);
324 }
325 }
326}
327
328#[async_trait]
329impl Notifications for WebSocketChannelManager {
330 async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
331 self.subscriptions
332 .write()
333 .await
334 .insert(subscription.id.clone(), subscription);
335 Ok(())
336 }
337
338 async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
339 self.subscriptions.write().await.remove(id);
340 Ok(())
341 }
342
343 async fn publish(
344 &self,
345 _topic: &str,
346 notification: ChangeNotification,
347 ) -> Result<(), PodError> {
348 let _ = self.sender.send(notification);
349 Ok(())
350 }
351}
352
353#[derive(Debug, Clone, PartialEq, Eq)]
359pub enum WebhookDelivery {
360 Delivered { status: u16 },
362 FatalDrop { status: u16 },
364 TransientRetry { reason: String },
366}
367
368#[derive(Clone)]
383pub struct WebhookChannelManager {
384 client: reqwest::Client,
385 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
386 pub retry_base: Duration,
388 pub max_retries: u32,
392 pub max_backoff: Duration,
394 pub circuit_threshold: u32,
396 consecutive_failures: Arc<std::sync::atomic::AtomicU32>,
399 #[cfg(feature = "webhook-signing")]
403 signer: Option<signing::SignerConfig>,
404}
405
406impl Default for WebhookChannelManager {
407 fn default() -> Self {
408 Self::new()
409 }
410}
411
412impl WebhookChannelManager {
413 pub fn new() -> Self {
414 Self {
415 client: reqwest::Client::builder()
416 .timeout(Duration::from_secs(10))
417 .build()
418 .unwrap_or_default(),
419 subscriptions: Arc::new(RwLock::new(HashMap::new())),
420 retry_base: Duration::from_millis(500),
421 max_retries: 3,
422 max_backoff: Duration::from_secs(3600),
423 circuit_threshold: 10,
424 consecutive_failures: Arc::new(std::sync::atomic::AtomicU32::new(0)),
425 #[cfg(feature = "webhook-signing")]
426 signer: None,
427 }
428 }
429
430 pub fn with_client(client: reqwest::Client) -> Self {
433 let mut m = Self::new();
434 m.client = client;
435 m
436 }
437
438 #[cfg(feature = "webhook-signing")]
441 pub fn with_signer(mut self, signer: signing::SignerConfig) -> Self {
442 self.signer = Some(signer);
443 self
444 }
445
446 pub fn with_max_attempts(mut self, attempts: u32) -> Self {
448 self.max_retries = attempts.saturating_sub(1);
452 self
453 }
454
455 pub fn with_max_backoff(mut self, max: Duration) -> Self {
457 self.max_backoff = max;
458 self
459 }
460
461 pub fn with_circuit_threshold(mut self, threshold: u32) -> Self {
464 self.circuit_threshold = threshold;
465 self
466 }
467
468 pub fn circuit_open(&self) -> bool {
470 self.consecutive_failures
471 .load(std::sync::atomic::Ordering::Relaxed)
472 >= self.circuit_threshold
473 }
474
475 pub fn consecutive_failures(&self) -> u32 {
478 self.consecutive_failures
479 .load(std::sync::atomic::Ordering::Relaxed)
480 }
481
482 pub fn reset_circuit(&self) {
485 self.consecutive_failures
486 .store(0, std::sync::atomic::Ordering::Relaxed);
487 }
488
489 pub async fn subscribe(&self, topic: &str, target_url: &str) -> Subscription {
490 let sub = Subscription {
491 id: uuid::Uuid::new_v4().to_string(),
492 topic: topic.to_string(),
493 channel_type: ChannelType::WebhookChannel2023,
494 receive_from: target_url.to_string(),
495 };
496 self.subscriptions
497 .write()
498 .await
499 .insert(sub.id.clone(), sub.clone());
500 sub
501 }
502
503 pub async fn unsubscribe(&self, id: &str) {
504 self.subscriptions.write().await.remove(id);
505 }
506
507 pub async fn active_subscriptions(&self) -> usize {
508 self.subscriptions.read().await.len()
509 }
510
511 fn parse_retry_after(raw: &str) -> Option<Duration> {
514 if let Ok(secs) = raw.trim().parse::<u64>() {
515 return Some(Duration::from_secs(secs));
516 }
517 #[cfg(feature = "webhook-signing")]
518 {
519 if let Ok(when) = httpdate::parse_http_date(raw.trim()) {
520 if let Ok(delta) = when.duration_since(std::time::SystemTime::now()) {
521 return Some(delta);
522 }
523 }
524 }
525 None
526 }
527
528 #[doc(hidden)]
533 pub fn compute_backoff(&self, attempt: u32) -> Duration {
534 let exp = self
535 .retry_base
536 .saturating_mul(2u32.saturating_pow(attempt.min(20)));
537 let cap = std::cmp::min(exp, self.max_backoff);
538 let factor = jitter_factor();
544 let nanos = (cap.as_nanos() as f64 * factor) as u128;
545 Duration::from_nanos(nanos.min(u64::MAX as u128) as u64)
546 }
547
548 async fn send_once(
550 &self,
551 url: &str,
552 note: &ChangeNotification,
553 ) -> Result<reqwest::Response, reqwest::Error> {
554 let body = serde_json::to_vec(note).unwrap_or_default();
555 #[cfg(feature = "webhook-signing")]
556 let notification_id = note.id.clone();
557 #[cfg_attr(not(feature = "webhook-signing"), allow(unused_mut))]
558 let mut req = self
559 .client
560 .post(url)
561 .header("Content-Type", "application/ld+json");
562
563 #[cfg(feature = "webhook-signing")]
564 {
565 if let Some(cfg) = &self.signer {
566 let now = std::time::SystemTime::now()
567 .duration_since(std::time::UNIX_EPOCH)
568 .map(|d| d.as_secs())
569 .unwrap_or_default();
570 let signed = signing::sign_request(
571 cfg,
572 "POST",
573 url,
574 "application/ld+json",
575 &body,
576 ¬ification_id,
577 now,
578 );
579 for (name, value) in &signed.headers {
582 if name.eq_ignore_ascii_case("content-type") {
583 continue;
584 }
585 req = req.header(name.as_str(), value.as_str());
586 }
587 } else {
588 tracing::warn!(
589 "webhook manager delivering {} unsigned — consider configuring a SignerConfig",
590 url
591 );
592 }
593 }
594
595 req.body(body).send().await
596 }
597
598 pub async fn deliver_one(&self, url: &str, note: &ChangeNotification) -> WebhookDelivery {
601 if self.circuit_open() {
603 return WebhookDelivery::TransientRetry {
604 reason: "circuit open".to_string(),
605 };
606 }
607
608 let total_attempts = self.max_retries.saturating_add(1);
609 let mut attempt = 0u32;
610 loop {
611 let resp = self.send_once(url, note).await;
612 match resp {
613 Ok(r) => {
614 let status = r.status().as_u16();
615 if r.status().is_success() {
617 self.consecutive_failures
618 .store(0, std::sync::atomic::Ordering::Relaxed);
619 return WebhookDelivery::Delivered { status };
620 }
621 if status == 410 {
623 self.consecutive_failures
624 .store(0, std::sync::atomic::Ordering::Relaxed);
625 return WebhookDelivery::FatalDrop { status };
626 }
627 if status == 429 {
629 let retry_after = r
630 .headers()
631 .get("retry-after")
632 .and_then(|v| v.to_str().ok())
633 .and_then(Self::parse_retry_after)
634 .unwrap_or_else(|| self.compute_backoff(attempt));
635 attempt += 1;
636 if attempt >= total_attempts {
637 self.record_failure();
638 return WebhookDelivery::TransientRetry {
639 reason: format!("429 after {attempt} attempts"),
640 };
641 }
642 tokio::time::sleep(retry_after.min(self.max_backoff)).await;
643 continue;
644 }
645 if r.status().is_server_error() {
648 let wait = r
649 .headers()
650 .get("retry-after")
651 .and_then(|v| v.to_str().ok())
652 .and_then(Self::parse_retry_after)
653 .unwrap_or_else(|| self.compute_backoff(attempt));
654 attempt += 1;
655 if attempt >= total_attempts {
656 self.record_failure();
657 return WebhookDelivery::TransientRetry {
658 reason: format!("5xx after {attempt} attempts"),
659 };
660 }
661 tokio::time::sleep(wait.min(self.max_backoff)).await;
662 continue;
663 }
664 if r.status().is_client_error() {
667 let wait = self.compute_backoff(attempt);
668 attempt += 1;
669 if attempt >= total_attempts {
670 self.record_failure();
671 return WebhookDelivery::TransientRetry {
672 reason: format!("{status} after {attempt} attempts"),
673 };
674 }
675 tokio::time::sleep(wait.min(self.max_backoff)).await;
676 continue;
677 }
678 let wait = self.compute_backoff(attempt);
680 attempt += 1;
681 if attempt >= total_attempts {
682 self.record_failure();
683 return WebhookDelivery::TransientRetry {
684 reason: format!("status {status} after {attempt} attempts"),
685 };
686 }
687 tokio::time::sleep(wait.min(self.max_backoff)).await;
688 }
689 Err(e) => {
690 let wait = self.compute_backoff(attempt);
692 attempt += 1;
693 if attempt >= total_attempts {
694 self.record_failure();
695 return WebhookDelivery::TransientRetry {
696 reason: format!("network error: {e}"),
697 };
698 }
699 tokio::time::sleep(wait.min(self.max_backoff)).await;
700 }
701 }
702 }
703 }
704
705 fn record_failure(&self) {
706 self.consecutive_failures
707 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
708 }
709
710 pub async fn deliver_all(
713 &self,
714 note: &ChangeNotification,
715 topic_matches: impl Fn(&str) -> bool,
716 ) -> Vec<(String, WebhookDelivery)> {
717 let subs: Vec<Subscription> = {
718 let guard = self.subscriptions.read().await;
719 guard
720 .values()
721 .filter(|s| topic_matches(&s.topic))
722 .cloned()
723 .collect()
724 };
725 let mut out = Vec::with_capacity(subs.len());
726 let mut dropped = Vec::new();
727 for sub in subs {
728 let result = self.deliver_one(&sub.receive_from, note).await;
729 if matches!(result, WebhookDelivery::FatalDrop { .. }) {
730 dropped.push(sub.id.clone());
731 }
732 out.push((sub.id, result));
733 }
734 if !dropped.is_empty() {
735 let mut guard = self.subscriptions.write().await;
736 for id in dropped {
737 guard.remove(&id);
738 }
739 }
740 out
741 }
742
743 pub async fn pump_from_storage(
748 self,
749 mut rx: tokio::sync::mpsc::Receiver<StorageEvent>,
750 pod_base: String,
751 ) {
752 while let Some(event) = rx.recv().await {
753 let path = match &event {
754 StorageEvent::Created(p) | StorageEvent::Updated(p) | StorageEvent::Deleted(p) => {
755 p.clone()
756 }
757 };
758 let note = ChangeNotification::from_storage_event(&event, &pod_base);
759 self.deliver_all(¬e, |topic| path.starts_with(topic))
760 .await;
761 }
762 }
763}
764
765#[async_trait]
766impl Notifications for WebhookChannelManager {
767 async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
768 self.subscriptions
769 .write()
770 .await
771 .insert(subscription.id.clone(), subscription);
772 Ok(())
773 }
774
775 async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
776 self.subscriptions.write().await.remove(id);
777 Ok(())
778 }
779
780 async fn publish(&self, topic: &str, notification: ChangeNotification) -> Result<(), PodError> {
781 let matches_topic = |t: &str| topic.starts_with(t) || t == topic;
782 self.deliver_all(¬ification, matches_topic).await;
783 Ok(())
784 }
785}
786
787pub fn discovery_document(pod_base: &str) -> serde_json::Value {
794 let base = pod_base.trim_end_matches('/');
795 serde_json::json!({
796 "@context": ["https://www.w3.org/ns/solid/notifications-context/v1"],
797 "id": format!("{base}/.notifications"),
798 "channelTypes": [
799 {
800 "id": "WebSocketChannel2023",
801 "endpoint": format!("{base}/.notifications/websocket"),
802 "features": ["as:Create", "as:Update", "as:Delete"]
803 },
804 {
805 "id": "WebhookChannel2023",
806 "endpoint": format!("{base}/.notifications/webhook"),
807 "features": ["as:Create", "as:Update", "as:Delete"]
808 }
809 ]
810 })
811}
812
813#[cfg(feature = "webhook-signing")]
823fn jitter_factor() -> f64 {
824 use rand::Rng;
825 rand::thread_rng().gen_range(0.8_f64..1.0_f64)
826}
827
828#[cfg(not(feature = "webhook-signing"))]
829fn jitter_factor() -> f64 {
830 use std::sync::atomic::{AtomicU64, Ordering};
831 static SEED: AtomicU64 = AtomicU64::new(0);
833 let seed = {
834 let n = std::time::Instant::now().elapsed().as_nanos() as u64;
835 let prev = SEED.fetch_add(n | 1, Ordering::Relaxed);
836 prev.wrapping_add(n).wrapping_add(0x9E3779B97F4A7C15)
837 };
838 let mut x = seed;
839 x = (x ^ (x >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
840 x = (x ^ (x >> 27)).wrapping_mul(0x94D049BB133111EB);
841 x ^= x >> 31;
842 let unit = (x >> 11) as f64 / (1u64 << 53) as f64;
844 0.8 + unit * 0.2
845}
846
847fn urlencoding(s: &str) -> String {
852 let mut out = String::with_capacity(s.len());
853 for b in s.bytes() {
854 match b {
855 b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' | b'/' => {
856 out.push(b as char);
857 }
858 _ => {
859 out.push_str(&format!("%{:02X}", b));
860 }
861 }
862 }
863 out
864}
865
866#[cfg(test)]
871mod tests {
872 use super::*;
873
874 #[tokio::test]
875 async fn subscribe_unsubscribe_roundtrip() {
876 let n = InMemoryNotifications::new();
877 let sub = Subscription {
878 id: "sub-1".into(),
879 topic: "/public/".into(),
880 channel_type: ChannelType::WebhookChannel2023,
881 receive_from: "https://example.com/hook".into(),
882 };
883 n.subscribe(sub.clone()).await.unwrap();
884 n.unsubscribe("sub-1").await.unwrap();
885 n.publish(
886 "/public/",
887 ChangeNotification {
888 context: as_ns::CONTEXT.into(),
889 id: "urn:uuid:test".into(),
890 kind: "Update".into(),
891 object: "/public/x".into(),
892 published: chrono::Utc::now().to_rfc3339(),
893 },
894 )
895 .await
896 .unwrap();
897 }
898
899 #[tokio::test]
900 async fn websocket_manager_broadcasts_events() {
901 let m = WebSocketChannelManager::new();
902 let mut rx = m.stream();
903 let sub = m.subscribe("/public/", "wss://pod.example").await;
904 assert_eq!(sub.channel_type, ChannelType::WebSocketChannel2023);
905 assert!(sub.receive_from.contains("/subscription/"));
906
907 let note = ChangeNotification::from_storage_event(
908 &StorageEvent::Created("/public/x".into()),
909 "https://pod.example",
910 );
911 m.publish("/public/", note.clone()).await.unwrap();
912 let received = tokio::time::timeout(Duration::from_secs(1), rx.recv())
913 .await
914 .unwrap()
915 .unwrap();
916 assert_eq!(received.kind, "Create");
917 assert_eq!(received.object, "https://pod.example/public/x");
918 }
919
920 #[tokio::test]
921 async fn change_notification_maps_event_types() {
922 let c = ChangeNotification::from_storage_event(
923 &StorageEvent::Created("/x".into()),
924 "https://p.example",
925 );
926 assert_eq!(c.kind, "Create");
927 let u = ChangeNotification::from_storage_event(
928 &StorageEvent::Updated("/x".into()),
929 "https://p.example",
930 );
931 assert_eq!(u.kind, "Update");
932 let d = ChangeNotification::from_storage_event(
933 &StorageEvent::Deleted("/x".into()),
934 "https://p.example",
935 );
936 assert_eq!(d.kind, "Delete");
937 }
938
939 #[test]
940 fn discovery_lists_both_channels() {
941 let doc = discovery_document("https://pod.example");
942 let arr = doc["channelTypes"].as_array().unwrap();
943 assert_eq!(arr.len(), 2);
944 let ids: Vec<&str> = arr.iter().map(|v| v["id"].as_str().unwrap()).collect();
945 assert!(ids.contains(&"WebSocketChannel2023"));
946 assert!(ids.contains(&"WebhookChannel2023"));
947 }
948
949 #[test]
950 fn webhook_manager_default_retries() {
951 let m = WebhookChannelManager::new();
952 assert_eq!(m.max_retries, 3);
953 }
954
955 #[tokio::test]
956 async fn websocket_active_subscriptions_count() {
957 let m = WebSocketChannelManager::new();
958 assert_eq!(m.active_subscriptions().await, 0);
959 let s = m.subscribe("/a/", "wss://p").await;
960 assert_eq!(m.active_subscriptions().await, 1);
961 m.unsubscribe(&s.id).await;
962 assert_eq!(m.active_subscriptions().await, 0);
963 }
964
965 #[tokio::test]
966 async fn inmemory_bounded_evicts_oldest_at_capacity() {
967 let n = InMemoryNotifications::with_capacity(3);
968 for i in 0..3 {
969 let sub = Subscription {
970 id: format!("sub-{i}"),
971 topic: "/t/".into(),
972 channel_type: ChannelType::WebhookChannel2023,
973 receive_from: format!("https://example.com/hook-{i}"),
974 };
975 n.subscribe(sub).await.unwrap();
976 }
977 let sub4 = Subscription {
979 id: "sub-3".into(),
980 topic: "/t/".into(),
981 channel_type: ChannelType::WebhookChannel2023,
982 receive_from: "https://example.com/hook-3".into(),
983 };
984 n.subscribe(sub4).await.unwrap();
985
986 let guard = n.inner.read().await;
988 assert_eq!(guard.total_count, 3);
989 let subs = guard.topics.get("/t/").unwrap();
991 assert!(!subs.iter().any(|s| s.id == "sub-0"));
992 assert!(subs.iter().any(|s| s.id == "sub-3"));
993 }
994
995 #[tokio::test]
996 async fn inmemory_unsubscribe_decrements_total_count() {
997 let n = InMemoryNotifications::with_capacity(100);
998 let sub = Subscription {
999 id: "sub-x".into(),
1000 topic: "/x/".into(),
1001 channel_type: ChannelType::WebhookChannel2023,
1002 receive_from: "https://example.com/hook".into(),
1003 };
1004 n.subscribe(sub).await.unwrap();
1005 {
1006 let guard = n.inner.read().await;
1007 assert_eq!(guard.total_count, 1);
1008 }
1009 n.unsubscribe("sub-x").await.unwrap();
1010 {
1011 let guard = n.inner.read().await;
1012 assert_eq!(guard.total_count, 0);
1013 assert!(guard.topics.is_empty());
1015 }
1016 }
1017}