use crate::albert::AlbertForSequenceClassification;
use crate::bart::BartForSequenceClassification;
use crate::bert::BertForSequenceClassification;
use crate::common::error::RustBertError;
use crate::deberta::DebertaForSequenceClassification;
use crate::distilbert::DistilBertModelClassifier;
use crate::fnet::FNetForSequenceClassification;
use crate::longformer::LongformerForSequenceClassification;
use crate::mobilebert::MobileBertForSequenceClassification;
use crate::pipelines::common::{
cast_var_store, get_device, ConfigOption, ModelResource, ModelType, TokenizerOption,
};
use crate::reformer::ReformerForSequenceClassification;
use crate::resources::ResourceProvider;
use crate::roberta::RobertaForSequenceClassification;
use crate::xlnet::XLNetForSequenceClassification;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tch::nn::VarStore;
use tch::{no_grad, Device, Kind, Tensor};
use crate::deberta_v2::DebertaV2ForSequenceClassification;
#[cfg(feature = "onnx")]
use crate::pipelines::onnx::{config::ONNXEnvironmentConfig, ONNXEncoder};
#[cfg(feature = "remote")]
use crate::{
distilbert::{DistilBertConfigResources, DistilBertModelResources, DistilBertVocabResources},
resources::RemoteResource,
};
/// A single classification result returned by the sequence classification pipeline.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Label {
/// Label text, looked up from the model's label mapping using `id`.
pub text: String,
/// Score attached to the label (softmax output in `predict`, sigmoid output in
/// `predict_multilabel`).
pub score: f64,
/// Class index of the label in the model output.
pub id: i64,
/// Index of the classified text within the input batch. Falls back to the type default
/// when the field is missing during deserialization.
#[serde(default)]
pub sentence: usize,
}
/// Configuration describing the model, tokenizer and runtime settings used to build a
/// `SequenceClassificationModel`.
pub struct SequenceClassificationConfig {
/// Model architecture; selects which classifier variant is built and which tokenizer and
/// configuration loaders are used.
pub model_type: ModelType,
/// Resource holding the model weights (Torch weights, or ONNX files with the `onnx` feature).
pub model_resource: ModelResource,
/// Resource resolving to the model configuration file.
pub config_resource: Box<dyn ResourceProvider + Send>,
/// Resource resolving to the tokenizer vocabulary file.
pub vocab_resource: Box<dyn ResourceProvider + Send>,
/// Optional resource resolving to a tokenizer merges file; forwarded to the tokenizer
/// loader when present.
pub merges_resource: Option<Box<dyn ResourceProvider + Send>>,
/// Lower-casing flag forwarded to the tokenizer loader.
pub lower_case: bool,
/// Optional accent-stripping flag forwarded to the tokenizer loader.
pub strip_accents: Option<bool>,
/// Optional prefix-space flag forwarded to the tokenizer loader.
pub add_prefix_space: Option<bool>,
/// Device the model is created on; `new` initializes it with `Device::cuda_if_available()`.
pub device: Device,
/// Optional kind handed to `cast_var_store` after the Torch weights are loaded; `new`
/// initializes it to `None`.
pub kind: Option<Kind>,
}
impl SequenceClassificationConfig {
    /// Creates a configuration from the given model type and resources.
    ///
    /// # Arguments
    ///
    /// * `model_type` - architecture of the model to load.
    /// * `model_resource` - resource holding the model weights.
    /// * `config_resource` - resource resolving to the model configuration file.
    /// * `vocab_resource` - resource resolving to the tokenizer vocabulary file.
    /// * `merges_resource` - optional resource resolving to a tokenizer merges file.
    /// * `lower_case` - lower-casing flag forwarded to the tokenizer.
    /// * `strip_accents` - optional accent-stripping flag forwarded to the tokenizer.
    /// * `add_prefix_space` - optional prefix-space flag forwarded to the tokenizer.
    ///
    /// The `device` field is set to `Device::cuda_if_available()` and `kind` to `None`;
    /// both are public and can be overridden after construction.
    pub fn new<RC, RV>(
        model_type: ModelType,
        model_resource: ModelResource,
        config_resource: RC,
        vocab_resource: RV,
        merges_resource: Option<RV>,
        lower_case: bool,
        strip_accents: impl Into<Option<bool>>,
        add_prefix_space: impl Into<Option<bool>>,
    ) -> SequenceClassificationConfig
    where
        RC: ResourceProvider + Send + 'static,
        RV: ResourceProvider + Send + 'static,
    {
        // Erase the concrete resource types so the struct itself stays non-generic.
        let config_resource: Box<dyn ResourceProvider + Send> = Box::new(config_resource);
        let vocab_resource: Box<dyn ResourceProvider + Send> = Box::new(vocab_resource);
        let merges_resource = merges_resource
            .map(|merges| Box::new(merges) as Box<dyn ResourceProvider + Send>);
        let strip_accents: Option<bool> = strip_accents.into();
        let add_prefix_space: Option<bool> = add_prefix_space.into();

        Self {
            model_type,
            model_resource,
            config_resource,
            vocab_resource,
            merges_resource,
            lower_case,
            strip_accents,
            add_prefix_space,
            device: Device::cuda_if_available(),
            kind: None,
        }
    }
}
#[cfg(feature = "remote")]
impl Default for SequenceClassificationConfig {
    /// Provides a default configuration backed by the remote `DISTIL_BERT_SST2` DistilBERT
    /// resources (weights, configuration and vocabulary), with lower-casing enabled and no
    /// merges file.
    fn default() -> SequenceClassificationConfig {
        let weights = RemoteResource::from_pretrained(DistilBertModelResources::DISTIL_BERT_SST2);
        let model_config =
            RemoteResource::from_pretrained(DistilBertConfigResources::DISTIL_BERT_SST2);
        let vocab = RemoteResource::from_pretrained(DistilBertVocabResources::DISTIL_BERT_SST2);

        SequenceClassificationConfig::new(
            ModelType::DistilBert,
            ModelResource::Torch(Box::new(weights)),
            model_config,
            vocab,
            None,
            true,
            None,
            None,
        )
    }
}
/// Wrapper over the sequence classification models supported by the pipeline, with one
/// variant per supported architecture.
// The variants store the models by value, hence the lint exemption below.
#[allow(clippy::large_enum_variant)]
pub enum SequenceClassificationOption {
/// Sequence classifier based on a BERT model.
Bert(BertForSequenceClassification),
/// Sequence classifier based on a DeBERTa model.
Deberta(DebertaForSequenceClassification),
/// Sequence classifier based on a DeBERTa V2 model.
DebertaV2(DebertaV2ForSequenceClassification),
/// Sequence classifier based on a DistilBERT model.
DistilBert(DistilBertModelClassifier),
/// Sequence classifier based on a MobileBERT model.
MobileBert(MobileBertForSequenceClassification),
/// Sequence classifier based on a RoBERTa model.
Roberta(RobertaForSequenceClassification),
/// Sequence classifier for XLM-RoBERTa; wraps the same model type as `Roberta`.
XLMRoberta(RobertaForSequenceClassification),
/// Sequence classifier based on an ALBERT model.
Albert(AlbertForSequenceClassification),
/// Sequence classifier based on an XLNet model.
XLNet(XLNetForSequenceClassification),
/// Sequence classifier based on a BART model.
Bart(BartForSequenceClassification),
/// Sequence classifier based on a Reformer model.
Reformer(ReformerForSequenceClassification),
/// Sequence classifier based on a Longformer model.
Longformer(LongformerForSequenceClassification),
/// Sequence classifier based on an FNet model.
FNet(FNetForSequenceClassification),
/// ONNX encoder used as a sequence classifier (only with the `onnx` feature).
#[cfg(feature = "onnx")]
ONNX(ONNXEncoder),
}
impl SequenceClassificationOption {
/// Builds the classifier described by `config`, dispatching on the kind of model resource:
/// Torch weights go through `new_torch`, ONNX files (with the `onnx` feature) through
/// `new_onnx`.
///
/// # Errors
///
/// Returns whatever error the selected constructor reports.
pub fn new(config: &SequenceClassificationConfig) -> Result<Self, RustBertError> {
    let resource = &config.model_resource;
    match resource {
        ModelResource::Torch(_) => Self::new_torch(config),
        #[cfg(feature = "onnx")]
        ModelResource::ONNX(_) => Self::new_onnx(config),
    }
}
/// Builds a Torch-backed classifier for `config.model_type` and loads its weights.
///
/// The model is created in a fresh `VarStore` on `config.device`, the weights are loaded
/// from the Torch model resource, and the variable store is then passed to `cast_var_store`
/// together with `config.kind`.
///
/// # Errors
///
/// Returns `RustBertError::InvalidConfigurationError` when the loaded configuration does not
/// match `config.model_type`, when the model type has no sequence classification
/// implementation, or (with the `onnx` feature) when `ModelType::ONNX` is combined with a
/// Torch resource. Errors from resolving resources, creating the model or loading the
/// weights are propagated.
fn new_torch(config: &SequenceClassificationConfig) -> Result<Self, RustBertError> {
    let device = config.device;
    let weights_path = config.model_resource.get_torch_local_path()?;
    let mut var_store = VarStore::new(device);
    let model_config =
        &ConfigOption::from_file(config.model_type, config.config_resource.get_local_path()?);
    let model_type = config.model_type;
    let model = match model_type {
        ModelType::Bert => {
            if let ConfigOption::Bert(config) = model_config {
                Ok(Self::Bert(
                    BertForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a BertConfig for Bert!".to_string(),
                ))
            }
        }
        ModelType::Deberta => {
            if let ConfigOption::Deberta(config) = model_config {
                Ok(Self::Deberta(
                    DebertaForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a DebertaConfig for DeBERTa!".to_string(),
                ))
            }
        }
        ModelType::DebertaV2 => {
            if let ConfigOption::DebertaV2(config) = model_config {
                Ok(Self::DebertaV2(
                    DebertaV2ForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a DebertaV2Config for DeBERTa V2!".to_string(),
                ))
            }
        }
        ModelType::DistilBert => {
            if let ConfigOption::DistilBert(config) = model_config {
                Ok(Self::DistilBert(
                    DistilBertModelClassifier::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a DistilBertConfig for DistilBert!".to_string(),
                ))
            }
        }
        ModelType::MobileBert => {
            if let ConfigOption::MobileBert(config) = model_config {
                Ok(Self::MobileBert(
                    MobileBertForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a MobileBertConfig for MobileBert!".to_string(),
                ))
            }
        }
        ModelType::Roberta => {
            if let ConfigOption::Roberta(config) = model_config {
                Ok(Self::Roberta(
                    RobertaForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a RobertaConfig for Roberta!".to_string(),
                ))
            }
        }
        ModelType::XLMRoberta => {
            // XLM-RoBERTa reuses the RoBERTa configuration and model types.
            if let ConfigOption::Roberta(config) = model_config {
                Ok(Self::XLMRoberta(
                    RobertaForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a RobertaConfig for Roberta!".to_string(),
                ))
            }
        }
        ModelType::Albert => {
            if let ConfigOption::Albert(config) = model_config {
                Ok(Self::Albert(
                    AlbertForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply an AlbertConfig for Albert!".to_string(),
                ))
            }
        }
        ModelType::XLNet => {
            if let ConfigOption::XLNet(config) = model_config {
                Ok(Self::XLNet(
                    XLNetForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply an XLNetConfig for XLNet!".to_string(),
                ))
            }
        }
        ModelType::Bart => {
            if let ConfigOption::Bart(config) = model_config {
                Ok(Self::Bart(
                    BartForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                // The message names BART: the previous text was copied from the Bert branch
                // and pointed users at the wrong configuration type.
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a BartConfig for Bart!".to_string(),
                ))
            }
        }
        ModelType::Reformer => {
            if let ConfigOption::Reformer(config) = model_config {
                Ok(Self::Reformer(
                    ReformerForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a ReformerConfig for Reformer!".to_string(),
                ))
            }
        }
        ModelType::Longformer => {
            if let ConfigOption::Longformer(config) = model_config {
                Ok(Self::Longformer(
                    LongformerForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a LongformerConfig for Longformer!".to_string(),
                ))
            }
        }
        ModelType::FNet => {
            if let ConfigOption::FNet(config) = model_config {
                Ok(Self::FNet(
                    FNetForSequenceClassification::new(var_store.root(), config)?,
                ))
            } else {
                Err(RustBertError::InvalidConfigurationError(
                    "You can only supply a FNetConfig for FNet!".to_string(),
                ))
            }
        }
        #[cfg(feature = "onnx")]
        ModelType::ONNX => Err(RustBertError::InvalidConfigurationError(
            "A `ModelType::ONNX` ModelType was provided in the configuration with `ModelResources::TORCH`, these are incompatible".to_string(),
        )),
        _ => Err(RustBertError::InvalidConfigurationError(format!(
            "Sequence Classification not implemented for {model_type:?}!",
        ))),
    }?;
    // The variables must already be registered in the store (model created above) before
    // the weights are loaded into them.
    var_store.load(weights_path)?;
    cast_var_store(&mut var_store, config.kind, device);
    Ok(model)
}
/// Builds an ONNX-backed classifier from the encoder file of the ONNX model resource.
///
/// The ONNX environment configuration is derived from `config.device`.
///
/// # Errors
///
/// Returns `RustBertError::InvalidConfigurationError` when the resource has no encoder
/// file, and propagates errors from creating the environment, resolving the resource
/// paths or creating the encoder.
#[cfg(feature = "onnx")]
pub fn new_onnx(config: &SequenceClassificationConfig) -> Result<Self, RustBertError> {
    let onnx_config = ONNXEnvironmentConfig::from_device(config.device);
    let environment = onnx_config.get_environment()?;

    let onnx_paths = config.model_resource.get_onnx_local_paths()?;
    let encoder_file = onnx_paths.encoder_path.ok_or_else(|| {
        RustBertError::InvalidConfigurationError(
            "An encoder file must be provided for sequence classification ONNX models."
                .to_string(),
        )
    })?;

    let encoder = ONNXEncoder::new(encoder_file, &environment, &onnx_config)?;
    Ok(Self::ONNX(encoder))
}
/// Returns the `ModelType` corresponding to the wrapped model.
///
/// The `XLMRoberta` variant reports `ModelType::XLMRoberta`, the model type it is built
/// from in `new_torch`, so that the value round-trips through construction instead of
/// collapsing into `ModelType::Roberta`.
pub fn model_type(&self) -> ModelType {
    match self {
        Self::Bert(_) => ModelType::Bert,
        Self::Deberta(_) => ModelType::Deberta,
        Self::DebertaV2(_) => ModelType::DebertaV2,
        Self::Roberta(_) => ModelType::Roberta,
        Self::XLMRoberta(_) => ModelType::XLMRoberta,
        Self::DistilBert(_) => ModelType::DistilBert,
        Self::MobileBert(_) => ModelType::MobileBert,
        Self::Albert(_) => ModelType::Albert,
        Self::XLNet(_) => ModelType::XLNet,
        Self::Bart(_) => ModelType::Bart,
        Self::Reformer(_) => ModelType::Reformer,
        Self::Longformer(_) => ModelType::Longformer,
        Self::FNet(_) => ModelType::FNet,
        #[cfg(feature = "onnx")]
        Self::ONNX(_) => ModelType::ONNX,
    }
}
/// Runs a forward pass through the wrapped model and returns its classification output
/// tensor.
///
/// # Arguments
///
/// * `input_ids` - optional input token ids (required for BART, and for ONNX when no
///   `mask` is given).
/// * `mask` - optional attention mask.
/// * `token_type_ids` - optional token type (segment) ids.
/// * `position_ids` - optional position ids.
/// * `input_embeds` - optional pre-computed input embeddings.
/// * `train` - training-mode flag forwarded to the Torch models.
///
/// Each architecture only receives the arguments its branch forwards; the remaining ones
/// are ignored for that architecture.
///
/// # Panics
///
/// Panics when a required input is missing or when the underlying model reports an error
/// (see the `expect` messages of the individual branches).
pub fn forward_t(
    &self,
    input_ids: Option<&Tensor>,
    mask: Option<&Tensor>,
    token_type_ids: Option<&Tensor>,
    position_ids: Option<&Tensor>,
    input_embeds: Option<&Tensor>,
    train: bool,
) -> Tensor {
    match self {
        Self::Bart(model) => {
            model
                .forward_t(
                    input_ids.expect("`input_ids` must be provided for BART models"),
                    mask,
                    None,
                    None,
                    None,
                    train,
                )
                .decoder_output
        }
        Self::Bert(model) => {
            model
                .forward_t(
                    input_ids,
                    mask,
                    token_type_ids,
                    position_ids,
                    input_embeds,
                    train,
                )
                .logits
        }
        Self::Deberta(model) => {
            model
                .forward_t(
                    input_ids,
                    mask,
                    token_type_ids,
                    position_ids,
                    input_embeds,
                    train,
                )
                .expect("Error in Deberta forward_t")
                .logits
        }
        Self::DebertaV2(model) => {
            model
                .forward_t(
                    input_ids,
                    mask,
                    token_type_ids,
                    position_ids,
                    input_embeds,
                    train,
                )
                .expect("Error in Deberta V2 forward_t")
                .logits
        }
        Self::DistilBert(model) => {
            model
                .forward_t(input_ids, mask, input_embeds, train)
                .expect("Error in distilbert forward_t")
                .logits
        }
        Self::MobileBert(model) => {
            model
                .forward_t(input_ids, None, None, input_embeds, mask, train)
                .expect("Error in mobilebert forward_t")
                .logits
        }
        Self::Roberta(model) | Self::XLMRoberta(model) => {
            model
                .forward_t(
                    input_ids,
                    mask,
                    token_type_ids,
                    position_ids,
                    input_embeds,
                    train,
                )
                .logits
        }
        Self::Albert(model) => {
            model
                .forward_t(
                    input_ids,
                    mask,
                    token_type_ids,
                    position_ids,
                    input_embeds,
                    train,
                )
                .logits
        }
        Self::XLNet(model) => {
            model
                .forward_t(
                    input_ids,
                    mask,
                    None,
                    None,
                    None,
                    token_type_ids,
                    input_embeds,
                    train,
                )
                .logits
        }
        Self::Reformer(model) => {
            model
                .forward_t(input_ids, None, None, mask, None, train)
                .expect("Error in Reformer forward pass.")
                .logits
        }
        Self::Longformer(model) => {
            model
                .forward_t(
                    input_ids,
                    mask,
                    None,
                    token_type_ids,
                    position_ids,
                    input_embeds,
                    train,
                )
                .expect("Error in Longformer forward pass.")
                .logits
        }
        Self::FNet(model) => {
            model
                .forward_t(input_ids, token_type_ids, position_ids, input_embeds, train)
                .expect("Error in FNet forward pass.")
                .logits
        }
        #[cfg(feature = "onnx")]
        Self::ONNX(model) => {
            // Honor the caller's attention mask; only fall back to an all-ones mask shaped
            // like `input_ids` when none was supplied (previously the mask was discarded).
            let default_mask;
            let attention_mask = match mask {
                Some(mask) => mask,
                None => {
                    default_mask = input_ids
                        .expect(
                            "`input_ids` must be provided for ONNX models when no `mask` is given",
                        )
                        .ones_like();
                    &default_mask
                }
            };
            model
                .forward(
                    input_ids,
                    Some(attention_mask),
                    token_type_ids,
                    position_ids,
                    input_embeds,
                )
                .expect("Error in ONNX forward pass.")
                .logits
                .expect("The ONNX model did not return logits.")
        }
    }
}
}
/// Sequence classification pipeline: bundles a tokenizer and a classifier and turns input
/// texts into `Label`s.
pub struct SequenceClassificationModel {
/// Tokenizer used to encode and pad the input texts.
tokenizer: TokenizerOption,
/// Wrapped classification model.
sequence_classifier: SequenceClassificationOption,
/// Mapping from class index to label text, read from the model configuration.
label_mapping: HashMap<i64, String>,
/// Device on which the input tensors are created.
device: Device,
/// Maximum sequence length passed to the tokenizer, read from the model configuration
/// (`usize::MAX` when the configuration does not define one).
max_length: usize,
}
impl SequenceClassificationModel {
pub fn new(
config: SequenceClassificationConfig,
) -> Result<SequenceClassificationModel, RustBertError> {
let vocab_path = config.vocab_resource.get_local_path()?;
let merges_path = config
.merges_resource
.as_ref()
.map(|resource| resource.get_local_path())
.transpose()?;
let tokenizer = TokenizerOption::from_file(
config.model_type,
vocab_path.to_str().unwrap(),
merges_path.as_deref().map(|path| path.to_str().unwrap()),
config.lower_case,
config.strip_accents,
config.add_prefix_space,
)?;
Self::new_with_tokenizer(config, tokenizer)
}
/// Builds a new `SequenceClassificationModel` around an already constructed tokenizer.
///
/// The classifier is created from `config`; the maximum sequence length and the label
/// mapping are read from the model configuration file.
///
/// # Errors
///
/// Returns an error when the configuration resource cannot be resolved or the classifier
/// cannot be created.
pub fn new_with_tokenizer(
    config: SequenceClassificationConfig,
    tokenizer: TokenizerOption,
) -> Result<SequenceClassificationModel, RustBertError> {
    let config_path = config.config_resource.get_local_path()?;
    let sequence_classifier = SequenceClassificationOption::new(&config)?;

    let model_config = ConfigOption::from_file(config.model_type, config_path);
    // Without a configured limit, sequences are effectively unbounded.
    let max_length = match model_config.get_max_len() {
        Some(length) => length as usize,
        None => usize::MAX,
    };
    let label_mapping = model_config.get_label_mapping().clone();

    Ok(SequenceClassificationModel {
        tokenizer,
        sequence_classifier,
        label_mapping,
        device: get_device(config.model_resource, config.device),
        max_length,
    })
}
/// Returns a shared reference to the tokenizer used by this model.
pub fn get_tokenizer(&self) -> &TokenizerOption {
&self.tokenizer
}
/// Returns a mutable reference to the tokenizer used by this model.
pub fn get_tokenizer_mut(&mut self) -> &mut TokenizerOption {
&mut self.tokenizer
}
/// Classifies each input text and returns its highest-scoring label.
///
/// The texts are tokenized and padded, passed through the classifier without gradient
/// tracking, and the softmax over the last dimension of the output is taken. For every
/// text the class with the largest probability is reported.
///
/// # Arguments
///
/// * `input` - texts to classify.
///
/// # Returns
///
/// One `Label` per row of the model output, in order, with `sentence` set to the row index.
///
/// # Panics
///
/// Panics when the predicted class index has no entry in the label mapping, or when the
/// result tensors cannot be iterated.
pub fn predict<'a, S>(&self, input: S) -> Vec<Label>
where
    S: AsRef<[&'a str]>,
{
    let texts = input.as_ref();
    let (input_ids, token_type_ids) =
        self.tokenizer
            .tokenize_and_pad(texts, self.max_length, self.device);

    let probabilities = no_grad(|| {
        self.sequence_classifier
            .forward_t(
                Some(&input_ids),
                None,
                Some(&token_type_ids),
                None,
                None,
                false,
            )
            .softmax(-1, Kind::Float)
            .detach()
            .to(Device::Cpu)
    });

    // Best class per row, and the probability found at that class.
    let best_ids = probabilities.argmax(-1, true).squeeze_dim(1);
    let best_scores = probabilities
        .gather(1, &best_ids.unsqueeze(-1), false)
        .squeeze_dim(1);
    let best_ids: Vec<i64> = best_ids.iter::<i64>().unwrap().collect();
    let best_scores: Vec<f64> = best_scores.iter::<f64>().unwrap().collect();

    best_ids
        .into_iter()
        .zip(best_scores)
        .enumerate()
        .map(|(sentence, (id, score))| Label {
            text: self.label_mapping.get(&id).unwrap().clone(),
            score,
            id,
            sentence,
        })
        .collect()
}
pub fn predict_multilabel(
&self,
input: &[&str],
threshold: f64,
) -> Result<Vec<Vec<Label>>, RustBertError> {
let (input_ids, token_type_ids) =
self.tokenizer
.tokenize_and_pad(input.as_ref(), self.max_length, self.device);
let output = no_grad(|| {
let output = self.sequence_classifier.forward_t(
Some(&input_ids),
None,
Some(&token_type_ids),
None,
None,
false,
);
output.sigmoid().detach().to(Device::Cpu)
});
let label_indices = output.as_ref().ge(threshold).nonzero();
let mut labels: Vec<Vec<Label>> = vec![];
let mut sequence_labels: Vec<Label> = vec![];
for sentence_idx in 0..label_indices.size()[0] {
let label_index_tensor = label_indices.get(sentence_idx);
let sentence_label = label_index_tensor
.iter::<i64>()
.unwrap()
.collect::<Vec<i64>>();
let (sentence, id) = (sentence_label[0], sentence_label[1]);
if sentence as usize > labels.len() {
labels.push(sequence_labels);
sequence_labels = vec![];
}
let score = output.double_value(sentence_label.as_slice());
let label_string = self.label_mapping.get(&id).unwrap().to_owned();
let label = Label {
text: label_string,
score,
id,
sentence: sentence as usize,
};
sequence_labels.push(label);
}
if !sequence_labels.is_empty() {
labels.push(sequence_labels);
}
Ok(labels)
}
}
#[cfg(test)]
mod test {
use super::*;
// Compile-time check: boxing the result of `SequenceClassificationModel::new` as
// `dyn Send` only type-checks if that result is `Send`. The test is ignored by default,
// since compiling it is enough for the check.
// NOTE(review): `SequenceClassificationConfig::default()` is only defined with the
// `remote` feature, so this module presumably needs that feature enabled - confirm.
#[test]
#[ignore] fn test() {
let config = SequenceClassificationConfig::default();
let _: Box<dyn Send> = Box::new(SequenceClassificationModel::new(config));
}
}