Skip to main content

oxirs_stream/websub/
mod.rs

1//! WebSub (W3C Recommendation) implementation for RDF dataset change notifications.
2//!
3//! Spec: <https://www.w3.org/TR/websub/>
4//!
5//! WebSub enables publish-subscribe over HTTP for any web resource. This module provides:
6//! - [`WebSubHub`]: Tracks subscriptions and dispatches notifications.
7//! - [`WebSubPublisher`]: Notifies a hub that a topic has been updated.
8//! - [`WebSubSubscriber`]: Handles intent verification and signature checking.
9//! - [`DatasetEventBus`]: In-process change notifications for RDF datasets.
10
11use hmac::{Hmac, Mac};
12use sha2::Sha256;
13use std::fmt;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, SystemTime};
16use uuid::Uuid;
17
18// ── Error type ────────────────────────────────────────────────────────────────
19
20/// Errors produced by WebSub operations.
21#[derive(Debug)]
22pub enum WebSubError {
23    /// The hub or callback URL is invalid.
24    InvalidUrl(String),
25    /// A required parameter is missing.
26    MissingParameter(String),
27    /// The challenge/signature did not match.
28    VerificationFailed(String),
29    /// HMAC MAC initialisation failed.
30    HmacError(String),
31    /// A subscriber callback produced an error.
32    SubscriberError(String),
33    /// Lock poisoning.
34    LockPoisoned,
35}
36
37impl fmt::Display for WebSubError {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            WebSubError::InvalidUrl(msg) => write!(f, "invalid URL: {msg}"),
41            WebSubError::MissingParameter(p) => write!(f, "missing parameter: {p}"),
42            WebSubError::VerificationFailed(msg) => write!(f, "verification failed: {msg}"),
43            WebSubError::HmacError(msg) => write!(f, "HMAC error: {msg}"),
44            WebSubError::SubscriberError(msg) => write!(f, "subscriber error: {msg}"),
45            WebSubError::LockPoisoned => write!(f, "mutex lock was poisoned"),
46        }
47    }
48}
49
50impl std::error::Error for WebSubError {}
51
52/// Convenience result alias.
53pub type Result<T> = std::result::Result<T, WebSubError>;
54
55// ── Subscription ──────────────────────────────────────────────────────────────
56
57/// A single WebSub subscription held by a hub.
58#[derive(Debug, Clone)]
59pub struct Subscription {
60    /// The RDF dataset URL being watched.
61    pub topic_url: String,
62    /// The subscriber's callback endpoint.
63    pub callback_url: String,
64    /// Optional HMAC-SHA256 secret shared with the subscriber.
65    pub secret: Option<String>,
66    /// Requested subscription duration in seconds.
67    pub lease_seconds: u64,
68    /// Wall-clock time at which this subscription expires.
69    pub expires_at: SystemTime,
70    /// Whether the hub has confirmed this subscription is active.
71    pub active: bool,
72}
73
74impl Subscription {
75    /// Create a new subscription, computing `expires_at` from `lease_seconds`.
76    pub fn new(
77        topic_url: impl Into<String>,
78        callback_url: impl Into<String>,
79        secret: Option<String>,
80        lease_seconds: u64,
81    ) -> Self {
82        let expires_at = SystemTime::now() + Duration::from_secs(lease_seconds);
83        Self {
84            topic_url: topic_url.into(),
85            callback_url: callback_url.into(),
86            secret,
87            lease_seconds,
88            expires_at,
89            active: false,
90        }
91    }
92
93    /// Return `true` if this subscription has not yet expired and is active.
94    pub fn is_valid(&self) -> bool {
95        self.active && SystemTime::now() < self.expires_at
96    }
97}
98
99// ── WebSubHub ─────────────────────────────────────────────────────────────────
100
101/// A WebSub hub that stores subscriptions and tracks which topics have been
102/// updated.  A production hub would also drive HTTP callbacks; this
103/// implementation focuses on the subscription registry.
104#[derive(Debug, Default)]
105pub struct WebSubHub {
106    /// The publicly reachable URL of this hub.
107    pub hub_url: String,
108    /// All subscriptions held by this hub.
109    pub subscriptions: Vec<Subscription>,
110}
111
112impl WebSubHub {
113    /// Create a new hub with the given public URL.
114    pub fn new(hub_url: impl Into<String>) -> Self {
115        Self {
116            hub_url: hub_url.into(),
117            subscriptions: Vec::new(),
118        }
119    }
120
121    /// Register a new subscription.  Returns the index of the new entry.
122    pub fn add_subscription(&mut self, sub: Subscription) -> usize {
123        let idx = self.subscriptions.len();
124        self.subscriptions.push(sub);
125        idx
126    }
127
128    /// Activate a subscription by index (after intent verification succeeds).
129    pub fn activate(&mut self, index: usize) -> Result<()> {
130        let sub = self
131            .subscriptions
132            .get_mut(index)
133            .ok_or_else(|| WebSubError::MissingParameter(format!("index {index}")))?;
134        sub.active = true;
135        Ok(())
136    }
137
138    /// Return all active, non-expired subscriptions for the given topic.
139    pub fn active_subscriptions_for(&self, topic_url: &str) -> Vec<&Subscription> {
140        self.subscriptions
141            .iter()
142            .filter(|s| s.topic_url == topic_url && s.is_valid())
143            .collect()
144    }
145
146    /// Remove expired subscriptions and return the count removed.
147    pub fn purge_expired(&mut self) -> usize {
148        let before = self.subscriptions.len();
149        self.subscriptions
150            .retain(|s| SystemTime::now() < s.expires_at);
151        before - self.subscriptions.len()
152    }
153}
154
155// ── WebSubPublisher ───────────────────────────────────────────────────────────
156
157/// Publishes topic updates to a WebSub hub and generates HTTP `Link` headers
158/// for self-discovery (RFC 5988 / RFC 8288).
159#[derive(Debug, Clone)]
160pub struct WebSubPublisher {
161    /// The hub to notify.
162    pub hub_url: String,
163}
164
165impl WebSubPublisher {
166    /// Create a publisher that notifies the given hub.
167    pub fn new(hub_url: impl Into<String>) -> Self {
168        Self {
169            hub_url: hub_url.into(),
170        }
171    }
172
173    /// Build the form-encoded body for a hub ping (`hub.mode=publish`).
174    pub fn publish_params(&self, topic_url: &str) -> Vec<(String, String)> {
175        vec![
176            ("hub.mode".to_string(), "publish".to_string()),
177            ("hub.url".to_string(), topic_url.to_string()),
178        ]
179    }
180
181    /// Notify the hub that `topic_url` has been updated.
182    ///
183    /// Returns the form-encoded parameter pairs that would be sent to the hub.
184    /// Callers are responsible for making the actual HTTP POST.
185    pub fn publish(&self, topic_url: &str) -> Result<Vec<(String, String)>> {
186        if topic_url.is_empty() {
187            return Err(WebSubError::MissingParameter("topic_url".to_string()));
188        }
189        Ok(self.publish_params(topic_url))
190    }
191
192    /// Return the RFC 8288 `Link` headers a publisher should include in its
193    /// HTTP responses to allow subscriber discovery.
194    ///
195    /// ```text
196    /// Link: <https://hub.example.com/>; rel="hub"
197    /// Link: <https://publisher.example.com/topic>; rel="self"
198    /// ```
199    pub fn link_headers(&self, topic_url: &str) -> Vec<String> {
200        vec![
201            format!("<{}>; rel=\"hub\"", self.hub_url),
202            format!("<{}>; rel=\"self\"", topic_url),
203        ]
204    }
205}
206
207// ── WebSubSubscriber ──────────────────────────────────────────────────────────
208
209/// Builds subscription requests and handles incoming WebSub callbacks.
210#[derive(Debug, Clone)]
211pub struct WebSubSubscriber {
212    /// The callback URL this subscriber exposes.
213    pub callback_url: String,
214    /// Optional HMAC-SHA256 secret for signature verification.
215    pub secret: Option<String>,
216}
217
218impl WebSubSubscriber {
219    /// Create a subscriber with the given callback URL and no secret.
220    pub fn new(callback_url: impl Into<String>) -> Self {
221        Self {
222            callback_url: callback_url.into(),
223            secret: None,
224        }
225    }
226
227    /// Attach an HMAC secret (builder style).
228    pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
229        self.secret = Some(secret.into());
230        self
231    }
232
233    /// Return the form-encoded parameters for a `subscribe` request.
234    pub fn subscribe_params(
235        &self,
236        hub_url: &str,
237        topic_url: &str,
238        lease_seconds: u64,
239    ) -> Vec<(String, String)> {
240        let mut params = vec![
241            ("hub.callback".to_string(), self.callback_url.clone()),
242            ("hub.mode".to_string(), "subscribe".to_string()),
243            ("hub.topic".to_string(), topic_url.to_string()),
244            ("hub.lease_seconds".to_string(), lease_seconds.to_string()),
245        ];
246        if let Some(ref s) = self.secret {
247            params.push(("hub.secret".to_string(), s.clone()));
248        }
249        params.push(("hub.hub_url".to_string(), hub_url.to_string()));
250        params
251    }
252
253    /// Return the form-encoded parameters for an `unsubscribe` request.
254    pub fn unsubscribe_params(&self, hub_url: &str, topic_url: &str) -> Vec<(String, String)> {
255        vec![
256            ("hub.callback".to_string(), self.callback_url.clone()),
257            ("hub.mode".to_string(), "unsubscribe".to_string()),
258            ("hub.topic".to_string(), topic_url.to_string()),
259            ("hub.hub_url".to_string(), hub_url.to_string()),
260        ]
261    }
262
263    /// Handle the hub's intent-verification GET request.
264    ///
265    /// The WebSub spec requires the subscriber to echo back the `challenge`
266    /// when it agrees with the `mode` and `topic`.  Returns `Ok(challenge)`.
267    pub fn verify_intent(
268        &self,
269        mode: &str,
270        topic: &str,
271        challenge: &str,
272        _lease_seconds: Option<u64>,
273    ) -> Result<String> {
274        if mode.is_empty() {
275            return Err(WebSubError::MissingParameter("hub.mode".to_string()));
276        }
277        if topic.is_empty() {
278            return Err(WebSubError::MissingParameter("hub.topic".to_string()));
279        }
280        if challenge.is_empty() {
281            return Err(WebSubError::MissingParameter("hub.challenge".to_string()));
282        }
283        match mode {
284            "subscribe" | "unsubscribe" => Ok(challenge.to_string()),
285            other => Err(WebSubError::VerificationFailed(format!(
286                "unknown mode: {other}"
287            ))),
288        }
289    }
290
291    /// Verify the `X-Hub-Signature` header on an incoming notification.
292    ///
293    /// The header format is `sha256=<hex-digest>`.  Returns `Ok(true)` on
294    /// success, `Ok(false)` if the signature does not match, and `Err` when
295    /// no secret is configured or the header is malformed.
296    pub fn verify_signature(&self, body: &[u8], signature_header: &str) -> Result<bool> {
297        let secret = self
298            .secret
299            .as_deref()
300            .ok_or_else(|| WebSubError::VerificationFailed("no secret configured".to_string()))?;
301
302        let expected_hex = signature_header.strip_prefix("sha256=").ok_or_else(|| {
303            WebSubError::VerificationFailed(format!(
304                "unsupported signature algorithm in: {signature_header}"
305            ))
306        })?;
307
308        type HmacSha256 = Hmac<Sha256>;
309        let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
310            .map_err(|e| WebSubError::HmacError(e.to_string()))?;
311        mac.update(body);
312        let computed = mac.finalize().into_bytes();
313        let computed_hex = hex::encode(computed);
314
315        Ok(computed_hex == expected_hex)
316    }
317
318    /// Compute the `X-Hub-Signature` header value for a notification body.
319    ///
320    /// Useful for hub implementations that need to sign outgoing content.
321    pub fn compute_signature(&self, body: &[u8]) -> Result<String> {
322        let secret = self
323            .secret
324            .as_deref()
325            .ok_or_else(|| WebSubError::VerificationFailed("no secret configured".to_string()))?;
326
327        type HmacSha256 = Hmac<Sha256>;
328        let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
329            .map_err(|e| WebSubError::HmacError(e.to_string()))?;
330        mac.update(body);
331        let digest = mac.finalize().into_bytes();
332        Ok(format!("sha256={}", hex::encode(digest)))
333    }
334}
335
336// ── ChangeType ────────────────────────────────────────────────────────────────
337
338/// The kind of mutation that occurred in an RDF dataset.
339#[derive(Debug, Clone, PartialEq, Eq)]
340pub enum ChangeType {
341    /// A single triple was added.
342    TripleAdded,
343    /// A single triple was removed.
344    TripleRemoved,
345    /// All triples in a named graph were deleted.
346    GraphCleared,
347    /// A named graph was dropped entirely.
348    GraphDropped,
349    /// A new named graph was created (possibly empty).
350    GraphCreated,
351    /// A bulk transaction affecting many triples at once.
352    BulkUpdate {
353        /// Number of triples affected by this bulk operation.
354        triple_count: usize,
355    },
356}
357
358// ── DatasetChangeEvent ────────────────────────────────────────────────────────
359
360/// A single change notification produced when an RDF dataset is mutated.
361#[derive(Debug, Clone)]
362pub struct DatasetChangeEvent {
363    /// The dataset that changed.
364    pub dataset_url: String,
365    /// What kind of change occurred.
366    pub change_type: ChangeType,
367    /// The named graph affected (if applicable).
368    pub affected_graph: Option<String>,
369    /// Triples that were added (subject, predicate, object as strings).
370    pub added_triples: Vec<(String, String, String)>,
371    /// Triples that were removed (subject, predicate, object as strings).
372    pub removed_triples: Vec<(String, String, String)>,
373    /// When the change occurred.
374    pub timestamp: SystemTime,
375    /// Unique identifier for deduplication across distributed systems.
376    pub change_id: String,
377}
378
379impl DatasetChangeEvent {
380    /// Construct a new change event, generating a fresh UUID for `change_id`.
381    pub fn new(
382        dataset_url: impl Into<String>,
383        change_type: ChangeType,
384        affected_graph: Option<String>,
385        added_triples: Vec<(String, String, String)>,
386        removed_triples: Vec<(String, String, String)>,
387    ) -> Self {
388        Self {
389            dataset_url: dataset_url.into(),
390            change_type,
391            affected_graph,
392            added_triples,
393            removed_triples,
394            timestamp: SystemTime::now(),
395            change_id: Uuid::new_v4().to_string(),
396        }
397    }
398
399    /// Return `true` if this event contains at least one triple mutation.
400    pub fn has_delta(&self) -> bool {
401        !self.added_triples.is_empty() || !self.removed_triples.is_empty()
402    }
403}
404
405// ── ChangeSubscriber trait ────────────────────────────────────────────────────
406
407/// Implemented by anything that wants to receive dataset change notifications.
408pub trait ChangeSubscriber: Send + Sync {
409    /// Called synchronously on the publishing thread.
410    fn on_change(&self, event: &DatasetChangeEvent) -> Result<()>;
411}
412
413// ── DatasetEventBus ───────────────────────────────────────────────────────────
414
415/// An in-process publish-subscribe bus for RDF dataset change events.
416///
417/// Multiple [`ChangeSubscriber`] implementations can be registered.  When
418/// [`DatasetEventBus::publish`] is called every subscriber receives the event
419/// in registration order.  Errors from individual subscribers are collected
420/// but do not prevent later subscribers from receiving the event.
421pub struct DatasetEventBus {
422    subscribers: Vec<Box<dyn ChangeSubscriber>>,
423}
424
425impl Default for DatasetEventBus {
426    fn default() -> Self {
427        Self::new()
428    }
429}
430
431impl DatasetEventBus {
432    /// Create a new, empty event bus.
433    pub fn new() -> Self {
434        Self {
435            subscribers: Vec::new(),
436        }
437    }
438
439    /// Register a subscriber.
440    pub fn subscribe(&mut self, subscriber: Box<dyn ChangeSubscriber>) {
441        self.subscribers.push(subscriber);
442    }
443
444    /// Publish a change event to all registered subscribers.
445    ///
446    /// All subscribers are called even if earlier ones return an error.
447    /// If any subscriber failed the first error is returned.
448    pub fn publish(&self, event: DatasetChangeEvent) -> Result<()> {
449        let mut first_error: Option<WebSubError> = None;
450        for sub in &self.subscribers {
451            if let Err(e) = sub.on_change(&event) {
452                if first_error.is_none() {
453                    first_error = Some(e);
454                }
455            }
456        }
457        match first_error {
458            Some(e) => Err(e),
459            None => Ok(()),
460        }
461    }
462
463    /// Return the number of registered subscribers.
464    pub fn subscriber_count(&self) -> usize {
465        self.subscribers.len()
466    }
467}
468
469// ── InMemorySubscriber ────────────────────────────────────────────────────────
470
471/// A [`ChangeSubscriber`] that collects received events in a `Vec` behind a
472/// shared `Mutex`.  Useful for unit tests.
473pub struct InMemorySubscriber {
474    received: Arc<Mutex<Vec<DatasetChangeEvent>>>,
475}
476
477impl InMemorySubscriber {
478    /// Create a new subscriber and return it together with a shared handle to
479    /// the underlying event buffer so tests can inspect received events without
480    /// keeping a reference to the subscriber itself.
481    pub fn new() -> (Self, Arc<Mutex<Vec<DatasetChangeEvent>>>) {
482        let received = Arc::new(Mutex::new(Vec::new()));
483        let handle = Arc::clone(&received);
484        (Self { received }, handle)
485    }
486
487    /// Return a snapshot of all received events (cloned).
488    pub fn events(&self) -> Vec<DatasetChangeEvent> {
489        self.received.lock().map(|g| g.clone()).unwrap_or_default()
490    }
491}
492
493impl Default for InMemorySubscriber {
494    fn default() -> Self {
495        Self {
496            received: Arc::new(Mutex::new(Vec::new())),
497        }
498    }
499}
500
501impl ChangeSubscriber for InMemorySubscriber {
502    fn on_change(&self, event: &DatasetChangeEvent) -> Result<()> {
503        self.received
504            .lock()
505            .map_err(|_| WebSubError::LockPoisoned)?
506            .push(event.clone());
507        Ok(())
508    }
509}
510
511// ── Helper: compute HMAC-SHA256 signature ─────────────────────────────────────
512
513/// Compute a raw HMAC-SHA256 hex digest over `body` using `secret`.
514///
515/// Returns a `sha256=<hex>` formatted string suitable for the
516/// `X-Hub-Signature` header.
517pub fn sign_notification(secret: &str, body: &[u8]) -> Result<String> {
518    type HmacSha256 = Hmac<Sha256>;
519    let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
520        .map_err(|e| WebSubError::HmacError(e.to_string()))?;
521    mac.update(body);
522    let digest = mac.finalize().into_bytes();
523    Ok(format!("sha256={}", hex::encode(digest)))
524}
525
526// ─────────────────────────────────────────────────────────────────────────────
527// Tests
528// ─────────────────────────────────────────────────────────────────────────────
529
530#[cfg(test)]
531mod tests {
532    use super::*;
533
534    // ── helpers ──────────────────────────────────────────────────────────────
535
536    fn make_publisher() -> WebSubPublisher {
537        WebSubPublisher::new("https://hub.example.com/")
538    }
539
540    fn make_subscriber() -> WebSubSubscriber {
541        WebSubSubscriber::new("https://subscriber.example.com/callback")
542    }
543
544    fn make_secret_subscriber() -> WebSubSubscriber {
545        WebSubSubscriber::new("https://subscriber.example.com/callback")
546            .with_secret("my-secret-key")
547    }
548
549    fn make_event(ct: ChangeType) -> DatasetChangeEvent {
550        DatasetChangeEvent::new("https://ds.example.com/", ct, None, vec![], vec![])
551    }
552
553    // ── WebSubPublisher ───────────────────────────────────────────────────────
554
555    #[test]
556    fn publisher_link_headers_hub_rel() {
557        let pub_ = make_publisher();
558        let headers = pub_.link_headers("https://publisher.example.com/topic");
559        assert_eq!(headers.len(), 2);
560        assert!(
561            headers[0].contains("rel=\"hub\""),
562            "first header should have rel=hub"
563        );
564    }
565
566    #[test]
567    fn publisher_link_headers_self_rel() {
568        let pub_ = make_publisher();
569        let headers = pub_.link_headers("https://publisher.example.com/topic");
570        assert!(
571            headers[1].contains("rel=\"self\""),
572            "second header should have rel=self"
573        );
574    }
575
576    #[test]
577    fn publisher_link_headers_contain_hub_url() {
578        let pub_ = make_publisher();
579        let headers = pub_.link_headers("https://publisher.example.com/topic");
580        assert!(headers[0].contains("https://hub.example.com/"));
581    }
582
583    #[test]
584    fn publisher_link_headers_contain_topic_url() {
585        let pub_ = make_publisher();
586        let topic = "https://publisher.example.com/topic";
587        let headers = pub_.link_headers(topic);
588        assert!(headers[1].contains(topic));
589    }
590
591    #[test]
592    fn publisher_publish_returns_params() {
593        let pub_ = make_publisher();
594        let params = pub_.publish("https://ds.example.com/").unwrap();
595        assert!(params
596            .iter()
597            .any(|(k, v)| k == "hub.mode" && v == "publish"));
598    }
599
600    #[test]
601    fn publisher_publish_includes_topic() {
602        let pub_ = make_publisher();
603        let topic = "https://ds.example.com/dataset1";
604        let params = pub_.publish(topic).unwrap();
605        assert!(params.iter().any(|(k, v)| k == "hub.url" && v == topic));
606    }
607
608    #[test]
609    fn publisher_publish_empty_topic_error() {
610        let pub_ = make_publisher();
611        assert!(pub_.publish("").is_err());
612    }
613
614    #[test]
615    fn publisher_new_stores_hub_url() {
616        let pub_ = WebSubPublisher::new("https://custom.hub.com/");
617        assert_eq!(pub_.hub_url, "https://custom.hub.com/");
618    }
619
620    // ── WebSubSubscriber: subscribe_params ─────────────────────────────────
621
622    #[test]
623    fn subscriber_subscribe_params_mode() {
624        let sub = make_subscriber();
625        let params = sub.subscribe_params(
626            "https://hub.example.com/",
627            "https://topic.example.com/",
628            86400,
629        );
630        assert!(params
631            .iter()
632            .any(|(k, v)| k == "hub.mode" && v == "subscribe"));
633    }
634
635    #[test]
636    fn subscriber_subscribe_params_callback() {
637        let sub = make_subscriber();
638        let params = sub.subscribe_params(
639            "https://hub.example.com/",
640            "https://topic.example.com/",
641            86400,
642        );
643        assert!(params
644            .iter()
645            .any(|(k, v)| k == "hub.callback" && v == "https://subscriber.example.com/callback"));
646    }
647
648    #[test]
649    fn subscriber_subscribe_params_topic() {
650        let sub = make_subscriber();
651        let topic = "https://topic.example.com/graph1";
652        let params = sub.subscribe_params("https://hub.example.com/", topic, 86400);
653        assert!(params.iter().any(|(k, v)| k == "hub.topic" && v == topic));
654    }
655
656    #[test]
657    fn subscriber_subscribe_params_lease_seconds() {
658        let sub = make_subscriber();
659        let params = sub.subscribe_params(
660            "https://hub.example.com/",
661            "https://topic.example.com/",
662            3600,
663        );
664        assert!(params
665            .iter()
666            .any(|(k, v)| k == "hub.lease_seconds" && v == "3600"));
667    }
668
669    #[test]
670    fn subscriber_subscribe_params_with_secret() {
671        let sub = make_secret_subscriber();
672        let params = sub.subscribe_params(
673            "https://hub.example.com/",
674            "https://topic.example.com/",
675            86400,
676        );
677        assert!(params
678            .iter()
679            .any(|(k, v)| k == "hub.secret" && v == "my-secret-key"));
680    }
681
682    #[test]
683    fn subscriber_subscribe_params_no_secret_if_unset() {
684        let sub = make_subscriber();
685        let params = sub.subscribe_params(
686            "https://hub.example.com/",
687            "https://topic.example.com/",
688            86400,
689        );
690        assert!(!params.iter().any(|(k, _)| k == "hub.secret"));
691    }
692
693    // ── WebSubSubscriber: unsubscribe_params ──────────────────────────────
694
695    #[test]
696    fn subscriber_unsubscribe_params_mode() {
697        let sub = make_subscriber();
698        let params =
699            sub.unsubscribe_params("https://hub.example.com/", "https://topic.example.com/");
700        assert!(params
701            .iter()
702            .any(|(k, v)| k == "hub.mode" && v == "unsubscribe"));
703    }
704
705    #[test]
706    fn subscriber_unsubscribe_params_topic() {
707        let sub = make_subscriber();
708        let topic = "https://topic.example.com/";
709        let params = sub.unsubscribe_params("https://hub.example.com/", topic);
710        assert!(params.iter().any(|(k, v)| k == "hub.topic" && v == topic));
711    }
712
713    #[test]
714    fn subscriber_unsubscribe_params_callback() {
715        let sub = make_subscriber();
716        let params =
717            sub.unsubscribe_params("https://hub.example.com/", "https://topic.example.com/");
718        assert!(params.iter().any(|(k, _)| k == "hub.callback"));
719    }
720
721    // ── WebSubSubscriber: verify_intent ───────────────────────────────────
722
723    #[test]
724    fn verify_intent_subscribe_returns_challenge() {
725        let sub = make_subscriber();
726        let challenge = "abc123xyz";
727        let result = sub.verify_intent(
728            "subscribe",
729            "https://topic.example.com/",
730            challenge,
731            Some(86400),
732        );
733        assert_eq!(result.unwrap(), challenge);
734    }
735
736    #[test]
737    fn verify_intent_unsubscribe_returns_challenge() {
738        let sub = make_subscriber();
739        let challenge = "unsubchallenge";
740        let result =
741            sub.verify_intent("unsubscribe", "https://topic.example.com/", challenge, None);
742        assert_eq!(result.unwrap(), challenge);
743    }
744
745    #[test]
746    fn verify_intent_unknown_mode_is_error() {
747        let sub = make_subscriber();
748        let result = sub.verify_intent("denied", "https://topic.example.com/", "ch", None);
749        assert!(result.is_err());
750    }
751
752    #[test]
753    fn verify_intent_empty_mode_is_error() {
754        let sub = make_subscriber();
755        let result = sub.verify_intent("", "https://topic.example.com/", "ch", None);
756        assert!(result.is_err());
757    }
758
759    #[test]
760    fn verify_intent_empty_topic_is_error() {
761        let sub = make_subscriber();
762        let result = sub.verify_intent("subscribe", "", "ch", None);
763        assert!(result.is_err());
764    }
765
766    #[test]
767    fn verify_intent_empty_challenge_is_error() {
768        let sub = make_subscriber();
769        let result = sub.verify_intent("subscribe", "https://topic.example.com/", "", None);
770        assert!(result.is_err());
771    }
772
773    // ── WebSubSubscriber: verify_signature ────────────────────────────────
774
775    #[test]
776    fn verify_signature_valid() {
777        let sub = make_secret_subscriber();
778        let body = b"hello world notification body";
779        let sig = sub.compute_signature(body).unwrap();
780        assert!(sub.verify_signature(body, &sig).unwrap());
781    }
782
783    #[test]
784    fn verify_signature_wrong_secret_returns_false() {
785        let sub_signer =
786            WebSubSubscriber::new("https://sub.example.com/cb").with_secret("correct-secret");
787        let sub_verifier =
788            WebSubSubscriber::new("https://sub.example.com/cb").with_secret("wrong-secret");
789        let body = b"notification payload";
790        let sig = sub_signer.compute_signature(body).unwrap();
791        assert!(!sub_verifier.verify_signature(body, &sig).unwrap());
792    }
793
794    #[test]
795    fn verify_signature_tampered_body_returns_false() {
796        let sub = make_secret_subscriber();
797        let body = b"original body";
798        let sig = sub.compute_signature(body).unwrap();
799        assert!(!sub.verify_signature(b"tampered body", &sig).unwrap());
800    }
801
802    #[test]
803    fn verify_signature_no_secret_is_error() {
804        let sub = make_subscriber();
805        let result = sub.verify_signature(b"body", "sha256=abc");
806        assert!(result.is_err());
807    }
808
809    #[test]
810    fn verify_signature_bad_algorithm_prefix_is_error() {
811        let sub = make_secret_subscriber();
812        let result = sub.verify_signature(b"body", "md5=deadbeef");
813        assert!(result.is_err());
814    }
815
816    #[test]
817    fn compute_signature_format_starts_with_sha256() {
818        let sub = make_secret_subscriber();
819        let sig = sub.compute_signature(b"data").unwrap();
820        assert!(sig.starts_with("sha256="));
821    }
822
823    #[test]
824    fn compute_signature_hex_length() {
825        // SHA-256 produces 32 bytes => 64 hex chars + "sha256=" prefix = 71
826        let sub = make_secret_subscriber();
827        let sig = sub.compute_signature(b"data").unwrap();
828        assert_eq!(sig.len(), 71);
829    }
830
831    // ── sign_notification helper ──────────────────────────────────────────
832
833    #[test]
834    fn sign_notification_helper_matches_subscriber() {
835        let secret = "shared-secret";
836        let body = b"rdf patch notification";
837        let sig = sign_notification(secret, body).unwrap();
838        let sub = WebSubSubscriber::new("cb").with_secret(secret);
839        assert!(sub.verify_signature(body, &sig).unwrap());
840    }
841
842    // ── ChangeType variants ───────────────────────────────────────────────
843
844    #[test]
845    fn change_type_triple_added() {
846        let ct = ChangeType::TripleAdded;
847        assert_eq!(ct, ChangeType::TripleAdded);
848    }
849
850    #[test]
851    fn change_type_triple_removed() {
852        let ct = ChangeType::TripleRemoved;
853        assert_eq!(ct, ChangeType::TripleRemoved);
854    }
855
856    #[test]
857    fn change_type_graph_cleared() {
858        let ct = ChangeType::GraphCleared;
859        assert_eq!(ct, ChangeType::GraphCleared);
860    }
861
862    #[test]
863    fn change_type_graph_dropped() {
864        let ct = ChangeType::GraphDropped;
865        assert_eq!(ct, ChangeType::GraphDropped);
866    }
867
868    #[test]
869    fn change_type_graph_created() {
870        let ct = ChangeType::GraphCreated;
871        assert_eq!(ct, ChangeType::GraphCreated);
872    }
873
874    #[test]
875    fn change_type_bulk_update_with_count() {
876        let ct = ChangeType::BulkUpdate { triple_count: 500 };
877        if let ChangeType::BulkUpdate { triple_count } = ct {
878            assert_eq!(triple_count, 500);
879        } else {
880            panic!("expected BulkUpdate");
881        }
882    }
883
884    #[test]
885    fn change_type_bulk_update_zero_count() {
886        let ct = ChangeType::BulkUpdate { triple_count: 0 };
887        if let ChangeType::BulkUpdate { triple_count } = ct {
888            assert_eq!(triple_count, 0);
889        } else {
890            panic!("expected BulkUpdate");
891        }
892    }
893
894    // ── DatasetChangeEvent ────────────────────────────────────────────────
895
896    #[test]
897    fn dataset_change_event_has_change_id() {
898        let ev = make_event(ChangeType::TripleAdded);
899        assert!(!ev.change_id.is_empty());
900    }
901
902    #[test]
903    fn dataset_change_event_unique_ids() {
904        let ev1 = make_event(ChangeType::TripleAdded);
905        let ev2 = make_event(ChangeType::TripleAdded);
906        assert_ne!(ev1.change_id, ev2.change_id);
907    }
908
909    #[test]
910    fn dataset_change_event_has_delta_true_when_triples() {
911        let ev = DatasetChangeEvent::new(
912            "https://ds.example.com/",
913            ChangeType::TripleAdded,
914            None,
915            vec![("<s>".to_string(), "<p>".to_string(), "<o>".to_string())],
916            vec![],
917        );
918        assert!(ev.has_delta());
919    }
920
921    #[test]
922    fn dataset_change_event_has_delta_false_when_empty() {
923        let ev = make_event(ChangeType::GraphCreated);
924        assert!(!ev.has_delta());
925    }
926
927    #[test]
928    fn dataset_change_event_stores_dataset_url() {
929        let ev = DatasetChangeEvent::new(
930            "https://my-dataset.example.com/",
931            ChangeType::GraphCreated,
932            None,
933            vec![],
934            vec![],
935        );
936        assert_eq!(ev.dataset_url, "https://my-dataset.example.com/");
937    }
938
939    #[test]
940    fn dataset_change_event_stores_affected_graph() {
941        let graph = "https://my-dataset.example.com/graph1";
942        let ev = DatasetChangeEvent::new(
943            "https://my-dataset.example.com/",
944            ChangeType::GraphCreated,
945            Some(graph.to_string()),
946            vec![],
947            vec![],
948        );
949        assert_eq!(ev.affected_graph.as_deref(), Some(graph));
950    }
951
952    // ── DatasetEventBus ───────────────────────────────────────────────────
953
954    #[test]
955    fn event_bus_subscriber_count_zero_initially() {
956        let bus = DatasetEventBus::new();
957        assert_eq!(bus.subscriber_count(), 0);
958    }
959
960    #[test]
961    fn event_bus_subscriber_count_after_register() {
962        let mut bus = DatasetEventBus::new();
963        let (sub, _) = InMemorySubscriber::new();
964        bus.subscribe(Box::new(sub));
965        assert_eq!(bus.subscriber_count(), 1);
966    }
967
968    #[test]
969    fn event_bus_multiple_subscriber_count() {
970        let mut bus = DatasetEventBus::new();
971        for _ in 0..5 {
972            let (sub, _) = InMemorySubscriber::new();
973            bus.subscribe(Box::new(sub));
974        }
975        assert_eq!(bus.subscriber_count(), 5);
976    }
977
978    #[test]
979    fn event_bus_publish_to_single_subscriber() {
980        let mut bus = DatasetEventBus::new();
981        let (sub, handle) = InMemorySubscriber::new();
982        bus.subscribe(Box::new(sub));
983
984        let ev = make_event(ChangeType::TripleAdded);
985        bus.publish(ev).unwrap();
986
987        let events = handle.lock().unwrap();
988        assert_eq!(events.len(), 1);
989    }
990
991    #[test]
992    fn event_bus_publish_to_multiple_subscribers() {
993        let mut bus = DatasetEventBus::new();
994        let (sub1, handle1) = InMemorySubscriber::new();
995        let (sub2, handle2) = InMemorySubscriber::new();
996        bus.subscribe(Box::new(sub1));
997        bus.subscribe(Box::new(sub2));
998
999        bus.publish(make_event(ChangeType::GraphCreated)).unwrap();
1000
1001        assert_eq!(handle1.lock().unwrap().len(), 1);
1002        assert_eq!(handle2.lock().unwrap().len(), 1);
1003    }
1004
1005    #[test]
1006    fn event_bus_publish_preserves_event_data() {
1007        let mut bus = DatasetEventBus::new();
1008        let (sub, handle) = InMemorySubscriber::new();
1009        bus.subscribe(Box::new(sub));
1010
1011        let ev = DatasetChangeEvent::new(
1012            "https://ds.example.com/",
1013            ChangeType::BulkUpdate { triple_count: 42 },
1014            Some("https://ds.example.com/graph1".to_string()),
1015            vec![("<s>".to_string(), "<p>".to_string(), "<o>".to_string())],
1016            vec![],
1017        );
1018        let expected_id = ev.change_id.clone();
1019        bus.publish(ev).unwrap();
1020
1021        let events = handle.lock().unwrap();
1022        assert_eq!(events[0].change_id, expected_id);
1023    }
1024
1025    // ── InMemorySubscriber ────────────────────────────────────────────────
1026
1027    #[test]
1028    fn in_memory_subscriber_receives_event() {
1029        let (sub, _handle) = InMemorySubscriber::new();
1030        let ev = make_event(ChangeType::TripleAdded);
1031        sub.on_change(&ev).unwrap();
1032        let events = sub.events();
1033        assert_eq!(events.len(), 1);
1034    }
1035
1036    #[test]
1037    fn in_memory_subscriber_accumulates_multiple_events() {
1038        let (sub, _handle) = InMemorySubscriber::new();
1039        for ct in [
1040            ChangeType::TripleAdded,
1041            ChangeType::TripleRemoved,
1042            ChangeType::GraphCleared,
1043        ] {
1044            sub.on_change(&make_event(ct)).unwrap();
1045        }
1046        assert_eq!(sub.events().len(), 3);
1047    }
1048
1049    #[test]
1050    fn in_memory_subscriber_shared_handle_sees_same_events() {
1051        let (sub, handle) = InMemorySubscriber::new();
1052        sub.on_change(&make_event(ChangeType::GraphDropped))
1053            .unwrap();
1054        assert_eq!(handle.lock().unwrap().len(), 1);
1055    }
1056
1057    #[test]
1058    fn in_memory_subscriber_event_change_type_preserved() {
1059        let (sub, _) = InMemorySubscriber::new();
1060        sub.on_change(&make_event(ChangeType::BulkUpdate { triple_count: 100 }))
1061            .unwrap();
1062        let events = sub.events();
1063        if let ChangeType::BulkUpdate { triple_count } = &events[0].change_type {
1064            assert_eq!(*triple_count, 100);
1065        } else {
1066            panic!("wrong change type");
1067        }
1068    }
1069
1070    // ── Round-trip: publish event -> subscriber receives it ────────────────
1071
1072    #[test]
1073    fn round_trip_single_event() {
1074        let mut bus = DatasetEventBus::new();
1075        let (sub, handle) = InMemorySubscriber::new();
1076        bus.subscribe(Box::new(sub));
1077
1078        let ev = DatasetChangeEvent::new(
1079            "https://ds.example.com/",
1080            ChangeType::TripleAdded,
1081            None,
1082            vec![(
1083                "<http://s>".to_string(),
1084                "<http://p>".to_string(),
1085                "<http://o>".to_string(),
1086            )],
1087            vec![],
1088        );
1089        bus.publish(ev).unwrap();
1090
1091        let guard = handle.lock().unwrap();
1092        assert_eq!(guard.len(), 1);
1093        assert!(guard[0].has_delta());
1094    }
1095
1096    #[test]
1097    fn round_trip_bulk_update_event() {
1098        let mut bus = DatasetEventBus::new();
1099        let (sub, handle) = InMemorySubscriber::new();
1100        bus.subscribe(Box::new(sub));
1101
1102        let ev = DatasetChangeEvent::new(
1103            "https://ds.example.com/",
1104            ChangeType::BulkUpdate { triple_count: 9999 },
1105            None,
1106            vec![],
1107            vec![],
1108        );
1109        bus.publish(ev).unwrap();
1110
1111        let guard = handle.lock().unwrap();
1112        assert_eq!(guard.len(), 1);
1113        if let ChangeType::BulkUpdate { triple_count } = guard[0].change_type {
1114            assert_eq!(triple_count, 9999);
1115        } else {
1116            panic!("expected BulkUpdate");
1117        }
1118    }
1119
1120    #[test]
1121    fn round_trip_graph_cleared_event() {
1122        let mut bus = DatasetEventBus::new();
1123        let (sub, handle) = InMemorySubscriber::new();
1124        bus.subscribe(Box::new(sub));
1125
1126        bus.publish(make_event(ChangeType::GraphCleared)).unwrap();
1127
1128        let guard = handle.lock().unwrap();
1129        assert_eq!(guard[0].change_type, ChangeType::GraphCleared);
1130    }
1131
1132    // ── WebSubHub ─────────────────────────────────────────────────────────
1133
1134    #[test]
1135    fn hub_add_and_count_subscriptions() {
1136        let mut hub = WebSubHub::new("https://hub.example.com/");
1137        let sub = Subscription::new(
1138            "https://topic.example.com/",
1139            "https://sub.example.com/cb",
1140            None,
1141            3600,
1142        );
1143        hub.add_subscription(sub);
1144        assert_eq!(hub.subscriptions.len(), 1);
1145    }
1146
1147    #[test]
1148    fn hub_activate_subscription() {
1149        let mut hub = WebSubHub::new("https://hub.example.com/");
1150        let sub = Subscription::new(
1151            "https://topic.example.com/",
1152            "https://sub.example.com/cb",
1153            None,
1154            3600,
1155        );
1156        let idx = hub.add_subscription(sub);
1157        hub.activate(idx).unwrap();
1158        assert!(hub.subscriptions[idx].active);
1159    }
1160
1161    #[test]
1162    fn hub_active_subscriptions_for_topic() {
1163        let mut hub = WebSubHub::new("https://hub.example.com/");
1164        let sub = Subscription::new(
1165            "https://topic.example.com/",
1166            "https://sub.example.com/cb",
1167            None,
1168            3600,
1169        );
1170        let idx = hub.add_subscription(sub);
1171        hub.activate(idx).unwrap();
1172        let active = hub.active_subscriptions_for("https://topic.example.com/");
1173        assert_eq!(active.len(), 1);
1174    }
1175
1176    #[test]
1177    fn hub_active_subscriptions_empty_for_unknown_topic() {
1178        let hub = WebSubHub::new("https://hub.example.com/");
1179        let active = hub.active_subscriptions_for("https://unknown.example.com/");
1180        assert!(active.is_empty());
1181    }
1182
1183    #[test]
1184    fn subscription_new_is_not_active_by_default() {
1185        let sub = Subscription::new(
1186            "https://topic.example.com/",
1187            "https://cb.example.com/",
1188            None,
1189            3600,
1190        );
1191        assert!(!sub.active);
1192    }
1193
1194    #[test]
1195    fn subscription_is_valid_when_active_and_not_expired() {
1196        let mut sub = Subscription::new(
1197            "https://topic.example.com/",
1198            "https://cb.example.com/",
1199            None,
1200            9999,
1201        );
1202        sub.active = true;
1203        assert!(sub.is_valid());
1204    }
1205
1206    #[test]
1207    fn subscription_is_invalid_when_not_active() {
1208        let sub = Subscription::new(
1209            "https://topic.example.com/",
1210            "https://cb.example.com/",
1211            None,
1212            9999,
1213        );
1214        assert!(!sub.is_valid());
1215    }
1216}