dsq_core/ops/
basic.rs

1//! Basic data operations for dsq
2//!
3//! This module provides fundamental operations like selection, filtering,
4//! mapping, and basic transformations that form the building blocks of
5//! more complex data processing pipelines.
6
7use std::collections::HashMap;
8
9use polars::prelude::*;
10use polars_ops::prelude::UnpivotDF;
11#[cfg(feature = "rand")]
12use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
13
14use crate::error::{Error, Result};
15use crate::Value;
16
/// Sort options for specifying column and direction
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SortOptions {
    /// Column name to sort by
    pub column: String,
    /// Whether to sort in descending order
    pub descending: bool,
}

impl SortOptions {
    /// Create ascending sort options for a column.
    ///
    /// Accepts anything convertible into a `String` (for example `String` or `&str`).
    #[must_use]
    pub fn asc(column: impl Into<String>) -> Self {
        Self {
            column: column.into(),
            descending: false,
        }
    }

    /// Create descending sort options for a column.
    ///
    /// Accepts anything convertible into a `String` (for example `String` or `&str`).
    #[must_use]
    pub fn desc(column: impl Into<String>) -> Self {
        Self {
            column: column.into(),
            descending: true,
        }
    }
}
45
46/// Select specific columns from a `DataFrame`
47pub fn select(df: &DataFrame, columns: &[String]) -> Result<DataFrame> {
48    let selected = df
49        .select(columns)
50        .map_err(|e| Error::operation(format!("Failed to select columns: {e}")))?;
51    Ok(selected)
52}
53
54/// Select columns by index
55pub fn select_by_index(df: &DataFrame, indices: &[usize]) -> Result<DataFrame> {
56    let column_names: Vec<String> = indices
57        .iter()
58        .filter_map(|&idx| df.get_column_names().get(idx).map(|s| (*s).to_string()))
59        .collect();
60
61    if column_names.len() != indices.len() {
62        return Err(Error::operation("Some column indices are out of bounds"));
63    }
64
65    select(df, &column_names)
66}
67
68/// Filter rows based on a predicate
69pub fn filter(df: &DataFrame, mask: &Series) -> Result<DataFrame> {
70    if mask.dtype() != &DataType::Boolean {
71        return Err(Error::operation("Filter mask must be boolean"));
72    }
73
74    let boolean_mask = mask
75        .bool()
76        .map_err(|e| Error::operation(format!("Failed to convert mask to boolean: {e}")))?;
77
78    let filtered = df
79        .filter(boolean_mask)
80        .map_err(|e| Error::operation(format!("Failed to filter: {e}")))?;
81    Ok(filtered)
82}
83
84/// Get the first n rows from a `DataFrame`
85#[must_use]
86pub fn head_df(df: &DataFrame, n: usize) -> DataFrame {
87    df.head(Some(n))
88}
89
90/// Get the last n rows from a `DataFrame`
91#[must_use]
92pub fn tail_df(df: &DataFrame, n: usize) -> DataFrame {
93    df.tail(Some(n))
94}
95
96/// Get a slice of rows from a `DataFrame`
97#[must_use]
98pub fn slice_df(df: &DataFrame, offset: i64, length: usize) -> DataFrame {
99    df.slice(offset, length)
100}
101
102/// Sort `DataFrame` by columns
103pub fn sort(df: &DataFrame, by: &[String], descending: Vec<bool>) -> Result<DataFrame> {
104    let sorted = df
105        .sort(
106            by,
107            SortMultipleOptions::default().with_order_descending_multi(descending),
108        )
109        .map_err(|e| Error::operation(format!("Failed to sort: {e}")))?;
110    Ok(sorted)
111}
112
113/// Get unique rows from `DataFrame`
114pub fn unique_df(
115    df: &DataFrame,
116    subset: Option<&[String]>,
117    keep: UniqueKeepStrategy,
118) -> Result<DataFrame> {
119    let unique_df = df
120        .unique::<String, String>(subset, keep, None)
121        .map_err(|e| Error::operation(format!("Failed to get unique rows: {e}")))?;
122    Ok(unique_df)
123}
124
125/// Drop null values
126pub fn drop_nulls(df: &DataFrame, subset: Option<&[String]>) -> Result<DataFrame> {
127    let result = df
128        .drop_nulls(subset)
129        .map_err(|e| Error::operation(format!("Failed to drop nulls: {e}")))?;
130    Ok(result)
131}
132
133/// Fill null values with a constant
134pub fn fill_null(df: &DataFrame, _value: &Value) -> Result<DataFrame> {
135    let mut filled = df.clone();
136
137    for column_name in df.get_column_names() {
138        let column = df
139            .column(column_name)
140            .map_err(|e| Error::operation(format!("Failed to get column: {e}")))?;
141
142        if column.null_count() > 0 {
143            // For now, just use forward fill strategy instead of custom values
144            let filled_column = column
145                .fill_null(FillNullStrategy::Forward(None))
146                .map_err(|e| Error::operation(format!("Failed to fill nulls: {e}")))?;
147
148            filled = filled
149                .with_column(filled_column)
150                .map_err(|e| Error::operation(format!("Failed to update column: {e}")))?
151                .clone();
152        }
153    }
154
155    Ok(filled)
156}
157
158/// Rename columns
159#[allow(clippy::implicit_hasher)]
160pub fn rename(df: &DataFrame, mapping: &HashMap<String, String>) -> Result<DataFrame> {
161    let mut renamed = df.clone();
162
163    for (old_name, new_name) in mapping {
164        renamed = renamed
165            .rename(old_name.as_str(), new_name.as_str().into())
166            .map_err(|e| Error::operation(format!("Failed to rename column: {e}")))?
167            .clone();
168    }
169
170    Ok(renamed)
171}
172
173/// Add a new column with a constant value
174pub fn with_column(df: &DataFrame, name: &str, value: &Value) -> Result<DataFrame> {
175    let series = value_to_series(name, value, df.height())?;
176
177    let mut result = df.clone();
178    result
179        .with_column(series)
180        .map_err(|e| Error::operation(format!("Failed to add column: {e}")))?;
181
182    Ok(result)
183}
184
185/// Drop columns
186pub fn drop(df: &DataFrame, columns: &[String]) -> Result<DataFrame> {
187    let dropped = df.drop_many(columns);
188    Ok(dropped)
189}
190
191/// Cast column types
192pub fn cast(df: &DataFrame, column: &str, dtype: &DataType) -> Result<DataFrame> {
193    let casted_column = df
194        .column(column)
195        .map_err(|e| Error::operation(format!("Column not found: {e}")))?
196        .cast(dtype)
197        .map_err(|e| Error::operation(format!("Failed to cast column: {e}")))?;
198
199    let mut result = df.clone();
200    result
201        .with_column(casted_column)
202        .map_err(|e| Error::operation(format!("Failed to update column: {e}")))?;
203
204    Ok(result)
205}
206
207/// Apply a function to each element in a column
208pub fn map_column<F>(df: &DataFrame, column: &str, f: F) -> Result<DataFrame>
209where
210    F: Fn(&Value) -> Result<Value>,
211{
212    let col = df
213        .column(column)
214        .map_err(|e| Error::operation(format!("Column not found: {e}")))?;
215
216    let values: Vec<Value> = series_to_values(col.as_materialized_series())?;
217    let mapped_values: Result<Vec<Value>> = values.iter().map(f).collect();
218    let mapped_values = mapped_values?;
219
220    let mapped_series = values_to_series(column, &mapped_values)?;
221
222    let mut result = df.clone();
223    result
224        .with_column(mapped_series)
225        .map_err(|e| Error::operation(format!("Failed to update column: {e}")))?;
226
227    Ok(result)
228}
229
230/// Transpose `DataFrame`
231pub fn transpose(
232    df: &DataFrame,
233    _include_header: bool,
234    header_name: Option<&str>,
235) -> Result<DataFrame> {
236    // The Polars transpose API has changed, using a simpler version for now
237    let mut df_mut = df.clone();
238    let transposed = df_mut
239        .transpose(header_name, None)
240        .map_err(|e| Error::operation(format!("Failed to transpose: {e}")))?;
241    Ok(transposed)
242}
243
244/// Melt `DataFrame` from wide to long format
245pub fn melt(
246    df: &DataFrame,
247    id_vars: &[String],
248    value_vars: &[String],
249    _variable_name: Option<&str>,
250    _value_name: Option<&str>,
251) -> Result<DataFrame> {
252    let melted = if id_vars.is_empty() {
253        df.unpivot([] as [&str; 0], value_vars)
254            .map_err(|e| Error::operation(format!("Failed to melt: {e}")))?
255    } else {
256        df.unpivot(id_vars, value_vars)
257            .map_err(|e| Error::operation(format!("Failed to melt: {e}")))?
258    };
259
260    Ok(melted)
261}
262
263/// Pivot `DataFrame` from long to wide format (placeholder)
264pub fn pivot(
265    _df: &DataFrame,
266    _values: &[String],
267    _index: &[String],
268    _columns: &[String],
269    _aggregate_fn: Option<&str>,
270) -> Result<DataFrame> {
271    // For now, return an error as pivot requires more complex implementation
272    Err(Error::operation("Pivot functionality not yet implemented"))
273}
274
275/// Sample rows from `DataFrame`
276#[allow(unused_variables)]
277pub fn sample(
278    df: &DataFrame,
279    n: Option<usize>,
280    frac: Option<f64>,
281    _with_replacement: bool,
282    seed: Option<u64>,
283) -> Result<DataFrame> {
284    // For now, implement a simple sampling approach
285    if let Some(n) = n {
286        let total_rows = df.height();
287        let sample_size = n.min(total_rows);
288
289        #[cfg(feature = "rand")]
290        {
291            use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
292            let mut rng = if let Some(seed) = seed {
293                StdRng::seed_from_u64(seed)
294            } else {
295                StdRng::from_os_rng()
296            };
297
298            #[allow(clippy::cast_possible_truncation)]
299            let mut indices: Vec<u32> = (0..total_rows as u32).collect();
300            indices.shuffle(&mut rng);
301            indices.truncate(sample_size);
302
303            let idx_ca = polars::prelude::UInt32Chunked::new("idx".into(), indices);
304            let sampled = df
305                .take(&idx_ca)
306                .map_err(|e| Error::operation(format!("Failed to sample: {e}")))?;
307            Ok(sampled)
308        }
309        #[cfg(not(feature = "rand"))]
310        {
311            Err(Error::operation("Sampling requires rand feature"))
312        }
313    } else if let Some(frac_value) = frac {
314        #[cfg(feature = "rand")]
315        {
316            let total_rows = df.height();
317            #[allow(
318                clippy::cast_precision_loss,
319                clippy::cast_possible_truncation,
320                clippy::cast_sign_loss
321            )]
322            let sample_size = ((total_rows as f64) * frac_value).round() as usize;
323
324            let mut rng = if let Some(seed) = seed {
325                StdRng::seed_from_u64(seed)
326            } else {
327                StdRng::from_os_rng()
328            };
329
330            #[allow(clippy::cast_possible_truncation)]
331            let mut indices: Vec<u32> = (0..total_rows as u32).collect();
332            indices.shuffle(&mut rng);
333            indices.truncate(sample_size);
334
335            let idx_ca = polars::prelude::UInt32Chunked::new("idx".into(), indices);
336            let sampled = df
337                .take(&idx_ca)
338                .map_err(|e| Error::operation(format!("Failed to sample: {e}")))?;
339            Ok(sampled)
340        }
341        #[cfg(not(feature = "rand"))]
342        {
343            Err(Error::operation("Sampling requires rand feature"))
344        }
345    } else {
346        Err(Error::operation(
347            "Either n or frac must be specified for sampling",
348        ))
349    }
350}
351
352/// Explode list columns into separate rows
353pub fn explode(df: &DataFrame, columns: &[String]) -> Result<DataFrame> {
354    let exploded = df
355        .explode(columns)
356        .map_err(|e| Error::operation(format!("Failed to explode: {e}")))?;
357    Ok(exploded)
358}
359
360/// Select columns from a Value (works with `DataFrame`, Array, Object)
361pub fn select_columns(value: &Value, columns: &[String]) -> Result<Value> {
362    match value {
363        Value::DataFrame(df) => {
364            let selected = select(df, columns)?;
365            Ok(Value::DataFrame(selected))
366        }
367        Value::Array(arr) => {
368            // For arrays of objects, select specified fields
369            let selected_objects: Result<Vec<Value>> = arr
370                .iter()
371                .map(|v| match v {
372                    Value::Object(obj) => {
373                        let mut selected_obj = std::collections::HashMap::new();
374                        for column in columns {
375                            if let Some(val) = obj.get(column) {
376                                selected_obj.insert(column.clone(), val.clone());
377                            }
378                        }
379                        Ok(Value::Object(selected_obj))
380                    }
381                    _ => Ok(v.clone()),
382                })
383                .collect();
384            Ok(Value::Array(selected_objects?))
385        }
386        Value::Object(obj) => {
387            let mut selected_obj = std::collections::HashMap::new();
388            for column in columns {
389                if let Some(val) = obj.get(column) {
390                    selected_obj.insert(column.clone(), val.clone());
391                }
392            }
393            Ok(Value::Object(selected_obj))
394        }
395        _ => Err(Error::operation(
396            "Cannot select columns from this value type".to_string(),
397        )),
398    }
399}
400
401/// Filter rows based on a predicate function
402pub fn filter_rows(value: &Value, mask: &Value) -> Result<Value> {
403    match value {
404        Value::DataFrame(df) => {
405            if let Value::Array(mask_arr) = mask {
406                let bool_mask: Result<Vec<bool>> = mask_arr
407                    .iter()
408                    .map(|v| match v {
409                        Value::Bool(b) => Ok(*b),
410                        _ => Err(Error::operation("Filter mask must be boolean")),
411                    })
412                    .collect();
413                let mask_series = Series::new("mask".into(), bool_mask?);
414                let filtered = filter(df, &mask_series)?;
415                Ok(Value::DataFrame(filtered))
416            } else {
417                Err(Error::operation("Filter mask must be array of booleans"))
418            }
419        }
420        Value::Array(arr) => {
421            if let Value::Array(mask_arr) = mask {
422                if mask_arr.len() != arr.len() {
423                    return Err(Error::operation(
424                        "Mask length must match array length".to_string(),
425                    ));
426                }
427                let filtered: Result<Vec<Value>> = arr
428                    .iter()
429                    .zip(mask_arr.iter())
430                    .filter_map(|(val, mask_val)| match mask_val {
431                        Value::Bool(true) => Some(Ok(val.clone())),
432                        Value::Bool(false) => None,
433                        _ => Some(Err(Error::operation("Filter mask must be boolean"))),
434                    })
435                    .collect();
436                Ok(Value::Array(filtered?))
437            } else {
438                Err(Error::operation("Filter mask must be array of booleans"))
439            }
440        }
441        _ => Err(Error::operation(
442            "Cannot filter this value type".to_string(),
443        )),
444    }
445}
446
447/// Filter values based on a predicate function
448pub fn filter_values<F>(value: &Value, predicate: F) -> Result<Value>
449where
450    F: Fn(&Value) -> Result<bool>,
451{
452    match value {
453        Value::Array(arr) => {
454            let filtered: Result<Vec<Value>> = arr
455                .iter()
456                .filter_map(|v| match predicate(v) {
457                    Ok(true) => Some(Ok(v.clone())),
458                    Ok(false) => None,
459                    Err(e) => Some(Err(e)),
460                })
461                .collect();
462            Ok(Value::Array(filtered?))
463        }
464        Value::DataFrame(df) => {
465            // For DataFrames, we need to convert each row to a Value and apply the predicate
466            let mut mask = Vec::new();
467            for i in 0..df.height() {
468                let row_value = df_row_to_value(df, i)?;
469                mask.push(predicate(&row_value)?);
470            }
471            let mask_series = Series::new("mask".into(), mask);
472            let filtered = filter(df, &mask_series)?;
473            Ok(Value::DataFrame(filtered))
474        }
475        _ => {
476            if predicate(value)? {
477                Ok(value.clone())
478            } else {
479                Ok(Value::Null)
480            }
481        }
482    }
483}
484
485/// Sort by columns with sort options
486pub fn sort_by_columns(value: &Value, options: &[SortOptions]) -> Result<Value> {
487    match value {
488        Value::DataFrame(df) => {
489            let columns: Vec<String> = options.iter().map(|opt| opt.column.clone()).collect();
490            let descending: Vec<bool> = options.iter().map(|opt| opt.descending).collect();
491            let sorted = sort(df, &columns, descending)?;
492            Ok(Value::DataFrame(sorted))
493        }
494        Value::Array(arr) => {
495            if options.is_empty() {
496                return Ok(value.clone());
497            }
498
499            let mut sorted_arr = arr.clone();
500            sorted_arr.sort_by(|a, b| {
501                for opt in options {
502                    let cmp = match (a, b) {
503                        (Value::Object(obj_a), Value::Object(obj_b)) => {
504                            let val_a = obj_a.get(&opt.column).unwrap_or(&Value::Null);
505                            let val_b = obj_b.get(&opt.column).unwrap_or(&Value::Null);
506                            compare_values(val_a, val_b)
507                        }
508                        _ => std::cmp::Ordering::Equal,
509                    };
510
511                    let final_cmp = if opt.descending { cmp.reverse() } else { cmp };
512                    if final_cmp != std::cmp::Ordering::Equal {
513                        return final_cmp;
514                    }
515                }
516                std::cmp::Ordering::Equal
517            });
518            Ok(Value::Array(sorted_arr))
519        }
520        _ => Err(Error::operation("Cannot sort this value type".to_string())),
521    }
522}
523
524/// Add a column to a Value
525pub fn add_column(value: &Value, name: &str, column_value: &Value) -> Result<Value> {
526    match value {
527        Value::DataFrame(df) => {
528            let new_df = with_column(df, name, column_value)?;
529            Ok(Value::DataFrame(new_df))
530        }
531        _ => Err(Error::operation(
532            "Cannot add column to this value type".to_string(),
533        )),
534    }
535}
536
537/// Drop columns from a Value
538pub fn drop_columns(value: &Value, columns: &[String]) -> Result<Value> {
539    match value {
540        Value::DataFrame(df) => {
541            let dropped = drop(df, columns)?;
542            Ok(Value::DataFrame(dropped))
543        }
544        _ => Err(Error::operation(
545            "Cannot drop columns from this value type".to_string(),
546        )),
547    }
548}
549
550/// Rename columns in a Value
551#[allow(clippy::implicit_hasher)]
552pub fn rename_columns(value: &Value, mapping: &HashMap<String, String>) -> Result<Value> {
553    match value {
554        Value::DataFrame(df) => {
555            let renamed = rename(df, mapping)?;
556            Ok(Value::DataFrame(renamed))
557        }
558        _ => Err(Error::operation(
559            "Cannot rename columns in this value type".to_string(),
560        )),
561    }
562}
563
564/// Head operation on Value
565pub fn head(value: &Value, n: usize) -> Result<Value> {
566    match value {
567        Value::DataFrame(df) => Ok(Value::DataFrame(df.head(Some(n)))),
568        Value::Array(arr) => {
569            let take = n.min(arr.len());
570            Ok(Value::Array(arr[..take].to_vec()))
571        }
572        _ => Ok(value.clone()),
573    }
574}
575
576/// Tail operation on Value
577pub fn tail(value: &Value, n: usize) -> Result<Value> {
578    match value {
579        Value::DataFrame(df) => Ok(Value::DataFrame(df.tail(Some(n)))),
580        Value::Array(arr) => {
581            let len = arr.len();
582            let start = len.saturating_sub(n);
583            Ok(Value::Array(arr[start..].to_vec()))
584        }
585        _ => Ok(value.clone()),
586    }
587}
588
589/// Slice operation on Value
590pub fn slice(value: &Value, offset: i64, length: usize) -> Result<Value> {
591    match value {
592        Value::DataFrame(df) => Ok(Value::DataFrame(df.slice(offset, length))),
593        Value::Array(arr) => {
594            #[allow(
595                clippy::cast_sign_loss,
596                clippy::cast_possible_truncation,
597                clippy::cast_possible_wrap
598            )]
599            let start = if offset < 0 {
600                (arr.len() as i64 + offset).max(0) as usize
601            } else {
602                (offset as usize).min(arr.len())
603            };
604            let end = (start + length).min(arr.len());
605            Ok(Value::Array(arr[start..end].to_vec()))
606        }
607        _ => Ok(value.clone()),
608    }
609}
610
611/// Reverse operation on Value
612pub fn reverse(value: &Value) -> Result<Value> {
613    match value {
614        Value::DataFrame(df) => {
615            #[allow(clippy::cast_possible_truncation)]
616            let indices: Vec<u32> = (0..df.height() as u32).rev().collect();
617            let idx_ca = polars::prelude::UInt32Chunked::new("idx".into(), indices);
618            let reversed = df
619                .take(&idx_ca)
620                .map_err(|e| Error::operation(format!("Failed to reverse DataFrame: {e}")))?;
621            Ok(Value::DataFrame(reversed))
622        }
623        Value::Array(arr) => {
624            let mut reversed = arr.clone();
625            reversed.reverse();
626            Ok(Value::Array(reversed))
627        }
628        _ => Ok(value.clone()),
629    }
630}
631
632/// Unique operation on Value
633pub fn unique(value: &Value) -> Result<Value> {
634    match value {
635        Value::DataFrame(df) => {
636            let unique_df = df
637                .unique::<String, String>(None, UniqueKeepStrategy::First, None)
638                .map_err(|e| Error::operation(format!("Failed to get unique values: {e}")))?;
639            Ok(Value::DataFrame(unique_df))
640        }
641        Value::Array(arr) => {
642            let mut unique_vals: Vec<Value> = Vec::new();
643            for val in arr {
644                if !unique_vals.contains(val) {
645                    unique_vals.push(val.clone());
646                }
647            }
648            Ok(Value::Array(unique_vals))
649        }
650        _ => Ok(value.clone()),
651    }
652}
653
654/// Count operation on Value
655#[allow(clippy::cast_possible_wrap)]
656pub fn count(value: &Value) -> Result<Value> {
657    let count = match value {
658        Value::DataFrame(df) => df.height() as i64,
659        Value::Array(arr) => arr.len() as i64,
660        Value::Object(obj) => obj.len() as i64,
661        Value::String(s) => s.len() as i64,
662        Value::Null => 0,
663        _ => 1,
664    };
665    Ok(Value::Int(count))
666}
667
668// Helper functions
669
670fn df_row_to_value(df: &DataFrame, row_idx: usize) -> Result<Value> {
671    let mut obj = std::collections::HashMap::new();
672
673    for col_name in df.get_column_names() {
674        let series = df
675            .column(col_name)
676            .map_err(|e| Error::operation(format!("Failed to get column: {e}")))?;
677        let value = series_value_at(series.as_materialized_series(), row_idx)?;
678        obj.insert(col_name.to_string(), value);
679    }
680
681    Ok(Value::Object(obj))
682}
683
684fn series_value_at(series: &Series, idx: usize) -> Result<Value> {
685    if idx >= series.len() {
686        return Ok(Value::Null);
687    }
688
689    match series.dtype() {
690        DataType::Boolean => {
691            let ca = series
692                .bool()
693                .map_err(|e| Error::operation(format!("Failed to get bool: {e}")))?;
694            Ok(ca.get(idx).map_or(Value::Null, Value::Bool))
695        }
696        DataType::Int8 => {
697            let ca = series
698                .i8()
699                .map_err(|e| Error::operation(format!("Failed to get int: {e}")))?;
700            Ok(ca
701                .get(idx)
702                .map_or(Value::Null, |x| Value::Int(i64::from(x))))
703        }
704        DataType::Int16 => {
705            let ca = series
706                .i16()
707                .map_err(|e| Error::operation(format!("Failed to get int: {e}")))?;
708            Ok(ca
709                .get(idx)
710                .map_or(Value::Null, |x| Value::Int(i64::from(x))))
711        }
712        DataType::Int32 => {
713            let ca = series
714                .i32()
715                .map_err(|e| Error::operation(format!("Failed to get int: {e}")))?;
716            Ok(ca
717                .get(idx)
718                .map_or(Value::Null, |x| Value::Int(i64::from(x))))
719        }
720        DataType::Int64 => {
721            let ca = series
722                .i64()
723                .map_err(|e| Error::operation(format!("Failed to get int: {e}")))?;
724            Ok(ca.get(idx).map_or(Value::Null, Value::Int))
725        }
726        DataType::Float32 | DataType::Float64 => {
727            let ca = series
728                .f64()
729                .map_err(|e| Error::operation(format!("Failed to get float: {e}")))?;
730            Ok(ca.get(idx).map_or(Value::Null, Value::Float))
731        }
732        DataType::String => {
733            let ca = series
734                .str()
735                .map_err(|e| Error::operation(format!("Failed to get string: {e}")))?;
736            Ok(ca
737                .get(idx)
738                .map_or(Value::Null, |s| Value::String(s.to_string())))
739        }
740        _ => Ok(Value::Null),
741    }
742}
743
744#[allow(clippy::cast_precision_loss)]
745fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
746    use std::cmp::Ordering;
747
748    match (a, b) {
749        (Value::Null, Value::Null) => Ordering::Equal,
750        (Value::Null, _) => Ordering::Less,
751        (_, Value::Null) => Ordering::Greater,
752        (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
753        (Value::Int(a), Value::Int(b)) => a.cmp(b),
754        (Value::Float(a), Value::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
755        (Value::Int(a), Value::Float(b)) => (*a as f64).partial_cmp(b).unwrap_or(Ordering::Equal),
756        (Value::Float(a), Value::Int(b)) => a.partial_cmp(&(*b as f64)).unwrap_or(Ordering::Equal),
757        (Value::String(a), Value::String(b)) => a.cmp(b),
758        _ => Ordering::Equal,
759    }
760}
761
762/// Helper function to convert Value to Series
763fn value_to_series(name: &str, value: &Value, length: usize) -> Result<Series> {
764    match value {
765        Value::Null => Ok(Series::new_null(name.into(), length)),
766        Value::Bool(b) => Ok(Series::new(name.into(), vec![*b; length])),
767        Value::Int(i) => Ok(Series::new(name.into(), vec![*i; length])),
768        Value::Float(f) => Ok(Series::new(name.into(), vec![*f; length])),
769        Value::String(s) => Ok(Series::new(name.into(), vec![s.as_str(); length])),
770        Value::Array(arr) => {
771            if arr.len() != length {
772                return Err(Error::operation("Array length must match DataFrame height"));
773            }
774            values_to_series(name, arr)
775        }
776        _ => Err(Error::operation("Cannot convert value to series")),
777    }
778}
779
780/// Helper function to convert Series to Values
781fn series_to_values(series: &Series) -> Result<Vec<Value>> {
782    let mut values = Vec::with_capacity(series.len());
783
784    match series.dtype() {
785        DataType::Boolean => {
786            let ca = series
787                .bool()
788                .map_err(|e| Error::operation(format!("Failed to get bool array: {e}")))?;
789            for opt_val in ca {
790                values.push(opt_val.map_or(Value::Null, Value::Bool));
791            }
792        }
793        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
794            let ca = series
795                .i64()
796                .map_err(|e| Error::operation(format!("Failed to get int array: {e}")))?;
797            for opt_val in ca {
798                values.push(opt_val.map_or(Value::Null, Value::Int));
799            }
800        }
801        DataType::Float32 | DataType::Float64 => {
802            let ca = series
803                .f64()
804                .map_err(|e| Error::operation(format!("Failed to get float array: {e}")))?;
805            for opt_val in ca {
806                values.push(opt_val.map_or(Value::Null, Value::Float));
807            }
808        }
809        DataType::String => {
810            let ca = series
811                .str()
812                .map_err(|e| Error::operation(format!("Failed to get string array: {e}")))?;
813            for opt_val in ca {
814                values.push(opt_val.map_or(Value::Null, |s| Value::String(s.to_string())));
815            }
816        }
817        _ => {
818            return Err(Error::operation(format!(
819                "Unsupported data type: {:?}",
820                series.dtype()
821            )));
822        }
823    }
824
825    Ok(values)
826}
827
828/// Helper function to convert Values to Series
829#[allow(clippy::unnecessary_wraps, clippy::cast_precision_loss)]
830fn values_to_series(name: &str, values: &[Value]) -> Result<Series> {
831    if values.is_empty() {
832        return Ok(Series::new_empty(name.into(), &DataType::Null));
833    }
834
835    // Determine the data type from the first non-null value
836    let dtype = values
837        .iter()
838        .find(|v| !v.is_null())
839        .map_or(DataType::Null, |v| match v {
840            Value::Bool(_) => DataType::Boolean,
841            Value::Int(_) => DataType::Int64,
842            Value::Float(_) => DataType::Float64,
843            Value::String(_) => DataType::String,
844            _ => DataType::Null,
845        });
846
847    match dtype {
848        DataType::Boolean => {
849            let vec: Vec<Option<bool>> = values
850                .iter()
851                .map(|v| match v {
852                    Value::Bool(b) => Some(*b),
853                    _ => None,
854                })
855                .collect();
856            Ok(Series::new(name.into(), vec))
857        }
858        DataType::Int64 => {
859            let vec: Vec<Option<i64>> = values
860                .iter()
861                .map(|v| match v {
862                    Value::Int(i) => Some(*i),
863                    _ => None,
864                })
865                .collect();
866            Ok(Series::new(name.into(), vec))
867        }
868        DataType::Float64 => {
869            let vec: Vec<Option<f64>> = values
870                .iter()
871                .map(|v| match v {
872                    Value::Float(f) => Some(*f),
873                    Value::Int(i) => Some(*i as f64),
874                    _ => None,
875                })
876                .collect();
877            Ok(Series::new(name.into(), vec))
878        }
879        DataType::String => {
880            let vec: Vec<Option<&str>> = values
881                .iter()
882                .map(|v| match v {
883                    Value::String(s) => Some(s.as_str()),
884                    _ => None,
885                })
886                .collect();
887            Ok(Series::new(name.into(), vec))
888        }
889        _ => Ok(Series::new_null(name.into(), values.len())),
890    }
891}
892
#[cfg(test)]
mod tests {
    use super::*;

    /// Reports whether `df` has a column called `name`.
    fn has_column(df: &DataFrame, name: &str) -> bool {
        df.get_column_names()
            .iter()
            .any(|column| column.as_str() == name)
    }

    #[test]
    fn test_select() {
        let df = DataFrame::new(vec![
            Series::new("a".into(), vec![1, 2, 3]).into(),
            Series::new("b".into(), vec![4, 5, 6]).into(),
            Series::new("c".into(), vec![7, 8, 9]).into(),
        ])
        .unwrap();

        let wanted = ["a".to_string(), "c".to_string()];
        let selected = select(&df, &wanted).unwrap();

        assert_eq!(selected.width(), 2);
        assert!(has_column(&selected, "a"));
        assert!(has_column(&selected, "c"));
    }

    #[test]
    fn test_filter() {
        let df = DataFrame::new(vec![
            Series::new("a".into(), vec![1, 2, 3, 4, 5]).into(),
            Series::new("b".into(), vec![10, 20, 30, 40, 50]).into(),
        ])
        .unwrap();

        // Keep rows 0, 2 and 4.
        let mask = Series::new("mask".into(), vec![true, false, true, false, true]);
        let filtered = filter(&df, &mask).unwrap();

        assert_eq!(filtered.height(), 3);
        let col_a = filtered.column("a").unwrap().i32().unwrap();
        assert_eq!(col_a.get(0), Some(1));
        assert_eq!(col_a.get(1), Some(3));
        assert_eq!(col_a.get(2), Some(5));
    }

    #[test]
    fn test_sort() {
        let df = DataFrame::new(vec![
            Series::new("a".into(), vec![3, 1, 4, 1, 5]).into(),
            Series::new("b".into(), vec!["c", "a", "d", "b", "e"]).into(),
        ])
        .unwrap();

        // Ascending sort on column "a".
        let sorted = sort(&df, &["a".to_string()], vec![false]).unwrap();
        let col_a = sorted.column("a").unwrap().i32().unwrap();

        for (row, expected) in [1, 1, 3, 4, 5].into_iter().enumerate() {
            assert_eq!(col_a.get(row), Some(expected));
        }
    }

    #[test]
    fn test_rename() {
        let df = DataFrame::new(vec![
            Series::new("old_name".into(), vec![1, 2, 3]).into(),
            Series::new("keep_name".into(), vec![4, 5, 6]).into(),
        ])
        .unwrap();

        let mapping = HashMap::from([("old_name".to_string(), "new_name".to_string())]);

        let renamed = rename(&df, &mapping).unwrap();
        assert!(has_column(&renamed, "new_name"));
        assert!(has_column(&renamed, "keep_name"));
        assert!(!has_column(&renamed, "old_name"));
    }
}