// ringline/runtime/io.rs — async runtime I/O primitives.
1use std::cell::{Cell, RefCell};
2use std::fmt;
3use std::future::Future;
4use std::io;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::rc::Rc;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use bytes::Bytes;
12
13use crate::backend::Driver;
14#[cfg(has_io_uring)]
15use crate::completion::{OpTag, UserData};
16use crate::error::TimerExhausted;
17use crate::handler::ConnToken;
18#[cfg(has_io_uring)]
19use crate::runtime::TimerSlotPool;
20use crate::runtime::task::TaskId;
21use crate::runtime::waker::STANDALONE_BIT;
22use crate::runtime::{CURRENT_TASK_ID, Executor, IoResult};
23
/// Result of a parse closure passed to [`ConnCtx::with_data`] or [`ConnCtx::with_bytes`].
///
/// When the closure returns `NeedMore` or `Consumed(0)`, the future parks and
/// retries when more data arrives. `Consumed(0)` on a non-empty buffer is
/// treated identically to `NeedMore`. When the connection is closed (EOF),
/// the `with_data`/`with_bytes` future resolves with `0` regardless of the
/// parse result.
///
/// Because an incomplete parse is retried, closures must be safe to run
/// multiple times over the same (growing) buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParseResult {
    /// The closure consumed `n` bytes from the buffer.
    ///
    /// `Consumed(0)` on non-empty data is treated as "need more data" — the
    /// future will park and retry when additional bytes arrive.
    Consumed(usize),
    /// The closure needs more data before it can make progress.
    NeedMore,
}
41
/// Raw pointer to the driver + executor state, set before polling each task.
///
/// Stored in the thread-local [`CURRENT_DRIVER`] and read back through
/// [`with_state`] / [`try_with_state`] from inside futures.
///
/// # Safety
///
/// This is safe because:
/// 1. Single-threaded: each worker thread has its own driver/executor.
/// 2. Scoped: set before poll, cleared after poll. The pointer is only
///    dereferenced within a Future::poll call.
/// 3. The pointed-to data lives on the worker thread's stack (in AsyncEventLoop::run).
pub(crate) struct DriverState {
    /// Backend driver for this worker thread.
    pub(crate) driver: *mut Driver,
    /// Task executor for this worker thread.
    pub(crate) executor: *mut Executor,
}
55
thread_local! {
    /// Per-thread pointer to the currently-installed [`DriverState`].
    ///
    /// Null whenever this thread is not inside an executor poll.
    pub(crate) static CURRENT_DRIVER: Cell<*mut DriverState> =
        const { Cell::new(std::ptr::null_mut()) };
}
60
61/// Set the thread-local driver pointer before polling a task.
62pub(crate) fn set_driver_state(state: *mut DriverState) {
63    CURRENT_DRIVER.with(|c| c.set(state));
64}
65
66/// Clear the thread-local driver pointer after polling a task.
67pub(crate) fn clear_driver_state() {
68    CURRENT_DRIVER.with(|c| c.set(std::ptr::null_mut()));
69}
70
71/// Access the thread-local driver state. Panics if called outside the executor.
72pub(crate) fn with_state<R>(f: impl FnOnce(&mut Driver, &mut Executor) -> R) -> R {
73    let ptr = CURRENT_DRIVER.with(|c| c.get());
74    assert!(!ptr.is_null(), "called outside executor");
75    let state = unsafe { &mut *ptr };
76    let driver = unsafe { &mut *state.driver };
77    let executor = unsafe { &mut *state.executor };
78    f(driver, executor)
79}
80
81/// Access the thread-local driver state, returning `None` if called outside the executor.
82pub(crate) fn try_with_state<R>(f: impl FnOnce(&mut Driver, &mut Executor) -> R) -> Option<R> {
83    let ptr = CURRENT_DRIVER.with(|c| c.get());
84    if ptr.is_null() {
85        return None;
86    }
87    let state = unsafe { &mut *ptr };
88    let driver = unsafe { &mut *state.driver };
89    let executor = unsafe { &mut *state.executor };
90    Some(f(driver, executor))
91}
92
93/// Spawn a standalone async task on the current worker thread.
94///
95/// Unlike connection tasks (which are 1:1 with connections), standalone tasks
96/// are not bound to any connection. They run on the same single-threaded
97/// executor and can use [`sleep()`](crate::sleep) and [`timeout()`](crate::timeout),
98/// but cannot perform connection I/O directly.
99///
100/// Returns `Err` if called outside the ringline async executor or if the
101/// standalone task slab is full.
102pub fn spawn(future: impl Future<Output = ()> + 'static) -> io::Result<TaskId> {
103    try_with_state(
104        |_driver, executor| match executor.standalone_slab.spawn(Box::pin(future)) {
105            Some(idx) => {
106                executor.ready_queue.push_back(idx | STANDALONE_BIT);
107                Ok(TaskId(idx))
108            }
109            None => Err(io::Error::other("standalone task slab exhausted")),
110        },
111    )
112    .unwrap_or_else(|| Err(io::Error::other("called outside executor")))
113}
114
impl TaskId {
    /// Cancel a standalone task. Drops the future immediately, freeing
    /// the slab slot. No-op if the task already completed.
    ///
    /// Must be called from within the ringline executor (i.e., from a
    /// connection task or standalone task). Panics otherwise.
    ///
    /// Any pending timers owned by the dropped future are cancelled
    /// via their `Drop` impl. Stale entries in the ready queue are
    /// silently skipped when the executor encounters them.
    pub fn cancel(self) {
        with_state(|_driver, executor| {
            // The future is dropped inside the closure, while the driver
            // state is still installed — timer Drop impls that need the
            // executor can still reach it.
            executor.standalone_slab.remove(self.0);
        });
    }
}
131
132// ── JoinHandle ──────────────────────────────────────────────────────
133
/// Shared state between a spawned wrapper future and its [`JoinHandle`].
///
/// Held in an `Rc<RefCell<..>>` shared (same thread) by exactly two parties:
/// the wrapper future writes `result`; the handle writes `waiter`/`aborted`.
struct JoinState<T> {
    /// The task's return value, written by the wrapper when it completes.
    result: Option<T>,
    /// Raw task ID of the task awaiting this handle (includes `STANDALONE_BIT`
    /// for standalone tasks). Set by `JoinHandle::poll` when it returns `Pending`.
    waiter: Option<u32>,
    /// True if [`JoinHandle::abort`] was called.
    aborted: bool,
}
144
/// Handle to a spawned task's return value.
///
/// Obtained from [`spawn_with_handle()`]. Implements [`Future`] — awaiting it
/// yields the task's return value `T` once the task completes.
///
/// # Drop semantics
///
/// Dropping a `JoinHandle` without awaiting it **detaches** the task: the
/// task continues running but its result is discarded. This matches tokio's
/// semantics.
///
/// # Abort
///
/// [`abort()`](Self::abort) cancels the spawned task. A `JoinHandle` that has
/// been aborted will never resolve if polled.
pub struct JoinHandle<T> {
    /// State shared with the wrapper future created by `spawn_with_handle`.
    state: Rc<RefCell<JoinState<T>>>,
    /// Slab id of the spawned task; used by `abort` to cancel it.
    task_id: TaskId,
}
164
165impl<T> JoinHandle<T> {
166    /// Get the underlying [`TaskId`].
167    pub fn id(&self) -> TaskId {
168        self.task_id
169    }
170
171    /// Cancel the spawned task.
172    ///
173    /// The future is dropped immediately and its slab slot is freed.
174    /// After this call, awaiting the handle will hang forever — use
175    /// [`select()`](crate::select) with a flag if you need to detect
176    /// cancellation.
177    pub fn abort(&self) {
178        self.state.borrow_mut().aborted = true;
179        self.task_id.cancel();
180    }
181}
182
183impl<T: 'static> Future for JoinHandle<T> {
184    type Output = T;
185
186    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
187        let mut s = self.state.borrow_mut();
188        if s.aborted {
189            return Poll::Pending;
190        }
191        if let Some(value) = s.result.take() {
192            return Poll::Ready(value);
193        }
194        // Register this task as the waiter so the child can wake us.
195        s.waiter = Some(CURRENT_TASK_ID.with(|c| c.get()));
196        Poll::Pending
197    }
198}
199
200/// Spawn a standalone async task and return a handle to await its result.
201///
202/// Like [`spawn()`], the future runs on the current worker's single-threaded
203/// executor. The returned [`JoinHandle<T>`] implements [`Future<Output = T>`] —
204/// awaiting it yields the task's return value once it completes.
205///
206/// # Detach semantics
207///
208/// Dropping the handle without awaiting it detaches the task: the task keeps
209/// running but its result is silently discarded.
210///
211/// # Errors
212///
213/// Returns `Err` if called outside the ringline executor or if the standalone
214/// task slab is exhausted.
215///
216/// # Panics in the spawned task
217///
218/// A panic in the spawned future unwinds the worker thread (same as [`spawn()`]).
219pub fn spawn_with_handle<T: 'static>(
220    future: impl Future<Output = T> + 'static,
221) -> io::Result<JoinHandle<T>> {
222    let state = Rc::new(RefCell::new(JoinState {
223        result: None,
224        waiter: None,
225        aborted: false,
226    }));
227    let state_for_wrapper = Rc::clone(&state);
228
229    let wrapper = async move {
230        let value = future.await;
231        let mut s = state_for_wrapper.borrow_mut();
232        s.result = Some(value);
233        let waiter = s.waiter.take();
234        // Drop the borrow before calling with_state — defensive against
235        // any re-entrant borrow in the wakeup path.
236        drop(s);
237        if let Some(waiter_id) = waiter {
238            with_state(|_driver, executor| {
239                executor.wake_task(waiter_id);
240            });
241        }
242    };
243
244    try_with_state(
245        |_driver, executor| match executor.standalone_slab.spawn(Box::pin(wrapper)) {
246            Some(idx) => {
247                executor.ready_queue.push_back(idx | STANDALONE_BIT);
248                Ok(JoinHandle {
249                    state: Rc::clone(&state),
250                    task_id: TaskId(idx),
251                })
252            }
253            None => Err(io::Error::other("standalone task slab exhausted")),
254        },
255    )
256    .unwrap_or_else(|| Err(io::Error::other("called outside executor")))
257}
258
259/// Offload a blocking closure to the dedicated blocking thread pool.
260///
261/// The closure runs on a low-priority background thread (`SCHED_IDLE`),
262/// keeping the io_uring event loop unblocked. Returns a future that
263/// resolves to the closure's return value.
264///
265/// # Errors
266///
267/// Returns `Err` if called outside the ringline executor or if the blocking
268/// pool is not configured (`blocking_threads = 0`).
269pub fn spawn_blocking<T: Send + 'static>(
270    f: impl FnOnce() -> T + Send + 'static,
271) -> io::Result<BlockingJoinHandle<T>> {
272    try_with_state(|driver, executor| {
273        let pool = driver
274            .blocking_pool
275            .as_ref()
276            .ok_or_else(|| io::Error::other("blocking pool not configured"))?;
277        let blocking_tx = driver
278            .blocking_tx
279            .as_ref()
280            .ok_or_else(|| io::Error::other("blocking pool not configured"))?;
281
282        let request_id = executor.next_blocking_id;
283        executor.next_blocking_id += 1;
284
285        let task_id = CURRENT_TASK_ID.with(|c| c.get());
286        executor
287            .pending_blocking
288            .insert(request_id, (task_id, None));
289
290        let work = Box::new(move || -> Box<dyn std::any::Any + Send> { Box::new(f()) });
291
292        pool.request_tx
293            .send(crate::blocking::BlockingRequest {
294                work,
295                request_id,
296                response_tx: blocking_tx.clone(),
297                wake_handle: driver.wake_handle,
298            })
299            .map_err(|_| io::Error::other("blocking pool shut down"))?;
300
301        Ok(BlockingJoinHandle {
302            request_id,
303            _phantom: std::marker::PhantomData,
304        })
305    })
306    .unwrap_or_else(|| Err(io::Error::other("called outside executor")))
307}
308
/// Future returned by [`spawn_blocking()`]. Resolves to the closure's return value.
///
/// Polling looks up the completed (type-erased) result by `request_id` in the
/// executor's `pending_blocking` table and downcasts it back to `T`.
pub struct BlockingJoinHandle<T> {
    /// Key into the executor's `pending_blocking` table for this request.
    request_id: u64,
    /// Carries the closure's return type for the downcast in `poll`.
    _phantom: std::marker::PhantomData<T>,
}
314
315impl<T: 'static> Future for BlockingJoinHandle<T> {
316    type Output = T;
317
318    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
319        with_state(|_driver, executor| {
320            if let Some((_, slot)) = executor.pending_blocking.get_mut(&self.request_id)
321                && let Some(boxed) = slot.take()
322            {
323                executor.pending_blocking.remove(&self.request_id);
324                let value = *boxed
325                    .downcast::<T>()
326                    .expect("type mismatch in BlockingJoinHandle");
327                return Poll::Ready(value);
328            }
329            Poll::Pending
330        })
331    }
332}
333
334/// Initiate an outbound TCP connection from any async task (connection or standalone).
335///
336/// This is the free-function equivalent of [`ConnCtx::connect()`] — it can be
337/// called from standalone tasks spawned via [`spawn()`] or from an
338/// [`AsyncEventHandler::on_start()`](crate::AsyncEventHandler::on_start) future,
339/// where no `ConnCtx` is available.
340///
341/// Returns a [`ConnectFuture`] that resolves with a [`ConnCtx`] for the new connection.
342///
343/// # Panics
344///
345/// Panics if called outside the ringline async executor.
346pub fn connect(addr: SocketAddr) -> io::Result<ConnectFuture> {
347    with_state(|driver, executor| {
348        let mut ctx = driver.make_ctx();
349        let token = ctx
350            .connect(addr)
351            .map_err(|e| io::Error::other(e.to_string()))?;
352        let calling_task = CURRENT_TASK_ID.with(|c| c.get());
353        executor.owner_task[token.index as usize] = Some(calling_task);
354        executor.connect_waiters[token.index as usize] = true;
355        Ok(ConnectFuture {
356            conn_index: token.index,
357            generation: token.generation,
358        })
359    })
360}
361
362/// Initiate an outbound TCP connection with a timeout from any async task.
363///
364/// Free-function equivalent of [`ConnCtx::connect_with_timeout()`].
365///
366/// # Panics
367///
368/// Panics if called outside the ringline async executor.
369pub fn connect_with_timeout(addr: SocketAddr, timeout_ms: u64) -> io::Result<ConnectFuture> {
370    with_state(|driver, executor| {
371        let mut ctx = driver.make_ctx();
372        let token = ctx
373            .connect_with_timeout(addr, timeout_ms)
374            .map_err(|e| io::Error::other(e.to_string()))?;
375        let calling_task = CURRENT_TASK_ID.with(|c| c.get());
376        executor.owner_task[token.index as usize] = Some(calling_task);
377        executor.connect_waiters[token.index as usize] = true;
378        Ok(ConnectFuture {
379            conn_index: token.index,
380            generation: token.generation,
381        })
382    })
383}
384
385/// Initiate an outbound Unix domain socket connection from any async task.
386///
387/// Free-function equivalent of [`ConnCtx::connect_unix()`].
388///
389/// Returns a [`ConnectFuture`] that resolves with a [`ConnCtx`] for the new connection.
390///
391/// # Panics
392///
393/// Panics if called outside the ringline async executor.
394pub fn connect_unix(path: impl AsRef<std::path::Path>) -> io::Result<ConnectFuture> {
395    with_state(|driver, executor| {
396        let mut ctx = driver.make_ctx();
397        let token = ctx
398            .connect_unix(path.as_ref())
399            .map_err(|e| io::Error::other(e.to_string()))?;
400        let calling_task = CURRENT_TASK_ID.with(|c| c.get());
401        executor.owner_task[token.index as usize] = Some(calling_task);
402        executor.connect_waiters[token.index as usize] = true;
403        Ok(ConnectFuture {
404            conn_index: token.index,
405            generation: token.generation,
406        })
407    })
408}
409
410/// Initiate an outbound TLS connection from any async task (connection or standalone).
411///
412/// This is the free-function equivalent of [`ConnCtx::connect_tls()`] — it can be
413/// called from standalone tasks spawned via [`spawn()`] or from an
414/// [`AsyncEventHandler::on_start()`](crate::AsyncEventHandler::on_start) future,
415/// where no `ConnCtx` is available.
416///
417/// `server_name` is the SNI hostname for the TLS handshake.
418///
419/// Returns a [`ConnectFuture`] that resolves with a [`ConnCtx`] for the new connection
420/// once both the TCP and TLS handshakes complete.
421///
422/// # Panics
423///
424/// Panics if called outside the ringline async executor.
425pub fn connect_tls(addr: SocketAddr, server_name: &str) -> io::Result<ConnectFuture> {
426    with_state(|driver, executor| {
427        let mut ctx = driver.make_ctx();
428        let token = ctx
429            .connect_tls(addr, server_name)
430            .map_err(|e| io::Error::other(e.to_string()))?;
431        let calling_task = CURRENT_TASK_ID.with(|c| c.get());
432        executor.owner_task[token.index as usize] = Some(calling_task);
433        executor.connect_waiters[token.index as usize] = true;
434        Ok(ConnectFuture {
435            conn_index: token.index,
436            generation: token.generation,
437        })
438    })
439}
440
441/// Initiate an outbound TLS connection with a timeout from any async task.
442///
443/// Free-function equivalent of [`ConnCtx::connect_tls_with_timeout()`].
444///
445/// `server_name` is the SNI hostname for the TLS handshake.
446///
447/// # Panics
448///
449/// Panics if called outside the ringline async executor.
450pub fn connect_tls_with_timeout(
451    addr: SocketAddr,
452    server_name: &str,
453    timeout_ms: u64,
454) -> io::Result<ConnectFuture> {
455    with_state(|driver, executor| {
456        let mut ctx = driver.make_ctx();
457        let token = ctx
458            .connect_tls_with_timeout(addr, server_name, timeout_ms)
459            .map_err(|e| io::Error::other(e.to_string()))?;
460        let calling_task = CURRENT_TASK_ID.with(|c| c.get());
461        executor.owner_task[token.index as usize] = Some(calling_task);
462        executor.connect_waiters[token.index as usize] = true;
463        Ok(ConnectFuture {
464            conn_index: token.index,
465            generation: token.generation,
466        })
467    })
468}
469
470// ── DNS Resolution ──────────────────────────────────────────────────
471
472/// Resolve a hostname to a [`SocketAddr`] using the dedicated resolver pool.
473///
474/// Performs `getaddrinfo` on a background thread, keeping the io_uring event
475/// loop unblocked. Returns the first resolved address.
476///
477/// # Errors
478///
479/// Returns `Err` if called outside the ringline executor, the resolver pool
480/// is not configured (`resolver_threads = 0`), or `getaddrinfo` fails.
481pub fn resolve(host: &str, port: u16) -> io::Result<ResolveFuture> {
482    let host = host.to_string();
483    try_with_state(|driver, executor| {
484        let resolver = driver
485            .resolver
486            .as_ref()
487            .ok_or_else(|| io::Error::other("resolver pool not configured"))?;
488        let resolve_tx = driver
489            .resolve_tx
490            .as_ref()
491            .ok_or_else(|| io::Error::other("resolver pool not configured"))?;
492
493        let request_id = executor.next_resolve_id;
494        executor.next_resolve_id += 1;
495
496        let task_id = CURRENT_TASK_ID.with(|c| c.get());
497        executor
498            .pending_resolves
499            .insert(request_id, (task_id, None));
500
501        resolver
502            .request_tx
503            .send(crate::resolver::ResolveRequest {
504                host,
505                port,
506                request_id,
507                response_tx: resolve_tx.clone(),
508                wake_handle: driver.wake_handle,
509            })
510            .map_err(|_| io::Error::other("resolver pool shut down"))?;
511
512        Ok(ResolveFuture { request_id })
513    })
514    .unwrap_or_else(|| Err(io::Error::other("called outside executor")))
515}
516
/// Future returned by [`resolve()`]. Resolves to a [`SocketAddr`].
///
/// Polling checks the executor's `pending_resolves` table for a completed
/// response keyed by `request_id`.
pub struct ResolveFuture {
    /// Key into the executor's `pending_resolves` table for this request.
    request_id: u64,
}
521
522impl Future for ResolveFuture {
523    type Output = io::Result<std::net::SocketAddr>;
524
525    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
526        with_state(|_driver, executor| {
527            if let Some((_, slot)) = executor.pending_resolves.get_mut(&self.request_id)
528                && let Some(result) = slot.take()
529            {
530                executor.pending_resolves.remove(&self.request_id);
531                return Poll::Ready(result);
532            }
533            Poll::Pending
534        })
535    }
536}
537
538/// Request graceful shutdown of the worker event loop from any async task.
539///
540/// This is the free-function equivalent of [`ConnCtx::request_shutdown()`] —
541/// it can be called from standalone tasks or [`AsyncEventHandler::on_start()`](crate::AsyncEventHandler::on_start)
542/// futures where no `ConnCtx` is available.
543///
544/// Returns `Err` if called outside the ringline async executor.
545pub fn request_shutdown() -> io::Result<()> {
546    try_with_state(|driver, _| {
547        let mut ctx = driver.make_ctx();
548        ctx.request_shutdown();
549    })
550    .ok_or_else(|| io::Error::other("called outside executor"))
551}
552
/// The async equivalent of `ConnToken` + `DriverCtx`. Passed to the
/// connection's async fn, provides I/O methods.
///
/// Async connection context providing send, recv, and connect operations.
///
/// Each accepted connection receives a `ConnCtx` in [`AsyncEventHandler::on_accept`](crate::AsyncEventHandler::on_accept).
/// It exposes an async API for reading data ([`with_data`](Self::with_data),
/// [`with_bytes`](Self::with_bytes)), sending data ([`send`](Self::send),
/// [`send_nowait`](Self::send_nowait)), and initiating outbound connections
/// ([`connect`](Self::connect)).
///
/// A `ConnCtx` is valid for the lifetime of the connection's async task.
/// When the connection is closed, the task is dropped along with the `ConnCtx`.
///
/// `ConnCtx` is `Copy`: it is just a (slot index, generation) pair; all
/// per-connection state lives in the driver/executor.
#[derive(Clone, Copy)]
pub struct ConnCtx {
    /// Connection slot index into the per-connection tables.
    pub(crate) conn_index: u32,
    /// Generation stamp paired with `conn_index` to form the [`ConnToken`].
    pub(crate) generation: u32,
}
571
572impl ConnCtx {
573    /// Create a new ConnCtx for the given connection.
574    pub(crate) fn new(conn_index: u32, generation: u32) -> Self {
575        ConnCtx {
576            conn_index,
577            generation,
578        }
579    }
580
    /// Returns the connection slot index. Useful for indexing into
    /// per-connection arrays kept alongside the runtime's own tables.
    pub fn index(&self) -> usize {
        self.conn_index as usize
    }
585
    /// Returns the `ConnToken` (slot index + generation) for this connection,
    /// as accepted by the synchronous driver send/connect APIs.
    pub fn token(&self) -> ConnToken {
        ConnToken::new(self.conn_index, self.generation)
    }
590
591    // ── Recv ─────────────────────────────────────────────────────────
592
593    /// Wait until recv data is available, then process it.
594    ///
595    /// The closure receives accumulated bytes and returns a [`ParseResult`]:
596    /// - `ParseResult::Consumed(n)` — `n` bytes were consumed from the buffer.
597    /// - `ParseResult::NeedMore` — the closure needs more data before making progress.
598    ///
599    /// Resolves immediately when data is already buffered (cache-hit hot path).
600    ///
601    /// If the closure returns `NeedMore` or `Consumed(0)` on non-empty data
602    /// (incomplete parse), the future parks and retries when more data arrives.
603    /// The closure must therefore be safe to call multiple times (`FnMut`).
604    pub fn with_data<F: FnMut(&[u8]) -> ParseResult>(&self, f: F) -> WithDataFuture<F> {
605        WithDataFuture {
606            conn_index: self.conn_index,
607            f: Some(f),
608        }
609    }
610
611    /// Wait until recv data is available, then provide it as zero-copy `Bytes`.
612    ///
613    /// Like [`with_data()`](Self::with_data), but the closure receives a `Bytes`
614    /// handle that can be sliced (O(1), refcounted) instead of copied. The
615    /// closure returns a [`ParseResult`] indicating bytes consumed.
616    ///
617    /// This enables zero-copy RESP parsing: the parser can call `bytes.slice()`
618    /// to extract sub-ranges without allocating.
619    pub fn with_bytes<F: FnMut(Bytes) -> ParseResult>(&self, f: F) -> WithBytesFuture<F> {
620        WithBytesFuture {
621            conn_index: self.conn_index,
622            f: Some(f),
623        }
624    }
625
626    /// Install a recv sink so that CQE data is written directly to the
627    /// target buffer instead of the per-connection accumulator.
628    ///
629    /// # Safety
630    ///
631    /// The caller must ensure that `target` points to writable memory of at
632    /// least `len` bytes, and that the memory remains valid until
633    /// [`take_recv_sink()`](Self::take_recv_sink) is called. In practice this
634    /// is guaranteed because ringline is single-threaded: the task sets the sink,
635    /// yields, and the CQE handler (same thread) writes to it before the task
636    /// resumes and clears the sink.
637    pub unsafe fn set_recv_sink(&self, target: *mut u8, len: usize) {
638        with_state(|_driver, executor| {
639            executor.recv_sinks[self.conn_index as usize] = Some(crate::runtime::RecvSink {
640                ptr: target,
641                cap: len,
642                pos: 0,
643            });
644        });
645    }
646
647    /// Remove the recv sink and return the number of bytes written to it.
648    /// Returns 0 if no sink was active.
649    pub fn take_recv_sink(&self) -> usize {
650        with_state(
651            |_driver, executor| match executor.recv_sinks[self.conn_index as usize].take() {
652                Some(sink) => sink.pos,
653                None => 0,
654            },
655        )
656    }
657
    /// Returns a future that becomes ready when any recv data is available
    /// (in the accumulator, recv sink, or connection is closed).
    ///
    /// Use this with [`set_recv_sink()`](Self::set_recv_sink) to wait for
    /// direct-to-buffer writes without processing accumulator data.
    ///
    /// The future itself only carries the slot index; readiness is evaluated
    /// when it is polled (see `RecvReadyFuture`).
    pub fn recv_ready(&self) -> RecvReadyFuture {
        RecvReadyFuture {
            conn_index: self.conn_index,
        }
    }
668
669    /// Non-blocking accumulator access. Calls `f` with buffered data if any,
670    /// returning `Some(result)`. Returns `None` if the accumulator is empty.
671    pub fn try_with_data<F: FnOnce(&[u8]) -> ParseResult>(&self, f: F) -> Option<ParseResult> {
672        with_state(|driver, _executor| {
673            let data = driver.accumulators.data(self.conn_index);
674            if data.is_empty() {
675                return None;
676            }
677            let result = f(data);
678            if let ParseResult::Consumed(consumed) = result {
679                driver.accumulators.consume(self.conn_index, consumed);
680            }
681            Some(result)
682        })
683    }
684
685    // ── Send (synchronous / fire-and-forget) ─────────────────────────
686
687    /// Fire-and-forget send: copies data into the send pool and submits the SQE.
688    /// One copy, no heap allocation, no future.
689    ///
690    /// This is the hot-path send for cache responses. The CQE is handled
691    /// internally by the executor (resource cleanup + send queue advancement).
692    ///
693    /// # Errors
694    ///
695    /// Returns `Err` if the send copy pool is exhausted or the submission queue is full.
696    ///
697    /// For backpressure-aware sending, use [`send()`](Self::send) instead.
698    pub fn send_nowait(&self, data: &[u8]) -> io::Result<()> {
699        with_state(|driver, _| {
700            let mut ctx = driver.make_ctx();
701            ctx.send(self.token(), data)
702        })
703    }
704
    /// Forward the current pending recv buffer as a zero-copy send.
    ///
    /// This is intended for use inside a `with_data` closure. If the connection
    /// has a pending recv buffer (from the zero-copy recv path), it is taken and
    /// used as the send source directly — no copy into the send pool. The recv
    /// buffer is replenished when the send completes.
    ///
    /// Falls back to `send_nowait(data)` if there is no pending recv buffer
    /// (e.g., data came from the accumulator or a TLS connection).
    ///
    /// Only works for plaintext connections; TLS connections always copy.
    /// On the mio backend, this always uses the copy path.
    ///
    /// # Errors
    ///
    /// Propagates submission errors from the underlying send path (e.g. SQ
    /// full, or send copy pool exhausted on the fallback path).
    pub fn forward_recv_buf(&self, data: &[u8]) -> io::Result<()> {
        with_state(|driver, _| {
            // On the mio backend the zero-copy branch below is compiled out,
            // leaving `conn_index` unused — silence the lint only there.
            #[cfg_attr(not(has_io_uring), allow(unused_variables))]
            let conn_index = self.conn_index;

            #[cfg(has_io_uring)]
            {
                // Check for pending recv buffer.
                if let Some(pending) = driver.pending_recv_bufs[conn_index as usize].take() {
                    // Verify the data pointer matches the pending buffer (sanity check):
                    // the zero-copy path is only valid when the caller forwards exactly
                    // the bytes that live in the provided recv buffer.
                    let pending_ptr = pending.ptr;
                    let data_ptr = data.as_ptr();
                    if data_ptr == pending_ptr && data.len() == pending.len as usize {
                        // Submit send SQE from the recv buffer. The bid is replenished
                        // on send completion via handle_send_recv_buf.
                        // Payload: bid in low 16 bits, remaining_len in high 16 bits.
                        debug_assert!(
                            pending.len <= 0xFFFF,
                            "forward_recv_buf: data length {} exceeds 16-bit payload capacity",
                            pending.len,
                        );
                        let payload = (pending.bid as u32) | ((pending.len) << 16);
                        // Store original data length for correct offset on partial sends.
                        driver.send_recv_buf_original_lens[conn_index as usize] = pending.len;
                        let user_data = crate::completion::UserData::encode(
                            crate::completion::OpTag::SendRecvBuf,
                            conn_index,
                            payload,
                        );
                        let entry = io_uring::opcode::Send::new(
                            io_uring::types::Fixed(conn_index),
                            pending_ptr,
                            pending.len,
                        )
                        .build()
                        .user_data(user_data.raw());

                        // u16::MAX sentinels: this send borrows the recv buffer,
                        // so no send-pool slot or slab entry is held.
                        let built = crate::handler::BuiltSend {
                            entry,
                            pool_slot: u16::MAX,
                            #[cfg(has_io_uring)]
                            slab_idx: u16::MAX,
                            total_len: pending.len,
                        };

                        let result = driver.submit_or_queue_send(conn_index, built);
                        if result.is_err() {
                            // Submit failed — replenish the recv buffer so the
                            // provided-buffer bid is not leaked.
                            driver.pending_replenish.push(pending.bid);
                        }
                        return result;
                    }

                    // Pointer mismatch — put it back and fall through to copy path.
                    driver.pending_recv_bufs[conn_index as usize] = Some(pending);
                }
            }

            // No pending recv buffer (or mio backend) — fall back to copy send.
            let mut ctx = driver.make_ctx();
            ctx.send(self.token(), data)
        })
    }
780
    /// Begin building a scatter-gather send with mixed copy + zero-copy guard parts.
    ///
    /// This mirrors `DriverCtx::send_parts()` — use `.copy(data)` for copied parts
    /// and `.guard(guard)` for zero-copy parts backed by `SendGuard`. Call `.submit()`
    /// to submit the SQE. Fire-and-forget: no future returned.
    ///
    /// Only available on the io_uring backend.
    #[cfg(has_io_uring)]
    pub fn send_parts(&self) -> AsyncSendBuilder {
        AsyncSendBuilder {
            token: self.token(),
        }
    }
792
793    // ── Send (awaitable) ─────────────────────────────────────────────
794
795    /// Send data and await completion. Copies data into the send pool, submits
796    /// the SQE eagerly, then returns a future that resolves with the total bytes
797    /// sent (or error).
798    ///
799    /// Use this when you need backpressure or send completion notification.
800    /// For fire-and-forget sending, use [`send_nowait()`](Self::send_nowait).
801    ///
802    /// # Errors
803    ///
804    /// Returns `Err` if the send copy pool is exhausted or the submission queue is full.
805    pub fn send(&self, data: &[u8]) -> io::Result<SendFuture> {
806        with_state(|driver, executor| {
807            let mut ctx = driver.make_ctx();
808            ctx.send(self.token(), data)?;
809            executor.send_waiters[self.conn_index as usize] = true;
810            // On mio, the send is buffered synchronously — record a pending
811            // completion so the event loop can deliver wake_send for each
812            // individual awaitable send.
813            #[cfg(not(has_io_uring))]
814            {
815                driver.send_completions[self.conn_index as usize].push_back(data.len() as u32);
816            }
817            Ok(SendFuture {
818                conn_index: self.conn_index,
819            })
820        })
821    }
822
823    // ── Connect ──────────────────────────────────────────────────────
824
825    /// Initiate an outbound TCP connection and await the result.
826    ///
827    /// Returns a new `ConnCtx` for the peer connection on success.
828    pub fn connect(&self, addr: SocketAddr) -> io::Result<ConnectFuture> {
829        with_state(|driver, executor| {
830            let mut ctx = driver.make_ctx();
831            let token = ctx
832                .connect(addr)
833                .map_err(|e| io::Error::other(e.to_string()))?;
834            let calling_task = CURRENT_TASK_ID.with(|c| c.get());
835            executor.owner_task[token.index as usize] = Some(calling_task);
836            executor.connect_waiters[token.index as usize] = true;
837            Ok(ConnectFuture {
838                conn_index: token.index,
839                generation: token.generation,
840            })
841        })
842    }
843
844    /// Initiate an outbound TCP connection with a timeout and await the result.
845    pub fn connect_with_timeout(
846        &self,
847        addr: SocketAddr,
848        timeout_ms: u64,
849    ) -> io::Result<ConnectFuture> {
850        with_state(|driver, executor| {
851            let mut ctx = driver.make_ctx();
852            let token = ctx
853                .connect_with_timeout(addr, timeout_ms)
854                .map_err(|e| io::Error::other(e.to_string()))?;
855            let calling_task = CURRENT_TASK_ID.with(|c| c.get());
856            executor.owner_task[token.index as usize] = Some(calling_task);
857            executor.connect_waiters[token.index as usize] = true;
858            Ok(ConnectFuture {
859                conn_index: token.index,
860                generation: token.generation,
861            })
862        })
863    }
864
865    /// Initiate an outbound Unix domain socket connection and await the result.
866    ///
867    /// Returns a new `ConnCtx` for the peer connection on success.
868    pub fn connect_unix(&self, path: impl AsRef<std::path::Path>) -> io::Result<ConnectFuture> {
869        with_state(|driver, executor| {
870            let mut ctx = driver.make_ctx();
871            let token = ctx
872                .connect_unix(path.as_ref())
873                .map_err(|e| io::Error::other(e.to_string()))?;
874            let calling_task = CURRENT_TASK_ID.with(|c| c.get());
875            executor.owner_task[token.index as usize] = Some(calling_task);
876            executor.connect_waiters[token.index as usize] = true;
877            Ok(ConnectFuture {
878                conn_index: token.index,
879                generation: token.generation,
880            })
881        })
882    }
883
884    // ── Send chain ────────────────────────────────────────────────────
885
886    /// Build an IO_LINK chained send on this connection (fire-and-forget).
887    ///
888    /// The closure receives a [`SendChainBuilder`](crate::handler::SendChainBuilder) for
889    /// constructing linked SQEs. Call `.copy()`, `.parts()...add()` to add SQEs,
890    /// then `.finish()` to submit the chain.
891    ///
892    /// For backpressure-aware chained sending, use [`send_chain()`](Self::send_chain).
893    #[cfg(has_io_uring)]
894    pub fn send_chain_nowait<F, R>(&self, f: F) -> R
895    where
896        F: FnOnce(crate::handler::SendChainBuilder<'_, '_>) -> R,
897    {
898        with_state(|driver, _| {
899            let mut ctx = driver.make_ctx();
900            let token = ConnToken::new(self.conn_index, self.generation);
901            let builder = ctx.send_chain(token);
902            f(builder)
903        })
904    }
905
906    /// Build an IO_LINK chained send and await completion.
907    ///
908    /// The closure receives a [`SendChainBuilder`](crate::handler::SendChainBuilder) for
909    /// constructing linked SQEs. Call `.copy()`, `.parts()...add()` to build the
910    /// chain, then `.finish()` to submit it. Returns a [`SendFuture`] that
911    /// resolves with total bytes sent.
912    ///
913    /// For fire-and-forget chained sending, use [`send_chain_nowait()`](Self::send_chain_nowait).
914    #[cfg(has_io_uring)]
915    pub fn send_chain<F>(&self, f: F) -> io::Result<SendFuture>
916    where
917        F: FnOnce(crate::handler::SendChainBuilder<'_, '_>) -> io::Result<()>,
918    {
919        with_state(|driver, executor| {
920            let mut ctx = driver.make_ctx();
921            let token = ConnToken::new(self.conn_index, self.generation);
922            let builder = ctx.send_chain(token);
923            f(builder)?;
924            executor.send_waiters[self.conn_index as usize] = true;
925            Ok(SendFuture {
926                conn_index: self.conn_index,
927            })
928        })
929    }
930
931    // ── Shutdown / cancel ─────────────────────────────────────────────
932
933    /// Shutdown the write side of the connection (half-close).
934    ///
935    /// Sends a TCP FIN to the peer. The read side remains open.
936    pub fn shutdown_write(&self) {
937        with_state(|driver, _| {
938            let mut ctx = driver.make_ctx();
939            ctx.shutdown_write(self.token());
940        })
941    }
942
943    /// Cancel pending I/O operations on this connection.
944    pub fn cancel(&self) -> io::Result<()> {
945        with_state(|driver, _| {
946            let mut ctx = driver.make_ctx();
947            ctx.cancel(self.token())
948        })
949    }
950
951    /// Request graceful shutdown of the worker event loop.
952    pub fn request_shutdown(&self) {
953        with_state(|driver, _| {
954            let mut ctx = driver.make_ctx();
955            ctx.request_shutdown();
956        })
957    }
958
959    // ── TLS ──────────────────────────────────────────────────────────
960
961    /// Query TLS session info for this connection.
962    pub fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
963        with_state(|driver, _| {
964            let ctx = driver.make_ctx();
965            ctx.tls_info(self.token())
966        })
967    }
968
969    /// Initiate an outbound TLS connection and await the result.
970    pub fn connect_tls(&self, addr: SocketAddr, server_name: &str) -> io::Result<ConnectFuture> {
971        with_state(|driver, executor| {
972            let mut ctx = driver.make_ctx();
973            let token = ctx
974                .connect_tls(addr, server_name)
975                .map_err(|e| io::Error::other(e.to_string()))?;
976            let calling_task = CURRENT_TASK_ID.with(|c| c.get());
977            executor.owner_task[token.index as usize] = Some(calling_task);
978            executor.connect_waiters[token.index as usize] = true;
979            Ok(ConnectFuture {
980                conn_index: token.index,
981                generation: token.generation,
982            })
983        })
984    }
985
986    /// Initiate an outbound TLS connection with a timeout and await the result.
987    pub fn connect_tls_with_timeout(
988        &self,
989        addr: SocketAddr,
990        server_name: &str,
991        timeout_ms: u64,
992    ) -> io::Result<ConnectFuture> {
993        with_state(|driver, executor| {
994            let mut ctx = driver.make_ctx();
995            let token = ctx
996                .connect_tls_with_timeout(addr, server_name, timeout_ms)
997                .map_err(|e| io::Error::other(e.to_string()))?;
998            let calling_task = CURRENT_TASK_ID.with(|c| c.get());
999            executor.owner_task[token.index as usize] = Some(calling_task);
1000            executor.connect_waiters[token.index as usize] = true;
1001            Ok(ConnectFuture {
1002                conn_index: token.index,
1003                generation: token.generation,
1004            })
1005        })
1006    }
1007
1008    // ── Timestamps ─────────────────────────────────────────────────
1009
1010    /// Returns the most recent kernel RX software timestamp as nanoseconds
1011    /// since epoch (`CLOCK_REALTIME`), or 0 if no timestamp has been received.
1012    ///
1013    /// Updated each time a `RecvMsgMulti` completion delivers an
1014    /// `SCM_TIMESTAMPING` cmsg. Only available when the `timestamps` feature
1015    /// is enabled and `Config::timestamps(true)` is set.
1016    #[cfg(feature = "timestamps")]
1017    pub fn recv_timestamp(&self) -> u64 {
1018        with_state(|driver, _| {
1019            driver
1020                .connections
1021                .get(self.conn_index)
1022                .map(|cs| cs.recv_timestamp_ns)
1023                .unwrap_or(0)
1024        })
1025    }
1026
1027    // ── Close / metadata ─────────────────────────────────────────────
1028
1029    /// Close this connection.
1030    pub fn close(&self) {
1031        let ptr = CURRENT_DRIVER.with(|c| c.get());
1032        if ptr.is_null() {
1033            return;
1034        }
1035        let state = unsafe { &mut *ptr };
1036        let driver = unsafe { &mut *state.driver };
1037        driver.close_connection(self.conn_index);
1038    }
1039
1040    /// Access peer address.
1041    pub fn peer_addr(&self) -> Option<crate::connection::PeerAddr> {
1042        with_state(|driver, _| {
1043            let conn = driver.connections.get(self.conn_index)?;
1044            if conn.generation != self.generation {
1045                return None;
1046            }
1047            conn.peer_addr.clone()
1048        })
1049    }
1050
1051    /// Check if this connection is outbound (initiated via connect).
1052    pub fn is_outbound(&self) -> bool {
1053        with_state(|driver, _| {
1054            driver
1055                .connections
1056                .get(self.conn_index)
1057                .map(|cs| cs.generation == self.generation && cs.outbound)
1058                .unwrap_or(false)
1059        })
1060    }
1061}
1062
1063// ── AsyncSendBuilder ─────────────────────────────────────────────────
1064
#[cfg(has_io_uring)]
/// Builder for scatter-gather sends in the async API.
///
/// Wraps `DriverCtx::send_parts()` — call `.copy()` and `.guard()` to add
/// parts, then `.submit()`. This is a synchronous builder; the send is
/// fire-and-forget (no future).
pub struct AsyncSendBuilder {
    // Connection (index + generation) the built send will target.
    token: ConnToken,
}
1074
#[cfg(has_io_uring)]
impl AsyncSendBuilder {
    /// Build and submit the send by invoking `f` with a `SendBuilder`
    /// obtained from the `DriverCtx`.
    ///
    /// The closure should chain `.copy()` / `.guard()` calls and finish with
    /// `.submit()`.
    ///
    /// # Example
    /// ```no_run
    /// # fn example(conn: ringline::ConnCtx) -> std::io::Result<()> {
    /// conn.send_parts().build(|b| {
    ///     b.copy(b"header").submit()
    /// })?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn build<F>(self, f: F) -> io::Result<()>
    where
        F: FnOnce(crate::handler::SendBuilder<'_, '_>) -> io::Result<()>,
    {
        with_state(|driver, _| {
            let mut ctx = driver.make_ctx();
            f(ctx.send_parts(self.token))
        })
    }

    /// Build and submit a scatter-gather send, then await completion.
    ///
    /// Like [`build()`](Self::build) but returns a [`SendFuture`] that
    /// resolves with the total bytes sent (or error). Use this when you need
    /// backpressure or send completion notification.
    pub fn build_await<F>(self, f: F) -> io::Result<SendFuture>
    where
        F: FnOnce(crate::handler::SendBuilder<'_, '_>) -> io::Result<()>,
    {
        with_state(|driver, executor| {
            let conn_index = self.token.index;
            let mut ctx = driver.make_ctx();
            f(ctx.send_parts(self.token))?;
            executor.send_waiters[conn_index as usize] = true;
            Ok(SendFuture { conn_index })
        })
    }

    /// Submit a scatter-gather send from pre-classified `SendPart`s.
    ///
    /// Avoids the lifetime constraints of the closure-based
    /// [`build()`](Self::build), letting callers mix copy and guard parts in
    /// a single SQE from borrowed data. Parts are consumed in order up to
    /// `MAX_IOVECS` total or `MAX_GUARDS` guards.
    ///
    /// Returns the number of parts consumed on success.
    pub fn submit_batch(self, parts: Vec<crate::handler::SendPart<'_>>) -> io::Result<usize> {
        use crate::handler::SendPart;
        with_state(|driver, _| {
            let mut ctx = driver.make_ctx();
            let mut builder = ctx.send_parts(self.token);
            let total = parts.len();
            for part in parts {
                builder = match part {
                    SendPart::Copy(data) => builder.copy(data),
                    SendPart::Guard(guard) => builder.guard(guard),
                };
            }
            // Empty batch: nothing to submit, report zero parts consumed.
            if total == 0 {
                return Ok(0);
            }
            builder.submit()?;
            Ok(total)
        })
    }

    /// Like [`submit_batch`](Self::submit_batch) but returns a [`SendFuture`]
    /// for backpressure / yield.
    pub fn submit_batch_await(
        self,
        parts: Vec<crate::handler::SendPart<'_>>,
    ) -> io::Result<(usize, SendFuture)> {
        use crate::handler::SendPart;
        with_state(|driver, executor| {
            let mut ctx = driver.make_ctx();
            let mut builder = ctx.send_parts(self.token);
            let total = parts.len();
            for part in parts {
                builder = match part {
                    SendPart::Copy(data) => builder.copy(data),
                    SendPart::Guard(guard) => builder.guard(guard),
                };
            }
            // An awaitable send needs at least one part, otherwise the
            // returned future would have no completion to wait for.
            if total == 0 {
                return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty batch"));
            }
            builder.submit()?;
            let conn_index = self.token.index;
            executor.send_waiters[conn_index as usize] = true;
            Ok((total, SendFuture { conn_index }))
        })
    }
}
1186
1187// ── mio AsyncSendBuilder (copy-only fallback) ───────────────────────
1188
#[cfg(not(has_io_uring))]
/// Builder for scatter-gather sends in the async API (mio fallback).
///
/// On the mio backend, all parts are copied into a single buffer and
/// sent as one operation. Zero-copy guards are consumed by copying their
/// data.
pub struct AsyncSendBuilder {
    // Connection (index + generation) the built send will target.
    token: ConnToken,
}
1198
1199#[cfg(not(has_io_uring))]
1200impl ConnCtx {
1201    /// Begin building a scatter-gather send.
1202    ///
1203    /// On the mio backend, this degrades to copy-only sends.
1204    pub fn send_parts(&self) -> AsyncSendBuilder {
1205        AsyncSendBuilder {
1206            token: self.token(),
1207        }
1208    }
1209}
1210
1211#[cfg(not(has_io_uring))]
1212impl AsyncSendBuilder {
1213    /// Build and submit the send by concatenating all copy parts.
1214    pub fn build<F>(self, f: F) -> io::Result<()>
1215    where
1216        F: FnOnce(MioSendBuilder<'_>) -> io::Result<()>,
1217    {
1218        with_state(|driver, _| {
1219            let mut buf = Vec::new();
1220            let builder = MioSendBuilder { buf: &mut buf };
1221            f(builder)?;
1222            if !buf.is_empty() {
1223                let mut ctx = driver.make_ctx();
1224                ctx.send(self.token, &buf)?;
1225            }
1226            Ok(())
1227        })
1228    }
1229
1230    /// Submit a scatter-gather send from pre-classified `SendPart`s.
1231    ///
1232    /// On the mio backend, guard data is copied (no kernel zero-copy).
1233    pub fn submit_batch(self, parts: Vec<crate::handler::SendPart<'_>>) -> io::Result<usize> {
1234        use crate::handler::SendPart;
1235        with_state(|driver, _| {
1236            let mut buf = Vec::new();
1237            let mut consumed = 0usize;
1238            for part in &parts {
1239                match part {
1240                    SendPart::Copy(data) => buf.extend_from_slice(data),
1241                    SendPart::Guard(guard) => {
1242                        let (ptr, len) = guard.as_ptr_len();
1243                        let data = unsafe { std::slice::from_raw_parts(ptr, len as usize) };
1244                        buf.extend_from_slice(data);
1245                    }
1246                }
1247                consumed += 1;
1248            }
1249            if !buf.is_empty() {
1250                let mut ctx = driver.make_ctx();
1251                ctx.send(self.token, &buf)?;
1252            }
1253            Ok(consumed)
1254        })
1255    }
1256
1257    /// Build and submit, then await completion.
1258    pub fn build_await<F>(self, f: F) -> io::Result<SendFuture>
1259    where
1260        F: FnOnce(MioSendBuilder<'_>) -> io::Result<()>,
1261    {
1262        with_state(|driver, executor| {
1263            let mut buf = Vec::new();
1264            let builder = MioSendBuilder { buf: &mut buf };
1265            f(builder)?;
1266            if !buf.is_empty() {
1267                let mut ctx = driver.make_ctx();
1268                ctx.send(self.token, &buf)?;
1269            }
1270            let conn_index = self.token.index;
1271            executor.send_waiters[conn_index as usize] = true;
1272            Ok(SendFuture { conn_index })
1273        })
1274    }
1275}
1276
/// Mio send builder — accumulates parts into a single buffer.
#[cfg(not(has_io_uring))]
pub struct MioSendBuilder<'a> {
    // Caller-owned destination buffer (see `AsyncSendBuilder::build*`);
    // every part is copied here and later sent as one operation.
    buf: &'a mut Vec<u8>,
}
1282
#[cfg(not(has_io_uring))]
impl<'a> MioSendBuilder<'a> {
    /// Add a copy part.
    pub fn copy(self, data: &[u8]) -> Self {
        self.buf.extend_from_slice(data);
        self
    }

    /// Add a guard part (copies the data on mio backend).
    pub fn guard(self, guard: crate::guard::GuardBox) -> Self {
        let (ptr, len) = guard.as_ptr_len();
        // SAFETY: `guard` lives until the end of this expression and its
        // ptr/len describe the guard's backing allocation — assumes GuardBox
        // upholds that invariant; TODO confirm against `crate::guard`.
        let data = unsafe { std::slice::from_raw_parts(ptr, len as usize) };
        self.buf.extend_from_slice(data);
        self
    }

    /// Submit the accumulated send.
    ///
    /// A no-op on this backend: the caller (`AsyncSendBuilder::build` /
    /// `build_await`) performs the actual send from the accumulated buffer
    /// after the build closure returns.
    pub fn submit(self) -> io::Result<()> {
        Ok(())
    }
}
1304
1305// ── WithDataFuture ───────────────────────────────────────────────────
1306
/// Future returned by [`ConnCtx::with_data`].
pub struct WithDataFuture<F> {
    // Index of the connection whose pending buffer / accumulator is parsed.
    conn_index: u32,
    // Parse closure; taken (set to None) when the future resolves so a
    // re-poll after Ready panics via `expect` instead of re-parsing.
    f: Option<F>,
}
1312
impl<F: FnMut(&[u8]) -> ParseResult + Unpin> Future for WithDataFuture<F> {
    type Output = usize;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        with_state(|driver, executor| {
            // Zero-copy fast path: check pending recv buffer before accumulator.
            // Only available on io_uring where kernel-provided buffers are used.
            #[cfg(has_io_uring)]
            if driver.pending_recv_bufs[self.conn_index as usize].is_some() {
                let acc_empty = driver.accumulators.data(self.conn_index).is_empty();
                if acc_empty {
                    // Borrow the kernel buffer in-place — no copy.
                    let pending = driver.pending_recv_bufs[self.conn_index as usize].unwrap();
                    // SAFETY: `pending.ptr`/`pending.len` describe the kernel
                    // recv buffer, which stays live until its `bid` is pushed
                    // to `pending_replenish` below — TODO confirm lifetime
                    // invariant against the event loop.
                    let data =
                        unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
                    let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
                    let result = f(data);
                    match result {
                        ParseResult::Consumed(consumed) if consumed > 0 => {
                            // The closure may have called forward_recv_buf(), which
                            // takes the pending slot. Only replenish if still present.
                            if let Some(pending) =
                                driver.pending_recv_bufs[self.conn_index as usize].take()
                            {
                                if consumed < pending.len as usize {
                                    // Partial consume: copy remainder to accumulator.
                                    // SAFETY: `consumed < pending.len`, so the
                                    // remainder range lies inside the live buffer.
                                    let remainder = unsafe {
                                        std::slice::from_raw_parts(
                                            pending.ptr.add(consumed),
                                            pending.len as usize - consumed,
                                        )
                                    };
                                    driver.accumulators.append(self.conn_index, remainder);
                                }
                                driver.pending_replenish.push(pending.bid);
                            }
                            // Drop the closure so a stray re-poll fails loudly.
                            self.f.take();
                            return Poll::Ready(consumed);
                        }
                        _ => {
                            // NeedMore / Consumed(0): flush pending to accumulator
                            // and fall through to the existing accumulator path.
                            if let Some(pending) =
                                driver.pending_recv_bufs[self.conn_index as usize].take()
                            {
                                // SAFETY: buffer still live; bid is replenished
                                // only after the copy into the accumulator.
                                let pending_data = unsafe {
                                    std::slice::from_raw_parts(pending.ptr, pending.len as usize)
                                };
                                driver.accumulators.append(self.conn_index, pending_data);
                                driver.pending_replenish.push(pending.bid);
                            }
                            // Fall through to accumulator path below.
                        }
                    }
                } else {
                    // Accumulator has data AND there's a pending buffer.
                    // Flush pending to accumulator (prepend — it arrived first).
                    let pending = driver.pending_recv_bufs[self.conn_index as usize]
                        .take()
                        .unwrap();
                    // SAFETY: buffer live until its bid is replenished below.
                    let pending_data =
                        unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
                    driver.accumulators.prepend(self.conn_index, pending_data);
                    driver.pending_replenish.push(pending.bid);
                    // Fall through to accumulator path below.
                }
            }

            let data = driver.accumulators.data(self.conn_index);
            if data.is_empty() {
                // Check if the connection has been closed — return 0 (EOF)
                // so the caller can detect disconnection.
                let is_closed = driver
                    .connections
                    .get(self.conn_index)
                    .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
                    .unwrap_or(true); // connection already released
                if is_closed {
                    // Give the closure one final look at empty input so it can
                    // report bytes it logically consumed at EOF.
                    let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
                    let result = f(&[]);
                    self.f.take();
                    return Poll::Ready(match result {
                        ParseResult::Consumed(n) => n,
                        ParseResult::NeedMore => 0,
                    });
                }

                // No data available — register as recv waiter and park.
                executor.recv_waiters[self.conn_index as usize] = true;
                return Poll::Pending;
            }

            // Data available — call closure immediately (zero-overhead hot path).
            let f = self.f.as_mut().expect("WithDataFuture polled after Ready");
            let result = f(data);
            match result {
                ParseResult::Consumed(consumed) if consumed > 0 => {
                    driver.accumulators.consume(self.conn_index, consumed);
                    self.f.take();
                    return Poll::Ready(consumed);
                }
                _ => {}
            }

            // NeedMore or Consumed(0) on non-empty data: incomplete parse.
            // Check if the connection is closed (EOF with leftover partial data).
            let is_closed = driver
                .connections
                .get(self.conn_index)
                .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
                .unwrap_or(true);
            if is_closed {
                // Per the ParseResult docs: on EOF the future resolves with 0.
                self.f.take();
                return Poll::Ready(0);
            }

            // Connection still open — wait for more data before retrying.
            executor.recv_waiters[self.conn_index as usize] = true;
            Poll::Pending
        })
    }
}
1435
1436// ── WithBytesFuture ──────────────────────────────────────────────────
1437
/// Future returned by [`ConnCtx::with_bytes`].
pub struct WithBytesFuture<F> {
    // Index of the connection whose accumulated bytes are parsed.
    conn_index: u32,
    // Parse closure; taken (set to None) when the future resolves so a
    // re-poll after Ready panics via `expect` instead of re-parsing.
    f: Option<F>,
}
1443
impl<F: FnMut(Bytes) -> ParseResult + Unpin> Future for WithBytesFuture<F> {
    type Output = usize;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        with_state(|driver, executor| {
            // Flush any pending zero-copy recv buffer to accumulator so
            // take_frozen() will include it (io_uring only).
            //
            // NOTE(review): `WithDataFuture::poll` *prepends* the pending
            // buffer when the accumulator is non-empty (treating it as older
            // data), while this appends unconditionally — confirm the byte
            // ordering when both hold data.
            #[cfg(has_io_uring)]
            if let Some(pending) = driver.pending_recv_bufs[self.conn_index as usize].take() {
                // SAFETY: `pending.ptr`/`pending.len` describe the kernel recv
                // buffer, live until its `bid` is replenished below.
                let pending_data =
                    unsafe { std::slice::from_raw_parts(pending.ptr, pending.len as usize) };
                driver.accumulators.append(self.conn_index, pending_data);
                driver.pending_replenish.push(pending.bid);
            }

            let data = driver.accumulators.data(self.conn_index);
            if data.is_empty() {
                // Check if the connection has been closed — return 0 (EOF).
                let is_closed = driver
                    .connections
                    .get(self.conn_index)
                    .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
                    .unwrap_or(true); // slot already released counts as closed
                if is_closed {
                    // Give the closure one final look at empty input so it can
                    // report bytes it logically consumed at EOF.
                    let f = self.f.as_mut().expect("WithBytesFuture polled after Ready");
                    let result = f(Bytes::new());
                    self.f.take();
                    return Poll::Ready(match result {
                        ParseResult::Consumed(n) => n,
                        ParseResult::NeedMore => 0,
                    });
                }

                executor.recv_waiters[self.conn_index as usize] = true;
                return Poll::Pending;
            }

            // Detach accumulator as frozen Bytes (O(1)).
            let frozen = driver.accumulators.take_frozen(self.conn_index);
            let len = frozen.len();

            let f = self.f.as_mut().expect("WithBytesFuture polled after Ready");
            let result = f(frozen.clone());

            match result {
                ParseResult::Consumed(consumed) if consumed > 0 => {
                    // Put back unconsumed remainder (if any).
                    if consumed < len {
                        driver
                            .accumulators
                            .prepend(self.conn_index, &frozen[consumed..]);
                    }
                    self.f.take();
                    return Poll::Ready(consumed);
                }
                _ => {}
            }

            // NeedMore or Consumed(0) on non-empty data: incomplete parse.
            // Put everything back.
            driver.accumulators.prepend(self.conn_index, &frozen[..]);

            let is_closed = driver
                .connections
                .get(self.conn_index)
                .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
                .unwrap_or(true);
            if is_closed {
                // Per the ParseResult docs: on EOF the future resolves with 0.
                self.f.take();
                return Poll::Ready(0);
            }

            executor.recv_waiters[self.conn_index as usize] = true;
            Poll::Pending
        })
    }
}
1521
1522// ── RecvReadyFuture ──────────────────────────────────────────────────
1523
/// Future returned by [`ConnCtx::recv_ready`]. Resolves when:
/// 1. The recv sink has received data (`pos > 0`), OR
/// 2. The accumulator has data, OR
/// 3. The connection is closed.
pub struct RecvReadyFuture {
    // Connection index whose readiness is being awaited.
    conn_index: u32,
}
1531
1532impl Future for RecvReadyFuture {
1533    type Output = ();
1534
1535    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1536        with_state(|driver, executor| {
1537            // Check recv sink.
1538            if let Some(sink) = &executor.recv_sinks[self.conn_index as usize]
1539                && sink.pos > 0
1540            {
1541                return Poll::Ready(());
1542            }
1543
1544            // Check accumulator.
1545            if !driver.accumulators.data(self.conn_index).is_empty() {
1546                return Poll::Ready(());
1547            }
1548
1549            // Check if the connection is closed.
1550            let is_closed = driver
1551                .connections
1552                .get(self.conn_index)
1553                .map(|c| matches!(c.recv_mode, crate::connection::RecvMode::Closed))
1554                .unwrap_or(true);
1555            if is_closed {
1556                return Poll::Ready(());
1557            }
1558
1559            // Not ready — register as recv waiter and park.
1560            executor.recv_waiters[self.conn_index as usize] = true;
1561            Poll::Pending
1562        })
1563    }
1564}
1565
// ── SendFuture ───────────────────────────────────────────────────────

/// Future that awaits send completion. The SQE was already submitted eagerly
/// by [`ConnCtx::send`] — this future only waits for the CQE result.
/// No data stored in the future. No allocation.
pub struct SendFuture {
    /// Connection slot whose send completion we are waiting on.
    conn_index: u32,
}

impl Future for SendFuture {
    type Output = io::Result<u32>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u32>> {
        with_state(|_driver, executor| {
            match executor.io_results[self.conn_index as usize].take() {
                Some(IoResult::Send(result)) => Poll::Ready(result),
                _ => {
                    // NOTE(review): `take()` also removes a non-Send result
                    // before falling through here, silently discarding it —
                    // confirm only one result kind can be pending per
                    // connection at a time.
                    // Not ready yet — re-register waiter.
                    executor.send_waiters[self.conn_index as usize] = true;
                    Poll::Pending
                }
            }
        })
    }
}

impl Drop for SendFuture {
    fn drop(&mut self) {
        // Deregister the waiter so a cancelled send leaves no stale flag.
        // Skipped when dropped outside the executor (pointer already cleared).
        let ptr = CURRENT_DRIVER.with(|c| c.get());
        if ptr.is_null() {
            return;
        }
        // Safety: see `DriverState` — single-threaded, set only while the
        // worker's driver/executor are alive.
        let state = unsafe { &mut *ptr };
        let executor = unsafe { &mut *state.executor };
        executor.send_waiters[self.conn_index as usize] = false;
    }
}
1603
// ── ConnectFuture ────────────────────────────────────────────────────

/// Future that awaits an outbound TCP connection. The connect SQE was submitted
/// eagerly by [`ConnCtx::connect`] — this future waits for the CQE result.
pub struct ConnectFuture {
    /// Connection slot the connect was submitted on.
    conn_index: u32,
    /// Slot generation at submit time; forwarded to the [`ConnCtx`] built
    /// on success.
    generation: u32,
}

impl Future for ConnectFuture {
    type Output = io::Result<ConnCtx>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<ConnCtx>> {
        with_state(|_driver, executor| {
            match executor.io_results[self.conn_index as usize].take() {
                Some(IoResult::Connect(result)) => match result {
                    Ok(()) => Poll::Ready(Ok(ConnCtx::new(self.conn_index, self.generation))),
                    Err(e) => Poll::Ready(Err(e)),
                },
                _ => {
                    // NOTE(review): `take()` also removes a non-Connect result
                    // before falling through, silently discarding it — confirm
                    // only one result kind can be pending per connection.
                    // Not ready yet — re-register waiter.
                    executor.connect_waiters[self.conn_index as usize] = true;
                    Poll::Pending
                }
            }
        })
    }
}

impl Drop for ConnectFuture {
    fn drop(&mut self) {
        // Deregister the waiter so a cancelled connect leaves no stale flag.
        let ptr = CURRENT_DRIVER.with(|c| c.get());
        if ptr.is_null() {
            return;
        }
        // Safety: see `DriverState` — single-threaded, scoped pointer.
        let state = unsafe { &mut *ptr };
        let executor = unsafe { &mut *state.executor };
        executor.connect_waiters[self.conn_index as usize] = false;
    }
}
1644
// ── Sleep ────────────────────────────────────────────────────────────

/// Create a future that completes after the given duration.
///
/// Uses an io_uring timeout SQE internally — no busy-waiting, no timer
/// thread. The timer fires on the same worker thread as the calling task.
///
/// If the timer slot pool is exhausted when the future is first polled,
/// it completes immediately instead of sleeping; use [`try_sleep()`] to
/// detect and handle exhaustion explicitly.
///
/// # Panics
///
/// Panics if the returned future is polled outside the ringline async
/// executor.
pub fn sleep(duration: Duration) -> SleepFuture {
    SleepFuture {
        duration,
        timer_slot: None,
        generation: 0,
        absolute: None,
    }
}
1664
/// Future returned by [`sleep()`] or [`sleep_until()`]. Completes after
/// the configured duration or at the given deadline.
pub struct SleepFuture {
    /// Relative sleep duration; unused when `absolute` is `Some`.
    duration: Duration,
    /// None until first poll, then Some(slot_index).
    timer_slot: Option<u32>,
    /// Generation when the slot was allocated.
    generation: u16,
    /// If Some, this is an absolute timer (sleep_until).
    absolute: Option<Deadline>,
}
1676
impl Future for SleepFuture {
    type Output = ();

    /// Lazily arms the timer on first poll, then waits for it to fire.
    /// The pool records the waker id at allocation, so no `Waker`
    /// registration is needed here.
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        with_state(|driver, executor| {
            let _ = driver; // used only on io_uring path
            if let Some(slot) = self.timer_slot {
                // Already submitted — check if fired.
                if executor.timer_pool.is_fired(slot) {
                    // Release the slot and clear it so Drop doesn't release twice.
                    executor.timer_pool.release(slot);
                    self.timer_slot = None;
                    return Poll::Ready(());
                }
                return Poll::Pending;
            }

            // First poll — allocate slot, fill timespec, submit SQE.
            let waker_id = CURRENT_TASK_ID.with(|c| c.get());
            let (slot, generation) = match executor.timer_pool.allocate(waker_id) {
                Some(pair) => pair,
                None => {
                    // Pool exhausted — complete immediately rather than panic.
                    // Callers needing explicit error handling should use try_sleep().
                    return Poll::Ready(());
                }
            };

            #[cfg(has_io_uring)]
            {
                // Encode slot+generation into the user_data so a stale CQE
                // (from a released slot) can be identified.
                let payload = TimerSlotPool::encode_payload(slot, generation);
                let ud = UserData::encode(OpTag::Timer, 0, payload);

                // Absolute timers (sleep_until) use TIMEOUT_ABS; relative
                // timers use the plain timeout op.
                let submit_result = if let Some(deadline) = self.absolute {
                    let ts_ptr =
                        executor
                            .timer_pool
                            .set_absolute(slot, deadline.secs, deadline.nsecs);
                    driver.ring.submit_timeout_abs(ts_ptr, ud)
                } else {
                    let ts_ptr = executor.timer_pool.set_relative(slot, self.duration);
                    driver.ring.submit_timeout(ts_ptr, ud)
                };

                if let Err(_e) = submit_result {
                    executor.timer_pool.release(slot);
                    // On SQE submission failure, complete immediately rather than hang.
                    return Poll::Ready(());
                }
            }

            #[cfg(not(has_io_uring))]
            {
                // Mio backend: store deadline; the event loop polls timer expiry.
                if let Some(deadline) = self.absolute {
                    executor
                        .timer_pool
                        .set_absolute(slot, deadline.secs, deadline.nsecs);
                } else {
                    executor.timer_pool.set_relative(slot, self.duration);
                }
            }

            // Remember the armed slot so later polls (and Drop) can find it.
            self.timer_slot = Some(slot);
            self.generation = generation;
            Poll::Pending
        })
    }
}
1745
impl Drop for SleepFuture {
    fn drop(&mut self) {
        if let Some(slot) = self.timer_slot {
            // Timer was submitted but not yet fired — try to cancel it.
            let ptr = CURRENT_DRIVER.with(|c| c.get());
            if ptr.is_null() {
                // Dropped outside the executor — nothing we can clean up.
                return;
            }
            // Safety: see `DriverState` — single-threaded, pointer set only
            // while the worker's driver/executor are alive.
            let state = unsafe { &mut *ptr };
            #[cfg(has_io_uring)]
            let driver = unsafe { &mut *state.driver };
            let executor = unsafe { &mut *state.executor };

            if !executor.timer_pool.is_fired(slot) {
                #[cfg(has_io_uring)]
                {
                    let payload = TimerSlotPool::encode_payload(slot, self.generation);
                    let target_ud = UserData::encode(OpTag::Timer, 0, payload);
                    // Best effort cancel; timer fires harmlessly if already expired.
                    let _ = driver.ring.submit_async_cancel(target_ud.raw(), 0);
                }
            }
            // Slot released regardless — stale timer CQE detected via generation.
            executor.timer_pool.release(slot);
        }
    }
}
1773
/// Create a sleep future, returning an error if the timer pool is exhausted.
///
/// Unlike [`sleep()`] — whose future completes immediately when the pool is
/// exhausted — this returns `Err(TimerExhausted)` so callers can handle
/// capacity limits gracefully.
///
/// The timer slot is allocated eagerly (at call time, not on first poll).
///
/// # Panics
///
/// Panics if called outside the ringline async executor.
pub fn try_sleep(duration: Duration) -> Result<SleepFuture, TimerExhausted> {
    with_state(|driver, executor| {
        let _ = driver; // used only on io_uring path
        let waker_id = CURRENT_TASK_ID.with(|c| c.get());
        let (slot, generation) = executor.timer_pool.allocate(waker_id).ok_or_else(|| {
            crate::metrics::POOL.increment(crate::metrics::pool::TIMER_EXHAUSTED);
            TimerExhausted
        })?;

        #[cfg(has_io_uring)]
        {
            let payload = TimerSlotPool::encode_payload(slot, generation);
            let ud = UserData::encode(OpTag::Timer, 0, payload);
            let ts_ptr = executor.timer_pool.set_relative(slot, duration);

            if let Err(_e) = driver.ring.submit_timeout(ts_ptr, ud) {
                executor.timer_pool.release(slot);
                // SQE submission failure: hand back an un-armed future
                // (`timer_slot: None`); its first poll re-allocates a slot
                // and retries submission, completing immediately only if
                // that retry also fails.
                return Ok(SleepFuture {
                    duration,
                    timer_slot: None,
                    generation: 0,
                    absolute: None,
                });
            }
        }

        #[cfg(not(has_io_uring))]
        {
            // Mio backend: no SQE — record the deadline; the event loop
            // checks timer expiry each tick.
            executor.timer_pool.set_relative(slot, duration);
        }

        Ok(SleepFuture {
            duration,
            timer_slot: Some(slot),
            generation,
            absolute: None,
        })
    })
}
1824
// ── Deadline (absolute timer support) ─────────────────────────────────

/// A monotonic clock deadline for use with absolute timers.
///
/// Created via [`Deadline::after()`] or [`Deadline::now()`].
/// Uses `CLOCK_MONOTONIC` to match io_uring's default clock.
///
/// The derived ordering is field-wise (`secs`, then `nsecs`), which matches
/// chronological order because `nsecs` is kept below 1_000_000_000.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Deadline {
    // Whole seconds of the monotonic clock.
    pub(crate) secs: u64,
    // Sub-second nanoseconds; invariant: < 1_000_000_000.
    pub(crate) nsecs: u32,
}
1836
1837impl Deadline {
1838    /// Capture the current monotonic time.
1839    pub fn now() -> Self {
1840        let mut ts = libc::timespec {
1841            tv_sec: 0,
1842            tv_nsec: 0,
1843        };
1844        // Safety: clock_gettime with CLOCK_MONOTONIC is safe.
1845        unsafe {
1846            libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
1847        }
1848        Deadline {
1849            secs: ts.tv_sec as u64,
1850            nsecs: ts.tv_nsec as u32,
1851        }
1852    }
1853
1854    /// Create a deadline `duration` from now.
1855    pub fn after(duration: Duration) -> Self {
1856        let now = Self::now();
1857        let mut secs = now.secs + duration.as_secs();
1858        let mut nsecs = now.nsecs + duration.subsec_nanos();
1859        if nsecs >= 1_000_000_000 {
1860            nsecs -= 1_000_000_000;
1861            secs += 1;
1862        }
1863        Deadline { secs, nsecs }
1864    }
1865
1866    /// Duration remaining until this deadline (saturates at zero).
1867    pub fn remaining(&self) -> Duration {
1868        let now = Self::now();
1869        if now.secs > self.secs || (now.secs == self.secs && now.nsecs >= self.nsecs) {
1870            return Duration::ZERO;
1871        }
1872        let mut secs = self.secs - now.secs;
1873        let nsecs = if self.nsecs >= now.nsecs {
1874            self.nsecs - now.nsecs
1875        } else {
1876            secs -= 1;
1877            1_000_000_000 + self.nsecs - now.nsecs
1878        };
1879        Duration::new(secs, nsecs)
1880    }
1881}
1882
/// Create a future that completes at the given absolute deadline.
///
/// Uses io_uring's `TIMEOUT_ABS` flag with `CLOCK_MONOTONIC` for
/// precise deadline-based timing without accumulated drift.
///
/// If the timer slot pool is exhausted when the future is first polled,
/// it completes immediately; use [`try_sleep_until()`] to detect
/// exhaustion explicitly.
///
/// # Panics
///
/// Panics if the returned future is polled outside the ringline async
/// executor.
pub fn sleep_until(deadline: Deadline) -> SleepFuture {
    SleepFuture {
        duration: Duration::ZERO, // unused for absolute timers
        timer_slot: None,
        generation: 0,
        absolute: Some(deadline),
    }
}
1900
/// Create an absolute-deadline sleep, returning an error if the timer pool is exhausted.
///
/// Unlike [`sleep_until()`] — whose future completes immediately when the
/// pool is exhausted — this returns `Err(TimerExhausted)` so callers can
/// handle capacity limits gracefully.
///
/// The timer slot is allocated eagerly (at call time, not on first poll).
///
/// # Panics
///
/// Panics if called outside the ringline async executor.
pub fn try_sleep_until(deadline: Deadline) -> Result<SleepFuture, TimerExhausted> {
    with_state(|driver, executor| {
        let _ = driver; // used only on io_uring path
        let waker_id = CURRENT_TASK_ID.with(|c| c.get());
        let (slot, generation) = executor.timer_pool.allocate(waker_id).ok_or_else(|| {
            crate::metrics::POOL.increment(crate::metrics::pool::TIMER_EXHAUSTED);
            TimerExhausted
        })?;

        #[cfg(has_io_uring)]
        {
            let payload = TimerSlotPool::encode_payload(slot, generation);
            let ud = UserData::encode(OpTag::Timer, 0, payload);
            let ts_ptr = executor
                .timer_pool
                .set_absolute(slot, deadline.secs, deadline.nsecs);

            if let Err(_e) = driver.ring.submit_timeout_abs(ts_ptr, ud) {
                executor.timer_pool.release(slot);
                // SQE submission failure: hand back an un-armed future
                // (`timer_slot: None`); its first poll re-allocates a slot
                // and retries submission.
                return Ok(SleepFuture {
                    duration: Duration::ZERO,
                    timer_slot: None,
                    generation: 0,
                    absolute: Some(deadline),
                });
            }
        }

        #[cfg(not(has_io_uring))]
        {
            // Mio backend: record the absolute deadline; the event loop
            // checks timer expiry each tick.
            executor
                .timer_pool
                .set_absolute(slot, deadline.secs, deadline.nsecs);
        }

        Ok(SleepFuture {
            duration: Duration::ZERO,
            timer_slot: Some(slot),
            generation,
            absolute: Some(deadline),
        })
    })
}
1954
1955// ── Timeout ──────────────────────────────────────────────────────────
1956
/// Error returned when a [`timeout()`] deadline expires.
///
/// Zero-sized marker type; derives `Copy` since it carries no payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Elapsed;
1960
1961impl fmt::Display for Elapsed {
1962    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1963        f.write_str("deadline has elapsed")
1964    }
1965}
1966
1967impl std::error::Error for Elapsed {}
1968
1969/// Wrap a future with a deadline. If the future does not complete within
1970/// `duration`, returns `Err(Elapsed)`.
1971///
1972/// # Example
1973///
1974/// ```no_run
1975/// # async fn example() {
1976/// use std::time::Duration;
1977/// match ringline::timeout(Duration::from_secs(1), async { 42 }).await {
1978///     Ok(value) => { /* completed in time */ }
1979///     Err(_elapsed) => { /* timed out */ }
1980/// }
1981/// # }
1982/// ```
1983pub fn timeout<F: Future>(duration: Duration, future: F) -> TimeoutFuture<F> {
1984    TimeoutFuture {
1985        future,
1986        sleep: sleep(duration),
1987    }
1988}
1989
/// Wrap a future with a deadline, returning an error if the timer pool is full.
///
/// Unlike [`timeout()`] — whose internal sleep fires immediately when the
/// pool is exhausted, making the deadline elapse at once — this returns
/// `Err(TimerExhausted)` so callers can handle capacity limits gracefully.
///
/// # Panics
///
/// Panics if called outside the ringline async executor.
pub fn try_timeout<F: Future>(
    duration: Duration,
    future: F,
) -> Result<TimeoutFuture<F>, TimerExhausted> {
    let sleep = try_sleep(duration)?;
    Ok(TimeoutFuture { future, sleep })
}
2005
/// Wrap a future with an absolute deadline. If the future does not complete
/// before `deadline`, returns `Err(Elapsed)`.
///
/// Uses io_uring's `TIMEOUT_ABS` flag with `CLOCK_MONOTONIC`.
///
/// If the timer slot pool is exhausted when first polled, the deadline
/// fires immediately (yielding `Err(Elapsed)` unless the inner future is
/// already ready); use [`try_timeout_at()`] to detect exhaustion explicitly.
///
/// # Panics
///
/// Panics if polled outside the ringline async executor.
pub fn timeout_at<F: Future>(deadline: Deadline, future: F) -> TimeoutFuture<F> {
    TimeoutFuture {
        future,
        sleep: sleep_until(deadline),
    }
}
2020
/// Wrap a future with an absolute deadline, returning an error if the timer pool is full.
///
/// Unlike [`timeout_at()`] — whose deadline fires immediately when the pool
/// is exhausted — this returns `Err(TimerExhausted)` so callers can handle
/// capacity limits gracefully.
///
/// # Panics
///
/// Panics if called outside the ringline async executor.
pub fn try_timeout_at<F: Future>(
    deadline: Deadline,
    future: F,
) -> Result<TimeoutFuture<F>, TimerExhausted> {
    let sleep = try_sleep_until(deadline)?;
    Ok(TimeoutFuture { future, sleep })
}
2036
pin_project_lite::pin_project! {
    /// Future returned by [`timeout()`] or [`timeout_at()`].
    pub struct TimeoutFuture<F> {
        // The wrapped future; polled first, so completion wins a tie.
        #[pin]
        future: F,
        // The deadline timer; its Drop cancels an in-flight timeout SQE.
        #[pin]
        sleep: SleepFuture,
    }
}
2046
2047impl<F: Future> Future for TimeoutFuture<F> {
2048    type Output = Result<F::Output, Elapsed>;
2049
2050    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2051        let this = self.project();
2052
2053        // Poll the inner future first.
2054        if let Poll::Ready(output) = this.future.poll(cx) {
2055            return Poll::Ready(Ok(output));
2056        }
2057
2058        // Poll the sleep timer.
2059        if let Poll::Ready(()) = this.sleep.poll(cx) {
2060            return Poll::Ready(Err(Elapsed));
2061        }
2062
2063        Poll::Pending
2064    }
2065}
2066
// ── Disk I/O async API ──────────────────────────────────────────────

/// Future that awaits a disk I/O completion (NVMe or Direct I/O).
///
/// The io_uring SQE was submitted before this future was created.
/// On completion, the CQE handler stores the result and wakes the task.
pub struct DiskIoFuture {
    /// Sequence number keying this op in `disk_io_results`/`disk_io_waiters`.
    pub(crate) seq: u32,
}

impl Future for DiskIoFuture {
    type Output = io::Result<i32>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<i32>> {
        with_state(|_driver, executor| {
            match executor.disk_io_results.remove(&self.seq) {
                // Negative CQE result is a negated errno value.
                Some(result) if result < 0 => {
                    Poll::Ready(Err(io::Error::from_raw_os_error(-result)))
                }
                Some(result) => Poll::Ready(Ok(result)),
                None => {
                    // Re-register waiter (polled before CQE arrived or after spurious wake).
                    let task_id = CURRENT_TASK_ID.with(|c| c.get());
                    executor.disk_io_waiters.insert(self.seq, task_id);
                    Poll::Pending
                }
            }
        })
    }
}

impl Drop for DiskIoFuture {
    fn drop(&mut self) {
        // Deregister so a late CQE doesn't wake a task no longer awaiting
        // this op. Skipped when dropped outside the executor.
        let ptr = CURRENT_DRIVER.with(|c| c.get());
        if ptr.is_null() {
            return;
        }
        // Safety: see `DriverState` — single-threaded, scoped pointer.
        let state = unsafe { &mut *ptr };
        let executor = unsafe { &mut *state.executor };
        executor.disk_io_waiters.remove(&self.seq);
        // NOTE(review): if dropped before the CQE arrives, the eventual
        // result may still land in `disk_io_results` with nobody to remove
        // it — confirm the CQE handler discards results with no waiter.
    }
}
2109
2110/// Open a Direct I/O file from any async task.
2111///
2112/// Returns a [`DirectIoFile`](crate::direct_io::DirectIoFile) handle
2113/// for use with [`direct_io_read()`].
2114///
2115/// # Panics
2116///
2117/// Panics if called outside the ringline async executor.
2118pub fn open_direct_io_file(path: &str) -> io::Result<crate::direct_io::DirectIoFile> {
2119    with_state(|driver, _| {
2120        let mut ctx = driver.make_ctx();
2121        ctx.open_direct_io_file(path)
2122    })
2123}
2124
2125/// Open an NVMe device from any async task.
2126///
2127/// Returns an [`NvmeDevice`](crate::nvme::NvmeDevice) handle
2128/// for use with [`nvme_read()`], [`nvme_write()`], and [`nvme_flush()`].
2129///
2130/// # Panics
2131///
2132/// Panics if called outside the ringline async executor.
2133pub fn open_nvme_device(path: &str, nsid: u32) -> io::Result<crate::nvme::NvmeDevice> {
2134    with_state(|driver, _| {
2135        let mut ctx = driver.make_ctx();
2136        ctx.open_nvme_device(path, nsid)
2137    })
2138}
2139
/// Submit a Direct I/O read and return a future for the result.
///
/// Reads `len` bytes from `offset` into the buffer at `buf`.
/// The returned future completes when the io_uring CQE arrives.
///
/// # Safety
///
/// `buf` must point to aligned, writable memory of at least `len` bytes
/// that remains valid until the future completes.
///
/// # Panics
///
/// Panics if called outside the ringline async executor.
pub unsafe fn direct_io_read(
    file: crate::direct_io::DirectIoFile,
    offset: u64,
    buf: *mut u8,
    len: u32,
) -> io::Result<DiskIoFuture> {
    with_state(|driver, executor| {
        let mut ctx = driver.make_ctx();
        // Safety: the outer `direct_io_read()` is already unsafe, and the
        // caller guarantees the buffer invariants.
        #[allow(unused_unsafe)]
        let seq = unsafe { ctx.direct_io_read(file, offset, buf, len)? };
        // Register the waiter before returning so the CQE can wake this task.
        let task_id = CURRENT_TASK_ID.with(|c| c.get());
        executor.disk_io_waiters.insert(seq, task_id);
        Ok(DiskIoFuture { seq })
    })
}
2170
/// Submit an NVMe read and return a future for the result.
///
/// Reads `num_blocks` logical blocks starting at `lba` into the buffer
/// at `buf_addr` with length `buf_len`. The returned future completes
/// when the io_uring CQE arrives.
///
/// # Safety
///
/// `buf_addr` must point to valid, aligned memory of at least `buf_len`
/// bytes that remains valid until the returned future completes.
///
/// # Panics
///
/// Panics if called outside the ringline async executor.
// NOTE(review): this fn is not `unsafe` yet documents a Safety contract on
// the raw `buf_addr` — safe callers can violate it. Consider making it an
// `unsafe fn` (like `direct_io_read`) in a breaking release.
pub fn nvme_read(
    device: crate::nvme::NvmeDevice,
    lba: u64,
    num_blocks: u16,
    buf_addr: u64,
    buf_len: u32,
) -> io::Result<DiskIoFuture> {
    with_state(|driver, executor| {
        let mut ctx = driver.make_ctx();
        let seq = ctx.nvme_read(device, lba, num_blocks, buf_addr, buf_len)?;
        // Register the waiter before returning so the CQE can wake this task.
        let task_id = CURRENT_TASK_ID.with(|c| c.get());
        executor.disk_io_waiters.insert(seq, task_id);
        Ok(DiskIoFuture { seq })
    })
}
2200
/// Submit a Direct I/O write and return a future for the result.
///
/// Writes `len` bytes from the buffer at `buf` to `offset` in the file.
/// The returned future completes when the io_uring CQE arrives.
///
/// # Safety
///
/// `buf` must point to aligned, readable memory of at least `len` bytes
/// that remains valid until the future completes.
///
/// # Panics
///
/// Panics if called outside the ringline async executor.
pub unsafe fn direct_io_write(
    file: crate::direct_io::DirectIoFile,
    offset: u64,
    buf: *const u8,
    len: u32,
) -> io::Result<DiskIoFuture> {
    with_state(|driver, executor| {
        let mut ctx = driver.make_ctx();
        // Safety: the outer `direct_io_write()` is already unsafe, and the
        // caller guarantees the buffer invariants.
        #[allow(unused_unsafe)]
        let seq = unsafe { ctx.direct_io_write(file, offset, buf, len)? };
        // Register the waiter before returning so the CQE can wake this task.
        let task_id = CURRENT_TASK_ID.with(|c| c.get());
        executor.disk_io_waiters.insert(seq, task_id);
        Ok(DiskIoFuture { seq })
    })
}
2231
2232/// Submit an NVMe flush and return a future for the result.
2233///
2234/// Flushes volatile write caches on the device, ensuring all previously
2235/// written data is persisted to non-volatile storage. The returned future
2236/// completes when the io_uring CQE arrives.
2237///
2238/// # Panics
2239///
2240/// Panics if called outside the ringline async executor.
2241pub fn nvme_flush(device: crate::nvme::NvmeDevice) -> io::Result<DiskIoFuture> {
2242    with_state(|driver, executor| {
2243        let mut ctx = driver.make_ctx();
2244        let seq = ctx.nvme_flush(device)?;
2245        let task_id = CURRENT_TASK_ID.with(|c| c.get());
2246        executor.disk_io_waiters.insert(seq, task_id);
2247        Ok(DiskIoFuture { seq })
2248    })
2249}
2250
/// Submit an NVMe write and return a future for the result.
///
/// Writes `num_blocks` logical blocks starting at `lba` from the buffer
/// at `buf_addr` with length `buf_len`. The returned future completes
/// when the io_uring CQE arrives.
///
/// # Safety
///
/// `buf_addr` must point to valid, aligned memory of at least `buf_len`
/// bytes that remains valid until the returned future completes.
///
/// # Panics
///
/// Panics if called outside the ringline async executor.
// NOTE(review): this fn is not `unsafe` yet documents a Safety contract on
// the raw `buf_addr` — safe callers can violate it. Consider making it an
// `unsafe fn` (like `direct_io_write`) in a breaking release.
pub fn nvme_write(
    device: crate::nvme::NvmeDevice,
    lba: u64,
    num_blocks: u16,
    buf_addr: u64,
    buf_len: u32,
) -> io::Result<DiskIoFuture> {
    with_state(|driver, executor| {
        let mut ctx = driver.make_ctx();
        let seq = ctx.nvme_write(device, lba, num_blocks, buf_addr, buf_len)?;
        // Register the waiter before returning so the CQE can wake this task.
        let task_id = CURRENT_TASK_ID.with(|c| c.get());
        executor.disk_io_waiters.insert(seq, task_id);
        Ok(DiskIoFuture { seq })
    })
}
2280
// ── UDP async API ───────────────────────────────────────────────────

/// Async context for a UDP socket.
///
/// Passed to [`AsyncEventHandler::on_udp_bind()`](crate::AsyncEventHandler::on_udp_bind)
/// for each bound UDP socket. Provides async recv and fire-and-forget send.
#[derive(Clone, Copy)]
pub struct UdpCtx {
    /// Index of this socket within the worker's UDP socket table.
    pub(crate) udp_index: u32,
}
2291
impl UdpCtx {
    /// Returns the UDP socket index within this worker.
    pub fn index(&self) -> usize {
        self.udp_index as usize
    }

    /// Receive a datagram, returning the payload and source address.
    ///
    /// Suspends until a datagram is available. Each call returns exactly one
    /// datagram. The payload is copied into a `Vec<u8>` (datagrams are
    /// typically small, so this is acceptable for the initial implementation).
    pub fn recv_from(&self) -> UdpRecvFuture {
        UdpRecvFuture {
            udp_index: self.udp_index,
        }
    }

    /// Resolve when at least one UDP send slot is available on this socket.
    ///
    /// Use this to back off when [`UdpCtx::send_to`] returns
    /// [`crate::error::UdpSendError::PoolExhausted`], rather than busy-looping.
    /// On the io_uring backend the future suspends until a completion frees
    /// a slot. On the mio backend sends are synchronous and this future is
    /// always immediately ready — callers can still use it to stay
    /// backend-agnostic.
    ///
    /// Only one task should await this per socket at a time; a second waiter
    /// overwrites the first, which is then leaked (it must still be driven
    /// from another wake source).
    pub fn send_ready(&self) -> UdpSendReadyFuture {
        UdpSendReadyFuture {
            udp_index: self.udp_index,
        }
    }

    /// Send a datagram to the given peer (fire-and-forget, copying).
    ///
    /// Copies `data` into the send pool and submits a `sendmsg` SQE.
    /// Only one send can be in-flight per UDP socket at a time.
    #[cfg(has_io_uring)]
    pub fn send_to(&self, peer: SocketAddr, data: &[u8]) -> Result<(), crate::error::UdpSendError> {
        // Delegates to the driver, which owns the per-socket send pool.
        with_state(|driver, _executor| driver.udp_send_to(self.udp_index, peer, data))
    }

    /// Send a datagram to the given peer (mio backend — synchronous non-blocking send).
    ///
    /// `WouldBlock` is surfaced as [`crate::error::UdpSendError::PoolExhausted`] so callers
    /// have a single "try again later" branch that matches the io_uring backend.
    #[cfg(not(has_io_uring))]
    pub fn send_to(&self, peer: SocketAddr, data: &[u8]) -> Result<(), crate::error::UdpSendError> {
        with_state(|driver, _executor| {
            let idx = self.udp_index as usize;
            // Guard against an out-of-range index before touching the table.
            if idx >= driver.udp_sockets.len() {
                return Err(crate::error::UdpSendError::Io(io::Error::other(
                    "invalid UDP socket index",
                )));
            }
            match driver.udp_sockets[idx].send_to(data, peer) {
                Ok(_) => {
                    crate::metrics::UDP.increment(crate::metrics::udp::DATAGRAMS_SENT);
                    Ok(())
                }
                // Socket buffer full — map to the backend-agnostic "try later".
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    Err(crate::error::UdpSendError::PoolExhausted)
                }
                Err(e) => Err(crate::error::UdpSendError::Io(e)),
            }
        })
    }
}
2362
/// Future returned by [`UdpCtx::recv_from()`].
pub struct UdpRecvFuture {
    /// Index of the UDP socket to receive from.
    udp_index: u32,
}

impl Future for UdpRecvFuture {
    type Output = (Vec<u8>, SocketAddr);

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        with_state(|_driver, executor| {
            // Fast path: a datagram is already queued for this socket.
            let idx = self.udp_index as usize;
            if idx < executor.udp_recv_queues.len()
                && let Some(datagram) = executor.udp_recv_queues[idx].pop_front()
            {
                return Poll::Ready(datagram);
            }
            // Register as waiter so the CQE handler wakes us. Single-waiter
            // slot: a second concurrent waiter overwrites the first.
            // NOTE(review): no Drop impl clears this registration — a dropped
            // future can leave a stale task id behind; confirm spurious wakes
            // are tolerated by the executor.
            let task_id = CURRENT_TASK_ID.with(|c| c.get());
            if idx < executor.udp_recv_waiters.len() {
                executor.udp_recv_waiters[idx] = Some(task_id);
            }
            Poll::Pending
        })
    }
}
2388
/// Future returned by [`UdpCtx::send_ready()`].
pub struct UdpSendReadyFuture {
    /// Never read on the mio backend — the future resolves immediately
    /// without looking at any per-socket state.
    #[cfg_attr(not(has_io_uring), allow(dead_code))]
    udp_index: u32,
}

impl Future for UdpSendReadyFuture {
    type Output = ();

    #[cfg(has_io_uring)]
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        with_state(|driver, executor| {
            // Ready as soon as the socket's send pool has a free slot.
            let idx = self.udp_index as usize;
            if idx < driver.udp_sockets.len() && !driver.udp_sockets[idx].send_freelist.is_empty() {
                return Poll::Ready(());
            }
            // Register as the waiter. CQE handler wakes us when a slot frees.
            // Single-waiter slot: a second waiter overwrites the first (see
            // `UdpCtx::send_ready` docs).
            let task_id = CURRENT_TASK_ID.with(|c| c.get());
            if idx < executor.udp_send_ready_waiters.len() {
                executor.udp_send_ready_waiters[idx] = Some(task_id);
            }
            Poll::Pending
        })
    }

    /// Mio backend: sends are synchronous `send_to` calls that never queue,
    /// so this future resolves immediately. Callers stay backend-agnostic.
    #[cfg(not(has_io_uring))]
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Ready(())
    }
}