Skip to main content

ghpascon_rust/devices/generic/tcp/
tcp_device.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicBool, Ordering},
4};
5
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::time::{Duration, sleep, timeout};
8
9use super::config::{ParamMap, TcpDeviceConfig};
10use super::transport::{SharedEventHandler, default_event_handler, dispatch_event};
11use super::types::TcpDeviceEvent;
12
13pub(crate) struct TcpDeviceShared {
14    pub is_connected: AtomicBool,
15    pub writer: tokio::sync::Mutex<Option<tokio::net::tcp::OwnedWriteHalf>>,
16    pub running: AtomicBool,
17}
18
19impl TcpDeviceShared {
20    pub fn new() -> Arc<Self> {
21        Arc::new(Self {
22            is_connected: AtomicBool::new(false),
23            writer: tokio::sync::Mutex::new(None),
24            running: AtomicBool::new(true),
25        })
26    }
27}
28
29/// Generic TCP device.
30///
31/// `clone()` is cheap – all runtime state is behind an `Arc`.
32pub struct TcpDevice {
33    pub config: TcpDeviceConfig,
34    pub on_event: SharedEventHandler,
35    pub(crate) shared: Arc<TcpDeviceShared>,
36}
37
38impl Clone for TcpDevice {
39    fn clone(&self) -> Self {
40        Self {
41            config: self.config.clone(),
42            on_event: Arc::clone(&self.on_event),
43            shared: Arc::clone(&self.shared),
44        }
45    }
46}
47
48impl Default for TcpDevice {
49    fn default() -> Self {
50        Self::new(TcpDeviceConfig::default())
51    }
52}
53
54impl TcpDevice {
55    pub fn new(config: TcpDeviceConfig) -> Self {
56        Self {
57            config,
58            on_event: default_event_handler(),
59            shared: TcpDeviceShared::new(),
60        }
61    }
62
63    pub fn from_map(data: ParamMap) -> Self {
64        Self::new(TcpDeviceConfig::from_map(data))
65    }
66
67    pub fn with_event_handler(mut self, handler: SharedEventHandler) -> Self {
68        self.on_event = handler;
69        self
70    }
71
72    pub fn set_event_handler(&mut self, handler: SharedEventHandler) {
73        self.on_event = handler;
74    }
75
76    pub fn is_connected(&self) -> bool {
77        self.shared.is_connected.load(Ordering::Relaxed)
78    }
79
80    pub fn to_map(&self) -> ParamMap {
81        self.config.to_map()
82    }
83
84    pub fn connect_instruction(&self) -> String {
85        format!("TCP {}:{}", self.config.ip, self.config.port)
86    }
87
88    pub async fn connect(&self) {
89        self.shared.running.store(true, Ordering::Relaxed);
90        loop {
91            if !self.shared.running.load(Ordering::Relaxed) {
92                break;
93            }
94
95            let addr = format!("{}:{}", self.config.ip, self.config.port);
96            match timeout(
97                Duration::from_secs(3),
98                tokio::net::TcpStream::connect(&addr),
99            )
100            .await
101            {
102                Ok(Ok(stream)) => {
103                    let (read_half, write_half) = stream.into_split();
104                    *self.shared.writer.lock().await = Some(write_half);
105                    self.on_connected();
106
107                    let recv_self = self.clone();
108                    let recv_task = tokio::spawn(async move {
109                        let mut buf_reader = BufReader::new(read_half);
110                        let mut line = String::new();
111                        loop {
112                            if !recv_self.shared.is_connected.load(Ordering::Relaxed) {
113                                break;
114                            }
115                            line.clear();
116                            match buf_reader.read_line(&mut line).await {
117                                Ok(0) => {
118                                    recv_self
119                                        .shared
120                                        .is_connected
121                                        .store(false, Ordering::Relaxed);
122                                    break;
123                                }
124                                Ok(_) => {
125                                    let trimmed = line.trim();
126                                    if !trimmed.is_empty() {
127                                        recv_self.on_receive(trimmed);
128                                    }
129                                }
130                                Err(_) => {
131                                    recv_self
132                                        .shared
133                                        .is_connected
134                                        .store(false, Ordering::Relaxed);
135                                    break;
136                                }
137                            }
138                        }
139                    });
140                    recv_task.await.ok();
141                    *self.shared.writer.lock().await = None;
142                    self.on_disconnected();
143                }
144                _ => {
145                    eprintln!(
146                        "[{}] TCP connection failed to {}, retrying in {}s",
147                        self.config.name, addr, self.config.reconnection_time
148                    );
149                }
150            }
151
152            if !self.shared.running.load(Ordering::Relaxed) {
153                break;
154            }
155            sleep(Duration::from_secs(self.config.reconnection_time)).await;
156        }
157    }
158
159    pub async fn close(&self) {
160        self.shared.running.store(false, Ordering::Relaxed);
161        self.shared.is_connected.store(false, Ordering::Relaxed);
162        *self.shared.writer.lock().await = None;
163        dispatch_event(
164            &self.on_event,
165            &self.config.name,
166            &TcpDeviceEvent::Connection(false),
167        );
168    }
169
170    pub async fn write(&self, data: &str) -> Result<(), String> {
171        let frame = format!("{}\n", data.trim()).into_bytes();
172        let mut guard = self.shared.writer.lock().await;
173        if let Some(writer) = guard.as_mut() {
174            writer
175                .write_all(&frame)
176                .await
177                .map_err(|e| format!("write error: {e}"))
178        } else {
179            Err("not connected".to_string())
180        }
181    }
182
183    pub fn on_receive(&self, data: &str) {
184        dispatch_event(
185            &self.on_event,
186            &self.config.name,
187            &TcpDeviceEvent::Data(data.to_string()),
188        );
189    }
190
191    fn on_connected(&self) {
192        self.shared.is_connected.store(true, Ordering::Relaxed);
193        dispatch_event(
194            &self.on_event,
195            &self.config.name,
196            &TcpDeviceEvent::Connection(true),
197        );
198    }
199
200    fn on_disconnected(&self) {
201        self.shared.is_connected.store(false, Ordering::Relaxed);
202        dispatch_event(
203            &self.on_event,
204            &self.config.name,
205            &TcpDeviceEvent::Connection(false),
206        );
207    }
208}