Skip to main content

dvrip_rs/commands/
connection.rs

1use crate::dvrip::DVRIPCam;
2use crate::error::Result;
3use async_trait::async_trait;
4use std::sync::atomic::Ordering;
5use tokio::io::AsyncWriteExt;
6use tokio::net::TcpStream;
7use tokio::time::Duration;
8
9#[async_trait]
10pub trait Connection: Send + Sync {
11    /// Connect to the device
12    async fn connect(&mut self, timeout: tokio::time::Duration) -> Result<()>;
13
14    /// Disconnect from the device
15    async fn close(&mut self) -> Result<()>;
16
17    /// Check if connected
18    fn is_connected(&self) -> bool;
19
20    /// Get the device IP address
21    fn ip(&self) -> &str;
22
23    /// Get the device port
24    fn port(&self) -> u16;
25}
26
27#[async_trait]
28impl Connection for DVRIPCam {
29    async fn connect(&mut self, timeout: Duration) -> Result<()> {
30        self.timeout = timeout;
31
32        let stream: TcpStream =
33            tokio::time::timeout(timeout, TcpStream::connect((self.ip.as_str(), self.port)))
34                .await
35                .map_err(|_| {
36                    crate::error::DVRIPError::ConnectionError("Connection timeout".to_string())
37                })?
38                .map_err(|e| {
39                    crate::error::DVRIPError::ConnectionError(format!("Connection error: {}", e))
40                })?;
41
42        *self.stream.lock().await = Some(stream);
43        self.connected.store(true, Ordering::Release);
44
45        Ok(())
46    }
47
48    async fn close(&mut self) -> Result<()> {
49        self.connected.store(false, Ordering::Release);
50        self.authenticated.store(false, Ordering::Release);
51        self.monitoring.store(false, Ordering::Release);
52        self.alarm_monitoring.store(false, Ordering::Release);
53
54        // Cancel background tasks
55        if let Some(handle) = self.keep_alive_handle.lock().await.take() {
56            handle.abort();
57        }
58        if let Some(handle) = self.alarm_handle.lock().await.take() {
59            handle.abort();
60        }
61
62        if let Some(mut stream) = self.stream.lock().await.take() {
63            let _ = stream.shutdown().await;
64        }
65
66        Ok(())
67    }
68
69    fn is_connected(&self) -> bool {
70        self.connected.load(Ordering::Acquire)
71    }
72
73    fn ip(&self) -> &str {
74        &self.ip
75    }
76
77    fn port(&self) -> u16 {
78        self.port
79    }
80}