use crate::{
consolidation::ConsolidatedEntry,
core::{Thermogram, ThermalConfig, ThermalState},
delta::Delta,
error::Result,
plasticity::PlasticityRule,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use ternary_signal::Signal;
/// Identity and bookkeeping record carried by a [`ThermogramColony`].
///
/// The struct is serializable, so these values are persisted together with the colony.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColonyMetadata {
/// Colony identifier; `split_member` derives new member ids from it as `"{id}_{n}"`.
pub id: String,
/// Display name supplied by the caller at construction time.
pub name: String,
/// Timestamp taken when the metadata was created.
pub created: DateTime<Utc>,
/// Timestamp of the last `write`, `apply_delta` or `consolidate` on the owning colony.
pub modified: DateTime<Utc>,
/// Entry count as of the last `consolidate`; it is not refreshed by individual writes.
pub total_entries: usize,
/// Number of member splits performed so far (incremented by `check_and_split`).
pub split_count: usize,
/// Number of member merges performed so far (incremented by `check_and_merge`).
pub merge_count: usize,
}
impl ColonyMetadata {
    /// Creates the metadata for a brand-new colony.
    ///
    /// `created` and `modified` both receive the same "now" timestamp and every
    /// counter starts at zero.
    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
        // One clock read so that `created == modified` for a fresh record.
        let timestamp = Utc::now();
        let (id, name) = (id.into(), name.into());
        Self {
            id,
            name,
            created: timestamp,
            modified: timestamp,
            total_entries: 0,
            split_count: 0,
            merge_count: 0,
        }
    }
}
/// Summary returned by `ThermogramColony::consolidate`.
///
/// All counters default to zero. In this file `consolidate` only fills in
/// `splits`, `merges` and `rebalanced`; `promoted`, `demoted` and `pruned` are
/// never written by the colony code shown here and therefore stay at zero.
#[derive(Debug, Clone, Default)]
pub struct ColonyConsolidationResult {
/// Entries promoted between thermal layers (not populated by the colony code in this file).
pub promoted: usize,
/// Entries demoted between thermal layers (not populated by the colony code in this file).
pub demoted: usize,
/// Entries pruned (not populated by the colony code in this file).
pub pruned: usize,
/// Number of entries moved between members by the rebalancing pass.
pub rebalanced: usize,
/// Number of member splits performed during this consolidation.
pub splits: usize,
/// Number of member merges performed during this consolidation.
pub merges: usize,
}
/// Tuning knobs that control when a colony splits, merges and rebalances its members.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColonyConfig {
/// A member whose key count is strictly greater than this is a split candidate.
pub split_threshold: usize,
/// A member whose key count is strictly less than this is a merge candidate.
pub merge_threshold: usize,
/// NOTE(review): not read by any colony code in this file — confirm whether it is used elsewhere.
pub balance_target: f32,
/// Fraction of the per-member average that a member may deviate by before
/// `rebalance` treats it as overloaded or underloaded.
pub balance_tolerance: f32,
/// Splitting stops once the colony has this many members.
pub max_members: usize,
/// Thermal configuration handed to every member thermogram the colony creates.
pub thermal_config: ThermalConfig,
}
impl Default for ColonyConfig {
    /// Baseline configuration: split above 10 000 keys, merge below 1 000,
    /// 30 % balance tolerance, at most 16 members, default thermal settings.
    fn default() -> Self {
        let thermal_config = ThermalConfig::default();
        Self {
            thermal_config,
            max_members: 16,
            balance_tolerance: 0.3,
            balance_target: 0.5,
            merge_threshold: 1_000,
            split_threshold: 10_000,
        }
    }
}
impl ColonyConfig {
    /// Preset with smaller members than the default: split above 5 000 keys,
    /// merge below 500, 25 % balance tolerance, at most 8 members, and the
    /// `ThermalConfig::fast_learner()` thermal settings.
    pub fn fast_learner() -> Self {
        let thermal_config = ThermalConfig::fast_learner();
        Self {
            thermal_config,
            max_members: 8,
            balance_tolerance: 0.25,
            balance_target: 0.5,
            merge_threshold: 500,
            split_threshold: 5_000,
        }
    }
}
/// A group of `Thermogram` members that together act as one keyed store.
///
/// Keys are routed to a member on write; members are split, merged and
/// rebalanced during `consolidate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThermogramColony {
/// Member thermograms; positions in this vector are the indices stored in `key_locality`.
pub members: Vec<Thermogram>,
/// Split/merge/rebalance thresholds and the thermal config used for new members.
pub config: ColonyConfig,
/// Identity, timestamps and split/merge counters.
pub metadata: ColonyMetadata,
// Cache mapping a key to the index of the member it was last routed to.
// `#[serde(default)]` means it deserializes as empty when absent from the input;
// lookups fall back to scanning all members on a cache miss.
#[serde(default)]
key_locality: HashMap<String, usize>,
}
impl ThermogramColony {
pub fn new(id: impl Into<String>, name: impl Into<String>, rule: PlasticityRule) -> Self {
let id = id.into();
let name = name.into();
let config = ColonyConfig::default();
let first_member = Thermogram::with_thermal_config(
format!("{}_0", id),
rule,
config.thermal_config.clone(),
);
Self {
members: vec![first_member],
config,
metadata: ColonyMetadata::new(&id, &name),
key_locality: HashMap::new(),
}
}
/// Creates a colony that uses the supplied `config`.
///
/// The colony starts with a single member thermogram whose id is `"{id}_0"`,
/// built with `rule` and `config.thermal_config`, and with an empty locality cache.
pub fn with_config(
    id: impl Into<String>,
    name: impl Into<String>,
    rule: PlasticityRule,
    config: ColonyConfig,
) -> Self {
    let colony_id: String = id.into();
    let colony_name: String = name.into();

    // The seed member always takes suffix 0.
    let seed_member = Thermogram::with_thermal_config(
        format!("{}_0", colony_id),
        rule,
        config.thermal_config.clone(),
    );
    let metadata = ColonyMetadata::new(colony_id, colony_name);

    Self {
        members: vec![seed_member],
        config,
        metadata,
        key_locality: HashMap::new(),
    }
}
/// Returns the sum of `stats().total_keys` over all members.
pub fn total_entries(&self) -> usize {
    let mut running_total = 0usize;
    for member in &self.members {
        running_total += member.stats().total_keys;
    }
    running_total
}
/// Returns the number of member thermograms currently in the colony.
pub fn member_count(&self) -> usize {
self.members.len()
}
/// Looks up `key` in any thermal layer of any member.
///
/// The member recorded in the locality cache is tried first. If the cache has
/// no entry, points at a missing member, or that member does not hold the key,
/// every member is scanned in order and the first match is returned.
pub fn read(&self, key: &str) -> Option<&ConsolidatedEntry> {
    let cached_hit = self
        .key_locality
        .get(key)
        .and_then(|&slot| self.members.get(slot))
        .and_then(|member| Self::find_entry_in_member(member, key));

    // Cache miss (or stale cache): fall back to a linear scan of the members.
    cached_hit.or_else(|| {
        self.members
            .iter()
            .find_map(|member| Self::find_entry_in_member(member, key))
    })
}
/// Returns the entry stored under `key` in `member`, checking the thermal
/// layers in the order produced by `ThermalState::all()` and stopping at the
/// first layer that contains the key.
fn find_entry_in_member<'a>(member: &'a Thermogram, key: &str) -> Option<&'a ConsolidatedEntry> {
    ThermalState::all()
        .iter()
        .find_map(|state| member.entries(*state).get(key))
}
/// Reports whether any thermal layer of `member` contains `key`.
fn key_exists_in_member(member: &Thermogram, key: &str) -> bool {
    for state in ThermalState::all() {
        if member.entries(state).contains_key(key) {
            return true;
        }
    }
    false
}
/// Looks up `key` in one specific thermal layer.
///
/// The member recorded in the locality cache is tried first; when that does
/// not yield an entry in `state`, all members are scanned in order and the
/// first one holding the key in that layer wins.
pub fn read_layer(&self, key: &str, state: ThermalState) -> Option<&ConsolidatedEntry> {
    let cached_hit = self
        .key_locality
        .get(key)
        .and_then(|&slot| self.members.get(slot))
        .and_then(|member| member.entries(state).get(key));

    cached_hit.or_else(|| {
        self.members
            .iter()
            .find_map(|member| member.entries(state).get(key))
    })
}
/// Stores `entry` under `key` in the `state` layer of the member chosen by
/// `select_member_for_write`, records that choice in the locality cache and
/// stamps `metadata.modified`.
///
/// NOTE(review): only the `state` layer is written; a copy of the key in a
/// different layer of the same member is left in place — confirm that is intended.
pub fn write(&mut self, key: &str, entry: ConsolidatedEntry, state: ThermalState) {
    let slot = self.select_member_for_write(key);
    let owned_key = key.to_string();

    self.key_locality.insert(owned_key.clone(), slot);
    if let Some(target) = self.members.get_mut(slot) {
        target.entries_mut(state).insert(owned_key, entry);
    }

    self.metadata.modified = Utc::now();
}
/// Routes `delta` to the member chosen for `delta.key` and applies it there.
///
/// The routing decision is written to the locality cache before the delta is
/// applied. `metadata.modified` is stamped only when the member accepted the delta.
///
/// # Errors
///
/// Propagates the error returned by the member's `apply_delta`.
pub fn apply_delta(&mut self, delta: Delta) -> Result<()> {
    let slot = self.select_member_for_write(&delta.key);
    self.key_locality.insert(delta.key.clone(), slot);

    match self.members.get_mut(slot) {
        Some(target) => {
            target.apply_delta(delta)?;
        }
        None => {}
    }

    self.metadata.modified = Utc::now();
    Ok(())
}
fn select_member_for_write(&self, key: &str) -> usize {
if let Some(&idx) = self.key_locality.get(key) {
if idx < self.members.len() {
return idx;
}
}
let prefix = key.split('_').next().unwrap_or(key);
let mut best_idx = 0;
let mut best_score = 0usize;
for (idx, member) in self.members.iter().enumerate() {
let score = member
.hot_entries
.keys()
.chain(member.warm_entries.keys())
.chain(member.cool_entries.keys())
.chain(member.cold_entries.keys())
.filter(|k| k.starts_with(prefix))
.count();
if score > best_score {
best_score = score;
best_idx = idx;
}
}
if best_score == 0 {
let mut min_entries = usize::MAX;
for (idx, member) in self.members.iter().enumerate() {
let entries = member.stats().total_keys;
if entries < min_entries {
min_entries = entries;
best_idx = idx;
}
}
}
best_idx
}
pub fn consolidate(&mut self) -> Result<ColonyConsolidationResult> {
let mut result = ColonyConsolidationResult::default();
for member in &mut self.members {
member.consolidate()?;
}
for member in &mut self.members {
member.run_thermal_transitions()?;
}
result.splits = self.check_and_split()?;
result.merges = self.check_and_merge()?;
result.rebalanced = self.rebalance()?;
self.metadata.total_entries = self.total_entries();
self.metadata.modified = Utc::now();
Ok(result)
}
/// Splits every member whose key count exceeds `config.split_threshold`,
/// stopping once the colony reaches `config.max_members`.
///
/// Returns the number of splits performed and adds it to `metadata.split_count`.
fn check_and_split(&mut self) -> Result<usize> {
    let member_cap = self.config.max_members;
    if self.members.len() >= member_cap {
        return Ok(0);
    }

    let threshold = self.config.split_threshold;
    let oversized: Vec<usize> = self
        .members
        .iter()
        .enumerate()
        .filter(|(_, member)| member.stats().total_keys > threshold)
        .map(|(idx, _)| idx)
        .collect();

    let mut performed = 0;
    // Highest index first; new members are appended, so earlier indices stay valid.
    for idx in oversized.into_iter().rev() {
        if self.members.len() >= member_cap {
            break;
        }
        self.split_member(idx)?;
        performed += 1;
    }

    self.metadata.split_count += performed;
    Ok(performed)
}
/// Moves the entries chosen by `select_entries_for_split` out of member `idx`
/// into a newly created member appended at the end of `members`, updating the
/// locality cache for every moved key.
///
/// The new member copies the plasticity rule of member `idx` and uses the
/// colony's thermal configuration.
///
/// NOTE(review): the id suffix is the member count at the time of the split, so
/// after a merge has shrunk the colony a later split can produce a suffix that
/// an existing member already carries — confirm whether ids must be unique.
///
/// # Panics
///
/// Panics if `idx` is out of bounds.
fn split_member(&mut self, idx: usize) -> Result<()> {
    let rule = self.members[idx].plasticity_rule.clone();
    // Index the new member will occupy once it is pushed.
    let new_slot = self.members.len();
    let mut sibling = Thermogram::with_thermal_config(
        format!("{}_{}", self.metadata.id, new_slot),
        rule,
        self.config.thermal_config.clone(),
    );

    for (key, state) in self.select_entries_for_split(idx) {
        let taken = self.members[idx].entries_mut(state).remove(&key);
        if let Some(entry) = taken {
            sibling.entries_mut(state).insert(key.clone(), entry);
            self.key_locality.insert(key, new_slot);
        }
    }

    self.members.push(sibling);
    Ok(())
}
/// Lists the `(key, layer)` pairs that should leave member `idx` in a split.
///
/// All keys from the four layers are gathered, sorted by key, and the upper
/// half (from index `len / 2` onward) is returned.
///
/// # Panics
///
/// Panics if `idx` is out of bounds.
fn select_entries_for_split(&self, idx: usize) -> Vec<(String, ThermalState)> {
    let source = &self.members[idx];

    let mut tagged: Vec<(String, ThermalState)> = source
        .hot_entries
        .keys()
        .map(|key| (key.clone(), ThermalState::Hot))
        .chain(
            source
                .warm_entries
                .keys()
                .map(|key| (key.clone(), ThermalState::Warm)),
        )
        .chain(
            source
                .cool_entries
                .keys()
                .map(|key| (key.clone(), ThermalState::Cool)),
        )
        .chain(
            source
                .cold_entries
                .keys()
                .map(|key| (key.clone(), ThermalState::Cold)),
        )
        .collect();

    // Stable sort by key only.
    tagged.sort_by(|(left, _), (right, _)| left.cmp(right));

    let midpoint = tagged.len() / 2;
    tagged.split_off(midpoint)
}
/// Repeatedly merges the two lowest-indexed members whose key count is below
/// `config.merge_threshold` until fewer than two such members remain.
///
/// Returns the number of merges performed and adds it to `metadata.merge_count`.
fn check_and_merge(&mut self) -> Result<usize> {
    if self.members.len() <= 1 {
        return Ok(0);
    }

    let threshold = self.config.merge_threshold;
    let mut performed = 0;

    loop {
        // Only the first two undersized members are needed for each round.
        let (first, second) = {
            let mut undersized = self
                .members
                .iter()
                .enumerate()
                .filter(|(_, member)| member.stats().total_keys < threshold)
                .map(|(idx, _)| idx);
            let first = undersized.next();
            let second = undersized.next();
            (first, second)
        };

        let (keep, absorb) = match (first, second) {
            (Some(keep), Some(absorb)) => (keep, absorb),
            _ => break,
        };

        self.merge_members(keep, absorb)?;
        performed += 1;
    }

    self.metadata.merge_count += performed;
    Ok(performed)
}
/// Removes member `remove_idx` from the colony and moves all of its entries,
/// layer by layer, into member `keep_idx`.
///
/// Both indices refer to positions in `members` *before* the removal. The
/// locality cache is repaired as part of the merge:
/// * entries that pointed at the removed member are dropped (moved keys are
///   re-registered under the surviving member; anything else was stale);
/// * entries pointing past the removed member are shifted down by one.
///
/// Calling this with `keep_idx == remove_idx` is a no-op.
///
/// # Panics
///
/// Panics if either index is out of bounds.
fn merge_members(&mut self, keep_idx: usize, remove_idx: usize) -> Result<()> {
    if keep_idx == remove_idx {
        // Merging a member into itself would only destroy it.
        return Ok(());
    }

    let removed = self.members.remove(remove_idx);

    // Removing a member shifts every later member down by one, including the
    // survivor when it sat after the removed slot.
    let target = if keep_idx > remove_idx {
        keep_idx - 1
    } else {
        keep_idx
    };

    // Repair the cache before re-registering the moved keys so that the new
    // registrations are not shifted a second time.
    self.key_locality.retain(|_, idx| {
        if *idx == remove_idx {
            return false;
        }
        if *idx > remove_idx {
            *idx -= 1;
        }
        true
    });

    for (key, entry) in removed.hot_entries {
        self.key_locality.insert(key.clone(), target);
        self.members[target].hot_entries.insert(key, entry);
    }
    for (key, entry) in removed.warm_entries {
        self.key_locality.insert(key.clone(), target);
        self.members[target].warm_entries.insert(key, entry);
    }
    for (key, entry) in removed.cool_entries {
        self.key_locality.insert(key.clone(), target);
        self.members[target].cool_entries.insert(key, entry);
    }
    for (key, entry) in removed.cold_entries {
        self.key_locality.insert(key.clone(), target);
        self.members[target].cold_entries.insert(key, entry);
    }

    Ok(())
}
/// Evens out member sizes by moving entries from overloaded to underloaded members.
///
/// The target is the integer average of keys per member. A member is
/// overloaded above `target + tolerance` and underloaded below
/// `target - tolerance` (saturating at zero), where
/// `tolerance = target * config.balance_tolerance`, truncated to an integer.
///
/// Returns the number of entries actually moved. Nothing is done for colonies
/// with at most one member or with no entries.
fn rebalance(&mut self) -> Result<usize> {
    let member_total = self.members.len();
    if member_total <= 1 {
        return Ok(0);
    }
    let entry_total = self.total_entries();
    if entry_total == 0 {
        return Ok(0);
    }

    let target = entry_total / member_total;
    let slack = (target as f32 * self.config.balance_tolerance) as usize;

    // Classify members once, up front; the lists are not refreshed while moving.
    let mut donors: Vec<usize> = Vec::new();
    let mut receivers: Vec<usize> = Vec::new();
    for (idx, member) in self.members.iter().enumerate() {
        let load = member.stats().total_keys;
        if load > target + slack {
            donors.push(idx);
        } else if load < target.saturating_sub(slack) {
            receivers.push(idx);
        }
    }

    let mut relocated = 0;
    for &donor in &donors {
        for &receiver in &receivers {
            let donor_load = self.members[donor].stats().total_keys;
            let receiver_load = self.members[receiver].stats().total_keys;
            // Stop draining this donor once it is back inside the tolerance band.
            if donor_load <= target + slack {
                break;
            }
            // Move no more than the donor's surplus or the receiver's deficit.
            let quota = (donor_load - target).min(target - receiver_load);
            relocated += self.move_entries(donor, receiver, quota);
        }
    }

    Ok(relocated)
}
/// Moves up to `count` entries from member `from_idx` to member `to_idx` and
/// returns how many were moved.
///
/// Hot entries are taken first; warm entries are used only for whatever is
/// still missing. Cool and cold entries are never moved by this function, so
/// the returned number can be smaller than `count`. The locality cache is
/// updated for each moved key.
///
/// # Panics
///
/// Panics if either index is out of bounds.
fn move_entries(&mut self, from_idx: usize, to_idx: usize, count: usize) -> usize {
    let mut relocated = 0;

    // Snapshot the keys first so the source map is not mutated while iterated.
    let hot_batch: Vec<String> = self.members[from_idx]
        .hot_entries
        .keys()
        .take(count)
        .cloned()
        .collect();
    for key in hot_batch {
        let taken = self.members[from_idx].hot_entries.remove(&key);
        if let Some(entry) = taken {
            self.members[to_idx].hot_entries.insert(key.clone(), entry);
            self.key_locality.insert(key, to_idx);
            relocated += 1;
        }
    }

    if relocated >= count {
        return relocated;
    }

    let still_needed = count - relocated;
    let warm_batch: Vec<String> = self.members[from_idx]
        .warm_entries
        .keys()
        .take(still_needed)
        .cloned()
        .collect();
    for key in warm_batch {
        let taken = self.members[from_idx].warm_entries.remove(&key);
        if let Some(entry) = taken {
            self.members[to_idx].warm_entries.insert(key.clone(), entry);
            self.key_locality.insert(key, to_idx);
            relocated += 1;
        }
    }

    relocated
}
/// Finds the member that an in-place update of `key` should be sent to.
///
/// Preference order: the cached member if it holds the key; otherwise the
/// first member that holds the key; otherwise the cached member (if its index
/// is still valid) so that the member itself decides the outcome. Returns
/// `None` when no member qualifies.
fn locate_member_for_update(&self, key: &str) -> Option<usize> {
    // A cache entry pointing past the end of `members` is treated as absent.
    let cached = self
        .key_locality
        .get(key)
        .copied()
        .filter(|&idx| idx < self.members.len());

    match cached {
        Some(idx) if Self::key_exists_in_member(&self.members[idx], key) => Some(idx),
        // Stale or missing cache entry: scan like `read` does.
        _ => self
            .members
            .iter()
            .position(|member| Self::key_exists_in_member(member, key))
            .or(cached),
    }
}
/// Strengthens the entry stored under `key` by `amount`.
///
/// The member is located through the locality cache with a fallback scan over
/// all members when the cache is missing or stale; the cache is refreshed when
/// the scan finds the key elsewhere.
///
/// Returns the member's answer, or `false` when no member could be found or
/// the member reported an error.
pub fn reinforce(&mut self, key: &str, amount: Signal) -> bool {
    let idx = match self.locate_member_for_update(key) {
        Some(idx) => idx,
        None => return false,
    };
    // Only touch the cache when it would actually change.
    if self.key_locality.get(key) != Some(&idx) {
        self.key_locality.insert(key.to_string(), idx);
    }
    self.members[idx].reinforce(key, amount).unwrap_or(false)
}
/// Weakens the entry stored under `key` by `amount`.
///
/// The member is located through the locality cache with a fallback scan over
/// all members when the cache is missing or stale; the cache is refreshed when
/// the scan finds the key elsewhere.
///
/// Returns the member's answer, or `false` when no member could be found or
/// the member reported an error.
pub fn weaken(&mut self, key: &str, amount: Signal) -> bool {
    let idx = match self.locate_member_for_update(key) {
        Some(idx) => idx,
        None => return false,
    };
    // Only touch the cache when it would actually change.
    if self.key_locality.get(key) != Some(&idx) {
        self.key_locality.insert(key.to_string(), idx);
    }
    self.members[idx].weaken(key, amount).unwrap_or(false)
}
/// Calls `apply_decay` on every member, in order.
pub fn apply_decay(&mut self) {
    self.members.iter_mut().for_each(|member| {
        member.apply_decay();
    });
}
pub fn stats(&self) -> ColonyStats {
let member_stats: Vec<_> = self.members.iter().map(|m| m.stats()).collect();
ColonyStats {
member_count: self.members.len(),
total_entries: member_stats.iter().map(|s| s.total_keys).sum(),
hot_entries: member_stats.iter().map(|s| s.hot_entries).sum(),
warm_entries: member_stats.iter().map(|s| s.warm_entries).sum(),
cool_entries: member_stats.iter().map(|s| s.cool_entries).sum(),
cold_entries: member_stats.iter().map(|s| s.cold_entries).sum(),
split_count: self.metadata.split_count,
merge_count: self.metadata.merge_count,
}
}
}
/// Aggregate snapshot of a colony, as produced by `ThermogramColony::stats`.
#[derive(Debug, Clone, Default)]
pub struct ColonyStats {
/// Number of member thermograms.
pub member_count: usize,
/// Sum of the members' `total_keys`.
pub total_entries: usize,
/// Sum of the members' hot-layer counts.
pub hot_entries: usize,
/// Sum of the members' warm-layer counts.
pub warm_entries: usize,
/// Sum of the members' cool-layer counts.
pub cool_entries: usize,
/// Sum of the members' cold-layer counts.
pub cold_entries: usize,
/// Splits performed over the colony's lifetime, copied from the metadata.
pub split_count: usize,
/// Merges performed over the colony's lifetime, copied from the metadata.
pub merge_count: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new colony has exactly one member and no entries.
    #[test]
    fn test_colony_creation() {
        let colony =
            ThermogramColony::new("test_colony", "Test Colony", PlasticityRule::stdp_like());

        assert_eq!(colony.member_count(), 1);
        assert_eq!(colony.total_entries(), 0);
    }

    /// An entry written to the hot layer can be read back with the same value.
    #[test]
    fn test_colony_write_read() {
        let mut colony =
            ThermogramColony::new("test_colony", "Test Colony", PlasticityRule::stdp_like());

        let payload = vec![
            Signal::positive(1),
            Signal::positive(2),
            Signal::positive(3),
        ];
        let entry = ConsolidatedEntry {
            key: "key1".to_string(),
            value: payload.clone(),
            strength: Signal::positive(204),
            updated_at: Utc::now(),
            update_count: 1,
        };
        colony.write("key1", entry, ThermalState::Hot);

        let stored = colony.read("key1");
        assert!(stored.is_some());
        assert_eq!(stored.unwrap().value, payload);
    }

    /// Ten keys sharing a prefix all land in the single existing member.
    #[test]
    fn test_colony_locality() {
        let mut colony =
            ThermogramColony::new("test_colony", "Test Colony", PlasticityRule::stdp_like());

        for i in 0..10 {
            let key = format!("group_a_{}", i);
            let entry = ConsolidatedEntry {
                key: key.clone(),
                value: vec![Signal::positive(i)],
                strength: Signal::positive(204),
                updated_at: Utc::now(),
                update_count: 1,
            };
            colony.write(&key, entry, ThermalState::Hot);
        }

        let snapshot = colony.stats();
        assert_eq!(snapshot.member_count, 1);
        assert_eq!(snapshot.total_entries, 10);
    }

    /// Exceeding a small split threshold makes consolidation split the member.
    #[test]
    fn test_colony_split() {
        let config = ColonyConfig {
            split_threshold: 5,
            merge_threshold: 1,
            max_members: 4,
            ..Default::default()
        };
        let mut colony = ThermogramColony::with_config(
            "test_colony",
            "Test Colony",
            PlasticityRule::stdp_like(),
            config,
        );

        for i in 0..10 {
            let key = format!("key_{}", i);
            let entry = ConsolidatedEntry {
                key: key.clone(),
                value: vec![Signal::positive(i)],
                strength: Signal::positive(204),
                updated_at: Utc::now(),
                update_count: 5,
            };
            colony.write(&key, entry, ThermalState::Hot);
        }

        let outcome = colony.consolidate().unwrap();
        assert!(outcome.splits > 0 || colony.member_count() > 1);
    }

    /// Reinforcing raises the stored strength; weakening afterwards succeeds.
    #[test]
    fn test_colony_reinforce_weaken() {
        let mut colony =
            ThermogramColony::new("test_colony", "Test Colony", PlasticityRule::stdp_like());

        let entry = ConsolidatedEntry {
            key: "key1".to_string(),
            value: vec![
                Signal::positive(1),
                Signal::positive(2),
                Signal::positive(3),
            ],
            strength: Signal::positive(128),
            updated_at: Utc::now(),
            update_count: 1,
        };
        colony.write("key1", entry, ThermalState::Hot);

        assert!(colony.reinforce("key1", Signal::positive(51)));
        let reinforced = colony.read("key1").unwrap();
        assert!(reinforced.strength.magnitude > 128);

        assert!(colony.weaken("key1", Signal::positive(26)));
    }
}