hirn_engine/consolidation/
compactor.rs1use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Duration, Instant};
17
18use async_trait::async_trait;
19use hirn_core::HirnResult;
20use hirn_core::episodic::EpisodicRecord;
21use hirn_core::id::MemoryId;
22use hirn_storage::store::{CompactOptions, CompactResult};
23
24use crate::HirnDB;
25use crate::event::MemoryEvent;
26
27use super::{ConsolidateBuilder, ConsolidationConfig, ConsolidationResult};
28
/// Process-wide counter that `LifecycleCompactBuilder::execute` bumps once per
/// completed lifecycle compaction run.
static COMPACTION_GENERATION: AtomicU64 = AtomicU64::new(0);

/// Default number of seconds a lifecycle compaction may take before a
/// slow-run warning is logged.
const DEFAULT_SLOW_COMPACTION_SECS: u64 = 30;

/// Names of the datasets visited by `compact_all_datasets`, in visiting order.
/// Datasets that do not exist in the storage backend are skipped.
const COMPACTABLE_DATASETS: &[&str] = &[
    "episodic",
    "semantic",
    "procedural",
    "working",
    "graph_nodes",
    "graph_edges",
    "svo_events",
    "prospective_implications",
    "topic_loom",
    "mcfa_audit_log",
];
48
49#[derive(Debug, Clone)]
51pub struct LifecycleCompactionResult {
52 pub fragments_removed: u64,
54 pub fragments_added: u64,
55 pub datasets_compacted: usize,
57 pub consolidation: Option<ConsolidationResult>,
59 pub memories_archived: usize,
61 pub compaction_generation: u64,
63 pub execution_time_ms: f64,
65}
66
67pub struct LifecycleCompactBuilder<'a> {
69 db: &'a HirnDB,
70 consolidation_config: Option<ConsolidationConfig>,
71 run_consolidation: bool,
72 run_archival: bool,
73 archive_age_secs: u64,
74 slow_threshold_secs: u64,
75 compact_options: CompactOptions,
76 agent_id: Option<String>,
77 llm: Option<Arc<dyn hirn_core::embed::LlmProvider>>,
78}
79
/// Running totals accumulated while compacting every dataset.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct CompactionTotals {
    /// Fragments removed, summed over all compacted datasets.
    fragments_removed: u64,
    /// Fragments added, summed over all compacted datasets.
    fragments_added: u64,
    /// How many datasets were actually compacted.
    datasets_compacted: usize,
}
86
87#[async_trait]
88trait LifecycleCompactionStore: Send + Sync {
89 async fn exists(&self, dataset: &str) -> HirnResult<bool>;
90 async fn compact(&self, dataset: &str, opts: CompactOptions) -> HirnResult<CompactResult>;
91 async fn optimize_indices(&self, dataset: &str) -> HirnResult<()>;
92}
93
94#[async_trait]
95impl<T> LifecycleCompactionStore for T
96where
97 T: hirn_storage::PhysicalStore + ?Sized,
98{
99 async fn exists(&self, dataset: &str) -> HirnResult<bool> {
100 Ok(hirn_storage::PhysicalStore::exists(self, dataset).await?)
101 }
102
103 async fn compact(&self, dataset: &str, opts: CompactOptions) -> HirnResult<CompactResult> {
104 Ok(hirn_storage::PhysicalStore::compact(self, dataset, opts).await?)
105 }
106
107 async fn optimize_indices(&self, dataset: &str) -> HirnResult<()> {
108 Ok(hirn_storage::PhysicalStore::optimize_indices(self, dataset).await?)
109 }
110}
111
112#[async_trait]
113trait LifecycleArchivalRuntime: Send + Sync {
114 async fn list_episodes_for_archival(&self, limit: usize) -> HirnResult<Vec<EpisodicRecord>>;
115 async fn archive_episode_for_compaction(&self, id: MemoryId) -> HirnResult<()>;
116}
117
118#[async_trait]
119impl LifecycleArchivalRuntime for HirnDB {
120 async fn list_episodes_for_archival(&self, limit: usize) -> HirnResult<Vec<EpisodicRecord>> {
121 let filter = crate::db::EpisodicFilter {
122 include_archived: false,
123 limit: Some(limit),
124 ..Default::default()
125 };
126 self.list_episodes(&filter).await
127 }
128
129 async fn archive_episode_for_compaction(&self, id: MemoryId) -> HirnResult<()> {
130 self.archive_episode(id).await
131 }
132}
133
134impl<'a> LifecycleCompactBuilder<'a> {
135 pub(crate) fn new(db: &'a HirnDB) -> Self {
136 Self {
137 db,
138 consolidation_config: None,
139 run_consolidation: true,
140 run_archival: true,
141 archive_age_secs: 86_400 * 30, slow_threshold_secs: DEFAULT_SLOW_COMPACTION_SECS,
143 compact_options: CompactOptions::default(),
144 agent_id: None,
145 llm: None,
146 }
147 }
148
149 #[must_use]
151 pub const fn skip_consolidation(mut self) -> Self {
152 self.run_consolidation = false;
153 self
154 }
155
156 #[must_use]
158 pub const fn skip_archival(mut self) -> Self {
159 self.run_archival = false;
160 self
161 }
162
163 #[must_use]
165 pub const fn archive_age_secs(mut self, secs: u64) -> Self {
166 self.archive_age_secs = secs;
167 self
168 }
169
170 #[must_use]
172 pub const fn slow_threshold_secs(mut self, secs: u64) -> Self {
173 self.slow_threshold_secs = secs;
174 self
175 }
176
177 #[must_use]
179 pub fn consolidation_config(mut self, config: ConsolidationConfig) -> Self {
180 self.consolidation_config = Some(config);
181 self
182 }
183
184 #[must_use]
186 pub fn target_rows_per_fragment(mut self, rows: usize) -> Self {
187 self.compact_options.target_rows_per_fragment = Some(rows);
188 self
189 }
190
191 #[must_use]
193 pub fn agent_id(mut self, id: impl Into<String>) -> Self {
194 self.agent_id = Some(id.into());
195 self
196 }
197
198 #[must_use]
200 pub fn llm(mut self, llm: Arc<dyn hirn_core::embed::LlmProvider>) -> Self {
201 self.llm = Some(llm);
202 self
203 }
204
205 pub async fn execute(self) -> HirnResult<LifecycleCompactionResult> {
207 let start = Instant::now();
208
209 let agent = self.agent_id.as_deref().unwrap_or("anonymous");
211 self.db
212 .enforce(
213 agent,
214 crate::policy::Action::Consolidate,
215 &self.db.config().default_realm,
216 "",
217 )
218 .await?;
219
220 let CompactionTotals {
222 fragments_removed,
223 fragments_added,
224 datasets_compacted,
225 } = compact_all_datasets(self.db.storage_backend(), &self.compact_options).await?;
226
227 let consolidation = if self.run_consolidation {
229 let mut builder = ConsolidateBuilder::new(self.db);
230 if let Some(config) = self.consolidation_config {
231 builder = builder.config(config);
232 }
233 if let Some(llm) = self.llm {
234 builder = builder.llm(llm);
235 }
236 if let Some(ref aid) = self.agent_id {
237 builder = builder.agent_id(aid);
238 }
239 Some(builder.execute().await?)
240 } else {
241 None
242 };
243
244 let memories_archived = if self.run_archival {
246 archive_old_memories(self.db, self.archive_age_secs).await?
247 } else {
248 0
249 };
250
251 let generation = COMPACTION_GENERATION.fetch_add(1, Ordering::Relaxed);
253
254 let execution_time_ms = start.elapsed().as_secs_f64() * 1000.0;
255
256 if start.elapsed() > Duration::from_secs(self.slow_threshold_secs) {
258 tracing::warn!(
259 duration_ms = execution_time_ms,
260 "lifecycle compaction slow (> 30s)"
261 );
262 }
263
264 metrics::histogram!(crate::metrics::COMPACTION_DURATION_SECONDS)
265 .record(start.elapsed().as_secs_f64());
266 metrics::counter!(crate::metrics::COMPACTION_TOTAL).increment(1);
267 metrics::gauge!(crate::metrics::COMPACTION_FRAGMENTS_REMOVED).set(fragments_removed as f64);
268 metrics::gauge!(crate::metrics::COMPACTION_FRAGMENTS_ADDED).set(fragments_added as f64);
269 metrics::gauge!(crate::metrics::COMPACTION_DATASETS).set(datasets_compacted as f64);
270 metrics::gauge!(crate::metrics::COMPACTION_MEMORIES_ARCHIVED).set(memories_archived as f64);
271
272 self.db
273 .emit(MemoryEvent::CompactionCompleted {
274 before_seq: datasets_compacted as u64,
276 events_removed: fragments_removed,
277 })
278 .await;
279
280 Ok(LifecycleCompactionResult {
281 fragments_removed,
282 fragments_added,
283 datasets_compacted,
284 consolidation,
285 memories_archived,
286 compaction_generation: generation,
287 execution_time_ms,
288 })
289 }
290}
291
292async fn compact_all_datasets(
297 storage: &(impl LifecycleCompactionStore + ?Sized),
298 opts: &CompactOptions,
299) -> HirnResult<CompactionTotals> {
300 let mut total_removed = 0u64;
301 let mut total_added = 0u64;
302 let mut datasets_compacted = 0usize;
303
304 for &dataset in COMPACTABLE_DATASETS {
305 if !storage.exists(dataset).await? {
306 continue;
307 }
308
309 let result = storage.compact(dataset, opts.clone()).await?;
310 total_removed += result.fragments_removed;
311 total_added += result.fragments_added;
312 datasets_compacted += 1;
313
314 storage.optimize_indices(dataset).await?;
315 }
316
317 Ok(CompactionTotals {
318 fragments_removed: total_removed,
319 fragments_added: total_added,
320 datasets_compacted,
321 })
322}
323
324async fn archive_old_memories(
330 runtime: &(impl LifecycleArchivalRuntime + ?Sized),
331 age_secs: u64,
332) -> HirnResult<usize> {
333 let age_secs_i64 = i64::try_from(age_secs).unwrap_or(i64::MAX);
334 let cutoff = chrono::Utc::now() - chrono::Duration::seconds(age_secs_i64);
335 let cutoff_ts = hirn_core::timestamp::Timestamp::from_datetime(cutoff);
336
337 let episodes = runtime.list_episodes_for_archival(1000).await?;
338
339 let to_archive: Vec<_> = episodes
341 .iter()
342 .filter(|ep| ep.timestamp < cutoff_ts)
343 .map(|ep| ep.id)
344 .collect();
345
346 let mut archived = 0;
347 for id in &to_archive {
348 runtime.archive_episode_for_compaction(*id).await?;
349 archived += 1;
350 }
351
352 Ok(archived)
353}
354
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Mutex;

    use hirn_core::HirnError;
    use hirn_core::timestamp::Timestamp;
    use hirn_core::types::AgentId;

    use super::*;

    /// Fake store with configurable existing datasets and injectable failures.
    struct FakeCompactionStore {
        /// Datasets reported as existing.
        existing: HashSet<&'static str>,
        /// Dataset whose `compact` call fails, if any.
        fail_compact: Option<&'static str>,
        /// Dataset whose `optimize_indices` call fails, if any.
        fail_optimize: Option<&'static str>,
        /// Datasets whose indices were successfully optimized, in call order.
        optimized: Mutex<Vec<String>>,
    }

    impl FakeCompactionStore {
        /// Store that contains only the `episodic` dataset, with the given
        /// failure injection points.
        fn episodic_only(
            fail_compact: Option<&'static str>,
            fail_optimize: Option<&'static str>,
        ) -> Self {
            Self {
                existing: HashSet::from(["episodic"]),
                fail_compact,
                fail_optimize,
                optimized: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl LifecycleCompactionStore for FakeCompactionStore {
        async fn exists(&self, dataset: &str) -> HirnResult<bool> {
            Ok(self.existing.contains(dataset))
        }

        async fn compact(&self, dataset: &str, _opts: CompactOptions) -> HirnResult<CompactResult> {
            match self.fail_compact {
                Some(target) if target == dataset => Err(HirnError::Unsupported(format!(
                    "simulated compaction failure for {dataset}"
                ))),
                _ => Ok(CompactResult {
                    fragments_removed: 2,
                    fragments_added: 1,
                    rows_removed: 0,
                }),
            }
        }

        async fn optimize_indices(&self, dataset: &str) -> HirnResult<()> {
            match self.fail_optimize {
                Some(target) if target == dataset => Err(HirnError::Unsupported(format!(
                    "simulated optimize failure for {dataset}"
                ))),
                _ => {
                    self.optimized.lock().unwrap().push(dataset.to_owned());
                    Ok(())
                }
            }
        }
    }

    /// Fake archival runtime that serves a fixed episode list and can fail on
    /// one chosen episode id.
    struct FakeArchivalRuntime {
        /// Episodes returned by every listing call (the limit is ignored).
        episodes: Vec<EpisodicRecord>,
        /// Episode whose archival fails, if any.
        fail_archive: Option<MemoryId>,
        /// Ids that were successfully archived, in call order.
        archived: Mutex<Vec<MemoryId>>,
    }

    #[async_trait]
    impl LifecycleArchivalRuntime for FakeArchivalRuntime {
        async fn list_episodes_for_archival(
            &self,
            _limit: usize,
        ) -> HirnResult<Vec<EpisodicRecord>> {
            Ok(self.episodes.clone())
        }

        async fn archive_episode_for_compaction(&self, id: MemoryId) -> HirnResult<()> {
            match self.fail_archive {
                Some(target) if target == id => Err(HirnError::Unsupported(format!(
                    "simulated archival failure for {id}"
                ))),
                _ => {
                    self.archived.lock().unwrap().push(id);
                    Ok(())
                }
            }
        }
    }

    /// Builds an episode timestamped 90 days in the past.
    fn old_episode(content: &str) -> EpisodicRecord {
        let ninety_days_ago = chrono::Utc::now() - chrono::Duration::days(90);
        EpisodicRecord::builder()
            .content(content)
            .embedding(vec![0.1])
            .agent_id(AgentId::new("compactor_test").unwrap())
            .timestamp(Timestamp::from_datetime(ninety_days_ago))
            .build()
            .unwrap()
    }

    /// A failing `compact` call aborts the pass before any index optimization.
    #[tokio::test(flavor = "multi_thread")]
    async fn compact_all_datasets_fails_on_compaction_error() {
        let store = FakeCompactionStore::episodic_only(Some("episodic"), None);

        let error = compact_all_datasets(&store, &CompactOptions::default())
            .await
            .expect_err("compaction failure should abort lifecycle compaction");

        assert!(matches!(error, HirnError::Unsupported(_)));
        assert!(store.optimized.lock().unwrap().is_empty());
    }

    /// A failing `optimize_indices` call is surfaced as an error.
    #[tokio::test(flavor = "multi_thread")]
    async fn compact_all_datasets_fails_on_optimize_error() {
        let store = FakeCompactionStore::episodic_only(None, Some("episodic"));

        let error = compact_all_datasets(&store, &CompactOptions::default())
            .await
            .expect_err("index optimization failure should abort lifecycle compaction");

        assert!(matches!(error, HirnError::Unsupported(_)));
    }

    /// A failure on the first episode stops archival before anything is archived.
    #[tokio::test(flavor = "multi_thread")]
    async fn archive_old_memories_fails_on_archival_error() {
        let first = old_episode("first-old");
        let second = old_episode("second-old");
        let failing_id = first.id;
        let runtime = FakeArchivalRuntime {
            episodes: vec![first, second],
            fail_archive: Some(failing_id),
            archived: Mutex::new(Vec::new()),
        };

        let error = archive_old_memories(&runtime, 0)
            .await
            .expect_err("archival failure should abort lifecycle compaction");

        assert!(matches!(error, HirnError::Unsupported(_)));
        assert!(runtime.archived.lock().unwrap().is_empty());
    }
}
488}