kvarn_tokio_uring/io/
send_to.rs1use crate::buf::BoundedBuf;
2use crate::io::SharedFd;
3use crate::runtime::driver::op::{Completable, CqeResult, Op};
4use crate::runtime::CONTEXT;
5use crate::BufResult;
6use socket2::SockAddr;
7use std::io::IoSlice;
8use std::{boxed::Box, io, net::SocketAddr};
9
10pub(crate) struct SendTo<T> {
11 #[allow(dead_code)]
12 fd: SharedFd,
13 pub(crate) buf: T,
14 #[allow(dead_code)]
15 io_slices: Vec<IoSlice<'static>>,
16 #[allow(dead_code)]
17 socket_addr: Option<Box<SockAddr>>,
18 pub(crate) msghdr: Box<libc::msghdr>,
19}
20
21impl<T: BoundedBuf> Op<SendTo<T>> {
22 pub(crate) fn send_to(
23 fd: &SharedFd,
24 buf: T,
25 socket_addr: Option<SocketAddr>,
26 ) -> io::Result<Op<SendTo<T>>> {
27 use io_uring::{opcode, types};
28
29 let io_slices = vec![IoSlice::new(unsafe {
30 std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init())
31 })];
32
33 let mut msghdr: Box<libc::msghdr> = Box::new(unsafe { std::mem::zeroed() });
34 msghdr.msg_iov = io_slices.as_ptr() as *mut _;
35 msghdr.msg_iovlen = io_slices.len() as _;
36
37 let socket_addr = match socket_addr {
38 Some(_socket_addr) => {
39 let socket_addr = Box::new(SockAddr::from(_socket_addr));
40 msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
41 msghdr.msg_namelen = socket_addr.len();
42 Some(socket_addr)
43 }
44 None => {
45 msghdr.msg_name = std::ptr::null_mut();
46 msghdr.msg_namelen = 0;
47 None
48 }
49 };
50
51 CONTEXT.with(|x| {
52 x.handle().expect("Not in a runtime context").submit_op(
53 SendTo {
54 fd: fd.clone(),
55 buf,
56 io_slices,
57 socket_addr,
58 msghdr,
59 },
60 |send_to| {
61 opcode::SendMsg::new(
62 types::Fd(send_to.fd.raw_fd()),
63 send_to.msghdr.as_ref() as *const _,
64 )
65 .build()
66 },
67 )
68 })
69 }
70}
71
72impl<T> Completable for SendTo<T> {
73 type Output = BufResult<usize, T>;
74
75 fn complete(self, cqe: CqeResult) -> Self::Output {
76 let res = cqe.result.map(|v| v as usize);
78 let buf = self.buf;
80
81 (res, buf)
82 }
83}