//! Declares the [`PipelineError`] type, the pipeline submodules together with
//! their re-exports, and the [`GpuStream`] handle.
// Every public item must carry documentation; a missing doc is a hard error.
#![deny(missing_docs)]
#![warn(unreachable_pub)]
// Unsafe code is permitted here: `GpuStream::wait_for_observable` issues a raw syscall.
#![allow(unsafe_code)]
/// Error type returned by [`GpuStream`] operations and produced when a backend
/// error is converted via `From`.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum PipelineError {
/// A system call failed; carries the raw `errno` and a remediation hint.
#[error("io_uring {syscall} failed: errno={errno}. Fix: {fix}")]
IoUringSyscall {
/// Name of the system call that failed.
syscall: &'static str,
/// Raw `errno` value reported for the failure.
errno: i32,
/// Remediation hint appended to the rendered message.
fix: &'static str,
},
/// A queue had no free capacity.
#[error("io_uring {queue} queue at capacity. Fix: {fix}")]
QueueFull {
/// Name of the queue that was full.
queue: &'static str,
/// Remediation hint appended to the rendered message.
fix: &'static str,
},
/// The requested operation is only available on Linux.
#[error(
"io_uring is Linux-only. Fix: run on Linux 5.1+ or use Megakernel::dispatch without a GpuStream"
)]
NotLinux,
/// NVMe passthrough was requested but is not enabled (see the rendered message
/// for the feature flag and kernel requirement).
#[error(
"NVMe passthrough requires the `uring-cmd-nvme` feature + Linux kernel 6.0+. Fix: add `features = [\"uring-cmd-nvme\"]` to your Cargo.toml"
)]
NvmePassthroughDisabled,
/// A backend error, stored as its rendered message.
#[error("backend error: {0}")]
Backend(String),
}
/// Converts a driver backend failure into [`PipelineError::Backend`].
///
/// Only the rendered `Display` text of the backend error is kept.
impl From<vyre_driver::backend::BackendError> for PipelineError {
    fn from(err: vyre_driver::backend::BackendError) -> Self {
        let message = err.to_string();
        Self::Backend(message)
    }
}
// Submodules available on every target.
pub mod megakernel;
pub mod pipeline_cache;
pub mod replay;
pub mod routing;
pub mod scheduler;
pub mod tenant;
// Re-exports from `replay`.
pub use replay::{RecordedSlot, ReplayLogError, RingLog};
// Re-exports from `tenant`.
pub use tenant::{
TenantError, TenantHandle, TenantRegistry, OPCODE_RANGE_PER_TENANT, TENANT_ID_MAX,
TENANT_OPCODE_BASE,
};
// `RemoteCache` is re-exported only when the `remote` feature is enabled.
#[cfg(feature = "remote")]
pub use pipeline_cache::RemoteCache;
// Re-exports from `pipeline_cache` that do not depend on a feature flag.
pub use pipeline_cache::{
DiskCache, DiskCacheError, InMemoryPipelineCache, LayeredPipelineCache,
PersistentPipelineCacheStore, PipelineCacheMetrics, PipelineCacheStore, PipelineFingerprint,
};
pub use megakernel::Megakernel;
// The `uring` module is compiled on Linux only; unsafe code is explicitly allowed inside it.
#[cfg(target_os = "linux")]
#[allow(unsafe_code)]
pub mod uring;
/// Stream handle that optionally wraps an io_uring stream (Linux only) and
/// carries a shutdown-request flag.
pub struct GpuStream<'a> {
// Attached io_uring stream; `None` until `with_uring` is called.
#[cfg(target_os = "linux")]
uring: Option<uring::AsyncUringStream<'a>>,
// Keeps the `'a` lifetime parameter in use on targets that lack the `uring` field.
#[cfg(not(target_os = "linux"))]
_phantom: std::marker::PhantomData<&'a ()>,
// Set by `request_shutdown`; read back by `is_shutdown_requested`.
shutdown_requested: bool,
}
impl Default for GpuStream<'_> {
fn default() -> Self {
Self::new()
}
}
impl<'a> GpuStream<'a> {
#[must_use]
pub fn new() -> Self {
Self {
#[cfg(target_os = "linux")]
uring: None,
#[cfg(not(target_os = "linux"))]
_phantom: std::marker::PhantomData,
shutdown_requested: false,
}
}
#[cfg(target_os = "linux")]
#[must_use]
pub fn with_uring(mut self, stream: uring::AsyncUringStream<'a>) -> Self {
self.uring = Some(stream);
self
}
pub fn poll(&mut self) -> Result<u32, PipelineError> {
#[cfg(target_os = "linux")]
{
if let Some(ref mut stream) = self.uring {
return stream.poll();
}
}
Ok(0)
}
pub fn request_shutdown(&mut self) {
self.shutdown_requested = true;
}
#[must_use]
pub fn is_shutdown_requested(&self) -> bool {
self.shutdown_requested
}
#[cfg(target_os = "linux")]
#[allow(unsafe_code)]
pub unsafe fn wait_for_observable(
host_visible_addr: *const u32,
current: u32,
timeout_ns: u64,
) -> Result<(), PipelineError> {
#[repr(C)]
struct futex_waitv {
val: u64,
uaddr: u64,
flags: u32,
__reserved: u32,
}
const FUTEX2_SIZE_U32: u32 = 0x02;
const SYS_FUTEX_WAITV: libc::c_long = 449;
let waitv = [futex_waitv {
val: current as u64,
uaddr: host_visible_addr as u64,
flags: FUTEX2_SIZE_U32,
__reserved: 0,
}];
#[repr(C)]
struct Timespec {
tv_sec: i64,
tv_nsec: i64,
}
let ts = Timespec {
tv_sec: (timeout_ns / 1_000_000_000) as i64,
tv_nsec: (timeout_ns % 1_000_000_000) as i64,
};
let res = unsafe {
libc::syscall(
SYS_FUTEX_WAITV,
waitv.as_ptr() as *const libc::c_void,
1u32,
0u32,
&ts as *const Timespec,
0u64,
)
};
if res < 0 {
let errno = unsafe { *libc::__errno_location() };
if errno == libc::EAGAIN {
return Ok(());
}
return Err(PipelineError::IoUringSyscall {
syscall: "futex_waitv",
errno,
fix: "kernel 5.16+ required; ETIMEDOUT means the value didn't change within timeout_ns",
});
}
Ok(())
}
#[cfg(not(target_os = "linux"))]
#[allow(unsafe_code, clippy::missing_safety_doc)]
pub unsafe fn wait_for_observable(
_host_visible_addr: *const u32,
_current: u32,
_timeout_ns: u64,
) -> Result<(), PipelineError> {
Err(PipelineError::NotLinux)
}
}
// Re-exported on Linux only, because the `uring` module exists only there.
#[cfg(target_os = "linux")]
pub use uring::GpuMappedBuffer;
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly built stream must not report a pending shutdown.
    #[test]
    fn construct_stream_has_no_shutdown() {
        let fresh = GpuStream::new();
        let requested = fresh.is_shutdown_requested();
        assert!(!requested);
    }

    // Requesting shutdown twice leaves the flag set.
    #[test]
    fn shutdown_is_idempotent() {
        let mut gpu = GpuStream::new();
        for _ in 0..2 {
            gpu.request_shutdown();
        }
        assert!(gpu.is_shutdown_requested());
    }

    // With no io_uring stream attached, polling reports zero.
    #[test]
    fn poll_without_uring_returns_zero() {
        let mut gpu = GpuStream::new();
        let polled = gpu.poll().unwrap();
        assert_eq!(polled, 0);
    }
}