use std::collections::HashMap;
use flodl::nn::{Dropout, Embedding, LayerNorm, Linear, Module, NamedInputModule, Parameter};
use flodl::{
DType, Device, FlowBuilder, Graph, Result, Tensor, TensorError, Variable,
};
use crate::models::bert::build_extended_attention_mask;
use crate::models::transformer_layer::{LayerNaming, TransformerLayer, TransformerLayerConfig};
use crate::path::{prefix_params, HfPath};
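/// Configuration for a RoBERTa backbone, mirroring the fields of a Hugging
/// Face `config.json`: vocabulary and layer sizes, dropout probabilities,
/// layer-norm epsilon, and optional classification-head metadata
/// (`num_labels` / `id2label`).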
#[derive(Debug, Clone)]
pub struct RobertaConfig {
pub vocab_size: i64,
pub hidden_size: i64,
pub num_hidden_layers: i64,
pub num_attention_heads: i64,
pub intermediate_size: i64,
pub max_position_embeddings: i64,
pub type_vocab_size: i64,
pub pad_token_id: i64,
pub layer_norm_eps: f64,
pub hidden_dropout_prob: f64,
pub attention_probs_dropout_prob: f64,
pub num_labels: Option<i64>,
pub id2label: Option<Vec<String>>,
}
impl RobertaConfig {
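    /// Defaults matching the published `roberta-base` checkpoint.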
pub fn roberta_base() -> Self {
RobertaConfig {
vocab_size: 50265,
hidden_size: 768,
num_hidden_layers: 12,
num_attention_heads: 12,
intermediate_size: 3072,
max_position_embeddings: 514,
type_vocab_size: 1,
pad_token_id: 1,
layer_norm_eps: 1e-5,
hidden_dropout_prob: 0.1,
attention_probs_dropout_prob: 0.1,
num_labels: None,
id2label: None,
}
}
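    /// Parses a Hugging Face style `config.json`. The six architecture fields
    /// (`vocab_size`, `hidden_size`, `num_hidden_layers`, `num_attention_heads`,
    /// `intermediate_size`, `max_position_embeddings`) are required; everything
    /// else falls back to the RoBERTa defaults. A minimal sketch (values are
    /// illustrative):
    ///
    /// ```ignore
    /// let cfg = RobertaConfig::from_json_str(
    ///     r#"{"vocab_size": 50265, "hidden_size": 768, "num_hidden_layers": 12,
    ///         "num_attention_heads": 12, "intermediate_size": 3072,
    ///         "max_position_embeddings": 514}"#,
    /// )?;
    /// assert_eq!(cfg.pad_token_id, 1); // default applied when the key is absent
    /// ```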
pub fn from_json_str(s: &str) -> Result<Self> {
use crate::config_json::{
optional_f64, optional_i64, parse_id2label, parse_num_labels, required_i64,
};
let v: serde_json::Value = serde_json::from_str(s)
.map_err(|e| TensorError::new(&format!("config.json parse error: {e}")))?;
let id2label = parse_id2label(&v)?;
let num_labels = parse_num_labels(&v, id2label.as_deref());
Ok(RobertaConfig {
vocab_size: required_i64(&v, "vocab_size")?,
hidden_size: required_i64(&v, "hidden_size")?,
num_hidden_layers: required_i64(&v, "num_hidden_layers")?,
num_attention_heads: required_i64(&v, "num_attention_heads")?,
intermediate_size: required_i64(&v, "intermediate_size")?,
max_position_embeddings: required_i64(&v, "max_position_embeddings")?,
type_vocab_size: optional_i64(&v, "type_vocab_size", 1),
pad_token_id: optional_i64(&v, "pad_token_id", 1),
layer_norm_eps: optional_f64(&v, "layer_norm_eps", 1e-5),
hidden_dropout_prob: optional_f64(&v, "hidden_dropout_prob", 0.1),
attention_probs_dropout_prob: optional_f64(&v, "attention_probs_dropout_prob", 0.1),
num_labels,
id2label,
})
}
}
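/// RoBERTa input embeddings: word + position (+ optional token-type)
/// embeddings, followed by LayerNorm and dropout. Position ids are derived
/// from the padding mask rather than taken as a separate input.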
pub struct RobertaEmbeddings {
word_embeddings: Embedding,
position_embeddings: Embedding,
token_type_embeddings: Embedding,
layer_norm: LayerNorm,
dropout: Dropout,
padding_idx: i64,
}
impl RobertaEmbeddings {
pub fn on_device(config: &RobertaConfig, device: Device) -> Result<Self> {
Ok(RobertaEmbeddings {
word_embeddings: Embedding::on_device_with_padding_idx(
config.vocab_size,
config.hidden_size,
Some(config.pad_token_id),
device,
)?,
position_embeddings: Embedding::on_device(
config.max_position_embeddings,
config.hidden_size,
device,
)?,
token_type_embeddings: Embedding::on_device(
config.type_vocab_size,
config.hidden_size,
device,
)?,
layer_norm: LayerNorm::on_device_with_eps(
config.hidden_size,
config.layer_norm_eps,
device,
)?,
dropout: Dropout::new(config.hidden_dropout_prob),
padding_idx: config.pad_token_id,
})
}
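    /// Computes position ids the way Hugging Face RoBERTa does: non-padding
    /// tokens are numbered `padding_idx + 1, padding_idx + 2, ...` from the
    /// start of each row, while padding positions stay at `padding_idx`.
    /// For example, with `padding_idx = 1` the input ids `[0, 100, 200, 2, 1, 1]`
    /// map to positions `[2, 3, 4, 5, 1, 1]` (see the test at the bottom of
    /// this file).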
fn position_ids_from_input_ids(&self, input_ids: &Tensor) -> Result<Tensor> {
let mask = input_ids.ne_scalar(self.padding_idx as f64)?;
let cum = mask.cumsum(1)?;
cum.mul(&mask)?
.add_scalar(self.padding_idx as f64)?
.to_dtype(DType::Int64)
}
}
impl Module for RobertaEmbeddings {
fn name(&self) -> &str { "roberta_embeddings" }
fn forward(&self, input: &Variable) -> Result<Variable> {
let pos_ids = self.position_ids_from_input_ids(&input.data())?;
let pos_var = Variable::new(pos_ids, false);
let word = self.word_embeddings.forward(input)?;
let pe = self.position_embeddings.forward(&pos_var)?;
let summed = word.add(&pe)?;
let ln = self.layer_norm.forward(&summed)?;
self.dropout.forward(&ln)
}
fn parameters(&self) -> Vec<Parameter> {
let mut out = Vec::new();
out.extend(prefix_params("word_embeddings", self.word_embeddings.parameters()));
out.extend(prefix_params("position_embeddings", self.position_embeddings.parameters()));
out.extend(prefix_params("token_type_embeddings", self.token_type_embeddings.parameters()));
out.extend(prefix_params("LayerNorm", self.layer_norm.parameters()));
out
}
fn as_named_input(&self) -> Option<&dyn NamedInputModule> { Some(self) }
fn set_training(&self, training: bool) {
self.dropout.set_training(training);
}
}
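// Named-input path used by the flow builder: when the graph supplies
// `token_type_ids`, their embeddings are added on top of the word and
// position embeddings (the plain `forward` above omits them).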
impl NamedInputModule for RobertaEmbeddings {
fn forward_named(
&self,
input: &Variable,
refs: &HashMap<String, Variable>,
) -> Result<Variable> {
let pos_ids = self.position_ids_from_input_ids(&input.data())?;
let pos_var = Variable::new(pos_ids, false);
let word = self.word_embeddings.forward(input)?;
let pe = self.position_embeddings.forward(&pos_var)?;
let mut summed = word.add(&pe)?;
if let Some(tt) = refs.get("token_type_ids") {
let te = self.token_type_embeddings.forward(tt)?;
summed = summed.add(&te)?;
}
let ln = self.layer_norm.forward(&summed)?;
self.dropout.forward(&ln)
}
}
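/// Pools the sequence by taking the hidden state of the first token (`<s>`),
/// passing it through a dense layer, and applying `tanh`, matching the
/// Hugging Face BERT/RoBERTa pooler.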
pub struct RobertaPooler {
dense: Linear,
}
impl RobertaPooler {
pub fn on_device(config: &RobertaConfig, device: Device) -> Result<Self> {
Ok(RobertaPooler {
dense: Linear::on_device(config.hidden_size, config.hidden_size, device)?,
})
}
}
impl Module for RobertaPooler {
fn name(&self) -> &str { "roberta_pooler" }
fn forward(&self, input: &Variable) -> Result<Variable> {
let cls = input.select(1, 0)?;
let pooled = self.dense.forward(&cls)?;
pooled.tanh()
}
fn parameters(&self) -> Vec<Parameter> {
prefix_params("dense", self.dense.parameters())
}
}
fn roberta_layer_config(config: &RobertaConfig) -> TransformerLayerConfig {
TransformerLayerConfig {
hidden_size: config.hidden_size,
num_attention_heads: config.num_attention_heads,
intermediate_size: config.intermediate_size,
hidden_dropout_prob: config.hidden_dropout_prob,
attention_probs_dropout_prob: config.attention_probs_dropout_prob,
layer_norm_eps: config.layer_norm_eps,
}
}
fn roberta_backbone_flow(
config: &RobertaConfig,
device: Device,
with_pooler: bool,
) -> Result<FlowBuilder> {
let mut fb = FlowBuilder::new()
.input(&["token_type_ids", "attention_mask"])
.through(RobertaEmbeddings::on_device(config, device)?)
.tag("roberta.embeddings")
.using(&["token_type_ids"]);
let layer_root = HfPath::new("roberta").sub("encoder").sub("layer");
let layer_cfg = roberta_layer_config(config);
for i in 0..config.num_hidden_layers {
let tag = layer_root.sub(i).to_string();
fb = fb
.through(TransformerLayer::on_device(&layer_cfg, LayerNaming::BERT, device)?)
.tag(&tag)
.using(&["attention_mask"]);
}
if with_pooler {
fb = fb
.through(RobertaPooler::on_device(config, device)?)
.tag("roberta.pooler");
}
Ok(fb)
}
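/// Bare RoBERTa encoder (embeddings + transformer stack, optionally with the
/// pooler), exposed as a [`Graph`]. A minimal sketch of building the CPU
/// graph:
///
/// ```ignore
/// let config = RobertaConfig::roberta_base();
/// let graph = RobertaModel::build(&config)?; // includes the pooler
/// ```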
pub struct RobertaModel;
impl RobertaModel {
pub fn build(config: &RobertaConfig) -> Result<Graph> {
Self::on_device(config, Device::CPU)
}
pub fn on_device(config: &RobertaConfig, device: Device) -> Result<Graph> {
roberta_backbone_flow(config, device, true)?.build()
}
pub fn on_device_without_pooler(config: &RobertaConfig, device: Device) -> Result<Graph> {
roberta_backbone_flow(config, device, false)?.build()
}
}
use crate::task_heads::{check_num_labels, default_labels, extract_best_span, logits_to_sorted_labels};
pub use crate::task_heads::{Answer, TokenPrediction};
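/// Sequence-classification head in the Hugging Face RoBERTa layout: take the
/// `<s>` (first-token) hidden state, then dropout -> dense -> tanh ->
/// dropout -> out_proj.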
pub struct RobertaClassificationHead {
dropout: Dropout,
dense: Linear,
out_proj: Linear,
}
impl RobertaClassificationHead {
pub fn on_device(
config: &RobertaConfig,
num_labels: i64,
device: Device,
) -> Result<Self> {
Ok(RobertaClassificationHead {
dropout: Dropout::new(config.hidden_dropout_prob),
dense: Linear::on_device(config.hidden_size, config.hidden_size, device)?,
out_proj: Linear::on_device(config.hidden_size, num_labels, device)?,
})
}
}
impl Module for RobertaClassificationHead {
fn name(&self) -> &str { "roberta_classification_head" }
fn forward(&self, input: &Variable) -> Result<Variable> {
        let cls = input.select(1, 0)?;
        let x = self.dropout.forward(&cls)?;
let x = self.dense.forward(&x)?;
let x = x.tanh()?;
let x = self.dropout.forward(&x)?;
self.out_proj.forward(&x)
}
fn parameters(&self) -> Vec<Parameter> {
let mut out = Vec::new();
out.extend(prefix_params("dense", self.dense.parameters()));
out.extend(prefix_params("out_proj", self.out_proj.parameters()));
out
}
fn set_training(&self, training: bool) {
self.dropout.set_training(training);
}
}
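/// RoBERTa backbone plus a [`RobertaClassificationHead`] tagged `classifier`.
/// With the `tokenizer` feature enabled, a minimal usage sketch (assuming
/// `tok` is an already-constructed `HfTokenizer`; loading one is out of scope
/// here):
///
/// ```ignore
/// let config = RobertaConfig::roberta_base();
/// let model = RobertaForSequenceClassification::on_device(&config, 2, Device::CPU)?
///     .with_tokenizer(tok);
/// let scores = model.predict(&["great movie", "terrible movie"])?;
/// // scores[i] is a list of (label, score) pairs, sorted by score.
/// ```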
pub struct RobertaForSequenceClassification {
graph: Graph,
id2label: Vec<String>,
#[cfg(feature = "tokenizer")]
tokenizer: Option<crate::tokenizer::HfTokenizer>,
}
impl RobertaForSequenceClassification {
pub fn on_device(
config: &RobertaConfig,
num_labels: i64,
device: Device,
) -> Result<Self> {
let num_labels = check_num_labels(num_labels)?;
let graph = roberta_backbone_flow(config, device, false)?
.through(RobertaClassificationHead::on_device(config, num_labels, device)?)
.tag("classifier")
.build()?;
let id2label = config
.id2label
.clone()
.unwrap_or_else(|| default_labels(num_labels));
Ok(Self {
graph,
id2label,
#[cfg(feature = "tokenizer")]
tokenizer: None,
})
}
pub(crate) fn num_labels_from_config(config: &RobertaConfig) -> Result<i64> {
config.num_labels.ok_or_else(|| {
TensorError::new(
"RobertaForSequenceClassification: config.json has no `num_labels` \
(nor `id2label`); cannot infer head size",
)
})
}
pub fn graph(&self) -> &Graph { &self.graph }
pub fn labels(&self) -> &[String] { &self.id2label }
#[cfg(feature = "tokenizer")]
pub fn with_tokenizer(mut self, tok: crate::tokenizer::HfTokenizer) -> Self {
self.tokenizer = Some(tok);
self
}
#[cfg(feature = "tokenizer")]
pub fn classify(
&self,
enc: &crate::tokenizer::EncodedBatch,
) -> Result<Vec<Vec<(String, f32)>>> {
let logits = self.forward_from_encoded(enc)?;
logits_to_sorted_labels(&logits, &self.id2label)
}
#[cfg(feature = "tokenizer")]
pub fn predict(&self, texts: &[&str]) -> Result<Vec<Vec<(String, f32)>>> {
let tok = self.tokenizer.as_ref().ok_or_else(|| {
TensorError::new(
"RobertaForSequenceClassification::predict requires a tokenizer; \
use from_pretrained or .with_tokenizer(...) first",
)
})?;
let enc = tok.encode(texts)?;
self.classify(&enc)
}
#[cfg(feature = "tokenizer")]
fn forward_from_encoded(
&self,
enc: &crate::tokenizer::EncodedBatch,
) -> Result<Variable> {
self.graph.eval();
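        // Reuse the BERT helper to expand the 0/1 attention mask into the
        // extended form the attention layers expect, then feed the input ids,
        // token type ids, and mask through the graph.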
let mask_f32 = enc.attention_mask.data().to_dtype(DType::Float32)?;
let mask = Variable::new(build_extended_attention_mask(&mask_f32)?, false);
self.graph.forward_multi(&[
enc.input_ids.clone(),
enc.token_type_ids.clone(),
mask,
])
}
}
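/// Token-level classification (e.g. NER or POS tagging): backbone, then
/// dropout, then a `Linear` projection to `num_labels`, tagged `classifier`.
/// `tag` and `predict` return one [`TokenPrediction`] per input position,
/// including whether that position is a real token or padding (`attends`).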
pub struct RobertaForTokenClassification {
graph: Graph,
id2label: Vec<String>,
#[cfg(feature = "tokenizer")]
tokenizer: Option<crate::tokenizer::HfTokenizer>,
}
impl RobertaForTokenClassification {
pub fn on_device(
config: &RobertaConfig,
num_labels: i64,
device: Device,
) -> Result<Self> {
let num_labels = check_num_labels(num_labels)?;
let graph = roberta_backbone_flow(config, device, false)?
.through(Dropout::new(config.hidden_dropout_prob))
.through(Linear::on_device(config.hidden_size, num_labels, device)?)
.tag("classifier")
.build()?;
let id2label = config
.id2label
.clone()
.unwrap_or_else(|| default_labels(num_labels));
Ok(Self {
graph,
id2label,
#[cfg(feature = "tokenizer")]
tokenizer: None,
})
}
pub(crate) fn num_labels_from_config(config: &RobertaConfig) -> Result<i64> {
config.num_labels.ok_or_else(|| {
TensorError::new(
"RobertaForTokenClassification: config.json has no `num_labels` \
(nor `id2label`); cannot infer head size",
)
})
}
pub fn graph(&self) -> &Graph { &self.graph }
pub fn labels(&self) -> &[String] { &self.id2label }
#[cfg(feature = "tokenizer")]
pub fn with_tokenizer(mut self, tok: crate::tokenizer::HfTokenizer) -> Self {
self.tokenizer = Some(tok);
self
}
#[cfg(feature = "tokenizer")]
pub fn tag(
&self,
enc: &crate::tokenizer::EncodedBatch,
) -> Result<Vec<Vec<TokenPrediction>>> {
let tok = self.tokenizer.as_ref().ok_or_else(|| {
TensorError::new(
"RobertaForTokenClassification::tag requires a tokenizer; \
attach one via .with_tokenizer(...) or from_pretrained",
)
})?;
self.graph.eval();
let mask_f32 = enc.attention_mask.data().to_dtype(DType::Float32)?;
let mask = Variable::new(build_extended_attention_mask(&mask_f32)?, false);
let logits = self.graph.forward_multi(&[
enc.input_ids.clone(),
enc.token_type_ids.clone(),
mask,
])?;
let probs = logits.softmax(-1)?;
        let shape = probs.shape();
        if shape.len() != 3 {
            return Err(TensorError::new(&format!(
                "expected logits of shape [batch, seq, num_labels], got {shape:?}"
            )));
        }
let batch = shape[0] as usize;
let seq = shape[1] as usize;
let n = shape[2] as usize;
let flat = probs.data().to_f32_vec()?;
let input_ids: Vec<i64> = enc.input_ids.data().to_i64_vec()?;
let attn_ids: Vec<i64> = enc.attention_mask.data().to_i64_vec()?;
let mut out = Vec::with_capacity(batch);
for b in 0..batch {
let mut row = Vec::with_capacity(seq);
for s in 0..seq {
let base = (b * seq + s) * n;
let (best_k, &best_p) = flat[base..base + n]
.iter()
.enumerate()
.max_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
.expect("n > 0 checked by check_num_labels");
let id = input_ids[b * seq + s] as u32;
let token = tok
.inner()
.id_to_token(id)
.unwrap_or_else(|| format!("<unk_id={id}>"));
row.push(TokenPrediction {
token,
label: self.id2label[best_k].clone(),
score: best_p,
attends: attn_ids[b * seq + s] != 0,
});
}
out.push(row);
}
Ok(out)
}
#[cfg(feature = "tokenizer")]
pub fn predict(&self, texts: &[&str]) -> Result<Vec<Vec<TokenPrediction>>> {
let tok = self.tokenizer.as_ref().ok_or_else(|| {
TensorError::new(
"RobertaForTokenClassification::predict requires a tokenizer; \
use from_pretrained or .with_tokenizer(...) first",
)
})?;
let enc = tok.encode(texts)?;
self.tag(&enc)
}
}
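/// Extractive question answering: backbone, then a `Linear` layer projecting
/// each token to two logits (span start / span end), tagged `qa_outputs`.
/// With the `tokenizer` feature, a minimal sketch (assuming `tok` is an
/// already-constructed `HfTokenizer`):
///
/// ```ignore
/// let config = RobertaConfig::roberta_base();
/// let qa = RobertaForQuestionAnswering::on_device(&config, Device::CPU)?
///     .with_tokenizer(tok);
/// let answer = qa.answer("Who wrote it?", "The report was written by Ada.")?;
/// ```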
pub struct RobertaForQuestionAnswering {
graph: Graph,
#[cfg(feature = "tokenizer")]
tokenizer: Option<crate::tokenizer::HfTokenizer>,
}
impl RobertaForQuestionAnswering {
pub fn on_device(config: &RobertaConfig, device: Device) -> Result<Self> {
let graph = roberta_backbone_flow(config, device, false)?
.through(Linear::on_device(config.hidden_size, 2, device)?)
.tag("qa_outputs")
.build()?;
Ok(Self {
graph,
#[cfg(feature = "tokenizer")]
tokenizer: None,
})
}
pub fn graph(&self) -> &Graph { &self.graph }
#[cfg(feature = "tokenizer")]
pub fn with_tokenizer(mut self, tok: crate::tokenizer::HfTokenizer) -> Self {
self.tokenizer = Some(tok);
self
}
#[cfg(feature = "tokenizer")]
pub fn answer(&self, question: &str, context: &str) -> Result<Answer> {
let mut out = self.answer_batch(&[(question, context)])?;
Ok(out.pop().expect("answer_batch returns one per input"))
}
#[cfg(feature = "tokenizer")]
pub fn answer_batch(&self, pairs: &[(&str, &str)]) -> Result<Vec<Answer>> {
let tok = self.tokenizer.as_ref().ok_or_else(|| {
TensorError::new(
"RobertaForQuestionAnswering::answer requires a tokenizer; \
use from_pretrained or .with_tokenizer(...) first",
)
})?;
let enc = tok.encode_pairs(pairs)?;
self.extract(&enc)
}
#[cfg(feature = "tokenizer")]
pub fn extract(
&self,
enc: &crate::tokenizer::EncodedBatch,
) -> Result<Vec<Answer>> {
let tok = self.tokenizer.as_ref().ok_or_else(|| {
TensorError::new(
"RobertaForQuestionAnswering::extract requires a tokenizer; \
attach one via .with_tokenizer(...) or from_pretrained",
)
})?;
self.graph.eval();
let mask_f32 = enc.attention_mask.data().to_dtype(DType::Float32)?;
let mask = Variable::new(build_extended_attention_mask(&mask_f32)?, false);
let logits = self.graph.forward_multi(&[
enc.input_ids.clone(),
enc.token_type_ids.clone(),
mask,
])?;
extract_best_span(&logits, enc, tok)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::safetensors_io::expected_from_graph;
fn expected_layer_keys(i: i64) -> Vec<String> {
let suffixes = [
"attention.output.LayerNorm.bias",
"attention.output.LayerNorm.weight",
"attention.output.dense.bias",
"attention.output.dense.weight",
"attention.self.key.bias",
"attention.self.key.weight",
"attention.self.query.bias",
"attention.self.query.weight",
"attention.self.value.bias",
"attention.self.value.weight",
"intermediate.dense.bias",
"intermediate.dense.weight",
"output.LayerNorm.bias",
"output.LayerNorm.weight",
"output.dense.bias",
"output.dense.weight",
];
suffixes.iter().map(|s| format!("roberta.encoder.layer.{i}.{s}")).collect()
}
#[test]
fn roberta_parameter_keys_match_hf_dotted_form() {
let config = RobertaConfig::roberta_base();
let graph = RobertaModel::build(&config).unwrap();
let expected = expected_from_graph(&graph);
let mut keys: Vec<String> = expected.iter().map(|p| p.key.clone()).collect();
keys.sort();
let mut want: Vec<String> = vec![
"roberta.embeddings.LayerNorm.bias".into(),
"roberta.embeddings.LayerNorm.weight".into(),
"roberta.embeddings.position_embeddings.weight".into(),
"roberta.embeddings.token_type_embeddings.weight".into(),
"roberta.embeddings.word_embeddings.weight".into(),
];
for i in 0..config.num_hidden_layers {
want.extend(expected_layer_keys(i));
}
want.extend([
"roberta.pooler.dense.bias".into(),
"roberta.pooler.dense.weight".into(),
]);
want.sort();
assert_eq!(keys, want);
}
#[test]
fn roberta_seqcls_head_has_two_layer_keys() {
let config = RobertaConfig::roberta_base();
let head = RobertaForSequenceClassification::on_device(
&config, 3, Device::CPU,
).unwrap();
let expected = expected_from_graph(head.graph());
let keys: Vec<String> = expected.iter().map(|p| p.key.clone()).collect();
assert!(keys.contains(&"classifier.dense.weight".to_string()));
assert!(keys.contains(&"classifier.dense.bias".to_string()));
assert!(keys.contains(&"classifier.out_proj.weight".to_string()));
assert!(keys.contains(&"classifier.out_proj.bias".to_string()));
assert!(!keys.iter().any(|k| k == "classifier.weight"));
}
#[test]
fn roberta_position_ids_follow_hf_convention() {
let config = RobertaConfig::roberta_base();
let emb = RobertaEmbeddings::on_device(&config, Device::CPU).unwrap();
let ids = Tensor::from_i64(&[0, 100, 200, 2, 1, 1], &[1, 6], Device::CPU).unwrap();
let pos = emb.position_ids_from_input_ids(&ids).unwrap();
let flat: Vec<i64> = pos.to_i64_vec().unwrap();
assert_eq!(flat, vec![2, 3, 4, 5, 1, 1]);
}
}