1use std::{collections::BTreeMap, fs::File, io::BufWriter, path::Path};
2
3use anyhow::{Context, Result, anyhow};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 data::{ComparableValue, parse_typed_value},
8 io_utils,
9 schema::{ColumnMeta, ColumnType, Schema},
10};
11
12use encoding_rs::Encoding;
13
/// Format version stamped into every [`CsvIndex`] produced by this module.
///
/// `CsvIndex::load` rejects a decoded index whose stored version differs from this value.
const INDEX_VERSION: u32 = 2;
15
16#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
17pub enum SortDirection {
18 Asc,
19 Desc,
20}
21
22impl SortDirection {
23 pub fn is_ascending(self) -> bool {
24 matches!(self, SortDirection::Asc)
25 }
26
27 fn as_str(self) -> &'static str {
28 match self {
29 SortDirection::Asc => "asc",
30 SortDirection::Desc => "desc",
31 }
32 }
33}
34
35impl std::fmt::Display for SortDirection {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 write!(f, "{}", self.as_str())
38 }
39}
40
41#[derive(Debug, Clone)]
42pub struct IndexDefinition {
43 pub columns: Vec<String>,
44 pub directions: Vec<SortDirection>,
45 pub name: Option<String>,
46}
47
48impl IndexDefinition {
49 pub fn from_columns(columns: Vec<String>) -> Result<Self> {
50 let cleaned: Vec<String> = columns
51 .into_iter()
52 .map(|c| c.trim().to_string())
53 .filter(|c| !c.is_empty())
54 .collect();
55 if cleaned.is_empty() {
56 return Err(anyhow!("At least one column is required to build an index"));
57 }
58 Ok(IndexDefinition {
59 directions: vec![SortDirection::Asc; cleaned.len()],
60 columns: cleaned,
61 name: None,
62 })
63 }
64
65 pub fn parse(spec: &str) -> Result<Self> {
66 let (name, remainder) = if let Some((raw_name, rest)) = spec.split_once('=') {
67 let trimmed_name = raw_name.trim();
68 if trimmed_name.is_empty() {
69 return Err(anyhow!(
70 "Index specification is missing a variant name before '=': '{spec}'"
71 ));
72 }
73 let trimmed_rest = rest.trim();
74 if trimmed_rest.is_empty() {
75 return Err(anyhow!(
76 "Index specification '{spec}' is missing column definitions after '='"
77 ));
78 }
79 (Some(trimmed_name.to_string()), trimmed_rest)
80 } else {
81 (None, spec)
82 };
83
84 let mut columns = Vec::new();
85 let mut directions = Vec::new();
86 for token in remainder.split(',') {
87 let mut parts = token.split(':');
88 let column = parts
89 .next()
90 .map(|s| s.trim())
91 .filter(|s| !s.is_empty())
92 .ok_or_else(|| anyhow!("Index specification is missing a column name"))?;
93 let direction = parts
94 .next()
95 .map(|raw| raw.trim().to_ascii_lowercase())
96 .filter(|s| !s.is_empty())
97 .map(|value| match value.as_str() {
98 "asc" => Ok(SortDirection::Asc),
99 "desc" => Ok(SortDirection::Desc),
100 other => Err(anyhow!("Unknown sort direction '{other}'")),
101 })
102 .transpose()?;
103 columns.push(column.to_string());
104 directions.push(direction.unwrap_or(SortDirection::Asc));
105 }
106 if columns.is_empty() {
107 return Err(anyhow!(
108 "Index specification did not contain any columns: '{spec}'"
109 ));
110 }
111 Ok(IndexDefinition {
112 columns,
113 directions,
114 name,
115 })
116 }
117
118 pub fn expand_combo_spec(spec: &str) -> Result<Vec<Self>> {
119 let (name_prefix, remainder) = if let Some((raw_name, rest)) = spec.split_once('=') {
120 let trimmed_name = raw_name.trim();
121 if trimmed_name.is_empty() {
122 return Err(anyhow!(
123 "Combination specification is missing a name before '=': '{spec}'"
124 ));
125 }
126 let trimmed_rest = rest.trim();
127 if trimmed_rest.is_empty() {
128 return Err(anyhow!(
129 "Combination specification '{spec}' is missing column definitions after '='"
130 ));
131 }
132 (Some(trimmed_name.to_string()), trimmed_rest)
133 } else {
134 (None, spec.trim())
135 };
136
137 let columns = remainder
138 .split(',')
139 .map(|token| token.trim())
140 .filter(|token| !token.is_empty())
141 .map(parse_combo_column)
142 .collect::<Result<Vec<_>>>()?;
143
144 if columns.is_empty() {
145 return Err(anyhow!(
146 "Combination specification did not contain any columns: '{spec}'"
147 ));
148 }
149
150 let mut definitions = Vec::new();
151 for prefix_len in 1..=columns.len() {
152 let prefix = &columns[..prefix_len];
153 let direction_sets = prefix
154 .iter()
155 .map(|column| column.directions.as_slice())
156 .collect::<Vec<_>>();
157 for directions in cartesian_product(&direction_sets) {
158 let column_names = prefix
159 .iter()
160 .map(|column| column.name.clone())
161 .collect::<Vec<_>>();
162 let variant_name =
163 build_combo_name(name_prefix.as_deref(), &column_names, &directions);
164 definitions.push(IndexDefinition {
165 columns: column_names,
166 directions,
167 name: Some(variant_name),
168 });
169 }
170 }
171
172 Ok(definitions)
173 }
174}
175
176#[derive(Debug, Clone)]
177struct ComboColumn {
178 name: String,
179 directions: Vec<SortDirection>,
180}
181
182fn parse_combo_column(token: &str) -> Result<ComboColumn> {
183 let mut parts = token.split(':');
184 let name = parts
185 .next()
186 .map(|s| s.trim())
187 .filter(|s| !s.is_empty())
188 .ok_or_else(|| anyhow!("Combination specification is missing a column name"))?;
189 let directions = if let Some(dir_part) = parts.next() {
190 let options = dir_part
191 .split('|')
192 .map(|raw| raw.trim().to_ascii_lowercase())
193 .filter(|s| !s.is_empty())
194 .map(|value| match value.as_str() {
195 "asc" => Ok(SortDirection::Asc),
196 "desc" => Ok(SortDirection::Desc),
197 other => Err(anyhow!("Unknown sort direction '{other}'")),
198 })
199 .collect::<Result<Vec<_>>>()?;
200 if options.is_empty() {
201 vec![SortDirection::Asc]
202 } else {
203 options
204 }
205 } else {
206 vec![SortDirection::Asc]
207 };
208
209 Ok(ComboColumn {
210 name: name.to_string(),
211 directions,
212 })
213}
214
215fn cartesian_product(options: &[&[SortDirection]]) -> Vec<Vec<SortDirection>> {
216 let mut acc = vec![Vec::new()];
217 for set in options {
218 let mut next = Vec::new();
219 for combination in &acc {
220 for direction in *set {
221 let mut updated = combination.clone();
222 updated.push(*direction);
223 next.push(updated);
224 }
225 }
226 acc = next;
227 }
228 acc
229}
230
231fn build_combo_name(
232 prefix: Option<&str>,
233 columns: &[String],
234 directions: &[SortDirection],
235) -> String {
236 let suffix = columns
237 .iter()
238 .zip(directions.iter())
239 .map(|(column, direction)| {
240 format!("{}-{}", sanitize_identifier(column), direction.as_str())
241 })
242 .collect::<Vec<_>>()
243 .join("_");
244 match prefix {
245 Some(p) => {
246 if suffix.is_empty() {
247 sanitize_identifier(p)
248 } else {
249 format!("{}_{}", sanitize_identifier(p), suffix)
250 }
251 }
252 None => suffix,
253 }
254}
255
/// Replaces every character other than ASCII letters, ASCII digits and `_`
/// with `_`, leaving the number of characters unchanged.
fn sanitize_identifier(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_';
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
265
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct CsvIndex {
268 version: u32,
269 headers: Vec<String>,
270 variants: Vec<IndexVariant>,
271 row_count: usize,
272}
273
274impl CsvIndex {
275 pub fn build(
276 csv_path: &Path,
277 definitions: &[IndexDefinition],
278 schema: Option<&Schema>,
279 limit: Option<usize>,
280 delimiter: u8,
281 encoding: &'static Encoding,
282 ) -> Result<Self> {
283 if definitions.is_empty() {
284 return Err(anyhow!(
285 "Specify at least one column set via --columns or --spec"
286 ));
287 }
288
289 let mut reader = io_utils::open_seekable_csv_reader(csv_path, delimiter, true)?;
290 let headers = io_utils::reader_headers(&mut reader, encoding)?;
291
292 let mut builders = definitions
293 .iter()
294 .map(|definition| IndexVariantBuilder::new(definition, &headers, schema, encoding))
295 .collect::<Result<Vec<_>>>()?;
296
297 let mut record = csv::ByteRecord::new();
298 let mut processed = 0usize;
299
300 loop {
301 if limit.is_some_and(|limit| processed >= limit) {
302 break;
303 }
304 let start_offset = reader.position().byte();
305 if !reader.read_byte_record(&mut record)? {
306 break;
307 }
308 for builder in builders.iter_mut() {
309 builder.add_record(&record, start_offset)?;
310 }
311 processed += 1;
312 }
313
314 let variants = builders
315 .into_iter()
316 .map(IndexVariantBuilder::finish)
317 .collect::<Vec<_>>();
318
319 Ok(CsvIndex {
320 version: INDEX_VERSION,
321 headers,
322 row_count: processed,
323 variants,
324 })
325 }
326
327 pub fn save(&self, path: &Path) -> Result<()> {
328 let file = File::create(path).with_context(|| format!("Creating index file {path:?}"))?;
329 let writer = BufWriter::new(file);
330 bincode::serialize_into(writer, self).context("Writing index file")
331 }
332
333 pub fn load(path: &Path) -> Result<Self> {
334 let bytes = std::fs::read(path).with_context(|| format!("Opening index file {path:?}"))?;
335 match bincode::deserialize::<CsvIndex>(&bytes) {
336 Ok(index) => {
337 if index.version != INDEX_VERSION {
338 return Err(anyhow!(
339 "Unsupported index version {} (expected {INDEX_VERSION})",
340 index.version
341 ));
342 }
343 Ok(index)
344 }
345 Err(_) => {
346 let legacy: LegacyCsvIndex =
347 bincode::deserialize(&bytes).context("Reading legacy index file format")?;
348 Ok(legacy.into())
349 }
350 }
351 }
352
353 pub fn variants(&self) -> &[IndexVariant] {
354 &self.variants
355 }
356
357 pub fn row_count(&self) -> usize {
358 self.row_count
359 }
360
361 pub fn variant_by_name(&self, name: &str) -> Option<&IndexVariant> {
362 self.variants
363 .iter()
364 .find(|variant| variant.name.as_deref() == Some(name))
365 }
366
367 pub fn best_match(&self, directives: &[(String, SortDirection)]) -> Option<&IndexVariant> {
368 let mut best: Option<&IndexVariant> = None;
369 for variant in &self.variants {
370 if variant.matches(directives) {
371 let replace = match best {
372 None => true,
373 Some(current) => variant.columns.len() > current.columns.len(),
374 };
375 if replace {
376 best = Some(variant);
377 }
378 }
379 }
380 best
381 }
382}
383
384#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct IndexVariant {
386 columns: Vec<String>,
387 directions: Vec<SortDirection>,
388 column_types: Vec<ColumnType>,
389 map: BTreeMap<Vec<DirectionalComparableValue>, Vec<u64>>,
390 #[serde(default)]
391 name: Option<String>,
392}
393
394impl IndexVariant {
395 pub fn columns(&self) -> &[String] {
396 &self.columns
397 }
398
399 pub fn directions(&self) -> &[SortDirection] {
400 &self.directions
401 }
402
403 pub fn name(&self) -> Option<&str> {
404 self.name.as_deref()
405 }
406
407 pub fn column_types(&self) -> &[ColumnType] {
408 &self.column_types
409 }
410
411 pub fn ordered_offsets(&self) -> impl Iterator<Item = u64> + '_ {
412 self.map
413 .values()
414 .flat_map(|offsets| offsets.iter().copied())
415 }
416
417 pub fn matches(&self, directives: &[(String, SortDirection)]) -> bool {
418 if directives.len() < self.columns.len() {
419 return false;
420 }
421 self.columns
422 .iter()
423 .zip(self.directions.iter())
424 .zip(directives.iter())
425 .all(
426 |((column, direction), (requested_column, requested_direction))| {
427 column == requested_column && direction == requested_direction
428 },
429 )
430 }
431
432 pub fn describe(&self) -> String {
433 let body = self
434 .columns
435 .iter()
436 .zip(self.directions.iter())
437 .map(|(column, direction)| format!("{column}:{direction}"))
438 .collect::<Vec<_>>()
439 .join(", ");
440 match &self.name {
441 Some(name) => format!("{name} -> {body}"),
442 None => body,
443 }
444 }
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
448struct DirectionalComparableValue {
449 value: ComparableValue,
450 direction: SortDirection,
451}
452
453impl DirectionalComparableValue {
454 fn new(value: ComparableValue, direction: SortDirection) -> Self {
455 Self { value, direction }
456 }
457}
458
459impl std::cmp::Ord for DirectionalComparableValue {
460 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
461 debug_assert_eq!(self.direction, other.direction);
462 match self.direction {
463 SortDirection::Asc => self.value.cmp(&other.value),
464 SortDirection::Desc => other.value.cmp(&self.value),
465 }
466 }
467}
468
469impl PartialOrd for DirectionalComparableValue {
470 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
471 Some(self.cmp(other))
472 }
473}
474
475struct IndexVariantBuilder {
476 columns: Vec<String>,
477 directions: Vec<SortDirection>,
478 column_indices: Vec<usize>,
479 column_types: Vec<ColumnType>,
480 column_meta: Vec<Option<ColumnMeta>>,
481 map: BTreeMap<Vec<DirectionalComparableValue>, Vec<u64>>,
482 encoding: &'static Encoding,
483 name: Option<String>,
484}
485
486impl IndexVariantBuilder {
487 fn new(
488 definition: &IndexDefinition,
489 headers: &[String],
490 schema: Option<&Schema>,
491 encoding: &'static Encoding,
492 ) -> Result<Self> {
493 if definition.columns.len() != definition.directions.len() {
494 return Err(anyhow!(
495 "Column count and direction count mismatch for index specification"
496 ));
497 }
498 let column_indices = lookup_indices(headers, &definition.columns)?;
499 let column_meta = definition
500 .columns
501 .iter()
502 .map(|name| {
503 schema
504 .and_then(|s| s.columns.iter().find(|c| c.name == *name))
505 .cloned()
506 })
507 .collect::<Vec<_>>();
508 let column_types = column_meta
509 .iter()
510 .map(|meta| {
511 meta.as_ref()
512 .map(|c| c.datatype.clone())
513 .unwrap_or(ColumnType::String)
514 })
515 .collect();
516 Ok(IndexVariantBuilder {
517 columns: definition.columns.clone(),
518 directions: definition.directions.clone(),
519 column_indices,
520 column_types,
521 column_meta,
522 map: BTreeMap::new(),
523 encoding,
524 name: definition.name.clone(),
525 })
526 }
527
528 fn add_record(&mut self, record: &csv::ByteRecord, offset: u64) -> Result<()> {
529 let mut key_components = Vec::with_capacity(self.column_indices.len());
530 for (idx, column_index) in self.column_indices.iter().enumerate() {
531 let raw = record
532 .get(*column_index)
533 .map(|slice| io_utils::decode_bytes(slice, self.encoding))
534 .transpose()?;
535 let comparable = match raw {
536 Some(value) => {
537 let ty = &self.column_types[idx];
538 let normalized = self
539 .column_meta
540 .get(idx)
541 .and_then(|meta| meta.as_ref())
542 .map(|meta| meta.normalize_value(&value))
543 .unwrap_or_else(|| std::borrow::Cow::Borrowed(value.as_str()));
544 let parsed = parse_typed_value(normalized.as_ref(), ty)?;
545 ComparableValue(parsed)
546 }
547 None => ComparableValue(None),
548 };
549 key_components.push(DirectionalComparableValue::new(
550 comparable,
551 self.directions[idx],
552 ));
553 }
554 self.map.entry(key_components).or_default().push(offset);
555 Ok(())
556 }
557
558 fn finish(self) -> IndexVariant {
559 IndexVariant {
560 columns: self.columns,
561 directions: self.directions,
562 column_types: self.column_types,
563 map: self.map,
564 name: self.name,
565 }
566 }
567}
568
569fn lookup_indices(headers: &[String], columns: &[String]) -> Result<Vec<usize>> {
570 columns
571 .iter()
572 .map(|column| {
573 headers
574 .iter()
575 .position(|header| header == column)
576 .ok_or_else(|| anyhow!("Column '{column}' not found in CSV headers"))
577 })
578 .collect()
579}
580
581#[derive(Debug, Clone, Serialize, Deserialize)]
582struct LegacyCsvIndex {
583 version: u32,
584 columns: Vec<String>,
585 column_types: Vec<ColumnType>,
586 headers: Vec<String>,
587 map: BTreeMap<Vec<ComparableValue>, Vec<u64>>,
588}
589
590impl From<LegacyCsvIndex> for CsvIndex {
591 fn from(legacy: LegacyCsvIndex) -> Self {
592 let directions = vec![SortDirection::Asc; legacy.columns.len()];
593 let map = legacy
594 .map
595 .into_iter()
596 .map(|(key, offsets)| {
597 let directional_key = key
598 .into_iter()
599 .map(|value| DirectionalComparableValue::new(value, SortDirection::Asc))
600 .collect::<Vec<_>>();
601 (directional_key, offsets)
602 })
603 .collect::<BTreeMap<_, _>>();
604 let row_count = map.values().map(|offsets| offsets.len()).sum();
605 CsvIndex {
606 version: INDEX_VERSION,
607 headers: legacy.headers,
608 variants: vec![IndexVariant {
609 columns: legacy.columns,
610 directions,
611 column_types: legacy.column_types,
612 map,
613 name: None,
614 }],
615 row_count,
616 }
617 }
618}
619
#[cfg(test)]
mod tests {
    use super::*;
    use encoding_rs::UTF_8;
    use tempfile::tempdir;

    /// Collects a variant's column names as `&str` for compact comparisons.
    fn column_names(variant: &IndexVariant) -> Vec<&str> {
        variant.columns().iter().map(String::as_str).collect()
    }

    #[test]
    fn parse_index_spec_supports_mixed_directions() {
        let parsed = IndexDefinition::parse("col1:desc,col2:asc,col3").unwrap();
        assert_eq!(parsed.columns, ["col1", "col2", "col3"]);
        assert_eq!(
            parsed.directions,
            [SortDirection::Desc, SortDirection::Asc, SortDirection::Asc]
        );
        assert!(parsed.name.is_none());
    }

    #[test]
    fn parse_index_spec_supports_named_variants() {
        let parsed = IndexDefinition::parse("top=col1:desc,col2").unwrap();
        assert_eq!(parsed.name.as_deref(), Some("top"));
        assert_eq!(parsed.columns, ["col1", "col2"]);
        assert_eq!(
            parsed.directions,
            [SortDirection::Desc, SortDirection::Asc]
        );
    }

    #[test]
    fn expand_combo_spec_generates_prefix_variants() {
        let variants = IndexDefinition::expand_combo_spec("col1:asc|desc,col2:asc").unwrap();
        assert_eq!(variants.len(), 4);

        // Every generated definition must carry a name.
        let names: Vec<&str> = variants
            .iter()
            .map(|definition| definition.name.as_deref().unwrap())
            .collect();
        assert_eq!(names.len(), variants.len());

        let contains = |cols: &[&str], dirs: &[SortDirection]| {
            variants
                .iter()
                .any(|definition| definition.columns == cols && definition.directions == dirs)
        };
        assert!(contains(&["col1"], &[SortDirection::Asc]));
        assert!(contains(&["col1"], &[SortDirection::Desc]));

        assert!(variants.iter().any(|definition| {
            definition.columns == ["col1", "col2"]
                && definition.directions == [SortDirection::Asc, SortDirection::Asc]
                && definition.name.as_deref().unwrap().contains("col1-asc")
        }));
    }

    #[test]
    fn expand_combo_spec_honors_name_prefix() {
        let variants =
            IndexDefinition::expand_combo_spec("geo=country:asc|desc,region:asc|desc").unwrap();
        assert!(variants.len() >= 4);
        for definition in &variants {
            let name = definition.name.as_deref().unwrap();
            assert!(name.starts_with("geo_"));
            assert_eq!(definition.columns[0], "country");
        }
    }

    #[test]
    fn build_multiple_variants_and_match() {
        let dir = tempdir().unwrap();
        let csv_path = dir.path().join("data.csv");
        std::fs::write(&csv_path, "a,b,c\n1,x,alpha\n2,y,beta\n3,z,gamma\n").unwrap();

        let definitions = vec![
            IndexDefinition::from_columns(vec!["a".to_string()]).unwrap(),
            IndexDefinition::parse("descending=a:desc,b:asc").unwrap(),
        ];

        let index = CsvIndex::build(&csv_path, &definitions, None, None, b',', UTF_8).unwrap();
        assert_eq!(index.variants().len(), 2);

        // A single ascending directive resolves to the plain `a` variant.
        let ascending_request = [("a".to_string(), SortDirection::Asc)];
        let asc_match = index.best_match(&ascending_request).unwrap();
        assert_eq!(column_names(asc_match), vec!["a"]);

        // A two-column request resolves to the named descending variant.
        let descending_request = [
            ("a".to_string(), SortDirection::Desc),
            ("b".to_string(), SortDirection::Asc),
        ];
        let desc_match = index.best_match(&descending_request).unwrap();
        assert_eq!(desc_match.name(), Some("descending"));
        assert_eq!(column_names(desc_match), vec!["a", "b"]);

        // Descending on `a` puts the last row of the file first.
        let offsets: Vec<u64> = desc_match.ordered_offsets().collect();
        assert_eq!(offsets.len(), 3);
        assert!(offsets[0] > offsets[2]);
    }
}