Skip to main content

hirn_engine/consolidation/
compactor.rs

1//! Lifecycle-aware compaction — fragment merge + consolidation + archival +
2//! provenance in a single orchestrated pass.
3//!
//! The [`LifecycleCompactBuilder`] runs the four lifecycle phases in sequence:
5//!
6//! 1. **Fragment merge** — Lance fragment compaction across all datasets.
7//! 2. **Consolidation** — episodic → semantic summarization (existing pipeline).
8//! 3. **Archival** — old memories past archive threshold get reduced confidence.
9//! 4. **Provenance** — stamps `compacted_at` generation, preserves edges.
10//!
11//! Use [`LifecycleCompactBuilder`] via `db.lifecycle_compact()` to configure
12//! and execute.
13
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Duration, Instant};
17
18use async_trait::async_trait;
19use hirn_core::HirnResult;
20use hirn_core::episodic::EpisodicRecord;
21use hirn_core::id::MemoryId;
22use hirn_storage::store::{CompactOptions, CompactResult};
23
24use crate::HirnDB;
25use crate::event::MemoryEvent;
26
27use super::{ConsolidateBuilder, ConsolidationConfig, ConsolidationResult};
28
/// Monotonically increasing compaction generation counter.
///
/// Process-wide and in-memory: it starts at 0 and is bumped once per
/// completed [`LifecycleCompactBuilder::execute`] pass. Nothing in this file
/// persists it, so the sequence restarts with the process.
static COMPACTION_GENERATION: AtomicU64 = AtomicU64::new(0);
31
/// Default threshold above which a compaction is considered slow (30 seconds).
///
/// Used as the initial `slow_threshold_secs` of a new
/// [`LifecycleCompactBuilder`]; callers can override it per pass.
const DEFAULT_SLOW_COMPACTION_SECS: u64 = 30;
34
/// All datasets subject to fragment compaction.
///
/// Datasets are visited in the order listed here; a dataset that does not
/// exist in the storage backend is skipped rather than treated as an error.
const COMPACTABLE_DATASETS: &[&str] = &[
    "episodic",
    "semantic",
    "procedural",
    "working",
    "graph_nodes",
    "graph_edges",
    "svo_events",
    "prospective_implications",
    "topic_loom",
    "mcfa_audit_log",
];
48
/// Result of a full lifecycle compaction pass.
///
/// Produced by [`LifecycleCompactBuilder::execute`] once every enabled phase
/// has finished without error.
#[derive(Debug, Clone)]
pub struct LifecycleCompactionResult {
    /// Total number of fragments removed by the fragment-merge phase, summed
    /// over every dataset that was compacted.
    pub fragments_removed: u64,
    /// Total number of fragments added by the fragment-merge phase, summed
    /// over every dataset that was compacted.
    pub fragments_added: u64,
    /// Number of datasets that were compacted.
    pub datasets_compacted: usize,
    /// Consolidation sub-result (None if consolidation was skipped).
    pub consolidation: Option<ConsolidationResult>,
    /// Number of memories archived in the archival phase (0 if archival was
    /// skipped).
    pub memories_archived: usize,
    /// Generation assigned to this pass: the value of the process-wide
    /// counter before this pass incremented it, so the first pass in a
    /// process reports 0.
    pub compaction_generation: u64,
    /// Total execution time in milliseconds.
    pub execution_time_ms: f64,
}
66
/// Builder for lifecycle compaction.
///
/// Collects the options for one compaction pass; the pass itself runs when
/// `execute` is awaited, which consumes the builder.
pub struct LifecycleCompactBuilder<'a> {
    /// Database the pass operates on.
    db: &'a HirnDB,
    /// Optional override handed to the consolidation builder.
    consolidation_config: Option<ConsolidationConfig>,
    /// Whether the consolidation phase runs (default: `true`).
    run_consolidation: bool,
    /// Whether the archival phase runs (default: `true`).
    run_archival: bool,
    /// Minimum age, in seconds, for an episode to be archived (default: 30 days).
    archive_age_secs: u64,
    /// Wall-clock duration, in seconds, above which the pass logs a warning.
    slow_threshold_secs: u64,
    /// Options forwarded to the storage backend for every dataset compaction.
    compact_options: CompactOptions,
    /// Agent identity used for policy enforcement; "anonymous" when unset.
    agent_id: Option<String>,
    /// Optional LLM provider forwarded to the consolidation phase.
    llm: Option<Arc<dyn hirn_core::embed::LlmProvider>>,
}
79
/// Aggregate outcome of the fragment-merge phase across all datasets.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct CompactionTotals {
    /// Sum of `fragments_removed` reported by each compacted dataset.
    fragments_removed: u64,
    /// Sum of `fragments_added` reported by each compacted dataset.
    fragments_added: u64,
    /// Number of datasets that existed and were compacted.
    datasets_compacted: usize,
}
86
/// Minimal storage surface needed by the fragment-merge phase.
///
/// Keeping this narrower than the full storage trait lets the unit tests
/// substitute an in-memory fake for `compact_all_datasets`.
#[async_trait]
trait LifecycleCompactionStore: Send + Sync {
    /// Reports whether `dataset` exists in the store.
    async fn exists(&self, dataset: &str) -> HirnResult<bool>;
    /// Compacts the fragments of `dataset` using `opts` and returns the
    /// store's fragment counts.
    async fn compact(&self, dataset: &str, opts: CompactOptions) -> HirnResult<CompactResult>;
    /// Optimizes the indices of `dataset`.
    async fn optimize_indices(&self, dataset: &str) -> HirnResult<()>;
}
93
94#[async_trait]
95impl<T> LifecycleCompactionStore for T
96where
97    T: hirn_storage::PhysicalStore + ?Sized,
98{
99    async fn exists(&self, dataset: &str) -> HirnResult<bool> {
100        Ok(hirn_storage::PhysicalStore::exists(self, dataset).await?)
101    }
102
103    async fn compact(&self, dataset: &str, opts: CompactOptions) -> HirnResult<CompactResult> {
104        Ok(hirn_storage::PhysicalStore::compact(self, dataset, opts).await?)
105    }
106
107    async fn optimize_indices(&self, dataset: &str) -> HirnResult<()> {
108        Ok(hirn_storage::PhysicalStore::optimize_indices(self, dataset).await?)
109    }
110}
111
/// Minimal database surface needed by the archival phase.
///
/// Abstracted behind a trait so `archive_old_memories` can be unit-tested
/// against an in-memory fake.
#[async_trait]
trait LifecycleArchivalRuntime: Send + Sync {
    /// Returns up to `limit` episodes that are candidates for archival.
    async fn list_episodes_for_archival(&self, limit: usize) -> HirnResult<Vec<EpisodicRecord>>;
    /// Archives the episode identified by `id`.
    async fn archive_episode_for_compaction(&self, id: MemoryId) -> HirnResult<()>;
}
117
118#[async_trait]
119impl LifecycleArchivalRuntime for HirnDB {
120    async fn list_episodes_for_archival(&self, limit: usize) -> HirnResult<Vec<EpisodicRecord>> {
121        let filter = crate::db::EpisodicFilter {
122            include_archived: false,
123            limit: Some(limit),
124            ..Default::default()
125        };
126        self.list_episodes(&filter).await
127    }
128
129    async fn archive_episode_for_compaction(&self, id: MemoryId) -> HirnResult<()> {
130        self.archive_episode(id).await
131    }
132}
133
134impl<'a> LifecycleCompactBuilder<'a> {
135    pub(crate) fn new(db: &'a HirnDB) -> Self {
136        Self {
137            db,
138            consolidation_config: None,
139            run_consolidation: true,
140            run_archival: true,
141            archive_age_secs: 86_400 * 30, // 30 days default
142            slow_threshold_secs: DEFAULT_SLOW_COMPACTION_SECS,
143            compact_options: CompactOptions::default(),
144            agent_id: None,
145            llm: None,
146        }
147    }
148
149    /// Skip the consolidation phase.
150    #[must_use]
151    pub const fn skip_consolidation(mut self) -> Self {
152        self.run_consolidation = false;
153        self
154    }
155
156    /// Skip the archival phase.
157    #[must_use]
158    pub const fn skip_archival(mut self) -> Self {
159        self.run_archival = false;
160        self
161    }
162
163    /// Set the age threshold for archival (in seconds). Default: 30 days.
164    #[must_use]
165    pub const fn archive_age_secs(mut self, secs: u64) -> Self {
166        self.archive_age_secs = secs;
167        self
168    }
169
170    /// Set the slow-compaction warning threshold (in seconds). Default: 30.
171    #[must_use]
172    pub const fn slow_threshold_secs(mut self, secs: u64) -> Self {
173        self.slow_threshold_secs = secs;
174        self
175    }
176
177    /// Set a custom consolidation config.
178    #[must_use]
179    pub fn consolidation_config(mut self, config: ConsolidationConfig) -> Self {
180        self.consolidation_config = Some(config);
181        self
182    }
183
184    /// Set target rows per fragment for Lance compaction.
185    #[must_use]
186    pub fn target_rows_per_fragment(mut self, rows: usize) -> Self {
187        self.compact_options.target_rows_per_fragment = Some(rows);
188        self
189    }
190
191    /// Set the agent ID for Cedar policy enforcement.
192    #[must_use]
193    pub fn agent_id(mut self, id: impl Into<String>) -> Self {
194        self.agent_id = Some(id.into());
195        self
196    }
197
198    /// Set an LLM provider for consolidation.
199    #[must_use]
200    pub fn llm(mut self, llm: Arc<dyn hirn_core::embed::LlmProvider>) -> Self {
201        self.llm = Some(llm);
202        self
203    }
204
205    /// Execute the full lifecycle compaction pass.
206    pub async fn execute(self) -> HirnResult<LifecycleCompactionResult> {
207        let start = Instant::now();
208
209        // Cedar enforcement.
210        let agent = self.agent_id.as_deref().unwrap_or("anonymous");
211        self.db
212            .enforce(
213                agent,
214                crate::policy::Action::Consolidate,
215                &self.db.config().default_realm,
216                "",
217            )
218            .await?;
219
220        // Phase 1: Fragment merge across all datasets.
221        let CompactionTotals {
222            fragments_removed,
223            fragments_added,
224            datasets_compacted,
225        } = compact_all_datasets(self.db.storage_backend(), &self.compact_options).await?;
226
227        // Phase 2: Consolidation (optional).
228        let consolidation = if self.run_consolidation {
229            let mut builder = ConsolidateBuilder::new(self.db);
230            if let Some(config) = self.consolidation_config {
231                builder = builder.config(config);
232            }
233            if let Some(llm) = self.llm {
234                builder = builder.llm(llm);
235            }
236            if let Some(ref aid) = self.agent_id {
237                builder = builder.agent_id(aid);
238            }
239            Some(builder.execute().await?)
240        } else {
241            None
242        };
243
244        // Phase 3: Archival — archive old episodic memories.
245        let memories_archived = if self.run_archival {
246            archive_old_memories(self.db, self.archive_age_secs).await?
247        } else {
248            0
249        };
250
251        // Phase 4: Provenance — stamp monotonic compaction generation.
252        let generation = COMPACTION_GENERATION.fetch_add(1, Ordering::Relaxed);
253
254        let execution_time_ms = start.elapsed().as_secs_f64() * 1000.0;
255
256        // Emit diagnostic event.
257        if start.elapsed() > Duration::from_secs(self.slow_threshold_secs) {
258            tracing::warn!(
259                duration_ms = execution_time_ms,
260                "lifecycle compaction slow (> 30s)"
261            );
262        }
263
264        metrics::histogram!(crate::metrics::COMPACTION_DURATION_SECONDS)
265            .record(start.elapsed().as_secs_f64());
266        metrics::counter!(crate::metrics::COMPACTION_TOTAL).increment(1);
267        metrics::gauge!(crate::metrics::COMPACTION_FRAGMENTS_REMOVED).set(fragments_removed as f64);
268        metrics::gauge!(crate::metrics::COMPACTION_FRAGMENTS_ADDED).set(fragments_added as f64);
269        metrics::gauge!(crate::metrics::COMPACTION_DATASETS).set(datasets_compacted as f64);
270        metrics::gauge!(crate::metrics::COMPACTION_MEMORIES_ARCHIVED).set(memories_archived as f64);
271
272        self.db
273            .emit(MemoryEvent::CompactionCompleted {
274                // For fragment compaction, before_seq = datasets compacted (not event seq).
275                before_seq: datasets_compacted as u64,
276                events_removed: fragments_removed,
277            })
278            .await;
279
280        Ok(LifecycleCompactionResult {
281            fragments_removed,
282            fragments_added,
283            datasets_compacted,
284            consolidation,
285            memories_archived,
286            compaction_generation: generation,
287            execution_time_ms,
288        })
289    }
290}
291
292/// Compact all datasets.
293///
294/// Fails if dataset existence checks, fragment compaction, or index optimization fail,
295/// so lifecycle compaction cannot report success ahead of the underlying storage state.
296async fn compact_all_datasets(
297    storage: &(impl LifecycleCompactionStore + ?Sized),
298    opts: &CompactOptions,
299) -> HirnResult<CompactionTotals> {
300    let mut total_removed = 0u64;
301    let mut total_added = 0u64;
302    let mut datasets_compacted = 0usize;
303
304    for &dataset in COMPACTABLE_DATASETS {
305        if !storage.exists(dataset).await? {
306            continue;
307        }
308
309        let result = storage.compact(dataset, opts.clone()).await?;
310        total_removed += result.fragments_removed;
311        total_added += result.fragments_added;
312        datasets_compacted += 1;
313
314        storage.optimize_indices(dataset).await?;
315    }
316
317    Ok(CompactionTotals {
318        fragments_removed: total_removed,
319        fragments_added: total_added,
320        datasets_compacted,
321    })
322}
323
324/// Archive episodic memories older than `age_secs`.
325///
326/// Iterates in bounded batches (1000 per round) to avoid unbounded memory use.
327/// Fails if listing or archival fails so lifecycle compaction cannot emit a
328/// success result ahead of the archival state.
329async fn archive_old_memories(
330    runtime: &(impl LifecycleArchivalRuntime + ?Sized),
331    age_secs: u64,
332) -> HirnResult<usize> {
333    let age_secs_i64 = i64::try_from(age_secs).unwrap_or(i64::MAX);
334    let cutoff = chrono::Utc::now() - chrono::Duration::seconds(age_secs_i64);
335    let cutoff_ts = hirn_core::timestamp::Timestamp::from_datetime(cutoff);
336
337    let episodes = runtime.list_episodes_for_archival(1000).await?;
338
339    // Collect IDs eligible for archival, then archive.
340    let to_archive: Vec<_> = episodes
341        .iter()
342        .filter(|ep| ep.timestamp < cutoff_ts)
343        .map(|ep| ep.id)
344        .collect();
345
346    let mut archived = 0;
347    for id in &to_archive {
348        runtime.archive_episode_for_compaction(*id).await?;
349        archived += 1;
350    }
351
352    Ok(archived)
353}
354
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Mutex;

    use hirn_core::HirnError;
    use hirn_core::timestamp::Timestamp;
    use hirn_core::types::AgentId;

    use super::*;

    /// In-memory storage fake with per-dataset failure injection.
    struct FakeCompactionStore {
        /// Datasets reported as existing.
        existing: HashSet<&'static str>,
        /// Dataset whose compaction should fail, if any.
        fail_compact: Option<&'static str>,
        /// Dataset whose index optimization should fail, if any.
        fail_optimize: Option<&'static str>,
        /// Datasets whose indices were optimized, in call order.
        optimized: Mutex<Vec<String>>,
    }

    impl FakeCompactionStore {
        /// Store containing only the `episodic` dataset, with optional injected failures.
        fn episodic_only(
            fail_compact: Option<&'static str>,
            fail_optimize: Option<&'static str>,
        ) -> Self {
            Self {
                existing: HashSet::from(["episodic"]),
                fail_compact,
                fail_optimize,
                optimized: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl LifecycleCompactionStore for FakeCompactionStore {
        async fn exists(&self, dataset: &str) -> HirnResult<bool> {
            let known = self.existing.contains(dataset);
            Ok(known)
        }

        async fn compact(&self, dataset: &str, _opts: CompactOptions) -> HirnResult<CompactResult> {
            if self.fail_compact == Some(dataset) {
                Err(HirnError::Unsupported(format!(
                    "simulated compaction failure for {dataset}"
                )))
            } else {
                Ok(CompactResult {
                    fragments_removed: 2,
                    fragments_added: 1,
                    rows_removed: 0,
                })
            }
        }

        async fn optimize_indices(&self, dataset: &str) -> HirnResult<()> {
            if self.fail_optimize == Some(dataset) {
                Err(HirnError::Unsupported(format!(
                    "simulated optimize failure for {dataset}"
                )))
            } else {
                self.optimized.lock().unwrap().push(dataset.to_string());
                Ok(())
            }
        }
    }

    /// In-memory archival fake that records which episodes were archived.
    struct FakeArchivalRuntime {
        /// Episodes returned by every listing call.
        episodes: Vec<EpisodicRecord>,
        /// Episode whose archival should fail, if any.
        fail_archive: Option<MemoryId>,
        /// IDs archived so far, in call order.
        archived: Mutex<Vec<MemoryId>>,
    }

    #[async_trait]
    impl LifecycleArchivalRuntime for FakeArchivalRuntime {
        async fn list_episodes_for_archival(
            &self,
            _limit: usize,
        ) -> HirnResult<Vec<EpisodicRecord>> {
            let snapshot = self.episodes.clone();
            Ok(snapshot)
        }

        async fn archive_episode_for_compaction(&self, id: MemoryId) -> HirnResult<()> {
            if self.fail_archive == Some(id) {
                Err(HirnError::Unsupported(format!(
                    "simulated archival failure for {id}"
                )))
            } else {
                self.archived.lock().unwrap().push(id);
                Ok(())
            }
        }
    }

    /// Builds an episode timestamped 90 days in the past.
    fn old_episode(content: &str) -> EpisodicRecord {
        let ninety_days_ago = chrono::Utc::now() - chrono::Duration::days(90);
        let author = AgentId::new("compactor_test").unwrap();

        EpisodicRecord::builder()
            .content(content)
            .embedding(vec![0.1])
            .agent_id(author)
            .timestamp(Timestamp::from_datetime(ninety_days_ago))
            .build()
            .unwrap()
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn compact_all_datasets_fails_on_compaction_error() {
        let store = FakeCompactionStore::episodic_only(Some("episodic"), None);

        let outcome = compact_all_datasets(&store, &CompactOptions::default()).await;

        let error = outcome.expect_err("compaction failure should abort lifecycle compaction");
        assert!(matches!(error, HirnError::Unsupported(_)));
        // A failed compaction must not be followed by index optimization.
        assert!(store.optimized.lock().unwrap().is_empty());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn compact_all_datasets_fails_on_optimize_error() {
        let store = FakeCompactionStore::episodic_only(None, Some("episodic"));

        let outcome = compact_all_datasets(&store, &CompactOptions::default()).await;

        let error =
            outcome.expect_err("index optimization failure should abort lifecycle compaction");
        assert!(matches!(error, HirnError::Unsupported(_)));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn archive_old_memories_fails_on_archival_error() {
        let first = old_episode("first-old");
        let second = old_episode("second-old");
        let failing_id = first.id;
        let runtime = FakeArchivalRuntime {
            episodes: vec![first, second],
            fail_archive: Some(failing_id),
            archived: Mutex::new(Vec::new()),
        };

        let outcome = archive_old_memories(&runtime, 0).await;

        let error = outcome.expect_err("archival failure should abort lifecycle compaction");
        assert!(matches!(error, HirnError::Unsupported(_)));
        // The first episode fails, so nothing may have been archived.
        assert!(runtime.archived.lock().unwrap().is_empty());
    }
}