use crate::{
    error::{Error, UPnPError},
    find_in_xml,
    scpd::SCPD,
    utils::{self, HttpResponseExt},
    Result,
};
use futures_core::stream::Stream;
use genawaiter::sync::{Co, Gen};
use http::{uri::PathAndQuery, Request, Uri};
use roxmltree::{Document, Node};
use ssdp_client::URN;
use std::collections::HashMap;
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpListener,
};
use utils::HyperBodyExt;
/// A single UPnP service: its type, its id and the paths of its three endpoints.
///
/// Instances are built from an XML node by `Service::from_xml`. The endpoints are
/// stored as path-and-query values only; the methods taking a `url` combine them
/// with the scheme and authority of that URL.
#[derive(Debug, Clone)]
pub struct Service {
/// Service type URN, parsed from the `serviceType` element.
service_type: URN,
/// Service identifier, parsed from the `serviceId` element.
service_id: String,
/// Path of the service description, parsed from the `SCPDURL` element.
scpd_endpoint: PathAndQuery,
/// Path that actions are posted to, parsed from the `controlURL` element.
control_endpoint: PathAndQuery,
/// Path used for event (un)subscription, parsed from the `eventSubURL` element.
event_sub_endpoint: PathAndQuery,
}
impl Service {
/// Builds a `Service` from the child elements of `node`.
///
/// The `serviceType`, `serviceId`, `SCPDURL`, `controlURL` and `eventSubURL`
/// children are looked up with `find_in_xml!` and their text is converted with
/// `utils::parse_node_text`, in that order.
///
/// # Errors
/// Returns the first error produced by `utils::parse_node_text`.
/// NOTE(review): `find_in_xml!` presumably returns early when an element is
/// missing — confirm against the macro definition.
pub(crate) fn from_xml(node: Node<'_, '_>) -> Result<Self> {
    // The macro is handed the XML element names verbatim, hence the lint exemption.
    #[allow(non_snake_case)]
    let (type_node, id_node, scpd_node, control_node, event_sub_node) =
        find_in_xml! { node => serviceType, serviceId, SCPDURL, controlURL, eventSubURL };

    let service_type = utils::parse_node_text(type_node)?;
    let service_id = utils::parse_node_text(id_node)?;
    let scpd_endpoint = utils::parse_node_text(scpd_node)?;
    let control_endpoint = utils::parse_node_text(control_node)?;
    let event_sub_endpoint = utils::parse_node_text(event_sub_node)?;

    Ok(Self {
        service_type,
        service_id,
        scpd_endpoint,
        control_endpoint,
        event_sub_endpoint,
    })
}
/// Returns a reference to the service type URN of this service.
pub fn service_type(&self) -> &URN {
&self.service_type
}
/// Returns the service id of this service as a string slice.
pub fn service_id(&self) -> &str {
&self.service_id
}
/// Returns a copy of `url` whose path and query are replaced by this service's
/// control endpoint.
pub(crate) fn control_url(&self, url: &Uri) -> Uri {
replace_url_path(url, &self.control_endpoint)
}
/// Returns a copy of `url` whose path and query are replaced by this service's
/// SCPD endpoint.
pub(crate) fn scpd_url(&self, url: &Uri) -> Uri {
replace_url_path(url, &self.scpd_endpoint)
}
/// Returns a copy of `url` whose path and query are replaced by this service's
/// event subscription endpoint.
pub(crate) fn event_sub_url(&self, url: &Uri) -> Uri {
replace_url_path(url, &self.event_sub_endpoint)
}
/// Fetches the service description (`SCPD`) of this service.
///
/// `url` supplies scheme and authority; its path is replaced by the SCPD
/// endpoint before the request is made by `SCPD::from_url`.
///
/// # Errors
/// Propagates any error returned by `SCPD::from_url`.
pub async fn scpd(&self, url: &Uri) -> Result<SCPD> {
    let description_url = self.scpd_url(url);
    let urn = self.service_type().clone();
    let description = SCPD::from_url(&description_url, urn).await?;
    Ok(description)
}
pub async fn action(
&self,
url: &Uri,
action: &str,
payload: &str,
) -> Result<HashMap<String, String>> {
let body = format!(
r#"
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:{action} xmlns:u="{service}">
{payload}
</u:{action}>
</s:Body>
</s:Envelope>"#,
service = &self.service_type,
action = action,
payload = payload
);
let soap_action = format!("\"{}#{}\"", &self.service_type, action);
let request = Request::post(self.control_url(url))
.header("CONTENT-TYPE", "xml")
.header("SOAPAction", soap_action)
.body(body.into())
.expect("infallible");
let doc = hyper::Client::new()
.request(request)
.await?
.err_if_not_200()?
.into_body()
.text()
.await?;
let doc = std::str::from_utf8(&doc)?;
let document = Document::parse(&doc)?;
let response = utils::find_root(&document, "Body", "UPnP Response")?
.first_element_child()
.ok_or_else(|| {
Error::XmlMissingElement("Body".to_string(), format!("{}Response", action))
})?;
if response.tag_name().name().eq_ignore_ascii_case("Fault") {
return Err(UPnPError::from_fault_node(response)?.into());
}
let values: HashMap<_, _> = response
.children()
.filter(Node::is_element)
.filter_map(|node| -> Option<(String, String)> {
if let Some(text) = node.text() {
Some((node.tag_name().name().to_string(), text.to_string()))
} else {
None
}
})
.collect();
Ok(values)
}
async fn make_subscribe_request(
&self,
url: &Uri,
callback: &str,
timeout_secs: u32,
) -> Result<String> {
let req = Request::builder()
.uri(self.event_sub_url(url))
.method("SUBSCRIBE")
.header("CALLBACK", format!("<{}>", callback))
.header("NT", "upnp:event")
.header("TIMEOUT", format!("Second-{}", timeout_secs))
.body(hyper::Body::empty())
.expect("infallible");
let response = hyper::Client::new().request(req).await?.err_if_not_200()?;
let sid = response
.headers()
.get("sid")
.ok_or_else(|| Error::ParseError("missing http header `SID`"))?
.to_str()
.map_err(|_| Error::ParseError("SID header contained non-visible ASCII bytes"))?
.to_string();
Ok(sid)
}
/// Subscribes to the events of this service.
///
/// Binds a TCP listener on the address returned by `utils::get_local_addr`,
/// announces it as callback in a `SUBSCRIBE` request and returns the
/// subscription id together with a stream of received property maps.
/// `timeout_secs` is the requested subscription duration in seconds.
///
/// # Errors
/// Fails if the local address cannot be determined, the listener cannot be
/// bound, or the subscribe request fails.
pub async fn subscribe(
    &self,
    url: &Uri,
    timeout_secs: u32,
) -> Result<(String, impl Stream<Item = Result<HashMap<String, String>>>)> {
    let bind_addr = utils::get_local_addr()?;
    let listener = TcpListener::bind(bind_addr).await?;

    // Use the address actually bound as the callback URL.
    let callback = format!("http://{}", listener.local_addr()?);
    let sid = self
        .make_subscribe_request(url, &callback, timeout_secs)
        .await?;

    let events = Gen::new(move |co: Co<Result<_>>| subscribe_stream(listener, co));
    Ok((sid, events))
}
/// Renews the subscription identified by `sid` for another `timeout_secs` seconds.
///
/// # Errors
/// Fails on network errors or when `err_if_not_200` rejects the response.
///
/// # Panics
/// Panics if the request cannot be built, e.g. when `sid` is not a valid
/// header value.
pub async fn renew_subscription(&self, url: &Uri, sid: &str, timeout_secs: u32) -> Result<()> {
    let timeout_header = format!("Second-{}", timeout_secs);
    let request = Request::builder()
        .method("SUBSCRIBE")
        .uri(self.event_sub_url(url))
        .header("SID", sid)
        .header("TIMEOUT", timeout_header)
        .body(hyper::Body::empty())
        .expect("infallible");

    let client = hyper::Client::new();
    client.request(request).await?.err_if_not_200()?;
    Ok(())
}
/// Cancels the subscription identified by `sid`.
///
/// # Errors
/// Fails on network errors or when `err_if_not_200` rejects the response.
///
/// # Panics
/// Panics if the request cannot be built, e.g. when `sid` is not a valid
/// header value.
pub async fn unsubscribe(&self, url: &Uri, sid: &str) -> Result<()> {
    let request = Request::builder()
        .method("UNSUBSCRIBE")
        .uri(self.event_sub_url(url))
        .header("SID", sid)
        .body(hyper::Body::empty())
        .expect("infallible");

    let client = hyper::Client::new();
    client.request(request).await?.err_if_not_200()?;
    Ok(())
}
}
// `yield_try!(co => expr)` is the generator counterpart of `?`.
//
// It evaluates `expr`; an `Ok` value becomes the value of the macro. On `Err`
// the error is converted with `.into()`, yielded through the generator handle
// `co`, and the enclosing loop is restarted with `continue`.
//
// It can therefore only be used inside a loop within an async context.
macro_rules! yield_try {
( $co:expr => $expr:expr ) => {
match $expr {
Ok(val) => val,
Err(e) => {
// Hand the error to the consumer, then skip the rest of this iteration.
$co.yield_(Err(e.into())).await;
continue;
}
}
};
}
/// Converts the XML in `input` into a map of variable names to values.
///
/// For every child of the root element, the first element child is taken; its
/// tag name (without namespace) becomes the key and its text the value.
/// Children without an element child and elements without text are skipped.
/// When a name occurs more than once, the last value is kept.
///
/// # Errors
/// Returns the `roxmltree::Error` if `input` is not well-formed XML.
fn propertyset_to_map(input: &str) -> Result<HashMap<String, String>, roxmltree::Error> {
    let document = Document::parse(input)?;
    let mut properties = HashMap::new();

    for property in document.root_element().children() {
        let variable = match property.first_element_child() {
            Some(variable) => variable,
            None => continue,
        };
        if let Some(value) = variable.text() {
            properties.insert(variable.tag_name().name().to_string(), value.to_string());
        }
    }

    Ok(properties)
}
async fn subscribe_stream(listener: TcpListener, co: Co<Result<HashMap<String, String>>>) {
loop {
let (stream, _) = yield_try!(co => listener.accept().await);
let mut lines = BufReader::new(stream).lines();
let mut input = String::new();
let mut is_xml = false;
while let Ok(Some(line)) = lines.next_line().await {
if is_xml || line.starts_with("<e:propertyset") {
input.push_str(&line);
is_xml = true;
}
if line.ends_with("</e:propertyset>") {
break;
};
}
let hashmap = yield_try!(co => propertyset_to_map(&input));
co.yield_(Ok(hashmap)).await;
}
}
/// Returns a copy of `url` in which path and query are replaced by `path`;
/// all other components are taken over unchanged.
///
/// # Panics
/// Panics if the modified parts do not form a valid `Uri`.
fn replace_url_path(url: &Uri, path: &PathAndQuery) -> Uri {
    let rebuilt = {
        let mut components = url.clone().into_parts();
        components.path_and_query = Some(path.clone());
        Uri::from_parts(components)
    };
    rebuilt.expect("infallible")
}