1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use rusb::{ffi, DeviceHandle, UsbContext};
6
7use crate::{error::Error, error::Result, Transfer};
8
9use std::sync::atomic::{AtomicBool, Ordering};
10
11pub struct TransferPool<C: UsbContext> {
13 device: Arc<DeviceHandle<C>>,
14 pending: VecDeque<Transfer>,
15}
16
17impl<C: UsbContext> TransferPool<C> {
18 pub fn new(device: Arc<DeviceHandle<C>>) -> Result<Self> {
19 Ok(Self {
20 device,
21 pending: VecDeque::new(),
22 })
23 }
24
25 pub fn submit_bulk(&mut self, endpoint: u8, buf: Vec<u8>) -> Result<()> {
26 unsafe {
29 let mut transfer = Transfer::bulk(self.device.as_raw(), endpoint, buf);
30 transfer.submit()?;
31 self.pending.push_back(transfer);
32 Ok(())
33 }
34 }
35
36 pub fn submit_control(
37 &mut self,
38 request_type: u8,
39 request: u8,
40 value: u16,
41 index: u16,
42 data: &[u8],
43 ) -> Result<()> {
44 unsafe {
47 let mut transfer = Transfer::control(
48 self.device.as_raw(),
49 request_type,
50 request,
51 value,
52 index,
53 data,
54 );
55 transfer.submit()?;
56 self.pending.push_back(transfer);
57 Ok(())
58 }
59 }
60
61 pub unsafe fn submit_control_raw(&mut self, buffer: Vec<u8>) -> Result<()> {
62 unsafe {
65 let mut transfer = Transfer::control_raw(self.device.as_raw(), buffer);
66 transfer.submit()?;
67 self.pending.push_back(transfer);
68 Ok(())
69 }
70 }
71
72 pub fn submit_interrupt(&mut self, endpoint: u8, buf: Vec<u8>) -> Result<()> {
73 unsafe {
76 let mut transfer = Transfer::interrupt(self.device.as_raw(), endpoint, buf);
77 transfer.submit()?;
78 self.pending.push_back(transfer);
79 Ok(())
80 }
81 }
82
83 pub fn submit_iso(&mut self, endpoint: u8, buf: Vec<u8>, iso_packets: i32) -> Result<()> {
84 unsafe {
87 let mut transfer = Transfer::iso(self.device.as_raw(), endpoint, buf, iso_packets);
88 transfer.submit()?;
89 self.pending.push_back(transfer);
90 Ok(())
91 }
92 }
93
94 pub fn poll(&mut self, timeout: Duration) -> Result<Vec<u8>> {
95 let next = self.pending.front().ok_or(Error::NoTransfersPending)?;
96 if poll_completed(self.device.context(), timeout, next.completed_flag()) {
97 let mut transfer = self.pending.pop_front().unwrap();
98 let res = transfer.handle_completed();
99 res
100 } else {
101 Err(Error::PollTimeout)
102 }
103 }
104
105 pub fn cancel_all(&mut self) {
106 for transfer in self.pending.iter_mut().rev() {
110 transfer.cancel();
111 }
112 }
113
114 pub fn pending(&self) -> usize {
116 self.pending.len()
117 }
118}
119
120unsafe impl<C: UsbContext> Send for TransferPool<C> {}
121unsafe impl<C: UsbContext> Sync for TransferPool<C> {}
122
123impl<C: UsbContext> Drop for TransferPool<C> {
124 fn drop(&mut self) {
125 self.cancel_all();
126 while self.pending() > 0 {
127 self.poll(Duration::from_secs(1)).ok();
128 }
129 }
130}
131
132fn poll_completed(ctx: &impl UsbContext, timeout: Duration, completed: &AtomicBool) -> bool {
143 use ffi::constants::LIBUSB_ERROR_TIMEOUT;
144
145 let deadline = Instant::now() + timeout;
146
147 let mut err = 0;
148 while err == 0 && !completed.load(Ordering::SeqCst) && deadline > Instant::now() {
149 let remaining = deadline.saturating_duration_since(Instant::now());
150 let timeval = libc::timeval {
151 tv_sec: remaining.as_secs().try_into().unwrap(),
152 tv_usec: remaining.subsec_micros().try_into().unwrap(),
153 };
154 unsafe {
155 if ffi::libusb_try_lock_events(ctx.as_raw()) == 0 {
156 if !completed.load(Ordering::SeqCst)
157 && ffi::libusb_event_handling_ok(ctx.as_raw()) != 0
158 {
159 err = ffi::libusb_handle_events_locked(ctx.as_raw(), &timeval as *const _);
160 }
161 ffi::libusb_unlock_events(ctx.as_raw());
162 } else {
163 ffi::libusb_lock_event_waiters(ctx.as_raw());
164 if !completed.load(Ordering::SeqCst)
165 && ffi::libusb_event_handler_active(ctx.as_raw()) != 0
166 {
167 ffi::libusb_wait_for_event(ctx.as_raw(), &timeval as *const _);
168 }
169 ffi::libusb_unlock_event_waiters(ctx.as_raw());
170 }
171 }
172 }
173
174 match err {
175 0 => completed.load(Ordering::SeqCst),
176 LIBUSB_ERROR_TIMEOUT => false,
177 _ => panic!(
178 "Error {} when polling transfers: {}",
179 err,
180 unsafe { std::ffi::CStr::from_ptr(ffi::libusb_strerror(err)) }.to_string_lossy()
181 ),
182 }
183}