1use crate::flag;
2use rusb::UsbContext;
3
/// Tuning parameters for a USB transfer ring.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Configuration {
    /// Size in bytes of each transfer buffer.
    pub buffer_length: usize,
    /// Number of buffers in the ring; must be strictly larger than
    /// `transfer_queue_length` (enforced by `Ring::new`).
    pub ring_length: usize,
    /// Number of libusb transfers kept in flight simultaneously.
    pub transfer_queue_length: usize,
    /// Whether to attempt DMA-capable allocation via `libusb_dev_mem_alloc`
    /// (falls back to the global allocator if the call returns null).
    pub allow_dma: bool,
}
11
impl Configuration {
    /// Deserializes a `Configuration` from a bincode-encoded byte slice.
    ///
    /// # Errors
    ///
    /// Returns a `bincode` error when `data` is not a valid encoding.
    pub fn deserialize_bincode(data: &[u8]) -> bincode::Result<Configuration> {
        bincode::deserialize(data)
    }
}
17
/// Errors raised while selecting devices and streaming USB transfers.
#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
    /// Any error reported by the `rusb` crate.
    #[error(transparent)]
    Rusb(#[from] rusb::Error),

    /// No device with the given serial number was found.
    #[error("device with serial not found")]
    Serial(String),

    /// No device exists at the requested bus/address pair.
    #[error("there is no device on bus {bus_number} at address {address}")]
    BusNumberAndAddressNotFound { bus_number: u8, address: u8 },

    /// A device exists at the bus/address pair but is not a supported model.
    #[error("unsupported device on bus {bus_number} at address {address}")]
    BusNumberAndAddressUnsupportedDevice { bus_number: u8, address: u8 },

    /// The device could not be opened (permissions, disconnection, ...).
    #[error("could not access device on bus {bus_number} at address {address} ({error:?})")]
    BusNumberAndAddressAccessError {
        bus_number: u8,
        address: u8,
        error: rusb::Error,
    },

    /// The device at the bus/address pair reports unexpected USB identifiers.
    #[error("the device on bus {bus_number} at address {address} has an unsupported VID:PID ({vendor_id:04X}:{product_id:04X})")]
    BusNumberAndAddressUnexpectedIds {
        bus_number: u8,
        address: u8,
        vendor_id: u16,
        product_id: u16,
    },

    /// No matching device was found at all.
    #[error("device not found")]
    Device,

    /// `Configuration::ring_length` must exceed `Configuration::transfer_queue_length`.
    #[error("ring size is smaller than or equal to transfer queue size")]
    ConfigurationSizes,

    /// The consumer fell too far behind and ring data was dropped.
    #[error("ring overflow")]
    Overflow,

    /// A control transfer returned different bytes than expected.
    #[error("control transfer error (expected {expected:?}, read {read:?})")]
    Mismatch { expected: Vec<u8>, read: Vec<u8> },

    /// A control transfer matched none of the accepted responses.
    #[error("control transfer error (expected one of {expected:?}, read {read:?})")]
    MismatchAny {
        expected: Vec<Vec<u8>>,
        read: Vec<u8>,
    },

    /// The device interface is already claimed by another process.
    #[error("the device is already used by another program")]
    Busy,
}
68
/// USB link speed, mirroring `rusb::Speed` but serializable with serde.
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum Speed {
    Unknown,
    Low,
    Full,
    High,
    Super,
    SuperPlus,
}
78
79impl From<rusb::Speed> for Speed {
80 fn from(speed: rusb::Speed) -> Self {
81 match speed {
82 rusb::Speed::Low => Self::Low,
83 rusb::Speed::Full => Self::Full,
84 rusb::Speed::High => Self::High,
85 rusb::Speed::Super => Self::Super,
86 rusb::Speed::SuperPlus => Self::SuperPlus,
87 _ => Self::Unknown,
88 }
89 }
90}
91
92impl std::fmt::Display for Speed {
93 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 match self {
95 Self::Unknown => write!(formatter, "USB Unknown speed"),
96 Self::Low => write!(formatter, "USB 1.0 Low Speed (1.5 Mb/s)"),
97 Self::Full => write!(formatter, "USB 1.1 Full Speed (12 Mb/s)"),
98 Self::High => write!(formatter, "USB 2.0 High Speed (480 Mb/s)"),
99 Self::Super => write!(formatter, "USB 3.0 SuperSpeed (5.0 Gb/s)"),
100 Self::SuperPlus => write!(formatter, "USB 3.1 SuperSpeed+ (10.0 Gb/s)"),
101 }
102 }
103}
104
105pub fn assert_control_transfer(
106 handle: &rusb::DeviceHandle<rusb::Context>,
107 request_type: u8,
108 request: u8,
109 value: u16,
110 index: u16,
111 expected_buffer: &[u8],
112 timeout: std::time::Duration,
113) -> Result<(), Error> {
114 let mut buffer = vec![0; expected_buffer.len()];
115 let read = handle.read_control(request_type, request, value, index, &mut buffer, timeout)?;
116 buffer.truncate(read);
117 if expected_buffer == &buffer[..] {
118 Ok(())
119 } else {
120 Err(Error::Mismatch {
121 expected: Vec::from(expected_buffer),
122 read: buffer,
123 })
124 }
125}
126
127pub fn assert_string_descriptor_any(
128 handle: &rusb::DeviceHandle<rusb::Context>,
129 request_type: u8,
130 request: u8,
131 value: u16,
132 index: u16,
133 expected_buffers: &[&[u8]],
134 timeout: std::time::Duration,
135) -> Result<(), Error> {
136 let mut buffer = vec![
137 0;
138 expected_buffers
139 .iter()
140 .fold(0, |maximum, expected_buffer| maximum
141 .max(expected_buffer.len())
142 + 2)
143 ];
144 let read = handle.read_control(request_type, request, value, index, &mut buffer, timeout)?;
145 buffer.truncate(read);
146 for expected_buffer in expected_buffers {
147 if *expected_buffer == &buffer[2..] {
148 return Ok(());
149 }
150 }
151 buffer.drain(0..2);
152 Err(Error::MismatchAny {
153 expected: expected_buffers
154 .iter()
155 .map(|expected_buffer| Vec::from(*expected_buffer))
156 .collect(),
157 read: buffer,
158 })
159}
160
// Direct FFI declarations for libusb's persistent device-memory helpers,
// which are not re-exported by `libusb1-sys`.
extern "system" {
    /// Allocates `length` bytes of device-memory (potentially DMA-capable) for
    /// the given device handle; returns null on failure.
    pub fn libusb_dev_mem_alloc(
        dev_handle: *mut libusb1_sys::libusb_device_handle,
        length: libc::ssize_t,
    ) -> *mut libc::c_uchar;

    /// Frees memory previously obtained from `libusb_dev_mem_alloc`.
    /// NOTE(review): libusb declares the return type of this function as a
    /// plain `int` status, not a pointer — confirm this signature. The return
    /// value is ignored by the caller in this file, so the mismatch is latent.
    pub fn libusb_dev_mem_free(
        dev_handle: *mut libusb1_sys::libusb_device_handle,
        buffer: *mut libc::c_uchar,
        length: libc::ssize_t,
    ) -> *mut libc::c_int;
}
173
// Raw, non-null pointer to a transfer buffer's storage (global allocator or
// libusb device memory).
struct BufferData(std::ptr::NonNull<u8>);

// SAFETY(review): the pointer itself is freely movable across threads; actual
// accesses appear to be serialized by the ring mutex and by libusb's transfer
// ownership — confirm before relying on these impls elsewhere.
unsafe impl Send for BufferData {}
unsafe impl Sync for BufferData {}

impl BufferData {
    // Exposes the raw pointer for FFI calls and slice construction.
    fn as_ptr(&self) -> *mut u8 {
        self.0.as_ptr()
    }
}
184
// One slot of the ring: a raw allocation plus metadata stamped by the
// transfer callback when the slot is filled.
struct Buffer {
    // Wall-clock time at which the transfer for this buffer completed.
    system_time: std::time::SystemTime,
    // Monotonic time at which the transfer for this buffer completed.
    instant: std::time::Instant,
    // True when this is the first buffer delivered after a ring overflow.
    first_after_overflow: bool,
    // Backing storage (heap or libusb device memory).
    data: BufferData,
    // Number of valid bytes written by the last transfer.
    length: usize,
    // Total size in bytes of the allocation.
    capacity: usize,
    // True when `data` was obtained from `libusb_dev_mem_alloc` and must be
    // released with `libusb_dev_mem_free`.
    dma: bool,
}
194
/// Background thread that repeatedly calls libusb's event handling so that
/// asynchronous transfer callbacks run.
pub struct EventLoop {
    // libusb context shared with device handles and rings.
    context: rusb::Context,
    // Set to false on drop to ask the polling thread to exit.
    running: std::sync::Arc<std::sync::atomic::AtomicBool>,
    // Join handle of the polling thread; taken (and joined) on drop.
    thread: Option<std::thread::JoinHandle<()>>,
}
200
/// Marker value passed to the overflow callback when the ring runs out of
/// free buffers and incoming data starts being dropped.
#[derive(Debug, Clone, Copy)]
pub struct Overflow(());
203
204impl EventLoop {
205 pub fn new<IntoError, IntoWarning>(
206 timeout: std::time::Duration,
207 flag: flag::Flag<IntoError, IntoWarning>,
208 ) -> Result<Self, Error>
209 where
210 IntoError: From<Error> + Clone + Send + 'static,
211 IntoWarning: From<Overflow> + Clone + Send + 'static,
212 {
213 let context = rusb::Context::new()?;
214 let running = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
215 let thread_running = running.clone();
216 let thread_context = context.clone();
217 Ok(Self {
218 context,
219 thread: Some(std::thread::spawn(move || {
220 while thread_running.load(std::sync::atomic::Ordering::Acquire) {
221 if let Err(handle_events_error) = thread_context.handle_events(Some(timeout)) {
222 flag.store_error_if_not_set(Error::from(handle_events_error));
223 }
224 }
225 })),
226 running,
227 })
228 }
229
230 pub fn context(&self) -> &rusb::Context {
231 &self.context
232 }
233}
234
235impl Drop for EventLoop {
236 fn drop(&mut self) {
237 self.running
238 .store(false, std::sync::atomic::Ordering::Release);
239 if let Some(thread) = self.thread.take() {
240 thread.join().expect("event loop joined self");
241 }
242 }
243}
244
// Lifecycle state of a single libusb transfer owned by the ring.
enum TransferStatus {
    // Submitted (or about to be resubmitted) and expected to call back.
    Active,
    // Finished for good; will not be resubmitted and awaits deallocation.
    Complete,
    // Cancellation requested; waiting for the callback to acknowledge.
    Cancelling,
    // The libusb transfer and its boxed context have been freed.
    Deallocated,
}
251
/// Bounds of the ring region currently owned by in-flight transfers:
/// `start` is the next slot to be completed, `end` the next slot to be
/// handed to a transfer; both wrap modulo `ring_length`.
#[derive(Clone)]
pub struct WriteRange {
    pub start: usize,
    pub end: usize,
    pub ring_length: usize,
}

impl WriteRange {
    /// Advances `start` by one slot, wrapping around the ring.
    fn increment_start(&mut self) {
        self.start = Self::wrap(self.start + 1, self.ring_length);
    }

    /// Advances `end` by one slot, wrapping around the ring.
    fn increment_end(&mut self) {
        self.end = Self::wrap(self.end + 1, self.ring_length);
    }

    /// Maps an incremented index back into `[0, ring_length)`.
    fn wrap(value: usize, ring_length: usize) -> usize {
        value % ring_length
    }
}
268
/// Overflow state of the ring: `Engaged` while the ring is saturated and
/// completed transfers are diverted to freewheel buffers (their data is
/// dropped); `Disengaged` during normal operation.
#[derive(Debug, Clone, Copy)]
pub enum Clutch {
    Disengaged,
    Engaged,
}
274
// Mutable ring state; always accessed with the `SharedRingContext` mutex held.
struct RingContext {
    // Index of the buffer currently lent to the consumer (`BufferView`).
    read: usize,
    // Region of the ring owned by in-flight transfers.
    write_range: WriteRange,
    // One status per libusb transfer in the queue.
    transfer_statuses: Vec<TransferStatus>,
    // Ring buffers delivered to the consumer.
    buffers: Vec<Buffer>,
    // Scratch buffers used while the ring is saturated (data discarded).
    freewheel_buffers: Vec<Buffer>,
    // Current overflow state.
    clutch: Clutch,
}
283
// State shared between the ring, its transfer callbacks, and the consumer.
struct SharedRingContext {
    // Invoked (outside the lock) when a transfer fails or resubmission fails.
    on_error: Box<dyn Fn(Error) + Send + Sync + 'static>,
    // Invoked when the ring transitions into the overflowed (engaged) state.
    on_overflow: Box<dyn Fn(Overflow) + Send + Sync + 'static>,
    // Mutable ring state, guarded by this mutex.
    shared: std::sync::Mutex<RingContext>,
    // Notified each time a buffer is completed, waking `next_with_timeout`.
    shared_condvar: std::sync::Condvar,
}
290
// Owning wrapper around a raw `libusb_transfer` pointer.
struct LibusbTransfer(std::ptr::NonNull<libusb1_sys::libusb_transfer>);

// SAFETY(review): the transfer is manipulated from the constructor thread, the
// event-loop callback thread, and the dropping thread, apparently serialized
// by the ring mutex and libusb's submit/cancel protocol — confirm.
unsafe impl Send for LibusbTransfer {}

impl LibusbTransfer {
    // SAFETY: caller must guarantee exclusive access to the transfer
    // (i.e. libusb is not concurrently mutating it).
    unsafe fn as_mut(&mut self) -> &mut libusb1_sys::libusb_transfer {
        self.0.as_mut()
    }

    // Raw pointer for passing to libusb FFI functions.
    fn as_ptr(&self) -> *mut libusb1_sys::libusb_transfer {
        self.0.as_ptr()
    }
}
304
/// A ring of USB transfer buffers continuously refilled by libusb and
/// consumed one buffer at a time via `next_with_timeout`.
pub struct Ring {
    // The libusb transfers kept in flight (freed in `Drop`).
    transfers: Vec<LibusbTransfer>,
    // Device handle; kept alive for the lifetime of the transfers.
    handle: std::sync::Arc<rusb::DeviceHandle<rusb::Context>>,
    // True while a `BufferView` is outstanding (at most one at a time).
    active_buffer_view: std::sync::Arc<std::sync::atomic::AtomicBool>,
    // Kept alive so transfer callbacks keep being serviced.
    #[allow(dead_code)]
    event_loop: std::sync::Arc<EventLoop>,
    // State shared with the transfer callbacks.
    context: std::sync::Arc<SharedRingContext>,
}

// SAFETY(review): cross-thread access appears to be mediated by the ring
// mutex, the condvar, and the `active_buffer_view` atomic — confirm that no
// field is mutated without one of those.
unsafe impl Send for Ring {}
unsafe impl Sync for Ring {}
316
/// Kind of USB transfer to submit for every slot of the ring, mirroring
/// libusb's transfer types; each variant carries its per-transfer timeout.
pub enum TransferType {
    Control(std::time::Duration),
    Isochronous {
        endpoint: u8,
        // Number of isochronous packets per transfer.
        packets: u32,
        timeout: std::time::Duration,
    },
    Bulk {
        endpoint: u8,
        timeout: std::time::Duration,
    },
    Interrupt {
        endpoint: u8,
        timeout: std::time::Duration,
    },
    BulkStream {
        endpoint: u8,
        // USB 3 bulk stream identifier.
        stream_id: u32,
        timeout: std::time::Duration,
    },
}
338
/// Bundles a transfer type with an overall timeout.
/// NOTE(review): `timeout` is not read anywhere in this file — confirm it is
/// consumed by other modules (each `TransferType` variant already carries its
/// own per-transfer timeout).
pub struct TransferProperties {
    pub transfer_type: TransferType,
    pub timeout: std::time::Duration,
}
343
// Per-transfer view of the overflow state, updated by the callback.
enum TransferClutch {
    // Normal operation: the transfer writes into a ring buffer.
    Disengaged,
    // Normal operation, and this transfer fills the first buffer delivered
    // after an overflow (sets `Buffer::first_after_overflow`).
    DisengagedFirst,
    // Overflowed: the transfer writes into a freewheel buffer whose data is
    // discarded.
    Engaged,
}
349
// Heap-allocated context handed to libusb as `user_data` for each transfer;
// owned via `Box::into_raw` and reclaimed with `Box::from_raw` on teardown.
struct TransferContext {
    // Shared ring state (keeps the ring context alive from the callback).
    ring: std::sync::Arc<SharedRingContext>,
    // Index of this transfer in `transfer_statuses` / `freewheel_buffers`.
    transfer_index: usize,
    // This transfer's view of the overflow state.
    clutch: TransferClutch,
}
355
// libusb completion callback, invoked on the event-loop thread for every
// finished transfer. Under the ring lock it either commits the filled buffer
// to the ring and rearms the transfer (possibly in "freewheel" mode when the
// consumer lags), or records a terminal error. Resubmission happens after the
// lock is released.
#[no_mangle]
extern "system" fn usb_transfer_callback(transfer_pointer: *mut libusb1_sys::libusb_transfer) {
    // Timestamp the completion before taking the lock.
    let system_time = std::time::SystemTime::now();
    let now = std::time::Instant::now();
    let mut resubmit = false;
    {
        // SAFETY(review): libusb passes back the transfer we submitted, and
        // `user_data` is the `Box<TransferContext>` installed in `Ring::new`.
        let transfer = unsafe { &mut *transfer_pointer };
        let context = transfer.user_data;
        assert!(!context.is_null(), "context is null");
        let context = unsafe { &mut *(context as *mut TransferContext) };
        // Terminal error, if any, reported to `on_error` after unlocking.
        let mut error = None;
        {
            let mut shared = context
                .ring
                .shared
                .lock()
                .expect("ring context's lock is not poisoned");
            match shared.transfer_statuses[context.transfer_index] {
                TransferStatus::Active => match transfer.status {
                    // Successful (or timed-out, still delivering partial data)
                    // completion: commit the buffer and rearm.
                    libusb1_sys::constants::LIBUSB_TRANSFER_COMPLETED
                    | libusb1_sys::constants::LIBUSB_TRANSFER_TIMED_OUT => {
                        // Unless we were freewheeling, stamp the buffer we
                        // just filled and advance the committed edge.
                        if !matches!(context.clutch, TransferClutch::Engaged) {
                            let active_buffer = shared.write_range.start;
                            shared.buffers[active_buffer].system_time = system_time;
                            shared.buffers[active_buffer].instant = now;
                            shared.buffers[active_buffer].first_after_overflow =
                                matches!(context.clutch, TransferClutch::DisengagedFirst);
                            shared.buffers[active_buffer].length = transfer.actual_length as usize;
                            shared.write_range.increment_start();
                            context.ring.shared_condvar.notify_one();
                        }
                        if shared.write_range.end == shared.read {
                            // The ring is full: engage the clutch (reporting
                            // the overflow once) and rearm into a freewheel
                            // buffer whose contents will be discarded.
                            if matches!(shared.clutch, Clutch::Disengaged) {
                                shared.clutch = Clutch::Engaged;
                                (context.ring.on_overflow)(Overflow(()));
                            }
                            context.clutch = TransferClutch::Engaged;
                            transfer.buffer = shared.freewheel_buffers[context.transfer_index]
                                .data
                                .as_ptr();
                            transfer.length =
                                shared.freewheel_buffers[context.transfer_index].capacity as i32;
                        } else {
                            // Space available: rearm into the next ring slot.
                            // If we were overflowed, the next committed buffer
                            // must carry the `first_after_overflow` mark.
                            match shared.clutch {
                                Clutch::Disengaged => {
                                    context.clutch = TransferClutch::Disengaged;
                                }
                                Clutch::Engaged => {
                                    shared.clutch = Clutch::Disengaged;
                                    context.clutch = TransferClutch::DisengagedFirst;
                                }
                            }
                            transfer.buffer = shared.buffers[shared.write_range.end].data.as_ptr();
                            transfer.length =
                                shared.buffers[shared.write_range.end].capacity as i32;
                            shared.write_range.increment_end();
                        }
                        resubmit = true;
                    }
                    // Terminal failures: commit any partial data, stop this
                    // transfer for good, and map the status to an error.
                    status @ (libusb1_sys::constants::LIBUSB_TRANSFER_ERROR
                    | libusb1_sys::constants::LIBUSB_TRANSFER_CANCELLED
                    | libusb1_sys::constants::LIBUSB_TRANSFER_STALL
                    | libusb1_sys::constants::LIBUSB_TRANSFER_NO_DEVICE
                    | libusb1_sys::constants::LIBUSB_TRANSFER_OVERFLOW) => {
                        if !matches!(context.clutch, TransferClutch::Engaged) {
                            let active_buffer = shared.write_range.start;
                            shared.buffers[active_buffer].system_time = system_time;
                            shared.buffers[active_buffer].instant = now;
                            shared.buffers[active_buffer].length = transfer.actual_length as usize;
                            shared.write_range.increment_start();
                            context.ring.shared_condvar.notify_one();
                        }
                        shared.clutch = Clutch::Engaged;
                        context.clutch = TransferClutch::Disengaged;
                        shared.transfer_statuses[context.transfer_index] = TransferStatus::Complete;
                        error = Some(
                            match status {
                                libusb1_sys::constants::LIBUSB_TRANSFER_ERROR
                                | libusb1_sys::constants::LIBUSB_TRANSFER_CANCELLED => {
                                    rusb::Error::Io
                                }
                                libusb1_sys::constants::LIBUSB_TRANSFER_STALL => rusb::Error::Pipe,
                                libusb1_sys::constants::LIBUSB_TRANSFER_NO_DEVICE => {
                                    rusb::Error::NoDevice
                                }
                                libusb1_sys::constants::LIBUSB_TRANSFER_OVERFLOW => {
                                    rusb::Error::Overflow
                                }
                                _ => rusb::Error::Other,
                            }
                            .into(),
                        );
                    }
                    unknown_transfer_status => {
                        panic!("unknown transfer status {unknown_transfer_status}")
                    }
                },
                // The ring is shutting down: commit any partial data and mark
                // the transfer complete so `Drop` can deallocate it.
                TransferStatus::Cancelling => match transfer.status {
                    libusb1_sys::constants::LIBUSB_TRANSFER_COMPLETED
                    | libusb1_sys::constants::LIBUSB_TRANSFER_TIMED_OUT
                    | libusb1_sys::constants::LIBUSB_TRANSFER_ERROR
                    | libusb1_sys::constants::LIBUSB_TRANSFER_CANCELLED
                    | libusb1_sys::constants::LIBUSB_TRANSFER_STALL
                    | libusb1_sys::constants::LIBUSB_TRANSFER_NO_DEVICE => {
                        if !matches!(context.clutch, TransferClutch::Engaged) {
                            let active_buffer = shared.write_range.start;
                            shared.buffers[active_buffer].system_time = system_time;
                            shared.buffers[active_buffer].instant = now;
                            shared.buffers[active_buffer].length = transfer.actual_length as usize;
                            shared.write_range.increment_start();
                            context.ring.shared_condvar.notify_one();
                        }
                        shared.clutch = Clutch::Engaged;
                        context.clutch = TransferClutch::Disengaged;
                        shared.transfer_statuses[context.transfer_index] = TransferStatus::Complete;
                    }
                    unknown_transfer_status => {
                        panic!("unknown transfer status {unknown_transfer_status}")
                    }
                },
                TransferStatus::Complete => {
                    panic!("callback called for a transfer marked as complete")
                }
                TransferStatus::Deallocated => {
                    panic!("callback called for a transfer marked as deallocated")
                }
            }
        }
        // Run the user callback without holding the ring lock.
        if let Some(error) = error {
            (context.ring.on_error)(error);
        }
    }
    if resubmit {
        // Rearm outside the lock; on failure, report the mapped error.
        match unsafe { libusb1_sys::libusb_submit_transfer(transfer_pointer) } {
            0 => (),
            submit_transfer_status => {
                let transfer = unsafe { &mut *transfer_pointer };
                transfer.flags = 0;
                let context = transfer.user_data;
                assert!(!context.is_null(), "context is null");
                let context = unsafe { &mut *(context as *mut TransferContext) };
                (context.ring.on_error)(
                    match submit_transfer_status {
                        libusb1_sys::constants::LIBUSB_ERROR_IO => rusb::Error::Io,
                        libusb1_sys::constants::LIBUSB_ERROR_INVALID_PARAM => {
                            rusb::Error::InvalidParam
                        }
                        libusb1_sys::constants::LIBUSB_ERROR_ACCESS => rusb::Error::Access,
                        libusb1_sys::constants::LIBUSB_ERROR_NO_DEVICE => rusb::Error::NoDevice,
                        libusb1_sys::constants::LIBUSB_ERROR_NOT_FOUND => rusb::Error::NotFound,
                        libusb1_sys::constants::LIBUSB_ERROR_BUSY => rusb::Error::Busy,
                        libusb1_sys::constants::LIBUSB_ERROR_TIMEOUT => rusb::Error::Timeout,
                        libusb1_sys::constants::LIBUSB_ERROR_OVERFLOW => rusb::Error::Overflow,
                        libusb1_sys::constants::LIBUSB_ERROR_PIPE => rusb::Error::Pipe,
                        libusb1_sys::constants::LIBUSB_ERROR_INTERRUPTED => {
                            rusb::Error::Interrupted
                        }
                        libusb1_sys::constants::LIBUSB_ERROR_NO_MEM => rusb::Error::NoMem,
                        libusb1_sys::constants::LIBUSB_ERROR_NOT_SUPPORTED => {
                            rusb::Error::NotSupported
                        }
                        _ => rusb::Error::Other,
                    }
                    .into(),
                );
            }
        }
    }
}
532
impl Ring {
    /// Allocates the ring and freewheel buffers, creates and fills
    /// `transfer_queue_length` libusb transfers of the requested type, and
    /// submits them all.
    ///
    /// # Errors
    ///
    /// Returns `Error::ConfigurationSizes` when `ring_length` is not strictly
    /// larger than `transfer_queue_length`, and `Error::Rusb` on allocation
    /// or submission failures.
    ///
    /// # Panics
    ///
    /// Panics if `handle` and `event_loop` were created from different libusb
    /// contexts.
    pub fn new<OnError, OnOverflow>(
        handle: std::sync::Arc<rusb::DeviceHandle<rusb::Context>>,
        configuration: &Configuration,
        on_error: OnError,
        on_overflow: OnOverflow,
        event_loop: std::sync::Arc<EventLoop>,
        transfer_type: TransferType,
    ) -> Result<Self, Error>
    where
        OnError: Fn(Error) + Send + Sync + 'static,
        OnOverflow: Fn(Overflow) + Send + Sync + 'static,
    {
        assert!(
            handle.context() == event_loop.context(),
            "handle and event_loop must have the same context"
        );
        if configuration.ring_length <= configuration.transfer_queue_length {
            return Err(Error::ConfigurationSizes);
        }
        // Allocate `ring_length` consumer buffers followed by
        // `transfer_queue_length` freewheel (overflow) buffers.
        let mut buffers = Vec::new();
        buffers.reserve_exact(configuration.ring_length);
        let mut freewheel_buffers = Vec::new();
        freewheel_buffers.reserve_exact(configuration.transfer_queue_length);
        for index in 0..configuration.ring_length + configuration.transfer_queue_length {
            // Try libusb device memory first when DMA is allowed; a null
            // result falls back to the global allocator below.
            let dma_buffer = if configuration.allow_dma {
                unsafe {
                    libusb_dev_mem_alloc(
                        handle.as_raw(),
                        configuration.buffer_length as libc::ssize_t,
                    )
                }
            } else {
                std::ptr::null_mut()
            };
            if dma_buffer.is_null() {
                // NOTE(review): if this heap allocation fails mid-loop, the
                // buffers allocated so far are leaked by the early `?` return
                // — confirm acceptable. The align-1 layout also assumes no
                // alignment requirement on transfer buffers — confirm.
                (if index < configuration.ring_length {
                    &mut buffers
                } else {
                    &mut freewheel_buffers
                })
                .push(Buffer {
                    system_time: std::time::SystemTime::now(),
                    instant: std::time::Instant::now(),
                    first_after_overflow: false,
                    data: BufferData(
                        std::ptr::NonNull::new(
                            unsafe {
                                std::alloc::alloc(std::alloc::Layout::from_size_align_unchecked(
                                    configuration.buffer_length,
                                    1,
                                ))
                            },
                        )
                        .ok_or(rusb::Error::NoMem)?,
                    ),
                    length: 0,
                    capacity: configuration.buffer_length,
                    dma: false,
                });
            } else {
                (if index < configuration.ring_length {
                    &mut buffers
                } else {
                    &mut freewheel_buffers
                })
                .push(Buffer {
                    system_time: std::time::SystemTime::now(),
                    instant: std::time::Instant::now(),
                    first_after_overflow: false,
                    // SAFETY: `dma_buffer` was just checked to be non-null.
                    data: BufferData(unsafe { std::ptr::NonNull::new_unchecked(dma_buffer) }),
                    length: 0,
                    capacity: configuration.buffer_length,
                    dma: true,
                });
            }
        }
        // All transfers start out active (they are submitted below).
        let mut transfer_statuses = Vec::new();
        transfer_statuses.reserve_exact(configuration.transfer_queue_length);
        for _ in 0..configuration.transfer_queue_length {
            transfer_statuses.push(TransferStatus::Active);
        }
        let context = std::sync::Arc::new(SharedRingContext {
            on_error: Box::new(on_error),
            on_overflow: Box::new(on_overflow),
            shared: std::sync::Mutex::new(RingContext {
                // The consumer cursor sits just behind slot 0, which is the
                // first buffer a transfer will fill.
                read: buffers.len() - 1,
                write_range: WriteRange {
                    start: 0,
                    end: configuration.transfer_queue_length,
                    ring_length: configuration.ring_length,
                },
                transfer_statuses,
                buffers,
                freewheel_buffers,
                clutch: Clutch::Disengaged,
            }),
            shared_condvar: std::sync::Condvar::new(),
        });
        let mut transfers: Vec<LibusbTransfer> = Vec::new();
        transfers.reserve_exact(configuration.transfer_queue_length);
        {
            let shared = context
                .shared
                .lock()
                .expect("ring context's lock is not poisoned");
            for index in 0..configuration.transfer_queue_length {
                let mut transfer = match std::ptr::NonNull::new(unsafe {
                    libusb1_sys::libusb_alloc_transfer(0)
                }) {
                    Some(transfer) => LibusbTransfer(transfer),
                    None => {
                        // Allocation failed: free the transfers (and their
                        // boxed contexts) created so far before bailing out.
                        for transfer in transfers.iter_mut().take(index) {
                            unsafe {
                                let _ = Box::from_raw(
                                    (transfer.as_mut()).user_data as *mut TransferContext,
                                );
                            };
                            unsafe { libusb1_sys::libusb_free_transfer(transfer.as_ptr()) };
                        }
                        return Err(rusb::Error::NoMem.into());
                    }
                };
                // The context is owned by libusb via `user_data` until
                // teardown reclaims it with `Box::from_raw`.
                let transfer_context = Box::new(TransferContext {
                    ring: context.clone(),
                    transfer_index: index,
                    clutch: TransferClutch::Disengaged,
                });
                let transfer_context_pointer = Box::into_raw(transfer_context);
                // Fill the transfer with ring buffer `index` as its initial
                // destination, according to the requested transfer type.
                match transfer_type {
                    TransferType::Control(timeout) => unsafe {
                        libusb1_sys::libusb_fill_control_transfer(
                            transfer.as_ptr(),
                            handle.as_raw(),
                            shared.buffers[index].data.as_ptr(),
                            usb_transfer_callback,
                            transfer_context_pointer as *mut libc::c_void,
                            timeout.as_millis() as libc::c_uint,
                        )
                    },
                    TransferType::Isochronous {
                        endpoint,
                        packets,
                        timeout,
                    } => unsafe {
                        libusb1_sys::libusb_fill_iso_transfer(
                            transfer.as_ptr(),
                            handle.as_raw(),
                            endpoint,
                            shared.buffers[index].data.as_ptr(),
                            shared.buffers[index].capacity as libc::c_int,
                            packets as libc::c_int,
                            usb_transfer_callback,
                            transfer_context_pointer as *mut libc::c_void,
                            timeout.as_millis() as libc::c_uint,
                        )
                    },
                    TransferType::Bulk { endpoint, timeout } => unsafe {
                        libusb1_sys::libusb_fill_bulk_transfer(
                            transfer.as_ptr(),
                            handle.as_raw(),
                            endpoint,
                            shared.buffers[index].data.as_ptr(),
                            shared.buffers[index].capacity as libc::c_int,
                            usb_transfer_callback,
                            transfer_context_pointer as *mut libc::c_void,
                            timeout.as_millis() as libc::c_uint,
                        )
                    },
                    TransferType::Interrupt { endpoint, timeout } => unsafe {
                        libusb1_sys::libusb_fill_interrupt_transfer(
                            transfer.as_ptr(),
                            handle.as_raw(),
                            endpoint,
                            shared.buffers[index].data.as_ptr(),
                            shared.buffers[index].capacity as libc::c_int,
                            usb_transfer_callback,
                            transfer_context_pointer as *mut libc::c_void,
                            timeout.as_millis() as libc::c_uint,
                        )
                    },
                    TransferType::BulkStream {
                        endpoint,
                        stream_id,
                        timeout,
                    } => unsafe {
                        libusb1_sys::libusb_fill_bulk_stream_transfer(
                            transfer.as_ptr(),
                            handle.as_raw(),
                            endpoint,
                            stream_id,
                            shared.buffers[index].data.as_ptr(),
                            shared.buffers[index].capacity as libc::c_int,
                            usb_transfer_callback,
                            transfer_context_pointer as *mut libc::c_void,
                            timeout.as_millis() as libc::c_uint,
                        )
                    },
                }
                // Clear all flags: buffers are owned by the ring, not libusb.
                unsafe {
                    transfer.as_mut().flags = 0;
                }
                transfers.push(transfer);
            }
        }
        let result = Self {
            transfers,
            handle,
            active_buffer_view: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
            event_loop,
            context,
        };
        // Submit every transfer; on the first failure, mark the not-yet
        // (and failed) submissions complete so `Drop` only cancels the ones
        // actually in flight, then return the mapped error (dropping
        // `result` runs the full teardown).
        for (index, transfer) in result.transfers.iter().enumerate() {
            match unsafe { libusb1_sys::libusb_submit_transfer(transfer.as_ptr()) } {
                0 => (),
                submit_transfer_status => {
                    {
                        let mut shared = result
                            .context
                            .shared
                            .lock()
                            .expect("ring context's lock is not poisoned");
                        for rest_index in index..result.transfers.len() {
                            shared.transfer_statuses[rest_index] = TransferStatus::Complete;
                        }
                    }
                    return Err(match submit_transfer_status {
                        libusb1_sys::constants::LIBUSB_ERROR_IO => rusb::Error::Io,
                        libusb1_sys::constants::LIBUSB_ERROR_INVALID_PARAM => {
                            rusb::Error::InvalidParam
                        }
                        libusb1_sys::constants::LIBUSB_ERROR_ACCESS => rusb::Error::Access,
                        libusb1_sys::constants::LIBUSB_ERROR_NO_DEVICE => rusb::Error::NoDevice,
                        libusb1_sys::constants::LIBUSB_ERROR_NOT_FOUND => rusb::Error::NotFound,
                        libusb1_sys::constants::LIBUSB_ERROR_BUSY => rusb::Error::Busy,
                        libusb1_sys::constants::LIBUSB_ERROR_TIMEOUT => rusb::Error::Timeout,
                        libusb1_sys::constants::LIBUSB_ERROR_OVERFLOW => rusb::Error::Overflow,
                        libusb1_sys::constants::LIBUSB_ERROR_PIPE => rusb::Error::Pipe,
                        libusb1_sys::constants::LIBUSB_ERROR_INTERRUPTED => {
                            rusb::Error::Interrupted
                        }
                        libusb1_sys::constants::LIBUSB_ERROR_NO_MEM => rusb::Error::NoMem,
                        libusb1_sys::constants::LIBUSB_ERROR_NOT_SUPPORTED => {
                            rusb::Error::NotSupported
                        }
                        _ => rusb::Error::Other,
                    }
                    .into());
                }
            }
        }
        Ok(result)
    }
}
809
/// A borrowed view of one filled ring buffer, handed out by
/// `Ring::next_with_timeout`; at most one may exist at a time, and dropping
/// it releases the ring's view guard.
pub struct BufferView<'a> {
    /// Wall-clock completion time of the underlying transfer.
    pub system_time: std::time::SystemTime,
    /// Monotonic completion time of the underlying transfer.
    pub instant: std::time::Instant,
    /// True if this is the first buffer delivered after a ring overflow.
    pub first_after_overflow: bool,
    /// The valid bytes of the buffer.
    pub slice: &'a [u8],
    /// Ring index of this buffer.
    pub read: usize,
    /// Snapshot of the ring's write range at the time of the read.
    pub write_range: WriteRange,
    /// Snapshot of the ring's overflow state at the time of the read.
    pub clutch: Clutch,
    // Cleared on drop so `next_with_timeout` may be called again.
    active: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
820
821impl BufferView<'_> {
822 pub fn backlog(&self) -> usize {
823 let result = (self.write_range.start + self.write_range.ring_length - 1 - self.read)
824 % self.write_range.ring_length;
825 if matches!(self.clutch, Clutch::Engaged) && result == 0 {
826 self.write_range.ring_length
827 } else {
828 result
829 }
830 }
831
832 pub fn delay(&self) -> std::time::Duration {
833 self.instant.elapsed()
834 }
835}
836
impl Drop for BufferView<'_> {
    fn drop(&mut self) {
        // Release the ring's single-view guard so that `next_with_timeout`
        // may be called again.
        self.active
            .store(false, std::sync::atomic::Ordering::Release);
    }
}
843
impl Ring {
    /// Number of committed buffers currently waiting for the consumer.
    pub fn backlog(&self) -> usize {
        let shared = self
            .context
            .shared
            .lock()
            .expect("ring context's lock is not poisoned");
        (shared.write_range.start + shared.buffers.len() - 1 - shared.read) % shared.buffers.len()
    }

    /// Current overflow state of the ring.
    pub fn clutch(&self) -> Clutch {
        let shared = self
            .context
            .shared
            .lock()
            .expect("ring context's lock is not poisoned");
        shared.clutch
    }

    /// Waits up to `duration` for the next filled buffer and returns a view
    /// of it, or `None` on timeout.
    ///
    /// # Panics
    ///
    /// Panics if the `BufferView` from a previous call has not been dropped.
    pub fn next_with_timeout(&self, duration: &std::time::Duration) -> Option<BufferView> {
        // Claim the single-view guard; it is released either on timeout below
        // or when the returned `BufferView` is dropped.
        if self
            .active_buffer_view
            .swap(true, std::sync::atomic::Ordering::AcqRel)
        {
            panic!("the buffer returned by a previous call of next_with_timeout must be dropped before calling next_with_timeout again");
        }
        let (system_time, instant, first_after_overflow, slice, read, write_range, clutch) = {
            let start = std::time::Instant::now();
            let mut shared = self
                .context
                .shared
                .lock()
                .expect("ring context's lock is not poisoned");
            loop {
                // Tentatively advance the consumer cursor to the next slot.
                shared.read = (shared.read + 1) % shared.buffers.len();
                // Wait until that slot is outside the region owned by the
                // in-flight transfers (i.e. it has been committed).
                while (shared.write_range.end + shared.buffers.len() - 1 - shared.read)
                    % shared.buffers.len()
                    < shared.transfer_statuses.len()
                {
                    let ellapsed = std::time::Instant::now() - start;
                    if ellapsed >= *duration {
                        // Timed out: roll the cursor back and release the
                        // view guard before returning.
                        self.active_buffer_view
                            .store(false, std::sync::atomic::Ordering::Release);
                        shared.read =
                            (shared.read + shared.buffers.len() - 1) % shared.buffers.len();
                        return None;
                    }
                    // Sleep until the callback signals a newly committed
                    // buffer (or the remaining time elapses).
                    shared = self
                        .context
                        .shared_condvar
                        .wait_timeout(shared, *duration - ellapsed)
                        .expect("shared_condvar used with two different mutexes")
                        .0;
                }
                // Skip zero-length buffers (e.g. timed-out transfers with no
                // data).
                if shared.buffers[shared.read].length > 0 {
                    break;
                }
            }
            (
                shared.buffers[shared.read].system_time,
                shared.buffers[shared.read].instant,
                shared.buffers[shared.read].first_after_overflow,
                // SAFETY(review): the slot is outside the transfers' write
                // range and the single-view guard prevents a second borrow;
                // the slice stays valid until the `BufferView` is dropped —
                // relies on the callback never writing into the read slot.
                unsafe {
                    std::slice::from_raw_parts(
                        shared.buffers[shared.read].data.as_ptr(),
                        shared.buffers[shared.read].length,
                    )
                },
                shared.read,
                shared.write_range.clone(),
                shared.clutch,
            )
        };
        Some(BufferView {
            system_time,
            instant,
            first_after_overflow,
            slice,
            read,
            write_range,
            clutch,
            active: self.active_buffer_view.clone(),
        })
    }
}
930
impl Drop for Ring {
    fn drop(&mut self) {
        // Cancel all in-flight transfers, wait (up to ~1 s) for the callback
        // to acknowledge each one, free the transfers, then free the buffers.
        let mut dealloc_buffers = true;
        let before_dealloc_transfers = std::time::Instant::now();
        #[cfg(target_os = "macos")]
        {
            let mut shared = self
                .context
                .shared
                .lock()
                .expect("ring context's lock is not poisoned");
            // NOTE(review): only the first transfer is cancelled here while
            // every status is switched to Cancelling — presumably a
            // macOS-specific libusb behavior where one cancel aborts the
            // whole endpoint; confirm.
            let _ = unsafe { libusb1_sys::libusb_cancel_transfer(self.transfers[0].as_ptr()) };
            for index in 0..self.transfers.len() {
                shared.transfer_statuses[index] = TransferStatus::Cancelling;
            }
        }
        loop {
            let mut deallocated_transfers: usize = 0;
            {
                let mut shared = self
                    .context
                    .shared
                    .lock()
                    .expect("ring context's lock is not poisoned");
                for index in 0..self.transfers.len() {
                    match shared.transfer_statuses[index] {
                        // Still flying: request cancellation; if the cancel
                        // call fails the callback will not fire again, so the
                        // transfer is treated as complete.
                        TransferStatus::Active => {
                            let status = unsafe {
                                libusb1_sys::libusb_cancel_transfer(self.transfers[index].as_ptr())
                            };
                            if status == 0 {
                                shared.transfer_statuses[index] = TransferStatus::Cancelling;
                            } else {
                                shared.transfer_statuses[index] = TransferStatus::Complete;
                            }
                        }
                        // Finished: reclaim the boxed context installed in
                        // `Ring::new` and free the libusb transfer.
                        TransferStatus::Complete => {
                            let _transfer_context = unsafe {
                                Box::from_raw(
                                    (self.transfers[index].as_mut()).user_data
                                        as *mut TransferContext,
                                )
                            };
                            unsafe {
                                libusb1_sys::libusb_free_transfer(self.transfers[index].as_ptr())
                            };
                            shared.transfer_statuses[index] = TransferStatus::Deallocated;
                            deallocated_transfers += 1;
                        }
                        // Waiting for the callback to acknowledge the cancel.
                        TransferStatus::Cancelling => (),
                        TransferStatus::Deallocated => {
                            deallocated_transfers += 1;
                        }
                    }
                }
            }
            if deallocated_transfers == self.transfers.len() {
                break;
            }
            // Give up after one second: leaking the buffers is safer than
            // freeing memory a stuck transfer might still write into.
            if std::time::Instant::now() - before_dealloc_transfers
                > std::time::Duration::from_secs(1)
            {
                dealloc_buffers = false;
                break;
            }
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
        if dealloc_buffers {
            let shared = self
                .context
                .shared
                .lock()
                .expect("ring context's lock is not poisoned");
            // NOTE(review): only `buffers` is released here;
            // `freewheel_buffers` (allocated the same way in `Ring::new`)
            // appears to be leaked — confirm whether this is intentional.
            for buffer in shared.buffers.iter() {
                if buffer.dma {
                    unsafe {
                        libusb_dev_mem_free(
                            self.handle.as_raw(),
                            buffer.data.as_ptr() as *mut libc::c_uchar,
                            buffer.capacity as libc::ssize_t,
                        );
                    };
                } else {
                    unsafe {
                        std::alloc::dealloc(
                            buffer.data.as_ptr(),
                            std::alloc::Layout::from_size_align_unchecked(buffer.capacity, 1),
                        );
                    }
                }
            }
        }
    }
}