use arrow_schema::SchemaRef;
use super::DynModeConfig;
use crate::extractor::{self, KeyExtractError};
impl DynModeConfig {
    /// Build a config keyed on the single column at index `key_col`.
    ///
    /// # Errors
    ///
    /// Returns [`KeyExtractError::ColumnOutOfBounds`] (carrying the requested index and
    /// the number of fields) when `key_col` is not a valid index into `schema.fields()`.
    /// Errors from `extractor::projection_for_field` and `Self::new` are propagated.
    pub fn from_key_col(schema: SchemaRef, key_col: usize) -> Result<Self, KeyExtractError> {
        let field_count = schema.fields().len();
        if key_col >= field_count {
            return Err(KeyExtractError::ColumnOutOfBounds(key_col, field_count));
        }
        let extractor = extractor::projection_for_field(schema.clone(), key_col)?;
        Self::new(schema, extractor)
    }

    /// Build a config keyed on the first field whose name equals `key_field`.
    ///
    /// # Errors
    ///
    /// Returns [`KeyExtractError::NoSuchField`] when no field carries that name;
    /// otherwise behaves like [`Self::from_key_col`].
    pub fn from_key_name(schema: SchemaRef, key_field: &str) -> Result<Self, KeyExtractError> {
        let Some(idx) = schema.fields().iter().position(|f| f.name() == key_field) else {
            return Err(KeyExtractError::NoSuchField {
                name: key_field.to_string(),
            });
        };
        Self::from_key_col(schema, idx)
    }

    /// Build a config by reading key information from the schema's metadata.
    ///
    /// Two conventions are recognised, checked in this order:
    ///
    /// 1. **Field-level** `tonbo.key` metadata. The (trimmed) value is either a `u32`
    ///    ordinal or one of `true`/`TRUE`/`True`/`yes`/`YES`/`Yes`; any other value is
    ///    ignored. A single marked field becomes the key. With several marked fields
    ///    every marker must be a numeric ordinal, and the key columns are ordered by
    ///    ascending ordinal (equal ordinals keep schema order, since the sort is stable).
    /// 2. **Schema-level** `tonbo.keys` metadata, consulted only when no field is marked.
    ///    The value is either a single field name or a bracketed, comma-separated list
    ///    such as `["a", "b"]`; surrounding whitespace and double quotes are stripped
    ///    from each name.
    ///
    /// # Errors
    ///
    /// Returns [`KeyExtractError::NoSuchField`] when
    /// * several fields are marked and at least one marker is not numeric,
    /// * `tonbo.keys` yields no names (reported as `tonbo.keys[]`),
    /// * a name listed in `tonbo.keys` does not match any field, or
    /// * neither convention is present (reported as `<tonbo.key|tonbo.keys>`).
    ///
    /// Errors from the extractor constructors and `Self::new` are propagated.
    pub fn from_metadata(schema: SchemaRef) -> Result<Self, KeyExtractError> {
        /// Accepted spellings of a bare (ordinal-less) key marker.
        fn is_truthy(s: &str) -> bool {
            matches!(s, "true" | "TRUE" | "True" | "yes" | "YES" | "Yes")
        }

        /// Split a `tonbo.keys` value into cleaned field names.
        ///
        /// Only a value wrapped in `[` … `]` is split on commas; anything else is
        /// treated as one name. Empty names are dropped in both forms so that a blank
        /// value is reported the same way as an empty list.
        fn parse_names_list(s: &str) -> Vec<String> {
            let trimmed = s.trim();
            let bracketed = trimmed
                .strip_prefix('[')
                .and_then(|rest| rest.strip_suffix(']'));
            let raw_names: Vec<&str> = match bracketed {
                Some(inner) => inner.split(',').collect(),
                None => vec![trimmed],
            };
            raw_names
                .into_iter()
                .map(|name| name.trim().trim_matches('"').to_string())
                .filter(|name| !name.is_empty())
                .collect()
        }

        // Pass 1: collect field-level markers as (optional ordinal, column index).
        let mut marks: Vec<(Option<u32>, usize)> = Vec::new();
        for (idx, field) in schema.fields().iter().enumerate() {
            let Some(raw) = field.metadata().get("tonbo.key") else {
                continue;
            };
            let raw = raw.trim();
            if let Ok(ordinal) = raw.parse::<u32>() {
                marks.push((Some(ordinal), idx));
            } else if is_truthy(raw) {
                marks.push((None, idx));
            }
        }

        // Exactly one marker: its ordinal (if any) is irrelevant.
        if let [(_, only)] = marks.as_slice() {
            return Self::from_key_col(schema, *only);
        }

        // Several markers: all of them need an ordinal to define the key order.
        if !marks.is_empty() {
            let Some(mut ordered) = marks
                .into_iter()
                .map(|(ordinal, idx)| ordinal.map(|o| (o, idx)))
                .collect::<Option<Vec<(u32, usize)>>>()
            else {
                return Err(KeyExtractError::NoSuchField {
                    name: "multiple tonbo.key markers require numeric ordinals".to_string(),
                });
            };
            // Stable sort: fields sharing an ordinal stay in schema order.
            ordered.sort_by_key(|&(ordinal, _)| ordinal);
            let indices: Vec<usize> = ordered.into_iter().map(|(_, idx)| idx).collect();
            let extractor = extractor::projection_for_columns(schema.clone(), indices)?;
            return Self::new(schema, extractor);
        }

        // Pass 2: no field markers, fall back to the schema-level name list.
        let Some(raw_names) = schema.metadata().get("tonbo.keys") else {
            return Err(KeyExtractError::NoSuchField {
                name: "<tonbo.key|tonbo.keys>".to_string(),
            });
        };
        let names = parse_names_list(raw_names);
        match names.as_slice() {
            [] => Err(KeyExtractError::NoSuchField {
                name: "tonbo.keys[]".to_string(),
            }),
            [only] => Self::from_key_name(schema, only),
            _ => {
                // Resolve every name to its column index, failing on the first miss.
                let indices = names
                    .iter()
                    .map(|name| {
                        schema
                            .fields()
                            .iter()
                            .position(|f| f.name() == name)
                            .ok_or_else(|| KeyExtractError::NoSuchField { name: name.clone() })
                    })
                    .collect::<Result<Vec<usize>, KeyExtractError>>()?;
                let extractor = extractor::projection_for_columns(schema.clone(), indices)?;
                Self::new(schema, extractor)
            }
        }
    }

    /// Build a config from `schema`; currently a thin alias for [`Self::from_metadata`].
    ///
    /// # Errors
    ///
    /// Same as [`Self::from_metadata`].
    pub fn from_schema(schema: SchemaRef) -> Result<Self, KeyExtractError> {
        Self::from_metadata(schema)
    }
}