1extern crate alloc;
4use core::{cmp, iter, mem};
5
6use alloc::rc::Rc;
7use alloc::collections::VecDeque;
8
9use ethox::{layer, nic, wire};
10use ethox::managed::Partial;
11use io_uring::opcode::{SendMsg, RecvMsg, types::Target};
12
13mod pool;
14
15pub struct RawRing {
16 io_ring: io_uring::IoUring,
18 #[allow(dead_code)] memory: Rc<pool::Pool>,
21 fd: libc::c_int,
23 io_queue: Queue,
24}
25
26struct SubmitInterface<'io> {
27 inner: &'io mut io_uring::SubmissionQueue,
28 fd: libc::c_int,
29}
30
31pub struct PacketBuf {
32 inner: Partial<pool::Entry>,
33}
34
35pub struct Handle {
36 state: State,
37 info: PacketInfo,
38}
39
40struct Queue {
42 buffers: mem::ManuallyDrop<Box<[PacketData]>>,
51 to_send: VecDeque<usize>,
53 to_recv: VecDeque<usize>,
55 free: VecDeque<usize>,
57}
58
59#[derive(Clone, Copy, Debug, PartialEq, Eq)]
60enum State {
61 Raw,
62 Received,
63 Unsent,
64 Sending,
65 Receiving,
66}
67
68struct PacketInfo {
69 timestamp: ethox::time::Instant,
71}
72
73struct Tag(u64);
74
75struct PacketData {
76 handle: Handle,
77 buffer: PacketBuf,
78 io_vec: libc::iovec,
79 io_hdr: libc::msghdr,
80}
81
82impl RawRing {
83 pub fn from_fd(fd: libc::c_int) -> Result<Self, std::io::Error> {
84 let ring = io_uring::Builder::default()
85 .build(32)?;
89 Ok(RawRing::from_ring(ring, fd))
90 }
91
92 pub fn from_ring(io_ring: io_uring::IoUring, fd: libc::c_int) -> Self {
93 let memory = Rc::new(pool::Pool::with_size_and_count(2048, 128));
95 let io_queue = Queue::with_capacity(Rc::clone(&memory), 32);
96 RawRing {
97 io_ring,
98 memory,
99 fd,
100 io_queue,
101 }
102 }
103
104 pub fn flush_and_reap(&mut self) -> std::io::Result<usize> {
105 self.io_queue.reap(self.io_ring.completion());
107 let result = self.io_ring.submit();
109 self.io_queue.reap(self.io_ring.completion());
111 result
112 }
113}
114
115impl SubmitInterface<'_> {
116 fn open_slots(&self) -> usize {
117 self.inner.capacity() - self.inner.len()
118 }
119
120 unsafe fn submit_send<'local>(
123 &mut self,
124 data: impl Iterator<Item=(&'local mut PacketData, Tag)> + ExactSizeIterator,
125 ) {
126 let mut submission = self.inner.available();
127 let remaining = submission.capacity() - submission.len();
128 assert!(data.len() <= remaining);
129
130 for (packet, Tag(tag)) in data {
131 packet.io_hdr.msg_iov = &mut packet.io_vec;
132 packet.io_hdr.msg_iovlen = 1;
133 let send = SendMsg::new(Target::Fd(self.fd), &packet.io_hdr)
134 .build()
135 .user_data(tag);
136 #[allow(unused_unsafe)]
137 match unsafe {
138 submission.push(send)
139 } {
140 Ok(()) => packet.handle.state = State::Sending,
141 Err(_) => panic!("Pushed into full queue"),
143 }
144 }
145 }
146
147 unsafe fn submit_recv<'local>(
150 &mut self,
151 data: impl Iterator<Item=(&'local mut PacketData, Tag)> + ExactSizeIterator,
152 ) {
153 let mut submission = self.inner.available();
154 let remaining = submission.capacity() - submission.len();
155 assert!(data.len() <= remaining);
156
157 for (packet, Tag(tag)) in data {
158 packet.io_hdr.msg_iov = &mut packet.io_vec;
159 packet.io_hdr.msg_iovlen = 1;
160 let send = RecvMsg::new(Target::Fd(self.fd), &mut packet.io_hdr)
161 .flags(libc::MSG_DONTWAIT as u32)
163 .build()
164 .user_data(tag);
165 #[allow(unused_unsafe)]
166 match unsafe {
167 submission.push(send)
168 } {
169 Ok(()) => packet.handle.state = State::Receiving,
170 Err(_) => panic!("Pushed into full queue"),
172 }
173 }
174 }
175}
176
177impl PacketData {
178 pub fn new(buffer: pool::Entry) -> Self {
179 let io_vec = pool::Entry::io_vec(&buffer);
180 PacketData {
181 handle: Handle {
182 state: State::Raw,
183 info: PacketInfo {
184 timestamp: ethox::time::Instant::from_secs(0),
185 },
186 },
187 buffer: PacketBuf {
188 inner: Partial::new(buffer),
189 },
190 io_vec,
191 io_hdr: unsafe { mem::zeroed() },
193 }
194 }
195}
196
197impl SubmitInterface<'_> {
198 fn borrow(&mut self) -> SubmitInterface<'_> {
199 SubmitInterface { fd: self.fd, inner: self.inner }
200 }
201}
202
203impl Drop for RawRing {
204 fn drop(&mut self) {
205 unsafe {
206 libc::close(self.fd);
207 }
208 }
209}
210
211impl nic::Device for RawRing {
212 type Payload = PacketBuf;
213 type Handle = Handle;
214
215 fn personality(&self) -> nic::Personality {
216 nic::Personality::baseline()
217 }
218
219 fn rx(&mut self, max: usize, mut receiver: impl nic::Recv<Handle, PacketBuf>)
220 -> layer::Result<usize>
221 {
222 let (submitter, submission, completion) = self.io_ring.split();
223 let mut submit = SubmitInterface {
224 inner: submission,
225 fd: self.fd,
226 };
227
228 self.io_queue.fill(submit.borrow());
229 submitter.submit().map_err(|_| layer::Error::Illegal)?;
230 self.io_queue.reap(completion);
231
232 let mut count = 0;
233
234 for _ in 0..max {
235 let idx = match self.io_queue.pop_recv() {
236 Some(idx) => idx,
237 None => break,
238 };
239
240 let packet = self.io_queue.get_mut(idx).unwrap();
241 count += 1;
242 receiver.receive(nic::Packet {
243 handle: &mut packet.handle,
244 payload: &mut packet.buffer,
245 });
246
247 match packet.handle.state {
248 State::Unsent => {
249 self.io_queue.push_send(idx)
250 },
251 State::Received => {
252 packet.handle.state = State::Raw;
253 self.io_queue.push_free(idx);
254 },
255 other => panic!("Unexpected operation {:?} associated with retransmission buffer.", other),
256 }
257 }
258
259 self.io_queue.flush(submit);
260 self.io_ring.submit().map_err(|_| layer::Error::Illegal)?;
261
262 Ok(count)
263 }
264
265 fn tx(&mut self, max: usize, mut sender: impl nic::Send<Handle, PacketBuf>)
266 -> layer::Result<usize>
267 {
268 let (_, submission, _) = self.io_ring.split();
269 let submit = SubmitInterface {
270 inner: submission,
271 fd: self.fd,
272 };
273
274 let mut count = 0;
275 let max = cmp::min(max, submit.open_slots());
276
277 for _ in 0..max {
278 let idx = match self.io_queue.pop_free() {
279 Some(idx) => idx,
280 None => break,
281 };
282
283 let packet = self.io_queue.get_mut(idx).unwrap();
284 packet.handle.state = State::Raw;
285 packet.handle.info.timestamp = ethox::time::Instant::now();
286
287 sender.send(nic::Packet {
288 handle: &mut packet.handle,
289 payload: &mut packet.buffer,
290 });
291
292 match packet.handle.state {
293 State::Unsent => {
294 self.io_queue.push_send(idx);
295 count += 1;
296 },
297 State::Raw => {
298 packet.handle.state = State::Raw;
299 self.io_queue.push_free(idx);
300 },
301 other => panic!("Unexpected operation {:?} associated with transmission buffer.", other),
302 }
303 }
304
305 self.io_queue.flush(submit);
306 self.io_ring.submit().map_err(|_| layer::Error::Illegal)?;
307
308 Ok(count)
309 }
310}
311
312impl Queue {
313 fn with_capacity(pool: Rc<pool::Pool>, capacity: usize) -> Self {
314 assert_eq!(capacity as u64 as usize, capacity, "Indexing does not survive roundtrip");
315 let entries = pool::Pool::spawn_entries(pool)
316 .take(capacity)
317 .map(PacketData::new)
318 .collect::<Vec<_>>()
319 .into_boxed_slice();
320
321 Queue {
322 buffers: mem::ManuallyDrop::new(entries),
323 to_send: VecDeque::with_capacity(capacity),
324 to_recv: VecDeque::with_capacity(capacity),
325 free: (0..capacity).collect(),
326 }
327 }
328
329 fn get_mut(&mut self, idx: usize) -> Option<&mut PacketData> {
330 self.buffers.get_mut(idx)
331 }
332
333 fn push_send(&mut self, idx: usize) {
334 self.to_send.push_back(idx);
335 }
336
337 fn pop_recv(&mut self) -> Option<usize> {
338 self.to_recv.pop_front()
339 }
340
341 fn push_free(&mut self, idx: usize) {
342 self.free.push_back(idx);
343 }
344
345 fn pop_free(&mut self) -> Option<usize> {
346 self.free.pop_front()
347 }
348
349 fn fill(&mut self, mut submit: SubmitInterface) {
350 let max = submit.open_slots();
351 for _ in 0..max {
352 let idx = match self.free.pop_front() {
353 Some(idx) => idx,
354 None => break,
355 };
356 let packet = self.buffers.get_mut(idx).unwrap();
357 packet.io_vec.iov_len = packet.buffer.inner.capacity();
358 assert_eq!(packet.handle.state, State::Raw);
359 let tag = Tag(idx as u64);
360 unsafe {
361 submit.submit_recv(iter::once((packet, tag)));
362 }
363 }
364 }
365
366 fn reap(&mut self, cq: &mut io_uring::CompletionQueue) {
367 for entry in cq.available() {
368 let idx = entry.user_data() as usize;
369 let packet = self.get_mut(idx).unwrap();
370 match packet.handle.state {
371 State::Sending => {
372 packet.handle.state = State::Raw;
373 self.push_free(idx);
374 continue;
375 },
376 State::Receiving => (),
377 other => panic!("Unexpected operation {:?} associated with completed buffer.", other),
378 }
379
380 if entry.result() >= 0 {
381 packet.handle.state = State::Received;
382 packet.buffer.inner.set_len_unchecked(entry.result() as usize);
383 packet.handle.info.timestamp = ethox::time::Instant::now();
384 self.to_recv.push_back(idx);
385 } else {
386 packet.handle.state = State::Raw;
387 self.free.push_back(idx);
388 }
390 }
391 }
392
393 fn flush(&mut self, mut submit: SubmitInterface) {
394 let max = submit.open_slots();
395 for _ in 0..max {
396 let idx = match self.to_send.pop_front() {
397 Some(idx) => idx,
398 None => break,
399 };
400 let packet = self.buffers.get_mut(idx).unwrap();
401 assert_eq!(packet.handle.state, State::Unsent);
402 packet.io_vec.iov_len = packet.buffer.inner.len();
403 let tag = Tag(idx as u64);
404 unsafe {
405 submit.submit_send(iter::once((packet, tag)));
406 }
407 }
408 }
409}
410
411impl nic::Handle for Handle {
412 fn queue(&mut self) -> Result<(), layer::Error> {
413 self.state = State::Unsent;
414 Ok(())
415 }
416
417 fn info(&self) -> &dyn nic::Info {
418 &self.info
419 }
420}
421
422impl nic::Info for PacketInfo {
423 fn capabilities(&self) -> nic::Capabilities {
424 nic::Capabilities::no_support()
425 }
426
427 fn timestamp(&self) -> ethox::time::Instant {
428 self.timestamp
429 }
430}
431
432impl wire::Payload for PacketBuf {
433 fn payload(&self) -> &wire::payload {
434 <Partial<_> as wire::Payload>::payload(&self.inner)
435 }
436}
437
438impl wire::PayloadMut for PacketBuf {
439 fn payload_mut(&mut self) -> &mut wire::payload {
440 <Partial<_> as wire::PayloadMut>::payload_mut(&mut self.inner)
441 }
442
443 fn resize(&mut self, length: usize) -> Result<(), wire::PayloadError> {
444 <Partial<_> as wire::PayloadMut>::resize(&mut self.inner, length)
445 }
446
447 fn reframe(&mut self, frame: wire::Reframe) -> Result<(), wire::PayloadError> {
448 <Partial<_> as wire::PayloadMut>::reframe(&mut self.inner, frame)
449 }
450}
451