Skip to main content

solid_pod_rs/notifications/
mod.rs

1//! Solid Notifications Protocol (0.2) — Phase 2 implementation.
2//!
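//! Ships both `WebSocketChannel2023` and `WebhookChannel2023` channel
//! types. Each manager's `pump_from_storage` consumes an
//! `mpsc::Receiver<StorageEvent>` (presumably fed by the
//! `Storage::watch()` method added in Phase 1); the WebSocket manager
//! fans notifications out on a `broadcast::Sender<ChangeNotification>`.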
6//!
7//! Reference: <https://solid.github.io/notifications/protocol/>
8//!
9//! Payload shape (per spec §7, Activity Streams 2.0 on JSON-LD):
10//!
11//! ```json
12//! {
13//!   "@context": "https://www.w3.org/ns/activitystreams",
14//!   "id": "urn:uuid:...",
15//!   "type": "Create" | "Update" | "Delete",
16//!   "object": "https://pod.example.com/path",
17//!   "published": "2025-04-20T12:00:00Z"
18//! }
19//! ```
20
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23use std::time::Duration;
24
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27use tokio::sync::{broadcast, RwLock};
28
29use crate::error::PodError;
30use crate::storage::StorageEvent;
31
32// F3 (Sprint 4): Legacy `solid-0.1` notification adapter for SolidOS
33// data-browser compat. Feature-gated — zero runtime cost when off.
34#[cfg(feature = "legacy-notifications")]
35pub mod legacy;
36
37// Sprint 6 C: RFC 9421 HTTP Message Signatures for webhook deliveries.
38// Gated behind `webhook-signing`; when disabled, the signer pathway is
39// compiled out entirely and the manager remains drop-in compatible
40// with older consumers.
41#[cfg(feature = "webhook-signing")]
42pub mod signing;
43
/// Activity Streams 2.0 vocabulary used in notification payloads: the
/// JSON-LD context IRI plus the short (un-prefixed) activity type names.
pub mod as_ns {
    /// Value written to the payload's `@context` member.
    pub const CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
    /// Activity type used for `StorageEvent::Created`.
    pub const CREATE: &str = "Create";
    /// Activity type used for `StorageEvent::Updated`.
    pub const UPDATE: &str = "Update";
    /// Activity type used for `StorageEvent::Deleted`.
    pub const DELETE: &str = "Delete";
}
51
/// Channel type advertised by `.notifications` discovery.
///
/// The variant names match the channel `id` strings emitted by
/// `discovery_document`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub enum ChannelType {
    /// Notifications are pushed over a WebSocket connection.
    WebSocketChannel2023,
    /// Notifications are POSTed to a subscriber-supplied URL.
    WebhookChannel2023,
}
59
/// A single subscription record.
///
/// The channel managers in this module store these in a map keyed by
/// `id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subscription {
    /// Opaque subscription id (UUID in practice).
    pub id: String,
    /// Target resource/container path the client is interested in.
    pub topic: String,
    /// Which channel the client requested.
    pub channel_type: ChannelType,
    /// For webhooks: the URL the server will POST to. For
    /// WebSockets: the URL the client should connect to (populated
    /// by the server on subscription creation).
    pub receive_from: String,
}
74
/// Activity Streams 2.0 change notification payload.
///
/// Serialises to the JSON shape shown in the module docs; `context`
/// and `kind` are renamed to `@context` and `type` on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeNotification {
    /// JSON-LD context IRI (`as_ns::CONTEXT` when built by
    /// `from_storage_event`).
    #[serde(rename = "@context")]
    pub context: String,
    /// Notification id; `from_storage_event` generates a `urn:uuid:` value.
    pub id: String,
    /// Activity type: `Create`, `Update` or `Delete`.
    #[serde(rename = "type")]
    pub kind: String,
    /// Absolute IRI of the resource that changed.
    pub object: String,
    /// Publication timestamp (RFC 3339 when built by `from_storage_event`).
    pub published: String,
}
86
87impl ChangeNotification {
88    /// Build a notification from a `StorageEvent`.
89    pub fn from_storage_event(event: &StorageEvent, pod_base: &str) -> Self {
90        let (kind, path) = match event {
91            StorageEvent::Created(p) => (as_ns::CREATE, p),
92            StorageEvent::Updated(p) => (as_ns::UPDATE, p),
93            StorageEvent::Deleted(p) => (as_ns::DELETE, p),
94        };
95        let object = format!("{}{}", pod_base.trim_end_matches('/'), path);
96        Self {
97            context: as_ns::CONTEXT.to_string(),
98            id: format!("urn:uuid:{}", uuid::Uuid::new_v4()),
99            kind: kind.to_string(),
100            object,
101            published: chrono::Utc::now().to_rfc3339(),
102        }
103    }
104}
105
/// Public trait for notification backends.
///
/// Implemented in this module by `InMemoryNotifications`,
/// `WebSocketChannelManager` and `WebhookChannelManager`.
#[async_trait]
pub trait Notifications: Send + Sync {
    /// Register a subscription for a topic. The subscription is stored
    /// as supplied (id, topic, channel type and `receive_from`).
    async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError>;

    /// Remove the subscription with the given id. The implementations
    /// in this module return `Ok(())` even when the id is unknown.
    async fn unsubscribe(&self, id: &str) -> Result<(), PodError>;

    /// Deliver a notification to all subscribers of `topic`. How
    /// `topic` is matched against subscriptions is up to the backend.
    async fn publish(&self, topic: &str, notification: ChangeNotification) -> Result<(), PodError>;
}
118
119// ---------------------------------------------------------------------------
120// In-memory backend (shared by both channel types)
121// ---------------------------------------------------------------------------
122
/// Default maximum number of subscriptions held across all topics by
/// `InMemoryNotifications`. When the limit is reached an existing
/// subscription is evicted to make room for the new one.
/// Prevents OOM under ActivityPub federation firehose conditions.
pub const DEFAULT_MAX_SUBSCRIPTIONS: usize = 10_000;
126
/// In-memory subscription registry with a bounded total size.
///
/// Cloning is cheap: clones share the same table through the `Arc`.
#[derive(Clone)]
pub struct InMemoryNotifications {
    /// Shared, lock-protected state.
    inner: Arc<RwLock<InMemoryInner>>,
}
131
/// State guarded by the lock inside `InMemoryNotifications`.
#[derive(Clone)]
struct InMemoryInner {
    /// Subscriptions grouped by topic, each queue in insertion order
    /// (oldest at the front).
    topics: HashMap<String, VecDeque<Subscription>>,
    /// Maximum total subscriptions across all topics.
    max_capacity: usize,
    /// Running total so we avoid recomputing on every insert.
    total_count: usize,
}
140
141impl Default for InMemoryInner {
142    fn default() -> Self {
143        Self {
144            topics: HashMap::new(),
145            max_capacity: DEFAULT_MAX_SUBSCRIPTIONS,
146            total_count: 0,
147        }
148    }
149}
150
151impl Default for InMemoryNotifications {
152    fn default() -> Self {
153        Self {
154            inner: Arc::new(RwLock::new(InMemoryInner::default())),
155        }
156    }
157}
158
159impl InMemoryNotifications {
160    pub fn new() -> Self {
161        Self::default()
162    }
163
164    /// Create with a custom maximum subscription capacity.
165    pub fn with_capacity(max_capacity: usize) -> Self {
166        Self {
167            inner: Arc::new(RwLock::new(InMemoryInner {
168                topics: HashMap::new(),
169                max_capacity,
170                total_count: 0,
171            })),
172        }
173    }
174}
175
176#[async_trait]
177impl Notifications for InMemoryNotifications {
178    async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
179        let mut guard = self.inner.write().await;
180        // Evict oldest subscription across all topics when at capacity.
181        if guard.total_count >= guard.max_capacity {
182            // Find the first non-empty topic and pop its oldest entry.
183            let evict_topic = guard
184                .topics
185                .iter()
186                .find(|(_, subs)| !subs.is_empty())
187                .map(|(t, _)| t.clone());
188            if let Some(topic_key) = evict_topic {
189                let now_empty = {
190                    let subs = guard.topics.get_mut(&topic_key).unwrap();
191                    subs.pop_front();
192                    subs.is_empty()
193                };
194                guard.total_count = guard.total_count.saturating_sub(1);
195                if now_empty {
196                    guard.topics.remove(&topic_key);
197                }
198            }
199        }
200        guard
201            .topics
202            .entry(subscription.topic.clone())
203            .or_default()
204            .push_back(subscription);
205        guard.total_count += 1;
206        Ok(())
207    }
208
209    async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
210        let mut guard = self.inner.write().await;
211        let mut removed = 0usize;
212        for subs in guard.topics.values_mut() {
213            let before = subs.len();
214            subs.retain(|s| s.id != id);
215            removed += before - subs.len();
216        }
217        guard.total_count = guard.total_count.saturating_sub(removed);
218        // Remove empty topic entries to avoid map bloat.
219        guard.topics.retain(|_, subs| !subs.is_empty());
220        Ok(())
221    }
222
223    async fn publish(
224        &self,
225        topic: &str,
226        _notification: ChangeNotification,
227    ) -> Result<(), PodError> {
228        let guard = self.inner.read().await;
229        let _ = guard.topics.get(topic);
230        Ok(())
231    }
232}
233
234// ---------------------------------------------------------------------------
235// WebSocketChannel2023
236// ---------------------------------------------------------------------------
237
/// WebSocket-based notification channel. The manager maintains the
/// list of subscriptions and emits `ChangeNotification` values on a
/// `tokio::sync::broadcast` channel that upstream HTTP servers
/// attach WebSocket tasks to.
///
/// Clones share the subscription table and the broadcast sender.
#[derive(Clone)]
pub struct WebSocketChannelManager {
    /// Active subscriptions keyed by subscription id.
    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
    /// Sending half of the broadcast channel; receivers come from `stream()`.
    sender: broadcast::Sender<ChangeNotification>,
    /// Configured heartbeat interval; this type only stores and
    /// returns it.
    heartbeat_interval: Duration,
}
248
249impl Default for WebSocketChannelManager {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255impl WebSocketChannelManager {
256    pub fn new() -> Self {
257        let (tx, _) = broadcast::channel(1024);
258        Self {
259            subscriptions: Arc::new(RwLock::new(HashMap::new())),
260            sender: tx,
261            heartbeat_interval: Duration::from_secs(30),
262        }
263    }
264
265    /// Override the heartbeat interval (default 30s).
266    pub fn with_heartbeat(mut self, interval: Duration) -> Self {
267        self.heartbeat_interval = interval;
268        self
269    }
270
271    /// Internal test hook.
272    pub fn heartbeat_interval(&self) -> Duration {
273        self.heartbeat_interval
274    }
275
276    /// Register a new WebSocket subscription. Returns the
277    /// `receive_from` URL the client should connect to.
278    pub async fn subscribe(&self, topic: &str, base_url: &str) -> Subscription {
279        let id = uuid::Uuid::new_v4().to_string();
280        let receive_from = format!(
281            "{}/subscription/{}",
282            base_url.trim_end_matches('/'),
283            urlencoding(topic)
284        );
285        let sub = Subscription {
286            id: id.clone(),
287            topic: topic.to_string(),
288            channel_type: ChannelType::WebSocketChannel2023,
289            receive_from,
290        };
291        self.subscriptions.write().await.insert(id, sub.clone());
292        sub
293    }
294
295    /// Remove a subscription.
296    pub async fn unsubscribe(&self, id: &str) {
297        self.subscriptions.write().await.remove(id);
298    }
299
300    /// Subscribe to the broadcast stream. Each delivered message is a
301    /// pre-serialised `ChangeNotification` that the transport layer
302    /// writes to the WebSocket frame.
303    pub fn stream(&self) -> broadcast::Receiver<ChangeNotification> {
304        self.sender.subscribe()
305    }
306
307    /// Number of active subscriptions.
308    pub async fn active_subscriptions(&self) -> usize {
309        self.subscriptions.read().await.len()
310    }
311
312    /// Attach this manager to a stream of storage events. Each event
313    /// is translated into an Activity Streams notification and
314    /// broadcast to every connected client whose subscription topic
315    /// covers the event path.
316    pub async fn pump_from_storage(
317        self,
318        mut rx: tokio::sync::mpsc::Receiver<StorageEvent>,
319        pod_base: String,
320    ) {
321        while let Some(event) = rx.recv().await {
322            let note = ChangeNotification::from_storage_event(&event, &pod_base);
323            let _ = self.sender.send(note);
324        }
325    }
326}
327
328#[async_trait]
329impl Notifications for WebSocketChannelManager {
330    async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
331        self.subscriptions
332            .write()
333            .await
334            .insert(subscription.id.clone(), subscription);
335        Ok(())
336    }
337
338    async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
339        self.subscriptions.write().await.remove(id);
340        Ok(())
341    }
342
343    async fn publish(
344        &self,
345        _topic: &str,
346        notification: ChangeNotification,
347    ) -> Result<(), PodError> {
348        let _ = self.sender.send(notification);
349        Ok(())
350    }
351}
352
353// ---------------------------------------------------------------------------
354// WebhookChannel2023
355// ---------------------------------------------------------------------------
356
/// Outcome of a webhook delivery attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WebhookDelivery {
    /// 2xx response from the webhook target.
    Delivered { status: u16 },
    /// 410 Gone from the target — `deliver_all` drops the
    /// subscription. (`deliver_one` does not return this for any
    /// other status.)
    FatalDrop { status: u16 },
    /// Delivery did not succeed: attempts were exhausted on 429, 5xx,
    /// other 4xx, 3xx/1xx or network errors, or the circuit breaker
    /// was open. `reason` describes which.
    TransientRetry { reason: String },
}
367
/// Webhook notification channel with built-in retry logic. The
/// manager keeps an internal map of subscription id → subscription
/// (whose `receive_from` is the target URL), and `deliver_all()`
/// POSTs the Activity Streams payload to each matching target.
///
/// Sprint 6 C additions (ADR-058):
/// * Optional RFC 9421 HTTP Message Signatures via [`Self::with_signer`].
/// * `Retry-After` honoured on 429 and 5xx (clamped to `max_backoff`).
/// * 410 Gone treated as `FatalDrop`; other 4xx retried as transient.
/// * Jittered exponential back-off bounded by `max_backoff` (see
///   `compute_backoff`).
/// * Simple per-manager circuit breaker — each `deliver_one` call that
///   exhausts its attempts counts as one failure; once the threshold
///   is reached further calls short-circuit to
///   [`WebhookDelivery::TransientRetry`] with a `circuit open` reason.
///   A 2xx (or 410) response resets the counter, but since an open
///   breaker sends nothing, reopening in practice needs
///   `reset_circuit` or a delivery that was already in flight.
///
/// Clones share the subscription table and the breaker counter.
#[derive(Clone)]
pub struct WebhookChannelManager {
    /// HTTP client used for every delivery.
    client: reqwest::Client,
    /// Active subscriptions keyed by subscription id.
    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
    /// Exponential backoff base (starting delay). Default 500ms.
    pub retry_base: Duration,
    /// Max retries after the first attempt, applied to every
    /// retryable outcome (429, 5xx, other 4xx, 3xx/1xx, network
    /// errors). Default 3 (preserved for backward compat; tests that
    /// exercise Sprint 6 semantics call `with_max_attempts`
    /// explicitly).
    pub max_retries: u32,
    /// Cap on a single back-off wait. Default 1h.
    pub max_backoff: Duration,
    /// Sprint 6 C: consecutive failures before the circuit opens.
    pub circuit_threshold: u32,
    /// Current consecutive-failure counter; shared across clones so a
    /// single logical channel shares breaker state.
    consecutive_failures: Arc<std::sync::atomic::AtomicU32>,
    /// Optional RFC 9421 signer. `None` leaves requests unsigned
    /// (legacy behaviour) and emits a one-shot `tracing::warn` on
    /// first use.
    #[cfg(feature = "webhook-signing")]
    signer: Option<signing::SignerConfig>,
}
405
406impl Default for WebhookChannelManager {
407    fn default() -> Self {
408        Self::new()
409    }
410}
411
412impl WebhookChannelManager {
413    pub fn new() -> Self {
414        Self {
415            client: reqwest::Client::builder()
416                .timeout(Duration::from_secs(10))
417                .build()
418                .unwrap_or_default(),
419            subscriptions: Arc::new(RwLock::new(HashMap::new())),
420            retry_base: Duration::from_millis(500),
421            max_retries: 3,
422            max_backoff: Duration::from_secs(3600),
423            circuit_threshold: 10,
424            consecutive_failures: Arc::new(std::sync::atomic::AtomicU32::new(0)),
425            #[cfg(feature = "webhook-signing")]
426            signer: None,
427        }
428    }
429
430    /// Create a manager with a specific `reqwest::Client` (used in
431    /// tests with wiremock).
432    pub fn with_client(client: reqwest::Client) -> Self {
433        let mut m = Self::new();
434        m.client = client;
435        m
436    }
437
438    /// Sprint 6 C: attach an RFC 9421 signer. Subsequent deliveries
439    /// attach `Signature-Input` / `Signature` headers.
440    #[cfg(feature = "webhook-signing")]
441    pub fn with_signer(mut self, signer: signing::SignerConfig) -> Self {
442        self.signer = Some(signer);
443        self
444    }
445
446    /// Override the max attempts (1 == no retries). Default 5.
447    pub fn with_max_attempts(mut self, attempts: u32) -> Self {
448        // Internally we still carry `max_retries` so older public API
449        // callers keep working. `max_retries` is the *retry* count,
450        // i.e. one less than the total attempts.
451        self.max_retries = attempts.saturating_sub(1);
452        self
453    }
454
455    /// Override the maximum single back-off wait. Default 1h.
456    pub fn with_max_backoff(mut self, max: Duration) -> Self {
457        self.max_backoff = max;
458        self
459    }
460
461    /// Override the consecutive-failure threshold that opens the
462    /// breaker. Default 10.
463    pub fn with_circuit_threshold(mut self, threshold: u32) -> Self {
464        self.circuit_threshold = threshold;
465        self
466    }
467
468    /// True iff the breaker is currently open.
469    pub fn circuit_open(&self) -> bool {
470        self.consecutive_failures
471            .load(std::sync::atomic::Ordering::Relaxed)
472            >= self.circuit_threshold
473    }
474
475    /// Current consecutive-failure count. Public for observability and
476    /// tests.
477    pub fn consecutive_failures(&self) -> u32 {
478        self.consecutive_failures
479            .load(std::sync::atomic::Ordering::Relaxed)
480    }
481
482    /// Reset the consecutive-failure counter (e.g. after operator
483    /// intervention). Test hook, also exposed for admin UIs.
484    pub fn reset_circuit(&self) {
485        self.consecutive_failures
486            .store(0, std::sync::atomic::Ordering::Relaxed);
487    }
488
489    pub async fn subscribe(&self, topic: &str, target_url: &str) -> Subscription {
490        let sub = Subscription {
491            id: uuid::Uuid::new_v4().to_string(),
492            topic: topic.to_string(),
493            channel_type: ChannelType::WebhookChannel2023,
494            receive_from: target_url.to_string(),
495        };
496        self.subscriptions
497            .write()
498            .await
499            .insert(sub.id.clone(), sub.clone());
500        sub
501    }
502
503    pub async fn unsubscribe(&self, id: &str) {
504        self.subscriptions.write().await.remove(id);
505    }
506
507    pub async fn active_subscriptions(&self) -> usize {
508        self.subscriptions.read().await.len()
509    }
510
511    /// Parse an HTTP `Retry-After` header value — either a
512    /// delta-seconds integer (RFC 7231 §7.1.3) or an HTTP-date.
513    fn parse_retry_after(raw: &str) -> Option<Duration> {
514        if let Ok(secs) = raw.trim().parse::<u64>() {
515            return Some(Duration::from_secs(secs));
516        }
517        #[cfg(feature = "webhook-signing")]
518        {
519            if let Ok(when) = httpdate::parse_http_date(raw.trim()) {
520                if let Ok(delta) = when.duration_since(std::time::SystemTime::now()) {
521                    return Some(delta);
522                }
523            }
524        }
525        None
526    }
527
528    /// Full-jitter back-off: a random value in `[0.8 * cap, cap]`
529    /// where `cap = min(base * 2^attempt, max_backoff)`. The 20%
530    /// jitter window is what `tests/webhook_retry.rs::webhook_jitter_within_window`
531    /// asserts. Public for testability — not stable API.
532    #[doc(hidden)]
533    pub fn compute_backoff(&self, attempt: u32) -> Duration {
534        let exp = self
535            .retry_base
536            .saturating_mul(2u32.saturating_pow(attempt.min(20)));
537        let cap = std::cmp::min(exp, self.max_backoff);
538        // Jitter: pick a factor in [0.8, 1.0] so each back-off stays
539        // within 20% of the deterministic ceiling (tests assert the
540        // ±20% window). Uses the OS RNG when `webhook-signing` pulls
541        // `rand` in, otherwise falls back to a cheap time-based
542        // perturbation good enough for the jitter test.
543        let factor = jitter_factor();
544        let nanos = (cap.as_nanos() as f64 * factor) as u128;
545        Duration::from_nanos(nanos.min(u64::MAX as u128) as u64)
546    }
547
548    /// Build and send a single HTTP request, optionally signed.
549    async fn send_once(
550        &self,
551        url: &str,
552        note: &ChangeNotification,
553    ) -> Result<reqwest::Response, reqwest::Error> {
554        let body = serde_json::to_vec(note).unwrap_or_default();
555        #[cfg(feature = "webhook-signing")]
556        let notification_id = note.id.clone();
557        #[cfg_attr(not(feature = "webhook-signing"), allow(unused_mut))]
558        let mut req = self
559            .client
560            .post(url)
561            .header("Content-Type", "application/ld+json");
562
563        #[cfg(feature = "webhook-signing")]
564        {
565            if let Some(cfg) = &self.signer {
566                let now = std::time::SystemTime::now()
567                    .duration_since(std::time::UNIX_EPOCH)
568                    .map(|d| d.as_secs())
569                    .unwrap_or_default();
570                let signed = signing::sign_request(
571                    cfg,
572                    "POST",
573                    url,
574                    "application/ld+json",
575                    &body,
576                    &notification_id,
577                    now,
578                );
579                // send_once rebuilds the Content-Type header itself;
580                // attach every *other* header from the signer.
581                for (name, value) in &signed.headers {
582                    if name.eq_ignore_ascii_case("content-type") {
583                        continue;
584                    }
585                    req = req.header(name.as_str(), value.as_str());
586                }
587            } else {
588                tracing::warn!(
589                    "webhook manager delivering {} unsigned — consider configuring a SignerConfig",
590                    url
591                );
592            }
593        }
594
595        req.body(body).send().await
596    }
597
    /// Deliver a single event to a single webhook URL, with full
    /// Sprint 6 C retry / back-off / circuit-breaker semantics.
    ///
    /// Up to `max_retries + 1` requests are sent. Outcomes:
    /// * 2xx → [`WebhookDelivery::Delivered`]; failure counter reset.
    /// * 410 → [`WebhookDelivery::FatalDrop`]; failure counter reset.
    /// * 429 / 5xx → wait `Retry-After` if parseable, otherwise the
    ///   computed back-off (either clamped to `max_backoff`), then retry.
    /// * other 4xx, 3xx/1xx, network errors → wait the computed
    ///   back-off, then retry.
    /// * attempts exhausted → one failure is recorded on the breaker
    ///   and [`WebhookDelivery::TransientRetry`] is returned.
    ///
    /// NOTE(review): when the breaker is open this returns before any
    /// request is sent, so the counter can only be cleared by
    /// `reset_circuit` or by a delivery that was already in flight —
    /// confirm that this is the intended recovery model.
    pub async fn deliver_one(&self, url: &str, note: &ChangeNotification) -> WebhookDelivery {
        // Circuit breaker — bail before touching the network if open.
        if self.circuit_open() {
            return WebhookDelivery::TransientRetry {
                reason: "circuit open".to_string(),
            };
        }

        // `attempt` counts requests already sent; the wait before the
        // next request is computed from its value *before* increment.
        let total_attempts = self.max_retries.saturating_add(1);
        let mut attempt = 0u32;
        loop {
            let resp = self.send_once(url, note).await;
            match resp {
                Ok(r) => {
                    let status = r.status().as_u16();
                    // 2xx — success resets the breaker.
                    if r.status().is_success() {
                        self.consecutive_failures
                            .store(0, std::sync::atomic::Ordering::Relaxed);
                        return WebhookDelivery::Delivered { status };
                    }
                    // 410 Gone — receiver asked to be unsubscribed.
                    // The target answered, so the breaker is reset too.
                    if status == 410 {
                        self.consecutive_failures
                            .store(0, std::sync::atomic::Ordering::Relaxed);
                        return WebhookDelivery::FatalDrop { status };
                    }
                    // 429 — honour Retry-After then retry.
                    if status == 429 {
                        let retry_after = r
                            .headers()
                            .get("retry-after")
                            .and_then(|v| v.to_str().ok())
                            .and_then(Self::parse_retry_after)
                            .unwrap_or_else(|| self.compute_backoff(attempt));
                        attempt += 1;
                        if attempt >= total_attempts {
                            self.record_failure();
                            return WebhookDelivery::TransientRetry {
                                reason: format!("429 after {attempt} attempts"),
                            };
                        }
                        // Never sleep longer than the configured cap,
                        // whatever the server asked for.
                        tokio::time::sleep(retry_after.min(self.max_backoff)).await;
                        continue;
                    }
                    // 5xx (incl. 503 with Retry-After) — retry with
                    // back-off, honouring Retry-After if present.
                    if r.status().is_server_error() {
                        let wait = r
                            .headers()
                            .get("retry-after")
                            .and_then(|v| v.to_str().ok())
                            .and_then(Self::parse_retry_after)
                            .unwrap_or_else(|| self.compute_backoff(attempt));
                        attempt += 1;
                        if attempt >= total_attempts {
                            self.record_failure();
                            return WebhookDelivery::TransientRetry {
                                reason: format!("5xx after {attempt} attempts"),
                            };
                        }
                        tokio::time::sleep(wait.min(self.max_backoff)).await;
                        continue;
                    }
                    // Other 4xx (401/403/404/422/…) — subscription
                    // stays alive; retry with back-off.
                    if r.status().is_client_error() {
                        let wait = self.compute_backoff(attempt);
                        attempt += 1;
                        if attempt >= total_attempts {
                            self.record_failure();
                            return WebhookDelivery::TransientRetry {
                                reason: format!("{status} after {attempt} attempts"),
                            };
                        }
                        tokio::time::sleep(wait.min(self.max_backoff)).await;
                        continue;
                    }
                    // 3xx/1xx — treat as transient.
                    let wait = self.compute_backoff(attempt);
                    attempt += 1;
                    if attempt >= total_attempts {
                        self.record_failure();
                        return WebhookDelivery::TransientRetry {
                            reason: format!("status {status} after {attempt} attempts"),
                        };
                    }
                    tokio::time::sleep(wait.min(self.max_backoff)).await;
                }
                Err(e) => {
                    // Network error — same treatment as 5xx, except
                    // that there is no response to read Retry-After from.
                    let wait = self.compute_backoff(attempt);
                    attempt += 1;
                    if attempt >= total_attempts {
                        self.record_failure();
                        return WebhookDelivery::TransientRetry {
                            reason: format!("network error: {e}"),
                        };
                    }
                    tokio::time::sleep(wait.min(self.max_backoff)).await;
                }
            }
        }
    }
704
705    fn record_failure(&self) {
706        self.consecutive_failures
707            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
708    }
709
710    /// Deliver the notification to every matching subscription.
711    /// Returns the per-subscription outcome.
712    pub async fn deliver_all(
713        &self,
714        note: &ChangeNotification,
715        topic_matches: impl Fn(&str) -> bool,
716    ) -> Vec<(String, WebhookDelivery)> {
717        let subs: Vec<Subscription> = {
718            let guard = self.subscriptions.read().await;
719            guard
720                .values()
721                .filter(|s| topic_matches(&s.topic))
722                .cloned()
723                .collect()
724        };
725        let mut out = Vec::with_capacity(subs.len());
726        let mut dropped = Vec::new();
727        for sub in subs {
728            let result = self.deliver_one(&sub.receive_from, note).await;
729            if matches!(result, WebhookDelivery::FatalDrop { .. }) {
730                dropped.push(sub.id.clone());
731            }
732            out.push((sub.id, result));
733        }
734        if !dropped.is_empty() {
735            let mut guard = self.subscriptions.write().await;
736            for id in dropped {
737                guard.remove(&id);
738            }
739        }
740        out
741    }
742
743    /// Attach the manager to a storage event stream. Each event is
744    /// translated into an Activity Streams notification and delivered
745    /// to every subscription whose topic is a prefix of the event
746    /// path.
747    pub async fn pump_from_storage(
748        self,
749        mut rx: tokio::sync::mpsc::Receiver<StorageEvent>,
750        pod_base: String,
751    ) {
752        while let Some(event) = rx.recv().await {
753            let path = match &event {
754                StorageEvent::Created(p) | StorageEvent::Updated(p) | StorageEvent::Deleted(p) => {
755                    p.clone()
756                }
757            };
758            let note = ChangeNotification::from_storage_event(&event, &pod_base);
759            self.deliver_all(&note, |topic| path.starts_with(topic))
760                .await;
761        }
762    }
763}
764
765#[async_trait]
766impl Notifications for WebhookChannelManager {
767    async fn subscribe(&self, subscription: Subscription) -> Result<(), PodError> {
768        self.subscriptions
769            .write()
770            .await
771            .insert(subscription.id.clone(), subscription);
772        Ok(())
773    }
774
775    async fn unsubscribe(&self, id: &str) -> Result<(), PodError> {
776        self.subscriptions.write().await.remove(id);
777        Ok(())
778    }
779
780    async fn publish(&self, topic: &str, notification: ChangeNotification) -> Result<(), PodError> {
781        let matches_topic = |t: &str| topic.starts_with(t) || t == topic;
782        self.deliver_all(&notification, matches_topic).await;
783        Ok(())
784    }
785}
786
787// ---------------------------------------------------------------------------
788// Subscription discovery (.notifications)
789// ---------------------------------------------------------------------------
790
791/// Build the subscription-discovery JSON-LD document served at
792/// `.notifications` per the Notifications Protocol §5.
793pub fn discovery_document(pod_base: &str) -> serde_json::Value {
794    let base = pod_base.trim_end_matches('/');
795    serde_json::json!({
796        "@context": ["https://www.w3.org/ns/solid/notifications-context/v1"],
797        "id": format!("{base}/.notifications"),
798        "channelTypes": [
799            {
800                "id": "WebSocketChannel2023",
801                "endpoint": format!("{base}/.notifications/websocket"),
802                "features": ["as:Create", "as:Update", "as:Delete"]
803            },
804            {
805                "id": "WebhookChannel2023",
806                "endpoint": format!("{base}/.notifications/webhook"),
807                "features": ["as:Create", "as:Update", "as:Delete"]
808            }
809        ]
810    })
811}
812
// ---------------------------------------------------------------------------
// Jitter helper — Sprint 6 C. Returns a multiplier in [0.8, 1.0]. When
// the `webhook-signing` feature is enabled we use the `rand` thread
// RNG; otherwise we derive a varying factor from the wall clock mixed
// with a process-wide counter, which gives enough dispersion across a
// hundred trials for the back-off jitter test to pass without a new
// always-on dependency.
// ---------------------------------------------------------------------------

/// Jitter multiplier drawn from `rand`'s thread-local RNG.
#[cfg(feature = "webhook-signing")]
fn jitter_factor() -> f64 {
    use rand::Rng;
    rand::thread_rng().gen_range(0.8_f64..1.0_f64)
}

/// Dependency-free jitter multiplier.
///
/// Not cryptographic: a splitmix64 finaliser over the wall-clock
/// nanoseconds plus a shared accumulator. The wall clock makes the
/// sequence differ between processes; the accumulator makes it differ
/// between calls that land on the same clock tick.
#[cfg(not(feature = "webhook-signing"))]
fn jitter_factor() -> f64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEED: AtomicU64 = AtomicU64::new(0);

    // Truncating to the low 64 bits is deliberate: they carry the
    // fast-changing part of the timestamp. A clock set before the
    // epoch falls back to 0 and the accumulator still advances.
    let clock = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or_default();
    // `| 1` guarantees the accumulator moves on every call.
    let prev = SEED.fetch_add(clock | 1, Ordering::Relaxed);

    // Splitmix64 step.
    let mut x = prev
        .wrapping_add(clock)
        .wrapping_add(0x9E3779B97F4A7C15);
    x = (x ^ (x >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
    x = (x ^ (x >> 27)).wrapping_mul(0x94D049BB133111EB);
    x ^= x >> 31;

    // Top 53 bits → unit interval → [0.8, 1.0].
    let unit = (x >> 11) as f64 / (1u64 << 53) as f64;
    0.8 + unit * 0.2
}
846
847// ---------------------------------------------------------------------------
848// Small util: percent-encode path for use in URLs.
849// ---------------------------------------------------------------------------
850
/// Percent-encodes `s` for use as a URL path.
///
/// ASCII letters and digits plus `-`, `_`, `.`, `~` and `/` are copied
/// through unchanged. Every other byte is written as `%XX` with
/// uppercase hex digits; the input is processed byte by byte, so each
/// byte of a multi-byte UTF-8 character is escaped separately.
fn urlencoding(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~' | b'/') {
            out.push(char::from(b));
        } else {
            // Emit the escape directly instead of going through
            // `format!`, which would allocate a temporary `String`
            // for every escaped byte.
            out.push('%');
            out.push(char::from(HEX[usize::from(b >> 4)]));
            out.push(char::from(HEX[usize::from(b & 0x0F)]));
        }
    }
    out
}
865
866// ---------------------------------------------------------------------------
867// Tests
868// ---------------------------------------------------------------------------
869
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a `WebhookChannel2023` subscription with the given id,
    /// topic and delivery target.
    fn webhook_sub(id: &str, topic: &str, target: &str) -> Subscription {
        Subscription {
            id: id.into(),
            topic: topic.into(),
            channel_type: ChannelType::WebhookChannel2023,
            receive_from: target.into(),
        }
    }

    /// Subscribe, unsubscribe and a subsequent publish must all return `Ok`.
    #[tokio::test]
    async fn subscribe_unsubscribe_roundtrip() {
        let hub = InMemoryNotifications::new();
        hub.subscribe(webhook_sub("sub-1", "/public/", "https://example.com/hook"))
            .await
            .unwrap();
        hub.unsubscribe("sub-1").await.unwrap();

        let note = ChangeNotification {
            context: as_ns::CONTEXT.into(),
            id: "urn:uuid:test".into(),
            kind: "Update".into(),
            object: "/public/x".into(),
            published: chrono::Utc::now().to_rfc3339(),
        };
        hub.publish("/public/", note).await.unwrap();
    }

    /// A published notification must arrive on a stream opened beforehand.
    #[tokio::test]
    async fn websocket_manager_broadcasts_events() {
        let manager = WebSocketChannelManager::new();
        let mut events = manager.stream();

        let sub = manager.subscribe("/public/", "wss://pod.example").await;
        assert_eq!(sub.channel_type, ChannelType::WebSocketChannel2023);
        assert!(sub.receive_from.contains("/subscription/"));

        let created = StorageEvent::Created("/public/x".into());
        let note = ChangeNotification::from_storage_event(&created, "https://pod.example");
        manager.publish("/public/", note).await.unwrap();

        let received = tokio::time::timeout(Duration::from_secs(1), events.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(received.kind, "Create");
        assert_eq!(received.object, "https://pod.example/public/x");
    }

    /// Each storage event variant maps to its Activity Streams type name.
    #[tokio::test]
    async fn change_notification_maps_event_types() {
        let cases = [
            (StorageEvent::Created("/x".into()), "Create"),
            (StorageEvent::Updated("/x".into()), "Update"),
            (StorageEvent::Deleted("/x".into()), "Delete"),
        ];
        for (event, expected) in &cases {
            let note = ChangeNotification::from_storage_event(event, "https://p.example");
            assert_eq!(note.kind, *expected);
        }
    }

    /// The discovery document advertises exactly the two channel types.
    #[test]
    fn discovery_lists_both_channels() {
        let doc = discovery_document("https://pod.example");
        let channels = doc["channelTypes"].as_array().unwrap();
        assert_eq!(channels.len(), 2);

        let ids: Vec<&str> = channels
            .iter()
            .map(|entry| entry["id"].as_str().unwrap())
            .collect();
        for expected in ["WebSocketChannel2023", "WebhookChannel2023"] {
            assert!(ids.contains(&expected));
        }
    }

    /// A freshly constructed webhook manager retries three times.
    #[test]
    fn webhook_manager_default_retries() {
        assert_eq!(WebhookChannelManager::new().max_retries, 3);
    }

    /// The active-subscription count follows subscribe / unsubscribe.
    #[tokio::test]
    async fn websocket_active_subscriptions_count() {
        let manager = WebSocketChannelManager::new();
        assert_eq!(manager.active_subscriptions().await, 0);

        let sub = manager.subscribe("/a/", "wss://p").await;
        assert_eq!(manager.active_subscriptions().await, 1);

        manager.unsubscribe(&sub.id).await;
        assert_eq!(manager.active_subscriptions().await, 0);
    }

    /// With capacity 3, a fourth subscription evicts the oldest one.
    #[tokio::test]
    async fn inmemory_bounded_evicts_oldest_at_capacity() {
        let hub = InMemoryNotifications::with_capacity(3);
        // sub-0..=sub-2 fill the store; sub-3 is the one that should
        // push sub-0 out.
        for i in 0..4 {
            let sub = webhook_sub(
                &format!("sub-{i}"),
                "/t/",
                &format!("https://example.com/hook-{i}"),
            );
            hub.subscribe(sub).await.unwrap();
        }

        let state = hub.inner.read().await;
        // Total stays pinned at capacity.
        assert_eq!(state.total_count, 3);

        let subs = state.topics.get("/t/").unwrap();
        let has = |id: &str| subs.iter().any(|s| s.id == id);
        assert!(!has("sub-0"));
        assert!(has("sub-3"));
    }

    /// Unsubscribing lowers the total and removes the emptied topic entry.
    #[tokio::test]
    async fn inmemory_unsubscribe_decrements_total_count() {
        let hub = InMemoryNotifications::with_capacity(100);
        hub.subscribe(webhook_sub("sub-x", "/x/", "https://example.com/hook"))
            .await
            .unwrap();
        // The read guard is a temporary here, so it is released before
        // `unsubscribe` runs.
        assert_eq!(hub.inner.read().await.total_count, 1);

        hub.unsubscribe("sub-x").await.unwrap();

        let state = hub.inner.read().await;
        assert_eq!(state.total_count, 0);
        // Empty topic entry is cleaned up.
        assert!(state.topics.is_empty());
    }
}