use std::io;
use bytes::BytesMut;
use once_cell::sync::Lazy;
use regex::bytes::Regex as BytesRegex;
use serialport::{SerialPort, SerialPortInfo, SerialPortType};
/// USB `(vendor ID, product ID)` pair that `find_flipperzero` compares against a port's
/// `(vid, pid)` when no explicit port name is given.
const HWID: (u16, u16) = (0x0483, 0x5740);
/// Size in bytes of the reader's initial buffer capacity and of the scratch array used for
/// each read from the port.
const BUF_SIZE: usize = 1024;
/// Baud rate of 115200, exported for callers that open the port.
pub const BAUD_115200: u32 = 115200;
/// Matches the literal byte sequence `>: ` (used by `read_until_prompt` as the prompt marker).
pub static CLI_PROMPT: Lazy<BytesRegex> = Lazy::new(|| BytesRegex::new(r">: ").unwrap());
/// Matches a CRLF (`\r\n`) line terminator.
pub static CLI_EOL: Lazy<BytesRegex> = Lazy::new(|| BytesRegex::new(r"\r\n").unwrap());
/// Matches the literal text `Ready?` followed by CRLF.
// NOTE(review): presumably sent by the device when it is waiting for more input — confirm
// against the CLI protocol.
pub static CLI_READY: Lazy<BytesRegex> = Lazy::new(|| BytesRegex::new(r"Ready\?\r\n").unwrap());
/// Looks up a serial port among those reported by `serialport::available_ports`.
///
/// * With `port_name = Some(name)`, the first port whose name equals `name` is returned,
///   regardless of its type.
/// * With `port_name = None`, the first USB port whose `(vid, pid)` equals `HWID` is returned.
///
/// Returns `None` when no port qualifies or when the port list cannot be obtained.
pub fn find_flipperzero(port_name: Option<&str>) -> Option<SerialPortInfo> {
    // Selection rule, chosen once by whether the caller asked for a specific port.
    let is_wanted = |info: &SerialPortInfo| match port_name {
        Some(name) => info.port_name == name,
        None => match &info.port_type {
            SerialPortType::UsbPort(usb) => (usb.vid, usb.pid) == HWID,
            _ => false,
        },
    };

    // An enumeration failure is deliberately collapsed into `None`.
    let candidates = serialport::available_ports().ok()?;
    candidates.into_iter().find(is_wanted)
}
/// Wrapper around a [`SerialReader`] that offers line-oriented send/receive helpers for a
/// command-line interface spoken over a serial port.
pub struct SerialCli {
// Owns the serial port; all port access in this type goes through this reader.
reader: SerialReader,
}
impl SerialCli {
    /// Creates a CLI wrapper that takes ownership of `port`.
    pub fn new(port: Box<dyn SerialPort>) -> Self {
        Self {
            reader: SerialReader::new(port),
        }
    }

    /// Returns a shared reference to the underlying serial port.
    pub fn port(&self) -> &dyn SerialPort {
        self.reader.get_ref()
    }

    /// Returns an exclusive reference to the underlying serial port.
    pub fn port_mut(&mut self) -> &mut dyn SerialPort {
        self.reader.get_mut()
    }

    /// Prepares the session for use.
    ///
    /// Clears the port's input buffer, asserts DTR, sends `device_info`, then reads until
    /// `hardware_model` has been seen and the next prompt has arrived.
    ///
    /// # Errors
    ///
    /// Returns an error if any serial operation fails. This includes a failure to set DTR,
    /// which is reported to the caller rather than aborting the program with a panic.
    pub fn start(&mut self) -> io::Result<()> {
        self.port().clear(serialport::ClearBuffer::Input)?;
        // Propagated with `?` like the `clear` call above: this is a recoverable I/O failure.
        self.port_mut().write_data_terminal_ready(true)?;

        self.send_line("device_info")?;
        // The pattern is a constant literal, so compiling it cannot fail.
        self.reader
            .read_until(&BytesRegex::new(r"hardware_model").unwrap(), true)?;
        self.read_until_prompt()?;

        Ok(())
    }

    /// Writes `line` followed by a single carriage return (`\r`) to the port.
    ///
    /// # Errors
    ///
    /// Returns any error produced while writing to the port.
    pub fn send_line(&mut self, line: &str) -> io::Result<()> {
        write!(self.port_mut(), "{line}\r")
    }

    /// Sends `line`, then returns the bytes received before the next CRLF.
    ///
    /// # Errors
    ///
    /// Returns any error from writing the line or from the subsequent read.
    pub fn send_and_wait_eol(&mut self, line: &str) -> io::Result<BytesMut> {
        self.send_line(line)?;
        self.read_until_eol()
    }

    /// Sends `line`, then returns the bytes received before the next prompt.
    ///
    /// # Errors
    ///
    /// Returns any error from writing the line or from the subsequent read.
    pub fn send_and_wait_prompt(&mut self, line: &str) -> io::Result<BytesMut> {
        self.send_line(line)?;
        self.read_until_prompt()
    }

    /// Reads until [`CLI_PROMPT`] matches and returns the bytes that preceded the match.
    ///
    /// The matched prompt itself is consumed and not included in the result.
    pub fn read_until_prompt(&mut self) -> io::Result<BytesMut> {
        self.reader.read_until(&CLI_PROMPT, true)
    }

    /// Reads until [`CLI_READY`] matches and returns the bytes that preceded the match.
    ///
    /// The matched text itself is consumed and not included in the result.
    pub fn read_until_ready(&mut self) -> io::Result<BytesMut> {
        self.reader.read_until(&CLI_READY, true)
    }

    /// Reads until [`CLI_EOL`] matches and returns the bytes that preceded the match.
    ///
    /// The matched line terminator itself is consumed and not included in the result.
    pub fn read_until_eol(&mut self) -> io::Result<BytesMut> {
        self.reader.read_until(&CLI_EOL, true)
    }

    /// Reads one response line, then discards everything up to the next prompt.
    ///
    /// The line is decoded as UTF-8 with invalid sequences replaced.
    ///
    /// # Errors
    ///
    /// Returns any read error. If the line contains `Storage error: `, returns an
    /// [`io::Error`] of kind [`io::ErrorKind::Other`] carrying the trimmed text after that
    /// marker; the prompt has already been consumed by then.
    pub fn consume_response(&mut self) -> io::Result<String> {
        let line = self.reader.read_until(&CLI_EOL, true)?;
        let line = String::from_utf8_lossy(&line);
        // Drain up to the prompt before inspecting the line so the stream is left at a
        // command boundary whether or not the line reports an error.
        self.read_until_prompt()?;

        if let Some(error) = Self::get_error(&line) {
            return Err(io::Error::new(io::ErrorKind::Other, error));
        }

        Ok(line.into_owned())
    }

    /// Extracts the message that follows the first `Storage error: ` marker in `data`.
    ///
    /// Returns the remainder of `data` after the marker with surrounding whitespace trimmed,
    /// or `None` when the marker is absent.
    pub fn get_error(data: &str) -> Option<&str> {
        let (_, text) = data.split_once("Storage error: ")?;

        Some(text.trim())
    }
}
/// Reader over a serial port that accumulates incoming bytes and hands them out in pieces
/// delimited by regex matches (see `read_until`).
pub struct SerialReader {
// Port that bytes are read from.
port: Box<dyn SerialPort>,
// Bytes already read from the port but not yet returned to a caller.
buffer: BytesMut,
}
impl SerialReader {
    /// Builds a reader that owns `port` and starts with an empty buffer of `BUF_SIZE` capacity.
    pub fn new(port: Box<dyn SerialPort>) -> Self {
        let buffer = BytesMut::with_capacity(BUF_SIZE);
        Self { port, buffer }
    }

    /// Borrows the wrapped serial port.
    pub fn get_ref(&self) -> &dyn SerialPort {
        &*self.port
    }

    /// Mutably borrows the wrapped serial port.
    pub fn get_mut(&mut self) -> &mut dyn SerialPort {
        &mut *self.port
    }

    /// Reads from the port until `regex` matches somewhere in the buffered bytes.
    ///
    /// Everything up to and including the match is removed from the internal buffer and
    /// returned; when `trim` is `true` the matched bytes are cut off the end of the returned
    /// data. Bytes received after the match stay buffered for the next call.
    ///
    /// # Errors
    ///
    /// Returns any error from querying the number of pending bytes or from reading the port.
    pub fn read_until(&mut self, regex: &BytesRegex, trim: bool) -> io::Result<BytesMut> {
        let mut chunk = [0u8; BUF_SIZE];

        loop {
            // Copy the span out so the borrow of the buffer ends before it is split.
            let span = regex.find(&self.buffer).map(|m| (m.start(), m.end()));

            match span {
                Some((start, end)) => {
                    let mut data = self.buffer.split_to(end);
                    if trim {
                        // `data` is exactly `end` bytes long, so this drops just the match.
                        data.truncate(start);
                    }
                    return Ok(data);
                }
                None => {
                    // Request at least one byte even when nothing is reported as pending,
                    // and never more than the scratch array can hold.
                    let pending = self.port.bytes_to_read()? as usize;
                    let n = pending.clamp(1, chunk.len());

                    self.port.read_exact(&mut chunk[..n])?;
                    self.buffer.extend_from_slice(&chunk[..n]);
                }
            }
        }
    }
}