Skip to main content

vyre_runtime/uring/
io_loop.rs

1//! Autonomous IO loop for persistent megakernel.
2//!
3//! This module implements Innovation I.5: host-side pump thread that
4//! polls the GPU's `io_queue` for requests and services them via
5//! io_uring. This removes the CPU from the dispatch critical path.
6
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11
12use crate::megakernel::io::{claim_io_requests_into, complete_io_request, io_op};
13use crate::uring::stream::AsyncUringStream;
14use crate::PipelineError;
15
16const IDLE_SPINS: u32 = 64;
17const MIN_IDLE_PARK: Duration = Duration::from_micros(10);
18const MAX_IDLE_PARK: Duration = Duration::from_micros(100);
19
/// Maps one megakernel destination handle onto an io_uring registered buffer.
///
/// When an IO request carries a `dst_handle` equal to [`Self::handle`], the
/// pump can issue `IORING_OP_READ_FIXED` against the registered buffer, which
/// skips per-request iovec allocation and the kernel's iovec validation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RegisteredIoDestination {
    /// Destination handle as published by the megakernel in its IO requests.
    pub handle: u32,
    /// Index of the buffer in the io_uring registered-buffer table.
    pub buf_index: u16,
    /// Offset, in bytes, into the registered GPU-visible buffer.
    pub target_offset: u64,
}
34
/// Book-keeping for how long the pump thread has gone without finding work.
#[derive(Default)]
struct IdleBackoff {
    /// Count of consecutive idle polls; starts at zero.
    polls: u32,
}
39
40impl IdleBackoff {
41    fn reset(&mut self) {
42        self.polls = 0;
43    }
44
45    fn wait(&mut self, shutdown: &AtomicBool) {
46        if shutdown.load(Ordering::Acquire) {
47            return;
48        }
49        self.polls = self.polls.saturating_add(1);
50        if self.polls <= IDLE_SPINS {
51            std::hint::spin_loop();
52            return;
53        }
54        let shift = (self.polls - IDLE_SPINS).min(7);
55        let multiplier = 1_u32 << shift;
56        let park = MIN_IDLE_PARK
57            .checked_mul(multiplier)
58            .unwrap_or(MAX_IDLE_PARK)
59            .min(MAX_IDLE_PARK);
60        thread::park_timeout(park);
61    }
62}
63
64/// Host-side pump that services GPU-driven IO requests.
65pub struct MegakernelIoLoop {
66    shutdown: Arc<AtomicBool>,
67    handle: Option<JoinHandle<Result<(), PipelineError>>>,
68}
69
70impl MegakernelIoLoop {
71    /// Start a background thread that polls `io_queue_mapped`.
72    ///
73    /// READ requests require registered destinations; call
74    /// [`Self::spawn_with_registered_destinations`] for production IO.
75    pub fn spawn(stream: AsyncUringStream<'static>, io_queue_mapped: &'static mut [u8]) -> Self {
76        Self::spawn_with_registered_destinations(stream, io_queue_mapped, Vec::new())
77    }
78
79    /// Start a background IO pump with a registered destination table.
80    ///
81    /// Requests whose `dst_handle` is present in `registered_destinations` use
82    /// fixed-buffer reads. Unregistered READ destinations stop the pump with a
83    /// host-visible error instead of silently taking a host-iovec compatibility
84    /// route.
85    pub fn spawn_with_registered_destinations(
86        mut stream: AsyncUringStream<'static>,
87        io_queue_mapped: &'static mut [u8],
88        registered_destinations: Vec<RegisteredIoDestination>,
89    ) -> Self {
90        let shutdown = Arc::new(AtomicBool::new(false));
91        let shutdown_clone = Arc::clone(&shutdown);
92
93        let handle = thread::spawn(move || {
94            let mut backoff = IdleBackoff::default();
95            let mut requests = Vec::new();
96            let mut registered_destinations = registered_destinations;
97            registered_destinations.sort_unstable_by_key(|destination| destination.handle);
98            while !shutdown_clone.load(Ordering::Acquire) {
99                while let Some(cqe) = stream.ring_state.peek_cqe() {
100                    let res = cqe.res;
101                    let slot_idx = cqe.user_data;
102                    stream.ring_state.advance_cq();
103                    stream.inflight =
104                        stream
105                            .inflight
106                            .checked_sub(1)
107                            .ok_or(PipelineError::QueueFull {
108                                queue: "io_uring",
109                                fix: "megakernel IO loop completion arrived with no inflight SQE; rebuild the IO stream state",
110                            })?;
111                    let slot_idx = u32::try_from(slot_idx).map_err(|error| {
112                        PipelineError::QueueFull {
113                            queue: "completion",
114                            fix: match error {
115                                _ => "io_uring completion user_data does not fit megakernel IO slot index; keep user_data in u32 slot-id range",
116                            },
117                        }
118                    })?;
119                    complete_io_request(io_queue_mapped, slot_idx, res >= 0)?;
120                    backoff.reset();
121                }
122
123                // 1. Claim GPU-published IO requests exactly once.
124                claim_io_requests_into(io_queue_mapped, &mut requests)?;
125
126                if requests.is_empty() {
127                    if stream.inflight() > 0 {
128                        stream.flush_submissions()?;
129                        stream.ring_state.enter(0, 1, 1)?;
130                    } else {
131                        backoff.wait(&shutdown_clone);
132                    }
133                    continue;
134                }
135                backoff.reset();
136
137                for req in requests.iter().copied() {
138                    match req.op_type {
139                        // SAFETY: io_uring submission queue entry initialized in-place; the SQE
140                        // memory is owned by the ring and lives for the duration of the submit.
141                        io_op::READ => unsafe {
142                            let fd = req.src_handle as i32;
143                            if let Ok(destination_idx) = registered_destinations
144                                .binary_search_by_key(&req.dst_handle, |destination| {
145                                    destination.handle
146                                })
147                            {
148                                let destination = registered_destinations[destination_idx];
149                                // Bug fix: a submit_read_fixed_at error
150                                // previously returned via `?` while the
151                                // slot was still CLAIMED, hanging the
152                                // GPU which never saw a completion.
153                                // Mark the slot failed first, then
154                                // propagate the error.
155                                if let Err(e) = stream.submit_read_fixed_at(
156                                    fd,
157                                    req.offset,
158                                    req.byte_count,
159                                    destination.target_offset,
160                                    destination.buf_index,
161                                    u64::from(req.slot_idx),
162                                ) {
163                                    let _ =
164                                        complete_io_request(io_queue_mapped, req.slot_idx, false);
165                                    return Err(PipelineError::Backend(e.to_string()));
166                                }
167                            } else {
168                                complete_io_request(io_queue_mapped, req.slot_idx, false)?;
169                                return Err(PipelineError::Backend(format!(
170                                    "megakernel IO READ requested unregistered GPU destination handle {} in slot {}. Fix: register the destination with MegakernelIoLoop::spawn_with_registered_destinations before publishing READ requests.",
171                                    req.dst_handle, req.slot_idx
172                                )));
173                            }
174                        },
175                        io_op::FENCE => complete_io_request(io_queue_mapped, req.slot_idx, true)?,
176                        io_op::WRITE => complete_io_request(io_queue_mapped, req.slot_idx, false)?,
177                        _ => complete_io_request(io_queue_mapped, req.slot_idx, false)?,
178                    }
179                }
180                // Bug fix: same hazard as the per-request submit error
181                //  -  if flush_submissions fails, every slot we just
182                // claimed is stranded in CLAIMED. Mark every still-
183                // claimed slot failed before propagating.
184                if let Err(e) = stream.flush_submissions() {
185                    for req in requests.iter().copied() {
186                        if req.op_type == io_op::READ {
187                            let _ = complete_io_request(io_queue_mapped, req.slot_idx, false);
188                        }
189                    }
190                    return Err(e);
191                }
192            }
193            Ok(())
194        });
195
196        Self {
197            shutdown,
198            handle: Some(handle),
199        }
200    }
201
202    /// Stop the pump thread.
203    pub fn stop(&mut self) -> Result<(), PipelineError> {
204        self.shutdown.store(true, Ordering::Release);
205        if let Some(handle) = self.handle.take() {
206            handle.thread().unpark();
207            handle
208                .join()
209                .map_err(|_| PipelineError::Backend("IO loop thread panicked".to_string()))?
210        } else {
211            Ok(())
212        }
213    }
214}