kvarn_tokio_uring/io/
send_to.rs

1use crate::buf::BoundedBuf;
2use crate::io::SharedFd;
3use crate::runtime::driver::op::{Completable, CqeResult, Op};
4use crate::runtime::CONTEXT;
5use crate::BufResult;
6use socket2::SockAddr;
7use std::io::IoSlice;
8use std::{boxed::Box, io, net::SocketAddr};
9
10pub(crate) struct SendTo<T> {
11    #[allow(dead_code)]
12    fd: SharedFd,
13    pub(crate) buf: T,
14    #[allow(dead_code)]
15    io_slices: Vec<IoSlice<'static>>,
16    #[allow(dead_code)]
17    socket_addr: Option<Box<SockAddr>>,
18    pub(crate) msghdr: Box<libc::msghdr>,
19}
20
21impl<T: BoundedBuf> Op<SendTo<T>> {
22    pub(crate) fn send_to(
23        fd: &SharedFd,
24        buf: T,
25        socket_addr: Option<SocketAddr>,
26    ) -> io::Result<Op<SendTo<T>>> {
27        use io_uring::{opcode, types};
28
29        let io_slices = vec![IoSlice::new(unsafe {
30            std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init())
31        })];
32
33        let mut msghdr: Box<libc::msghdr> = Box::new(unsafe { std::mem::zeroed() });
34        msghdr.msg_iov = io_slices.as_ptr() as *mut _;
35        msghdr.msg_iovlen = io_slices.len() as _;
36
37        let socket_addr = match socket_addr {
38            Some(_socket_addr) => {
39                let socket_addr = Box::new(SockAddr::from(_socket_addr));
40                msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
41                msghdr.msg_namelen = socket_addr.len();
42                Some(socket_addr)
43            }
44            None => {
45                msghdr.msg_name = std::ptr::null_mut();
46                msghdr.msg_namelen = 0;
47                None
48            }
49        };
50
51        CONTEXT.with(|x| {
52            x.handle().expect("Not in a runtime context").submit_op(
53                SendTo {
54                    fd: fd.clone(),
55                    buf,
56                    io_slices,
57                    socket_addr,
58                    msghdr,
59                },
60                |send_to| {
61                    opcode::SendMsg::new(
62                        types::Fd(send_to.fd.raw_fd()),
63                        send_to.msghdr.as_ref() as *const _,
64                    )
65                    .build()
66                },
67            )
68        })
69    }
70}
71
72impl<T> Completable for SendTo<T> {
73    type Output = BufResult<usize, T>;
74
75    fn complete(self, cqe: CqeResult) -> Self::Output {
76        // Convert the operation result to `usize`
77        let res = cqe.result.map(|v| v as usize);
78        // Recover the buffer
79        let buf = self.buf;
80
81        (res, buf)
82    }
83}