use polars::{io::RowIndex, prelude::*};
/// Adds a row index column to `df` when a [`RowIndex`] configuration is given.
///
/// * `df` - the frame to extend; it is consumed.
/// * `opt_row_index` - name and starting offset of the index column, or
///   `None` to leave the frame untouched.
///
/// With `Some(..)`, the column is added through the lazy API
/// (`with_row_index` followed by `collect`). With `None`, `df` is returned
/// as-is.
///
/// # Errors
///
/// Propagates any error produced while collecting the lazy plan.
pub fn add_row_index_column(
    df: DataFrame,
    opt_row_index: Option<RowIndex>,
) -> PolarsResult<DataFrame> {
    // Guard clause: nothing to add, hand the frame straight back.
    let Some(index_cfg) = opt_row_index else {
        tracing::trace!(
            "Row index addition not requested or config could not be resolved by caller."
        );
        return Ok(df);
    };

    tracing::debug!(
        "Adding row index column '{}' with offset {}.",
        index_cfg.name,
        index_cfg.offset
    );

    let plan = df
        .lazy()
        .with_row_index(index_cfg.name, Some(index_cfg.offset));
    plan.collect()
}
/// Unit tests for `add_row_index_column`, with the row-index configuration
/// resolved through `DataFilter::get_row_index`.
#[cfg(test)]
mod tests_add_row_index_column {
    use super::*;
    use crate::{DataFilter, MAX_ATTEMPTS};

    /// Builds a `DataFilter` with only the row-index settings overridden;
    /// every other field keeps its `Default` value.
    fn get_filter(enabled: bool, name: &str, offset: u32) -> DataFilter {
        DataFilter {
            add_row_index: enabled,
            index_column_name: name.to_string(),
            index_column_offset: offset,
            ..Default::default()
        }
    }

    /// Asserts that two frames are equal (via `equals_missing`) and, on
    /// failure, prints both frames with their schemas plus `context`.
    fn assert_df_equal(df_output: &DataFrame, df_expected: &DataFrame, context: &str) {
        assert!(
            df_output.equals_missing(df_expected),
            "\nAssertion Failed: {}\n\nOutput DF:\n{}\nSchema: {:?}\n\nExpected DF:\n{}\nSchema: {:?}\n",
            context,
            df_output,
            df_output.schema(),
            df_expected,
            df_expected.schema()
        );
    }

    /// With the feature disabled the frame must come back unchanged.
    #[test]
    fn test_add_index_col_when_disabled() -> PolarsResult<()> {
        let df_input = df! {
            "col_a" => &[1, 2, 3],
            "col_b" => &["x", "y", "z"]
        }?;
        let df_expected = df_input.clone();

        let filter = get_filter(false, "Any Name", 1);
        let row_index = filter.get_row_index(df_input.schema())?;
        println!("df_input: {df_input}");
        println!("row_index: {row_index:?}");

        let df_output = add_row_index_column(df_input, row_index)?;
        println!("df_output: {df_output}");

        assert_df_equal(&df_output, &df_expected, "add_index_col_when_disabled");
        Ok(())
    }

    /// Index column named "index" starting at 0, placed before the data.
    #[test]
    fn test_add_index_col_when_enabled_default_name_offset() -> PolarsResult<()> {
        let df_input = df! {"data" => &[10, 20]}?;
        let df_expected = df! {
            "index" => &[0u32, 1],
            "data" => &[10, 20]
        }?;

        let filter = get_filter(true, "index", 0);
        let row_index = filter.get_row_index(df_input.schema())?;
        println!("df_input: {df_input}");
        println!("row_index: {row_index:?}");

        let df_output = add_row_index_column(df_input, row_index)?;
        println!("df_output: {df_output}");

        assert_df_equal(
            &df_output,
            &df_expected,
            r#"Test case with default Polars behavior (name="index", offset=0)"#,
        );
        Ok(())
    }

    /// Custom column name and a non-zero starting offset.
    #[test]
    fn test_add_index_col_when_enabled_custom_name_offset() -> PolarsResult<()> {
        let df_input = df! {"Value" => &[1.1, 2.2, 3.3]}?;
        let custom_name = "row_num";
        let custom_offset = 1u32;
        let df_expected = df! {
            custom_name => &[custom_offset, custom_offset + 1, custom_offset + 2],
            "Value" => &[1.1, 2.2, 3.3]
        }?;

        let filter = get_filter(true, custom_name, custom_offset);
        let row_index = filter.get_row_index(df_input.schema())?;
        println!("df_input: {df_input}");
        println!("row_index: {row_index:?}");

        let df_output = add_row_index_column(df_input, row_index)?;
        println!("df_output: {df_output}");

        assert_df_equal(
            &df_output,
            &df_expected,
            r#"Test case with row_index {name="row_num", offset=1}"#,
        );
        Ok(())
    }

    /// The requested name already exists, so the resolved index column is
    /// expected to be "Row Number_1".
    #[test]
    fn test_add_index_col_name_conflict_base() -> PolarsResult<()> {
        let df_input = df! {
            "Row Number" => &[99, 100],
            "data" => &["a", "b"]
        }?;
        let expected_name = "Row Number_1";
        let df_expected = df! {
            expected_name => &[0u32, 1],
            "Row Number" => &[99, 100],
            "data" => &["a", "b"],
        }?;

        let filter = get_filter(true, "Row Number", 0);
        let row_index = filter.get_row_index(df_input.schema())?;
        println!("df_input: {df_input}");
        println!("row_index: {row_index:?}");

        let df_output = add_row_index_column(df_input, row_index)?;
        println!("df_output: {df_output}");

        assert_df_equal(
            &df_output,
            &df_expected,
            r#"Test case with row_index {name="Row Number", offset=0} and an existing "Row Number" column"#,
        );
        Ok(())
    }

    /// The requested name and its first two suffixed variants already exist,
    /// so the resolved index column is expected to be "CustomID_3".
    #[test]
    fn test_add_index_col_name_conflict_multiple_suffixes() -> PolarsResult<()> {
        let df_input = df! {
            "CustomID" => &[1],
            "CustomID_1" => &[2],
            "CustomID_2" => &[3]
        }?;
        let expected_name = "CustomID_3";
        let df_expected = df! {
            expected_name => &[10u32],
            "CustomID" => &[1],
            "CustomID_1" => &[2],
            "CustomID_2" => &[3]
        }?;

        let filter = get_filter(true, "CustomID", 10);
        let row_index = filter.get_row_index(df_input.schema())?;
        println!("df_input: {df_input}");
        println!("row_index: {row_index:?}");

        let df_output = add_row_index_column(df_input, row_index)?;
        println!("df_output: {df_output}");

        assert_df_equal(
            &df_output,
            &df_expected,
            r#"Test case with row_index {name="CustomID", offset=10} and existing "CustomID", "CustomID_1", "CustomID_2" columns"#,
        );
        Ok(())
    }

    /// When "BaseName" and every "BaseName_1" ..= "BaseName_{MAX_ATTEMPTS}"
    /// column exist, `get_row_index` must fail with a `ComputeError`.
    #[test]
    fn test_add_index_col_name_conflict_max_attempts_error() -> PolarsResult<()> {
        let mut df_input = df! {"BaseName" => &[1]}?;

        // Occupy every suffixed candidate name.
        for i in 0..MAX_ATTEMPTS {
            let suffix = i + 1;
            let col = Column::new(format!("BaseName_{suffix}").into(), &[i as i32]);
            df_input.with_column(col)?;
        }
        println!("1. df_input: {df_input}");

        // "BaseName" is already present from the initial `df!`; this call is
        // kept from the original test and re-supplies that column.
        let col = Column::new("BaseName".into(), &[9999]);
        df_input.with_column(col)?;
        println!("2. df_input: {df_input}");

        let filter = get_filter(true, "BaseName", 0);
        let Err(e) = filter.get_row_index(df_input.schema()) else {
            panic!("Expected error due to max attempts, but got Ok");
        };
        println!("error: '{e}'");

        assert!(matches!(e, PolarsError::ComputeError(_)));

        let message = e.to_string();
        assert!(message.contains("Failed to find a unique column name starting with"));
        assert!(message.contains("BaseName"));
        // NOTE(review): assumes MAX_ATTEMPTS == 1000 and that the error text
        // includes it; confirm against the constant's definition.
        assert!(message.contains("1000"));
        Ok(())
    }

    /// A frame without any columns gains just the (empty) index column.
    #[test]
    fn test_add_index_col_empty_dataframe() -> PolarsResult<()> {
        let df_input = DataFrame::empty();
        let df_expected = df! {"RowID" => Vec::<u32>::new()}?;

        let filter = get_filter(true, "RowID", 0);
        let row_index = filter.get_row_index(df_input.schema())?;

        let df_output = add_row_index_column(df_input, row_index)?;

        assert_eq!(df_output.schema(), df_expected.schema());
        assert_eq!(df_output.height(), 0);
        assert_eq!(df_output.width(), 1);
        assert!(df_output.column("RowID").is_ok());
        Ok(())
    }
}