vyre_runtime/uring/
io_loop.rs1use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11
12use crate::megakernel::io::{claim_io_requests_into, complete_io_request, io_op};
13use crate::uring::stream::AsyncUringStream;
14use crate::PipelineError;
15
/// Number of consecutive idle polls that `IdleBackoff::wait` answers with a
/// CPU spin hint before it starts parking the thread.
const IDLE_SPINS: u32 = 64;

/// Base park duration; `IdleBackoff::wait` doubles it for every idle poll
/// past the spin phase.
const MIN_IDLE_PARK: Duration = Duration::from_micros(10);

/// Upper bound applied to the park duration computed by `IdleBackoff::wait`.
const MAX_IDLE_PARK: Duration = Duration::from_micros(100);
19
/// A destination that `io_op::READ` requests may target.
///
/// The IO loop looks destinations up by [`handle`](Self::handle) and forwards
/// the remaining fields to the stream's `submit_read_fixed_at` call.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegisteredIoDestination {
    /// Identifier compared against a READ request's `dst_handle`.
    pub handle: u32,
    /// Buffer index handed to `submit_read_fixed_at`.
    // NOTE(review): presumably the io_uring registered-buffer index — confirm.
    pub buf_index: u16,
    /// Target offset handed to `submit_read_fixed_at`.
    // NOTE(review): presumably a byte offset inside the registered buffer — confirm.
    pub target_offset: u64,
}
34
35#[derive(Default)]
36struct IdleBackoff {
37 polls: u32,
38}
39
40impl IdleBackoff {
41 fn reset(&mut self) {
42 self.polls = 0;
43 }
44
45 fn wait(&mut self, shutdown: &AtomicBool) {
46 if shutdown.load(Ordering::Acquire) {
47 return;
48 }
49 self.polls = self.polls.checked_add(1).unwrap_or_else(|| {
50 panic!(
51 "megakernel IO loop idle poll counter overflowed u32. Fix: reset idle backoff before polling indefinitely."
52 )
53 });
54 if self.polls <= IDLE_SPINS {
55 std::hint::spin_loop();
56 return;
57 }
58 let shift = (self.polls - IDLE_SPINS).min(7);
59 let multiplier = 1_u32.checked_shl(shift).unwrap_or_else(|| {
60 panic!(
61 "megakernel IO loop idle park multiplier overflowed u32. Fix: lower idle backoff shift."
62 )
63 });
64 let park = MIN_IDLE_PARK
65 .checked_mul(multiplier)
66 .unwrap_or_else(|| {
67 panic!(
68 "megakernel IO loop idle park duration overflowed. Fix: lower idle backoff bounds."
69 )
70 })
71 .min(MAX_IDLE_PARK);
72 thread::park_timeout(park);
73 }
74}
75
76pub struct MegakernelIoLoop {
78 shutdown: Arc<AtomicBool>,
79 handle: Option<JoinHandle<Result<(), PipelineError>>>,
80}
81
82impl MegakernelIoLoop {
83 pub fn spawn(stream: AsyncUringStream<'static>, io_queue_mapped: &'static mut [u8]) -> Self {
88 Self::spawn_with_registered_destinations(stream, io_queue_mapped, Vec::new())
89 }
90
91 pub fn spawn_with_registered_destinations(
98 mut stream: AsyncUringStream<'static>,
99 io_queue_mapped: &'static mut [u8],
100 registered_destinations: Vec<RegisteredIoDestination>,
101 ) -> Self {
102 let shutdown = Arc::new(AtomicBool::new(false));
103 let shutdown_clone = Arc::clone(&shutdown);
104
105 let handle = thread::spawn(move || {
106 let mut backoff = IdleBackoff::default();
107 let mut requests = Vec::new();
108 let mut registered_destinations = registered_destinations;
109 registered_destinations.sort_unstable_by_key(|destination| destination.handle);
110 while !shutdown_clone.load(Ordering::Acquire) {
111 while let Some(cqe) = stream.ring_state.peek_cqe() {
112 let res = cqe.res;
113 let slot_idx = cqe.user_data;
114 stream.ring_state.advance_cq();
115 stream.inflight = stream.inflight.checked_sub(1).unwrap_or_else(|| {
116 panic!(
117 "megakernel IO loop completion arrived with no inflight SQE. Fix: rebuild the IO stream state."
118 )
119 });
120 let slot_idx = u32::try_from(slot_idx).map_err(|error| {
121 PipelineError::QueueFull {
122 queue: "completion",
123 fix: match error {
124 _ => "io_uring completion user_data does not fit megakernel IO slot index; keep user_data in u32 slot-id range",
125 },
126 }
127 })?;
128 complete_io_request(io_queue_mapped, slot_idx, res >= 0)?;
129 backoff.reset();
130 }
131
132 claim_io_requests_into(io_queue_mapped, &mut requests)?;
134
135 if requests.is_empty() {
136 if stream.inflight() > 0 {
137 stream.flush_submissions()?;
138 stream.ring_state.enter(0, 1, 1)?;
139 } else {
140 backoff.wait(&shutdown_clone);
141 }
142 continue;
143 }
144 backoff.reset();
145
146 for req in requests.iter().copied() {
147 match req.op_type {
148 io_op::READ => unsafe {
151 let fd = req.src_handle as i32;
152 if let Ok(destination_idx) = registered_destinations
153 .binary_search_by_key(&req.dst_handle, |destination| {
154 destination.handle
155 })
156 {
157 let destination = registered_destinations[destination_idx];
158 if let Err(e) = stream.submit_read_fixed_at(
165 fd,
166 req.offset,
167 req.byte_count,
168 destination.target_offset,
169 destination.buf_index,
170 u64::from(req.slot_idx),
171 ) {
172 let _ =
173 complete_io_request(io_queue_mapped, req.slot_idx, false);
174 return Err(PipelineError::Backend(e.to_string()));
175 }
176 } else {
177 complete_io_request(io_queue_mapped, req.slot_idx, false)?;
178 return Err(PipelineError::Backend(format!(
179 "megakernel IO READ requested unregistered GPU destination handle {} in slot {}. Fix: register the destination with MegakernelIoLoop::spawn_with_registered_destinations before publishing READ requests.",
180 req.dst_handle, req.slot_idx
181 )));
182 }
183 },
184 io_op::FENCE => complete_io_request(io_queue_mapped, req.slot_idx, true)?,
185 io_op::WRITE => complete_io_request(io_queue_mapped, req.slot_idx, false)?,
186 _ => complete_io_request(io_queue_mapped, req.slot_idx, false)?,
187 }
188 }
189 if let Err(e) = stream.flush_submissions() {
194 for req in requests.iter().copied() {
195 if req.op_type == io_op::READ {
196 let _ = complete_io_request(io_queue_mapped, req.slot_idx, false);
197 }
198 }
199 return Err(e);
200 }
201 }
202 Ok(())
203 });
204
205 Self {
206 shutdown,
207 handle: Some(handle),
208 }
209 }
210
211 pub fn stop(&mut self) -> Result<(), PipelineError> {
213 self.shutdown.store(true, Ordering::Release);
214 if let Some(handle) = self.handle.take() {
215 handle.thread().unpark();
216 handle
217 .join()
218 .map_err(|_| PipelineError::Backend("IO loop thread panicked".to_string()))?
219 } else {
220 Ok(())
221 }
222 }
223}