use crate::delta::{Delta, DeltaType};
use crate::plasticity::PlasticityRule;
use crate::ternary::TernaryWeight;
use crate::error::Result;
use chrono::{DateTime, Utc, Duration};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Settings that decide when consolidation should run and which optional
/// clean-up steps it performs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsolidationPolicy {
/// Conditions evaluated by `should_consolidate`; consolidation is due as
/// soon as any one of them is satisfied.
pub triggers: Vec<ConsolidationTrigger>,
/// When `true`, `consolidate` removes every entry whose `should_prune`
/// check succeeds against the plasticity rule's prune threshold.
pub enable_pruning: bool,
/// NOTE(review): this flag is not read anywhere in this file — `consolidate`
/// applies `Merge` deltas regardless. Confirm where it is meant to be honored.
pub enable_merging: bool,
}
/// A single condition under which `ConsolidationPolicy::should_consolidate`
/// reports that consolidation is due.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConsolidationTrigger {
/// Fires when the number of pending deltas is at least this value.
DeltaCount(usize),
/// Fires when at least `hours` hours have passed since the last consolidation.
TimePeriod { hours: u64 },
/// Fires when the dirty size passed by the caller is at least `bytes`.
DirtySize { bytes: usize },
/// Never fires inside `should_consolidate`; presumably consolidation is
/// requested explicitly by the caller — confirm against call sites.
Manual,
}
impl Default for ConsolidationPolicy {
    /// Balanced policy: consolidate after 1000 deltas or 24 hours, with
    /// pruning and merging enabled.
    ///
    /// Implemented through the standard `Default` trait so the type works with
    /// `..Default::default()`, `#[serde(default)]` and generic code, while
    /// existing `ConsolidationPolicy::default()` call sites keep compiling.
    fn default() -> Self {
        Self {
            triggers: vec![
                ConsolidationTrigger::DeltaCount(1000),
                ConsolidationTrigger::TimePeriod { hours: 24 },
            ],
            enable_pruning: true,
            enable_merging: true,
        }
    }
}

impl ConsolidationPolicy {
    /// Eager policy: consolidate after 100 deltas or 1 hour, with pruning and
    /// merging enabled.
    pub fn aggressive() -> Self {
        Self {
            triggers: vec![
                ConsolidationTrigger::DeltaCount(100),
                ConsolidationTrigger::TimePeriod { hours: 1 },
            ],
            enable_pruning: true,
            enable_merging: true,
        }
    }

    /// Lazy policy: consolidate after 10000 deltas or 168 hours (one week),
    /// with pruning and merging disabled.
    pub fn conservative() -> Self {
        Self {
            triggers: vec![
                ConsolidationTrigger::DeltaCount(10000),
                ConsolidationTrigger::TimePeriod { hours: 168 },
            ],
            enable_pruning: false,
            enable_merging: false,
        }
    }

    /// Returns `true` if any configured trigger is currently satisfied.
    ///
    /// * `delta_count` - number of pending deltas, compared against
    ///   `DeltaCount` triggers.
    /// * `last_consolidation` - time of the previous consolidation, compared
    ///   against `TimePeriod` triggers using the current UTC time.
    /// * `dirty_size` - size compared against `DirtySize` triggers.
    ///
    /// `Manual` triggers never fire here. With no triggers configured the
    /// result is `false`.
    pub fn should_consolidate(
        &self,
        delta_count: usize,
        last_consolidation: &DateTime<Utc>,
        dirty_size: usize,
    ) -> bool {
        self.triggers.iter().any(|trigger| match trigger {
            ConsolidationTrigger::DeltaCount(n) => delta_count >= *n,
            ConsolidationTrigger::TimePeriod { hours } => {
                let elapsed = Utc::now().signed_duration_since(*last_consolidation);
                // Compare whole elapsed hours in a wider integer type instead of
                // building `Duration::hours(*hours as i64)`: the cast wraps huge
                // values to a negative threshold (trigger always fires) and
                // `Duration::hours` panics when the value is out of range.
                // For a non-negative `elapsed`, "elapsed >= N hours" is exactly
                // "whole hours elapsed >= N"; a negative `elapsed` (timestamp in
                // the future) never satisfies the trigger.
                elapsed >= Duration::zero()
                    && i128::from(elapsed.num_hours()) >= i128::from(*hours)
            }
            ConsolidationTrigger::DirtySize { bytes } => dirty_size >= *bytes,
            ConsolidationTrigger::Manual => false,
        })
    }
}
/// One key/value record of the consolidated ("clean") state, together with
/// its strength bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsolidatedEntry {
/// Key of the entry; `consolidate` stores the entry under this same key.
pub key: String,
/// Raw payload, taken from the most recent create/update delta.
pub value: Vec<u8>,
/// Scalar strength. `effective_strength` and `should_prune` ignore it
/// whenever `ternary_strength` is set.
pub strength: f32,
/// Optional ternary strength; takes precedence over `strength` when present.
/// Defaults to `None` when the field is missing from serialized data.
#[serde(default)]
pub ternary_strength: Option<TernaryWeight>,
/// Timestamp of the last delta that `consolidate` applied to this entry.
pub updated_at: DateTime<Utc>,
/// Starts at 1 when `consolidate` creates the entry and is incremented for
/// each later delta applied to it.
pub update_count: usize,
}
impl ConsolidatedEntry {
    /// Reports whether this entry carries a ternary strength.
    pub fn is_ternary(&self) -> bool {
        self.ternary_strength.is_some()
    }

    /// Strength to use for this entry: the ternary weight converted to `f32`
    /// when one is set, otherwise the scalar `strength`.
    pub fn effective_strength(&self) -> f32 {
        match self.ternary_strength {
            Some(weight) => weight.to_f32(),
            None => self.strength,
        }
    }

    /// Decides whether the entry should be dropped during pruning.
    ///
    /// Ternary entries are pruned exactly when their weight equals
    /// `TernaryWeight::Zero` (the `threshold` is not consulted); scalar entries
    /// are pruned when `strength` is strictly below `threshold`.
    pub fn should_prune(&self, threshold: f32) -> bool {
        match self.ternary_strength {
            Some(weight) => weight == TernaryWeight::Zero,
            None => self.strength < threshold,
        }
    }
}
/// Statistics describing one run of `consolidate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsolidationResult {
/// Number of deltas handed to `consolidate`.
pub deltas_processed: usize,
/// Number of entries in the resulting state (after pruning, if enabled).
pub entries_after: usize,
/// Number of entries removed by the pruning pass.
pub entries_pruned: usize,
/// Number of `Merge` deltas that were applied to an existing entry.
pub entries_merged: usize,
/// NOTE(review): `consolidate` in this file initializes this to 0 and never
/// updates it — the intended unit and computation need confirming.
pub size_reduction: usize,
/// UTC time captured when the consolidation run started.
pub timestamp: DateTime<Utc>,
}
pub fn consolidate(
dirty_deltas: &[Delta],
clean_state: &HashMap<String, ConsolidatedEntry>,
plasticity_rule: &PlasticityRule,
policy: &ConsolidationPolicy,
) -> Result<(HashMap<String, ConsolidatedEntry>, ConsolidationResult)> {
let mut new_state = clean_state.clone();
let mut stats = ConsolidationResult {
deltas_processed: dirty_deltas.len(),
entries_after: 0,
entries_pruned: 0,
entries_merged: 0,
size_reduction: 0,
timestamp: Utc::now(),
};
for delta in dirty_deltas {
match delta.delta_type {
DeltaType::Create | DeltaType::Update => {
if let Some(existing) = new_state.get_mut(&delta.key) {
if delta.is_ternary() {
let current_ternary = existing
.ternary_strength
.unwrap_or(TernaryWeight::from_f32(existing.strength, 0.3));
let observed_ternary = delta.metadata.ternary_strength.unwrap();
let confidence = delta
.metadata
.observation_count
.map(|c| (c as f32 / 10.0).min(1.0))
.unwrap_or(0.5);
let new_ternary = plasticity_rule.apply_ternary_update(
current_ternary,
observed_ternary,
confidence,
);
existing.ternary_strength = Some(new_ternary);
existing.strength = new_ternary.to_f32();
} else {
let time_delta = delta
.metadata
.timestamp
.signed_duration_since(existing.updated_at)
.num_seconds() as f64;
let new_strength = plasticity_rule.apply_update(
existing.strength,
delta.metadata.strength,
time_delta,
);
existing.strength = new_strength;
}
existing.value = delta.value.clone();
existing.updated_at = delta.metadata.timestamp;
existing.update_count += 1;
} else {
new_state.insert(
delta.key.clone(),
ConsolidatedEntry {
key: delta.key.clone(),
value: delta.value.clone(),
strength: delta.metadata.strength,
ternary_strength: delta.metadata.ternary_strength,
updated_at: delta.metadata.timestamp,
update_count: 1,
},
);
}
}
DeltaType::Delete => {
new_state.remove(&delta.key);
}
DeltaType::Merge => {
if let Some(existing) = new_state.get_mut(&delta.key) {
if delta.is_ternary() {
let current_ternary = existing
.ternary_strength
.unwrap_or(TernaryWeight::from_f32(existing.strength, 0.3));
let observed_ternary = delta.metadata.ternary_strength.unwrap();
let merged = PlasticityRule::ternary_majority_vote(&[
current_ternary,
observed_ternary,
]);
existing.ternary_strength = Some(merged);
existing.strength = merged.to_f32();
} else {
let time_delta = delta
.metadata
.timestamp
.signed_duration_since(existing.updated_at)
.num_seconds() as f64;
let new_strength = plasticity_rule.apply_update(
existing.strength,
delta.metadata.strength,
time_delta,
);
existing.strength = new_strength;
}
existing.updated_at = delta.metadata.timestamp;
existing.update_count += 1;
stats.entries_merged += 1;
}
}
}
}
if policy.enable_pruning {
let before_count = new_state.len();
new_state.retain(|_, entry| {
!entry.should_prune(plasticity_rule.prune_threshold)
});
stats.entries_pruned = before_count - new_state.len();
}
stats.entries_after = new_state.len();
Ok((new_state, stats))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a non-ternary entry stamped with the current time and an update
    /// count of 1.
    fn scalar_entry(key: &str, value: &[u8], strength: f32) -> ConsolidatedEntry {
        ConsolidatedEntry {
            key: key.to_string(),
            value: value.to_vec(),
            strength,
            ternary_strength: None,
            updated_at: Utc::now(),
            update_count: 1,
        }
    }

    /// The default policy fires once the delta count reaches 1000.
    #[test]
    fn test_consolidation_policy_delta_count() {
        let policy = ConsolidationPolicy::default();
        let just_now = Utc::now();

        assert!(!policy.should_consolidate(500, &just_now, 0));
        assert!(policy.should_consolidate(1000, &just_now, 0));
    }

    /// The default policy fires once more than 24 hours have elapsed.
    #[test]
    fn test_consolidation_policy_time() {
        let policy = ConsolidationPolicy::default();
        let long_ago = Utc::now() - Duration::hours(25);

        assert!(policy.should_consolidate(0, &long_ago, 0));
    }

    /// A create delta against an empty state yields exactly one entry.
    #[test]
    fn test_consolidate_create() {
        let deltas = vec![Delta::create("key1", b"value1".to_vec(), "source")];
        let clean_state = HashMap::new();
        let plasticity = PlasticityRule::stdp_like();
        let policy = ConsolidationPolicy::default();

        let (new_state, stats) =
            consolidate(&deltas, &clean_state, &plasticity, &policy).unwrap();

        assert_eq!(new_state.len(), 1);
        assert_eq!(stats.deltas_processed, 1);
        assert_eq!(stats.entries_after, 1);
    }

    /// An update delta replaces the value and raises the strength.
    #[test]
    fn test_consolidate_update() {
        let mut clean_state = HashMap::new();
        clean_state.insert(
            "key1".to_string(),
            scalar_entry("key1", b"old_value", 0.5),
        );
        let deltas = vec![Delta::update(
            "key1",
            b"new_value".to_vec(),
            "source",
            0.8,
            None,
        )];
        let plasticity = PlasticityRule::stdp_like();
        let policy = ConsolidationPolicy::default();

        let (new_state, _) = consolidate(&deltas, &clean_state, &plasticity, &policy).unwrap();

        let entry = new_state.get("key1").unwrap();
        assert_eq!(entry.value, b"new_value");
        assert!(entry.strength > 0.5);
    }

    /// A weak entry is removed by the pruning pass even with no deltas.
    #[test]
    fn test_consolidate_with_pruning() {
        let mut clean_state = HashMap::new();
        clean_state.insert(
            "weak_key".to_string(),
            scalar_entry("weak_key", b"value", 0.05),
        );
        let deltas: Vec<Delta> = Vec::new();
        let plasticity = PlasticityRule::stdp_like();
        let policy = ConsolidationPolicy::default();

        let (new_state, stats) =
            consolidate(&deltas, &clean_state, &plasticity, &policy).unwrap();

        assert_eq!(new_state.len(), 0);
        assert_eq!(stats.entries_pruned, 1);
    }
}