use std::time::Duration;
use solid_pod_rs::notifications::{
discovery_document, as_ns, ChangeNotification, ChannelType, InMemoryNotifications,
Notifications, Subscription, WebSocketChannelManager, WebhookChannelManager,
};
use solid_pod_rs::storage::StorageEvent;
fn sample_notification(object: &str) -> ChangeNotification {
ChangeNotification {
context: as_ns::CONTEXT.to_string(),
id: "urn:uuid:mem-1".into(),
kind: as_ns::UPDATE.to_string(),
object: object.into(),
published: "2025-04-20T12:00:00Z".into(),
}
}
#[tokio::test]
async fn inmemory_notifications_emits_to_subscribers() {
let n = InMemoryNotifications::new();
let sub = Subscription {
id: "sub-a1".into(),
topic: "/a/".into(),
channel_type: ChannelType::WebhookChannel2023,
receive_from: "https://client.example/hook".into(),
};
n.subscribe(sub.clone()).await.expect("subscribe works");
n.publish("/a/", sample_notification("/a/x"))
.await
.expect("publish for known topic is Ok");
}
#[tokio::test]
async fn inmemory_notifications_filters_unrelated_resources() {
let n = InMemoryNotifications::new();
let sub = Subscription {
id: "sub-a2".into(),
topic: "/a/".into(),
channel_type: ChannelType::WebhookChannel2023,
receive_from: "https://client.example/hook".into(),
};
n.subscribe(sub).await.unwrap();
n.publish("/b/", sample_notification("/b/y"))
.await
.expect("publish on unrelated topic returns Ok");
}
#[tokio::test]
async fn inmemory_notifications_drops_subscription_on_close() {
let n = InMemoryNotifications::new();
let sub = Subscription {
id: "sub-drop".into(),
topic: "/drop/".into(),
channel_type: ChannelType::WebSocketChannel2023,
receive_from: "wss://pod.example/subscription/drop".into(),
};
n.subscribe(sub).await.unwrap();
n.unsubscribe("sub-drop").await.unwrap();
n.publish("/drop/", sample_notification("/drop/x"))
.await
.expect("publish after unsubscribe is Ok");
}
#[test]
fn discovery_document_advertises_all_channels() {
let doc = discovery_document("https://pod.example/");
let arr = doc["channelTypes"]
.as_array()
.expect("channelTypes is an array");
assert_eq!(arr.len(), 2, "exactly WebSocketChannel2023 + WebhookChannel2023");
let ids: Vec<&str> = arr
.iter()
.map(|v| v["id"].as_str().expect("channel id"))
.collect();
assert!(ids.contains(&"WebSocketChannel2023"));
assert!(ids.contains(&"WebhookChannel2023"));
}
#[test]
fn discovery_document_serialises_to_jsonld() {
let doc = discovery_document("https://pod.example");
assert!(doc["@context"].is_array(), "@context is an array");
let ctx = doc["@context"]
.as_array()
.unwrap()
.iter()
.filter_map(|v| v.as_str())
.collect::<Vec<_>>();
assert!(
ctx.iter().any(|s| s.contains("notifications-context")),
"@context includes the notifications-context IRI"
);
assert_eq!(
doc["id"].as_str().unwrap(),
"https://pod.example/.notifications"
);
for entry in doc["channelTypes"].as_array().unwrap() {
assert!(entry["id"].is_string());
assert!(entry["endpoint"].is_string());
assert!(entry["features"].is_array());
}
}
#[test]
fn change_notification_serialises_with_activity_streams_envelope() {
let note = ChangeNotification::from_storage_event(
&StorageEvent::Created("/a/x".into()),
"https://pod.example",
);
let wire = serde_json::to_value(¬e).expect("serialise AS2 notification");
assert_eq!(wire["@context"], as_ns::CONTEXT);
assert_eq!(wire["type"], "Create");
assert_eq!(wire["object"], "https://pod.example/a/x");
assert!(wire["id"].as_str().unwrap().starts_with("urn:uuid:"));
assert!(wire["published"].is_string());
let back: ChangeNotification = serde_json::from_value(wire).unwrap();
assert_eq!(back.kind, "Create");
assert_eq!(back.object, "https://pod.example/a/x");
}
#[tokio::test]
async fn subscription_matches_exact_resource_uri() {
let m = WebhookChannelManager::new();
let sub = m.subscribe("/exact/resource", "https://client.example/hook").await;
assert_eq!(sub.topic, "/exact/resource");
assert_eq!(sub.channel_type, ChannelType::WebhookChannel2023);
assert_eq!(sub.receive_from, "https://client.example/hook");
assert_eq!(m.active_subscriptions().await, 1);
m.unsubscribe(&sub.id).await;
assert_eq!(m.active_subscriptions().await, 0);
}
#[tokio::test]
async fn subscription_prefix_matches_container_subtree() {
let m = WebhookChannelManager::new();
let sub = m
.subscribe("/public/", "https://client.example/hook")
.await;
let nested = "/public/foo.ttl";
assert!(nested.starts_with(&sub.topic));
assert_eq!(m.active_subscriptions().await, 1);
}
#[test]
fn webhook_channel_manager_constructs_with_default_config() {
let m = WebhookChannelManager::new();
assert_eq!(m.max_retries, 3, "default max_retries matches ADR default");
assert_eq!(m.retry_base, Duration::from_millis(500));
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(1))
.build()
.expect("reqwest client builds");
let _m2 = WebhookChannelManager::with_client(client);
let _m3 = WebhookChannelManager::default();
}
#[test]
fn websocket_channel_manager_smoke() {
let m = WebSocketChannelManager::new();
assert_eq!(m.heartbeat_interval(), Duration::from_secs(30));
let m2 = WebSocketChannelManager::new().with_heartbeat(Duration::from_secs(7));
assert_eq!(m2.heartbeat_interval(), Duration::from_secs(7));
let _default = WebSocketChannelManager::default();
}
#[tokio::test]
async fn websocket_channel_manager_broadcasts_via_trait() {
let m = WebSocketChannelManager::new();
let mut rx = m.stream();
let note = ChangeNotification::from_storage_event(
&StorageEvent::Updated("/w/x".into()),
"https://pod.example",
);
<WebSocketChannelManager as Notifications>::publish(&m, "/w/", note.clone())
.await
.unwrap();
let got = tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timely delivery")
.expect("broadcast succeeds");
assert_eq!(got.kind, "Update");
assert_eq!(got.object, "https://pod.example/w/x");
}