Skip to main content

hirn_storage/
compaction.rs

1//! Lifecycle-aware compaction.
2//!
3//! Unifies Lance fragment compaction with cognitive lifecycle operations:
4//! merge small fragments, prune deleted rows, archive cold episodes,
5//! generate semantic summaries, and record provenance links.
6
7use arrow_array::RecordBatch;
8use async_trait::async_trait;
9use futures::TryStreamExt;
10
11use crate::HirnDbError;
12use crate::store::{CompactOptions, PhysicalStore, RecordBatchStream, ScanOptions};
13
/// Wall-clock time as whole milliseconds elapsed since the Unix epoch
/// (1970-01-01T00:00:00Z).
///
/// # Panics
///
/// Panics if the system clock reports an instant earlier than the epoch.
fn now_millis() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before epoch");
    // `as_millis` yields `u128`; present-day timestamps fit in `i64` with a
    // very wide margin.
    since_epoch.as_millis() as i64
}
21
22// ── Options ──
23
/// Options for lifecycle-aware compaction.
///
/// Passed to [`lifecycle_compact`].  The [`Default`] value performs plain
/// fragment compaction followed by index optimisation, with archival and
/// summarisation switched off.
#[derive(Debug, Clone)]
pub struct LifecycleCompactOptions {
    /// Retention score below which episodes are considered cold and eligible
    /// for archival.  Episodes where `importance × retention(t) < threshold`
    /// will be archived (or summarised first when `summarize` is true).
    /// Range `[0.0, 1.0]`.  Default `0.0` (disabled): archival only runs when
    /// this value is strictly greater than zero.
    pub archive_threshold: f32,

    /// When `true`, cold episodes are fed to the [`Summarizer`] callback
    /// before archival so a semantic summary can be persisted.  Has no effect
    /// unless a summarizer is also passed to [`lifecycle_compact`].
    /// Default `false`.
    pub summarize: bool,

    /// Maximum number of episodes that may be batched into a single
    /// summary.  Default `50`.
    pub max_episodes_per_summary: usize,

    /// Standard Lance compaction options (fragment merging, row grouping).
    /// Forwarded unchanged to `PhysicalStore::compact`.
    pub compact_opts: CompactOptions,

    /// Maximum number of episode IDs to accumulate in memory during a single
    /// archival pass (N-M21 — prevents unbounded in-memory accumulation for
    /// very large datasets). Set to `usize::MAX` to disable the cap.
    /// Default: `10_000`.
    pub max_archive_batch: usize,

    /// Restrict compaction to a single realm (namespace).
    /// When `None`, the entire dataset is compacted.
    /// When set, cold-episode candidates are filtered on the `namespace`
    /// column.
    pub realm: Option<String>,

    /// Run `optimize_indices` after compaction.  Default `true`.
    pub optimize_indices: bool,
}
57
58impl Default for LifecycleCompactOptions {
59    fn default() -> Self {
60        Self {
61            archive_threshold: 0.0,
62            summarize: false,
63            max_episodes_per_summary: 50,
64            max_archive_batch: 10_000,
65            compact_opts: CompactOptions::default(),
66            realm: None,
67            optimize_indices: true,
68        }
69    }
70}
71
72// ── Result ──
73
/// Outcome of a lifecycle-aware compaction pass.
///
/// Returned by [`lifecycle_compact`].  All counters start at zero; the
/// archival counters stay zero when archival is disabled or the dataset is
/// not archivable.
#[derive(Debug, Clone, Default)]
pub struct LifecycleCompactResult {
    /// Number of Lance fragments removed by compaction.
    pub fragments_removed: u64,
    /// Number of Lance fragments created by compaction.
    pub fragments_added: u64,
    /// Number of deleted/pruned rows reclaimed (the store's `rows_removed`
    /// compaction counter).
    pub rows_pruned: u64,
    /// Number of episodes moved to archived status, counted as the number of
    /// cold episode IDs submitted for archival in this pass.
    pub episodes_archived: u64,
    /// Number of semantic summaries created from archived episodes, counted
    /// as the number of non-empty record batches appended to the `semantic`
    /// dataset.
    pub summaries_created: u64,
}
88
89// ── Summarizer callback ──
90
/// Callback trait for generating semantic summaries from cold episodes.
///
/// Implementors typically call an LLM to produce a condensed description
/// of the batch of episodes.  The returned `RecordBatch` must conform to
/// the **semantic** dataset schema so it can be appended directly.
///
/// The archival pass drops returned batches that contain zero rows and
/// appends the remaining ones to the `semantic` dataset.
#[async_trait]
pub trait Summarizer: Send + Sync {
    /// Produce zero or more semantic-dataset `RecordBatch`es from a batch
    /// of episodic rows that are about to be archived.
    ///
    /// `episodes` holds full episodic rows for one summary chunk.
    ///
    /// # Errors
    ///
    /// An `Err` is propagated by the archival pass, which then stops before
    /// marking any episode as archived.
    async fn summarize(&self, episodes: &[RecordBatch]) -> Result<Vec<RecordBatch>, HirnDbError>;
}
102
103// ── Core function ──
104
/// Datasets that carry the `archived` and retention-relevant columns.
///
/// Cold-episode archival is attempted only for dataset names listed here;
/// every other dataset receives fragment compaction and index optimisation
/// only.
const ARCHIVABLE_DATASETS: &[&str] = &["episodic"];
107
108/// Perform a lifecycle-aware compaction pass on a single dataset.
109///
110/// 1. **Fragment compaction** — merge small Lance fragments and prune
111///    deleted rows via `PhysicalStore::compact`.
112/// 2. **Cold episode identification** — scan for episodes whose
113///    `importance × retention(access_age, stability, access_count)`
114///    falls below `archive_threshold`.
115/// 3. **Summarisation** (optional) — feed cold episodes to `summarizer`
116///    and append the resulting semantic records to the `semantic` dataset.
117/// 4. **Archival** — mark cold episodes as `archived = true`.
118/// 5. **Index optimisation** (optional) — call `optimize_indices` on all
119///    modified datasets.
120///
121/// For realm-isolated compaction set `opts.realm`.  Only fragments
122/// belonging to that namespace are scanned for archival; the underlying
123/// `compact()` still operates on the whole dataset (Lance API limitation)
124/// but cold-episode identification and archival are namespace-scoped.
125pub async fn lifecycle_compact(
126    store: &dyn PhysicalStore,
127    dataset: &str,
128    opts: &LifecycleCompactOptions,
129    summarizer: Option<&dyn Summarizer>,
130) -> Result<LifecycleCompactResult, HirnDbError> {
131    let mut result = LifecycleCompactResult::default();
132
133    // ── Step 1: Standard fragment compaction ──
134    let compact_result = store.compact(dataset, opts.compact_opts.clone()).await?;
135    result.fragments_removed = compact_result.fragments_removed;
136    result.fragments_added = compact_result.fragments_added;
137    result.rows_pruned = compact_result.rows_removed;
138
139    // ── Steps 2-4: Cold episode identification + summarisation + archival ──
140    if opts.archive_threshold > 0.0 && ARCHIVABLE_DATASETS.contains(&dataset) {
141        let (archived, summaries) = archive_cold_episodes(store, dataset, opts, summarizer).await?;
142        result.episodes_archived = archived;
143        result.summaries_created = summaries;
144    }
145
146    // ── Step 5: Index optimisation ──
147    if opts.optimize_indices {
148        store.optimize_indices(dataset).await?;
149
150        // If we created summaries, also optimise the semantic dataset.
151        if result.summaries_created > 0 {
152            store.optimize_indices("semantic").await?;
153        }
154    }
155
156    Ok(result)
157}
158
159// ── Cold-episode archival ──
160
161/// Scan for cold episodes, optionally summarise them, then mark archived.
162///
163/// Returns `(episodes_archived, summaries_created)`.
164async fn archive_cold_episodes(
165    store: &dyn PhysicalStore,
166    dataset: &str,
167    opts: &LifecycleCompactOptions,
168    summarizer: Option<&dyn Summarizer>,
169) -> Result<(u64, u64), HirnDbError> {
170    // Build filter: non-archived, optionally scoped to realm.
171    let mut filters: Vec<String> = vec!["archived = false".to_string()];
172
173    if let Some(ref realm) = opts.realm {
174        let escaped = realm.replace('\'', "''");
175        filters.push(format!("namespace = '{escaped}'"));
176    }
177
178    let filter = filters.join(" AND ");
179
180    // Fetch retention-relevant columns.
181    let columns = vec![
182        "id".to_string(),
183        "importance".to_string(),
184        "last_accessed_ms".to_string(),
185        "stability".to_string(),
186        "access_count".to_string(),
187    ];
188
189    let cold_ids = collect_cold_ids_from_stream(
190        store
191            .scan_stream(
192                dataset,
193                ScanOptions {
194                    filter: Some(filter),
195                    exact_filter: None,
196                    columns: Some(columns),
197                    order_by: None,
198                    limit: None,
199                    offset: None,
200                },
201            )
202            .await?,
203        opts.archive_threshold,
204        opts.max_archive_batch,
205    )
206    .await?;
207    if cold_ids.is_empty() {
208        return Ok((0, 0));
209    }
210
211    let episodes_archived = cold_ids.len() as u64;
212    let mut summaries_created: u64 = 0;
213
214    // ── Summarisation ──
215    if opts.summarize
216        && let Some(summarizer) = summarizer
217    {
218        // Fetch full episode rows for cold IDs.
219        let cold_batches = fetch_by_ids(store, dataset, &cold_ids, opts.realm.as_deref()).await?;
220        if !cold_batches.is_empty() {
221            // Chunk into batches of max_episodes_per_summary.
222            for chunk in chunk_batches(&cold_batches, opts.max_episodes_per_summary) {
223                let summaries = summarizer.summarize(&chunk).await?;
224                let non_empty = summaries
225                    .into_iter()
226                    .filter(|batch| batch.num_rows() > 0)
227                    .collect::<Vec<_>>();
228                if !non_empty.is_empty() {
229                    summaries_created += non_empty.len() as u64;
230                    store.append_batches("semantic", non_empty).await?;
231                }
232            }
233        }
234    }
235
236    // ── Mark archived ──
237    archive_by_ids(store, dataset, &cold_ids).await?;
238
239    Ok((episodes_archived, summaries_created))
240}
241
242fn collect_cold_ids_from_batch(
243    batch: &RecordBatch,
244    threshold: f32,
245    cold_ids: &mut Vec<String>,
246) -> Result<(), HirnDbError> {
247    use arrow_array::{Float32Array, Int64Array, StringArray, UInt64Array};
248
249    let now_ms = now_millis();
250    let ids = batch
251        .column_by_name("id")
252        .and_then(|c| c.as_any().downcast_ref::<StringArray>())
253        .ok_or_else(|| HirnDbError::InvalidArgument("missing id column".into()))?;
254    let importances = batch
255        .column_by_name("importance")
256        .and_then(|c| c.as_any().downcast_ref::<Float32Array>())
257        .ok_or_else(|| HirnDbError::InvalidArgument("missing importance column".into()))?;
258    let last_accessed = batch
259        .column_by_name("last_accessed_ms")
260        .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
261        .ok_or_else(|| HirnDbError::InvalidArgument("missing last_accessed_ms column".into()))?;
262    let stabilities = batch
263        .column_by_name("stability")
264        .and_then(|c| c.as_any().downcast_ref::<Float32Array>())
265        .ok_or_else(|| HirnDbError::InvalidArgument("missing stability column".into()))?;
266    let access_counts = batch
267        .column_by_name("access_count")
268        .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
269        .ok_or_else(|| HirnDbError::InvalidArgument("missing access_count column".into()))?;
270
271    for i in 0..batch.num_rows() {
272        let importance = importances.value(i);
273        let last_ms = last_accessed.value(i);
274        let stability = stabilities.value(i);
275        let access_count = access_counts.value(i);
276
277        let hours_since = (now_ms - last_ms) as f64 / 3_600_000.0;
278        let retention = retention_score(hours_since, stability, access_count);
279        let effective = importance * retention;
280
281        if effective < threshold {
282            cold_ids.push(ids.value(i).to_string());
283        }
284    }
285
286    Ok(())
287}
288
289/// Collect episode IDs whose computed retention falls below `threshold`.
290///
291/// Collection is capped at `max_ids` entries to prevent unbounded in-memory
292/// accumulation for very large datasets (N-M21). Callers should call this
293/// function in a loop for full coverage if needed.
294async fn collect_cold_ids_from_stream(
295    mut stream: RecordBatchStream,
296    threshold: f32,
297    max_ids: usize,
298) -> Result<Vec<String>, HirnDbError> {
299    let mut cold_ids = Vec::new();
300    while let Some(batch) = stream.try_next().await? {
301        collect_cold_ids_from_batch(&batch, threshold, &mut cold_ids)?;
302        if cold_ids.len() >= max_ids {
303            cold_ids.truncate(max_ids);
304            break;
305        }
306    }
307    Ok(cold_ids)
308}
309
/// Ebbinghaus retention score: `R = e^(-t / S)` where
/// `S = stability × (1 + 0.5 × ln(rehearsal_count))`.
///
/// * `hours_since_access` — elapsed time `t` in hours.  Negative values (a
///   last-access timestamp ahead of the local clock) are treated as `0`, so
///   the score never exceeds `1.0`.
/// * `stability` — base stability, in hours.
/// * `rehearsal_count` — number of accesses; `0` is treated as `1` so the
///   logarithm stays finite.
///
/// Returns a value in `[0.0, 1.0]`, or `0.0` when the effective stability is
/// not positive.
fn retention_score(hours_since_access: f64, stability: f32, rehearsal_count: u64) -> f32 {
    let rehearsal_boost = 1.0 + 0.5 * (rehearsal_count.max(1) as f64).ln();
    let effective_stability = f64::from(stability) * rehearsal_boost;
    if effective_stability <= 0.0 {
        return 0.0;
    }

    // Without the clamp, a future timestamp would give `e^(+x) > 1`, which is
    // outside the model's range and can overflow to infinity.
    let elapsed_hours = hours_since_access.max(0.0);
    (-elapsed_hours / effective_stability).exp() as f32
}
319
320/// Fetch full rows for the given IDs, optionally filtered by realm.
321async fn fetch_by_ids(
322    store: &dyn PhysicalStore,
323    dataset: &str,
324    ids: &[String],
325    realm: Option<&str>,
326) -> Result<Vec<RecordBatch>, HirnDbError> {
327    if ids.is_empty() {
328        return Ok(Vec::new());
329    }
330
331    // Build IN predicate for IDs (batch to avoid oversized predicates).
332    let mut all_batches = Vec::new();
333
334    for chunk in ids.chunks(500) {
335        let in_list: Vec<String> = chunk
336            .iter()
337            .map(|id| {
338                let escaped = id.replace('\'', "''");
339                format!("'{escaped}'")
340            })
341            .collect();
342
343        let mut filter = format!("id IN ({})", in_list.join(", "));
344
345        if let Some(r) = realm {
346            let escaped_realm = r.replace('\'', "''");
347            filter = format!("({filter}) AND namespace = '{escaped_realm}'");
348        }
349
350        let batches = store
351            .scan(
352                dataset,
353                ScanOptions {
354                    filter: Some(filter),
355                    exact_filter: None,
356                    columns: None,
357                    order_by: None,
358                    limit: None,
359                    offset: None,
360                },
361            )
362            .await?;
363
364        all_batches.extend(batches);
365    }
366
367    Ok(all_batches)
368}
369
370/// Chunk batches by total row count.
371fn chunk_batches(batches: &[RecordBatch], max_rows: usize) -> Vec<Vec<RecordBatch>> {
372    let mut chunks = Vec::new();
373    let mut current_chunk = Vec::new();
374    let mut current_rows = 0usize;
375
376    for batch in batches {
377        if current_rows + batch.num_rows() > max_rows && !current_chunk.is_empty() {
378            chunks.push(std::mem::take(&mut current_chunk));
379            current_rows = 0;
380        }
381        current_rows += batch.num_rows();
382        current_chunk.push(batch.clone());
383    }
384
385    if !current_chunk.is_empty() {
386        chunks.push(current_chunk);
387    }
388
389    chunks
390}
391
392/// Mark episodes as archived with a narrow in-place update.
393///
394/// Uses `PhysicalStore::update_where` — a targeted `SET archived = true WHERE id IN (…)`
395/// statement — rather than a full-row scan → modify → merge_insert.  The old approach
396/// was a non-atomic read-modify-write that could silently clobber concurrent column
397/// writes that occurred between the scan and the re-insert (N-C2).
398async fn archive_by_ids(
399    store: &dyn PhysicalStore,
400    dataset: &str,
401    ids: &[String],
402) -> Result<(), HirnDbError> {
403    if ids.is_empty() {
404        return Ok(());
405    }
406
407    for chunk in ids.chunks(500) {
408        let in_list: Vec<String> = chunk
409            .iter()
410            .map(|id| {
411                let escaped = id.replace('\'', "''");
412                format!("'{escaped}'")
413            })
414            .collect();
415
416        let filter = format!("id IN ({})", in_list.join(", "));
417        store
418            .update_where(dataset, &filter, &[("archived", "true")])
419            .await?;
420    }
421
422    Ok(())
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_store::MemoryStore;
    use crate::store::ScanOptions;
    use arrow_array::{BooleanArray, Float32Array, Int64Array, StringArray, UInt64Array};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Thirty days expressed in milliseconds.
    const THIRTY_DAYS_MS: i64 = 30 * 24 * 3_600_000;

    /// Timestamp (ms since epoch) thirty days before now.
    fn thirty_days_ago() -> i64 {
        now_millis() - THIRTY_DAYS_MS
    }

    /// Minimal episodic-like schema for testing.
    fn test_episodic_schema() -> Arc<Schema> {
        let fields = vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("importance", DataType::Float32, false),
            Field::new("last_accessed_ms", DataType::Int64, false),
            Field::new("stability", DataType::Float32, false),
            Field::new("access_count", DataType::UInt64, false),
            Field::new("archived", DataType::Boolean, false),
            Field::new("namespace", DataType::Utf8, false),
            Field::new("content", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Build a single-row episodic batch from the given column values.
    #[allow(clippy::too_many_arguments)]
    fn make_episode(
        id: &str,
        importance: f32,
        last_accessed_ms: i64,
        stability: f32,
        access_count: u64,
        archived: bool,
        namespace: &str,
        content: &str,
    ) -> RecordBatch {
        RecordBatch::try_new(
            test_episodic_schema(),
            vec![
                Arc::new(StringArray::from(vec![id])),
                Arc::new(Float32Array::from(vec![importance])),
                Arc::new(Int64Array::from(vec![last_accessed_ms])),
                Arc::new(Float32Array::from(vec![stability])),
                Arc::new(UInt64Array::from(vec![access_count])),
                Arc::new(BooleanArray::from(vec![archived])),
                Arc::new(StringArray::from(vec![namespace])),
                Arc::new(StringArray::from(vec![content])),
            ],
        )
        .unwrap()
    }

    /// Unarchived episode last accessed thirty days ago with stability `1.0`
    /// and a single access, i.e. with negligible retention.
    fn stale_episode(id: &str, importance: f32, namespace: &str, content: &str) -> RecordBatch {
        make_episode(
            id,
            importance,
            thirty_days_ago(),
            1.0,
            1,
            false,
            namespace,
            content,
        )
    }

    /// Append every batch to the `episodic` dataset, one append per batch.
    async fn seed_episodes(store: &MemoryStore, episodes: Vec<RecordBatch>) {
        for batch in episodes {
            store.append("episodic", batch).await.unwrap();
        }
    }

    /// Read back the `archived` flag of the episode with the given `id`.
    ///
    /// Expects the lookup to return exactly one batch.
    async fn is_archived(store: &MemoryStore, id: &str) -> bool {
        let batches = store
            .scan(
                "episodic",
                ScanOptions {
                    filter: Some(format!("id = '{id}'")),
                    columns: Some(vec!["id".to_string(), "archived".to_string()]),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);

        let column = batches[0].column_by_name("archived").unwrap();
        let flags = column.as_any().downcast_ref::<BooleanArray>().unwrap();
        flags.value(0)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn compact_merges_fragments() {
        let store = MemoryStore::new();

        // Create 10 tiny fragments, one row each.
        for i in 0..10 {
            let batch = make_episode(
                &format!("ep-{i}"),
                0.9,
                now_millis(),
                100.0,
                5,
                false,
                "default",
                &format!("content {i}"),
            );
            store.append("episodic", batch).await.unwrap();
        }

        let opts = LifecycleCompactOptions::default();
        let result = lifecycle_compact(&store, "episodic", &opts, None)
            .await
            .unwrap();

        // MemoryStore compact is a no-op, so fragments_removed = 0.
        assert_eq!(result.fragments_removed, 0);
        // But the data should still be intact.
        let count = store.count("episodic", None).await.unwrap();
        assert_eq!(count, 10);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn archive_cold_episodes_below_threshold() {
        let store = MemoryStore::new();

        // 1 hour ago in ms.
        let one_hour_ago = now_millis() - 3_600_000;

        let episodes = vec![
            // Recent, high importance → should NOT be archived.
            make_episode(
                "ep-hot",
                0.9,
                one_hour_ago,
                100.0,
                10,
                false,
                "default",
                "hot",
            ),
            // Very old, low importance, low stability → should be archived.
            stale_episode("ep-cold", 0.1, "default", "cold"),
        ];
        seed_episodes(&store, episodes).await;

        let opts = LifecycleCompactOptions {
            archive_threshold: 0.05,
            optimize_indices: false,
            ..Default::default()
        };

        let result = lifecycle_compact(&store, "episodic", &opts, None)
            .await
            .unwrap();

        assert_eq!(result.episodes_archived, 1);

        // The cold episode is now archived; the hot one is untouched.
        assert!(is_archived(&store, "ep-cold").await);
        assert!(!is_archived(&store, "ep-hot").await);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn archive_threshold_zero_skips_archival() {
        let store = MemoryStore::new();

        let episodes = vec![stale_episode("ep-1", 0.01, "default", "old")];
        seed_episodes(&store, episodes).await;

        // archive_threshold = 0 → disabled.
        let opts = LifecycleCompactOptions {
            archive_threshold: 0.0,
            optimize_indices: false,
            ..Default::default()
        };

        let result = lifecycle_compact(&store, "episodic", &opts, None)
            .await
            .unwrap();

        assert_eq!(result.episodes_archived, 0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn realm_isolated_archival() {
        let store = MemoryStore::new();

        let episodes = vec![
            stale_episode("ep-a", 0.05, "realm_a", "a-content"),
            stale_episode("ep-b", 0.05, "realm_b", "b-content"),
        ];
        seed_episodes(&store, episodes).await;

        // Only compact realm_a.
        let opts = LifecycleCompactOptions {
            archive_threshold: 0.1,
            realm: Some("realm_a".to_string()),
            optimize_indices: false,
            ..Default::default()
        };

        let result = lifecycle_compact(&store, "episodic", &opts, None)
            .await
            .unwrap();

        assert_eq!(result.episodes_archived, 1);

        // realm_a episode archived, realm_b episode NOT archived.
        assert!(is_archived(&store, "ep-a").await);
        assert!(!is_archived(&store, "ep-b").await);
    }

    /// A test-only summarizer that produces a minimal semantic batch.
    struct TestSummarizer;

    #[async_trait]
    impl Summarizer for TestSummarizer {
        async fn summarize(
            &self,
            episodes: &[RecordBatch],
        ) -> Result<Vec<RecordBatch>, HirnDbError> {
            // Count total rows across input batches.
            let total_rows: usize = episodes.iter().map(|b| b.num_rows()).sum();

            // Produce a single semantic-like batch (minimal schema).
            let schema = Arc::new(Schema::new(vec![
                Field::new("id", DataType::Utf8, false),
                Field::new("summary", DataType::Utf8, false),
            ]));

            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(StringArray::from(vec!["summary-1"])),
                    Arc::new(StringArray::from(vec![format!(
                        "Summary of {total_rows} episodes"
                    )])),
                ],
            )
            .map_err(HirnDbError::from)?;

            Ok(vec![batch])
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn summarizer_callback_invoked_for_cold_episodes() {
        let store = MemoryStore::new();

        let episodes = vec![
            stale_episode("ep-cold-1", 0.02, "default", "c1"),
            stale_episode("ep-cold-2", 0.02, "default", "c2"),
        ];
        seed_episodes(&store, episodes).await;

        let summarizer = TestSummarizer;
        let opts = LifecycleCompactOptions {
            archive_threshold: 0.1,
            summarize: true,
            max_episodes_per_summary: 10,
            optimize_indices: false,
            ..Default::default()
        };

        let result = lifecycle_compact(&store, "episodic", &opts, Some(&summarizer))
            .await
            .unwrap();

        assert_eq!(result.episodes_archived, 2);
        assert_eq!(result.summaries_created, 1);

        // Verify semantic dataset received the summary.
        let sem_batches = store
            .scan("semantic", ScanOptions::default())
            .await
            .unwrap();
        assert!(!sem_batches.is_empty());
        let total_rows: usize = sem_batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn summarize_false_skips_summarizer() {
        let store = MemoryStore::new();

        let episodes = vec![stale_episode("ep-cold", 0.02, "default", "content")];
        seed_episodes(&store, episodes).await;

        let summarizer = TestSummarizer;
        let opts = LifecycleCompactOptions {
            archive_threshold: 0.1,
            summarize: false,
            optimize_indices: false,
            ..Default::default()
        };

        let result = lifecycle_compact(&store, "episodic", &opts, Some(&summarizer))
            .await
            .unwrap();

        assert_eq!(result.episodes_archived, 1);
        assert_eq!(result.summaries_created, 0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn non_archivable_dataset_skips_archival() {
        let store = MemoryStore::new();

        // Create some data in a non-archivable dataset.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec!["row-1"]))]).unwrap();
        store.append("graph_nodes", batch).await.unwrap();

        let opts = LifecycleCompactOptions {
            archive_threshold: 0.5,
            optimize_indices: false,
            ..Default::default()
        };

        let result = lifecycle_compact(&store, "graph_nodes", &opts, None)
            .await
            .unwrap();

        // No archival for non-episodic datasets.
        assert_eq!(result.episodes_archived, 0);
        assert_eq!(result.summaries_created, 0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn retention_score_computation() {
        // Just accessed → retention ≈ 1.0.
        let r = retention_score(0.0, 100.0, 5);
        assert!((r - 1.0).abs() < 0.01);

        // 24h ago, stability=1.0, 1 rehearsal → very low.
        let r = retention_score(24.0, 1.0, 1);
        assert!(r < 0.01);

        // Zero stability → 0.0 (no crash).
        let r = retention_score(10.0, 0.0, 1);
        assert_eq!(r, 0.0);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn chunk_batches_respects_max_rows() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));

        let make_batch = |n: usize| {
            let ids: Vec<String> = (0..n).map(|i| format!("id-{i}")).collect();
            let refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
            RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(refs))]).unwrap()
        };

        let batches = vec![make_batch(3), make_batch(3), make_batch(3)];
        let chunks = chunk_batches(&batches, 5);

        // 3 + 3 > 5 → chunk after first; then 3 + 3 > 5 → chunk after second.
        assert_eq!(chunks.len(), 3);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn already_archived_episodes_not_rearchived() {
        let store = MemoryStore::new();

        let episodes = vec![
            // Already archived → filter `archived = false` excludes it.
            make_episode(
                "ep-already",
                0.01,
                thirty_days_ago(),
                1.0,
                1,
                true,
                "default",
                "old",
            ),
            // Not archived, but high retention → not cold.
            make_episode(
                "ep-hot",
                0.9,
                now_millis(),
                100.0,
                10,
                false,
                "default",
                "hot",
            ),
        ];
        seed_episodes(&store, episodes).await;

        let opts = LifecycleCompactOptions {
            archive_threshold: 0.05,
            optimize_indices: false,
            ..Default::default()
        };

        let result = lifecycle_compact(&store, "episodic", &opts, None)
            .await
            .unwrap();

        assert_eq!(result.episodes_archived, 0);
    }
}
941            .unwrap();
942
943        assert_eq!(result.episodes_archived, 0);
944    }
945}