Skip to main content

vyre_runtime/uring/
io_loop.rs

1//! Autonomous IO loop for persistent megakernel.
2//!
3//! This module implements Innovation I.5: host-side pump thread that
4//! polls the GPU's `io_queue` for requests and services them via
5//! io_uring. This removes the CPU from the dispatch critical path.
6
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11
12use crate::megakernel::io::{claim_io_requests_into, complete_io_request, io_op};
13use crate::uring::stream::AsyncUringStream;
14use crate::PipelineError;
15
/// Number of idle polls that only issue a spin-loop hint before parking begins.
const IDLE_SPINS: u32 = 64;
/// Shortest park duration once the spin budget is exhausted (10 µs).
const MIN_IDLE_PARK: Duration = Duration::from_nanos(10_000);
/// Ceiling for any single idle park (100 µs).
const MAX_IDLE_PARK: Duration = Duration::from_nanos(100_000);
19
/// Destination buffer pre-registered with io_uring for fixed-buffer reads.
///
/// When a megakernel IO request names `handle` as its `dst_handle`, the pump
/// can service it with `IORING_OP_READ_FIXED` against registered buffer
/// `buf_index` at `target_offset`. This skips the per-request iovec allocation
/// and the kernel-side iovec validation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RegisteredIoDestination {
    /// Megakernel IO destination handle, matched against a request's `dst_handle`.
    pub handle: u32,
    /// Index of this buffer in io_uring's registered-buffer table.
    pub buf_index: u16,
    /// Byte offset within the registered GPU-visible buffer.
    pub target_offset: u64,
}
34
35#[derive(Default)]
36struct IdleBackoff {
37    polls: u32,
38}
39
40impl IdleBackoff {
41    fn reset(&mut self) {
42        self.polls = 0;
43    }
44
45    fn wait(&mut self, shutdown: &AtomicBool) {
46        if shutdown.load(Ordering::Acquire) {
47            return;
48        }
49        self.polls = self.polls.checked_add(1).unwrap_or_else(|| {
50            panic!(
51                "megakernel IO loop idle poll counter overflowed u32. Fix: reset idle backoff before polling indefinitely."
52            )
53        });
54        if self.polls <= IDLE_SPINS {
55            std::hint::spin_loop();
56            return;
57        }
58        let shift = (self.polls - IDLE_SPINS).min(7);
59        let multiplier = 1_u32.checked_shl(shift).unwrap_or_else(|| {
60            panic!(
61                "megakernel IO loop idle park multiplier overflowed u32. Fix: lower idle backoff shift."
62            )
63        });
64        let park = MIN_IDLE_PARK
65            .checked_mul(multiplier)
66            .unwrap_or_else(|| {
67                panic!(
68                    "megakernel IO loop idle park duration overflowed. Fix: lower idle backoff bounds."
69                )
70            })
71            .min(MAX_IDLE_PARK);
72        thread::park_timeout(park);
73    }
74}
75
76/// Host-side pump that services GPU-driven IO requests.
77pub struct MegakernelIoLoop {
78    shutdown: Arc<AtomicBool>,
79    handle: Option<JoinHandle<Result<(), PipelineError>>>,
80}
81
82impl MegakernelIoLoop {
83    /// Start a background thread that polls `io_queue_mapped`.
84    ///
85    /// READ requests require registered destinations; call
86    /// [`Self::spawn_with_registered_destinations`] for production IO.
87    pub fn spawn(stream: AsyncUringStream<'static>, io_queue_mapped: &'static mut [u8]) -> Self {
88        Self::spawn_with_registered_destinations(stream, io_queue_mapped, Vec::new())
89    }
90
91    /// Start a background IO pump with a registered destination table.
92    ///
93    /// Requests whose `dst_handle` is present in `registered_destinations` use
94    /// fixed-buffer reads. Unregistered READ destinations stop the pump with a
95    /// host-visible error instead of silently taking a host-iovec compatibility
96    /// route.
97    pub fn spawn_with_registered_destinations(
98        mut stream: AsyncUringStream<'static>,
99        io_queue_mapped: &'static mut [u8],
100        registered_destinations: Vec<RegisteredIoDestination>,
101    ) -> Self {
102        let shutdown = Arc::new(AtomicBool::new(false));
103        let shutdown_clone = Arc::clone(&shutdown);
104
105        let handle = thread::spawn(move || {
106            let mut backoff = IdleBackoff::default();
107            let mut requests = Vec::new();
108            let mut registered_destinations = registered_destinations;
109            registered_destinations.sort_unstable_by_key(|destination| destination.handle);
110            while !shutdown_clone.load(Ordering::Acquire) {
111                while let Some(cqe) = stream.ring_state.peek_cqe() {
112                    let res = cqe.res;
113                    let slot_idx = cqe.user_data;
114                    stream.ring_state.advance_cq();
115                    stream.inflight = stream.inflight.checked_sub(1).unwrap_or_else(|| {
116                        panic!(
117                            "megakernel IO loop completion arrived with no inflight SQE. Fix: rebuild the IO stream state."
118                        )
119                    });
120                    let slot_idx = u32::try_from(slot_idx).map_err(|error| {
121                        PipelineError::QueueFull {
122                            queue: "completion",
123                            fix: match error {
124                                _ => "io_uring completion user_data does not fit megakernel IO slot index; keep user_data in u32 slot-id range",
125                            },
126                        }
127                    })?;
128                    complete_io_request(io_queue_mapped, slot_idx, res >= 0)?;
129                    backoff.reset();
130                }
131
132                // 1. Claim GPU-published IO requests exactly once.
133                claim_io_requests_into(io_queue_mapped, &mut requests)?;
134
135                if requests.is_empty() {
136                    if stream.inflight() > 0 {
137                        stream.flush_submissions()?;
138                        stream.ring_state.enter(0, 1, 1)?;
139                    } else {
140                        backoff.wait(&shutdown_clone);
141                    }
142                    continue;
143                }
144                backoff.reset();
145
146                for req in requests.iter().copied() {
147                    match req.op_type {
148                        // SAFETY: io_uring submission queue entry initialized in-place; the SQE
149                        // memory is owned by the ring and lives for the duration of the submit.
150                        io_op::READ => unsafe {
151                            let fd = req.src_handle as i32;
152                            if let Ok(destination_idx) = registered_destinations
153                                .binary_search_by_key(&req.dst_handle, |destination| {
154                                    destination.handle
155                                })
156                            {
157                                let destination = registered_destinations[destination_idx];
158                                // Bug fix: a submit_read_fixed_at error
159                                // previously returned via `?` while the
160                                // slot was still CLAIMED, hanging the
161                                // GPU which never saw a completion.
162                                // Mark the slot failed first, then
163                                // propagate the error.
164                                if let Err(e) = stream.submit_read_fixed_at(
165                                    fd,
166                                    req.offset,
167                                    req.byte_count,
168                                    destination.target_offset,
169                                    destination.buf_index,
170                                    u64::from(req.slot_idx),
171                                ) {
172                                    let _ =
173                                        complete_io_request(io_queue_mapped, req.slot_idx, false);
174                                    return Err(PipelineError::Backend(e.to_string()));
175                                }
176                            } else {
177                                complete_io_request(io_queue_mapped, req.slot_idx, false)?;
178                                return Err(PipelineError::Backend(format!(
179                                    "megakernel IO READ requested unregistered GPU destination handle {} in slot {}. Fix: register the destination with MegakernelIoLoop::spawn_with_registered_destinations before publishing READ requests.",
180                                    req.dst_handle, req.slot_idx
181                                )));
182                            }
183                        },
184                        io_op::FENCE => complete_io_request(io_queue_mapped, req.slot_idx, true)?,
185                        io_op::WRITE => complete_io_request(io_queue_mapped, req.slot_idx, false)?,
186                        _ => complete_io_request(io_queue_mapped, req.slot_idx, false)?,
187                    }
188                }
189                // Bug fix: same hazard as the per-request submit error
190                //  -  if flush_submissions fails, every slot we just
191                // claimed is stranded in CLAIMED. Mark every still-
192                // claimed slot failed before propagating.
193                if let Err(e) = stream.flush_submissions() {
194                    for req in requests.iter().copied() {
195                        if req.op_type == io_op::READ {
196                            let _ = complete_io_request(io_queue_mapped, req.slot_idx, false);
197                        }
198                    }
199                    return Err(e);
200                }
201            }
202            Ok(())
203        });
204
205        Self {
206            shutdown,
207            handle: Some(handle),
208        }
209    }
210
211    /// Stop the pump thread.
212    pub fn stop(&mut self) -> Result<(), PipelineError> {
213        self.shutdown.store(true, Ordering::Release);
214        if let Some(handle) = self.handle.take() {
215            handle.thread().unpark();
216            handle
217                .join()
218                .map_err(|_| PipelineError::Backend("IO loop thread panicked".to_string()))?
219        } else {
220            Ok(())
221        }
222    }
223}