ghpascon_rust/devices/printer/sato/
sato.rs1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::sync::{
4 Arc,
5 atomic::{AtomicBool, Ordering},
6};
7
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::time::{Duration, sleep, timeout};
10use uuid::Uuid;
11
12use super::config::{ParamMap, SatoConfig};
13use super::transport::{SharedEventHandler, default_event_handler, dispatch_event};
14use super::types::SatoEvent;
15
16pub(crate) struct SatoShared {
17 pub is_connected: AtomicBool,
18 pub running: AtomicBool,
19 pub writer: tokio::sync::Mutex<Option<tokio::net::tcp::OwnedWriteHalf>>,
20 pub print_queue: tokio::sync::Mutex<VecDeque<String>>,
21 pub status: tokio::sync::Mutex<String>,
22}
23
24impl SatoShared {
25 pub fn new() -> Arc<Self> {
26 Arc::new(Self {
27 is_connected: AtomicBool::new(false),
28 running: AtomicBool::new(true),
29 writer: tokio::sync::Mutex::new(None),
30 print_queue: tokio::sync::Mutex::new(VecDeque::new()),
31 status: tokio::sync::Mutex::new(String::new()),
32 })
33 }
34}
35
36pub struct SatoPrinter {
40 pub config: SatoConfig,
41 pub on_event: SharedEventHandler,
42 pub(crate) shared: Arc<SatoShared>,
43}
44
45impl Clone for SatoPrinter {
46 fn clone(&self) -> Self {
47 Self {
48 config: self.config.clone(),
49 on_event: Arc::clone(&self.on_event),
50 shared: Arc::clone(&self.shared),
51 }
52 }
53}
54
55impl Default for SatoPrinter {
56 fn default() -> Self {
57 Self::new(SatoConfig::default())
58 }
59}
60
61impl SatoPrinter {
62 pub fn new(config: SatoConfig) -> Self {
63 Self {
64 config,
65 on_event: default_event_handler(),
66 shared: SatoShared::new(),
67 }
68 }
69
70 pub fn from_map(data: HashMap<String, serde_json::Value>) -> Self {
71 Self::new(SatoConfig::from_map(data))
72 }
73
74 pub fn with_event_handler(mut self, handler: SharedEventHandler) -> Self {
75 self.on_event = handler;
76 self
77 }
78
79 pub fn set_event_handler(&mut self, handler: SharedEventHandler) {
80 self.on_event = handler;
81 }
82
83 pub fn is_connected(&self) -> bool {
84 self.shared.is_connected.load(Ordering::Relaxed)
85 }
86
87 pub fn can_print(&self) -> bool {
88 self.is_connected()
89 }
90
91 pub fn pending_print_jobs(&self) -> usize {
92 self.shared
93 .print_queue
94 .try_lock()
95 .map(|queue| queue.len())
96 .unwrap_or(0)
97 }
98
99 pub fn to_map(&self) -> ParamMap {
100 self.config.to_map()
101 }
102
103 pub fn connect_instruction(&self) -> String {
104 format!("TCP {}:{}", self.config.ip, self.config.port)
105 }
106
107 pub async fn connect(&self) {
108 self.shared.running.store(true, Ordering::Relaxed);
109 loop {
110 if !self.shared.running.load(Ordering::Relaxed) {
111 break;
112 }
113
114 let addr = format!("{}:{}", self.config.ip, self.config.port);
115 match timeout(
116 Duration::from_secs(3),
117 tokio::net::TcpStream::connect(&addr),
118 )
119 .await
120 {
121 Ok(Ok(stream)) => {
122 let (read_half, write_half) = stream.into_split();
123 *self.shared.writer.lock().await = Some(write_half);
124 self.on_connected();
125
126 let recv_self = self.clone();
127 let recv_task = tokio::spawn(async move {
128 let mut buf_reader = BufReader::new(read_half);
129 let mut line = String::new();
130 loop {
131 if !recv_self.shared.is_connected.load(Ordering::Relaxed) {
132 break;
133 }
134 line.clear();
135 match buf_reader.read_line(&mut line).await {
136 Ok(0) => {
137 recv_self
138 .shared
139 .is_connected
140 .store(false, Ordering::Relaxed);
141 break;
142 }
143 Ok(_) => {
144 let trimmed = line.trim().to_string();
145 if !trimmed.is_empty() {
146 *recv_self.shared.status.lock().await = trimmed.clone();
147 dispatch_event(
148 &recv_self.on_event,
149 &recv_self.config.name,
150 &SatoEvent::Status(trimmed),
151 );
152 }
153 }
154 Err(_) => {
155 recv_self
156 .shared
157 .is_connected
158 .store(false, Ordering::Relaxed);
159 break;
160 }
161 }
162 }
163 });
164 recv_task.await.ok();
165 *self.shared.writer.lock().await = None;
166 self.on_disconnected();
167 }
168 _ => {
169 eprintln!(
170 "[{}] TCP connection failed to {}, retrying in {}s",
171 self.config.name, addr, self.config.reconnection_time
172 );
173 }
174 }
175
176 if !self.shared.running.load(Ordering::Relaxed) {
177 break;
178 }
179 sleep(Duration::from_secs(self.config.reconnection_time)).await;
180 }
181 }
182
183 pub async fn close(&self) {
184 self.shared.running.store(false, Ordering::Relaxed);
185 self.shared.is_connected.store(false, Ordering::Relaxed);
186 *self.shared.writer.lock().await = None;
187 dispatch_event(
188 &self.on_event,
189 &self.config.name,
190 &SatoEvent::Connection(false),
191 );
192 }
193
194 pub async fn print(&self, zpl: &str) -> Result<String, String> {
195 if !self.is_connected() {
196 return Err("not connected".to_string());
197 }
198 let print_id = Uuid::new_v4().to_string();
199 let data = zpl.as_bytes().to_vec();
200 let mut guard = self.shared.writer.lock().await;
201 if let Some(writer) = guard.as_mut() {
202 writer
203 .write_all(&data)
204 .await
205 .map_err(|e| format!("print error: {e}"))?;
206 } else {
207 return Err("not connected".to_string());
208 }
209 drop(guard);
210 dispatch_event(
211 &self.on_event,
212 &self.config.name,
213 &SatoEvent::PrintSent(print_id.clone()),
214 );
215 Ok(print_id)
216 }
217
218 pub async fn add_to_print_queue(&self, labels: Vec<String>) {
219 {
220 let mut queue = self.shared.print_queue.lock().await;
221 for label in labels {
222 queue.push_back(label);
223 }
224 }
225 if self.is_connected() {
226 self.process_queue().await;
227 }
228 }
229
230 pub async fn process_queue(&self) {
231 let mut queue = self.shared.print_queue.lock().await;
232 let items: Vec<String> = queue.drain(..).collect();
233 drop(queue);
234 for label in items {
235 self.print(&label).await.ok();
236 }
237 }
238
239 fn on_connected(&self) {
240 self.shared.is_connected.store(true, Ordering::Relaxed);
241 dispatch_event(
242 &self.on_event,
243 &self.config.name,
244 &SatoEvent::Connection(true),
245 );
246 if self.pending_print_jobs() > 0 {
247 let printer = self.clone();
248 tokio::spawn(async move {
249 printer.process_queue().await;
250 });
251 }
252 }
253
254 fn on_disconnected(&self) {
255 self.shared.is_connected.store(false, Ordering::Relaxed);
256 dispatch_event(
257 &self.on_event,
258 &self.config.name,
259 &SatoEvent::Connection(false),
260 );
261 }
262}
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267
268 #[tokio::test]
269 async fn queued_labels_are_kept_until_connected() {
270 let printer = SatoPrinter::default();
271 assert!(!printer.can_print());
272
273 printer
274 .add_to_print_queue(vec!["^XA^XZ".to_string(), "^XA^FO50,50^XZ".to_string()])
275 .await;
276
277 assert_eq!(printer.pending_print_jobs(), 2);
278 }
279}