csv_managed/
index.rs

1use std::{borrow::Cow, collections::BTreeMap, fs::File, io::BufWriter, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    data::{ComparableValue, parse_typed_value},
8    io_utils,
9    schema::{ColumnMeta, ColumnType, Schema},
10};
11
12use encoding_rs::Encoding;
13
/// Format version stamped into every `CsvIndex` produced by this module.
/// `CsvIndex::load` rejects a successfully decoded index whose stored version differs.
const INDEX_VERSION: u32 = 2;
15
/// Sort order applied to one indexed column.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SortDirection {
    /// Ascending order; spelled `asc` in index specifications.
    Asc,
    /// Descending order; spelled `desc` in index specifications.
    Desc,
}
21
22impl SortDirection {
23    pub fn is_ascending(self) -> bool {
24        matches!(self, SortDirection::Asc)
25    }
26
27    fn as_str(self) -> &'static str {
28        match self {
29            SortDirection::Asc => "asc",
30            SortDirection::Desc => "desc",
31        }
32    }
33}
34
35impl std::fmt::Display for SortDirection {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        write!(f, "{}", self.as_str())
38    }
39}
40
/// Request to build one index variant: which columns to key on, in which
/// direction each one sorts, and an optional variant name.
#[derive(Debug, Clone)]
pub struct IndexDefinition {
    /// Column names forming the composite key, in key order.
    pub columns: Vec<String>,
    /// Sort direction per column; `IndexVariantBuilder::new` rejects a
    /// definition whose `directions` and `columns` lengths differ.
    pub directions: Vec<SortDirection>,
    /// Optional variant name (the part before `=` in a specification).
    pub name: Option<String>,
}
47
48impl IndexDefinition {
49    pub fn from_columns(columns: Vec<String>) -> Result<Self> {
50        let cleaned: Vec<String> = columns
51            .into_iter()
52            .map(|c| c.trim().to_string())
53            .filter(|c| !c.is_empty())
54            .collect();
55        if cleaned.is_empty() {
56            return Err(anyhow!("At least one column is required to build an index"));
57        }
58        Ok(IndexDefinition {
59            directions: vec![SortDirection::Asc; cleaned.len()],
60            columns: cleaned,
61            name: None,
62        })
63    }
64
65    pub fn parse(spec: &str) -> Result<Self> {
66        let (name, remainder) = if let Some((raw_name, rest)) = spec.split_once('=') {
67            let trimmed_name = raw_name.trim();
68            if trimmed_name.is_empty() {
69                return Err(anyhow!(
70                    "Index specification is missing a variant name before '=': '{spec}'"
71                ));
72            }
73            let trimmed_rest = rest.trim();
74            if trimmed_rest.is_empty() {
75                return Err(anyhow!(
76                    "Index specification '{spec}' is missing column definitions after '='"
77                ));
78            }
79            (Some(trimmed_name.to_string()), trimmed_rest)
80        } else {
81            (None, spec)
82        };
83
84        let mut columns = Vec::new();
85        let mut directions = Vec::new();
86        for token in remainder.split(',') {
87            let mut parts = token.split(':');
88            let column = parts
89                .next()
90                .map(|s| s.trim())
91                .filter(|s| !s.is_empty())
92                .ok_or_else(|| anyhow!("Index specification is missing a column name"))?;
93            let direction = parts
94                .next()
95                .map(|raw| raw.trim().to_ascii_lowercase())
96                .filter(|s| !s.is_empty())
97                .map(|value| match value.as_str() {
98                    "asc" => Ok(SortDirection::Asc),
99                    "desc" => Ok(SortDirection::Desc),
100                    other => Err(anyhow!("Unknown sort direction '{other}'")),
101                })
102                .transpose()?;
103            columns.push(column.to_string());
104            directions.push(direction.unwrap_or(SortDirection::Asc));
105        }
106        if columns.is_empty() {
107            return Err(anyhow!(
108                "Index specification did not contain any columns: '{spec}'"
109            ));
110        }
111        Ok(IndexDefinition {
112            columns,
113            directions,
114            name,
115        })
116    }
117
118    pub fn expand_covering_spec(spec: &str) -> Result<Vec<Self>> {
119        let (name_prefix, remainder) = if let Some((raw_name, rest)) = spec.split_once('=') {
120            let trimmed_name = raw_name.trim();
121            if trimmed_name.is_empty() {
122                return Err(anyhow!(
123                    "Covering specification is missing a name before '=': '{spec}'"
124                ));
125            }
126            let trimmed_rest = rest.trim();
127            if trimmed_rest.is_empty() {
128                return Err(anyhow!(
129                    "Covering specification '{spec}' is missing column definitions after '='"
130                ));
131            }
132            (Some(trimmed_name.to_string()), trimmed_rest)
133        } else {
134            (None, spec.trim())
135        };
136
137        let columns = remainder
138            .split(',')
139            .map(|token| token.trim())
140            .filter(|token| !token.is_empty())
141            .map(parse_covering_column)
142            .collect::<Result<Vec<_>>>()?;
143
144        if columns.is_empty() {
145            return Err(anyhow!(
146                "Covering specification did not contain any columns: '{spec}'"
147            ));
148        }
149
150        let mut definitions = Vec::new();
151        for prefix_len in 1..=columns.len() {
152            let prefix = &columns[..prefix_len];
153            let direction_sets = prefix
154                .iter()
155                .map(|column| column.directions.as_slice())
156                .collect::<Vec<_>>();
157            for directions in cartesian_product(&direction_sets) {
158                let column_names = prefix
159                    .iter()
160                    .map(|column| column.name.clone())
161                    .collect::<Vec<_>>();
162                let variant_name =
163                    build_covering_name(name_prefix.as_deref(), &column_names, &directions);
164                definitions.push(IndexDefinition {
165                    columns: column_names,
166                    directions,
167                    name: Some(variant_name),
168                });
169            }
170        }
171
172        Ok(definitions)
173    }
174}
175
/// One column of a covering specification together with every sort
/// direction requested for it.
#[derive(Debug, Clone)]
struct CoveringColumn {
    /// Trimmed column name.
    name: String,
    /// Directions to generate variants for; `parse_covering_column` falls
    /// back to a single ascending entry when none are given.
    directions: Vec<SortDirection>,
}
181
182fn parse_covering_column(token: &str) -> Result<CoveringColumn> {
183    let mut parts = token.split(':');
184    let name = parts
185        .next()
186        .map(|s| s.trim())
187        .filter(|s| !s.is_empty())
188        .ok_or_else(|| anyhow!("Covering specification is missing a column name"))?;
189    let directions = if let Some(dir_part) = parts.next() {
190        let options = dir_part
191            .split('|')
192            .map(|raw| raw.trim().to_ascii_lowercase())
193            .filter(|s| !s.is_empty())
194            .map(|value| match value.as_str() {
195                "asc" => Ok(SortDirection::Asc),
196                "desc" => Ok(SortDirection::Desc),
197                other => Err(anyhow!("Unknown sort direction '{other}'")),
198            })
199            .collect::<Result<Vec<_>>>()?;
200        if options.is_empty() {
201            vec![SortDirection::Asc]
202        } else {
203            options
204        }
205    } else {
206        vec![SortDirection::Asc]
207    };
208
209    Ok(CoveringColumn {
210        name: name.to_string(),
211        directions,
212    })
213}
214
215fn cartesian_product(options: &[&[SortDirection]]) -> Vec<Vec<SortDirection>> {
216    let mut acc = vec![Vec::new()];
217    for set in options {
218        let mut next = Vec::new();
219        for combination in &acc {
220            for direction in *set {
221                let mut updated = combination.clone();
222                updated.push(*direction);
223                next.push(updated);
224            }
225        }
226        acc = next;
227    }
228    acc
229}
230
231fn build_covering_name(
232    prefix: Option<&str>,
233    columns: &[String],
234    directions: &[SortDirection],
235) -> String {
236    let suffix = columns
237        .iter()
238        .zip(directions.iter())
239        .map(|(column, direction)| {
240            format!("{}-{}", sanitize_identifier(column), direction.as_str())
241        })
242        .collect::<Vec<_>>()
243        .join("_");
244    match prefix {
245        Some(p) => {
246            if suffix.is_empty() {
247                sanitize_identifier(p)
248            } else {
249                format!("{}_{}", sanitize_identifier(p), suffix)
250            }
251        }
252        None => suffix,
253    }
254}
255
/// Replaces every character other than an ASCII letter, ASCII digit or `_`
/// with `_`, leaving the character count unchanged.
fn sanitize_identifier(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
265
/// Serializable index over a CSV file, holding one or more sorted variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CsvIndex {
    /// Format version; set to `INDEX_VERSION` by `build` and checked by `load`.
    version: u32,
    /// Header names read from the CSV when the index was built.
    headers: Vec<String>,
    /// One entry per index definition supplied to `build`.
    variants: Vec<IndexVariant>,
    /// Number of records processed while building.
    row_count: usize,
}
273
274impl CsvIndex {
275    pub fn build(
276        csv_path: &Path,
277        definitions: &[IndexDefinition],
278        schema: Option<&Schema>,
279        limit: Option<usize>,
280        delimiter: u8,
281        encoding: &'static Encoding,
282    ) -> Result<Self> {
283        if definitions.is_empty() {
284            return Err(anyhow!(
285                "Specify at least one column set via --columns or --spec"
286            ));
287        }
288
289        let mut reader = io_utils::open_seekable_csv_reader(csv_path, delimiter, true)?;
290        let headers = io_utils::reader_headers(&mut reader, encoding)?;
291
292        let mut builders = definitions
293            .iter()
294            .map(|definition| IndexVariantBuilder::new(definition, &headers, schema, encoding))
295            .collect::<Result<Vec<_>>>()?;
296
297        let mut record = csv::ByteRecord::new();
298        let mut processed = 0usize;
299
300        loop {
301            if limit.is_some_and(|limit| processed >= limit) {
302                break;
303            }
304            let start_offset = reader.position().byte();
305            if !reader.read_byte_record(&mut record)? {
306                break;
307            }
308            for builder in builders.iter_mut() {
309                builder.add_record(&record, start_offset)?;
310            }
311            processed += 1;
312        }
313
314        let variants = builders
315            .into_iter()
316            .map(IndexVariantBuilder::finish)
317            .collect::<Vec<_>>();
318
319        Ok(CsvIndex {
320            version: INDEX_VERSION,
321            headers,
322            row_count: processed,
323            variants,
324        })
325    }
326
327    pub fn save(&self, path: &Path) -> Result<()> {
328        let file = File::create(path).with_context(|| format!("Creating index file {path:?}"))?;
329        let mut writer = BufWriter::new(file);
330        bincode::serde::encode_into_std_write(self, &mut writer, bincode::config::legacy())
331            .context("Writing index file")?;
332        Ok(())
333    }
334
335    pub fn load(path: &Path) -> Result<Self> {
336        let bytes = std::fs::read(path).with_context(|| format!("Opening index file {path:?}"))?;
337        let config = bincode::config::legacy();
338        match bincode::serde::decode_from_slice::<CsvIndex, _>(&bytes, config) {
339            Ok((index, _)) => {
340                if index.version != INDEX_VERSION {
341                    return Err(anyhow!(
342                        "Unsupported index version {} (expected {INDEX_VERSION})",
343                        index.version
344                    ));
345                }
346                Ok(index)
347            }
348            Err(err) => {
349                let (legacy, _) =
350                    bincode::serde::decode_from_slice::<LegacyCsvIndex, _>(&bytes, config)
351                        .with_context(|| {
352                            format!("Reading legacy index file format after decode error: {err}")
353                        })?;
354                Ok(legacy.into())
355            }
356        }
357    }
358
359    pub fn variants(&self) -> &[IndexVariant] {
360        &self.variants
361    }
362
363    pub fn row_count(&self) -> usize {
364        self.row_count
365    }
366
367    pub fn variant_by_name(&self, name: &str) -> Option<&IndexVariant> {
368        self.variants
369            .iter()
370            .find(|variant| variant.name.as_deref() == Some(name))
371    }
372
373    pub fn best_match(&self, directives: &[(String, SortDirection)]) -> Option<&IndexVariant> {
374        let mut best: Option<&IndexVariant> = None;
375        for variant in &self.variants {
376            if variant.matches(directives) {
377                let replace = match best {
378                    None => true,
379                    Some(current) => variant.columns.len() > current.columns.len(),
380                };
381                if replace {
382                    best = Some(variant);
383                }
384            }
385        }
386        best
387    }
388}
389
/// One sorted view of the CSV: a composite key over `columns`, mapped to
/// the offsets of the records carrying that key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexVariant {
    /// Column names forming the key, in key order.
    columns: Vec<String>,
    /// Sort direction of each key column.
    directions: Vec<SortDirection>,
    /// Data type used to parse each key column.
    column_types: Vec<ColumnType>,
    /// Composite key -> offsets of matching records, in the order they were added.
    map: BTreeMap<Vec<DirectionalComparableValue>, Vec<u64>>,
    /// Optional variant name.
    #[serde(default)]
    name: Option<String>,
}
399
400impl IndexVariant {
401    pub fn columns(&self) -> &[String] {
402        &self.columns
403    }
404
405    pub fn directions(&self) -> &[SortDirection] {
406        &self.directions
407    }
408
409    pub fn name(&self) -> Option<&str> {
410        self.name.as_deref()
411    }
412
413    pub fn column_types(&self) -> &[ColumnType] {
414        &self.column_types
415    }
416
417    pub fn ordered_offsets(&self) -> impl Iterator<Item = u64> + '_ {
418        self.map
419            .values()
420            .flat_map(|offsets| offsets.iter().copied())
421    }
422
423    pub fn matches(&self, directives: &[(String, SortDirection)]) -> bool {
424        if directives.len() < self.columns.len() {
425            return false;
426        }
427        self.columns
428            .iter()
429            .zip(self.directions.iter())
430            .zip(directives.iter())
431            .all(
432                |((column, direction), (requested_column, requested_direction))| {
433                    column == requested_column && direction == requested_direction
434                },
435            )
436    }
437
438    pub fn describe(&self) -> String {
439        let body = self
440            .columns
441            .iter()
442            .zip(self.directions.iter())
443            .map(|(column, direction)| format!("{column}:{direction}"))
444            .collect::<Vec<_>>()
445            .join(", ");
446        match &self.name {
447            Some(name) => format!("{name} -> {body}"),
448            None => body,
449        }
450    }
451}
452
/// A key component paired with the direction it should sort in, so that an
/// ordered map over these values iterates in the requested order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct DirectionalComparableValue {
    /// The parsed cell value.
    value: ComparableValue,
    /// Direction that decides how `value` compares (see the `Ord` impl).
    direction: SortDirection,
}
458
459impl DirectionalComparableValue {
460    fn new(value: ComparableValue, direction: SortDirection) -> Self {
461        Self { value, direction }
462    }
463}
464
465impl std::cmp::Ord for DirectionalComparableValue {
466    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
467        debug_assert_eq!(self.direction, other.direction);
468        match self.direction {
469            SortDirection::Asc => self.value.cmp(&other.value),
470            SortDirection::Desc => other.value.cmp(&self.value),
471        }
472    }
473}
474
475impl PartialOrd for DirectionalComparableValue {
476    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
477        Some(self.cmp(other))
478    }
479}
480
/// Accumulates the key -> offsets map for one index variant while the CSV
/// is being scanned.
struct IndexVariantBuilder {
    /// Key column names, copied from the definition.
    columns: Vec<String>,
    /// Sort direction per key column, copied from the definition.
    directions: Vec<SortDirection>,
    /// Position of each key column within the CSV headers.
    column_indices: Vec<usize>,
    /// Type used to parse each key column (`String` when the schema has no entry).
    column_types: Vec<ColumnType>,
    /// Schema metadata per key column, when the schema describes it.
    column_meta: Vec<Option<ColumnMeta>>,
    /// Composite key -> offsets collected so far.
    map: BTreeMap<Vec<DirectionalComparableValue>, Vec<u64>>,
    /// Encoding used to decode raw cell bytes.
    encoding: &'static Encoding,
    /// Optional variant name, copied from the definition.
    name: Option<String>,
}
491
impl IndexVariantBuilder {
    /// Prepares a builder for `definition` against the CSV `headers`.
    ///
    /// Column positions are resolved by exact header name. Column types come
    /// from the schema entry with the same name, defaulting to
    /// `ColumnType::String` when there is no schema or no matching entry.
    ///
    /// # Errors
    /// Fails when the definition's column and direction counts differ or
    /// when a column is not present in `headers`.
    fn new(
        definition: &IndexDefinition,
        headers: &[String],
        schema: Option<&Schema>,
        encoding: &'static Encoding,
    ) -> Result<Self> {
        if definition.columns.len() != definition.directions.len() {
            return Err(anyhow!(
                "Column count and direction count mismatch for index specification"
            ));
        }
        let column_indices = lookup_indices(headers, &definition.columns)?;
        // One optional metadata entry per key column, matched by name.
        let column_meta = definition
            .columns
            .iter()
            .map(|name| {
                schema
                    .and_then(|s| s.columns.iter().find(|c| c.name == *name))
                    .cloned()
            })
            .collect::<Vec<_>>();
        // Columns without metadata are indexed as plain strings.
        let column_types = column_meta
            .iter()
            .map(|meta| {
                meta.as_ref()
                    .map(|c| c.datatype.clone())
                    .unwrap_or(ColumnType::String)
            })
            .collect();
        Ok(IndexVariantBuilder {
            columns: definition.columns.clone(),
            directions: definition.directions.clone(),
            column_indices,
            column_types,
            column_meta,
            map: BTreeMap::new(),
            encoding,
            name: definition.name.clone(),
        })
    }

    /// Extracts this variant's key from `record` and files `offset` under it.
    ///
    /// For each key column the cell is decoded, run through the column's
    /// mappings (when the metadata has any) and its normalization, then
    /// parsed as the column's type. A record that has no field at the
    /// column's position contributes `ComparableValue(None)`.
    ///
    /// # Errors
    /// Fails when a cell cannot be decoded, a mapping fails, or the final
    /// text cannot be parsed as the column's type.
    fn add_record(&mut self, record: &csv::ByteRecord, offset: u64) -> Result<()> {
        let mut key_components = Vec::with_capacity(self.column_indices.len());
        for (idx, column_index) in self.column_indices.iter().enumerate() {
            let raw = record
                .get(*column_index)
                .map(|slice| io_utils::decode_bytes(slice, self.encoding))
                .transpose()?;
            let comparable = match raw {
                Some(value) => {
                    let ty = &self.column_types[idx];
                    let final_value = if let Some(meta) =
                        self.column_meta.get(idx).and_then(|meta| meta.as_ref())
                    {
                        let mut current: Cow<'_, str> = Cow::Borrowed(value.as_str());
                        if meta.has_mappings() {
                            // A mapping that yields no value is indexed as the empty string.
                            current = match meta.apply_mappings_to_value(current.as_ref())? {
                                Some(mapped) => Cow::Owned(mapped),
                                None => Cow::Owned(String::new()),
                            };
                        }
                        // Keep the existing text when normalization returns a
                        // borrowed value; take the replacement when it returns an owned one.
                        current = match meta.normalize_value(current.as_ref()) {
                            Cow::Borrowed(_) => current,
                            Cow::Owned(replaced) => Cow::Owned(replaced),
                        };
                        current
                    } else {
                        Cow::Borrowed(value.as_str())
                    };
                    let parsed = parse_typed_value(final_value.as_ref(), ty)?;
                    ComparableValue(parsed)
                }
                // The record has no field at this column's position.
                None => ComparableValue(None),
            };
            key_components.push(DirectionalComparableValue::new(
                comparable,
                self.directions[idx],
            ));
        }
        // Records sharing a key accumulate their offsets in insertion order.
        self.map.entry(key_components).or_default().push(offset);
        Ok(())
    }

    /// Consumes the builder and returns the finished variant; the encoding,
    /// column positions and metadata are dropped.
    fn finish(self) -> IndexVariant {
        IndexVariant {
            columns: self.columns,
            directions: self.directions,
            column_types: self.column_types,
            map: self.map,
            name: self.name,
        }
    }
}
586
587fn lookup_indices(headers: &[String], columns: &[String]) -> Result<Vec<usize>> {
588    columns
589        .iter()
590        .map(|column| {
591            headers
592                .iter()
593                .position(|header| header == column)
594                .ok_or_else(|| anyhow!("Column '{column}' not found in CSV headers"))
595        })
596        .collect()
597}
598
/// Older single-variant index layout. `CsvIndex::load` decodes into this
/// type when the bytes do not decode as the current `CsvIndex` layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyCsvIndex {
    /// Version stored in the legacy file; not consulted during conversion.
    version: u32,
    /// Key column names.
    columns: Vec<String>,
    /// Data type of each key column.
    column_types: Vec<ColumnType>,
    /// CSV header names.
    headers: Vec<String>,
    /// Composite key -> record offsets, without per-component directions.
    map: BTreeMap<Vec<ComparableValue>, Vec<u64>>,
}
607
608impl From<LegacyCsvIndex> for CsvIndex {
609    fn from(legacy: LegacyCsvIndex) -> Self {
610        let directions = vec![SortDirection::Asc; legacy.columns.len()];
611        let map = legacy
612            .map
613            .into_iter()
614            .map(|(key, offsets)| {
615                let directional_key = key
616                    .into_iter()
617                    .map(|value| DirectionalComparableValue::new(value, SortDirection::Asc))
618                    .collect::<Vec<_>>();
619                (directional_key, offsets)
620            })
621            .collect::<BTreeMap<_, _>>();
622        let row_count = map.values().map(|offsets| offsets.len()).sum();
623        CsvIndex {
624            version: INDEX_VERSION,
625            headers: legacy.headers,
626            variants: vec![IndexVariant {
627                columns: legacy.columns,
628                directions,
629                column_types: legacy.column_types,
630                map,
631                name: None,
632            }],
633            row_count,
634        }
635    }
636}
637
// Unit tests for specification parsing, covering-spec expansion, and
// building / saving / loading / matching indexes against temporary CSV files.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ColumnMeta, ColumnType, DecimalSpec, Schema};
    use encoding_rs::UTF_8;
    use std::fs;
    use tempfile::tempdir;

    // A column without an explicit direction defaults to ascending.
    #[test]
    fn parse_index_spec_supports_mixed_directions() {
        let spec = IndexDefinition::parse("col1:desc,col2:asc,col3").unwrap();
        assert_eq!(spec.columns, vec!["col1", "col2", "col3"]);
        assert_eq!(
            spec.directions,
            vec![SortDirection::Desc, SortDirection::Asc, SortDirection::Asc]
        );
        assert!(spec.name.is_none());
    }

    // The text before '=' becomes the variant name.
    #[test]
    fn parse_index_spec_supports_named_variants() {
        let spec = IndexDefinition::parse("top=col1:desc,col2").unwrap();
        assert_eq!(spec.name.as_deref(), Some("top"));
        assert_eq!(spec.columns, vec!["col1", "col2"]);
        assert_eq!(
            spec.directions,
            vec![SortDirection::Desc, SortDirection::Asc]
        );
    }

    // An empty token between commas is an error for `parse`.
    #[test]
    fn parse_index_spec_requires_column_name() {
        let err =
            IndexDefinition::parse("col1,,col2").expect_err("spec with missing column should fail");
        assert!(err.to_string().contains("missing a column name"));
    }

    // Direction keywords other than asc/desc are rejected.
    #[test]
    fn parse_index_spec_rejects_unknown_direction() {
        let err =
            IndexDefinition::parse("col1:sideways").expect_err("unknown direction should fail");
        assert!(err.to_string().contains("Unknown sort direction"));
    }

    // Whitespace-only column names are dropped, leaving nothing to index.
    #[test]
    fn index_definition_from_columns_rejects_empty() {
        let err = IndexDefinition::from_columns(vec![" ".to_string()])
            .expect_err("empty column list should fail");
        assert!(err.to_string().contains("At least one column"));
    }

    // Two directions for col1 and one for col2 give 2 one-column variants
    // plus 2 two-column variants.
    #[test]
    fn expand_covering_spec_generates_prefix_variants() {
        let variants = IndexDefinition::expand_covering_spec("col1:asc|desc,col2:asc").unwrap();
        assert_eq!(variants.len(), 4);
        let coverings: Vec<(Vec<String>, Vec<SortDirection>, String)> = variants
            .into_iter()
            .map(|definition| {
                (
                    definition.columns,
                    definition.directions,
                    definition.name.unwrap(),
                )
            })
            .collect();
        assert!(coverings.iter().any(|(cols, dirs, _)| {
            cols == &vec!["col1".to_string()] && dirs == &vec![SortDirection::Asc]
        }));
        assert!(coverings.iter().any(|(cols, dirs, _)| {
            cols == &vec!["col1".to_string()] && dirs == &vec![SortDirection::Desc]
        }));
        assert!(coverings.iter().any(|(cols, dirs, name)| {
            cols == &vec!["col1".to_string(), "col2".to_string()]
                && dirs == &vec![SortDirection::Asc, SortDirection::Asc]
                && name.contains("col1-asc")
        }));
    }

    // Round-trips an index keyed on a decimal-typed column through save/load.
    #[test]
    fn save_and_load_index_with_decimal_column() {
        let temp = tempdir().expect("temp dir");
        let csv_path = temp.path().join("decimal.csv");
        fs::write(&csv_path, "id,amount\n1,42.50\n2,13.37\n").expect("write csv");

        let schema = Schema {
            columns: vec![
                ColumnMeta {
                    name: "id".to_string(),
                    datatype: ColumnType::Integer,
                    rename: None,
                    value_replacements: Vec::new(),
                    datatype_mappings: Vec::new(),
                },
                ColumnMeta {
                    name: "amount".to_string(),
                    datatype: ColumnType::Decimal(
                        DecimalSpec::new(4, 2).expect("valid decimal spec"),
                    ),
                    rename: None,
                    value_replacements: Vec::new(),
                    datatype_mappings: Vec::new(),
                },
            ],
            schema_version: None,
            has_headers: true,
        };

        let definition = IndexDefinition::from_columns(vec!["amount".to_string()]).unwrap();
        let index = CsvIndex::build(&csv_path, &[definition], Some(&schema), None, b',', UTF_8)
            .expect("build index");

        let index_path = temp.path().join("decimal.idx");
        index.save(&index_path).expect("save index");

        let loaded = CsvIndex::load(&index_path).expect("load index");
        assert_eq!(loaded.variants().len(), index.variants().len());
        assert_eq!(loaded.row_count(), index.row_count());
    }

    // Every generated name starts with the prefix, and every variant starts
    // with the first column of the specification.
    #[test]
    fn expand_covering_spec_honors_name_prefix() {
        let variants =
            IndexDefinition::expand_covering_spec("geo=country:asc|desc,region:asc|desc").unwrap();
        assert!(variants.len() >= 4);
        for definition in variants {
            let name = definition.name.unwrap();
            assert!(name.starts_with("geo_"));
            assert_eq!(definition.columns[0], "country");
        }
    }

    // Builds two variants over one file and checks that `best_match` selects
    // the right one for each set of sort directives.
    #[test]
    fn build_multiple_variants_and_match() {
        let dir = tempdir().unwrap();
        let csv_path = dir.path().join("data.csv");
        std::fs::write(&csv_path, "a,b,c\n1,x,alpha\n2,y,beta\n3,z,gamma\n").unwrap();

        let definitions = vec![
            IndexDefinition::from_columns(vec!["a".to_string()]).unwrap(),
            IndexDefinition::parse("descending=a:desc,b:asc").unwrap(),
        ];

        let index = CsvIndex::build(&csv_path, &definitions, None, None, b',', UTF_8).unwrap();

        assert_eq!(index.variants().len(), 2);

        let asc_match = index
            .best_match(&[("a".to_string(), SortDirection::Asc)])
            .unwrap();
        assert_eq!(
            asc_match
                .columns()
                .iter()
                .map(|s| s.as_str())
                .collect::<Vec<_>>(),
            vec!["a"]
        );

        let desc_match = index
            .best_match(&[
                ("a".to_string(), SortDirection::Desc),
                ("b".to_string(), SortDirection::Asc),
            ])
            .unwrap();
        assert_eq!(desc_match.name(), Some("descending"));
        assert_eq!(
            desc_match
                .columns()
                .iter()
                .map(|s| s.as_str())
                .collect::<Vec<_>>(),
            vec!["a", "b"]
        );

        let offsets: Vec<u64> = desc_match.ordered_offsets().collect();
        assert_eq!(offsets.len(), 3);
        // Ensure first offset corresponds to highest "a" value (3)
        assert!(offsets[0] > offsets[2]);
    }
}
817}