use std::collections::{HashMap, HashSet};
use std::time::Instant;
use hirn_core::embed::{ChatMessage, LlmOptions, LlmProvider};
use hirn_core::episodic::EpisodicRecord;
use hirn_core::id::MemoryId;
use hirn_core::metadata::Metadata;
use hirn_core::provenance::Mutation;
use hirn_core::semantic::SemanticRecord;
use hirn_core::timestamp::Timestamp;
use hirn_core::types::{AgentId, EdgeRelation, KnowledgeType, MutationTrigger, Origin};
use hirn_core::{HirnError, HirnResult};
use crate::HirnDB;
mod community;
mod compactor;
mod concept;
mod dream_cycle;
mod evolution;
mod forgetting;
mod narrative;
mod pattern;
mod pipeline;
mod raptor;
mod reconsolidation;
mod scheduler;
mod segmentation;
pub use community::*;
pub use compactor::*;
pub use concept::*;
pub use dream_cycle::*;
pub use evolution::*;
pub use forgetting::*;
pub use narrative::*;
pub use pattern::*;
pub use pipeline::*;
pub use raptor::*;
pub use reconsolidation::*;
pub use scheduler::*;
pub use segmentation::*;
/// Tuning parameters for the consolidation subsystem.
///
/// Default values are supplied by the `Default` impl and range checks by
/// [`ConsolidationConfig::validate`]. Notes marked `NOTE(review)` are inferred
/// from field names only and should be confirmed against the submodules.
#[derive(Debug, Clone)]
pub struct ConsolidationConfig {
/// Default `0.3`; `validate` requires ≥ 0.0.
/// NOTE(review): presumably the similarity cut-off for topic-shift segmentation — confirm.
pub topic_similarity_threshold: f32,
/// Default `0.8`; `validate` requires ≥ 0.0.
/// NOTE(review): presumably the surprise level that starts a new segment — confirm.
pub surprise_threshold: f32,
/// Default `3600`; not range-checked by `validate`.
/// NOTE(review): presumably the time gap (seconds) that splits segments — confirm.
pub temporal_gap_seconds: i64,
/// Default `20`; not range-checked by `validate`.
pub segmentation_lookback: usize,
/// Default `1.5`; `validate` requires a value in `(0.0, 1e6]`.
pub segmentation_gamma: f64,
/// Default `3`; not range-checked by `validate`.
/// NOTE(review): presumably the minimum occurrence count for a pattern — confirm.
pub min_pattern_frequency: usize,
/// Default `false`.
pub archive_after_consolidation: bool,
/// Default `0.3`; `validate` requires ≥ 0.0.
pub thread_similarity_threshold: f32,
/// Default `3600`; not range-checked by `validate`.
pub reconsolidation_window_secs: u64,
/// Default `None`; not checked by `validate`.
pub decay_rate_override: Option<f64>,
/// Default `0.05`; `validate` requires ≥ 0.0.
pub edge_prune_threshold: f32,
/// Default `0.5`; `validate` requires ≥ 0.0.
pub spaced_repetition_alpha: f64,
/// Default `0.3`; `validate` requires ≥ 0.0.
pub working_to_episodic_threshold: f32,
/// Default `1_000`; not range-checked by `validate`.
pub consolidation_batch_size: usize,
/// Default `false`.
pub raptor_enabled: bool,
/// Default `3`; not range-checked by `validate`.
pub raptor_max_levels: usize,
/// Default `5`; `validate` requires ≥ 2.
pub raptor_cluster_size: usize,
/// Default `256`; not range-checked by `validate`.
pub raptor_summary_max_tokens: usize,
/// Default `3`; `validate` requires ≥ 3.
pub raptor_min_cluster_input: usize,
/// Default `3`; `validate` requires ≥ 1.
pub raptor_min_cluster_size: usize,
/// Default 10 seconds; `validate` rejects a zero duration.
/// NOTE(review): presumably the per-call deadline passed to LLM requests — confirm at call sites.
pub llm_timeout: std::time::Duration,
/// Default 300 seconds; `validate` rejects a zero duration.
pub total_consolidation_timeout: std::time::Duration,
}
impl Default for ConsolidationConfig {
    /// Baseline configuration: RAPTOR disabled, no archiving after
    /// consolidation, no decay-rate override, a 10 s LLM timeout and a
    /// 300 s overall timeout.
    fn default() -> Self {
        use std::time::Duration;

        Self {
            // Timeouts.
            llm_timeout: Duration::from_secs(10),
            total_consolidation_timeout: Duration::from_secs(300),

            // Fields prefixed `raptor_`.
            raptor_enabled: false,
            raptor_max_levels: 3,
            raptor_cluster_size: 5,
            raptor_summary_max_tokens: 256,
            raptor_min_cluster_input: 3,
            raptor_min_cluster_size: 3,

            // Fields prefixed `segmentation_`, plus the other thresholds.
            segmentation_lookback: 20,
            segmentation_gamma: 1.5,
            topic_similarity_threshold: 0.3,
            surprise_threshold: 0.8,
            thread_similarity_threshold: 0.3,
            edge_prune_threshold: 0.05,
            working_to_episodic_threshold: 0.3,
            temporal_gap_seconds: 3600,

            // Everything else.
            min_pattern_frequency: 3,
            archive_after_consolidation: false,
            reconsolidation_window_secs: 3600,
            decay_rate_override: None,
            spaced_repetition_alpha: 0.5,
            consolidation_batch_size: 1_000,
        }
    }
}
impl ConsolidationConfig {
    /// Checks the configuration for out-of-range values.
    ///
    /// Checks run in a fixed order and stop at the first violation, so a
    /// configuration with several bad fields reports only one of them.
    ///
    /// # Errors
    ///
    /// Returns [`HirnError::InvalidConfig`] naming the offending field when:
    /// - `raptor_cluster_size` is below 2,
    /// - `segmentation_gamma` is NaN, ≤ 0.0, or above 1e6,
    /// - `raptor_min_cluster_input` is below 3,
    /// - `raptor_min_cluster_size` is below 1,
    /// - `llm_timeout` or `total_consolidation_timeout` is zero,
    /// - any of `topic_similarity_threshold`, `surprise_threshold`,
    ///   `thread_similarity_threshold`, `edge_prune_threshold`,
    ///   `spaced_repetition_alpha`, `working_to_episodic_threshold`
    ///   is NaN or negative.
    pub fn validate(&self) -> HirnResult<()> {
        // Builds the error variant shared by every failed check.
        fn invalid(field: &str, value: impl std::fmt::Display, reason: &str) -> HirnError {
            HirnError::InvalidConfig {
                field: field.to_string(),
                value: value.to_string(),
                reason: reason.to_string(),
            }
        }

        // Rejects negative values and NaN. A plain `value < 0.0` is false for
        // NaN and would let it through; `partial_cmp` returns `None` for NaN.
        // Generic so f32 fields keep their own `Display` output in the error.
        fn non_negative<T>(field: &str, value: T) -> HirnResult<()>
        where
            T: PartialOrd + Default + std::fmt::Display,
        {
            match value.partial_cmp(&T::default()) {
                Some(std::cmp::Ordering::Less) | None => {
                    Err(invalid(field, value, "must be ≥ 0.0"))
                }
                Some(_) => Ok(()),
            }
        }

        if self.raptor_cluster_size < 2 {
            return Err(invalid(
                "raptor_cluster_size",
                self.raptor_cluster_size,
                "must be ≥ 2",
            ));
        }

        let gamma = self.segmentation_gamma;
        // NaN compares false against both bounds, so it is tested explicitly.
        if gamma.is_nan() || gamma <= 0.0 {
            return Err(invalid("segmentation_gamma", gamma, "must be > 0.0"));
        }
        if gamma > 1e6 {
            // Separate reason so the message matches the bound that was violated.
            return Err(invalid("segmentation_gamma", gamma, "must be ≤ 1e6"));
        }

        if self.raptor_min_cluster_input < 3 {
            return Err(invalid(
                "raptor_min_cluster_input",
                self.raptor_min_cluster_input,
                "must be ≥ 3",
            ));
        }
        if self.raptor_min_cluster_size < 1 {
            return Err(invalid(
                "raptor_min_cluster_size",
                self.raptor_min_cluster_size,
                "must be ≥ 1",
            ));
        }
        if self.llm_timeout.is_zero() {
            return Err(invalid("llm_timeout", "0", "must be > 0"));
        }
        if self.total_consolidation_timeout.is_zero() {
            return Err(invalid("total_consolidation_timeout", "0", "must be > 0"));
        }

        non_negative("topic_similarity_threshold", self.topic_similarity_threshold)?;
        non_negative("surprise_threshold", self.surprise_threshold)?;
        non_negative(
            "thread_similarity_threshold",
            self.thread_similarity_threshold,
        )?;
        non_negative("edge_prune_threshold", self.edge_prune_threshold)?;
        non_negative("spaced_repetition_alpha", self.spaced_repetition_alpha)?;
        non_negative(
            "working_to_episodic_threshold",
            self.working_to_episodic_threshold,
        )?;
        Ok(())
    }
}
/// Runs `llm.generate_text(messages, options)` under a deadline.
///
/// If the call finishes within `timeout`, its result (success or error) is
/// returned unchanged.
///
/// # Errors
///
/// Returns [`HirnError::Timeout`] when `timeout` elapses first. The message
/// reports the deadline in seconds, including any fractional part, so a
/// 500 ms deadline reads `0.5s` rather than `0s`. Whole-second deadlines
/// print as before (e.g. `10s`).
pub(crate) async fn generate_text_with_timeout(
    llm: &dyn LlmProvider,
    messages: &[ChatMessage],
    options: &LlmOptions,
    timeout: std::time::Duration,
) -> HirnResult<String> {
    match tokio::time::timeout(timeout, llm.generate_text(messages, options)).await {
        Ok(result) => result,
        Err(_elapsed) => Err(HirnError::Timeout(format!(
            "LLM call timed out after {}s",
            // `as_secs()` would truncate sub-second deadlines to 0.
            timeout.as_secs_f64()
        ))),
    }
}
/// Raises the `importance` of the given episodic rows after a retrieval.
///
/// Issues one `update_where` against the episodic dataset with the filter
/// `id IN (...)`, setting `importance` to `LEAST(importance + 0.01, 1.0)`.
/// An empty id list is a no-op and does not touch storage.
///
/// # Errors
///
/// Returns a storage error prefixed with `apply_retrieval_effects:` when
/// the update fails.
pub async fn apply_retrieval_effects(
    storage: std::sync::Arc<dyn hirn_storage::PhysicalStore>,
    retrieved_ids: Vec<MemoryId>,
) -> HirnResult<()> {
    if retrieved_ids.is_empty() {
        return Ok(());
    }

    // Quote each id as a string literal, doubling any embedded single quote.
    let mut quoted_ids = Vec::with_capacity(retrieved_ids.len());
    for id in &retrieved_ids {
        let literal = id.to_string().replace('\'', "''");
        quoted_ids.push(format!("'{literal}'"));
    }
    let filter = format!("id IN ({})", quoted_ids.join(", "));

    let outcome = storage
        .update_where(
            hirn_storage::datasets::episodic::DATASET_NAME,
            &filter,
            &[("importance", "LEAST(importance + 0.01, 1.0)")],
        )
        .await;

    match outcome {
        Ok(_) => Ok(()),
        Err(e) => Err(HirnError::storage(format!("apply_retrieval_effects: {e}"))),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use hirn_core::episodic::EpisodicRecord;
use hirn_core::types::EventType;
/// Agent id shared by every fixture in this module.
///
/// Panics if `AgentId::new` rejects the literal `"test"`.
fn agent() -> AgentId {
AgentId::new("test").unwrap()
}
/// Builds an `Observation` episode with no explicit embedding.
///
/// `content` is used for both the content and the summary; the record is
/// attributed to [`agent`]. Panics if the builder rejects the inputs.
fn make_episode(content: &str, importance: f32, surprise: f32) -> EpisodicRecord {
// Kept as a single chain: the builder's receiver type is not visible here.
EpisodicRecord::builder()
.event_type(EventType::Observation)
.content(content)
.summary(content)
.importance(importance)
.surprise(surprise)
.agent_id(agent())
.build()
.unwrap()
}
/// Same as `make_episode`, but also attaches the supplied `embedding`.
///
/// Panics if the builder rejects the inputs.
fn make_episode_with_embedding(
content: &str,
embedding: Vec<f32>,
importance: f32,
surprise: f32,
) -> EpisodicRecord {
EpisodicRecord::builder()
.event_type(EventType::Observation)
.content(content)
.summary(content)
.importance(importance)
.surprise(surprise)
.agent_id(agent())
.embedding(embedding)
.build()
.unwrap()
}
/// Builds an `Observation` episode with importance 0.5, the given embedding,
/// and one entity.
///
/// Surprise is not set, so it is whatever the builder defaults to. The second
/// argument to `.entity` is always the literal `"topic"`
/// (NOTE(review): presumably the entity type — confirm against the builder).
/// Panics if the builder rejects the inputs.
fn make_episode_with_entity(
content: &str,
entity: &str,
embedding: Vec<f32>,
) -> EpisodicRecord {
EpisodicRecord::builder()
.event_type(EventType::Observation)
.content(content)
.summary(content)
.importance(0.5)
.agent_id(agent())
.embedding(embedding)
.entity(entity, "topic")
.build()
.unwrap()
}
/// 768-element vector that is 1.0 at index 0 and 0.0 everywhere else.
fn embedding_a() -> Vec<f32> {
    (0..768)
        .map(|axis| if axis == 0 { 1.0f32 } else { 0.0 })
        .collect()
}
/// 768-element vector that is 1.0 at index 1 and 0.0 everywhere else.
fn embedding_b() -> Vec<f32> {
    (0..768)
        .map(|axis| if axis == 1 { 1.0f32 } else { 0.0 })
        .collect()
}
/// 768-element unit vector built from `(0.95, 0.05)` in the first two slots,
/// normalised to length 1; all other slots are 0.0.
///
/// The leading underscore silences the unused warning: no test in this
/// module calls it.
fn _embedding_a_ish() -> Vec<f32> {
    let (x, y) = (0.95f32, 0.05f32);
    let norm = (x * x + y * y).sqrt();

    let mut v = vec![0.0f32; 768];
    v[0] = x / norm;
    v[1] = y / norm;
    v
}
/// Segmenting an empty slice yields zero segments.
#[test]
fn segment_empty_records() {
let config = ConsolidationConfig::default();
let segments = segment_episodes(&[], &config);
assert_eq!(segments.len(), 0);
}
/// A single record yields exactly one segment holding exactly one record.
#[test]
fn segment_single_record() {
let config = ConsolidationConfig::default();
let records = vec![make_episode("hello", 0.5, 0.1)];
let segments = segment_episodes(&records, &config);
assert_eq!(segments.len(), 1);
assert_eq!(segments[0].records.len(), 1);
}
/// Five records embedded on axis 0 followed by five on axis 1 must be split
/// into two segments of five records each.
#[test]
fn segment_by_topic_shift() {
    let config = ConsolidationConfig {
        topic_similarity_threshold: 0.3,
        ..Default::default()
    };

    let topic_a = (0..5).map(|i| {
        make_episode_with_embedding(&format!("topic A record {i}"), embedding_a(), 0.5, 0.1)
    });
    let topic_b = (0..5).map(|i| {
        make_episode_with_embedding(&format!("topic B record {i}"), embedding_b(), 0.5, 0.1)
    });
    // `chain` is lazy, so all A records are built before any B record.
    let records: Vec<EpisodicRecord> = topic_a.chain(topic_b).collect();

    let segments = segment_episodes(&records, &config);
    assert_eq!(segments.len(), 2);
    assert_eq!(segments[0].records.len(), 5);
    assert_eq!(segments[1].records.len(), 5);
}
/// Eleven embedding-less records with one high-surprise record in the middle
/// must produce two segments of 5 and 6 records.
///
/// NOTE(review): presumably the boundary falls immediately before the spike,
/// so the spike opens the second segment — the asserts only check the sizes.
#[test]
fn segment_by_surprise_spike() {
    let config = ConsolidationConfig {
        surprise_threshold: 0.8,
        topic_similarity_threshold: 1.0,
        temporal_gap_seconds: i64::MAX,
        ..Default::default()
    };

    // (content, surprise) in input order; importance is 0.5 throughout.
    let script: [(&str, f32); 11] = [
        ("a", 0.1),
        ("b", 0.1),
        ("c", 0.1),
        ("d", 0.1),
        ("e", 0.1),
        ("SURPRISE!", 0.95),
        ("f", 0.1),
        ("g", 0.1),
        ("h", 0.1),
        ("i", 0.1),
        ("j", 0.1),
    ];
    let records: Vec<EpisodicRecord> = script
        .iter()
        .map(|&(content, surprise)| make_episode(content, 0.5, surprise))
        .collect();

    let segments = segment_episodes(&records, &config);
    assert_eq!(segments.len(), 2);
    assert_eq!(segments[0].records.len(), 5);
    assert_eq!(segments[1].records.len(), 6);
}
/// With `surprise_threshold` at 0.0 and five records that all carry
/// surprise 0.5, segmentation is expected to return two segments.
///
/// NOTE(review): why the count is exactly two depends on the segmentation
/// logic, which is not visible from this file — confirm in `segmentation`.
#[test]
fn segment_threshold_zero_adaptive_constant_surprise() {
    let config = ConsolidationConfig {
        surprise_threshold: 0.0,
        topic_similarity_threshold: 1.0,
        temporal_gap_seconds: i64::MAX,
        ..Default::default()
    };

    let mut records = Vec::with_capacity(5);
    for i in 0..5 {
        records.push(make_episode(&format!("rec {i}"), 0.5, 0.5));
    }

    let segments = segment_episodes(&records, &config);
    assert_eq!(segments.len(), 2);
}
/// With `surprise_threshold` at 1.0, ten records that all carry surprise 0.5
/// must stay in a single segment.
#[test]
fn segment_threshold_one_one_segment() {
    let config = ConsolidationConfig {
        surprise_threshold: 1.0,
        topic_similarity_threshold: 1.0,
        temporal_gap_seconds: i64::MAX,
        ..Default::default()
    };

    let mut records = Vec::with_capacity(10);
    for i in 0..10 {
        records.push(make_episode(&format!("rec {i}"), 0.5, 0.5));
    }

    let segments = segment_episodes(&records, &config);
    assert_eq!(segments.len(), 1);
}
/// Ten single-record segments that all mention "HNSW" must produce a pattern
/// containing "HNSW" with frequency 10.
#[test]
fn detect_entity_frequency_pattern() {
    let config = ConsolidationConfig {
        min_pattern_frequency: 3,
        ..Default::default()
    };

    let segments: Vec<_> = (0..10)
        .map(|i| {
            let rec = make_episode_with_entity(
                &format!("HNSW related record {i}"),
                "HNSW",
                embedding_a(),
            );
            EpisodeSegment::from_records(vec![rec]).unwrap()
        })
        .collect();

    let patterns = detect_entity_patterns(&segments, &config);
    assert!(!patterns.is_empty());

    let wanted = "HNSW".to_string();
    let hnsw_pattern = patterns.iter().find(|p| p.entities.contains(&wanted));
    assert!(hnsw_pattern.is_some());
    assert_eq!(hnsw_pattern.unwrap().frequency, 10);
}
/// Five segments whose record mentions both "HNSW" and "vector" must produce
/// at least one pattern listing both entities.
#[test]
fn detect_entity_cooccurrence() {
    let config = ConsolidationConfig {
        min_pattern_frequency: 3,
        ..Default::default()
    };

    let segments: Vec<_> = (0..5)
        .map(|i| {
            let rec = EpisodicRecord::builder()
                .event_type(EventType::Observation)
                .content(format!("HNSW and vector together {i}"))
                .summary(format!("hnsw+vector {i}"))
                .importance(0.5)
                .agent_id(agent())
                .embedding(embedding_a())
                .entity("HNSW", "topic")
                .entity("vector", "topic")
                .build()
                .unwrap();
            EpisodeSegment::from_records(vec![rec]).unwrap()
        })
        .collect();

    let patterns = detect_entity_patterns(&segments, &config);

    let both = ["HNSW".to_string(), "vector".to_string()];
    let found_pair = patterns
        .iter()
        .any(|p| both.iter().all(|name| p.entities.contains(name)));
    assert!(found_pair);
}
/// An entity seen once, with `min_pattern_frequency` at 3, must not appear
/// in any detected pattern.
#[test]
fn single_entity_not_a_pattern() {
    let config = ConsolidationConfig {
        min_pattern_frequency: 3,
        ..Default::default()
    };

    let lone_record = make_episode_with_entity("rare entity", "rare_thing", embedding_a());
    let segments = vec![EpisodeSegment::from_records(vec![lone_record]).unwrap()];

    let patterns = detect_entity_patterns(&segments, &config);
    let mentions_rare = patterns
        .iter()
        .any(|p| p.entities.contains(&"rare_thing".to_string()));
    assert!(!mentions_rare);
}
/// Five "HNSW" segments on axis 0 and five "deployment" segments on axis 1,
/// with no detected patterns supplied, must form exactly two threads.
#[test]
fn two_distinct_topics_two_threads() {
    let config = ConsolidationConfig {
        thread_similarity_threshold: 0.3,
        ..Default::default()
    };
    let patterns = DetectedPatterns {
        entity_patterns: Vec::new(),
        temporal_patterns: Vec::new(),
        causal_patterns: Vec::new(),
    };

    // Wraps one entity-tagged record in its own segment.
    let single_record_segment = |content: String, entity: &str, embedding: Vec<f32>| {
        let record = make_episode_with_entity(&content, entity, embedding);
        EpisodeSegment::from_records(vec![record]).unwrap()
    };

    let mut segments: Vec<_> = (0..5)
        .map(|i| single_record_segment(format!("HNSW work {i}"), "HNSW", embedding_a()))
        .collect();
    segments.extend((0..5).map(|i| {
        single_record_segment(format!("deployment work {i}"), "deployment", embedding_b())
    }));

    let threads = form_narrative_threads(&segments, &patterns, &config);
    assert_eq!(threads.len(), 2);
}
/// One segment with one record, under the default config and no detected
/// patterns, must form exactly one thread.
#[test]
fn single_segment_creates_single_thread() {
    let config = ConsolidationConfig::default();
    let patterns = DetectedPatterns {
        entity_patterns: Vec::new(),
        temporal_patterns: Vec::new(),
        causal_patterns: Vec::new(),
    };

    let record = make_episode("single", 0.5, 0.1);
    let segment = EpisodeSegment::from_records(vec![record]).unwrap();
    let segments = vec![segment];

    let threads = form_narrative_threads(&segments, &patterns, &config);
    assert_eq!(threads.len(), 1);
}
/// The thread formed from a single "HNSW" segment must have a title that
/// contains the text "HNSW".
#[test]
fn thread_titles_contain_entities() {
    let config = ConsolidationConfig {
        thread_similarity_threshold: 0.0,
        ..Default::default()
    };
    let patterns = DetectedPatterns {
        entity_patterns: Vec::new(),
        temporal_patterns: Vec::new(),
        causal_patterns: Vec::new(),
    };

    let record = make_episode_with_entity("HNSW work", "HNSW", embedding_a());
    let segment = EpisodeSegment::from_records(vec![record]).unwrap();
    let segments = vec![segment];

    let threads = form_narrative_threads(&segments, &patterns, &config);
    assert_eq!(threads.len(), 1);
    assert!(threads[0].title.contains("HNSW"));
}
/// A thread with ten records must map to a higher confidence than a thread
/// with one record.
///
/// NOTE(review): the record-count → confidence table is defined inside this
/// test; no function from the crate is called, so this does not exercise the
/// production mapping. Consider calling the real implementation instead.
#[test]
fn concept_confidence_scales_with_evidence() {
    // Local bucket table; any count not listed (including 0) maps to 0.85.
    fn confidence_for(evidence_count: usize) -> f32 {
        match evidence_count {
            1 => 0.3,
            2..=3 => 0.5,
            4..=7 => 0.7,
            _ => 0.85,
        }
    }

    let thread_small = NarrativeThread {
        title: "small".to_string(),
        segment_indices: vec![0],
        record_ids: vec![MemoryId::new()],
        contents: vec!["one episode".to_string()],
        summaries: vec!["one".to_string()],
        start_time: Timestamp::now(),
        end_time: Timestamp::now(),
        entities: vec!["test".to_string()],
        sub_threads: Vec::new(),
        embedding: None,
    };
    let thread_large = NarrativeThread {
        title: "large".to_string(),
        segment_indices: vec![0, 1, 2, 3, 4],
        record_ids: (0..10).map(|_| MemoryId::new()).collect(),
        contents: (0..10).map(|i| format!("episode {i}")).collect(),
        summaries: (0..10).map(|i| format!("summary {i}")).collect(),
        start_time: Timestamp::now(),
        end_time: Timestamp::now(),
        entities: vec!["test".to_string()],
        sub_threads: Vec::new(),
        embedding: None,
    };

    let small_confidence = confidence_for(thread_small.record_ids.len());
    let large_confidence = confidence_for(thread_large.record_ids.len());
    assert!(large_confidence > small_confidence);
}
/// `build_thread_description` must return equal output when called twice on
/// the same thread.
#[test]
fn concept_extraction_deterministic() {
    let thread = NarrativeThread {
        title: "test topic".to_string(),
        segment_indices: vec![0],
        record_ids: vec![MemoryId::new()],
        contents: vec!["some content about testing".to_string()],
        summaries: vec!["testing content".to_string()],
        start_time: Timestamp::now(),
        end_time: Timestamp::now(),
        entities: vec!["testing".to_string()],
        sub_threads: Vec::new(),
        embedding: None,
    };

    let runs: Vec<_> = (0..2).map(|_| build_thread_description(&thread)).collect();
    assert_eq!(runs[0], runs[1]);
}
/// An id is not labile before `open_window`, labile after it, and not labile
/// again after `close_window`.
#[test]
fn reconsolidation_window_lifecycle() {
let tracker = ReconsolidationTracker::new();
let id = MemoryId::new();
// Unknown id: no window has been opened yet.
assert!(!tracker.is_labile(id));
// NOTE(review): second argument is presumably the window length in seconds — confirm.
tracker.open_window(id, 60);
assert!(tracker.is_labile(id));
tracker.close_window(id);
assert!(!tracker.is_labile(id));
}
/// A window opened with a length of 0 must be reported as not labile on the
/// very next check.
#[test]
fn reconsolidation_window_expired() {
let tracker = ReconsolidationTracker::new();
let id = MemoryId::new();
tracker.open_window(id, 0);
assert!(!tracker.is_labile(id));
}
/// An episode with importance 0.7 must match `importance > 0.5` and must not
/// match `importance < 0.5`.
#[test]
fn episode_filter_importance() {
    let ep = make_episode("test", 0.7, 0.1);

    // Filter on the "importance" field against 0.5 with the given operator.
    let importance_vs_half = |op: FilterOp| WhereFilter {
        field: "importance".to_string(),
        op,
        value: 0.5,
    };

    let filter_gt = importance_vs_half(FilterOp::Gt);
    assert!(episode_matches_filter(&ep, &filter_gt));

    let filter_lt = importance_vs_half(FilterOp::Lt);
    assert!(!episode_matches_filter(&ep, &filter_lt));
}
/// Content phrased as instructions ("should", "must", "best practice") must
/// be classified as `KnowledgeType::Prescriptive`.
#[test]
fn infer_prescriptive_knowledge() {
    let guidance = "You should always configure the timeout. You must set up the retry policy. Best practice is to deploy in stages.".to_string();

    let thread = NarrativeThread {
        title: "deploy guide".to_string(),
        segment_indices: vec![0],
        record_ids: vec![MemoryId::new()],
        contents: vec![guidance],
        summaries: vec!["deployment guide".to_string()],
        start_time: Timestamp::now(),
        end_time: Timestamp::now(),
        entities: vec![],
        sub_threads: Vec::new(),
        embedding: None,
    };

    let inferred = infer_knowledge_type(&thread);
    assert_eq!(inferred, KnowledgeType::Prescriptive);
}
/// Content that only reports observed results must be classified as
/// `KnowledgeType::Propositional`.
#[test]
fn infer_propositional_knowledge() {
    let findings =
        "The benchmark showed 50ms latency. Vector search returned relevant results.".to_string();

    let thread = NarrativeThread {
        title: "experiment results".to_string(),
        segment_indices: vec![0],
        record_ids: vec![MemoryId::new()],
        contents: vec![findings],
        summaries: vec!["results".to_string()],
        start_time: Timestamp::now(),
        end_time: Timestamp::now(),
        entities: vec![],
        sub_threads: Vec::new(),
        embedding: None,
    };

    let inferred = infer_knowledge_type(&thread);
    assert_eq!(inferred, KnowledgeType::Propositional);
}
/// `raptor_cluster_size = 0` must be rejected with an `InvalidConfig` error
/// that names that field.
#[test]
fn validate_cluster_size_zero_is_invalid() {
    let cfg = ConsolidationConfig {
        raptor_cluster_size: 0,
        ..Default::default()
    };

    match cfg.validate().unwrap_err() {
        HirnError::InvalidConfig { field, .. } => {
            assert_eq!(field, "raptor_cluster_size");
        }
        other => panic!("expected InvalidConfig, got: {other}"),
    }
}
/// `segmentation_gamma = 0.0` must be rejected with an `InvalidConfig` error
/// that names that field.
#[test]
fn validate_gamma_zero_is_invalid() {
    let cfg = ConsolidationConfig {
        segmentation_gamma: 0.0,
        ..Default::default()
    };

    match cfg.validate().unwrap_err() {
        HirnError::InvalidConfig { field, .. } => {
            assert_eq!(field, "segmentation_gamma");
        }
        other => panic!("expected InvalidConfig, got: {other}"),
    }
}
/// The default configuration must pass `validate`; `unwrap` surfaces the
/// validation error in the panic message if it ever stops doing so.
#[test]
fn validate_default_config_is_valid() {
ConsolidationConfig::default().validate().unwrap();
}
}