1use crate::flag;
2use rusb::UsbContext;
3
4#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
5pub struct Configuration {
6    pub buffer_length: usize,
7    pub ring_length: usize,
8    pub transfer_queue_length: usize,
9    pub allow_dma: bool,
10}
11
12impl Configuration {
13    pub fn deserialize_bincode(data: &[u8]) -> bincode::Result<Configuration> {
14        bincode::deserialize(data)
15    }
16}
17
18#[derive(thiserror::Error, Debug, Clone)]
19pub enum Error {
20    #[error(transparent)]
21    Rusb(#[from] rusb::Error),
22
23    #[error("device with serial not found")]
24    Serial(String),
25
26    #[error("there is no device on bus {bus_number} at address {address}")]
27    BusNumberAndAddressNotFound { bus_number: u8, address: u8 },
28
29    #[error("unsupported device on bus {bus_number} at address {address}")]
30    BusNumberAndAddressUnsupportedDevice { bus_number: u8, address: u8 },
31
32    #[error("could not access device on bus {bus_number} at address {address} ({error:?})")]
33    BusNumberAndAddressAccessError {
34        bus_number: u8,
35        address: u8,
36        error: rusb::Error,
37    },
38
39    #[error("the device on bus {bus_number} at address {address} has an unsupported VID:PID ({vendor_id:04X}:{product_id:04X})")]
40    BusNumberAndAddressUnexpectedIds {
41        bus_number: u8,
42        address: u8,
43        vendor_id: u16,
44        product_id: u16,
45    },
46
47    #[error("device not found")]
48    Device,
49
50    #[error("ring size is smaller than or equal to transfer queue size")]
51    ConfigurationSizes,
52
53    #[error("ring overflow")]
54    Overflow,
55
56    #[error("control transfer error (expected {expected:?}, read {read:?})")]
57    Mismatch { expected: Vec<u8>, read: Vec<u8> },
58
59    #[error("control transfer error (expected one of {expected:?}, read {read:?})")]
60    MismatchAny {
61        expected: Vec<Vec<u8>>,
62        read: Vec<u8>,
63    },
64
65    #[error("the device is already used by another program")]
66    Busy,
67}
68
69#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
70pub enum Speed {
71    Unknown,
72    Low,
73    Full,
74    High,
75    Super,
76    SuperPlus,
77}
78
79impl From<rusb::Speed> for Speed {
80    fn from(speed: rusb::Speed) -> Self {
81        match speed {
82            rusb::Speed::Low => Self::Low,
83            rusb::Speed::Full => Self::Full,
84            rusb::Speed::High => Self::High,
85            rusb::Speed::Super => Self::Super,
86            rusb::Speed::SuperPlus => Self::SuperPlus,
87            _ => Self::Unknown,
88        }
89    }
90}
91
92impl std::fmt::Display for Speed {
93    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        match self {
95            Self::Unknown => write!(formatter, "USB Unknown speed"),
96            Self::Low => write!(formatter, "USB 1.0 Low Speed (1.5 Mb/s)"),
97            Self::Full => write!(formatter, "USB 1.1 Full Speed (12 Mb/s)"),
98            Self::High => write!(formatter, "USB 2.0 High Speed (480 Mb/s)"),
99            Self::Super => write!(formatter, "USB 3.0 SuperSpeed (5.0 Gb/s)"),
100            Self::SuperPlus => write!(formatter, "USB 3.1 SuperSpeed+ (10.0 Gb/s)"),
101        }
102    }
103}
104
105pub fn assert_control_transfer(
106    handle: &rusb::DeviceHandle<rusb::Context>,
107    request_type: u8,
108    request: u8,
109    value: u16,
110    index: u16,
111    expected_buffer: &[u8],
112    timeout: std::time::Duration,
113) -> Result<(), Error> {
114    let mut buffer = vec![0; expected_buffer.len()];
115    let read = handle.read_control(request_type, request, value, index, &mut buffer, timeout)?;
116    buffer.truncate(read);
117    if expected_buffer == &buffer[..] {
118        Ok(())
119    } else {
120        Err(Error::Mismatch {
121            expected: Vec::from(expected_buffer),
122            read: buffer,
123        })
124    }
125}
126
127pub fn assert_string_descriptor_any(
128    handle: &rusb::DeviceHandle<rusb::Context>,
129    request_type: u8,
130    request: u8,
131    value: u16,
132    index: u16,
133    expected_buffers: &[&[u8]],
134    timeout: std::time::Duration,
135) -> Result<(), Error> {
136    let mut buffer = vec![
137        0;
138        expected_buffers
139            .iter()
140            .fold(0, |maximum, expected_buffer| maximum
141                .max(expected_buffer.len())
142                + 2)
143    ];
144    let read = handle.read_control(request_type, request, value, index, &mut buffer, timeout)?;
145    buffer.truncate(read);
146    for expected_buffer in expected_buffers {
147        if *expected_buffer == &buffer[2..] {
148            return Ok(());
149        }
150    }
151    buffer.drain(0..2);
152    Err(Error::MismatchAny {
153        expected: expected_buffers
154            .iter()
155            .map(|expected_buffer| Vec::from(*expected_buffer))
156            .collect(),
157        read: buffer,
158    })
159}
160
161extern "system" {
162    pub fn libusb_dev_mem_alloc(
163        dev_handle: *mut libusb1_sys::libusb_device_handle,
164        length: libc::ssize_t,
165    ) -> *mut libc::c_uchar;
166
167    pub fn libusb_dev_mem_free(
168        dev_handle: *mut libusb1_sys::libusb_device_handle,
169        buffer: *mut libc::c_uchar,
170        length: libc::ssize_t,
171    ) -> *mut libc::c_int;
172}
173
174struct BufferData(std::ptr::NonNull<u8>);
175
176unsafe impl Send for BufferData {}
177unsafe impl Sync for BufferData {}
178
179impl BufferData {
180    fn as_ptr(&self) -> *mut u8 {
181        self.0.as_ptr()
182    }
183}
184
185struct Buffer {
186    system_time: std::time::SystemTime,
187    instant: std::time::Instant,
188    first_after_overflow: bool,
189    data: BufferData,
190    length: usize,
191    capacity: usize,
192    dma: bool,
193}
194
195pub struct EventLoop {
196    context: rusb::Context,
197    running: std::sync::Arc<std::sync::atomic::AtomicBool>,
198    thread: Option<std::thread::JoinHandle<()>>,
199}
200
201#[derive(Debug, Clone, Copy)]
202pub struct Overflow(());
203
204impl EventLoop {
205    pub fn new<IntoError, IntoWarning>(
206        timeout: std::time::Duration,
207        flag: flag::Flag<IntoError, IntoWarning>,
208    ) -> Result<Self, Error>
209    where
210        IntoError: From<Error> + Clone + Send + 'static,
211        IntoWarning: From<Overflow> + Clone + Send + 'static,
212    {
213        let context = rusb::Context::new()?;
214        let running = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
215        let thread_running = running.clone();
216        let thread_context = context.clone();
217        Ok(Self {
218            context,
219            thread: Some(std::thread::spawn(move || {
220                while thread_running.load(std::sync::atomic::Ordering::Acquire) {
221                    if let Err(handle_events_error) = thread_context.handle_events(Some(timeout)) {
222                        flag.store_error_if_not_set(Error::from(handle_events_error));
223                    }
224                }
225            })),
226            running,
227        })
228    }
229
230    pub fn context(&self) -> &rusb::Context {
231        &self.context
232    }
233}
234
235impl Drop for EventLoop {
236    fn drop(&mut self) {
237        self.running
238            .store(false, std::sync::atomic::Ordering::Release);
239        if let Some(thread) = self.thread.take() {
240            thread.join().expect("event loop joined self");
241        }
242    }
243}
244
245enum TransferStatus {
246    Active,
247    Complete,
248    Cancelling,
249    Deallocated,
250}
251
252#[derive(Clone)]
253pub struct WriteRange {
254    pub start: usize,
255    pub end: usize,
256    pub ring_length: usize,
257}
258
259impl WriteRange {
260    fn increment_start(&mut self) {
261        self.start = (self.start + 1) % self.ring_length;
262    }
263
264    fn increment_end(&mut self) {
265        self.end = (self.end + 1) % self.ring_length;
266    }
267}
268
269#[derive(Debug, Clone, Copy)]
270pub enum Clutch {
271    Disengaged,
272    Engaged,
273}
274
275struct RingContext {
276    read: usize,
277    write_range: WriteRange,
278    transfer_statuses: Vec<TransferStatus>,
279    buffers: Vec<Buffer>,
280    freewheel_buffers: Vec<Buffer>,
281    clutch: Clutch,
282}
283
284struct SharedRingContext {
285    on_error: Box<dyn Fn(Error) + Send + Sync + 'static>,
286    on_overflow: Box<dyn Fn(Overflow) + Send + Sync + 'static>,
287    shared: std::sync::Mutex<RingContext>,
288    shared_condvar: std::sync::Condvar,
289}
290
291struct LibusbTransfer(std::ptr::NonNull<libusb1_sys::libusb_transfer>);
292
293unsafe impl Send for LibusbTransfer {}
294
295impl LibusbTransfer {
296    unsafe fn as_mut(&mut self) -> &mut libusb1_sys::libusb_transfer {
297        self.0.as_mut()
298    }
299
300    fn as_ptr(&self) -> *mut libusb1_sys::libusb_transfer {
301        self.0.as_ptr()
302    }
303}
304
305pub struct Ring {
306    transfers: Vec<LibusbTransfer>,
307    handle: std::sync::Arc<rusb::DeviceHandle<rusb::Context>>,
308    active_buffer_view: std::sync::Arc<std::sync::atomic::AtomicBool>,
309    #[allow(dead_code)]
310    event_loop: std::sync::Arc<EventLoop>,
311    context: std::sync::Arc<SharedRingContext>,
312}
313
314unsafe impl Send for Ring {}
315unsafe impl Sync for Ring {}
316
317pub enum TransferType {
318    Control(std::time::Duration),
319    Isochronous {
320        endpoint: u8,
321        packets: u32,
322        timeout: std::time::Duration,
323    },
324    Bulk {
325        endpoint: u8,
326        timeout: std::time::Duration,
327    },
328    Interrupt {
329        endpoint: u8,
330        timeout: std::time::Duration,
331    },
332    BulkStream {
333        endpoint: u8,
334        stream_id: u32,
335        timeout: std::time::Duration,
336    },
337}
338
339pub struct TransferProperties {
340    pub transfer_type: TransferType,
341    pub timeout: std::time::Duration,
342}
343
344enum TransferClutch {
345    Disengaged,
346    DisengagedFirst,
347    Engaged,
348}
349
350struct TransferContext {
351    ring: std::sync::Arc<SharedRingContext>,
352    transfer_index: usize,
353    clutch: TransferClutch,
354}
355
356#[no_mangle]
357extern "system" fn usb_transfer_callback(transfer_pointer: *mut libusb1_sys::libusb_transfer) {
358    let system_time = std::time::SystemTime::now();
359    let now = std::time::Instant::now();
360    let mut resubmit = false;
361    {
362        let transfer = unsafe { &mut *transfer_pointer };
364        let context = transfer.user_data;
365        assert!(!context.is_null(), "context is null");
366        let context = unsafe { &mut *(context as *mut TransferContext) };
368        let mut error = None;
369        {
370            let mut shared = context
371                .ring
372                .shared
373                .lock()
374                .expect("ring context's lock is not poisoned");
375            match shared.transfer_statuses[context.transfer_index] {
376                TransferStatus::Active => match transfer.status {
377                    libusb1_sys::constants::LIBUSB_TRANSFER_COMPLETED
378                    | libusb1_sys::constants::LIBUSB_TRANSFER_TIMED_OUT => {
379                        if !matches!(context.clutch, TransferClutch::Engaged) {
380                            let active_buffer = shared.write_range.start;
381                            shared.buffers[active_buffer].system_time = system_time;
382                            shared.buffers[active_buffer].instant = now;
383                            shared.buffers[active_buffer].first_after_overflow =
384                                matches!(context.clutch, TransferClutch::DisengagedFirst);
385                            shared.buffers[active_buffer].length = transfer.actual_length as usize;
386                            shared.write_range.increment_start();
387                            context.ring.shared_condvar.notify_one();
388                        }
389                        if shared.write_range.end == shared.read {
390                            if matches!(shared.clutch, Clutch::Disengaged) {
391                                shared.clutch = Clutch::Engaged;
392                                (context.ring.on_overflow)(Overflow(()));
393                            }
394                            context.clutch = TransferClutch::Engaged;
395                            transfer.buffer = shared.freewheel_buffers[context.transfer_index]
396                                .data
397                                .as_ptr();
398                            transfer.length =
399                                shared.freewheel_buffers[context.transfer_index].capacity as i32;
400                        } else {
401                            match shared.clutch {
402                                Clutch::Disengaged => {
403                                    context.clutch = TransferClutch::Disengaged;
404                                }
405                                Clutch::Engaged => {
406                                    shared.clutch = Clutch::Disengaged;
407                                    context.clutch = TransferClutch::DisengagedFirst;
408                                }
409                            }
410                            transfer.buffer = shared.buffers[shared.write_range.end].data.as_ptr();
411                            transfer.length =
412                                shared.buffers[shared.write_range.end].capacity as i32;
413                            shared.write_range.increment_end();
414                        }
415                        resubmit = true;
416                    }
417                    status @ (libusb1_sys::constants::LIBUSB_TRANSFER_ERROR
418                    | libusb1_sys::constants::LIBUSB_TRANSFER_CANCELLED
419                    | libusb1_sys::constants::LIBUSB_TRANSFER_STALL
420                    | libusb1_sys::constants::LIBUSB_TRANSFER_NO_DEVICE
421                    | libusb1_sys::constants::LIBUSB_TRANSFER_OVERFLOW) => {
422                        if !matches!(context.clutch, TransferClutch::Engaged) {
423                            let active_buffer = shared.write_range.start;
424                            shared.buffers[active_buffer].system_time = system_time;
425                            shared.buffers[active_buffer].instant = now;
426                            shared.buffers[active_buffer].length = transfer.actual_length as usize;
427                            shared.write_range.increment_start();
428                            context.ring.shared_condvar.notify_one();
429                        }
430                        shared.clutch = Clutch::Engaged;
432                        context.clutch = TransferClutch::Disengaged;
433                        shared.transfer_statuses[context.transfer_index] = TransferStatus::Complete;
434                        error = Some(
435                            match status {
436                                libusb1_sys::constants::LIBUSB_TRANSFER_ERROR
437                                | libusb1_sys::constants::LIBUSB_TRANSFER_CANCELLED => {
438                                    rusb::Error::Io
439                                }
440                                libusb1_sys::constants::LIBUSB_TRANSFER_STALL => rusb::Error::Pipe,
441                                libusb1_sys::constants::LIBUSB_TRANSFER_NO_DEVICE => {
442                                    rusb::Error::NoDevice
443                                }
444                                libusb1_sys::constants::LIBUSB_TRANSFER_OVERFLOW => {
445                                    rusb::Error::Overflow
446                                }
447                                _ => rusb::Error::Other,
448                            }
449                            .into(),
450                        );
451                    }
452                    unknown_transfer_status => {
453                        panic!("unknown transfer status {unknown_transfer_status}")
454                    }
455                },
456                TransferStatus::Cancelling => match transfer.status {
457                    libusb1_sys::constants::LIBUSB_TRANSFER_COMPLETED
458                    | libusb1_sys::constants::LIBUSB_TRANSFER_TIMED_OUT
459                    | libusb1_sys::constants::LIBUSB_TRANSFER_ERROR
460                    | libusb1_sys::constants::LIBUSB_TRANSFER_CANCELLED
461                    | libusb1_sys::constants::LIBUSB_TRANSFER_STALL
462                    | libusb1_sys::constants::LIBUSB_TRANSFER_NO_DEVICE => {
463                        if !matches!(context.clutch, TransferClutch::Engaged) {
464                            let active_buffer = shared.write_range.start;
465                            shared.buffers[active_buffer].system_time = system_time;
466                            shared.buffers[active_buffer].instant = now;
467                            shared.buffers[active_buffer].length = transfer.actual_length as usize;
468                            shared.write_range.increment_start();
469                            context.ring.shared_condvar.notify_one();
470                        }
471                        shared.clutch = Clutch::Engaged;
473                        context.clutch = TransferClutch::Disengaged;
474                        shared.transfer_statuses[context.transfer_index] = TransferStatus::Complete;
475                    }
476                    unknown_transfer_status => {
477                        panic!("unknown transfer status {unknown_transfer_status}")
478                    }
479                },
480                TransferStatus::Complete => {
481                    panic!("callback called for a transfer marked as complete")
482                }
483                TransferStatus::Deallocated => {
484                    panic!("callback called for a transfer marked as deallocated")
485                }
486            }
487        }
488        if let Some(error) = error {
489            (context.ring.on_error)(error);
490        }
491    }
492    if resubmit {
493        match unsafe { libusb1_sys::libusb_submit_transfer(transfer_pointer) } {
495            0 => (),
496            submit_transfer_status => {
497                let transfer = unsafe { &mut *transfer_pointer };
499                transfer.flags = 0;
500                let context = transfer.user_data;
501                assert!(!context.is_null(), "context is null");
502                let context = unsafe { &mut *(context as *mut TransferContext) };
504                (context.ring.on_error)(
505                    match submit_transfer_status {
506                        libusb1_sys::constants::LIBUSB_ERROR_IO => rusb::Error::Io,
507                        libusb1_sys::constants::LIBUSB_ERROR_INVALID_PARAM => {
508                            rusb::Error::InvalidParam
509                        }
510                        libusb1_sys::constants::LIBUSB_ERROR_ACCESS => rusb::Error::Access,
511                        libusb1_sys::constants::LIBUSB_ERROR_NO_DEVICE => rusb::Error::NoDevice,
512                        libusb1_sys::constants::LIBUSB_ERROR_NOT_FOUND => rusb::Error::NotFound,
513                        libusb1_sys::constants::LIBUSB_ERROR_BUSY => rusb::Error::Busy,
514                        libusb1_sys::constants::LIBUSB_ERROR_TIMEOUT => rusb::Error::Timeout,
515                        libusb1_sys::constants::LIBUSB_ERROR_OVERFLOW => rusb::Error::Overflow,
516                        libusb1_sys::constants::LIBUSB_ERROR_PIPE => rusb::Error::Pipe,
517                        libusb1_sys::constants::LIBUSB_ERROR_INTERRUPTED => {
518                            rusb::Error::Interrupted
519                        }
520                        libusb1_sys::constants::LIBUSB_ERROR_NO_MEM => rusb::Error::NoMem,
521                        libusb1_sys::constants::LIBUSB_ERROR_NOT_SUPPORTED => {
522                            rusb::Error::NotSupported
523                        }
524                        _ => rusb::Error::Other,
525                    }
526                    .into(),
527                );
528            }
529        }
530    }
531}
532
533impl Ring {
534    pub fn new<OnError, OnOverflow>(
535        handle: std::sync::Arc<rusb::DeviceHandle<rusb::Context>>,
536        configuration: &Configuration,
537        on_error: OnError,
538        on_overflow: OnOverflow,
539        event_loop: std::sync::Arc<EventLoop>,
540        transfer_type: TransferType,
541    ) -> Result<Self, Error>
542    where
543        OnError: Fn(Error) + Send + Sync + 'static,
544        OnOverflow: Fn(Overflow) + Send + Sync + 'static,
545    {
546        assert!(
547            handle.context() == event_loop.context(),
548            "handle and event_loop must have the same context"
549        );
550        if configuration.ring_length <= configuration.transfer_queue_length {
551            return Err(Error::ConfigurationSizes);
552        }
553        let mut buffers = Vec::new();
554        buffers.reserve_exact(configuration.ring_length);
555        let mut freewheel_buffers = Vec::new();
556        freewheel_buffers.reserve_exact(configuration.transfer_queue_length);
557        for index in 0..configuration.ring_length + configuration.transfer_queue_length {
558            let dma_buffer = if configuration.allow_dma {
559                unsafe {
561                    libusb_dev_mem_alloc(
562                        handle.as_raw(),
563                        configuration.buffer_length as libc::ssize_t,
564                    )
565                }
566            } else {
567                std::ptr::null_mut()
568            };
569            if dma_buffer.is_null() {
570                (if index < configuration.ring_length {
571                    &mut buffers
572                } else {
573                    &mut freewheel_buffers
574                })
575                .push(Buffer {
576                    system_time: std::time::SystemTime::now(),
577                    instant: std::time::Instant::now(),
578                    first_after_overflow: false,
579                    data: BufferData(
580                        std::ptr::NonNull::new(
581                            unsafe {
587                                std::alloc::alloc(std::alloc::Layout::from_size_align_unchecked(
588                                    configuration.buffer_length,
589                                    1,
590                                ))
591                            },
592                        )
593                        .ok_or(rusb::Error::NoMem)?,
594                    ),
595                    length: 0,
596                    capacity: configuration.buffer_length,
597                    dma: false,
598                });
599            } else {
600                (if index < configuration.ring_length {
601                    &mut buffers
602                } else {
603                    &mut freewheel_buffers
604                })
605                .push(Buffer {
606                    system_time: std::time::SystemTime::now(),
607                    instant: std::time::Instant::now(),
608                    first_after_overflow: false,
609                    data: BufferData(unsafe { std::ptr::NonNull::new_unchecked(dma_buffer) }),
611                    length: 0,
612                    capacity: configuration.buffer_length,
613                    dma: true,
614                });
615            }
616        }
617        let mut transfer_statuses = Vec::new();
618        transfer_statuses.reserve_exact(configuration.transfer_queue_length);
619        for _ in 0..configuration.transfer_queue_length {
620            transfer_statuses.push(TransferStatus::Active);
621        }
622        let context = std::sync::Arc::new(SharedRingContext {
623            on_error: Box::new(on_error),
624            on_overflow: Box::new(on_overflow),
625            shared: std::sync::Mutex::new(RingContext {
626                read: buffers.len() - 1,
627                write_range: WriteRange {
628                    start: 0,
629                    end: configuration.transfer_queue_length,
630                    ring_length: configuration.ring_length,
631                },
632                transfer_statuses,
633                buffers,
634                freewheel_buffers,
635                clutch: Clutch::Disengaged,
636            }),
637            shared_condvar: std::sync::Condvar::new(),
638        });
639        let mut transfers: Vec<LibusbTransfer> = Vec::new();
640        transfers.reserve_exact(configuration.transfer_queue_length);
641        {
642            let shared = context
643                .shared
644                .lock()
645                .expect("ring context's lock is not poisoned");
646            for index in 0..configuration.transfer_queue_length {
647                let mut transfer = match std::ptr::NonNull::new(unsafe {
649                    libusb1_sys::libusb_alloc_transfer(0)
650                }) {
651                    Some(transfer) => LibusbTransfer(transfer),
652                    None => {
653                        for transfer in transfers.iter_mut().take(index) {
654                            unsafe {
656                                let _ = Box::from_raw(
657                                    (transfer.as_mut()).user_data as *mut TransferContext,
658                                );
659                            };
660                            unsafe { libusb1_sys::libusb_free_transfer(transfer.as_ptr()) };
662                        }
663                        return Err(rusb::Error::NoMem.into());
664                    }
665                };
666                let transfer_context = Box::new(TransferContext {
667                    ring: context.clone(),
668                    transfer_index: index,
669                    clutch: TransferClutch::Disengaged,
670                });
671                let transfer_context_pointer = Box::into_raw(transfer_context);
672                match transfer_type {
673                    TransferType::Control(timeout) => unsafe {
675                        libusb1_sys::libusb_fill_control_transfer(
676                            transfer.as_ptr(),
677                            handle.as_raw(),
678                            shared.buffers[index].data.as_ptr(),
679                            usb_transfer_callback,
680                            transfer_context_pointer as *mut libc::c_void,
681                            timeout.as_millis() as libc::c_uint,
682                        )
683                    },
684                    TransferType::Isochronous {
686                        endpoint,
687                        packets,
688                        timeout,
689                    } => unsafe {
690                        libusb1_sys::libusb_fill_iso_transfer(
691                            transfer.as_ptr(),
692                            handle.as_raw(),
693                            endpoint,
694                            shared.buffers[index].data.as_ptr(),
695                            shared.buffers[index].capacity as libc::c_int,
696                            packets as libc::c_int,
697                            usb_transfer_callback,
698                            transfer_context_pointer as *mut libc::c_void,
699                            timeout.as_millis() as libc::c_uint,
700                        )
701                    },
702                    TransferType::Bulk { endpoint, timeout } => unsafe {
704                        libusb1_sys::libusb_fill_bulk_transfer(
705                            transfer.as_ptr(),
706                            handle.as_raw(),
707                            endpoint,
708                            shared.buffers[index].data.as_ptr(),
709                            shared.buffers[index].capacity as libc::c_int,
710                            usb_transfer_callback,
711                            transfer_context_pointer as *mut libc::c_void,
712                            timeout.as_millis() as libc::c_uint,
713                        )
714                    },
715                    TransferType::Interrupt { endpoint, timeout } => unsafe {
717                        libusb1_sys::libusb_fill_interrupt_transfer(
718                            transfer.as_ptr(),
719                            handle.as_raw(),
720                            endpoint,
721                            shared.buffers[index].data.as_ptr(),
722                            shared.buffers[index].capacity as libc::c_int,
723                            usb_transfer_callback,
724                            transfer_context_pointer as *mut libc::c_void,
725                            timeout.as_millis() as libc::c_uint,
726                        )
727                    },
728                    TransferType::BulkStream {
730                        endpoint,
731                        stream_id,
732                        timeout,
733                    } => unsafe {
734                        libusb1_sys::libusb_fill_bulk_stream_transfer(
735                            transfer.as_ptr(),
736                            handle.as_raw(),
737                            endpoint,
738                            stream_id,
739                            shared.buffers[index].data.as_ptr(),
740                            shared.buffers[index].capacity as libc::c_int,
741                            usb_transfer_callback,
742                            transfer_context_pointer as *mut libc::c_void,
743                            timeout.as_millis() as libc::c_uint,
744                        )
745                    },
746                }
747                unsafe {
749                    transfer.as_mut().flags = 0; }
754                transfers.push(transfer);
755            }
756        }
757        let result = Self {
758            transfers,
759            handle,
760            active_buffer_view: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
761            event_loop,
762            context,
763        };
764        for (index, transfer) in result.transfers.iter().enumerate() {
765            match unsafe { libusb1_sys::libusb_submit_transfer(transfer.as_ptr()) } {
767                0 => (),
768                submit_transfer_status => {
769                    {
770                        let mut shared = result
771                            .context
772                            .shared
773                            .lock()
774                            .expect("ring context's lock is not poisoned");
775                        for rest_index in index..result.transfers.len() {
776                            shared.transfer_statuses[rest_index] = TransferStatus::Complete;
779                        }
780                    }
781                    return Err(match submit_transfer_status {
782                        libusb1_sys::constants::LIBUSB_ERROR_IO => rusb::Error::Io,
783                        libusb1_sys::constants::LIBUSB_ERROR_INVALID_PARAM => {
784                            rusb::Error::InvalidParam
785                        }
786                        libusb1_sys::constants::LIBUSB_ERROR_ACCESS => rusb::Error::Access,
787                        libusb1_sys::constants::LIBUSB_ERROR_NO_DEVICE => rusb::Error::NoDevice,
788                        libusb1_sys::constants::LIBUSB_ERROR_NOT_FOUND => rusb::Error::NotFound,
789                        libusb1_sys::constants::LIBUSB_ERROR_BUSY => rusb::Error::Busy,
790                        libusb1_sys::constants::LIBUSB_ERROR_TIMEOUT => rusb::Error::Timeout,
791                        libusb1_sys::constants::LIBUSB_ERROR_OVERFLOW => rusb::Error::Overflow,
792                        libusb1_sys::constants::LIBUSB_ERROR_PIPE => rusb::Error::Pipe,
793                        libusb1_sys::constants::LIBUSB_ERROR_INTERRUPTED => {
794                            rusb::Error::Interrupted
795                        }
796                        libusb1_sys::constants::LIBUSB_ERROR_NO_MEM => rusb::Error::NoMem,
797                        libusb1_sys::constants::LIBUSB_ERROR_NOT_SUPPORTED => {
798                            rusb::Error::NotSupported
799                        }
800                        _ => rusb::Error::Other,
801                    }
802                    .into());
803                }
804            }
805        }
806        Ok(result)
807    }
808}
809
810pub struct BufferView<'a> {
811    pub system_time: std::time::SystemTime,
812    pub instant: std::time::Instant,
813    pub first_after_overflow: bool,
814    pub slice: &'a [u8],
815    pub read: usize,
816    pub write_range: WriteRange,
817    pub clutch: Clutch,
818    active: std::sync::Arc<std::sync::atomic::AtomicBool>,
819}
820
821impl BufferView<'_> {
822    pub fn backlog(&self) -> usize {
823        let result = (self.write_range.start + self.write_range.ring_length - 1 - self.read)
824            % self.write_range.ring_length;
825        if matches!(self.clutch, Clutch::Engaged) && result == 0 {
826            self.write_range.ring_length
827        } else {
828            result
829        }
830    }
831
832    pub fn delay(&self) -> std::time::Duration {
833        self.instant.elapsed()
834    }
835}
836
837impl Drop for BufferView<'_> {
838    fn drop(&mut self) {
839        self.active
840            .store(false, std::sync::atomic::Ordering::Release);
841    }
842}
843
844impl Ring {
845    pub fn backlog(&self) -> usize {
846        let shared = self
847            .context
848            .shared
849            .lock()
850            .expect("ring context's lock is not poisoned");
851        (shared.write_range.start + shared.buffers.len() - 1 - shared.read) % shared.buffers.len()
852    }
853
854    pub fn clutch(&self) -> Clutch {
855        let shared = self
856            .context
857            .shared
858            .lock()
859            .expect("ring context's lock is not poisoned");
860        shared.clutch
861    }
862
863    pub fn next_with_timeout(&self, duration: &std::time::Duration) -> Option<BufferView> {
864        if self
865            .active_buffer_view
866            .swap(true, std::sync::atomic::Ordering::AcqRel)
867        {
868            panic!("the buffer returned by a previous call of next_with_timeout must be dropped before calling next_with_timeout again");
869        }
870        let (system_time, instant, first_after_overflow, slice, read, write_range, clutch) = {
871            let start = std::time::Instant::now();
872            let mut shared = self
873                .context
874                .shared
875                .lock()
876                .expect("ring context's lock is not poisoned");
877            loop {
878                shared.read = (shared.read + 1) % shared.buffers.len();
879                while (shared.write_range.end + shared.buffers.len() - 1 - shared.read)
880                    % shared.buffers.len()
881                    < shared.transfer_statuses.len()
882                {
883                    let ellapsed = std::time::Instant::now() - start;
884                    if ellapsed >= *duration {
885                        self.active_buffer_view
886                            .store(false, std::sync::atomic::Ordering::Release);
887                        shared.read =
888                            (shared.read + shared.buffers.len() - 1) % shared.buffers.len();
889                        return None;
890                    }
891                    shared = self
892                        .context
893                        .shared_condvar
894                        .wait_timeout(shared, *duration - ellapsed)
895                        .expect("shared_condvar used with two different mutexes")
896                        .0;
897                }
898                if shared.buffers[shared.read].length > 0 {
899                    break;
900                }
901            }
902            (
903                shared.buffers[shared.read].system_time,
904                shared.buffers[shared.read].instant,
905                shared.buffers[shared.read].first_after_overflow,
906                unsafe {
908                    std::slice::from_raw_parts(
909                        shared.buffers[shared.read].data.as_ptr(),
910                        shared.buffers[shared.read].length,
911                    )
912                },
913                shared.read,
914                shared.write_range.clone(),
915                shared.clutch,
916            )
917        };
918        Some(BufferView {
919            system_time,
920            instant,
921            first_after_overflow,
922            slice,
923            read,
924            write_range,
925            clutch,
926            active: self.active_buffer_view.clone(),
927        })
928    }
929}
930
931impl Drop for Ring {
932    fn drop(&mut self) {
933        let mut dealloc_buffers = true;
934        let before_dealloc_transfers = std::time::Instant::now();
935        #[cfg(target_os = "macos")]
936        {
937            let mut shared = self
938                .context
939                .shared
940                .lock()
941                .expect("ring context's lock is not poisoned");
942            let _ = unsafe { libusb1_sys::libusb_cancel_transfer(self.transfers[0].as_ptr()) };
944            for index in 0..self.transfers.len() {
945                shared.transfer_statuses[index] = TransferStatus::Cancelling;
946            }
947        }
948        loop {
949            let mut deallocated_transfers: usize = 0;
950            {
951                let mut shared = self
952                    .context
953                    .shared
954                    .lock()
955                    .expect("ring context's lock is not poisoned");
956                for index in 0..self.transfers.len() {
957                    match shared.transfer_statuses[index] {
958                        TransferStatus::Active => {
959                            let status = unsafe {
960                                libusb1_sys::libusb_cancel_transfer(self.transfers[index].as_ptr())
961                            };
962                            if status == 0 {
963                                shared.transfer_statuses[index] = TransferStatus::Cancelling;
964                            } else {
965                                shared.transfer_statuses[index] = TransferStatus::Complete;
966                            }
967                        }
968                        TransferStatus::Complete => {
969                            let _transfer_context = unsafe {
971                                Box::from_raw(
972                                    (self.transfers[index].as_mut()).user_data
973                                        as *mut TransferContext,
974                                )
975                            };
976                            unsafe {
978                                libusb1_sys::libusb_free_transfer(self.transfers[index].as_ptr())
979                            };
980                            shared.transfer_statuses[index] = TransferStatus::Deallocated;
981                            deallocated_transfers += 1;
982                        }
983                        TransferStatus::Cancelling => (),
984                        TransferStatus::Deallocated => {
985                            deallocated_transfers += 1;
986                        }
987                    }
988                }
989            }
990            if deallocated_transfers == self.transfers.len() {
991                break;
992            }
993            if std::time::Instant::now() - before_dealloc_transfers
995                > std::time::Duration::from_secs(1)
996            {
997                dealloc_buffers = false;
998                break;
999            }
1000            std::thread::sleep(std::time::Duration::from_millis(100));
1001        }
1002        if dealloc_buffers {
1003            let shared = self
1004                .context
1005                .shared
1006                .lock()
1007                .expect("ring context's lock is not poisoned");
1008            for buffer in shared.buffers.iter() {
1009                if buffer.dma {
1010                    unsafe {
1012                        libusb_dev_mem_free(
1013                            self.handle.as_raw(),
1014                            buffer.data.as_ptr() as *mut libc::c_uchar,
1015                            buffer.capacity as libc::ssize_t,
1016                        );
1017                    };
1018                } else {
1019                    unsafe {
1021                        std::alloc::dealloc(
1022                            buffer.data.as_ptr(),
1023                            std::alloc::Layout::from_size_align_unchecked(buffer.capacity, 1),
1024                        );
1025                    }
1026                }
1027            }
1028        }
1029    }
1030}