safer_ring/future/io_futures/
network_io.rs

1//! Network I/O futures for socket operations.
2
3use std::future::Future;
4use std::io;
5use std::marker::PhantomData;
6use std::os::unix::io::RawFd;
7use std::pin::Pin as StdPin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use super::common::{impl_future_drop, poll_io_operation};
12use crate::future::waker::WakerRegistry;
13use crate::operation::{Operation, Submitted};
14use crate::ring::Ring;
15
16/// Future for socket accept operations.
17///
18/// This future waits for an incoming connection on a listening socket and
19/// returns the new client file descriptor when a connection is accepted.
20/// Unlike read/write operations, accept operations don't require buffer
21/// management, so this future has a simpler interface.
22///
23/// # Type Parameters
24///
25/// * `'ring` - Lifetime of the io_uring Ring instance
26///
27/// # Returns
28///
29/// Returns the raw file descriptor (`RawFd`) of the accepted client connection
30/// on success. The caller is responsible for managing the returned file descriptor.
31///
32/// # Examples
33///
34/// ```rust,ignore
35/// # use safer_ring::{Ring, Operation};
36/// # use std::net::{TcpListener, SocketAddr};
37/// # use std::os::unix::io::AsRawFd;
38/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
39/// let mut ring = Ring::new(32)?;
40/// let listener = TcpListener::bind("127.0.0.1:8080")?;
41/// listener.set_nonblocking(true)?;
42///
43/// let accept_future = ring.accept(listener.as_raw_fd())?;
44/// let client_fd = accept_future.await?;
45///
46/// println!("Accepted client connection: fd {}", client_fd);
47/// # Ok(())
48/// # }
49/// ```
50pub struct AcceptFuture<'ring> {
51    operation: Option<Operation<'ring, 'static, Submitted>>,
52    ring: &'ring mut Ring<'ring>,
53    waker_registry: Arc<WakerRegistry>,
54    // No buffer lifetime needed for accept operations
55    _phantom: PhantomData<&'ring ()>,
56}
57
58/// Future for socket send operations.
59///
60/// This future sends data over a socket connection and returns the number
61/// of bytes sent along with buffer ownership when the operation completes.
62/// The future handles partial sends and error conditions appropriately.
63///
64/// # Type Parameters
65///
66/// * `'ring` - Lifetime of the io_uring Ring instance
67/// * `'buf` - Lifetime of the buffer containing data to send
68///
69/// # Returns
70///
71/// Returns `(usize, Pin<&'buf mut [u8]>)` on success where:
72/// - `usize` is the number of bytes sent
73/// - `Pin<&'buf mut [u8]>` is the buffer that was sent from
74///
75/// # Examples
76///
77/// ```rust,ignore
78/// # use safer_ring::{Ring, Operation, PinnedBuffer};
79/// # use std::net::TcpStream;
80/// # use std::os::unix::io::AsRawFd;
81/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
82/// let mut ring = Ring::new(32)?;
83/// let mut buffer = PinnedBuffer::from_slice(b"Hello, client!");
84/// let stream = TcpStream::connect("127.0.0.1:8080")?;
85/// stream.set_nonblocking(true)?;
86///
87/// let send_future = ring.send(stream.as_raw_fd(), buffer.as_mut_slice())?;
88/// let (bytes_sent, _buffer) = send_future.await?;
89///
90/// println!("Sent {} bytes", bytes_sent);
91/// # Ok(())
92/// # }
93/// ```
94pub struct SendFuture<'ring, 'buf> {
95    operation: Option<Operation<'ring, 'buf, Submitted>>,
96    ring: &'ring mut Ring<'ring>,
97    waker_registry: Arc<WakerRegistry>,
98    _phantom: PhantomData<(&'ring (), &'buf ())>,
99}
100
101/// Future for socket receive operations.
102///
103/// This future receives data from a socket connection and returns the number
104/// of bytes received along with buffer ownership when the operation completes.
105/// The buffer will contain the received data upon successful completion.
106///
107/// # Type Parameters
108///
109/// * `'ring` - Lifetime of the io_uring Ring instance
110/// * `'buf` - Lifetime of the buffer to receive data into
111///
112/// # Returns
113///
114/// Returns `(usize, Pin<&'buf mut [u8]>)` on success where:
115/// - `usize` is the number of bytes received
116/// - `Pin<&'buf mut [u8]>` is the buffer containing the received data
117///
118/// # Examples
119///
120/// ```rust,ignore
121/// # use safer_ring::{Ring, Operation, PinnedBuffer};
122/// # use std::net::TcpStream;
123/// # use std::os::unix::io::AsRawFd;
124/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
125/// let mut ring = Ring::new(32)?;
126/// let mut buffer = PinnedBuffer::with_capacity(1024);
127/// let stream = TcpStream::connect("127.0.0.1:8080")?;
128/// stream.set_nonblocking(true)?;
129///
130/// let recv_future = ring.recv(stream.as_raw_fd(), buffer.as_mut_slice())?;
131/// let (bytes_received, _buffer) = recv_future.await?;
132///
133/// println!("Received {} bytes", bytes_received);
134/// # Ok(())
135/// # }
136/// ```
137pub struct RecvFuture<'ring, 'buf> {
138    operation: Option<Operation<'ring, 'buf, Submitted>>,
139    ring: &'ring mut Ring<'ring>,
140    waker_registry: Arc<WakerRegistry>,
141    _phantom: PhantomData<(&'ring (), &'buf ())>,
142}
143
144impl<'ring> AcceptFuture<'ring> {
145    pub(crate) fn new(
146        operation: Operation<'ring, 'static, Submitted>,
147        ring: &'ring mut Ring<'ring>,
148        waker_registry: Arc<WakerRegistry>,
149    ) -> Self {
150        Self {
151            operation: Some(operation),
152            ring,
153            waker_registry,
154            _phantom: PhantomData,
155        }
156    }
157}
158
159impl<'ring, 'buf> SendFuture<'ring, 'buf> {
160    pub(crate) fn new(
161        operation: Operation<'ring, 'buf, Submitted>,
162        ring: &'ring mut Ring<'ring>,
163        waker_registry: Arc<WakerRegistry>,
164    ) -> Self {
165        Self {
166            operation: Some(operation),
167            ring,
168            waker_registry,
169            _phantom: PhantomData,
170        }
171    }
172}
173
174impl<'ring, 'buf> RecvFuture<'ring, 'buf> {
175    pub(crate) fn new(
176        operation: Operation<'ring, 'buf, Submitted>,
177        ring: &'ring mut Ring<'ring>,
178        waker_registry: Arc<WakerRegistry>,
179    ) -> Self {
180        Self {
181            operation: Some(operation),
182            ring,
183            waker_registry,
184            _phantom: PhantomData,
185        }
186    }
187}
188
189impl<'ring> Future for AcceptFuture<'ring> {
190    type Output = io::Result<RawFd>;
191
192    fn poll(mut self: StdPin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193        let operation = match self.operation.as_ref() {
194            Some(op) => op,
195            None => {
196                panic!("AcceptFuture polled after completion");
197            }
198        };
199
200        let operation_id = operation.id();
201
202        match self.ring.try_complete_by_id(operation_id) {
203            Ok(Some(result)) => {
204                let _operation = self.operation.take().unwrap();
205                self.waker_registry.remove_waker(operation_id);
206
207                match result {
208                    Ok(fd) => {
209                        // Accept operations return the new client fd as i32
210                        // Negative values indicate errors in io_uring
211                        if fd >= 0 {
212                            Poll::Ready(Ok(fd))
213                        } else {
214                            Poll::Ready(Err(io::Error::other(
215                                "Accept returned invalid file descriptor",
216                            )))
217                        }
218                    }
219                    Err(e) => Poll::Ready(Err(e)),
220                }
221            }
222            Ok(None) => {
223                self.waker_registry
224                    .register_waker(operation_id, cx.waker().clone());
225                Poll::Pending
226            }
227            Err(e) => {
228                self.waker_registry.remove_waker(operation_id);
229                Poll::Ready(Err(io::Error::other(format!(
230                    "Error checking accept completion: {e}"
231                ))))
232            }
233        }
234    }
235}
236
237impl<'ring, 'buf> Future for SendFuture<'ring, 'buf> {
238    type Output = io::Result<(usize, StdPin<&'buf mut [u8]>)>;
239
240    fn poll(mut self: StdPin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
241        poll_io_operation!(self, cx, "SendFuture")
242    }
243}
244
245impl<'ring, 'buf> Future for RecvFuture<'ring, 'buf> {
246    type Output = io::Result<(usize, StdPin<&'buf mut [u8]>)>;
247
248    fn poll(mut self: StdPin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
249        poll_io_operation!(self, cx, "RecvFuture")
250    }
251}
252
253impl<'ring> Drop for AcceptFuture<'ring> {
254    fn drop(&mut self) {
255        impl_future_drop!(self);
256    }
257}
258
259impl<'ring, 'buf> Drop for SendFuture<'ring, 'buf> {
260    fn drop(&mut self) {
261        impl_future_drop!(self);
262    }
263}
264
265impl<'ring, 'buf> Drop for RecvFuture<'ring, 'buf> {
266    fn drop(&mut self) {
267        impl_future_drop!(self);
268    }
269}