csv_managed/
index.rs

1use std::{collections::BTreeMap, fs::File, io::BufWriter, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    data::{ComparableValue, parse_typed_value},
8    io_utils,
9    schema::{ColumnMeta, ColumnType, Schema},
10};
11
12use encoding_rs::Encoding;
13
/// Format version stamped into every [`CsvIndex`] created by this module.
///
/// `CsvIndex::build` and the legacy conversion store this value in the
/// `version` field, and `CsvIndex::load` rejects a decoded current-format
/// index whose stored version differs from it.
const INDEX_VERSION: u32 = 2;
15
16#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
17pub enum SortDirection {
18    Asc,
19    Desc,
20}
21
22impl SortDirection {
23    pub fn is_ascending(self) -> bool {
24        matches!(self, SortDirection::Asc)
25    }
26
27    fn as_str(self) -> &'static str {
28        match self {
29            SortDirection::Asc => "asc",
30            SortDirection::Desc => "desc",
31        }
32    }
33}
34
35impl std::fmt::Display for SortDirection {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        write!(f, "{}", self.as_str())
38    }
39}
40
41#[derive(Debug, Clone)]
42pub struct IndexDefinition {
43    pub columns: Vec<String>,
44    pub directions: Vec<SortDirection>,
45    pub name: Option<String>,
46}
47
48impl IndexDefinition {
49    pub fn from_columns(columns: Vec<String>) -> Result<Self> {
50        let cleaned: Vec<String> = columns
51            .into_iter()
52            .map(|c| c.trim().to_string())
53            .filter(|c| !c.is_empty())
54            .collect();
55        if cleaned.is_empty() {
56            return Err(anyhow!("At least one column is required to build an index"));
57        }
58        Ok(IndexDefinition {
59            directions: vec![SortDirection::Asc; cleaned.len()],
60            columns: cleaned,
61            name: None,
62        })
63    }
64
65    pub fn parse(spec: &str) -> Result<Self> {
66        let (name, remainder) = if let Some((raw_name, rest)) = spec.split_once('=') {
67            let trimmed_name = raw_name.trim();
68            if trimmed_name.is_empty() {
69                return Err(anyhow!(
70                    "Index specification is missing a variant name before '=': '{spec}'"
71                ));
72            }
73            let trimmed_rest = rest.trim();
74            if trimmed_rest.is_empty() {
75                return Err(anyhow!(
76                    "Index specification '{spec}' is missing column definitions after '='"
77                ));
78            }
79            (Some(trimmed_name.to_string()), trimmed_rest)
80        } else {
81            (None, spec)
82        };
83
84        let mut columns = Vec::new();
85        let mut directions = Vec::new();
86        for token in remainder.split(',') {
87            let mut parts = token.split(':');
88            let column = parts
89                .next()
90                .map(|s| s.trim())
91                .filter(|s| !s.is_empty())
92                .ok_or_else(|| anyhow!("Index specification is missing a column name"))?;
93            let direction = parts
94                .next()
95                .map(|raw| raw.trim().to_ascii_lowercase())
96                .filter(|s| !s.is_empty())
97                .map(|value| match value.as_str() {
98                    "asc" => Ok(SortDirection::Asc),
99                    "desc" => Ok(SortDirection::Desc),
100                    other => Err(anyhow!("Unknown sort direction '{other}'")),
101                })
102                .transpose()?;
103            columns.push(column.to_string());
104            directions.push(direction.unwrap_or(SortDirection::Asc));
105        }
106        if columns.is_empty() {
107            return Err(anyhow!(
108                "Index specification did not contain any columns: '{spec}'"
109            ));
110        }
111        Ok(IndexDefinition {
112            columns,
113            directions,
114            name,
115        })
116    }
117
118    pub fn expand_combo_spec(spec: &str) -> Result<Vec<Self>> {
119        let (name_prefix, remainder) = if let Some((raw_name, rest)) = spec.split_once('=') {
120            let trimmed_name = raw_name.trim();
121            if trimmed_name.is_empty() {
122                return Err(anyhow!(
123                    "Combination specification is missing a name before '=': '{spec}'"
124                ));
125            }
126            let trimmed_rest = rest.trim();
127            if trimmed_rest.is_empty() {
128                return Err(anyhow!(
129                    "Combination specification '{spec}' is missing column definitions after '='"
130                ));
131            }
132            (Some(trimmed_name.to_string()), trimmed_rest)
133        } else {
134            (None, spec.trim())
135        };
136
137        let columns = remainder
138            .split(',')
139            .map(|token| token.trim())
140            .filter(|token| !token.is_empty())
141            .map(parse_combo_column)
142            .collect::<Result<Vec<_>>>()?;
143
144        if columns.is_empty() {
145            return Err(anyhow!(
146                "Combination specification did not contain any columns: '{spec}'"
147            ));
148        }
149
150        let mut definitions = Vec::new();
151        for prefix_len in 1..=columns.len() {
152            let prefix = &columns[..prefix_len];
153            let direction_sets = prefix
154                .iter()
155                .map(|column| column.directions.as_slice())
156                .collect::<Vec<_>>();
157            for directions in cartesian_product(&direction_sets) {
158                let column_names = prefix
159                    .iter()
160                    .map(|column| column.name.clone())
161                    .collect::<Vec<_>>();
162                let variant_name =
163                    build_combo_name(name_prefix.as_deref(), &column_names, &directions);
164                definitions.push(IndexDefinition {
165                    columns: column_names,
166                    directions,
167                    name: Some(variant_name),
168                });
169            }
170        }
171
172        Ok(definitions)
173    }
174}
175
176#[derive(Debug, Clone)]
177struct ComboColumn {
178    name: String,
179    directions: Vec<SortDirection>,
180}
181
182fn parse_combo_column(token: &str) -> Result<ComboColumn> {
183    let mut parts = token.split(':');
184    let name = parts
185        .next()
186        .map(|s| s.trim())
187        .filter(|s| !s.is_empty())
188        .ok_or_else(|| anyhow!("Combination specification is missing a column name"))?;
189    let directions = if let Some(dir_part) = parts.next() {
190        let options = dir_part
191            .split('|')
192            .map(|raw| raw.trim().to_ascii_lowercase())
193            .filter(|s| !s.is_empty())
194            .map(|value| match value.as_str() {
195                "asc" => Ok(SortDirection::Asc),
196                "desc" => Ok(SortDirection::Desc),
197                other => Err(anyhow!("Unknown sort direction '{other}'")),
198            })
199            .collect::<Result<Vec<_>>>()?;
200        if options.is_empty() {
201            vec![SortDirection::Asc]
202        } else {
203            options
204        }
205    } else {
206        vec![SortDirection::Asc]
207    };
208
209    Ok(ComboColumn {
210        name: name.to_string(),
211        directions,
212    })
213}
214
215fn cartesian_product(options: &[&[SortDirection]]) -> Vec<Vec<SortDirection>> {
216    let mut acc = vec![Vec::new()];
217    for set in options {
218        let mut next = Vec::new();
219        for combination in &acc {
220            for direction in *set {
221                let mut updated = combination.clone();
222                updated.push(*direction);
223                next.push(updated);
224            }
225        }
226        acc = next;
227    }
228    acc
229}
230
231fn build_combo_name(
232    prefix: Option<&str>,
233    columns: &[String],
234    directions: &[SortDirection],
235) -> String {
236    let suffix = columns
237        .iter()
238        .zip(directions.iter())
239        .map(|(column, direction)| {
240            format!("{}-{}", sanitize_identifier(column), direction.as_str())
241        })
242        .collect::<Vec<_>>()
243        .join("_");
244    match prefix {
245        Some(p) => {
246            if suffix.is_empty() {
247                sanitize_identifier(p)
248            } else {
249                format!("{}_{}", sanitize_identifier(p), suffix)
250            }
251        }
252        None => suffix,
253    }
254}
255
/// Replaces every character other than an ASCII letter, ASCII digit or `_`
/// with `_`, leaving the number of characters unchanged.
fn sanitize_identifier(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
265
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct CsvIndex {
268    version: u32,
269    headers: Vec<String>,
270    variants: Vec<IndexVariant>,
271    row_count: usize,
272}
273
274impl CsvIndex {
275    pub fn build(
276        csv_path: &Path,
277        definitions: &[IndexDefinition],
278        schema: Option<&Schema>,
279        limit: Option<usize>,
280        delimiter: u8,
281        encoding: &'static Encoding,
282    ) -> Result<Self> {
283        if definitions.is_empty() {
284            return Err(anyhow!(
285                "Specify at least one column set via --columns or --spec"
286            ));
287        }
288
289        let mut reader = io_utils::open_seekable_csv_reader(csv_path, delimiter, true)?;
290        let headers = io_utils::reader_headers(&mut reader, encoding)?;
291
292        let mut builders = definitions
293            .iter()
294            .map(|definition| IndexVariantBuilder::new(definition, &headers, schema, encoding))
295            .collect::<Result<Vec<_>>>()?;
296
297        let mut record = csv::ByteRecord::new();
298        let mut processed = 0usize;
299
300        loop {
301            if limit.is_some_and(|limit| processed >= limit) {
302                break;
303            }
304            let start_offset = reader.position().byte();
305            if !reader.read_byte_record(&mut record)? {
306                break;
307            }
308            for builder in builders.iter_mut() {
309                builder.add_record(&record, start_offset)?;
310            }
311            processed += 1;
312        }
313
314        let variants = builders
315            .into_iter()
316            .map(IndexVariantBuilder::finish)
317            .collect::<Vec<_>>();
318
319        Ok(CsvIndex {
320            version: INDEX_VERSION,
321            headers,
322            row_count: processed,
323            variants,
324        })
325    }
326
327    pub fn save(&self, path: &Path) -> Result<()> {
328        let file = File::create(path).with_context(|| format!("Creating index file {path:?}"))?;
329        let writer = BufWriter::new(file);
330        bincode::serialize_into(writer, self).context("Writing index file")
331    }
332
333    pub fn load(path: &Path) -> Result<Self> {
334        let bytes = std::fs::read(path).with_context(|| format!("Opening index file {path:?}"))?;
335        match bincode::deserialize::<CsvIndex>(&bytes) {
336            Ok(index) => {
337                if index.version != INDEX_VERSION {
338                    return Err(anyhow!(
339                        "Unsupported index version {} (expected {INDEX_VERSION})",
340                        index.version
341                    ));
342                }
343                Ok(index)
344            }
345            Err(_) => {
346                let legacy: LegacyCsvIndex =
347                    bincode::deserialize(&bytes).context("Reading legacy index file format")?;
348                Ok(legacy.into())
349            }
350        }
351    }
352
353    pub fn variants(&self) -> &[IndexVariant] {
354        &self.variants
355    }
356
357    pub fn row_count(&self) -> usize {
358        self.row_count
359    }
360
361    pub fn variant_by_name(&self, name: &str) -> Option<&IndexVariant> {
362        self.variants
363            .iter()
364            .find(|variant| variant.name.as_deref() == Some(name))
365    }
366
367    pub fn best_match(&self, directives: &[(String, SortDirection)]) -> Option<&IndexVariant> {
368        let mut best: Option<&IndexVariant> = None;
369        for variant in &self.variants {
370            if variant.matches(directives) {
371                let replace = match best {
372                    None => true,
373                    Some(current) => variant.columns.len() > current.columns.len(),
374                };
375                if replace {
376                    best = Some(variant);
377                }
378            }
379        }
380        best
381    }
382}
383
384#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct IndexVariant {
386    columns: Vec<String>,
387    directions: Vec<SortDirection>,
388    column_types: Vec<ColumnType>,
389    map: BTreeMap<Vec<DirectionalComparableValue>, Vec<u64>>,
390    #[serde(default)]
391    name: Option<String>,
392}
393
394impl IndexVariant {
395    pub fn columns(&self) -> &[String] {
396        &self.columns
397    }
398
399    pub fn directions(&self) -> &[SortDirection] {
400        &self.directions
401    }
402
403    pub fn name(&self) -> Option<&str> {
404        self.name.as_deref()
405    }
406
407    pub fn column_types(&self) -> &[ColumnType] {
408        &self.column_types
409    }
410
411    pub fn ordered_offsets(&self) -> impl Iterator<Item = u64> + '_ {
412        self.map
413            .values()
414            .flat_map(|offsets| offsets.iter().copied())
415    }
416
417    pub fn matches(&self, directives: &[(String, SortDirection)]) -> bool {
418        if directives.len() < self.columns.len() {
419            return false;
420        }
421        self.columns
422            .iter()
423            .zip(self.directions.iter())
424            .zip(directives.iter())
425            .all(
426                |((column, direction), (requested_column, requested_direction))| {
427                    column == requested_column && direction == requested_direction
428                },
429            )
430    }
431
432    pub fn describe(&self) -> String {
433        let body = self
434            .columns
435            .iter()
436            .zip(self.directions.iter())
437            .map(|(column, direction)| format!("{column}:{direction}"))
438            .collect::<Vec<_>>()
439            .join(", ");
440        match &self.name {
441            Some(name) => format!("{name} -> {body}"),
442            None => body,
443        }
444    }
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
448struct DirectionalComparableValue {
449    value: ComparableValue,
450    direction: SortDirection,
451}
452
453impl DirectionalComparableValue {
454    fn new(value: ComparableValue, direction: SortDirection) -> Self {
455        Self { value, direction }
456    }
457}
458
459impl std::cmp::Ord for DirectionalComparableValue {
460    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
461        debug_assert_eq!(self.direction, other.direction);
462        match self.direction {
463            SortDirection::Asc => self.value.cmp(&other.value),
464            SortDirection::Desc => other.value.cmp(&self.value),
465        }
466    }
467}
468
469impl PartialOrd for DirectionalComparableValue {
470    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
471        Some(self.cmp(other))
472    }
473}
474
475struct IndexVariantBuilder {
476    columns: Vec<String>,
477    directions: Vec<SortDirection>,
478    column_indices: Vec<usize>,
479    column_types: Vec<ColumnType>,
480    column_meta: Vec<Option<ColumnMeta>>,
481    map: BTreeMap<Vec<DirectionalComparableValue>, Vec<u64>>,
482    encoding: &'static Encoding,
483    name: Option<String>,
484}
485
486impl IndexVariantBuilder {
487    fn new(
488        definition: &IndexDefinition,
489        headers: &[String],
490        schema: Option<&Schema>,
491        encoding: &'static Encoding,
492    ) -> Result<Self> {
493        if definition.columns.len() != definition.directions.len() {
494            return Err(anyhow!(
495                "Column count and direction count mismatch for index specification"
496            ));
497        }
498        let column_indices = lookup_indices(headers, &definition.columns)?;
499        let column_meta = definition
500            .columns
501            .iter()
502            .map(|name| {
503                schema
504                    .and_then(|s| s.columns.iter().find(|c| c.name == *name))
505                    .cloned()
506            })
507            .collect::<Vec<_>>();
508        let column_types = column_meta
509            .iter()
510            .map(|meta| {
511                meta.as_ref()
512                    .map(|c| c.datatype.clone())
513                    .unwrap_or(ColumnType::String)
514            })
515            .collect();
516        Ok(IndexVariantBuilder {
517            columns: definition.columns.clone(),
518            directions: definition.directions.clone(),
519            column_indices,
520            column_types,
521            column_meta,
522            map: BTreeMap::new(),
523            encoding,
524            name: definition.name.clone(),
525        })
526    }
527
528    fn add_record(&mut self, record: &csv::ByteRecord, offset: u64) -> Result<()> {
529        let mut key_components = Vec::with_capacity(self.column_indices.len());
530        for (idx, column_index) in self.column_indices.iter().enumerate() {
531            let raw = record
532                .get(*column_index)
533                .map(|slice| io_utils::decode_bytes(slice, self.encoding))
534                .transpose()?;
535            let comparable = match raw {
536                Some(value) => {
537                    let ty = &self.column_types[idx];
538                    let normalized = self
539                        .column_meta
540                        .get(idx)
541                        .and_then(|meta| meta.as_ref())
542                        .map(|meta| meta.normalize_value(&value))
543                        .unwrap_or_else(|| std::borrow::Cow::Borrowed(value.as_str()));
544                    let parsed = parse_typed_value(normalized.as_ref(), ty)?;
545                    ComparableValue(parsed)
546                }
547                None => ComparableValue(None),
548            };
549            key_components.push(DirectionalComparableValue::new(
550                comparable,
551                self.directions[idx],
552            ));
553        }
554        self.map.entry(key_components).or_default().push(offset);
555        Ok(())
556    }
557
558    fn finish(self) -> IndexVariant {
559        IndexVariant {
560            columns: self.columns,
561            directions: self.directions,
562            column_types: self.column_types,
563            map: self.map,
564            name: self.name,
565        }
566    }
567}
568
569fn lookup_indices(headers: &[String], columns: &[String]) -> Result<Vec<usize>> {
570    columns
571        .iter()
572        .map(|column| {
573            headers
574                .iter()
575                .position(|header| header == column)
576                .ok_or_else(|| anyhow!("Column '{column}' not found in CSV headers"))
577        })
578        .collect()
579}
580
581#[derive(Debug, Clone, Serialize, Deserialize)]
582struct LegacyCsvIndex {
583    version: u32,
584    columns: Vec<String>,
585    column_types: Vec<ColumnType>,
586    headers: Vec<String>,
587    map: BTreeMap<Vec<ComparableValue>, Vec<u64>>,
588}
589
590impl From<LegacyCsvIndex> for CsvIndex {
591    fn from(legacy: LegacyCsvIndex) -> Self {
592        let directions = vec![SortDirection::Asc; legacy.columns.len()];
593        let map = legacy
594            .map
595            .into_iter()
596            .map(|(key, offsets)| {
597                let directional_key = key
598                    .into_iter()
599                    .map(|value| DirectionalComparableValue::new(value, SortDirection::Asc))
600                    .collect::<Vec<_>>();
601                (directional_key, offsets)
602            })
603            .collect::<BTreeMap<_, _>>();
604        let row_count = map.values().map(|offsets| offsets.len()).sum();
605        CsvIndex {
606            version: INDEX_VERSION,
607            headers: legacy.headers,
608            variants: vec![IndexVariant {
609                columns: legacy.columns,
610                directions,
611                column_types: legacy.column_types,
612                map,
613                name: None,
614            }],
615            row_count,
616        }
617    }
618}
619
#[cfg(test)]
mod tests {
    use super::*;
    use encoding_rs::UTF_8;
    use tempfile::tempdir;

    use super::SortDirection::{Asc, Desc};

    /// Borrowed view of a variant's column names, for terse comparisons.
    fn column_names(variant: &IndexVariant) -> Vec<&str> {
        variant.columns().iter().map(String::as_str).collect()
    }

    #[test]
    fn parse_index_spec_supports_mixed_directions() {
        let parsed = IndexDefinition::parse("col1:desc,col2:asc,col3").unwrap();
        assert_eq!(parsed.columns, ["col1", "col2", "col3"]);
        assert_eq!(parsed.directions, [Desc, Asc, Asc]);
        assert!(parsed.name.is_none());
    }

    #[test]
    fn parse_index_spec_supports_named_variants() {
        let parsed = IndexDefinition::parse("top=col1:desc,col2").unwrap();
        assert_eq!(parsed.name.as_deref(), Some("top"));
        assert_eq!(parsed.columns, ["col1", "col2"]);
        assert_eq!(parsed.directions, [Desc, Asc]);
    }

    #[test]
    fn expand_combo_spec_generates_prefix_variants() {
        let variants = IndexDefinition::expand_combo_spec("col1:asc|desc,col2:asc").unwrap();
        assert_eq!(variants.len(), 4);
        // Every expanded definition must carry a generated name.
        assert!(variants.iter().all(|definition| definition.name.is_some()));

        let has_variant = |cols: &[&str], dirs: &[SortDirection], name_part: &str| {
            variants.iter().any(|definition| {
                definition.columns == cols
                    && definition.directions == dirs
                    && definition
                        .name
                        .as_deref()
                        .is_some_and(|name| name.contains(name_part))
            })
        };
        assert!(has_variant(&["col1"], &[Asc], ""));
        assert!(has_variant(&["col1"], &[Desc], ""));
        assert!(has_variant(&["col1", "col2"], &[Asc, Asc], "col1-asc"));
    }

    #[test]
    fn expand_combo_spec_honors_name_prefix() {
        let variants =
            IndexDefinition::expand_combo_spec("geo=country:asc|desc,region:asc|desc").unwrap();
        assert!(variants.len() >= 4);
        for definition in variants {
            let name = definition.name.unwrap();
            assert!(name.starts_with("geo_"));
            assert_eq!(definition.columns[0], "country");
        }
    }

    #[test]
    fn build_multiple_variants_and_match() {
        let dir = tempdir().unwrap();
        let csv_path = dir.path().join("data.csv");
        std::fs::write(&csv_path, "a,b,c\n1,x,alpha\n2,y,beta\n3,z,gamma\n").unwrap();

        let definitions = vec![
            IndexDefinition::from_columns(vec!["a".to_string()]).unwrap(),
            IndexDefinition::parse("descending=a:desc,b:asc").unwrap(),
        ];

        let index = CsvIndex::build(&csv_path, &definitions, None, None, b',', UTF_8).unwrap();
        assert_eq!(index.variants().len(), 2);

        let asc_match = index.best_match(&[("a".to_string(), Asc)]).unwrap();
        assert_eq!(column_names(asc_match), vec!["a"]);

        let desc_match = index
            .best_match(&[("a".to_string(), Desc), ("b".to_string(), Asc)])
            .unwrap();
        assert_eq!(desc_match.name(), Some("descending"));
        assert_eq!(column_names(desc_match), vec!["a", "b"]);

        let offsets: Vec<u64> = desc_match.ordered_offsets().collect();
        assert_eq!(offsets.len(), 3);
        // The first offset must belong to the row with the highest "a" value
        // (3), which is the last row of the file.
        assert!(offsets[0] > offsets[2]);
    }
}