Struct EndpointReceiver 

pub struct EndpointReceiver(/* private fields */);

Receiver for a host-to-device USB endpoint.

Receiving is asynchronous and uses a queue backed by Linux AIO. Empty buffers are enqueued with recv (or try_recv / recv_async) and submitted to the kernel, which fills them in the background as USB data arrives.

Filled buffers are retrieved with fetch, fetch_timeout, or try_fetch. For a combined enqueue-and-wait operation, use recv_and_fetch.

recv, recv_async and recv_timeout automatically wait for queue space by fetching completed buffers, which they return. When using try_recv directly, call is_ready first or fetch completed buffers to ensure space is available.

§Buffer size and throughput

Buffer size has a major impact on bulk transfer throughput. The FunctionFS kernel driver submits one USB transfer per AIO read request. When the buffer capacity equals the maximum packet size (MPS, typically 512 bytes for high-speed or 1024 bytes for super-speed bulk endpoints), each USB packet triggers a separate AIO completion, which causes high per-packet overhead and poor throughput.

Using buffers much larger than the MPS allows the kernel to batch multiple USB packets into a single read, which greatly reduces the number of completions per byte transferred. A good starting point is a buffer capacity that is a multiple of the MPS and at least several KiB (e.g. 16 KiB). Use max_packet_size to query the negotiated MPS at runtime.

For high-throughput workloads, also consider increasing the queue depth (default: 16).
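As a minimal sketch of the sizing rule above, the following rounds a target size up to a positive multiple of the MPS. The helper `buffer_capacity` is hypothetical and not part of this crate, and the MPS of 512 is hard-coded for illustration only; real code would presumably take it from max_packet_size and pass the result to BytesMut::with_capacity.

```rust
/// Hypothetical helper (not part of the crate): rounds `target` up to the
/// next positive multiple of the maximum packet size `mps`.
fn buffer_capacity(mps: usize, target: usize) -> usize {
    assert!(mps > 0, "maximum packet size must be non-zero");
    // Number of whole packets needed to cover `target`, at least one.
    let packets = ((target + mps - 1) / mps).max(1);
    packets * mps
}

fn main() {
    // Assumed value: 512 bytes is the usual high-speed bulk MPS.
    let mps = 512;
    let capacity = buffer_capacity(mps, 16 * 1024);
    println!("capacity: {capacity} bytes ({} packets)", capacity / mps);
}
```

Rounding up rather than down keeps the capacity a positive multiple of the MPS even when the target is smaller than a single packet.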

§Buffer size and zero-length packets (ZLP)

The USB host uses zero-length packets to signal the end of a transfer whose total size is a multiple of the maximum packet size (MPS). When the receive buffer capacity equals the MPS, each USB packet completes one read request, and a ZLP is delivered as a separate zero-length completion.

When the buffer capacity is larger than the MPS (recommended for throughput), the kernel submits a single USB transfer that spans multiple packets. A short packet (including a ZLP) terminates the transfer early, and the read completes with fewer bytes than the buffer capacity. This is the standard way to detect end-of-message: any completion where received < capacity indicates that a short packet (or ZLP) arrived.

MPS-sized buffers are only necessary when each individual USB packet must be observed separately (e.g. for packet-level diagnostics).
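The short-packet rule described in this section can be sketched as a small message assembler. `MessageAssembler` is a hypothetical helper, not part of this crate, and it uses plain byte slices instead of BytesMut so that the sketch has no external dependencies. It assumes every read was submitted with the same buffer capacity.

```rust
/// Hypothetical helper (not part of the crate): collects read completions
/// into whole messages using the short-packet rule.
struct MessageAssembler {
    capacity: usize,
    pending: Vec<u8>,
}

impl MessageAssembler {
    fn new(capacity: usize) -> Self {
        Self { capacity, pending: Vec::new() }
    }

    /// Feeds the payload of one completion. Returns the finished message
    /// when the completion is shorter than the buffer capacity.
    fn push(&mut self, received: &[u8]) -> Option<Vec<u8>> {
        self.pending.extend_from_slice(received);
        if received.len() < self.capacity {
            // Short packet or ZLP: the transfer ended here.
            Some(std::mem::take(&mut self.pending))
        } else {
            // Buffer filled completely: more data may follow.
            None
        }
    }
}

fn main() {
    let mut asm = MessageAssembler::new(1024);
    // A 2048-byte message arrives as two full buffers followed by a ZLP.
    assert_eq!(asm.push(&[0xAA; 1024]), None);
    assert_eq!(asm.push(&[0xAA; 1024]), None);
    let msg = asm.push(&[]).expect("ZLP ends the message");
    println!("message of {} bytes", msg.len());
}
```

A completion that fills the buffer exactly is ambiguous on its own, which is why the host follows such a message with a ZLP.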

§Implementations

impl EndpointReceiver

pub fn control(&mut self) -> Result<EndpointControl<'_>>

Gets the endpoint control interface.

Examples found in repository:
examples/custom_interface_device_split.rs (line 112)
111fn run(mut ep1_rx: EndpointReceiver, mut ep2_tx: EndpointSender, mut custom: Custom) {
112    let ep1_control = ep1_rx.control().unwrap();
113    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
114    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
115    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
116    println!();
117
118    let ep2_control = ep2_tx.control().unwrap();
119    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
120    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
121    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
122    println!();
123
124    let stop = Arc::new(AtomicBool::new(false));
125
126    thread::scope(|s| {
127        s.spawn(|| {
128            let size = ep1_rx.max_packet_size().unwrap();
129            let mut b = 0;
130            while !stop.load(Ordering::Relaxed) {
131                let data = ep1_rx
132                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
133                    .expect("recv failed");
134                match data {
135                    Some(data) => {
136                        println!("received {} bytes: {data:x?}", data.len());
137                        if !data.iter().all(|x| *x == b) {
138                            panic!("wrong data received");
139                        }
140                        b = b.wrapping_add(1);
141                    }
142                    None => {
143                        println!("receive empty");
144                    }
145                }
146            }
147        });
148
149        s.spawn(|| {
150            let size = ep2_tx.max_packet_size().unwrap();
151            let mut b = 0u8;
152            while !stop.load(Ordering::Relaxed) {
153                let data = vec![b; size];
154                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
155                    Ok(()) => {
156                        println!("sent data {b} of size {size} bytes");
157                        b = b.wrapping_add(1);
158                    }
159                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
160                    Err(err) => panic!("send failed: {err}"),
161                }
162            }
163        });
164
165        s.spawn(|| {
166            let mut ctrl_data = Vec::new();
167
168            while !stop.load(Ordering::Relaxed) {
169                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
170                    println!("Event: {event:?}");
171                    match event {
172                        Event::SetupHostToDevice(req) => {
173                            if req.ctrl_req().request == 255 {
174                                println!("Stopping");
175                                stop.store(true, Ordering::Relaxed);
176                            }
177                            ctrl_data = req.recv_all().unwrap();
178                            println!("Control data: {ctrl_data:x?}");
179                        }
180                        Event::SetupDeviceToHost(req) => {
181                            println!("Replying with data");
182                            req.send(&ctrl_data).unwrap();
183                        }
184                        _ => (),
185                    }
186                } else {
187                    println!("no event");
188                }
189            }
190        });
191    });
192}
examples/custom_interface_device_async.rs (line 50)
19async fn main() {
20    env_logger::init();
21
22    usb_gadget::remove_all().expect("cannot remove all gadgets");
23
24    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
25    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
26
27    let (mut custom, handle) = Custom::builder()
28        .with_interface(
29            Interface::new(Class::vendor_specific(1, 2), "custom interface")
30                .with_endpoint(Endpoint::bulk(ep1_dir))
31                .with_endpoint(Endpoint::bulk(ep2_dir)),
32        )
33        .build();
34
35    let udc = default_udc().expect("cannot get UDC");
36    let reg = Gadget::new(
37        Class::vendor_specific(255, 3),
38        Id::new(6, 0x11),
39        Strings::new("manufacturer", "custom USB interface", "serial_number"),
40    )
41    .with_config(Config::new("config").with_function(handle))
42    .with_os_descriptor(OsDescriptor::microsoft())
43    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
44    .bind(&udc)
45    .expect("cannot bind to UDC");
46
47    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
48    println!();
49
50    let ep1_control = ep1_rx.control().unwrap();
51    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
52    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
53    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
54    println!();
55
56    let ep2_control = ep2_tx.control().unwrap();
57    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
58    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
59    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
60    println!();
61
62    let stop = Arc::new(AtomicBool::new(false));
63
64    let stop1 = stop.clone();
65    tokio::spawn(async move {
66        let size = ep1_rx.max_packet_size().unwrap();
67        let mut b = 0;
68        while !stop1.load(Ordering::Relaxed) {
69            let data = ep1_rx.recv_async(BytesMut::with_capacity(size)).await.expect("recv_async failed");
70            match data {
71                Some(data) => {
72                    println!("received {} bytes: {data:x?}", data.len());
73                    if !data.iter().all(|x| *x == b) {
74                        panic!("wrong data received");
75                    }
76                    b = b.wrapping_add(1);
77                }
78                None => {
79                    println!("receive empty");
80                }
81            }
82        }
83    });
84
85    let stop2 = stop.clone();
86    tokio::spawn(async move {
87        let size = ep2_tx.max_packet_size().unwrap();
88        let mut b = 0u8;
89        while !stop2.load(Ordering::Relaxed) {
90            let data = vec![b; size];
91            match ep2_tx.send_async(data.into()).await {
92                Ok(()) => {
93                    println!("sent data {b} of size {size} bytes");
94                    b = b.wrapping_add(1);
95                }
96                Err(err) => panic!("send failed: {err}"),
97            }
98        }
99    });
100
101    let mut ctrl_data = Vec::new();
102    while !stop.load(Ordering::Relaxed) {
103        custom.wait_event().await.expect("wait for event failed");
104        println!("event ready");
105        let event = custom.event().expect("event failed");
106
107        println!("Event: {event:?}");
108        match event {
109            Event::SetupHostToDevice(req) => {
110                if req.ctrl_req().request == 255 {
111                    println!("Stopping");
112                    stop.store(true, Ordering::Relaxed);
113                }
114                ctrl_data = req.recv_all().unwrap();
115                println!("Control data: {ctrl_data:x?}");
116            }
117            Event::SetupDeviceToHost(req) => {
118                println!("Replying with data");
119                req.send(&ctrl_data).unwrap();
120            }
121            _ => (),
122        }
123    }
124
125    tokio::time::sleep(Duration::from_secs(1)).await;
126
127    println!("Unregistering");
128    reg.remove().unwrap();
129}
examples/custom_interface_device.rs (line 51)
20fn main() {
21    env_logger::init();
22
23    usb_gadget::remove_all().expect("cannot remove all gadgets");
24
25    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
26    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
27
28    let (mut custom, handle) = Custom::builder()
29        .with_interface(
30            Interface::new(Class::vendor_specific(1, 2), "custom interface")
31                .with_endpoint(Endpoint::bulk(ep1_dir))
32                .with_endpoint(Endpoint::bulk(ep2_dir)),
33        )
34        .build();
35
36    let udc = default_udc().expect("cannot get UDC");
37    let reg = Gadget::new(
38        Class::vendor_specific(255, 3),
39        Id::new(6, 0x11),
40        Strings::new("manufacturer", "custom USB interface", "serial_number"),
41    )
42    .with_config(Config::new("config").with_function(handle))
43    .with_os_descriptor(OsDescriptor::microsoft())
44    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
45    .bind(&udc)
46    .expect("cannot bind to UDC");
47
48    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
49    println!();
50
51    let ep1_control = ep1_rx.control().unwrap();
52    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
53    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
54    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
55    println!();
56
57    let ep2_control = ep2_tx.control().unwrap();
58    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
59    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
60    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
61    println!();
62
63    let stop = Arc::new(AtomicBool::new(false));
64
65    thread::scope(|s| {
66        s.spawn(|| {
67            let size = ep1_rx.max_packet_size().unwrap();
68            let mut b = 0;
69            while !stop.load(Ordering::Relaxed) {
70                let data = ep1_rx
71                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
72                    .expect("recv failed");
73                match data {
74                    Some(data) => {
75                        println!("received {} bytes: {data:x?}", data.len());
76                        if !data.iter().all(|x| *x == b) {
77                            panic!("wrong data received");
78                        }
79                        b = b.wrapping_add(1);
80                    }
81                    None => {
82                        println!("receive empty");
83                    }
84                }
85            }
86        });
87
88        s.spawn(|| {
89            let size = ep2_tx.max_packet_size().unwrap();
90            let mut b = 0u8;
91            while !stop.load(Ordering::Relaxed) {
92                let data = vec![b; size];
93                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
94                    Ok(()) => {
95                        println!("sent data {b} of size {size} bytes");
96                        b = b.wrapping_add(1);
97                    }
98                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
99                    Err(err) => panic!("send failed: {err}"),
100                }
101            }
102        });
103
104        s.spawn(|| {
105            let mut ctrl_data = Vec::new();
106
107            while !stop.load(Ordering::Relaxed) {
108                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
109                    println!("Event: {event:?}");
110                    match event {
111                        Event::SetupHostToDevice(req) => {
112                            if req.ctrl_req().request == 255 {
113                                println!("Stopping");
114                                stop.store(true, Ordering::Relaxed);
115                            }
116                            ctrl_data = req.recv_all().unwrap();
117                            println!("Control data: {ctrl_data:x?}");
118                        }
119                        Event::SetupDeviceToHost(req) => {
120                            println!("Replying with data");
121                            req.send(&ctrl_data).unwrap();
122                        }
123                        _ => (),
124                    }
125                } else {
126                    println!("no event");
127                }
128            }
129        });
130    });
131
132    thread::sleep(Duration::from_secs(1));
133
134    println!("Unregistering");
135    reg.remove().unwrap();
136}

pub fn max_packet_size(&mut self) -> Result<usize>

Returns the endpoint's maximum packet size (MPS) in bytes.

Examples found in repository:
examples/custom_interface_device_split.rs (line 128)
examples/custom_interface_device_async.rs (line 66)
examples/custom_interface_device.rs (line 67)

pub fn recv_and_fetch(&mut self, buf: BytesMut) -> Result<BytesMut>

Receive data synchronously.

The buffer should have been allocated with the desired capacity using BytesMut::with_capacity. The capacity should be a positive multiple of the endpoint’s maximum packet size.

Blocks until the operation completes and returns its result.

pub fn recv_and_fetch_timeout(&mut self, buf: BytesMut, timeout: Duration) -> Result<BytesMut>

Receive data synchronously with a timeout.

The buffer should have been allocated with the desired capacity using BytesMut::with_capacity. The capacity should be a positive multiple of the endpoint’s maximum packet size.

Blocks until the operation completes and returns its result.

pub fn recv(&mut self, buf: BytesMut) -> Result<Option<BytesMut>>

Receive data.

The buffer should have been allocated with the desired capacity using BytesMut::with_capacity. The capacity should be a positive multiple of the endpoint’s maximum packet size.

Waits for space in the receive queue and enqueues the buffer for receiving data. Returns received data, if a buffer in the receive queue was filled.
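Because recv returns Option, a caller has to treat None as "nothing was ready yet" rather than as an error. The sketch below illustrates that calling pattern only. The `Recv` trait and `MockReceiver` are hypothetical stand-ins that mirror the shape of the signature above, with `Vec<u8>` in place of BytesMut so the code needs no external crates. The mock is a deliberate simplification: it hands back a buffer only once its queue is full, whereas the real receiver may return one earlier.

```rust
use std::collections::VecDeque;
use std::io;

/// Stand-in for the receiver, mirroring only the shape of `recv`.
trait Recv {
    fn recv(&mut self, buf: Vec<u8>) -> io::Result<Option<Vec<u8>>>;
}

/// Mock (hypothetical): holds up to `depth` buffers; once full, each call
/// hands back the oldest buffer filled with `payload_len` dummy bytes.
struct MockReceiver {
    depth: usize,
    payload_len: usize,
    queue: VecDeque<Vec<u8>>,
}

impl Recv for MockReceiver {
    fn recv(&mut self, buf: Vec<u8>) -> io::Result<Option<Vec<u8>>> {
        let done = if self.queue.len() == self.depth {
            let len = self.payload_len;
            self.queue.pop_front().map(|mut b| {
                b.resize(len, 0x55);
                b
            })
        } else {
            None
        };
        self.queue.push_back(buf);
        Ok(done)
    }
}

/// Enqueues `count` buffers and returns the number of bytes handed back.
fn pump(rx: &mut impl Recv, count: usize, capacity: usize) -> io::Result<usize> {
    let mut total = 0;
    for _ in 0..count {
        // `None` only means no buffer was ready; errors arrive via `?`.
        if let Some(data) = rx.recv(Vec::with_capacity(capacity))? {
            total += data.len();
        }
    }
    Ok(total)
}

fn main() -> io::Result<()> {
    let mut rx = MockReceiver { depth: 2, payload_len: 100, queue: VecDeque::new() };
    let total = pump(&mut rx, 5, 1024)?;
    println!("received {total} bytes");
    Ok(())
}
```

Buffers still sitting in the queue when the loop ends are not counted; with the real receiver they would be retrieved with fetch, fetch_timeout, or try_fetch.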

pub async fn recv_async(&mut self, buf: BytesMut) -> Result<Option<BytesMut>>

Available on crate feature tokio only.

Asynchronously receive data.

The buffer should have been allocated with the desired capacity using BytesMut::with_capacity. The capacity should be a positive multiple of the endpoint’s maximum packet size.

Waits for space in the receive queue and enqueues the buffer for receiving data. Returns received data, if a buffer in the receive queue was filled.

Examples found in repository:
examples/custom_interface_device_async.rs (line 69)

pub fn recv_timeout(&mut self, buf: BytesMut, timeout: Duration) -> Result<Option<BytesMut>>

Receive data with a timeout.

The buffer should have been allocated with the desired capacity using BytesMut::with_capacity. The capacity should be a positive multiple of the endpoint’s maximum packet size.

Waits, up to the given timeout, for space in the receive queue and enqueues the buffer for receiving data. Returns the received data if a buffer in the receive queue was filled.
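
As a minimal sketch of the capacity rule above, the helper below rounds a desired buffer size up to a positive multiple of the maximum packet size. The name `recv_capacity` is hypothetical and not part of this crate; the `mps` argument is assumed to be the value obtained from max_packet_size at runtime.

```rust
/// Rounds `desired` up to the nearest positive multiple of `mps`.
///
/// Hypothetical helper for illustration only, not part of the crate's API.
/// `mps` is assumed to be the endpoint's maximum packet size and must be
/// non-zero.
fn recv_capacity(desired: usize, mps: usize) -> usize {
    assert!(mps > 0, "maximum packet size must be non-zero");
    // Number of whole packets needed to hold `desired` bytes, rounded up.
    let packets = (desired + mps - 1) / mps;
    // Use at least one packet so the result is always a positive multiple.
    packets.max(1) * mps
}
```

The result could then be passed to BytesMut::with_capacity before the buffer is handed to recv_timeout.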

Examples found in repository
examples/custom_interface_device_split.rs (line 132)
111fn run(mut ep1_rx: EndpointReceiver, mut ep2_tx: EndpointSender, mut custom: Custom) {
112    let ep1_control = ep1_rx.control().unwrap();
113    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
114    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
115    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
116    println!();
117
118    let ep2_control = ep2_tx.control().unwrap();
119    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
120    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
121    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
122    println!();
123
124    let stop = Arc::new(AtomicBool::new(false));
125
126    thread::scope(|s| {
127        s.spawn(|| {
128            let size = ep1_rx.max_packet_size().unwrap();
129            let mut b = 0;
130            while !stop.load(Ordering::Relaxed) {
131                let data = ep1_rx
132                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
133                    .expect("recv failed");
134                match data {
135                    Some(data) => {
136                        println!("received {} bytes: {data:x?}", data.len());
137                        if !data.iter().all(|x| *x == b) {
138                            panic!("wrong data received");
139                        }
140                        b = b.wrapping_add(1);
141                    }
142                    None => {
143                        println!("receive empty");
144                    }
145                }
146            }
147        });
148
149        s.spawn(|| {
150            let size = ep2_tx.max_packet_size().unwrap();
151            let mut b = 0u8;
152            while !stop.load(Ordering::Relaxed) {
153                let data = vec![b; size];
154                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
155                    Ok(()) => {
156                        println!("sent data {b} of size {size} bytes");
157                        b = b.wrapping_add(1);
158                    }
159                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
160                    Err(err) => panic!("send failed: {err}"),
161                }
162            }
163        });
164
165        s.spawn(|| {
166            let mut ctrl_data = Vec::new();
167
168            while !stop.load(Ordering::Relaxed) {
169                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
170                    println!("Event: {event:?}");
171                    match event {
172                        Event::SetupHostToDevice(req) => {
173                            if req.ctrl_req().request == 255 {
174                                println!("Stopping");
175                                stop.store(true, Ordering::Relaxed);
176                            }
177                            ctrl_data = req.recv_all().unwrap();
178                            println!("Control data: {ctrl_data:x?}");
179                        }
180                        Event::SetupDeviceToHost(req) => {
181                            println!("Replying with data");
182                            req.send(&ctrl_data).unwrap();
183                        }
184                        _ => (),
185                    }
186                } else {
187                    println!("no event");
188                }
189            }
190        });
191    });
192}
More examples
examples/custom_interface_device.rs (line 71)
20fn main() {
21    env_logger::init();
22
23    usb_gadget::remove_all().expect("cannot remove all gadgets");
24
25    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
26    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
27
28    let (mut custom, handle) = Custom::builder()
29        .with_interface(
30            Interface::new(Class::vendor_specific(1, 2), "custom interface")
31                .with_endpoint(Endpoint::bulk(ep1_dir))
32                .with_endpoint(Endpoint::bulk(ep2_dir)),
33        )
34        .build();
35
36    let udc = default_udc().expect("cannot get UDC");
37    let reg = Gadget::new(
38        Class::vendor_specific(255, 3),
39        Id::new(6, 0x11),
40        Strings::new("manufacturer", "custom USB interface", "serial_number"),
41    )
42    .with_config(Config::new("config").with_function(handle))
43    .with_os_descriptor(OsDescriptor::microsoft())
44    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
45    .bind(&udc)
46    .expect("cannot bind to UDC");
47
48    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
49    println!();
50
51    let ep1_control = ep1_rx.control().unwrap();
52    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
53    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
54    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
55    println!();
56
57    let ep2_control = ep2_tx.control().unwrap();
58    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
59    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
60    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
61    println!();
62
63    let stop = Arc::new(AtomicBool::new(false));
64
65    thread::scope(|s| {
66        s.spawn(|| {
67            let size = ep1_rx.max_packet_size().unwrap();
68            let mut b = 0;
69            while !stop.load(Ordering::Relaxed) {
70                let data = ep1_rx
71                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
72                    .expect("recv failed");
73                match data {
74                    Some(data) => {
75                        println!("received {} bytes: {data:x?}", data.len());
76                        if !data.iter().all(|x| *x == b) {
77                            panic!("wrong data received");
78                        }
79                        b = b.wrapping_add(1);
80                    }
81                    None => {
82                        println!("receive empty");
83                    }
84                }
85            }
86        });
87
88        s.spawn(|| {
89            let size = ep2_tx.max_packet_size().unwrap();
90            let mut b = 0u8;
91            while !stop.load(Ordering::Relaxed) {
92                let data = vec![b; size];
93                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
94                    Ok(()) => {
95                        println!("sent data {b} of size {size} bytes");
96                        b = b.wrapping_add(1);
97                    }
98                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
99                    Err(err) => panic!("send failed: {err}"),
100                }
101            }
102        });
103
104        s.spawn(|| {
105            let mut ctrl_data = Vec::new();
106
107            while !stop.load(Ordering::Relaxed) {
108                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
109                    println!("Event: {event:?}");
110                    match event {
111                        Event::SetupHostToDevice(req) => {
112                            if req.ctrl_req().request == 255 {
113                                println!("Stopping");
114                                stop.store(true, Ordering::Relaxed);
115                            }
116                            ctrl_data = req.recv_all().unwrap();
117                            println!("Control data: {ctrl_data:x?}");
118                        }
119                        Event::SetupDeviceToHost(req) => {
120                            println!("Replying with data");
121                            req.send(&ctrl_data).unwrap();
122                        }
123                        _ => (),
124                    }
125                } else {
126                    println!("no event");
127                }
128            }
129        });
130    });
131
132    thread::sleep(Duration::from_secs(1));
133
134    println!("Unregistering");
135    reg.remove().unwrap();
136}

pub fn try_recv(&mut self, buf: BytesMut) -> Result<()>

Enqueue the buffer for receiving without waiting for receive queue space.

The buffer should have been allocated with the desired capacity using BytesMut::with_capacity. The capacity should be a positive multiple of the endpoint’s maximum packet size.

Fails if no receive queue space is available.
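
The interplay between is_ready, try_recv and try_fetch can be sketched with a toy model of a bounded queue. This is not the crate's implementation: `ModelQueue` and `fill_queue` are hypothetical names, buffers are modeled as `Vec<u8>` instead of BytesMut, every enqueued buffer is treated as already filled, and the `WouldBlock` error kind is an assumption made for this sketch.

```rust
use std::collections::VecDeque;
use std::io;

/// Toy stand-in for the receive queue; NOT the crate's implementation.
struct ModelQueue {
    depth: usize,
    enqueued: VecDeque<Vec<u8>>,
}

impl ModelQueue {
    fn new(depth: usize) -> Self {
        Self { depth, enqueued: VecDeque::new() }
    }

    /// Models `is_ready`: true while there is room for another buffer.
    fn is_ready(&self) -> bool {
        self.enqueued.len() < self.depth
    }

    /// Models `try_recv`: enqueue without waiting, fail when the queue is full.
    /// The error kind used here is an assumption of this sketch.
    fn try_recv(&mut self, buf: Vec<u8>) -> io::Result<()> {
        if !self.is_ready() {
            return Err(io::Error::new(io::ErrorKind::WouldBlock, "receive queue full"));
        }
        self.enqueued.push_back(buf);
        Ok(())
    }

    /// Models `try_fetch`: hand back the oldest buffer, which frees one slot.
    /// The model pretends every enqueued buffer has already been filled.
    fn try_fetch(&mut self) -> io::Result<Option<Vec<u8>>> {
        Ok(self.enqueued.pop_front())
    }
}

/// Enqueues buffers until the queue is full; returns how many were accepted.
fn fill_queue(queue: &mut ModelQueue, capacity: usize) -> usize {
    let mut accepted = 0;
    while queue.is_ready() {
        queue
            .try_recv(Vec::with_capacity(capacity))
            .expect("space was checked with is_ready");
        accepted += 1;
    }
    accepted
}
```

Checking is_ready before each try_recv keeps the error path for genuinely unexpected failures, and fetching a completed buffer is what makes room for the next one.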

pub fn is_ready(&mut self) -> bool

Whether receive queue space is available.

Receive space will only become available when fetch, fetch_timeout or try_fetch are called.

pub fn is_empty(&mut self) -> bool

Whether no buffers are enqueued for receiving data.

The receive queue will only be drained when fetch, fetch_timeout or try_fetch are called.

pub fn fetch(&mut self) -> Result<Option<BytesMut>>

Waits for data to be received into a previously enqueued receive buffer, then returns it.

Ok(None) is returned if no receive buffers are enqueued.
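
Because Ok(None) signals that no receive buffers remain enqueued, it can serve as the termination condition of a drain loop. The sketch below is written against a closure rather than the real type so that it stays self-contained: `drain_all` is a hypothetical helper, the closure stands in for a call such as `receiver.fetch()`, and buffers are modeled as `Vec<u8>` instead of BytesMut.

```rust
use std::io;

/// Calls `fetch` until it reports that no receive buffers remain enqueued.
///
/// Hypothetical helper for illustration; `fetch` stands in for a blocking
/// fetch call on the receiver.
fn drain_all<F>(mut fetch: F) -> io::Result<Vec<Vec<u8>>>
where
    F: FnMut() -> io::Result<Option<Vec<u8>>>,
{
    let mut filled = Vec::new();
    // `Ok(None)` means the queue holds no more buffers, so the loop ends.
    // Any error is propagated to the caller by `?`.
    while let Some(buf) = fetch()? {
        filled.push(buf);
    }
    Ok(filled)
}
```

With the blocking fetch, such a loop waits for data to arrive in every enqueued buffer, so it only returns once the host has sent enough data or an error occurs.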

pub async fn fetch_async(&mut self) -> Result<Option<BytesMut>>

Available on crate feature tokio only.

Asynchronously waits for data to be received into a previously enqueued receive buffer, then returns it.

Ok(None) is returned if no receive buffers are enqueued.

pub fn fetch_timeout(&mut self, timeout: Duration) -> Result<Option<BytesMut>>

Waits, with a timeout, for data to be received into a previously enqueued receive buffer, then returns it.

Ok(None) is returned if no receive buffers are enqueued.

pub fn try_fetch(&mut self) -> Result<Option<BytesMut>>

If data has been received into a previously enqueued receive buffer, returns it.

Does not wait for data to be received.
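
Buffers returned by the fetch methods can be grouped into messages with the short-packet rule from the type-level documentation: a completion with fewer bytes than the buffer capacity, including a zero-length one, marks the end of a message. The following is a sketch under stated assumptions: `split_messages` is a hypothetical helper, completions are modeled as `Vec<u8>` instead of BytesMut, and every buffer is assumed to have been enqueued with the same capacity.

```rust
/// Groups completed receive buffers into messages.
///
/// Returns the finished messages and the bytes of a trailing message that
/// has not been terminated by a short completion yet.
/// Hypothetical helper for illustration, not part of the crate's API.
fn split_messages(completions: &[Vec<u8>], capacity: usize) -> (Vec<Vec<u8>>, Vec<u8>) {
    let mut messages = Vec::new();
    let mut current = Vec::new();
    for data in completions {
        current.extend_from_slice(data);
        // Fewer bytes than the capacity (possibly zero) means a short packet
        // or ZLP ended the transfer, so the current message is complete.
        if data.len() < capacity {
            messages.push(std::mem::take(&mut current));
        }
    }
    (messages, current)
}
```

Returning the unfinished remainder separately lets the caller carry it over to the next batch of fetched buffers.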

pub fn cancel(&mut self) -> Result<()>

Removes all buffers from the receive queue and clears all errors.

Trait Implementations§

impl Debug for EndpointReceiver

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.