1use hmac::{Hmac, Mac};
12use sha2::Sha256;
13use std::fmt;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, SystemTime};
16use uuid::Uuid;
17
/// Error type shared by the WebSub helpers and the dataset event bus in this file.
#[derive(Debug)]
pub enum WebSubError {
    /// A URL was rejected; the payload carries the detail text.
    /// (Not constructed by the code visible in this file.)
    InvalidUrl(String),
    /// A required parameter was empty or could not be found; the payload names it.
    MissingParameter(String),
    /// Intent or signature verification could not be carried out or was refused.
    VerificationFailed(String),
    /// Setting up the HMAC failed; the payload is the underlying error text.
    HmacError(String),
    /// A subscriber reported a failure.
    /// (Not constructed by the code visible in this file.)
    SubscriberError(String),
    /// A mutex protecting shared state was found poisoned.
    LockPoisoned,
}

impl fmt::Display for WebSubError {
    /// Renders the error as `"<category>: <detail>"`, or a fixed sentence for `LockPoisoned`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // The only payload-free variant: nothing to interpolate.
            Self::LockPoisoned => f.write_str("mutex lock was poisoned"),
            Self::InvalidUrl(msg) => write!(f, "invalid URL: {msg}"),
            Self::MissingParameter(p) => write!(f, "missing parameter: {p}"),
            Self::VerificationFailed(msg) => write!(f, "verification failed: {msg}"),
            Self::HmacError(msg) => write!(f, "HMAC error: {msg}"),
            Self::SubscriberError(msg) => write!(f, "subscriber error: {msg}"),
        }
    }
}

impl std::error::Error for WebSubError {}

/// Result alias used throughout this file; the error side is always [`WebSubError`].
pub type Result<T> = std::result::Result<T, WebSubError>;
54
/// A record of one callback's subscription to one topic, with its lease window.
#[derive(Debug, Clone)]
pub struct Subscription {
    /// Topic URL the subscription applies to.
    pub topic_url: String,
    /// Callback URL recorded for the subscriber.
    pub callback_url: String,
    /// Optional shared secret recorded with the subscription.
    pub secret: Option<String>,
    /// Lease length in seconds, exactly as passed to [`Subscription::new`].
    pub lease_seconds: u64,
    /// Instant from which [`Subscription::is_valid`] starts returning `false`.
    pub expires_at: SystemTime,
    /// Activation flag; `false` on construction.
    pub active: bool,
}

impl Subscription {
    /// Creates an inactive subscription whose lease starts at the current system time.
    ///
    /// `expires_at` is normally `now + lease_seconds`. When that sum cannot be represented as
    /// a `SystemTime` (for example a lease of `u64::MAX` seconds), the lease is halved until
    /// the deadline fits, so an oversized lease yields a far-future deadline instead of a
    /// panic. `lease_seconds` itself is stored unmodified.
    pub fn new(
        topic_url: impl Into<String>,
        callback_url: impl Into<String>,
        secret: Option<String>,
        lease_seconds: u64,
    ) -> Self {
        let expires_at = Self::lease_deadline(SystemTime::now(), lease_seconds);
        Self {
            topic_url: topic_url.into(),
            callback_url: callback_url.into(),
            secret,
            lease_seconds,
            expires_at,
            active: false,
        }
    }

    /// Returns `true` only while the subscription is active and the current system time is
    /// strictly before `expires_at`.
    pub fn is_valid(&self) -> bool {
        self.active && SystemTime::now() < self.expires_at
    }

    /// Computes `start + lease_seconds` without ever panicking on overflow.
    fn lease_deadline(start: SystemTime, lease_seconds: u64) -> SystemTime {
        let mut secs = lease_seconds;
        loop {
            // `SystemTime + Duration` panics on overflow; `checked_add` reports it instead.
            if let Some(deadline) = start.checked_add(Duration::from_secs(secs)) {
                return deadline;
            }
            // Terminates: `secs` eventually reaches 0 and `start + 0` is always representable.
            secs /= 2;
        }
    }
}
98
99#[derive(Debug, Default)]
105pub struct WebSubHub {
106 pub hub_url: String,
108 pub subscriptions: Vec<Subscription>,
110}
111
112impl WebSubHub {
113 pub fn new(hub_url: impl Into<String>) -> Self {
115 Self {
116 hub_url: hub_url.into(),
117 subscriptions: Vec::new(),
118 }
119 }
120
121 pub fn add_subscription(&mut self, sub: Subscription) -> usize {
123 let idx = self.subscriptions.len();
124 self.subscriptions.push(sub);
125 idx
126 }
127
128 pub fn activate(&mut self, index: usize) -> Result<()> {
130 let sub = self
131 .subscriptions
132 .get_mut(index)
133 .ok_or_else(|| WebSubError::MissingParameter(format!("index {index}")))?;
134 sub.active = true;
135 Ok(())
136 }
137
138 pub fn active_subscriptions_for(&self, topic_url: &str) -> Vec<&Subscription> {
140 self.subscriptions
141 .iter()
142 .filter(|s| s.topic_url == topic_url && s.is_valid())
143 .collect()
144 }
145
146 pub fn purge_expired(&mut self) -> usize {
148 let before = self.subscriptions.len();
149 self.subscriptions
150 .retain(|s| SystemTime::now() < s.expires_at);
151 before - self.subscriptions.len()
152 }
153}
154
155#[derive(Debug, Clone)]
160pub struct WebSubPublisher {
161 pub hub_url: String,
163}
164
165impl WebSubPublisher {
166 pub fn new(hub_url: impl Into<String>) -> Self {
168 Self {
169 hub_url: hub_url.into(),
170 }
171 }
172
173 pub fn publish_params(&self, topic_url: &str) -> Vec<(String, String)> {
175 vec![
176 ("hub.mode".to_string(), "publish".to_string()),
177 ("hub.url".to_string(), topic_url.to_string()),
178 ]
179 }
180
181 pub fn publish(&self, topic_url: &str) -> Result<Vec<(String, String)>> {
186 if topic_url.is_empty() {
187 return Err(WebSubError::MissingParameter("topic_url".to_string()));
188 }
189 Ok(self.publish_params(topic_url))
190 }
191
192 pub fn link_headers(&self, topic_url: &str) -> Vec<String> {
200 vec![
201 format!("<{}>; rel=\"hub\"", self.hub_url),
202 format!("<{}>; rel=\"self\"", topic_url),
203 ]
204 }
205}
206
207#[derive(Debug, Clone)]
211pub struct WebSubSubscriber {
212 pub callback_url: String,
214 pub secret: Option<String>,
216}
217
218impl WebSubSubscriber {
219 pub fn new(callback_url: impl Into<String>) -> Self {
221 Self {
222 callback_url: callback_url.into(),
223 secret: None,
224 }
225 }
226
227 pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
229 self.secret = Some(secret.into());
230 self
231 }
232
233 pub fn subscribe_params(
235 &self,
236 hub_url: &str,
237 topic_url: &str,
238 lease_seconds: u64,
239 ) -> Vec<(String, String)> {
240 let mut params = vec![
241 ("hub.callback".to_string(), self.callback_url.clone()),
242 ("hub.mode".to_string(), "subscribe".to_string()),
243 ("hub.topic".to_string(), topic_url.to_string()),
244 ("hub.lease_seconds".to_string(), lease_seconds.to_string()),
245 ];
246 if let Some(ref s) = self.secret {
247 params.push(("hub.secret".to_string(), s.clone()));
248 }
249 params.push(("hub.hub_url".to_string(), hub_url.to_string()));
250 params
251 }
252
253 pub fn unsubscribe_params(&self, hub_url: &str, topic_url: &str) -> Vec<(String, String)> {
255 vec![
256 ("hub.callback".to_string(), self.callback_url.clone()),
257 ("hub.mode".to_string(), "unsubscribe".to_string()),
258 ("hub.topic".to_string(), topic_url.to_string()),
259 ("hub.hub_url".to_string(), hub_url.to_string()),
260 ]
261 }
262
263 pub fn verify_intent(
268 &self,
269 mode: &str,
270 topic: &str,
271 challenge: &str,
272 _lease_seconds: Option<u64>,
273 ) -> Result<String> {
274 if mode.is_empty() {
275 return Err(WebSubError::MissingParameter("hub.mode".to_string()));
276 }
277 if topic.is_empty() {
278 return Err(WebSubError::MissingParameter("hub.topic".to_string()));
279 }
280 if challenge.is_empty() {
281 return Err(WebSubError::MissingParameter("hub.challenge".to_string()));
282 }
283 match mode {
284 "subscribe" | "unsubscribe" => Ok(challenge.to_string()),
285 other => Err(WebSubError::VerificationFailed(format!(
286 "unknown mode: {other}"
287 ))),
288 }
289 }
290
291 pub fn verify_signature(&self, body: &[u8], signature_header: &str) -> Result<bool> {
297 let secret = self
298 .secret
299 .as_deref()
300 .ok_or_else(|| WebSubError::VerificationFailed("no secret configured".to_string()))?;
301
302 let expected_hex = signature_header.strip_prefix("sha256=").ok_or_else(|| {
303 WebSubError::VerificationFailed(format!(
304 "unsupported signature algorithm in: {signature_header}"
305 ))
306 })?;
307
308 type HmacSha256 = Hmac<Sha256>;
309 let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
310 .map_err(|e| WebSubError::HmacError(e.to_string()))?;
311 mac.update(body);
312 let computed = mac.finalize().into_bytes();
313 let computed_hex = hex::encode(computed);
314
315 Ok(computed_hex == expected_hex)
316 }
317
318 pub fn compute_signature(&self, body: &[u8]) -> Result<String> {
322 let secret = self
323 .secret
324 .as_deref()
325 .ok_or_else(|| WebSubError::VerificationFailed("no secret configured".to_string()))?;
326
327 type HmacSha256 = Hmac<Sha256>;
328 let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
329 .map_err(|e| WebSubError::HmacError(e.to_string()))?;
330 mac.update(body);
331 let digest = mac.finalize().into_bytes();
332 Ok(format!("sha256={}", hex::encode(digest)))
333 }
334}
335
/// Kind of change carried by a [`DatasetChangeEvent`].
///
/// The variants are only tags here; their exact meaning is set by whoever produces the
/// events (the descriptions below follow the variant names).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ChangeType {
    /// A triple was added.
    TripleAdded,
    /// A triple was removed.
    TripleRemoved,
    /// A graph was cleared.
    GraphCleared,
    /// A graph was dropped.
    GraphDropped,
    /// A graph was created.
    GraphCreated,
    /// A bulk update took place.
    BulkUpdate {
        /// Triple count reported for the update.
        triple_count: usize,
    },
}
357
358#[derive(Debug, Clone)]
362pub struct DatasetChangeEvent {
363 pub dataset_url: String,
365 pub change_type: ChangeType,
367 pub affected_graph: Option<String>,
369 pub added_triples: Vec<(String, String, String)>,
371 pub removed_triples: Vec<(String, String, String)>,
373 pub timestamp: SystemTime,
375 pub change_id: String,
377}
378
379impl DatasetChangeEvent {
380 pub fn new(
382 dataset_url: impl Into<String>,
383 change_type: ChangeType,
384 affected_graph: Option<String>,
385 added_triples: Vec<(String, String, String)>,
386 removed_triples: Vec<(String, String, String)>,
387 ) -> Self {
388 Self {
389 dataset_url: dataset_url.into(),
390 change_type,
391 affected_graph,
392 added_triples,
393 removed_triples,
394 timestamp: SystemTime::now(),
395 change_id: Uuid::new_v4().to_string(),
396 }
397 }
398
399 pub fn has_delta(&self) -> bool {
401 !self.added_triples.is_empty() || !self.removed_triples.is_empty()
402 }
403}
404
405pub trait ChangeSubscriber: Send + Sync {
409 fn on_change(&self, event: &DatasetChangeEvent) -> Result<()>;
411}
412
413pub struct DatasetEventBus {
422 subscribers: Vec<Box<dyn ChangeSubscriber>>,
423}
424
425impl Default for DatasetEventBus {
426 fn default() -> Self {
427 Self::new()
428 }
429}
430
431impl DatasetEventBus {
432 pub fn new() -> Self {
434 Self {
435 subscribers: Vec::new(),
436 }
437 }
438
439 pub fn subscribe(&mut self, subscriber: Box<dyn ChangeSubscriber>) {
441 self.subscribers.push(subscriber);
442 }
443
444 pub fn publish(&self, event: DatasetChangeEvent) -> Result<()> {
449 let mut first_error: Option<WebSubError> = None;
450 for sub in &self.subscribers {
451 if let Err(e) = sub.on_change(&event) {
452 if first_error.is_none() {
453 first_error = Some(e);
454 }
455 }
456 }
457 match first_error {
458 Some(e) => Err(e),
459 None => Ok(()),
460 }
461 }
462
463 pub fn subscriber_count(&self) -> usize {
465 self.subscribers.len()
466 }
467}
468
469pub struct InMemorySubscriber {
474 received: Arc<Mutex<Vec<DatasetChangeEvent>>>,
475}
476
477impl InMemorySubscriber {
478 pub fn new() -> (Self, Arc<Mutex<Vec<DatasetChangeEvent>>>) {
482 let received = Arc::new(Mutex::new(Vec::new()));
483 let handle = Arc::clone(&received);
484 (Self { received }, handle)
485 }
486
487 pub fn events(&self) -> Vec<DatasetChangeEvent> {
489 self.received.lock().map(|g| g.clone()).unwrap_or_default()
490 }
491}
492
493impl Default for InMemorySubscriber {
494 fn default() -> Self {
495 Self {
496 received: Arc::new(Mutex::new(Vec::new())),
497 }
498 }
499}
500
501impl ChangeSubscriber for InMemorySubscriber {
502 fn on_change(&self, event: &DatasetChangeEvent) -> Result<()> {
503 self.received
504 .lock()
505 .map_err(|_| WebSubError::LockPoisoned)?
506 .push(event.clone());
507 Ok(())
508 }
509}
510
511pub fn sign_notification(secret: &str, body: &[u8]) -> Result<String> {
518 type HmacSha256 = Hmac<Sha256>;
519 let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
520 .map_err(|e| WebSubError::HmacError(e.to_string()))?;
521 mac.update(body);
522 let digest = mac.finalize().into_bytes();
523 Ok(format!("sha256={}", hex::encode(digest)))
524}
525
/// Unit tests for the WebSub helpers and the dataset event bus defined in the parent module.
#[cfg(test)]
mod tests {
    use super::*;

    const HUB_URL: &str = "https://hub.example.com/";
    const TOPIC_URL: &str = "https://topic.example.com/";
    const FEED_URL: &str = "https://publisher.example.com/topic";
    const CALLBACK_URL: &str = "https://subscriber.example.com/callback";
    const DATASET_URL: &str = "https://ds.example.com/";

    // ---- fixtures -----------------------------------------------------------------------

    /// Publisher bound to the example hub.
    fn publisher() -> WebSubPublisher {
        WebSubPublisher::new(HUB_URL)
    }

    /// Subscriber without a secret.
    fn plain_subscriber() -> WebSubSubscriber {
        WebSubSubscriber::new(CALLBACK_URL)
    }

    /// Subscriber with the secret `my-secret-key`.
    fn keyed_subscriber() -> WebSubSubscriber {
        plain_subscriber().with_secret("my-secret-key")
    }

    /// Event of the given kind for the example dataset, with no graph and no triples.
    fn event_of(kind: ChangeType) -> DatasetChangeEvent {
        DatasetChangeEvent::new(DATASET_URL, kind, None, vec![], vec![])
    }

    /// The triple `(<s>, <p>, <o>)`.
    fn spo() -> (String, String, String) {
        ("<s>".to_string(), "<p>".to_string(), "<o>".to_string())
    }

    /// True when `params` contains the exact pair `key = value`.
    fn has_pair(params: &[(String, String)], key: &str, value: &str) -> bool {
        params.iter().any(|(k, v)| k == key && v == value)
    }

    /// True when `params` contains `key` with any value.
    fn has_key(params: &[(String, String)], key: &str) -> bool {
        params.iter().any(|(k, _)| k == key)
    }

    /// Subscribe request against the example hub and topic.
    fn subscribe_request(sub: &WebSubSubscriber, lease_seconds: u64) -> Vec<(String, String)> {
        sub.subscribe_params(HUB_URL, TOPIC_URL, lease_seconds)
    }

    /// Unsubscribe request against the example hub and topic.
    fn unsubscribe_request(sub: &WebSubSubscriber) -> Vec<(String, String)> {
        sub.unsubscribe_params(HUB_URL, TOPIC_URL)
    }

    /// Bus with `probe` as its only subscriber.
    fn bus_with(probe: InMemorySubscriber) -> DatasetEventBus {
        let mut bus = DatasetEventBus::new();
        bus.subscribe(Box::new(probe));
        bus
    }

    /// Hub at the example hub URL.
    fn example_hub() -> WebSubHub {
        WebSubHub::new(HUB_URL)
    }

    /// One-hour subscription to the example topic, as used by the hub tests.
    fn hour_lease() -> Subscription {
        Subscription::new(TOPIC_URL, "https://sub.example.com/cb", None, 3600)
    }

    /// Subscription to the example topic with the given lease, as used by the
    /// `Subscription` tests.
    fn lease_of(lease_seconds: u64) -> Subscription {
        Subscription::new(TOPIC_URL, "https://cb.example.com/", None, lease_seconds)
    }

    // ---- publisher ----------------------------------------------------------------------

    #[test]
    fn publisher_link_headers_hub_rel() {
        let links = publisher().link_headers(FEED_URL);
        assert_eq!(links.len(), 2);
        assert!(
            links[0].contains("rel=\"hub\""),
            "first header should have rel=hub"
        );
    }

    #[test]
    fn publisher_link_headers_self_rel() {
        let links = publisher().link_headers(FEED_URL);
        assert!(
            links[1].contains("rel=\"self\""),
            "second header should have rel=self"
        );
    }

    #[test]
    fn publisher_link_headers_contain_hub_url() {
        let links = publisher().link_headers(FEED_URL);
        assert!(links[0].contains("https://hub.example.com/"));
    }

    #[test]
    fn publisher_link_headers_contain_topic_url() {
        let links = publisher().link_headers(FEED_URL);
        assert!(links[1].contains(FEED_URL));
    }

    #[test]
    fn publisher_publish_returns_params() {
        let params = publisher().publish("https://ds.example.com/").unwrap();
        assert!(has_pair(&params, "hub.mode", "publish"));
    }

    #[test]
    fn publisher_publish_includes_topic() {
        let topic = "https://ds.example.com/dataset1";
        let params = publisher().publish(topic).unwrap();
        assert!(has_pair(&params, "hub.url", topic));
    }

    #[test]
    fn publisher_publish_empty_topic_error() {
        assert!(publisher().publish("").is_err());
    }

    #[test]
    fn publisher_new_stores_hub_url() {
        let custom = WebSubPublisher::new("https://custom.hub.com/");
        assert_eq!(custom.hub_url, "https://custom.hub.com/");
    }

    // ---- subscriber: subscribe parameters -----------------------------------------------

    #[test]
    fn subscriber_subscribe_params_mode() {
        let params = subscribe_request(&plain_subscriber(), 86400);
        assert!(has_pair(&params, "hub.mode", "subscribe"));
    }

    #[test]
    fn subscriber_subscribe_params_callback() {
        let params = subscribe_request(&plain_subscriber(), 86400);
        assert!(has_pair(
            &params,
            "hub.callback",
            "https://subscriber.example.com/callback"
        ));
    }

    #[test]
    fn subscriber_subscribe_params_topic() {
        let topic = "https://topic.example.com/graph1";
        let params = plain_subscriber().subscribe_params(HUB_URL, topic, 86400);
        assert!(has_pair(&params, "hub.topic", topic));
    }

    #[test]
    fn subscriber_subscribe_params_lease_seconds() {
        let params = subscribe_request(&plain_subscriber(), 3600);
        assert!(has_pair(&params, "hub.lease_seconds", "3600"));
    }

    #[test]
    fn subscriber_subscribe_params_with_secret() {
        let params = subscribe_request(&keyed_subscriber(), 86400);
        assert!(has_pair(&params, "hub.secret", "my-secret-key"));
    }

    #[test]
    fn subscriber_subscribe_params_no_secret_if_unset() {
        let params = subscribe_request(&plain_subscriber(), 86400);
        assert!(!has_key(&params, "hub.secret"));
    }

    // ---- subscriber: unsubscribe parameters ---------------------------------------------

    #[test]
    fn subscriber_unsubscribe_params_mode() {
        let params = unsubscribe_request(&plain_subscriber());
        assert!(has_pair(&params, "hub.mode", "unsubscribe"));
    }

    #[test]
    fn subscriber_unsubscribe_params_topic() {
        let params = unsubscribe_request(&plain_subscriber());
        assert!(has_pair(&params, "hub.topic", TOPIC_URL));
    }

    #[test]
    fn subscriber_unsubscribe_params_callback() {
        let params = unsubscribe_request(&plain_subscriber());
        assert!(has_key(&params, "hub.callback"));
    }

    // ---- subscriber: intent verification ------------------------------------------------

    #[test]
    fn verify_intent_subscribe_returns_challenge() {
        let challenge = "abc123xyz";
        let echoed =
            plain_subscriber().verify_intent("subscribe", TOPIC_URL, challenge, Some(86400));
        assert_eq!(echoed.unwrap(), challenge);
    }

    #[test]
    fn verify_intent_unsubscribe_returns_challenge() {
        let challenge = "unsubchallenge";
        let echoed = plain_subscriber().verify_intent("unsubscribe", TOPIC_URL, challenge, None);
        assert_eq!(echoed.unwrap(), challenge);
    }

    #[test]
    fn verify_intent_unknown_mode_is_error() {
        let outcome = plain_subscriber().verify_intent("denied", TOPIC_URL, "ch", None);
        assert!(outcome.is_err());
    }

    #[test]
    fn verify_intent_empty_mode_is_error() {
        let outcome = plain_subscriber().verify_intent("", TOPIC_URL, "ch", None);
        assert!(outcome.is_err());
    }

    #[test]
    fn verify_intent_empty_topic_is_error() {
        let outcome = plain_subscriber().verify_intent("subscribe", "", "ch", None);
        assert!(outcome.is_err());
    }

    #[test]
    fn verify_intent_empty_challenge_is_error() {
        let outcome = plain_subscriber().verify_intent("subscribe", TOPIC_URL, "", None);
        assert!(outcome.is_err());
    }

    // ---- subscriber: signatures ---------------------------------------------------------

    #[test]
    fn verify_signature_valid() {
        let sub = keyed_subscriber();
        let body = b"hello world notification body";
        let signature = sub.compute_signature(body).unwrap();
        assert!(sub.verify_signature(body, &signature).unwrap());
    }

    #[test]
    fn verify_signature_wrong_secret_returns_false() {
        let callback = "https://sub.example.com/cb";
        let signer = WebSubSubscriber::new(callback).with_secret("correct-secret");
        let verifier = WebSubSubscriber::new(callback).with_secret("wrong-secret");
        let body = b"notification payload";
        let signature = signer.compute_signature(body).unwrap();
        assert!(!verifier.verify_signature(body, &signature).unwrap());
    }

    #[test]
    fn verify_signature_tampered_body_returns_false() {
        let sub = keyed_subscriber();
        let signature = sub.compute_signature(b"original body").unwrap();
        assert!(!sub.verify_signature(b"tampered body", &signature).unwrap());
    }

    #[test]
    fn verify_signature_no_secret_is_error() {
        let outcome = plain_subscriber().verify_signature(b"body", "sha256=abc");
        assert!(outcome.is_err());
    }

    #[test]
    fn verify_signature_bad_algorithm_prefix_is_error() {
        let outcome = keyed_subscriber().verify_signature(b"body", "md5=deadbeef");
        assert!(outcome.is_err());
    }

    #[test]
    fn compute_signature_format_starts_with_sha256() {
        let signature = keyed_subscriber().compute_signature(b"data").unwrap();
        assert!(signature.starts_with("sha256="));
    }

    #[test]
    fn compute_signature_hex_length() {
        // "sha256=" is 7 characters and a SHA-256 digest is 64 hex characters.
        let signature = keyed_subscriber().compute_signature(b"data").unwrap();
        assert_eq!(signature.len(), 71);
    }

    #[test]
    fn sign_notification_helper_matches_subscriber() {
        let secret = "shared-secret";
        let body = b"rdf patch notification";
        let signature = sign_notification(secret, body).unwrap();
        let verifier = WebSubSubscriber::new("cb").with_secret(secret);
        assert!(verifier.verify_signature(body, &signature).unwrap());
    }

    // ---- ChangeType ---------------------------------------------------------------------

    #[test]
    fn change_type_triple_added() {
        let kind = ChangeType::TripleAdded;
        assert_eq!(kind, ChangeType::TripleAdded);
    }

    #[test]
    fn change_type_triple_removed() {
        let kind = ChangeType::TripleRemoved;
        assert_eq!(kind, ChangeType::TripleRemoved);
    }

    #[test]
    fn change_type_graph_cleared() {
        let kind = ChangeType::GraphCleared;
        assert_eq!(kind, ChangeType::GraphCleared);
    }

    #[test]
    fn change_type_graph_dropped() {
        let kind = ChangeType::GraphDropped;
        assert_eq!(kind, ChangeType::GraphDropped);
    }

    #[test]
    fn change_type_graph_created() {
        let kind = ChangeType::GraphCreated;
        assert_eq!(kind, ChangeType::GraphCreated);
    }

    #[test]
    fn change_type_bulk_update_with_count() {
        match (ChangeType::BulkUpdate { triple_count: 500 }) {
            ChangeType::BulkUpdate { triple_count } => assert_eq!(triple_count, 500),
            _ => panic!("expected BulkUpdate"),
        }
    }

    #[test]
    fn change_type_bulk_update_zero_count() {
        match (ChangeType::BulkUpdate { triple_count: 0 }) {
            ChangeType::BulkUpdate { triple_count } => assert_eq!(triple_count, 0),
            _ => panic!("expected BulkUpdate"),
        }
    }

    // ---- DatasetChangeEvent -------------------------------------------------------------

    #[test]
    fn dataset_change_event_has_change_id() {
        assert!(!event_of(ChangeType::TripleAdded).change_id.is_empty());
    }

    #[test]
    fn dataset_change_event_unique_ids() {
        let first = event_of(ChangeType::TripleAdded);
        let second = event_of(ChangeType::TripleAdded);
        assert_ne!(first.change_id, second.change_id);
    }

    #[test]
    fn dataset_change_event_has_delta_true_when_triples() {
        let event = DatasetChangeEvent::new(
            DATASET_URL,
            ChangeType::TripleAdded,
            None,
            vec![spo()],
            vec![],
        );
        assert!(event.has_delta());
    }

    #[test]
    fn dataset_change_event_has_delta_false_when_empty() {
        assert!(!event_of(ChangeType::GraphCreated).has_delta());
    }

    #[test]
    fn dataset_change_event_stores_dataset_url() {
        let event = DatasetChangeEvent::new(
            "https://my-dataset.example.com/",
            ChangeType::GraphCreated,
            None,
            vec![],
            vec![],
        );
        assert_eq!(event.dataset_url, "https://my-dataset.example.com/");
    }

    #[test]
    fn dataset_change_event_stores_affected_graph() {
        let graph = "https://my-dataset.example.com/graph1";
        let event = DatasetChangeEvent::new(
            "https://my-dataset.example.com/",
            ChangeType::GraphCreated,
            Some(graph.to_string()),
            vec![],
            vec![],
        );
        assert_eq!(event.affected_graph.as_deref(), Some(graph));
    }

    // ---- DatasetEventBus ----------------------------------------------------------------

    #[test]
    fn event_bus_subscriber_count_zero_initially() {
        assert_eq!(DatasetEventBus::new().subscriber_count(), 0);
    }

    #[test]
    fn event_bus_subscriber_count_after_register() {
        let (probe, _) = InMemorySubscriber::new();
        assert_eq!(bus_with(probe).subscriber_count(), 1);
    }

    #[test]
    fn event_bus_multiple_subscriber_count() {
        let mut bus = DatasetEventBus::new();
        (0..5).for_each(|_| {
            let (probe, _) = InMemorySubscriber::new();
            bus.subscribe(Box::new(probe));
        });
        assert_eq!(bus.subscriber_count(), 5);
    }

    #[test]
    fn event_bus_publish_to_single_subscriber() {
        let (probe, seen) = InMemorySubscriber::new();
        let bus = bus_with(probe);

        bus.publish(event_of(ChangeType::TripleAdded)).unwrap();

        let log = seen.lock().unwrap();
        assert_eq!(log.len(), 1);
    }

    #[test]
    fn event_bus_publish_to_multiple_subscribers() {
        let (first, first_seen) = InMemorySubscriber::new();
        let (second, second_seen) = InMemorySubscriber::new();
        let mut bus = bus_with(first);
        bus.subscribe(Box::new(second));

        bus.publish(event_of(ChangeType::GraphCreated)).unwrap();

        assert_eq!(first_seen.lock().unwrap().len(), 1);
        assert_eq!(second_seen.lock().unwrap().len(), 1);
    }

    #[test]
    fn event_bus_publish_preserves_event_data() {
        let (probe, seen) = InMemorySubscriber::new();
        let bus = bus_with(probe);

        let event = DatasetChangeEvent::new(
            DATASET_URL,
            ChangeType::BulkUpdate { triple_count: 42 },
            Some("https://ds.example.com/graph1".to_string()),
            vec![spo()],
            vec![],
        );
        let expected_id = event.change_id.clone();
        bus.publish(event).unwrap();

        let log = seen.lock().unwrap();
        assert_eq!(log[0].change_id, expected_id);
    }

    // ---- InMemorySubscriber -------------------------------------------------------------

    #[test]
    fn in_memory_subscriber_receives_event() {
        let (probe, _seen) = InMemorySubscriber::new();
        probe.on_change(&event_of(ChangeType::TripleAdded)).unwrap();
        assert_eq!(probe.events().len(), 1);
    }

    #[test]
    fn in_memory_subscriber_accumulates_multiple_events() {
        let (probe, _seen) = InMemorySubscriber::new();
        let kinds = vec![
            ChangeType::TripleAdded,
            ChangeType::TripleRemoved,
            ChangeType::GraphCleared,
        ];
        for kind in kinds {
            probe.on_change(&event_of(kind)).unwrap();
        }
        assert_eq!(probe.events().len(), 3);
    }

    #[test]
    fn in_memory_subscriber_shared_handle_sees_same_events() {
        let (probe, seen) = InMemorySubscriber::new();
        probe
            .on_change(&event_of(ChangeType::GraphDropped))
            .unwrap();
        assert_eq!(seen.lock().unwrap().len(), 1);
    }

    #[test]
    fn in_memory_subscriber_event_change_type_preserved() {
        let (probe, _) = InMemorySubscriber::new();
        probe
            .on_change(&event_of(ChangeType::BulkUpdate { triple_count: 100 }))
            .unwrap();
        let log = probe.events();
        match &log[0].change_type {
            ChangeType::BulkUpdate { triple_count } => assert_eq!(*triple_count, 100),
            _ => panic!("wrong change type"),
        }
    }

    // ---- bus + subscriber round trips ---------------------------------------------------

    #[test]
    fn round_trip_single_event() {
        let (probe, seen) = InMemorySubscriber::new();
        let bus = bus_with(probe);

        let triple = (
            "<http://s>".to_string(),
            "<http://p>".to_string(),
            "<http://o>".to_string(),
        );
        let event = DatasetChangeEvent::new(
            DATASET_URL,
            ChangeType::TripleAdded,
            None,
            vec![triple],
            vec![],
        );
        bus.publish(event).unwrap();

        let log = seen.lock().unwrap();
        assert_eq!(log.len(), 1);
        assert!(log[0].has_delta());
    }

    #[test]
    fn round_trip_bulk_update_event() {
        let (probe, seen) = InMemorySubscriber::new();
        let bus = bus_with(probe);

        bus.publish(event_of(ChangeType::BulkUpdate { triple_count: 9999 }))
            .unwrap();

        let log = seen.lock().unwrap();
        assert_eq!(log.len(), 1);
        match log[0].change_type {
            ChangeType::BulkUpdate { triple_count } => assert_eq!(triple_count, 9999),
            _ => panic!("expected BulkUpdate"),
        }
    }

    #[test]
    fn round_trip_graph_cleared_event() {
        let (probe, seen) = InMemorySubscriber::new();
        let bus = bus_with(probe);

        bus.publish(event_of(ChangeType::GraphCleared)).unwrap();

        let log = seen.lock().unwrap();
        assert_eq!(log[0].change_type, ChangeType::GraphCleared);
    }

    // ---- hub and subscriptions ----------------------------------------------------------

    #[test]
    fn hub_add_and_count_subscriptions() {
        let mut hub = example_hub();
        hub.add_subscription(hour_lease());
        assert_eq!(hub.subscriptions.len(), 1);
    }

    #[test]
    fn hub_activate_subscription() {
        let mut hub = example_hub();
        let slot = hub.add_subscription(hour_lease());
        hub.activate(slot).unwrap();
        assert!(hub.subscriptions[slot].active);
    }

    #[test]
    fn hub_active_subscriptions_for_topic() {
        let mut hub = example_hub();
        let slot = hub.add_subscription(hour_lease());
        hub.activate(slot).unwrap();
        let live = hub.active_subscriptions_for("https://topic.example.com/");
        assert_eq!(live.len(), 1);
    }

    #[test]
    fn hub_active_subscriptions_empty_for_unknown_topic() {
        let hub = example_hub();
        let live = hub.active_subscriptions_for("https://unknown.example.com/");
        assert!(live.is_empty());
    }

    #[test]
    fn subscription_new_is_not_active_by_default() {
        assert!(!lease_of(3600).active);
    }

    #[test]
    fn subscription_is_valid_when_active_and_not_expired() {
        let mut lease = lease_of(9999);
        lease.active = true;
        assert!(lease.is_valid());
    }

    #[test]
    fn subscription_is_invalid_when_not_active() {
        assert!(!lease_of(9999).is_valid());
    }
}