1use arrow_array::RecordBatch;
8use async_trait::async_trait;
9use futures::TryStreamExt;
10
11use crate::HirnDbError;
12use crate::store::{CompactOptions, PhysicalStore, RecordBatchStream, ScanOptions};
13
14fn now_millis() -> i64 {
16 std::time::SystemTime::now()
17 .duration_since(std::time::UNIX_EPOCH)
18 .expect("system clock before epoch")
19 .as_millis() as i64
20}
21
22#[derive(Debug, Clone)]
26pub struct LifecycleCompactOptions {
27 pub archive_threshold: f32,
32
33 pub summarize: bool,
36
37 pub max_episodes_per_summary: usize,
40
41 pub compact_opts: CompactOptions,
43
44 pub max_archive_batch: usize,
49
50 pub realm: Option<String>,
53
54 pub optimize_indices: bool,
56}
57
58impl Default for LifecycleCompactOptions {
59 fn default() -> Self {
60 Self {
61 archive_threshold: 0.0,
62 summarize: false,
63 max_episodes_per_summary: 50,
64 max_archive_batch: 10_000,
65 compact_opts: CompactOptions::default(),
66 realm: None,
67 optimize_indices: true,
68 }
69 }
70}
71
72#[derive(Debug, Clone, Default)]
76pub struct LifecycleCompactResult {
77 pub fragments_removed: u64,
79 pub fragments_added: u64,
81 pub rows_pruned: u64,
83 pub episodes_archived: u64,
85 pub summaries_created: u64,
87}
88
89#[async_trait]
97pub trait Summarizer: Send + Sync {
98 async fn summarize(&self, episodes: &[RecordBatch]) -> Result<Vec<RecordBatch>, HirnDbError>;
101}
102
103const ARCHIVABLE_DATASETS: &[&str] = &["episodic"];
107
108pub async fn lifecycle_compact(
126 store: &dyn PhysicalStore,
127 dataset: &str,
128 opts: &LifecycleCompactOptions,
129 summarizer: Option<&dyn Summarizer>,
130) -> Result<LifecycleCompactResult, HirnDbError> {
131 let mut result = LifecycleCompactResult::default();
132
133 let compact_result = store.compact(dataset, opts.compact_opts.clone()).await?;
135 result.fragments_removed = compact_result.fragments_removed;
136 result.fragments_added = compact_result.fragments_added;
137 result.rows_pruned = compact_result.rows_removed;
138
139 if opts.archive_threshold > 0.0 && ARCHIVABLE_DATASETS.contains(&dataset) {
141 let (archived, summaries) = archive_cold_episodes(store, dataset, opts, summarizer).await?;
142 result.episodes_archived = archived;
143 result.summaries_created = summaries;
144 }
145
146 if opts.optimize_indices {
148 store.optimize_indices(dataset).await?;
149
150 if result.summaries_created > 0 {
152 store.optimize_indices("semantic").await?;
153 }
154 }
155
156 Ok(result)
157}
158
159async fn archive_cold_episodes(
165 store: &dyn PhysicalStore,
166 dataset: &str,
167 opts: &LifecycleCompactOptions,
168 summarizer: Option<&dyn Summarizer>,
169) -> Result<(u64, u64), HirnDbError> {
170 let mut filters: Vec<String> = vec!["archived = false".to_string()];
172
173 if let Some(ref realm) = opts.realm {
174 let escaped = realm.replace('\'', "''");
175 filters.push(format!("namespace = '{escaped}'"));
176 }
177
178 let filter = filters.join(" AND ");
179
180 let columns = vec![
182 "id".to_string(),
183 "importance".to_string(),
184 "last_accessed_ms".to_string(),
185 "stability".to_string(),
186 "access_count".to_string(),
187 ];
188
189 let cold_ids = collect_cold_ids_from_stream(
190 store
191 .scan_stream(
192 dataset,
193 ScanOptions {
194 filter: Some(filter),
195 exact_filter: None,
196 columns: Some(columns),
197 order_by: None,
198 limit: None,
199 offset: None,
200 },
201 )
202 .await?,
203 opts.archive_threshold,
204 opts.max_archive_batch,
205 )
206 .await?;
207 if cold_ids.is_empty() {
208 return Ok((0, 0));
209 }
210
211 let episodes_archived = cold_ids.len() as u64;
212 let mut summaries_created: u64 = 0;
213
214 if opts.summarize
216 && let Some(summarizer) = summarizer
217 {
218 let cold_batches = fetch_by_ids(store, dataset, &cold_ids, opts.realm.as_deref()).await?;
220 if !cold_batches.is_empty() {
221 for chunk in chunk_batches(&cold_batches, opts.max_episodes_per_summary) {
223 let summaries = summarizer.summarize(&chunk).await?;
224 let non_empty = summaries
225 .into_iter()
226 .filter(|batch| batch.num_rows() > 0)
227 .collect::<Vec<_>>();
228 if !non_empty.is_empty() {
229 summaries_created += non_empty.len() as u64;
230 store.append_batches("semantic", non_empty).await?;
231 }
232 }
233 }
234 }
235
236 archive_by_ids(store, dataset, &cold_ids).await?;
238
239 Ok((episodes_archived, summaries_created))
240}
241
242fn collect_cold_ids_from_batch(
243 batch: &RecordBatch,
244 threshold: f32,
245 cold_ids: &mut Vec<String>,
246) -> Result<(), HirnDbError> {
247 use arrow_array::{Float32Array, Int64Array, StringArray, UInt64Array};
248
249 let now_ms = now_millis();
250 let ids = batch
251 .column_by_name("id")
252 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
253 .ok_or_else(|| HirnDbError::InvalidArgument("missing id column".into()))?;
254 let importances = batch
255 .column_by_name("importance")
256 .and_then(|c| c.as_any().downcast_ref::<Float32Array>())
257 .ok_or_else(|| HirnDbError::InvalidArgument("missing importance column".into()))?;
258 let last_accessed = batch
259 .column_by_name("last_accessed_ms")
260 .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
261 .ok_or_else(|| HirnDbError::InvalidArgument("missing last_accessed_ms column".into()))?;
262 let stabilities = batch
263 .column_by_name("stability")
264 .and_then(|c| c.as_any().downcast_ref::<Float32Array>())
265 .ok_or_else(|| HirnDbError::InvalidArgument("missing stability column".into()))?;
266 let access_counts = batch
267 .column_by_name("access_count")
268 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
269 .ok_or_else(|| HirnDbError::InvalidArgument("missing access_count column".into()))?;
270
271 for i in 0..batch.num_rows() {
272 let importance = importances.value(i);
273 let last_ms = last_accessed.value(i);
274 let stability = stabilities.value(i);
275 let access_count = access_counts.value(i);
276
277 let hours_since = (now_ms - last_ms) as f64 / 3_600_000.0;
278 let retention = retention_score(hours_since, stability, access_count);
279 let effective = importance * retention;
280
281 if effective < threshold {
282 cold_ids.push(ids.value(i).to_string());
283 }
284 }
285
286 Ok(())
287}
288
289async fn collect_cold_ids_from_stream(
295 mut stream: RecordBatchStream,
296 threshold: f32,
297 max_ids: usize,
298) -> Result<Vec<String>, HirnDbError> {
299 let mut cold_ids = Vec::new();
300 while let Some(batch) = stream.try_next().await? {
301 collect_cold_ids_from_batch(&batch, threshold, &mut cold_ids)?;
302 if cold_ids.len() >= max_ids {
303 cold_ids.truncate(max_ids);
304 break;
305 }
306 }
307 Ok(cold_ids)
308}
309
310fn retention_score(hours_since_access: f64, stability: f32, rehearsal_count: u64) -> f32 {
313 let effective_stability = stability as f64 * (1.0 + 0.5 * (rehearsal_count.max(1) as f64).ln());
314 if effective_stability <= 0.0 {
315 return 0.0;
316 }
317 (-hours_since_access / effective_stability).exp() as f32
318}
319
320async fn fetch_by_ids(
322 store: &dyn PhysicalStore,
323 dataset: &str,
324 ids: &[String],
325 realm: Option<&str>,
326) -> Result<Vec<RecordBatch>, HirnDbError> {
327 if ids.is_empty() {
328 return Ok(Vec::new());
329 }
330
331 let mut all_batches = Vec::new();
333
334 for chunk in ids.chunks(500) {
335 let in_list: Vec<String> = chunk
336 .iter()
337 .map(|id| {
338 let escaped = id.replace('\'', "''");
339 format!("'{escaped}'")
340 })
341 .collect();
342
343 let mut filter = format!("id IN ({})", in_list.join(", "));
344
345 if let Some(r) = realm {
346 let escaped_realm = r.replace('\'', "''");
347 filter = format!("({filter}) AND namespace = '{escaped_realm}'");
348 }
349
350 let batches = store
351 .scan(
352 dataset,
353 ScanOptions {
354 filter: Some(filter),
355 exact_filter: None,
356 columns: None,
357 order_by: None,
358 limit: None,
359 offset: None,
360 },
361 )
362 .await?;
363
364 all_batches.extend(batches);
365 }
366
367 Ok(all_batches)
368}
369
370fn chunk_batches(batches: &[RecordBatch], max_rows: usize) -> Vec<Vec<RecordBatch>> {
372 let mut chunks = Vec::new();
373 let mut current_chunk = Vec::new();
374 let mut current_rows = 0usize;
375
376 for batch in batches {
377 if current_rows + batch.num_rows() > max_rows && !current_chunk.is_empty() {
378 chunks.push(std::mem::take(&mut current_chunk));
379 current_rows = 0;
380 }
381 current_rows += batch.num_rows();
382 current_chunk.push(batch.clone());
383 }
384
385 if !current_chunk.is_empty() {
386 chunks.push(current_chunk);
387 }
388
389 chunks
390}
391
392async fn archive_by_ids(
399 store: &dyn PhysicalStore,
400 dataset: &str,
401 ids: &[String],
402) -> Result<(), HirnDbError> {
403 if ids.is_empty() {
404 return Ok(());
405 }
406
407 for chunk in ids.chunks(500) {
408 let in_list: Vec<String> = chunk
409 .iter()
410 .map(|id| {
411 let escaped = id.replace('\'', "''");
412 format!("'{escaped}'")
413 })
414 .collect();
415
416 let filter = format!("id IN ({})", in_list.join(", "));
417 store
418 .update_where(dataset, &filter, &[("archived", "true")])
419 .await?;
420 }
421
422 Ok(())
423}
424
425#[cfg(test)]
426mod tests {
427 use super::*;
428 use crate::memory_store::MemoryStore;
429 use crate::store::ScanOptions;
430 use arrow_array::{BooleanArray, Float32Array, Int64Array, StringArray, UInt64Array};
431 use arrow_schema::{DataType, Field, Schema};
432 use std::sync::Arc;
433
434 fn test_episodic_schema() -> Arc<Schema> {
436 Arc::new(Schema::new(vec![
437 Field::new("id", DataType::Utf8, false),
438 Field::new("importance", DataType::Float32, false),
439 Field::new("last_accessed_ms", DataType::Int64, false),
440 Field::new("stability", DataType::Float32, false),
441 Field::new("access_count", DataType::UInt64, false),
442 Field::new("archived", DataType::Boolean, false),
443 Field::new("namespace", DataType::Utf8, false),
444 Field::new("content", DataType::Utf8, false),
445 ]))
446 }
447
448 #[allow(clippy::too_many_arguments)]
449 fn make_episode(
450 id: &str,
451 importance: f32,
452 last_accessed_ms: i64,
453 stability: f32,
454 access_count: u64,
455 archived: bool,
456 namespace: &str,
457 content: &str,
458 ) -> RecordBatch {
459 RecordBatch::try_new(
460 test_episodic_schema(),
461 vec![
462 Arc::new(StringArray::from(vec![id])),
463 Arc::new(Float32Array::from(vec![importance])),
464 Arc::new(Int64Array::from(vec![last_accessed_ms])),
465 Arc::new(Float32Array::from(vec![stability])),
466 Arc::new(UInt64Array::from(vec![access_count])),
467 Arc::new(BooleanArray::from(vec![archived])),
468 Arc::new(StringArray::from(vec![namespace])),
469 Arc::new(StringArray::from(vec![content])),
470 ],
471 )
472 .unwrap()
473 }
474
475 async fn seed_episodes(store: &MemoryStore, episodes: Vec<RecordBatch>) {
476 for batch in episodes {
478 store.append("episodic", batch).await.unwrap();
479 }
480 }
481
482 #[tokio::test(flavor = "multi_thread")]
483 async fn compact_merges_fragments() {
484 let store = MemoryStore::new();
485 let schema = test_episodic_schema();
486
487 for i in 0..10 {
489 let batch = RecordBatch::try_new(
490 schema.clone(),
491 vec![
492 Arc::new(StringArray::from(vec![format!("ep-{i}")])),
493 Arc::new(Float32Array::from(vec![0.9])),
494 Arc::new(Int64Array::from(vec![now_millis()])),
495 Arc::new(Float32Array::from(vec![100.0])),
496 Arc::new(UInt64Array::from(vec![5u64])),
497 Arc::new(BooleanArray::from(vec![false])),
498 Arc::new(StringArray::from(vec!["default"])),
499 Arc::new(StringArray::from(vec![format!("content {i}")])),
500 ],
501 )
502 .unwrap();
503 store.append("episodic", batch).await.unwrap();
504 }
505
506 let opts = LifecycleCompactOptions::default();
507 let result = lifecycle_compact(&store, "episodic", &opts, None)
508 .await
509 .unwrap();
510
511 assert_eq!(result.fragments_removed, 0);
513 let count = store.count("episodic", None).await.unwrap();
515 assert_eq!(count, 10);
516 }
517
518 #[tokio::test(flavor = "multi_thread")]
519 async fn archive_cold_episodes_below_threshold() {
520 let store = MemoryStore::new();
521
522 let one_hour_ago = now_millis() - 3_600_000;
524 let thirty_days_ago = now_millis() - 30 * 24 * 3_600_000;
526
527 let episodes = vec![
528 make_episode(
530 "ep-hot",
531 0.9,
532 one_hour_ago,
533 100.0,
534 10,
535 false,
536 "default",
537 "hot",
538 ),
539 make_episode(
541 "ep-cold",
542 0.1,
543 thirty_days_ago,
544 1.0,
545 1,
546 false,
547 "default",
548 "cold",
549 ),
550 ];
551 seed_episodes(&store, episodes).await;
552
553 let opts = LifecycleCompactOptions {
554 archive_threshold: 0.05,
555 optimize_indices: false,
556 ..Default::default()
557 };
558
559 let result = lifecycle_compact(&store, "episodic", &opts, None)
560 .await
561 .unwrap();
562
563 assert_eq!(result.episodes_archived, 1);
564
565 let batches = store
567 .scan(
568 "episodic",
569 ScanOptions {
570 filter: Some("id = 'ep-cold'".to_string()),
571 columns: Some(vec!["id".to_string(), "archived".to_string()]),
572 ..Default::default()
573 },
574 )
575 .await
576 .unwrap();
577
578 assert_eq!(batches.len(), 1);
579 let archived_col = batches[0]
580 .column_by_name("archived")
581 .unwrap()
582 .as_any()
583 .downcast_ref::<BooleanArray>()
584 .unwrap();
585 assert!(archived_col.value(0));
586
587 let batches = store
589 .scan(
590 "episodic",
591 ScanOptions {
592 filter: Some("id = 'ep-hot'".to_string()),
593 columns: Some(vec!["id".to_string(), "archived".to_string()]),
594 ..Default::default()
595 },
596 )
597 .await
598 .unwrap();
599 assert_eq!(batches.len(), 1);
600 let archived_col = batches[0]
601 .column_by_name("archived")
602 .unwrap()
603 .as_any()
604 .downcast_ref::<BooleanArray>()
605 .unwrap();
606 assert!(!archived_col.value(0));
607 }
608
609 #[tokio::test(flavor = "multi_thread")]
610 async fn archive_threshold_zero_skips_archival() {
611 let store = MemoryStore::new();
612 let thirty_days_ago = now_millis() - 30 * 24 * 3_600_000;
613
614 let episodes = vec![make_episode(
615 "ep-1",
616 0.01,
617 thirty_days_ago,
618 1.0,
619 1,
620 false,
621 "default",
622 "old",
623 )];
624 seed_episodes(&store, episodes).await;
625
626 let opts = LifecycleCompactOptions {
628 archive_threshold: 0.0,
629 optimize_indices: false,
630 ..Default::default()
631 };
632
633 let result = lifecycle_compact(&store, "episodic", &opts, None)
634 .await
635 .unwrap();
636
637 assert_eq!(result.episodes_archived, 0);
638 }
639
640 #[tokio::test(flavor = "multi_thread")]
641 async fn realm_isolated_archival() {
642 let store = MemoryStore::new();
643 let thirty_days_ago = now_millis() - 30 * 24 * 3_600_000;
644
645 let episodes = vec![
646 make_episode(
647 "ep-a",
648 0.05,
649 thirty_days_ago,
650 1.0,
651 1,
652 false,
653 "realm_a",
654 "a-content",
655 ),
656 make_episode(
657 "ep-b",
658 0.05,
659 thirty_days_ago,
660 1.0,
661 1,
662 false,
663 "realm_b",
664 "b-content",
665 ),
666 ];
667 seed_episodes(&store, episodes).await;
668
669 let opts = LifecycleCompactOptions {
671 archive_threshold: 0.1,
672 realm: Some("realm_a".to_string()),
673 optimize_indices: false,
674 ..Default::default()
675 };
676
677 let result = lifecycle_compact(&store, "episodic", &opts, None)
678 .await
679 .unwrap();
680
681 assert_eq!(result.episodes_archived, 1);
682
683 let batches = store
685 .scan(
686 "episodic",
687 ScanOptions {
688 filter: Some("id = 'ep-a'".to_string()),
689 columns: Some(vec!["archived".to_string()]),
690 ..Default::default()
691 },
692 )
693 .await
694 .unwrap();
695 let archived = batches[0]
696 .column_by_name("archived")
697 .unwrap()
698 .as_any()
699 .downcast_ref::<BooleanArray>()
700 .unwrap();
701 assert!(archived.value(0));
702
703 let batches = store
705 .scan(
706 "episodic",
707 ScanOptions {
708 filter: Some("id = 'ep-b'".to_string()),
709 columns: Some(vec!["archived".to_string()]),
710 ..Default::default()
711 },
712 )
713 .await
714 .unwrap();
715 let archived = batches[0]
716 .column_by_name("archived")
717 .unwrap()
718 .as_any()
719 .downcast_ref::<BooleanArray>()
720 .unwrap();
721 assert!(!archived.value(0));
722 }
723
724 struct TestSummarizer;
726
727 #[async_trait]
728 impl Summarizer for TestSummarizer {
729 async fn summarize(
730 &self,
731 episodes: &[RecordBatch],
732 ) -> Result<Vec<RecordBatch>, HirnDbError> {
733 let total_rows: usize = episodes.iter().map(|b| b.num_rows()).sum();
735
736 let schema = Arc::new(Schema::new(vec![
738 Field::new("id", DataType::Utf8, false),
739 Field::new("summary", DataType::Utf8, false),
740 ]));
741
742 let batch = RecordBatch::try_new(
743 schema,
744 vec![
745 Arc::new(StringArray::from(vec!["summary-1"])),
746 Arc::new(StringArray::from(vec![format!(
747 "Summary of {total_rows} episodes"
748 )])),
749 ],
750 )
751 .map_err(HirnDbError::from)?;
752
753 Ok(vec![batch])
754 }
755 }
756
757 #[tokio::test(flavor = "multi_thread")]
758 async fn summarizer_callback_invoked_for_cold_episodes() {
759 let store = MemoryStore::new();
760 let thirty_days_ago = now_millis() - 30 * 24 * 3_600_000;
761
762 let episodes = vec![
763 make_episode(
764 "ep-cold-1",
765 0.02,
766 thirty_days_ago,
767 1.0,
768 1,
769 false,
770 "default",
771 "c1",
772 ),
773 make_episode(
774 "ep-cold-2",
775 0.02,
776 thirty_days_ago,
777 1.0,
778 1,
779 false,
780 "default",
781 "c2",
782 ),
783 ];
784 seed_episodes(&store, episodes).await;
785
786 let summarizer = TestSummarizer;
787 let opts = LifecycleCompactOptions {
788 archive_threshold: 0.1,
789 summarize: true,
790 max_episodes_per_summary: 10,
791 optimize_indices: false,
792 ..Default::default()
793 };
794
795 let result = lifecycle_compact(&store, "episodic", &opts, Some(&summarizer))
796 .await
797 .unwrap();
798
799 assert_eq!(result.episodes_archived, 2);
800 assert_eq!(result.summaries_created, 1);
801
802 let sem_batches = store
804 .scan("semantic", ScanOptions::default())
805 .await
806 .unwrap();
807 assert!(!sem_batches.is_empty());
808 let total_rows: usize = sem_batches.iter().map(|b| b.num_rows()).sum();
809 assert_eq!(total_rows, 1);
810 }
811
812 #[tokio::test(flavor = "multi_thread")]
813 async fn summarize_false_skips_summarizer() {
814 let store = MemoryStore::new();
815 let thirty_days_ago = now_millis() - 30 * 24 * 3_600_000;
816
817 let episodes = vec![make_episode(
818 "ep-cold",
819 0.02,
820 thirty_days_ago,
821 1.0,
822 1,
823 false,
824 "default",
825 "content",
826 )];
827 seed_episodes(&store, episodes).await;
828
829 let summarizer = TestSummarizer;
830 let opts = LifecycleCompactOptions {
831 archive_threshold: 0.1,
832 summarize: false,
833 optimize_indices: false,
834 ..Default::default()
835 };
836
837 let result = lifecycle_compact(&store, "episodic", &opts, Some(&summarizer))
838 .await
839 .unwrap();
840
841 assert_eq!(result.episodes_archived, 1);
842 assert_eq!(result.summaries_created, 0);
843 }
844
845 #[tokio::test(flavor = "multi_thread")]
846 async fn non_archivable_dataset_skips_archival() {
847 let store = MemoryStore::new();
848
849 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
851 let batch =
852 RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec!["row-1"]))]).unwrap();
853 store.append("graph_nodes", batch).await.unwrap();
854
855 let opts = LifecycleCompactOptions {
856 archive_threshold: 0.5,
857 optimize_indices: false,
858 ..Default::default()
859 };
860
861 let result = lifecycle_compact(&store, "graph_nodes", &opts, None)
862 .await
863 .unwrap();
864
865 assert_eq!(result.episodes_archived, 0);
867 assert_eq!(result.summaries_created, 0);
868 }
869
870 #[tokio::test(flavor = "multi_thread")]
871 async fn retention_score_computation() {
872 let r = retention_score(0.0, 100.0, 5);
874 assert!((r - 1.0).abs() < 0.01);
875
876 let r = retention_score(24.0, 1.0, 1);
878 assert!(r < 0.01);
879
880 let r = retention_score(10.0, 0.0, 1);
882 assert_eq!(r, 0.0);
883 }
884
885 #[tokio::test(flavor = "multi_thread")]
886 async fn chunk_batches_respects_max_rows() {
887 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
888
889 let make_batch = |n: usize| {
890 let ids: Vec<String> = (0..n).map(|i| format!("id-{i}")).collect();
891 let refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
892 RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(refs))]).unwrap()
893 };
894
895 let batches = vec![make_batch(3), make_batch(3), make_batch(3)];
896 let chunks = chunk_batches(&batches, 5);
897
898 assert_eq!(chunks.len(), 3);
900 }
901
902 #[tokio::test(flavor = "multi_thread")]
903 async fn already_archived_episodes_not_rearchived() {
904 let store = MemoryStore::new();
905 let thirty_days_ago = now_millis() - 30 * 24 * 3_600_000;
906
907 let episodes = vec![
908 make_episode(
910 "ep-already",
911 0.01,
912 thirty_days_ago,
913 1.0,
914 1,
915 true,
916 "default",
917 "old",
918 ),
919 make_episode(
921 "ep-hot",
922 0.9,
923 now_millis(),
924 100.0,
925 10,
926 false,
927 "default",
928 "hot",
929 ),
930 ];
931 seed_episodes(&store, episodes).await;
932
933 let opts = LifecycleCompactOptions {
934 archive_threshold: 0.05,
935 optimize_indices: false,
936 ..Default::default()
937 };
938
939 let result = lifecycle_compact(&store, "episodic", &opts, None)
940 .await
941 .unwrap();
942
943 assert_eq!(result.episodes_archived, 0);
944 }
945}