flipperzero_tools/
serial.rs

1//! UART serial interface.
2
3use std::io;
4
5use bytes::BytesMut;
6use once_cell::sync::Lazy;
7use regex::bytes::Regex as BytesRegex;
8use serialport::{SerialPort, SerialPortInfo, SerialPortType};
9
10/// STMicroelectronics Virtual COM Port
11const HWID: (u16, u16) = (0x0483, 0x5740);
12const BUF_SIZE: usize = 1024;
13pub const BAUD_115200: u32 = 115200;
14
15pub static CLI_PROMPT: Lazy<BytesRegex> = Lazy::new(|| BytesRegex::new(r">: ").unwrap());
16pub static CLI_EOL: Lazy<BytesRegex> = Lazy::new(|| BytesRegex::new(r"\r\n").unwrap());
17pub static CLI_READY: Lazy<BytesRegex> = Lazy::new(|| BytesRegex::new(r"Ready\?\r\n").unwrap());
18
19/// Try to find the Flipper Zero USB serial port.
20pub fn find_flipperzero(port_name: Option<&str>) -> Option<SerialPortInfo> {
21    let ports = serialport::available_ports().ok()?;
22
23    ports.into_iter().find(|p| {
24        if let Some(port) = port_name {
25            // Search for port by name
26            p.port_name == port
27        } else {
28            // Auto-detect port
29            matches!(&p.port_type, SerialPortType::UsbPort(usb) if (usb.vid, usb.pid) == HWID)
30        }
31    })
32}
33
34/// Serial Command-line interface.
35pub struct SerialCli {
36    reader: SerialReader,
37}
38
39impl SerialCli {
40    /// Create a new [`SerialCli`] connected to a [`SerialPort`].
41    pub fn new(port: Box<dyn SerialPort>) -> Self {
42        Self {
43            reader: SerialReader::new(port),
44        }
45    }
46
47    /// Get reference to underlying [`SerialPort`].
48    pub fn port(&self) -> &dyn SerialPort {
49        self.reader.get_ref()
50    }
51
52    /// Get mutable reference to underlying [`SerialPort`].
53    pub fn port_mut(&mut self) -> &mut dyn SerialPort {
54        self.reader.get_mut()
55    }
56
57    /// Reset serial to prompt.
58    pub fn start(&mut self) -> io::Result<()> {
59        self.port().clear(serialport::ClearBuffer::Input)?;
60        self.port_mut()
61            .write_data_terminal_ready(true)
62            .expect("failed to set DTR");
63
64        // Send command with known syntax to make sure buffer is flushed
65        self.send_line("device_info")?;
66        self.reader
67            .read_until(&BytesRegex::new(r"hardware_model").unwrap(), true)?;
68
69        // Read buffer until we get prompt
70        self.read_until_prompt()?;
71
72        Ok(())
73    }
74
75    /// Send line to device.
76    pub fn send_line(&mut self, line: &str) -> io::Result<()> {
77        write!(self.port_mut(), "{line}\r")
78    }
79
80    /// Send line to device and wait for next end-of-line.
81    pub fn send_and_wait_eol(&mut self, line: &str) -> io::Result<BytesMut> {
82        self.send_line(line)?;
83
84        self.read_until_eol()
85    }
86
87    /// Send line to device and wait for next CLI prompt.
88    pub fn send_and_wait_prompt(&mut self, line: &str) -> io::Result<BytesMut> {
89        self.send_line(line)?;
90
91        self.read_until_prompt()
92    }
93
94    /// Read until next CLI prompt.
95    pub fn read_until_prompt(&mut self) -> io::Result<BytesMut> {
96        self.reader.read_until(&CLI_PROMPT, true)
97    }
98
99    /// Read until next CLI "Ready?" prompt.
100    pub fn read_until_ready(&mut self) -> io::Result<BytesMut> {
101        self.reader.read_until(&CLI_READY, true)
102    }
103
104    /// Read until next end-of-line.
105    pub fn read_until_eol(&mut self) -> io::Result<BytesMut> {
106        self.reader.read_until(&CLI_EOL, true)
107    }
108
109    /// Consume command respose, checking for errors.
110    pub fn consume_response(&mut self) -> io::Result<String> {
111        let line = self.reader.read_until(&CLI_EOL, true)?;
112        let line = String::from_utf8_lossy(&line);
113        self.read_until_prompt()?;
114
115        if let Some(error) = Self::get_error(&line) {
116            return Err(io::Error::new(io::ErrorKind::Other, error));
117        }
118
119        Ok(line.into_owned())
120    }
121
122    /// Extract error text.
123    pub fn get_error(data: &str) -> Option<&str> {
124        let (_, text) = data.split_once("Storage error: ")?;
125
126        Some(text.trim())
127    }
128}
129
130/// Buffered reader for [`SerialPort`].
131pub struct SerialReader {
132    port: Box<dyn SerialPort>,
133    buffer: BytesMut,
134}
135
136impl SerialReader {
137    /// Create new [`SerialReader`] connected to a [`SerialPort`].
138    pub fn new(port: Box<dyn SerialPort>) -> Self {
139        Self {
140            port,
141            buffer: BytesMut::with_capacity(BUF_SIZE),
142        }
143    }
144
145    /// Get reference to underlying [`SerialPort`].
146    pub fn get_ref(&self) -> &dyn SerialPort {
147        self.port.as_ref()
148    }
149
150    /// Get mutable reference to underlying [`SerialPort`].
151    pub fn get_mut(&mut self) -> &mut dyn SerialPort {
152        self.port.as_mut()
153    }
154
155    /// Read from [`SerialPort`] until [`BytesRegex`] is matched.
156    pub fn read_until(&mut self, regex: &BytesRegex, trim: bool) -> io::Result<BytesMut> {
157        let mut buf = [0u8; BUF_SIZE];
158        loop {
159            if let Some(m) = regex.find(&self.buffer) {
160                let start = m.start();
161                let end = m.end();
162
163                let mut data = self.buffer.split_to(end);
164                if trim {
165                    // Trim the matched pattern
166                    data.truncate(data.len() - (end - start));
167                }
168
169                return Ok(data);
170            }
171
172            // We always read at least 1 byte.
173            let n = (self.port.bytes_to_read()? as usize).clamp(1, buf.len());
174
175            self.port.read_exact(&mut buf[..n])?;
176            self.buffer.extend_from_slice(&buf[..n]);
177        }
178    }
179}