1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//! io_uring reactor's bounded park — split out of [`crate::uring_reactor`]
//! so that file stays under the project's 500-LOC ceiling. Same
//! `impl Shard`, called from `run_uring`.
#![cfg(target_os = "linux")]
use std::io;
use std::sync::atomic::{Ordering, fence};
use kevy_uring::{IoUring, KernelTimespec};
use crate::Commands;
use crate::shard::Shard;
use crate::uring_conn::ParkState;
use crate::uring_reactor::{OP_TIMEOUT, OP_WAKER};
impl<C: Commands> Shard<C> {
    /// Blocking wait, epoll-park equivalent: publish `parked[me]`, close the
    /// park/wake race with a fenced re-drain (same pairing as `Shard::run` /
    /// `flush_wakes`; loom-verified there), then block in
    /// `submit_and_wait(1)` until any CQE — socket I/O, the waker-pipe read
    /// (a peer pushed to our inbox), or the bounding timeout (tick cadence,
    /// default 50 ms). The CQEs are reaped by the next loop iteration.
    ///
    /// `park` carries the state that must outlive a single call: the waker
    /// read buffer, the timeout timespec, and the two "SQE already queued"
    /// flags. This function only ever sets those flags (from the `prep_*`
    /// return values); it never clears them.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `ring.submit_and_wait(1)`. The
    /// `parked[me]` flag is cleared on every exit path, including this one,
    /// so peers never observe a shard that claims to be parked while it is
    /// actually running (or unwinding).
    pub(crate) fn uring_park(
        &mut self,
        ring: &mut IoUring,
        park: &mut ParkState,
    ) -> io::Result<()> {
        let me = self.id;

        // Publish "parked" first, then fence, then re-check the inbox: a
        // peer that pushed before observing the flag is caught by the drain,
        // and one that pushes afterwards sees the flag set.
        self.parked[me].store(true, Ordering::SeqCst);
        fence(Ordering::SeqCst);
        if self.uring_drain_inbound() {
            // A push landed in the race window — process it, don't block.
            self.parked[me].store(false, Ordering::SeqCst);
            return Ok(());
        }

        // Queue the waker-pipe read unless one is already outstanding. The
        // `prep_*` result is stored as the flag, so a refused SQE leaves it
        // `false` (presumably a full submission queue — confirm in
        // `kevy_uring`).
        if !park.waker_armed {
            // SAFETY: `park` lives on `run_uring`'s stack for the reactor's
            // whole life, so `wake_buf` outlives the SQE.
            park.waker_armed = unsafe {
                ring.prep_read(
                    self.waker.read_fd(),
                    park.wake_buf.as_mut_ptr(),
                    park.wake_buf.len() as u32,
                    OP_WAKER,
                )
            };
        }

        // Queue the bounding timeout unless one is already outstanding. The
        // duration is clamped to at least 1 ms.
        if !park.timeout_inflight {
            park.ts = KernelTimespec::from_millis(self.park_timeout_ms.max(1) as u64);
            // SAFETY: `ts` is only rewritten when no timeout SQE is in
            // flight, and outlives the SQE (same lifetime as `wake_buf`).
            park.timeout_inflight = unsafe { ring.prep_timeout(&park.ts, OP_TIMEOUT) };
        }

        // Only block when at least one of our own wake sources is queued;
        // otherwise return immediately instead of sleeping with no bounded
        // wake-up of our own.
        let waited = if park.waker_armed || park.timeout_inflight {
            ring.submit_and_wait(1).map(|_| ())
        } else {
            Ok(())
        };

        // Clear the flag before surfacing any error: leaving it set on the
        // failure path would make peers keep treating this shard as parked.
        self.parked[me].store(false, Ordering::SeqCst);
        waited?;
        Ok(())
    }

    // The `uring_nap` middle-rung was removed after both attempts
    // (timeout-only variant kept here, and an inline state-machine
    // refactor) deadlocked under Rust-client sequential -c1 load. See
    // the idle-ladder comment in `run_uring` for the full story; the
    // current path is spin → park.
}