use crate::Authentication;
use crate::constants::{OK_CODES, QCODES};
use crate::dvrip::DVRIPCam;
use crate::error::Result;
use crate::protocol::{receive_data, receive_packet_header};
use async_trait::async_trait;
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use serde_json::json;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::io::AsyncReadExt;
#[derive(Debug)]
pub struct FrameMetadata {
pub width: Option<u32>,
pub height: Option<u32>,
pub fps: Option<u8>,
pub frame_type: Option<String>,
pub media_type: Option<String>,
pub datetime: Option<chrono::DateTime<chrono::Local>>,
}
pub type FrameCallback = Box<dyn Fn(Vec<u8>, FrameMetadata) + Send + Sync>;
#[async_trait]
pub trait Monitoring: Send + Sync {
async fn start_monitor(
&mut self,
callback: FrameCallback,
stream: &str,
channel: u8,
) -> Result<()>;
async fn stop_monitor(&mut self) -> Result<()>;
async fn snapshot(&mut self, channel: u8) -> Result<Vec<u8>>;
fn is_monitoring(&self) -> bool;
}
#[async_trait]
impl Monitoring for DVRIPCam {
async fn start_monitor(
&mut self,
callback: FrameCallback,
stream: &str,
channel: u8,
) -> Result<()> {
let params = json!({
"Channel": channel,
"CombinMode": "NONE",
"StreamType": stream,
"TransMode": "TCP",
});
let data = json!({
"Action": "Claim",
"Parameter": params,
});
let reply = self.set_command("OPMonitor", data, None).await?;
if let Some(ret) = reply.get("Ret").and_then(|r| r.as_u64())
&& !OK_CODES.contains(&(ret as u32))
{
return Err(crate::error::DVRIPError::ProtocolError(
"Failed to start monitoring".to_string(),
));
}
let session = self.session_id();
let start_data = json!({
"Name": "OPMonitor",
"SessionID": format!("0x{:08X}", session),
"OPMonitor": {
"Action": "Start",
"Parameter": params,
},
});
self.send_command(1410, start_data, false).await?;
self.monitoring.store(true, Ordering::Release);
let stream_clone = self.stream.clone();
let monitoring_flag = self.monitoring.clone();
let timeout = self.timeout;
let callback = Arc::new(callback);
tokio::spawn(async move {
while monitoring_flag.load(Ordering::Acquire) {
let mut stream_guard = stream_clone.lock().await;
if let Some(s) = stream_guard.as_mut() {
let (mut reader, _) = s.split();
match DVRIPCam::reassemble_bin_payload_static(&mut reader, timeout).await {
Ok((frame, metadata)) => {
callback(frame, metadata);
}
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
} else {
break;
}
}
});
Ok(())
}
async fn stop_monitor(&mut self) -> Result<()> {
self.monitoring.store(false, Ordering::Release);
Ok(())
}
async fn snapshot(&mut self, channel: u8) -> Result<Vec<u8>> {
let session = self.session_id();
let data = json!({
"Name": "OPSNAP",
"SessionID": format!("0x{:08X}", session),
"OPSNAP": {
"Channel": channel,
},
});
self.send_command(QCODES.get("OPSNAP").copied().unwrap_or(1560), data, false)
.await?;
let mut stream_guard = self.stream.lock().await;
if let Some(s) = stream_guard.as_mut() {
let (mut reader, _) = s.split();
let (frame, _) =
DVRIPCam::reassemble_bin_payload_static(&mut reader, self.timeout).await?;
return Ok(frame);
}
Err(crate::error::DVRIPError::ConnectionError(
"Stream not available".to_string(),
))
}
fn is_monitoring(&self) -> bool {
self.monitoring.load(Ordering::Acquire)
}
}
impl DVRIPCam {
pub(crate) async fn reassemble_bin_payload_static<R: AsyncReadExt + Unpin>(
reader: &mut R,
timeout: tokio::time::Duration,
) -> Result<(Vec<u8>, FrameMetadata)> {
let mut metadata = FrameMetadata {
width: None,
height: None,
fps: None,
frame_type: None,
media_type: None,
datetime: None,
};
let mut length = 0u32;
let mut buf = Vec::new();
let start_time = tokio::time::Instant::now();
loop {
if start_time.elapsed() > timeout {
return Err(crate::error::DVRIPError::ConnectionError(
"Timeout receiving payload".to_string(),
));
}
let header = receive_packet_header(reader).await?;
let packet = receive_data(reader, header.data_len as usize, timeout).await?;
let mut frame_len = 0usize;
if length == 0 {
if packet.len() < 4 {
continue;
}
let data_type = BigEndian::read_u32(&packet[0..4]);
if data_type == 0x1FC || data_type == 0x1FE {
frame_len = 16;
if packet.len() >= frame_len {
let media = packet[4];
metadata.fps = Some(packet[5]);
let w = packet[6] as u32;
let h = packet[7] as u32;
let dt = LittleEndian::read_u32(&packet[8..12]);
length = LittleEndian::read_u32(&packet[12..16]);
metadata.width = Some(w * 8);
metadata.height = Some(h * 8);
metadata.datetime = Some(Self::internal_to_datetime_static(dt));
if data_type == 0x1FC {
metadata.frame_type = Some("I".to_string());
}
metadata.media_type = Self::internal_to_type_static(data_type, media);
}
} else if data_type == 0x1FD {
frame_len = 8;
if packet.len() >= frame_len {
length = LittleEndian::read_u32(&packet[4..8]);
metadata.frame_type = Some("P".to_string());
}
} else if data_type == 0x1FA {
frame_len = 8;
if packet.len() >= frame_len {
let media = packet[4];
let _samp_rate = LittleEndian::read_u16(&packet[5..7]);
length = LittleEndian::read_u16(&packet[6..8]) as u32;
metadata.media_type = Self::internal_to_type_static(data_type, media);
}
} else if data_type == 0x1F9 {
frame_len = 8;
if packet.len() >= frame_len {
let media = packet[4];
let _n = packet[5];
length = LittleEndian::read_u16(&packet[6..8]) as u32;
metadata.media_type = Self::internal_to_type_static(data_type, media);
}
} else if data_type == 0xFFD8FFE0 {
return Ok((packet, metadata));
} else {
return Err(crate::error::DVRIPError::ProtocolError(format!(
"Unknown data type: 0x{:X}",
data_type
)));
}
}
if frame_len < packet.len() {
buf.extend_from_slice(&packet[frame_len..]);
}
if length > 0 && buf.len() >= length as usize {
buf.truncate(length as usize);
return Ok((buf, metadata));
}
}
}
fn internal_to_type_static(data_type: u32, value: u8) -> Option<String> {
match data_type {
0x1FC | 0x1FD => match value {
1 => Some("mpeg4".to_string()),
2 => Some("h264".to_string()),
3 => Some("h265".to_string()),
_ => None,
},
0x1F9 => {
if value == 1 || value == 6 {
Some("info".to_string())
} else {
None
}
}
0x1FA => {
if value == 0xE {
Some("g711a".to_string())
} else {
None
}
}
0x1FE => {
if value == 0 {
Some("jpeg".to_string())
} else {
None
}
}
_ => None,
}
}
fn internal_to_datetime_static(value: u32) -> chrono::DateTime<chrono::Local> {
let second = value & 0x3F;
let minute = (value & 0xFC0) >> 6;
let hour = (value & 0x1F000) >> 12;
let day = (value & 0x3E0000) >> 17;
let month = (value & 0x3C00000) >> 22;
let year = ((value & 0xFC000000) >> 26) + 2000;
chrono::NaiveDate::from_ymd_opt(year as i32, month, day)
.and_then(|d| d.and_hms_opt(hour, minute, second))
.map(|dt| {
chrono::DateTime::from_naive_utc_and_offset(dt, *chrono::Local::now().offset())
})
.unwrap_or_else(chrono::Local::now)
}
}