Skip to main content

ghpascon_rust/devices/printer/sato/
sato.rs

1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, Ordering},
6};
7
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::time::{Duration, sleep, timeout};
10use uuid::Uuid;
11
12use super::config::{ParamMap, SatoConfig};
13use super::transport::{SharedEventHandler, default_event_handler, dispatch_event};
14use super::types::SatoEvent;
15
16pub(crate) struct SatoShared {
17    pub is_connected: AtomicBool,
18    pub running: AtomicBool,
19    pub writer: tokio::sync::Mutex<Option<tokio::net::tcp::OwnedWriteHalf>>,
20    pub print_queue: tokio::sync::Mutex<VecDeque<String>>,
21    pub status: tokio::sync::Mutex<String>,
22}
23
24impl SatoShared {
25    pub fn new() -> Arc<Self> {
26        Arc::new(Self {
27            is_connected: AtomicBool::new(false),
28            running: AtomicBool::new(true),
29            writer: tokio::sync::Mutex::new(None),
30            print_queue: tokio::sync::Mutex::new(VecDeque::new()),
31            status: tokio::sync::Mutex::new(String::new()),
32        })
33    }
34}
35
36/// SATO label printer (TCP/IP connection).
37///
38/// `clone()` is cheap – all runtime state is behind an `Arc`.
39pub struct SatoPrinter {
40    pub config: SatoConfig,
41    pub on_event: SharedEventHandler,
42    pub(crate) shared: Arc<SatoShared>,
43}
44
45impl Clone for SatoPrinter {
46    fn clone(&self) -> Self {
47        Self {
48            config: self.config.clone(),
49            on_event: Arc::clone(&self.on_event),
50            shared: Arc::clone(&self.shared),
51        }
52    }
53}
54
55impl Default for SatoPrinter {
56    fn default() -> Self {
57        Self::new(SatoConfig::default())
58    }
59}
60
61impl SatoPrinter {
62    pub fn new(config: SatoConfig) -> Self {
63        Self {
64            config,
65            on_event: default_event_handler(),
66            shared: SatoShared::new(),
67        }
68    }
69
70    pub fn from_map(data: HashMap<String, serde_json::Value>) -> Self {
71        Self::new(SatoConfig::from_map(data))
72    }
73
74    pub fn with_event_handler(mut self, handler: SharedEventHandler) -> Self {
75        self.on_event = handler;
76        self
77    }
78
79    pub fn set_event_handler(&mut self, handler: SharedEventHandler) {
80        self.on_event = handler;
81    }
82
83    pub fn is_connected(&self) -> bool {
84        self.shared.is_connected.load(Ordering::Relaxed)
85    }
86
87    pub fn can_print(&self) -> bool {
88        self.is_connected()
89    }
90
91    pub fn pending_print_jobs(&self) -> usize {
92        self.shared
93            .print_queue
94            .try_lock()
95            .map(|queue| queue.len())
96            .unwrap_or(0)
97    }
98
99    pub fn to_map(&self) -> ParamMap {
100        self.config.to_map()
101    }
102
103    pub fn connect_instruction(&self) -> String {
104        format!("TCP {}:{}", self.config.ip, self.config.port)
105    }
106
107    pub async fn connect(&self) {
108        self.shared.running.store(true, Ordering::Relaxed);
109        loop {
110            if !self.shared.running.load(Ordering::Relaxed) {
111                break;
112            }
113
114            let addr = format!("{}:{}", self.config.ip, self.config.port);
115            match timeout(
116                Duration::from_secs(3),
117                tokio::net::TcpStream::connect(&addr),
118            )
119            .await
120            {
121                Ok(Ok(stream)) => {
122                    let (read_half, write_half) = stream.into_split();
123                    *self.shared.writer.lock().await = Some(write_half);
124                    self.on_connected();
125
126                    let recv_self = self.clone();
127                    let recv_task = tokio::spawn(async move {
128                        let mut buf_reader = BufReader::new(read_half);
129                        let mut line = String::new();
130                        loop {
131                            if !recv_self.shared.is_connected.load(Ordering::Relaxed) {
132                                break;
133                            }
134                            line.clear();
135                            match buf_reader.read_line(&mut line).await {
136                                Ok(0) => {
137                                    recv_self
138                                        .shared
139                                        .is_connected
140                                        .store(false, Ordering::Relaxed);
141                                    break;
142                                }
143                                Ok(_) => {
144                                    let trimmed = line.trim().to_string();
145                                    if !trimmed.is_empty() {
146                                        *recv_self.shared.status.lock().await = trimmed.clone();
147                                        dispatch_event(
148                                            &recv_self.on_event,
149                                            &recv_self.config.name,
150                                            &SatoEvent::Status(trimmed),
151                                        );
152                                    }
153                                }
154                                Err(_) => {
155                                    recv_self
156                                        .shared
157                                        .is_connected
158                                        .store(false, Ordering::Relaxed);
159                                    break;
160                                }
161                            }
162                        }
163                    });
164                    recv_task.await.ok();
165                    *self.shared.writer.lock().await = None;
166                    self.on_disconnected();
167                }
168                _ => {
169                    eprintln!(
170                        "[{}] TCP connection failed to {}, retrying in {}s",
171                        self.config.name, addr, self.config.reconnection_time
172                    );
173                }
174            }
175
176            if !self.shared.running.load(Ordering::Relaxed) {
177                break;
178            }
179            sleep(Duration::from_secs(self.config.reconnection_time)).await;
180        }
181    }
182
183    pub async fn close(&self) {
184        self.shared.running.store(false, Ordering::Relaxed);
185        self.shared.is_connected.store(false, Ordering::Relaxed);
186        *self.shared.writer.lock().await = None;
187        dispatch_event(
188            &self.on_event,
189            &self.config.name,
190            &SatoEvent::Connection(false),
191        );
192    }
193
194    pub async fn print(&self, zpl: &str) -> Result<String, String> {
195        if !self.is_connected() {
196            return Err("not connected".to_string());
197        }
198        let print_id = Uuid::new_v4().to_string();
199        let data = zpl.as_bytes().to_vec();
200        let mut guard = self.shared.writer.lock().await;
201        if let Some(writer) = guard.as_mut() {
202            writer
203                .write_all(&data)
204                .await
205                .map_err(|e| format!("print error: {e}"))?;
206        } else {
207            return Err("not connected".to_string());
208        }
209        drop(guard);
210        dispatch_event(
211            &self.on_event,
212            &self.config.name,
213            &SatoEvent::PrintSent(print_id.clone()),
214        );
215        Ok(print_id)
216    }
217
218    pub async fn add_to_print_queue(&self, labels: Vec<String>) {
219        {
220            let mut queue = self.shared.print_queue.lock().await;
221            for label in labels {
222                queue.push_back(label);
223            }
224        }
225        if self.is_connected() {
226            self.process_queue().await;
227        }
228    }
229
230    pub async fn process_queue(&self) {
231        let mut queue = self.shared.print_queue.lock().await;
232        let items: Vec<String> = queue.drain(..).collect();
233        drop(queue);
234        for label in items {
235            self.print(&label).await.ok();
236        }
237    }
238
239    fn on_connected(&self) {
240        self.shared.is_connected.store(true, Ordering::Relaxed);
241        dispatch_event(
242            &self.on_event,
243            &self.config.name,
244            &SatoEvent::Connection(true),
245        );
246        if self.pending_print_jobs() > 0 {
247            let printer = self.clone();
248            tokio::spawn(async move {
249                printer.process_queue().await;
250            });
251        }
252    }
253
254    fn on_disconnected(&self) {
255        self.shared.is_connected.store(false, Ordering::Relaxed);
256        dispatch_event(
257            &self.on_event,
258            &self.config.name,
259            &SatoEvent::Connection(false),
260        );
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267
268    #[tokio::test]
269    async fn queued_labels_are_kept_until_connected() {
270        let printer = SatoPrinter::default();
271        assert!(!printer.can_print());
272
273        printer
274            .add_to_print_queue(vec!["^XA^XZ".to_string(), "^XA^FO50,50^XZ".to_string()])
275            .await;
276
277        assert_eq!(printer.pending_print_jobs(), 2);
278    }
279}