use std::collections::VecDeque;
use std::io::Read;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use crossbeam_channel as channel;
use serialport::SerialPort;
use crate::protocol::parser::{self, ParseEvent, StreamParser};
use crate::types::{ButtonMask, CatchEvent};
pub(crate) struct ReaderSignal {
pub alive: AtomicBool,
pub disconnect_notify: (Mutex<bool>, Condvar),
}
impl ReaderSignal {
pub fn new() -> Self {
Self {
alive: AtomicBool::new(true),
disconnect_notify: (Mutex::new(false), Condvar::new()),
}
}
}
pub(crate) fn reader_thread(
mut port: Box<dyn SerialPort>,
pending_responses: Arc<Mutex<VecDeque<channel::Sender<Vec<u8>>>>>,
button_subs: Arc<Mutex<Vec<channel::Sender<ButtonMask>>>>,
catch_subs: Arc<Mutex<Vec<channel::Sender<CatchEvent>>>>,
signal: Arc<ReaderSignal>,
) {
let mut parser = StreamParser::new();
let mut buf = [0u8; 256];
loop {
match port.read(&mut buf) {
Ok(n) => {
if !signal.alive.load(Ordering::Acquire) {
break;
}
for &byte in &buf[..n] {
if let Some(event) = parser.feed(byte) {
match event {
ParseEvent::ButtonEvent(mask) => {
let mut subs = button_subs.lock().unwrap();
subs.retain(|sub| sub.send(ButtonMask(mask)).is_ok());
}
ParseEvent::Response(data) => {
if let Some(catch_event) = parser::parse_catch_event(&data) {
let mut subs = catch_subs.lock().unwrap();
subs.retain(|sub| sub.send(catch_event).is_ok());
} else {
let mut pending = pending_responses.lock().unwrap();
if let Some(tx) = pending.pop_front() {
let _ = tx.send(data);
}
}
}
}
}
}
}
Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {
if !signal.alive.load(Ordering::Acquire) {
break;
}
continue;
}
Err(_) => break,
}
}
signal.alive.store(false, Ordering::Release);
let (lock, cvar) = &signal.disconnect_notify;
let mut disconnected = lock.lock().unwrap();
*disconnected = true;
cvar.notify_all();
}