#![cfg(feature = "legacy-notifications")]
use std::sync::Arc;
use std::time::Duration;
use solid_pod_rs::handlers::legacy_notifications::{LegacyWsDriver, OutboundFrame};
use solid_pod_rs::notifications::legacy::{
AllowAllAuthorizer, LegacyNotificationChannel, PROTOCOL_GREETING,
};
use solid_pod_rs::storage::StorageEvent;
use tokio::sync::broadcast;
use tokio::time::timeout;
/// A `StorageEvent::Created` carrying a resource URL must be rendered by
/// `LegacyNotificationChannel::to_legacy_line` as `pub <url>`.
#[test]
fn f3a_to_legacy_line_created_is_pub_uri() {
    let created = StorageEvent::Created("https://pod.example.com/foo.ttl".into());

    let rendered = LegacyNotificationChannel::to_legacy_line(&created);

    // Compare through a borrowed view so no expected `String` is allocated.
    assert_eq!(
        rendered.as_deref(),
        Some("pub https://pod.example.com/foo.ttl")
    );
}
/// A well-formed `sub <url>` line must parse to exactly the URL that
/// follows the opcode.
#[test]
fn f3b_parse_subscribe_valid_line() {
    let target =
        LegacyNotificationChannel::parse_subscribe("sub https://pod.example.com/bar/");

    assert_eq!(target.as_deref(), Some("https://pod.example.com/bar/"));
}
/// `parse_subscribe` must return `None` for every line that is not a
/// complete `sub <target>` command: empty input, a bare opcode, an opcode
/// followed only by a space, a longer word starting with `sub`, and other
/// opcodes such as `pub` / `unsub`.
#[test]
fn f3c_parse_subscribe_rejects_malformed() {
    let malformed = [
        "",
        "sub",
        "sub ",
        "subscribe foo",
        "pub https://p/x",
        "unsub https://p/x",
    ];

    for &line in malformed.iter() {
        assert!(
            LegacyNotificationChannel::parse_subscribe(line).is_none(),
            "expected {:?} to be rejected",
            line
        );
    }
}
/// After subscribing to the container `https://example.org/foo/`, the
/// channel must match the container itself and resources beneath it, but
/// neither a sibling that merely shares the textual prefix without the
/// trailing slash (`.../foobar`) nor a URL on a different host.
#[test]
fn f3d_subscription_prefix_match() {
    // The sender half is kept bound (not `_`) so it lives for the whole test.
    let (_sender, receiver) = broadcast::channel::<StorageEvent>(16);
    let base = LegacyNotificationChannel::new(receiver);
    let mut channel = base.with_authorizer(Arc::new(AllowAllAuthorizer));
    channel.subscribe("https://example.org/foo/".into()).unwrap();

    let inside = [
        "https://example.org/foo/",
        "https://example.org/foo/bar",
        "https://example.org/foo/bar/baz.ttl",
    ];
    for &uri in inside.iter() {
        assert!(
            channel.matches_subscription(uri),
            "expected {:?} to match the subscription",
            uri
        );
    }

    let outside = ["https://example.org/foobar", "https://other.example/foo/"];
    for &uri in outside.iter() {
        assert!(
            !channel.matches_subscription(uri),
            "expected {:?} not to match the subscription",
            uri
        );
    }
}
/// Drives a `LegacyWsDriver` end to end:
///
/// 1. the first outbound frame is the protocol greeting,
/// 2. a `sub <container>` line is answered with `ack <container>`,
/// 3. a `StorageEvent::Updated` for a resource under that container is
///    delivered as a `pub <resource>` frame.
///
/// Each outbound frame must arrive within one second.
#[tokio::test]
async fn f3d_end_to_end_pub_fanout_to_container_subscriber() {
    let frame_deadline = Duration::from_secs(1);

    let (events, event_rx) = broadcast::channel::<StorageEvent>(16);
    let (in_tx, mut out_rx, task) = LegacyWsDriver::new(event_rx)
        .with_authorizer(Arc::new(AllowAllAuthorizer))
        .split();
    let handle = tokio::spawn(task);

    // Step 1: greeting.
    let greeting_wait = timeout(frame_deadline, out_rx.recv()).await;
    let first = greeting_wait
        .expect("greeting within 1s")
        .expect("some greeting frame");
    let expected_greeting = OutboundFrame::Text(PROTOCOL_GREETING.to_string());
    assert_eq!(first, expected_greeting);

    // Step 2: subscribe to the container and wait for the acknowledgement.
    let subscribe_line = String::from("sub https://example.org/foo/");
    in_tx.send(subscribe_line).await.unwrap();
    let ack_wait = timeout(frame_deadline, out_rx.recv()).await;
    let ack = ack_wait.expect("ack within 1s").expect("ack frame");
    let expected_ack = OutboundFrame::Text("ack https://example.org/foo/".into());
    assert_eq!(ack, expected_ack);

    // Step 3: a storage event below the container is fanned out as `pub`.
    let update = StorageEvent::Updated("https://example.org/foo/bar.ttl".into());
    events.send(update).expect("broadcast send");
    let pub_wait = timeout(frame_deadline, out_rx.recv()).await;
    let pub_frame = pub_wait.expect("pub within 1s").expect("pub frame");
    let expected_pub = OutboundFrame::Text("pub https://example.org/foo/bar.ttl".into());
    assert_eq!(pub_frame, expected_pub);

    // Close the inbound side and give the task up to two seconds to finish;
    // the outcome of that wait is intentionally not asserted.
    drop(in_tx);
    let _ = timeout(Duration::from_secs(2), handle).await;
}
/// With a 50 ms heartbeat configured, the driver must send the greeting and
/// then at least two consecutive empty text frames, each within one second.
#[tokio::test]
async fn f3e_heartbeat_emits_blank_line_on_interval() {
    let frame_deadline = Duration::from_secs(1);

    let (_events, event_rx) = broadcast::channel::<StorageEvent>(16);
    // `_in_tx` stays bound for the whole test — presumably so the driver
    // never observes its inbound side closing while heartbeats are awaited.
    let (_in_tx, mut out_rx, task) = LegacyWsDriver::new(event_rx)
        .with_heartbeat(Duration::from_millis(50))
        .split();
    let handle = tokio::spawn(task);

    let greeting_wait = timeout(frame_deadline, out_rx.recv()).await;
    let first = greeting_wait
        .expect("greeting within 1s")
        .expect("greeting");
    assert_eq!(first, OutboundFrame::Text(PROTOCOL_GREETING.to_string()));

    // Two heartbeats in a row; each entry carries the failure messages for
    // the timeout and for a closed channel respectively.
    let heartbeat_checks = [
        ("heartbeat within 1s", "heartbeat frame"),
        ("second heartbeat within 1s", "second heartbeat frame"),
    ];
    for &(timeout_msg, closed_msg) in heartbeat_checks.iter() {
        let beat = timeout(frame_deadline, out_rx.recv())
            .await
            .expect(timeout_msg)
            .expect(closed_msg);
        assert_eq!(beat, OutboundFrame::Text(String::new()));
    }

    handle.abort();
}
#[tokio::test]
async fn unknown_opcode_closes_connection() {
let (_tx, rx) = broadcast::channel::<StorageEvent>(16);
let driver = LegacyWsDriver::new(rx);
let (in_tx, mut out_rx, task) = driver.split();
let handle = tokio::spawn(task);
let _ = timeout(Duration::from_secs(1), out_rx.recv())
.await
.unwrap();
in_tx.send("gibberish foo".to_string()).await.unwrap();
let frame = timeout(Duration::from_secs(1), out_rx.recv())
.await
.expect("close frame within 1s")
.expect("close frame");
assert!(matches!(frame, OutboundFrame::Close(_)));
handle.abort();
}