infino 0.1.0

A fast retrieval engine that stores data on object storage and runs SQL, full-text search, and vector search over it from a single system — search-on-Parquet.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Infino Authors

//! Shared machinery for the search TVFs' custom `ExecutionPlan`s.
//!
//! All search TVFs (`vector_search`, `bm25_search`,
//! `bm25_search_prefix`, ...) produce a `Vec<SuperfileHit>` from a
//! kernel and then face the same two jobs:
//!
//!   1. **Resolve** each `(superfile, local_doc_id)` hit to the
//!      supertable's `_id` + projected scalar columns via
//!      [`SuperfileReader::take_by_local_doc_ids`], preserving the
//!      kernel's rank order, and append a `score` column.
//!   2. **Parse** the literal SQL arguments (`column`, `k`, ...).
//!
//! [`SuperfileReader::take_by_local_doc_ids`]: crate::superfile::SuperfileReader::take_by_local_doc_ids

use std::sync::Arc;

use arrow::compute::{concat_batches, take};
use arrow_array::{
    ArrayRef, Decimal128Array, Float32Array, RecordBatch, RecordBatchOptions, UInt32Array,
};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{
    error::{DataFusionError, Result as DfResult},
    logical_expr::Expr,
    scalar::ScalarValue,
};
use futures::{TryStreamExt, future::try_join_all};
use object_store::{ObjectStore, path::Path};
use parquet::arrow::{
    ProjectionMask,
    async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
};
use rayon::prelude::*;
use tokio::sync::oneshot;

use crate::{
    superfile::{
        SuperfileReader,
        reader::{rank_back_indices, row_selection_for_ids},
    },
    supertable::{
        handle::SupertableReader,
        manifest::SuperfileUri,
        options::{DECIMAL128_PRECISION, DECIMAL128_SCALE},
        query::{SuperfileHit, superfile_reader::superfile_reader},
    },
};

/// Resolve `hits` into a single `RecordBatch` whose columns are chosen
/// by name.
///
/// `projection` lists output column names: `_id`, any visible scalar
/// column, or the trailing `score`. `None` means the engine-native pair
/// of id column + `score`. Each name is mapped to its index in the
/// score-augmented output schema, and the indices are forwarded to
/// [`resolve_hits`], which decodes only the columns actually selected.
///
/// Every public row-returning search method (`bm25_search`,
/// `vector_search`, `token_match`, `exact_match`) goes through here.
/// `what` names the calling method in error messages.
///
/// # Errors
///
/// `DataFusionError::Execution` for a name absent from the output
/// schema, plus anything [`resolve_hits`] reports.
pub(crate) async fn resolve_hits_named(
    reader: &SupertableReader,
    hits: &[SuperfileHit],
    projection: Option<&[&str]>,
    what: &str,
) -> DfResult<RecordBatch> {
    let scalar_schema = reader.options().scalar_schema();
    let output_schema = output_schema_with_score(&scalar_schema);

    // Default to the cheapest meaningful result. `_id` is served from
    // its dedicated id pages and `score` comes straight from the hits,
    // so no user-column data page is read unless the caller names it.
    let id_name = reader.options().id_column.clone();
    let default_names = [id_name.as_str(), SCORE_COLUMN];
    let selected: &[&str] = projection.unwrap_or(&default_names[..]);

    let mut positions: Vec<usize> = Vec::with_capacity(selected.len());
    for name in selected {
        let position = output_schema.index_of(name).map_err(|_| {
            DataFusionError::Execution(format!("{what}: unknown column {name:?}"))
        })?;
        positions.push(position);
    }

    resolve_hits(
        reader,
        hits,
        &scalar_schema,
        &output_schema,
        Some(positions.as_slice()),
    )
    .await
}

/// Name of the output column carrying each hit's score (vector
/// distance or BM25 relevance; the originating TVF defines whether
/// smaller or larger is better). `output_schema_with_score` appends
/// it as a non-null `Float32`, and `resolve_hits_named` includes it,
/// next to the id column, in its default (`None`) projection.
pub(crate) const SCORE_COLUMN: &str = "score";

/// Build the search-TVF output schema.
///
/// The result is every field of `scalar_schema`, in order, followed by
/// one extra non-nullable `Float32` field named [`SCORE_COLUMN`]. The
/// index of that trailing field therefore equals the number of scalar
/// fields.
pub(crate) fn output_schema_with_score(scalar_schema: &SchemaRef) -> SchemaRef {
    let score_field = Field::new(SCORE_COLUMN, DataType::Float32, false);
    let all_fields: Vec<Field> = scalar_schema
        .fields()
        .iter()
        .map(|field| (**field).clone())
        .chain(std::iter::once(score_field))
        .collect();
    Arc::new(Schema::new(all_fields))
}

/// Resolve `hits` (in kernel rank order) to a `RecordBatch` matching
/// `output_schema` projected by `projection`, preserving rank order.
///
/// `output_schema` is the scalar schema with a trailing `score`
/// column ([`output_schema_with_score`]). `projection` indexes into
/// it, exactly as DataFusion hands to `scan`; `None` selects every
/// column.
///
/// **Only the scalar columns the projection actually selects are
/// decoded.** A query that selects just `score` opens no superfile
/// readers and touches no scalar bytes (cost-first: never decode a
/// column the query did not select). The `score` column is synthesized
/// from the hits.
///
/// When `_id` is the only scalar column selected, ids are first
/// derived from manifest stats with no I/O (`resolve_ids_arithmetic`).
/// Otherwise, or when that fast path declines, the selected columns
/// are read per superfile (each `take_by_local_doc_ids` is a
/// column-projected read) and concatenated. A single `take` then
/// reorders rows into global rank order, so row `i` is the `i`-th hit.
///
/// # Errors
///
/// `DataFusionError::Execution` if `projection` indexes past
/// `output_schema`, if any superfile read fails, or if the assembled
/// columns do not match the projected schema.
pub(crate) async fn resolve_hits(
    reader: &SupertableReader,
    hits: &[SuperfileHit],
    scalar_schema: &SchemaRef,
    output_schema: &SchemaRef,
    projection: Option<&[usize]>,
) -> DfResult<RecordBatch> {
    // Validates the projection indices up front: `project` rejects any
    // index past the end, so later `scalar_schema.field(p)` calls
    // cannot go out of range.
    let projected_schema = match projection {
        Some(indices) => Arc::new(
            output_schema
                .project(indices)
                .map_err(|e| DataFusionError::Execution(e.to_string()))?,
        ),
        None => Arc::clone(output_schema),
    };
    if hits.is_empty() {
        return Ok(RecordBatch::new_empty(projected_schema));
    }

    // `score` is the trailing column of `output_schema`; every
    // smaller index is a scalar column.
    let score_idx = scalar_schema.fields().len();
    let requested: Vec<usize> = match projection {
        Some(indices) => indices.to_vec(),
        None => (0..output_schema.fields().len()).collect(),
    };

    // Distinct scalar columns the projection selects, in first-seen
    // order. These are the only columns we decode.
    let mut needed: Vec<&str> = Vec::new();
    for &p in &requested {
        if p != score_idx {
            let name = scalar_schema.field(p).name().as_str();
            if !needed.contains(&name) {
                needed.push(name);
            }
        }
    }

    let id_column = reader.options().id_column.as_str();
    let resolved = if needed.is_empty() {
        // Only `score` (or nothing) was requested: no superfile I/O.
        None
    } else if needed == [id_column] {
        // Hit → `_id` translation without touching the file: ids are
        // minted in contiguous spans and the superfile body stores
        // rows in id order, so a superfile whose manifest stats
        // satisfy `id_max - id_min + 1 == n_docs` maps `local_doc_id`
        // to `id_min + local_doc_id` by arithmetic. The fast path is
        // all-or-nothing: if ANY hit's superfile fails that check
        // (multi-span commits can gap the range), the whole hit set
        // falls back to the id-page read.
        match resolve_ids_arithmetic(reader, hits) {
            Some(batch) => Some(batch?),
            None => Some(resolve_columns(reader, hits, &needed).await?),
        }
    } else {
        Some(resolve_columns(reader, hits, &needed).await?)
    };

    // Assemble output columns in the projection's emit order, each
    // drawn from the decoded scalar batch or the synthesized score.
    let score = Arc::new(Float32Array::from_iter_values(hits.iter().map(|h| h.score))) as ArrayRef;
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(requested.len());
    for &p in &requested {
        if p == score_idx {
            columns.push(Arc::clone(&score));
        } else {
            let name = scalar_schema.field(p).name();
            let rb = resolved
                .as_ref()
                .expect("a scalar column is projected => columns resolved");
            let idx = rb
                .schema()
                .index_of(name)
                .map_err(|e| DataFusionError::Execution(e.to_string()))?;
            columns.push(Arc::clone(rb.column(idx)));
        }
    }

    // `try_new_with_options` carries the row count explicitly so that a
    // projection selecting no columns (e.g. `COUNT(*)`) still reports
    // `hits.len()` rows.
    RecordBatch::try_new_with_options(
        projected_schema,
        columns,
        &RecordBatchOptions::new().with_row_count(Some(hits.len())),
    )
    .map_err(|e| DataFusionError::Execution(e.to_string()))
}

/// Hit → stable-`_id` translation by manifest arithmetic. This is the
/// no-I/O fast path used when `_id` is the only scalar column a query
/// selects.
///
/// Ids are minted in contiguous spans and the superfile body stores
/// rows in id order. So when a superfile's manifest stats satisfy
/// `id_max - id_min + 1 == n_docs`, the stable id of row `local` is
/// exactly `id_min + local`.
///
/// Returns the single-`_id`-column batch in hit (rank) order. Returns
/// `None`, so that the caller falls back to the id-page read, when any
/// of these hold for some hit:
///
/// - its superfile is missing from the manifest;
/// - its superfile fails the span check (e.g. a multi-span commit
///   gapped the range);
/// - its `local_doc_id` is `>= n_docs`.
///
/// The last case is never fabricated into an id past `id_max`. The
/// read path is authoritative and rejects it.
fn resolve_ids_arithmetic(
    reader: &SupertableReader,
    hits: &[SuperfileHit],
) -> Option<DfResult<RecordBatch>> {
    let manifest = reader.manifest();
    // Hit sets are top-k sized, so per-superfile memoization via a
    // linear scan is cheaper than building a map.
    // Entries are `(superfile, id_min, n_docs)`.
    let mut memo: Vec<(SuperfileUri, i128, i128)> = Vec::new();
    let mut ids: Vec<i128> = Vec::with_capacity(hits.len());
    for hit in hits {
        let (base, n_docs) = match memo.iter().find(|(uri, _, _)| *uri == hit.superfile) {
            Some(&(_, base, n_docs)) => (base, n_docs),
            None => {
                let entry = manifest
                    .superfiles
                    .iter()
                    .find(|e| e.uri == hit.superfile)?;
                let n_docs = i128::from(entry.n_docs);
                let span = entry.id_max.checked_sub(entry.id_min)?.checked_add(1)?;
                if n_docs == 0 || span != n_docs {
                    return None;
                }
                memo.push((hit.superfile, entry.id_min, n_docs));
                (entry.id_min, n_docs)
            }
        };
        let local = i128::from(hit.local_doc_id);
        // An out-of-range local id would map past `id_max` (and could
        // overflow). Defer to the read path, which validates it.
        if local >= n_docs {
            return None;
        }
        ids.push(base + local);
    }

    let array = match Decimal128Array::from_iter_values(ids)
        .with_precision_and_scale(DECIMAL128_PRECISION, DECIMAL128_SCALE)
    {
        Ok(a) => a,
        Err(e) => return Some(Err(DataFusionError::Execution(e.to_string()))),
    };
    let schema = Arc::new(Schema::new(vec![Field::new(
        reader.options().id_column.clone(),
        DataType::Decimal128(DECIMAL128_PRECISION, DECIMAL128_SCALE),
        false,
    )]));
    Some(
        RecordBatch::try_new(schema, vec![Arc::new(array) as ArrayRef])
            .map_err(|e| DataFusionError::Execution(e.to_string())),
    )
}

/// Read `names` (scalar columns) at the `hits`' `(superfile,
/// local_doc_id)` rows and return them in global rank order.
///
/// Hits are grouped by superfile for one column-projected
/// [`take_by_local_doc_ids`] per superfile; the per-superfile batches are
/// concatenated and a single `take` restores rank order. Caller
/// guarantees `hits` and `names` are both non-empty (an empty `hits`
/// would make `per_superfile[0]` below panic).
///
/// # Errors
///
/// `DataFusionError::Execution` if opening any superfile reader fails,
/// if a warm decode or cold object-store read fails, if the reader pool
/// drops the warm result, or if a lazy reader has no storage backend /
/// object-store handle to read from.
///
/// [`take_by_local_doc_ids`]: crate::superfile::SuperfileReader::take_by_local_doc_ids
async fn resolve_columns(
    reader: &SupertableReader,
    hits: &[SuperfileHit],
    names: &[&str],
) -> DfResult<RecordBatch> {
    // Group local_doc_ids by superfile, preserving first-seen superfile
    // order and recording where each global hit lands.
    // `placement[h] = (seg_idx, row)`: hit `h` is row `row` of the batch
    // read for superfile `seg_order[seg_idx]`.
    let mut seg_order: Vec<SuperfileUri> = Vec::new();
    let mut seg_locals: Vec<Vec<u32>> = Vec::new();
    let mut placement: Vec<(usize, usize)> = Vec::with_capacity(hits.len());
    for hit in hits {
        let seg_idx = match seg_order.iter().position(|s| *s == hit.superfile) {
            Some(i) => i,
            None => {
                seg_order.push(hit.superfile);
                seg_locals.push(Vec::new());
                seg_order.len() - 1
            }
        };
        let row = seg_locals[seg_idx].len();
        seg_locals[seg_idx].push(hit.local_doc_id);
        placement.push((seg_idx, row));
    }

    // Open every distinct superfile reader concurrently on the tokio
    // runtime — these are async I/O (in-memory cache lookups /
    // disk-cache cold fetches), so overlapping them is the right
    // model and they cost ~microseconds when warm.
    let manifest = reader.manifest();
    let store = &manifest.options.store;
    let disk_cache = manifest.options.disk_cache.as_ref();
    let storage = manifest.options.storage.as_ref();

    // `opened[i]` is the reader for `seg_order[i]` (`try_join_all`
    // preserves input order).
    let opened = try_join_all(
        seg_order
            .iter()
            .map(|uri| superfile_reader(store, disk_cache, storage, uri, None)),
    )
    .await
    .map_err(|e| DataFusionError::Execution(e.to_string()))?;

    // Materialize each superfile's projected hit rows, split by tier:
    //
    //   - **Resident readers** (in-memory tier / freshly written):
    //     `take_by_local_doc_ids` is a CPU-bound Parquet page decode
    //     over already-resident bytes, so the whole wave runs on
    //     `options.reader_pool` (rayon) — the same pool the search
    //     kernels and the writer's shard builds use — bridged back via
    //     a oneshot so no tokio worker blocks under the compute.
    //   - **Lazy readers** stream ONLY the projected hit rows through
    //     parquet's async `ParquetObjectReader` (footer + projected
    //     column pages via range GETs) — async I/O that belongs on the
    //     query runtime; a cold read never materializes the superfile.
    //
    // Both waves run concurrently and stitch back in `seg_order`
    // order. Superfile count here is bounded by the global top-k (one
    // entry per distinct hit-bearing superfile), so the fan-out is small.
    // A reader counts as resident when `parquet_bytes()` is `Some`.
    let mut warm_inputs: Vec<(usize, Arc<SuperfileReader>, Vec<u32>)> = Vec::new();
    let mut cold_units: Vec<(usize, &SuperfileUri, &Arc<SuperfileReader>, &[u32])> = Vec::new();
    for (i, ((uri, rd), locals)) in seg_order
        .iter()
        .zip(opened.iter())
        .zip(seg_locals.iter())
        .enumerate()
    {
        if rd.parquet_bytes().is_some() {
            warm_inputs.push((i, Arc::clone(rd), locals.clone()));
        } else {
            cold_units.push((i, uri, rd, locals.as_slice()));
        }
    }

    let warm_wave = async {
        if warm_inputs.is_empty() {
            return Ok::<Vec<(usize, RecordBatch)>, DataFusionError>(Vec::new());
        }
        // Owned inputs so the rayon closure is `'static`.
        let owned_names: Vec<String> = names.iter().map(|s| (*s).to_string()).collect();
        let pool = Arc::clone(&manifest.options.reader_pool);
        let inputs = warm_inputs;
        let (tx, rx) = oneshot::channel();
        pool.spawn(move || {
            let name_refs: Vec<&str> = owned_names.iter().map(String::as_str).collect();
            // Collecting into `Result` stops at the first decode error.
            let result: Result<Vec<(usize, RecordBatch)>, _> = inputs
                .into_par_iter()
                .map(|(i, sf, locals)| {
                    sf.take_by_local_doc_ids(&locals, &name_refs)
                        .map(|batch| (i, batch))
                })
                .collect();
            // The receiver may already be gone (query cancelled);
            // nothing useful to do with the result then.
            let _ = tx.send(result);
        });
        // A dropped sender (e.g. the pool task panicked) surfaces as
        // the "reader pool dropped result" error.
        rx.await
            .map_err(|_| {
                DataFusionError::Execution("resolve decode: reader pool dropped result".into())
            })?
            .map_err(|e| DataFusionError::Execution(e.to_string()))
    };

    let cold_wave = try_join_all(cold_units.into_iter().map(
        |(i, uri, reader, locals)| {
            let storage = storage.cloned();
            // Manifest-recorded file size, when known, lets the
            // object-store read skip a size-discovery request.
            let file_size = manifest
                .superfiles
                .iter()
                .find(|e| e.uri == *uri)
                .and_then(|e| e.subsection_offsets.as_ref())
                .map(|o| o.total_size);
            async move {
                let storage = storage.ok_or_else(|| {
                    DataFusionError::Execution(format!(
                        "resolve_hits needs row bytes for {uri:?}, but the reader was lazy and no storage backend is attached"
                    ))
                })?;
                let (store, path) =
                    storage.object_store_handle(&uri.storage_path()).ok_or_else(|| {
                        DataFusionError::Execution(format!(
                            "resolve_hits: storage backend exposes no object_store handle for {uri:?}"
                        ))
                    })?;
                take_rows_object_store(
                    store,
                    path,
                    file_size,
                    reader.schema(),
                    reader.n_docs(),
                    locals,
                    names,
                )
                .await
                .map(|batch| (i, batch))
            }
        },
    ));

    // Both waves are polled together on this task; errors from either
    // are surfaced only after both complete.
    let (warm_done, cold_done) = tokio::join!(warm_wave, cold_wave);
    let mut slots: Vec<Option<RecordBatch>> = vec![None; seg_order.len()];
    for (i, batch) in warm_done?.into_iter().chain(cold_done?) {
        slots[i] = Some(batch);
    }
    let per_superfile: Vec<RecordBatch> = slots
        .into_iter()
        .map(|s| s.expect("invariant: every superfile resolved by exactly one wave"))
        .collect();
    // Concatenate, then reorder rows into global rank order.
    let cat_schema = per_superfile[0].schema();
    let combined = concat_batches(&cat_schema, &per_superfile)
        .map_err(|e| DataFusionError::Execution(e.to_string()))?;

    // `offsets[s]` is the first row of superfile `s`'s slice within
    // `combined`. The reorder indices below are only in range if each
    // per-superfile batch holds exactly one row per entry of its
    // `seg_locals` list; this function does not re-check that.
    let mut offsets: Vec<u32> = Vec::with_capacity(per_superfile.len());
    let mut acc: u32 = 0;
    for batch in &per_superfile {
        offsets.push(acc);
        acc += batch.num_rows() as u32;
    }
    // Hit `h` lives at `combined` row `offsets[seg] + row`.
    let reorder =
        UInt32Array::from_iter_values(placement.iter().map(|(s, r)| offsets[*s] + *r as u32));

    let mut columns: Vec<ArrayRef> = Vec::with_capacity(combined.num_columns());
    for column in combined.columns() {
        columns.push(
            take(column, &reorder, None).map_err(|e| DataFusionError::Execution(e.to_string()))?,
        );
    }
    RecordBatch::try_new(combined.schema(), columns)
        .map_err(|e| DataFusionError::Execution(e.to_string()))
}

/// Stream the projected `names` columns at `local_doc_ids` from a lazy
/// object-store superfile via parquet's async `ParquetObjectReader`
/// (footer + projected column pages fetched as range GETs). Mirrors
/// [`SuperfileReader::take_by_local_doc_ids`]'s row-selection + rank-back,
/// but never materializes the whole superfile. This is the cold/object-
/// store row-resolution path.
///
/// The result has one row per entry of `local_doc_ids`, in that order
/// (duplicate ids repeat their row), with columns in `names` order.
/// `file_size`, when known and non-zero, is handed to the object reader
/// so it can skip size discovery.
///
/// # Errors
///
/// `DataFusionError::Execution` in any of these cases:
///
/// - a name is not in `file_schema`;
/// - an id is `>= n_docs`;
/// - the parquet open or stream fails;
/// - the read yields a different number of rows than the distinct ids
///   selected. That check guards the rank-back `take`, whose indices
///   would otherwise be out of range.
///
/// [`SuperfileReader::take_by_local_doc_ids`]: crate::superfile::SuperfileReader::take_by_local_doc_ids
async fn take_rows_object_store(
    store: Arc<dyn ObjectStore>,
    path: Path,
    file_size: Option<u64>,
    file_schema: &SchemaRef,
    n_docs: u64,
    local_doc_ids: &[u32],
    names: &[&str],
) -> DfResult<RecordBatch> {
    // Projected column indices (file order) + output fields (caller order).
    let mut col_indices = Vec::with_capacity(names.len());
    let mut out_fields: Vec<Field> = Vec::with_capacity(names.len());
    for &name in names {
        let idx = file_schema
            .index_of(name)
            .map_err(|_| DataFusionError::Execution(format!("unknown column {name}")))?;
        col_indices.push(idx);
        out_fields.push(file_schema.field(idx).clone());
    }
    let out_schema = Arc::new(Schema::new(out_fields));

    if local_doc_ids.is_empty() {
        return Ok(RecordBatch::new_empty(out_schema));
    }
    for &d in local_doc_ids {
        if u64::from(d) >= n_docs {
            return Err(DataFusionError::Execution(format!(
                "doc id {d} out of range (n_docs={n_docs})"
            )));
        }
    }

    // Distinct, sorted ids → monotonic skip/select runs (decode only the
    // rows the hits land on, not the whole column). Same selection
    // contract as `take_by_local_doc_ids` — shared helpers, different
    // I/O model (async range GETs here vs resident-bytes decode there).
    let (sorted, selection) = row_selection_for_ids(local_doc_ids);

    let mut object_reader = ParquetObjectReader::new(store, path);
    if let Some(size) = file_size.filter(|&s| s > 0) {
        // Skip the size-discovery HEAD when the manifest already knows it.
        object_reader = object_reader.with_file_size(size);
    }
    let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
        .await
        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
    let mask = ProjectionMask::roots(builder.parquet_schema(), col_indices.iter().copied());
    let stream = builder
        .with_projection(mask)
        .with_row_selection(selection)
        .build()
        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
    let batches: Vec<RecordBatch> = stream
        .try_collect()
        .await
        .map_err(|e| DataFusionError::Execution(e.to_string()))?;

    // The selection picks exactly one row per distinct id. Any other
    // count, including an empty stream, means the file disagrees with
    // `n_docs` or the selection. Returning a short batch would let the
    // rank-back `take` here, or the caller's reorder `take`, index out
    // of range, so fail loudly instead.
    let rows_read: usize = batches.iter().map(RecordBatch::num_rows).sum();
    if rows_read != sorted.len() {
        return Err(DataFusionError::Execution(format!(
            "object-store take read {rows_read} rows for {} distinct doc ids (n_docs={n_docs})",
            sorted.len()
        )));
    }
    // Non-empty: `local_doc_ids` is non-empty, so `sorted.len() >= 1`.
    let read_schema = batches[0].schema();
    let selected = concat_batches(&read_schema, &batches)
        .map_err(|e| DataFusionError::Execution(e.to_string()))?;

    // Rank back into the caller's (possibly duplicated) order.
    let indices = rank_back_indices(local_doc_ids, &sorted);

    // Gather columns in caller projection order (parquet returns file order).
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(names.len());
    for &name in names {
        let idx = selected
            .schema()
            .index_of(name)
            .map_err(|_| DataFusionError::Execution(format!("unknown column {name}")))?;
        columns.push(
            take(selected.column(idx), &indices, None)
                .map_err(|e| DataFusionError::Execution(e.to_string()))?,
        );
    }
    RecordBatch::try_new(out_schema, columns).map_err(|e| DataFusionError::Execution(e.to_string()))
}

/// Extract a string literal argument (a column name, query text, ...).
pub(crate) fn arg_to_string(expr: &Expr, what: &str) -> DfResult<String> {
    match expr {
        Expr::Literal(ScalarValue::Utf8(Some(s)), _)
        | Expr::Literal(ScalarValue::LargeUtf8(Some(s)), _)
        | Expr::Literal(ScalarValue::Utf8View(Some(s)), _) => Ok(s.clone()),
        other => Err(DataFusionError::Plan(format!(
            "{what} must be a string literal, got {other:?}"
        ))),
    }
}

/// Extract a non-negative integer literal argument (`k`).
///
/// Accepts signed (`Int8`, `Int16`, `Int32`, `Int64`) and unsigned
/// (`UInt8`, `UInt16`, `UInt32`, `UInt64`) integer literals.
///
/// # Errors
///
/// Returns `DataFusionError::Plan` when:
/// - `expr` is not a non-NULL integer literal;
/// - the value is negative (`"{what} must be >= 0, got {n}"`);
/// - the value does not fit in `usize` on this platform.
pub(crate) fn arg_to_usize(expr: &Expr, what: &str) -> DfResult<usize> {
    // Widen every accepted width to i128, which holds both i64 and u64
    // losslessly. Routing UInt64 through `as i64` would wrap values above
    // i64::MAX to negatives and misreport them as "must be >= 0".
    let n: i128 = match expr {
        Expr::Literal(ScalarValue::Int64(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::Int32(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::Int16(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::Int8(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::UInt64(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::UInt32(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::UInt16(Some(n)), _) => i128::from(*n),
        Expr::Literal(ScalarValue::UInt8(Some(n)), _) => i128::from(*n),
        other => {
            return Err(DataFusionError::Plan(format!(
                "{what} must be an integer literal, got {other:?}"
            )));
        }
    };
    if n < 0 {
        return Err(DataFusionError::Plan(format!("{what} must be >= 0, got {n}")));
    }
    // Non-negative but possibly wider than usize (only reachable on
    // targets where usize is narrower than 64 bits).
    usize::try_from(n).map_err(|_| {
        DataFusionError::Plan(format!("{what} is too large for this platform, got {n}"))
    })
}

#[cfg(test)]
mod tests {
    // Unit tests for the literal-argument helpers, the score-augmented
    // output schema, and the hit-resolution paths (`resolve_hits`,
    // `resolve_hits_named`, `resolve_ids_arithmetic`, `resolve_columns`,
    // `take_rows_object_store`). They are driven either through the public
    // search API or by calling the helpers directly.

    use arrow_array::{Array, FixedSizeListArray, LargeStringArray};
    use arrow_schema::Field;
    use bytes::Bytes;
    use datafusion::prelude::lit;
    use object_store::{ObjectStore, ObjectStoreExt, PutPayload, memory, path::Path as ObjPath};
    use rayon::ThreadPoolBuilder;
    use tempfile::TempDir;

    use super::*;
    use crate::{
        storage::{LocalFsStorageProvider, StorageProvider},
        superfile::{
            builder::{BuilderOptions, FtsConfig, SuperfileBuilder, VectorConfig},
            fts::reader::BoolMode,
            vector::{distance::Metric, rerank_codec::RerankCodec},
        },
        supertable::{
            Supertable, SupertableOptions,
            reader_cache::{ColdFetchMode, DiskCacheConfig, DiskCacheStore},
        },
        test_helpers::{
            build_title_batch, decimal128_id_field, decimal128_ids, default_supertable_options,
            default_tokenizer as tok,
        },
    };

    // ---- literal-argument helpers and output schema ----

    #[test]
    fn arg_to_string_accepts_utf8_literal_rejects_int() {
        // A `lit("...")` string is returned verbatim; an integer literal is
        // rejected.
        assert_eq!(
            arg_to_string(&lit("emb"), "column").expect("utf8 literal"),
            "emb"
        );
        assert!(arg_to_string(&lit(3_i64), "column").is_err());
    }

    #[test]
    fn arg_to_string_accepts_large_utf8_and_utf8_view() {
        // The LargeUtf8 and Utf8View literal flavours are accepted too.
        let large = Expr::Literal(ScalarValue::LargeUtf8(Some("body".into())), None);
        assert_eq!(arg_to_string(&large, "column").expect("large utf8"), "body");
        let view = Expr::Literal(ScalarValue::Utf8View(Some("title".into())), None);
        assert_eq!(arg_to_string(&view, "column").expect("utf8 view"), "title");
    }

    #[test]
    fn arg_to_usize_accepts_int_rejects_negative_and_nonint() {
        // Non-negative integers pass; negatives and non-integers error.
        assert_eq!(arg_to_usize(&lit(10_i64), "k").expect("int literal"), 10);
        assert!(arg_to_usize(&lit(-1_i64), "k").is_err());
        assert!(arg_to_usize(&lit("nope"), "k").is_err());
    }

    #[test]
    fn arg_to_usize_accepts_all_integer_widths() {
        // Int32, UInt64 and UInt32 literals are accepted alongside Int64.
        let i32e = Expr::Literal(ScalarValue::Int32(Some(7)), None);
        let u64e = Expr::Literal(ScalarValue::UInt64(Some(8)), None);
        let u32e = Expr::Literal(ScalarValue::UInt32(Some(9)), None);
        assert_eq!(arg_to_usize(&i32e, "k").expect("i32"), 7);
        assert_eq!(arg_to_usize(&u64e, "k").expect("u64"), 8);
        assert_eq!(arg_to_usize(&u32e, "k").expect("u32"), 9);
    }

    #[test]
    fn output_schema_appends_score() {
        // `output_schema_with_score` appends a trailing Float32 `score`.
        let s = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        let out = output_schema_with_score(&s);
        assert_eq!(out.fields().len(), 2);
        assert_eq!(out.field(1).name(), "score");
        assert_eq!(out.field(1).data_type(), &DataType::Float32);
    }

    // ---- harness exercising resolve_hits_named / resolve_ids_arithmetic /
    //      resolve_columns through the public search methods ----

    /// `FixedSizeList<Float32>` of width `dim`, the vector column's type.
    fn fixed_list_f32(dim: usize) -> DataType {
        DataType::FixedSizeList(
            Arc::new(Field::new("item", DataType::Float32, true)),
            dim as i32,
        )
    }

    /// Supertable options for a `title` (FTS) + `emb` (vector, cosine,
    /// `dim`-wide) schema, written through a single-thread rayon pool.
    fn options_title_emb(dim: usize) -> SupertableOptions {
        let pool = Arc::new(
            ThreadPoolBuilder::new()
                .num_threads(1)
                .build()
                .expect("pool"),
        );
        let schema = Arc::new(Schema::new(vec![
            Field::new("title", DataType::LargeUtf8, false),
            Field::new("emb", fixed_list_f32(dim), false),
        ]));
        SupertableOptions::new(
            schema,
            vec![FtsConfig {
                column: "title".into(),
            }],
            vec![VectorConfig {
                column: "emb".into(),
                dim,
                n_cent: 4,
                rot_seed: 7,
                metric: Metric::Cosine,
                rerank_codec: RerankCodec::Fp32,
            }],
            Some(tok()),
        )
        .expect("valid options")
        .with_writer_pool(pool)
    }

    /// One row per title; row `i`'s embedding is the one-hot vector with
    /// a `1.0` at position `i % dim` and `0.0` elsewhere.
    fn build_batch(titles: &[&str], dim: usize, schema: Arc<Schema>) -> RecordBatch {
        let n = titles.len();
        let title_arr = LargeStringArray::from(titles.to_vec());
        let mut flat = Vec::<f32>::with_capacity(n * dim);
        for i in 0..n {
            for d in 0..dim {
                flat.push(if d == i % dim { 1.0 } else { 0.0 });
            }
        }
        let fsl = FixedSizeListArray::try_new(
            Arc::new(Field::new("item", DataType::Float32, true)),
            dim as i32,
            Arc::new(Float32Array::from(flat)) as ArrayRef,
            None,
        )
        .expect("FSL");
        RecordBatch::try_new(schema, vec![Arc::new(title_arr), Arc::new(fsl)]).expect("batch")
    }

    /// Supertable holding four docs committed in a single append; two of
    /// them ("rust async", "rust systems") contain the term `rust`.
    fn demo(dim: usize) -> Supertable {
        let st = Supertable::create(options_title_emb(dim)).expect("create");
        let mut w = st.writer().expect("writer");
        let schema = st.options().schema.clone();
        w.append(&build_batch(
            &["rust async", "python data", "rust systems", "go routines"],
            dim,
            schema,
        ))
        .expect("append");
        w.commit().expect("commit");
        st
    }

    #[test]
    fn resolve_hits_named_id_only_takes_arithmetic_fast_path() {
        // `_id`-only projection drives resolve_hits_named → the
        // resolve_ids_arithmetic no-I/O fast path (single contiguous
        // span: id_max - id_min + 1 == n_docs).
        let st = demo(16);
        let batches = st
            .reader()
            .bm25_search("title", "rust", 10, BoolMode::Or, Some(&["_id"]))
            .expect("bm25_search _id");
        let b = &batches[0];
        assert_eq!(b.num_columns(), 1);
        assert_eq!(b.schema().field(0).name(), "_id");
        assert_eq!(b.num_rows(), 2, "two docs contain 'rust'");
    }

    #[test]
    fn resolve_hits_named_default_is_id_and_score() {
        // `None` projection is the engine-native `_id` + `score` pair.
        let st = demo(16);
        let batches = st
            .reader()
            .bm25_search("title", "rust", 10, BoolMode::Or, None)
            .expect("bm25_search default");
        let b = &batches[0];
        assert_eq!(b.num_columns(), 2);
        assert_eq!(b.schema().field(0).name(), "_id");
        assert_eq!(b.schema().field(1).name(), "score");
    }

    #[test]
    fn resolve_hits_named_scalar_column_decodes_via_resolve_columns() {
        // Naming a scalar column (`title`) forces resolve_columns to
        // decode the column bytes; `score` synthesized alongside.
        let st = demo(16);
        let batches = st
            .reader()
            .bm25_search(
                "title",
                "rust",
                10,
                BoolMode::Or,
                Some(&["_id", "title", "score"]),
            )
            .expect("bm25_search title");
        let b = &batches[0];
        assert_eq!(b.num_columns(), 3);
        assert_eq!(b.schema().field(1).name(), "title");
        let titles = b
            .column(1)
            .as_any()
            .downcast_ref::<LargeStringArray>()
            .expect("title col");
        // Pin the row count first so the `contains` loop below cannot pass
        // vacuously on an empty result.
        assert_eq!(titles.len(), 2, "two docs contain 'rust'");
        for i in 0..titles.len() {
            assert!(titles.value(i).contains("rust"));
        }
    }

    #[test]
    fn resolve_hits_named_unknown_column_errors() {
        // Projecting a column absent from the schema is an error, not an
        // empty column.
        let st = demo(16);
        let res = st
            .reader()
            .bm25_search("title", "rust", 10, BoolMode::Or, Some(&["nope"]));
        assert!(res.is_err(), "unknown projected column must error");
    }

    #[test]
    fn resolve_hits_named_empty_hits_returns_empty_batch() {
        // A term that matches no document yields zero rows in total.
        let st = demo(16);
        let batches = st
            .reader()
            .bm25_search("title", "nonexistentterm", 10, BoolMode::Or, Some(&["_id"]))
            .expect("bm25_search empty");
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 0);
    }

    // ---- resolve_hits direct: projection branches that the named
    //      wrapper never reaches (score-only, empty projection) ----

    /// Two hits against the demo superfile, with deliberately distinct
    /// scores so the synthesized `score` column can be checked.
    fn two_hits(reader: &SupertableReader) -> Vec<SuperfileHit> {
        let entry = Arc::clone(&reader.manifest().superfiles[0]);
        vec![
            SuperfileHit {
                superfile: entry.uri,
                local_doc_id: 0,
                score: 1.5,
            },
            SuperfileHit {
                superfile: entry.uri,
                local_doc_id: (entry.n_docs - 1) as u32,
                score: 0.5,
            },
        ]
    }

    #[test]
    fn resolve_hits_score_only_synthesizes_score_without_decoding_scalars() {
        // Projecting just the trailing `score` index decodes no scalar
        // columns (the cost-first "open no readers" branch): `needed`
        // is empty, `score` is synthesized straight from the hits.
        let st = demo(16);
        let reader = st.reader();
        let hits = two_hits(&reader);
        let scalar_schema = reader.options().scalar_schema();
        let output_schema = output_schema_with_score(&scalar_schema);
        let score_idx = scalar_schema.fields().len();

        let batch = reader
            .block_on(resolve_hits(
                &reader,
                &hits,
                &scalar_schema,
                &output_schema,
                Some(&[score_idx]),
            ))
            .expect("score-only resolve");

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.schema().field(0).name(), SCORE_COLUMN);
        assert_eq!(batch.num_rows(), hits.len());
        let scores = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float32Array>()
            .expect("score col");
        assert_eq!(scores.value(0), 1.5);
        assert_eq!(scores.value(1), 0.5);
    }

    #[test]
    fn resolve_hits_none_projection_returns_all_scalar_columns_and_score() {
        // `projection: None` (distinct from `resolve_hits_named`'s
        // `_id`+`score` default) materializes every scalar column plus
        // the trailing synthesized `score`, in schema order.
        let st = demo(16);
        let reader = st.reader();
        let hits = two_hits(&reader);
        let scalar_schema = reader.options().scalar_schema();
        let output_schema = output_schema_with_score(&scalar_schema);

        let batch = reader
            .block_on(resolve_hits(
                &reader,
                &hits,
                &scalar_schema,
                &output_schema,
                None,
            ))
            .expect("none-projection resolve");

        assert_eq!(batch.num_columns(), output_schema.fields().len());
        assert_eq!(batch.num_rows(), hits.len());
        let last = batch.num_columns() - 1;
        assert_eq!(batch.schema().field(last).name(), SCORE_COLUMN);
        assert_eq!(batch.schema().field(0).name(), "_id");
    }

    #[test]
    fn resolve_hits_empty_projection_preserves_hit_row_count() {
        // A zero-column projection (the `COUNT(*)` shape) emits no
        // columns but must still report `hits.len()` rows — the
        // `with_row_count` path.
        let st = demo(16);
        let reader = st.reader();
        let hits = two_hits(&reader);
        let scalar_schema = reader.options().scalar_schema();
        let output_schema = output_schema_with_score(&scalar_schema);

        let batch = reader
            .block_on(resolve_hits(
                &reader,
                &hits,
                &scalar_schema,
                &output_schema,
                Some(&[]),
            ))
            .expect("empty-projection resolve");

        assert_eq!(batch.num_columns(), 0);
        assert_eq!(batch.num_rows(), hits.len());
    }

    // ---- resolve_ids_arithmetic direct: the no-I/O span arithmetic
    //      and its fallback when the span check can't apply ----

    #[test]
    fn resolve_ids_arithmetic_maps_local_ids_via_manifest_span() {
        // Single contiguous commit => `id_max - id_min + 1 == n_docs`,
        // so row `local` resolves to `id_min + local` with no file read.
        let st = demo(16);
        let reader = st.reader();
        let entry = Arc::clone(&reader.manifest().superfiles[0]);
        let last = (entry.n_docs - 1) as u32;
        let hits = vec![
            SuperfileHit {
                superfile: entry.uri,
                local_doc_id: 0,
                score: 0.0,
            },
            SuperfileHit {
                superfile: entry.uri,
                local_doc_id: last,
                score: 0.0,
            },
        ];

        let batch = resolve_ids_arithmetic(&reader, &hits)
            .expect("contiguous span => Some")
            .expect("ok batch");
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.schema().field(0).name(), "_id");
        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .expect("decimal id col");
        assert_eq!(ids.value(0), entry.id_min);
        assert_eq!(ids.value(1), entry.id_min + i128::from(last));
    }

    #[test]
    fn resolve_ids_arithmetic_returns_none_when_superfile_absent() {
        // A hit naming a superfile not in the manifest fails the
        // lookup, so arithmetic bails to `None` and the caller falls
        // back to the id-page read.
        let st = demo(16);
        let reader = st.reader();
        let hits = vec![SuperfileHit {
            superfile: SuperfileUri::new_v4(),
            local_doc_id: 0,
            score: 0.0,
        }];
        assert!(
            resolve_ids_arithmetic(&reader, &hits).is_none(),
            "unknown superfile must abandon the arithmetic fast path",
        );
    }

    // ---- take_rows_object_store: the cold/object-store row-resolution
    //      path, exercised directly against an in-memory object store ----

    /// Titles for the standalone superfile the cold-path tests stream
    /// rows out of. Five rows so a sub-selection genuinely skips some.
    const TITLES: [&str; 5] = ["alpha", "bravo", "charlie", "delta", "echo"];

    /// Build a standalone superfile (id + `title` scalar columns, no
    /// indexes) whose `title` rows are [`TITLES`], in row order.
    fn titled_superfile_bytes() -> Bytes {
        let schema: Arc<Schema> = Arc::new(Schema::new(vec![
            decimal128_id_field("doc_id"),
            Field::new("title", DataType::LargeUtf8, false),
        ]));
        let opts = BuilderOptions::new(Arc::clone(&schema), "doc_id", vec![], vec![], None);
        let mut b = SuperfileBuilder::new(opts).expect("builder");
        let ids = decimal128_ids(0..TITLES.len() as u64);
        let title = LargeStringArray::from(TITLES.to_vec());
        let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(title)])
            .expect("build batch");
        b.add_batch(&batch, &[]).expect("add_batch");
        Bytes::from(b.finish().expect("finish builder"))
    }

    /// Open `bytes` eagerly just to read the parquet schema + row count
    /// that the cold path passes to [`take_rows_object_store`].
    fn schema_and_n_docs(bytes: &Bytes) -> (SchemaRef, u64) {
        let reader = SuperfileReader::open(bytes.clone()).expect("open");
        (Arc::clone(reader.schema()), reader.n_docs())
    }

    /// Put `bytes` into a fresh in-memory object store and return the
    /// handle + path the cold reader will range-GET against.
    async fn object_store_with(bytes: &Bytes) -> (Arc<dyn ObjectStore>, ObjPath) {
        let store: Arc<dyn ObjectStore> = Arc::new(memory::InMemory::new());
        let path = ObjPath::from("data/seg.sf.parquet");
        store
            .put(&path, PutPayload::from_bytes(bytes.clone()))
            .await
            .expect("put superfile into object store");
        (store, path)
    }

    /// Downcast the single returned `title` column to a string array.
    fn titles_of(batch: &RecordBatch) -> Vec<String> {
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<LargeStringArray>()
            .expect("title col");
        (0..arr.len()).map(|i| arr.value(i).to_string()).collect()
    }

    #[tokio::test]
    async fn take_rows_object_store_streams_rows_in_caller_rank_order() {
        // Out-of-order ids: rows are decoded sorted but ranked back into
        // the caller's order, so output row i is the i-th requested id.
        let bytes = titled_superfile_bytes();
        let (schema, n_docs) = schema_and_n_docs(&bytes);
        let (store, path) = object_store_with(&bytes).await;

        let batch = take_rows_object_store(
            store,
            path,
            Some(bytes.len() as u64),
            &schema,
            n_docs,
            &[2, 0, 3],
            &["title"],
        )
        .await
        .expect("take rows");

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.schema().field(0).name(), "title");
        assert_eq!(titles_of(&batch), vec!["charlie", "alpha", "delta"]);
    }

    #[tokio::test]
    async fn take_rows_object_store_ranks_back_duplicate_ids() {
        // The same id requested twice must appear twice in the output —
        // rows are decoded once (distinct) and gathered back per request.
        let bytes = titled_superfile_bytes();
        let (schema, n_docs) = schema_and_n_docs(&bytes);
        let (store, path) = object_store_with(&bytes).await;

        let batch = take_rows_object_store(
            store,
            path,
            Some(bytes.len() as u64),
            &schema,
            n_docs,
            &[1, 1, 0],
            &["title"],
        )
        .await
        .expect("take rows");

        assert_eq!(titles_of(&batch), vec!["bravo", "bravo", "alpha"]);
    }

    #[tokio::test]
    async fn take_rows_object_store_discovers_size_when_file_size_none() {
        // `None` file_size omits the `with_file_size` shortcut, so the
        // parquet reader discovers the size itself — same rows out.
        let bytes = titled_superfile_bytes();
        let (schema, n_docs) = schema_and_n_docs(&bytes);
        let (store, path) = object_store_with(&bytes).await;

        let batch = take_rows_object_store(store, path, None, &schema, n_docs, &[4], &["title"])
            .await
            .expect("take rows");

        assert_eq!(titles_of(&batch), vec!["echo"]);
    }

    #[tokio::test]
    async fn take_rows_object_store_empty_ids_returns_empty_batch() {
        // No requested ids: zero rows, but the projected schema is kept.
        let bytes = titled_superfile_bytes();
        let (schema, n_docs) = schema_and_n_docs(&bytes);
        let (store, path) = object_store_with(&bytes).await;

        let batch = take_rows_object_store(
            store,
            path,
            Some(bytes.len() as u64),
            &schema,
            n_docs,
            &[],
            &["title"],
        )
        .await
        .expect("empty ids");

        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema().field(0).name(), "title");
    }

    #[tokio::test]
    async fn take_rows_object_store_out_of_range_id_errors() {
        // A local doc id equal to `n_docs` (one past the last row) must be
        // rejected with an "out of range" error.
        let bytes = titled_superfile_bytes();
        let (schema, n_docs) = schema_and_n_docs(&bytes);
        let (store, path) = object_store_with(&bytes).await;

        let err = take_rows_object_store(
            store,
            path,
            Some(bytes.len() as u64),
            &schema,
            n_docs,
            &[n_docs as u32],
            &["title"],
        )
        .await
        .expect_err("doc id past n_docs must error");
        assert!(
            err.to_string().contains("out of range"),
            "expected an out-of-range error, got {err}",
        );
    }

    #[tokio::test]
    async fn take_rows_object_store_unknown_column_errors() {
        // Projecting a column the superfile lacks must be rejected with an
        // "unknown column" error.
        let bytes = titled_superfile_bytes();
        let (schema, n_docs) = schema_and_n_docs(&bytes);
        let (store, path) = object_store_with(&bytes).await;

        let err = take_rows_object_store(
            store,
            path,
            Some(bytes.len() as u64),
            &schema,
            n_docs,
            &[0],
            &["nope"],
        )
        .await
        .expect_err("unknown column must error");
        assert!(
            err.to_string().contains("unknown column"),
            "expected an unknown-column error, got {err}",
        );
    }

    // ---- resolve_columns cold path: lazy (object-store) readers ----
    //
    // `demo()` keeps superfile bytes resident, so its readers are warm
    // and `resolve_columns` only ever takes the rayon decode branch. To
    // drive the cold branch (lazy readers → `take_rows_object_store`),
    // commit to durable storage, then *reopen* with a lazy disk cache:
    // the reopened handle's in-memory tier is empty, so every read cold-
    // fetches a lazy reader from object storage.

    /// Commit four titled docs to `storage` via a throwaway producer.
    fn commit_titles_to(storage: &Arc<dyn StorageProvider>) {
        let producer =
            Supertable::create(default_supertable_options().with_storage(Arc::clone(storage)))
                .expect("create producer");
        let mut w = producer.writer().expect("writer");
        w.append(&build_title_batch(&[
            "rust async",
            "python data",
            "rust systems",
            "go routines",
        ]))
        .expect("append");
        w.commit().expect("commit");
    }

    /// Reopen the supertable at `consumer_storage` with a lazy
    /// (`LazyForegroundWithBackgroundFill`) disk cache so reads resolve to
    /// lazy range-GET readers. The reopened handle's in-memory tier is
    /// empty, so every read cold-fetches.
    fn open_cold(
        consumer_storage: Arc<dyn StorageProvider>,
        cache_dir: &TempDir,
    ) -> (Arc<DiskCacheStore>, Supertable) {
        let cfg = DiskCacheConfig {
            cache_root: cache_dir.path().to_path_buf(),
            cold_fetch_mode: ColdFetchMode::LazyForegroundWithBackgroundFill,
            mmap_cold_threshold_secs: 0,
            mmap_sweep_interval_secs: 0,
            ..Default::default()
        };
        let cache =
            DiskCacheStore::new_unpinned(Arc::clone(&consumer_storage), cfg).expect("cache");
        let consumer = Supertable::open(
            default_supertable_options()
                .with_storage(consumer_storage)
                .with_disk_cache(Arc::clone(&cache)),
        )
        .expect("open consumer");
        (cache, consumer)
    }

    /// Producer commits to local-FS storage; consumer reopens cold over
    /// the same storage. Returns the temp dirs (kept alive as RAII
    /// guards), the cache (for stats), and the cold consumer handle.
    fn cold_consumer() -> (TempDir, TempDir, Arc<DiskCacheStore>, Supertable) {
        let storage_dir = TempDir::new().expect("storage tempdir");
        let cache_dir = TempDir::new().expect("cache tempdir");
        let storage: Arc<dyn StorageProvider> =
            Arc::new(LocalFsStorageProvider::new(storage_dir.path()).expect("provider"));
        commit_titles_to(&storage);
        let (cache, consumer) = open_cold(Arc::clone(&storage), &cache_dir);
        (storage_dir, cache_dir, cache, consumer)
    }

    #[test]
    fn resolve_columns_cold_path_streams_scalar_via_object_store() {
        // Naming a non-id scalar column forces resolve_columns; the lazy
        // readers route it through take_rows_object_store (range GETs),
        // and `score` is synthesized alongside in rank order.
        let (_sd, _cd, cache, consumer) = cold_consumer();

        let batches = consumer
            .reader()
            .bm25_search("title", "rust", 10, BoolMode::Or, Some(&["title", "score"]))
            .expect("cold bm25 with scalar projection");

        let b = &batches[0];
        assert_eq!(b.num_columns(), 2);
        assert_eq!(b.schema().field(0).name(), "title");
        assert_eq!(b.schema().field(1).name(), "score");
        let titles = b
            .column(0)
            .as_any()
            .downcast_ref::<LargeStringArray>()
            .expect("title col");
        assert_eq!(titles.len(), 2, "two docs contain 'rust'");
        for i in 0..titles.len() {
            assert!(titles.value(i).contains("rust"));
        }
        // The reopened consumer's in-memory tier is empty, so resolving
        // the rows genuinely cold-fetched through the disk cache — this
        // is what put us on the cold branch.
        assert!(
            cache.stats().n_cold_fetches >= 1,
            "scalar resolution must cold-fetch lazy readers; got {}",
            cache.stats().n_cold_fetches,
        );
    }

    #[test]
    fn resolve_columns_cold_path_empty_hits_opens_no_readers() {
        // A query that matches nothing produces no hits, so resolve_hits
        // short-circuits before resolve_columns opens any reader for the
        // scalar projection. NOTE(review): only the zero-row result is
        // asserted here; the cold-fetch counter is not checked, because the
        // BM25 lookup itself may cold-fetch index bytes. Confirm that before
        // tightening this into an `n_cold_fetches` assertion.
        let (_sd, _cd, _cache, consumer) = cold_consumer();

        let batches = consumer
            .reader()
            .bm25_search(
                "title",
                "nonexistentterm",
                10,
                BoolMode::Or,
                Some(&["title", "score"]),
            )
            .expect("cold bm25 with no matches");

        let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(rows, 0);
    }
}