1use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23use std::time::Duration;
24
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27use tokio::sync::{broadcast, RwLock};
28
29use crate::error::PodError;
30use crate::storage::StorageEvent;
31
32#[cfg(feature = "legacy-notifications")]
35pub mod legacy;
36
37#[cfg(feature = "webhook-signing")]
42pub mod signing;
43
44pub mod as_ns {
46 pub const CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
47 pub const CREATE: &str = "Create";
48 pub const UPDATE: &str = "Update";
49 pub const DELETE: &str = "Delete";
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "PascalCase")]
55pub enum ChannelType {
56 WebSocketChannel2023,
57 WebhookChannel2023,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct Subscription {
63 pub id: String,
65 pub topic: String,
67 pub channel_type: ChannelType,
69 pub receive_from: String,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct ChangeNotification {
78 #[serde(rename = "@context")]
79 pub context: String,
80 pub id: String,
81 #[serde(rename = "type")]
82 pub kind: String,
83 pub object: String,
84 pub published: String,
85}
86
87impl ChangeNotification {
88 pub fn from_storage_event(event: &StorageEvent, pod_base: &str) -> Self {
90 let (kind, path) = match event {
91 StorageEvent::Created(p) => (as_ns::CREATE, p),
92 StorageEvent::Updated(p) => (as_ns::UPDATE, p),
93 StorageEvent::Deleted(p) => (as_ns::DELETE, p),
94 };
95 let object = format!("{}{}", pod_base.trim_end_matches('/'), path);
96 Self {
97 context: as_ns::CONTEXT.to_string(),
98 id: format!("urn:uuid:{}", uuid::Uuid::new_v4()),
99 kind: kind.to_string(),
100 object,
101 published: chrono::Utc::now().to_rfc3339(),
102 }
103 }
104}
105
106#[async_trait]
108pub trait Notifications: Send + Sync {
109 async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError>;
111
112 async fn unsubscribe(&self, id: &str) -> Result<(), PodError>;
114
115 async fn publish(
117 &self,
118 topic: &str,
119 notification: ChangeNotification,
120 ) -> Result<(), PodError>;
121}
122
123pub const DEFAULT_MAX_SUBSCRIPTIONS: usize = 10_000;
130
131#[derive(Clone)]
132pub struct InMemoryNotifications {
133 inner: Arc<RwLock<InMemoryInner>>,
134}
135
136#[derive(Clone)]
137struct InMemoryInner {
138 topics: HashMap<String, VecDeque<Subscription>>,
139 max_capacity: usize,
141 total_count: usize,
143}
144
145impl Default for InMemoryInner {
146 fn default() -> Self {
147 Self {
148 topics: HashMap::new(),
149 max_capacity: DEFAULT_MAX_SUBSCRIPTIONS,
150 total_count: 0,
151 }
152 }
153}
154
155impl Default for InMemoryNotifications {
156 fn default() -> Self {
157 Self {
158 inner: Arc::new(RwLock::new(InMemoryInner::default())),
159 }
160 }
161}
162
163impl InMemoryNotifications {
164 pub fn new() -> Self {
165 Self::default()
166 }
167
168 pub fn with_capacity(max_capacity: usize) -> Self {
170 Self {
171 inner: Arc::new(RwLock::new(InMemoryInner {
172 topics: HashMap::new(),
173 max_capacity,
174 total_count: 0,
175 })),
176 }
177 }
178}
179
180#[async_trait]
181impl Notifications for InMemoryNotifications {
182 async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
183 let mut guard = self.inner.write().await;
184 if guard.total_count >= guard.max_capacity {
186 let evict_topic = guard
188 .topics
189 .iter()
190 .find(|(_, subs)| !subs.is_empty())
191 .map(|(t, _)| t.clone());
192 if let Some(topic_key) = evict_topic {
193 let now_empty = {
194 let subs = guard.topics.get_mut(&topic_key).unwrap();
195 subs.pop_front();
196 subs.is_empty()
197 };
198 guard.total_count = guard.total_count.saturating_sub(1);
199 if now_empty {
200 guard.topics.remove(&topic_key);
201 }
202 }
203 }
204 guard
205 .topics
206 .entry(subscription.topic.clone())
207 .or_default()
208 .push_back(subscription);
209 guard.total_count += 1;
210 Ok(())
211 }
212
213 async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
214 let mut guard = self.inner.write().await;
215 let mut removed = 0usize;
216 for subs in guard.topics.values_mut() {
217 let before = subs.len();
218 subs.retain(|s| s.id != id);
219 removed += before - subs.len();
220 }
221 guard.total_count = guard.total_count.saturating_sub(removed);
222 guard.topics.retain(|_, subs| !subs.is_empty());
224 Ok(())
225 }
226
227 async fn publish(
228 &self,
229 topic: &str,
230 _notification: ChangeNotification,
231 ) -> Result<(), PodError> {
232 let guard = self.inner.read().await;
233 let _ = guard.topics.get(topic);
234 Ok(())
235 }
236}
237
238#[derive(Clone)]
247pub struct WebSocketChannelManager {
248 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
249 sender: broadcast::Sender<ChangeNotification>,
250 heartbeat_interval: Duration,
251}
252
253impl Default for WebSocketChannelManager {
254 fn default() -> Self {
255 Self::new()
256 }
257}
258
259impl WebSocketChannelManager {
260 pub fn new() -> Self {
261 let (tx, _) = broadcast::channel(1024);
262 Self {
263 subscriptions: Arc::new(RwLock::new(HashMap::new())),
264 sender: tx,
265 heartbeat_interval: Duration::from_secs(30),
266 }
267 }
268
269 pub fn with_heartbeat(mut self, interval: Duration) -> Self {
271 self.heartbeat_interval = interval;
272 self
273 }
274
275 pub fn heartbeat_interval(&self) -> Duration {
277 self.heartbeat_interval
278 }
279
280 pub async fn subscribe(&self, topic: &str, base_url: &str) -> Subscription {
283 let id = uuid::Uuid::new_v4().to_string();
284 let receive_from = format!(
285 "{}/subscription/{}",
286 base_url.trim_end_matches('/'),
287 urlencoding(topic)
288 );
289 let sub = Subscription {
290 id: id.clone(),
291 topic: topic.to_string(),
292 channel_type: ChannelType::WebSocketChannel2023,
293 receive_from,
294 };
295 self.subscriptions.write().await.insert(id, sub.clone());
296 sub
297 }
298
299 pub async fn unsubscribe(&self, id: &str) {
301 self.subscriptions.write().await.remove(id);
302 }
303
304 pub fn stream(&self) -> broadcast::Receiver<ChangeNotification> {
308 self.sender.subscribe()
309 }
310
311 pub async fn active_subscriptions(&self) -> usize {
313 self.subscriptions.read().await.len()
314 }
315
316 pub async fn pump_from_storage(
321 self,
322 mut rx: tokio::sync::mpsc::Receiver<StorageEvent>,
323 pod_base: String,
324 ) {
325 while let Some(event) = rx.recv().await {
326 let note = ChangeNotification::from_storage_event(&event, &pod_base);
327 let _ = self.sender.send(note);
328 }
329 }
330}
331
332#[async_trait]
333impl Notifications for WebSocketChannelManager {
334 async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
335 self.subscriptions
336 .write()
337 .await
338 .insert(subscription.id.clone(), subscription);
339 Ok(())
340 }
341
342 async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
343 self.subscriptions.write().await.remove(id);
344 Ok(())
345 }
346
347 async fn publish(
348 &self,
349 _topic: &str,
350 notification: ChangeNotification,
351 ) -> Result<(), PodError> {
352 let _ = self.sender.send(notification);
353 Ok(())
354 }
355}
356
357#[derive(Debug, Clone, PartialEq, Eq)]
363pub enum WebhookDelivery {
364 Delivered { status: u16 },
366 FatalDrop { status: u16 },
368 TransientRetry { reason: String },
370}
371
372#[derive(Clone)]
387pub struct WebhookChannelManager {
388 client: reqwest::Client,
389 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
390 pub retry_base: Duration,
392 pub max_retries: u32,
396 pub max_backoff: Duration,
398 pub circuit_threshold: u32,
400 consecutive_failures: Arc<std::sync::atomic::AtomicU32>,
403 #[cfg(feature = "webhook-signing")]
407 signer: Option<signing::SignerConfig>,
408}
409
410impl Default for WebhookChannelManager {
411 fn default() -> Self {
412 Self::new()
413 }
414}
415
416impl WebhookChannelManager {
417 pub fn new() -> Self {
418 Self {
419 client: reqwest::Client::builder()
420 .timeout(Duration::from_secs(10))
421 .build()
422 .unwrap_or_default(),
423 subscriptions: Arc::new(RwLock::new(HashMap::new())),
424 retry_base: Duration::from_millis(500),
425 max_retries: 3,
426 max_backoff: Duration::from_secs(3600),
427 circuit_threshold: 10,
428 consecutive_failures: Arc::new(std::sync::atomic::AtomicU32::new(0)),
429 #[cfg(feature = "webhook-signing")]
430 signer: None,
431 }
432 }
433
434 pub fn with_client(client: reqwest::Client) -> Self {
437 let mut m = Self::new();
438 m.client = client;
439 m
440 }
441
442 #[cfg(feature = "webhook-signing")]
445 pub fn with_signer(mut self, signer: signing::SignerConfig) -> Self {
446 self.signer = Some(signer);
447 self
448 }
449
450 pub fn with_max_attempts(mut self, attempts: u32) -> Self {
452 self.max_retries = attempts.saturating_sub(1);
456 self
457 }
458
459 pub fn with_max_backoff(mut self, max: Duration) -> Self {
461 self.max_backoff = max;
462 self
463 }
464
465 pub fn with_circuit_threshold(mut self, threshold: u32) -> Self {
468 self.circuit_threshold = threshold;
469 self
470 }
471
472 pub fn circuit_open(&self) -> bool {
474 self.consecutive_failures
475 .load(std::sync::atomic::Ordering::Relaxed)
476 >= self.circuit_threshold
477 }
478
479 pub fn consecutive_failures(&self) -> u32 {
482 self.consecutive_failures
483 .load(std::sync::atomic::Ordering::Relaxed)
484 }
485
486 pub fn reset_circuit(&self) {
489 self.consecutive_failures
490 .store(0, std::sync::atomic::Ordering::Relaxed);
491 }
492
493 pub async fn subscribe(&self, topic: &str, target_url: &str) -> Subscription {
494 let sub = Subscription {
495 id: uuid::Uuid::new_v4().to_string(),
496 topic: topic.to_string(),
497 channel_type: ChannelType::WebhookChannel2023,
498 receive_from: target_url.to_string(),
499 };
500 self.subscriptions
501 .write()
502 .await
503 .insert(sub.id.clone(), sub.clone());
504 sub
505 }
506
507 pub async fn unsubscribe(&self, id: &str) {
508 self.subscriptions.write().await.remove(id);
509 }
510
511 pub async fn active_subscriptions(&self) -> usize {
512 self.subscriptions.read().await.len()
513 }
514
515 fn parse_retry_after(raw: &str) -> Option<Duration> {
518 if let Ok(secs) = raw.trim().parse::<u64>() {
519 return Some(Duration::from_secs(secs));
520 }
521 #[cfg(feature = "webhook-signing")]
522 {
523 if let Ok(when) = httpdate::parse_http_date(raw.trim()) {
524 if let Ok(delta) = when.duration_since(std::time::SystemTime::now()) {
525 return Some(delta);
526 }
527 }
528 }
529 None
530 }
531
532 #[doc(hidden)]
537 pub fn compute_backoff(&self, attempt: u32) -> Duration {
538 let exp = self
539 .retry_base
540 .saturating_mul(2u32.saturating_pow(attempt.min(20)));
541 let cap = std::cmp::min(exp, self.max_backoff);
542 let factor = jitter_factor();
548 let nanos = (cap.as_nanos() as f64 * factor) as u128;
549 Duration::from_nanos(nanos.min(u64::MAX as u128) as u64)
550 }
551
552 async fn send_once(
554 &self,
555 url: &str,
556 note: &ChangeNotification,
557 ) -> Result<reqwest::Response, reqwest::Error> {
558 let body = serde_json::to_vec(note).unwrap_or_default();
559 #[cfg(feature = "webhook-signing")]
560 let notification_id = note.id.clone();
561 #[cfg_attr(not(feature = "webhook-signing"), allow(unused_mut))]
562 let mut req = self
563 .client
564 .post(url)
565 .header("Content-Type", "application/ld+json");
566
567 #[cfg(feature = "webhook-signing")]
568 {
569 if let Some(cfg) = &self.signer {
570 let now = std::time::SystemTime::now()
571 .duration_since(std::time::UNIX_EPOCH)
572 .map(|d| d.as_secs())
573 .unwrap_or_default();
574 let signed = signing::sign_request(
575 cfg,
576 "POST",
577 url,
578 "application/ld+json",
579 &body,
580 ¬ification_id,
581 now,
582 );
583 for (name, value) in &signed.headers {
586 if name.eq_ignore_ascii_case("content-type") {
587 continue;
588 }
589 req = req.header(name.as_str(), value.as_str());
590 }
591 } else {
592 tracing::warn!(
593 "webhook manager delivering {} unsigned — consider configuring a SignerConfig",
594 url
595 );
596 }
597 }
598
599 req.body(body).send().await
600 }
601
602 pub async fn deliver_one(
605 &self,
606 url: &str,
607 note: &ChangeNotification,
608 ) -> WebhookDelivery {
609 if self.circuit_open() {
611 return WebhookDelivery::TransientRetry {
612 reason: "circuit open".to_string(),
613 };
614 }
615
616 let total_attempts = self.max_retries.saturating_add(1);
617 let mut attempt = 0u32;
618 loop {
619 let resp = self.send_once(url, note).await;
620 match resp {
621 Ok(r) => {
622 let status = r.status().as_u16();
623 if r.status().is_success() {
625 self.consecutive_failures
626 .store(0, std::sync::atomic::Ordering::Relaxed);
627 return WebhookDelivery::Delivered { status };
628 }
629 if status == 410 {
631 self.consecutive_failures
632 .store(0, std::sync::atomic::Ordering::Relaxed);
633 return WebhookDelivery::FatalDrop { status };
634 }
635 if status == 429 {
637 let retry_after = r
638 .headers()
639 .get("retry-after")
640 .and_then(|v| v.to_str().ok())
641 .and_then(Self::parse_retry_after)
642 .unwrap_or_else(|| self.compute_backoff(attempt));
643 attempt += 1;
644 if attempt >= total_attempts {
645 self.record_failure();
646 return WebhookDelivery::TransientRetry {
647 reason: format!("429 after {attempt} attempts"),
648 };
649 }
650 tokio::time::sleep(
651 retry_after.min(self.max_backoff),
652 )
653 .await;
654 continue;
655 }
656 if r.status().is_server_error() {
659 let wait = r
660 .headers()
661 .get("retry-after")
662 .and_then(|v| v.to_str().ok())
663 .and_then(Self::parse_retry_after)
664 .unwrap_or_else(|| self.compute_backoff(attempt));
665 attempt += 1;
666 if attempt >= total_attempts {
667 self.record_failure();
668 return WebhookDelivery::TransientRetry {
669 reason: format!("5xx after {attempt} attempts"),
670 };
671 }
672 tokio::time::sleep(wait.min(self.max_backoff)).await;
673 continue;
674 }
675 if r.status().is_client_error() {
678 let wait = self.compute_backoff(attempt);
679 attempt += 1;
680 if attempt >= total_attempts {
681 self.record_failure();
682 return WebhookDelivery::TransientRetry {
683 reason: format!("{status} after {attempt} attempts"),
684 };
685 }
686 tokio::time::sleep(wait.min(self.max_backoff)).await;
687 continue;
688 }
689 let wait = self.compute_backoff(attempt);
691 attempt += 1;
692 if attempt >= total_attempts {
693 self.record_failure();
694 return WebhookDelivery::TransientRetry {
695 reason: format!("status {status} after {attempt} attempts"),
696 };
697 }
698 tokio::time::sleep(wait.min(self.max_backoff)).await;
699 }
700 Err(e) => {
701 let wait = self.compute_backoff(attempt);
703 attempt += 1;
704 if attempt >= total_attempts {
705 self.record_failure();
706 return WebhookDelivery::TransientRetry {
707 reason: format!("network error: {e}"),
708 };
709 }
710 tokio::time::sleep(wait.min(self.max_backoff)).await;
711 }
712 }
713 }
714 }
715
716 fn record_failure(&self) {
717 self.consecutive_failures
718 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
719 }
720
721 pub async fn deliver_all(
724 &self,
725 note: &ChangeNotification,
726 topic_matches: impl Fn(&str) -> bool,
727 ) -> Vec<(String, WebhookDelivery)> {
728 let subs: Vec<Subscription> = {
729 let guard = self.subscriptions.read().await;
730 guard
731 .values()
732 .filter(|s| topic_matches(&s.topic))
733 .cloned()
734 .collect()
735 };
736 let mut out = Vec::with_capacity(subs.len());
737 let mut dropped = Vec::new();
738 for sub in subs {
739 let result = self.deliver_one(&sub.receive_from, note).await;
740 if matches!(result, WebhookDelivery::FatalDrop { .. }) {
741 dropped.push(sub.id.clone());
742 }
743 out.push((sub.id, result));
744 }
745 if !dropped.is_empty() {
746 let mut guard = self.subscriptions.write().await;
747 for id in dropped {
748 guard.remove(&id);
749 }
750 }
751 out
752 }
753
754 pub async fn pump_from_storage(
759 self,
760 mut rx: tokio::sync::mpsc::Receiver<StorageEvent>,
761 pod_base: String,
762 ) {
763 while let Some(event) = rx.recv().await {
764 let path = match &event {
765 StorageEvent::Created(p) | StorageEvent::Updated(p) | StorageEvent::Deleted(p) => {
766 p.clone()
767 }
768 };
769 let note = ChangeNotification::from_storage_event(&event, &pod_base);
770 self.deliver_all(¬e, |topic| path.starts_with(topic)).await;
771 }
772 }
773}
774
775#[async_trait]
776impl Notifications for WebhookChannelManager {
777 async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
778 self.subscriptions
779 .write()
780 .await
781 .insert(subscription.id.clone(), subscription);
782 Ok(())
783 }
784
785 async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
786 self.subscriptions.write().await.remove(id);
787 Ok(())
788 }
789
790 async fn publish(
791 &self,
792 topic: &str,
793 notification: ChangeNotification,
794 ) -> Result<(), PodError> {
795 let matches_topic = |t: &str| topic.starts_with(t) || t == topic;
796 self.deliver_all(¬ification, matches_topic).await;
797 Ok(())
798 }
799}
800
801pub fn discovery_document(pod_base: &str) -> serde_json::Value {
808 let base = pod_base.trim_end_matches('/');
809 serde_json::json!({
810 "@context": ["https://www.w3.org/ns/solid/notifications-context/v1"],
811 "id": format!("{base}/.notifications"),
812 "channelTypes": [
813 {
814 "id": "WebSocketChannel2023",
815 "endpoint": format!("{base}/.notifications/websocket"),
816 "features": ["as:Create", "as:Update", "as:Delete"]
817 },
818 {
819 "id": "WebhookChannel2023",
820 "endpoint": format!("{base}/.notifications/webhook"),
821 "features": ["as:Create", "as:Update", "as:Delete"]
822 }
823 ]
824 })
825}
826
827#[cfg(feature = "webhook-signing")]
837fn jitter_factor() -> f64 {
838 use rand::Rng;
839 rand::thread_rng().gen_range(0.8_f64..1.0_f64)
840}
841
842#[cfg(not(feature = "webhook-signing"))]
843fn jitter_factor() -> f64 {
844 use std::sync::atomic::{AtomicU64, Ordering};
845 static SEED: AtomicU64 = AtomicU64::new(0);
847 let seed = {
848 let n = std::time::Instant::now().elapsed().as_nanos() as u64;
849 let prev = SEED.fetch_add(n | 1, Ordering::Relaxed);
850 prev.wrapping_add(n).wrapping_add(0x9E3779B97F4A7C15)
851 };
852 let mut x = seed;
853 x = (x ^ (x >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
854 x = (x ^ (x >> 27)).wrapping_mul(0x94D049BB133111EB);
855 x ^= x >> 31;
856 let unit = (x >> 11) as f64 / (1u64 << 53) as f64;
858 0.8 + unit * 0.2
859}
860
861fn urlencoding(s: &str) -> String {
866 let mut out = String::with_capacity(s.len());
867 for b in s.bytes() {
868 match b {
869 b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' | b'/' => {
870 out.push(b as char);
871 }
872 _ => {
873 out.push_str(&format!("%{:02X}", b));
874 }
875 }
876 }
877 out
878}
879
880#[cfg(test)]
885mod tests {
886 use super::*;
887
888 #[tokio::test]
889 async fn subscribe_unsubscribe_roundtrip() {
890 let n = InMemoryNotifications::new();
891 let sub = Subscription {
892 id: "sub-1".into(),
893 topic: "/public/".into(),
894 channel_type: ChannelType::WebhookChannel2023,
895 receive_from: "https://example.com/hook".into(),
896 };
897 n.subscribe(sub.clone()).await.unwrap();
898 n.unsubscribe("sub-1").await.unwrap();
899 n.publish(
900 "/public/",
901 ChangeNotification {
902 context: as_ns::CONTEXT.into(),
903 id: "urn:uuid:test".into(),
904 kind: "Update".into(),
905 object: "/public/x".into(),
906 published: chrono::Utc::now().to_rfc3339(),
907 },
908 )
909 .await
910 .unwrap();
911 }
912
913 #[tokio::test]
914 async fn websocket_manager_broadcasts_events() {
915 let m = WebSocketChannelManager::new();
916 let mut rx = m.stream();
917 let sub = m.subscribe("/public/", "wss://pod.example").await;
918 assert_eq!(sub.channel_type, ChannelType::WebSocketChannel2023);
919 assert!(sub.receive_from.contains("/subscription/"));
920
921 let note = ChangeNotification::from_storage_event(
922 &StorageEvent::Created("/public/x".into()),
923 "https://pod.example",
924 );
925 m.publish("/public/", note.clone()).await.unwrap();
926 let received = tokio::time::timeout(Duration::from_secs(1), rx.recv())
927 .await
928 .unwrap()
929 .unwrap();
930 assert_eq!(received.kind, "Create");
931 assert_eq!(received.object, "https://pod.example/public/x");
932 }
933
934 #[tokio::test]
935 async fn change_notification_maps_event_types() {
936 let c = ChangeNotification::from_storage_event(
937 &StorageEvent::Created("/x".into()),
938 "https://p.example",
939 );
940 assert_eq!(c.kind, "Create");
941 let u = ChangeNotification::from_storage_event(
942 &StorageEvent::Updated("/x".into()),
943 "https://p.example",
944 );
945 assert_eq!(u.kind, "Update");
946 let d = ChangeNotification::from_storage_event(
947 &StorageEvent::Deleted("/x".into()),
948 "https://p.example",
949 );
950 assert_eq!(d.kind, "Delete");
951 }
952
953 #[test]
954 fn discovery_lists_both_channels() {
955 let doc = discovery_document("https://pod.example");
956 let arr = doc["channelTypes"].as_array().unwrap();
957 assert_eq!(arr.len(), 2);
958 let ids: Vec<&str> = arr.iter().map(|v| v["id"].as_str().unwrap()).collect();
959 assert!(ids.contains(&"WebSocketChannel2023"));
960 assert!(ids.contains(&"WebhookChannel2023"));
961 }
962
963 #[test]
964 fn webhook_manager_default_retries() {
965 let m = WebhookChannelManager::new();
966 assert_eq!(m.max_retries, 3);
967 }
968
969 #[tokio::test]
970 async fn websocket_active_subscriptions_count() {
971 let m = WebSocketChannelManager::new();
972 assert_eq!(m.active_subscriptions().await, 0);
973 let s = m.subscribe("/a/", "wss://p").await;
974 assert_eq!(m.active_subscriptions().await, 1);
975 m.unsubscribe(&s.id).await;
976 assert_eq!(m.active_subscriptions().await, 0);
977 }
978
979 #[tokio::test]
980 async fn inmemory_bounded_evicts_oldest_at_capacity() {
981 let n = InMemoryNotifications::with_capacity(3);
982 for i in 0..3 {
983 let sub = Subscription {
984 id: format!("sub-{i}"),
985 topic: "/t/".into(),
986 channel_type: ChannelType::WebhookChannel2023,
987 receive_from: format!("https://example.com/hook-{i}"),
988 };
989 n.subscribe(sub).await.unwrap();
990 }
991 let sub4 = Subscription {
993 id: "sub-3".into(),
994 topic: "/t/".into(),
995 channel_type: ChannelType::WebhookChannel2023,
996 receive_from: "https://example.com/hook-3".into(),
997 };
998 n.subscribe(sub4).await.unwrap();
999
1000 let guard = n.inner.read().await;
1002 assert_eq!(guard.total_count, 3);
1003 let subs = guard.topics.get("/t/").unwrap();
1005 assert!(!subs.iter().any(|s| s.id == "sub-0"));
1006 assert!(subs.iter().any(|s| s.id == "sub-3"));
1007 }
1008
1009 #[tokio::test]
1010 async fn inmemory_unsubscribe_decrements_total_count() {
1011 let n = InMemoryNotifications::with_capacity(100);
1012 let sub = Subscription {
1013 id: "sub-x".into(),
1014 topic: "/x/".into(),
1015 channel_type: ChannelType::WebhookChannel2023,
1016 receive_from: "https://example.com/hook".into(),
1017 };
1018 n.subscribe(sub).await.unwrap();
1019 {
1020 let guard = n.inner.read().await;
1021 assert_eq!(guard.total_count, 1);
1022 }
1023 n.unsubscribe("sub-x").await.unwrap();
1024 {
1025 let guard = n.inner.read().await;
1026 assert_eq!(guard.total_count, 0);
1027 assert!(guard.topics.is_empty());
1029 }
1030 }
1031}