ringline/runtime/io.rs
1use std::cell::{Cell, RefCell};
2use std::fmt;
3use std::future::Future;
4use std::io;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::rc::Rc;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use bytes::Bytes;
12
13use crate::backend::Driver;
14#[cfg(has_io_uring)]
15use crate::completion::{OpTag, UserData};
16use crate::error::TimerExhausted;
17use crate::handler::ConnToken;
18#[cfg(has_io_uring)]
19use crate::runtime::TimerSlotPool;
20use crate::runtime::task::TaskId;
21use crate::runtime::waker::STANDALONE_BIT;
22use crate::runtime::{CURRENT_TASK_ID, Executor, IoResult};
23
/// Result of a parse closure passed to [`ConnCtx::with_data`] or [`ConnCtx::with_bytes`].
///
/// When the closure returns `NeedMore` or `Consumed(0)`, the future parks and
/// retries when more data arrives. `Consumed(0)` on a non-empty buffer is
/// treated identically to `NeedMore`. When the connection is closed (EOF),
/// the `with_data`/`with_bytes` future resolves with `0` regardless of the
/// parse result.
///
/// The non-blocking [`ConnCtx::try_with_data`] never parks: it hands the
/// closure's `ParseResult` back to the caller unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParseResult {
    /// The closure consumed `n` bytes from the buffer.
    ///
    /// `Consumed(0)` on non-empty data is treated as "need more data" — the
    /// future will park and retry when additional bytes arrive.
    Consumed(usize),
    /// The closure needs more data before it can make progress.
    NeedMore,
}
41
/// Raw pointers to the driver + executor state, set before polling each task.
///
/// A pointer to this struct is published through [`CURRENT_DRIVER`] so that
/// [`with_state`] / [`try_with_state`] can reach the driver and executor
/// from inside a future without threading references through every call.
///
/// # Safety
///
/// Dereferencing the pointers is sound because:
/// 1. Single-threaded: each worker thread has its own driver/executor.
/// 2. Scoped: set before poll, cleared after poll. The pointer is only
///    dereferenced within a Future::poll call.
/// 3. The pointed-to data lives on the worker thread's stack (in AsyncEventLoop::run).
pub(crate) struct DriverState {
    /// The worker's I/O driver; dereferenced by `with_state`/`try_with_state`.
    pub(crate) driver: *mut Driver,
    /// The worker's task executor; dereferenced by `with_state`/`try_with_state`.
    pub(crate) executor: *mut Executor,
}
55
thread_local! {
    /// Per-thread pointer to the active [`DriverState`].
    ///
    /// Starts out null; `set_driver_state` installs a pointer and
    /// `clear_driver_state` resets it to null. A null value is how
    /// `with_state`/`try_with_state` detect a call from outside the executor.
    pub(crate) static CURRENT_DRIVER: Cell<*mut DriverState> =
        const { Cell::new(std::ptr::null_mut()) };
}
60
61/// Set the thread-local driver pointer before polling a task.
62pub(crate) fn set_driver_state(state: *mut DriverState) {
63 CURRENT_DRIVER.with(|c| c.set(state));
64}
65
66/// Clear the thread-local driver pointer after polling a task.
67pub(crate) fn clear_driver_state() {
68 CURRENT_DRIVER.with(|c| c.set(std::ptr::null_mut()));
69}
70
71/// Access the thread-local driver state. Panics if called outside the executor.
72pub(crate) fn with_state<R>(f: impl FnOnce(&mut Driver, &mut Executor) -> R) -> R {
73 let ptr = CURRENT_DRIVER.with(|c| c.get());
74 assert!(!ptr.is_null(), "called outside executor");
75 let state = unsafe { &mut *ptr };
76 let driver = unsafe { &mut *state.driver };
77 let executor = unsafe { &mut *state.executor };
78 f(driver, executor)
79}
80
81/// Access the thread-local driver state, returning `None` if called outside the executor.
82pub(crate) fn try_with_state<R>(f: impl FnOnce(&mut Driver, &mut Executor) -> R) -> Option<R> {
83 let ptr = CURRENT_DRIVER.with(|c| c.get());
84 if ptr.is_null() {
85 return None;
86 }
87 let state = unsafe { &mut *ptr };
88 let driver = unsafe { &mut *state.driver };
89 let executor = unsafe { &mut *state.executor };
90 Some(f(driver, executor))
91}
92
93/// Spawn a standalone async task on the current worker thread.
94///
95/// Unlike connection tasks (which are 1:1 with connections), standalone tasks
96/// are not bound to any connection. They run on the same single-threaded
97/// executor and can use [`sleep()`](crate::sleep) and [`timeout()`](crate::timeout),
98/// but cannot perform connection I/O directly.
99///
100/// Returns `Err` if called outside the ringline async executor or if the
101/// standalone task slab is full.
102pub fn spawn(future: impl Future<Output = ()> + 'static) -> io::Result<TaskId> {
103 try_with_state(
104 |_driver, executor| match executor.standalone_slab.spawn(Box::pin(future)) {
105 Some(idx) => {
106 executor.ready_queue.push_back(idx | STANDALONE_BIT);
107 Ok(TaskId(idx))
108 }
109 None => Err(io::Error::other("standalone task slab exhausted")),
110 },
111 )
112 .unwrap_or_else(|| Err(io::Error::other("called outside executor")))
113}
114
115impl TaskId {
116 /// Cancel a standalone task. Drops the future immediately, freeing
117 /// the slab slot. No-op if the task already completed.
118 ///
119 /// Must be called from within the ringline executor (i.e., from a
120 /// connection task or standalone task). Panics otherwise.
121 ///
122 /// Any pending timers owned by the dropped future are cancelled
123 /// via their `Drop` impl. Stale entries in the ready queue are
124 /// silently skipped when the executor encounters them.
125 pub fn cancel(self) {
126 with_state(|_driver, executor| {
127 executor.standalone_slab.remove(self.0);
128 });
129 }
130}
131
132// ── JoinHandle ──────────────────────────────────────────────────────
133
/// Shared state between a spawned wrapper future and its [`JoinHandle`].
///
/// Both sides hold it behind an `Rc<RefCell<_>>`: the wrapper writes
/// `result` and reads `waiter`; the handle reads `result` and writes
/// `waiter`/`aborted`.
struct JoinState<T> {
    /// The task's return value, written by the wrapper when it completes
    /// and taken by `JoinHandle::poll`.
    result: Option<T>,
    /// Raw task ID of the task awaiting this handle (includes `STANDALONE_BIT`
    /// for standalone tasks). Set by `JoinHandle::poll` when it returns `Pending`.
    waiter: Option<u32>,
    /// True if [`JoinHandle::abort`] was called. Once set, polling the handle
    /// always returns `Pending`.
    aborted: bool,
}
144
/// Handle to a spawned task's return value.
///
/// Obtained from [`spawn_with_handle()`]. Implements [`Future`] — awaiting it
/// yields the task's return value `T` once the task completes.
///
/// # Drop semantics
///
/// Dropping a `JoinHandle` without awaiting it **detaches** the task: the
/// task continues running but its result is discarded. This matches tokio's
/// semantics.
///
/// # Abort
///
/// [`abort()`](Self::abort) cancels the spawned task. A `JoinHandle` that has
/// been aborted will never resolve if polled.
pub struct JoinHandle<T> {
    /// State shared with the wrapper future that runs the spawned task.
    state: Rc<RefCell<JoinState<T>>>,
    /// ID of the spawned standalone task; returned by `id()` and used by `abort()`.
    task_id: TaskId,
}
164
165impl<T> JoinHandle<T> {
166 /// Get the underlying [`TaskId`].
167 pub fn id(&self) -> TaskId {
168 self.task_id
169 }
170
171 /// Cancel the spawned task.
172 ///
173 /// The future is dropped immediately and its slab slot is freed.
174 /// After this call, awaiting the handle will hang forever — use
175 /// [`select()`](crate::select) with a flag if you need to detect
176 /// cancellation.
177 pub fn abort(&self) {
178 self.state.borrow_mut().aborted = true;
179 self.task_id.cancel();
180 }
181}
182
183impl<T: 'static> Future for JoinHandle<T> {
184 type Output = T;
185
186 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
187 let mut s = self.state.borrow_mut();
188 if s.aborted {
189 return Poll::Pending;
190 }
191 if let Some(value) = s.result.take() {
192 return Poll::Ready(value);
193 }
194 // Register this task as the waiter so the child can wake us.
195 s.waiter = Some(CURRENT_TASK_ID.with(|c| c.get()));
196 Poll::Pending
197 }
198}
199
200/// Spawn a standalone async task and return a handle to await its result.
201///
202/// Like [`spawn()`], the future runs on the current worker's single-threaded
203/// executor. The returned [`JoinHandle<T>`] implements [`Future<Output = T>`] —
204/// awaiting it yields the task's return value once it completes.
205///
206/// # Detach semantics
207///
208/// Dropping the handle without awaiting it detaches the task: the task keeps
209/// running but its result is silently discarded.
210///
211/// # Errors
212///
213/// Returns `Err` if called outside the ringline executor or if the standalone
214/// task slab is exhausted.
215///
216/// # Panics in the spawned task
217///
218/// A panic in the spawned future unwinds the worker thread (same as [`spawn()`]).
219pub fn spawn_with_handle<T: 'static>(
220 future: impl Future<Output = T> + 'static,
221) -> io::Result<JoinHandle<T>> {
222 let state = Rc::new(RefCell::new(JoinState {
223 result: None,
224 waiter: None,
225 aborted: false,
226 }));
227 let state_for_wrapper = Rc::clone(&state);
228
229 let wrapper = async move {
230 let value = future.await;
231 let mut s = state_for_wrapper.borrow_mut();
232 s.result = Some(value);
233 let waiter = s.waiter.take();
234 // Drop the borrow before calling with_state — defensive against
235 // any re-entrant borrow in the wakeup path.
236 drop(s);
237 if let Some(waiter_id) = waiter {
238 with_state(|_driver, executor| {
239 executor.wake_task(waiter_id);
240 });
241 }
242 };
243
244 try_with_state(
245 |_driver, executor| match executor.standalone_slab.spawn(Box::pin(wrapper)) {
246 Some(idx) => {
247 executor.ready_queue.push_back(idx | STANDALONE_BIT);
248 Ok(JoinHandle {
249 state: Rc::clone(&state),
250 task_id: TaskId(idx),
251 })
252 }
253 None => Err(io::Error::other("standalone task slab exhausted")),
254 },
255 )
256 .unwrap_or_else(|| Err(io::Error::other("called outside executor")))
257}
258
259/// Offload a blocking closure to the dedicated blocking thread pool.
260///
261/// The closure runs on a low-priority background thread (`SCHED_IDLE`),
262/// keeping the io_uring event loop unblocked. Returns a future that
263/// resolves to the closure's return value.
264///
265/// # Errors
266///
267/// Returns `Err` if called outside the ringline executor or if the blocking
268/// pool is not configured (`blocking_threads = 0`).
269pub fn spawn_blocking<T: Send + 'static>(
270 f: impl FnOnce() -> T + Send + 'static,
271) -> io::Result<BlockingJoinHandle<T>> {
272 try_with_state(|driver, executor| {
273 let pool = driver
274 .blocking_pool
275 .as_ref()
276 .ok_or_else(|| io::Error::other("blocking pool not configured"))?;
277 let blocking_tx = driver
278 .blocking_tx
279 .as_ref()
280 .ok_or_else(|| io::Error::other("blocking pool not configured"))?;
281
282 let request_id = executor.next_blocking_id;
283 executor.next_blocking_id += 1;
284
285 let task_id = CURRENT_TASK_ID.with(|c| c.get());
286 executor
287 .pending_blocking
288 .insert(request_id, (task_id, None));
289
290 let work = Box::new(move || -> Box<dyn std::any::Any + Send> { Box::new(f()) });
291
292 pool.request_tx
293 .send(crate::blocking::BlockingRequest {
294 work,
295 request_id,
296 response_tx: blocking_tx.clone(),
297 wake_handle: driver.wake_handle,
298 })
299 .map_err(|_| io::Error::other("blocking pool shut down"))?;
300
301 Ok(BlockingJoinHandle {
302 request_id,
303 _phantom: std::marker::PhantomData,
304 })
305 })
306 .unwrap_or_else(|| Err(io::Error::other("called outside executor")))
307}
308
/// Future returned by [`spawn_blocking()`]. Resolves to the closure's return value.
pub struct BlockingJoinHandle<T> {
    /// Key of this request in the executor's `pending_blocking` table.
    request_id: u64,
    /// Records the result type `T` that `poll` downcasts the boxed value to.
    _phantom: std::marker::PhantomData<T>,
}
314
315impl<T: 'static> Future for BlockingJoinHandle<T> {
316 type Output = T;
317
318 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
319 with_state(|_driver, executor| {
320 if let Some((_, slot)) = executor.pending_blocking.get_mut(&self.request_id)
321 && let Some(boxed) = slot.take()
322 {
323 executor.pending_blocking.remove(&self.request_id);
324 let value = *boxed
325 .downcast::<T>()
326 .expect("type mismatch in BlockingJoinHandle");
327 return Poll::Ready(value);
328 }
329 Poll::Pending
330 })
331 }
332}
333
334/// Initiate an outbound TCP connection from any async task (connection or standalone).
335///
336/// This is the free-function equivalent of [`ConnCtx::connect()`] — it can be
337/// called from standalone tasks spawned via [`spawn()`] or from an
338/// [`AsyncEventHandler::on_start()`](crate::AsyncEventHandler::on_start) future,
339/// where no `ConnCtx` is available.
340///
341/// Returns a [`ConnectFuture`] that resolves with a [`ConnCtx`] for the new connection.
342///
343/// # Panics
344///
345/// Panics if called outside the ringline async executor.
346pub fn connect(addr: SocketAddr) -> io::Result<ConnectFuture> {
347 with_state(|driver, executor| {
348 let mut ctx = driver.make_ctx();
349 let token = ctx
350 .connect(addr)
351 .map_err(|e| io::Error::other(e.to_string()))?;
352 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
353 executor.owner_task[token.index as usize] = Some(calling_task);
354 executor.connect_waiters[token.index as usize] = true;
355 Ok(ConnectFuture {
356 conn_index: token.index,
357 generation: token.generation,
358 })
359 })
360}
361
362/// Initiate an outbound TCP connection with a timeout from any async task.
363///
364/// Free-function equivalent of [`ConnCtx::connect_with_timeout()`].
365///
366/// # Panics
367///
368/// Panics if called outside the ringline async executor.
369pub fn connect_with_timeout(addr: SocketAddr, timeout_ms: u64) -> io::Result<ConnectFuture> {
370 with_state(|driver, executor| {
371 let mut ctx = driver.make_ctx();
372 let token = ctx
373 .connect_with_timeout(addr, timeout_ms)
374 .map_err(|e| io::Error::other(e.to_string()))?;
375 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
376 executor.owner_task[token.index as usize] = Some(calling_task);
377 executor.connect_waiters[token.index as usize] = true;
378 Ok(ConnectFuture {
379 conn_index: token.index,
380 generation: token.generation,
381 })
382 })
383}
384
385/// Initiate an outbound Unix domain socket connection from any async task.
386///
387/// Free-function equivalent of [`ConnCtx::connect_unix()`].
388///
389/// Returns a [`ConnectFuture`] that resolves with a [`ConnCtx`] for the new connection.
390///
391/// # Panics
392///
393/// Panics if called outside the ringline async executor.
394pub fn connect_unix(path: impl AsRef<std::path::Path>) -> io::Result<ConnectFuture> {
395 with_state(|driver, executor| {
396 let mut ctx = driver.make_ctx();
397 let token = ctx
398 .connect_unix(path.as_ref())
399 .map_err(|e| io::Error::other(e.to_string()))?;
400 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
401 executor.owner_task[token.index as usize] = Some(calling_task);
402 executor.connect_waiters[token.index as usize] = true;
403 Ok(ConnectFuture {
404 conn_index: token.index,
405 generation: token.generation,
406 })
407 })
408}
409
410/// Initiate an outbound TLS connection from any async task (connection or standalone).
411///
412/// This is the free-function equivalent of [`ConnCtx::connect_tls()`] — it can be
413/// called from standalone tasks spawned via [`spawn()`] or from an
414/// [`AsyncEventHandler::on_start()`](crate::AsyncEventHandler::on_start) future,
415/// where no `ConnCtx` is available.
416///
417/// `server_name` is the SNI hostname for the TLS handshake.
418///
419/// Returns a [`ConnectFuture`] that resolves with a [`ConnCtx`] for the new connection
420/// once both the TCP and TLS handshakes complete.
421///
422/// # Panics
423///
424/// Panics if called outside the ringline async executor.
425pub fn connect_tls(addr: SocketAddr, server_name: &str) -> io::Result<ConnectFuture> {
426 with_state(|driver, executor| {
427 let mut ctx = driver.make_ctx();
428 let token = ctx
429 .connect_tls(addr, server_name)
430 .map_err(|e| io::Error::other(e.to_string()))?;
431 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
432 executor.owner_task[token.index as usize] = Some(calling_task);
433 executor.connect_waiters[token.index as usize] = true;
434 Ok(ConnectFuture {
435 conn_index: token.index,
436 generation: token.generation,
437 })
438 })
439}
440
441/// Initiate an outbound TLS connection with a timeout from any async task.
442///
443/// Free-function equivalent of [`ConnCtx::connect_tls_with_timeout()`].
444///
445/// `server_name` is the SNI hostname for the TLS handshake.
446///
447/// # Panics
448///
449/// Panics if called outside the ringline async executor.
450pub fn connect_tls_with_timeout(
451 addr: SocketAddr,
452 server_name: &str,
453 timeout_ms: u64,
454) -> io::Result<ConnectFuture> {
455 with_state(|driver, executor| {
456 let mut ctx = driver.make_ctx();
457 let token = ctx
458 .connect_tls_with_timeout(addr, server_name, timeout_ms)
459 .map_err(|e| io::Error::other(e.to_string()))?;
460 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
461 executor.owner_task[token.index as usize] = Some(calling_task);
462 executor.connect_waiters[token.index as usize] = true;
463 Ok(ConnectFuture {
464 conn_index: token.index,
465 generation: token.generation,
466 })
467 })
468}
469
470// ── DNS Resolution ──────────────────────────────────────────────────
471
472/// Resolve a hostname to a [`SocketAddr`] using the dedicated resolver pool.
473///
474/// Performs `getaddrinfo` on a background thread, keeping the io_uring event
475/// loop unblocked. Returns the first resolved address.
476///
477/// # Errors
478///
479/// Returns `Err` if called outside the ringline executor, the resolver pool
480/// is not configured (`resolver_threads = 0`), or `getaddrinfo` fails.
481pub fn resolve(host: &str, port: u16) -> io::Result<ResolveFuture> {
482 let host = host.to_string();
483 try_with_state(|driver, executor| {
484 let resolver = driver
485 .resolver
486 .as_ref()
487 .ok_or_else(|| io::Error::other("resolver pool not configured"))?;
488 let resolve_tx = driver
489 .resolve_tx
490 .as_ref()
491 .ok_or_else(|| io::Error::other("resolver pool not configured"))?;
492
493 let request_id = executor.next_resolve_id;
494 executor.next_resolve_id += 1;
495
496 let task_id = CURRENT_TASK_ID.with(|c| c.get());
497 executor
498 .pending_resolves
499 .insert(request_id, (task_id, None));
500
501 resolver
502 .request_tx
503 .send(crate::resolver::ResolveRequest {
504 host,
505 port,
506 request_id,
507 response_tx: resolve_tx.clone(),
508 wake_handle: driver.wake_handle,
509 })
510 .map_err(|_| io::Error::other("resolver pool shut down"))?;
511
512 Ok(ResolveFuture { request_id })
513 })
514 .unwrap_or_else(|| Err(io::Error::other("called outside executor")))
515}
516
/// Future returned by [`resolve()`]. Resolves to a [`SocketAddr`].
pub struct ResolveFuture {
    /// Key of this request in the executor's `pending_resolves` table.
    request_id: u64,
}
521
522impl Future for ResolveFuture {
523 type Output = io::Result<std::net::SocketAddr>;
524
525 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
526 with_state(|_driver, executor| {
527 if let Some((_, slot)) = executor.pending_resolves.get_mut(&self.request_id)
528 && let Some(result) = slot.take()
529 {
530 executor.pending_resolves.remove(&self.request_id);
531 return Poll::Ready(result);
532 }
533 Poll::Pending
534 })
535 }
536}
537
538/// Request graceful shutdown of the worker event loop from any async task.
539///
540/// This is the free-function equivalent of [`ConnCtx::request_shutdown()`] —
541/// it can be called from standalone tasks or [`AsyncEventHandler::on_start()`](crate::AsyncEventHandler::on_start)
542/// futures where no `ConnCtx` is available.
543///
544/// Returns `Err` if called outside the ringline async executor.
545pub fn request_shutdown() -> io::Result<()> {
546 try_with_state(|driver, _| {
547 let mut ctx = driver.make_ctx();
548 ctx.request_shutdown();
549 })
550 .ok_or_else(|| io::Error::other("called outside executor"))
551}
552
/// Async connection context providing send, recv, and connect operations.
///
/// This is the async equivalent of `ConnToken` + `DriverCtx`: it is passed to
/// the connection's async fn and provides its I/O methods.
///
/// Each accepted connection receives a `ConnCtx` in [`AsyncEventHandler::on_accept`](crate::AsyncEventHandler::on_accept).
/// It exposes an async API for reading data ([`with_data`](Self::with_data),
/// [`with_bytes`](Self::with_bytes)), sending data ([`send`](Self::send),
/// [`send_nowait`](Self::send_nowait)), and initiating outbound connections
/// ([`connect`](Self::connect)).
///
/// A `ConnCtx` is valid for the lifetime of the connection's async task.
/// When the connection is closed, the task is dropped along with the `ConnCtx`.
#[derive(Clone, Copy)]
pub struct ConnCtx {
    /// Connection slot index; used to index the per-connection tables.
    pub(crate) conn_index: u32,
    /// Generation paired with `conn_index` to build the connection's `ConnToken`.
    pub(crate) generation: u32,
}
571
572impl ConnCtx {
573 /// Create a new ConnCtx for the given connection.
574 pub(crate) fn new(conn_index: u32, generation: u32) -> Self {
575 ConnCtx {
576 conn_index,
577 generation,
578 }
579 }
580
581 /// Returns the connection slot index. Useful for indexing into per-connection arrays.
582 pub fn index(&self) -> usize {
583 self.conn_index as usize
584 }
585
586 /// Returns the `ConnToken` for this connection.
587 pub fn token(&self) -> ConnToken {
588 ConnToken::new(self.conn_index, self.generation)
589 }
590
591 // ── Recv ─────────────────────────────────────────────────────────
592
593 /// Wait until recv data is available, then process it.
594 ///
595 /// The closure receives accumulated bytes and returns a [`ParseResult`]:
596 /// - `ParseResult::Consumed(n)` — `n` bytes were consumed from the buffer.
597 /// - `ParseResult::NeedMore` — the closure needs more data before making progress.
598 ///
599 /// Resolves immediately when data is already buffered (cache-hit hot path).
600 ///
601 /// If the closure returns `NeedMore` or `Consumed(0)` on non-empty data
602 /// (incomplete parse), the future parks and retries when more data arrives.
603 /// The closure must therefore be safe to call multiple times (`FnMut`).
604 pub fn with_data<F: FnMut(&[u8]) -> ParseResult>(&self, f: F) -> WithDataFuture<F> {
605 WithDataFuture {
606 conn_index: self.conn_index,
607 f: Some(f),
608 }
609 }
610
611 /// Wait until recv data is available, then provide it as zero-copy `Bytes`.
612 ///
613 /// Like [`with_data()`](Self::with_data), but the closure receives a `Bytes`
614 /// handle that can be sliced (O(1), refcounted) instead of copied. The
615 /// closure returns a [`ParseResult`] indicating bytes consumed.
616 ///
617 /// This enables zero-copy RESP parsing: the parser can call `bytes.slice()`
618 /// to extract sub-ranges without allocating.
619 pub fn with_bytes<F: FnMut(Bytes) -> ParseResult>(&self, f: F) -> WithBytesFuture<F> {
620 WithBytesFuture {
621 conn_index: self.conn_index,
622 f: Some(f),
623 }
624 }
625
626 /// Install a recv sink so that CQE data is written directly to the
627 /// target buffer instead of the per-connection accumulator.
628 ///
629 /// # Safety
630 ///
631 /// The caller must ensure that `target` points to writable memory of at
632 /// least `len` bytes, and that the memory remains valid until
633 /// [`take_recv_sink()`](Self::take_recv_sink) is called. In practice this
634 /// is guaranteed because ringline is single-threaded: the task sets the sink,
635 /// yields, and the CQE handler (same thread) writes to it before the task
636 /// resumes and clears the sink.
637 pub unsafe fn set_recv_sink(&self, target: *mut u8, len: usize) {
638 with_state(|_driver, executor| {
639 executor.recv_sinks[self.conn_index as usize] = Some(crate::runtime::RecvSink {
640 ptr: target,
641 cap: len,
642 pos: 0,
643 });
644 });
645 }
646
647 /// Remove the recv sink and return the number of bytes written to it.
648 /// Returns 0 if no sink was active.
649 pub fn take_recv_sink(&self) -> usize {
650 with_state(
651 |_driver, executor| match executor.recv_sinks[self.conn_index as usize].take() {
652 Some(sink) => sink.pos,
653 None => 0,
654 },
655 )
656 }
657
658 /// Returns a future that becomes ready when any recv data is available
659 /// (in the accumulator, recv sink, or connection is closed).
660 ///
661 /// Use this with [`set_recv_sink()`](Self::set_recv_sink) to wait for
662 /// direct-to-buffer writes without processing accumulator data.
663 pub fn recv_ready(&self) -> RecvReadyFuture {
664 RecvReadyFuture {
665 conn_index: self.conn_index,
666 }
667 }
668
669 /// Non-blocking accumulator access. Calls `f` with buffered data if any,
670 /// returning `Some(result)`. Returns `None` if the accumulator is empty.
671 pub fn try_with_data<F: FnOnce(&[u8]) -> ParseResult>(&self, f: F) -> Option<ParseResult> {
672 with_state(|driver, _executor| {
673 let data = driver.accumulators.data(self.conn_index);
674 if data.is_empty() {
675 return None;
676 }
677 let result = f(data);
678 if let ParseResult::Consumed(consumed) = result {
679 driver.accumulators.consume(self.conn_index, consumed);
680 }
681 Some(result)
682 })
683 }
684
685 // ── Send (synchronous / fire-and-forget) ─────────────────────────
686
687 /// Fire-and-forget send: copies data into the send pool and submits the SQE.
688 /// One copy, no heap allocation, no future.
689 ///
690 /// This is the hot-path send for cache responses. The CQE is handled
691 /// internally by the executor (resource cleanup + send queue advancement).
692 ///
693 /// # Errors
694 ///
695 /// Returns `Err` if the send copy pool is exhausted or the submission queue is full.
696 ///
697 /// For backpressure-aware sending, use [`send()`](Self::send) instead.
698 pub fn send_nowait(&self, data: &[u8]) -> io::Result<()> {
699 with_state(|driver, _| {
700 let mut ctx = driver.make_ctx();
701 ctx.send(self.token(), data)
702 })
703 }
704
    /// Forward the current pending recv buffer as a zero-copy send.
    ///
    /// This is intended for use inside a `with_data` closure. If the connection
    /// has a pending recv buffer (from the zero-copy recv path), it is taken and
    /// used as the send source directly — no copy into the send pool. The recv
    /// buffer is replenished when the send completes.
    ///
    /// Falls back to `send_nowait(data)` if there is no pending recv buffer
    /// (e.g., data came from the accumulator or a TLS connection), or if
    /// `data` is not exactly the pending buffer (pointer or length differs).
    ///
    /// Only works for plaintext connections; TLS connections always copy.
    /// On the mio backend, this always uses the copy path.
    ///
    /// # Errors
    ///
    /// Returns the error from submitting/queueing the send (zero-copy path)
    /// or from the copy send (fallback path).
    ///
    /// # Panics
    ///
    /// Panics if called outside the executor.
    pub fn forward_recv_buf(&self, data: &[u8]) -> io::Result<()> {
        with_state(|driver, _| {
            // Only the io_uring branch below reads `conn_index`.
            #[cfg_attr(not(has_io_uring), allow(unused_variables))]
            let conn_index = self.conn_index;

            #[cfg(has_io_uring)]
            {
                // Check for pending recv buffer.
                if let Some(pending) = driver.pending_recv_bufs[conn_index as usize].take() {
                    // Verify the data pointer matches the pending buffer (sanity check).
                    let pending_ptr = pending.ptr;
                    let data_ptr = data.as_ptr();
                    if data_ptr == pending_ptr && data.len() == pending.len as usize {
                        // Submit send SQE from the recv buffer. The bid is replenished
                        // on send completion via handle_send_recv_buf.
                        // Payload: bid in low 16 bits, remaining_len in high 16 bits.
                        //
                        // NOTE(review): `debug_assert!` is compiled out when debug
                        // assertions are disabled, so a length above 0xFFFF would
                        // be silently truncated by the shift below — confirm recv
                        // buffers are always smaller than 64 KiB.
                        debug_assert!(
                            pending.len <= 0xFFFF,
                            "forward_recv_buf: data length {} exceeds 16-bit payload capacity",
                            pending.len,
                        );
                        let payload = (pending.bid as u32) | ((pending.len) << 16);
                        // Store original data length for correct offset on partial sends.
                        driver.send_recv_buf_original_lens[conn_index as usize] = pending.len;
                        let user_data = crate::completion::UserData::encode(
                            crate::completion::OpTag::SendRecvBuf,
                            conn_index,
                            payload,
                        );
                        let entry = io_uring::opcode::Send::new(
                            io_uring::types::Fixed(conn_index),
                            pending_ptr,
                            pending.len,
                        )
                        .build()
                        .user_data(user_data.raw());

                        // `u16::MAX` in the slot fields: this send is not backed
                        // by a send-pool slot (presumably a "none" sentinel —
                        // verify against `BuiltSend`).
                        let built = crate::handler::BuiltSend {
                            entry,
                            pool_slot: u16::MAX,
                            #[cfg(has_io_uring)]
                            slab_idx: u16::MAX,
                            total_len: pending.len,
                        };

                        let result = driver.submit_or_queue_send(conn_index, built);
                        if result.is_err() {
                            // Submit failed — replenish the recv buffer.
                            driver.pending_replenish.push(pending.bid);
                        }
                        return result;
                    }

                    // Pointer mismatch — put it back and fall through to copy path.
                    driver.pending_recv_bufs[conn_index as usize] = Some(pending);
                }
            }

            // No pending recv buffer (or mio backend) — fall back to copy send.
            let mut ctx = driver.make_ctx();
            ctx.send(self.token(), data)
        })
    }
780
781 /// Begin building a scatter-gather send with mixed copy + zero-copy guard parts.
782 ///
783 /// This mirrors `DriverCtx::send_parts()` — use `.copy(data)` for copied parts
784 /// and `.guard(guard)` for zero-copy parts backed by `SendGuard`. Call `.submit()`
785 /// to submit the SQE. Fire-and-forget: no future returned.
786 #[cfg(has_io_uring)]
787 pub fn send_parts(&self) -> AsyncSendBuilder {
788 AsyncSendBuilder {
789 token: self.token(),
790 }
791 }
792
793 // ── Send (awaitable) ─────────────────────────────────────────────
794
795 /// Send data and await completion. Copies data into the send pool, submits
796 /// the SQE eagerly, then returns a future that resolves with the total bytes
797 /// sent (or error).
798 ///
799 /// Use this when you need backpressure or send completion notification.
800 /// For fire-and-forget sending, use [`send_nowait()`](Self::send_nowait).
801 ///
802 /// # Errors
803 ///
804 /// Returns `Err` if the send copy pool is exhausted or the submission queue is full.
805 pub fn send(&self, data: &[u8]) -> io::Result<SendFuture> {
806 with_state(|driver, executor| {
807 let mut ctx = driver.make_ctx();
808 ctx.send(self.token(), data)?;
809 executor.send_waiters[self.conn_index as usize] = true;
810 // On mio, the send is buffered synchronously — record a pending
811 // completion so the event loop can deliver wake_send for each
812 // individual awaitable send.
813 #[cfg(not(has_io_uring))]
814 {
815 driver.send_completions[self.conn_index as usize].push_back(data.len() as u32);
816 }
817 Ok(SendFuture {
818 conn_index: self.conn_index,
819 })
820 })
821 }
822
823 // ── Connect ──────────────────────────────────────────────────────
824
825 /// Initiate an outbound TCP connection and await the result.
826 ///
827 /// Returns a new `ConnCtx` for the peer connection on success.
828 pub fn connect(&self, addr: SocketAddr) -> io::Result<ConnectFuture> {
829 with_state(|driver, executor| {
830 let mut ctx = driver.make_ctx();
831 let token = ctx
832 .connect(addr)
833 .map_err(|e| io::Error::other(e.to_string()))?;
834 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
835 executor.owner_task[token.index as usize] = Some(calling_task);
836 executor.connect_waiters[token.index as usize] = true;
837 Ok(ConnectFuture {
838 conn_index: token.index,
839 generation: token.generation,
840 })
841 })
842 }
843
844 /// Initiate an outbound TCP connection with a timeout and await the result.
845 pub fn connect_with_timeout(
846 &self,
847 addr: SocketAddr,
848 timeout_ms: u64,
849 ) -> io::Result<ConnectFuture> {
850 with_state(|driver, executor| {
851 let mut ctx = driver.make_ctx();
852 let token = ctx
853 .connect_with_timeout(addr, timeout_ms)
854 .map_err(|e| io::Error::other(e.to_string()))?;
855 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
856 executor.owner_task[token.index as usize] = Some(calling_task);
857 executor.connect_waiters[token.index as usize] = true;
858 Ok(ConnectFuture {
859 conn_index: token.index,
860 generation: token.generation,
861 })
862 })
863 }
864
865 /// Initiate an outbound Unix domain socket connection and await the result.
866 ///
867 /// Returns a new `ConnCtx` for the peer connection on success.
868 pub fn connect_unix(&self, path: impl AsRef<std::path::Path>) -> io::Result<ConnectFuture> {
869 with_state(|driver, executor| {
870 let mut ctx = driver.make_ctx();
871 let token = ctx
872 .connect_unix(path.as_ref())
873 .map_err(|e| io::Error::other(e.to_string()))?;
874 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
875 executor.owner_task[token.index as usize] = Some(calling_task);
876 executor.connect_waiters[token.index as usize] = true;
877 Ok(ConnectFuture {
878 conn_index: token.index,
879 generation: token.generation,
880 })
881 })
882 }
883
884 // ── Send chain ────────────────────────────────────────────────────
885
/// Build an IO_LINK chained send on this connection without waiting for it.
///
/// `f` is handed a [`SendChainBuilder`](crate::handler::SendChainBuilder)
/// bound to this connection's token. Use `.copy()` / `.parts()...add()` to
/// add linked SQEs and `.finish()` to submit the chain. Whatever `f` returns
/// is returned unchanged; no send waiter is registered.
///
/// For backpressure-aware chained sending, use [`send_chain()`](Self::send_chain).
#[cfg(has_io_uring)]
pub fn send_chain_nowait<F, R>(&self, f: F) -> R
where
    F: FnOnce(crate::handler::SendChainBuilder<'_, '_>) -> R,
{
    with_state(|driver, _executor| {
        let target = ConnToken::new(self.conn_index, self.generation);
        let mut ctx = driver.make_ctx();
        f(ctx.send_chain(target))
    })
}
905
/// Build an IO_LINK chained send and obtain a future for its completion.
///
/// `f` is handed a [`SendChainBuilder`](crate::handler::SendChainBuilder)
/// bound to this connection's token. Use `.copy()` / `.parts()...add()` to
/// build the chain and `.finish()` to submit it. When `f` succeeds, this
/// connection is flagged as having a send waiter and a [`SendFuture`] is
/// returned; per the original documentation it resolves with the total bytes
/// sent.
///
/// For fire-and-forget chained sending, use [`send_chain_nowait()`](Self::send_chain_nowait).
///
/// # Errors
///
/// Propagates any error returned by `f`; no waiter is registered in that case.
#[cfg(has_io_uring)]
pub fn send_chain<F>(&self, f: F) -> io::Result<SendFuture>
where
    F: FnOnce(crate::handler::SendChainBuilder<'_, '_>) -> io::Result<()>,
{
    with_state(|driver, executor| {
        let target = ConnToken::new(self.conn_index, self.generation);
        let mut ctx = driver.make_ctx();
        f(ctx.send_chain(target))?;

        let conn_index = self.conn_index;
        executor.send_waiters[conn_index as usize] = true;
        Ok(SendFuture { conn_index })
    })
}
930
931 // ── Shutdown / cancel ─────────────────────────────────────────────
932
933 /// Shutdown the write side of the connection (half-close).
934 ///
935 /// Sends a TCP FIN to the peer. The read side remains open.
936 pub fn shutdown_write(&self) {
937 with_state(|driver, _| {
938 let mut ctx = driver.make_ctx();
939 ctx.shutdown_write(self.token());
940 })
941 }
942
943 /// Cancel pending I/O operations on this connection.
944 pub fn cancel(&self) -> io::Result<()> {
945 with_state(|driver, _| {
946 let mut ctx = driver.make_ctx();
947 ctx.cancel(self.token())
948 })
949 }
950
951 /// Request graceful shutdown of the worker event loop.
952 pub fn request_shutdown(&self) {
953 with_state(|driver, _| {
954 let mut ctx = driver.make_ctx();
955 ctx.request_shutdown();
956 })
957 }
958
959 // ── TLS ──────────────────────────────────────────────────────────
960
961 /// Query TLS session info for this connection.
962 pub fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
963 with_state(|driver, _| {
964 let ctx = driver.make_ctx();
965 ctx.tls_info(self.token())
966 })
967 }
968
969 /// Initiate an outbound TLS connection and await the result.
970 pub fn connect_tls(&self, addr: SocketAddr, server_name: &str) -> io::Result<ConnectFuture> {
971 with_state(|driver, executor| {
972 let mut ctx = driver.make_ctx();
973 let token = ctx
974 .connect_tls(addr, server_name)
975 .map_err(|e| io::Error::other(e.to_string()))?;
976 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
977 executor.owner_task[token.index as usize] = Some(calling_task);
978 executor.connect_waiters[token.index as usize] = true;
979 Ok(ConnectFuture {
980 conn_index: token.index,
981 generation: token.generation,
982 })
983 })
984 }
985
986 /// Initiate an outbound TLS connection with a timeout and await the result.
987 pub fn connect_tls_with_timeout(
988 &self,
989 addr: SocketAddr,
990 server_name: &str,
991 timeout_ms: u64,
992 ) -> io::Result<ConnectFuture> {
993 with_state(|driver, executor| {
994 let mut ctx = driver.make_ctx();
995 let token = ctx
996 .connect_tls_with_timeout(addr, server_name, timeout_ms)
997 .map_err(|e| io::Error::other(e.to_string()))?;
998 let calling_task = CURRENT_TASK_ID.with(|c| c.get());
999 executor.owner_task[token.index as usize] = Some(calling_task);
1000 executor.connect_waiters[token.index as usize] = true;
1001 Ok(ConnectFuture {
1002 conn_index: token.index,
1003 generation: token.generation,
1004 })
1005 })
1006 }
1007
1008 // ── Timestamps ─────────────────────────────────────────────────
1009
/// Returns the most recent kernel RX software timestamp as nanoseconds
/// since epoch (`CLOCK_REALTIME`), or 0 if no timestamp has been received.
///
/// Updated each time a `RecvMsgMulti` completion delivers an
/// `SCM_TIMESTAMPING` cmsg. Only available when the `timestamps` feature
/// is enabled and `Config::timestamps(true)` is set.
///
/// Also returns 0 when this handle is stale, i.e. the slot no longer exists
/// or its generation differs from the one this handle was created with.
/// This matches the generation checks done by `peer_addr` and `is_outbound`
/// and stops a stale handle from reading a timestamp that belongs to a
/// different connection which reused the slot.
#[cfg(feature = "timestamps")]
pub fn recv_timestamp(&self) -> u64 {
    with_state(|driver, _| {
        driver
            .connections
            .get(self.conn_index)
            // Ignore a slot that has been recycled for another connection.
            .filter(|cs| cs.generation == self.generation)
            .map_or(0, |cs| cs.recv_timestamp_ns)
    })
}
1026
1027 // ── Close / metadata ─────────────────────────────────────────────
1028
1029 /// Close this connection.
1030 pub fn close(&self) {
1031 let ptr = CURRENT_DRIVER.with(|c| c.get());
1032 if ptr.is_null() {
1033 return;
1034 }
1035 let state = unsafe { &mut *ptr };
1036 let driver = unsafe { &mut *state.driver };
1037 driver.close_connection(self.conn_index);
1038 }
1039
1040 /// Access peer address.
1041 pub fn peer_addr(&self) -> Option<crate::connection::PeerAddr> {
1042 with_state(|driver, _| {
1043 let conn = driver.connections.get(self.conn_index)?;
1044 if conn.generation != self.generation {
1045 return None;
1046 }
1047 conn.peer_addr.clone()
1048 })
1049 }
1050
1051 /// Check if this connection is outbound (initiated via connect).
1052 pub fn is_outbound(&self) -> bool {
1053 with_state(|driver, _| {
1054 driver
1055 .connections
1056 .get(self.conn_index)
1057 .map(|cs| cs.generation == self.generation && cs.outbound)
1058 .unwrap_or(false)
1059 })
1060 }
1061}
1062
1063// ── AsyncSendBuilder ─────────────────────────────────────────────────
1064
/// Scatter-gather send builder for the async API (io_uring backend).
///
/// Holds only the token of the target connection. The actual parts are added
/// through the `SendBuilder` obtained from `DriverCtx::send_parts()` — see
/// the methods on this type. Building is synchronous; whether a completion
/// future is produced depends on which method is used.
#[cfg(has_io_uring)]
pub struct AsyncSendBuilder {
    /// Connection the send is addressed to.
    token: ConnToken,
}
1074
#[cfg(has_io_uring)]
impl AsyncSendBuilder {
    /// Run `f` with a `SendBuilder` for this connection (fire-and-forget).
    ///
    /// The closure is expected to chain `.copy()` / `.guard()` calls and
    /// finish with `.submit()`; its result is returned unchanged. No send
    /// waiter is registered.
    ///
    /// # Example
    /// ```no_run
    /// # fn example(conn: ringline::ConnCtx) -> std::io::Result<()> {
    /// conn.send_parts().build(|b| {
    ///     b.copy(b"header").submit()
    /// })?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn build<F>(self, f: F) -> io::Result<()>
    where
        F: FnOnce(crate::handler::SendBuilder<'_, '_>) -> io::Result<()>,
    {
        with_state(|driver, _executor| {
            let mut ctx = driver.make_ctx();
            f(ctx.send_parts(self.token))
        })
    }

    /// Run `f` with a `SendBuilder`, then hand back a future for the completion.
    ///
    /// Works like [`build()`](Self::build), but on success the connection is
    /// flagged as having a send waiter and a [`SendFuture`] is returned. Use
    /// this when backpressure or completion notification is needed.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `f`; no waiter is registered then.
    pub fn build_await<F>(self, f: F) -> io::Result<SendFuture>
    where
        F: FnOnce(crate::handler::SendBuilder<'_, '_>) -> io::Result<()>,
    {
        with_state(|driver, executor| {
            let mut ctx = driver.make_ctx();
            f(ctx.send_parts(self.token))?;

            let conn_index = self.token.index;
            executor.send_waiters[conn_index as usize] = true;
            Ok(SendFuture { conn_index })
        })
    }

    /// Submit a scatter-gather send built from pre-classified `SendPart`s.
    ///
    /// Avoids the lifetime constraints of the closure-based
    /// [`build()`](Self::build), so copy and guard parts backed by borrowed
    /// data can be mixed in one submission. Every part is fed to the builder
    /// in order.
    ///
    /// Returns the number of parts handed to the builder, which is always
    /// `parts.len()`. An empty `parts` returns `Ok(0)` without submitting.
    ///
    /// NOTE(review): earlier documentation mentioned `MAX_IOVECS` /
    /// `MAX_GUARDS` caps. This method applies no cap itself, so any limit
    /// would have to be enforced inside `SendBuilder` — confirm.
    pub fn submit_batch(self, parts: Vec<crate::handler::SendPart<'_>>) -> io::Result<usize> {
        use crate::handler::SendPart;
        with_state(|driver, _executor| {
            let mut ctx = driver.make_ctx();
            let consumed = parts.len();
            let builder =
                parts
                    .into_iter()
                    .fold(ctx.send_parts(self.token), |builder, part| match part {
                        SendPart::Copy(data) => builder.copy(data),
                        SendPart::Guard(guard) => builder.guard(guard),
                    });
            if consumed == 0 {
                return Ok(0);
            }
            builder.submit()?;
            Ok(consumed)
        })
    }

    /// Like [`submit_batch`](Self::submit_batch), but also returns a
    /// [`SendFuture`] for backpressure / yield.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` ("empty batch") when `parts` is empty, and
    /// propagates any error from the builder's `submit()`.
    pub fn submit_batch_await(
        self,
        parts: Vec<crate::handler::SendPart<'_>>,
    ) -> io::Result<(usize, SendFuture)> {
        use crate::handler::SendPart;
        with_state(|driver, executor| {
            let mut ctx = driver.make_ctx();
            let consumed = parts.len();
            let builder =
                parts
                    .into_iter()
                    .fold(ctx.send_parts(self.token), |builder, part| match part {
                        SendPart::Copy(data) => builder.copy(data),
                        SendPart::Guard(guard) => builder.guard(guard),
                    });
            if consumed == 0 {
                return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty batch"));
            }
            builder.submit()?;

            let conn_index = self.token.index;
            executor.send_waiters[conn_index as usize] = true;
            Ok((consumed, SendFuture { conn_index }))
        })
    }
}
1186
1187// ── mio AsyncSendBuilder (copy-only fallback) ───────────────────────
1188
1189#[cfg(not(has_io_uring))]
1190/// Builder for scatter-gather sends in the async API (mio fallback).
1191///
1192/// On the mio backend, all parts are copied into a single buffer and
1193/// sent as one operation. Zero-copy guards are consumed by copying their
1194/// data.
1195pub struct AsyncSendBuilder {
1196 token: ConnToken,
1197}
1198
1199#[cfg(not(has_io_uring))]
1200impl ConnCtx {
1201 /// Begin building a scatter-gather send.
1202 ///
1203 /// On the mio backend, this degrades to copy-only sends.
1204 pub fn send_parts(&self) -> AsyncSendBuilder {
1205 AsyncSendBuilder {
1206 token: self.token(),
1207 }
1208 }
1209}
1210
1211#[cfg(not(has_io_uring))]
1212impl AsyncSendBuilder {
1213 /// Build and submit the send by concatenating all copy parts.
1214 pub fn build<F>(self, f: F) -> io::Result<()>
1215 where
1216 F: FnOnce(MioSendBuilder<'_>) -> io::Result<()>,
1217 {
1218 with_state(|driver, _| {
1219 let mut buf = Vec::new();
1220 let builder = MioSendBuilder { buf: &mut buf };
1221 f(builder)?;
1222 if !buf.is_empty() {
1223 let mut ctx = driver.make_ctx();
1224 ctx.send(self.token, &buf)?;
1225 }
1226 Ok(())
1227 })
1228 }
1229
1230 /// Submit a scatter-gather send from pre-classified `SendPart`s.
1231 ///
1232 /// On the mio backend, guard data is copied (no kernel zero-copy).
1233 pub fn submit_batch(self, parts: Vec<crate::handler::SendPart<'_>>) -> io::Result<usize> {
1234 use crate::handler::SendPart;
1235 with_state(|driver, _| {
1236 let mut buf = Vec::new();
1237 let mut consumed = 0usize;
1238 for part in &parts {
1239 match part {
1240 SendPart::Copy(data) => buf.extend_from_slice(data),
1241 SendPart::Guard(guard) => {
1242 let (ptr, len) = guard.as_ptr_len();
1243 let data = unsafe { std::slice::from_raw_parts(ptr, len as usize) };
1244 buf.extend_from_slice(data);
1245 }
1246 }
1247 consumed += 1;
1248 }
1249 if !buf.is_empty() {
1250 let mut ctx = driver.make_ctx();
1251 ctx.send(self.token, &buf)?;
1252 }
1253 Ok(consumed)
1254 })
1255 }
1256
1257 /// Build and submit, then await completion.
1258 pub fn build_await<F>(self, f: F) -> io::Result<SendFuture>
1259 where
1260 F: FnOnce(MioSendBuilder<'_>) -> io::Result<()>,
1261 {
1262 with_state(|driver, executor| {
1263 let mut buf = Vec::new();
1264 let builder = MioSendBuilder { buf: &mut buf };
1265 f(builder)?;
1266 if !buf.is_empty() {
1267 let mut ctx = driver.make_ctx();
1268 ctx.send(self.token, &buf)?;
1269 }
1270 let conn_index = self.token.index;
1271 executor.send_waiters[conn_index as usize] = true;
1272 Ok(SendFuture { conn_index })
1273 })
1274 }
1275}
1276
/// Send builder for the mio backend.
///
/// Every part added through this builder is appended to one shared byte
/// buffer owned by the caller.
#[cfg(not(has_io_uring))]
pub struct MioSendBuilder<'a> {
    /// Destination buffer that parts are appended to.
    buf: &'a mut Vec<u8>,
}
1282
1283#[cfg(not(has_io_uring))]
1284impl<'a> MioSendBuilder<'a> {
1285 /// Add a copy part.
1286 pub fn copy(self, data: &[u8]) -> Self {
1287 self.buf.extend_from_slice(data);
1288 self
1289 }
1290
1291 /// Add a guard part (copies the data on mio backend).
1292 pub fn guard(self, guard: crate::guard::GuardBox) -> Self {
1293 let (ptr, len) = guard.as_ptr_len();
1294 let data = unsafe { std::slice::from_raw_parts(ptr, len as usize) };
1295 self.buf.extend_from_slice(data);
1296 self
1297 }
1298
1299 /// Submit the accumulated send.
1300 pub fn submit(self) -> io::Result<()> {
1301 Ok(())
1302 }
1303}
1304
1305// ── WithDataFuture ───────────────────────────────────────────────────
1306
/// Future produced by [`ConnCtx::with_data`].
///
/// Carries the connection index and the caller's parse closure.
pub struct WithDataFuture<F> {
    /// Index of the connection whose received bytes are parsed.
    conn_index: u32,
    /// Parse closure; cleared once the future has resolved.
    f: Option<F>,
}
1312
1313impl<F: FnMut(&[u8]) -> ParseResult + Unpin> Future for WithDataFuture<F> {
1314 type Output = usize;
1315
1316 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
1317 with_state(|driver, executor| {
1318 // Zero-copy fast path: check pending recv buffer before accumulator.
1319 // Only available on io_uring where kernel-provided buffers are used.
1320 #[cfg(has_io_uring)]
1321 if driver.pending_recv_bufs[self.conn_index as usize].is_some() {
1322 let acc_empty = driver.accumulators.data(self.conn_index).is_empty();
1323 if acc_empty {
1324 // Borrow the kernel buffer in-place — no copy.
1325 let pending = driver.pending_recv_bufs[self.conn_index as usize].unwrap();
1326 let data =
1327 unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
1328 let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
1329 let result = f(data);
1330 match result {
1331 ParseResult::Consumed(consumed) if consumed > 0 => {
1332 // The closure may have called forward_recv_buf(), which
1333 // takes the pending slot. Only replenish if still present.
1334 if let Some(pending) =
1335 driver.pending_recv_bufs[self.conn_index as usize].take()
1336 {
1337 if consumed < pending.len as usize {
1338 // Partial consume: copy remainder to accumulator.
1339 let remainder = unsafe {
1340 std::slice::from_raw_parts(
1341 pending.ptr.add(consumed),
1342 pending.len as usize - consumed,
1343 )
1344 };
1345 driver.accumulators.append(self.conn_index, remainder);
1346 }
1347 driver.pending_replenish.push(pending.bid);
1348 }
1349 self.f.take();
1350 return Poll::Ready(consumed);
1351 }
1352 _ => {
1353 // NeedMore / Consumed(0): flush pending to accumulator
1354 // and fall through to the existing accumulator path.
1355 if let Some(pending) =
1356 driver.pending_recv_bufs[self.conn_index as usize].take()
1357 {
1358 let pending_data = unsafe {
1359 std::slice::from_raw_parts(pending.ptr, pending.len as usize)
1360 };
1361 driver.accumulators.append(self.conn_index, pending_data);
1362 driver.pending_replenish.push(pending.bid);
1363 }
1364 // Fall through to accumulator path below.
1365 }
1366 }
1367 } else {
1368 // Accumulator has data AND there's a pending buffer.
1369 // Flush pending to accumulator (prepend — it arrived first).
1370 let pending = driver.pending_recv_bufs[self.conn_index as usize]
1371 .take()
1372 .unwrap();
1373 let pending_data =
1374 unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
1375 driver.accumulators.prepend(self.conn_index, pending_data);
1376 driver.pending_replenish.push(pending.bid);
1377 // Fall through to accumulator path below.
1378 }
1379 }
1380
1381 let data = driver.accumulators.data(self.conn_index);
1382 if data.is_empty() {
1383 // Check if the connection has been closed — return 0 (EOF)
1384 // so the caller can detect disconnection.
1385 let is_closed = driver
1386 .connections
1387 .get(self.conn_index)
1388 .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
1389 .unwrap_or(true); // connection already released
1390 if is_closed {
1391 let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
1392 let result = f(&[]);
1393 self.f.take();
1394 return Poll::Ready(match result {
1395 ParseResult::Consumed(n) => n,
1396 ParseResult::NeedMore => 0,
1397 });
1398 }
1399
1400 // No data available — register as recv waiter and park.
1401 executor.recv_waiters[self.conn_index as usize] = true;
1402 return Poll::Pending;
1403 }
1404
1405 // Data available — call closure immediately (zero-overhead hot path).
1406 let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
1407 let result = f(data);
1408 match result {
1409 ParseResult::Consumed(consumed) if consumed > 0 => {
1410 driver.accumulators.consume(self.conn_index, consumed);
1411 self.f.take();
1412 return Poll::Ready(consumed);
1413 }
1414 _ => {}
1415 }
1416
1417 // NeedMore or Consumed(0) on non-empty data: incomplete parse.
1418 // Check if the connection is closed (EOF with leftover partial data).
1419 let is_closed = driver
1420 .connections
1421 .get(self.conn_index)
1422 .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
1423 .unwrap_or(true);
1424 if is_closed {
1425 self.f.take();
1426 return Poll::Ready(0);
1427 }
1428
1429 // Connection still open — wait for more data before retrying.
1430 executor.recv_waiters[self.conn_index as usize] = true;
1431 Poll::Pending
1432 })
1433 }
1434}
1435
1436// ── WithBytesFuture ──────────────────────────────────────────────────
1437
/// Future produced by [`ConnCtx::with_bytes`].
///
/// Carries the connection index and the caller's parse closure.
pub struct WithBytesFuture<F> {
    /// Index of the connection whose received bytes are parsed.
    conn_index: u32,
    /// Parse closure; cleared once the future has resolved.
    f: Option<F>,
}
1443
1444impl<F: FnMut(Bytes) -> ParseResult + Unpin> Future for WithBytesFuture<F> {
1445 type Output = usize;
1446
1447 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
1448 with_state(|driver, executor| {
1449 // Flush any pending zero-copy recv buffer to accumulator so
1450 // take_frozen() will include it (io_uring only).
1451 #[cfg(has_io_uring)]
1452 if let Some(pending) = driver.pending_recv_bufs[self.conn_index as usize].take() {
1453 let pending_data =
1454 unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
1455 driver.accumulators.append(self.conn_index, pending_data);
1456 driver.pending_replenish.push(pending.bid);
1457 }
1458
1459 let data = driver.accumulators.data(self.conn_index);
1460 if data.is_empty() {
1461 // Check if the connection has been closed — return 0 (EOF).
1462 let is_closed = driver
1463 .connections
1464 .get(self.conn_index)
1465 .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
1466 .unwrap_or(true);
1467 if is_closed {
1468 let f = self.f.as_mut().expect("WithBytesFuture polled after Ready");
1469 let result = f(Bytes::new());
1470 self.f.take();
1471 return Poll::Ready(match result {
1472 ParseResult::Consumed(n) => n,
1473 ParseResult::NeedMore => 0,
1474 });
1475 }
1476
1477 executor.recv_waiters[self.conn_index as usize] = true;
1478 return Poll::Pending;
1479 }
1480
1481 // Detach accumulator as frozen Bytes (O(1)).
1482 let frozen = driver.accumulators.take_frozen(self.conn_index);
1483 let len = frozen.len();
1484
1485 let f = self.f.as_mut().expect("WithBytesFuture polled after Ready");
1486 let result = f(frozen.clone());
1487
1488 match result {
1489 ParseResult::Consumed(consumed) if consumed > 0 => {
1490 // Put back unconsumed remainder (if any).
1491 if consumed < len {
1492 driver
1493 .accumulators
1494 .prepend(self.conn_index, &frozen[consumed..]);
1495 }
1496 self.f.take();
1497 return Poll::Ready(consumed);
1498 }
1499 _ => {}
1500 }
1501
1502 // NeedMore or Consumed(0) on non-empty data: incomplete parse.
1503 // Put everything back.
1504 driver.accumulators.prepend(self.conn_index, &frozen[..]);
1505
1506 let is_closed = driver
1507 .connections
1508 .get(self.conn_index)
1509 .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
1510 .unwrap_or(true);
1511 if is_closed {
1512 self.f.take();
1513 return Poll::Ready(0);
1514 }
1515
1516 executor.recv_waiters[self.conn_index as usize] = true;
1517 Poll::Pending
1518 })
1519 }
1520}
1521
1522// ── RecvReadyFuture ──────────────────────────────────────────────────
1523
/// Future produced by [`ConnCtx::recv_ready`].
///
/// Becomes ready as soon as any one of these holds:
/// 1. the recv sink has received data (`pos > 0`),
/// 2. the accumulator holds data,
/// 3. the connection is closed.
pub struct RecvReadyFuture {
    /// Index of the connection being watched.
    conn_index: u32,
}
1531
1532impl Future for RecvReadyFuture {
1533 type Output = ();
1534
1535 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1536 with_state(|driver, executor| {
1537 // Check recv sink.
1538 if let Some(sink) = &executor.recv_sinks[self.conn_index as usize]
1539 && sink.pos > 0
1540 {
1541 return Poll::Ready(());
1542 }
1543
1544 // Check accumulator.
1545 if !driver.accumulators.data(self.conn_index).is_empty() {
1546 return Poll::Ready(());
1547 }
1548
1549 // Check if the connection is closed.
1550 let is_closed = driver
1551 .connections
1552 .get(self.conn_index)
1553 .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
1554 .unwrap_or(true);
1555 if is_closed {
1556 return Poll::Ready(());
1557 }
1558
1559 // Not ready — register as recv waiter and park.
1560 executor.recv_waiters[self.conn_index as usize] = true;
1561 Poll::Pending
1562 })
1563 }
1564}
1565
1566// ── SendFuture ───────────────────────────────────────────────────────
1567
/// Future that waits for a send to complete.
///
/// The send itself was already submitted by the call that created this
/// future (for example [`ConnCtx::send`]); polling only looks for the stored
/// result. The future holds nothing but the connection index, so it carries
/// no payload and needs no allocation.
pub struct SendFuture {
    /// Index of the connection whose send result is awaited.
    conn_index: u32,
}
1574
1575impl Future for SendFuture {
1576 type Output = io::Result<u32>;
1577
1578 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u32>> {
1579 with_state(|_driver, executor| {
1580 match executor.io_results[self.conn_index as usize].take() {
1581 Some(IoResult::Send(result)) => Poll::Ready(result),
1582 _ => {
1583 // Not ready yet — re-register waiter.
1584 executor.send_waiters[self.conn_index as usize] = true;
1585 Poll::Pending
1586 }
1587 }
1588 })
1589 }
1590}
1591
1592impl Drop for SendFuture {
1593 fn drop(&mut self) {
1594 let ptr = CURRENT_DRIVER.with(|c| c.get());
1595 if ptr.is_null() {
1596 return;
1597 }
1598 let state = unsafe { &mut *ptr };
1599 let executor = unsafe { &mut *state.executor };
1600 executor.send_waiters[self.conn_index as usize] = false;
1601 }
1602}
1603
1604// ── ConnectFuture ────────────────────────────────────────────────────
1605
/// Future that waits for an outbound connection to be established.
///
/// The connect was already started by the call that created this future
/// (for example [`ConnCtx::connect`]); polling only looks for the stored
/// result.
pub struct ConnectFuture {
    /// Slot index of the connection being established.
    conn_index: u32,
    /// Generation of that slot, used to build the resulting `ConnCtx`.
    generation: u32,
}
1612
1613impl Future for ConnectFuture {
1614 type Output = io::Result<ConnCtx>;
1615
1616 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<ConnCtx>> {
1617 with_state(|_driver, executor| {
1618 match executor.io_results[self.conn_index as usize].take() {
1619 Some(IoResult::Connect(result)) => match result {
1620 Ok(()) => Poll::Ready(Ok(ConnCtx::new(self.conn_index, self.generation))),
1621 Err(e) => Poll::Ready(Err(e)),
1622 },
1623 _ => {
1624 // Not ready yet — re-register waiter.
1625 executor.connect_waiters[self.conn_index as usize] = true;
1626 Poll::Pending
1627 }
1628 }
1629 })
1630 }
1631}
1632
1633impl Drop for ConnectFuture {
1634 fn drop(&mut self) {
1635 let ptr = CURRENT_DRIVER.with(|c| c.get());
1636 if ptr.is_null() {
1637 return;
1638 }
1639 let state = unsafe { &mut *ptr };
1640 let executor = unsafe { &mut *state.executor };
1641 executor.connect_waiters[self.conn_index as usize] = false;
1642 }
1643}
1644
1645// ── Sleep ────────────────────────────────────────────────────────────
1646
1647/// Create a future that completes after the given duration.
1648///
1649/// Uses an io_uring timeout SQE internally — no busy-waiting, no timer
1650/// thread. The timer fires on the same worker thread as the calling task.
1651///
1652/// # Panics
1653///
1654/// Panics if the timer slot pool is exhausted, or if called outside the
1655/// ringline async executor.
1656pub fn sleep(duration: Duration) -> SleepFuture {
1657 SleepFuture {
1658 duration,
1659 timer_slot: None,
1660 generation: 0,
1661 absolute: None,
1662 }
1663}
1664
1665/// Future returned by [`sleep()`] or [`sleep_until()`]. Completes after
1666/// the configured duration or at the given deadline.
1667pub struct SleepFuture {
1668 duration: Duration,
1669 /// None until first poll, then Some(slot_index).
1670 timer_slot: Option<u32>,
1671 /// Generation when the slot was allocated.
1672 generation: u16,
1673 /// If Some, this is an absolute timer (sleep_until).
1674 absolute: Option<Deadline>,
1675}
1676
1677impl Future for SleepFuture {
1678 type Output = ();
1679
1680 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1681 with_state(|driver, executor| {
1682 let _ = driver; // used only on io_uring path
1683 if let Some(slot) = self.timer_slot {
1684 // Already submitted — check if fired.
1685 if executor.timer_pool.is_fired(slot) {
1686 executor.timer_pool.release(slot);
1687 self.timer_slot = None;
1688 return Poll::Ready(());
1689 }
1690 return Poll::Pending;
1691 }
1692
1693 // First poll — allocate slot, fill timespec, submit SQE.
1694 let waker_id = CURRENT_TASK_ID.with(|c| c.get());
1695 let (slot, generation) = match executor.timer_pool.allocate(waker_id) {
1696 Some(pair) => pair,
1697 None => {
1698 // Pool exhausted — complete immediately rather than panic.
1699 // Callers needing explicit error handling should use try_sleep().
1700 return Poll::Ready(());
1701 }
1702 };
1703
1704 #[cfg(has_io_uring)]
1705 {
1706 let payload = TimerSlotPool::encode_payload(slot, generation);
1707 let ud = UserData::encode(OpTag::Timer, 0, payload);
1708
1709 let submit_result = if let Some(deadline) = self.absolute {
1710 let ts_ptr =
1711 executor
1712 .timer_pool
1713 .set_absolute(slot, deadline.secs, deadline.nsecs);
1714 driver.ring.submit_timeout_abs(ts_ptr, ud)
1715 } else {
1716 let ts_ptr = executor.timer_pool.set_relative(slot, self.duration);
1717 driver.ring.submit_timeout(ts_ptr, ud)
1718 };
1719
1720 if let Err(_e) = submit_result {
1721 executor.timer_pool.release(slot);
1722 // On SQE submission failure, complete immediately rather than hang.
1723 return Poll::Ready(());
1724 }
1725 }
1726
1727 #[cfg(not(has_io_uring))]
1728 {
1729 // Mio backend: store deadline; the event loop polls timer expiry.
1730 if let Some(deadline) = self.absolute {
1731 executor
1732 .timer_pool
1733 .set_absolute(slot, deadline.secs, deadline.nsecs);
1734 } else {
1735 executor.timer_pool.set_relative(slot, self.duration);
1736 }
1737 }
1738
1739 self.timer_slot = Some(slot);
1740 self.generation = generation;
1741 Poll::Pending
1742 })
1743 }
1744}
1745
1746impl Drop for SleepFuture {
1747 fn drop(&mut self) {
1748 if let Some(slot) = self.timer_slot {
1749 // Timer was submitted but not yet fired — try to cancel it.
1750 let ptr = CURRENT_DRIVER.with(|c| c.get());
1751 if ptr.is_null() {
1752 return;
1753 }
1754 let state = unsafe { &mut *ptr };
1755 #[cfg(has_io_uring)]
1756 let driver = unsafe { &mut *state.driver };
1757 let executor = unsafe { &mut *state.executor };
1758
1759 if !executor.timer_pool.is_fired(slot) {
1760 #[cfg(has_io_uring)]
1761 {
1762 let payload = TimerSlotPool::encode_payload(slot, self.generation);
1763 let target_ud = UserData::encode(OpTag::Timer, 0, payload);
1764 // Best effort cancel; timer fires harmlessly if already expired.
1765 let _ = driver.ring.submit_async_cancel(target_ud.raw(), 0);
1766 }
1767 }
1768 // Slot released regardless — stale timer CQE detected via generation.
1769 executor.timer_pool.release(slot);
1770 }
1771 }
1772}
1773
1774/// Create a sleep future, returning an error if the timer pool is exhausted.
1775///
1776/// Unlike [`sleep()`] which panics on pool exhaustion, this returns
1777/// `Err(TimerExhausted)` so callers can handle capacity limits gracefully.
1778///
1779/// The timer slot is allocated eagerly (at call time, not on first poll).
1780///
1781/// # Panics
1782///
1783/// Panics if called outside the ringline async executor.
1784pub fn try_sleep(duration: Duration) -> Result<SleepFuture, TimerExhausted> {
1785 with_state(|driver, executor| {
1786 let _ = driver; // used only on io_uring path
1787 let waker_id = CURRENT_TASK_ID.with(|c| c.get());
1788 let (slot, generation) = executor.timer_pool.allocate(waker_id).ok_or_else(|| {
1789 crate::metrics::POOL.increment(crate::metrics::pool::TIMER_EXHAUSTED);
1790 TimerExhausted
1791 })?;
1792
1793 #[cfg(has_io_uring)]
1794 {
1795 let payload = TimerSlotPool::encode_payload(slot, generation);
1796 let ud = UserData::encode(OpTag::Timer, 0, payload);
1797 let ts_ptr = executor.timer_pool.set_relative(slot, duration);
1798
1799 if let Err(_e) = driver.ring.submit_timeout(ts_ptr, ud) {
1800 executor.timer_pool.release(slot);
1801 // SQE submission failure — complete immediately (same as sleep()).
1802 return Ok(SleepFuture {
1803 duration,
1804 timer_slot: None,
1805 generation: 0,
1806 absolute: None,
1807 });
1808 }
1809 }
1810
1811 #[cfg(not(has_io_uring))]
1812 {
1813 executor.timer_pool.set_relative(slot, duration);
1814 }
1815
1816 Ok(SleepFuture {
1817 duration,
1818 timer_slot: Some(slot),
1819 generation,
1820 absolute: None,
1821 })
1822 })
1823}
1824
1825// ── Deadline (absolute timer support) ─────────────────────────────────
1826
/// A point in time on the monotonic clock, used for absolute timers.
///
/// Obtain one through [`Deadline::now()`] or [`Deadline::after()`]. Values
/// are read from `CLOCK_MONOTONIC`, which the original documentation notes
/// matches io_uring's default clock.
///
/// The derived ordering compares `secs` first and `nsecs` second, so the
/// field order below is significant.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Deadline {
    /// Whole seconds of the clock reading.
    pub(crate) secs: u64,
    /// Sub-second part of the clock reading, in nanoseconds.
    pub(crate) nsecs: u32,
}
1836
1837impl Deadline {
1838 /// Capture the current monotonic time.
1839 pub fn now() -> Self {
1840 let mut ts = libc::timespec {
1841 tv_sec: 0,
1842 tv_nsec: 0,
1843 };
1844 // Safety: clock_gettime with CLOCK_MONOTONIC is safe.
1845 unsafe {
1846 libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
1847 }
1848 Deadline {
1849 secs: ts.tv_sec as u64,
1850 nsecs: ts.tv_nsec as u32,
1851 }
1852 }
1853
1854 /// Create a deadline `duration` from now.
1855 pub fn after(duration: Duration) -> Self {
1856 let now = Self::now();
1857 let mut secs = now.secs + duration.as_secs();
1858 let mut nsecs = now.nsecs + duration.subsec_nanos();
1859 if nsecs >= 1_000_000_000 {
1860 nsecs -= 1_000_000_000;
1861 secs += 1;
1862 }
1863 Deadline { secs, nsecs }
1864 }
1865
1866 /// Duration remaining until this deadline (saturates at zero).
1867 pub fn remaining(&self) -> Duration {
1868 let now = Self::now();
1869 if now.secs > self.secs || (now.secs == self.secs && now.nsecs >= self.nsecs) {
1870 return Duration::ZERO;
1871 }
1872 let mut secs = self.secs - now.secs;
1873 let nsecs = if self.nsecs >= now.nsecs {
1874 self.nsecs - now.nsecs
1875 } else {
1876 secs -= 1;
1877 1_000_000_000 + self.nsecs - now.nsecs
1878 };
1879 Duration::new(secs, nsecs)
1880 }
1881}
1882
1883/// Create a future that completes at the given absolute deadline.
1884///
1885/// Uses io_uring's `TIMEOUT_ABS` flag with `CLOCK_MONOTONIC` for
1886/// precise deadline-based timing without accumulated drift.
1887///
1888/// # Panics
1889///
1890/// Panics if the timer slot pool is exhausted, or if called outside the
1891/// ringline async executor.
1892pub fn sleep_until(deadline: Deadline) -> SleepFuture {
1893 SleepFuture {
1894 duration: Duration::ZERO, // unused for absolute timers
1895 timer_slot: None,
1896 generation: 0,
1897 absolute: Some(deadline),
1898 }
1899}
1900
1901/// Create an absolute-deadline sleep, returning an error if the timer pool is exhausted.
1902///
1903/// Unlike [`sleep_until()`] which panics on pool exhaustion, this returns
1904/// `Err(TimerExhausted)` so callers can handle capacity limits gracefully.
1905///
1906/// The timer slot is allocated eagerly (at call time, not on first poll).
1907///
1908/// # Panics
1909///
1910/// Panics if called outside the ringline async executor.
1911pub fn try_sleep_until(deadline: Deadline) -> Result<SleepFuture, TimerExhausted> {
1912 with_state(|driver, executor| {
1913 let _ = driver; // used only on io_uring path
1914 let waker_id = CURRENT_TASK_ID.with(|c| c.get());
1915 let (slot, generation) = executor.timer_pool.allocate(waker_id).ok_or_else(|| {
1916 crate::metrics::POOL.increment(crate::metrics::pool::TIMER_EXHAUSTED);
1917 TimerExhausted
1918 })?;
1919
1920 #[cfg(has_io_uring)]
1921 {
1922 let payload = TimerSlotPool::encode_payload(slot, generation);
1923 let ud = UserData::encode(OpTag::Timer, 0, payload);
1924 let ts_ptr = executor
1925 .timer_pool
1926 .set_absolute(slot, deadline.secs, deadline.nsecs);
1927
1928 if let Err(_e) = driver.ring.submit_timeout_abs(ts_ptr, ud) {
1929 executor.timer_pool.release(slot);
1930 return Ok(SleepFuture {
1931 duration: Duration::ZERO,
1932 timer_slot: None,
1933 generation: 0,
1934 absolute: Some(deadline),
1935 });
1936 }
1937 }
1938
1939 #[cfg(not(has_io_uring))]
1940 {
1941 executor
1942 .timer_pool
1943 .set_absolute(slot, deadline.secs, deadline.nsecs);
1944 }
1945
1946 Ok(SleepFuture {
1947 duration: Duration::ZERO,
1948 timer_slot: Some(slot),
1949 generation,
1950 absolute: Some(deadline),
1951 })
1952 })
1953}
1954
1955// ── Timeout ──────────────────────────────────────────────────────────
1956
/// Marker error reported when a [`timeout()`] deadline passes before the
/// wrapped future has produced its output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Elapsed;

impl fmt::Display for Elapsed {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "deadline has elapsed")
    }
}

impl std::error::Error for Elapsed {}
1968
1969/// Wrap a future with a deadline. If the future does not complete within
1970/// `duration`, returns `Err(Elapsed)`.
1971///
1972/// # Example
1973///
1974/// ```no_run
1975/// # async fn example() {
1976/// use std::time::Duration;
1977/// match ringline::timeout(Duration::from_secs(1), async { 42 }).await {
1978/// Ok(value) => { /* completed in time */ }
1979/// Err(_elapsed) => { /* timed out */ }
1980/// }
1981/// # }
1982/// ```
1983pub fn timeout<F: Future>(duration: Duration, future: F) -> TimeoutFuture<F> {
1984 TimeoutFuture {
1985 future,
1986 sleep: sleep(duration),
1987 }
1988}
1989
1990/// Wrap a future with a deadline, returning an error if the timer pool is full.
1991///
1992/// Unlike [`timeout()`] which panics on pool exhaustion, this returns
1993/// `Err(TimerExhausted)` so callers can handle capacity limits gracefully.
1994///
1995/// # Panics
1996///
1997/// Panics if called outside the ringline async executor.
1998pub fn try_timeout<F: Future>(
1999 duration: Duration,
2000 future: F,
2001) -> Result<TimeoutFuture<F>, TimerExhausted> {
2002 let sleep = try_sleep(duration)?;
2003 Ok(TimeoutFuture { future, sleep })
2004}
2005
2006/// Wrap a future with an absolute deadline. If the future does not complete
2007/// before `deadline`, returns `Err(Elapsed)`.
2008///
2009/// Uses io_uring's `TIMEOUT_ABS` flag with `CLOCK_MONOTONIC`.
2010///
2011/// # Panics
2012///
2013/// Panics if the timer slot pool is exhausted.
2014pub fn timeout_at<F: Future>(deadline: Deadline, future: F) -> TimeoutFuture<F> {
2015 TimeoutFuture {
2016 future,
2017 sleep: sleep_until(deadline),
2018 }
2019}
2020
2021/// Wrap a future with an absolute deadline, returning an error if the timer pool is full.
2022///
2023/// Unlike [`timeout_at()`] which panics on pool exhaustion, this returns
2024/// `Err(TimerExhausted)` so callers can handle capacity limits gracefully.
2025///
2026/// # Panics
2027///
2028/// Panics if called outside the ringline async executor.
2029pub fn try_timeout_at<F: Future>(
2030 deadline: Deadline,
2031 future: F,
2032) -> Result<TimeoutFuture<F>, TimerExhausted> {
2033 let sleep = try_sleep_until(deadline)?;
2034 Ok(TimeoutFuture { future, sleep })
2035}
2036
pin_project_lite::pin_project! {
    /// Future returned by [`timeout()`], [`try_timeout()`], [`timeout_at()`]
    /// and [`try_timeout_at()`].
    ///
    /// Pairs the wrapped future with a [`SleepFuture`]; both fields are
    /// structurally pinned so they can be polled in place.
    pub struct TimeoutFuture<F> {
        // The caller's future whose completion is being bounded.
        #[pin]
        future: F,
        // Timer that decides when the wrapped future has taken too long.
        #[pin]
        sleep: SleepFuture,
    }
}
2046
2047impl<F: Future> Future for TimeoutFuture<F> {
2048 type Output = Result<F::Output, Elapsed>;
2049
2050 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2051 let this = self.project();
2052
2053 // Poll the inner future first.
2054 if let Poll::Ready(output) = this.future.poll(cx) {
2055 return Poll::Ready(Ok(output));
2056 }
2057
2058 // Poll the sleep timer.
2059 if let Poll::Ready(()) = this.sleep.poll(cx) {
2060 return Poll::Ready(Err(Elapsed));
2061 }
2062
2063 Poll::Pending
2064 }
2065}
2066
2067// ── Disk I/O async API ──────────────────────────────────────────────
2068
/// Future that awaits a disk I/O completion (NVMe or Direct I/O).
///
/// The io_uring SQE was submitted before this future was created.
/// On completion, the CQE handler stores the result and wakes the task.
pub struct DiskIoFuture {
    /// Sequence number of the submitted operation. It is the key this future
    /// uses in the executor's `disk_io_results` and `disk_io_waiters` maps.
    pub(crate) seq: u32,
}
2076
2077impl Future for DiskIoFuture {
2078 type Output = io::Result<i32>;
2079
2080 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<i32>> {
2081 with_state(|_driver, executor| {
2082 match executor.disk_io_results.remove(&self.seq) {
2083 Some(result) if result < 0 => {
2084 Poll::Ready(Err(io::Error::from_raw_os_error(-result)))
2085 }
2086 Some(result) => Poll::Ready(Ok(result)),
2087 None => {
2088 // Re-register waiter (polled before CQE arrived or after spurious wake).
2089 let task_id = CURRENT_TASK_ID.with(|c| c.get());
2090 executor.disk_io_waiters.insert(self.seq, task_id);
2091 Poll::Pending
2092 }
2093 }
2094 })
2095 }
2096}
2097
2098impl Drop for DiskIoFuture {
2099 fn drop(&mut self) {
2100 let ptr = CURRENT_DRIVER.with(|c| c.get());
2101 if ptr.is_null() {
2102 return;
2103 }
2104 let state = unsafe { &mut *ptr };
2105 let executor = unsafe { &mut *state.executor };
2106 executor.disk_io_waiters.remove(&self.seq);
2107 }
2108}
2109
2110/// Open a Direct I/O file from any async task.
2111///
2112/// Returns a [`DirectIoFile`](crate::direct_io::DirectIoFile) handle
2113/// for use with [`direct_io_read()`].
2114///
2115/// # Panics
2116///
2117/// Panics if called outside the ringline async executor.
2118pub fn open_direct_io_file(path: &str) -> io::Result<crate::direct_io::DirectIoFile> {
2119 with_state(|driver, _| {
2120 let mut ctx = driver.make_ctx();
2121 ctx.open_direct_io_file(path)
2122 })
2123}
2124
2125/// Open an NVMe device from any async task.
2126///
2127/// Returns an [`NvmeDevice`](crate::nvme::NvmeDevice) handle
2128/// for use with [`nvme_read()`], [`nvme_write()`], and [`nvme_flush()`].
2129///
2130/// # Panics
2131///
2132/// Panics if called outside the ringline async executor.
2133pub fn open_nvme_device(path: &str, nsid: u32) -> io::Result<crate::nvme::NvmeDevice> {
2134 with_state(|driver, _| {
2135 let mut ctx = driver.make_ctx();
2136 ctx.open_nvme_device(path, nsid)
2137 })
2138}
2139
2140/// Submit a Direct I/O read and return a future for the result.
2141///
2142/// Reads `len` bytes from `offset` into the buffer at `buf`.
2143/// The returned future completes when the io_uring CQE arrives.
2144///
2145/// # Safety
2146///
2147/// `buf` must point to aligned, writable memory of at least `len` bytes
2148/// that remains valid until the future completes.
2149///
2150/// # Panics
2151///
2152/// Panics if called outside the ringline async executor.
2153pub unsafe fn direct_io_read(
2154 file: crate::direct_io::DirectIoFile,
2155 offset: u64,
2156 buf: *mut u8,
2157 len: u32,
2158) -> io::Result<DiskIoFuture> {
2159 with_state(|driver, executor| {
2160 let mut ctx = driver.make_ctx();
2161 // Safety: the outer `direct_io_read()` is already unsafe, and the
2162 // caller guarantees the buffer invariants.
2163 #[allow(unused_unsafe)]
2164 let seq = unsafe { ctx.direct_io_read(file, offset, buf, len)? };
2165 let task_id = CURRENT_TASK_ID.with(|c| c.get());
2166 executor.disk_io_waiters.insert(seq, task_id);
2167 Ok(DiskIoFuture { seq })
2168 })
2169}
2170
2171/// Submit an NVMe read and return a future for the result.
2172///
2173/// Reads `num_blocks` logical blocks starting at `lba` into the buffer
2174/// at `buf_addr` with length `buf_len`. The returned future completes
2175/// when the io_uring CQE arrives.
2176///
2177/// # Safety
2178///
2179/// `buf_addr` must point to valid, aligned memory of at least `buf_len`
2180/// bytes that remains valid until the returned future completes.
2181///
2182/// # Panics
2183///
2184/// Panics if called outside the ringline async executor.
2185pub fn nvme_read(
2186 device: crate::nvme::NvmeDevice,
2187 lba: u64,
2188 num_blocks: u16,
2189 buf_addr: u64,
2190 buf_len: u32,
2191) -> io::Result<DiskIoFuture> {
2192 with_state(|driver, executor| {
2193 let mut ctx = driver.make_ctx();
2194 let seq = ctx.nvme_read(device, lba, num_blocks, buf_addr, buf_len)?;
2195 let task_id = CURRENT_TASK_ID.with(|c| c.get());
2196 executor.disk_io_waiters.insert(seq, task_id);
2197 Ok(DiskIoFuture { seq })
2198 })
2199}
2200
2201/// Submit a Direct I/O write and return a future for the result.
2202///
2203/// Writes `len` bytes from the buffer at `buf` to `offset` in the file.
2204/// The returned future completes when the io_uring CQE arrives.
2205///
2206/// # Safety
2207///
2208/// `buf` must point to aligned, readable memory of at least `len` bytes
2209/// that remains valid until the future completes.
2210///
2211/// # Panics
2212///
2213/// Panics if called outside the ringline async executor.
2214pub unsafe fn direct_io_write(
2215 file: crate::direct_io::DirectIoFile,
2216 offset: u64,
2217 buf: *const u8,
2218 len: u32,
2219) -> io::Result<DiskIoFuture> {
2220 with_state(|driver, executor| {
2221 let mut ctx = driver.make_ctx();
2222 // Safety: the outer `direct_io_write()` is already unsafe, and the
2223 // caller guarantees the buffer invariants.
2224 #[allow(unused_unsafe)]
2225 let seq = unsafe { ctx.direct_io_write(file, offset, buf, len)? };
2226 let task_id = CURRENT_TASK_ID.with(|c| c.get());
2227 executor.disk_io_waiters.insert(seq, task_id);
2228 Ok(DiskIoFuture { seq })
2229 })
2230}
2231
2232/// Submit an NVMe flush and return a future for the result.
2233///
2234/// Flushes volatile write caches on the device, ensuring all previously
2235/// written data is persisted to non-volatile storage. The returned future
2236/// completes when the io_uring CQE arrives.
2237///
2238/// # Panics
2239///
2240/// Panics if called outside the ringline async executor.
2241pub fn nvme_flush(device: crate::nvme::NvmeDevice) -> io::Result<DiskIoFuture> {
2242 with_state(|driver, executor| {
2243 let mut ctx = driver.make_ctx();
2244 let seq = ctx.nvme_flush(device)?;
2245 let task_id = CURRENT_TASK_ID.with(|c| c.get());
2246 executor.disk_io_waiters.insert(seq, task_id);
2247 Ok(DiskIoFuture { seq })
2248 })
2249}
2250
2251/// Submit an NVMe write and return a future for the result.
2252///
2253/// Writes `num_blocks` logical blocks starting at `lba` from the buffer
2254/// at `buf_addr` with length `buf_len`. The returned future completes
2255/// when the io_uring CQE arrives.
2256///
2257/// # Safety
2258///
2259/// `buf_addr` must point to valid, aligned memory of at least `buf_len`
2260/// bytes that remains valid until the returned future completes.
2261///
2262/// # Panics
2263///
2264/// Panics if called outside the ringline async executor.
2265pub fn nvme_write(
2266 device: crate::nvme::NvmeDevice,
2267 lba: u64,
2268 num_blocks: u16,
2269 buf_addr: u64,
2270 buf_len: u32,
2271) -> io::Result<DiskIoFuture> {
2272 with_state(|driver, executor| {
2273 let mut ctx = driver.make_ctx();
2274 let seq = ctx.nvme_write(device, lba, num_blocks, buf_addr, buf_len)?;
2275 let task_id = CURRENT_TASK_ID.with(|c| c.get());
2276 executor.disk_io_waiters.insert(seq, task_id);
2277 Ok(DiskIoFuture { seq })
2278 })
2279}
2280
2281// ── UDP async API ───────────────────────────────────────────────────
2282
/// Async context for a UDP socket.
///
/// Passed to [`AsyncEventHandler::on_udp_bind()`](crate::AsyncEventHandler::on_udp_bind)
/// for each bound UDP socket. Provides async recv and fire-and-forget send.
///
/// The context is a small `Copy` handle: it stores only the socket's index
/// and looks up the actual socket state on each call.
#[derive(Clone, Copy)]
pub struct UdpCtx {
    /// Index of the UDP socket within this worker; exposed via [`UdpCtx::index`].
    pub(crate) udp_index: u32,
}
2291
2292impl UdpCtx {
2293 /// Returns the UDP socket index within this worker.
2294 pub fn index(&self) -> usize {
2295 self.udp_index as usize
2296 }
2297
2298 /// Receive a datagram, returning the payload and source address.
2299 ///
2300 /// Suspends until a datagram is available. Each call returns exactly one
2301 /// datagram. The payload is copied into a `Vec<u8>` (datagrams are
2302 /// typically small, so this is acceptable for the initial implementation).
2303 pub fn recv_from(&self) -> UdpRecvFuture {
2304 UdpRecvFuture {
2305 udp_index: self.udp_index,
2306 }
2307 }
2308
2309 /// Resolve when at least one UDP send slot is available on this socket.
2310 ///
2311 /// Use this to back off when [`UdpCtx::send_to`] returns
2312 /// [`crate::error::UdpSendError::PoolExhausted`], rather than busy-looping.
2313 /// On the io_uring backend the future suspends until a completion frees
2314 /// a slot. On the mio backend sends are synchronous and this future is
2315 /// always immediately ready — callers can still use it to stay
2316 /// backend-agnostic.
2317 ///
2318 /// Only one task should await this per socket at a time; a second waiter
2319 /// overwrites the first, which is then leaked (it must still be driven
2320 /// from another wake source).
2321 pub fn send_ready(&self) -> UdpSendReadyFuture {
2322 UdpSendReadyFuture {
2323 udp_index: self.udp_index,
2324 }
2325 }
2326
2327 /// Send a datagram to the given peer (fire-and-forget, copying).
2328 ///
2329 /// Copies `data` into the send pool and submits a `sendmsg` SQE.
2330 /// Only one send can be in-flight per UDP socket at a time.
2331 #[cfg(has_io_uring)]
2332 pub fn send_to(&self, peer: SocketAddr, data: &[u8]) -> Result<(), crate::error::UdpSendError> {
2333 with_state(|driver, _executor| driver.udp_send_to(self.udp_index, peer, data))
2334 }
2335
2336 /// Send a datagram to the given peer (mio backend — synchronous non-blocking send).
2337 ///
2338 /// `WouldBlock` is surfaced as [`crate::error::UdpSendError::PoolExhausted`] so callers
2339 /// have a single "try again later" branch that matches the io_uring backend.
2340 #[cfg(not(has_io_uring))]
2341 pub fn send_to(&self, peer: SocketAddr, data: &[u8]) -> Result<(), crate::error::UdpSendError> {
2342 with_state(|driver, _executor| {
2343 let idx = self.udp_index as usize;
2344 if idx >= driver.udp_sockets.len() {
2345 return Err(crate::error::UdpSendError::Io(io::Error::other(
2346 "invalid UDP socket index",
2347 )));
2348 }
2349 match driver.udp_sockets[idx].send_to(data, peer) {
2350 Ok(_) => {
2351 crate::metrics::UDP.increment(crate::metrics::udp::DATAGRAMS_SENT);
2352 Ok(())
2353 }
2354 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
2355 Err(crate::error::UdpSendError::PoolExhausted)
2356 }
2357 Err(e) => Err(crate::error::UdpSendError::Io(e)),
2358 }
2359 })
2360 }
2361}
2362
/// Future returned by [`UdpCtx::recv_from()`].
///
/// Resolves to one `(payload, source address)` pair taken from the
/// executor's receive queue for the socket.
pub struct UdpRecvFuture {
    /// Index of the UDP socket whose receive queue this future drains.
    udp_index: u32,
}
2367
2368impl Future for UdpRecvFuture {
2369 type Output = (Vec<u8>, SocketAddr);
2370
2371 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
2372 with_state(|_driver, executor| {
2373 let idx = self.udp_index as usize;
2374 if idx < executor.udp_recv_queues.len()
2375 && let Some(datagram) = executor.udp_recv_queues[idx].pop_front()
2376 {
2377 return Poll::Ready(datagram);
2378 }
2379 // Register as waiter so the CQE handler wakes us.
2380 let task_id = CURRENT_TASK_ID.with(|c| c.get());
2381 if idx < executor.udp_recv_waiters.len() {
2382 executor.udp_recv_waiters[idx] = Some(task_id);
2383 }
2384 Poll::Pending
2385 })
2386 }
2387}
2388
/// Future returned by [`UdpCtx::send_ready()`].
///
/// Resolves to `()` once the socket can accept another send.
pub struct UdpSendReadyFuture {
    /// Index of the UDP socket being waited on.
    ///
    /// Never read on the mio backend — the future resolves immediately
    /// without looking at any per-socket state.
    #[cfg_attr(not(has_io_uring), allow(dead_code))]
    udp_index: u32,
}
2396
2397impl Future for UdpSendReadyFuture {
2398 type Output = ();
2399
2400 #[cfg(has_io_uring)]
2401 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
2402 with_state(|driver, executor| {
2403 let idx = self.udp_index as usize;
2404 if idx < driver.udp_sockets.len() && !driver.udp_sockets[idx].send_freelist.is_empty() {
2405 return Poll::Ready(());
2406 }
2407 // Register as the waiter. CQE handler wakes us when a slot frees.
2408 let task_id = CURRENT_TASK_ID.with(|c| c.get());
2409 if idx < executor.udp_send_ready_waiters.len() {
2410 executor.udp_send_ready_waiters[idx] = Some(task_id);
2411 }
2412 Poll::Pending
2413 })
2414 }
2415
2416 /// Mio backend: sends are synchronous `send_to` calls that never queue,
2417 /// so this future resolves immediately. Callers stay backend-agnostic.
2418 #[cfg(not(has_io_uring))]
2419 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
2420 Poll::Ready(())
2421 }
2422}