pub struct EndpointReceiver(/* private fields */);Expand description
USB endpoint from host to device receiver.
Receiving is asynchronous and uses a queue backed by Linux AIO. Empty
buffers are enqueued with recv (or try_recv /
recv_async) and submitted to the kernel, which fills
them in the background as USB data arrives.
Filled buffers are retrieved with fetch,
fetch_timeout, or try_fetch.
For a combined enqueue-and-wait operation, use
recv_and_fetch.
recv, recv_async and
recv_timeout automatically wait for queue space
by fetching completed buffers, which they return.
When using try_recv directly, call
is_ready first or fetch completed buffers
to ensure space is available.
§Buffer size and throughput
Buffer size has a major impact on bulk transfer throughput. The FunctionFS kernel driver submits one USB transfer per AIO read request. When the buffer capacity equals the maximum packet size (MPS, typically 512 bytes for high-speed or 1024 bytes for super-speed), each USB packet triggers a separate AIO completion — leading to high per-packet overhead and poor throughput.
Using buffers much larger than the MPS (e.g. 16 KiB) allows the kernel
to batch multiple USB packets into a single read, dramatically improving
throughput. A good starting point is a buffer capacity that is a multiple of
the MPS and at least several KiB (e.g. 16 KiB). Use
max_packet_size to query the negotiated MPS at
runtime.
For high-throughput workloads, also consider increasing the queue depth (default: 16).
§Buffer size and zero-length packets (ZLP)
The USB host uses zero-length packets to signal the end of a transfer whose total size is a multiple of the maximum packet size (MPS). When the receive buffer capacity equals the MPS, each USB packet completes one read request, and a ZLP is delivered as a separate zero-length completion.
When the buffer capacity is larger than the MPS (recommended for
throughput), the kernel submits a single USB transfer that spans multiple
packets. A short packet — including a ZLP — terminates the transfer early
and the read completes with fewer bytes than the buffer capacity. This is
the standard way to detect end-of-message: any completion where
received < capacity indicates that a short packet (or ZLP) arrived.
MPS-sized buffers are only necessary when each individual USB packet must be observed separately (e.g. for packet-level diagnostics).
Implementations§
Source§impl EndpointReceiver
impl EndpointReceiver
Sourcepub fn control(&mut self) -> Result<EndpointControl<'_>>
pub fn control(&mut self) -> Result<EndpointControl<'_>>
Gets the endpoint control interface.
Examples found in repository?
111fn run(mut ep1_rx: EndpointReceiver, mut ep2_tx: EndpointSender, mut custom: Custom) {
112 let ep1_control = ep1_rx.control().unwrap();
113 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
114 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
115 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
116 println!();
117
118 let ep2_control = ep2_tx.control().unwrap();
119 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
120 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
121 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
122 println!();
123
124 let stop = Arc::new(AtomicBool::new(false));
125
126 thread::scope(|s| {
127 s.spawn(|| {
128 let size = ep1_rx.max_packet_size().unwrap();
129 let mut b = 0;
130 while !stop.load(Ordering::Relaxed) {
131 let data = ep1_rx
132 .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
133 .expect("recv failed");
134 match data {
135 Some(data) => {
136 println!("received {} bytes: {data:x?}", data.len());
137 if !data.iter().all(|x| *x == b) {
138 panic!("wrong data received");
139 }
140 b = b.wrapping_add(1);
141 }
142 None => {
143 println!("receive empty");
144 }
145 }
146 }
147 });
148
149 s.spawn(|| {
150 let size = ep2_tx.max_packet_size().unwrap();
151 let mut b = 0u8;
152 while !stop.load(Ordering::Relaxed) {
153 let data = vec![b; size];
154 match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
155 Ok(()) => {
156 println!("sent data {b} of size {size} bytes");
157 b = b.wrapping_add(1);
158 }
159 Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
160 Err(err) => panic!("send failed: {err}"),
161 }
162 }
163 });
164
165 s.spawn(|| {
166 let mut ctrl_data = Vec::new();
167
168 while !stop.load(Ordering::Relaxed) {
169 if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
170 println!("Event: {event:?}");
171 match event {
172 Event::SetupHostToDevice(req) => {
173 if req.ctrl_req().request == 255 {
174 println!("Stopping");
175 stop.store(true, Ordering::Relaxed);
176 }
177 ctrl_data = req.recv_all().unwrap();
178 println!("Control data: {ctrl_data:x?}");
179 }
180 Event::SetupDeviceToHost(req) => {
181 println!("Replying with data");
182 req.send(&ctrl_data).unwrap();
183 }
184 _ => (),
185 }
186 } else {
187 println!("no event");
188 }
189 }
190 });
191 });
192}More examples
19async fn main() {
20 env_logger::init();
21
22 usb_gadget::remove_all().expect("cannot remove all gadgets");
23
24 let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
25 let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
26
27 let (mut custom, handle) = Custom::builder()
28 .with_interface(
29 Interface::new(Class::vendor_specific(1, 2), "custom interface")
30 .with_endpoint(Endpoint::bulk(ep1_dir))
31 .with_endpoint(Endpoint::bulk(ep2_dir)),
32 )
33 .build();
34
35 let udc = default_udc().expect("cannot get UDC");
36 let reg = Gadget::new(
37 Class::vendor_specific(255, 3),
38 Id::new(6, 0x11),
39 Strings::new("manufacturer", "custom USB interface", "serial_number"),
40 )
41 .with_config(Config::new("config").with_function(handle))
42 .with_os_descriptor(OsDescriptor::microsoft())
43 .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
44 .bind(&udc)
45 .expect("cannot bind to UDC");
46
47 println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
48 println!();
49
50 let ep1_control = ep1_rx.control().unwrap();
51 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
52 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
53 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
54 println!();
55
56 let ep2_control = ep2_tx.control().unwrap();
57 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
58 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
59 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
60 println!();
61
62 let stop = Arc::new(AtomicBool::new(false));
63
64 let stop1 = stop.clone();
65 tokio::spawn(async move {
66 let size = ep1_rx.max_packet_size().unwrap();
67 let mut b = 0;
68 while !stop1.load(Ordering::Relaxed) {
69 let data = ep1_rx.recv_async(BytesMut::with_capacity(size)).await.expect("recv_async failed");
70 match data {
71 Some(data) => {
72 println!("received {} bytes: {data:x?}", data.len());
73 if !data.iter().all(|x| *x == b) {
74 panic!("wrong data received");
75 }
76 b = b.wrapping_add(1);
77 }
78 None => {
79 println!("receive empty");
80 }
81 }
82 }
83 });
84
85 let stop2 = stop.clone();
86 tokio::spawn(async move {
87 let size = ep2_tx.max_packet_size().unwrap();
88 let mut b = 0u8;
89 while !stop2.load(Ordering::Relaxed) {
90 let data = vec![b; size];
91 match ep2_tx.send_async(data.into()).await {
92 Ok(()) => {
93 println!("sent data {b} of size {size} bytes");
94 b = b.wrapping_add(1);
95 }
96 Err(err) => panic!("send failed: {err}"),
97 }
98 }
99 });
100
101 let mut ctrl_data = Vec::new();
102 while !stop.load(Ordering::Relaxed) {
103 custom.wait_event().await.expect("wait for event failed");
104 println!("event ready");
105 let event = custom.event().expect("event failed");
106
107 println!("Event: {event:?}");
108 match event {
109 Event::SetupHostToDevice(req) => {
110 if req.ctrl_req().request == 255 {
111 println!("Stopping");
112 stop.store(true, Ordering::Relaxed);
113 }
114 ctrl_data = req.recv_all().unwrap();
115 println!("Control data: {ctrl_data:x?}");
116 }
117 Event::SetupDeviceToHost(req) => {
118 println!("Replying with data");
119 req.send(&ctrl_data).unwrap();
120 }
121 _ => (),
122 }
123 }
124
125 tokio::time::sleep(Duration::from_secs(1)).await;
126
127 println!("Unregistering");
128 reg.remove().unwrap();
129}20fn main() {
21 env_logger::init();
22
23 usb_gadget::remove_all().expect("cannot remove all gadgets");
24
25 let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
26 let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
27
28 let (mut custom, handle) = Custom::builder()
29 .with_interface(
30 Interface::new(Class::vendor_specific(1, 2), "custom interface")
31 .with_endpoint(Endpoint::bulk(ep1_dir))
32 .with_endpoint(Endpoint::bulk(ep2_dir)),
33 )
34 .build();
35
36 let udc = default_udc().expect("cannot get UDC");
37 let reg = Gadget::new(
38 Class::vendor_specific(255, 3),
39 Id::new(6, 0x11),
40 Strings::new("manufacturer", "custom USB interface", "serial_number"),
41 )
42 .with_config(Config::new("config").with_function(handle))
43 .with_os_descriptor(OsDescriptor::microsoft())
44 .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
45 .bind(&udc)
46 .expect("cannot bind to UDC");
47
48 println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
49 println!();
50
51 let ep1_control = ep1_rx.control().unwrap();
52 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
53 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
54 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
55 println!();
56
57 let ep2_control = ep2_tx.control().unwrap();
58 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
59 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
60 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
61 println!();
62
63 let stop = Arc::new(AtomicBool::new(false));
64
65 thread::scope(|s| {
66 s.spawn(|| {
67 let size = ep1_rx.max_packet_size().unwrap();
68 let mut b = 0;
69 while !stop.load(Ordering::Relaxed) {
70 let data = ep1_rx
71 .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
72 .expect("recv failed");
73 match data {
74 Some(data) => {
75 println!("received {} bytes: {data:x?}", data.len());
76 if !data.iter().all(|x| *x == b) {
77 panic!("wrong data received");
78 }
79 b = b.wrapping_add(1);
80 }
81 None => {
82 println!("receive empty");
83 }
84 }
85 }
86 });
87
88 s.spawn(|| {
89 let size = ep2_tx.max_packet_size().unwrap();
90 let mut b = 0u8;
91 while !stop.load(Ordering::Relaxed) {
92 let data = vec![b; size];
93 match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
94 Ok(()) => {
95 println!("sent data {b} of size {size} bytes");
96 b = b.wrapping_add(1);
97 }
98 Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
99 Err(err) => panic!("send failed: {err}"),
100 }
101 }
102 });
103
104 s.spawn(|| {
105 let mut ctrl_data = Vec::new();
106
107 while !stop.load(Ordering::Relaxed) {
108 if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
109 println!("Event: {event:?}");
110 match event {
111 Event::SetupHostToDevice(req) => {
112 if req.ctrl_req().request == 255 {
113 println!("Stopping");
114 stop.store(true, Ordering::Relaxed);
115 }
116 ctrl_data = req.recv_all().unwrap();
117 println!("Control data: {ctrl_data:x?}");
118 }
119 Event::SetupDeviceToHost(req) => {
120 println!("Replying with data");
121 req.send(&ctrl_data).unwrap();
122 }
123 _ => (),
124 }
125 } else {
126 println!("no event");
127 }
128 }
129 });
130 });
131
132 thread::sleep(Duration::from_secs(1));
133
134 println!("Unregistering");
135 reg.remove().unwrap();
136}Sourcepub fn max_packet_size(&mut self) -> Result<usize>
pub fn max_packet_size(&mut self) -> Result<usize>
Maximum packet size.
Examples found in repository?
111fn run(mut ep1_rx: EndpointReceiver, mut ep2_tx: EndpointSender, mut custom: Custom) {
112 let ep1_control = ep1_rx.control().unwrap();
113 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
114 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
115 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
116 println!();
117
118 let ep2_control = ep2_tx.control().unwrap();
119 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
120 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
121 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
122 println!();
123
124 let stop = Arc::new(AtomicBool::new(false));
125
126 thread::scope(|s| {
127 s.spawn(|| {
128 let size = ep1_rx.max_packet_size().unwrap();
129 let mut b = 0;
130 while !stop.load(Ordering::Relaxed) {
131 let data = ep1_rx
132 .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
133 .expect("recv failed");
134 match data {
135 Some(data) => {
136 println!("received {} bytes: {data:x?}", data.len());
137 if !data.iter().all(|x| *x == b) {
138 panic!("wrong data received");
139 }
140 b = b.wrapping_add(1);
141 }
142 None => {
143 println!("receive empty");
144 }
145 }
146 }
147 });
148
149 s.spawn(|| {
150 let size = ep2_tx.max_packet_size().unwrap();
151 let mut b = 0u8;
152 while !stop.load(Ordering::Relaxed) {
153 let data = vec![b; size];
154 match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
155 Ok(()) => {
156 println!("sent data {b} of size {size} bytes");
157 b = b.wrapping_add(1);
158 }
159 Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
160 Err(err) => panic!("send failed: {err}"),
161 }
162 }
163 });
164
165 s.spawn(|| {
166 let mut ctrl_data = Vec::new();
167
168 while !stop.load(Ordering::Relaxed) {
169 if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
170 println!("Event: {event:?}");
171 match event {
172 Event::SetupHostToDevice(req) => {
173 if req.ctrl_req().request == 255 {
174 println!("Stopping");
175 stop.store(true, Ordering::Relaxed);
176 }
177 ctrl_data = req.recv_all().unwrap();
178 println!("Control data: {ctrl_data:x?}");
179 }
180 Event::SetupDeviceToHost(req) => {
181 println!("Replying with data");
182 req.send(&ctrl_data).unwrap();
183 }
184 _ => (),
185 }
186 } else {
187 println!("no event");
188 }
189 }
190 });
191 });
192}More examples
19async fn main() {
20 env_logger::init();
21
22 usb_gadget::remove_all().expect("cannot remove all gadgets");
23
24 let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
25 let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
26
27 let (mut custom, handle) = Custom::builder()
28 .with_interface(
29 Interface::new(Class::vendor_specific(1, 2), "custom interface")
30 .with_endpoint(Endpoint::bulk(ep1_dir))
31 .with_endpoint(Endpoint::bulk(ep2_dir)),
32 )
33 .build();
34
35 let udc = default_udc().expect("cannot get UDC");
36 let reg = Gadget::new(
37 Class::vendor_specific(255, 3),
38 Id::new(6, 0x11),
39 Strings::new("manufacturer", "custom USB interface", "serial_number"),
40 )
41 .with_config(Config::new("config").with_function(handle))
42 .with_os_descriptor(OsDescriptor::microsoft())
43 .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
44 .bind(&udc)
45 .expect("cannot bind to UDC");
46
47 println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
48 println!();
49
50 let ep1_control = ep1_rx.control().unwrap();
51 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
52 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
53 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
54 println!();
55
56 let ep2_control = ep2_tx.control().unwrap();
57 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
58 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
59 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
60 println!();
61
62 let stop = Arc::new(AtomicBool::new(false));
63
64 let stop1 = stop.clone();
65 tokio::spawn(async move {
66 let size = ep1_rx.max_packet_size().unwrap();
67 let mut b = 0;
68 while !stop1.load(Ordering::Relaxed) {
69 let data = ep1_rx.recv_async(BytesMut::with_capacity(size)).await.expect("recv_async failed");
70 match data {
71 Some(data) => {
72 println!("received {} bytes: {data:x?}", data.len());
73 if !data.iter().all(|x| *x == b) {
74 panic!("wrong data received");
75 }
76 b = b.wrapping_add(1);
77 }
78 None => {
79 println!("receive empty");
80 }
81 }
82 }
83 });
84
85 let stop2 = stop.clone();
86 tokio::spawn(async move {
87 let size = ep2_tx.max_packet_size().unwrap();
88 let mut b = 0u8;
89 while !stop2.load(Ordering::Relaxed) {
90 let data = vec![b; size];
91 match ep2_tx.send_async(data.into()).await {
92 Ok(()) => {
93 println!("sent data {b} of size {size} bytes");
94 b = b.wrapping_add(1);
95 }
96 Err(err) => panic!("send failed: {err}"),
97 }
98 }
99 });
100
101 let mut ctrl_data = Vec::new();
102 while !stop.load(Ordering::Relaxed) {
103 custom.wait_event().await.expect("wait for event failed");
104 println!("event ready");
105 let event = custom.event().expect("event failed");
106
107 println!("Event: {event:?}");
108 match event {
109 Event::SetupHostToDevice(req) => {
110 if req.ctrl_req().request == 255 {
111 println!("Stopping");
112 stop.store(true, Ordering::Relaxed);
113 }
114 ctrl_data = req.recv_all().unwrap();
115 println!("Control data: {ctrl_data:x?}");
116 }
117 Event::SetupDeviceToHost(req) => {
118 println!("Replying with data");
119 req.send(&ctrl_data).unwrap();
120 }
121 _ => (),
122 }
123 }
124
125 tokio::time::sleep(Duration::from_secs(1)).await;
126
127 println!("Unregistering");
128 reg.remove().unwrap();
129}20fn main() {
21 env_logger::init();
22
23 usb_gadget::remove_all().expect("cannot remove all gadgets");
24
25 let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
26 let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
27
28 let (mut custom, handle) = Custom::builder()
29 .with_interface(
30 Interface::new(Class::vendor_specific(1, 2), "custom interface")
31 .with_endpoint(Endpoint::bulk(ep1_dir))
32 .with_endpoint(Endpoint::bulk(ep2_dir)),
33 )
34 .build();
35
36 let udc = default_udc().expect("cannot get UDC");
37 let reg = Gadget::new(
38 Class::vendor_specific(255, 3),
39 Id::new(6, 0x11),
40 Strings::new("manufacturer", "custom USB interface", "serial_number"),
41 )
42 .with_config(Config::new("config").with_function(handle))
43 .with_os_descriptor(OsDescriptor::microsoft())
44 .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
45 .bind(&udc)
46 .expect("cannot bind to UDC");
47
48 println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
49 println!();
50
51 let ep1_control = ep1_rx.control().unwrap();
52 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
53 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
54 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
55 println!();
56
57 let ep2_control = ep2_tx.control().unwrap();
58 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
59 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
60 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
61 println!();
62
63 let stop = Arc::new(AtomicBool::new(false));
64
65 thread::scope(|s| {
66 s.spawn(|| {
67 let size = ep1_rx.max_packet_size().unwrap();
68 let mut b = 0;
69 while !stop.load(Ordering::Relaxed) {
70 let data = ep1_rx
71 .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
72 .expect("recv failed");
73 match data {
74 Some(data) => {
75 println!("received {} bytes: {data:x?}", data.len());
76 if !data.iter().all(|x| *x == b) {
77 panic!("wrong data received");
78 }
79 b = b.wrapping_add(1);
80 }
81 None => {
82 println!("receive empty");
83 }
84 }
85 }
86 });
87
88 s.spawn(|| {
89 let size = ep2_tx.max_packet_size().unwrap();
90 let mut b = 0u8;
91 while !stop.load(Ordering::Relaxed) {
92 let data = vec![b; size];
93 match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
94 Ok(()) => {
95 println!("sent data {b} of size {size} bytes");
96 b = b.wrapping_add(1);
97 }
98 Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
99 Err(err) => panic!("send failed: {err}"),
100 }
101 }
102 });
103
104 s.spawn(|| {
105 let mut ctrl_data = Vec::new();
106
107 while !stop.load(Ordering::Relaxed) {
108 if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
109 println!("Event: {event:?}");
110 match event {
111 Event::SetupHostToDevice(req) => {
112 if req.ctrl_req().request == 255 {
113 println!("Stopping");
114 stop.store(true, Ordering::Relaxed);
115 }
116 ctrl_data = req.recv_all().unwrap();
117 println!("Control data: {ctrl_data:x?}");
118 }
119 Event::SetupDeviceToHost(req) => {
120 println!("Replying with data");
121 req.send(&ctrl_data).unwrap();
122 }
123 _ => (),
124 }
125 } else {
126 println!("no event");
127 }
128 }
129 });
130 });
131
132 thread::sleep(Duration::from_secs(1));
133
134 println!("Unregistering");
135 reg.remove().unwrap();
136}Sourcepub fn recv_and_fetch(&mut self, buf: BytesMut) -> Result<BytesMut>
pub fn recv_and_fetch(&mut self, buf: BytesMut) -> Result<BytesMut>
Receive data synchronously.
The buffer should have been allocated with the desired capacity using
BytesMut::with_capacity. The capacity should be a positive multiple
of the endpoint’s maximum packet size.
Blocks until the operation completes and returns its result.
Sourcepub fn recv_and_fetch_timeout(
&mut self,
buf: BytesMut,
timeout: Duration,
) -> Result<BytesMut>
pub fn recv_and_fetch_timeout( &mut self, buf: BytesMut, timeout: Duration, ) -> Result<BytesMut>
Receive data synchronously with a timeout.
The buffer should have been allocated with the desired capacity using
BytesMut::with_capacity. The capacity should be a positive multiple
of the endpoint’s maximum packet size.
Blocks until the operation completes and returns its result.
Sourcepub fn recv(&mut self, buf: BytesMut) -> Result<Option<BytesMut>>
pub fn recv(&mut self, buf: BytesMut) -> Result<Option<BytesMut>>
Receive data.
The buffer should have been allocated with the desired capacity using
BytesMut::with_capacity. The capacity should be a positive multiple
of the endpoint’s maximum packet size.
Waits for space in the receive queue and enqueues the buffer for receiving data. Returns received data, if a buffer in the receive queue was filled.
Sourcepub async fn recv_async(&mut self, buf: BytesMut) -> Result<Option<BytesMut>>
Available on crate feature tokio only.
pub async fn recv_async(&mut self, buf: BytesMut) -> Result<Option<BytesMut>>
tokio only.Asynchronously receive data.
The buffer should have been allocated with the desired capacity using
BytesMut::with_capacity. The capacity should be a positive multiple
of the endpoint’s maximum packet size.
Waits for space in the receive queue and enqueues the buffer for receiving data. Returns received data, if a buffer in the receive queue was filled.
Examples found in repository?
19async fn main() {
20 env_logger::init();
21
22 usb_gadget::remove_all().expect("cannot remove all gadgets");
23
24 let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
25 let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
26
27 let (mut custom, handle) = Custom::builder()
28 .with_interface(
29 Interface::new(Class::vendor_specific(1, 2), "custom interface")
30 .with_endpoint(Endpoint::bulk(ep1_dir))
31 .with_endpoint(Endpoint::bulk(ep2_dir)),
32 )
33 .build();
34
35 let udc = default_udc().expect("cannot get UDC");
36 let reg = Gadget::new(
37 Class::vendor_specific(255, 3),
38 Id::new(6, 0x11),
39 Strings::new("manufacturer", "custom USB interface", "serial_number"),
40 )
41 .with_config(Config::new("config").with_function(handle))
42 .with_os_descriptor(OsDescriptor::microsoft())
43 .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
44 .bind(&udc)
45 .expect("cannot bind to UDC");
46
47 println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
48 println!();
49
50 let ep1_control = ep1_rx.control().unwrap();
51 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
52 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
53 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
54 println!();
55
56 let ep2_control = ep2_tx.control().unwrap();
57 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
58 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
59 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
60 println!();
61
62 let stop = Arc::new(AtomicBool::new(false));
63
64 let stop1 = stop.clone();
65 tokio::spawn(async move {
66 let size = ep1_rx.max_packet_size().unwrap();
67 let mut b = 0;
68 while !stop1.load(Ordering::Relaxed) {
69 let data = ep1_rx.recv_async(BytesMut::with_capacity(size)).await.expect("recv_async failed");
70 match data {
71 Some(data) => {
72 println!("received {} bytes: {data:x?}", data.len());
73 if !data.iter().all(|x| *x == b) {
74 panic!("wrong data received");
75 }
76 b = b.wrapping_add(1);
77 }
78 None => {
79 println!("receive empty");
80 }
81 }
82 }
83 });
84
85 let stop2 = stop.clone();
86 tokio::spawn(async move {
87 let size = ep2_tx.max_packet_size().unwrap();
88 let mut b = 0u8;
89 while !stop2.load(Ordering::Relaxed) {
90 let data = vec![b; size];
91 match ep2_tx.send_async(data.into()).await {
92 Ok(()) => {
93 println!("sent data {b} of size {size} bytes");
94 b = b.wrapping_add(1);
95 }
96 Err(err) => panic!("send failed: {err}"),
97 }
98 }
99 });
100
101 let mut ctrl_data = Vec::new();
102 while !stop.load(Ordering::Relaxed) {
103 custom.wait_event().await.expect("wait for event failed");
104 println!("event ready");
105 let event = custom.event().expect("event failed");
106
107 println!("Event: {event:?}");
108 match event {
109 Event::SetupHostToDevice(req) => {
110 if req.ctrl_req().request == 255 {
111 println!("Stopping");
112 stop.store(true, Ordering::Relaxed);
113 }
114 ctrl_data = req.recv_all().unwrap();
115 println!("Control data: {ctrl_data:x?}");
116 }
117 Event::SetupDeviceToHost(req) => {
118 println!("Replying with data");
119 req.send(&ctrl_data).unwrap();
120 }
121 _ => (),
122 }
123 }
124
125 tokio::time::sleep(Duration::from_secs(1)).await;
126
127 println!("Unregistering");
128 reg.remove().unwrap();
129}Sourcepub fn recv_timeout(
&mut self,
buf: BytesMut,
timeout: Duration,
) -> Result<Option<BytesMut>>
pub fn recv_timeout( &mut self, buf: BytesMut, timeout: Duration, ) -> Result<Option<BytesMut>>
Receive data with a timeout.
The buffer should have been allocated with the desired capacity using
BytesMut::with_capacity. The capacity should be a positive multiple
of the endpoint’s maximum packet size.
Waits for space in the receive queue and enqueues the buffer for receiving data. Returns received data, if a buffer in the receive queue was filled.
Examples found in repository?
111fn run(mut ep1_rx: EndpointReceiver, mut ep2_tx: EndpointSender, mut custom: Custom) {
112 let ep1_control = ep1_rx.control().unwrap();
113 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
114 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
115 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
116 println!();
117
118 let ep2_control = ep2_tx.control().unwrap();
119 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
120 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
121 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
122 println!();
123
124 let stop = Arc::new(AtomicBool::new(false));
125
126 thread::scope(|s| {
127 s.spawn(|| {
128 let size = ep1_rx.max_packet_size().unwrap();
129 let mut b = 0;
130 while !stop.load(Ordering::Relaxed) {
131 let data = ep1_rx
132 .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
133 .expect("recv failed");
134 match data {
135 Some(data) => {
136 println!("received {} bytes: {data:x?}", data.len());
137 if !data.iter().all(|x| *x == b) {
138 panic!("wrong data received");
139 }
140 b = b.wrapping_add(1);
141 }
142 None => {
143 println!("receive empty");
144 }
145 }
146 }
147 });
148
149 s.spawn(|| {
150 let size = ep2_tx.max_packet_size().unwrap();
151 let mut b = 0u8;
152 while !stop.load(Ordering::Relaxed) {
153 let data = vec![b; size];
154 match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
155 Ok(()) => {
156 println!("sent data {b} of size {size} bytes");
157 b = b.wrapping_add(1);
158 }
159 Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
160 Err(err) => panic!("send failed: {err}"),
161 }
162 }
163 });
164
165 s.spawn(|| {
166 let mut ctrl_data = Vec::new();
167
168 while !stop.load(Ordering::Relaxed) {
169 if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
170 println!("Event: {event:?}");
171 match event {
172 Event::SetupHostToDevice(req) => {
173 if req.ctrl_req().request == 255 {
174 println!("Stopping");
175 stop.store(true, Ordering::Relaxed);
176 }
177 ctrl_data = req.recv_all().unwrap();
178 println!("Control data: {ctrl_data:x?}");
179 }
180 Event::SetupDeviceToHost(req) => {
181 println!("Replying with data");
182 req.send(&ctrl_data).unwrap();
183 }
184 _ => (),
185 }
186 } else {
187 println!("no event");
188 }
189 }
190 });
191 });
192}More examples
20fn main() {
21 env_logger::init();
22
23 usb_gadget::remove_all().expect("cannot remove all gadgets");
24
25 let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
26 let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
27
28 let (mut custom, handle) = Custom::builder()
29 .with_interface(
30 Interface::new(Class::vendor_specific(1, 2), "custom interface")
31 .with_endpoint(Endpoint::bulk(ep1_dir))
32 .with_endpoint(Endpoint::bulk(ep2_dir)),
33 )
34 .build();
35
36 let udc = default_udc().expect("cannot get UDC");
37 let reg = Gadget::new(
38 Class::vendor_specific(255, 3),
39 Id::new(6, 0x11),
40 Strings::new("manufacturer", "custom USB interface", "serial_number"),
41 )
42 .with_config(Config::new("config").with_function(handle))
43 .with_os_descriptor(OsDescriptor::microsoft())
44 .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
45 .bind(&udc)
46 .expect("cannot bind to UDC");
47
48 println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
49 println!();
50
51 let ep1_control = ep1_rx.control().unwrap();
52 println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
53 println!("ep1 real address: {}", ep1_control.real_address().unwrap());
54 println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
55 println!();
56
57 let ep2_control = ep2_tx.control().unwrap();
58 println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
59 println!("ep2 real address: {}", ep2_control.real_address().unwrap());
60 println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
61 println!();
62
63 let stop = Arc::new(AtomicBool::new(false));
64
65 thread::scope(|s| {
66 s.spawn(|| {
67 let size = ep1_rx.max_packet_size().unwrap();
68 let mut b = 0;
69 while !stop.load(Ordering::Relaxed) {
70 let data = ep1_rx
71 .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
72 .expect("recv failed");
73 match data {
74 Some(data) => {
75 println!("received {} bytes: {data:x?}", data.len());
76 if !data.iter().all(|x| *x == b) {
77 panic!("wrong data received");
78 }
79 b = b.wrapping_add(1);
80 }
81 None => {
82 println!("receive empty");
83 }
84 }
85 }
86 });
87
88 s.spawn(|| {
89 let size = ep2_tx.max_packet_size().unwrap();
90 let mut b = 0u8;
91 while !stop.load(Ordering::Relaxed) {
92 let data = vec![b; size];
93 match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
94 Ok(()) => {
95 println!("sent data {b} of size {size} bytes");
96 b = b.wrapping_add(1);
97 }
98 Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
99 Err(err) => panic!("send failed: {err}"),
100 }
101 }
102 });
103
104 s.spawn(|| {
105 let mut ctrl_data = Vec::new();
106
107 while !stop.load(Ordering::Relaxed) {
108 if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
109 println!("Event: {event:?}");
110 match event {
111 Event::SetupHostToDevice(req) => {
112 if req.ctrl_req().request == 255 {
113 println!("Stopping");
114 stop.store(true, Ordering::Relaxed);
115 }
116 ctrl_data = req.recv_all().unwrap();
117 println!("Control data: {ctrl_data:x?}");
118 }
119 Event::SetupDeviceToHost(req) => {
120 println!("Replying with data");
121 req.send(&ctrl_data).unwrap();
122 }
123 _ => (),
124 }
125 } else {
126 println!("no event");
127 }
128 }
129 });
130 });
131
132 thread::sleep(Duration::from_secs(1));
133
134 println!("Unregistering");
135 reg.remove().unwrap();
136}Sourcepub fn try_recv(&mut self, buf: BytesMut) -> Result<()>
pub fn try_recv(&mut self, buf: BytesMut) -> Result<()>
Enqueue the buffer for receiving without waiting for receive queue space.
The buffer should have been allocated with the desired capacity using
BytesMut::with_capacity. The capacity should be a positive multiple
of the endpoint’s maximum packet size.
Fails if no receive queue space is available.
Sourcepub fn is_ready(&mut self) -> bool
pub fn is_ready(&mut self) -> bool
Whether receive queue space is available.
Receive space will only become available when fetch,
fetch_timeout or try_fetch are called.
Sourcepub fn is_empty(&mut self) -> bool
pub fn is_empty(&mut self) -> bool
Whether no buffers are enqueued for receiving data.
The receive queue will only be drained when fetch,
fetch_timeout or try_fetch are called.
Sourcepub fn fetch(&mut self) -> Result<Option<BytesMut>>
pub fn fetch(&mut self) -> Result<Option<BytesMut>>
Waits for data to be received into a previously enqueued receive buffer, then returns it.
Ok(None) is returned if no receive buffers are enqueued.
Sourcepub async fn fetch_async(&mut self) -> Result<Option<BytesMut>>
Available on crate feature tokio only.
pub async fn fetch_async(&mut self) -> Result<Option<BytesMut>>
tokio only.Asynchronously waits for data to be received into a previously enqueued receive buffer, then returns it.
Ok(None) is returned if no receive buffers are enqueued.
Sourcepub fn fetch_timeout(&mut self, timeout: Duration) -> Result<Option<BytesMut>>
pub fn fetch_timeout(&mut self, timeout: Duration) -> Result<Option<BytesMut>>
Waits for data to be received into a previously enqueued receive buffer with a timeout, then returns it.
Ok(None) is returned if no receive buffers are enqueued.