Skip to main content

solid_pod_rs/notifications/
mod.rs

1//! Solid Notifications Protocol (0.2) — Phase 2 implementation.
2//!
3//! Ships both `WebSocketChannel2023` and `WebhookChannel2023` channel
4//! types on top of a `broadcast::Sender<StorageEvent>` fed by the
5//! `Storage::watch()` method added in Phase 1.
6//!
7//! Reference: <https://solid.github.io/notifications/protocol/>
8//!
9//! Payload shape (per spec §7, Activity Streams 2.0 on JSON-LD):
10//!
11//! ```json
12//! {
13//!   "@context": "https://www.w3.org/ns/activitystreams",
14//!   "id": "urn:uuid:...",
15//!   "type": "Create" | "Update" | "Delete",
16//!   "object": "https://pod.example.com/path",
17//!   "published": "2025-04-20T12:00:00Z"
18//! }
19//! ```
20
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23use std::time::Duration;
24
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27use tokio::sync::{broadcast, RwLock};
28
29use crate::error::PodError;
30use crate::storage::StorageEvent;
31
32// F3 (Sprint 4): Legacy `solid-0.1` notification adapter for SolidOS
33// data-browser compat. Feature-gated — zero runtime cost when off.
34#[cfg(feature = "legacy-notifications")]
35pub mod legacy;
36
37// Sprint 6 C: RFC 9421 HTTP Message Signatures for webhook deliveries.
38// Gated behind `webhook-signing`; when disabled, the signer pathway is
39// compiled out entirely and the manager remains drop-in compatible
40// with older consumers.
41#[cfg(feature = "webhook-signing")]
42pub mod signing;
43
44/// `as:` type URIs per Activity Streams 2.0.
45pub mod as_ns {
46    pub const CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
47    pub const CREATE: &str = "Create";
48    pub const UPDATE: &str = "Update";
49    pub const DELETE: &str = "Delete";
50}
51
52/// Channel type advertised by `.notifications` discovery.
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "PascalCase")]
55pub enum ChannelType {
56    WebSocketChannel2023,
57    WebhookChannel2023,
58}
59
60/// A single subscription record.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct Subscription {
63    /// Opaque subscription id (UUID in practice).
64    pub id: String,
65    /// Target resource/container path the client is interested in.
66    pub topic: String,
67    /// Which channel the client requested.
68    pub channel_type: ChannelType,
69    /// For webhooks: the URL the server will POST to. For
70    /// WebSockets: the URL the client should connect to (populated
71    /// by the server on subscription creation).
72    pub receive_from: String,
73}
74
75/// Activity Streams 2.0 change notification payload.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct ChangeNotification {
78    #[serde(rename = "@context")]
79    pub context: String,
80    pub id: String,
81    #[serde(rename = "type")]
82    pub kind: String,
83    pub object: String,
84    pub published: String,
85}
86
87impl ChangeNotification {
88    /// Build a notification from a `StorageEvent`.
89    pub fn from_storage_event(event: &StorageEvent, pod_base: &str) -> Self {
90        let (kind, path) = match event {
91            StorageEvent::Created(p) => (as_ns::CREATE, p),
92            StorageEvent::Updated(p) => (as_ns::UPDATE, p),
93            StorageEvent::Deleted(p) => (as_ns::DELETE, p),
94        };
95        let object = format!("{}{}", pod_base.trim_end_matches('/'), path);
96        Self {
97            context: as_ns::CONTEXT.to_string(),
98            id: format!("urn:uuid:{}", uuid::Uuid::new_v4()),
99            kind: kind.to_string(),
100            object,
101            published: chrono::Utc::now().to_rfc3339(),
102        }
103    }
104}
105
106/// Public trait for notification backends.
107#[async_trait]
108pub trait Notifications: Send + Sync {
109    /// Register a subscription for a topic.
110    async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError>;
111
112    /// Remove a subscription.
113    async fn unsubscribe(&self, id: &str) -> Result<(), PodError>;
114
115    /// Deliver a notification to all subscribers of `topic`.
116    async fn publish(
117        &self,
118        topic: &str,
119        notification: ChangeNotification,
120    ) -> Result<(), PodError>;
121}
122
123// ---------------------------------------------------------------------------
124// In-memory backend (shared by both channel types)
125// ---------------------------------------------------------------------------
126
127/// Default maximum number of subscriptions held across all topics.
128/// Prevents OOM under ActivityPub federation firehose conditions.
129pub const DEFAULT_MAX_SUBSCRIPTIONS: usize = 10_000;
130
131#[derive(Clone)]
132pub struct InMemoryNotifications {
133    inner: Arc<RwLock<InMemoryInner>>,
134}
135
136#[derive(Clone)]
137struct InMemoryInner {
138    topics: HashMap<String, VecDeque<Subscription>>,
139    /// Maximum total subscriptions across all topics.
140    max_capacity: usize,
141    /// Running total so we avoid recomputing on every insert.
142    total_count: usize,
143}
144
145impl Default for InMemoryInner {
146    fn default() -> Self {
147        Self {
148            topics: HashMap::new(),
149            max_capacity: DEFAULT_MAX_SUBSCRIPTIONS,
150            total_count: 0,
151        }
152    }
153}
154
155impl Default for InMemoryNotifications {
156    fn default() -> Self {
157        Self {
158            inner: Arc::new(RwLock::new(InMemoryInner::default())),
159        }
160    }
161}
162
163impl InMemoryNotifications {
164    pub fn new() -> Self {
165        Self::default()
166    }
167
168    /// Create with a custom maximum subscription capacity.
169    pub fn with_capacity(max_capacity: usize) -> Self {
170        Self {
171            inner: Arc::new(RwLock::new(InMemoryInner {
172                topics: HashMap::new(),
173                max_capacity,
174                total_count: 0,
175            })),
176        }
177    }
178}
179
180#[async_trait]
181impl Notifications for InMemoryNotifications {
182    async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
183        let mut guard = self.inner.write().await;
184        // Evict oldest subscription across all topics when at capacity.
185        if guard.total_count >= guard.max_capacity {
186            // Find the first non-empty topic and pop its oldest entry.
187            let evict_topic = guard
188                .topics
189                .iter()
190                .find(|(_, subs)| !subs.is_empty())
191                .map(|(t, _)| t.clone());
192            if let Some(topic_key) = evict_topic {
193                let now_empty = {
194                    let subs = guard.topics.get_mut(&topic_key).unwrap();
195                    subs.pop_front();
196                    subs.is_empty()
197                };
198                guard.total_count = guard.total_count.saturating_sub(1);
199                if now_empty {
200                    guard.topics.remove(&topic_key);
201                }
202            }
203        }
204        guard
205            .topics
206            .entry(subscription.topic.clone())
207            .or_default()
208            .push_back(subscription);
209        guard.total_count += 1;
210        Ok(())
211    }
212
213    async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
214        let mut guard = self.inner.write().await;
215        let mut removed = 0usize;
216        for subs in guard.topics.values_mut() {
217            let before = subs.len();
218            subs.retain(|s| s.id != id);
219            removed += before - subs.len();
220        }
221        guard.total_count = guard.total_count.saturating_sub(removed);
222        // Remove empty topic entries to avoid map bloat.
223        guard.topics.retain(|_, subs| !subs.is_empty());
224        Ok(())
225    }
226
227    async fn publish(
228        &self,
229        topic: &str,
230        _notification: ChangeNotification,
231    ) -> Result<(), PodError> {
232        let guard = self.inner.read().await;
233        let _ = guard.topics.get(topic);
234        Ok(())
235    }
236}
237
238// ---------------------------------------------------------------------------
239// WebSocketChannel2023
240// ---------------------------------------------------------------------------
241
242/// WebSocket-based notification channel. The manager maintains the
243/// list of subscriptions and emits serialised change notifications on
244/// a `tokio::sync::broadcast` channel that upstream HTTP servers
245/// attach WebSocket tasks to.
246#[derive(Clone)]
247pub struct WebSocketChannelManager {
248    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
249    sender: broadcast::Sender<ChangeNotification>,
250    heartbeat_interval: Duration,
251}
252
253impl Default for WebSocketChannelManager {
254    fn default() -> Self {
255        Self::new()
256    }
257}
258
259impl WebSocketChannelManager {
260    pub fn new() -> Self {
261        let (tx, _) = broadcast::channel(1024);
262        Self {
263            subscriptions: Arc::new(RwLock::new(HashMap::new())),
264            sender: tx,
265            heartbeat_interval: Duration::from_secs(30),
266        }
267    }
268
269    /// Override the heartbeat interval (default 30s).
270    pub fn with_heartbeat(mut self, interval: Duration) -> Self {
271        self.heartbeat_interval = interval;
272        self
273    }
274
275    /// Internal test hook.
276    pub fn heartbeat_interval(&self) -> Duration {
277        self.heartbeat_interval
278    }
279
280    /// Register a new WebSocket subscription. Returns the
281    /// `receive_from` URL the client should connect to.
282    pub async fn subscribe(&self, topic: &str, base_url: &str) -> Subscription {
283        let id = uuid::Uuid::new_v4().to_string();
284        let receive_from = format!(
285            "{}/subscription/{}",
286            base_url.trim_end_matches('/'),
287            urlencoding(topic)
288        );
289        let sub = Subscription {
290            id: id.clone(),
291            topic: topic.to_string(),
292            channel_type: ChannelType::WebSocketChannel2023,
293            receive_from,
294        };
295        self.subscriptions.write().await.insert(id, sub.clone());
296        sub
297    }
298
299    /// Remove a subscription.
300    pub async fn unsubscribe(&self, id: &str) {
301        self.subscriptions.write().await.remove(id);
302    }
303
304    /// Subscribe to the broadcast stream. Each delivered message is a
305    /// pre-serialised `ChangeNotification` that the transport layer
306    /// writes to the WebSocket frame.
307    pub fn stream(&self) -> broadcast::Receiver<ChangeNotification> {
308        self.sender.subscribe()
309    }
310
311    /// Number of active subscriptions.
312    pub async fn active_subscriptions(&self) -> usize {
313        self.subscriptions.read().await.len()
314    }
315
316    /// Attach this manager to a stream of storage events. Each event
317    /// is translated into an Activity Streams notification and
318    /// broadcast to every connected client whose subscription topic
319    /// covers the event path.
320    pub async fn pump_from_storage(
321        self,
322        mut rx: tokio::sync::mpsc::Receiver<StorageEvent>,
323        pod_base: String,
324    ) {
325        while let Some(event) = rx.recv().await {
326            let note = ChangeNotification::from_storage_event(&event, &pod_base);
327            let _ = self.sender.send(note);
328        }
329    }
330}
331
332#[async_trait]
333impl Notifications for WebSocketChannelManager {
334    async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
335        self.subscriptions
336            .write()
337            .await
338            .insert(subscription.id.clone(), subscription);
339        Ok(())
340    }
341
342    async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
343        self.subscriptions.write().await.remove(id);
344        Ok(())
345    }
346
347    async fn publish(
348        &self,
349        _topic: &str,
350        notification: ChangeNotification,
351    ) -> Result<(), PodError> {
352        let _ = self.sender.send(notification);
353        Ok(())
354    }
355}
356
357// ---------------------------------------------------------------------------
358// WebhookChannel2023
359// ---------------------------------------------------------------------------
360
361/// Outcome of a webhook delivery attempt.
362#[derive(Debug, Clone, PartialEq, Eq)]
363pub enum WebhookDelivery {
364    /// 2xx response from the webhook target.
365    Delivered { status: u16 },
366    /// 4xx response — subscription is dropped.
367    FatalDrop { status: u16 },
368    /// 5xx or network — retry will be scheduled.
369    TransientRetry { reason: String },
370}
371
372/// Webhook notification channel with built-in retry logic. The
373/// manager keeps an internal map of subscriptions → target URL, and
374/// `deliver_all()` POSTs the Activity Streams payload to each target.
375///
376/// Sprint 6 C additions (ADR-058):
377/// * Optional RFC 9421 HTTP Message Signatures via [`Self::with_signer`].
378/// * `Retry-After` honoured on 429.
379/// * 410 Gone treated as `FatalDrop`; other 4xx retried as transient.
380/// * Full-jitter exponential back-off bounded by `max_backoff`.
381/// * Simple per-manager circuit breaker — consecutive failures are
382///   counted across `deliver_one` calls; once the threshold is reached
383///   further calls short-circuit to [`WebhookDelivery::TransientRetry`]
384///   with a `circuit open` reason until a successful delivery resets
385///   the counter.
386#[derive(Clone)]
387pub struct WebhookChannelManager {
388    client: reqwest::Client,
389    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
390    /// Exponential backoff base (starting delay). Default 500ms.
391    pub retry_base: Duration,
392    /// Max retry attempts on 5xx. Default 3 (preserved for backward
393    /// compat; tests that exercise Sprint 6 semantics call
394    /// `with_max_attempts` explicitly).
395    pub max_retries: u32,
396    /// Cap on a single back-off wait. Default 1h.
397    pub max_backoff: Duration,
398    /// Sprint 6 C: consecutive failures before the circuit opens.
399    pub circuit_threshold: u32,
400    /// Current consecutive-failure counter; shared across clones so a
401    /// single logical channel shares breaker state.
402    consecutive_failures: Arc<std::sync::atomic::AtomicU32>,
403    /// Optional RFC 9421 signer. `None` leaves requests unsigned
404    /// (legacy behaviour) and emits a one-shot `tracing::warn` on
405    /// first use.
406    #[cfg(feature = "webhook-signing")]
407    signer: Option<signing::SignerConfig>,
408}
409
410impl Default for WebhookChannelManager {
411    fn default() -> Self {
412        Self::new()
413    }
414}
415
416impl WebhookChannelManager {
417    pub fn new() -> Self {
418        Self {
419            client: reqwest::Client::builder()
420                .timeout(Duration::from_secs(10))
421                .build()
422                .unwrap_or_default(),
423            subscriptions: Arc::new(RwLock::new(HashMap::new())),
424            retry_base: Duration::from_millis(500),
425            max_retries: 3,
426            max_backoff: Duration::from_secs(3600),
427            circuit_threshold: 10,
428            consecutive_failures: Arc::new(std::sync::atomic::AtomicU32::new(0)),
429            #[cfg(feature = "webhook-signing")]
430            signer: None,
431        }
432    }
433
434    /// Create a manager with a specific `reqwest::Client` (used in
435    /// tests with wiremock).
436    pub fn with_client(client: reqwest::Client) -> Self {
437        let mut m = Self::new();
438        m.client = client;
439        m
440    }
441
442    /// Sprint 6 C: attach an RFC 9421 signer. Subsequent deliveries
443    /// attach `Signature-Input` / `Signature` headers.
444    #[cfg(feature = "webhook-signing")]
445    pub fn with_signer(mut self, signer: signing::SignerConfig) -> Self {
446        self.signer = Some(signer);
447        self
448    }
449
450    /// Override the max attempts (1 == no retries). Default 5.
451    pub fn with_max_attempts(mut self, attempts: u32) -> Self {
452        // Internally we still carry `max_retries` so older public API
453        // callers keep working. `max_retries` is the *retry* count,
454        // i.e. one less than the total attempts.
455        self.max_retries = attempts.saturating_sub(1);
456        self
457    }
458
459    /// Override the maximum single back-off wait. Default 1h.
460    pub fn with_max_backoff(mut self, max: Duration) -> Self {
461        self.max_backoff = max;
462        self
463    }
464
465    /// Override the consecutive-failure threshold that opens the
466    /// breaker. Default 10.
467    pub fn with_circuit_threshold(mut self, threshold: u32) -> Self {
468        self.circuit_threshold = threshold;
469        self
470    }
471
472    /// True iff the breaker is currently open.
473    pub fn circuit_open(&self) -> bool {
474        self.consecutive_failures
475            .load(std::sync::atomic::Ordering::Relaxed)
476            >= self.circuit_threshold
477    }
478
479    /// Current consecutive-failure count. Public for observability and
480    /// tests.
481    pub fn consecutive_failures(&self) -> u32 {
482        self.consecutive_failures
483            .load(std::sync::atomic::Ordering::Relaxed)
484    }
485
486    /// Reset the consecutive-failure counter (e.g. after operator
487    /// intervention). Test hook, also exposed for admin UIs.
488    pub fn reset_circuit(&self) {
489        self.consecutive_failures
490            .store(0, std::sync::atomic::Ordering::Relaxed);
491    }
492
493    pub async fn subscribe(&self, topic: &str, target_url: &str) -> Subscription {
494        let sub = Subscription {
495            id: uuid::Uuid::new_v4().to_string(),
496            topic: topic.to_string(),
497            channel_type: ChannelType::WebhookChannel2023,
498            receive_from: target_url.to_string(),
499        };
500        self.subscriptions
501            .write()
502            .await
503            .insert(sub.id.clone(), sub.clone());
504        sub
505    }
506
507    pub async fn unsubscribe(&self, id: &str) {
508        self.subscriptions.write().await.remove(id);
509    }
510
511    pub async fn active_subscriptions(&self) -> usize {
512        self.subscriptions.read().await.len()
513    }
514
515    /// Parse an HTTP `Retry-After` header value — either a
516    /// delta-seconds integer (RFC 7231 §7.1.3) or an HTTP-date.
517    fn parse_retry_after(raw: &str) -> Option<Duration> {
518        if let Ok(secs) = raw.trim().parse::<u64>() {
519            return Some(Duration::from_secs(secs));
520        }
521        #[cfg(feature = "webhook-signing")]
522        {
523            if let Ok(when) = httpdate::parse_http_date(raw.trim()) {
524                if let Ok(delta) = when.duration_since(std::time::SystemTime::now()) {
525                    return Some(delta);
526                }
527            }
528        }
529        None
530    }
531
532    /// Full-jitter back-off: a random value in `[0.8 * cap, cap]`
533    /// where `cap = min(base * 2^attempt, max_backoff)`. The 20%
534    /// jitter window is what `tests/webhook_retry.rs::webhook_jitter_within_window`
535    /// asserts. Public for testability — not stable API.
536    #[doc(hidden)]
537    pub fn compute_backoff(&self, attempt: u32) -> Duration {
538        let exp = self
539            .retry_base
540            .saturating_mul(2u32.saturating_pow(attempt.min(20)));
541        let cap = std::cmp::min(exp, self.max_backoff);
542        // Jitter: pick a factor in [0.8, 1.0] so each back-off stays
543        // within 20% of the deterministic ceiling (tests assert the
544        // ±20% window). Uses the OS RNG when `webhook-signing` pulls
545        // `rand` in, otherwise falls back to a cheap time-based
546        // perturbation good enough for the jitter test.
547        let factor = jitter_factor();
548        let nanos = (cap.as_nanos() as f64 * factor) as u128;
549        Duration::from_nanos(nanos.min(u64::MAX as u128) as u64)
550    }
551
552    /// Build and send a single HTTP request, optionally signed.
553    async fn send_once(
554        &self,
555        url: &str,
556        note: &ChangeNotification,
557    ) -> Result<reqwest::Response, reqwest::Error> {
558        let body = serde_json::to_vec(note).unwrap_or_default();
559        #[cfg(feature = "webhook-signing")]
560        let notification_id = note.id.clone();
561        #[cfg_attr(not(feature = "webhook-signing"), allow(unused_mut))]
562        let mut req = self
563            .client
564            .post(url)
565            .header("Content-Type", "application/ld+json");
566
567        #[cfg(feature = "webhook-signing")]
568        {
569            if let Some(cfg) = &self.signer {
570                let now = std::time::SystemTime::now()
571                    .duration_since(std::time::UNIX_EPOCH)
572                    .map(|d| d.as_secs())
573                    .unwrap_or_default();
574                let signed = signing::sign_request(
575                    cfg,
576                    "POST",
577                    url,
578                    "application/ld+json",
579                    &body,
580                    &notification_id,
581                    now,
582                );
583                // send_once rebuilds the Content-Type header itself;
584                // attach every *other* header from the signer.
585                for (name, value) in &signed.headers {
586                    if name.eq_ignore_ascii_case("content-type") {
587                        continue;
588                    }
589                    req = req.header(name.as_str(), value.as_str());
590                }
591            } else {
592                tracing::warn!(
593                    "webhook manager delivering {} unsigned — consider configuring a SignerConfig",
594                    url
595                );
596            }
597        }
598
599        req.body(body).send().await
600    }
601
602    /// Deliver a single event to a single webhook URL, with full
603    /// Sprint 6 C retry / back-off / circuit-breaker semantics.
604    pub async fn deliver_one(
605        &self,
606        url: &str,
607        note: &ChangeNotification,
608    ) -> WebhookDelivery {
609        // Circuit breaker — bail before touching the network if open.
610        if self.circuit_open() {
611            return WebhookDelivery::TransientRetry {
612                reason: "circuit open".to_string(),
613            };
614        }
615
616        let total_attempts = self.max_retries.saturating_add(1);
617        let mut attempt = 0u32;
618        loop {
619            let resp = self.send_once(url, note).await;
620            match resp {
621                Ok(r) => {
622                    let status = r.status().as_u16();
623                    // 2xx — success resets the breaker.
624                    if r.status().is_success() {
625                        self.consecutive_failures
626                            .store(0, std::sync::atomic::Ordering::Relaxed);
627                        return WebhookDelivery::Delivered { status };
628                    }
629                    // 410 Gone — receiver asked to be unsubscribed.
630                    if status == 410 {
631                        self.consecutive_failures
632                            .store(0, std::sync::atomic::Ordering::Relaxed);
633                        return WebhookDelivery::FatalDrop { status };
634                    }
635                    // 429 — honour Retry-After then retry.
636                    if status == 429 {
637                        let retry_after = r
638                            .headers()
639                            .get("retry-after")
640                            .and_then(|v| v.to_str().ok())
641                            .and_then(Self::parse_retry_after)
642                            .unwrap_or_else(|| self.compute_backoff(attempt));
643                        attempt += 1;
644                        if attempt >= total_attempts {
645                            self.record_failure();
646                            return WebhookDelivery::TransientRetry {
647                                reason: format!("429 after {attempt} attempts"),
648                            };
649                        }
650                        tokio::time::sleep(
651                            retry_after.min(self.max_backoff),
652                        )
653                        .await;
654                        continue;
655                    }
656                    // 5xx (incl. 503 with Retry-After) — retry with
657                    // back-off, honouring Retry-After if present.
658                    if r.status().is_server_error() {
659                        let wait = r
660                            .headers()
661                            .get("retry-after")
662                            .and_then(|v| v.to_str().ok())
663                            .and_then(Self::parse_retry_after)
664                            .unwrap_or_else(|| self.compute_backoff(attempt));
665                        attempt += 1;
666                        if attempt >= total_attempts {
667                            self.record_failure();
668                            return WebhookDelivery::TransientRetry {
669                                reason: format!("5xx after {attempt} attempts"),
670                            };
671                        }
672                        tokio::time::sleep(wait.min(self.max_backoff)).await;
673                        continue;
674                    }
675                    // Other 4xx (401/403/404/422/…) — subscription
676                    // stays alive; retry with back-off.
677                    if r.status().is_client_error() {
678                        let wait = self.compute_backoff(attempt);
679                        attempt += 1;
680                        if attempt >= total_attempts {
681                            self.record_failure();
682                            return WebhookDelivery::TransientRetry {
683                                reason: format!("{status} after {attempt} attempts"),
684                            };
685                        }
686                        tokio::time::sleep(wait.min(self.max_backoff)).await;
687                        continue;
688                    }
689                    // 3xx/1xx — treat as transient.
690                    let wait = self.compute_backoff(attempt);
691                    attempt += 1;
692                    if attempt >= total_attempts {
693                        self.record_failure();
694                        return WebhookDelivery::TransientRetry {
695                            reason: format!("status {status} after {attempt} attempts"),
696                        };
697                    }
698                    tokio::time::sleep(wait.min(self.max_backoff)).await;
699                }
700                Err(e) => {
701                    // Network error — same treatment as 5xx.
702                    let wait = self.compute_backoff(attempt);
703                    attempt += 1;
704                    if attempt >= total_attempts {
705                        self.record_failure();
706                        return WebhookDelivery::TransientRetry {
707                            reason: format!("network error: {e}"),
708                        };
709                    }
710                    tokio::time::sleep(wait.min(self.max_backoff)).await;
711                }
712            }
713        }
714    }
715
716    fn record_failure(&self) {
717        self.consecutive_failures
718            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
719    }
720
721    /// Deliver the notification to every matching subscription.
722    /// Returns the per-subscription outcome.
723    pub async fn deliver_all(
724        &self,
725        note: &ChangeNotification,
726        topic_matches: impl Fn(&str) -> bool,
727    ) -> Vec<(String, WebhookDelivery)> {
728        let subs: Vec<Subscription> = {
729            let guard = self.subscriptions.read().await;
730            guard
731                .values()
732                .filter(|s| topic_matches(&s.topic))
733                .cloned()
734                .collect()
735        };
736        let mut out = Vec::with_capacity(subs.len());
737        let mut dropped = Vec::new();
738        for sub in subs {
739            let result = self.deliver_one(&sub.receive_from, note).await;
740            if matches!(result, WebhookDelivery::FatalDrop { .. }) {
741                dropped.push(sub.id.clone());
742            }
743            out.push((sub.id, result));
744        }
745        if !dropped.is_empty() {
746            let mut guard = self.subscriptions.write().await;
747            for id in dropped {
748                guard.remove(&id);
749            }
750        }
751        out
752    }
753
754    /// Attach the manager to a storage event stream. Each event is
755    /// translated into an Activity Streams notification and delivered
756    /// to every subscription whose topic is a prefix of the event
757    /// path.
758    pub async fn pump_from_storage(
759        self,
760        mut rx: tokio::sync::mpsc::Receiver<StorageEvent>,
761        pod_base: String,
762    ) {
763        while let Some(event) = rx.recv().await {
764            let path = match &event {
765                StorageEvent::Created(p) | StorageEvent::Updated(p) | StorageEvent::Deleted(p) => {
766                    p.clone()
767                }
768            };
769            let note = ChangeNotification::from_storage_event(&event, &pod_base);
770            self.deliver_all(&note, |topic| path.starts_with(topic)).await;
771        }
772    }
773}
774
775#[async_trait]
776impl Notifications for WebhookChannelManager {
777    async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
778        self.subscriptions
779            .write()
780            .await
781            .insert(subscription.id.clone(), subscription);
782        Ok(())
783    }
784
785    async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
786        self.subscriptions.write().await.remove(id);
787        Ok(())
788    }
789
790    async fn publish(
791        &self,
792        topic: &str,
793        notification: ChangeNotification,
794    ) -> Result<(), PodError> {
795        let matches_topic = |t: &str| topic.starts_with(t) || t == topic;
796        self.deliver_all(&notification, matches_topic).await;
797        Ok(())
798    }
799}
800
801// ---------------------------------------------------------------------------
802// Subscription discovery (.notifications)
803// ---------------------------------------------------------------------------
804
805/// Build the subscription-discovery JSON-LD document served at
806/// `.notifications` per the Notifications Protocol §5.
807pub fn discovery_document(pod_base: &str) -> serde_json::Value {
808    let base = pod_base.trim_end_matches('/');
809    serde_json::json!({
810        "@context": ["https://www.w3.org/ns/solid/notifications-context/v1"],
811        "id": format!("{base}/.notifications"),
812        "channelTypes": [
813            {
814                "id": "WebSocketChannel2023",
815                "endpoint": format!("{base}/.notifications/websocket"),
816                "features": ["as:Create", "as:Update", "as:Delete"]
817            },
818            {
819                "id": "WebhookChannel2023",
820                "endpoint": format!("{base}/.notifications/webhook"),
821                "features": ["as:Create", "as:Update", "as:Delete"]
822            }
823        ]
824    })
825}
826
827// ---------------------------------------------------------------------------
828// Jitter helper — Sprint 6 C. Returns a multiplier in [0.8, 1.0]. When
829// the `webhook-signing` feature is enabled we use the `rand` OS RNG;
830// otherwise we derive a deterministic-but-varying factor from the
831// monotonic clock, which gives enough dispersion across a hundred
832// trials for the back-off jitter test to pass without a new
833// always-on dependency.
834// ---------------------------------------------------------------------------
835
836#[cfg(feature = "webhook-signing")]
837fn jitter_factor() -> f64 {
838    use rand::Rng;
839    rand::thread_rng().gen_range(0.8_f64..1.0_f64)
840}
841
842#[cfg(not(feature = "webhook-signing"))]
843fn jitter_factor() -> f64 {
844    use std::sync::atomic::{AtomicU64, Ordering};
845    // Splitmix64 step seeded by the monotonic nanoseconds.
846    static SEED: AtomicU64 = AtomicU64::new(0);
847    let seed = {
848        let n = std::time::Instant::now().elapsed().as_nanos() as u64;
849        let prev = SEED.fetch_add(n | 1, Ordering::Relaxed);
850        prev.wrapping_add(n).wrapping_add(0x9E3779B97F4A7C15)
851    };
852    let mut x = seed;
853    x = (x ^ (x >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
854    x = (x ^ (x >> 27)).wrapping_mul(0x94D049BB133111EB);
855    x ^= x >> 31;
856    // Map to [0.8, 1.0).
857    let unit = (x >> 11) as f64 / (1u64 << 53) as f64;
858    0.8 + unit * 0.2
859}
860
861// ---------------------------------------------------------------------------
862// Small util: percent-encode path for use in URLs.
863// ---------------------------------------------------------------------------
864
865fn urlencoding(s: &str) -> String {
866    let mut out = String::with_capacity(s.len());
867    for b in s.bytes() {
868        match b {
869            b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' | b'/' => {
870                out.push(b as char);
871            }
872            _ => {
873                out.push_str(&format!("%{:02X}", b));
874            }
875        }
876    }
877    out
878}
879
880// ---------------------------------------------------------------------------
881// Tests
882// ---------------------------------------------------------------------------
883
884#[cfg(test)]
885mod tests {
886    use super::*;
887
888    #[tokio::test]
889    async fn subscribe_unsubscribe_roundtrip() {
890        let n = InMemoryNotifications::new();
891        let sub = Subscription {
892            id: "sub-1".into(),
893            topic: "/public/".into(),
894            channel_type: ChannelType::WebhookChannel2023,
895            receive_from: "https://example.com/hook".into(),
896        };
897        n.subscribe(sub.clone()).await.unwrap();
898        n.unsubscribe("sub-1").await.unwrap();
899        n.publish(
900            "/public/",
901            ChangeNotification {
902                context: as_ns::CONTEXT.into(),
903                id: "urn:uuid:test".into(),
904                kind: "Update".into(),
905                object: "/public/x".into(),
906                published: chrono::Utc::now().to_rfc3339(),
907            },
908        )
909        .await
910        .unwrap();
911    }
912
913    #[tokio::test]
914    async fn websocket_manager_broadcasts_events() {
915        let m = WebSocketChannelManager::new();
916        let mut rx = m.stream();
917        let sub = m.subscribe("/public/", "wss://pod.example").await;
918        assert_eq!(sub.channel_type, ChannelType::WebSocketChannel2023);
919        assert!(sub.receive_from.contains("/subscription/"));
920
921        let note = ChangeNotification::from_storage_event(
922            &StorageEvent::Created("/public/x".into()),
923            "https://pod.example",
924        );
925        m.publish("/public/", note.clone()).await.unwrap();
926        let received = tokio::time::timeout(Duration::from_secs(1), rx.recv())
927            .await
928            .unwrap()
929            .unwrap();
930        assert_eq!(received.kind, "Create");
931        assert_eq!(received.object, "https://pod.example/public/x");
932    }
933
934    #[tokio::test]
935    async fn change_notification_maps_event_types() {
936        let c = ChangeNotification::from_storage_event(
937            &StorageEvent::Created("/x".into()),
938            "https://p.example",
939        );
940        assert_eq!(c.kind, "Create");
941        let u = ChangeNotification::from_storage_event(
942            &StorageEvent::Updated("/x".into()),
943            "https://p.example",
944        );
945        assert_eq!(u.kind, "Update");
946        let d = ChangeNotification::from_storage_event(
947            &StorageEvent::Deleted("/x".into()),
948            "https://p.example",
949        );
950        assert_eq!(d.kind, "Delete");
951    }
952
953    #[test]
954    fn discovery_lists_both_channels() {
955        let doc = discovery_document("https://pod.example");
956        let arr = doc["channelTypes"].as_array().unwrap();
957        assert_eq!(arr.len(), 2);
958        let ids: Vec<&str> = arr.iter().map(|v| v["id"].as_str().unwrap()).collect();
959        assert!(ids.contains(&"WebSocketChannel2023"));
960        assert!(ids.contains(&"WebhookChannel2023"));
961    }
962
963    #[test]
964    fn webhook_manager_default_retries() {
965        let m = WebhookChannelManager::new();
966        assert_eq!(m.max_retries, 3);
967    }
968
969    #[tokio::test]
970    async fn websocket_active_subscriptions_count() {
971        let m = WebSocketChannelManager::new();
972        assert_eq!(m.active_subscriptions().await, 0);
973        let s = m.subscribe("/a/", "wss://p").await;
974        assert_eq!(m.active_subscriptions().await, 1);
975        m.unsubscribe(&s.id).await;
976        assert_eq!(m.active_subscriptions().await, 0);
977    }
978
979    #[tokio::test]
980    async fn inmemory_bounded_evicts_oldest_at_capacity() {
981        let n = InMemoryNotifications::with_capacity(3);
982        for i in 0..3 {
983            let sub = Subscription {
984                id: format!("sub-{i}"),
985                topic: "/t/".into(),
986                channel_type: ChannelType::WebhookChannel2023,
987                receive_from: format!("https://example.com/hook-{i}"),
988            };
989            n.subscribe(sub).await.unwrap();
990        }
991        // At capacity (3). Adding a 4th should evict the oldest (sub-0).
992        let sub4 = Subscription {
993            id: "sub-3".into(),
994            topic: "/t/".into(),
995            channel_type: ChannelType::WebhookChannel2023,
996            receive_from: "https://example.com/hook-3".into(),
997        };
998        n.subscribe(sub4).await.unwrap();
999
1000        // Verify total count stays at capacity.
1001        let guard = n.inner.read().await;
1002        assert_eq!(guard.total_count, 3);
1003        // The oldest (sub-0) was evicted.
1004        let subs = guard.topics.get("/t/").unwrap();
1005        assert!(!subs.iter().any(|s| s.id == "sub-0"));
1006        assert!(subs.iter().any(|s| s.id == "sub-3"));
1007    }
1008
1009    #[tokio::test]
1010    async fn inmemory_unsubscribe_decrements_total_count() {
1011        let n = InMemoryNotifications::with_capacity(100);
1012        let sub = Subscription {
1013            id: "sub-x".into(),
1014            topic: "/x/".into(),
1015            channel_type: ChannelType::WebhookChannel2023,
1016            receive_from: "https://example.com/hook".into(),
1017        };
1018        n.subscribe(sub).await.unwrap();
1019        {
1020            let guard = n.inner.read().await;
1021            assert_eq!(guard.total_count, 1);
1022        }
1023        n.unsubscribe("sub-x").await.unwrap();
1024        {
1025            let guard = n.inner.read().await;
1026            assert_eq!(guard.total_count, 0);
1027            // Empty topic entry is cleaned up.
1028            assert!(guard.topics.is_empty());
1029        }
1030    }
1031}