Skip to main content

EndpointSender

Struct EndpointSender 

Source
pub struct EndpointSender(/* private fields */);
Expand description

USB endpoint from device to host sender.

Sending is asynchronous and uses a queue backed by Linux AIO. Data is enqueued with send (or try_send / send_async) and submitted to the kernel immediately. The kernel performs the actual USB transfer in the background.

To wait for all enqueued transfers to complete, call flush. For a combined enqueue-and-wait operation, use send_and_flush.

send, send_async and send_timeout automatically wait for queue space. When using try_send directly, call ready, ready_timeout, or try_ready first to ensure space is available. These also surface errors from previously completed transfers.

§Buffer size and throughput

For best throughput, send data in chunks much larger than the endpoint’s maximum packet size (MPS). This allows the kernel to submit multi-packet USB transfers and reduces per-packet overhead. A good starting point is a chunk size that is a multiple of the MPS and at least several KiB (e.g. 16 KiB). Use max_packet_size to query the negotiated MPS at runtime.

Also consider increasing the queue depth (default: 16) so multiple transfers can be in flight simultaneously.

Implementations§

Source§

impl EndpointSender

Source

pub fn control(&mut self) -> Result<EndpointControl<'_>>

Gets the endpoint control interface.

Examples found in repository?
examples/custom_interface_device_split.rs (line 118)
111fn run(mut ep1_rx: EndpointReceiver, mut ep2_tx: EndpointSender, mut custom: Custom) {
112    let ep1_control = ep1_rx.control().unwrap();
113    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
114    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
115    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
116    println!();
117
118    let ep2_control = ep2_tx.control().unwrap();
119    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
120    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
121    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
122    println!();
123
124    let stop = Arc::new(AtomicBool::new(false));
125
126    thread::scope(|s| {
127        s.spawn(|| {
128            let size = ep1_rx.max_packet_size().unwrap();
129            let mut b = 0;
130            while !stop.load(Ordering::Relaxed) {
131                let data = ep1_rx
132                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
133                    .expect("recv failed");
134                match data {
135                    Some(data) => {
136                        println!("received {} bytes: {data:x?}", data.len());
137                        if !data.iter().all(|x| *x == b) {
138                            panic!("wrong data received");
139                        }
140                        b = b.wrapping_add(1);
141                    }
142                    None => {
143                        println!("receive empty");
144                    }
145                }
146            }
147        });
148
149        s.spawn(|| {
150            let size = ep2_tx.max_packet_size().unwrap();
151            let mut b = 0u8;
152            while !stop.load(Ordering::Relaxed) {
153                let data = vec![b; size];
154                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
155                    Ok(()) => {
156                        println!("sent data {b} of size {size} bytes");
157                        b = b.wrapping_add(1);
158                    }
159                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
160                    Err(err) => panic!("send failed: {err}"),
161                }
162            }
163        });
164
165        s.spawn(|| {
166            let mut ctrl_data = Vec::new();
167
168            while !stop.load(Ordering::Relaxed) {
169                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
170                    println!("Event: {event:?}");
171                    match event {
172                        Event::SetupHostToDevice(req) => {
173                            if req.ctrl_req().request == 255 {
174                                println!("Stopping");
175                                stop.store(true, Ordering::Relaxed);
176                            }
177                            ctrl_data = req.recv_all().unwrap();
178                            println!("Control data: {ctrl_data:x?}");
179                        }
180                        Event::SetupDeviceToHost(req) => {
181                            println!("Replying with data");
182                            req.send(&ctrl_data).unwrap();
183                        }
184                        _ => (),
185                    }
186                } else {
187                    println!("no event");
188                }
189            }
190        });
191    });
192}
More examples
Hide additional examples
examples/custom_interface_device_async.rs (line 56)
19async fn main() {
20    env_logger::init();
21
22    usb_gadget::remove_all().expect("cannot remove all gadgets");
23
24    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
25    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
26
27    let (mut custom, handle) = Custom::builder()
28        .with_interface(
29            Interface::new(Class::vendor_specific(1, 2), "custom interface")
30                .with_endpoint(Endpoint::bulk(ep1_dir))
31                .with_endpoint(Endpoint::bulk(ep2_dir)),
32        )
33        .build();
34
35    let udc = default_udc().expect("cannot get UDC");
36    let reg = Gadget::new(
37        Class::vendor_specific(255, 3),
38        Id::new(6, 0x11),
39        Strings::new("manufacturer", "custom USB interface", "serial_number"),
40    )
41    .with_config(Config::new("config").with_function(handle))
42    .with_os_descriptor(OsDescriptor::microsoft())
43    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
44    .bind(&udc)
45    .expect("cannot bind to UDC");
46
47    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
48    println!();
49
50    let ep1_control = ep1_rx.control().unwrap();
51    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
52    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
53    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
54    println!();
55
56    let ep2_control = ep2_tx.control().unwrap();
57    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
58    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
59    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
60    println!();
61
62    let stop = Arc::new(AtomicBool::new(false));
63
64    let stop1 = stop.clone();
65    tokio::spawn(async move {
66        let size = ep1_rx.max_packet_size().unwrap();
67        let mut b = 0;
68        while !stop1.load(Ordering::Relaxed) {
69            let data = ep1_rx.recv_async(BytesMut::with_capacity(size)).await.expect("recv_async failed");
70            match data {
71                Some(data) => {
72                    println!("received {} bytes: {data:x?}", data.len());
73                    if !data.iter().all(|x| *x == b) {
74                        panic!("wrong data received");
75                    }
76                    b = b.wrapping_add(1);
77                }
78                None => {
79                    println!("receive empty");
80                }
81            }
82        }
83    });
84
85    let stop2 = stop.clone();
86    tokio::spawn(async move {
87        let size = ep2_tx.max_packet_size().unwrap();
88        let mut b = 0u8;
89        while !stop2.load(Ordering::Relaxed) {
90            let data = vec![b; size];
91            match ep2_tx.send_async(data.into()).await {
92                Ok(()) => {
93                    println!("sent data {b} of size {size} bytes");
94                    b = b.wrapping_add(1);
95                }
96                Err(err) => panic!("send failed: {err}"),
97            }
98        }
99    });
100
101    let mut ctrl_data = Vec::new();
102    while !stop.load(Ordering::Relaxed) {
103        custom.wait_event().await.expect("wait for event failed");
104        println!("event ready");
105        let event = custom.event().expect("event failed");
106
107        println!("Event: {event:?}");
108        match event {
109            Event::SetupHostToDevice(req) => {
110                if req.ctrl_req().request == 255 {
111                    println!("Stopping");
112                    stop.store(true, Ordering::Relaxed);
113                }
114                ctrl_data = req.recv_all().unwrap();
115                println!("Control data: {ctrl_data:x?}");
116            }
117            Event::SetupDeviceToHost(req) => {
118                println!("Replying with data");
119                req.send(&ctrl_data).unwrap();
120            }
121            _ => (),
122        }
123    }
124
125    tokio::time::sleep(Duration::from_secs(1)).await;
126
127    println!("Unregistering");
128    reg.remove().unwrap();
129}
examples/custom_interface_device.rs (line 57)
20fn main() {
21    env_logger::init();
22
23    usb_gadget::remove_all().expect("cannot remove all gadgets");
24
25    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
26    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
27
28    let (mut custom, handle) = Custom::builder()
29        .with_interface(
30            Interface::new(Class::vendor_specific(1, 2), "custom interface")
31                .with_endpoint(Endpoint::bulk(ep1_dir))
32                .with_endpoint(Endpoint::bulk(ep2_dir)),
33        )
34        .build();
35
36    let udc = default_udc().expect("cannot get UDC");
37    let reg = Gadget::new(
38        Class::vendor_specific(255, 3),
39        Id::new(6, 0x11),
40        Strings::new("manufacturer", "custom USB interface", "serial_number"),
41    )
42    .with_config(Config::new("config").with_function(handle))
43    .with_os_descriptor(OsDescriptor::microsoft())
44    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
45    .bind(&udc)
46    .expect("cannot bind to UDC");
47
48    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
49    println!();
50
51    let ep1_control = ep1_rx.control().unwrap();
52    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
53    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
54    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
55    println!();
56
57    let ep2_control = ep2_tx.control().unwrap();
58    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
59    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
60    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
61    println!();
62
63    let stop = Arc::new(AtomicBool::new(false));
64
65    thread::scope(|s| {
66        s.spawn(|| {
67            let size = ep1_rx.max_packet_size().unwrap();
68            let mut b = 0;
69            while !stop.load(Ordering::Relaxed) {
70                let data = ep1_rx
71                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
72                    .expect("recv failed");
73                match data {
74                    Some(data) => {
75                        println!("received {} bytes: {data:x?}", data.len());
76                        if !data.iter().all(|x| *x == b) {
77                            panic!("wrong data received");
78                        }
79                        b = b.wrapping_add(1);
80                    }
81                    None => {
82                        println!("receive empty");
83                    }
84                }
85            }
86        });
87
88        s.spawn(|| {
89            let size = ep2_tx.max_packet_size().unwrap();
90            let mut b = 0u8;
91            while !stop.load(Ordering::Relaxed) {
92                let data = vec![b; size];
93                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
94                    Ok(()) => {
95                        println!("sent data {b} of size {size} bytes");
96                        b = b.wrapping_add(1);
97                    }
98                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
99                    Err(err) => panic!("send failed: {err}"),
100                }
101            }
102        });
103
104        s.spawn(|| {
105            let mut ctrl_data = Vec::new();
106
107            while !stop.load(Ordering::Relaxed) {
108                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
109                    println!("Event: {event:?}");
110                    match event {
111                        Event::SetupHostToDevice(req) => {
112                            if req.ctrl_req().request == 255 {
113                                println!("Stopping");
114                                stop.store(true, Ordering::Relaxed);
115                            }
116                            ctrl_data = req.recv_all().unwrap();
117                            println!("Control data: {ctrl_data:x?}");
118                        }
119                        Event::SetupDeviceToHost(req) => {
120                            println!("Replying with data");
121                            req.send(&ctrl_data).unwrap();
122                        }
123                        _ => (),
124                    }
125                } else {
126                    println!("no event");
127                }
128            }
129        });
130    });
131
132    thread::sleep(Duration::from_secs(1));
133
134    println!("Unregistering");
135    reg.remove().unwrap();
136}
Source

pub fn max_packet_size(&mut self) -> Result<usize>

Maximum packet size.

Examples found in repository?
examples/custom_interface_device_split.rs (line 150)
111fn run(mut ep1_rx: EndpointReceiver, mut ep2_tx: EndpointSender, mut custom: Custom) {
112    let ep1_control = ep1_rx.control().unwrap();
113    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
114    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
115    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
116    println!();
117
118    let ep2_control = ep2_tx.control().unwrap();
119    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
120    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
121    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
122    println!();
123
124    let stop = Arc::new(AtomicBool::new(false));
125
126    thread::scope(|s| {
127        s.spawn(|| {
128            let size = ep1_rx.max_packet_size().unwrap();
129            let mut b = 0;
130            while !stop.load(Ordering::Relaxed) {
131                let data = ep1_rx
132                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
133                    .expect("recv failed");
134                match data {
135                    Some(data) => {
136                        println!("received {} bytes: {data:x?}", data.len());
137                        if !data.iter().all(|x| *x == b) {
138                            panic!("wrong data received");
139                        }
140                        b = b.wrapping_add(1);
141                    }
142                    None => {
143                        println!("receive empty");
144                    }
145                }
146            }
147        });
148
149        s.spawn(|| {
150            let size = ep2_tx.max_packet_size().unwrap();
151            let mut b = 0u8;
152            while !stop.load(Ordering::Relaxed) {
153                let data = vec![b; size];
154                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
155                    Ok(()) => {
156                        println!("sent data {b} of size {size} bytes");
157                        b = b.wrapping_add(1);
158                    }
159                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
160                    Err(err) => panic!("send failed: {err}"),
161                }
162            }
163        });
164
165        s.spawn(|| {
166            let mut ctrl_data = Vec::new();
167
168            while !stop.load(Ordering::Relaxed) {
169                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
170                    println!("Event: {event:?}");
171                    match event {
172                        Event::SetupHostToDevice(req) => {
173                            if req.ctrl_req().request == 255 {
174                                println!("Stopping");
175                                stop.store(true, Ordering::Relaxed);
176                            }
177                            ctrl_data = req.recv_all().unwrap();
178                            println!("Control data: {ctrl_data:x?}");
179                        }
180                        Event::SetupDeviceToHost(req) => {
181                            println!("Replying with data");
182                            req.send(&ctrl_data).unwrap();
183                        }
184                        _ => (),
185                    }
186                } else {
187                    println!("no event");
188                }
189            }
190        });
191    });
192}
More examples
Hide additional examples
examples/custom_interface_device_async.rs (line 87)
19async fn main() {
20    env_logger::init();
21
22    usb_gadget::remove_all().expect("cannot remove all gadgets");
23
24    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
25    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
26
27    let (mut custom, handle) = Custom::builder()
28        .with_interface(
29            Interface::new(Class::vendor_specific(1, 2), "custom interface")
30                .with_endpoint(Endpoint::bulk(ep1_dir))
31                .with_endpoint(Endpoint::bulk(ep2_dir)),
32        )
33        .build();
34
35    let udc = default_udc().expect("cannot get UDC");
36    let reg = Gadget::new(
37        Class::vendor_specific(255, 3),
38        Id::new(6, 0x11),
39        Strings::new("manufacturer", "custom USB interface", "serial_number"),
40    )
41    .with_config(Config::new("config").with_function(handle))
42    .with_os_descriptor(OsDescriptor::microsoft())
43    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
44    .bind(&udc)
45    .expect("cannot bind to UDC");
46
47    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
48    println!();
49
50    let ep1_control = ep1_rx.control().unwrap();
51    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
52    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
53    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
54    println!();
55
56    let ep2_control = ep2_tx.control().unwrap();
57    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
58    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
59    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
60    println!();
61
62    let stop = Arc::new(AtomicBool::new(false));
63
64    let stop1 = stop.clone();
65    tokio::spawn(async move {
66        let size = ep1_rx.max_packet_size().unwrap();
67        let mut b = 0;
68        while !stop1.load(Ordering::Relaxed) {
69            let data = ep1_rx.recv_async(BytesMut::with_capacity(size)).await.expect("recv_async failed");
70            match data {
71                Some(data) => {
72                    println!("received {} bytes: {data:x?}", data.len());
73                    if !data.iter().all(|x| *x == b) {
74                        panic!("wrong data received");
75                    }
76                    b = b.wrapping_add(1);
77                }
78                None => {
79                    println!("receive empty");
80                }
81            }
82        }
83    });
84
85    let stop2 = stop.clone();
86    tokio::spawn(async move {
87        let size = ep2_tx.max_packet_size().unwrap();
88        let mut b = 0u8;
89        while !stop2.load(Ordering::Relaxed) {
90            let data = vec![b; size];
91            match ep2_tx.send_async(data.into()).await {
92                Ok(()) => {
93                    println!("sent data {b} of size {size} bytes");
94                    b = b.wrapping_add(1);
95                }
96                Err(err) => panic!("send failed: {err}"),
97            }
98        }
99    });
100
101    let mut ctrl_data = Vec::new();
102    while !stop.load(Ordering::Relaxed) {
103        custom.wait_event().await.expect("wait for event failed");
104        println!("event ready");
105        let event = custom.event().expect("event failed");
106
107        println!("Event: {event:?}");
108        match event {
109            Event::SetupHostToDevice(req) => {
110                if req.ctrl_req().request == 255 {
111                    println!("Stopping");
112                    stop.store(true, Ordering::Relaxed);
113                }
114                ctrl_data = req.recv_all().unwrap();
115                println!("Control data: {ctrl_data:x?}");
116            }
117            Event::SetupDeviceToHost(req) => {
118                println!("Replying with data");
119                req.send(&ctrl_data).unwrap();
120            }
121            _ => (),
122        }
123    }
124
125    tokio::time::sleep(Duration::from_secs(1)).await;
126
127    println!("Unregistering");
128    reg.remove().unwrap();
129}
examples/custom_interface_device.rs (line 89)
20fn main() {
21    env_logger::init();
22
23    usb_gadget::remove_all().expect("cannot remove all gadgets");
24
25    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
26    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
27
28    let (mut custom, handle) = Custom::builder()
29        .with_interface(
30            Interface::new(Class::vendor_specific(1, 2), "custom interface")
31                .with_endpoint(Endpoint::bulk(ep1_dir))
32                .with_endpoint(Endpoint::bulk(ep2_dir)),
33        )
34        .build();
35
36    let udc = default_udc().expect("cannot get UDC");
37    let reg = Gadget::new(
38        Class::vendor_specific(255, 3),
39        Id::new(6, 0x11),
40        Strings::new("manufacturer", "custom USB interface", "serial_number"),
41    )
42    .with_config(Config::new("config").with_function(handle))
43    .with_os_descriptor(OsDescriptor::microsoft())
44    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
45    .bind(&udc)
46    .expect("cannot bind to UDC");
47
48    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
49    println!();
50
51    let ep1_control = ep1_rx.control().unwrap();
52    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
53    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
54    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
55    println!();
56
57    let ep2_control = ep2_tx.control().unwrap();
58    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
59    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
60    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
61    println!();
62
63    let stop = Arc::new(AtomicBool::new(false));
64
65    thread::scope(|s| {
66        s.spawn(|| {
67            let size = ep1_rx.max_packet_size().unwrap();
68            let mut b = 0;
69            while !stop.load(Ordering::Relaxed) {
70                let data = ep1_rx
71                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
72                    .expect("recv failed");
73                match data {
74                    Some(data) => {
75                        println!("received {} bytes: {data:x?}", data.len());
76                        if !data.iter().all(|x| *x == b) {
77                            panic!("wrong data received");
78                        }
79                        b = b.wrapping_add(1);
80                    }
81                    None => {
82                        println!("receive empty");
83                    }
84                }
85            }
86        });
87
88        s.spawn(|| {
89            let size = ep2_tx.max_packet_size().unwrap();
90            let mut b = 0u8;
91            while !stop.load(Ordering::Relaxed) {
92                let data = vec![b; size];
93                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
94                    Ok(()) => {
95                        println!("sent data {b} of size {size} bytes");
96                        b = b.wrapping_add(1);
97                    }
98                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
99                    Err(err) => panic!("send failed: {err}"),
100                }
101            }
102        });
103
104        s.spawn(|| {
105            let mut ctrl_data = Vec::new();
106
107            while !stop.load(Ordering::Relaxed) {
108                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
109                    println!("Event: {event:?}");
110                    match event {
111                        Event::SetupHostToDevice(req) => {
112                            if req.ctrl_req().request == 255 {
113                                println!("Stopping");
114                                stop.store(true, Ordering::Relaxed);
115                            }
116                            ctrl_data = req.recv_all().unwrap();
117                            println!("Control data: {ctrl_data:x?}");
118                        }
119                        Event::SetupDeviceToHost(req) => {
120                            println!("Replying with data");
121                            req.send(&ctrl_data).unwrap();
122                        }
123                        _ => (),
124                    }
125                } else {
126                    println!("no event");
127                }
128            }
129        });
130    });
131
132    thread::sleep(Duration::from_secs(1));
133
134    println!("Unregistering");
135    reg.remove().unwrap();
136}
Source

pub fn send_and_flush(&mut self, data: Bytes) -> Result<()>

Send data synchronously.

Blocks until the send operation completes and returns its result.

Source

pub fn send_and_flush_timeout( &mut self, data: Bytes, timeout: Duration, ) -> Result<()>

Send data synchronously with a timeout.

Blocks until the send operation completes and returns its result.

Source

pub fn send(&mut self, data: Bytes) -> Result<()>

Enqueue data for sending.

Blocks until send space is available. Also returns errors of previously enqueued send operations.

Source

pub async fn send_async(&mut self, data: Bytes) -> Result<()>

Available on crate feature tokio only.

Asynchronously Enqueue data for sending.

Waits until send space is available. Also returns errors of previously enqueued send operations.

Examples found in repository?
examples/custom_interface_device_async.rs (line 91)
19async fn main() {
20    env_logger::init();
21
22    usb_gadget::remove_all().expect("cannot remove all gadgets");
23
24    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
25    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
26
27    let (mut custom, handle) = Custom::builder()
28        .with_interface(
29            Interface::new(Class::vendor_specific(1, 2), "custom interface")
30                .with_endpoint(Endpoint::bulk(ep1_dir))
31                .with_endpoint(Endpoint::bulk(ep2_dir)),
32        )
33        .build();
34
35    let udc = default_udc().expect("cannot get UDC");
36    let reg = Gadget::new(
37        Class::vendor_specific(255, 3),
38        Id::new(6, 0x11),
39        Strings::new("manufacturer", "custom USB interface", "serial_number"),
40    )
41    .with_config(Config::new("config").with_function(handle))
42    .with_os_descriptor(OsDescriptor::microsoft())
43    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
44    .bind(&udc)
45    .expect("cannot bind to UDC");
46
47    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
48    println!();
49
50    let ep1_control = ep1_rx.control().unwrap();
51    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
52    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
53    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
54    println!();
55
56    let ep2_control = ep2_tx.control().unwrap();
57    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
58    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
59    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
60    println!();
61
62    let stop = Arc::new(AtomicBool::new(false));
63
64    let stop1 = stop.clone();
65    tokio::spawn(async move {
66        let size = ep1_rx.max_packet_size().unwrap();
67        let mut b = 0;
68        while !stop1.load(Ordering::Relaxed) {
69            let data = ep1_rx.recv_async(BytesMut::with_capacity(size)).await.expect("recv_async failed");
70            match data {
71                Some(data) => {
72                    println!("received {} bytes: {data:x?}", data.len());
73                    if !data.iter().all(|x| *x == b) {
74                        panic!("wrong data received");
75                    }
76                    b = b.wrapping_add(1);
77                }
78                None => {
79                    println!("receive empty");
80                }
81            }
82        }
83    });
84
85    let stop2 = stop.clone();
86    tokio::spawn(async move {
87        let size = ep2_tx.max_packet_size().unwrap();
88        let mut b = 0u8;
89        while !stop2.load(Ordering::Relaxed) {
90            let data = vec![b; size];
91            match ep2_tx.send_async(data.into()).await {
92                Ok(()) => {
93                    println!("sent data {b} of size {size} bytes");
94                    b = b.wrapping_add(1);
95                }
96                Err(err) => panic!("send failed: {err}"),
97            }
98        }
99    });
100
101    let mut ctrl_data = Vec::new();
102    while !stop.load(Ordering::Relaxed) {
103        custom.wait_event().await.expect("wait for event failed");
104        println!("event ready");
105        let event = custom.event().expect("event failed");
106
107        println!("Event: {event:?}");
108        match event {
109            Event::SetupHostToDevice(req) => {
110                if req.ctrl_req().request == 255 {
111                    println!("Stopping");
112                    stop.store(true, Ordering::Relaxed);
113                }
114                ctrl_data = req.recv_all().unwrap();
115                println!("Control data: {ctrl_data:x?}");
116            }
117            Event::SetupDeviceToHost(req) => {
118                println!("Replying with data");
119                req.send(&ctrl_data).unwrap();
120            }
121            _ => (),
122        }
123    }
124
125    tokio::time::sleep(Duration::from_secs(1)).await;
126
127    println!("Unregistering");
128    reg.remove().unwrap();
129}
Source

pub fn send_timeout(&mut self, data: Bytes, timeout: Duration) -> Result<()>

Enqueue data for sending with a timeout.

Blocks until send space is available with the specified timeout. Also returns errors of previously enqueued send operations.

Examples found in repository?
examples/custom_interface_device_split.rs (line 154)
111fn run(mut ep1_rx: EndpointReceiver, mut ep2_tx: EndpointSender, mut custom: Custom) {
112    let ep1_control = ep1_rx.control().unwrap();
113    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
114    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
115    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
116    println!();
117
118    let ep2_control = ep2_tx.control().unwrap();
119    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
120    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
121    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
122    println!();
123
124    let stop = Arc::new(AtomicBool::new(false));
125
126    thread::scope(|s| {
127        s.spawn(|| {
128            let size = ep1_rx.max_packet_size().unwrap();
129            let mut b = 0;
130            while !stop.load(Ordering::Relaxed) {
131                let data = ep1_rx
132                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
133                    .expect("recv failed");
134                match data {
135                    Some(data) => {
136                        println!("received {} bytes: {data:x?}", data.len());
137                        if !data.iter().all(|x| *x == b) {
138                            panic!("wrong data received");
139                        }
140                        b = b.wrapping_add(1);
141                    }
142                    None => {
143                        println!("receive empty");
144                    }
145                }
146            }
147        });
148
149        s.spawn(|| {
150            let size = ep2_tx.max_packet_size().unwrap();
151            let mut b = 0u8;
152            while !stop.load(Ordering::Relaxed) {
153                let data = vec![b; size];
154                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
155                    Ok(()) => {
156                        println!("sent data {b} of size {size} bytes");
157                        b = b.wrapping_add(1);
158                    }
159                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
160                    Err(err) => panic!("send failed: {err}"),
161                }
162            }
163        });
164
165        s.spawn(|| {
166            let mut ctrl_data = Vec::new();
167
168            while !stop.load(Ordering::Relaxed) {
169                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
170                    println!("Event: {event:?}");
171                    match event {
172                        Event::SetupHostToDevice(req) => {
173                            if req.ctrl_req().request == 255 {
174                                println!("Stopping");
175                                stop.store(true, Ordering::Relaxed);
176                            }
177                            ctrl_data = req.recv_all().unwrap();
178                            println!("Control data: {ctrl_data:x?}");
179                        }
180                        Event::SetupDeviceToHost(req) => {
181                            println!("Replying with data");
182                            req.send(&ctrl_data).unwrap();
183                        }
184                        _ => (),
185                    }
186                } else {
187                    println!("no event");
188                }
189            }
190        });
191    });
192}
More examples
Hide additional examples
examples/custom_interface_device.rs (line 93)
20fn main() {
21    env_logger::init();
22
23    usb_gadget::remove_all().expect("cannot remove all gadgets");
24
25    let (mut ep1_rx, ep1_dir) = EndpointDirection::host_to_device();
26    let (mut ep2_tx, ep2_dir) = EndpointDirection::device_to_host();
27
28    let (mut custom, handle) = Custom::builder()
29        .with_interface(
30            Interface::new(Class::vendor_specific(1, 2), "custom interface")
31                .with_endpoint(Endpoint::bulk(ep1_dir))
32                .with_endpoint(Endpoint::bulk(ep2_dir)),
33        )
34        .build();
35
36    let udc = default_udc().expect("cannot get UDC");
37    let reg = Gadget::new(
38        Class::vendor_specific(255, 3),
39        Id::new(6, 0x11),
40        Strings::new("manufacturer", "custom USB interface", "serial_number"),
41    )
42    .with_config(Config::new("config").with_function(handle))
43    .with_os_descriptor(OsDescriptor::microsoft())
44    .with_web_usb(WebUsb::new(0xf1, "http://webusb.org"))
45    .bind(&udc)
46    .expect("cannot bind to UDC");
47
48    println!("Custom function at {}", custom.status().unwrap().path().unwrap().display());
49    println!();
50
51    let ep1_control = ep1_rx.control().unwrap();
52    println!("ep1 unclaimed: {:?}", ep1_control.unclaimed_fifo());
53    println!("ep1 real address: {}", ep1_control.real_address().unwrap());
54    println!("ep1 descriptor: {:?}", ep1_control.descriptor().unwrap());
55    println!();
56
57    let ep2_control = ep2_tx.control().unwrap();
58    println!("ep2 unclaimed: {:?}", ep2_control.unclaimed_fifo());
59    println!("ep2 real address: {}", ep2_control.real_address().unwrap());
60    println!("ep2 descriptor: {:?}", ep2_control.descriptor().unwrap());
61    println!();
62
63    let stop = Arc::new(AtomicBool::new(false));
64
65    thread::scope(|s| {
66        s.spawn(|| {
67            let size = ep1_rx.max_packet_size().unwrap();
68            let mut b = 0;
69            while !stop.load(Ordering::Relaxed) {
70                let data = ep1_rx
71                    .recv_timeout(BytesMut::with_capacity(size), Duration::from_secs(1))
72                    .expect("recv failed");
73                match data {
74                    Some(data) => {
75                        println!("received {} bytes: {data:x?}", data.len());
76                        if !data.iter().all(|x| *x == b) {
77                            panic!("wrong data received");
78                        }
79                        b = b.wrapping_add(1);
80                    }
81                    None => {
82                        println!("receive empty");
83                    }
84                }
85            }
86        });
87
88        s.spawn(|| {
89            let size = ep2_tx.max_packet_size().unwrap();
90            let mut b = 0u8;
91            while !stop.load(Ordering::Relaxed) {
92                let data = vec![b; size];
93                match ep2_tx.send_timeout(data.into(), Duration::from_secs(1)) {
94                    Ok(()) => {
95                        println!("sent data {b} of size {size} bytes");
96                        b = b.wrapping_add(1);
97                    }
98                    Err(err) if err.kind() == ErrorKind::TimedOut => println!("send timeout"),
99                    Err(err) => panic!("send failed: {err}"),
100                }
101            }
102        });
103
104        s.spawn(|| {
105            let mut ctrl_data = Vec::new();
106
107            while !stop.load(Ordering::Relaxed) {
108                if let Some(event) = custom.event_timeout(Duration::from_secs(1)).expect("event failed") {
109                    println!("Event: {event:?}");
110                    match event {
111                        Event::SetupHostToDevice(req) => {
112                            if req.ctrl_req().request == 255 {
113                                println!("Stopping");
114                                stop.store(true, Ordering::Relaxed);
115                            }
116                            ctrl_data = req.recv_all().unwrap();
117                            println!("Control data: {ctrl_data:x?}");
118                        }
119                        Event::SetupDeviceToHost(req) => {
120                            println!("Replying with data");
121                            req.send(&ctrl_data).unwrap();
122                        }
123                        _ => (),
124                    }
125                } else {
126                    println!("no event");
127                }
128            }
129        });
130    });
131
132    thread::sleep(Duration::from_secs(1));
133
134    println!("Unregistering");
135    reg.remove().unwrap();
136}
Source

pub fn try_send(&mut self, data: Bytes) -> Result<()>

Enqueue data for sending without waiting for send space.

Fails if no send space is available. Also returns errors of previously enqueued send operations.

Source

pub fn is_ready(&mut self) -> bool

Whether send space is available.

Send space will only become available when ready, ready_timeout or try_ready are called.

Source

pub fn is_empty(&mut self) -> bool

Whether the send queue is empty.

The send queue will only be drained when ready, ready_timeout or try_ready are called.

Source

pub async fn wait_ready(&mut self) -> Result<()>

Available on crate feature tokio only.

Asynchronously wait for send space to be available.

Also returns errors of previously enqueued send operations.

Source

pub fn ready(&mut self) -> Result<()>

Wait for send space to be available.

Also returns errors of previously enqueued send operations.

Source

pub fn ready_timeout(&mut self, timeout: Duration) -> Result<()>

Wait for send space to be available with a timeout.

Also returns errors of previously enqueued send operations.

Source

pub fn try_ready(&mut self) -> Result<()>

Check for availability of send space.

Also returns errors of previously enqueued send operations.

Source

pub fn flush(&mut self) -> Result<()>

Waits for all enqueued data to be sent.

Returns an error if any enqueued send operation has failed.

Source

pub async fn flush_async(&mut self) -> Result<()>

Available on crate feature tokio only.

Waits for all enqueued data to be sent.

Returns an error if any enqueued send operation has failed.

Source

pub fn flush_timeout(&mut self, timeout: Duration) -> Result<()>

Waits for all enqueued data to be sent with a timeout.

Returns an error if any enqueued send operation has failed.

Source

pub fn cancel(&mut self) -> Result<()>

Removes all data from the send queue and clears all errors.

Trait Implementations§

Source§

impl Debug for EndpointSender

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.