safer_ring/future/io_futures/network_io.rs
1//! Network I/O futures for socket operations.
2
3use std::future::Future;
4use std::io;
5use std::marker::PhantomData;
6use std::os::unix::io::RawFd;
7use std::pin::Pin as StdPin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use super::common::{impl_future_drop, poll_io_operation};
12use crate::future::waker::WakerRegistry;
13use crate::operation::{Operation, Submitted};
14use crate::ring::Ring;
15
16/// Future for socket accept operations.
17///
18/// This future waits for an incoming connection on a listening socket and
19/// returns the new client file descriptor when a connection is accepted.
20/// Unlike read/write operations, accept operations don't require buffer
21/// management, so this future has a simpler interface.
22///
23/// # Type Parameters
24///
25/// * `'ring` - Lifetime of the io_uring Ring instance
26///
27/// # Returns
28///
29/// Returns the raw file descriptor (`RawFd`) of the accepted client connection
30/// on success. The caller is responsible for managing the returned file descriptor.
31///
32/// # Examples
33///
34/// ```rust,ignore
35/// # use safer_ring::{Ring, Operation};
36/// # use std::net::{TcpListener, SocketAddr};
37/// # use std::os::unix::io::AsRawFd;
38/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
39/// let mut ring = Ring::new(32)?;
40/// let listener = TcpListener::bind("127.0.0.1:8080")?;
41/// listener.set_nonblocking(true)?;
42///
43/// let accept_future = ring.accept(listener.as_raw_fd())?;
44/// let client_fd = accept_future.await?;
45///
46/// println!("Accepted client connection: fd {}", client_fd);
47/// # Ok(())
48/// # }
49/// ```
50pub struct AcceptFuture<'ring> {
51 operation: Option<Operation<'ring, 'static, Submitted>>,
52 ring: &'ring mut Ring<'ring>,
53 waker_registry: Arc<WakerRegistry>,
54 // No buffer lifetime needed for accept operations
55 _phantom: PhantomData<&'ring ()>,
56}
57
58/// Future for socket send operations.
59///
60/// This future sends data over a socket connection and returns the number
61/// of bytes sent along with buffer ownership when the operation completes.
62/// The future handles partial sends and error conditions appropriately.
63///
64/// # Type Parameters
65///
66/// * `'ring` - Lifetime of the io_uring Ring instance
67/// * `'buf` - Lifetime of the buffer containing data to send
68///
69/// # Returns
70///
71/// Returns `(usize, Pin<&'buf mut [u8]>)` on success where:
72/// - `usize` is the number of bytes sent
73/// - `Pin<&'buf mut [u8]>` is the buffer that was sent from
74///
75/// # Examples
76///
77/// ```rust,ignore
78/// # use safer_ring::{Ring, Operation, PinnedBuffer};
79/// # use std::net::TcpStream;
80/// # use std::os::unix::io::AsRawFd;
81/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
82/// let mut ring = Ring::new(32)?;
83/// let mut buffer = PinnedBuffer::from_slice(b"Hello, client!");
84/// let stream = TcpStream::connect("127.0.0.1:8080")?;
85/// stream.set_nonblocking(true)?;
86///
87/// let send_future = ring.send(stream.as_raw_fd(), buffer.as_mut_slice())?;
88/// let (bytes_sent, _buffer) = send_future.await?;
89///
90/// println!("Sent {} bytes", bytes_sent);
91/// # Ok(())
92/// # }
93/// ```
94pub struct SendFuture<'ring, 'buf> {
95 operation: Option<Operation<'ring, 'buf, Submitted>>,
96 ring: &'ring mut Ring<'ring>,
97 waker_registry: Arc<WakerRegistry>,
98 _phantom: PhantomData<(&'ring (), &'buf ())>,
99}
100
101/// Future for socket receive operations.
102///
103/// This future receives data from a socket connection and returns the number
104/// of bytes received along with buffer ownership when the operation completes.
105/// The buffer will contain the received data upon successful completion.
106///
107/// # Type Parameters
108///
109/// * `'ring` - Lifetime of the io_uring Ring instance
110/// * `'buf` - Lifetime of the buffer to receive data into
111///
112/// # Returns
113///
114/// Returns `(usize, Pin<&'buf mut [u8]>)` on success where:
115/// - `usize` is the number of bytes received
116/// - `Pin<&'buf mut [u8]>` is the buffer containing the received data
117///
118/// # Examples
119///
120/// ```rust,ignore
121/// # use safer_ring::{Ring, Operation, PinnedBuffer};
122/// # use std::net::TcpStream;
123/// # use std::os::unix::io::AsRawFd;
124/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
125/// let mut ring = Ring::new(32)?;
126/// let mut buffer = PinnedBuffer::with_capacity(1024);
127/// let stream = TcpStream::connect("127.0.0.1:8080")?;
128/// stream.set_nonblocking(true)?;
129///
130/// let recv_future = ring.recv(stream.as_raw_fd(), buffer.as_mut_slice())?;
131/// let (bytes_received, _buffer) = recv_future.await?;
132///
133/// println!("Received {} bytes", bytes_received);
134/// # Ok(())
135/// # }
136/// ```
137pub struct RecvFuture<'ring, 'buf> {
138 operation: Option<Operation<'ring, 'buf, Submitted>>,
139 ring: &'ring mut Ring<'ring>,
140 waker_registry: Arc<WakerRegistry>,
141 _phantom: PhantomData<(&'ring (), &'buf ())>,
142}
143
144impl<'ring> AcceptFuture<'ring> {
145 pub(crate) fn new(
146 operation: Operation<'ring, 'static, Submitted>,
147 ring: &'ring mut Ring<'ring>,
148 waker_registry: Arc<WakerRegistry>,
149 ) -> Self {
150 Self {
151 operation: Some(operation),
152 ring,
153 waker_registry,
154 _phantom: PhantomData,
155 }
156 }
157}
158
159impl<'ring, 'buf> SendFuture<'ring, 'buf> {
160 pub(crate) fn new(
161 operation: Operation<'ring, 'buf, Submitted>,
162 ring: &'ring mut Ring<'ring>,
163 waker_registry: Arc<WakerRegistry>,
164 ) -> Self {
165 Self {
166 operation: Some(operation),
167 ring,
168 waker_registry,
169 _phantom: PhantomData,
170 }
171 }
172}
173
174impl<'ring, 'buf> RecvFuture<'ring, 'buf> {
175 pub(crate) fn new(
176 operation: Operation<'ring, 'buf, Submitted>,
177 ring: &'ring mut Ring<'ring>,
178 waker_registry: Arc<WakerRegistry>,
179 ) -> Self {
180 Self {
181 operation: Some(operation),
182 ring,
183 waker_registry,
184 _phantom: PhantomData,
185 }
186 }
187}
188
189impl<'ring> Future for AcceptFuture<'ring> {
190 type Output = io::Result<RawFd>;
191
192 fn poll(mut self: StdPin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193 let operation = match self.operation.as_ref() {
194 Some(op) => op,
195 None => {
196 panic!("AcceptFuture polled after completion");
197 }
198 };
199
200 let operation_id = operation.id();
201
202 match self.ring.try_complete_by_id(operation_id) {
203 Ok(Some(result)) => {
204 let _operation = self.operation.take().unwrap();
205 self.waker_registry.remove_waker(operation_id);
206
207 match result {
208 Ok(fd) => {
209 // Accept operations return the new client fd as i32
210 // Negative values indicate errors in io_uring
211 if fd >= 0 {
212 Poll::Ready(Ok(fd))
213 } else {
214 Poll::Ready(Err(io::Error::other(
215 "Accept returned invalid file descriptor",
216 )))
217 }
218 }
219 Err(e) => Poll::Ready(Err(e)),
220 }
221 }
222 Ok(None) => {
223 self.waker_registry
224 .register_waker(operation_id, cx.waker().clone());
225 Poll::Pending
226 }
227 Err(e) => {
228 self.waker_registry.remove_waker(operation_id);
229 Poll::Ready(Err(io::Error::other(format!(
230 "Error checking accept completion: {e}"
231 ))))
232 }
233 }
234 }
235}
236
237impl<'ring, 'buf> Future for SendFuture<'ring, 'buf> {
238 type Output = io::Result<(usize, StdPin<&'buf mut [u8]>)>;
239
240 fn poll(mut self: StdPin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
241 poll_io_operation!(self, cx, "SendFuture")
242 }
243}
244
245impl<'ring, 'buf> Future for RecvFuture<'ring, 'buf> {
246 type Output = io::Result<(usize, StdPin<&'buf mut [u8]>)>;
247
248 fn poll(mut self: StdPin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
249 poll_io_operation!(self, cx, "RecvFuture")
250 }
251}
252
253impl<'ring> Drop for AcceptFuture<'ring> {
254 fn drop(&mut self) {
255 impl_future_drop!(self);
256 }
257}
258
259impl<'ring, 'buf> Drop for SendFuture<'ring, 'buf> {
260 fn drop(&mut self) {
261 impl_future_drop!(self);
262 }
263}
264
265impl<'ring, 'buf> Drop for RecvFuture<'ring, 'buf> {
266 fn drop(&mut self) {
267 impl_future_drop!(self);
268 }
269}