use callback_server::{CallbackServer, NotificationPayload};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
/// End-to-end check of the callback server over real HTTP.
///
/// Sends four `NOTIFY` requests against a single server instance:
/// 1. a fully-headed event for a registered subscription — expects 200 and delivery;
/// 2. a second event carrying only `SID` and `Content-Type` — expects 200 and delivery;
/// 3. an event for a subscription that was never registered — expects 200 but no
///    delivery on the channel within 100 ms;
/// 4. a request with no `SID` header — expects 400.
#[tokio::test]
async fn test_callback_server_end_to_end() {
    let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
    let server = CallbackServer::new((50000, 50100), tx)
        .await
        .expect("Failed to create callback server");
    let base_url = server.base_url().to_string();
    println!("Server started at: {base_url}");

    // The router is keyed by the full `uuid:`-prefixed id, which is also the
    // value sent in the SID header and expected back in the payload.
    let subscription_id = "test-subscription-123".to_string();
    let full_subscription_id = format!("uuid:{subscription_id}");
    server.router().register(full_subscription_id.clone()).await;

    let client = reqwest::Client::new();
    let notify_url = format!("{base_url}/notify/{subscription_id}");

    // --- 1. Well-formed event for the registered subscription ---------------
    let event_xml = r#"<?xml version="1.0"?>
<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">
<e:property>
<TransportState>PLAYING</TransportState>
</e:property>
<e:property>
<CurrentTrackURI>x-sonos-spotify:spotify%3atrack%3a1234567890</CurrentTrackURI>
</e:property>
</e:propertyset>"#;
    let response = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("SID", full_subscription_id.as_str())
        .header("NT", "upnp:event")
        .header("NTS", "upnp:propchange")
        .header("Content-Type", "text/xml")
        .body(event_xml.to_string())
        .send()
        .await
        .expect("Failed to send HTTP request");
    assert_eq!(response.status(), 200);

    let notification = timeout(Duration::from_secs(1), rx.recv())
        .await
        .expect("Timeout waiting for notification")
        .expect("No notification received");
    assert_eq!(notification.subscription_id, full_subscription_id);
    assert!(notification.event_xml.contains("TransportState"));
    assert!(notification.event_xml.contains("PLAYING"));

    // --- 2. Second event, this time without NT/NTS headers -------------------
    let event_xml2 = r#"<?xml version="1.0"?>
<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">
<e:property>
<Volume>50</Volume>
</e:property>
</e:propertyset>"#;
    let response2 = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("SID", full_subscription_id.as_str())
        .header("Content-Type", "text/xml")
        .body(event_xml2.to_string())
        .send()
        .await
        .expect("Failed to send second HTTP request");
    assert_eq!(response2.status(), 200);

    let notification2 = timeout(Duration::from_secs(1), rx.recv())
        .await
        .expect("Timeout waiting for second notification")
        .expect("No second notification received");
    assert_eq!(notification2.subscription_id, full_subscription_id);
    assert!(notification2.event_xml.contains("Volume"));
    assert!(notification2.event_xml.contains("50"));

    // --- 3. Event for a subscription that was never registered ---------------
    // The request itself is accepted (200), but nothing may reach the channel.
    let unregistered_url = format!("{base_url}/notify/unregistered-sub");
    let response3 = client
        .request(
            reqwest::Method::from_bytes(b"NOTIFY").unwrap(),
            &unregistered_url,
        )
        .header("SID", "uuid:unregistered-sub")
        .header("Content-Type", "text/xml")
        .body("<event>test</event>")
        .send()
        .await
        .expect("Failed to send third HTTP request");
    assert_eq!(response3.status(), 200);

    // A timeout (`Err`) is the success case here: it means nothing was delivered.
    let no_notification = timeout(Duration::from_millis(100), rx.recv()).await;
    assert!(
        no_notification.is_err(),
        "Should not receive notification for unregistered subscription (buffered only)"
    );

    // --- 4. Request without a SID header is rejected -------------------------
    let response4 = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("Content-Type", "text/xml")
        .body("<event>test</event>")
        .send()
        .await
        .expect("Failed to send fourth HTTP request");
    assert_eq!(response4.status(), 400);

    server.shutdown().await.expect("Failed to shutdown server");
}
/// Registers three subscriptions, sends one `NOTIFY` to each from its own
/// spawned task, and checks that all three events arrive on the channel, each
/// tagged with the subscription it was addressed to and carrying that
/// subscription's body.
#[tokio::test]
async fn test_multiple_subscriptions_concurrent_events() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<NotificationPayload>();
    let server = CallbackServer::new((50200, 50300), sender)
        .await
        .expect("Failed to create callback server");
    let base_url = server.base_url().to_string();

    // (subscription name, request body) for every simulated event source.
    let fixtures = [
        ("subscription-1", "<event>data1</event>"),
        ("subscription-2", "<event>data2</event>"),
        ("subscription-3", "<event>data3</event>"),
    ];
    for (name, _) in fixtures.iter() {
        server.router().register(format!("uuid:{name}")).await;
    }

    // Spawn every request before awaiting any of them so they are all in
    // flight together.
    let client = reqwest::Client::new();
    let in_flight: Vec<_> = fixtures
        .iter()
        .map(|&(name, body)| {
            let client = client.clone();
            let url = format!("{base_url}/notify/{name}");
            let sid = format!("uuid:{name}");
            tokio::spawn(async move {
                client
                    .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), url)
                    .header("SID", sid)
                    .body(body)
                    .send()
                    .await
            })
        })
        .collect();

    for task in in_flight {
        let reply = task
            .await
            .expect("Task failed")
            .expect("HTTP request failed");
        assert_eq!(reply.status(), 200);
    }

    // Drain exactly one payload per request; arrival order is not assumed.
    let mut delivered = Vec::new();
    for _ in 0..3 {
        let payload = timeout(Duration::from_secs(1), receiver.recv())
            .await
            .expect("Timeout waiting for notification")
            .expect("No notification received");
        delivered.push(payload);
    }

    // Compare the id sets order-independently by sorting both sides.
    let mut seen_ids: Vec<String> = delivered
        .iter()
        .map(|payload| payload.subscription_id.clone())
        .collect();
    seen_ids.sort();
    let mut wanted_ids: Vec<String> = fixtures
        .iter()
        .map(|(name, _)| format!("uuid:{name}"))
        .collect();
    wanted_ids.sort();
    assert_eq!(seen_ids, wanted_ids);

    // Each payload must carry the body that was sent to its own subscription.
    for payload in delivered {
        let marker = match payload.subscription_id.as_str() {
            "uuid:subscription-1" => "data1",
            "uuid:subscription-2" => "data2",
            "uuid:subscription-3" => "data3",
            _ => panic!("Unexpected subscription ID: {}", payload.subscription_id),
        };
        assert!(payload.event_xml.contains(marker));
    }

    server.shutdown().await.expect("Failed to shutdown server");
}
/// Exercises registering and unregistering a subscription while the server
/// is running.
///
/// Sequence checked:
/// 1. `NOTIFY` sent before registration — accepted (200) and expected to show
///    up on the channel once the subscription is registered;
/// 2. `NOTIFY` sent while registered — accepted and delivered;
/// 3. `NOTIFY` sent after unregistration — accepted (200) but nothing is
///    delivered within 100 ms.
#[tokio::test]
async fn test_dynamic_subscription_management() {
    let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
    let server = CallbackServer::new((50400, 50500), tx)
        .await
        .expect("Failed to create callback server");
    let base_url = server.base_url().to_string();
    let client = reqwest::Client::new();

    let subscription_id = "dynamic-subscription".to_string();
    // Full `uuid:`-prefixed id: used for the SID header, for the router, and
    // as the expected `subscription_id` of every delivered payload.
    let sid = format!("uuid:{subscription_id}");
    let notify_url = format!("{base_url}/notify/{subscription_id}");

    // --- 1. Event arrives before the subscription is registered --------------
    let response1 = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("SID", sid.as_str())
        .body("<event>before_register</event>")
        .send()
        .await
        .expect("Failed to send HTTP request");
    assert_eq!(response1.status(), 200);

    // Registering afterwards is expected to release the earlier event.
    server.router().register(sid.clone()).await;
    let replayed = timeout(Duration::from_secs(1), rx.recv())
        .await
        .expect("Timeout waiting for replayed notification")
        .expect("No replayed notification received");
    assert_eq!(replayed.subscription_id, sid);
    assert!(replayed.event_xml.contains("before_register"));

    // --- 2. Event while registered -------------------------------------------
    let response2 = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("SID", sid.as_str())
        .body("<event>after_register</event>")
        .send()
        .await
        .expect("Failed to send HTTP request");
    assert_eq!(response2.status(), 200);

    let notification = timeout(Duration::from_secs(1), rx.recv())
        .await
        .expect("Timeout waiting for notification")
        .expect("No notification received");
    assert_eq!(notification.subscription_id, sid);
    assert!(notification.event_xml.contains("after_register"));

    // --- 3. Event after unregistration ---------------------------------------
    server.router().unregister(&sid).await;
    let response3 = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("SID", sid.as_str())
        .body("<event>after_unregister</event>")
        .send()
        .await
        .expect("Failed to send HTTP request");
    assert_eq!(response3.status(), 200);

    // A timeout (`Err`) is the success case: nothing reached the channel.
    let no_notification = timeout(Duration::from_millis(100), rx.recv()).await;
    assert!(
        no_notification.is_err(),
        "Should not receive notification after unregistration (buffered only)"
    );

    server.shutdown().await.expect("Failed to shutdown server");
}
/// Checks the address the server reports about itself: the base URL starts
/// with `http://` and mentions the bound port, the port lies inside the
/// requested range, and the server answers HTTP at that address (404 for an
/// unknown path).
#[tokio::test]
async fn test_server_ip_and_url_detection() {
    // No notifications are read in this test; the receiver is only held so
    // the channel stays open for the server's lifetime.
    let (sender, _receiver) = mpsc::unbounded_channel::<NotificationPayload>();
    let server = CallbackServer::new((50600, 50700), sender)
        .await
        .expect("Failed to create callback server");

    let advertised = server.base_url();
    let bound_port = server.port();
    let port_text = bound_port.to_string();

    assert!(advertised.starts_with("http://"));
    assert!(advertised.contains(&port_text));
    assert!((50600..=50700).contains(&bound_port));

    // Any path the server does not route proves it is reachable over HTTP.
    let probe = reqwest::Client::new()
        .get(format!("{advertised}/nonexistent"))
        .send()
        .await
        .expect("Failed to connect to server");
    assert_eq!(probe.status(), 404);

    server.shutdown().await.expect("Failed to shutdown server");
}
/// Checks that malformed requests to a registered subscription's notify URL
/// are rejected and produce no notification.
///
/// Cases covered:
/// 1. `GET` instead of `NOTIFY` — any status other than 200;
/// 2. `NOTIFY` with an invalid `NT` header — 400;
/// 3. `NOTIFY` with an invalid `NTS` header — 400.
///
/// Afterwards the channel must stay empty for 100 ms.
#[tokio::test]
async fn test_error_handling() {
    let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
    let server = CallbackServer::new((50800, 50900), tx)
        .await
        .expect("Failed to create callback server");
    let base_url = server.base_url().to_string();
    let client = reqwest::Client::new();

    // Register the subscription so that a rejection can only be caused by the
    // malformed request itself, not by an unknown subscription.
    let subscription_id = "error-test-sub".to_string();
    let sid = format!("uuid:{subscription_id}");
    server.router().register(sid.clone()).await;
    let notify_url = format!("{base_url}/notify/{subscription_id}");

    // --- 1. Wrong HTTP method ------------------------------------------------
    let response1 = client
        .get(&notify_url)
        .header("SID", sid.as_str())
        .send()
        .await
        .expect("Failed to send GET request");
    assert_ne!(response1.status(), 200);

    // --- 2. Invalid NT header ------------------------------------------------
    let response2 = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("SID", sid.as_str())
        .header("NT", "invalid-value")
        .header("NTS", "upnp:propchange")
        .body("<event>test</event>")
        .send()
        .await
        .expect("Failed to send request with invalid NT");
    assert_eq!(response2.status(), 400);

    // --- 3. Invalid NTS header -----------------------------------------------
    let response3 = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("SID", sid.as_str())
        .header("NT", "upnp:event")
        .header("NTS", "invalid-value")
        .body("<event>test</event>")
        .send()
        .await
        .expect("Failed to send request with invalid NTS");
    assert_eq!(response3.status(), 400);

    // None of the rejected requests may have produced a notification; a
    // timeout (`Err`) is therefore the success case.
    let no_notification = timeout(Duration::from_millis(100), rx.recv()).await;
    assert!(
        no_notification.is_err(),
        "Should not receive notifications for malformed requests"
    );

    server.shutdown().await.expect("Failed to shutdown server");
}
/// Checks the "event arrives before registration" ordering: a `NOTIFY` for a
/// subscription that is not yet registered is accepted (200), is not
/// delivered on the channel right away, and is delivered once the
/// subscription is registered.
#[tokio::test]
async fn test_notify_before_register_is_replayed() {
    let (tx, mut rx) = mpsc::unbounded_channel::<NotificationPayload>();
    let server = CallbackServer::new((51200, 51300), tx)
        .await
        .expect("Failed to create callback server");
    let base_url = server.base_url().to_string();
    let client = reqwest::Client::new();

    let sub_id = "uuid:race-integration";
    // NOTE(review): the URL path segment ("race-test") does not match the SID
    // ("race-integration"). The assertions below only key on the SID header
    // value, so presumably the server identifies the subscription by header
    // rather than by path — confirm against the server implementation.
    let notify_url = format!("{base_url}/notify/race-test");

    // Send the event while the subscription is still unknown to the router.
    let resp = client
        .request(reqwest::Method::from_bytes(b"NOTIFY").unwrap(), &notify_url)
        .header("SID", sub_id)
        .header("NT", "upnp:event")
        .header("NTS", "upnp:propchange")
        .body("<event>initial-state</event>")
        .send()
        .await
        .expect("Failed to send NOTIFY");
    assert_eq!(resp.status(), 200);

    // Nothing may be delivered yet; a timeout (`Err`) is the success case.
    let no_notification = timeout(Duration::from_millis(100), rx.recv()).await;
    assert!(
        no_notification.is_err(),
        "Event should be buffered, not delivered immediately"
    );

    // Registering now is expected to release the earlier event.
    server.router().register(sub_id.to_string()).await;
    let payload = timeout(Duration::from_secs(1), rx.recv())
        .await
        .expect("Timeout waiting for replayed event")
        .expect("No replayed event received");
    assert_eq!(payload.subscription_id, sub_id);
    assert!(payload.event_xml.contains("initial-state"));

    server.shutdown().await.expect("Failed to shutdown server");
}