ghpascon_rust/devices/generic/serial/
serial_device.rs1use std::sync::{
2 Arc,
3 atomic::{AtomicBool, Ordering},
4};
5
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::time::{Duration, sleep};
8
9use super::config::{ParamMap, SerialDeviceConfig};
10use super::transport::{SharedEventHandler, default_event_handler, dispatch_event};
11use super::types::SerialDeviceEvent;
12
13pub(crate) struct SerialDeviceShared {
14 pub is_connected: AtomicBool,
15 pub writer: tokio::sync::Mutex<Option<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>>,
16 pub running: AtomicBool,
17}
18
19impl SerialDeviceShared {
20 pub fn new() -> Arc<Self> {
21 Arc::new(Self {
22 is_connected: AtomicBool::new(false),
23 writer: tokio::sync::Mutex::new(None),
24 running: AtomicBool::new(true),
25 })
26 }
27}
28
29pub struct SerialDevice {
33 pub config: SerialDeviceConfig,
34 pub on_event: SharedEventHandler,
35 pub(crate) shared: Arc<SerialDeviceShared>,
36}
37
38impl Clone for SerialDevice {
39 fn clone(&self) -> Self {
40 Self {
41 config: self.config.clone(),
42 on_event: Arc::clone(&self.on_event),
43 shared: Arc::clone(&self.shared),
44 }
45 }
46}
47
48impl Default for SerialDevice {
49 fn default() -> Self {
50 Self::new(SerialDeviceConfig::default())
51 }
52}
53
54impl SerialDevice {
55 pub fn new(config: SerialDeviceConfig) -> Self {
56 Self {
57 config,
58 on_event: default_event_handler(),
59 shared: SerialDeviceShared::new(),
60 }
61 }
62
63 pub fn from_map(data: ParamMap) -> Self {
64 Self::new(SerialDeviceConfig::from_map(data))
65 }
66
67 pub fn with_event_handler(mut self, handler: SharedEventHandler) -> Self {
68 self.on_event = handler;
69 self
70 }
71
72 pub fn set_event_handler(&mut self, handler: SharedEventHandler) {
73 self.on_event = handler;
74 }
75
76 pub fn is_connected(&self) -> bool {
77 self.shared.is_connected.load(Ordering::Relaxed)
78 }
79
80 pub fn to_map(&self) -> ParamMap {
81 self.config.to_map()
82 }
83
84 pub fn connect_instruction(&self) -> String {
85 format!(
86 "SERIAL {} @ {} (VID={:#06x}, PID={:#06x})",
87 self.config.port, self.config.baudrate, self.config.vid, self.config.pid
88 )
89 }
90
91 pub async fn connect(&self) {
92 self.shared.running.store(true, Ordering::Relaxed);
93 loop {
94 if !self.shared.running.load(Ordering::Relaxed) {
95 break;
96 }
97
98 let port_name = if self.config.port.to_uppercase() == "AUTO" {
99 match detect_serial_port(self.config.vid, self.config.pid) {
100 Some(p) => p,
101 None => {
102 eprintln!(
103 "[{}] No serial port found (VID={:#06x} PID={:#06x}), retrying in {}s",
104 self.config.name,
105 self.config.vid,
106 self.config.pid,
107 self.config.reconnection_time
108 );
109 sleep(Duration::from_secs(self.config.reconnection_time)).await;
110 continue;
111 }
112 }
113 } else {
114 self.config.port.clone()
115 };
116
117 let builder = tokio_serial::new(&port_name, self.config.baudrate);
118 match tokio_serial::SerialStream::open(&builder) {
119 Ok(stream) => {
120 let (read_half, write_half) = tokio::io::split(stream);
121 *self.shared.writer.lock().await =
122 Some(Box::new(write_half) as Box<dyn tokio::io::AsyncWrite + Send + Unpin>);
123 self.on_connected();
124
125 let recv_self = self.clone();
126 let recv_task = tokio::spawn(async move {
127 let mut buf_reader = BufReader::new(read_half);
128 let mut line = String::new();
129 loop {
130 if !recv_self.shared.is_connected.load(Ordering::Relaxed) {
131 break;
132 }
133 line.clear();
134 match buf_reader.read_line(&mut line).await {
135 Ok(0) => {
136 recv_self
137 .shared
138 .is_connected
139 .store(false, Ordering::Relaxed);
140 break;
141 }
142 Ok(_) => {
143 let trimmed = line.trim();
144 if !trimmed.is_empty() {
145 recv_self.on_receive(trimmed);
146 }
147 }
148 Err(_) => {
149 recv_self
150 .shared
151 .is_connected
152 .store(false, Ordering::Relaxed);
153 break;
154 }
155 }
156 }
157 });
158 recv_task.await.ok();
159 *self.shared.writer.lock().await = None;
160 self.on_disconnected();
161 }
162 Err(e) => {
163 eprintln!("[{}] Serial open error: {}", self.config.name, e);
164 }
165 }
166
167 if !self.shared.running.load(Ordering::Relaxed) {
168 break;
169 }
170 sleep(Duration::from_secs(self.config.reconnection_time)).await;
171 }
172 }
173
174 pub async fn close(&self) {
175 self.shared.running.store(false, Ordering::Relaxed);
176 self.shared.is_connected.store(false, Ordering::Relaxed);
177 *self.shared.writer.lock().await = None;
178 dispatch_event(
179 &self.on_event,
180 &self.config.name,
181 &SerialDeviceEvent::Connection(false),
182 );
183 }
184
185 pub async fn write(&self, data: &str) -> Result<(), String> {
186 let frame = format!("{}\n", data.trim()).into_bytes();
187 let mut guard = self.shared.writer.lock().await;
188 if let Some(writer) = guard.as_mut() {
189 writer
190 .write_all(&frame)
191 .await
192 .map_err(|e| format!("write error: {e}"))
193 } else {
194 Err("not connected".to_string())
195 }
196 }
197
198 pub fn on_receive(&self, data: &str) {
199 dispatch_event(
200 &self.on_event,
201 &self.config.name,
202 &SerialDeviceEvent::Data(data.to_string()),
203 );
204 }
205
206 fn on_connected(&self) {
207 self.shared.is_connected.store(true, Ordering::Relaxed);
208 dispatch_event(
209 &self.on_event,
210 &self.config.name,
211 &SerialDeviceEvent::Connection(true),
212 );
213 }
214
215 fn on_disconnected(&self) {
216 self.shared.is_connected.store(false, Ordering::Relaxed);
217 dispatch_event(
218 &self.on_event,
219 &self.config.name,
220 &SerialDeviceEvent::Connection(false),
221 );
222 }
223}
224
225fn detect_serial_port(vid: u16, pid: u16) -> Option<String> {
226 let ports = serialport::available_ports().ok()?;
227 for port in ports {
228 if let serialport::SerialPortType::UsbPort(info) = port.port_type {
229 if info.vid == vid && info.pid == pid {
230 return Some(port.port_name);
231 }
232 }
233 }
234 None
235}