Skip to main content

vyre_runtime/
lib.rs

1//! # vyre-runtime  -  persistent megakernel + io_uring zero-copy
2//!
3//! This crate provides the execution runtime for vyre  -  the layer
4//! between "I have a compiled Program" and "bytes flow through the
5//! GPU continuously."
6//!
7//! ## Architecture
8//!
9//! 1. **`megakernel`**  -  the persistent GPU process. A vyre `Program`
10//!    wrapping `Node::forever` that loops a ring-buffer interpreter
11//!    or a JIT-fused payload processor.
12//!    - `protocol`  -  slot layout, control words, opcodes
13//!    - `opcode`  -  built-in opcode handlers + extension mechanism
14//!    - `builder`  -  IR `Program` construction (interpreted + JIT)
15//! 2. **`cache`**  -  content-addressed compilation cache.
16//! 3. **`stream`**  -  `GpuStream` glue bridging io_uring completions
17//!    to the megakernel tail pointer.
18//! 4. **`uring`** (Linux only)  -  raw `io_uring` syscall wrappers.
19//!
20//! ## Design laws
21//!
22//! - **No CPU executor on the hot path.** Compatibility ingest may submit
23//!   registered mapped reads, but the native path is NVMe passthrough into
24//!   BAR1 GPU memory; after launch the megakernel owns execution and the CPU
25//!   only touches queue metadata.
26//! - **Megakernel is IR, not target-text.** The persistent kernel is a
27//!   `Program` any `VyreBackend` can compile + dispatch.
28//! - **Structured errors, never silent swallowing.** Every failure
29//!   mode returns `PipelineError` with a `Fix: ` hint.
30
31#![deny(missing_docs)]
32#![warn(unreachable_pub)]
33// vyre-runtime owns the io_uring zero-copy ingest path and the persistent
34// megakernel ring; both reach into FFI / mmap territory. Every unsafe site
35// carries a `Safety:` comment that `check_unsafe_justifications.sh` validates.
36#![allow(unsafe_code)]
37
38/// Errors surfaced by the runtime layer. Every variant carries a
39/// `Fix:`-bearing message so a reviewer can act on the failure.
40#[derive(Debug, Clone, thiserror::Error)]
41#[non_exhaustive]
42pub enum PipelineError {
43    /// Raw io_uring / libc syscall failed with an errno.
44    #[error("io_uring {syscall} failed: errno={errno}. Fix: {fix}")]
45    IoUringSyscall {
46        /// Which syscall failed (`io_uring_setup`, `mmap`, `io_uring_enter`).
47        syscall: &'static str,
48        /// Underlying errno value.
49        errno: i32,
50        /// Actionable remediation.
51        fix: &'static str,
52    },
53    /// io_uring submission or completion queue was full / overflowed.
54    #[error("io_uring {queue} queue at capacity. Fix: {fix}")]
55    QueueFull {
56        /// "submission" or "completion".
57        queue: &'static str,
58        /// Actionable remediation.
59        fix: &'static str,
60    },
61    /// Attempted to use io_uring on a non-Linux platform.
62    #[error(
63        "io_uring is Linux-only. Fix: run on Linux 5.1+ or use Megakernel::dispatch without a GpuStream"
64    )]
65    NotLinux,
66    /// Feature required for NVMe passthrough is not enabled.
67    #[error(
68        "NVMe passthrough requires the `uring-cmd-nvme` feature + Linux kernel 6.0+. Fix: add `features = [\"uring-cmd-nvme\"]` to your Cargo.toml"
69    )]
70    NvmePassthroughDisabled,
71    /// Backend error bubbled up from compile or dispatch.
72    #[error("backend error: {0}")]
73    Backend(String),
74}
75
76impl From<vyre_driver::backend::BackendError> for PipelineError {
77    fn from(err: vyre_driver::backend::BackendError) -> Self {
78        PipelineError::Backend(err.to_string())
79    }
80}
81
/// Persistent megakernel  -  the vyre Program that runs forever on
/// the GPU, decoding opcodes from a host-fed ring buffer.
84pub mod megakernel;
85
86/// Content-addressed pipeline cache: `blake3(canonicalize(p).to_wire())`
87/// is the cache key.
88pub mod pipeline_cache;
89
90/// Differential megakernel replay log  -  captures every published
91/// ring slot so a later cert run can diff epoch-by-epoch execution
92/// against a live backend.
93pub mod replay;
94
95/// Backend routing policy for execution plans.
96pub mod routing;
97
98/// Multi-GPU work partitioning across runtime backends.
99pub mod scheduler;
100
101/// Multi-tenant megakernel multiplexing  -  one persistent kernel per
102/// GPU, shared across producer tools via the `tenant_id` field already
103/// in the ring protocol.
104pub mod tenant;
105
106pub use replay::{
107    RecordedSlot, ReplayFailureClass, ReplayFailureEvidence, ReplayLogError, ReplayRecord, RingLog,
108};
109pub use tenant::{
110    TenantError, TenantHandle, TenantRegistry, OPCODE_RANGE_PER_TENANT, TENANT_ID_MAX,
111    TENANT_OPCODE_BASE,
112};
113
114#[cfg(feature = "remote")]
115pub use pipeline_cache::RemoteCache;
116pub use pipeline_cache::{
117    DiskCache, DiskCacheError, InMemoryPipelineCache, LayeredPipelineCache,
118    PersistentPipelineCacheStore, PipelineCacheMetricError, PipelineCacheMetrics,
119    PipelineCacheStore, PipelineFingerprint,
120};
121
122pub use megakernel::Megakernel;
123
124/// Linux io_uring integration. Compiled out on macOS / Windows.
125#[cfg(target_os = "linux")]
126#[allow(unsafe_code)]
127pub mod uring;
128
129/// Handle to an orchestrated pipeline. Couples a compiled megakernel
130/// to its submission + completion infrastructure.
131pub struct GpuStream<'a> {
132    #[cfg(target_os = "linux")]
133    uring: Option<uring::AsyncUringStream<'a>>,
134    // On macOS / Windows the `uring` field is compiled out, which leaves the
135    // `'a` lifetime unused and the compiler rejects the struct. Carry a
136    // zero-sized marker so the lifetime stays live on non-Linux targets.
137    #[cfg(not(target_os = "linux"))]
138    _phantom: std::marker::PhantomData<&'a ()>,
139    shutdown_requested: bool,
140}
141
142impl Default for GpuStream<'_> {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148impl<'a> GpuStream<'a> {
149    /// Create a pipeline handle with no io_uring stream attached.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// use vyre_runtime::GpuStream;
155    ///
156    /// let stream = GpuStream::new();
157    ///
158    /// assert!(!stream.is_shutdown_requested());
159    /// ```
160    #[must_use]
161    pub fn new() -> Self {
162        Self {
163            #[cfg(target_os = "linux")]
164            uring: None,
165            #[cfg(not(target_os = "linux"))]
166            _phantom: std::marker::PhantomData,
167            shutdown_requested: false,
168        }
169    }
170
171    /// Attach an io_uring stream for GPU-visible reads. Linux-only.
172    ///
173    /// Use `uring::NvmeGpuIngestDriver::new_gpudirect` when the caller
174    /// requires the native NVMe → BAR1 path instead of registered mapped reads.
175    #[cfg(target_os = "linux")]
176    #[must_use]
177    pub fn with_uring(mut self, stream: uring::AsyncUringStream<'a>) -> Self {
178        self.uring = Some(stream);
179        self
180    }
181
182    /// Reap completions and bump the megakernel tail pointer.
183    ///
184    /// # Errors
185    ///
186    /// Propagates any uring syscall error from the underlying ring.
187    pub fn poll(&mut self) -> Result<u32, PipelineError> {
188        #[cfg(target_os = "linux")]
189        {
190            if let Some(ref mut stream) = self.uring {
191                return stream.poll();
192            }
193        }
194        Ok(0)
195    }
196
197    /// Request graceful shutdown of the pipeline.
198    pub fn request_shutdown(&mut self) {
199        self.shutdown_requested = true;
200    }
201
202    /// Whether shutdown has been requested.
203    #[must_use]
204    pub fn is_shutdown_requested(&self) -> bool {
205        self.shutdown_requested
206    }
207
208    /// Block until the megakernel writes a new value into the
209    /// observable word. Uses `futex_waitv` on Linux 5.16+.
210    ///
211    /// # Errors
212    ///
213    /// - [`PipelineError::NotLinux`] on non-Linux hosts.
214    /// - [`PipelineError::IoUringSyscall`] on futex errors.
215    ///
216    /// # Safety
217    ///
218    /// `host_visible_addr` must be host-mapped and outlive this call.
219    #[cfg(target_os = "linux")]
220    #[allow(unsafe_code)]
221    pub unsafe fn wait_for_observable(
222        host_visible_addr: *const u32,
223        current: u32,
224        timeout_ns: u64,
225    ) -> Result<(), PipelineError> {
226        #[repr(C)]
227        struct futex_waitv {
228            val: u64,
229            uaddr: u64,
230            flags: u32,
231            __reserved: u32,
232        }
233        const FUTEX2_SIZE_U32: u32 = 0x02;
234        const SYS_FUTEX_WAITV: libc::c_long = 449;
235
236        let waitv = [futex_waitv {
237            val: current as u64,
238            uaddr: host_visible_addr as u64,
239            flags: FUTEX2_SIZE_U32,
240            __reserved: 0,
241        }];
242
243        #[repr(C)]
244        struct Timespec {
245            tv_sec: i64,
246            tv_nsec: i64,
247        }
248        let ts = Timespec {
249            tv_sec: (timeout_ns / 1_000_000_000) as i64,
250            tv_nsec: (timeout_ns % 1_000_000_000) as i64,
251        };
252
253        // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
254        let res = unsafe {
255            libc::syscall(
256                SYS_FUTEX_WAITV,
257                waitv.as_ptr() as *const libc::c_void,
258                1u32,
259                0u32,
260                &ts as *const Timespec,
261                0u64,
262            )
263        };
264
265        if res < 0 {
266            // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
267            let errno = unsafe { *libc::__errno_location() };
268            if errno == libc::EAGAIN {
269                return Ok(());
270            }
271            return Err(PipelineError::IoUringSyscall {
272                syscall: "futex_waitv",
273                errno,
274                fix: "kernel 5.16+ required; ETIMEDOUT means the value didn't change within timeout_ns",
275            });
276        }
277        Ok(())
278    }
279
280    /// Non-Linux implementation returning the structured platform error.
281    #[cfg(not(target_os = "linux"))]
282    #[allow(unsafe_code, clippy::missing_safety_doc)]
283    pub unsafe fn wait_for_observable(
284        _host_visible_addr: *const u32,
285        _current: u32,
286        _timeout_ns: u64,
287    ) -> Result<(), PipelineError> {
288        Err(PipelineError::NotLinux)
289    }
290}
291
292/// Linux-only: host-visible GPU buffer that io_uring can DMA into.
293#[cfg(target_os = "linux")]
294pub use uring::GpuMappedBuffer;
295
#[cfg(test)]
mod tests {
    use super::GpuStream;

    /// A freshly built handle must not report a pending shutdown.
    #[test]
    fn construct_stream_has_no_shutdown() {
        let fresh = GpuStream::new();
        assert_eq!(fresh.is_shutdown_requested(), false);
    }

    /// Requesting shutdown more than once leaves the flag set.
    #[test]
    fn shutdown_is_idempotent() {
        let mut handle = GpuStream::new();
        for _ in 0..2 {
            handle.request_shutdown();
        }
        assert_eq!(handle.is_shutdown_requested(), true);
    }

    /// With no io_uring stream attached, polling reaps nothing.
    #[test]
    fn poll_without_uring_returns_zero() {
        let mut handle = GpuStream::new();
        let reaped = handle.poll().unwrap();
        assert_eq!(reaped, 0);
    }
}
319}