vyre_runtime/uring/
io_loop.rs1use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11
12use crate::megakernel::io::{claim_io_requests_into, complete_io_request, io_op};
13use crate::uring::stream::AsyncUringStream;
14use crate::PipelineError;
15
/// Number of consecutive idle polls that `IdleBackoff::wait` answers with a spin hint
/// before it starts parking the thread.
const IDLE_SPINS: u32 = 64;
/// Base park duration; `IdleBackoff::wait` multiplies it by a power of two that grows
/// with the number of idle polls past `IDLE_SPINS`.
const MIN_IDLE_PARK: Duration = Duration::from_micros(10);
/// Upper bound on a single park in `IdleBackoff::wait`.
const MAX_IDLE_PARK: Duration = Duration::from_micros(100);
19
/// Maps a destination handle named by megakernel READ requests to the arguments the
/// IO loop forwards to `AsyncUringStream::submit_read_fixed_at`.
///
/// The IO loop sorts its table of these entries by `handle` and resolves each READ
/// request's `dst_handle` with a binary search over that table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RegisteredIoDestination {
    /// Key compared against a READ request's `dst_handle`.
    pub handle: u32,
    /// Buffer index passed to the fixed-read submission.
    /// NOTE(review): presumably the io_uring registered-buffer slot — confirm against
    /// `submit_read_fixed_at`.
    pub buf_index: u16,
    /// Target offset passed to the fixed-read submission.
    /// NOTE(review): presumably a byte offset inside the registered buffer — confirm.
    pub target_offset: u64,
}
34
35#[derive(Default)]
36struct IdleBackoff {
37 polls: u32,
38}
39
40impl IdleBackoff {
41 fn reset(&mut self) {
42 self.polls = 0;
43 }
44
45 fn wait(&mut self, shutdown: &AtomicBool) {
46 if shutdown.load(Ordering::Acquire) {
47 return;
48 }
49 self.polls = self.polls.saturating_add(1);
50 if self.polls <= IDLE_SPINS {
51 std::hint::spin_loop();
52 return;
53 }
54 let shift = (self.polls - IDLE_SPINS).min(7);
55 let multiplier = 1_u32 << shift;
56 let park = MIN_IDLE_PARK
57 .checked_mul(multiplier)
58 .unwrap_or(MAX_IDLE_PARK)
59 .min(MAX_IDLE_PARK);
60 thread::park_timeout(park);
61 }
62}
63
/// Handle to the background thread that services megakernel IO requests.
pub struct MegakernelIoLoop {
    /// Shutdown flag shared with the worker thread; `stop` sets it and the worker
    /// checks it at the top of every loop iteration.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the worker thread; `None` once `stop` has taken and joined it.
    handle: Option<JoinHandle<Result<(), PipelineError>>>,
}
69
70impl MegakernelIoLoop {
71 pub fn spawn(stream: AsyncUringStream<'static>, io_queue_mapped: &'static mut [u8]) -> Self {
76 Self::spawn_with_registered_destinations(stream, io_queue_mapped, Vec::new())
77 }
78
79 pub fn spawn_with_registered_destinations(
86 mut stream: AsyncUringStream<'static>,
87 io_queue_mapped: &'static mut [u8],
88 registered_destinations: Vec<RegisteredIoDestination>,
89 ) -> Self {
90 let shutdown = Arc::new(AtomicBool::new(false));
91 let shutdown_clone = Arc::clone(&shutdown);
92
93 let handle = thread::spawn(move || {
94 let mut backoff = IdleBackoff::default();
95 let mut requests = Vec::new();
96 let mut registered_destinations = registered_destinations;
97 registered_destinations.sort_unstable_by_key(|destination| destination.handle);
98 while !shutdown_clone.load(Ordering::Acquire) {
99 while let Some(cqe) = stream.ring_state.peek_cqe() {
100 let res = cqe.res;
101 let slot_idx = cqe.user_data;
102 stream.ring_state.advance_cq();
103 stream.inflight =
104 stream
105 .inflight
106 .checked_sub(1)
107 .ok_or(PipelineError::QueueFull {
108 queue: "io_uring",
109 fix: "megakernel IO loop completion arrived with no inflight SQE; rebuild the IO stream state",
110 })?;
111 let slot_idx = u32::try_from(slot_idx).map_err(|error| {
112 PipelineError::QueueFull {
113 queue: "completion",
114 fix: match error {
115 _ => "io_uring completion user_data does not fit megakernel IO slot index; keep user_data in u32 slot-id range",
116 },
117 }
118 })?;
119 complete_io_request(io_queue_mapped, slot_idx, res >= 0)?;
120 backoff.reset();
121 }
122
123 claim_io_requests_into(io_queue_mapped, &mut requests)?;
125
126 if requests.is_empty() {
127 if stream.inflight() > 0 {
128 stream.flush_submissions()?;
129 stream.ring_state.enter(0, 1, 1)?;
130 } else {
131 backoff.wait(&shutdown_clone);
132 }
133 continue;
134 }
135 backoff.reset();
136
137 for req in requests.iter().copied() {
138 match req.op_type {
139 io_op::READ => unsafe {
142 let fd = req.src_handle as i32;
143 if let Ok(destination_idx) = registered_destinations
144 .binary_search_by_key(&req.dst_handle, |destination| {
145 destination.handle
146 })
147 {
148 let destination = registered_destinations[destination_idx];
149 if let Err(e) = stream.submit_read_fixed_at(
156 fd,
157 req.offset,
158 req.byte_count,
159 destination.target_offset,
160 destination.buf_index,
161 u64::from(req.slot_idx),
162 ) {
163 let _ =
164 complete_io_request(io_queue_mapped, req.slot_idx, false);
165 return Err(PipelineError::Backend(e.to_string()));
166 }
167 } else {
168 complete_io_request(io_queue_mapped, req.slot_idx, false)?;
169 return Err(PipelineError::Backend(format!(
170 "megakernel IO READ requested unregistered GPU destination handle {} in slot {}. Fix: register the destination with MegakernelIoLoop::spawn_with_registered_destinations before publishing READ requests.",
171 req.dst_handle, req.slot_idx
172 )));
173 }
174 },
175 io_op::FENCE => complete_io_request(io_queue_mapped, req.slot_idx, true)?,
176 io_op::WRITE => complete_io_request(io_queue_mapped, req.slot_idx, false)?,
177 _ => complete_io_request(io_queue_mapped, req.slot_idx, false)?,
178 }
179 }
180 if let Err(e) = stream.flush_submissions() {
185 for req in requests.iter().copied() {
186 if req.op_type == io_op::READ {
187 let _ = complete_io_request(io_queue_mapped, req.slot_idx, false);
188 }
189 }
190 return Err(e);
191 }
192 }
193 Ok(())
194 });
195
196 Self {
197 shutdown,
198 handle: Some(handle),
199 }
200 }
201
202 pub fn stop(&mut self) -> Result<(), PipelineError> {
204 self.shutdown.store(true, Ordering::Release);
205 if let Some(handle) = self.handle.take() {
206 handle.thread().unpark();
207 handle
208 .join()
209 .map_err(|_| PipelineError::Backend("IO loop thread panicked".to_string()))?
210 } else {
211 Ok(())
212 }
213 }
214}