use std::ffi::c_void;
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;
#[cfg(feature = "model-cogito-v2-671b")]
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use memmap2::Mmap;
use metal::{Buffer, MTLResourceOptions, NSUInteger};
use moeflux_metal::ResidencySet;
use crate::riir::backend::buftype::ExpertBaseBuf;
use crate::riir::backend::{BufId, MetalBufferPool};
use crate::riir::variants::{Variant, VARIANT};
/// Asks the kernel to stop read-ahead on `file` (macOS `F_RDAHEAD` with a
/// zero argument).
///
/// Best effort: a failing `fcntl` is reported on stderr and otherwise
/// ignored. On targets other than macOS this does nothing.
#[cfg(feature = "model-cogito-v2-671b")]
fn disable_readahead(file: &File) {
    #[cfg(target_os = "macos")]
    {
        // SAFETY: `file` is borrowed for the whole call, so its descriptor is
        // open, and `F_RDAHEAD` takes a plain integer argument (no pointers).
        let status = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_RDAHEAD, 0i32) };
        if status < 0 {
            let err = std::io::Error::last_os_error();
            eprintln!("[experts] fcntl(F_RDAHEAD, 0) failed: {err}");
        }
    }
    #[cfg(not(target_os = "macos"))]
    {
        // Nothing to do here; the binding only keeps the parameter "used".
        let _ = file;
    }
}
/// No-op stand-in used when the `model-cogito-v2-671b` feature is disabled:
/// the file is left exactly as it was opened.
#[cfg(not(feature = "model-cogito-v2-671b"))]
fn disable_readahead(file: &File) {
    let _ = file;
}
#[derive(Debug, thiserror::Error)]
/// Errors produced while opening, mapping or reading packed expert files.
pub enum ExpertIoError {
/// The requested layer index is not below the variant's layer count.
#[error("layer_idx {layer} out of range (must be < {num_layers})")]
BadLayerIdx { layer: i32, num_layers: usize },
/// The requested expert index is not below the variant's expert count.
#[error("expert_idx {expert} out of range (must be < {num_experts})")]
BadExpertIdx { expert: i32, num_experts: usize },
/// The layer index is valid but no file handle is held for it
/// (`ExpertFiles::open` leaves the slot empty when the file is not found).
#[error("layer {layer} file not opened (probably missing on disk)")]
LayerFileMissing { layer: usize },
/// The read returned `actual` bytes where `expected` were required.
#[error(
"expert blob read short: expected {expected} bytes at \
layer={layer} expert={expert}, got {actual}"
)]
ShortRead {
layer: usize,
expert: usize,
expected: usize,
actual: usize,
},
/// The caller's output slice is not exactly one expert blob long.
#[error("out buffer must be EXPERT_SIZE={expected} bytes, got {actual}")]
BadOutLen { expected: usize, actual: usize },
/// An OS-level I/O failure (open, mmap or read) on the given layer's file.
#[error("I/O error reading layer {layer}: {source}")]
Io {
layer: usize,
#[source]
source: io::Error,
},
}
/// Per-layer packed expert files, held both as plain file handles (for
/// positional reads) and as memory maps (optionally wrapped in Metal buffers).
///
/// All per-layer vectors are indexed by layer and sized to the variant's
/// layer count by `open`.
///
/// NOTE(review): fields drop in declaration order, so `mmap_buffers` is
/// released before `mmap_layers`; the pool also receives a clone of each
/// buffer in `attach_to_device` — confirm the intended lifetime relationship.
pub struct ExpertFiles {
// Residency set for the expert buffers; `None` until `attach_to_device`
// creates one (and stays `None` if creation fails).
expert_rset: Option<ResidencySet>,
// Open handle per layer; `None` when the layer file was not found.
layers: Vec<Option<File>>,
// Byte size of one expert blob, taken from `Variant::expert_size_4bit`.
expert_size: usize,
// Directory passed to `open` (the layer files live in its
// `packed_experts` subdirectory).
experts_dir: PathBuf,
// No-copy Metal buffer over each layer's mapping; filled in by
// `attach_to_device`.
mmap_buffers: Vec<Option<Buffer>>,
// Pool id returned by `register_borrowed` for each buffer above.
mmap_buf_ids: Vec<Option<BufId<ExpertBaseBuf>>>,
// Memory map of each layer file; `None` when the file was not found.
mmap_layers: Vec<Option<Mmap>>,
}
impl ExpertFiles {
pub fn open(experts_dir: &Path) -> Result<Self, ExpertIoError> {
let v: Variant = VARIANT;
let subdir = experts_dir.join("packed_experts");
let mut layers = Vec::with_capacity(v.num_layers);
let mut mmap_layers: Vec<Option<Mmap>> =
(0..v.num_layers).map(|_| None).collect();
for i in 0..v.num_layers {
let path = subdir.join(format!("layer_{i:02}.bin"));
match File::open(&path) {
Ok(f) => {
disable_readahead(&f);
let mmap = unsafe { Mmap::map(&f) }.map_err(|e| {
ExpertIoError::Io {
layer: i,
source: e,
}
})?;
mmap_layers[i] = Some(mmap);
layers.push(Some(f));
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
layers.push(None);
}
Err(e) => {
return Err(ExpertIoError::Io {
layer: i,
source: e,
});
}
}
}
let mmap_buffers = (0..v.num_layers).map(|_| None).collect();
let mmap_buf_ids = (0..v.num_layers).map(|_| None).collect();
eprintln!(
"[experts] {} layer(s) mmap'd \
(Metal buffers built on first batched call)",
mmap_layers.iter().filter(|m| m.is_some()).count()
);
Ok(Self {
expert_rset: None,
layers,
expert_size: v.expert_size_4bit(),
experts_dir: experts_dir.to_path_buf(),
mmap_buffers,
mmap_buf_ids,
mmap_layers,
})
}
/// Wraps every mapped layer that has no Metal buffer yet in a no-copy,
/// shared-storage buffer and registers it with `pool`.
///
/// Does nothing when `mode.is_pread()` is true. On the first call that gets
/// past that check a residency set is created (if the platform provides
/// one); each newly wrapped buffer is added to it, and the set is committed
/// and residency requested only when at least one buffer was added. Layers
/// that already have a buffer, or have no mapping, are skipped, so repeated
/// calls only process what is still missing.
pub fn attach_to_device(
    &mut self,
    pool: &mut MetalBufferPool,
    mode: super::expert_io_mode::ExpertIoMode,
) {
    // Buffer lengths are rounded up to a multiple of this many bytes.
    const PAGE_BYTES: usize = 16384;

    if mode.is_pread() {
        return;
    }

    if self.expert_rset.is_none() {
        let capacity = self.mmap_layers.len() as u64;
        self.expert_rset = ResidencySet::new(pool.device(), "moeflux.experts", capacity);
        let unsupported =
            self.expert_rset.is_none() && !moeflux_metal::residency_set::is_available();
        if unsupported {
            eprintln!(
                "[experts] MTLResidencySet unavailable (macOS < 15) \
                 — expert buffers will not be pinned"
            );
        }
    }

    let residency = self.expert_rset.as_ref();
    let mut wrapped_any = false;

    for (i, slot) in self.mmap_layers.iter().enumerate() {
        if self.mmap_buffers[i].is_some() {
            continue;
        }
        let Some(mapping) = slot else {
            continue;
        };

        let padded_len = (mapping.len() + PAGE_BYTES - 1) & !(PAGE_BYTES - 1);
        let buffer = pool.device().new_buffer_with_bytes_no_copy(
            mapping.as_ptr() as *const c_void,
            padded_len as NSUInteger,
            MTLResourceOptions::StorageModeShared,
            None,
        );
        let id = pool.register_borrowed(
            buffer.clone(),
            padded_len,
            "expert_io.mmap_layer",
            true,
        );
        if let Some(rset) = residency {
            rset.add_allocation(&buffer);
        }

        self.mmap_buffers[i] = Some(buffer);
        self.mmap_buf_ids[i] = Some(id);
        wrapped_any = true;
    }

    // Only touch the residency set when this call actually added something.
    if let (true, Some(rset)) = (wrapped_any, residency) {
        rset.commit();
        rset.request_residency();
    }
}
/// Returns the pool id of the layer's mapped buffer together with the byte
/// offset of `expert_idx` inside it (`expert_idx * expert_size`).
///
/// Yields `None` when `layer_idx` is out of range or the layer has no
/// registered buffer. `expert_idx` itself is not range-checked here.
pub fn mmap_id_for_expert(
    &self,
    layer_idx: usize,
    expert_idx: u32,
) -> Option<(BufId<ExpertBaseBuf>, u64)> {
    self.mmap_buf_ids
        .get(layer_idx)
        .and_then(Option::as_ref)
        .map(|id| {
            let byte_offset = (expert_idx as u64) * (self.expert_size as u64);
            (*id, byte_offset)
        })
}
/// Returns the layer's mapped Metal buffer together with the byte offset of
/// `expert_idx` inside it (`expert_idx * expert_size`).
///
/// Yields `None` when `layer_idx` is out of range or the layer has no buffer
/// yet. `expert_idx` itself is not range-checked here.
pub fn mmap_buffer_for_expert(
    &self,
    layer_idx: usize,
    expert_idx: u32,
) -> Option<(&Buffer, u64)> {
    self.mmap_buffers
        .get(layer_idx)
        .and_then(Option::as_ref)
        .map(|buffer| {
            let byte_offset = (expert_idx as u64) * (self.expert_size as u64);
            (buffer, byte_offset)
        })
}
pub fn read_expert(
&self,
layer_idx: usize,
expert_idx: usize,
out: &mut [u8],
) -> Result<(), ExpertIoError> {
let v: Variant = VARIANT;
if layer_idx >= v.num_layers {
return Err(ExpertIoError::BadLayerIdx {
layer: layer_idx as i32,
num_layers: v.num_layers,
});
}
if expert_idx >= v.num_experts {
return Err(ExpertIoError::BadExpertIdx {
expert: expert_idx as i32,
num_experts: v.num_experts,
});
}
if out.len() != self.expert_size {
return Err(ExpertIoError::BadOutLen {
expected: self.expert_size,
actual: out.len(),
});
}
let Some(file) = self.layers[layer_idx].as_ref() else {
return Err(ExpertIoError::LayerFileMissing { layer: layer_idx });
};
let off = (expert_idx as u64) * (self.expert_size as u64);
let n = file.read_at(out, off).map_err(|e| ExpertIoError::Io {
layer: layer_idx,
source: e,
})?;
if n != self.expert_size {
return Err(ExpertIoError::ShortRead {
layer: layer_idx,
expert: expert_idx,
expected: self.expert_size,
actual: n,
});
}
Ok(())
}
pub fn num_layers(&self) -> usize {
self.layers.len()
}
/// Reports whether a file handle is held for `layer_idx`; out-of-range
/// indices yield `false`.
pub fn has_layer(&self, layer_idx: usize) -> bool {
    matches!(self.layers.get(layer_idx), Some(Some(_)))
}
}
/// Compact debug view: directory, slot count, number of opened files and the
/// expert blob size. File handles, mappings and Metal buffers are omitted.
impl std::fmt::Debug for ExpertFiles {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let total = self.layers.len();
        let opened = self.layers.iter().flatten().count();

        let mut out = f.debug_struct("ExpertFiles");
        out.field("experts_dir", &self.experts_dir);
        out.field("num_layers", &total);
        out.field("opened", &opened);
        out.field("expert_size", &self.expert_size);
        out.finish()
    }
}