Skip to main content

dvrip_rs/commands/
monitoring.rs

1use crate::Authentication;
2use crate::constants::{OK_CODES, QCODES};
3use crate::dvrip::DVRIPCam;
4use crate::error::Result;
5use crate::protocol::{receive_data, receive_packet_header};
6use async_trait::async_trait;
7use byteorder::{BigEndian, ByteOrder, LittleEndian};
8use serde_json::json;
9use std::sync::Arc;
10use std::sync::atomic::Ordering;
11use tokio::io::AsyncReadExt;
12
13#[derive(Debug)]
14pub struct FrameMetadata {
15    pub width: Option<u32>,
16    pub height: Option<u32>,
17    pub fps: Option<u8>,
18    pub frame_type: Option<String>,
19    pub media_type: Option<String>,
20    pub datetime: Option<chrono::DateTime<chrono::Local>>,
21}
22
23pub type FrameCallback = Box<dyn Fn(Vec<u8>, FrameMetadata) + Send + Sync>;
24
25#[async_trait]
26pub trait Monitoring: Send + Sync {
27    /// Start video monitoring
28    async fn start_monitor(
29        &mut self,
30        callback: FrameCallback,
31        stream: &str,
32        channel: u8,
33    ) -> Result<()>;
34
35    /// Stop video monitoring
36    async fn stop_monitor(&mut self) -> Result<()>;
37
38    /// Get a snapshot (screenshot)
39    async fn snapshot(&mut self, channel: u8) -> Result<Vec<u8>>;
40
41    /// Check if monitoring
42    fn is_monitoring(&self) -> bool;
43}
44
45#[async_trait]
46impl Monitoring for DVRIPCam {
47    async fn start_monitor(
48        &mut self,
49        callback: FrameCallback,
50        stream: &str,
51        channel: u8,
52    ) -> Result<()> {
53        let params = json!({
54            "Channel": channel,
55            "CombinMode": "NONE",
56            "StreamType": stream,
57            "TransMode": "TCP",
58        });
59
60        let data = json!({
61            "Action": "Claim",
62            "Parameter": params,
63        });
64
65        let reply = self.set_command("OPMonitor", data, None).await?;
66        if let Some(ret) = reply.get("Ret").and_then(|r| r.as_u64())
67            && !OK_CODES.contains(&(ret as u32))
68        {
69            return Err(crate::error::DVRIPError::ProtocolError(
70                "Failed to start monitoring".to_string(),
71            ));
72        }
73
74        let session = self.session_id();
75        let start_data = json!({
76            "Name": "OPMonitor",
77            "SessionID": format!("0x{:08X}", session),
78            "OPMonitor": {
79                "Action": "Start",
80                "Parameter": params,
81            },
82        });
83
84        self.send_command(1410, start_data, false).await?;
85        self.monitoring.store(true, Ordering::Release);
86
87        // Iniciar worker de monitoramento
88        let stream_clone = self.stream.clone();
89        let monitoring_flag = self.monitoring.clone();
90        let timeout = self.timeout;
91        let callback = Arc::new(callback);
92
93        tokio::spawn(async move {
94            while monitoring_flag.load(Ordering::Acquire) {
95                let mut stream_guard = stream_clone.lock().await;
96                if let Some(s) = stream_guard.as_mut() {
97                    let (mut reader, _) = s.split();
98
99                    match DVRIPCam::reassemble_bin_payload_static(&mut reader, timeout).await {
100                        Ok((frame, metadata)) => {
101                            callback(frame, metadata);
102                        }
103                        Err(_) => {
104                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
105                        }
106                    }
107                } else {
108                    break;
109                }
110            }
111        });
112
113        Ok(())
114    }
115
116    async fn stop_monitor(&mut self) -> Result<()> {
117        self.monitoring.store(false, Ordering::Release);
118        Ok(())
119    }
120
121    async fn snapshot(&mut self, channel: u8) -> Result<Vec<u8>> {
122        let session = self.session_id();
123        let data = json!({
124            "Name": "OPSNAP",
125            "SessionID": format!("0x{:08X}", session),
126            "OPSNAP": {
127                "Channel": channel,
128            },
129        });
130
131        self.send_command(QCODES.get("OPSNAP").copied().unwrap_or(1560), data, false)
132            .await?;
133
134        let mut stream_guard = self.stream.lock().await;
135        if let Some(s) = stream_guard.as_mut() {
136            let (mut reader, _) = s.split();
137            let (frame, _) =
138                DVRIPCam::reassemble_bin_payload_static(&mut reader, self.timeout).await?;
139            return Ok(frame);
140        }
141
142        Err(crate::error::DVRIPError::ConnectionError(
143            "Stream not available".to_string(),
144        ))
145    }
146
147    fn is_monitoring(&self) -> bool {
148        self.monitoring.load(Ordering::Acquire)
149    }
150}
151
152impl DVRIPCam {
153    pub(crate) async fn reassemble_bin_payload_static<R: AsyncReadExt + Unpin>(
154        reader: &mut R,
155        timeout: tokio::time::Duration,
156    ) -> Result<(Vec<u8>, FrameMetadata)> {
157        let mut metadata = FrameMetadata {
158            width: None,
159            height: None,
160            fps: None,
161            frame_type: None,
162            media_type: None,
163            datetime: None,
164        };
165
166        let mut length = 0u32;
167        let mut buf = Vec::new();
168        let start_time = tokio::time::Instant::now();
169
170        loop {
171            if start_time.elapsed() > timeout {
172                return Err(crate::error::DVRIPError::ConnectionError(
173                    "Timeout receiving payload".to_string(),
174                ));
175            }
176
177            let header = receive_packet_header(reader).await?;
178            let packet = receive_data(reader, header.data_len as usize, timeout).await?;
179
180            let mut frame_len = 0usize;
181
182            if length == 0 {
183                if packet.len() < 4 {
184                    continue;
185                }
186
187                let data_type = BigEndian::read_u32(&packet[0..4]);
188
189                if data_type == 0x1FC || data_type == 0x1FE {
190                    frame_len = 16;
191                    if packet.len() >= frame_len {
192                        let media = packet[4];
193                        metadata.fps = Some(packet[5]);
194                        let w = packet[6] as u32;
195                        let h = packet[7] as u32;
196                        let dt = LittleEndian::read_u32(&packet[8..12]);
197                        length = LittleEndian::read_u32(&packet[12..16]);
198
199                        metadata.width = Some(w * 8);
200                        metadata.height = Some(h * 8);
201                        metadata.datetime = Some(Self::internal_to_datetime_static(dt));
202
203                        if data_type == 0x1FC {
204                            metadata.frame_type = Some("I".to_string());
205                        }
206
207                        metadata.media_type = Self::internal_to_type_static(data_type, media);
208                    }
209                } else if data_type == 0x1FD {
210                    frame_len = 8;
211                    if packet.len() >= frame_len {
212                        length = LittleEndian::read_u32(&packet[4..8]);
213                        metadata.frame_type = Some("P".to_string());
214                    }
215                } else if data_type == 0x1FA {
216                    frame_len = 8;
217                    if packet.len() >= frame_len {
218                        let media = packet[4];
219                        let _samp_rate = LittleEndian::read_u16(&packet[5..7]);
220                        length = LittleEndian::read_u16(&packet[6..8]) as u32;
221                        metadata.media_type = Self::internal_to_type_static(data_type, media);
222                    }
223                } else if data_type == 0x1F9 {
224                    frame_len = 8;
225                    if packet.len() >= frame_len {
226                        let media = packet[4];
227                        let _n = packet[5];
228                        length = LittleEndian::read_u16(&packet[6..8]) as u32;
229                        metadata.media_type = Self::internal_to_type_static(data_type, media);
230                    }
231                } else if data_type == 0xFFD8FFE0 {
232                    return Ok((packet, metadata));
233                } else {
234                    return Err(crate::error::DVRIPError::ProtocolError(format!(
235                        "Unknown data type: 0x{:X}",
236                        data_type
237                    )));
238                }
239            }
240
241            if frame_len < packet.len() {
242                buf.extend_from_slice(&packet[frame_len..]);
243            }
244
245            if length > 0 && buf.len() >= length as usize {
246                buf.truncate(length as usize);
247                return Ok((buf, metadata));
248            }
249        }
250    }
251
252    fn internal_to_type_static(data_type: u32, value: u8) -> Option<String> {
253        match data_type {
254            0x1FC | 0x1FD => match value {
255                1 => Some("mpeg4".to_string()),
256                2 => Some("h264".to_string()),
257                3 => Some("h265".to_string()),
258                _ => None,
259            },
260            0x1F9 => {
261                if value == 1 || value == 6 {
262                    Some("info".to_string())
263                } else {
264                    None
265                }
266            }
267            0x1FA => {
268                if value == 0xE {
269                    Some("g711a".to_string())
270                } else {
271                    None
272                }
273            }
274            0x1FE => {
275                if value == 0 {
276                    Some("jpeg".to_string())
277                } else {
278                    None
279                }
280            }
281            _ => None,
282        }
283    }
284
285    fn internal_to_datetime_static(value: u32) -> chrono::DateTime<chrono::Local> {
286        let second = value & 0x3F;
287        let minute = (value & 0xFC0) >> 6;
288        let hour = (value & 0x1F000) >> 12;
289        let day = (value & 0x3E0000) >> 17;
290        let month = (value & 0x3C00000) >> 22;
291        let year = ((value & 0xFC000000) >> 26) + 2000;
292
293        chrono::NaiveDate::from_ymd_opt(year as i32, month, day)
294            .and_then(|d| d.and_hms_opt(hour, minute, second))
295            .map(|dt| {
296                chrono::DateTime::from_naive_utc_and_offset(dt, *chrono::Local::now().offset())
297            })
298            .unwrap_or_else(chrono::Local::now)
299    }
300}