use std::collections::HashMap;
use async_trait::async_trait;
use time::OffsetDateTime;
use url::Url;
use uuid::Uuid;
pub mod discovery;
pub mod distribute;
pub mod publish;
pub mod signature;
pub mod subscribe;
#[cfg(test)]
mod test_utils {
    //! Fixtures shared by the test modules in this file.
    use super::*;

    /// Atom document used as the body of the test content distribution.
    const TEST_FEED: &str = r#"<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<title>Test Feed</title>
<entry>
<title>Test Post</title>
<content type="html"><p>Test content</p></content>
</entry>
</feed>"#;

    /// Builds a `Pending` subscription with a nil id, a secret, and a one-hour lease.
    pub fn create_test_subscription() -> Subscription<Uuid> {
        let topic = Url::parse("https://example.com/feed").unwrap();
        let callback = Url::parse("https://subscriber.example.com/callback").unwrap();
        Subscription {
            id: Uuid::nil(),
            topic,
            callback,
            secret: Some(String::from("test_secret")),
            lease_seconds: Some(3600),
            expires_at: Some(OffsetDateTime::now_utc() + time::Duration::hours(1)),
            status: SubscriptionStatus::Pending,
            created_at: OffsetDateTime::now_utc(),
        }
    }

    /// Same as [`create_test_subscription`] but with status `Active`.
    pub fn create_active_subscription() -> Subscription<Uuid> {
        Subscription {
            status: SubscriptionStatus::Active,
            ..create_test_subscription()
        }
    }

    /// Same as [`create_test_subscription`] but `Expired`, with `expires_at` one hour in the past.
    pub fn create_expired_subscription() -> Subscription<Uuid> {
        Subscription {
            expires_at: Some(OffsetDateTime::now_utc() - time::Duration::hours(1)),
            status: SubscriptionStatus::Expired,
            ..create_test_subscription()
        }
    }

    /// Builds an unsigned Atom content distribution carrying [`TEST_FEED`].
    pub fn create_test_content_distribution() -> ContentDistribution {
        let feed_bytes = TEST_FEED.as_bytes().to_vec();
        ContentDistribution {
            content_type: String::from("application/atom+xml"),
            body: crate::http::Body::Bytes(feed_bytes),
            signature: None,
        }
    }

    /// Computes HMAC-SHA256 of `body` keyed with `secret` and returns it as
    /// `sha256=<lowercase hex>`.
    pub fn generate_hmac_signature(secret: &str, body: &[u8]) -> String {
        use hmac::{Hmac, Mac};
        use sha2::Sha256;

        type HmacSha256 = Hmac<Sha256>;

        let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).unwrap();
        mac.update(body);
        let digest = mac.finalize().into_bytes();
        format!("sha256={}", hex::encode(digest))
    }
}
/// Errors produced by the WebSub module.
///
/// Each variant carries a `websub::*` diagnostic code for `miette` reporting.
#[derive(Debug, thiserror::Error, miette::Diagnostic)]
pub enum WebSubError {
/// An HTTP-level failure, described by the contained message.
#[error("HTTP error: {0}")]
#[diagnostic(code(websub::http_error))]
Http(String),
/// A wrapped lower-level error; any boxed `Error + Send + Sync` converts into this
/// variant via `From`, so `?` works on such errors.
#[error("Network error: {0}")]
#[diagnostic(code(websub::network_error))]
Network(#[from] Box<dyn std::error::Error + Send + Sync>),
/// No hub could be found for a topic.
#[error("No hub found for topic")]
#[diagnostic(code(websub::no_hub))]
NoHubFound,
/// Verification of a subscription did not succeed.
#[error("Subscription verification failed")]
#[diagnostic(code(websub::verification_failed))]
VerificationFailed,
/// The topic URL was not acceptable.
#[error("Invalid topic URL")]
#[diagnostic(code(websub::invalid_topic))]
InvalidTopic,
/// Distributing content to subscribers failed.
#[error("Content distribution failed")]
#[diagnostic(code(websub::distribution_failed))]
DistributionFailed,
/// The referenced subscription does not exist.
#[error("Subscription not found")]
#[diagnostic(code(websub::subscription_not_found))]
SubscriptionNotFound,
}
/// Result alias used throughout this module; the error type is always [`WebSubError`].
pub type Result<T> = std::result::Result<T, WebSubError>;
/// A subscriber's registration for a single topic.
///
/// Generic over `Id` so that each [`Hub`] implementation can pick its own
/// identifier type (see `Hub::SubscriptionId`).
#[derive(Debug, Clone)]
pub struct Subscription<Id> {
/// Identifier of this subscription.
pub id: Id,
/// URL of the topic being subscribed to.
pub topic: Url,
/// URL of the subscriber's callback endpoint.
pub callback: Url,
/// Optional secret supplied with the subscription.
/// NOTE(review): presumably the WebSub `hub.secret` used to sign deliveries — confirm
/// against the `signature` module.
pub secret: Option<String>,
/// Optional lease length, in seconds.
pub lease_seconds: Option<u32>,
/// Optional instant at which the subscription expires.
pub expires_at: Option<OffsetDateTime>,
/// Current lifecycle state.
pub status: SubscriptionStatus,
/// Timestamp recorded as the creation time of the subscription.
pub created_at: OffsetDateTime,
}
/// Lifecycle state of a subscription.
///
/// Equality on this fieldless enum is total, so `Eq` is derived alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscriptionStatus {
    /// Stored but not (yet) active.
    Pending,
    /// Active; the in-memory hub only returns subscriptions in this state for a topic.
    Active,
    /// Marked as expired.
    Expired,
    /// Marked as rejected.
    Rejected,
}
/// Content a hub has cached for a topic.
///
/// Generic over `Id` so that each [`Hub`] implementation can pick its own
/// identifier type (see `Hub::ContentId`).
#[derive(Debug, Clone)]
pub struct CachedContent<Id> {
/// Identifier of this cache entry.
pub id: Id,
/// Raw content bytes.
pub content: Vec<u8>,
/// Content type of `content`, as a string.
pub content_type: String,
/// Timestamp of the last update to this entry.
pub last_updated: OffsetDateTime,
/// Optional signature stored alongside the content.
pub signature: Option<String>,
}
/// A payload to be distributed to the subscribers of a topic.
#[derive(Debug, Clone)]
pub struct ContentDistribution {
/// Content type of `body`, as a string.
pub content_type: String,
/// The payload itself.
pub body: crate::http::Body,
/// Optional signature accompanying the payload (tests in this file use the
/// `sha256=<hex>` form).
pub signature: Option<String>,
}
/// Outcome of delivering content to one subscriber.
#[derive(Debug, Clone)]
pub struct DeliveryResult<Id> {
/// Identifier of the subscription the delivery was made for.
pub subscriber_id: Id,
/// Callback URL the delivery was addressed to.
pub subscriber_callback: Url,
/// Classification of the outcome.
pub status: DeliveryStatus,
/// HTTP status code of the subscriber's response, when there was one.
pub response_code: Option<u16>,
/// Optional description of what went wrong.
pub error_message: Option<String>,
/// Number of retries recorded for this delivery.
pub retry_count: u32,
/// Time of delivery, when the delivery took place.
pub delivered_at: Option<OffsetDateTime>,
}
/// Classification of a single delivery attempt.
///
/// Equality on this fieldless enum is total, so `Eq` is derived alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeliveryStatus {
    /// The delivery succeeded.
    Success,
    /// The delivery failed in a way classified as temporary.
    TemporaryFailure,
    /// The delivery failed in a way classified as permanent.
    PermanentFailure,
    /// The subscription is considered deleted (the tests in this file associate this
    /// with an HTTP 410 response from the subscriber).
    SubscriptionDeleted,
}
/// Parameters of a request to verify a subscriber's intent.
#[derive(Debug, Clone)]
pub struct VerifyIntentRequest {
/// Whether the intent being verified is to subscribe or to unsubscribe.
pub mode: SubscriptionMode,
/// Topic URL the request is about.
pub topic: Url,
/// Callback URL of the subscriber.
pub callback: Url,
/// Challenge string associated with the verification.
/// NOTE(review): presumably the value the subscriber must echo back — confirm against
/// the `subscribe` module.
pub challenge: String,
/// Optional lease length, in seconds.
pub lease_seconds: Option<u32>,
}
/// Direction of a subscription request.
///
/// Equality on this fieldless enum is total, so `Eq` is derived alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscriptionMode {
    /// The subscriber wants to start receiving updates.
    Subscribe,
    /// The subscriber wants to stop receiving updates.
    Unsubscribe,
}
/// Outcome of verifying a subscriber's intent.
#[derive(Debug, Clone)]
pub enum VerificationResponse {
/// The intent was verified.
Verified,
/// Verification did not succeed.
Failed,
/// The request was denied, optionally with a reason.
Denied { reason: Option<String> },
}
/// Storage and distribution operations of a WebSub hub.
///
/// Implementations choose their own identifier types for subscriptions and cached
/// content. All methods are async (via `async_trait`) and return this module's
/// [`Result`].
#[async_trait]
pub trait Hub {
/// Identifier type for stored subscriptions.
type SubscriptionId: Clone + Send + Sync;
/// Identifier type for cached content entries.
type ContentId: Clone + Send + Sync;
/// Returns the subscriptions registered for `topic`.
/// NOTE(review): `InMemoryHub` only returns `Active` subscriptions here; whether every
/// implementation must filter the same way is not stated in this file.
async fn subscriptions_for_topic(&self, topic: &Url) -> Result<Vec<Subscription<Self::SubscriptionId>>>;
/// Returns the content cached for `topic`, or `None` when nothing is cached.
async fn cached_content(&self, topic: &Url) -> Result<Option<CachedContent<Self::ContentId>>>;
/// Stores `subscription` and returns the identifier it is stored under.
async fn store_subscription(&mut self, subscription: Subscription<Self::SubscriptionId>) -> Result<Self::SubscriptionId>;
/// Removes the subscription identified by `id`.
async fn remove_subscription(&mut self, id: &Self::SubscriptionId) -> Result<()>;
/// Deletes the subscription identified by `id`.
/// NOTE(review): the intended difference from `remove_subscription` is not visible
/// here; `InMemoryHub` simply forwards this call to `remove_subscription`.
async fn delete_subscription(&mut self, id: &Self::SubscriptionId) -> Result<()>;
/// Caches `content` for `topic` and returns the identifier of the cache entry.
async fn cache_content(&mut self, topic: Url, content: CachedContent<Self::ContentId>) -> Result<Self::ContentId>;
/// Verifies the intent described by `request` and reports the outcome.
async fn verify_subscription_intent(&self, request: VerifyIntentRequest) -> Result<VerificationResponse>;
/// Distributes `content` for `topic` and returns one [`DeliveryResult`] per delivery.
async fn distribute_content(&self, topic: &Url, content: ContentDistribution) -> Result<Vec<DeliveryResult<Self::SubscriptionId>>>;
}
/// Batch-oriented conveniences layered on top of [`Hub`].
///
/// Every method has a default implementation that walks its input in order and awaits
/// the matching single-item [`Hub`] method once per element. Nothing runs concurrently.
#[async_trait]
pub trait BatchedHub: Hub {
    /// Stores each subscription in turn.
    ///
    /// The outer result is always `Ok`. The inner vector holds one entry per input, in
    /// input order, with the outcome of the corresponding `store_subscription` call.
    async fn batch_subscribe(
        &mut self,
        subscriptions: Vec<Subscription<Self::SubscriptionId>>,
    ) -> Result<Vec<Result<Self::SubscriptionId>>> {
        let mut outcomes = Vec::with_capacity(subscriptions.len());
        for subscription in subscriptions {
            outcomes.push(self.store_subscription(subscription).await);
        }
        Ok(outcomes)
    }

    /// Removes each subscription in turn via `remove_subscription`.
    ///
    /// The outer result is always `Ok`. The inner vector holds one outcome per id, in
    /// input order.
    async fn batch_unsubscribe(
        &mut self,
        subscription_ids: Vec<Self::SubscriptionId>,
    ) -> Result<Vec<Result<()>>> {
        let mut outcomes = Vec::with_capacity(subscription_ids.len());
        for id in subscription_ids {
            outcomes.push(self.remove_subscription(&id).await);
        }
        Ok(outcomes)
    }

    /// Looks up the subscriptions of every topic in `topics`.
    ///
    /// Unlike the other batch methods this one stops at the first failing lookup and
    /// returns that error. Repeated topics collapse into a single map entry.
    async fn subscriptions_for_topics(
        &self,
        topics: &[Url],
    ) -> Result<HashMap<Url, Vec<Subscription<Self::SubscriptionId>>>> {
        let mut by_topic = HashMap::with_capacity(topics.len());
        for topic in topics {
            let found = self.subscriptions_for_topic(topic).await?;
            by_topic.insert(topic.clone(), found);
        }
        Ok(by_topic)
    }

    /// Distributes each `(topic, content)` pair in turn via `distribute_content`.
    ///
    /// The outer result is always `Ok`. The inner vector holds one outcome per update,
    /// in input order.
    async fn batch_distribute(
        &mut self,
        updates: Vec<(Url, ContentDistribution)>,
    ) -> Result<Vec<Result<Vec<DeliveryResult<Self::SubscriptionId>>>>> {
        let mut outcomes = Vec::with_capacity(updates.len());
        for (topic, content) in updates {
            outcomes.push(self.distribute_content(&topic, content).await);
        }
        Ok(outcomes)
    }
}
/// A [`Hub`] that keeps all of its state in process memory.
///
/// Identifiers for subscriptions and cached content are `Uuid`s.
#[derive(Debug, Default)]
pub struct InMemoryHub {
    /// Subscriptions grouped by the topic URL they were stored under.
    subscriptions: HashMap<Url, Vec<Subscription<Uuid>>>,
    /// Cached content keyed by topic URL (one entry per topic).
    content_cache: HashMap<Url, CachedContent<Uuid>>,
}

impl InMemoryHub {
    /// Creates a hub with no subscriptions and an empty content cache.
    pub fn new() -> Self {
        Self::default()
    }
}
#[async_trait]
impl Hub for InMemoryHub {
type SubscriptionId = Uuid;
type ContentId = Uuid;
async fn subscriptions_for_topic(&self, topic: &Url) -> Result<Vec<Subscription<Uuid>>> {
Ok(self.subscriptions
.get(topic)
.cloned()
.unwrap_or_default()
.into_iter()
.filter(|sub| matches!(sub.status, SubscriptionStatus::Active))
.collect())
}
async fn cached_content(&self, topic: &Url) -> Result<Option<CachedContent<Uuid>>> {
Ok(self.content_cache.get(topic).cloned())
}
async fn store_subscription(&mut self, mut subscription: Subscription<Uuid>) -> Result<Uuid> {
let id = Uuid::new_v4();
subscription.id = id;
let topic_subs = self.subscriptions.entry(subscription.topic.clone()).or_default();
topic_subs.push(subscription);
Ok(id)
}
async fn remove_subscription(&mut self, id: &Uuid) -> Result<()> {
for subs in self.subscriptions.values_mut() {
subs.retain(|s| &s.id != id);
}
Ok(())
}
async fn delete_subscription(&mut self, id: &Uuid) -> Result<()> {
self.remove_subscription(id).await
}
async fn cache_content(&mut self, topic: Url, mut content: CachedContent<Uuid>) -> Result<Uuid> {
let id = Uuid::new_v4();
content.id = id;
self.content_cache.insert(topic, content);
Ok(id)
}
async fn verify_subscription_intent(&self, _request: VerifyIntentRequest) -> Result<VerificationResponse> {
Ok(VerificationResponse::Verified)
}
async fn distribute_content(&self, topic: &Url, _content: ContentDistribution) -> Result<Vec<DeliveryResult<Uuid>>> {
let subscribers = self.subscriptions_for_topic(topic).await?;
let mut results = Vec::new();
for subscriber in subscribers {
let result = DeliveryResult {
subscriber_id: subscriber.id,
subscriber_callback: subscriber.callback,
status: DeliveryStatus::Success,
response_code: Some(200),
error_message: None,
retry_count: 0,
delivered_at: Some(time::OffsetDateTime::now_utc()),
};
results.push(result);
}
Ok(results)
}
}
// `InMemoryHub` uses the default (sequential) batch implementations from `BatchedHub`
// unchanged, so the impl body is intentionally empty.
#[async_trait]
impl BatchedHub for InMemoryHub {
}
#[cfg(test)]
mod trait_tests {
    //! Exercises the `Hub` and `BatchedHub` traits through `InMemoryHub`.
    use super::*;

    #[tokio::test]
    async fn subscription_creation_assigns_unique_id() {
        let mut hub = InMemoryHub::new();
        let stored_id = hub
            .store_subscription(test_utils::create_test_subscription())
            .await
            .unwrap();
        // The fixture carries the nil UUID, so any other value shows the hub replaced it.
        assert_ne!(stored_id, Uuid::nil());
    }

    #[tokio::test]
    async fn subscriptions_for_topic_returns_only_active() {
        let mut hub = InMemoryHub::new();
        let active = test_utils::create_active_subscription();
        let topic = active.topic.clone();
        hub.store_subscription(active).await.unwrap();
        hub.store_subscription(test_utils::create_test_subscription())
            .await
            .unwrap();

        let found = hub.subscriptions_for_topic(&topic).await.unwrap();

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].status, SubscriptionStatus::Active);
    }

    #[tokio::test]
    async fn subscriptions_for_topic_filters_expired() {
        let mut hub = InMemoryHub::new();
        let active = test_utils::create_active_subscription();
        let topic = active.topic.clone();
        hub.store_subscription(active).await.unwrap();
        hub.store_subscription(test_utils::create_expired_subscription())
            .await
            .unwrap();

        let found = hub.subscriptions_for_topic(&topic).await.unwrap();

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].status, SubscriptionStatus::Active);
    }

    #[tokio::test]
    async fn content_caching_stores_and_retrieves() {
        let mut hub = InMemoryHub::new();
        let topic_url = Url::parse("https://example.com/feed").unwrap();
        let ContentDistribution {
            content_type,
            signature,
            ..
        } = test_utils::create_test_content_distribution();
        let entry = CachedContent {
            id: Uuid::nil(),
            content: vec![1, 2, 3, 4, 5],
            content_type,
            last_updated: time::OffsetDateTime::now_utc(),
            signature,
        };

        let content_id = hub
            .cache_content(topic_url.clone(), entry.clone())
            .await
            .unwrap();
        let retrieved = hub.cached_content(&topic_url).await.unwrap().unwrap();

        assert_eq!(retrieved.id, content_id);
        assert_eq!(retrieved.content, entry.content);
        assert_eq!(retrieved.content_type, entry.content_type);
    }

    #[tokio::test]
    async fn content_distribution_returns_success_for_in_memory_hub() {
        // With no subscribers stored, distribution succeeds and reports no deliveries.
        let hub = InMemoryHub::new();
        let topic_url = Url::parse("https://example.com/feed").unwrap();

        let results = hub
            .distribute_content(&topic_url, test_utils::create_test_content_distribution())
            .await
            .unwrap();

        assert_eq!(results.len(), 0);
    }

    #[tokio::test]
    async fn content_distribution_with_subscriber_returns_success() {
        let mut hub = InMemoryHub::new();
        let subscription = test_utils::create_active_subscription();
        let topic_url = subscription.topic.clone();
        hub.store_subscription(subscription).await.unwrap();

        let results = hub
            .distribute_content(&topic_url, test_utils::create_test_content_distribution())
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        let delivery = &results[0];
        assert_eq!(delivery.status, DeliveryStatus::Success);
        assert_eq!(delivery.response_code, Some(200));
        assert!(delivery.delivered_at.is_some());
    }

    #[tokio::test]
    async fn subscription_deletion_removes_from_storage() {
        let mut hub = InMemoryHub::new();
        let subscription = test_utils::create_active_subscription();
        let topic = subscription.topic.clone();
        let id = hub.store_subscription(subscription).await.unwrap();

        let before = hub.subscriptions_for_topic(&topic).await.unwrap();
        assert_eq!(before.len(), 1);

        hub.delete_subscription(&id).await.unwrap();

        let after = hub.subscriptions_for_topic(&topic).await.unwrap();
        assert_eq!(after.len(), 0);
    }

    #[tokio::test]
    async fn batched_subscribe_processes_multiple_subscriptions() {
        let mut hub = InMemoryHub::new();
        let batch = vec![
            test_utils::create_test_subscription(),
            test_utils::create_active_subscription(),
        ];

        let results = hub.batch_subscribe(batch).await.unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|outcome| outcome.is_ok()));
    }

    #[tokio::test]
    async fn batched_unsubscribe_processes_multiple_ids() {
        let mut hub = InMemoryHub::new();
        let first_id = hub
            .store_subscription(test_utils::create_active_subscription())
            .await
            .unwrap();
        let second_id = hub
            .store_subscription(test_utils::create_active_subscription())
            .await
            .unwrap();

        let results = hub
            .batch_unsubscribe(vec![first_id, second_id])
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|outcome| outcome.is_ok()));
    }

    #[tokio::test]
    async fn subscriptions_for_topics_returns_multiple_topics() {
        let mut hub = InMemoryHub::new();
        let topic1 = Url::parse("https://example.com/feed1").unwrap();
        let topic2 = Url::parse("https://example.com/feed2").unwrap();
        for topic in [&topic1, &topic2] {
            let mut subscription = test_utils::create_active_subscription();
            subscription.topic = topic.clone();
            hub.store_subscription(subscription).await.unwrap();
        }

        let results = hub
            .subscriptions_for_topics(&[topic1.clone(), topic2.clone()])
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
        assert_eq!(results[&topic1].len(), 1);
        assert_eq!(results[&topic2].len(), 1);
    }

    #[tokio::test]
    async fn batched_distribute_processes_multiple_updates() {
        let mut hub = InMemoryHub::new();
        let updates = vec![
            (
                Url::parse("https://example.com/feed1").unwrap(),
                test_utils::create_test_content_distribution(),
            ),
            (
                Url::parse("https://example.com/feed2").unwrap(),
                test_utils::create_test_content_distribution(),
            ),
        ];

        let results = hub.batch_distribute(updates).await.unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|outcome| outcome.is_ok()));
    }

    #[tokio::test]
    async fn verification_response_defaults_to_verified() {
        let hub = InMemoryHub::new();
        let request = VerifyIntentRequest {
            mode: SubscriptionMode::Subscribe,
            topic: Url::parse("https://example.com/feed").unwrap(),
            callback: Url::parse("https://subscriber.com/callback").unwrap(),
            challenge: String::from("test_challenge_123"),
            lease_seconds: Some(3600),
        };

        let response = hub.verify_subscription_intent(request).await.unwrap();

        assert!(matches!(response, VerificationResponse::Verified));
    }
}
#[cfg(test)]
mod http_integration_tests {
    //! Tests that drive the discovery, subscribe, publish and distribute functions
    //! against a local `mockito` server.
    use super::*;
    use crate::standards::websub::{discovery, distribute, subscribe};
    use mockito::Mock;

    /// Thin wrapper around a `mockito` server with WebSub-specific mock builders.
    struct TestServer {
        server: mockito::ServerGuard,
    }

    impl TestServer {
        /// Starts a fresh mock server.
        async fn new() -> Self {
            let server = mockito::Server::new_async().await;
            Self { server }
        }

        /// Base URL of the mock server.
        fn url(&self) -> String {
            self.server.url()
        }

        /// Absolute URL of `path` (which must start with `/`) on the mock server.
        fn endpoint(&self, path: &str) -> Url {
            Url::parse(&format!("{}{}", self.url(), path)).unwrap()
        }

        /// Mocks `GET /feed` answering 200 with `Link` headers for `rel="hub"` and
        /// `rel="self"`.
        fn mock_topic_with_hub(&mut self) -> Mock {
            let base = self.url();
            let hub_link = format!("<{}/hub>; rel=\"hub\"", base);
            let self_link = format!("<{}/feed>; rel=\"self\"", base);
            self.server
                .mock("GET", "/feed")
                .with_header("Link", &hub_link)
                .with_header("Link", &self_link)
                .with_status(200)
                .create()
        }

        /// Mocks `POST /hub` answering 202 when the body contains `hub.mode=subscribe`.
        fn mock_hub_subscription_accept(&mut self) -> Mock {
            self.server
                .mock("POST", "/hub")
                .match_body(mockito::Matcher::Regex("hub\\.mode=subscribe".into()))
                .with_status(202)
                .create()
        }

        /// Mocks `POST /hub` answering 202 when the body contains both
        /// `hub.mode=publish` and `hub.url=`.
        fn mock_hub_publish_accept(&mut self) -> Mock {
            // Calling `match_body` twice keeps only the last matcher, so both
            // requirements are combined into a single `AllOf` matcher.
            let body_matcher = mockito::Matcher::AllOf(vec![
                mockito::Matcher::Regex("hub\\.mode=publish".into()),
                mockito::Matcher::Regex("hub\\.url=".into()),
            ]);
            self.server
                .mock("POST", "/hub")
                .match_body(body_matcher)
                .with_status(202)
                .create()
        }

        /// Mocks `POST /callback` answering `expected_status`.
        ///
        /// The mock only matches requests that carry a `Content-Type` header, an
        /// `X-Hub-Signature` header, and a `Link` header mentioning both `rel="hub"`
        /// and `rel="self"` (in either order).
        fn mock_subscriber_callback(&mut self, expected_status: u16) -> Mock {
            let link_matcher = mockito::Matcher::Regex(
                "rel=\\\"hub\\\".*rel=\\\"self\\\"|rel=\\\"self\\\".*rel=\\\"hub\\\""
                    .into(),
            );
            self.server
                .mock("POST", "/callback")
                .match_header("Content-Type", mockito::Matcher::Any)
                .match_header("X-Hub-Signature", mockito::Matcher::Any)
                .match_header("Link", link_matcher)
                .with_status(usize::from(expected_status))
                .create()
        }

        /// Subscriber callback that answers 410 Gone.
        fn mock_subscriber_gone(&mut self) -> Mock {
            self.mock_subscriber_callback(410)
        }

        /// Returns a URL on the mock server for which no mock is registered.
        fn mock_network_failure(&mut self) -> String {
            format!("{}/nonexistent", self.url())
        }
    }

    /// Test content distribution carrying a placeholder signature.
    fn signed_test_content() -> ContentDistribution {
        let mut content = test_utils::create_test_content_distribution();
        content.signature = Some("sha256=test_signature".to_string());
        content
    }

    #[tokio::test]
    async fn hub_discovery_extracts_correct_hub_urls() {
        let mut server = TestServer::new().await;
        let topic_mock = server.mock_topic_with_hub();
        let client = crate::http::reqwest::Client::default();
        let topic_url = server.endpoint("/feed");
        let expected_hub_url = server.endpoint("/hub");

        let hubs = discovery::discover_hubs(&client, &topic_url).await.unwrap();

        topic_mock.assert();
        assert_eq!(hubs, vec![expected_hub_url]);
    }

    #[tokio::test]
    async fn content_distribution_sends_correct_websub_headers() {
        let mut server = TestServer::new().await;
        let callback_mock = server.mock_subscriber_callback(200);
        let client = crate::http::reqwest::Client::default();
        let callback_url = server.endpoint("/callback");
        let hub_url = server.endpoint("/hub");
        let topic_url = server.endpoint("/feed");
        let content = signed_test_content();

        let result = distribute::distribute_to_callback(
            &client,
            &hub_url,
            &topic_url,
            &callback_url,
            &content,
        )
        .await
        .unwrap();

        callback_mock.assert();
        assert_eq!(result.status, DeliveryStatus::Success);
    }

    #[tokio::test]
    async fn subscription_automatic_deletion_on_http_410() {
        let mut server = TestServer::new().await;
        let gone_mock = server.mock_subscriber_gone();
        let client = crate::http::reqwest::Client::default();
        let callback_url = server.endpoint("/callback");
        let hub_url = server.endpoint("/hub");
        let topic_url = server.endpoint("/feed");
        let content = signed_test_content();

        let result = distribute::distribute_to_callback(
            &client,
            &hub_url,
            &topic_url,
            &callback_url,
            &content,
        )
        .await
        .unwrap();

        gone_mock.assert();
        assert_eq!(result.status, DeliveryStatus::SubscriptionDeleted);
    }

    #[tokio::test]
    async fn network_failures_are_handled_gracefully() {
        let mut server = TestServer::new().await;
        let invalid_callback_url = server.mock_network_failure();
        let client = crate::http::reqwest::Client::default();
        let callback_url = Url::parse(&invalid_callback_url).unwrap();
        let hub_url = server.endpoint("/hub");
        let topic_url = server.endpoint("/feed");
        let content = test_utils::create_test_content_distribution();

        let outcome = distribute::distribute_to_callback(
            &client,
            &hub_url,
            &topic_url,
            &callback_url,
            &content,
        )
        .await;

        // The failure must surface as a delivery status, not as an `Err`.
        assert!(outcome.is_ok());
        let result = outcome.unwrap();
        assert_eq!(result.status, DeliveryStatus::TemporaryFailure);
    }

    #[tokio::test]
    async fn hub_subscription_endpoint_accepts_valid_requests() {
        let mut server = TestServer::new().await;
        let subscription_mock = server.mock_hub_subscription_accept();
        let client = crate::http::reqwest::Client::default();
        let hub_url = server.endpoint("/hub");
        let request = SubscribeRequest {
            topic: server.endpoint("/feed"),
            callback: server.endpoint("/callback"),
            secret: Some("secret".to_string()),
            lease_seconds: Some(3600),
        };

        let result = subscribe::send_subscription_request(
            &client,
            &hub_url,
            &request,
            SubscriptionMode::Subscribe,
        )
        .await
        .unwrap();

        subscription_mock.assert();
        assert_eq!(result.status, http::StatusCode::ACCEPTED);
    }

    #[tokio::test]
    async fn hub_publish_endpoint_accepts_valid_requests() {
        let mut server = TestServer::new().await;
        let publish_mock = server.mock_hub_publish_accept();
        let client = crate::http::reqwest::Client::default();
        let hub_url = server.endpoint("/hub");
        let topic_url = server.endpoint("/feed");

        let result = crate::standards::websub::publish::publish(&client, &hub_url, &topic_url)
            .await
            .unwrap();

        publish_mock.assert();
        assert_eq!(result.status, http::StatusCode::ACCEPTED);
    }

    #[tokio::test]
    async fn content_distribution_preserves_content_type() {
        let mut server = TestServer::new().await;
        // Registered directly on the server because this mock matches on the exact
        // content type only.
        let callback_mock = server
            .server
            .mock("POST", "/callback")
            .match_header("Content-Type", "application/atom+xml")
            .with_status(200)
            .create();
        let client = crate::http::reqwest::Client::default();
        let callback_url = server.endpoint("/callback");
        let hub_url = server.endpoint("/hub");
        let topic_url = server.endpoint("/feed");
        let content = signed_test_content();
        assert_eq!(content.content_type, "application/atom+xml");

        let result = distribute::distribute_to_callback(
            &client,
            &hub_url,
            &topic_url,
            &callback_url,
            &content,
        )
        .await
        .unwrap();

        callback_mock.assert();
        assert_eq!(result.status, DeliveryStatus::Success);
    }

    #[tokio::test]
    async fn content_distribution_includes_required_link_headers() {
        // The `Link` header requirement is enforced by `mock_subscriber_callback`.
        let mut server = TestServer::new().await;
        let callback_mock = server.mock_subscriber_callback(200);
        let client = crate::http::reqwest::Client::default();
        let callback_url = server.endpoint("/callback");
        let hub_url = server.endpoint("/hub");
        let topic_url = server.endpoint("/feed");
        let content = signed_test_content();

        let result = distribute::distribute_to_callback(
            &client,
            &hub_url,
            &topic_url,
            &callback_url,
            &content,
        )
        .await
        .unwrap();

        callback_mock.assert();
        assert_eq!(result.status, DeliveryStatus::Success);
    }
}
#[cfg(test)]
mod compliance_tests {
    //! Checks named after WebSub specification requirements.
    use super::*;

    #[tokio::test]
    async fn websub_specification_link_header_format() {
        let base_url = Url::parse("https://example.com/feed").unwrap();
        let header_value = http::HeaderValue::from_str(
            "<https://example.com/hub>; rel=\"hub\", <https://example.com/feed>; rel=\"self\"",
        )
        .unwrap();

        let hubs =
            crate::algorithms::link_rel::get_rels_for_header(&header_value, "hub", &base_url);
        let selfs =
            crate::algorithms::link_rel::get_rels_for_header(&header_value, "self", &base_url);

        assert_eq!(hubs, vec![Url::parse("https://example.com/hub").unwrap()]);
        assert_eq!(selfs, vec![Url::parse("https://example.com/feed").unwrap()]);
    }

    #[tokio::test]
    async fn websub_specification_content_distribution_headers() {
        let body = b"test content";
        let signature = crate::standards::websub::signature::sign("test_secret", body).unwrap();

        let content = ContentDistribution {
            content_type: String::from("application/atom+xml"),
            body: crate::http::Body::Bytes(body.to_vec()),
            signature: Some(signature),
        };

        assert_eq!(content.content_type, "application/atom+xml");
        assert!(content.signature.is_some());
        assert!(content.signature.as_ref().unwrap().starts_with("sha256="));
    }

    #[tokio::test]
    async fn websub_specification_subscription_verification_flow() {
        let hub = InMemoryHub::new();
        let topic = Url::parse("https://example.com/feed").unwrap();
        let callback = Url::parse("https://subscriber.com/callback").unwrap();
        let request = VerifyIntentRequest {
            mode: SubscriptionMode::Subscribe,
            topic,
            callback,
            challenge: String::from("test_challenge_123"),
            lease_seconds: Some(3600),
        };

        let response = hub.verify_subscription_intent(request).await.unwrap();

        assert!(matches!(response, VerificationResponse::Verified));
    }

    #[tokio::test]
    async fn websub_specification_subscription_lifecycle() {
        let mut hub = InMemoryHub::new();
        let subscription = test_utils::create_test_subscription();
        assert_eq!(subscription.status, SubscriptionStatus::Pending);
        let topic = subscription.topic.clone();

        let _id = hub.store_subscription(subscription).await.unwrap();

        // A pending subscription is stored but must not be listed or delivered to.
        let listed = hub.subscriptions_for_topic(&topic).await.unwrap();
        assert_eq!(listed.len(), 0);

        let deliveries = hub
            .distribute_content(&topic, test_utils::create_test_content_distribution())
            .await
            .unwrap();
        assert_eq!(deliveries.len(), 0);
    }

    #[tokio::test]
    async fn websub_specification_error_response_codes() {
        let expectations = [
            (http::StatusCode::OK, DeliveryStatus::Success),
            (http::StatusCode::GONE, DeliveryStatus::SubscriptionDeleted),
            (http::StatusCode::NOT_FOUND, DeliveryStatus::PermanentFailure),
            (
                http::StatusCode::INTERNAL_SERVER_ERROR,
                DeliveryStatus::TemporaryFailure,
            ),
        ];

        for (status, expected) in expectations {
            let (got, _message) = crate::standards::websub::distribute::map_status(status);
            assert_eq!(got, expected);
        }
    }

    #[tokio::test]
    async fn websub_specification_content_type_preservation() {
        for content_type in [
            "application/atom+xml",
            "application/rss+xml",
            "application/json",
            "text/plain",
        ] {
            let mut content = test_utils::create_test_content_distribution();
            content.content_type = content_type.to_string();
            assert_eq!(content.content_type, content_type);
        }
    }

    #[tokio::test]
    async fn websub_specification_signature_format() {
        const PREFIX: &str = "sha256=";

        let signature = test_utils::generate_hmac_signature("test_secret", b"test content");

        assert!(signature.starts_with(PREFIX));
        assert!(signature.len() > PREFIX.len());
        // Everything after the algorithm prefix must be valid hex.
        let hex_part = &signature[PREFIX.len()..];
        assert!(hex::decode(hex_part).is_ok());
    }
}
#[cfg(test)]
mod tests {
    //! End-to-end checks of `InMemoryHub` using hand-built subscriptions.
    use super::*;

    #[tokio::test]
    async fn test_in_memory_hub_basic_operations() {
        let mut hub = InMemoryHub::new();
        let topic = Url::parse("https://example.com/feed").unwrap();
        let subscription = Subscription {
            id: Uuid::new_v4(),
            topic: topic.clone(),
            callback: Url::parse("https://subscriber.com/callback").unwrap(),
            secret: Some(String::from("secret")),
            lease_seconds: Some(3600),
            expires_at: Some(time::OffsetDateTime::now_utc() + time::Duration::hours(1)),
            status: SubscriptionStatus::Active,
            created_at: time::OffsetDateTime::now_utc(),
        };

        // Storing hands back a real id and makes the subscription visible.
        let id = hub.store_subscription(subscription).await.unwrap();
        assert_ne!(id, Uuid::default());
        let listed = hub.subscriptions_for_topic(&topic).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, id);

        // Cached content is retrievable under the id returned by the hub.
        let entry = CachedContent {
            id: Uuid::new_v4(),
            content: b"test content".to_vec(),
            content_type: String::from("text/plain"),
            last_updated: time::OffsetDateTime::now_utc(),
            signature: None,
        };
        let content_id = hub.cache_content(topic.clone(), entry).await.unwrap();
        let cached = hub.cached_content(&topic).await.unwrap();
        assert_eq!(cached.as_ref().unwrap().id, content_id);

        // Distribution reports one successful delivery for the one active subscriber.
        let payload = ContentDistribution {
            content_type: String::from("text/plain"),
            body: crate::http::Body::Bytes(b"test content".to_vec()),
            signature: None,
        };
        let deliveries = hub.distribute_content(&topic, payload).await.unwrap();
        assert_eq!(deliveries.len(), 1);
        assert_eq!(deliveries[0].status, DeliveryStatus::Success);
        assert_eq!(deliveries[0].response_code, Some(200));
    }

    #[tokio::test]
    async fn test_subscription_deletion() {
        let mut hub = InMemoryHub::new();
        let topic = Url::parse("https://example.com/feed").unwrap();
        let subscription = Subscription {
            id: Uuid::new_v4(),
            topic: topic.clone(),
            callback: Url::parse("https://subscriber.com/callback").unwrap(),
            secret: None,
            lease_seconds: None,
            expires_at: None,
            status: SubscriptionStatus::Active,
            created_at: time::OffsetDateTime::now_utc(),
        };

        let stored_id = hub.store_subscription(subscription).await.unwrap();
        let before = hub.subscriptions_for_topic(&topic).await.unwrap();
        assert_eq!(before.len(), 1);

        hub.delete_subscription(&stored_id).await.unwrap();

        let after = hub.subscriptions_for_topic(&topic).await.unwrap();
        assert_eq!(after.len(), 0);
    }
}
/// Parameters of a subscription request sent to a hub.
///
/// The tests in this file pass it to `subscribe::send_subscription_request` together
/// with a [`SubscriptionMode`].
#[derive(Debug, Clone)]
pub struct SubscribeRequest {
/// URL of the topic the request is about.
pub topic: Url,
/// URL of the subscriber's callback endpoint.
pub callback: Url,
/// Optional secret to send with the request.
pub secret: Option<String>,
/// Optional requested lease length, in seconds.
pub lease_seconds: Option<u32>,
}