ghpascon_rust/devices/generic/tcp/
tcp_device.rs1use std::sync::{
2 Arc,
3 atomic::{AtomicBool, Ordering},
4};
5
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::time::{Duration, sleep, timeout};
8
9use super::config::{ParamMap, TcpDeviceConfig};
10use super::transport::{SharedEventHandler, default_event_handler, dispatch_event};
11use super::types::TcpDeviceEvent;
12
13pub(crate) struct TcpDeviceShared {
14 pub is_connected: AtomicBool,
15 pub writer: tokio::sync::Mutex<Option<tokio::net::tcp::OwnedWriteHalf>>,
16 pub running: AtomicBool,
17}
18
19impl TcpDeviceShared {
20 pub fn new() -> Arc<Self> {
21 Arc::new(Self {
22 is_connected: AtomicBool::new(false),
23 writer: tokio::sync::Mutex::new(None),
24 running: AtomicBool::new(true),
25 })
26 }
27}
28
29pub struct TcpDevice {
33 pub config: TcpDeviceConfig,
34 pub on_event: SharedEventHandler,
35 pub(crate) shared: Arc<TcpDeviceShared>,
36}
37
38impl Clone for TcpDevice {
39 fn clone(&self) -> Self {
40 Self {
41 config: self.config.clone(),
42 on_event: Arc::clone(&self.on_event),
43 shared: Arc::clone(&self.shared),
44 }
45 }
46}
47
48impl Default for TcpDevice {
49 fn default() -> Self {
50 Self::new(TcpDeviceConfig::default())
51 }
52}
53
54impl TcpDevice {
55 pub fn new(config: TcpDeviceConfig) -> Self {
56 Self {
57 config,
58 on_event: default_event_handler(),
59 shared: TcpDeviceShared::new(),
60 }
61 }
62
63 pub fn from_map(data: ParamMap) -> Self {
64 Self::new(TcpDeviceConfig::from_map(data))
65 }
66
67 pub fn with_event_handler(mut self, handler: SharedEventHandler) -> Self {
68 self.on_event = handler;
69 self
70 }
71
72 pub fn set_event_handler(&mut self, handler: SharedEventHandler) {
73 self.on_event = handler;
74 }
75
76 pub fn is_connected(&self) -> bool {
77 self.shared.is_connected.load(Ordering::Relaxed)
78 }
79
80 pub fn to_map(&self) -> ParamMap {
81 self.config.to_map()
82 }
83
84 pub fn connect_instruction(&self) -> String {
85 format!("TCP {}:{}", self.config.ip, self.config.port)
86 }
87
88 pub async fn connect(&self) {
89 self.shared.running.store(true, Ordering::Relaxed);
90 loop {
91 if !self.shared.running.load(Ordering::Relaxed) {
92 break;
93 }
94
95 let addr = format!("{}:{}", self.config.ip, self.config.port);
96 match timeout(
97 Duration::from_secs(3),
98 tokio::net::TcpStream::connect(&addr),
99 )
100 .await
101 {
102 Ok(Ok(stream)) => {
103 let (read_half, write_half) = stream.into_split();
104 *self.shared.writer.lock().await = Some(write_half);
105 self.on_connected();
106
107 let recv_self = self.clone();
108 let recv_task = tokio::spawn(async move {
109 let mut buf_reader = BufReader::new(read_half);
110 let mut line = String::new();
111 loop {
112 if !recv_self.shared.is_connected.load(Ordering::Relaxed) {
113 break;
114 }
115 line.clear();
116 match buf_reader.read_line(&mut line).await {
117 Ok(0) => {
118 recv_self
119 .shared
120 .is_connected
121 .store(false, Ordering::Relaxed);
122 break;
123 }
124 Ok(_) => {
125 let trimmed = line.trim();
126 if !trimmed.is_empty() {
127 recv_self.on_receive(trimmed);
128 }
129 }
130 Err(_) => {
131 recv_self
132 .shared
133 .is_connected
134 .store(false, Ordering::Relaxed);
135 break;
136 }
137 }
138 }
139 });
140 recv_task.await.ok();
141 *self.shared.writer.lock().await = None;
142 self.on_disconnected();
143 }
144 _ => {
145 eprintln!(
146 "[{}] TCP connection failed to {}, retrying in {}s",
147 self.config.name, addr, self.config.reconnection_time
148 );
149 }
150 }
151
152 if !self.shared.running.load(Ordering::Relaxed) {
153 break;
154 }
155 sleep(Duration::from_secs(self.config.reconnection_time)).await;
156 }
157 }
158
159 pub async fn close(&self) {
160 self.shared.running.store(false, Ordering::Relaxed);
161 self.shared.is_connected.store(false, Ordering::Relaxed);
162 *self.shared.writer.lock().await = None;
163 dispatch_event(
164 &self.on_event,
165 &self.config.name,
166 &TcpDeviceEvent::Connection(false),
167 );
168 }
169
170 pub async fn write(&self, data: &str) -> Result<(), String> {
171 let frame = format!("{}\n", data.trim()).into_bytes();
172 let mut guard = self.shared.writer.lock().await;
173 if let Some(writer) = guard.as_mut() {
174 writer
175 .write_all(&frame)
176 .await
177 .map_err(|e| format!("write error: {e}"))
178 } else {
179 Err("not connected".to_string())
180 }
181 }
182
183 pub fn on_receive(&self, data: &str) {
184 dispatch_event(
185 &self.on_event,
186 &self.config.name,
187 &TcpDeviceEvent::Data(data.to_string()),
188 );
189 }
190
191 fn on_connected(&self) {
192 self.shared.is_connected.store(true, Ordering::Relaxed);
193 dispatch_event(
194 &self.on_event,
195 &self.config.name,
196 &TcpDeviceEvent::Connection(true),
197 );
198 }
199
200 fn on_disconnected(&self) {
201 self.shared.is_connected.store(false, Ordering::Relaxed);
202 dispatch_event(
203 &self.on_event,
204 &self.config.name,
205 &TcpDeviceEvent::Connection(false),
206 );
207 }
208}