dvrip_rs/commands/
connection.rs1use crate::dvrip::DVRIPCam;
2use crate::error::Result;
3use async_trait::async_trait;
4use std::sync::atomic::Ordering;
5use tokio::io::AsyncWriteExt;
6use tokio::net::TcpStream;
7use tokio::time::Duration;
8
9#[async_trait]
10pub trait Connection: Send + Sync {
11 async fn connect(&mut self, timeout: tokio::time::Duration) -> Result<()>;
13
14 async fn close(&mut self) -> Result<()>;
16
17 fn is_connected(&self) -> bool;
19
20 fn ip(&self) -> &str;
22
23 fn port(&self) -> u16;
25}
26
27#[async_trait]
28impl Connection for DVRIPCam {
29 async fn connect(&mut self, timeout: Duration) -> Result<()> {
30 self.timeout = timeout;
31
32 let stream: TcpStream =
33 tokio::time::timeout(timeout, TcpStream::connect((self.ip.as_str(), self.port)))
34 .await
35 .map_err(|_| {
36 crate::error::DVRIPError::ConnectionError("Connection timeout".to_string())
37 })?
38 .map_err(|e| {
39 crate::error::DVRIPError::ConnectionError(format!("Connection error: {}", e))
40 })?;
41
42 *self.stream.lock().await = Some(stream);
43 self.connected.store(true, Ordering::Release);
44
45 Ok(())
46 }
47
48 async fn close(&mut self) -> Result<()> {
49 self.connected.store(false, Ordering::Release);
50 self.authenticated.store(false, Ordering::Release);
51 self.monitoring.store(false, Ordering::Release);
52 self.alarm_monitoring.store(false, Ordering::Release);
53
54 if let Some(handle) = self.keep_alive_handle.lock().await.take() {
56 handle.abort();
57 }
58 if let Some(handle) = self.alarm_handle.lock().await.take() {
59 handle.abort();
60 }
61
62 if let Some(mut stream) = self.stream.lock().await.take() {
63 let _ = stream.shutdown().await;
64 }
65
66 Ok(())
67 }
68
69 fn is_connected(&self) -> bool {
70 self.connected.load(Ordering::Acquire)
71 }
72
73 fn ip(&self) -> &str {
74 &self.ip
75 }
76
77 fn port(&self) -> u16 {
78 self.port
79 }
80}