android_usbser/
usb_sync.rs

1// Related issue: <https://github.com/kevinmehall/nusb/issues/4>.
2
3use crate::Error;
4use jni_min_helper::block_for_timeout;
5
6use futures_lite::future::block_on;
7use std::{io::ErrorKind, time::Duration};
8
9use nusb::transfer::{Queue, RequestBuffer, TransferError};
10type ReadQueue = Queue<RequestBuffer>;
11type WriteQueue = Queue<Vec<u8>>;
12
13/// Synchronous wrapper of a `nusb` IN transfer queue.
14pub struct SyncReader {
15    queue: ReadQueue,
16    buf: Option<Vec<u8>>,
17}
18impl SyncReader {
19    /// Wraps the asynchronous queue.
20    pub fn new(queue: ReadQueue) -> Self {
21        Self {
22            queue,
23            buf: Some(Vec::new()),
24        }
25    }
26    /// It is similar to `read()` in the standard `Read` trait, requiring timeout parameter.
27    pub fn read(&mut self, buf: &mut [u8], timeout: Duration) -> std::io::Result<usize> {
28        if buf.is_empty() {
29            return Ok(0);
30        }
31        let buf_async = self.buf.take().unwrap();
32        // Safety: `RequestBuffer::reuse()` may reserve larger capacity to reach buf.len()
33        let req = nusb::transfer::RequestBuffer::reuse(buf_async, buf.len());
34
35        self.queue.submit(req);
36        let fut = self.queue.next_complete();
37        let comp = {
38            let mut maybe_comp = block_for_timeout(fut, timeout);
39            if maybe_comp.is_none() {
40                self.queue.cancel_all(); // the only one
41                if self.queue.pending() == 0 {
42                    self.buf.replace(Vec::new());
43                    return Err(Error::other("Unable to get the transfer result"));
44                }
45                let comp = block_on(self.queue.next_complete());
46                maybe_comp.replace(comp);
47            }
48            maybe_comp.unwrap()
49        };
50        let len_reveived = comp.data.len();
51
52        let result = match comp.status {
53            Ok(()) => {
54                buf[..len_reveived].copy_from_slice(&comp.data);
55                Ok(len_reveived)
56            }
57            Err(TransferError::Cancelled) => {
58                if len_reveived > 0 {
59                    buf[..len_reveived].copy_from_slice(&comp.data);
60                    Ok(len_reveived)
61                } else {
62                    Err(Error::from(ErrorKind::TimedOut))
63                }
64            }
65            Err(TransferError::Disconnected) => Err(Error::from(ErrorKind::NotConnected)),
66            Err(TransferError::Stall) => {
67                let _ = self.queue.clear_halt();
68                Err(Error::other(TransferError::Stall))
69            }
70            Err(e) => Err(Error::other(e)),
71        };
72        self.buf.replace(comp.data);
73        result
74    }
75}
76
77impl From<ReadQueue> for SyncReader {
78    fn from(value: ReadQueue) -> Self {
79        Self::new(value)
80    }
81}
82
83impl From<SyncReader> for ReadQueue {
84    fn from(value: SyncReader) -> Self {
85        value.queue
86    }
87}
88
89/// Synchronous wrapper of a `nusb` OUT transfer queue.
90pub struct SyncWriter {
91    queue: WriteQueue,
92    buf: Option<Vec<u8>>,
93}
94
95impl SyncWriter {
96    /// Wraps the asynchronous queue.
97    pub fn new(queue: WriteQueue) -> Self {
98        Self {
99            queue,
100            buf: Some(Vec::new()),
101        }
102    }
103    /// It is similar to `write()` in the standard `Write` trait, requiring timeout parameter.
104    /// It is always synchronous, and `flush()` is not needed.
105    pub fn write(&mut self, buf: &[u8], timeout: Duration) -> std::io::Result<usize> {
106        if buf.is_empty() {
107            return Ok(0);
108        }
109        let mut buf_async = self.buf.take().unwrap();
110        buf_async.clear(); // it has no effect on the allocated capacity
111        buf_async.extend_from_slice(buf);
112
113        self.queue.submit(buf_async);
114        let fut = self.queue.next_complete();
115        let comp = {
116            let mut maybe_comp = block_for_timeout(fut, timeout);
117            if maybe_comp.is_none() {
118                self.queue.cancel_all(); // the only one
119                if self.queue.pending() == 0 {
120                    self.buf.replace(Vec::new());
121                    return Err(Error::other("Unable to get the transfer result"));
122                }
123                let comp = block_on(self.queue.next_complete());
124                maybe_comp.replace(comp);
125            }
126            maybe_comp.unwrap()
127        };
128        let len_sent = comp.data.actual_length();
129
130        let result = match comp.status {
131            Ok(()) => Ok(len_sent),
132            Err(TransferError::Cancelled) => {
133                if len_sent > 0 {
134                    Ok(len_sent)
135                } else {
136                    Err(Error::from(ErrorKind::TimedOut))
137                }
138            }
139            Err(TransferError::Disconnected) => Err(Error::from(ErrorKind::NotConnected)),
140            Err(TransferError::Stall) => {
141                let _ = self.queue.clear_halt();
142                Err(Error::other(TransferError::Stall))
143            }
144            Err(e) => Err(Error::other(e)),
145        };
146        self.buf.replace(comp.data.reuse());
147        result
148    }
149}
150
151impl From<WriteQueue> for SyncWriter {
152    fn from(value: WriteQueue) -> Self {
153        Self::new(value)
154    }
155}
156
157impl From<SyncWriter> for WriteQueue {
158    fn from(value: SyncWriter) -> Self {
159        value.queue
160    }
161}