dvrip_rs/commands/
monitoring.rs1use crate::Authentication;
2use crate::constants::{OK_CODES, QCODES};
3use crate::dvrip::DVRIPCam;
4use crate::error::Result;
5use crate::protocol::{receive_data, receive_packet_header};
6use async_trait::async_trait;
7use byteorder::{BigEndian, ByteOrder, LittleEndian};
8use serde_json::json;
9use std::sync::Arc;
10use std::sync::atomic::Ordering;
11use tokio::io::AsyncReadExt;
12
13#[derive(Debug)]
14pub struct FrameMetadata {
15 pub width: Option<u32>,
16 pub height: Option<u32>,
17 pub fps: Option<u8>,
18 pub frame_type: Option<String>,
19 pub media_type: Option<String>,
20 pub datetime: Option<chrono::DateTime<chrono::Local>>,
21}
22
23pub type FrameCallback = Box<dyn Fn(Vec<u8>, FrameMetadata) + Send + Sync>;
24
25#[async_trait]
26pub trait Monitoring: Send + Sync {
27 async fn start_monitor(
29 &mut self,
30 callback: FrameCallback,
31 stream: &str,
32 channel: u8,
33 ) -> Result<()>;
34
35 async fn stop_monitor(&mut self) -> Result<()>;
37
38 async fn snapshot(&mut self, channel: u8) -> Result<Vec<u8>>;
40
41 fn is_monitoring(&self) -> bool;
43}
44
45#[async_trait]
46impl Monitoring for DVRIPCam {
47 async fn start_monitor(
48 &mut self,
49 callback: FrameCallback,
50 stream: &str,
51 channel: u8,
52 ) -> Result<()> {
53 let params = json!({
54 "Channel": channel,
55 "CombinMode": "NONE",
56 "StreamType": stream,
57 "TransMode": "TCP",
58 });
59
60 let data = json!({
61 "Action": "Claim",
62 "Parameter": params,
63 });
64
65 let reply = self.set_command("OPMonitor", data, None).await?;
66 if let Some(ret) = reply.get("Ret").and_then(|r| r.as_u64())
67 && !OK_CODES.contains(&(ret as u32))
68 {
69 return Err(crate::error::DVRIPError::ProtocolError(
70 "Failed to start monitoring".to_string(),
71 ));
72 }
73
74 let session = self.session_id();
75 let start_data = json!({
76 "Name": "OPMonitor",
77 "SessionID": format!("0x{:08X}", session),
78 "OPMonitor": {
79 "Action": "Start",
80 "Parameter": params,
81 },
82 });
83
84 self.send_command(1410, start_data, false).await?;
85 self.monitoring.store(true, Ordering::Release);
86
87 let stream_clone = self.stream.clone();
89 let monitoring_flag = self.monitoring.clone();
90 let timeout = self.timeout;
91 let callback = Arc::new(callback);
92
93 tokio::spawn(async move {
94 while monitoring_flag.load(Ordering::Acquire) {
95 let mut stream_guard = stream_clone.lock().await;
96 if let Some(s) = stream_guard.as_mut() {
97 let (mut reader, _) = s.split();
98
99 match DVRIPCam::reassemble_bin_payload_static(&mut reader, timeout).await {
100 Ok((frame, metadata)) => {
101 callback(frame, metadata);
102 }
103 Err(_) => {
104 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
105 }
106 }
107 } else {
108 break;
109 }
110 }
111 });
112
113 Ok(())
114 }
115
116 async fn stop_monitor(&mut self) -> Result<()> {
117 self.monitoring.store(false, Ordering::Release);
118 Ok(())
119 }
120
121 async fn snapshot(&mut self, channel: u8) -> Result<Vec<u8>> {
122 let session = self.session_id();
123 let data = json!({
124 "Name": "OPSNAP",
125 "SessionID": format!("0x{:08X}", session),
126 "OPSNAP": {
127 "Channel": channel,
128 },
129 });
130
131 self.send_command(QCODES.get("OPSNAP").copied().unwrap_or(1560), data, false)
132 .await?;
133
134 let mut stream_guard = self.stream.lock().await;
135 if let Some(s) = stream_guard.as_mut() {
136 let (mut reader, _) = s.split();
137 let (frame, _) =
138 DVRIPCam::reassemble_bin_payload_static(&mut reader, self.timeout).await?;
139 return Ok(frame);
140 }
141
142 Err(crate::error::DVRIPError::ConnectionError(
143 "Stream not available".to_string(),
144 ))
145 }
146
147 fn is_monitoring(&self) -> bool {
148 self.monitoring.load(Ordering::Acquire)
149 }
150}
151
152impl DVRIPCam {
153 pub(crate) async fn reassemble_bin_payload_static<R: AsyncReadExt + Unpin>(
154 reader: &mut R,
155 timeout: tokio::time::Duration,
156 ) -> Result<(Vec<u8>, FrameMetadata)> {
157 let mut metadata = FrameMetadata {
158 width: None,
159 height: None,
160 fps: None,
161 frame_type: None,
162 media_type: None,
163 datetime: None,
164 };
165
166 let mut length = 0u32;
167 let mut buf = Vec::new();
168 let start_time = tokio::time::Instant::now();
169
170 loop {
171 if start_time.elapsed() > timeout {
172 return Err(crate::error::DVRIPError::ConnectionError(
173 "Timeout receiving payload".to_string(),
174 ));
175 }
176
177 let header = receive_packet_header(reader).await?;
178 let packet = receive_data(reader, header.data_len as usize, timeout).await?;
179
180 let mut frame_len = 0usize;
181
182 if length == 0 {
183 if packet.len() < 4 {
184 continue;
185 }
186
187 let data_type = BigEndian::read_u32(&packet[0..4]);
188
189 if data_type == 0x1FC || data_type == 0x1FE {
190 frame_len = 16;
191 if packet.len() >= frame_len {
192 let media = packet[4];
193 metadata.fps = Some(packet[5]);
194 let w = packet[6] as u32;
195 let h = packet[7] as u32;
196 let dt = LittleEndian::read_u32(&packet[8..12]);
197 length = LittleEndian::read_u32(&packet[12..16]);
198
199 metadata.width = Some(w * 8);
200 metadata.height = Some(h * 8);
201 metadata.datetime = Some(Self::internal_to_datetime_static(dt));
202
203 if data_type == 0x1FC {
204 metadata.frame_type = Some("I".to_string());
205 }
206
207 metadata.media_type = Self::internal_to_type_static(data_type, media);
208 }
209 } else if data_type == 0x1FD {
210 frame_len = 8;
211 if packet.len() >= frame_len {
212 length = LittleEndian::read_u32(&packet[4..8]);
213 metadata.frame_type = Some("P".to_string());
214 }
215 } else if data_type == 0x1FA {
216 frame_len = 8;
217 if packet.len() >= frame_len {
218 let media = packet[4];
219 let _samp_rate = LittleEndian::read_u16(&packet[5..7]);
220 length = LittleEndian::read_u16(&packet[6..8]) as u32;
221 metadata.media_type = Self::internal_to_type_static(data_type, media);
222 }
223 } else if data_type == 0x1F9 {
224 frame_len = 8;
225 if packet.len() >= frame_len {
226 let media = packet[4];
227 let _n = packet[5];
228 length = LittleEndian::read_u16(&packet[6..8]) as u32;
229 metadata.media_type = Self::internal_to_type_static(data_type, media);
230 }
231 } else if data_type == 0xFFD8FFE0 {
232 return Ok((packet, metadata));
233 } else {
234 return Err(crate::error::DVRIPError::ProtocolError(format!(
235 "Unknown data type: 0x{:X}",
236 data_type
237 )));
238 }
239 }
240
241 if frame_len < packet.len() {
242 buf.extend_from_slice(&packet[frame_len..]);
243 }
244
245 if length > 0 && buf.len() >= length as usize {
246 buf.truncate(length as usize);
247 return Ok((buf, metadata));
248 }
249 }
250 }
251
252 fn internal_to_type_static(data_type: u32, value: u8) -> Option<String> {
253 match data_type {
254 0x1FC | 0x1FD => match value {
255 1 => Some("mpeg4".to_string()),
256 2 => Some("h264".to_string()),
257 3 => Some("h265".to_string()),
258 _ => None,
259 },
260 0x1F9 => {
261 if value == 1 || value == 6 {
262 Some("info".to_string())
263 } else {
264 None
265 }
266 }
267 0x1FA => {
268 if value == 0xE {
269 Some("g711a".to_string())
270 } else {
271 None
272 }
273 }
274 0x1FE => {
275 if value == 0 {
276 Some("jpeg".to_string())
277 } else {
278 None
279 }
280 }
281 _ => None,
282 }
283 }
284
285 fn internal_to_datetime_static(value: u32) -> chrono::DateTime<chrono::Local> {
286 let second = value & 0x3F;
287 let minute = (value & 0xFC0) >> 6;
288 let hour = (value & 0x1F000) >> 12;
289 let day = (value & 0x3E0000) >> 17;
290 let month = (value & 0x3C00000) >> 22;
291 let year = ((value & 0xFC000000) >> 26) + 2000;
292
293 chrono::NaiveDate::from_ymd_opt(year as i32, month, day)
294 .and_then(|d| d.and_hms_opt(hour, minute, second))
295 .map(|dt| {
296 chrono::DateTime::from_naive_utc_and_offset(dt, *chrono::Local::now().offset())
297 })
298 .unwrap_or_else(chrono::Local::now)
299 }
300}