#![allow(clippy::arithmetic_side_effects)]
use crate::control_types::ControlSignal;
use crate::llmosafe_classifier::{classify_text, ClassificationResult};
use crate::llmosafe_kernel::SiftedProof;
use crate::llmosafe_kernel::SiftedSynapse;
use crate::llmosafe_kernel::Synapse;
use crate::llmosafe_kernel::U16_MAX_F32;
/// Numeric summary of one classifier result, usable as a control signal.
///
/// The field descriptions below state how `SifterOutput::from_classification`
/// fills each field; other constructors (struct literals) may store anything.
#[derive(Debug, Clone, Copy)]
pub struct SifterOutput {
/// Value reported by `ControlSignal::error`; `from_classification` stores
/// the classifier probability here without clamping.
pub error_sift: f32,
/// Classifier probability clamped to `[0.0, 1.0]`, scaled by `U16_MAX_F32`
/// and cast to `u16`.
pub raw_entropy: u16,
/// The classifier probability exactly as received (not clamped).
pub classifier_prob: f32,
/// Copy of the classifier's `is_manipulation` flag.
pub has_bias: bool,
/// Classifier `oov_ratio` multiplied by 255 and cast to `u8`.
pub oov_ratio: u8,
}
/// Exposes a `SifterOutput` through the crate's `ControlSignal` trait.
impl ControlSignal for SifterOutput {
/// Returns the stored `error_sift` value unchanged.
fn error(&self) -> f32 {
self.error_sift
}
/// The target value is the constant `0.0`.
fn setpoint(&self) -> f32 {
0.0
}
}
impl SifterOutput {
    /// Builds a `SifterOutput` from a classifier result.
    ///
    /// * `error_sift` and `classifier_prob` receive the probability as-is.
    /// * `raw_entropy` is the probability clamped to `[0.0, 1.0]`, scaled by
    ///   `U16_MAX_F32` and cast to `u16`.
    /// * `has_bias` mirrors `is_manipulation`.
    /// * `oov_ratio` is the classifier's `oov_ratio * 255.0` cast to `u8`.
    pub fn from_classification(classification: &ClassificationResult) -> Self {
        let probability = classification.probability;
        let scaled_oov = classification.oov_ratio * 255.0_f32;

        Self {
            error_sift: probability,
            raw_entropy: (U16_MAX_F32 * probability.clamp(0.0, 1.0)) as u16,
            classifier_prob: probability,
            has_bias: classification.is_manipulation,
            oov_ratio: scaled_oov as u8,
        }
    }
}
/// Single-word keywords that `get_bias_breakdown` counts toward
/// `BiasBreakdown::authority` (compared ASCII case-insensitively).
pub const AUTHORITY_BIAS: &[&str] = &[
"expert",
"experts",
"official",
"officials",
"government",
"doctor",
"doctors",
"scientist",
"scientists",
"guaranteed",
"certified",
"proven",
];
/// Words that start a negation window in `get_bias_breakdown`: after one of
/// these tokens, keyword scoring is skipped for the following six tokens
/// (a further negation word inside the window restarts the count).
pub const NEGATION_WORDS: &[&str] = &[
"not",
"no",
"never",
"none",
"neither",
"nor",
"hardly",
"scarcely",
"barely",
"doesn't",
"isn't",
"wasn't",
"shouldn't",
"won't",
"don't",
];
/// Single-word keywords that `get_bias_breakdown` counts toward
/// `BiasBreakdown::social_proof` (compared ASCII case-insensitively).
pub const SOCIAL_PROOF: &[&str] = &[
"everyone",
"thousands",
"millions",
"trending",
"viral",
"bestseller",
"bestsellers",
"testimonials",
"consensus",
"majority",
"crowd",
];
/// Keywords that `get_bias_breakdown` counts toward `BiasBreakdown::scarcity`.
/// Hyphenated entries only match a token written with the same hyphens,
/// because the input is split on whitespace and only leading/trailing ASCII
/// punctuation is trimmed.
pub const SCARCITY: &[&str] = &[
"limited",
"rare",
"exclusive",
"handcrafted",
"small-batch",
"collectible",
"once-in-a-lifetime",
"restricted",
"shortage",
"vanishing",
"low-stock",
"while-supplies-last",
"sold-out",
"member-only",
];
/// Keywords that `get_bias_breakdown` counts toward `BiasBreakdown::urgency`.
/// Hyphenated entries must appear as a single hyphenated token to match.
pub const URGENCY: &[&str] = &[
"hurry",
"rush",
"deadline",
"expiring",
"immediately",
"limited-time",
"last-chance",
"act-now",
"don't-wait",
];
/// Single-word keywords that `get_bias_breakdown` counts toward
/// `BiasBreakdown::emotional_appeal` (compared ASCII case-insensitively).
pub const EMOTIONAL_APPEAL: &[&str] = &[
"fear",
"shocking",
"miracle",
"incredible",
"tragic",
"desperate",
"heartwarming",
"devastating",
"thrilling",
"terrifying",
];
/// Keywords that `get_bias_breakdown` counts toward
/// `BiasBreakdown::expertise_signaling` (compared ASCII case-insensitively).
pub const EXPERTISE_SIGNALING: &[&str] = &[
"sophisticated",
"cutting-edge",
"state-of-the-art",
"revolutionary",
"revolutionaries",
"patented",
"breakthrough",
"breakthroughs",
"proprietary",
"paradigm",
"holistic",
"synergy",
];
/// Multi-word phrases counted toward `BiasBreakdown::semantic_traps`.
/// Every entry contains a space, so only the phrase pass of
/// `get_bias_breakdown` (compiled with the `std` feature) can match them;
/// the per-token pass compares single whitespace-free tokens.
pub const SEMANTIC_TRAPS: &[&str] = &["not but", "instead of", "rather than", "on the other hand"];
/// Multi-word phrases counted toward `BiasBreakdown::template_fitting`.
/// Entries are lowercase and matched ASCII case-insensitively against
/// consecutive tokens; like `SEMANTIC_TRAPS`, they are only matched by the
/// phrase pass of `get_bias_breakdown` (compiled with the `std` feature).
pub const TEMPLATE_FITTING: &[&str] = &[
"as an ai",
"my purpose is",
"according to my instructions",
"it is important to remember",
"please note that",
"i cannot",
"i am programmed to",
];
/// Per-category keyword scores for one piece of text.
///
/// Every field is an independent, saturating `u16` counter; `Default` yields
/// all zeros.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BiasBreakdown {
    /// Score for authority keywords.
    pub authority: u16,
    /// Score for social-proof keywords.
    pub social_proof: u16,
    /// Score for scarcity keywords.
    pub scarcity: u16,
    /// Score for urgency keywords.
    pub urgency: u16,
    /// Score for emotional-appeal keywords.
    pub emotional_appeal: u16,
    /// Score for expertise-signaling keywords.
    pub expertise_signaling: u16,
    /// Score for semantic-trap phrases.
    pub semantic_traps: u16,
    /// Score for template-fitting phrases.
    pub template_fitting: u16,
    /// Score for emphasis (all-uppercase tokens).
    pub emphasis: u16,
}

impl BiasBreakdown {
    /// Sum of all category scores, saturating at `u16::MAX` instead of
    /// wrapping.
    pub fn total(&self) -> u16 {
        let parts = [
            self.authority,
            self.social_proof,
            self.scarcity,
            self.urgency,
            self.emotional_appeal,
            self.expertise_signaling,
            self.semantic_traps,
            self.template_fitting,
            self.emphasis,
        ];
        parts
            .iter()
            .fold(0u16, |sum, &part| sum.saturating_add(part))
    }
}
/// Returns `true` when `word` equals any entry of `list`, ignoring ASCII case.
#[inline]
fn word_in_list(word: &str, list: &[&str]) -> bool {
    for candidate in list {
        if candidate.eq_ignore_ascii_case(word) {
            return true;
        }
    }
    false
}
/// Returns `true` when the first `phrase_words.len()` tokens of `window`
/// equal `phrase_words` element-wise, ignoring ASCII case.
///
/// A `window` shorter than the phrase never matches.
#[cfg(feature = "std")]
#[inline]
fn phrase_matches(window: &[&str], phrase_words: &[&str]) -> bool {
    match window.get(..phrase_words.len()) {
        Some(head) => head
            .iter()
            .zip(phrase_words)
            .all(|(token, expected)| token.eq_ignore_ascii_case(expected)),
        // `get` yields `None` exactly when the window is too short.
        None => false,
    }
}
/// Scores `text` against the keyword and phrase lists of this module.
///
/// The text is split on whitespace and each token has leading/trailing ASCII
/// punctuation trimmed. Matching is ASCII case-insensitive.
///
/// * Per-token pass: every token that matches a keyword list adds 100 to the
///   corresponding category; a token of at least two characters consisting
///   only of ASCII uppercase letters adds 50 to `emphasis`.
/// * Negation: a token from `NEGATION_WORDS` suppresses scoring of the next
///   six tokens (a negation word inside the window restarts the count).
/// * Phrase pass (only with the `std` feature): each multi-word entry of
///   `SEMANTIC_TRAPS` / `TEMPLATE_FITTING` that occurs at least once at a
///   non-negated start position adds 100 to its category, once per phrase
///   regardless of how often it occurs.
///
/// All additions saturate at `u16::MAX`.
#[deprecated(
    since = "0.8.0",
    note = "keyword-based detection retained for backward compatibility. Use sift_perceptions() for higher accuracy (93.4% vs keyword-based) via the TF-IDF classifier."
)]
pub fn get_bias_breakdown(text: &str) -> BiasBreakdown {
    /// Adds `weight` to `slot` (saturating) when `hit` is true.
    fn credit(slot: &mut u16, hit: bool, weight: u16) {
        if hit {
            *slot = slot.saturating_add(weight);
        }
    }

    /// Adds 100 to `slot` for every multi-word phrase in `phrases` that
    /// matches a run of `tokens` starting at a position not marked in
    /// `negated`.
    #[cfg(feature = "std")]
    fn credit_phrases(tokens: &[&str], negated: &[bool], phrases: &[&str], slot: &mut u16) {
        let mut words: Vec<&str> = Vec::new();
        // Entries without a space are single words and belong to the
        // per-token pass, so they are skipped here.
        for phrase in phrases.iter().filter(|p| p.contains(' ')) {
            words.clear();
            words.extend(phrase.split_whitespace());
            // `negated` is parallel to `tokens`, so zipping pairs each window
            // with the negation flag of its first token.
            let found = tokens
                .windows(words.len())
                .zip(negated)
                .any(|(window, &is_negated)| !is_negated && phrase_matches(window, &words));
            if found {
                *slot = slot.saturating_add(100);
            }
        }
    }

    let mut scores = BiasBreakdown::default();

    // Number of upcoming tokens still covered by the most recent negation.
    let mut ttl = 0u8;
    let trimmed_tokens = text
        .split_whitespace()
        .map(|raw| raw.trim_matches(|c: char| c.is_ascii_punctuation()));
    for token in trimmed_tokens {
        // Decide suppression from the state *before* this token updates it,
        // so the negation word itself is never treated as negated by itself.
        let suppressed = ttl > 0;
        ttl = if word_in_list(token, NEGATION_WORDS) {
            6
        } else {
            ttl.saturating_sub(1)
        };
        if suppressed {
            continue;
        }

        credit(&mut scores.authority, word_in_list(token, AUTHORITY_BIAS), 100);
        credit(&mut scores.social_proof, word_in_list(token, SOCIAL_PROOF), 100);
        credit(&mut scores.scarcity, word_in_list(token, SCARCITY), 100);
        credit(&mut scores.urgency, word_in_list(token, URGENCY), 100);
        credit(
            &mut scores.emotional_appeal,
            word_in_list(token, EMOTIONAL_APPEAL),
            100,
        );
        credit(
            &mut scores.expertise_signaling,
            word_in_list(token, EXPERTISE_SIGNALING),
            100,
        );
        credit(
            &mut scores.semantic_traps,
            word_in_list(token, SEMANTIC_TRAPS),
            100,
        );
        credit(
            &mut scores.template_fitting,
            word_in_list(token, TEMPLATE_FITTING),
            100,
        );

        // Only ASCII uppercase bytes pass, so any non-ASCII character fails
        // the check just as it would char-by-char.
        let shouting = token.len() >= 2 && token.bytes().all(|b| b.is_ascii_uppercase());
        credit(&mut scores.emphasis, shouting, 50);
    }

    #[cfg(feature = "std")]
    {
        let tokens: Vec<&str> = text
            .split_whitespace()
            .map(|w| w.trim_matches(|c: char| c.is_ascii_punctuation()))
            .collect();

        // Recompute the negation window per token position, using the same
        // rule as the per-token pass above.
        let mut window_ttl = 0u8;
        let negated: Vec<bool> = tokens
            .iter()
            .map(|token| {
                let under_negation = window_ttl > 0;
                window_ttl = if word_in_list(token, NEGATION_WORDS) {
                    6
                } else {
                    window_ttl.saturating_sub(1)
                };
                under_negation
            })
            .collect();

        credit_phrases(&tokens, &negated, SEMANTIC_TRAPS, &mut scores.semantic_traps);
        credit_phrases(
            &tokens,
            &negated,
            TEMPLATE_FITTING,
            &mut scores.template_fitting,
        );
    }

    scores
}
/// Returns the saturating sum of all category scores that
/// `get_bias_breakdown` assigns to `text`.
#[deprecated(
    since = "0.8.0",
    note = "keyword-based detection retained for backward compatibility. Use sift_perceptions() for higher accuracy (93.4% vs keyword-based) via the TF-IDF classifier."
)]
pub fn calculate_halo_signal(text: &str) -> u16 {
    #[allow(deprecated)]
    let breakdown = get_bias_breakdown(text);
    breakdown.total()
}
/// Scores how many words of `observation` also occur in `objective`.
///
/// Both strings are split on whitespace and each token has leading/trailing
/// ASCII punctuation trimmed; comparison ignores ASCII case. Every
/// observation token that equals at least one objective word contributes
/// 100, and the result saturates at `u16::MAX`.
///
/// Tokens that are empty after trimming (for example a lone `-` or `...`)
/// are ignored on both sides, so punctuation alone never counts as shared
/// vocabulary.
///
/// Only the first 256 non-empty objective words are considered. The fixed
/// buffer keeps the function allocation-free and bounds the work done per
/// observation token.
pub fn calculate_utility(observation: &str, objective: &str) -> u16 {
    const MAX_OBJECTIVE_WORDS: usize = 256;

    /// Removes leading and trailing ASCII punctuation from one token.
    fn strip(token: &str) -> &str {
        token.trim_matches(|c: char| c.is_ascii_punctuation())
    }

    let mut buffer = [""; MAX_OBJECTIVE_WORDS];
    let mut stored = 0usize;
    let wanted = objective
        .split_whitespace()
        .map(strip)
        .filter(|word| !word.is_empty());
    // Zipping against the buffer stops after MAX_OBJECTIVE_WORDS entries.
    for (slot, word) in buffer.iter_mut().zip(wanted) {
        *slot = word;
        stored += 1;
    }
    let objective_words = &buffer[..stored];

    let matches = observation
        .split_whitespace()
        .map(strip)
        .filter(|word| !word.is_empty())
        .filter(|word| {
            objective_words
                .iter()
                .any(|target| word.eq_ignore_ascii_case(target))
        })
        .count();

    matches.saturating_mul(100).min(u16::MAX as usize) as u16
}
#[allow(deprecated)]
pub(crate) fn sift_text_with_score(observation: &str) -> (SiftedSynapse, SiftedProof, f32) {
let classification = classify_text(observation);
let bias = get_bias_breakdown(observation);
let classifier_entropy = (U16_MAX_F32 * classification.probability.clamp(0.0, 1.0)) as u16;
let keyword_boost = if bias.total() > 0 {
((bias.total() as u32).saturating_mul(65535) / 9000).min(65535) as u16
} else {
0
};
let entropy = classifier_entropy.max(keyword_boost);
let surprise = (U16_MAX_F32 * classification.oov_ratio.clamp(0.0, 1.0)) as u16;
let has_bias = classification.is_manipulation || bias.total() > 0;
let mut synapse = Synapse::new();
synapse.set_raw_entropy(entropy);
synapse.set_raw_surprise(surprise);
synapse.set_has_bias(has_bias);
synapse.set_oov_ratio((classification.oov_ratio.clamp(0.0, 1.0) * 255.0) as u8);
let anchor_hash = adler32::adler32(observation.as_bytes());
synapse.set_anchor_hash(anchor_hash & 0x7FFFFFFF);
let sifted = SiftedSynapse::new(synapse);
let proof = SiftedProof::mint();
(sifted, proof, classification.score)
}
/// Sifts `observation` via `sift_text_with_score` and returns the synapse and
/// proof, discarding the classifier score.
pub fn sift_text(observation: &str) -> (SiftedSynapse, SiftedProof) {
    let scored = sift_text_with_score(observation);
    (scored.0, scored.1)
}
/// Builds a sifted synapse for `observation` from an existing classifier
/// result combined with the keyword heuristics of `get_bias_breakdown`.
///
/// * entropy: the larger of the classifier probability (clamped to
///   `[0.0, 1.0]`, scaled by `U16_MAX_F32`) and the keyword boost
///   `total * 65535 / 9000`, capped at 65535.
/// * surprise: the clamped `oov_ratio` scaled by `U16_MAX_F32`.
/// * has_bias: set when the classifier flags manipulation or any keyword
///   category scored.
/// * oov_ratio: the clamped `oov_ratio` scaled to `0..=255`.
/// * anchor hash: Adler-32 of the observation bytes with the top bit cleared.
#[allow(deprecated)]
pub fn sift_observation(
    classification: &ClassificationResult,
    observation: &str,
) -> (SiftedSynapse, SiftedProof) {
    let keyword_total = get_bias_breakdown(observation).total();
    let probability = classification.probability.clamp(0.0, 1.0);
    let oov = classification.oov_ratio.clamp(0.0, 1.0);

    let from_classifier = (U16_MAX_F32 * probability) as u16;
    let from_keywords = match keyword_total {
        0 => 0,
        total => ((total as u32).saturating_mul(65535) / 9000).min(65535) as u16,
    };

    let mut synapse = Synapse::new();
    synapse.set_raw_entropy(from_classifier.max(from_keywords));
    synapse.set_raw_surprise((U16_MAX_F32 * oov) as u16);
    synapse.set_has_bias(classification.is_manipulation || keyword_total > 0);
    synapse.set_oov_ratio((oov * 255.0) as u8);
    synapse.set_anchor_hash(adler32::adler32(observation.as_bytes()) & 0x7FFFFFFF);

    (SiftedSynapse::new(synapse), SiftedProof::mint())
}
/// Sifts every observation with `sift_text` and returns the result with the
/// highest raw entropy (the first one wins on ties).
///
/// When `observations` is empty there is nothing to assess, so a fail-closed
/// synapse is returned: entropy `0xFFFF`, surprise 0, no bias, anchor hash 0.
///
/// A batch whose observations all sift to entropy 0 returns the first of
/// those real results. Previously such a batch fell through to the
/// fail-closed `0xFFFF` synapse, because the running maximum started at 0
/// and only strictly greater values were accepted.
///
/// `_objective` is currently unused.
pub fn sift_perceptions(observations: &[&str], _objective: &str) -> (SiftedSynapse, SiftedProof) {
    /// Fail-closed result used when no observation is available.
    fn unstable() -> (SiftedSynapse, SiftedProof) {
        let mut synapse = Synapse::new();
        synapse.set_raw_entropy(0xFFFF);
        synapse.set_raw_surprise(0);
        synapse.set_has_bias(false);
        synapse.set_anchor_hash(0);
        (SiftedSynapse::new(synapse), SiftedProof::mint())
    }

    let mut worst: Option<(SiftedSynapse, SiftedProof)> = None;
    for obs in observations {
        let candidate = sift_text(obs);
        // The first observation is always accepted; later ones must be
        // strictly higher so that ties keep the earliest result.
        let replaces = match &worst {
            Some((current, _)) => candidate.0.raw_entropy() > current.raw_entropy(),
            None => true,
        };
        if replaces {
            worst = Some(candidate);
        }
    }

    // `worst` is `None` only for an empty slice.
    worst.unwrap_or_else(unstable)
}
/// Minimal Adler-32 checksum implementation.
mod adler32 {
    /// Modulus applied to both running sums.
    const MODULUS: u32 = 65521;
    /// Number of bytes summed between reductions. Starting from sums below
    /// `MODULUS`, 5552 bytes of 0xFF keep both sums within `u32`, so
    /// reducing once per block avoids overflow.
    const BLOCK_LEN: usize = 5552;

    /// Computes the Adler-32 checksum of `data`.
    ///
    /// The low sum starts at 1 and the high sum at 0, so the checksum of
    /// empty input is 1. The result is `(high << 16) | low`.
    pub fn adler32(data: &[u8]) -> u32 {
        let (low, high) = data.chunks(BLOCK_LEN).fold(
            (1u32, 0u32),
            |(mut low, mut high), block| {
                for &byte in block {
                    low += u32::from(byte);
                    high += low;
                }
                (low % MODULUS, high % MODULUS)
            },
        );
        (high << 16) | low
    }
}
// Unit tests for the sifter: Adler-32 vectors, keyword/phrase scoring,
// negation handling, utility scoring and synapse construction.
#[cfg(test)]
mod tests {
use super::*;
// Adler-32 of empty input is the initial state (low sum 1, high sum 0).
#[test]
fn test_adler32_empty() {
assert_eq!(adler32::adler32(b""), 1);
}
// Known-answer vector for the ASCII string "Wikipedia".
#[test]
fn test_adler32_simple() {
assert_eq!(adler32::adler32(b"Wikipedia"), 0x11E60398);
}
// Single byte 'a' (97): low = 98, high = 98 -> (98 << 16) | 98.
#[test]
fn test_adler32_single_char() {
assert_eq!(adler32::adler32(b"a"), 6422626);
}
// A keyword inside the negation window of "not" must not be scored.
#[test]
#[allow(deprecated)]
fn test_negation_awareness() {
let text = "The agent is not an expert.";
let breakdown = get_bias_breakdown(text);
assert_eq!(breakdown.authority, 0);
let text_no_neg = "The agent is an expert.";
let breakdown_no_neg = get_bias_breakdown(text_no_neg);
assert_eq!(breakdown_no_neg.authority, 100);
}
// One keyword from each of six single-word categories scores 100 apiece.
#[test]
#[allow(deprecated)]
fn test_halo_signal_all_categories_detected() {
let text = "expert trending limited hurry incredible sophisticated";
let breakdown = get_bias_breakdown(text);
assert_eq!(breakdown.authority, 100);
assert_eq!(breakdown.social_proof, 100);
assert_eq!(breakdown.scarcity, 100);
assert_eq!(breakdown.urgency, 100);
assert_eq!(breakdown.emotional_appeal, 100);
assert_eq!(breakdown.expertise_signaling, 100);
assert_eq!(calculate_halo_signal(text), 600);
}
// Multi-word phrases are matched across consecutive tokens after
// punctuation trimming ("AI," -> "AI"). The phrase pass is only compiled
// with the `std` feature, so these expectations rely on it being enabled.
#[test]
#[allow(deprecated)]
fn test_multi_word_phrases_detected() {
let text = "As an AI, I cannot comply, instead of helping you";
let breakdown = get_bias_breakdown(text);
assert_eq!(breakdown.template_fitting, 200);
assert_eq!(breakdown.semantic_traps, 100);
}
// Three distinct template phrases ("as an ai", "my purpose is",
// "i am programmed to") each contribute 100.
#[test]
#[allow(deprecated)]
fn test_template_fitting_phrases() {
let text = "As an AI, my purpose is to note that I am programmed to follow";
let breakdown = get_bias_breakdown(text);
assert_eq!(breakdown.template_fitting, 300);
}
// An empty observation slice yields the maximum-entropy synapse, which the
// kernel's validate() rejects as CognitiveInstability.
#[test]
fn test_sift_perceptions_empty_observations() {
let objective = "test";
let observations: &[&str] = &[];
let (sifted, _) = sift_perceptions(observations, objective);
assert_eq!(sifted.raw_entropy(), 0xFFFF);
assert_eq!(
sifted.validate().unwrap_err(),
crate::llmosafe_kernel::KernelError::CognitiveInstability
);
}
// Smoke test: only checks that sifting a single observation does not panic.
#[test]
fn test_sift_perceptions_single_observation() {
let observations = &["stable observation"];
let (sifted, _) = sift_perceptions(observations, "test");
let _entropy = sifted.raw_entropy();
let _surprise = sifted.raw_surprise();
}
// An observation sharing words with the objective scores higher than one
// that shares none.
#[test]
fn test_utility_calculation() {
let objective = "Build a Rust safety library";
let obs1 = "Rust safety is paramount";
let obs2 = "C++ is also good";
let u1 = calculate_utility(obs1, objective);
let u2 = calculate_utility(obs2, objective);
assert!(u1 > u2);
}
// Smoke test: a 10,000-token observation must be processed without panic.
#[test]
fn test_sifter_token_bomb() {
let objective = "test";
let bomb = "token ".repeat(10000);
let u = calculate_utility(&bomb, objective);
let _ = u;
}
// Eleven authority keywords at 100 each give a total of at least 1000.
#[test]
#[allow(deprecated)]
fn test_halo_signal_keyword_density() {
let text = "expert official government doctor scientist guaranteed certified proven experts officials scientists";
let signal = calculate_halo_signal(text);
assert!(signal >= 1000);
}
// Adding keywords must never lower the signal.
// NOTE(review): "professional" appears in none of the keyword lists in this
// file, so s2 and s3 are expected to be equal; only `<=` is asserted.
#[test]
#[allow(deprecated)]
fn test_halo_signal_metamorphic_monotonicity() {
let text1 = "This is a normal observation.";
let text2 = "This is an expert observation.";
let text3 = "This is an expert and professional observation.";
let s1 = calculate_halo_signal(text1);
let s2 = calculate_halo_signal(text2);
let s3 = calculate_halo_signal(text3);
assert!(s1 <= s2);
assert!(s2 <= s3);
assert!(s3 > s1);
}
// Utility depends on which words occur, not on their order.
#[test]
fn test_utility_metamorphic_shuffle() {
let objective = "Safety Critical AI";
let obs1 = "Formal verification ensures deterministic execution.";
let obs2 = "execution deterministic ensures verification Formal.";
let u1 = calculate_utility(obs1, objective);
let u2 = calculate_utility(obs2, objective);
assert_eq!(u1, u2);
}
// Smoke test: reads entropy and surprise without asserting on their values.
#[test]
fn test_sift_quantization_differential() {
let observations = &["Safety is paramount"];
let (sifted, _) = sift_perceptions(observations, "Safety");
let _entropy = sifted.raw_entropy();
let _surprise = sifted.raw_surprise();
}
// With several observations the selected synapse must carry a non-zero
// anchor hash, i.e. it must not be the fallback synapse (hash 0).
#[test]
fn test_sift_perceptions_logic() {
let observations = &[
"Rust is the most secure language due to its ownership model",
"Python is very popular and easy to learn",
"C is a limited but performant systems language",
];
let (sifted, _) = sift_perceptions(observations, "coding language safety");
let _entropy = sifted.raw_entropy();
let _surprise = sifted.raw_surprise();
assert!(sifted.anchor_hash() != 0);
}
// "expert" is the fifth token after "not", still inside the six-token
// negation window.
#[test]
#[allow(deprecated)]
fn test_negation_ttl_covers_six_tokens() {
let breakdown = get_bias_breakdown("not a very well known expert");
assert_eq!(breakdown.authority, 0, "authority should be 0 when negated");
let breakdown2 = get_bias_breakdown("a very well known expert");
assert_eq!(breakdown2.authority, 100);
}
// A multi-word phrase starting inside a negation window is not scored.
// NOTE(review): both calls use the same input; the second only checks a
// different field of the same result.
#[test]
#[allow(deprecated)]
fn test_phase2_negation_multi_word() {
let breakdown = get_bias_breakdown("not as an ai");
assert_eq!(
breakdown.template_fitting, 0,
"template_fitting should be 0 when negated"
);
let breakdown2 = get_bias_breakdown("not as an ai");
assert_eq!(breakdown2.semantic_traps, 0);
}
// "while" is not an entry of SEMANTIC_TRAPS and must not be scored as one.
#[test]
#[allow(deprecated)]
fn test_while_not_a_semantic_trap() {
let breakdown = get_bias_breakdown("while processing data");
assert_eq!(
breakdown.semantic_traps, 0,
"while should not trigger semantic_traps"
);
}
// A high-probability manipulation result maps to high entropy, non-zero
// surprise, has_bias = true and a non-zero anchor hash.
#[test]
#[allow(deprecated)]
fn test_sift_observation_produces_valid_synapse() {
let class_result = ClassificationResult {
score: 2.5,
probability: 0.92,
is_manipulation: true,
oov_ratio: 0.15,
tokens_matched: 8,
tokens_total: 10,
};
let (sifted, _proof) = sift_observation(&class_result, "test observation text");
let entropy = sifted.raw_entropy();
assert!(
entropy > 50000,
"sifted entropy should be high for p=0.92 manipulation: {}",
entropy
);
let surprise = sifted.raw_surprise();
assert!(
surprise > 5000,
"sifted surprise should be non-zero for oov_ratio=0.15: {}",
surprise
);
assert!(
sifted.has_bias(),
"has_bias should be true for manipulation"
);
assert_ne!(sifted.anchor_hash(), 0);
}
// Empty text still yields entropy from the classifier probability (0.5);
// Adler-32 of empty input is 1, so the hash handed to set_anchor_hash is
// non-zero.
#[test]
#[allow(deprecated)]
fn test_sift_observation_empty_text() {
let class_result = ClassificationResult {
score: 0.0,
probability: 0.5,
is_manipulation: false,
oov_ratio: 0.0,
tokens_matched: 0,
tokens_total: 0,
};
let (sifted, _proof) = sift_observation(&class_result, "");
let entropy = sifted.raw_entropy();
assert!(
entropy > 0,
"sifted entropy should be > 0 even for empty text"
);
assert_ne!(sifted.anchor_hash(), 0);
}
// No observation tokens means nothing can match.
#[test]
fn test_calculate_utility_empty_observation() {
let utility = calculate_utility("", "safety objective");
assert_eq!(utility, 0);
}
// No objective words means nothing can match.
#[test]
fn test_calculate_utility_empty_objective() {
let utility = calculate_utility("some observation text", "");
assert_eq!(utility, 0);
}
// from_classification copies the probability and flags and scales the
// probability / oov_ratio into the integer fields.
#[test]
fn test_sifter_output_from_classification() {
let class_result = ClassificationResult {
score: 1.5,
probability: 0.82,
is_manipulation: true,
oov_ratio: 0.25,
tokens_matched: 5,
tokens_total: 10,
};
let output = SifterOutput::from_classification(&class_result);
assert!((output.error_sift - 0.82).abs() < 0.01);
assert!(output.raw_entropy > 50000);
assert!(output.has_bias);
assert_eq!(output.classifier_prob, 0.82);
assert!(output.oov_ratio > 0);
}
}