use crate::consolidation::{ConsolidatedEntry, ConsolidationPolicy, ConsolidationResult};
use crate::delta::{Delta, DeltaType};
use crate::error::Result;
use crate::hash_chain::HashChain;
use crate::plasticity::PlasticityRule;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
/// Thermal tier reported alongside a value by `Thermogram::read_with_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ThermalState {
/// Reported for values served from the pending delta chain or from
/// `hot_entries`. This is the `Default` variant.
#[default]
Hot,
/// Reported for values served from `cold_entries`.
Cold,
}
/// Tunables controlling how entries move between the hot and cold tiers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThermalConfig {
/// Minimum strength (inclusive) a hot entry needs before `crystallize`
/// moves it into the cold tier.
pub crystallization_threshold: f32,
/// Minimum `update_count` (inclusive) a hot entry needs before
/// `crystallize` moves it into the cold tier.
pub min_observations: usize,
/// Hot entries with strength strictly below this value are dropped by
/// `prune_hot`.
pub prune_threshold: f32,
/// When `false`, `warm` and `warm_matching` do nothing.
pub allow_warming: bool,
/// Scales the strength penalty applied by `warm`: one tenth of this value
/// is subtracted from the entry's strength (floored at 0.1).
pub warming_delta: f32,
}
impl Default for ThermalConfig {
/// Returns the stock thermal settings: crystallize at strength >= 0.75
/// after at least 3 observations, prune below 0.1, warming enabled with a
/// warming delta of 0.3.
fn default() -> Self {
Self {
crystallization_threshold: 0.75,
min_observations: 3,
prune_threshold: 0.1,
allow_warming: true,
// `warm` subtracts `warming_delta * 0.1`, i.e. 0.03 with this default.
warming_delta: 0.3,
}
}
}
/// A keyed store with three layers: a chain of pending deltas, a hot tier of
/// consolidated entries, and a cold tier of crystallized entries.
///
/// Reads consult the pending chain first, then `hot_entries`, then
/// `cold_entries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Thermogram {
/// Identifier assigned in `new` (a stringified v4 UUID).
pub id: String,
/// Caller-supplied name.
pub name: String,
/// Consolidated entries that have not been crystallized; replaced wholesale
/// by each `consolidate` call.
pub hot_entries: HashMap<String, ConsolidatedEntry>,
/// Entries moved out of the hot tier by `crystallize`.
pub cold_entries: HashMap<String, ConsolidatedEntry>,
/// Deltas appended since the last consolidation; reset to a fresh chain by
/// `consolidate` and verified on `load`.
pub dirty_chain: HashChain,
/// Rule handed to `crate::consolidation::consolidate`.
pub plasticity_rule: PlasticityRule,
/// Policy that decides when `apply_delta` triggers a consolidation; also
/// handed to `crate::consolidation::consolidate`.
pub consolidation_policy: ConsolidationPolicy,
/// Thresholds governing crystallization, warming and pruning.
pub thermal_config: ThermalConfig,
/// Bookkeeping counters and timestamps.
pub metadata: ThermogramMetadata,
}
/// Counters returned by `Thermogram::crystallize`.
#[derive(Debug, Clone, Default)]
pub struct CrystallizationResult {
/// Hot entries moved into the cold tier where no cold entry existed yet.
pub crystallized: usize,
/// Hot entries blended into an already existing cold entry.
pub merged: usize,
}
/// Bookkeeping data stored alongside a `Thermogram`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThermogramMetadata {
/// UTC time at which the thermogram was constructed.
pub created_at: DateTime<Utc>,
/// UTC time of the most recent `consolidate` call (equal to `created_at`
/// until the first consolidation).
pub last_consolidation: DateTime<Utc>,
/// Number of deltas accepted by `apply_delta` over the thermogram's lifetime.
pub total_deltas: usize,
/// Number of completed `consolidate` calls.
pub total_consolidations: usize,
/// Free-form JSON payload; initialised to `Null` by `Thermogram::new` and
/// not otherwise touched in this file.
pub custom: serde_json::Value,
}
impl Thermogram {
/// Creates an empty thermogram with a freshly generated v4 UUID, the default
/// consolidation policy and the default thermal configuration.
///
/// `created_at` and `last_consolidation` both start at the same UTC instant,
/// the counters start at zero and `custom` starts as JSON `null`.
pub fn new(name: impl Into<String>, plasticity_rule: PlasticityRule) -> Self {
    // Take the timestamp once so both metadata fields agree exactly.
    let created = Utc::now();
    let metadata = ThermogramMetadata {
        created_at: created,
        last_consolidation: created,
        total_deltas: 0,
        total_consolidations: 0,
        custom: serde_json::Value::Null,
    };

    Self {
        id: uuid::Uuid::new_v4().to_string(),
        name: name.into(),
        hot_entries: HashMap::new(),
        cold_entries: HashMap::new(),
        dirty_chain: HashChain::new(),
        plasticity_rule,
        consolidation_policy: ConsolidationPolicy::default(),
        thermal_config: ThermalConfig::default(),
        metadata,
    }
}
pub fn with_thermal_config(
name: impl Into<String>,
plasticity_rule: PlasticityRule,
thermal_config: ThermalConfig,
) -> Self {
let mut thermo = Self::new(name, plasticity_rule);
thermo.thermal_config = thermal_config;
thermo
}
/// Appends `delta` to the pending chain, bumps the lifetime delta counter and
/// consolidates immediately if the consolidation policy asks for it.
///
/// # Errors
/// Propagates any error from appending to the chain or from the triggered
/// consolidation.
pub fn apply_delta(&mut self, delta: Delta) -> Result<()> {
    self.dirty_chain.append(delta)?;
    self.metadata.total_deltas += 1;

    if !self.should_consolidate() {
        return Ok(());
    }
    // The consolidation summary is not surfaced to the caller.
    self.consolidate().map(|_| ())
}
/// Returns the current value for `key`, or `None` if it is unknown or its
/// latest pending delta is a delete.
///
/// Lookup order: latest pending delta, then the hot tier, then the cold tier.
pub fn read(&self, key: &str) -> Result<Option<Vec<u8>>> {
    // A pending delta always shadows consolidated state, including deletes.
    if let Some(pending) = self.dirty_chain.get_latest(key) {
        let visible = match pending.delta_type {
            DeltaType::Delete => None,
            DeltaType::Create | DeltaType::Update | DeltaType::Merge => {
                Some(pending.value.clone())
            }
        };
        return Ok(visible);
    }

    let stored = self
        .hot_entries
        .get(key)
        .or_else(|| self.cold_entries.get(key));
    Ok(stored.map(|entry| entry.value.clone()))
}
pub fn read_with_strength(&self, key: &str) -> Result<Option<(Vec<u8>, f32)>> {
if let Some(delta) = self.dirty_chain.get_latest(key) {
match delta.delta_type {
DeltaType::Create | DeltaType::Update | DeltaType::Merge => {
return Ok(Some((delta.value.clone(), delta.metadata.strength)));
}
DeltaType::Delete => {
return Ok(None);
}
}
}
if let Some(entry) = self.hot_entries.get(key) {
return Ok(Some((entry.value.clone(), entry.strength)));
}
Ok(self
.cold_entries
.get(key)
.map(|entry| (entry.value.clone(), entry.strength)))
}
/// Like [`Thermogram::read`], but also returns the value's strength and the
/// tier it was served from.
///
/// Values coming from the pending chain or the hot tier are tagged
/// `ThermalState::Hot`; values coming from the cold tier are tagged
/// `ThermalState::Cold`.
pub fn read_with_state(&self, key: &str) -> Result<Option<(Vec<u8>, f32, ThermalState)>> {
    // A pending delta always shadows consolidated state, including deletes.
    if let Some(pending) = self.dirty_chain.get_latest(key) {
        return Ok(match pending.delta_type {
            DeltaType::Delete => None,
            DeltaType::Create | DeltaType::Update | DeltaType::Merge => Some((
                pending.value.clone(),
                pending.metadata.strength,
                ThermalState::Hot,
            )),
        });
    }

    let from_hot = self
        .hot_entries
        .get(key)
        .map(|e| (e.value.clone(), e.strength, ThermalState::Hot));
    Ok(from_hot.or_else(|| {
        self.cold_entries
            .get(key)
            .map(|e| (e.value.clone(), e.strength, ThermalState::Cold))
    }))
}
pub fn keys(&self) -> Vec<String> {
let mut keys: Vec<String> = self.cold_entries.keys().cloned().collect();
for key in self.hot_entries.keys() {
if !keys.contains(key) {
keys.push(key.clone());
}
}
for delta in &self.dirty_chain.deltas {
if !keys.contains(&delta.key) && delta.delta_type != DeltaType::Delete {
keys.push(delta.key.clone());
}
}
keys
}
/// Returns an owned copy of every key in the hot tier (order unspecified).
pub fn hot_keys(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.hot_entries.len());
    names.extend(self.hot_entries.keys().cloned());
    names
}
/// Returns an owned copy of every key in the cold tier (order unspecified).
pub fn cold_keys(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.cold_entries.len());
    names.extend(self.cold_entries.keys().cloned());
    names
}
/// Returns the pending deltas recorded for `key`, as reported by the dirty
/// chain. Deltas that were already consolidated are no longer in the chain.
pub fn history(&self, key: &str) -> Vec<&Delta> {
    let chain = &self.dirty_chain;
    chain.get_history(key)
}
/// Asks the consolidation policy whether the pending deltas should be
/// consolidated now.
///
/// The policy is given the number of pending deltas, the time of the last
/// consolidation and an estimate of the pending deltas' size in bytes.
pub fn should_consolidate(&self) -> bool {
    self.consolidation_policy.should_consolidate(
        self.dirty_chain.len(),
        &self.metadata.last_consolidation,
        self.estimate_dirty_size(),
    )
}
/// Folds all pending deltas into the hot tier, clears the pending chain,
/// updates the consolidation metadata and then runs [`Thermogram::crystallize`].
///
/// The returned summary comes from `crate::consolidation::consolidate`, with
/// the number of newly crystallized entries added to `entries_merged`.
///
/// # Errors
/// Propagates errors from the consolidation routine or from crystallization.
/// If the consolidation routine fails, no state has been modified yet.
pub fn consolidate(&mut self) -> Result<ConsolidationResult> {
    let (merged_hot, mut summary) = crate::consolidation::consolidate(
        &self.dirty_chain.deltas,
        &self.hot_entries,
        &self.plasticity_rule,
        &self.consolidation_policy,
    )?;

    // The pending deltas are now reflected in the hot tier, so start a new chain.
    self.hot_entries = merged_hot;
    self.dirty_chain = HashChain::new();
    self.metadata.total_consolidations += 1;
    self.metadata.last_consolidation = Utc::now();

    // NOTE(review): only `crystallized` is counted here; entries blended into
    // an existing cold entry (`merged`) are not — confirm this is intended.
    let promoted = self.crystallize()?.crystallized;
    summary.entries_merged += promoted;
    Ok(summary)
}
/// Moves every sufficiently strong, sufficiently observed hot entry into the
/// cold tier.
///
/// A hot entry qualifies when its strength is at least
/// `crystallization_threshold` and its `update_count` is at least
/// `min_observations`. If the key already has a cold entry, the two are
/// blended (strength = 30% cold + 70% hot; value and timestamp taken from the
/// hot entry; update counts summed) and counted as `merged`. Otherwise the
/// hot entry is moved as-is and counted as `crystallized`.
pub fn crystallize(&mut self) -> Result<CrystallizationResult> {
    let strength_floor = self.thermal_config.crystallization_threshold;
    let observation_floor = self.thermal_config.min_observations;

    // Collect the keys first so the hot tier can be mutated afterwards.
    let ready: Vec<String> = self
        .hot_entries
        .iter()
        .filter(|(_, e)| e.strength >= strength_floor && e.update_count >= observation_floor)
        .map(|(k, _)| k.clone())
        .collect();

    let mut outcome = CrystallizationResult::default();
    for key in ready {
        let hot = match self.hot_entries.remove(&key) {
            Some(hot) => hot,
            None => continue,
        };
        match self.cold_entries.get_mut(&key) {
            Some(cold) => {
                // NOTE(review): `ternary_strength` of the cold entry is left
                // untouched on merge — confirm that is intended.
                cold.strength = cold.strength * 0.3 + hot.strength * 0.7;
                cold.value = hot.value;
                cold.updated_at = hot.updated_at;
                cold.update_count += hot.update_count;
                outcome.merged += 1;
            }
            None => {
                self.cold_entries.insert(key, hot);
                outcome.crystallized += 1;
            }
        }
    }
    Ok(outcome)
}
/// Moves the cold entry for `key` back into the hot tier.
///
/// On success the entry's strength is reduced by `warming_delta * 0.1`
/// (never below 0.1) and its `updated_at` is set to the current UTC time.
///
/// Returns `Ok(true)` if an entry was moved. Returns `Ok(false)` when warming
/// is disabled, when `key` has no cold entry, or when `key` already has a hot
/// entry (in which case both tiers are left untouched).
pub fn warm(&mut self, key: &str) -> Result<bool> {
    if !self.thermal_config.allow_warming {
        return Ok(false);
    }

    // Reads prefer the hot tier and `crystallize` lets the hot value win on
    // merge, so an existing hot entry is the newer state. Inserting the cold
    // entry over it would silently discard that newer value.
    if self.hot_entries.contains_key(key) {
        return Ok(false);
    }

    match self.cold_entries.remove(key) {
        Some(mut entry) => {
            let penalty = self.thermal_config.warming_delta * 0.1;
            entry.strength = (entry.strength - penalty).max(0.1);
            entry.updated_at = Utc::now();
            self.hot_entries.insert(key.to_string(), entry);
            Ok(true)
        }
        None => Ok(false),
    }
}
/// Warms every cold entry for which `predicate(key, entry)` returns `true`
/// and returns how many entries were actually moved.
///
/// Returns `Ok(0)` without evaluating the predicate when warming is disabled.
///
/// # Errors
/// Stops at, and propagates, the first error returned by [`Thermogram::warm`].
pub fn warm_matching<F>(&mut self, predicate: F) -> Result<usize>
where
    F: Fn(&str, &ConsolidatedEntry) -> bool,
{
    if !self.thermal_config.allow_warming {
        return Ok(0);
    }

    // Snapshot the matching keys first; `warm` mutates the cold tier.
    let mut selected = Vec::new();
    for (key, entry) in &self.cold_entries {
        if predicate(key, entry) {
            selected.push(key.clone());
        }
    }

    let mut count = 0usize;
    for key in selected {
        if self.warm(&key)? {
            count += 1;
        }
    }
    Ok(count)
}
/// Drops every hot entry whose strength is below `prune_threshold` and
/// returns the number of entries removed.
pub fn prune_hot(&mut self) -> usize {
    // Copy the threshold out so the closure does not need to borrow `self`.
    let floor = self.thermal_config.prune_threshold;
    let initial = self.hot_entries.len();
    self.hot_entries.retain(|_, entry| entry.strength >= floor);
    initial - self.hot_entries.len()
}
/// Rough byte estimate of the pending deltas: key length plus value length
/// plus a flat 200 bytes per delta.
fn estimate_dirty_size(&self) -> usize {
    let mut total = 0usize;
    for delta in &self.dirty_chain.deltas {
        total += delta.value.len() + delta.key.len() + 200;
    }
    total
}
/// Writes the thermogram to `path` as pretty-printed JSON, creating any
/// missing parent directories first.
///
/// # Errors
/// Propagates directory-creation, serialization and file-write errors.
pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
    let target = path.as_ref();

    // Directories are created before serializing, so they may exist even if
    // a later step fails.
    if let Some(dir) = target.parent() {
        std::fs::create_dir_all(dir)?;
    }

    let serialized = serde_json::to_string_pretty(self)?;
    std::fs::write(target, serialized)?;
    Ok(())
}
/// Reads a thermogram from the JSON file at `path` and verifies its pending
/// delta chain before returning it.
///
/// # Errors
/// Propagates file-read and deserialization errors, and any error reported by
/// the chain's `verify`.
pub fn load(path: impl AsRef<Path>) -> Result<Self> {
    let raw = std::fs::read_to_string(path.as_ref())?;
    let restored: Self = serde_json::from_str(&raw)?;
    // Reject files whose pending chain fails verification.
    restored.dirty_chain.verify()?;
    Ok(restored)
}
/// Takes a snapshot of entry counts, lifetime counters, timestamps and an
/// estimated size in bytes.
pub fn stats(&self) -> ThermogramStats {
    let meta = &self.metadata;
    let total_keys = self.keys().len();
    let estimated_size_bytes = self.estimate_size();

    ThermogramStats {
        total_keys,
        hot_entries: self.hot_entries.len(),
        cold_entries: self.cold_entries.len(),
        dirty_deltas: self.dirty_chain.len(),
        total_deltas_lifetime: meta.total_deltas,
        total_consolidations: meta.total_consolidations,
        created_at: meta.created_at,
        last_consolidation: meta.last_consolidation,
        estimated_size_bytes,
    }
}
fn estimate_size(&self) -> usize {
let hot_size: usize = self
.hot_entries
.values()
.map(|e| e.value.len() + e.key.len() + 100)
.sum();
let cold_size: usize = self
.cold_entries
.values()
.map(|e| e.value.len() + e.key.len() + 100)
.sum();
hot_size + cold_size + self.estimate_dirty_size()
}
}
/// Point-in-time summary produced by `Thermogram::stats`.
#[derive(Debug, Clone)]
pub struct ThermogramStats {
/// Length of the list returned by `Thermogram::keys`.
pub total_keys: usize,
/// Number of entries in the hot tier.
pub hot_entries: usize,
/// Number of entries in the cold tier.
pub cold_entries: usize,
/// Number of deltas currently pending in the dirty chain.
pub dirty_deltas: usize,
/// Copy of `ThermogramMetadata::total_deltas`.
pub total_deltas_lifetime: usize,
/// Copy of `ThermogramMetadata::total_consolidations`.
pub total_consolidations: usize,
/// Copy of `ThermogramMetadata::created_at`.
pub created_at: DateTime<Utc>,
/// Copy of `ThermogramMetadata::last_consolidation`.
pub last_consolidation: DateTime<Utc>,
/// Heuristic size estimate in bytes (content lengths plus a flat
/// per-entry/per-delta overhead); not an exact measurement.
pub estimated_size_bytes: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty thermogram named "test" using the STDP-like rule.
    fn fresh() -> Thermogram {
        Thermogram::new("test", PlasticityRule::stdp_like())
    }

    /// Builds a consolidated entry stamped with the current time and no
    /// ternary strength.
    fn entry(key: &str, value: &[u8], strength: f32, update_count: usize) -> ConsolidatedEntry {
        ConsolidatedEntry {
            key: key.to_string(),
            value: value.to_vec(),
            strength,
            ternary_strength: None,
            updated_at: Utc::now(),
            update_count,
        }
    }

    /// A new thermogram keeps its name and starts with all three layers empty.
    #[test]
    fn test_create_thermogram() {
        let t = fresh();
        assert_eq!(t.name, "test");
        assert!(t.hot_entries.is_empty());
        assert!(t.cold_entries.is_empty());
        assert!(t.dirty_chain.is_empty());
    }

    /// A created key is readable straight away.
    #[test]
    fn test_apply_and_read() {
        let mut t = fresh();
        t.apply_delta(Delta::create("key1", b"value1".to_vec(), "source"))
            .unwrap();

        assert_eq!(t.read("key1").unwrap(), Some(b"value1".to_vec()));
    }

    /// An update chained onto the current head replaces the visible value.
    #[test]
    fn test_update() {
        let mut t = fresh();
        t.apply_delta(Delta::create("key1", b"value1".to_vec(), "source"))
            .unwrap();

        let head = t.dirty_chain.head_hash.clone();
        t.apply_delta(Delta::update("key1", b"value2".to_vec(), "source", 0.8, head))
            .unwrap();

        assert_eq!(t.read("key1").unwrap(), Some(b"value2".to_vec()));
    }

    /// A delete chained onto the current head hides the key from reads.
    #[test]
    fn test_delete() {
        let mut t = fresh();
        t.apply_delta(Delta::create("key1", b"value1".to_vec(), "source"))
            .unwrap();

        let head = t.dirty_chain.head_hash.clone();
        t.apply_delta(Delta::delete("key1", "source", head)).unwrap();

        assert_eq!(t.read("key1").unwrap(), None);
    }

    /// Consolidating by hand moves the pending delta into the hot tier.
    #[test]
    fn test_manual_consolidation() {
        let mut t = fresh();
        t.apply_delta(Delta::create("key1", b"value1".to_vec(), "source"))
            .unwrap();

        let summary = t.consolidate().unwrap();

        assert_eq!(summary.deltas_processed, 1);
        assert_eq!(t.hot_entries.len(), 1);
        assert_eq!(t.dirty_chain.len(), 0);
    }

    /// A strong key observed twice ends up in the cold tier only.
    #[test]
    fn test_crystallization() {
        let mut t = fresh();
        t.thermal_config.crystallization_threshold = 0.7;
        t.thermal_config.min_observations = 2;

        // First observation.
        let mut first = Delta::create("key1", b"value1".to_vec(), "source");
        first.metadata.strength = 0.9;
        t.apply_delta(first).unwrap();
        t.consolidate().unwrap();

        // Second observation, chained onto the fresh post-consolidation head.
        let head = t.dirty_chain.head_hash.clone();
        let mut second = Delta::update("key1", b"value1".to_vec(), "source", 0.9, head);
        second.metadata.strength = 0.9;
        t.apply_delta(second).unwrap();
        t.consolidate().unwrap();

        assert_eq!(t.cold_entries.len(), 1);
        assert!(t.hot_entries.is_empty());
    }

    /// Warming moves a cold entry into the hot tier.
    #[test]
    fn test_warming() {
        let mut t = fresh();
        t.cold_entries.insert(
            "cold_key".to_string(),
            entry("cold_key", b"cold_value", 0.8, 5),
        );

        assert!(t.warm("cold_key").unwrap());
        assert!(t.cold_entries.is_empty());
        assert_eq!(t.hot_entries.len(), 1);
    }

    /// `read_with_state` reports the tier each value was served from.
    #[test]
    fn test_thermal_state_read() {
        let mut t = fresh();
        t.hot_entries
            .insert("hot_key".to_string(), entry("hot_key", b"hot", 0.5, 1));
        t.cold_entries
            .insert("cold_key".to_string(), entry("cold_key", b"cold", 0.9, 10));

        let (_, _, hot_state) = t.read_with_state("hot_key").unwrap().unwrap();
        assert_eq!(hot_state, ThermalState::Hot);

        let (_, _, cold_state) = t.read_with_state("cold_key").unwrap().unwrap();
        assert_eq!(cold_state, ThermalState::Cold);
    }

    /// A saved thermogram loads back with its name and pending value intact.
    #[test]
    fn test_save_load() {
        use tempfile::tempdir;

        let dir = tempdir().unwrap();
        let file = dir.path().join("test.thermo");

        let mut t = fresh();
        t.apply_delta(Delta::create("key1", b"value1".to_vec(), "source"))
            .unwrap();
        t.save(&file).unwrap();

        let restored = Thermogram::load(&file).unwrap();
        assert_eq!(restored.name, "test");
        assert_eq!(restored.read("key1").unwrap(), Some(b"value1".to_vec()));
    }
}