1use crate::{event, inet::ExplicitCongestionNotification, path};
5use core::{
6 task::{Context, Poll},
7 time::Duration,
8};
9
10pub mod handle_map;
11pub mod router;
12
13pub trait Tx: Sized {
14 type PathHandle;
15 type Queue: Queue<Handle = Self::PathHandle>;
18 type Error;
19
20 #[inline]
22 fn ready(&mut self) -> TxReady<'_, Self> {
23 TxReady(self)
24 }
25
26 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
28
29 fn queue<F: FnOnce(&mut Self::Queue)>(&mut self, f: F);
31
32 fn handle_error<E: event::EndpointPublisher>(self, error: Self::Error, event: &mut E);
34}
35
36impl_ready_future!(Tx, TxReady, Result<(), T::Error>);
37
38pub trait TxExt: Tx {
40 #[inline]
42 fn with_router<Router, Other>(
43 self,
44 router: Router,
45 other: Other,
46 ) -> router::Channel<Router, Self, Other>
47 where
48 Router: router::Router,
49 Other: Tx,
50 {
51 router::Channel {
52 router,
53 a: self,
54 b: other,
55 }
56 }
57
58 #[inline]
60 fn with_handle_map<Map, Handle>(self, map: Map) -> handle_map::Channel<Map, Self, Handle>
61 where
62 Map: Fn(&Handle) -> Self::PathHandle,
63 {
64 handle_map::Channel {
65 map,
66 tx: self,
67 handle: Default::default(),
68 }
69 }
70}
71
72impl<T: Tx> TxExt for T {}
74
75pub trait Queue {
77 type Handle: path::Handle;
78
79 const SUPPORTS_ECN: bool = false;
81
82 const SUPPORTS_PACING: bool = false;
84
85 const SUPPORTS_FLOW_LABELS: bool = false;
87
88 fn push<M: Message<Handle = Self::Handle>>(&mut self, message: M) -> Result<Outcome, Error>;
93
94 #[inline]
98 fn flush(&mut self) {
99 }
101
102 fn capacity(&self) -> usize;
104
105 #[inline]
107 fn has_capacity(&self) -> bool {
108 self.capacity() != 0
109 }
110}
111
112pub struct Outcome {
113 pub len: usize,
114 pub index: usize,
115}
116
117#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq)]
118pub enum Error {
119 EmptyPayload,
121
122 UndersizedBuffer,
124
125 AtCapacity,
127}
128
129pub trait Message {
136 type Handle: path::Handle;
137
138 fn path_handle(&self) -> &Self::Handle;
140
141 fn ecn(&mut self) -> ExplicitCongestionNotification;
143
144 fn delay(&mut self) -> Duration;
148
149 fn ipv6_flow_label(&mut self) -> u32;
151
152 fn can_gso(&self, segment_len: usize, segment_count: usize) -> bool;
154
155 fn write_payload(&mut self, buffer: PayloadBuffer, gso_offset: usize) -> Result<usize, Error>;
157}
158
159#[derive(Debug)]
160pub struct PayloadBuffer<'a>(&'a mut [u8]);
161
162impl<'a> PayloadBuffer<'a> {
163 #[inline]
164 pub fn new(bytes: &'a mut [u8]) -> Self {
165 Self(bytes)
166 }
167
168 #[inline]
172 pub unsafe fn into_mut_slice(self) -> &'a mut [u8] {
173 self.0
174 }
175
176 #[track_caller]
177 #[inline]
178 pub fn write(&mut self, bytes: &[u8]) -> Result<usize, Error> {
179 if bytes.is_empty() {
180 return Err(Error::EmptyPayload);
181 }
182
183 if let Some(buffer) = self.0.get_mut(0..bytes.len()) {
184 buffer.copy_from_slice(bytes);
185 Ok(bytes.len())
186 } else {
187 debug_assert!(
188 false,
189 "tried to write more bytes than was available in the buffer"
190 );
191 Err(Error::UndersizedBuffer)
192 }
193 }
194}
195
196impl<Handle: path::Handle, Payload: AsRef<[u8]>> Message for (Handle, Payload) {
197 type Handle = Handle;
198
199 fn path_handle(&self) -> &Self::Handle {
200 &self.0
201 }
202
203 fn ecn(&mut self) -> ExplicitCongestionNotification {
204 Default::default()
205 }
206
207 fn delay(&mut self) -> Duration {
208 Default::default()
209 }
210
211 fn ipv6_flow_label(&mut self) -> u32 {
212 0
213 }
214
215 fn can_gso(&self, segment_len: usize, _segment_count: usize) -> bool {
216 segment_len >= self.1.as_ref().len()
217 }
218
219 fn write_payload(
220 &mut self,
221 mut buffer: PayloadBuffer,
222 _gso_offset: usize,
223 ) -> Result<usize, Error> {
224 buffer.write(self.1.as_ref())
225 }
226}
227
228impl<Handle: path::Handle, Payload: AsRef<[u8]>> Message
229 for (Handle, ExplicitCongestionNotification, Payload)
230{
231 type Handle = Handle;
232
233 fn path_handle(&self) -> &Self::Handle {
234 &self.0
235 }
236
237 fn ecn(&mut self) -> ExplicitCongestionNotification {
238 self.1
239 }
240
241 fn delay(&mut self) -> Duration {
242 Default::default()
243 }
244
245 fn ipv6_flow_label(&mut self) -> u32 {
246 0
247 }
248
249 fn can_gso(&self, segment_len: usize, _segment_count: usize) -> bool {
250 segment_len >= self.2.as_ref().len()
251 }
252
253 fn write_payload(
254 &mut self,
255 mut buffer: PayloadBuffer,
256 _gso_offset: usize,
257 ) -> Result<usize, Error> {
258 buffer.write(self.2.as_ref())
259 }
260}
261
262#[cfg(test)]
263mod tests {
264 use super::*;
265 use crate::inet::SocketAddressV4;
266
267 #[test]
268 fn message_tuple_test() {
269 let remote_address = SocketAddressV4::new([127, 0, 0, 1], 80).into();
270 let local_address = SocketAddressV4::new([192, 168, 0, 1], 3000).into();
271 let tuple = path::Tuple {
272 remote_address,
273 local_address,
274 };
275 let mut message = (tuple, [1u8, 2, 3]);
276
277 let mut buffer = [0u8; 10];
278
279 assert_eq!(*message.path_handle(), tuple);
280 assert_eq!(message.ecn(), Default::default());
281 assert_eq!(message.delay(), Default::default());
282 assert_eq!(message.ipv6_flow_label(), 0);
283 assert_eq!(
284 message.write_payload(PayloadBuffer::new(&mut buffer[..]), 0),
285 Ok(3)
286 );
287 }
288
289 #[test]
290 #[should_panic]
291 fn message_tuple_undersized_test() {
292 let remote_address = SocketAddressV4::new([127, 0, 0, 1], 80).into();
293 let local_address = SocketAddressV4::new([192, 168, 0, 1], 3000).into();
294 let tuple = path::Tuple {
295 remote_address,
296 local_address,
297 };
298 let mut message = (tuple, [1u8, 2, 3]);
299
300 let _ = message.write_payload(PayloadBuffer::new(&mut [][..]), 0);
302 }
303}