use super::OnvifClient;
use crate::error::OnvifError;
use crate::soap::{find_response, parse_soap_body};
use crate::types::{
EventProperties, NotificationMessage, PullPointSubscription, PushSubscription, xml_escape,
};
use futures_core::Stream;
impl OnvifClient {
    /// Requests the event properties advertised by the event service.
    ///
    /// Sends an empty `tev:GetEventProperties` element to `events_url` and
    /// decodes the `GetEventPropertiesResponse` element with
    /// [`EventProperties::from_xml`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying call, from parsing the SOAP
    /// body, from locating the response element, or from decoding it.
    pub async fn get_event_properties(
        &self,
        events_url: &str,
    ) -> Result<EventProperties, OnvifError> {
        const ACTION: &str =
            "http://www.onvif.org/ver10/events/wsdl/EventPortType/GetEventPropertiesRequest";
        const BODY: &str = "<tev:GetEventProperties/>";

        let reply = self.call(events_url, ACTION, BODY).await?;
        let soap_body = parse_soap_body(&reply)?;
        let response = find_response(&soap_body, "GetEventPropertiesResponse")?;
        EventProperties::from_xml(response)
    }

    /// Creates a pull-point subscription on the event service at `events_url`.
    ///
    /// * `filter` - optional topic expression; when present it is XML-escaped
    ///   and wrapped in a `wsnt:TopicExpression` using the ONVIF `ConcreteSet`
    ///   dialect inside `tev:Filter`.
    /// * `initial_termination_time` - optional value, XML-escaped and sent as
    ///   `tev:InitialTerminationTime`.
    ///
    /// Either element is omitted entirely when its argument is `None`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying call, from parsing the SOAP
    /// body, from locating `CreatePullPointSubscriptionResponse`, or from
    /// [`PullPointSubscription::from_xml`].
    pub async fn create_pull_point_subscription(
        &self,
        events_url: &str,
        filter: Option<&str>,
        initial_termination_time: Option<&str>,
    ) -> Result<PullPointSubscription, OnvifError> {
        const ACTION: &str = "http://www.onvif.org/ver10/events/wsdl/EventPortType/CreatePullPointSubscriptionRequest";

        let filter_xml = filter.map_or_else(String::new, |topic| {
            format!(
                "<tev:Filter>\
                 <wsnt:TopicExpression \
                 Dialect=\"http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet\"\
                 >{topic}</wsnt:TopicExpression>\
                 </tev:Filter>",
                topic = xml_escape(topic)
            )
        });
        let termination_xml = match initial_termination_time {
            Some(when) => format!(
                "<tev:InitialTerminationTime>{}</tev:InitialTerminationTime>",
                xml_escape(when)
            ),
            None => String::new(),
        };
        let request = format!(
            "<tev:CreatePullPointSubscription>{filter_xml}{termination_xml}</tev:CreatePullPointSubscription>"
        );

        let reply = self.call(events_url, ACTION, &request).await?;
        let soap_body = parse_soap_body(&reply)?;
        let response = find_response(&soap_body, "CreatePullPointSubscriptionResponse")?;
        PullPointSubscription::from_xml(response)
    }

    /// Pulls pending notification messages from a pull-point subscription.
    ///
    /// `timeout` is XML-escaped and sent verbatim as `tev:Timeout`;
    /// `max_messages` is sent as `tev:MessageLimit`. The messages found in
    /// `PullMessagesResponse` are returned via
    /// [`NotificationMessage::vec_from_xml`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying call, from parsing the SOAP
    /// body, or from locating the response element.
    pub async fn pull_messages(
        &self,
        subscription_url: &str,
        timeout: &str,
        max_messages: u32,
    ) -> Result<Vec<NotificationMessage>, OnvifError> {
        const ACTION: &str =
            "http://www.onvif.org/ver10/events/wsdl/PullPointSubscription/PullMessagesRequest";

        let request = format!(
            "<tev:PullMessages>\
             <tev:Timeout>{}</tev:Timeout>\
             <tev:MessageLimit>{}</tev:MessageLimit>\
             </tev:PullMessages>",
            xml_escape(timeout),
            max_messages
        );

        let reply = self.call(subscription_url, ACTION, &request).await?;
        let soap_body = parse_soap_body(&reply)?;
        let response = find_response(&soap_body, "PullMessagesResponse")?;
        Ok(NotificationMessage::vec_from_xml(response))
    }

    /// Renews the subscription at `subscription_url`.
    ///
    /// `termination_time` is XML-escaped and sent as `wsnt:TerminationTime`.
    /// Returns the text of the `TerminationTime` child of `RenewResponse`, or
    /// an empty string when the response carries no such child.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying call, from parsing the SOAP
    /// body, or from locating the response element.
    pub async fn renew_subscription(
        &self,
        subscription_url: &str,
        termination_time: &str,
    ) -> Result<String, OnvifError> {
        const ACTION: &str = "http://docs.oasis-open.org/wsn/bw-2/SubscriptionManager/RenewRequest";

        let request = format!(
            "<wsnt:Renew><wsnt:TerminationTime>{}</wsnt:TerminationTime></wsnt:Renew>",
            xml_escape(termination_time)
        );

        let reply = self.call(subscription_url, ACTION, &request).await?;
        let soap_body = parse_soap_body(&reply)?;
        let response = find_response(&soap_body, "RenewResponse")?;
        let renewed_until = match response.child("TerminationTime") {
            Some(node) => node.text().to_string(),
            None => String::new(),
        };
        Ok(renewed_until)
    }

    /// Cancels the subscription at `subscription_url`.
    ///
    /// Succeeds once an `UnsubscribeResponse` element is found in the reply;
    /// the element's content is not inspected.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying call, from parsing the SOAP
    /// body, or from locating the response element.
    pub async fn unsubscribe(&self, subscription_url: &str) -> Result<(), OnvifError> {
        const ACTION: &str =
            "http://docs.oasis-open.org/wsn/bw-2/SubscriptionManager/UnsubscribeRequest";
        const BODY: &str = "<wsnt:Unsubscribe/>";

        let reply = self.call(subscription_url, ACTION, BODY).await?;
        let soap_body = parse_soap_body(&reply)?;
        find_response(&soap_body, "UnsubscribeResponse").map(|_| ())
    }

    /// Creates a push subscription that delivers notifications to
    /// `consumer_url`.
    ///
    /// * `consumer_url` - XML-escaped and sent as the `wsa:Address` of the
    ///   `wsnt:ConsumerReference`.
    /// * `filter` - optional topic expression, XML-escaped and wrapped in a
    ///   `wsnt:TopicExpression` using the ONVIF `ConcreteSet` dialect.
    /// * `termination_time` - optional value, XML-escaped and sent as
    ///   `wsnt:InitialTerminationTime`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying call, from parsing the SOAP
    /// body, from locating `SubscribeResponse`, or from
    /// [`PushSubscription::from_xml`].
    pub async fn subscribe(
        &self,
        events_url: &str,
        consumer_url: &str,
        filter: Option<&str>,
        termination_time: Option<&str>,
    ) -> Result<PushSubscription, OnvifError> {
        const ACTION: &str =
            "http://docs.oasis-open.org/wsn/bw-2/NotificationProducer/SubscribeRequest";

        let consumer_address = xml_escape(consumer_url);
        let filter_xml = match filter {
            Some(topic) => format!(
                "<wsnt:Filter>\
                 <wsnt:TopicExpression \
                 Dialect=\"http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet\"\
                 >{}</wsnt:TopicExpression>\
                 </wsnt:Filter>",
                xml_escape(topic)
            ),
            None => String::new(),
        };
        let termination_xml = termination_time.map_or_else(String::new, |when| {
            format!(
                "<wsnt:InitialTerminationTime>{when}</wsnt:InitialTerminationTime>",
                when = xml_escape(when)
            )
        });
        let request = format!(
            "<wsnt:Subscribe>\
             <wsnt:ConsumerReference>\
             <wsa:Address>{consumer_address}</wsa:Address>\
             </wsnt:ConsumerReference>\
             {filter_xml}{termination_xml}\
             </wsnt:Subscribe>"
        );

        let reply = self.call(events_url, ACTION, &request).await?;
        let soap_body = parse_soap_body(&reply)?;
        let response = find_response(&soap_body, "SubscribeResponse")?;
        PushSubscription::from_xml(response)
    }

    /// Turns a pull-point subscription into a stream of notification messages.
    ///
    /// The stream calls [`Self::pull_messages`] in an endless loop with the
    /// given `timeout` and `max_messages`, yielding every message of each
    /// batch in order. It never ends on its own; with `try_stream!`, a failed
    /// pull is emitted as an `Err` item and terminates the stream.
    pub fn event_stream<'a>(
        &'a self,
        subscription_url: &'a str,
        timeout: &'a str,
        max_messages: u32,
    ) -> std::pin::Pin<Box<dyn Stream<Item = Result<NotificationMessage, OnvifError>> + 'a>> {
        Box::pin(async_stream::try_stream! {
            loop {
                let batch = self
                    .pull_messages(subscription_url, timeout, max_messages)
                    .await?;
                for notification in batch {
                    yield notification;
                }
            }
        })
    }
}
/// Starts an HTTP listener on `bind_addr` and returns the notifications it
/// receives as a stream.
///
/// A background task binds a `TcpListener` and spawns one task per accepted
/// connection; each connection is handled by `handle_notify_connection` and
/// the resulting messages are forwarded through a bounded channel (capacity
/// 256) to the returned stream.
///
/// The stream ends (yields `None`) when binding fails or when `accept`
/// returns an error and all in-flight connection tasks have finished; neither
/// condition is reported to the caller.
///
/// Dropping the returned stream aborts the accept task, which releases the
/// bound socket. Must be called from within a tokio runtime because it uses
/// `tokio::spawn`.
pub fn notification_listener(
    bind_addr: std::net::SocketAddr,
) -> std::pin::Pin<Box<dyn futures_core::Stream<Item = NotificationMessage> + Send>> {
    // Aborts the wrapped task when dropped, so the accept loop (and with it
    // the bound listener socket) cannot outlive the stream that consumes it.
    struct AbortOnDrop(tokio::task::JoinHandle<()>);

    impl Drop for AbortOnDrop {
        fn drop(&mut self) {
            self.0.abort();
        }
    }

    let (tx, mut rx) = tokio::sync::mpsc::channel::<NotificationMessage>(256);

    let accept_task = tokio::spawn(async move {
        let Ok(listener) = tokio::net::TcpListener::bind(bind_addr).await else {
            // Dropping `tx` here closes the channel, which ends the stream.
            return;
        };
        while let Ok((mut conn, _)) = listener.accept().await {
            let tx = tx.clone();
            tokio::spawn(async move {
                for msg in handle_notify_connection(&mut conn).await {
                    // A send error means the receiver is gone; stop forwarding.
                    if tx.send(msg).await.is_err() {
                        break;
                    }
                }
            });
        }
    });
    let accept_guard = AbortOnDrop(accept_task);

    Box::pin(async_stream::stream! {
        // Owned by the stream: dropped (aborting the accept task) whenever the
        // stream is dropped, whether or not it was ever polled.
        let _accept_guard = accept_guard;
        while let Some(msg) = rx.recv().await {
            yield msg;
        }
    })
}
/// Handles one inbound notification request on `conn` and returns the
/// messages it carried.
///
/// Reads the HTTP body with `read_http_body`. On a read error it replies
/// `400 Bad Request` and returns an empty vector. Otherwise it replies
/// `200 OK` *before* parsing, so a body that is not well-formed XML is still
/// acknowledged and simply yields no messages.
///
/// The messages are extracted from the `Notify` child of the `Body` child of
/// the parsed root; when either child is absent its parent is used instead.
/// Failures while writing a response are ignored.
async fn handle_notify_connection(conn: &mut tokio::net::TcpStream) -> Vec<NotificationMessage> {
    use tokio::io::AsyncWriteExt;

    let body = match read_http_body(conn).await {
        Ok(b) => b,
        Err(_) => {
            // Only one request is ever read per connection, so advertise the
            // close on the error path just like on the success path.
            let _ = conn
                .write_all(
                    b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
                )
                .await;
            return vec![];
        }
    };

    // Best-effort acknowledgement: the sender gets its reply even if the
    // payload turns out to be unusable below.
    let _ = conn
        .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
        .await;

    let Ok(root) = crate::soap::XmlNode::parse(&body) else {
        return vec![];
    };
    // Fall back to the parent node when the expected wrapper is missing.
    let body_el = root.child("Body").unwrap_or(&root);
    let notify = body_el.child("Notify").unwrap_or(body_el);
    NotificationMessage::vec_from_xml(notify)
}
/// Largest `Content-Length`, in bytes, that `read_http_body` accepts.
const MAX_NOTIFY_BODY: usize = 1_048_576;

/// Extracts the value of the first `Content-Length` header in `head`.
///
/// `head` is the raw request head (request line plus headers). The header name
/// is matched case-insensitively. Returns 0 when the header is missing or its
/// value is not a valid `usize`. Working on bytes keeps the lookup independent
/// of non-UTF-8 bytes in unrelated headers.
fn declared_content_length(head: &[u8]) -> usize {
    head.split(|&b| b == b'\n')
        .find_map(|line| {
            let colon = line.iter().position(|&b| b == b':')?;
            line[..colon]
                .eq_ignore_ascii_case(b"content-length")
                .then(|| &line[colon + 1..])
        })
        .and_then(|value| std::str::from_utf8(value).ok())
        // `trim` also removes the `\r` left over from the `\r\n` line ending.
        .and_then(|value| value.trim().parse::<usize>().ok())
        .unwrap_or(0)
}

/// Reads a single HTTP request from `conn` and returns its body as text.
///
/// Bytes are read until the `\r\n\r\n` header terminator is seen, then the
/// body is read according to the `Content-Length` header (0 when missing or
/// unparsable). Invalid UTF-8 in the body is replaced lossily. Bytes received
/// beyond `Content-Length` are discarded.
///
/// # Errors
///
/// * `UnexpectedEof` if the peer closes before the headers are complete, or
///   (from `read_exact`) before the full body has arrived.
/// * `InvalidData` if no header terminator is found within the first
///   131 072 bytes, or if `Content-Length` exceeds `MAX_NOTIFY_BODY`.
/// * Any other I/O error reported by the socket.
async fn read_http_body(conn: &mut tokio::net::TcpStream) -> std::io::Result<String> {
    use tokio::io::AsyncReadExt;

    let mut buf: Vec<u8> = Vec::with_capacity(8192);
    let mut tmp = [0u8; 4096];

    let header_end = loop {
        let n = conn.read(&mut tmp).await?;
        if n == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "connection closed before headers",
            ));
        }
        // Earlier iterations already ruled out a terminator lying fully inside
        // the old data, so only the last three old bytes (a terminator may
        // straddle two reads) plus the new bytes need to be searched.
        let scan_from = buf.len().saturating_sub(3);
        buf.extend_from_slice(&tmp[..n]);
        if let Some(pos) = find_subsequence(&buf[scan_from..], b"\r\n\r\n") {
            break scan_from + pos + 4;
        }
        if buf.len() > 131_072 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "headers too large",
            ));
        }
    };

    let content_length = declared_content_length(&buf[..header_end]);
    if content_length > MAX_NOTIFY_BODY {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "notification body too large",
        ));
    }

    // Part (or all) of the body may have arrived together with the headers.
    let already_read = buf.len() - header_end;
    if content_length > already_read {
        buf.resize(header_end + content_length, 0);
        conn.read_exact(&mut buf[header_end + already_read..])
            .await?;
    }

    Ok(String::from_utf8_lossy(&buf[header_end..header_end + content_length]).into_owned())
}
/// Returns the index of the first occurrence of `needle` in `haystack`.
///
/// An empty `needle` matches at index 0 (even in an empty `haystack`).
/// Returns `None` when `needle` does not occur, including when it is longer
/// than `haystack`.
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    // `slice::windows` panics when asked for zero-sized windows.
    if needle.is_empty() {
        return Some(0);
    }
    haystack.windows(needle.len()).position(|w| w == needle)
}