rusb_async/
pool.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use rusb::{ffi, DeviceHandle, UsbContext};
6
7use crate::{error::Error, error::Result, Transfer};
8
9use std::sync::atomic::{AtomicBool, Ordering};
10
11/// Represents a pool of asynchronous transfers, that can be polled to completion
12pub struct TransferPool<C: UsbContext> {
13    device: Arc<DeviceHandle<C>>,
14    pending: VecDeque<Transfer>,
15}
16
17impl<C: UsbContext> TransferPool<C> {
18    pub fn new(device: Arc<DeviceHandle<C>>) -> Result<Self> {
19        Ok(Self {
20            device,
21            pending: VecDeque::new(),
22        })
23    }
24
25    pub fn submit_bulk(&mut self, endpoint: u8, buf: Vec<u8>) -> Result<()> {
26        // Safety: If transfer is submitted, it is pushed onto `pending` where it will be
27        // dropped before `device` is freed.
28        unsafe {
29            let mut transfer = Transfer::bulk(self.device.as_raw(), endpoint, buf);
30            transfer.submit()?;
31            self.pending.push_back(transfer);
32            Ok(())
33        }
34    }
35
36    pub fn submit_control(
37        &mut self,
38        request_type: u8,
39        request: u8,
40        value: u16,
41        index: u16,
42        data: &[u8],
43    ) -> Result<()> {
44        // Safety: If transfer is submitted, it is pushed onto `pending` where it will be
45        // dropped before `device` is freed.
46        unsafe {
47            let mut transfer = Transfer::control(
48                self.device.as_raw(),
49                request_type,
50                request,
51                value,
52                index,
53                data,
54            );
55            transfer.submit()?;
56            self.pending.push_back(transfer);
57            Ok(())
58        }
59    }
60
61    pub unsafe fn submit_control_raw(&mut self, buffer: Vec<u8>) -> Result<()> {
62        // Safety: If transfer is submitted, it is pushed onto `pending` where it will be
63        // dropped before `device` is freed.
64        unsafe {
65            let mut transfer = Transfer::control_raw(self.device.as_raw(), buffer);
66            transfer.submit()?;
67            self.pending.push_back(transfer);
68            Ok(())
69        }
70    }
71
72    pub fn submit_interrupt(&mut self, endpoint: u8, buf: Vec<u8>) -> Result<()> {
73        // Safety: If transfer is submitted, it is pushed onto `pending` where it will be
74        // dropped before `device` is freed.
75        unsafe {
76            let mut transfer = Transfer::interrupt(self.device.as_raw(), endpoint, buf);
77            transfer.submit()?;
78            self.pending.push_back(transfer);
79            Ok(())
80        }
81    }
82
83    pub fn submit_iso(&mut self, endpoint: u8, buf: Vec<u8>, iso_packets: i32) -> Result<()> {
84        // Safety: If transfer is submitted, it is pushed onto `pending` where it will be
85        // dropped before `device` is freed.
86        unsafe {
87            let mut transfer = Transfer::iso(self.device.as_raw(), endpoint, buf, iso_packets);
88            transfer.submit()?;
89            self.pending.push_back(transfer);
90            Ok(())
91        }
92    }
93
94    pub fn poll(&mut self, timeout: Duration) -> Result<Vec<u8>> {
95        let next = self.pending.front().ok_or(Error::NoTransfersPending)?;
96        if poll_completed(self.device.context(), timeout, next.completed_flag()) {
97            let mut transfer = self.pending.pop_front().unwrap();
98            let res = transfer.handle_completed();
99            res
100        } else {
101            Err(Error::PollTimeout)
102        }
103    }
104
105    pub fn cancel_all(&mut self) {
106        // Cancel in reverse order to avoid a race condition in which one
107        // transfer is cancelled but another submitted later makes its way onto
108        // the bus.
109        for transfer in self.pending.iter_mut().rev() {
110            transfer.cancel();
111        }
112    }
113
114    /// Returns the number of async transfers pending
115    pub fn pending(&self) -> usize {
116        self.pending.len()
117    }
118}
119
120unsafe impl<C: UsbContext> Send for TransferPool<C> {}
121unsafe impl<C: UsbContext> Sync for TransferPool<C> {}
122
123impl<C: UsbContext> Drop for TransferPool<C> {
124    fn drop(&mut self) {
125        self.cancel_all();
126        while self.pending() > 0 {
127            self.poll(Duration::from_secs(1)).ok();
128        }
129    }
130}
131
132/// This is effectively libusb_handle_events_timeout_completed, but with
133/// `completed` as `AtomicBool` instead of `c_int` so it is safe to access
134/// without the events lock held. It also continues polling until completion,
135/// timeout, or error, instead of potentially returning early.
136///
137/// This design is based on
138/// https://libusb.sourceforge.io/api-1.0/libusb_mtasync.html#threadwait
139///
140/// Returns `true` when `completed` becomes true, `false` on timeout, and panics on
141/// any other libusb error.
142fn poll_completed(ctx: &impl UsbContext, timeout: Duration, completed: &AtomicBool) -> bool {
143    use ffi::constants::LIBUSB_ERROR_TIMEOUT;
144
145    let deadline = Instant::now() + timeout;
146
147    let mut err = 0;
148    while err == 0 && !completed.load(Ordering::SeqCst) && deadline > Instant::now() {
149        let remaining = deadline.saturating_duration_since(Instant::now());
150        let timeval = libc::timeval {
151            tv_sec: remaining.as_secs().try_into().unwrap(),
152            tv_usec: remaining.subsec_micros().try_into().unwrap(),
153        };
154        unsafe {
155            if ffi::libusb_try_lock_events(ctx.as_raw()) == 0 {
156                if !completed.load(Ordering::SeqCst)
157                    && ffi::libusb_event_handling_ok(ctx.as_raw()) != 0
158                {
159                    err = ffi::libusb_handle_events_locked(ctx.as_raw(), &timeval as *const _);
160                }
161                ffi::libusb_unlock_events(ctx.as_raw());
162            } else {
163                ffi::libusb_lock_event_waiters(ctx.as_raw());
164                if !completed.load(Ordering::SeqCst)
165                    && ffi::libusb_event_handler_active(ctx.as_raw()) != 0
166                {
167                    ffi::libusb_wait_for_event(ctx.as_raw(), &timeval as *const _);
168                }
169                ffi::libusb_unlock_event_waiters(ctx.as_raw());
170            }
171        }
172    }
173
174    match err {
175        0 => completed.load(Ordering::SeqCst),
176        LIBUSB_ERROR_TIMEOUT => false,
177        _ => panic!(
178            "Error {} when polling transfers: {}",
179            err,
180            unsafe { std::ffi::CStr::from_ptr(ffi::libusb_strerror(err)) }.to_string_lossy()
181        ),
182    }
183}