use burn::nn::{
pool::{AdaptiveAvgPool1d, AdaptiveAvgPool1dConfig},
Dropout, DropoutConfig, LayerNorm, LayerNormConfig, Linear, LinearConfig,
};
use burn::prelude::*;
use burn::tensor::activation::{gelu, softmax};
use serde::{Deserialize, Serialize};
/// Hyper-parameters from which a [`GMLP`] model is built.
///
/// The struct derives `Serialize`/`Deserialize`, so it can be round-tripped
/// through any serde format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GMLPConfig {
    /// Number of input variables; input width of the model's input projection.
    pub n_vars: usize,
    /// Sequence length; sizes the `seq_len x seq_len` projection inside each
    /// spatial gating unit.
    pub seq_len: usize,
    /// Number of classes; output width of the classification head.
    pub n_classes: usize,
    /// Width of the hidden representation carried between blocks.
    pub d_model: usize,
    /// Expansion factor: each block projects to `d_model * ff_mult` channels.
    pub ff_mult: usize,
    /// Number of gMLP blocks stacked in the model.
    pub n_layers: usize,
    /// Dropout probability applied to each block's output before the residual
    /// addition.
    pub dropout: f64,
}
impl Default for GMLPConfig {
    /// Returns the baseline configuration: a single input variable, sequences
    /// of length 100, two classes, and a 4-layer model with `d_model = 128`,
    /// `ff_mult = 4` and a dropout probability of `0.1`.
    fn default() -> Self {
        Self {
            // Architecture hyper-parameters.
            d_model: 128,
            ff_mult: 4,
            n_layers: 4,
            dropout: 0.1,
            // Data-dependent dimensions.
            n_vars: 1,
            seq_len: 100,
            n_classes: 2,
        }
    }
}
impl GMLPConfig {
    /// Creates a configuration for the given data dimensions, taking every
    /// remaining field from [`Default::default`].
    pub fn new(n_vars: usize, seq_len: usize, n_classes: usize) -> Self {
        let defaults = Self::default();
        Self {
            n_vars,
            seq_len,
            n_classes,
            ..defaults
        }
    }

    /// Returns the configuration with `d_model` replaced.
    #[must_use]
    pub fn with_d_model(self, d_model: usize) -> Self {
        Self { d_model, ..self }
    }

    /// Returns the configuration with `ff_mult` replaced.
    #[must_use]
    pub fn with_ff_mult(self, ff_mult: usize) -> Self {
        Self { ff_mult, ..self }
    }

    /// Returns the configuration with `n_layers` replaced.
    #[must_use]
    pub fn with_n_layers(self, n_layers: usize) -> Self {
        Self { n_layers, ..self }
    }

    /// Returns the configuration with `dropout` replaced.
    #[must_use]
    pub fn with_dropout(self, dropout: f64) -> Self {
        Self { dropout, ..self }
    }

    /// Builds a [`GMLP`] model on `device` from a clone of this configuration.
    pub fn init<B: Backend>(&self, device: &B::Device) -> GMLP<B> {
        let config = self.clone();
        GMLP::new(config, device)
    }
}
/// Gating unit that splits its input channels in two halves and multiplies
/// one half by a normalized, sequence-projected version of the other.
#[derive(Module, Debug)]
struct SpatialGatingUnit<B: Backend> {
    /// Layer normalization applied to the gating half of the channels.
    norm: LayerNorm<B>,
    /// Square `seq_len -> seq_len` linear map applied along the sequence axis.
    proj: Linear<B>,
    /// Sequence length the projection was sized for.
    #[module(skip)]
    seq_len: usize,
}
impl<B: Backend> SpatialGatingUnit<B> {
    /// Builds a gating unit for inputs with `d_ff` channels and `seq_len`
    /// sequence positions.
    ///
    /// The normalization layer covers `d_ff / 2` channels and the projection
    /// is a square `seq_len -> seq_len` linear map.
    ///
    /// # Panics
    ///
    /// Panics if `d_ff` is odd. The forward pass splits the channels into two
    /// halves that are multiplied element-wise, so an odd width would produce
    /// halves of different sizes and fail later with an opaque shape error.
    fn new(d_ff: usize, seq_len: usize, device: &B::Device) -> Self {
        assert!(
            d_ff % 2 == 0,
            "SpatialGatingUnit requires an even d_ff so the channels split into equal halves, got {d_ff}"
        );
        let norm = LayerNormConfig::new(d_ff / 2).init(device);
        let proj = LinearConfig::new(seq_len, seq_len).init(device);
        Self { norm, proj, seq_len }
    }

    /// Applies the gate to `x`, whose dimensions are read as
    /// `[batch, seq_len, d_ff]`, and returns a tensor with `d_ff / 2` channels.
    ///
    /// The first half of the channels is passed through unchanged; the second
    /// half is layer-normalized, projected along the sequence axis and then
    /// multiplied element-wise with the first half.
    ///
    /// # Panics
    ///
    /// Panics if the sequence axis of `x` differs from the `seq_len` this unit
    /// was built with, since the sequence projection has a fixed size.
    fn forward(&self, x: Tensor<B, 3>) -> Tensor<B, 3> {
        let [batch, seq_len, d_ff] = x.dims();
        assert_eq!(
            seq_len, self.seq_len,
            "SpatialGatingUnit was built for seq_len {} but received an input with seq_len {}",
            self.seq_len, seq_len
        );
        let half_d = d_ff / 2;

        // Split along the channel axis: `u` is gated, `v` produces the gate.
        let u = x.clone().slice([0..batch, 0..seq_len, 0..half_d]);
        let v = x.slice([0..batch, 0..seq_len, half_d..d_ff]);

        let v = self.norm.forward(v);
        // `Linear` acts on the last axis, so move the sequence axis there,
        // project, and move it back.
        let v = v.swap_dims(1, 2);
        let v = self.proj.forward(v);
        let v = v.swap_dims(1, 2);

        u * v
    }
}
/// One residual gMLP block: normalization, channel expansion, spatial gating,
/// channel reduction and dropout.
#[derive(Module, Debug)]
struct GMLPBlock<B: Backend> {
    /// Layer normalization applied to the block input.
    norm: LayerNorm<B>,
    /// Linear expansion from `d_model` to `d_ff` channels.
    proj_in: Linear<B>,
    /// Spatial gating unit operating on the expanded channels.
    sgu: SpatialGatingUnit<B>,
    /// Linear reduction from `d_ff / 2` back to `d_model` channels.
    proj_out: Linear<B>,
    /// Dropout applied to the block output before the residual addition.
    dropout: Dropout,
}
impl<B: Backend> GMLPBlock<B> {
    /// Builds a block with model width `d_model`, expanded width `d_ff`, a
    /// gating unit sized for `seq_len` positions and the given `dropout`
    /// probability, allocating its layers on `device`.
    fn new(d_model: usize, d_ff: usize, seq_len: usize, dropout: f64, device: &B::Device) -> Self {
        // Fields are initialized in declaration order, matching the order in
        // which the layers are created.
        Self {
            norm: LayerNormConfig::new(d_model).init(device),
            proj_in: LinearConfig::new(d_model, d_ff).init(device),
            sgu: SpatialGatingUnit::new(d_ff, seq_len, device),
            // The gating unit outputs half of the expanded channels.
            proj_out: LinearConfig::new(d_ff / 2, d_model).init(device),
            dropout: DropoutConfig::new(dropout).init(),
        }
    }

    /// Runs the block on `x` and adds the result to `x` (residual connection).
    ///
    /// The transformed branch is: layer norm, input projection, GELU, spatial
    /// gating, output projection, dropout.
    fn forward(&self, x: Tensor<B, 3>) -> Tensor<B, 3> {
        let normed = self.norm.forward(x.clone());
        let expanded = gelu(self.proj_in.forward(normed));
        let gated = self.sgu.forward(expanded);
        let branch = self.dropout.forward(self.proj_out.forward(gated));
        x + branch
    }
}
/// gMLP classifier: an input projection, a stack of gMLP blocks, a final
/// layer normalization, average pooling over the sequence and a linear head.
#[derive(Module, Debug)]
pub struct GMLP<B: Backend> {
    /// Linear map from `n_vars` input variables to `d_model` channels.
    input_proj: Linear<B>,
    /// The stacked gMLP blocks, applied in order.
    blocks: Vec<GMLPBlock<B>>,
    /// Layer normalization applied after the last block.
    final_norm: LayerNorm<B>,
    /// Adaptive average pooling with output size 1.
    gap: AdaptiveAvgPool1d,
    /// Linear classification head from `d_model` to `n_classes`.
    head: Linear<B>,
    /// Model width, kept to reshape the pooled features in `forward`.
    #[module(skip)]
    d_model: usize,
}
impl<B: Backend> GMLP<B> {
pub fn new(config: GMLPConfig, device: &B::Device) -> Self {
let d_ff = config.d_model * config.ff_mult;
let input_proj = LinearConfig::new(config.n_vars, config.d_model).init(device);
let blocks: Vec<_> = (0..config.n_layers)
.map(|_| GMLPBlock::new(config.d_model, d_ff, config.seq_len, config.dropout, device))
.collect();
let final_norm = LayerNormConfig::new(config.d_model).init(device);
let gap = AdaptiveAvgPool1dConfig::new(1).init();
let head = LinearConfig::new(config.d_model, config.n_classes).init(device);
Self {
input_proj,
blocks,
final_norm,
gap,
head,
d_model: config.d_model,
}
}
pub fn forward(&self, x: Tensor<B, 3>) -> Tensor<B, 2> {
let [batch_size, _n_vars, _seq_len] = x.dims();
let out = x.swap_dims(1, 2);
let out = self.input_proj.forward(out);
let mut out = out;
for block in &self.blocks {
out = block.forward(out);
}
let out = self.final_norm.forward(out);
let out = out.swap_dims(1, 2);
let out = self.gap.forward(out);
let out = out.reshape([batch_size, self.d_model]);
self.head.forward(out)
}
pub fn forward_probs(&self, x: Tensor<B, 3>) -> Tensor<B, 2> {
let logits = self.forward(x);
softmax(logits, 1)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration carries the expected architecture values.
    #[test]
    fn test_gmlp_config_default() {
        let GMLPConfig {
            d_model,
            ff_mult,
            n_layers,
            ..
        } = GMLPConfig::default();
        assert_eq!(d_model, 128);
        assert_eq!(ff_mult, 4);
        assert_eq!(n_layers, 4);
    }

    /// `new` stores the data dimensions it is given.
    #[test]
    fn test_gmlp_config_new() {
        let GMLPConfig {
            n_vars,
            seq_len,
            n_classes,
            ..
        } = GMLPConfig::new(3, 200, 10);
        assert_eq!(n_vars, 3);
        assert_eq!(seq_len, 200);
        assert_eq!(n_classes, 10);
    }

    /// Each builder method overrides exactly the field it names.
    #[test]
    fn test_gmlp_config_builder() {
        let base = GMLPConfig::new(3, 100, 5);
        let tuned = base
            .with_d_model(256)
            .with_ff_mult(6)
            .with_n_layers(8)
            .with_dropout(0.2);
        assert_eq!(tuned.d_model, 256);
        assert_eq!(tuned.ff_mult, 6);
        assert_eq!(tuned.n_layers, 8);
        assert_eq!(tuned.dropout, 0.2);
    }
}