Skip to main content

dvrip_rs/
dvrip.rs

1use crate::commands::*;
2use crate::constants::{OK_CODES, QCODES, TCP_PORT};
3use crate::error::{DVRIPError, Result};
4use crate::protocol::{receive_data, receive_json, receive_packet_header, send_packet};
5use serde_json::{Value, json};
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10use tokio::sync::Mutex;
11use tokio::time::Duration;
12
13pub struct DVRIPCam {
14    pub(crate) ip: String,
15    pub(crate) port: u16,
16    pub(crate) timeout: Duration,
17
18    pub(crate) username: Option<String>,
19
20    // Atomic state
21    pub(crate) connected: Arc<AtomicBool>,
22    pub(crate) authenticated: Arc<AtomicBool>,
23    pub(crate) monitoring: Arc<AtomicBool>,
24    pub(crate) alarm_monitoring: Arc<AtomicBool>,
25
26    // Atomic counters
27    pub(crate) session: Arc<AtomicU32>,
28    pub(crate) packet_count: Arc<AtomicU32>,
29
30    // Connection
31    pub(crate) stream: Arc<Mutex<Option<TcpStream>>>,
32
33    // Callbacks
34    pub(crate) alarm_callback: Arc<Mutex<Option<AlarmCallback>>>,
35
36    // Background tasks
37    pub(crate) keep_alive_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
38    pub(crate) alarm_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
39
40    // Configuration
41    pub(crate) alive_time: Arc<AtomicU64>,
42}
43
44impl DVRIPCam {
45    pub fn new(ip: impl Into<String>) -> Self {
46        let ip = ip.into();
47        Self {
48            ip,
49            username: None,
50            port: TCP_PORT,
51            timeout: Duration::from_secs(10),
52            connected: Arc::new(AtomicBool::new(false)),
53            authenticated: Arc::new(AtomicBool::new(false)),
54            monitoring: Arc::new(AtomicBool::new(false)),
55            alarm_monitoring: Arc::new(AtomicBool::new(false)),
56            session: Arc::new(AtomicU32::new(0)),
57            packet_count: Arc::new(AtomicU32::new(1)),
58            stream: Arc::new(Mutex::new(None)),
59            alarm_callback: Arc::new(Mutex::new(None)),
60            keep_alive_handle: Arc::new(Mutex::new(None)),
61            alarm_handle: Arc::new(Mutex::new(None)),
62            alive_time: Arc::new(AtomicU64::new(20)),
63        }
64    }
65
66    pub fn with_port(mut self, port: u16) -> Self {
67        self.port = port;
68        self
69    }
70
71    pub fn with_timeout(mut self, timeout: Duration) -> Self {
72        self.timeout = timeout;
73        self
74    }
75
76    pub(crate) async fn send_command_recv_bin(
77        &self,
78        msg_id: u16,
79        data: Value,
80        wait_response: bool,
81    ) -> Result<Option<Vec<u8>>> {
82        if !self.connected.load(Ordering::Acquire) {
83            return Err(DVRIPError::ConnectionError("Not connected".to_string()));
84        }
85
86        let mut stream_guard = self.stream.lock().await;
87        let stream = stream_guard
88            .as_mut()
89            .ok_or_else(|| DVRIPError::ConnectionError("Stream not available".to_string()))?;
90
91        // Use split to read and write simultaneously
92        // Note: split() consumes the stream, but returns reader and writer that can be used
93        let (mut reader, mut writer) = tokio::io::split(stream);
94
95        let session = self.session.load(Ordering::Acquire);
96        let packet_count = self.packet_count.fetch_add(1, Ordering::SeqCst);
97
98        let data_bytes = serde_json::to_string(&data)
99            .map_err(|e| DVRIPError::SerializationError(e.to_string()))?
100            .into_bytes();
101
102        send_packet(&mut writer, session, packet_count, msg_id, &data_bytes, 0).await?;
103        writer.flush().await?; // Ensure data was sent
104
105        if !wait_response {
106            return Ok(None);
107        }
108
109        // Small delay to ensure the server processed the request
110        // Similar to sleep(0.1) in Python code
111        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
112
113        let header = match receive_packet_header(&mut reader).await {
114            Ok(h) => h,
115            Err(e) => {
116                // If reading header fails, connection may have been closed
117                self.connected.store(false, Ordering::Release);
118                return Err(e);
119            }
120        };
121        self.session.store(header.session, Ordering::Release);
122
123        let timeout = self.timeout;
124        let reply = match receive_data(&mut reader, header.data_len as usize, timeout).await {
125            Ok(r) => r,
126            Err(e) => {
127                // If reading data fails, connection may have been closed
128                self.connected.store(false, Ordering::Release);
129                return Err(e);
130            }
131        };
132
133        Ok(Some(reply))
134    }
135
136    pub(crate) async fn send_command(
137        &self,
138        msg_id: u16,
139        data: Value,
140        wait_response: bool,
141    ) -> Result<Option<Value>> {
142        let Some(data) = self
143            .send_command_recv_bin(msg_id, data, wait_response)
144            .await?
145            .map(|x| serde_json::from_slice(&x[..x.len() - 2]))
146        else {
147            return Ok(None);
148        };
149        data.map_err(|_| DVRIPError::SerializationError("Failed to parse JSON Header".to_owned()))
150    }
151
152    pub(crate) async fn get_command(&self, command: &str, code: Option<u32>) -> Result<Value> {
153        let msg_id =
154            code.unwrap_or_else(|| QCODES.get(command).copied().unwrap_or(0).into()) as u16;
155
156        let session = self.session.load(Ordering::Acquire);
157        let data = json!({
158            "Name": command,
159            "SessionID": format!("0x{:08X}", session)
160        });
161
162        let reply = self
163            .send_command(msg_id, data, true)
164            .await?
165            .ok_or_else(|| DVRIPError::ProtocolError("Empty response".to_string()))?;
166
167        if let Some(ret) = reply.get("Ret")
168            && let Some(ret_code) = ret.as_u64()
169            && OK_CODES.contains(&(ret_code as u32))
170            && let Some(cmd_data) = reply.get(command)
171        {
172            return Ok(cmd_data.clone());
173        }
174
175        Ok(reply)
176    }
177
178    pub(crate) async fn set_command(
179        &self,
180        command: &str,
181        data: Value,
182        code: Option<u32>,
183    ) -> Result<Value> {
184        let msg_id =
185            code.unwrap_or_else(|| QCODES.get(command).copied().unwrap_or(0) as u32) as u16;
186
187        let session = self.session.load(Ordering::Acquire);
188        let mut cmd_data = json!({
189            "Name": command,
190            "SessionID": format!("0x{:08X}", session),
191        });
192        cmd_data[command] = data;
193
194        let reply = self
195            .send_command(msg_id, cmd_data, true)
196            .await?
197            .ok_or_else(|| DVRIPError::ProtocolError("Empty response".to_string()))?;
198
199        Ok(reply)
200    }
201
202    pub(crate) async fn start_keep_alive(&self) {
203        let session = self.session.clone();
204        let alive_time = self.alive_time.clone();
205        let stream = self.stream.clone();
206        let connected = self.connected.clone();
207        let _ = self.timeout;
208        let keep_alive_code = QCODES.get("KeepAlive").copied().unwrap_or(1006);
209
210        let handle = tokio::spawn(async move {
211            loop {
212                if !connected.load(Ordering::Acquire) {
213                    break;
214                }
215
216                let interval = Duration::from_secs(alive_time.load(Ordering::Acquire));
217                tokio::time::sleep(interval).await;
218
219                let mut stream_guard = stream.lock().await;
220                if let Some(s) = stream_guard.as_mut() {
221                    let (_, mut writer) = s.split();
222                    let session_id = session.load(Ordering::Acquire);
223                    let packet_count = 0u32; // Keep alive can use fixed counter
224
225                    let data = json!({
226                        "Name": "KeepAlive",
227                        "SessionID": format!("0x{:08X}", session_id)
228                    });
229
230                    if let Ok(data_bytes) = serde_json::to_string(&data) {
231                        // We don't wait for keep-alive response, just send
232                        if send_packet(
233                            &mut writer,
234                            session_id,
235                            packet_count,
236                            keep_alive_code,
237                            data_bytes.as_bytes(),
238                            0,
239                        )
240                        .await
241                        .is_err()
242                        {
243                            connected.store(false, Ordering::Release);
244                            break;
245                        }
246                        // Flush to ensure data was sent
247                        if writer.flush().await.is_err() {
248                            connected.store(false, Ordering::Release);
249                            break;
250                        }
251                    }
252                } else {
253                    connected.store(false, Ordering::Release);
254                    break;
255                }
256            }
257        });
258
259        *self.keep_alive_handle.lock().await = Some(handle);
260    }
261
262    pub(crate) async fn start_alarm_worker(&self) {
263        let stream = self.stream.clone();
264        let session = self.session.clone();
265        let packet_count = self.packet_count.clone();
266        let alarm_callback = self.alarm_callback.clone();
267        let alarm_monitoring = self.alarm_monitoring.clone();
268        let connected = self.connected.clone();
269        let timeout = self.timeout;
270        let alarm_info_code = QCODES.get("AlarmInfo").copied().unwrap_or(1504);
271
272        let handle = tokio::spawn(async move {
273            while alarm_monitoring.load(Ordering::Acquire) && connected.load(Ordering::Acquire) {
274                let mut stream_guard = stream.lock().await;
275                if let Some(s) = stream_guard.as_mut() {
276                    let (mut reader, _) = s.split();
277
278                    match receive_packet_header(&mut reader).await {
279                        Ok(header) => {
280                            if header.msg_id == alarm_info_code
281                                && header.session == session.load(Ordering::Acquire)
282                            {
283                                match receive_json(&mut reader, header.data_len as usize, timeout)
284                                    .await
285                                {
286                                    Ok(reply) => {
287                                        packet_count.fetch_add(1, Ordering::SeqCst);
288                                        let callback_guard = alarm_callback.lock().await;
289                                        if let Some(ref callback) = *callback_guard
290                                            && let Some(name) =
291                                                reply.get("Name").and_then(|n| n.as_str())
292                                            && let Some(alarm_data) = reply.get(name)
293                                        {
294                                            callback(alarm_data.clone(), header.packet_count);
295                                        }
296                                    }
297                                    Err(e) => {
298                                        // If there's an error reading JSON, connection may have been closed
299                                        match &e {
300                                            DVRIPError::ConnectionError(_)
301                                            | DVRIPError::IoError(_) => {
302                                                connected.store(false, Ordering::Release);
303                                                break;
304                                            }
305                                            _ => {}
306                                        }
307                                    }
308                                }
309                            }
310                        }
311                        Err(e) => {
312                            // If there's an error reading header, connection may have been closed
313                            match &e {
314                                DVRIPError::ConnectionError(_) | DVRIPError::IoError(_) => {
315                                    connected.store(false, Ordering::Release);
316                                    break;
317                                }
318                                _ => {
319                                    tokio::time::sleep(Duration::from_millis(100)).await;
320                                }
321                            }
322                        }
323                    }
324                } else {
325                    break;
326                }
327            }
328        });
329
330        *self.alarm_handle.lock().await = Some(handle);
331    }
332}