1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context as TaskContext, Poll};
5use std::time::Duration;
6
7use fastboop_core::device::{
8 DeviceEvent, DeviceFilter, DeviceHandle as DeviceHandleTrait,
9 DeviceWatcher as DeviceWatcherTrait,
10};
11use fastboop_core::fastboot::{FastbootWire, Response};
12use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
13use futures_core::Stream;
14use rusb::{Context, Device, DeviceHandle, Direction, TransferType, UsbContext as _};
15use std::sync::{
16 Arc,
17 atomic::{AtomicBool, Ordering},
18};
19use std::thread::JoinHandle;
20use tracing::{trace, warn};
21
/// Transfer timeout that `FastbootRusb::open` passes to `FastbootRusb::from_handle`.
22const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout handed to `handle_events` on each iteration of the hotplug thread loop.
23const HOTPLUG_POLL_INTERVAL: Duration = Duration::from_millis(250);
/// Sleep between attempts when opening or claiming the device reports `Busy`.
24const OPEN_BUSY_RETRY_DELAY: Duration = Duration::from_millis(150);
/// Total number of open/claim attempts made by `FastbootRusb::open`.
25const OPEN_BUSY_RETRIES: usize = 20;
/// Size in bytes of the buffer used to read a single fastboot response.
26const RESPONSE_BUFFER_LEN: usize = 4096;
27
/// USB interface number plus the bulk endpoint addresses used to talk fastboot.
///
/// Produced by `find_fastboot_interface` and consumed by `FastbootRusb::from_handle`.
28#[derive(Clone, Copy, Debug)]
29pub struct FastbootInterface {
 /// Interface number taken from the interface descriptor; this is what gets claimed.
30 pub interface: u8,
 /// Address of the bulk IN endpoint (responses are read from it).
31 pub ep_in: u8,
 /// Address of the bulk OUT endpoint (commands and data are written to it).
32 pub ep_out: u8,
33}
34
/// Fastboot transport over an open `rusb` device handle.
///
/// The interface is claimed in `from_handle` and released again in `Drop`.
35pub struct FastbootRusb {
 /// Open device handle used for all bulk transfers.
36 handle: DeviceHandle<Context>,
 /// Number of the claimed interface.
37 interface: u8,
 /// Bulk IN endpoint address used when reading responses.
38 ep_in: u8,
 /// Bulk OUT endpoint address used when writing commands and data.
39 ep_out: u8,
 /// Timeout applied to every bulk read and write.
40 timeout: Duration,
41}
42
/// A not-yet-opened USB device together with its vendor and product IDs.
///
/// `open_fastboot` opens the device on demand via `FastbootRusb::open`.
43#[derive(Clone, Debug)]
44pub struct RusbDeviceHandle {
 /// The underlying `rusb` device.
45 device: rusb::Device<Context>,
 /// USB vendor ID supplied at construction (read from the device descriptor by `from_device`).
46 vid: u16,
 /// USB product ID supplied at construction (read from the device descriptor by `from_device`).
47 pid: u16,
48}
49
/// Alias for [`RusbDeviceHandle`].
// NOTE(review): presumably kept for callers that use the older name — confirm.
50pub type FastbootRusbCandidate = RusbDeviceHandle;
51
/// Watches for USB device arrival/removal and exposes the events through
/// `poll_next_event`.
///
/// A background thread drives the `rusb` event loop; it is stopped and joined in `Drop`.
52pub struct DeviceWatcher {
 /// The USB context, shared with the background event thread.
53 _context: Arc<Context>,
 /// Hotplug registration handle, held for the watcher's lifetime.
 // NOTE(review): presumably dropping it deregisters the callback — confirm against rusb.
54 _registration: rusb::Registration<Context>,
 /// Receiving end of the queue filled by the hotplug callback and the initial enumeration.
55 receiver: UnboundedReceiver<DeviceEvent<RusbDeviceHandle>>,
 /// Cleared in `Drop` to make the event thread leave its loop.
56 running: Arc<AtomicBool>,
 /// Join handle of the event thread; taken (set to `None`) when joined in `Drop`.
57 thread: Option<JoinHandle<()>>,
58}
59
/// Errors reported by the rusb fastboot transport.
60#[derive(Debug)]
61pub enum FastbootRusbError {
 /// An underlying `rusb` operation failed.
62 Usb(rusb::Error),
 /// No interface with both a bulk IN and a bulk OUT endpoint was found on the device.
63 NoFastbootInterface,
 /// A response was malformed, e.g. shorter than the 4-byte status prefix.
64 InvalidResponse,
 /// Displayed as "fastboot failure: …".
 // NOTE(review): not constructed in this file; presumably carries the device's FAIL message — confirm.
65 Fail(String),
 /// Displayed as "download too large: … bytes".
 // NOTE(review): not constructed in this file; presumably the rejected size — confirm.
66 DownloadTooLarge(usize),
 /// Displayed as "unexpected status: …".
 // NOTE(review): not constructed in this file — confirm where it is raised.
67 UnexpectedStatus(String),
 /// A bulk write made no progress, e.g. `send_data` got 0 bytes written.
68 ShortWrite,
 /// Response bytes were not valid UTF-8.
69 Utf8(std::string::FromUtf8Error),
70}
71
/// Errors reported while creating or polling a `DeviceWatcher`.
72#[derive(Debug)]
73pub enum DeviceWatcherError {
 /// `rusb::has_hotplug()` returned false.
74 HotplugUnsupported,
 /// Creating the USB context or registering the hotplug callback failed.
75 Usb(rusb::Error),
 /// The background event thread could not be spawned.
76 ThreadSpawn(std::io::Error),
 /// The event queue ended: the receiver yielded `None`.
77 ChannelClosed,
78}
79
80impl fmt::Display for FastbootRusbError {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 match self {
83 Self::Usb(err) => write!(f, "usb error: {err}"),
84 Self::NoFastbootInterface => write!(f, "no fastboot bulk endpoints found"),
85 Self::InvalidResponse => write!(f, "invalid fastboot response"),
86 Self::Fail(msg) => write!(f, "fastboot failure: {msg}"),
87 Self::DownloadTooLarge(size) => write!(f, "download too large: {size} bytes"),
88 Self::UnexpectedStatus(status) => write!(f, "unexpected status: {status}"),
89 Self::ShortWrite => write!(f, "short write while sending data"),
90 Self::Utf8(err) => write!(f, "utf8 error: {err}"),
91 }
92 }
93}
94
/// Relies on the trait's default methods; no `source` override is provided here.
95impl std::error::Error for FastbootRusbError {}
96
/// Relies on the trait's default methods; no `source` override is provided here.
97impl std::error::Error for DeviceWatcherError {}
98
99impl From<rusb::Error> for FastbootRusbError {
100 fn from(value: rusb::Error) -> Self {
101 Self::Usb(value)
102 }
103}
104
105impl From<rusb::Error> for DeviceWatcherError {
106 fn from(value: rusb::Error) -> Self {
107 Self::Usb(value)
108 }
109}
110
111impl From<std::string::FromUtf8Error> for FastbootRusbError {
112 fn from(value: std::string::FromUtf8Error) -> Self {
113 Self::Utf8(value)
114 }
115}
116
117impl std::fmt::Display for DeviceWatcherError {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 match self {
120 Self::HotplugUnsupported => write!(f, "usb hotplug not supported"),
121 Self::Usb(err) => write!(f, "usb error: {err}"),
122 Self::ThreadSpawn(err) => write!(f, "failed to spawn hotplug thread: {err}"),
123 Self::ChannelClosed => write!(f, "device watcher event stream closed"),
124 }
125 }
126}
127
/// State handed to `rusb` as the hotplug callback: which devices to report and
/// where to send the resulting events.
128struct HotplugCallback {
 /// VID/PID filters; an empty list matches every device (see `matches_filters`).
129 filters: Arc<Vec<DeviceFilter>>,
 /// Sending end of the watcher's event queue.
130 sender: UnboundedSender<DeviceEvent<RusbDeviceHandle>>,
131}
132
133impl rusb::Hotplug<Context> for HotplugCallback {
134 fn device_arrived(&mut self, device: Device<Context>) {
135 enqueue_arrived_if_matching(&self.filters, &self.sender, device);
136 }
137
138 fn device_left(&mut self, device: Device<Context>) {
139 enqueue_left_if_matching(&self.filters, &self.sender, device);
140 }
141}
142
143impl DeviceWatcher {
144 pub fn new(filters: &[DeviceFilter]) -> Result<Self, DeviceWatcherError> {
145 if !rusb::has_hotplug() {
146 return Err(DeviceWatcherError::HotplugUnsupported);
147 }
148
149 let context = Arc::new(Context::new()?);
150 let filters = Arc::new(filters.to_vec());
151 let (sender, receiver) = unbounded();
152
153 if let Ok(devices) = context.devices() {
154 for device in devices.iter() {
155 enqueue_arrived_if_matching(&filters, &sender, device);
156 }
157 }
158
159 let callback = HotplugCallback {
160 filters: Arc::clone(&filters),
161 sender: sender.clone(),
162 };
163 let registration = rusb::HotplugBuilder::new()
164 .enumerate(false)
165 .register(context.as_ref(), Box::new(callback))?;
166 let running = Arc::new(AtomicBool::new(true));
167 let thread_context = Arc::clone(&context);
168 let thread_running = Arc::clone(&running);
169 let thread = std::thread::Builder::new()
170 .name("fastboop-rusb-hotplug".to_string())
171 .spawn(move || {
172 while thread_running.load(Ordering::Relaxed) {
173 if let Err(err) = thread_context.handle_events(Some(HOTPLUG_POLL_INTERVAL)) {
174 if !matches!(err, rusb::Error::Interrupted) {
175 warn!(%err, "rusb hotplug event loop error");
176 }
177 }
178 }
179 })
180 .map_err(DeviceWatcherError::ThreadSpawn)?;
181
182 Ok(Self {
183 _context: context,
184 _registration: registration,
185 receiver,
186 running,
187 thread: Some(thread),
188 })
189 }
190}
191
192impl DeviceWatcherTrait for DeviceWatcher {
193 type Device = RusbDeviceHandle;
194 type Error = DeviceWatcherError;
195
196 fn poll_next_event(
197 mut self: Pin<&mut Self>,
198 cx: &mut TaskContext<'_>,
199 ) -> Poll<Result<DeviceEvent<Self::Device>, Self::Error>> {
200 match Pin::new(&mut self.receiver).poll_next(cx) {
201 Poll::Ready(Some(event)) => Poll::Ready(Ok(event)),
202 Poll::Ready(None) => Poll::Ready(Err(DeviceWatcherError::ChannelClosed)),
203 Poll::Pending => Poll::Pending,
204 }
205 }
206}
207
208impl Drop for DeviceWatcher {
209 fn drop(&mut self) {
210 self.running.store(false, Ordering::Relaxed);
211 if let Some(thread) = self.thread.take() {
212 let _ = thread.join();
213 }
214 }
215}
216
217impl FastbootRusb {
218 pub fn open(device: &Device<Context>) -> Result<Self, FastbootRusbError> {
219 let iface = find_fastboot_interface(device)?;
220 for attempt in 0..OPEN_BUSY_RETRIES {
221 let handle = match device.open() {
222 Ok(handle) => handle,
223 Err(rusb::Error::Busy) if attempt + 1 < OPEN_BUSY_RETRIES => {
224 trace!(
225 attempt = attempt + 1,
226 retries = OPEN_BUSY_RETRIES,
227 "fastboot open busy; retrying"
228 );
229 std::thread::sleep(OPEN_BUSY_RETRY_DELAY);
230 continue;
231 }
232 Err(err) => return Err(err.into()),
233 };
234
235 match Self::from_handle(handle, iface, DEFAULT_TIMEOUT) {
236 Ok(fastboot) => return Ok(fastboot),
237 Err(FastbootRusbError::Usb(rusb::Error::Busy))
238 if attempt + 1 < OPEN_BUSY_RETRIES =>
239 {
240 trace!(
241 attempt = attempt + 1,
242 retries = OPEN_BUSY_RETRIES,
243 "fastboot claim busy; retrying"
244 );
245 std::thread::sleep(OPEN_BUSY_RETRY_DELAY);
246 }
247 Err(err) => return Err(err),
248 }
249 }
250
251 Err(FastbootRusbError::Usb(rusb::Error::Busy))
252 }
253
254 pub fn from_handle(
255 handle: DeviceHandle<Context>,
256 iface: FastbootInterface,
257 timeout: Duration,
258 ) -> Result<Self, FastbootRusbError> {
259 let _ = handle.set_auto_detach_kernel_driver(true);
260 if let Ok(true) = handle.kernel_driver_active(iface.interface) {
261 let _ = handle.detach_kernel_driver(iface.interface);
262 }
263 handle.claim_interface(iface.interface)?;
264 Ok(Self {
265 handle,
266 interface: iface.interface,
267 ep_in: iface.ep_in,
268 ep_out: iface.ep_out,
269 timeout,
270 })
271 }
272
273 pub fn timeout(&self) -> Duration {
274 self.timeout
275 }
276
277 pub fn set_timeout(&mut self, timeout: Duration) {
278 self.timeout = timeout;
279 }
280
281 pub fn interface(&self) -> FastbootInterface {
282 FastbootInterface {
283 interface: self.interface,
284 ep_in: self.ep_in,
285 ep_out: self.ep_out,
286 }
287 }
288
289 fn send_command(&mut self, cmd: &str) -> Result<Response, FastbootRusbError> {
290 trace!(command = %cmd, "fastboot send");
291 self.handle
292 .write_bulk(self.ep_out, cmd.as_bytes(), self.timeout)?;
293 self.read_response_inner()
294 }
295
296 fn read_response_inner(&mut self) -> Result<Response, FastbootRusbError> {
297 let mut buf = vec![0u8; RESPONSE_BUFFER_LEN];
298 let n = self.handle.read_bulk(self.ep_in, &mut buf, self.timeout)?;
299 if n < 4 {
300 return Err(FastbootRusbError::InvalidResponse);
301 }
302 buf.truncate(n);
303 let status = String::from_utf8(buf[0..4].to_vec())?;
304 let payload = String::from_utf8(buf[4..].to_vec())?;
305 trace!(
306 status = %status,
307 payload = %truncate_payload(&payload),
308 "fastboot recv"
309 );
310 Ok(Response { status, payload })
311 }
312}
313
314impl RusbDeviceHandle {
315 pub fn new(device: rusb::Device<Context>, vid: u16, pid: u16) -> Self {
316 Self { device, vid, pid }
317 }
318
319 pub fn from_device(device: rusb::Device<Context>) -> Result<Self, rusb::Error> {
320 let desc = device.device_descriptor()?;
321 Ok(Self::new(device, desc.vendor_id(), desc.product_id()))
322 }
323
324 pub fn device(&self) -> &rusb::Device<Context> {
325 &self.device
326 }
327
328 pub fn usb_serial_number(&self) -> Option<String> {
329 let descriptor = self.device.device_descriptor().ok()?;
330 let handle = self.device.open().ok()?;
331 let serial = handle.read_serial_number_string_ascii(&descriptor).ok()?;
332 let serial = serial.trim();
333 if serial.is_empty() {
334 None
335 } else {
336 Some(serial.to_string())
337 }
338 }
339}
340
341impl DeviceHandleTrait for RusbDeviceHandle {
342 type FastbootWire = FastbootRusb;
343 type OpenFastbootError = FastbootRusbError;
344 type OpenFastbootFuture<'a> = Pin<
345 Box<dyn Future<Output = Result<Self::FastbootWire, Self::OpenFastbootError>> + Send + 'a>,
346 >;
347
348 fn vid(&self) -> u16 {
349 self.vid
350 }
351
352 fn pid(&self) -> u16 {
353 self.pid
354 }
355
356 fn open_fastboot<'a>(&'a self) -> Self::OpenFastbootFuture<'a> {
357 Box::pin(async move { FastbootRusb::open(&self.device) })
358 }
359}
360
361impl Drop for FastbootRusb {
362 fn drop(&mut self) {
363 let _ = self.handle.release_interface(self.interface);
364 }
365}
366
367impl FastbootWire for FastbootRusb {
368 type Error = FastbootRusbError;
369 type SendCommandFuture<'a> =
370 std::pin::Pin<Box<dyn std::future::Future<Output = Result<Response, Self::Error>> + 'a>>;
371 type SendDataFuture<'a> =
372 std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + 'a>>;
373 type ReadResponseFuture<'a> =
374 std::pin::Pin<Box<dyn std::future::Future<Output = Result<Response, Self::Error>> + 'a>>;
375
376 fn send_command<'a>(&'a mut self, cmd: &'a str) -> Self::SendCommandFuture<'a> {
377 Box::pin(async move { self.send_command(cmd) })
378 }
379
380 fn send_data<'a>(&'a mut self, data: &'a [u8]) -> Self::SendDataFuture<'a> {
381 Box::pin(async move {
382 let mut remaining = data;
383 while !remaining.is_empty() {
384 let written = self
385 .handle
386 .write_bulk(self.ep_out, remaining, self.timeout)?;
387 if written == 0 {
388 return Err(FastbootRusbError::ShortWrite);
389 }
390 remaining = &remaining[written..];
391 }
392 Ok(())
393 })
394 }
395
396 fn read_response<'a>(&'a mut self) -> Self::ReadResponseFuture<'a> {
397 Box::pin(async move { self.read_response_inner() })
398 }
399}
400
401pub fn find_fastboot_interface(
402 device: &Device<Context>,
403) -> Result<FastbootInterface, FastbootRusbError> {
404 let device_desc = device.device_descriptor()?;
405 for config_index in 0..device_desc.num_configurations() {
406 let config = device.config_descriptor(config_index)?;
407 for interface in config.interfaces() {
408 for alt in interface.descriptors() {
409 let mut ep_in = None;
410 let mut ep_out = None;
411 for endpoint in alt.endpoint_descriptors() {
412 if endpoint.transfer_type() != TransferType::Bulk {
413 continue;
414 }
415 match endpoint.direction() {
416 Direction::In => ep_in = Some(endpoint.address()),
417 Direction::Out => ep_out = Some(endpoint.address()),
418 }
419 }
420 if let (Some(ep_in), Some(ep_out)) = (ep_in, ep_out) {
421 trace!(
422 interface = alt.interface_number(),
423 ep_in = ep_in,
424 ep_out = ep_out,
425 "fastboot interface selected"
426 );
427 return Ok(FastbootInterface {
428 interface: alt.interface_number(),
429 ep_in,
430 ep_out,
431 });
432 }
433 }
434 }
435 }
436 Err(FastbootRusbError::NoFastbootInterface)
437}
438
439fn enqueue_arrived_if_matching(
440 filters: &[DeviceFilter],
441 sender: &UnboundedSender<DeviceEvent<RusbDeviceHandle>>,
442 device: Device<Context>,
443) {
444 let desc = match device.device_descriptor() {
445 Ok(desc) => desc,
446 Err(_) => return,
447 };
448 let vid = desc.vendor_id();
449 let pid = desc.product_id();
450 if !matches_filters(filters, vid, pid) {
451 return;
452 }
453 let _ = sender.unbounded_send(DeviceEvent::Arrived {
454 device: RusbDeviceHandle::new(device, vid, pid),
455 });
456}
457
458fn enqueue_left_if_matching(
459 filters: &[DeviceFilter],
460 sender: &UnboundedSender<DeviceEvent<RusbDeviceHandle>>,
461 device: Device<Context>,
462) {
463 let desc = match device.device_descriptor() {
464 Ok(desc) => desc,
465 Err(_) => return,
466 };
467 let vid = desc.vendor_id();
468 let pid = desc.product_id();
469 if !matches_filters(filters, vid, pid) {
470 return;
471 }
472 let _ = sender.unbounded_send(DeviceEvent::Left {
473 device: RusbDeviceHandle::new(device, vid, pid),
474 });
475}
476
477fn matches_filters(filters: &[DeviceFilter], vid: u16, pid: u16) -> bool {
478 if filters.is_empty() {
479 return true;
480 }
481 filters
482 .iter()
483 .any(|filter| filter.vid == vid && filter.pid == pid)
484}
485
/// Shortens `payload` for logging.
///
/// Payloads of at most 96 bytes are returned unchanged. Longer ones are cut
/// to at most 96 bytes and suffixed with `…`. The cut backs off to the nearest
/// preceding UTF-8 character boundary, so a multi-byte character straddling
/// the limit is dropped whole instead of causing a slicing panic.
fn truncate_payload(payload: &str) -> String {
    const MAX: usize = 96;
    if payload.len() <= MAX {
        return payload.to_string();
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut end = MAX;
    while !payload.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &payload[..end])
}