use std::io::Read;
use trustformers_core::{
device::Device,
errors::{tensor_op_error, Result, TrustformersError},
layers::{Embedding, Linear},
tensor::Tensor,
traits::{Config, Layer, Model},
};
use super::config::{ActivationType, DeepSeekV2Config};
/// Sigmoid-weighted linear unit, evaluated as `x / (1 + e^(-x))`.
pub fn silu(x: f32) -> f32 {
    let denominator = 1.0 + (-x).exp();
    x / denominator
}
/// Tanh approximation of GELU:
/// `0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x^3)))`.
pub fn gelu(x: f32) -> f32 {
    let sqrt_two_over_pi = (2.0f32 / std::f32::consts::PI).sqrt();
    let cubic = 0.044715 * x * x * x;
    let squashed = (sqrt_two_over_pi * (x + cubic)).tanh();
    0.5 * x * (1.0 + squashed)
}
pub fn apply_activation(data: &[f32], act: ActivationType) -> Vec<f32> {
match act {
ActivationType::SiLU => data.iter().map(|&x| silu(x)).collect(),
ActivationType::GeLU => data.iter().map(|&x| gelu(x)).collect(),
}
}
/// RMS normalisation layer with a learnable scale vector.
pub struct DeepSeekV2RmsNorm {
    /// Scale vector; created as ones of length `size`.
    weight: Tensor,
    /// Stabiliser added to the mean square before the square root.
    eps: f32,
    /// Device handle recorded at construction time.
    device: Device,
}

impl DeepSeekV2RmsNorm {
    /// Builds a norm layer whose scale is a ones tensor of shape `[size]`.
    ///
    /// `eps` is narrowed from `f64` to `f32` for storage.
    ///
    /// # Errors
    /// Propagates any failure from `Tensor::ones`.
    pub fn new(size: usize, eps: f64, device: Device) -> Result<Self> {
        let layer = Self {
            weight: Tensor::ones(&[size])?,
            eps: eps as f32,
            device,
        };
        Ok(layer)
    }

    /// Device this layer was created for.
    pub fn device(&self) -> Device {
        self.device
    }
}
impl Layer for DeepSeekV2RmsNorm {
    type Input = Tensor;
    type Output = Tensor;

    /// Normalises every row of the last axis by its own root mean square and
    /// multiplies it by the scale vector:
    /// `y = x / sqrt(mean(x^2) + eps) * weight`.
    ///
    /// The statistic is computed per row so that each position of a
    /// `[seq, hidden]` (or higher-rank) input is normalised independently of
    /// the others. A tensor without axes is treated as a single row of one
    /// element. The output has the same shape as the input.
    ///
    /// # Errors
    /// Returns a tensor-op error when the input or the weight is not `F32`,
    /// or when the last axis of the input does not match the weight length.
    fn forward(&self, input: Self::Input) -> Result<Self::Output> {
        let Tensor::F32(arr) = &input else {
            return Err(tensor_op_error(
                "deepseek_v2_rmsnorm",
                "input tensor must be F32",
            ));
        };
        let Tensor::F32(w) = &self.weight else {
            return Err(tensor_op_error(
                "deepseek_v2_rmsnorm",
                "weight tensor must be F32",
            ));
        };

        let shape = arr.shape().to_vec();
        let width = shape.last().copied().unwrap_or(1);
        let scale: Vec<f32> = w.iter().copied().collect();
        if scale.len() != width {
            return Err(tensor_op_error(
                "deepseek_v2_rmsnorm",
                "last input dimension must match weight length",
            ));
        }

        // Logical (row-major) iteration, so the memory layout does not matter.
        let mut data: Vec<f32> = arr.iter().copied().collect();
        if width > 0 {
            for row in data.chunks_exact_mut(width) {
                let mean_sq = row.iter().map(|x| x * x).sum::<f32>() / width as f32;
                let rms = (mean_sq + self.eps).sqrt();
                for (x, g) in row.iter_mut().zip(&scale) {
                    *x = *x / rms * g;
                }
            }
        }
        Tensor::from_vec(data, &shape)
    }
}
/// Rotary position embedding over rows of `rope_head_dim` values.
pub struct DeepSeekV2RotaryEmbedding {
    /// Width of each rotated row (`config.qk_rope_head_dim`).
    rope_head_dim: usize,
    /// Base of the geometric frequency progression (`config.rope_theta`).
    rope_theta: f64,
    #[allow(dead_code)]
    device: Device,
}

impl DeepSeekV2RotaryEmbedding {
    /// Copies the RoPE width and base out of `config`.
    pub fn new(config: &DeepSeekV2Config, device: Device) -> Self {
        let rope_head_dim = config.qk_rope_head_dim;
        let rope_theta = config.rope_theta;
        Self {
            rope_head_dim,
            rope_theta,
            device,
        }
    }

    /// Rotates `data` in place, treating it as `seq_len` consecutive rows of
    /// `rope_head_dim` values where row `pos` sits at position `pos`.
    ///
    /// Within a row, element `i` is paired with element `i + rope_head_dim / 2`
    /// and the pair is rotated by `pos / rope_theta^(2i / rope_head_dim)`.
    /// Nothing happens when `rope_head_dim < 2`.
    ///
    /// # Panics
    /// Indexing panics if `data` is too short for the rows being rotated.
    pub fn apply(&self, data: &mut [f32], seq_len: usize) {
        let dim = self.rope_head_dim;
        let half = dim / 2;
        if half == 0 {
            return;
        }
        // The inverse frequencies depend only on the pair index, so build them once.
        let inv_freq: Vec<f64> = (0..half)
            .map(|i| 1.0 / self.rope_theta.powf(2.0 * i as f64 / dim as f64))
            .collect();
        for pos in 0..seq_len {
            let base = pos * dim;
            for (i, freq) in inv_freq.iter().enumerate() {
                let angle = (pos as f64 * freq) as f32;
                let cos_v = angle.cos();
                let sin_v = angle.sin();
                let lo = base + i;
                let hi = lo + half;
                let x0 = data[lo];
                let x1 = data[hi];
                data[lo] = x0 * cos_v - x1 * sin_v;
                data[hi] = x0 * sin_v + x1 * cos_v;
            }
        }
    }
}
/// Projections and head geometry for the multi-head latent attention block.
pub struct MlaAttention {
    /// `hidden_size -> kv_lora_rank` compression of the input.
    c_kv: Linear,
    /// `kv_lora_rank -> qk_rope_head_dim` key part that receives RoPE.
    k_pe: Linear,
    /// `kv_lora_rank -> num_heads * qk_nope_head_dim` key part without RoPE.
    k_nope: Linear,
    /// `kv_lora_rank -> num_heads * v_head_dim` value projection.
    v_proj: Linear,
    /// `hidden_size -> max(q_lora_rank, 1)` query down-projection.
    q_a_proj: Linear,
    /// `max(q_lora_rank, 1) -> num_heads * (rope + nope)` query up-projection.
    q_b_proj: Linear,
    /// `num_heads * v_head_dim -> hidden_size` output projection.
    o_proj: Linear,
    rotary_emb: DeepSeekV2RotaryEmbedding,
    num_heads: usize,
    qk_rope_head_dim: usize,
    #[allow(dead_code)]
    qk_nope_head_dim: usize,
    v_head_dim: usize,
    device: Device,
}

impl MlaAttention {
    /// Creates every bias-free projection from the sizes in `config`.
    ///
    /// A `q_lora_rank` of zero is raised to one so the query projections
    /// keep a non-zero inner width.
    pub fn new(config: &DeepSeekV2Config, device: Device) -> Result<Self> {
        let hidden = config.hidden_size;
        let heads = config.num_attention_heads;
        let kv_rank = config.kv_lora_rank;
        let q_rank = config.q_lora_rank.max(1);
        let rope_dim = config.qk_rope_head_dim;
        let nope_dim = config.qk_nope_head_dim;
        let value_dim = config.v_head_dim;

        // Field initialisers run top to bottom, which keeps the projections
        // in the same construction order as their declaration.
        let attention = Self {
            c_kv: Linear::new_with_device(hidden, kv_rank, false, device),
            k_pe: Linear::new_with_device(kv_rank, rope_dim, false, device),
            k_nope: Linear::new_with_device(kv_rank, heads * nope_dim, false, device),
            v_proj: Linear::new_with_device(kv_rank, heads * value_dim, false, device),
            q_a_proj: Linear::new_with_device(hidden, q_rank, false, device),
            q_b_proj: Linear::new_with_device(
                q_rank,
                heads * (rope_dim + nope_dim),
                false,
                device,
            ),
            o_proj: Linear::new_with_device(heads * value_dim, hidden, false, device),
            rotary_emb: DeepSeekV2RotaryEmbedding::new(config, device),
            num_heads: heads,
            qk_rope_head_dim: rope_dim,
            qk_nope_head_dim: nope_dim,
            v_head_dim: value_dim,
            device,
        };
        Ok(attention)
    }

    /// Device this block was created for.
    pub fn device(&self) -> Device {
        self.device
    }

    /// Number of attention heads.
    pub fn num_heads(&self) -> usize {
        self.num_heads
    }

    /// First dimension of the `c_kv` weight, or `0` when the weight has no
    /// dimensions.
    // NOTE(review): this equals kv_lora_rank only if Linear stores weights as
    // [out_features, in_features] — confirm against trustformers_core.
    pub fn kv_lora_rank(&self) -> usize {
        let weight = self.c_kv.weight();
        let dims = weight.shape();
        match dims.first() {
            Some(&rank) => rank,
            None => 0,
        }
    }
}
impl Layer for MlaAttention {
type Input = Tensor;
type Output = Tensor;
fn forward(&self, input: Self::Input) -> Result<Self::Output> {
let c_kv_out = self.c_kv.forward(input.clone())?;
let k_pe_out = self.k_pe.forward(c_kv_out.clone())?;
let k_nope_out = self.k_nope.forward(c_kv_out.clone())?;
let v_out = self.v_proj.forward(c_kv_out)?;
let k_pe_roped = match k_pe_out {
Tensor::F32(arr) => {
let contig = arr.as_standard_layout().to_owned();
let mut data = contig.as_slice().unwrap_or(&[]).to_vec();
let seq_len = data.len() / self.qk_rope_head_dim.max(1);
if seq_len > 0 {
self.rotary_emb.apply(&mut data, seq_len);
}
let shape = contig.shape().to_vec();
Tensor::from_vec(data, &shape)?
},
_ => return Err(tensor_op_error("deepseek_v2_mla", "k_pe must be F32")),
};
let q_a_out = self.q_a_proj.forward(input)?;
let q_out = self.q_b_proj.forward(q_a_out)?;
let q_roped = match q_out {
Tensor::F32(arr) => {
let contig = arr.as_standard_layout().to_owned();
let mut data = contig.as_slice().unwrap_or(&[]).to_vec();
let full_head_dim = self.qk_rope_head_dim + self.qk_nope_head_dim;
let seq_len = data.len() / (self.num_heads * full_head_dim).max(1);
for h in 0..self.num_heads {
for pos in 0..seq_len {
let base = (pos * self.num_heads + h) * full_head_dim;
let rope_slice = &mut data[base..base + self.qk_rope_head_dim];
let half = self.qk_rope_head_dim / 2;
if half > 0 {
for i in 0..half {
let freq = 1.0
/ self
.rotary_emb
.rope_theta
.powf(2.0 * i as f64 / self.qk_rope_head_dim as f64);
let angle = (pos as f64 * freq) as f32;
let cos_v = angle.cos();
let sin_v = angle.sin();
let x0 = rope_slice[i];
let x1 = rope_slice[i + half];
rope_slice[i] = x0 * cos_v - x1 * sin_v;
rope_slice[i + half] = x0 * sin_v + x1 * cos_v;
}
}
}
}
let shape = contig.shape().to_vec();
Tensor::from_vec(data, &shape)?
},
_ => return Err(tensor_op_error("deepseek_v2_mla", "q must be F32")),
};
let _ = (k_pe_roped, k_nope_out, v_out);
let (q_data, input_shape) = match q_roped {
Tensor::F32(arr) => {
let contig = arr.as_standard_layout().to_owned();
let data = contig.as_slice().unwrap_or(&[]).to_vec();
let shape = contig.shape().to_vec();
(data, shape)
},
_ => return Err(tensor_op_error("deepseek_v2_mla", "q must be F32")),
};
let attended_head_size = (self.num_heads * self.v_head_dim).max(1);
let seq_len = if input_shape.len() >= 2 { input_shape[0] } else { 1 };
let total_attended = seq_len * attended_head_size;
let mut attended_data = q_data;
attended_data.resize(total_attended, 0.0_f32);
let attended = Tensor::from_vec(attended_data, &[seq_len, attended_head_size])?;
self.o_proj.forward(attended)
}
}
/// Gated feed-forward block made of three bias-free projections.
pub struct DeepSeekV2MLP {
    /// `in_features -> intermediate`; its output is passed through the activation.
    gate_proj: Linear,
    /// `in_features -> intermediate`; multiplied with the activated gate.
    up_proj: Linear,
    /// `intermediate -> in_features`.
    down_proj: Linear,
    activation: ActivationType,
    device: Device,
}

impl DeepSeekV2MLP {
    /// Creates the gate, up and down projections (in that order) without bias.
    pub fn new(
        in_features: usize,
        intermediate: usize,
        activation: ActivationType,
        device: Device,
    ) -> Self {
        Self {
            gate_proj: Linear::new_with_device(in_features, intermediate, false, device),
            up_proj: Linear::new_with_device(in_features, intermediate, false, device),
            down_proj: Linear::new_with_device(intermediate, in_features, false, device),
            activation,
            device,
        }
    }

    /// Device this block was created for.
    pub fn device(&self) -> Device {
        self.device
    }
}
impl Layer for DeepSeekV2MLP {
    type Input = Tensor;
    type Output = Tensor;

    /// Computes `down_proj(activation(gate_proj(x)) * up_proj(x))`.
    ///
    /// The product is taken element by element and reshaped to the gate
    /// output's shape before the down projection.
    ///
    /// # Errors
    /// Fails when a projection fails, when the gate or up output is not
    /// `F32`, or when either output cannot be viewed as a contiguous slice.
    fn forward(&self, input: Self::Input) -> Result<Self::Output> {
        let gate_out = self.gate_proj.forward(input.clone())?;
        let up_out = self.up_proj.forward(input)?;

        let (Tensor::F32(gate), Tensor::F32(up)) = (&gate_out, &up_out) else {
            return Err(tensor_op_error(
                "deepseek_v2_mlp",
                "gate and up tensors must be F32",
            ));
        };
        let gate_values = gate
            .as_slice()
            .ok_or_else(|| tensor_op_error("deepseek_v2_mlp", "gate tensor not contiguous"))?;
        let up_values = up
            .as_slice()
            .ok_or_else(|| tensor_op_error("deepseek_v2_mlp", "up tensor not contiguous"))?;

        let activated = apply_activation(gate_values, self.activation);
        let mut gated: Vec<f32> = Vec::with_capacity(activated.len().min(up_values.len()));
        for (g, &u) in activated.into_iter().zip(up_values) {
            gated.push(g * u);
        }

        let gate_shape = gate.shape().to_vec();
        let hidden = Tensor::from_vec(gated, &gate_shape)?;
        self.down_proj.forward(hidden)
    }
}
/// Gate that selects which routed experts process an input.
pub struct ExpertRouter {
    /// `hidden_size -> n_routed_experts` scoring projection.
    gate: Linear,
    n_routed_experts: usize,
    /// Maximum number of experts returned by [`ExpertRouter::route`].
    num_experts_per_tok: usize,
    /// Number of contiguous expert groups.
    n_group: usize,
    /// Number of groups whose experts remain eligible for selection.
    topk_group: usize,
    /// Multiplier applied to the normalised routing weights.
    routed_scaling_factor: f32,
    #[allow(dead_code)]
    device: Device,
}

impl ExpertRouter {
    /// Builds the bias-free gate and copies the routing settings from `config`.
    pub fn new(config: &DeepSeekV2Config, device: Device) -> Self {
        let gate =
            Linear::new_with_device(config.hidden_size, config.n_routed_experts, false, device);
        Self {
            gate,
            n_routed_experts: config.n_routed_experts,
            num_experts_per_tok: config.num_experts_per_tok,
            n_group: config.n_group,
            topk_group: config.topk_group,
            routed_scaling_factor: config.routed_scaling_factor,
            device,
        }
    }

    /// Picks experts for `input` with group-limited routing.
    ///
    /// 1. The gate logits are flattened and turned into probabilities with a
    ///    max-subtracted softmax (uniform probabilities if the sum is not
    ///    positive).
    /// 2. Experts are split into `n_group` contiguous groups; each group is
    ///    scored by its best expert and the `topk_group` best groups are kept.
    /// 3. Among the experts of the kept groups, the `num_experts_per_tok`
    ///    most probable are selected. Their probabilities are renormalised to
    ///    sum to one and multiplied by `routed_scaling_factor`.
    ///
    /// Returns `(expert indices, weights)` ordered by descending probability.
    /// Fewer than `num_experts_per_tok` entries are returned when the kept
    /// groups do not contain enough experts. An `n_group` of zero is treated
    /// as a single group.
    ///
    /// Only the first `n_routed_experts` entries of the flattened
    /// probabilities are ranked, so a gate output with several rows yields
    /// one decision based on its leading entries.
    ///
    /// # Errors
    /// Fails when the gate projection fails or its output is not a contiguous
    /// `F32` tensor.
    pub fn route(&self, input: &Tensor) -> Result<(Vec<usize>, Vec<f32>)> {
        let logits_tensor = self.gate.forward(input.clone())?;
        let logits: Vec<f32> = match &logits_tensor {
            Tensor::F32(arr) => arr
                .as_slice()
                .ok_or_else(|| tensor_op_error("expert_router", "logits tensor not contiguous"))?
                .to_vec(),
            _ => return Err(tensor_op_error("expert_router", "logits must be F32")),
        };

        let max_logit = logits.iter().cloned().fold(f32::NEG_INFINITY, f32::max);
        let exp_logits: Vec<f32> = logits.iter().map(|&x| (x - max_logit).exp()).collect();
        let sum_exp: f32 = exp_logits.iter().sum();
        let probs: Vec<f32> = if sum_exp > 0.0 {
            exp_logits.iter().map(|&x| x / sum_exp).collect()
        } else {
            vec![1.0 / self.n_routed_experts as f32; self.n_routed_experts]
        };

        let n_experts = self.n_routed_experts;
        // Guard against a zero group count, which would divide by zero below.
        let n_group = self.n_group.max(1);
        let group_size = n_experts.div_ceil(n_group);

        // Score each non-empty group by its most probable expert.
        let mut group_scores: Vec<(usize, f32)> = Vec::with_capacity(n_group);
        for g in 0..n_group {
            let start = g * group_size;
            let end = (start + group_size).min(n_experts);
            if start >= end {
                continue;
            }
            let mut best = f32::NEG_INFINITY;
            for i in start..end {
                best = best.max(probs.get(i).copied().unwrap_or(0.0));
            }
            group_scores.push((g, best));
        }
        // Stable sort: ties keep the lower group index first.
        group_scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        // Only experts that live in one of the best groups stay eligible.
        let mut candidates: Vec<(usize, f32)> = Vec::new();
        for &(g, _) in group_scores.iter().take(self.topk_group) {
            let start = g * group_size;
            let end = (start + group_size).min(n_experts);
            for i in start..end {
                candidates.push((i, probs.get(i).copied().unwrap_or(0.0)));
            }
        }
        candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        let selected: Vec<(usize, f32)> =
            candidates.into_iter().take(self.num_experts_per_tok).collect();
        let weight_sum: f32 = selected.iter().map(|(_, w)| w).sum();
        let norm = if weight_sum > 0.0 { weight_sum } else { 1.0 };
        let indices: Vec<usize> = selected.iter().map(|(i, _)| *i).collect();
        let weights: Vec<f32> =
            selected.iter().map(|(_, w)| w / norm * self.routed_scaling_factor).collect();
        Ok((indices, weights))
    }
}
/// Mixture-of-experts feed-forward layer: always-on shared experts plus
/// routed experts chosen by an [`ExpertRouter`].
pub struct DeepSeekV2MoELayer {
    /// Experts applied to every input.
    shared_experts: Vec<DeepSeekV2MLP>,
    /// Experts applied only when the router selects them.
    routed_experts: Vec<DeepSeekV2MLP>,
    router: ExpertRouter,
    device: Device,
}

impl DeepSeekV2MoELayer {
    /// Builds `n_shared_experts` shared experts, then `n_routed_experts`
    /// routed experts, then the router. Every expert maps
    /// `hidden_size -> intermediate_size -> hidden_size` with `hidden_act`.
    pub fn new(config: &DeepSeekV2Config, device: Device) -> Result<Self> {
        let make_expert = || {
            DeepSeekV2MLP::new(
                config.hidden_size,
                config.intermediate_size,
                config.hidden_act,
                device,
            )
        };

        let mut shared_experts = Vec::with_capacity(config.n_shared_experts);
        for _ in 0..config.n_shared_experts {
            shared_experts.push(make_expert());
        }
        let mut routed_experts = Vec::with_capacity(config.n_routed_experts);
        for _ in 0..config.n_routed_experts {
            routed_experts.push(make_expert());
        }
        let router = ExpertRouter::new(config, device);

        Ok(Self {
            shared_experts,
            routed_experts,
            router,
            device,
        })
    }

    /// Number of routed experts held by this layer.
    pub fn num_routed_experts(&self) -> usize {
        self.routed_experts.len()
    }

    /// Number of shared experts held by this layer.
    pub fn num_shared_experts(&self) -> usize {
        self.shared_experts.len()
    }

    /// Device this layer was created for.
    pub fn device(&self) -> Device {
        self.device
    }
}
impl Layer for DeepSeekV2MoELayer {
    type Input = Tensor;
    type Output = Tensor;

    /// Sums the outputs of all shared experts and adds the router-weighted
    /// outputs of the selected routed experts.
    ///
    /// The accumulated values are padded with zeros or truncated to the
    /// input's element count and returned in the input's shape (a flat
    /// shape is used when the input reports no dimensions).
    ///
    /// # Errors
    /// Fails when the input or an expert output is not `F32`, when an expert
    /// output is not contiguous, when routing fails, or when the router
    /// returns an index with no matching expert.
    fn forward(&self, input: Self::Input) -> Result<Self::Output> {
        let Tensor::F32(input_arr) = &input else {
            return Err(tensor_op_error("deepseek_v2_moe", "input must be F32"));
        };
        let input_len = input_arr.len();
        let input_shape = input_arr.shape().to_vec();

        // Running sum; stays `None` until the first expert contributes.
        let mut combined: Option<Vec<f32>> = None;

        for expert in self.shared_experts.iter() {
            let out = expert.forward(input.clone())?;
            let Tensor::F32(out_arr) = &out else {
                return Err(tensor_op_error(
                    "deepseek_v2_moe",
                    "shared expert output must be F32",
                ));
            };
            let values = out_arr.as_slice().ok_or_else(|| {
                tensor_op_error("deepseek_v2_moe", "shared expert output not contiguous")
            })?;
            if let Some(total) = combined.as_mut() {
                for (t, v) in total.iter_mut().zip(values) {
                    *t += v;
                }
            } else {
                combined = Some(values.to_vec());
            }
        }

        let (expert_indices, expert_weights) = self.router.route(&input)?;
        for (&idx, &weight) in expert_indices.iter().zip(&expert_weights) {
            let Some(expert) = self.routed_experts.get(idx) else {
                return Err(tensor_op_error(
                    "deepseek_v2_moe",
                    "expert index out of bounds",
                ));
            };
            let out = expert.forward(input.clone())?;
            let Tensor::F32(out_arr) = &out else {
                return Err(tensor_op_error(
                    "deepseek_v2_moe",
                    "routed expert output must be F32",
                ));
            };
            let values = out_arr.as_slice().ok_or_else(|| {
                tensor_op_error("deepseek_v2_moe", "routed expert output not contiguous")
            })?;
            if let Some(total) = combined.as_mut() {
                for (t, v) in total.iter_mut().zip(values) {
                    *t += v * weight;
                }
            } else {
                combined = Some(values.iter().map(|&v| v * weight).collect());
            }
        }

        let mut result = match combined {
            Some(values) => values,
            None => vec![0.0_f32; input_len],
        };
        result.resize(input_len, 0.0_f32);
        let shape: Vec<usize> = if input_shape.is_empty() {
            vec![input_len]
        } else {
            input_shape
        };
        Tensor::from_vec(result, &shape)
    }
}
/// One decoder block: attention plus either a dense MLP or a MoE layer,
/// each preceded by an RMS norm.
pub struct DeepSeekV2DecoderLayer {
    self_attn: MlaAttention,
    /// Present when the layer is dense.
    dense_mlp: Option<DeepSeekV2MLP>,
    /// Present when the layer is not dense.
    moe_layer: Option<DeepSeekV2MoELayer>,
    input_layernorm: DeepSeekV2RmsNorm,
    post_attention_layernorm: DeepSeekV2RmsNorm,
    device: Device,
}

impl DeepSeekV2DecoderLayer {
    /// Builds the layer at `layer_idx`; `config.is_dense_layer(layer_idx)`
    /// decides whether the feed-forward part is a dense MLP or a MoE layer.
    ///
    /// # Errors
    /// Propagates construction failures from the attention block, the norms
    /// or the MoE layer.
    pub fn new(config: &DeepSeekV2Config, layer_idx: usize, device: Device) -> Result<Self> {
        let self_attn = MlaAttention::new(config, device)?;
        let make_norm =
            || DeepSeekV2RmsNorm::new(config.hidden_size, config.rms_norm_eps, device);
        let input_layernorm = make_norm()?;
        let post_attention_layernorm = make_norm()?;

        let dense = config.is_dense_layer(layer_idx);
        let dense_mlp = if dense {
            Some(DeepSeekV2MLP::new(
                config.hidden_size,
                config.intermediate_size,
                config.hidden_act,
                device,
            ))
        } else {
            None
        };
        let moe_layer = if dense {
            None
        } else {
            Some(DeepSeekV2MoELayer::new(config, device)?)
        };

        Ok(Self {
            self_attn,
            dense_mlp,
            moe_layer,
            input_layernorm,
            post_attention_layernorm,
            device,
        })
    }

    /// `true` when this layer uses the dense MLP.
    pub fn is_dense(&self) -> bool {
        self.dense_mlp.is_some()
    }

    /// Device this layer was created for.
    pub fn device(&self) -> Device {
        self.device
    }
}
impl Layer for DeepSeekV2DecoderLayer {
    type Input = Tensor;
    type Output = Tensor;

    /// Applies the pre-norm residual block:
    ///
    /// ```text
    /// hidden = input  + self_attn(input_layernorm(input))
    /// output = hidden + ffn(post_attention_layernorm(hidden))
    /// ```
    ///
    /// where `ffn` is the dense MLP or the MoE layer, whichever is present.
    ///
    /// # Errors
    /// Fails when a sub-layer fails, when neither feed-forward variant is
    /// present, or when a residual addition fails. Addition errors are
    /// propagated rather than replaced by the sub-layer output, so a shape
    /// mismatch cannot silently drop the skip connection.
    fn forward(&self, input: Self::Input) -> Result<Self::Output> {
        let normed = self.input_layernorm.forward(input.clone())?;
        let attn_out = self.self_attn.forward(normed)?;
        let hidden = input.add(&attn_out)?;

        let normed_ff = self.post_attention_layernorm.forward(hidden.clone())?;
        let ff_out = if let Some(mlp) = &self.dense_mlp {
            mlp.forward(normed_ff)?
        } else if let Some(moe) = &self.moe_layer {
            moe.forward(normed_ff)?
        } else {
            return Err(tensor_op_error(
                "deepseek_v2_decoder",
                "layer has neither dense_mlp nor moe_layer",
            ));
        };
        hidden.add(&ff_out)
    }
}
/// DeepSeek-V2 backbone: token embedding, decoder stack and final norm.
pub struct DeepSeekV2Model {
    config: DeepSeekV2Config,
    embed_tokens: Embedding,
    layers: Vec<DeepSeekV2DecoderLayer>,
    norm: DeepSeekV2RmsNorm,
    device: Device,
}

impl DeepSeekV2Model {
    /// Builds the model on `Device::CPU`.
    pub fn new(config: DeepSeekV2Config) -> Result<Self> {
        Self::new_with_device(config, Device::CPU)
    }

    /// Validates `config`, then builds the embedding, `num_hidden_layers`
    /// decoder layers and the final norm, in that order.
    ///
    /// # Errors
    /// Returns the validation error or the first construction error.
    pub fn new_with_device(config: DeepSeekV2Config, device: Device) -> Result<Self> {
        config.validate()?;
        let embed_tokens = Embedding::new(config.vocab_size, config.hidden_size, None)?;
        // Collecting into `Result` stops at the first layer that fails to build.
        let layers = (0..config.num_hidden_layers)
            .map(|layer_idx| DeepSeekV2DecoderLayer::new(&config, layer_idx, device))
            .collect::<Result<Vec<_>>>()?;
        let norm = DeepSeekV2RmsNorm::new(config.hidden_size, config.rms_norm_eps, device)?;
        Ok(Self {
            config,
            embed_tokens,
            layers,
            norm,
            device,
        })
    }

    /// Configuration the model was built from.
    pub fn config(&self) -> &DeepSeekV2Config {
        &self.config
    }

    /// Device the model was created for.
    pub fn device(&self) -> Device {
        self.device
    }
}
impl Model for DeepSeekV2Model {
    type Config = DeepSeekV2Config;
    type Input = Tensor;
    type Output = Tensor;

    /// Embeds `input_ids`, runs every decoder layer and applies the final norm.
    ///
    /// Ids may be given as `I64` or as `F32` (rounded to the nearest
    /// integer). They are read in logical order, so the tensor does not need
    /// to be contiguous.
    ///
    /// # Errors
    /// Fails when the tensor is neither `I64` nor `F32`, when an id is
    /// negative, non-finite or does not fit in `u32`, or when a layer fails.
    fn forward(&self, input_ids: Self::Input) -> Result<Self::Output> {
        let token_ids: Vec<u32> = match &input_ids {
            Tensor::I64(arr) => arr
                .iter()
                .map(|&id| {
                    if (0..=i64::from(u32::MAX)).contains(&id) {
                        Ok(id as u32)
                    } else {
                        Err(tensor_op_error(
                            "deepseek_v2_forward",
                            "input_ids must be non-negative and fit in u32",
                        ))
                    }
                })
                .collect::<Result<Vec<u32>>>()?,
            Tensor::F32(arr) => arr
                .iter()
                .map(|&x| {
                    let rounded = x.round();
                    // `u32::MAX as f32` rounds up to 2^32, hence the strict bound.
                    if rounded.is_finite() && rounded >= 0.0 && rounded < u32::MAX as f32 {
                        Ok(rounded as u32)
                    } else {
                        Err(tensor_op_error(
                            "deepseek_v2_forward",
                            "input_ids must be non-negative and fit in u32",
                        ))
                    }
                })
                .collect::<Result<Vec<u32>>>()?,
            _ => {
                return Err(tensor_op_error(
                    "deepseek_v2_forward",
                    "input_ids must be I64 or F32",
                ))
            },
        };

        let mut hidden_states = self.embed_tokens.forward(token_ids)?;
        for layer in &self.layers {
            hidden_states = layer.forward(hidden_states)?;
        }
        self.norm.forward(hidden_states)
    }

    /// Reads `reader` to the end and rejects an empty stream.
    ///
    /// NOTE(review): the bytes are not parsed or assigned to any parameter;
    /// the model's weights are left untouched.
    ///
    /// # Errors
    /// Returns an I/O error when reading fails and an invalid-input error
    /// when no bytes were read.
    fn load_pretrained(&mut self, reader: &mut dyn Read) -> Result<()> {
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer).map_err(|e| {
            TrustformersError::io_error(format!("DeepSeekV2: failed to read weights: {}", e))
        })?;
        if buffer.is_empty() {
            return Err(TrustformersError::invalid_input_simple(
                "DeepSeekV2: pretrained weight data is empty".to_string(),
            ));
        }
        Ok(())
    }

    /// Configuration the model was built from.
    fn get_config(&self) -> &Self::Config {
        &self.config
    }

    /// Counts the parameters implied by the configuration and layer layout.
    ///
    /// Per layer: the seven attention projections and two norm vectors, plus
    /// either one dense MLP (`3 * hidden * intermediate`) or, for a MoE
    /// layer, one such MLP per shared and routed expert and the router gate
    /// (`hidden * n_routed_experts`). The embedding table and the final norm
    /// are added once.
    fn num_parameters(&self) -> usize {
        let hs = self.config.hidden_size;
        let vs = self.config.vocab_size;
        let nh = self.config.num_attention_heads;
        let kv_r = self.config.kv_lora_rank;
        // Matches the `max(1)` applied when the query projections are built.
        let q_r = self.config.q_lora_rank.max(1);
        let rope_d = self.config.qk_rope_head_dim;
        let nope_d = self.config.qk_nope_head_dim;
        let v_d = self.config.v_head_dim;
        let is = self.config.intermediate_size;
        let n_shared = self.config.n_shared_experts;
        let n_routed = self.config.n_routed_experts;

        let embed = vs * hs;
        let mla = hs * kv_r
            + kv_r * rope_d
            + kv_r * nh * nope_d
            + kv_r * nh * v_d
            + hs * q_r
            + q_r * nh * (rope_d + nope_d)
            + nh * v_d * hs;
        let norms = 2 * hs;
        let expert_mlp = 3 * hs * is;
        let moe = (n_shared + n_routed) * expert_mlp + hs * n_routed;
        let final_norm = hs;

        let total_layers = self.layers.len();
        let dense_layers = self.layers.iter().filter(|layer| layer.is_dense()).count();
        let moe_layers = total_layers - dense_layers;

        embed
            + total_layers * (mla + norms)
            + dense_layers * expert_mlp
            + moe_layers * moe
            + final_norm
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::deepseek_v2::config::{ActivationType, DeepSeekV2Config, TopKMethod};
    use trustformers_core::{
        tensor::Tensor,
        traits::{Config, Model},
    };

    /// Small configuration: two layers (one dense, one MoE), four heads.
    fn tiny_config() -> DeepSeekV2Config {
        DeepSeekV2Config {
            vocab_size: 64,
            hidden_size: 32,
            intermediate_size: 64,
            num_hidden_layers: 2,
            num_attention_heads: 4,
            kv_lora_rank: 8,
            q_lora_rank: 16,
            qk_rope_head_dim: 4,
            qk_nope_head_dim: 4,
            v_head_dim: 4,
            num_experts_per_tok: 2,
            n_routed_experts: 4,
            n_shared_experts: 1,
            routed_scaling_factor: 1.0,
            topk_method: TopKMethod::Noaux,
            n_group: 2,
            topk_group: 1,
            aux_loss_alpha: 0.001,
            max_position_embeddings: 64,
            rms_norm_eps: 1e-6,
            rope_theta: 10000.0,
            hidden_act: ActivationType::SiLU,
            initializer_range: 0.02,
            first_k_dense_replace: 1,
            moe_layer_freq: 1,
        }
    }

    /// Device used by every test.
    fn cpu() -> trustformers_core::device::Device {
        trustformers_core::device::Device::CPU
    }

    // ---- configuration defaults ----

    #[test]
    fn test_default_kv_lora_rank() {
        let defaults = DeepSeekV2Config::default();
        assert_eq!(
            defaults.kv_lora_rank, 512,
            "MLA kv_lora_rank default should be 512"
        );
    }

    #[test]
    fn test_default_q_lora_rank() {
        let defaults = DeepSeekV2Config::default();
        assert_eq!(
            defaults.q_lora_rank, 1536,
            "MLA q_lora_rank default should be 1536"
        );
    }

    #[test]
    fn test_default_qk_nope_head_dim() {
        let defaults = DeepSeekV2Config::default();
        assert_eq!(
            defaults.qk_nope_head_dim, 128,
            "no-RoPE head_dim default should be 128"
        );
    }

    #[test]
    fn test_default_qk_rope_head_dim() {
        let defaults = DeepSeekV2Config::default();
        assert_eq!(
            defaults.qk_rope_head_dim, 64,
            "RoPE head_dim default should be 64"
        );
    }

    #[test]
    fn test_qk_head_dim_sum() {
        let defaults = DeepSeekV2Config::default();
        let expected = defaults.qk_rope_head_dim + defaults.qk_nope_head_dim;
        assert_eq!(
            defaults.qk_head_dim(),
            expected,
            "total head_dim = rope_head_dim + nope_head_dim"
        );
    }

    #[test]
    fn test_default_num_attention_heads() {
        let defaults = DeepSeekV2Config::default();
        assert_eq!(
            defaults.num_attention_heads, 128,
            "DeepSeek-V2 has 128 attention heads"
        );
    }

    // ---- configuration validation ----

    #[test]
    fn test_config_validate_ok() {
        let config = tiny_config();
        config.validate().expect("tiny_config should be valid");
    }

    #[test]
    fn test_config_validate_zero_kv_lora_rank_fails() {
        let config = DeepSeekV2Config {
            kv_lora_rank: 0,
            ..tiny_config()
        };
        assert!(
            config.validate().is_err(),
            "zero kv_lora_rank must fail validation"
        );
    }

    #[test]
    fn test_config_validate_experts_per_tok_exceeds_total_fails() {
        let mut config = tiny_config();
        config.num_experts_per_tok = config.n_routed_experts + 1;
        assert!(
            config.validate().is_err(),
            "experts_per_tok > n_routed_experts must fail"
        );
    }

    #[test]
    fn test_dense_layer_detection_first_k() {
        let config = tiny_config();
        assert!(
            config.is_dense_layer(0),
            "layer 0 should be dense (first_k_dense_replace=1)"
        );
        assert!(
            !config.is_dense_layer(1),
            "layer 1 should be MoE (moe_layer_freq=1)"
        );
    }

    // ---- activations ----

    #[test]
    fn test_silu_zero() {
        let at_zero = silu(0.0);
        assert!((at_zero - 0.0).abs() < 1e-6, "silu(0) == 0");
    }

    #[test]
    fn test_silu_positive_input_positive_output() {
        let at_one = silu(1.0);
        assert!(at_one > 0.0, "silu(1.0) should be positive");
    }

    #[test]
    fn test_gelu_zero() {
        let at_zero = gelu(0.0);
        assert!((at_zero - 0.0).abs() < 1e-4, "gelu(0) ≈ 0");
    }

    #[test]
    fn test_apply_activation_length_preserved() {
        let samples = vec![1.0_f32, -1.0, 0.5, 2.0];
        let silu_out = apply_activation(&samples, ActivationType::SiLU);
        let gelu_out = apply_activation(&samples, ActivationType::GeLU);
        assert_eq!(
            silu_out.len(),
            samples.len(),
            "silu activation preserves length"
        );
        assert_eq!(
            gelu_out.len(),
            samples.len(),
            "gelu activation preserves length"
        );
    }

    // ---- layers ----

    #[test]
    fn test_rmsnorm_unit_weight_normalizes() {
        let layer =
            DeepSeekV2RmsNorm::new(4, 1e-6, cpu()).expect("rmsnorm creation should succeed");
        let uniform =
            Tensor::from_vec(vec![2.0_f32; 4], &[4]).expect("tensor creation should succeed");
        let normalized = layer.forward(uniform).expect("rmsnorm forward should succeed");
        let values = normalized.to_vec_f32().expect("to_vec_f32 should succeed");
        for v in values {
            assert!(
                (v - 1.0).abs() < 1e-4,
                "unit weights + uniform input → ≈ 1.0, got {v}"
            );
        }
    }

    #[test]
    fn test_rope_apply_preserves_length() {
        let config = tiny_config();
        let rotary = DeepSeekV2RotaryEmbedding::new(&config, cpu());
        let positions = 4;
        let expected_len = positions * config.qk_rope_head_dim;
        let mut buffer = vec![0.5_f32; expected_len];
        rotary.apply(&mut buffer, positions);
        assert_eq!(
            buffer.len(),
            positions * config.qk_rope_head_dim,
            "RoPE must preserve data length"
        );
    }

    #[test]
    fn test_rope_position_zero_unchanged() {
        let config = tiny_config();
        let rotary = DeepSeekV2RotaryEmbedding::new(&config, cpu());
        let before = vec![1.0_f32, 0.0, 1.0, 0.0];
        let mut after = before.clone();
        rotary.apply(&mut after, 1);
        for (orig, got) in before.iter().zip(after.iter()) {
            assert!(
                (orig - got).abs() < 1e-5,
                "pos=0 should leave values unchanged"
            );
        }
    }

    #[test]
    fn test_mla_attention_creation() {
        let config = tiny_config();
        MlaAttention::new(&config, cpu()).expect("MlaAttention creation should succeed");
    }

    #[test]
    fn test_mla_attention_output_shape() {
        let config = tiny_config();
        let hidden_size = config.hidden_size;
        let attention =
            MlaAttention::new(&config, cpu()).expect("MlaAttention should be created");
        let single_token = Tensor::from_vec(vec![0.1_f32; hidden_size], &[1, hidden_size])
            .expect("tensor creation should succeed");
        let projected = attention
            .forward(single_token)
            .expect("MlaAttention forward should succeed");
        let dims = projected.shape();
        assert_eq!(
            dims[dims.len() - 1],
            hidden_size,
            "MLA output must project back to hidden_size"
        );
    }

    // ---- full model ----

    #[test]
    fn test_model_creation() {
        DeepSeekV2Model::new(tiny_config()).expect("model creation should succeed");
    }

    #[test]
    fn test_model_forward_with_f32_ids() {
        let config = tiny_config();
        let hidden_size = config.hidden_size;
        let model = DeepSeekV2Model::new(config).expect("model creation should succeed");
        let ids = Tensor::from_vec(vec![0.0_f32, 1.0, 2.0], &[3])
            .expect("tensor creation should succeed");
        let hidden = model.forward(ids).expect("model forward should succeed");
        let dims = hidden.shape();
        assert_eq!(
            dims[dims.len() - 1],
            hidden_size,
            "output last dim must be hidden_size"
        );
    }

    #[test]
    fn test_model_parameter_count_nonzero() {
        let model = DeepSeekV2Model::new(tiny_config()).expect("model creation should succeed");
        let parameter_count = model.num_parameters();
        assert!(parameter_count > 0, "model must have parameters");
    }
}