Skip to main content

vyre_runtime/
lib.rs

1//! # vyre-runtime  -  persistent megakernel + io_uring zero-copy
2//!
3//! This crate provides the execution runtime for vyre  -  the layer
4//! between "I have a compiled Program" and "bytes flow through the
5//! GPU continuously."
6//!
7//! ## Architecture
8//!
9//! 1. **`megakernel`**  -  the persistent GPU process. A vyre `Program`
10//!    wrapping `Node::forever` that loops a ring-buffer interpreter
11//!    or a JIT-fused payload processor.
12//!    - `protocol`  -  slot layout, control words, opcodes
13//!    - `opcode`  -  built-in opcode handlers + extension mechanism
14//!    - `builder`  -  IR `Program` construction (interpreted + JIT)
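//! 2. **`pipeline_cache`**  -  content-addressed compilation cache.
//! 3. **`GpuStream`** (crate root)  -  glue bridging io_uring completions
//!    to the megakernel tail pointer.
//! 4. **`uring`** (Linux only)  -  raw `io_uring` syscall wrappers.
//! 5. **`replay`**, **`routing`**, **`scheduler`**, **`tenant`**  -  replay
//!    logging, backend routing policy, multi-GPU work partitioning, and
//!    multi-tenant megakernel multiplexing.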
19//!
20//! ## Design laws
21//!
22//! - **No CPU executor on the hot path.** Compatibility ingest may submit
23//!   registered mapped reads, but the native path is NVMe passthrough into
24//!   BAR1 GPU memory; after launch the megakernel owns execution and the CPU
25//!   only touches queue metadata.
26//! - **Megakernel is IR, not target-text.** The persistent kernel is a
27//!   `Program` any `VyreBackend` can compile + dispatch.
//! - **Structured errors, never silent swallowing.** Every failure
//!   mode returns `PipelineError`; errors raised by this crate carry a
//!   `Fix: ` hint, while backend errors are forwarded with their own message.
30
31#![deny(missing_docs)]
32#![warn(unreachable_pub)]
33// vyre-runtime owns the io_uring zero-copy ingest path and the persistent
34// megakernel ring; both reach into FFI / mmap territory. Every unsafe site
35// carries a `Safety:` comment that `check_unsafe_justifications.sh` validates.
36#![allow(unsafe_code)]
37
/// Errors surfaced by the runtime layer.
///
/// Variants raised by this crate carry a `Fix:`-bearing message so a
/// reviewer can act on the failure. [`PipelineError::Backend`] is the
/// exception: it forwards the backend error's rendered message as-is, so any
/// remediation hint there comes from the backend itself.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum PipelineError {
    /// Raw io_uring / libc syscall failed with an errno.
    ///
    /// Also used for non-io_uring syscalls issued by this crate, such as
    /// `futex_waitv` in `GpuStream::wait_for_observable`.
    #[error("io_uring {syscall} failed: errno={errno}. Fix: {fix}")]
    IoUringSyscall {
        /// Which syscall failed (e.g. `io_uring_setup`, `mmap`,
        /// `io_uring_enter`, `futex_waitv`).
        syscall: &'static str,
        /// Underlying errno value, as read after the failing call.
        errno: i32,
        /// Actionable remediation.
        fix: &'static str,
    },
    /// io_uring submission or completion queue was full / overflowed.
    #[error("io_uring {queue} queue at capacity. Fix: {fix}")]
    QueueFull {
        /// "submission" or "completion".
        queue: &'static str,
        /// Actionable remediation.
        fix: &'static str,
    },
    /// Attempted to use io_uring on a non-Linux platform.
    ///
    /// Also returned unconditionally by the non-Linux build of
    /// `GpuStream::wait_for_observable`.
    #[error(
        "io_uring is Linux-only. Fix: run on Linux 5.1+ or use Megakernel::dispatch without a GpuStream"
    )]
    NotLinux,
    /// Feature required for NVMe passthrough is not enabled.
    #[error(
        "NVMe passthrough requires the `uring-cmd-nvme` feature + Linux kernel 6.0+. Fix: add `features = [\"uring-cmd-nvme\"]` to your Cargo.toml"
    )]
    NvmePassthroughDisabled,
    /// Backend error bubbled up from compile or dispatch.
    ///
    /// Holds the `Display` text of the originating
    /// `vyre_driver::backend::BackendError` (produced by the
    /// `From<BackendError>` conversion); no `Fix:` hint is appended here.
    #[error("backend error: {0}")]
    Backend(String),
}
75
76impl From<vyre_driver::backend::BackendError> for PipelineError {
77    fn from(err: vyre_driver::backend::BackendError) -> Self {
78        PipelineError::Backend(err.to_string())
79    }
80}
81
/// Persistent megakernel  -  the vyre Program that runs forever on
/// the GPU, decoding opcodes from a host-fed ring buffer.
84pub mod megakernel;
85
86/// Content-addressed pipeline cache: `blake3(canonicalize(p).to_wire())`
87/// is the cache key.
88pub mod pipeline_cache;
89
90/// Differential megakernel replay log  -  captures every published
91/// ring slot so a later cert run can diff epoch-by-epoch execution
92/// against a live backend.
93pub mod replay;
94
95/// Backend routing policy for execution plans.
96pub mod routing;
97
98/// Multi-GPU work partitioning across runtime backends.
99pub mod scheduler;
100
101/// Multi-tenant megakernel multiplexing  -  one persistent kernel per
102/// GPU, shared across producer tools via the `tenant_id` field already
103/// in the ring protocol.
104pub mod tenant;
105
106pub use replay::{RecordedSlot, ReplayLogError, RingLog};
107pub use tenant::{
108    TenantError, TenantHandle, TenantRegistry, OPCODE_RANGE_PER_TENANT, TENANT_ID_MAX,
109    TENANT_OPCODE_BASE,
110};
111
112#[cfg(feature = "remote")]
113pub use pipeline_cache::RemoteCache;
114pub use pipeline_cache::{
115    DiskCache, DiskCacheError, InMemoryPipelineCache, LayeredPipelineCache,
116    PersistentPipelineCacheStore, PipelineCacheMetrics, PipelineCacheStore, PipelineFingerprint,
117};
118
119pub use megakernel::Megakernel;
120
121/// Linux io_uring integration. Compiled out on macOS / Windows.
122#[cfg(target_os = "linux")]
123#[allow(unsafe_code)]
124pub mod uring;
125
126/// Handle to an orchestrated pipeline. Couples a compiled megakernel
127/// to its submission + completion infrastructure.
128pub struct GpuStream<'a> {
129    #[cfg(target_os = "linux")]
130    uring: Option<uring::AsyncUringStream<'a>>,
131    // On macOS / Windows the `uring` field is compiled out, which leaves the
132    // `'a` lifetime unused and the compiler rejects the struct. Carry a
133    // zero-sized marker so the lifetime stays live on non-Linux targets.
134    #[cfg(not(target_os = "linux"))]
135    _phantom: std::marker::PhantomData<&'a ()>,
136    shutdown_requested: bool,
137}
138
139impl Default for GpuStream<'_> {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145impl<'a> GpuStream<'a> {
146    /// Create a pipeline handle with no io_uring stream attached.
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// use vyre_runtime::GpuStream;
152    ///
153    /// let stream = GpuStream::new();
154    ///
155    /// assert!(!stream.is_shutdown_requested());
156    /// ```
157    #[must_use]
158    pub fn new() -> Self {
159        Self {
160            #[cfg(target_os = "linux")]
161            uring: None,
162            #[cfg(not(target_os = "linux"))]
163            _phantom: std::marker::PhantomData,
164            shutdown_requested: false,
165        }
166    }
167
168    /// Attach an io_uring stream for GPU-visible reads. Linux-only.
169    ///
170    /// Use `uring::NvmeGpuIngestDriver::new_gpudirect` when the caller
171    /// requires the native NVMe → BAR1 path instead of registered mapped reads.
172    #[cfg(target_os = "linux")]
173    #[must_use]
174    pub fn with_uring(mut self, stream: uring::AsyncUringStream<'a>) -> Self {
175        self.uring = Some(stream);
176        self
177    }
178
179    /// Reap completions and bump the megakernel tail pointer.
180    ///
181    /// # Errors
182    ///
183    /// Propagates any uring syscall error from the underlying ring.
184    pub fn poll(&mut self) -> Result<u32, PipelineError> {
185        #[cfg(target_os = "linux")]
186        {
187            if let Some(ref mut stream) = self.uring {
188                return stream.poll();
189            }
190        }
191        Ok(0)
192    }
193
194    /// Request graceful shutdown of the pipeline.
195    pub fn request_shutdown(&mut self) {
196        self.shutdown_requested = true;
197    }
198
199    /// Whether shutdown has been requested.
200    #[must_use]
201    pub fn is_shutdown_requested(&self) -> bool {
202        self.shutdown_requested
203    }
204
205    /// Block until the megakernel writes a new value into the
206    /// observable word. Uses `futex_waitv` on Linux 5.16+.
207    ///
208    /// # Errors
209    ///
210    /// - [`PipelineError::NotLinux`] on non-Linux hosts.
211    /// - [`PipelineError::IoUringSyscall`] on futex errors.
212    ///
213    /// # Safety
214    ///
215    /// `host_visible_addr` must be host-mapped and outlive this call.
216    #[cfg(target_os = "linux")]
217    #[allow(unsafe_code)]
218    pub unsafe fn wait_for_observable(
219        host_visible_addr: *const u32,
220        current: u32,
221        timeout_ns: u64,
222    ) -> Result<(), PipelineError> {
223        #[repr(C)]
224        struct futex_waitv {
225            val: u64,
226            uaddr: u64,
227            flags: u32,
228            __reserved: u32,
229        }
230        const FUTEX2_SIZE_U32: u32 = 0x02;
231        const SYS_FUTEX_WAITV: libc::c_long = 449;
232
233        let waitv = [futex_waitv {
234            val: current as u64,
235            uaddr: host_visible_addr as u64,
236            flags: FUTEX2_SIZE_U32,
237            __reserved: 0,
238        }];
239
240        #[repr(C)]
241        struct Timespec {
242            tv_sec: i64,
243            tv_nsec: i64,
244        }
245        let ts = Timespec {
246            tv_sec: (timeout_ns / 1_000_000_000) as i64,
247            tv_nsec: (timeout_ns % 1_000_000_000) as i64,
248        };
249
250        // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
251        let res = unsafe {
252            libc::syscall(
253                SYS_FUTEX_WAITV,
254                waitv.as_ptr() as *const libc::c_void,
255                1u32,
256                0u32,
257                &ts as *const Timespec,
258                0u64,
259            )
260        };
261
262        if res < 0 {
263            // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
264            let errno = unsafe { *libc::__errno_location() };
265            if errno == libc::EAGAIN {
266                return Ok(());
267            }
268            return Err(PipelineError::IoUringSyscall {
269                syscall: "futex_waitv",
270                errno,
271                fix: "kernel 5.16+ required; ETIMEDOUT means the value didn't change within timeout_ns",
272            });
273        }
274        Ok(())
275    }
276
277    /// Non-Linux implementation returning the structured platform error.
278    #[cfg(not(target_os = "linux"))]
279    #[allow(unsafe_code, clippy::missing_safety_doc)]
280    pub unsafe fn wait_for_observable(
281        _host_visible_addr: *const u32,
282        _current: u32,
283        _timeout_ns: u64,
284    ) -> Result<(), PipelineError> {
285        Err(PipelineError::NotLinux)
286    }
287}
288
289/// Linux-only: host-visible GPU buffer that io_uring can DMA into.
290#[cfg(target_os = "linux")]
291pub use uring::GpuMappedBuffer;
292
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn construct_stream_has_no_shutdown() {
        let stream = GpuStream::new();
        assert!(!stream.is_shutdown_requested());
    }

    #[test]
    fn default_matches_new() {
        let stream = GpuStream::default();
        assert!(!stream.is_shutdown_requested());
    }

    #[test]
    fn shutdown_is_idempotent() {
        let mut stream = GpuStream::new();
        stream.request_shutdown();
        stream.request_shutdown();
        assert!(stream.is_shutdown_requested());
    }

    #[test]
    fn poll_without_uring_returns_zero() {
        let mut stream = GpuStream::new();
        assert_eq!(stream.poll().unwrap(), 0);
    }

    /// True when the host cannot run `futex_waitv` at all. That happens on
    /// kernels older than 5.16 (ENOSYS) or under a seccomp filter (EPERM),
    /// and in that case the futex tests are not meaningful.
    #[cfg(target_os = "linux")]
    fn futex_waitv_unavailable(result: &Result<(), PipelineError>) -> bool {
        matches!(
            result,
            Err(PipelineError::IoUringSyscall { errno, .. })
                if *errno == libc::ENOSYS || *errno == libc::EPERM
        )
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn wait_returns_immediately_when_value_already_changed() {
        let word: u32 = 7;
        // SAFETY: `word` is a live, aligned local that outlives the call.
        let result = unsafe { GpuStream::wait_for_observable(&word, 3, 1_000_000_000) };
        if futex_waitv_unavailable(&result) {
            return;
        }
        assert!(result.is_ok(), "expected Ok(()), got {:?}", result);
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn wait_times_out_when_value_is_unchanged() {
        let word: u32 = 5;
        // SAFETY: `word` is a live, aligned local that outlives the call.
        let result = unsafe { GpuStream::wait_for_observable(&word, 5, 1_000_000) };
        if futex_waitv_unavailable(&result) {
            return;
        }
        assert!(
            matches!(
                &result,
                Err(PipelineError::IoUringSyscall { errno, .. }) if *errno == libc::ETIMEDOUT
            ),
            "expected ETIMEDOUT, got {:?}",
            result
        );
    }

    #[cfg(not(target_os = "linux"))]
    #[test]
    fn wait_reports_not_linux() {
        let word: u32 = 0;
        // SAFETY: the non-Linux stub never dereferences the pointer.
        let result = unsafe { GpuStream::wait_for_observable(&word, 0, 0) };
        assert!(matches!(result, Err(PipelineError::NotLinux)));
    }
}