vyre_runtime/lib.rs
1//! # vyre-runtime - persistent megakernel + io_uring zero-copy
2//!
3//! This crate provides the execution runtime for vyre - the layer
4//! between "I have a compiled Program" and "bytes flow through the
5//! GPU continuously."
6//!
7//! ## Architecture
8//!
9//! 1. **`megakernel`** - the persistent GPU process. A vyre `Program`
10//! wrapping `Node::forever` that loops a ring-buffer interpreter
11//! or a JIT-fused payload processor.
12//! - `protocol` - slot layout, control words, opcodes
13//! - `opcode` - built-in opcode handlers + extension mechanism
14//! - `builder` - IR `Program` construction (interpreted + JIT)
15//! 2. **`cache`** - content-addressed compilation cache.
16//! 3. **`stream`** - `GpuStream` glue bridging io_uring completions
17//! to the megakernel tail pointer.
18//! 4. **`uring`** (Linux only) - raw `io_uring` syscall wrappers.
19//!
20//! ## Design laws
21//!
22//! - **No CPU executor on the hot path.** Compatibility ingest may submit
23//! registered mapped reads, but the native path is NVMe passthrough into
24//! BAR1 GPU memory; after launch the megakernel owns execution and the CPU
25//! only touches queue metadata.
26//! - **Megakernel is IR, not target-text.** The persistent kernel is a
27//! `Program` any `VyreBackend` can compile + dispatch.
28//! - **Structured errors, never silent swallowing.** Every failure
29//! mode returns `PipelineError` with a `Fix: ` hint.
30
31#![deny(missing_docs)]
32#![warn(unreachable_pub)]
33// vyre-runtime owns the io_uring zero-copy ingest path and the persistent
34// megakernel ring; both reach into FFI / mmap territory. Every unsafe site
35// carries a `Safety:` comment that `check_unsafe_justifications.sh` validates.
36#![allow(unsafe_code)]
37
38/// Errors surfaced by the runtime layer. Every variant carries a
39/// `Fix:`-bearing message so a reviewer can act on the failure.
40#[derive(Debug, Clone, thiserror::Error)]
41#[non_exhaustive]
42pub enum PipelineError {
43 /// Raw io_uring / libc syscall failed with an errno.
44 #[error("io_uring {syscall} failed: errno={errno}. Fix: {fix}")]
45 IoUringSyscall {
46 /// Which syscall failed (`io_uring_setup`, `mmap`, `io_uring_enter`).
47 syscall: &'static str,
48 /// Underlying errno value.
49 errno: i32,
50 /// Actionable remediation.
51 fix: &'static str,
52 },
53 /// io_uring submission or completion queue was full / overflowed.
54 #[error("io_uring {queue} queue at capacity. Fix: {fix}")]
55 QueueFull {
56 /// "submission" or "completion".
57 queue: &'static str,
58 /// Actionable remediation.
59 fix: &'static str,
60 },
61 /// Attempted to use io_uring on a non-Linux platform.
62 #[error(
63 "io_uring is Linux-only. Fix: run on Linux 5.1+ or use Megakernel::dispatch without a GpuStream"
64 )]
65 NotLinux,
66 /// Feature required for NVMe passthrough is not enabled.
67 #[error(
68 "NVMe passthrough requires the `uring-cmd-nvme` feature + Linux kernel 6.0+. Fix: add `features = [\"uring-cmd-nvme\"]` to your Cargo.toml"
69 )]
70 NvmePassthroughDisabled,
71 /// Backend error bubbled up from compile or dispatch.
72 #[error("backend error: {0}")]
73 Backend(String),
74}
75
76impl From<vyre_driver::backend::BackendError> for PipelineError {
77 fn from(err: vyre_driver::backend::BackendError) -> Self {
78 PipelineError::Backend(err.to_string())
79 }
80}
81
/// Persistent megakernel - the vyre Program that runs forever on
/// the GPU, decoding opcodes from a host-fed ring buffer.
84pub mod megakernel;
85
86/// Content-addressed pipeline cache: `blake3(canonicalize(p).to_wire())`
87/// is the cache key.
88pub mod pipeline_cache;
89
90/// Differential megakernel replay log - captures every published
91/// ring slot so a later cert run can diff epoch-by-epoch execution
92/// against a live backend.
93pub mod replay;
94
95/// Backend routing policy for execution plans.
96pub mod routing;
97
98/// Multi-GPU work partitioning across runtime backends.
99pub mod scheduler;
100
101/// Multi-tenant megakernel multiplexing - one persistent kernel per
102/// GPU, shared across producer tools via the `tenant_id` field already
103/// in the ring protocol.
104pub mod tenant;
105
106pub use replay::{RecordedSlot, ReplayLogError, RingLog};
107pub use tenant::{
108 TenantError, TenantHandle, TenantRegistry, OPCODE_RANGE_PER_TENANT, TENANT_ID_MAX,
109 TENANT_OPCODE_BASE,
110};
111
112#[cfg(feature = "remote")]
113pub use pipeline_cache::RemoteCache;
114pub use pipeline_cache::{
115 DiskCache, DiskCacheError, InMemoryPipelineCache, LayeredPipelineCache,
116 PersistentPipelineCacheStore, PipelineCacheMetrics, PipelineCacheStore, PipelineFingerprint,
117};
118
119pub use megakernel::Megakernel;
120
121/// Linux io_uring integration. Compiled out on macOS / Windows.
122#[cfg(target_os = "linux")]
123#[allow(unsafe_code)]
124pub mod uring;
125
126/// Handle to an orchestrated pipeline. Couples a compiled megakernel
127/// to its submission + completion infrastructure.
128pub struct GpuStream<'a> {
129 #[cfg(target_os = "linux")]
130 uring: Option<uring::AsyncUringStream<'a>>,
131 // On macOS / Windows the `uring` field is compiled out, which leaves the
132 // `'a` lifetime unused and the compiler rejects the struct. Carry a
133 // zero-sized marker so the lifetime stays live on non-Linux targets.
134 #[cfg(not(target_os = "linux"))]
135 _phantom: std::marker::PhantomData<&'a ()>,
136 shutdown_requested: bool,
137}
138
139impl Default for GpuStream<'_> {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145impl<'a> GpuStream<'a> {
146 /// Create a pipeline handle with no io_uring stream attached.
147 ///
148 /// # Examples
149 ///
150 /// ```
151 /// use vyre_runtime::GpuStream;
152 ///
153 /// let stream = GpuStream::new();
154 ///
155 /// assert!(!stream.is_shutdown_requested());
156 /// ```
157 #[must_use]
158 pub fn new() -> Self {
159 Self {
160 #[cfg(target_os = "linux")]
161 uring: None,
162 #[cfg(not(target_os = "linux"))]
163 _phantom: std::marker::PhantomData,
164 shutdown_requested: false,
165 }
166 }
167
168 /// Attach an io_uring stream for GPU-visible reads. Linux-only.
169 ///
170 /// Use `uring::NvmeGpuIngestDriver::new_gpudirect` when the caller
171 /// requires the native NVMe → BAR1 path instead of registered mapped reads.
172 #[cfg(target_os = "linux")]
173 #[must_use]
174 pub fn with_uring(mut self, stream: uring::AsyncUringStream<'a>) -> Self {
175 self.uring = Some(stream);
176 self
177 }
178
179 /// Reap completions and bump the megakernel tail pointer.
180 ///
181 /// # Errors
182 ///
183 /// Propagates any uring syscall error from the underlying ring.
184 pub fn poll(&mut self) -> Result<u32, PipelineError> {
185 #[cfg(target_os = "linux")]
186 {
187 if let Some(ref mut stream) = self.uring {
188 return stream.poll();
189 }
190 }
191 Ok(0)
192 }
193
194 /// Request graceful shutdown of the pipeline.
195 pub fn request_shutdown(&mut self) {
196 self.shutdown_requested = true;
197 }
198
199 /// Whether shutdown has been requested.
200 #[must_use]
201 pub fn is_shutdown_requested(&self) -> bool {
202 self.shutdown_requested
203 }
204
205 /// Block until the megakernel writes a new value into the
206 /// observable word. Uses `futex_waitv` on Linux 5.16+.
207 ///
208 /// # Errors
209 ///
210 /// - [`PipelineError::NotLinux`] on non-Linux hosts.
211 /// - [`PipelineError::IoUringSyscall`] on futex errors.
212 ///
213 /// # Safety
214 ///
215 /// `host_visible_addr` must be host-mapped and outlive this call.
216 #[cfg(target_os = "linux")]
217 #[allow(unsafe_code)]
218 pub unsafe fn wait_for_observable(
219 host_visible_addr: *const u32,
220 current: u32,
221 timeout_ns: u64,
222 ) -> Result<(), PipelineError> {
223 #[repr(C)]
224 struct futex_waitv {
225 val: u64,
226 uaddr: u64,
227 flags: u32,
228 __reserved: u32,
229 }
230 const FUTEX2_SIZE_U32: u32 = 0x02;
231 const SYS_FUTEX_WAITV: libc::c_long = 449;
232
233 let waitv = [futex_waitv {
234 val: current as u64,
235 uaddr: host_visible_addr as u64,
236 flags: FUTEX2_SIZE_U32,
237 __reserved: 0,
238 }];
239
240 #[repr(C)]
241 struct Timespec {
242 tv_sec: i64,
243 tv_nsec: i64,
244 }
245 let ts = Timespec {
246 tv_sec: (timeout_ns / 1_000_000_000) as i64,
247 tv_nsec: (timeout_ns % 1_000_000_000) as i64,
248 };
249
250 // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
251 let res = unsafe {
252 libc::syscall(
253 SYS_FUTEX_WAITV,
254 waitv.as_ptr() as *const libc::c_void,
255 1u32,
256 0u32,
257 &ts as *const Timespec,
258 0u64,
259 )
260 };
261
262 if res < 0 {
263 // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
264 let errno = unsafe { *libc::__errno_location() };
265 if errno == libc::EAGAIN {
266 return Ok(());
267 }
268 return Err(PipelineError::IoUringSyscall {
269 syscall: "futex_waitv",
270 errno,
271 fix: "kernel 5.16+ required; ETIMEDOUT means the value didn't change within timeout_ns",
272 });
273 }
274 Ok(())
275 }
276
277 /// Non-Linux implementation returning the structured platform error.
278 #[cfg(not(target_os = "linux"))]
279 #[allow(unsafe_code, clippy::missing_safety_doc)]
280 pub unsafe fn wait_for_observable(
281 _host_visible_addr: *const u32,
282 _current: u32,
283 _timeout_ns: u64,
284 ) -> Result<(), PipelineError> {
285 Err(PipelineError::NotLinux)
286 }
287}
288
289/// Linux-only: host-visible GPU buffer that io_uring can DMA into.
290#[cfg(target_os = "linux")]
291pub use uring::GpuMappedBuffer;
292
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly constructed handle must not report a pending shutdown.
    #[test]
    fn construct_stream_has_no_shutdown() {
        assert!(!GpuStream::new().is_shutdown_requested());
    }

    // Requesting shutdown twice leaves the flag set.
    #[test]
    fn shutdown_is_idempotent() {
        let mut handle = GpuStream::new();
        for _ in 0..2 {
            handle.request_shutdown();
        }
        assert!(handle.is_shutdown_requested());
    }

    // With no io_uring stream attached, polling reports zero.
    #[test]
    fn poll_without_uring_returns_zero() {
        let mut detached = GpuStream::new();
        let reaped = detached.poll().unwrap();
        assert_eq!(reaped, 0);
    }
}