nornir 0.5.0

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
//! Warehouse **maintenance** — small-file COMPACTION + snapshot EXPIRY.
//!
//! Every warehouse write `fast_append`s a tiny Parquet data file and a new
//! snapshot. Left alone they accrete forever: a scan reads thousands of Parquet
//! footers (slow) and disk grows unbounded. iceberg-rust 0.9 ships no
//! `rewrite_data_files` / `expire_snapshots`, so both maintenance ops are
//! implemented here at the metadata layer through the same `RedbCatalog`
//! `commit_table` primitive skade's equality-delete writer uses.
//!
//! ## COMPACTION — [`compact_table`] (the `nornir warehouse compact` op)
//!
//! Per (table, partition): read the partition's rows, **sort them by the table's
//! declared [`SortOrder`]** (the data-skipping key — tight, non-overlapping
//! per-file bounds), rewrite them into a few **large** Parquet files with the
//! warehouse zstd codec, and commit a **true REPLACE snapshot**: a fresh `Data`
//! manifest holding only the new large files, carried by a manifest list that
//! **drops every old data manifest** (non-data manifests — equality deletes —
//! are carried forward). The head snapshot then references only the new files;
//! the old small data files are unreferenced by `main` and reclaimed by a
//! later [`expire_snapshots`] once the pre-compaction history snapshots age out.
//!
//! This is a TRUE replace at the table head (not append-then-supersede): the new
//! `main` snapshot does not reference any old data file. iceberg-rust 0.9's
//! `Transaction` only does `fast_append` (it rejects a manifest list that drops
//! files), so the replace is built at the metadata layer and committed via
//! `RedbCatalog::commit_table` with an optimistic `RefSnapshotIdMatch`
//! requirement — the same wire pieces an Iceberg-REST `commit_table` carries.
//!
//! ## EXPIRY — [`expire_snapshots`] (the `nornir warehouse expire` op)
//!
//! Retain the last `keep_last` snapshots (by timestamp) **and** every snapshot
//! within `keep_days` days, always keeping the current head. Remove the rest
//! from the catalog (`TableUpdate::RemoveSnapshots`), then compute the **live
//! file set** = every data file + manifest + manifest-list referenced by any
//! RETAINED snapshot, and delete every on-disk data/metadata file NOT in that
//! set. The live-set is computed from the retained snapshots' manifest lists, so
//! a file is deleted **only** when no retained snapshot references it — the
//! never-delete-a-referenced-file safety invariant.

use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};

use anyhow::{anyhow, Context, Result};
use arrow::array::{Array, RecordBatch, StringArray};
use arrow::compute::concat_batches;
use iceberg::spec::{
    DataContentType, DataFile, DataFileFormat, ManifestFile, ManifestListWriter,
    ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct,
    Summary,
};
use iceberg::table::Table as IceTable;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, TableIdent, TableRequirement, TableUpdate};
use skade::parquet::file::properties::WriterProperties;
use skade_katalog::RedbCatalog;

use super::iceberg::{
    warehouse_compression, warehouse_sort_cols, IcebergWarehouse,
};

// ---------------------------------------------------------------------------
// Reports — what the ops return (and what the JobRecord detail folds from).
// ---------------------------------------------------------------------------

/// Per-table outcome of a [`compact_table`] run.
///
/// A report starts out with `skipped: true` and every counter at its default;
/// the counters are filled in as the run progresses, and `skipped` flips to
/// `false` only once new files exist and a replace snapshot is about to be
/// committed.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct CompactReport {
    /// Name of the table this report describes.
    pub table: String,
    /// Data files referenced by `main` BEFORE compaction.
    pub files_before: usize,
    /// Data files referenced by `main` AFTER compaction: the newly written files
    /// plus any old files carried forward. Stays 0 when the run is skipped.
    pub files_after: usize,
    /// Rows read from the table. For a whole-table compaction this is the number
    /// of rows rewritten (row count is conserved); it is recorded before any
    /// `repo` restriction is applied, and is also set on runs that end up
    /// skipped after the read.
    pub rows: u64,
    /// Partition groups rewritten (1 for an unpartitioned table).
    pub partitions: usize,
    /// `true` when nothing was committed (no sort columns, no snapshot, ≤1 data
    /// file, no rows, or no partition group matched).
    pub skipped: bool,
}

/// Per-table outcome of an [`expire_snapshots`] run.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct ExpireReport {
    /// Name of the table this report describes.
    pub table: String,
    /// Number of snapshots in the table metadata when the run started.
    pub snapshots_before: usize,
    /// Number of snapshots retained by the retention rules.
    pub snapshots_after: usize,
    /// Data + manifest + manifest-list files deleted from disk.
    pub files_deleted: usize,
    /// Bytes reclaimed from disk.
    pub bytes_reclaimed: u64,
}

// ---------------------------------------------------------------------------
// COMPACTION
// ---------------------------------------------------------------------------

/// Compact the warehouse's tables and return one [`CompactReport`] per table
/// visited (skipped tables included).
///
/// * `only_table = Some(name)` — visit exactly that table.
/// * `only_table = None` — list every table and keep only those with declared
///   sort columns; the others write unsorted event logs, so rewriting them
///   yields no data-skipping win.
///
/// `only_repo` is forwarded unchanged to [`compact_table`].
///
/// # Errors
/// Fails if the table listing fails, or with the first error returned by
/// [`compact_table`]; later tables are then not visited.
pub fn compact_all(
    wh: &IcebergWarehouse,
    only_table: Option<&str>,
    only_repo: Option<&str>,
) -> Result<Vec<CompactReport>> {
    let targets: Vec<String> = if let Some(name) = only_table {
        vec![name.to_string()]
    } else {
        let mut names = wh.table_names()?;
        // Drop tables without a sort key — nothing to gain from compacting them.
        names.retain(|name| !warehouse_sort_cols(name).is_empty());
        names
    };

    // Collecting into `Result` stops at the first failing table.
    targets
        .iter()
        .map(|name| compact_table(wh, name, only_repo))
        .collect()
}

/// Blocking entry point for compacting a single table.
///
/// Resolves the catalog handle, the table identifier and the table's sort
/// columns, then drives [`compact_table_async`] to completion on the
/// warehouse's executor via `block_on`. The small per-partition data files are
/// rewritten into a few large SORTED Parquet files and committed as a true
/// REPLACE snapshot. `only_repo` restricts the rewrite to a single `repo`
/// partition (the other partitions are left as they are).
pub fn compact_table(
    wh: &IcebergWarehouse,
    table: &str,
    only_repo: Option<&str>,
) -> Result<CompactReport> {
    let catalog = wh.catalog().clone();
    let ident = wh.table_ident(table);
    let sort_cols = warehouse_sort_cols(table);

    // The future owns the catalog handle and identifier; it only borrows them
    // for the duration of the async call.
    let job = async move {
        compact_table_async(catalog.as_ref(), &ident, table, sort_cols, only_repo).await
    };
    wh.block_on(job)
}

/// Async body of [`compact_table`].
///
/// Steps, in order:
/// 1. Return a `skipped` report when `sort_cols` is empty or the table has no
///    current snapshot.
/// 2. Walk the head snapshot's manifest list, collecting every `Data` entry's
///    [`DataFile`] and splitting manifests into "has data entries" vs. not.
/// 3. Read the table's rows, record the total, and skip when there is at most
///    one data file or no rows.
/// 4. Group rows by partition value (optionally restricted to `only_repo`),
///    sort each group by `sort_cols`, and write each group out as new file(s).
/// 5. When restricted to one repo, carry forward the old data files of every
///    partition that was not rewritten.
/// 6. Commit a replace snapshot holding the new + carried files and the
///    carried-forward non-data manifests.
///
/// Returns the filled-in [`CompactReport`]; any catalog, I/O, sort, write or
/// commit error is propagated.
async fn compact_table_async(
    catalog: &RedbCatalog,
    ident: &TableIdent,
    table: &str,
    sort_cols: &[&str],
    only_repo: Option<&str>,
) -> Result<CompactReport> {
    // Start pessimistic: the report says "skipped" until a commit is imminent.
    let mut report = CompactReport { table: table.to_string(), skipped: true, ..Default::default() };
    if sort_cols.is_empty() {
        // No declared sort key — nothing to gain from rewriting.
        return Ok(report);
    }

    let tbl = catalog.load_table(ident).await?;
    let metadata = tbl.metadata();
    let Some(current) = metadata.current_snapshot() else {
        return Ok(report); // empty table, no snapshot.
    };
    let file_io = tbl.file_io().clone();

    // ── enumerate the head snapshot's manifests, splitting data vs non-data ──
    // A manifest counts as "data" when at least one of its entries has content
    // type `Data`; everything else is treated as non-data and carried forward.
    // NOTE(review): `data_manifests` is only ever pushed to in this function —
    // it is never read afterwards (the old data manifests are simply dropped).
    // NOTE(review): entries are not filtered by status here — if a manifest can
    // contain removed entries, they would be counted as live; confirm.
    let manifest_list = current.load_manifest_list(&file_io, metadata).await?;
    let mut data_manifests: Vec<ManifestFile> = Vec::new();
    let mut nondata_manifests: Vec<ManifestFile> = Vec::new();
    let mut live_data_files: Vec<DataFile> = Vec::new();
    for mf in manifest_list.entries() {
        let manifest = mf.load_manifest(&file_io).await?;
        let mut has_data = false;
        for entry in manifest.entries() {
            if entry.content_type() == DataContentType::Data {
                has_data = true;
                live_data_files.push(entry.data_file().clone());
            }
        }
        if has_data {
            data_manifests.push(mf.clone());
        } else {
            nondata_manifests.push(mf.clone());
        }
    }
    report.files_before = live_data_files.len();

    // ── group the live rows by partition value (the `repo` identity col) ─────
    // The full read happens before the "≤1 file" check, so `report.rows` is
    // populated even for runs that end up skipped.
    let part_col = partition_column(metadata);
    let batches = skade::read_all(&tbl).await?;
    let total_rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
    report.rows = total_rows;
    if live_data_files.len() <= 1 || total_rows == 0 {
        // Nothing to merge.
        return Ok(report);
    }

    let groups = group_by_partition(&batches, part_col.as_deref(), only_repo)?;
    if groups.is_empty() {
        // E.g. `only_repo` matched no row.
        return Ok(report);
    }

    // ── write one large SORTED Parquet file per partition group ──────────────
    let mut new_files: Vec<DataFile> = Vec::new();
    let mut compacted_partitions: HashSet<Option<String>> = HashSet::new();
    for (part_value, batch) in &groups {
        let sorted = super::iceberg::sort_batch_by_pub(batch, sort_cols)?;
        let files = write_one_file(&tbl, &sorted, part_value.as_deref()).await?;
        new_files.extend(files);
        compacted_partitions.insert(part_value.clone());
        report.partitions += 1;
    }

    // When restricted to one repo, the un-touched partitions' OLD data files
    // must be carried forward into the new head (they are still live). Re-add
    // every old data file whose partition we did NOT compact.
    // NOTE(review): `group_by_partition` keys a NULL partition value as
    // `Some("")` while `data_file_partition_value` yields `None` for it —
    // confirm NULL partition values cannot occur, or that this mismatch is benign.
    let mut carried_old: Vec<DataFile> = Vec::new();
    if only_repo.is_some() {
        for df in &live_data_files {
            let pv = data_file_partition_value(df, part_col.as_deref());
            if !compacted_partitions.contains(&pv) {
                carried_old.push(df.clone());
            }
        }
    }

    if new_files.is_empty() {
        return Ok(report);
    }
    report.files_after = new_files.len() + carried_old.len();
    report.skipped = false;

    // ── build the REPLACE snapshot (new Data manifest, drop old data manifests) ──
    commit_replace_snapshot(
        catalog,
        &tbl,
        new_files,
        carried_old,
        nondata_manifests,
    )
    .await?;

    Ok(report)
}

/// Persist `batch` as Parquet data file(s) under `<table location>/data`.
///
/// The batch goes through the iceberg-rust `DataFileWriter` stacked on a
/// rolling Parquet writer (default target file size, warehouse compression
/// codec) — the SAME writer path `skade::append_with` uses, so the resulting
/// [`DataFile`]s carry the per-column lower/upper bounds the manifest needs for
/// file-skipping. Usually one file results; a batch that overflows the rolling
/// target size yields a few.
///
/// `part_value`, when present, is turned into the identity partition key the
/// files are tagged with; `None` writes without a partition key.
///
/// Returns the closed `DataFile`s.
async fn write_one_file(
    tbl: &IceTable,
    batch: &RecordBatch,
    part_value: Option<&str>,
) -> Result<Vec<DataFile>> {
    let table_meta = tbl.metadata();

    // Where the files go and what they are called.
    let locations =
        DefaultLocationGenerator::with_data_location(format!("{}/data", table_meta.location()));
    let names = DefaultFileNameGenerator::new(
        unique_prefix(),
        Some("compact".into()),
        DataFileFormat::Parquet,
    );

    // Parquet writer with the warehouse codec, wrapped in the rolling writer.
    let parquet = ParquetWriterBuilder::new(
        WriterProperties::builder()
            .set_compression(warehouse_compression())
            .build(),
        table_meta.current_schema().clone(),
    );
    let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
        parquet,
        tbl.file_io().clone(),
        locations,
        names,
    );

    // Identity partition over `repo`: only built when the group has a value.
    let partition_key = part_value
        .map(|value| partition_key_for_value(tbl, value))
        .transpose()?;

    let mut data_writer = DataFileWriterBuilder::new(rolling_builder)
        .build(partition_key)
        .await?;
    data_writer.write(batch.clone()).await?;
    Ok(data_writer.close().await?)
}

/// Commit a `Replace` snapshot whose manifest list = a fresh Data manifest of
/// `new_files` (+ `carried_old`) + every carried-forward `nondata_manifests`.
/// The old data manifests are intentionally NOT carried, so the new head
/// references only the compacted files. Optimistic: requires `main` still points
/// at the snapshot we read.
///
/// Files written (both under `<table location>/metadata/`):
/// * `compact-m-<snapshot_id>.avro` — the new data manifest;
/// * `snap-<snapshot_id>.avro` — the new manifest list.
///
/// Both are written BEFORE the catalog commit; if the commit fails (e.g. the
/// optimistic requirement is not met) the error is returned with the context
/// `"commit compaction replace snapshot"` and this function does not remove
/// the files it already wrote.
async fn commit_replace_snapshot(
    catalog: &RedbCatalog,
    tbl: &IceTable,
    new_files: Vec<DataFile>,
    carried_old: Vec<DataFile>,
    nondata_manifests: Vec<ManifestFile>,
) -> Result<()> {
    let metadata = tbl.metadata();
    let schema = metadata.current_schema().clone();
    // `parent` is only used to derive `parent_id` (None for a table with no head).
    let parent = metadata.current_snapshot();
    let parent_id = parent.map(|s| s.snapshot_id());
    let new_seq = metadata.last_sequence_number() + 1;
    let snapshot_id = fresh_snapshot_id(parent_id);
    let location = metadata.location().to_string();
    let file_io = tbl.file_io().clone();

    // 1. a new Data manifest with the compacted (+ carried-old) files, Added.
    // NOTE(review): carried-old files go through the same `add_file(.., new_seq)`
    // call as the freshly written ones, i.e. they are re-stamped with the NEW
    // sequence number. If the carried-forward equality deletes are applied by
    // sequence-number ordering, this may stop them from applying to the
    // carried-old files — confirm against the Iceberg spec / reader.
    let manifest_path = format!("{location}/metadata/compact-m-{snapshot_id}.avro");
    let mut mw = ManifestWriterBuilder::new(
        file_io.new_output(&manifest_path)?,
        Some(snapshot_id),
        None,
        schema.clone(),
        metadata.default_partition_spec().as_ref().clone(),
    )
    .build_v2_data();
    // Sum of record counts over new AND carried-old files.
    let mut added_rows: u64 = 0;
    for df in new_files.into_iter().chain(carried_old.into_iter()) {
        added_rows += df.record_count();
        mw.add_file(df, new_seq)?;
    }
    let data_manifest: ManifestFile = mw.write_manifest_file().await?;

    // 2. manifest list = the new data manifest + carried-forward non-data ones.
    //    (Old DATA manifests are dropped — that is the replace.)
    let list_path = format!("{location}/metadata/snap-{snapshot_id}.avro");
    let mut lw =
        ManifestListWriter::v2(file_io.new_output(&list_path)?, snapshot_id, parent_id, new_seq);
    lw.add_manifests(std::iter::once(data_manifest))?;
    lw.add_manifests(nondata_manifests.into_iter())?;
    lw.close().await?;

    // 3. the Replace snapshot + commit through the catalog (optimistic on main).
    //    The summary is tagged `nornir.compacted = true`.
    let snapshot = Snapshot::builder()
        .with_snapshot_id(snapshot_id)
        .with_parent_snapshot_id(parent_id)
        .with_sequence_number(new_seq)
        .with_timestamp_ms(now_ms())
        .with_manifest_list(list_path)
        .with_row_range(metadata.next_row_id(), added_rows)
        .with_schema_id(schema.schema_id())
        .with_summary(Summary {
            operation: Operation::Replace,
            additional_properties: HashMap::from([(
                "nornir.compacted".to_string(),
                "true".to_string(),
            )]),
        })
        .build();

    // The commit is rejected unless `main` still points at the snapshot this
    // table handle was loaded with.
    let requirements = vec![TableRequirement::RefSnapshotIdMatch {
        r#ref: "main".into(),
        snapshot_id: parent_id,
    }];
    // Register the snapshot, then move `main` onto it (branch retention unset).
    let updates = vec![
        TableUpdate::AddSnapshot { snapshot },
        TableUpdate::SetSnapshotRef {
            ref_name: "main".into(),
            reference: SnapshotReference {
                snapshot_id,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            },
        },
    ];
    catalog
        .commit_table(tbl.identifier().clone(), requirements, updates)
        .await
        .context("commit compaction replace snapshot")?;
    Ok(())
}

// ---------------------------------------------------------------------------
// EXPIRY
// ---------------------------------------------------------------------------

/// Default number of most-recent snapshots to retain when `--keep-last` is unset.
///
/// Has the same type as the `keep_last` parameter of [`expire_all`] and
/// [`expire_snapshots`], so it can be passed to them directly.
pub const DEFAULT_KEEP_LAST: usize = 10;

/// Run snapshot expiry over the warehouse and return one [`ExpireReport`] per
/// table visited.
///
/// With `only_table = Some(name)` only that table is visited; otherwise every
/// table the warehouse lists is. For each table the newest `keep_last`
/// snapshots and every snapshot within `keep_days` days are retained, and the
/// data/manifest files no retained snapshot references are deleted from disk
/// (see [`expire_snapshots`]).
///
/// # Errors
/// Fails if the table listing fails, or with the first error from
/// [`expire_snapshots`]; later tables are then not visited.
pub fn expire_all(
    wh: &IcebergWarehouse,
    only_table: Option<&str>,
    keep_last: usize,
    keep_days: Option<u64>,
) -> Result<Vec<ExpireReport>> {
    let targets: Vec<String> = if let Some(name) = only_table {
        vec![name.to_string()]
    } else {
        wh.table_names()?
    };

    // Collecting into `Result` stops at the first failing table.
    targets
        .iter()
        .map(|name| expire_snapshots(wh, name, keep_last, keep_days))
        .collect()
}

/// Blocking entry point for expiring one table's old snapshots and
/// garbage-collecting the files they leave orphaned.
///
/// Gathers owned copies of everything the async body needs (catalog handle,
/// table identifier, warehouse root, table name) and drives
/// [`expire_snapshots_async`] to completion via the warehouse's `block_on`.
pub fn expire_snapshots(
    wh: &IcebergWarehouse,
    table: &str,
    keep_last: usize,
    keep_days: Option<u64>,
) -> Result<ExpireReport> {
    let catalog = wh.catalog().clone();
    let ident = wh.table_ident(table);
    let warehouse_root = wh.root().to_path_buf();
    let table_name = String::from(table);

    let job = async move {
        expire_snapshots_async(
            catalog.as_ref(),
            &ident,
            &table_name,
            &warehouse_root,
            keep_last,
            keep_days,
        )
        .await
    };
    wh.block_on(job)
}

async fn expire_snapshots_async(
    catalog: &RedbCatalog,
    ident: &TableIdent,
    table: &str,
    root: &Path,
    keep_last: usize,
    keep_days: Option<u64>,
) -> Result<ExpireReport> {
    let mut report = ExpireReport { table: table.to_string(), ..Default::default() };

    let tbl = catalog.load_table(ident).await?;
    let metadata = tbl.metadata();
    let file_io = tbl.file_io().clone();

    // Snapshots newest-first.
    let mut snaps: Vec<&Snapshot> = metadata.snapshots().map(|s| s.as_ref()).collect();
    snaps.sort_by_key(|s| std::cmp::Reverse(s.timestamp_ms()));
    report.snapshots_before = snaps.len();
    if snaps.is_empty() {
        return Ok(report);
    }

    let keep_last = keep_last.max(1); // always keep at least the current head.
    let cutoff_ms = keep_days.map(|d| now_ms() - (d as i64) * 86_400_000);
    let current_id = metadata.current_snapshot_id();

    // RETAIN: the newest `keep_last`, anything newer than the day-cutoff, and the
    // current head (never expire the snapshot `main` points at).
    let mut retained_ids: HashSet<i64> = HashSet::new();
    for (i, s) in snaps.iter().enumerate() {
        let within_n = i < keep_last;
        let within_days = cutoff_ms.map(|c| s.timestamp_ms() >= c).unwrap_or(false);
        let is_current = Some(s.snapshot_id()) == current_id;
        if within_n || within_days || is_current {
            retained_ids.insert(s.snapshot_id());
        }
    }
    let remove_ids: Vec<i64> = snaps
        .iter()
        .map(|s| s.snapshot_id())
        .filter(|id| !retained_ids.contains(id))
        .collect();
    report.snapshots_after = retained_ids.len();
    if remove_ids.is_empty() {
        return Ok(report); // nothing aged out.
    }

    // ── compute the LIVE file set = files referenced by any RETAINED snapshot ──
    // (data files, manifests, AND manifest lists). Anything else under
    // data/ + metadata/ that we wrote is an orphan once the removal commits.
    let mut live: HashSet<String> = HashSet::new();
    for s in snaps.iter().filter(|s| retained_ids.contains(&s.snapshot_id())) {
        live.insert(canon(s.manifest_list()));
        let ml = s.load_manifest_list(&file_io, metadata).await?;
        for mf in ml.entries() {
            live.insert(canon(&mf.manifest_path));
            let manifest = mf.load_manifest(&file_io).await?;
            for entry in manifest.entries() {
                live.insert(canon(entry.data_file().file_path()));
            }
        }
    }
    // Keep every metadata.json version pointer too (never GC the catalog's own
    // table-metadata blobs — only data files, manifests, manifest lists).
    // We scope deletion to data/ + the snap/manifest .avro files in metadata/.

    // ── commit the snapshot removal through the catalog FIRST (atomic) ──
    catalog
        .commit_table(
            ident.clone(),
            Vec::new(),
            vec![TableUpdate::RemoveSnapshots { snapshot_ids: remove_ids.clone() }],
        )
        .await
        .context("commit snapshot removal")?;

    // ── only AFTER the catalog no longer references them, delete orphan files ──
    let (deleted, bytes) = gc_orphans(root, table, &live)?;
    report.files_deleted = deleted;
    report.bytes_reclaimed = bytes;
    Ok(report)
}

/// Remove orphaned files of `table` from disk and report what was reclaimed.
///
/// Candidates are the `*.parquet` files under
/// `<root>/warehouse/nornir/<table>/data` and the `*.avro` files (manifests and
/// manifest lists) under `.../metadata`, both searched recursively.
/// `metadata/*.json` table-metadata blobs are never candidates. A candidate is
/// deleted unless its normalized path (see `canon`) is in `live`.
///
/// Returns `(files deleted, bytes reclaimed)`. A file whose size cannot be read
/// counts as 0 bytes.
///
/// # Errors
/// Stops at the first file that cannot be deleted and returns that error; files
/// deleted before that point stay deleted.
fn gc_orphans(root: &Path, table: &str, live: &HashSet<String>) -> Result<(usize, u64)> {
    // Resolve the root when possible so candidate paths compare against the
    // canonicalized keys in `live`.
    let base = match root.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => root.to_path_buf(),
    };
    let table_dir = base.join("warehouse").join("nornir").join(table);

    let mut candidates: Vec<PathBuf> = Vec::new();
    // data/ — every Parquet data file.
    collect_files(&table_dir.join("data"), &mut candidates, &["parquet"]);
    // metadata/ — manifest + manifest-list avro only (NOT *.json metadata blobs).
    collect_files(&table_dir.join("metadata"), &mut candidates, &["avro"]);

    let mut removed = 0usize;
    let mut reclaimed = 0u64;
    // Anything still referenced by a retained snapshot is filtered out here.
    let orphans = candidates
        .into_iter()
        .filter(|candidate| !live.contains(&canon(&candidate.to_string_lossy())));
    for orphan in orphans {
        let len = std::fs::metadata(&orphan).map_or(0, |meta| meta.len());
        std::fs::remove_file(&orphan)
            .with_context(|| format!("delete orphan {}", orphan.display()))?;
        removed += 1;
        reclaimed += len;
    }
    Ok((removed, reclaimed))
}

// ---------------------------------------------------------------------------
// helpers
// ---------------------------------------------------------------------------

/// Name of the column behind the FIRST field of the table's default partition
/// spec (e.g. `repo`).
///
/// Returns `None` for an unpartitioned table (no partition fields) or when the
/// field's source id is not found in the current schema. The warehouse only
/// uses single-column identity specs, so the first field is the only one.
fn partition_column(metadata: &iceberg::spec::TableMetadata) -> Option<String> {
    let spec = metadata.default_partition_spec();
    let source_id = spec.fields().first()?.source_id;
    let schema = metadata.current_schema();
    let column = schema.field_by_id(source_id)?;
    Some(column.name.clone())
}

/// Group `batches` into one merged `RecordBatch` per distinct value of the
/// partition column.
///
/// * `part_col = None` (unpartitioned) ⇒ all rows are concatenated into a
///   single group keyed `None` (`only_repo` is not consulted in this case).
/// * `part_col = Some(col)` ⇒ one group per distinct value of `col`, keyed
///   `Some(value)`. A NULL partition value is keyed as the empty string.
/// * `only_repo = Some(want)` keeps only rows whose partition value equals
///   `want`; the others are dropped from the rewrite (the caller carries their
///   old files forward).
///
/// Groups are returned in **first-seen row order** (deterministic across runs),
/// and rows inside a group keep their original relative order. Groups with no
/// rows are never returned; empty input yields an empty vector.
///
/// # Errors
/// Fails when `col` is missing from a batch, is not a Utf8 (`StringArray`)
/// column, when a batch has more rows than a `u32` index can address, or when
/// an Arrow `take` / `concat` operation fails.
fn group_by_partition(
    batches: &[RecordBatch],
    part_col: Option<&str>,
    only_repo: Option<&str>,
) -> Result<Vec<(Option<String>, RecordBatch)>> {
    use arrow::array::UInt32Array;
    use arrow::compute::take;

    let Some(first) = batches.first() else {
        return Ok(Vec::new());
    };
    let schema = first.schema();

    let Some(col) = part_col else {
        // Unpartitioned: one big group.
        let merged = concat_batches(&schema, batches)?;
        if merged.num_rows() == 0 {
            return Ok(Vec::new());
        }
        return Ok(vec![(None, merged)]);
    };

    // `order` remembers the first-seen order of partition values across all
    // batches; `buckets` holds each value's per-batch slices.
    let mut order: Vec<String> = Vec::new();
    let mut buckets: HashMap<String, Vec<RecordBatch>> = HashMap::new();
    for b in batches {
        let arr = b
            .column_by_name(col)
            .ok_or_else(|| anyhow!("partition column `{col}` missing from batch"))?;
        let sa = arr
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("partition column `{col}` is not Utf8"))?;

        // `take` is driven by u32 indices; check the bound once per batch
        // instead of silently truncating with `as`.
        let row_count = u32::try_from(b.num_rows())
            .map_err(|_| anyhow!("batch too large to index with u32 row ids"))?;

        // Per-value row index lists. Keys borrow straight from the array (no
        // per-row allocation); `seen` records first-seen order so the result
        // does not depend on HashMap iteration order.
        let mut seen: Vec<&str> = Vec::new();
        let mut per_value: HashMap<&str, Vec<u32>> = HashMap::new();
        for row in 0..row_count {
            let i = row as usize;
            let v: &str = if sa.is_null(i) { "" } else { sa.value(i) };
            if let Some(want) = only_repo {
                if v != want {
                    continue;
                }
            }
            per_value
                .entry(v)
                .or_insert_with(|| {
                    seen.push(v);
                    Vec::new()
                })
                .push(row);
        }

        for v in seen {
            let Some(idxs) = per_value.remove(v) else {
                continue;
            };
            let idx = UInt32Array::from(idxs);
            let cols: Vec<_> = b
                .columns()
                .iter()
                .map(|c| take(c.as_ref(), &idx, None))
                .collect::<std::result::Result<_, _>>()?;
            let part_batch = RecordBatch::try_new(b.schema(), cols)?;
            match buckets.get_mut(v) {
                Some(parts) => parts.push(part_batch),
                None => {
                    order.push(v.to_string());
                    buckets.insert(v.to_string(), vec![part_batch]);
                }
            }
        }
    }

    let mut out = Vec::with_capacity(order.len());
    for v in order {
        let Some(parts) = buckets.remove(&v) else {
            continue;
        };
        let merged = concat_batches(&schema, &parts)?;
        if merged.num_rows() > 0 {
            out.push((Some(v), merged));
        }
    }
    Ok(out)
}

/// String form of an existing `DataFile`'s partition value.
///
/// Returns `None` when the table is unpartitioned (`part_col` is `None`), when
/// the file's partition `Struct` has no fields, or when its first field is
/// NULL. Otherwise the FIRST literal of the partition struct is rendered via
/// `literal_to_string` (single-column identity spec ⇒ the first literal is the
/// column value). `part_col` is only checked for presence; its name is not used.
fn data_file_partition_value(df: &DataFile, part_col: Option<&str>) -> Option<String> {
    if part_col.is_none() {
        return None;
    }
    let partition: &Struct = df.partition();
    match partition.iter().next() {
        Some(Some(lit)) => Some(literal_to_string(lit)),
        _ => None,
    }
}

/// Render a partition literal as a `String`.
///
/// A primitive string literal yields its text unchanged; every other literal
/// falls back to its `Debug` representation.
fn literal_to_string(lit: &iceberg::spec::Literal) -> String {
    use iceberg::spec::{Literal, PrimitiveLiteral};
    if let Literal::Primitive(PrimitiveLiteral::String(text)) = lit {
        return text.clone();
    }
    format!("{lit:?}")
}

/// Build the partition key for `value` against `tbl`: the table's default
/// partition spec and current schema, paired with a one-field partition struct
/// holding `value` as a string literal (single-column identity partitioning).
///
/// Always returns `Ok` as written; the `Result` keeps the call site uniform
/// with the other fallible helpers.
fn partition_key_for_value(
    tbl: &IceTable,
    value: &str,
) -> Result<iceberg::spec::PartitionKey> {
    use iceberg::spec::{Literal, PartitionKey, Struct};
    let table_meta = tbl.metadata();
    let partition_values = Struct::from_iter(std::iter::once(Some(Literal::string(value))));
    let key = PartitionKey::new(
        table_meta.default_partition_spec().as_ref().clone(),
        table_meta.current_schema().clone(),
        partition_values,
    );
    Ok(key)
}

/// Produce a snapshot id from the wall clock (nanoseconds since the Unix
/// epoch), guaranteed to be ≥ 1 and different from `parent`.
///
/// Falls back to 1 when the clock reads before the epoch; if the candidate
/// happens to equal `parent`, the next integer is used instead.
fn fresh_snapshot_id(parent: Option<i64>) -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let candidate = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => (elapsed.as_nanos() as i64).max(1),
        Err(_) => 1,
    };
    match parent {
        Some(taken) if taken == candidate => candidate + 1,
        _ => candidate,
    }
}

/// Current wall-clock time as milliseconds since the Unix epoch, or 0 when the
/// system clock reads before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}

/// A unique file-name prefix: a freshly generated v4 UUID in its "simple"
/// rendering (matches skade's `unique_prefix` convention).
fn unique_prefix() -> String {
    uuid::Uuid::new_v4().simple().to_string()
}

/// Turn a path or `file://` URI into a key that can be compared across
/// spellings of the same file.
///
/// A leading `file://` is removed first. If the remainder resolves on disk, the
/// canonical path is returned (so `file://`, absolute, and symlinked forms of
/// one file collapse to one key); otherwise the stripped string is returned
/// unchanged.
fn canon(p: &str) -> String {
    let local = match p.strip_prefix("file://") {
        Some(rest) => rest,
        None => p,
    };
    std::fs::canonicalize(local)
        .map(|resolved| resolved.to_string_lossy().to_string())
        .unwrap_or_else(|_| local.to_string())
}

/// Walk `dir` recursively and append to `out` every non-directory entry whose
/// extension is one of `exts`.
///
/// Best-effort by design: a directory that cannot be read (including a missing
/// one) and entries that fail to enumerate are silently skipped. Entries
/// without an extension, or with a non-UTF-8 one, never match.
fn collect_files(dir: &Path, out: &mut Vec<PathBuf>, exts: &[&str]) {
    let listing = match std::fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(_) => return,
    };
    for path in listing.filter_map(|entry| entry.ok()).map(|entry| entry.path()) {
        if path.is_dir() {
            collect_files(&path, out, exts);
            continue;
        }
        let wanted = path
            .extension()
            .and_then(|ext| ext.to_str())
            .map_or(false, |ext| exts.contains(&ext));
        if wanted {
            out.push(path);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::knowledge::symbols::{SymbolRow, SymbolScan};
    use crate::warehouse::iceberg::{IcebergWarehouse, TABLE_SYMBOL_FACTS};
    use crate::warehouse::Warehouse;

    // ── fixture helpers ──────────────────────────────────────────────────────

    /// Build a [`SymbolScan`] for `repo` containing one `pub fn` row per entry
    /// in `names`, kept in the caller's order (callers pass deliberately
    /// UNSORTED names so that a missing sort shows up). Rows alternate between
    /// `src/zeta.rs` (even index) and `src/alpha.rs` (odd index), and `line` is
    /// the row's index. The scan gets a fresh random snapshot id and the
    /// current UTC timestamp.
    fn mk_scan(repo: &str, names: &[&str]) -> SymbolScan {
        let mut symbols = Vec::with_capacity(names.len());
        for (idx, &name) in names.iter().enumerate() {
            let stem = if idx % 2 == 0 { "zeta" } else { "alpha" };
            symbols.push(SymbolRow {
                crate_name: repo.into(),
                module_path: format!("{repo}::m"),
                item_kind: "fn".into(),
                item_name: name.into(),
                visibility: "pub".into(),
                file: format!("src/{stem}.rs"),
                line: idx as u32,
                doc_lines: 0,
                signature: Some(format!("fn {name}()")),
            });
        }
        SymbolScan {
            snapshot_id: uuid::Uuid::new_v4(),
            ts: chrono::Utc::now(),
            repo: repo.into(),
            symbols,
            ..Default::default()
        }
    }

    /// Number of `*.parquet` files found (recursively) under
    /// `<root>/warehouse/nornir/<table>/data`. `root` is canonicalized when
    /// possible and used verbatim otherwise.
    fn count_data_files_on_disk(root: &Path, table: &str) -> usize {
        let mut data_dir = match root.canonicalize() {
            Ok(resolved) => resolved,
            Err(_) => root.to_path_buf(),
        };
        data_dir.extend(["warehouse", "nornir", table, "data"]);
        let mut found = Vec::new();
        collect_files(&data_dir, &mut found, &["parquet"]);
        found.len()
    }

    /// Number of `*.avro` files (manifests and manifest lists) found
    /// recursively under `<root>/warehouse/nornir/<table>/metadata`. `root` is
    /// canonicalized when possible and used verbatim otherwise.
    fn count_avro_files_on_disk(root: &Path, table: &str) -> usize {
        let mut metadata_dir = match root.canonicalize() {
            Ok(resolved) => resolved,
            Err(_) => root.to_path_buf(),
        };
        metadata_dir.extend(["warehouse", "nornir", table, "metadata"]);
        let mut found = Vec::new();
        collect_files(&metadata_dir, &mut found, &["avro"]);
        found.len()
    }

    /// Gather the canonical paths of every file reachable from the `keep`
    /// most recent snapshots (by snapshot timestamp): each snapshot's manifest
    /// list, every manifest it names, and every file entry in those manifests.
    /// This is the live set a GC pass must leave untouched.
    fn retained_file_set(wh: &IcebergWarehouse, table: &str, keep: usize) -> HashSet<String> {
        let ident = wh.table_ident(table);
        wh.block_on(async {
            let tbl = wh.catalog().load_table(&ident).await.unwrap();
            let md = tbl.metadata();
            let fio = tbl.file_io().clone();

            // Newest snapshot first.
            let mut newest_first: Vec<_> = md.snapshots().map(|s| s.as_ref()).collect();
            newest_first.sort_by(|a, b| b.timestamp_ms().cmp(&a.timestamp_ms()));

            let mut live = HashSet::new();
            for snap in newest_first.into_iter().take(keep) {
                live.insert(canon(snap.manifest_list()));
                let listing = snap.load_manifest_list(&fio, md).await.unwrap();
                for manifest_file in listing.entries() {
                    live.insert(canon(&manifest_file.manifest_path));
                    let manifest = manifest_file.load_manifest(&fio).await.unwrap();
                    live.extend(
                        manifest
                            .entries()
                            .iter()
                            .map(|entry| canon(entry.data_file().file_path())),
                    );
                }
            }
            live
        })
    }

    /// Scan `table` and return one `(repo, file, item_name)` tuple per row, in
    /// scan order. Panics if the scan fails or if any of the three columns is
    /// missing or is not a string array.
    fn read_rows(wh: &IcebergWarehouse, table: &str) -> Vec<(String, String, String)> {
        let batches = wh.scan_arrow(table).unwrap();
        let mut rows = Vec::new();
        for batch in &batches {
            // Fetch a named column of this batch as a string array.
            let text = |column: &str| {
                batch
                    .column_by_name(column)
                    .unwrap()
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap()
            };
            let (repo, file, name) = (text("repo"), text("file"), text("item_name"));
            rows.extend((0..batch.num_rows()).map(|i| {
                (
                    repo.value(i).to_string(),
                    file.value(i).to_string(),
                    name.value(i).to_string(),
                )
            }));
        }
        rows
    }

    /// Paths of the data files listed by the table's CURRENT (head) snapshot:
    /// every manifest entry whose content type is `Data`, across all manifests
    /// in the head's manifest list. Panics if the table has no current snapshot.
    fn head_data_files(wh: &IcebergWarehouse, table: &str) -> Vec<String> {
        let ident = wh.table_ident(table);
        wh.block_on(async {
            let tbl = wh.catalog().load_table(&ident).await.unwrap();
            let md = tbl.metadata();
            let head = md.current_snapshot().unwrap();
            let fio = tbl.file_io().clone();
            let listing = head.load_manifest_list(&fio, md).await.unwrap();

            let mut paths = Vec::new();
            for manifest_file in listing.entries() {
                let manifest = manifest_file.load_manifest(&fio).await.unwrap();
                paths.extend(
                    manifest
                        .entries()
                        .iter()
                        .filter(|entry| entry.content_type() == DataContentType::Data)
                        .map(|entry| entry.data_file().file_path().to_string()),
                );
            }
            paths
        })
    }

    /// Number of snapshots recorded in the metadata of `table`, loaded fresh
    /// from the catalog.
    fn snapshot_count(wh: &IcebergWarehouse, table: &str) -> usize {
        let ident = wh.table_ident(table);
        wh.block_on(async {
            let tbl = wh.catalog().load_table(&ident).await.unwrap();
            tbl.metadata().snapshots().len()
        })
    }

    // ── COMPACTION ───────────────────────────────────────────────────────────

    /// Compaction end-to-end: several tiny appends across two repos are merged,
    /// and the test proves (a) the head references fewer data files, (b) the row
    /// multiset is unchanged, (c) each compacted file is sorted by
    /// `(file, item_name)`, and (d) each compacted file covers exactly one repo
    /// (tight, non-overlapping `repo` bounds).
    #[test]
    fn compact_merges_small_files_preserving_rows_sortedness_and_tightens_bounds() {
        use skade::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use skade::parquet::file::reader::FileReader;
        use skade::parquet::file::serialized_reader::SerializedFileReader;

        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();

        // N tiny appends across two repos → many small files + many snapshots.
        wh.append_symbol_scan(&mk_scan("facett", &["mango", "apple", "zulu"])).unwrap();
        wh.append_symbol_scan(&mk_scan("facett", &["delta", "banana"])).unwrap();
        wh.append_symbol_scan(&mk_scan("nordisk", &["sierra", "alpha"])).unwrap();
        wh.append_symbol_scan(&mk_scan("nordisk", &["tango", "bravo", "kilo"])).unwrap();
        wh.append_symbol_scan(&mk_scan("facett", &["echo"])).unwrap();

        let before_head = head_data_files(&wh, TABLE_SYMBOL_FACTS).len();
        let before_rows = read_rows(&wh, TABLE_SYMBOL_FACTS);
        assert!(before_head >= 5, "many small head data files before compaction (got {before_head})");
        assert!(!before_rows.is_empty(), "RAGNARÖK: fixture has rows");

        // Compact.
        let reports = compact_all(&wh, Some(TABLE_SYMBOL_FACTS), None).unwrap();
        let rep = reports.first().expect("compact_all returned a report for the requested table");
        assert!(!rep.skipped, "compaction ran (not skipped)");

        // (a) FILE COUNT DROPPED — the head now references far fewer data files.
        let after_head = head_data_files(&wh, TABLE_SYMBOL_FACTS);
        assert!(
            after_head.len() < before_head,
            "head data file count dropped: {before_head} -> {}",
            after_head.len(),
        );
        // Two repos = two identity partitions ⇒ at most 2 compacted files.
        assert!(after_head.len() <= 2, "compacted to ≤2 files (one per repo partition), got {}", after_head.len());
        assert_eq!(rep.files_before, before_head);
        assert_eq!(rep.files_after, after_head.len());

        // (b) ROW COUNT + CONTENT IDENTICAL — read back, compare as multisets.
        let after_rows = read_rows(&wh, TABLE_SYMBOL_FACTS);
        assert_eq!(after_rows.len(), before_rows.len(), "row count conserved");
        let mut b = before_rows.clone();
        let mut a = after_rows.clone();
        b.sort();
        a.sort();
        assert_eq!(a, b, "content identical after compaction (multiset equal)");
        assert_eq!(rep.rows, before_rows.len() as u64);

        // (c) DATA STILL SORTED by the SortOrder within each compacted file, AND
        // (d) TIGHTER BOUNDS — assert the new files have non-overlapping repo
        // ranges (each compacted file is a single repo partition).
        let mut file_repo_ranges: Vec<(String, String)> = Vec::new();
        for path in &after_head {
            let p = path.strip_prefix("file://").unwrap_or(path);
            // sortedness within the file (the key carries across batch boundaries)
            let mut prev: Option<(String, String)> = None;
            let f = std::fs::File::open(p).unwrap();
            let rb = ParquetRecordBatchReaderBuilder::try_new(f).unwrap().build().unwrap();
            for batch in rb {
                let batch = batch.unwrap();
                let file_col = batch.column_by_name("file").unwrap().as_any().downcast_ref::<StringArray>().unwrap();
                let name_col = batch.column_by_name("item_name").unwrap().as_any().downcast_ref::<StringArray>().unwrap();
                for i in 0..batch.num_rows() {
                    let key = (file_col.value(i).to_string(), name_col.value(i).to_string());
                    if let Some(pv) = &prev {
                        assert!(pv <= &key, "compacted file rows sorted by (file,item_name): {pv:?} !<= {key:?}");
                    }
                    prev = Some(key);
                }
            }
            // per-file repo min/max bounds (the manifest skip bounds), folded
            // across every row group of the file
            let f2 = std::fs::File::open(p).unwrap();
            let reader = SerializedFileReader::new(f2).unwrap();
            let meta = reader.metadata();
            let (mut lo, mut hi): (Option<String>, Option<String>) = (None, None);
            for rg in 0..meta.num_row_groups() {
                for col in meta.row_group(rg).columns() {
                    if col.column_path().string() == "repo" {
                        let s = col.statistics().expect("repo stats present (manifest bounds)");
                        let min = String::from_utf8_lossy(s.min_bytes_opt().unwrap()).to_string();
                        let max = String::from_utf8_lossy(s.max_bytes_opt().unwrap()).to_string();
                        lo = Some(match lo {
                            Some(cur) => cur.min(min),
                            None => min,
                        });
                        hi = Some(match hi {
                            Some(cur) => cur.max(max),
                            None => max,
                        });
                    }
                }
            }
            let (lo, hi) = (lo.unwrap(), hi.unwrap());
            // tight: a single-repo file has repo min == max (degenerate-tight range).
            assert_eq!(lo, hi, "compacted file is a single repo partition ⇒ tight repo bound [{lo},{hi}]");
            file_repo_ranges.push((lo, hi));
        }
        // (d) NON-OVERLAPPING across files: the two repos land in distinct files.
        let distinct: HashSet<_> = file_repo_ranges.iter().map(|(lo, _)| lo.clone()).collect();
        assert_eq!(distinct.len(), after_head.len(), "each compacted file covers a distinct repo (non-overlapping bounds)");

        eprintln!(
            "[compact] symbol_facts: {before_head} -> {} head data file(s), {} row(s) preserved, sorted, tight per-repo bounds {file_repo_ranges:?}",
            after_head.len(),
            after_rows.len(),
        );

        // Sensitivity note: dropping the manifest-list "drop old data manifests"
        // step would leave files_after >= files_before and fail assertion (a).
    }

    // ── EXPIRY ───────────────────────────────────────────────────────────────

    /// Expiry on an append-only table: after five appends, keeping the last 2
    /// snapshots must leave exactly 2 snapshots, keep every row readable,
    /// delete some orphaned avro metadata, and leave every file referenced by
    /// the retained snapshots on disk.
    #[test]
    fn expire_keeps_last_n_and_gcs_orphans_without_touching_retained_files() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();

        // K appends ⇒ K snapshots (single repo so every file is the same partition).
        for batch in [
            &["a1", "a2"][..],
            &["b1"][..],
            &["c1", "c2", "c3"][..],
            &["d1"][..],
            &["e1", "e2"][..],
        ] {
            wh.append_symbol_scan(&mk_scan("facett", batch)).unwrap();
        }
        let snaps_before = snapshot_count(&wh, TABLE_SYMBOL_FACTS);
        assert!(snaps_before >= 5, "K snapshots created (got {snaps_before})");

        // Capture, BEFORE expiring, the complete set of files (data + manifests
        // + manifest-lists) referenced by the 2 newest snapshots — the ones we
        // will RETAIN. These MUST survive the GC untouched; holding the set lets
        // us prove a retained file is NEVER deleted.
        // NOTE: an append-only table's HEAD references EVERY accumulated data
        // file (fast_append carries prior manifests forward), so expiry here
        // reclaims the stale per-snapshot manifest-LISTs + orphaned manifest
        // avros, NOT data parquet — data reclaim needs a prior compaction (see
        // `compact_then_expire_reclaims_superseded_data_files`). This is the
        // correct, safe Iceberg semantics.
        let live_before = retained_file_set(&wh, TABLE_SYMBOL_FACTS, 2);
        let avro_before = count_avro_files_on_disk(&root, TABLE_SYMBOL_FACTS);

        // Expire: keep the last 2 snapshots.
        let reports = expire_all(&wh, Some(TABLE_SYMBOL_FACTS), 2, None).unwrap();
        let rep = reports.iter().find(|r| r.table == TABLE_SYMBOL_FACTS).unwrap();

        // Only 2 snapshots remain.
        let snaps_after = snapshot_count(&wh, TABLE_SYMBOL_FACTS);
        assert_eq!(snaps_after, 2, "exactly 2 snapshots retained (was {snaps_before})");
        assert_eq!(rep.snapshots_after, 2);
        assert!(rep.snapshots_before >= 5);

        // The current snapshot still reads correctly (RAGNARÖK: non-empty + the
        // newest rows present + EVERY appended row still there — the head carries
        // all data forward).
        let rows = read_rows(&wh, TABLE_SYMBOL_FACTS);
        assert!(!rows.is_empty(), "current snapshot reads non-empty after expiry");
        assert!(rows.iter().any(|(_, _, n)| n == "e1"), "head snapshot's newest rows still readable");
        assert!(rows.iter().any(|(_, _, n)| n == "a1"), "head still carries the oldest appended rows");

        // Orphans were deleted: the stale manifest-list/manifest avro count dropped.
        let avro_after = count_avro_files_on_disk(&root, TABLE_SYMBOL_FACTS);
        assert!(rep.files_deleted > 0, "deleted some orphan files");
        assert!(
            avro_after < avro_before,
            "stale manifest/manifest-list avros GC'd: {avro_before} -> {avro_after}",
        );

        // SAFETY INVARIANT: every file the retained snapshots reference STILL
        // exists on disk (no retained-snapshot file was deleted).
        for f in &live_before {
            assert!(Path::new(f).exists(), "retained-snapshot file was wrongly deleted: {f}");
        }

        eprintln!(
            "[expire] symbol_facts: {snaps_before} -> {snaps_after} snapshot(s); deleted {} orphan avro/file(s) ({} -> {} avros on disk), {} byte(s) reclaimed; all {} retained files intact",
            rep.files_deleted, avro_before, avro_after, rep.bytes_reclaimed, live_before.len(),
        );
    }

    /// The realistic DATA-reclaim path: compact (replace snapshot drops old data
    /// manifests) → expire (removes the pre-compaction snapshots) → the old small
    /// data files are no longer referenced by any retained snapshot and are GC'd.
    /// Proves (a) data parquet files drop on disk, (b) row content survives, and
    /// (c) no retained-snapshot file was deleted.
    #[test]
    fn compact_then_expire_reclaims_superseded_data_files() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();

        for batch in [&["a1", "a2"][..], &["b1"][..], &["c1", "c2"][..], &["d1"][..]] {
            wh.append_symbol_scan(&mk_scan("facett", batch)).unwrap();
        }
        let rows_before = read_rows(&wh, TABLE_SYMBOL_FACTS);
        let data_before = count_data_files_on_disk(&root, TABLE_SYMBOL_FACTS);
        assert!(data_before >= 4, "many small data files before (got {data_before})");

        // Compact ⇒ a Replace head referencing 1 file; old files now only live in
        // the superseded history snapshots.
        compact_all(&wh, Some(TABLE_SYMBOL_FACTS), None).unwrap();
        // Expire down to the current (compacted) snapshot only ⇒ the superseded
        // history (with the old data manifests) is removed, orphaning old files.
        let reports = expire_all(&wh, Some(TABLE_SYMBOL_FACTS), 1, None).unwrap();
        let rep = reports.iter().find(|r| r.table == TABLE_SYMBOL_FACTS).unwrap();

        // The retained head's files (post-compaction) must survive. Gathered
        // after expiry, so the walk itself reads the surviving snapshot's
        // manifest list and manifests (and panics if either is gone).
        let live = retained_file_set(&wh, TABLE_SYMBOL_FACTS, 1);

        // (a) data parquet files dropped — the small pre-compaction files are gone.
        let data_after = count_data_files_on_disk(&root, TABLE_SYMBOL_FACTS);
        assert!(
            data_after < data_before,
            "data parquet files reclaimed: {data_before} -> {data_after}",
        );
        assert!(rep.files_deleted > 0 && rep.bytes_reclaimed > 0, "disk reclaimed");

        // (b) content survives identically (multiset equal).
        let rows_after = read_rows(&wh, TABLE_SYMBOL_FACTS);
        let (mut a, mut b) = (rows_after.clone(), rows_before.clone());
        a.sort();
        b.sort();
        assert_eq!(a, b, "row content identical through compact+expire");

        // (c) no retained file deleted.
        for f in &live {
            assert!(Path::new(f).exists(), "retained file wrongly deleted: {f}");
        }
        eprintln!(
            "[compact+expire] symbol_facts: data files {data_before} -> {data_after} on disk, {} byte(s) reclaimed, {} rows preserved, retained files intact",
            rep.bytes_reclaimed, rows_after.len(),
        );
    }

    /// With fewer snapshots than the keep-last threshold, expiry must be a
    /// no-op: no files deleted and the snapshot count unchanged.
    #[test]
    fn expire_keeps_all_when_under_threshold() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();
        for name in ["a", "b"] {
            wh.append_symbol_scan(&mk_scan("facett", &[name])).unwrap();
        }

        let snapshots_before = snapshot_count(&wh, TABLE_SYMBOL_FACTS);
        let reports = expire_all(&wh, Some(TABLE_SYMBOL_FACTS), 10, None).unwrap();
        let report = reports
            .iter()
            .find(|r| r.table == TABLE_SYMBOL_FACTS)
            .unwrap();

        assert_eq!(report.files_deleted, 0, "nothing aged out under keep-last=10");
        let snapshots_after = snapshot_count(&wh, TABLE_SYMBOL_FACTS);
        assert_eq!(snapshots_after, snapshots_before, "all snapshots retained");
    }
}