android_usbser/
usb_sync.rs1use crate::Error;
4use jni_min_helper::block_for_timeout;
5
6use futures_lite::future::block_on;
7use std::{io::ErrorKind, time::Duration};
8
9use nusb::transfer::{Queue, RequestBuffer, TransferError};
10type ReadQueue = Queue<RequestBuffer>;
11type WriteQueue = Queue<Vec<u8>>;
12
13pub struct SyncReader {
15 queue: ReadQueue,
16 buf: Option<Vec<u8>>,
17}
18impl SyncReader {
19 pub fn new(queue: ReadQueue) -> Self {
21 Self {
22 queue,
23 buf: Some(Vec::new()),
24 }
25 }
26 pub fn read(&mut self, buf: &mut [u8], timeout: Duration) -> std::io::Result<usize> {
28 if buf.is_empty() {
29 return Ok(0);
30 }
31 let buf_async = self.buf.take().unwrap();
32 let req = nusb::transfer::RequestBuffer::reuse(buf_async, buf.len());
34
35 self.queue.submit(req);
36 let fut = self.queue.next_complete();
37 let comp = {
38 let mut maybe_comp = block_for_timeout(fut, timeout);
39 if maybe_comp.is_none() {
40 self.queue.cancel_all(); if self.queue.pending() == 0 {
42 self.buf.replace(Vec::new());
43 return Err(Error::other("Unable to get the transfer result"));
44 }
45 let comp = block_on(self.queue.next_complete());
46 maybe_comp.replace(comp);
47 }
48 maybe_comp.unwrap()
49 };
50 let len_reveived = comp.data.len();
51
52 let result = match comp.status {
53 Ok(()) => {
54 buf[..len_reveived].copy_from_slice(&comp.data);
55 Ok(len_reveived)
56 }
57 Err(TransferError::Cancelled) => {
58 if len_reveived > 0 {
59 buf[..len_reveived].copy_from_slice(&comp.data);
60 Ok(len_reveived)
61 } else {
62 Err(Error::from(ErrorKind::TimedOut))
63 }
64 }
65 Err(TransferError::Disconnected) => Err(Error::from(ErrorKind::NotConnected)),
66 Err(TransferError::Stall) => {
67 let _ = self.queue.clear_halt();
68 Err(Error::other(TransferError::Stall))
69 }
70 Err(e) => Err(Error::other(e)),
71 };
72 self.buf.replace(comp.data);
73 result
74 }
75}
76
77impl From<ReadQueue> for SyncReader {
78 fn from(value: ReadQueue) -> Self {
79 Self::new(value)
80 }
81}
82
83impl From<SyncReader> for ReadQueue {
84 fn from(value: SyncReader) -> Self {
85 value.queue
86 }
87}
88
89pub struct SyncWriter {
91 queue: WriteQueue,
92 buf: Option<Vec<u8>>,
93}
94
95impl SyncWriter {
96 pub fn new(queue: WriteQueue) -> Self {
98 Self {
99 queue,
100 buf: Some(Vec::new()),
101 }
102 }
103 pub fn write(&mut self, buf: &[u8], timeout: Duration) -> std::io::Result<usize> {
106 if buf.is_empty() {
107 return Ok(0);
108 }
109 let mut buf_async = self.buf.take().unwrap();
110 buf_async.clear(); buf_async.extend_from_slice(buf);
112
113 self.queue.submit(buf_async);
114 let fut = self.queue.next_complete();
115 let comp = {
116 let mut maybe_comp = block_for_timeout(fut, timeout);
117 if maybe_comp.is_none() {
118 self.queue.cancel_all(); if self.queue.pending() == 0 {
120 self.buf.replace(Vec::new());
121 return Err(Error::other("Unable to get the transfer result"));
122 }
123 let comp = block_on(self.queue.next_complete());
124 maybe_comp.replace(comp);
125 }
126 maybe_comp.unwrap()
127 };
128 let len_sent = comp.data.actual_length();
129
130 let result = match comp.status {
131 Ok(()) => Ok(len_sent),
132 Err(TransferError::Cancelled) => {
133 if len_sent > 0 {
134 Ok(len_sent)
135 } else {
136 Err(Error::from(ErrorKind::TimedOut))
137 }
138 }
139 Err(TransferError::Disconnected) => Err(Error::from(ErrorKind::NotConnected)),
140 Err(TransferError::Stall) => {
141 let _ = self.queue.clear_halt();
142 Err(Error::other(TransferError::Stall))
143 }
144 Err(e) => Err(Error::other(e)),
145 };
146 self.buf.replace(comp.data.reuse());
147 result
148 }
149}
150
151impl From<WriteQueue> for SyncWriter {
152 fn from(value: WriteQueue) -> Self {
153 Self::new(value)
154 }
155}
156
157impl From<SyncWriter> for WriteQueue {
158 fn from(value: SyncWriter) -> Self {
159 value.queue
160 }
161}