1use crate::commands::*;
2use crate::constants::{OK_CODES, QCODES, TCP_PORT};
3use crate::error::{DVRIPError, Result};
4use crate::protocol::{receive_data, receive_json, receive_packet_header, send_packet};
5use serde_json::{Value, json};
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10use tokio::sync::Mutex;
11use tokio::time::Duration;
12
13pub struct DVRIPCam {
14 pub(crate) ip: String,
15 pub(crate) port: u16,
16 pub(crate) timeout: Duration,
17
18 pub(crate) username: Option<String>,
19
20 pub(crate) connected: Arc<AtomicBool>,
22 pub(crate) authenticated: Arc<AtomicBool>,
23 pub(crate) monitoring: Arc<AtomicBool>,
24 pub(crate) alarm_monitoring: Arc<AtomicBool>,
25
26 pub(crate) session: Arc<AtomicU32>,
28 pub(crate) packet_count: Arc<AtomicU32>,
29
30 pub(crate) stream: Arc<Mutex<Option<TcpStream>>>,
32
33 pub(crate) alarm_callback: Arc<Mutex<Option<AlarmCallback>>>,
35
36 pub(crate) keep_alive_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
38 pub(crate) alarm_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
39
40 pub(crate) alive_time: Arc<AtomicU64>,
42}
43
44impl DVRIPCam {
45 pub fn new(ip: impl Into<String>) -> Self {
46 let ip = ip.into();
47 Self {
48 ip,
49 username: None,
50 port: TCP_PORT,
51 timeout: Duration::from_secs(10),
52 connected: Arc::new(AtomicBool::new(false)),
53 authenticated: Arc::new(AtomicBool::new(false)),
54 monitoring: Arc::new(AtomicBool::new(false)),
55 alarm_monitoring: Arc::new(AtomicBool::new(false)),
56 session: Arc::new(AtomicU32::new(0)),
57 packet_count: Arc::new(AtomicU32::new(1)),
58 stream: Arc::new(Mutex::new(None)),
59 alarm_callback: Arc::new(Mutex::new(None)),
60 keep_alive_handle: Arc::new(Mutex::new(None)),
61 alarm_handle: Arc::new(Mutex::new(None)),
62 alive_time: Arc::new(AtomicU64::new(20)),
63 }
64 }
65
66 pub fn with_port(mut self, port: u16) -> Self {
67 self.port = port;
68 self
69 }
70
71 pub fn with_timeout(mut self, timeout: Duration) -> Self {
72 self.timeout = timeout;
73 self
74 }
75
76 pub(crate) async fn send_command_recv_bin(
77 &self,
78 msg_id: u16,
79 data: Value,
80 wait_response: bool,
81 ) -> Result<Option<Vec<u8>>> {
82 if !self.connected.load(Ordering::Acquire) {
83 return Err(DVRIPError::ConnectionError("Not connected".to_string()));
84 }
85
86 let mut stream_guard = self.stream.lock().await;
87 let stream = stream_guard
88 .as_mut()
89 .ok_or_else(|| DVRIPError::ConnectionError("Stream not available".to_string()))?;
90
91 let (mut reader, mut writer) = tokio::io::split(stream);
94
95 let session = self.session.load(Ordering::Acquire);
96 let packet_count = self.packet_count.fetch_add(1, Ordering::SeqCst);
97
98 let data_bytes = serde_json::to_string(&data)
99 .map_err(|e| DVRIPError::SerializationError(e.to_string()))?
100 .into_bytes();
101
102 send_packet(&mut writer, session, packet_count, msg_id, &data_bytes, 0).await?;
103 writer.flush().await?; if !wait_response {
106 return Ok(None);
107 }
108
109 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
112
113 let header = match receive_packet_header(&mut reader).await {
114 Ok(h) => h,
115 Err(e) => {
116 self.connected.store(false, Ordering::Release);
118 return Err(e);
119 }
120 };
121 self.session.store(header.session, Ordering::Release);
122
123 let timeout = self.timeout;
124 let reply = match receive_data(&mut reader, header.data_len as usize, timeout).await {
125 Ok(r) => r,
126 Err(e) => {
127 self.connected.store(false, Ordering::Release);
129 return Err(e);
130 }
131 };
132
133 Ok(Some(reply))
134 }
135
136 pub(crate) async fn send_command(
137 &self,
138 msg_id: u16,
139 data: Value,
140 wait_response: bool,
141 ) -> Result<Option<Value>> {
142 let Some(data) = self
143 .send_command_recv_bin(msg_id, data, wait_response)
144 .await?
145 .map(|x| serde_json::from_slice(&x[..x.len() - 2]))
146 else {
147 return Ok(None);
148 };
149 data.map_err(|_| DVRIPError::SerializationError("Failed to parse JSON Header".to_owned()))
150 }
151
152 pub(crate) async fn get_command(&self, command: &str, code: Option<u32>) -> Result<Value> {
153 let msg_id =
154 code.unwrap_or_else(|| QCODES.get(command).copied().unwrap_or(0).into()) as u16;
155
156 let session = self.session.load(Ordering::Acquire);
157 let data = json!({
158 "Name": command,
159 "SessionID": format!("0x{:08X}", session)
160 });
161
162 let reply = self
163 .send_command(msg_id, data, true)
164 .await?
165 .ok_or_else(|| DVRIPError::ProtocolError("Empty response".to_string()))?;
166
167 if let Some(ret) = reply.get("Ret")
168 && let Some(ret_code) = ret.as_u64()
169 && OK_CODES.contains(&(ret_code as u32))
170 && let Some(cmd_data) = reply.get(command)
171 {
172 return Ok(cmd_data.clone());
173 }
174
175 Ok(reply)
176 }
177
178 pub(crate) async fn set_command(
179 &self,
180 command: &str,
181 data: Value,
182 code: Option<u32>,
183 ) -> Result<Value> {
184 let msg_id =
185 code.unwrap_or_else(|| QCODES.get(command).copied().unwrap_or(0) as u32) as u16;
186
187 let session = self.session.load(Ordering::Acquire);
188 let mut cmd_data = json!({
189 "Name": command,
190 "SessionID": format!("0x{:08X}", session),
191 });
192 cmd_data[command] = data;
193
194 let reply = self
195 .send_command(msg_id, cmd_data, true)
196 .await?
197 .ok_or_else(|| DVRIPError::ProtocolError("Empty response".to_string()))?;
198
199 Ok(reply)
200 }
201
202 pub(crate) async fn start_keep_alive(&self) {
203 let session = self.session.clone();
204 let alive_time = self.alive_time.clone();
205 let stream = self.stream.clone();
206 let connected = self.connected.clone();
207 let _ = self.timeout;
208 let keep_alive_code = QCODES.get("KeepAlive").copied().unwrap_or(1006);
209
210 let handle = tokio::spawn(async move {
211 loop {
212 if !connected.load(Ordering::Acquire) {
213 break;
214 }
215
216 let interval = Duration::from_secs(alive_time.load(Ordering::Acquire));
217 tokio::time::sleep(interval).await;
218
219 let mut stream_guard = stream.lock().await;
220 if let Some(s) = stream_guard.as_mut() {
221 let (_, mut writer) = s.split();
222 let session_id = session.load(Ordering::Acquire);
223 let packet_count = 0u32; let data = json!({
226 "Name": "KeepAlive",
227 "SessionID": format!("0x{:08X}", session_id)
228 });
229
230 if let Ok(data_bytes) = serde_json::to_string(&data) {
231 if send_packet(
233 &mut writer,
234 session_id,
235 packet_count,
236 keep_alive_code,
237 data_bytes.as_bytes(),
238 0,
239 )
240 .await
241 .is_err()
242 {
243 connected.store(false, Ordering::Release);
244 break;
245 }
246 if writer.flush().await.is_err() {
248 connected.store(false, Ordering::Release);
249 break;
250 }
251 }
252 } else {
253 connected.store(false, Ordering::Release);
254 break;
255 }
256 }
257 });
258
259 *self.keep_alive_handle.lock().await = Some(handle);
260 }
261
262 pub(crate) async fn start_alarm_worker(&self) {
263 let stream = self.stream.clone();
264 let session = self.session.clone();
265 let packet_count = self.packet_count.clone();
266 let alarm_callback = self.alarm_callback.clone();
267 let alarm_monitoring = self.alarm_monitoring.clone();
268 let connected = self.connected.clone();
269 let timeout = self.timeout;
270 let alarm_info_code = QCODES.get("AlarmInfo").copied().unwrap_or(1504);
271
272 let handle = tokio::spawn(async move {
273 while alarm_monitoring.load(Ordering::Acquire) && connected.load(Ordering::Acquire) {
274 let mut stream_guard = stream.lock().await;
275 if let Some(s) = stream_guard.as_mut() {
276 let (mut reader, _) = s.split();
277
278 match receive_packet_header(&mut reader).await {
279 Ok(header) => {
280 if header.msg_id == alarm_info_code
281 && header.session == session.load(Ordering::Acquire)
282 {
283 match receive_json(&mut reader, header.data_len as usize, timeout)
284 .await
285 {
286 Ok(reply) => {
287 packet_count.fetch_add(1, Ordering::SeqCst);
288 let callback_guard = alarm_callback.lock().await;
289 if let Some(ref callback) = *callback_guard
290 && let Some(name) =
291 reply.get("Name").and_then(|n| n.as_str())
292 && let Some(alarm_data) = reply.get(name)
293 {
294 callback(alarm_data.clone(), header.packet_count);
295 }
296 }
297 Err(e) => {
298 match &e {
300 DVRIPError::ConnectionError(_)
301 | DVRIPError::IoError(_) => {
302 connected.store(false, Ordering::Release);
303 break;
304 }
305 _ => {}
306 }
307 }
308 }
309 }
310 }
311 Err(e) => {
312 match &e {
314 DVRIPError::ConnectionError(_) | DVRIPError::IoError(_) => {
315 connected.store(false, Ordering::Release);
316 break;
317 }
318 _ => {
319 tokio::time::sleep(Duration::from_millis(100)).await;
320 }
321 }
322 }
323 }
324 } else {
325 break;
326 }
327 }
328 });
329
330 *self.alarm_handle.lock().await = Some(handle);
331 }
332}