#![allow(clippy::arithmetic_side_effects)]
use crate::control_types::ControlSignal;
use crate::llmosafe_kernel::{
CognitiveEntropy, KernelError, SiftedProof, SiftedSynapse, ValidatedProof, ValidatedSynapse,
};
/// Snapshot of memory-derived values that can be read as a control signal.
///
/// Within this file the struct is only consumed through its `ControlSignal`
/// implementation; the code that populates it is not visible here.
#[derive(Debug, Clone, Copy)]
pub struct MemoryOutput {
    /// Error term; returned unchanged by `ControlSignal::error`.
    pub error_mem: f32,
    /// Trend value.
    /// NOTE(review): presumably the slope reported by `WorkingMemory::trend`
    /// — confirm against the producer of this struct.
    pub trend: f64,
    /// Mean entropy; `ControlSignal::setpoint` divides this by `65535.0`.
    pub mean_entropy: f64,
}
/// Lets a `MemoryOutput` snapshot be read through the `ControlSignal` trait.
impl ControlSignal for MemoryOutput {
    /// Returns the stored `error_mem` value as-is.
    fn error(&self) -> f32 {
        let Self { error_mem, .. } = *self;
        error_mem
    }

    /// Returns `mean_entropy / 65535.0`, narrowed from `f64` to `f32`.
    ///
    /// NOTE(review): 65535 equals `u16::MAX` and raw entropy is set as `u16`
    /// elsewhere in this file, so this presumably normalises the mean into
    /// `[0, 1]` — confirm.
    fn setpoint(&self) -> f32 {
        let scaled = self.mean_entropy / 65535.0;
        scaled as f32
    }
}
/// Fixed-capacity circular store of the last `SIZE` accepted entropy values.
///
/// `SIZE` defaults to 64. Slots that have not been written yet still hold the
/// initial `CognitiveEntropy::new(0)` value set by `new`.
pub struct WorkingMemory<const SIZE: usize = 64> {
    /// Backing storage; `update` writes one slot per accepted synapse.
    state: [CognitiveEntropy<28, 2>; SIZE],
    /// Slot that the next accepted `update` will overwrite; advances modulo
    /// `SIZE` after every write.
    current_index: usize,
    /// `update` rejects any synapse whose `surprise()` is strictly greater
    /// than this value.
    surprise_threshold: i128,
}
impl<const SIZE: usize> WorkingMemory<SIZE> {
    /// Compile-time guard that rejects a zero-capacity buffer.
    ///
    /// Associated constants of a generic `impl` are evaluated only when they
    /// are referenced, so `new` names this constant explicitly; otherwise the
    /// assertion would never run.
    const _SIZE_CHECK: () = assert!(SIZE > 0, "WorkingMemory size must be > 0");

    /// Creates an empty memory whose slots all hold `CognitiveEntropy::new(0)`.
    ///
    /// `threshold` is the largest `surprise()` value that `update` still
    /// accepts; anything strictly greater is rejected.
    ///
    /// Instantiating this function with `SIZE == 0` fails to compile.
    pub const fn new(threshold: i128) -> Self {
        // Force evaluation of the size assertion for this concrete `SIZE`.
        // Without this reference `WorkingMemory::<0>` would build and only
        // blow up later inside `update` (index out of bounds / `% 0`).
        let () = Self::_SIZE_CHECK;
        Self {
            state: [CognitiveEntropy::new(0); SIZE],
            current_index: 0,
            surprise_threshold: threshold,
        }
    }

    /// Validates `sifted`, records its entropy, and returns the validated pair.
    ///
    /// Steps, in order:
    /// 1. `sifted.validate()` — any error is propagated unchanged.
    /// 2. Surprise gate — `sifted.surprise() > surprise_threshold` yields
    ///    `KernelError::HallucinationDetected`.
    /// 3. The entropy is written to the current slot and the write position
    ///    advances by one, wrapping at `SIZE`.
    ///
    /// Nothing is stored when step 1 or 2 fails.
    ///
    /// `_proof` is not inspected; it is consumed by value.
    /// NOTE(review): presumably it acts as a type-level witness that sifting
    /// happened — confirm against `SiftedProof`.
    ///
    /// # Errors
    ///
    /// Returns whatever `sifted.validate()` reports, or
    /// `KernelError::HallucinationDetected` when the surprise gate trips.
    pub fn update(
        &mut self,
        sifted: SiftedSynapse,
        _proof: SiftedProof,
    ) -> Result<(ValidatedSynapse, ValidatedProof), KernelError> {
        sifted.validate()?;
        if sifted.surprise() > self.surprise_threshold {
            return Err(KernelError::HallucinationDetected);
        }

        let slot = self.current_index;
        // Check the invariant *before* indexing; placed after the write the
        // assertion could never fire because the indexing would panic first.
        debug_assert!(
            slot < SIZE,
            "CMIT: memory index {} overflowed SIZE={}",
            slot,
            SIZE,
        );
        self.state[slot] = sifted.entropy();
        self.current_index = (slot + 1) % SIZE;

        let validated = ValidatedSynapse::new(sifted.into_inner());
        Ok((validated, ValidatedProof(())))
    }

    /// Arithmetic mean of the mantissas of all `SIZE` slots.
    ///
    /// Every slot participates, including slots that have never been written
    /// and still hold the initial `CognitiveEntropy::new(0)`.
    pub fn mean_entropy(&self) -> f64 {
        let total: i128 = self.state.iter().map(CognitiveEntropy::mantissa).sum();
        total as f64 / SIZE as f64
    }

    /// Population variance (divisor `SIZE`) of the slot mantissas around
    /// `mean_entropy()`.
    pub fn entropy_variance(&self) -> f64 {
        let mean = self.mean_entropy();
        let squared_deviations: f64 = self
            .state
            .iter()
            .map(|entry| {
                let deviation = entry.mantissa() as f64 - mean;
                deviation * deviation
            })
            .sum();
        squared_deviations / SIZE as f64
    }

    /// Least-squares slope of the slot mantissas against their position.
    ///
    /// Positions run `0..SIZE`, starting at `current_index` (the slot that
    /// will be overwritten next) and wrapping around the buffer, so once the
    /// buffer has been filled position 0 is the oldest value.
    ///
    /// Returns `0.0` when the regression denominator is zero, which is the
    /// case for `SIZE == 1`.
    pub fn trend(&self) -> f64 {
        let n = SIZE as f64;
        let mut sum_y = 0.0;
        let mut sum_xy = 0.0;
        for offset in 0..SIZE {
            let slot = (self.current_index + offset) % SIZE;
            let x = offset as f64;
            let y = self.state[slot].mantissa() as f64;
            sum_y += y;
            sum_xy += x * y;
        }

        // Closed forms for sum(x) and sum(x^2) over x = 0..n-1.
        let sum_x = (n * (n - 1.0)) / 2.0;
        let sum_xx = (n * (n - 1.0) * (2.0 * n - 1.0)) / 6.0;
        let denominator = n * sum_xx - sum_x * sum_x;
        if denominator == 0.0 {
            return 0.0;
        }
        (n * sum_xy - sum_x * sum_y) / denominator
    }

    /// Returns `true` when the absolute value of `trend()` is strictly
    /// greater than `threshold`.
    pub fn is_drifting(&self, threshold: f64) -> bool {
        self.trend().abs() > threshold
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llmosafe_kernel::{SiftedProof, Synapse};

    /// Creates a fresh `Synapse`, lets `configure` set whichever raw fields the
    /// test cares about, then passes it to `memory.update` with a testing
    /// proof. The success payload is dropped so callers see `Ok(())` or the
    /// error.
    fn feed<const N: usize>(
        memory: &mut WorkingMemory<N>,
        configure: impl FnOnce(&mut Synapse),
    ) -> Result<(), KernelError> {
        let mut synapse = Synapse::new();
        configure(&mut synapse);
        let sifted = SiftedSynapse::new(synapse);
        memory
            .update(sifted, SiftedProof::for_testing())
            .map(|_| ())
    }

    /// Filling a 4-slot memory with 100, 200, 300, 400 gives mean 250 and a
    /// slope of about 100 per slot.
    #[test]
    fn test_homeostatic_stats() {
        let mut memory = WorkingMemory::<4>::new(1000);
        for step in 1..=4u16 {
            feed(&mut memory, |s| {
                s.set_raw_entropy(100 * step);
            })
            .unwrap();
        }

        assert_eq!(memory.mean_entropy(), 250.0);
        let slope = memory.trend();
        assert!((slope - 100.0).abs() < 0.01);
        assert!(memory.is_drifting(10.0));
    }

    /// Walks through one accepted input and each rejection observed here:
    /// surprise above threshold, bias flag set, and entropy of 50001.
    #[test]
    fn test_memory_update_gating() {
        let mut memory = WorkingMemory::<4>::new(500);

        // Surprise 100 is within the threshold of 500: accepted.
        let accepted = feed(&mut memory, |s| {
            s.set_raw_entropy(400);
            s.set_raw_surprise(100);
            s.set_has_bias(false);
        });
        assert!(accepted.is_ok());

        // Surprise 600 exceeds the threshold of 500.
        let too_surprising = feed(&mut memory, |s| {
            s.set_raw_entropy(400);
            s.set_raw_surprise(600);
            s.set_has_bias(false);
        });
        assert_eq!(
            too_surprising.unwrap_err(),
            KernelError::HallucinationDetected
        );

        // Bias flag set.
        let biased = feed(&mut memory, |s| {
            s.set_raw_entropy(400);
            s.set_raw_surprise(100);
            s.set_has_bias(true);
        });
        assert_eq!(biased.unwrap_err(), KernelError::BiasHaloDetected);

        // Entropy of 50001.
        let unstable = feed(&mut memory, |s| {
            s.set_raw_entropy(50001);
            s.set_raw_surprise(100);
            s.set_has_bias(false);
        });
        assert_eq!(unstable.unwrap_err(), KernelError::CognitiveInstability);
    }

    /// A single-slot memory keeps writing slot 0 and its index stays at 0.
    #[test]
    fn test_working_memory_size_1() {
        let mut memory = WorkingMemory::<1>::new(1000);

        feed(&mut memory, |s| {
            s.set_raw_entropy(100);
        })
        .unwrap();
        assert!(memory.state[0].is_stable(100));

        feed(&mut memory, |s| {
            s.set_raw_entropy(200);
        })
        .unwrap();
        // NOTE(review): if `is_stable(t)` is a "value <= t" check, this would
        // also pass for a stale value of 100 — consider a stricter assertion.
        assert!(memory.state[0].is_stable(200));
        assert_eq!(memory.current_index, 0);
    }

    /// `new` stores the threshold verbatim, even at `i128::MAX`.
    #[test]
    fn test_memory_new_max_threshold() {
        let memory = WorkingMemory::<64>::new(i128::MAX);
        assert_eq!(memory.surprise_threshold, i128::MAX);
    }

    /// With a threshold of 0, a surprise of 1 is rejected.
    #[test]
    fn test_memory_zero_threshold() {
        let mut memory = WorkingMemory::<64>::new(0);
        let outcome = feed(&mut memory, |s| {
            s.set_raw_surprise(1);
        });
        assert_eq!(outcome.unwrap_err(), KernelError::HallucinationDetected);
    }

    /// With a threshold of -1, even an untouched synapse is rejected.
    #[test]
    fn test_memory_negative_threshold() {
        let mut memory = WorkingMemory::<64>::new(-1);
        let outcome = feed(&mut memory, |_| {});
        assert_eq!(outcome.unwrap_err(), KernelError::HallucinationDetected);
    }
}
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::llmosafe_kernel::{SiftedProof, Synapse};
    use proptest::prelude::*;

    proptest! {
        // Property: any sequence of 1 to 199 raw entropies drawn from 0..800
        // is accepted by a 64-slot memory with threshold 1000, including
        // sequences long enough to wrap the buffer.
        #[test]
        fn test_working_memory_random_synapse_sequence(
            entropies in prop::collection::vec(0u16..800u16, 1..200)
        ) {
            let mut memory = WorkingMemory::<64>::new(1000);
            for raw_entropy in entropies {
                let sifted = {
                    let mut synapse = Synapse::new();
                    synapse.set_raw_entropy(raw_entropy);
                    SiftedSynapse::new(synapse)
                };
                prop_assert!(memory.update(sifted, SiftedProof::for_testing()).is_ok());
            }
        }
    }
}
/// Process-wide working memory behind a mutex, with a raw-bits entry point
/// that reports status as integer codes.
#[cfg(feature = "std")]
pub mod cognitive_memory {
    use super::*;
    use crate::llmosafe_kernel::Synapse;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Shared 64-slot memory, created with a surprise threshold of 58000.
    static GLOBAL_MEMORY: Mutex<WorkingMemory<64>> = Mutex::new(WorkingMemory::<64>::new(58000));

    /// Locks the shared memory. A poisoned lock is not treated as fatal: the
    /// guard is recovered from the poison error and used as normal.
    fn lock_memory() -> MutexGuard<'static, WorkingMemory<64>> {
        GLOBAL_MEMORY.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Maps each kernel error to the negative status code reported by
    /// `process_state_update`.
    fn status_code(error: KernelError) -> i32 {
        match error {
            KernelError::DepthExceeded => -1,
            KernelError::CognitiveInstability => -2,
            KernelError::BiasHaloDetected => -3,
            KernelError::HallucinationDetected => -4,
            KernelError::ResourceExhaustion => -5,
            KernelError::SelfMemoryExceeded => -6,
            KernelError::DeadlineExceeded => -7,
        }
    }

    /// Decodes `synapse_bits` into a synapse and applies it to the shared
    /// memory.
    ///
    /// Returns `0` when the update is accepted, otherwise:
    ///
    /// | code | error                   |
    /// |------|-------------------------|
    /// | -1   | `DepthExceeded`         |
    /// | -2   | `CognitiveInstability`  |
    /// | -3   | `BiasHaloDetected`      |
    /// | -4   | `HallucinationDetected` |
    /// | -5   | `ResourceExhaustion`    |
    /// | -6   | `SelfMemoryExceeded`    |
    /// | -7   | `DeadlineExceeded`      |
    ///
    /// NOTE(review): the proof comes from `SiftedProof::from_raw_bits_bypass()`,
    /// so this path presumably skips whatever normally produces a sifted
    /// proof — confirm that is intended for this entry point.
    pub fn process_state_update(synapse_bits: u128) -> i32 {
        let sifted = SiftedSynapse::new(Synapse::from_raw_u128(synapse_bits));
        let proof = SiftedProof::from_raw_bits_bypass();
        let mut memory = lock_memory();
        match memory.update(sifted, proof) {
            Ok(_) => 0,
            Err(error) => status_code(error),
        }
    }

    /// Reads `(mean_entropy, entropy_variance, trend, is_drifting)` from the
    /// shared memory under a single lock acquisition.
    ///
    /// The drift flag uses a fixed threshold of `10.0`.
    pub fn get_memory_stats() -> (f64, f64, f64, bool) {
        let memory = lock_memory();
        (
            memory.mean_entropy(),
            memory.entropy_variance(),
            memory.trend(),
            memory.is_drifting(10.0),
        )
    }
}