axion_data/dataframe/
core.rs

1use crate::series::{SeriesTrait, Series};
2use crate::dtype::{DataType, DataTypeTrait};
3use crate::error::{AxionError, AxionResult};
4use super::groupby::GroupBy;
5use std::collections::{HashMap, HashSet};
6use std::fmt::{self, Debug};
7use std::cmp::Ordering;
8use rayon::prelude::*;
9use crate::io::csv::WriteCsvOptions;
10use csv;
11use std::io::Write;
12use std::fs::File;
13use std::path::Path;
14
/// Core data structure of the data-processing framework: the `DataFrame`.
///
/// A `DataFrame` is a two-dimensional tabular structure, similar to a
/// spreadsheet or a database table, made up of several columns (`Series`)
/// of equal length. Each column may hold a different data type.
///
/// # Features
///
/// - **Type safety**: relies on Rust's type system to keep column types sound
/// - **High performance**: uses Rayon for parallel processing
/// - **Memory efficiency**: zero-copy operations and smart memory management
/// - **Rich operations**: filtering, joining, grouping, sorting and more
///
/// # Examples
///
/// ```rust,ignore
/// use axion::dataframe::DataFrame;
/// use axion::series::Series;
///
/// // Build a simple DataFrame
/// let name_series = Series::new("姓名".to_string(), vec!["张三", "李四", "王五"]);
/// let age_series = Series::new("年龄".to_string(), vec![25, 30, 35]);
///
/// let df = DataFrame::new(vec![
///     Box::new(name_series),
///     Box::new(age_series),
/// ])?;
///
/// println!("{}", df);
/// ```
#[derive(Clone)]
pub struct DataFrame {
    /// Number of rows in the DataFrame.
    height: usize,
    /// All columns, each stored as a boxed object implementing `SeriesTrait`.
    ///
    /// NOTE(review): this field is public while `height` and `schema` are
    /// private, so code that mutates it directly can leave the cached
    /// `height`/`schema` out of sync with the actual columns.
    pub columns: Vec<Box<dyn SeriesTrait>>,
    /// Mapping from column name to data type, used for fast lookup and validation.
    schema: HashMap<String, DataType>,
}
53
54impl DataFrame {
55    /// 从列向量创建新的 DataFrame。
56    ///
57    /// # 参数
58    /// 
59    /// * `columns` - 实现了 `SeriesTrait` 的列向量
60    ///
61    /// # 返回值
62    /// 
63    /// 成功时返回新创建的 DataFrame,失败时返回错误
64    ///
65    /// # 错误
66    /// 
67    /// * `AxionError::MismatchedLengths` - 当列长度不一致时
68    /// * `AxionError::DuplicateColumnName` - 当存在重复列名时
69    ///
70    /// # 示例
71    /// 
72    /// ```rust
73    /// let columns = vec![
74    ///     Box::new(Series::new("A".to_string(), vec![1, 2, 3])),
75    ///     Box::new(Series::new("B".to_string(), vec![4, 5, 6])),
76    /// ];
77    /// let df = DataFrame::new(columns)?;
78    /// ```
79    pub fn new(columns: Vec<Box<dyn SeriesTrait>>) -> AxionResult<Self> {
80        let height = columns.first().map_or(0, |col| col.len());
81        let mut schema = HashMap::with_capacity(columns.len());
82
83        for col in &columns {
84            if col.len() != height {
85                return Err(AxionError::MismatchedLengths {
86                    expected: height,
87                    found: col.len(),
88                    name: col.name().to_string(),
89                });
90            }
91            if schema.insert(col.name().to_string(), col.dtype()).is_some() {
92                return Err(AxionError::DuplicateColumnName(col.name().to_string()));
93            }
94        }
95
96        Ok(DataFrame { height, columns, schema })
97    }
98
99    /// 创建一个空的 DataFrame。
100    /// 
101    /// # 返回值
102    /// 
103    /// 返回一个没有行和列的 DataFrame
104    /// 
105    /// # 示例
106    /// 
107    /// ```rust
108    /// let empty_df = DataFrame::new_empty();
109    /// assert_eq!(empty_df.shape(), (0, 0));
110    /// ```
111    pub fn new_empty() -> Self {
112        DataFrame {
113            height: 0,
114            columns: Vec::new(),
115            schema: HashMap::new(),
116        }
117    }
118
119    /// 获取 DataFrame 的形状(行数,列数)。
120    /// 
121    /// # 返回值
122    /// 
123    /// 返回一个元组 `(行数, 列数)`
124    /// 
125    /// # 示例
126    /// 
127    /// ```rust
128    /// let (rows, cols) = df.shape();
129    /// println!("DataFrame 有 {} 行 {} 列", rows, cols);
130    /// ```
131    pub fn shape(&self) -> (usize, usize) {
132        (self.height, self.columns.len())
133    }
134
135    /// 获取 DataFrame 的行数。
136    pub fn height(&self) -> usize {
137        self.height
138    }
139
140    /// 获取 DataFrame 的列数。
141    pub fn width(&self) -> usize {
142        self.columns.len()
143    }
144
145    /// 获取所有列名的向量。
146    /// 
147    /// # 返回值
148    /// 
149    /// 返回包含所有列名的字符串切片向量
150    pub fn columns_names(&self) -> Vec<&str> {
151        self.columns.iter().map(|col| col.name()).collect()
152    }
153
154    /// 获取所有列的数据类型。
155    /// 
156    /// # 返回值
157    /// 
158    /// 返回包含所有列数据类型的向量
159    pub fn dtypes(&self) -> Vec<DataType> {
160        self.columns.iter().map(|col| col.dtype()).collect()
161    }
162
163    /// 获取 DataFrame 的模式(列名到数据类型的映射)。
164    pub fn schema(&self) -> &HashMap<String, DataType> {
165        &self.schema
166    }
167
168    /// 根据列名获取列的引用。
169    ///
170    /// # 参数
171    /// 
172    /// * `name` - 要查找的列名
173    ///
174    /// # 返回值
175    /// 
176    /// 成功时返回列的引用,失败时返回 `ColumnNotFound` 错误
177    pub fn column(&self, name: &str) -> AxionResult<&dyn SeriesTrait> {
178        self.columns
179            .iter()
180            .find(|col| col.name() == name)
181            .map(|col| col.as_ref())
182            .ok_or_else(|| AxionError::ColumnNotFound(name.to_string()))
183    }
184
185    /// 根据列名获取列的可变引用。
186    pub fn column_mut<'a>(&'a mut self, name: &str) -> AxionResult<&'a mut dyn SeriesTrait> {
187        self.columns
188            .iter_mut()
189            .find(|col| col.name() == name)
190            .map(|col| col.as_mut() as &mut dyn SeriesTrait)
191            .ok_or_else(|| AxionError::ColumnNotFound(name.to_string()))
192    }
193
194    /// 根据索引获取列的引用。
195    ///
196    /// # 参数
197    /// 
198    /// * `index` - 列的索引位置
199    pub fn column_at(&self, index: usize) -> AxionResult<&dyn SeriesTrait> {
200        self.columns
201            .get(index)
202            .map(|col| col.as_ref())
203            .ok_or_else(|| AxionError::ColumnNotFound(format!("index {}", index)))
204    }
205
206    /// 根据索引获取列的可变引用。
207    pub fn column_at_mut(&mut self, index: usize) -> AxionResult<&mut dyn SeriesTrait> {
208        self.columns
209            .get_mut(index)
210            .map(|col| col.as_mut() as &mut dyn SeriesTrait)
211            .ok_or_else(|| AxionError::ColumnNotFound(format!("index {}", index)))
212    }
213
214    /// 向 DataFrame 添加一个新列。
215    ///
216    /// # 参数
217    /// 
218    /// * `series` - 要添加的列,必须实现 `SeriesTrait`
219    ///
220    /// # 错误
221    /// 
222    /// * `AxionError::MismatchedLengths` - 新列长度与现有行数不匹配
223    /// * `AxionError::DuplicateColumnName` - 列名已存在
224    ///
225    /// # 示例
226    /// 
227    /// ```rust
228    /// let new_col = Series::new("新列".to_string(), vec![1, 2, 3]);
229    /// df.add_column(Box::new(new_col))?;
230    /// ```
231    pub fn add_column(&mut self, series: Box<dyn SeriesTrait>) -> AxionResult<()> {
232        if self.columns.is_empty() && self.height == 0 {
233            self.height = series.len();
234        } else if series.len() != self.height {
235            return Err(AxionError::MismatchedLengths {
236                expected: self.height,
237                found: series.len(),
238                name: series.name().to_string(),
239            });
240        }
241
242        if self.schema.contains_key(series.name()) {
243            return Err(AxionError::DuplicateColumnName(series.name().to_string()));
244        }
245
246        self.schema.insert(series.name().to_string(), series.dtype());
247        self.columns.push(series);
248        Ok(())
249    }
250
251    /// 从 DataFrame 中删除指定列。
252    ///
253    /// # 参数
254    /// 
255    /// * `name` - 要删除的列名
256    ///
257    /// # 返回值
258    /// 
259    /// 返回被删除的列
260    ///
261    /// # 错误
262    /// 
263    /// * `AxionError::ColumnNotFound` - 指定列不存在
264    pub fn drop_column(&mut self, name: &str) -> AxionResult<Box<dyn SeriesTrait>> {
265        let position = self.columns.iter().position(|col| col.name() == name);
266
267        if let Some(pos) = position {
268            self.schema.remove(name);
269            let removed_col = self.columns.remove(pos);
270            if self.columns.is_empty() {
271                self.height = 0;
272            }
273            Ok(removed_col)
274        } else {
275            Err(AxionError::ColumnNotFound(name.to_string()))
276        }
277    }
278
279    /// 重命名 DataFrame 中的列。
280    ///
281    /// # 参数
282    /// 
283    /// * `old_name` - 当前列名
284    /// * `new_name` - 新列名
285    ///
286    /// # 错误
287    /// 
288    /// * `AxionError::ColumnNotFound` - 原列名不存在
289    /// * `AxionError::DuplicateColumnName` - 新列名已存在
290    pub fn rename_column(&mut self, old_name: &str, new_name: &str) -> AxionResult<()> {
291        if old_name == new_name {
292            return Ok(());
293        }
294
295        if !self.schema.contains_key(old_name) {
296            return Err(AxionError::ColumnNotFound(old_name.to_string()));
297        }
298
299        if self.schema.contains_key(new_name) {
300            return Err(AxionError::DuplicateColumnName(new_name.to_string()));
301        }
302
303        let dtype = self.schema.remove(old_name).unwrap();
304        self.schema.insert(new_name.to_string(), dtype);
305
306        for col in self.columns.iter_mut() {
307            if col.name() == old_name {
308                col.rename(new_name);
309                break;
310            }
311        }
312        Ok(())
313    }
314
315    /// 将列向下转型为特定类型的 Series。
316    ///
317    /// # 类型参数
318    /// 
319    /// * `T` - 目标数据类型
320    ///
321    /// # 参数
322    /// 
323    /// * `name` - 列名
324    ///
325    /// # 返回值
326    /// 
327    /// 成功时返回指定类型的 Series 引用
328    pub fn downcast_column<T>(&self, name: &str) -> AxionResult<&Series<T>>
329    where
330        T: DataTypeTrait + 'static,
331        Series<T>: 'static,
332    {
333        let series_trait = self.column(name)?;
334        series_trait
335            .as_any()
336            .downcast_ref::<Series<T>>()
337            .ok_or_else(|| AxionError::TypeMismatch {
338                expected: T::DTYPE,
339                found: series_trait.dtype(),
340                name: name.to_string(),
341            })
342    }
343
344    /// 检查 DataFrame 是否为空。
345    /// 
346    /// # 返回值
347    /// 
348    /// 如果没有行或没有列则返回 true
349    pub fn is_empty(&self) -> bool {
350        self.height == 0 || self.columns.is_empty()
351    }
352
353    /// 获取 DataFrame 的前 n 行。
354    ///
355    /// # 参数
356    /// 
357    /// * `n` - 要获取的行数
358    ///
359    /// # 返回值
360    /// 
361    /// 返回包含前 n 行的新 DataFrame
362    pub fn head(&self, n: usize) -> DataFrame {
363        let n = std::cmp::min(n, self.height);
364        if n == self.height {
365            return self.clone();
366        }
367        let new_columns = self.columns.iter().map(|col| col.slice(0, n)).collect();
368        DataFrame::new(new_columns).unwrap_or_else(|_| {
369            DataFrame::new(vec![]).unwrap()
370        })
371    }
372
373    /// 获取 DataFrame 的后 n 行。
374    ///
375    /// # 参数
376    /// 
377    /// * `n` - 要获取的行数
378    pub fn tail(&self, n: usize) -> DataFrame {
379        let n = std::cmp::min(n, self.height);
380        if n == self.height {
381            return self.clone();
382        }
383        let offset = self.height - n;
384        let new_columns = self.columns.iter().map(|col| col.slice(offset, n)).collect();
385        DataFrame::new(new_columns).unwrap_or_else(|_| {
386            DataFrame::new(vec![]).unwrap()
387        })
388    }
389
390    /// 选择指定的列创建新的 DataFrame。
391    ///
392    /// # 参数
393    /// 
394    /// * `names` - 要选择的列名数组
395    ///
396    /// # 返回值
397    /// 
398    /// 返回只包含指定列的新 DataFrame
399    pub fn select(&self, names: &[&str]) -> AxionResult<DataFrame> {
400        let mut new_columns = Vec::with_capacity(names.len());
401        for name in names {
402            let col = self.column(name)?;
403            new_columns.push(col.clone_box());
404        }
405        DataFrame::new(new_columns)
406    }
407
408    /// 删除指定列后创建新的 DataFrame。
409    ///
410    /// # 参数
411    /// 
412    /// * `name_to_drop` - 要删除的列名
413    pub fn drop(&self, name_to_drop: &str) -> AxionResult<DataFrame> {
414        if !self.schema.contains_key(name_to_drop) {
415            return Err(AxionError::ColumnNotFound(name_to_drop.to_string()));
416        }
417
418        let new_columns = self.columns
419            .iter()
420            .filter(|col| col.name() != name_to_drop)
421            .map(|col| col.clone_box())
422            .collect();
423
424        DataFrame::new(new_columns)
425    }
426
427    /// 根据布尔掩码过滤 DataFrame 行。
428    ///
429    /// # 参数
430    /// 
431    /// * `mask` - 布尔类型的 Series,true 表示保留该行
432    ///
433    /// # 返回值
434    /// 
435    /// 返回过滤后的新 DataFrame
436    ///
437    /// # 错误
438    /// 
439    /// * `AxionError::MismatchedLengths` - 掩码长度与 DataFrame 行数不匹配
440    pub fn filter(&self, mask: &Series<bool>) -> AxionResult<DataFrame> {
441        if mask.len() != self.height {
442            return Err(AxionError::MismatchedLengths {
443                expected: self.height,
444                found: mask.len(),
445                name: "过滤掩码".to_string(),
446            });
447        }
448
449        let mut filtered_columns = Vec::with_capacity(self.columns.len());
450        for col in &self.columns {
451            let filtered_col = col.filter(mask)?;
452            filtered_columns.push(filtered_col);
453        }
454
455        DataFrame::new(filtered_columns)
456    }
457
458    /// 并行过滤 DataFrame 行,提供更好的性能。
459    ///
460    /// 该方法使用 Rayon 并行处理每一列的过滤操作,
461    /// 在处理大型数据集时能显著提升性能。
462    ///
463    /// # 参数
464    /// 
465    /// * `mask` - 布尔类型的 Series,true 表示保留该行
466    ///
467    /// # 返回值
468    /// 
469    /// 返回过滤后的新 DataFrame
470    pub fn par_filter(&self, mask: &Series<bool>) -> AxionResult<DataFrame> {
471        if mask.len() != self.height {
472            return Err(AxionError::MismatchedLengths {
473                expected: self.height,
474                found: mask.len(),
475                name: "过滤掩码".to_string(),
476            });
477        }
478        if self.is_empty() {
479            return Ok(self.clone());
480        }
481        if mask.is_empty() && self.height > 0 {
482             return Err(AxionError::MismatchedLengths {
483                expected: self.height,
484                found: mask.len(),
485                name: "非空DataFrame的过滤掩码".to_string(),
486            });
487        }
488        if mask.is_empty() && self.height == 0 {
489            return Ok(self.clone());
490        }
491
492        let new_columns_results: Vec<AxionResult<Box<dyn SeriesTrait>>> = self
493            .columns
494            .par_iter()
495            .map(|col| col.filter(mask))
496            .collect();
497
498        let mut new_columns = Vec::with_capacity(new_columns_results.len());
499        for result in new_columns_results {
500            new_columns.push(result?);
501        }
502
503        DataFrame::new(new_columns)
504    }
505
506    /// 内连接操作。
507    ///
508    /// 只保留两个 DataFrame 中连接键都存在的行。
509    ///
510    /// # 参数
511    /// 
512    /// * `right` - 右侧 DataFrame
513    /// * `left_on` - 左侧连接键列名
514    /// * `right_on` - 右侧连接键列名
515    ///
516    /// # 返回值
517    /// 
518    /// 返回连接后的新 DataFrame
519    pub fn inner_join(
520        &self,
521        right: &DataFrame,
522        left_on: &str,
523        right_on: &str,
524    ) -> AxionResult<DataFrame> {
525        let left_key_col: &Series<String> = self.downcast_column(left_on).map_err(|e| match e {
526            AxionError::ColumnNotFound(_) => AxionError::ColumnNotFound(format!("左侧连接键列 '{}'", left_on)),
527            AxionError::TypeMismatch { expected: _, found, name } => AxionError::JoinKeyTypeError {
528                side: "左侧".to_string(),
529                name,
530                expected: DataType::String,
531                found,
532            },
533            other => other,
534        })?;
535        let right_key_col: &Series<String> = right.downcast_column(right_on).map_err(|e| match e {
536            AxionError::ColumnNotFound(_) => AxionError::ColumnNotFound(format!("右侧连接键列 '{}'", right_on)),
537            AxionError::TypeMismatch { expected: _, found, name } => AxionError::JoinKeyTypeError {
538                side: "右侧".to_string(),
539                name,
540                expected: DataType::String,
541                found,
542            },
543            other => other,
544        })?;
545
546        let mut right_indices_map: HashMap<&Option<String>, Vec<usize>> = HashMap::new();
547        for (idx, opt_key) in right_key_col.data_internal().iter().enumerate() {
548            right_indices_map.entry(opt_key).or_default().push(idx);
549        }
550
551        let mut join_indices: Vec<(usize, usize)> = Vec::new();
552        for (left_idx, left_opt_key) in left_key_col.data_internal().iter().enumerate() {
553            if let Some(right_indices) = right_indices_map.get(left_opt_key) {
554                for &right_idx in right_indices {
555                    join_indices.push((left_idx, right_idx));
556                }
557            }
558        }
559
560        let (left_result_indices, right_result_indices): (Vec<usize>, Vec<usize>) =
561            join_indices.into_iter().unzip();
562
563        let mut result_columns: Vec<Box<dyn SeriesTrait>> =
564            Vec::with_capacity(self.width() + right.width() - 1);
565        let mut left_column_names: HashSet<String> = HashSet::with_capacity(self.width());
566
567        for col in &self.columns {
568            let taken_left_col = col.take_indices(&left_result_indices)?;
569            left_column_names.insert(taken_left_col.name().to_string());
570            result_columns.push(taken_left_col);
571        }
572
573        for col in &right.columns {
574            if col.name() != right_on {
575                let original_right_name = col.name();
576                let mut taken_right_col = col.take_indices(&right_result_indices)?;
577
578                if left_column_names.contains(original_right_name) {
579                    let new_name = format!("{}_right", original_right_name);
580                    taken_right_col.rename(&new_name);
581                    result_columns.push(taken_right_col);
582                } else {
583                    result_columns.push(taken_right_col);
584                }
585            }
586        }
587
588        DataFrame::new(result_columns)
589    }
590
591    /// 左连接操作。
592    ///
593    /// 保留左侧 DataFrame 的所有行,如果右侧没有匹配则填充空值。
594    pub fn left_join(
595        &self,
596        right: &DataFrame,
597        left_on: &str,
598        right_on: &str,
599    ) -> AxionResult<DataFrame> {
600        let left_key_col: &Series<String> = self
601            .downcast_column(left_on)
602            .map_err(|e| match e {
603                AxionError::ColumnNotFound(_) => AxionError::ColumnNotFound(format!("left key column '{}'", left_on)),
604                AxionError::TypeMismatch { expected: _, found, name } => AxionError::JoinKeyTypeError {
605                    side: "left".to_string(), name, expected: DataType::String, found,
606                },
607                other => other,
608            })?;
609
610        let right_key_col: &Series<String> = right
611            .downcast_column(right_on)
612            .map_err(|e| match e {
613                AxionError::ColumnNotFound(_) => AxionError::ColumnNotFound(format!("right key column '{}'", right_on)),
614                AxionError::TypeMismatch { expected: _, found, name } => AxionError::JoinKeyTypeError {
615                    side: "right".to_string(), name, expected: DataType::String, found,
616                },
617                other => other,
618            })?;
619
620        let mut right_indices_map: HashMap<&Option<String>, Vec<usize>> = HashMap::new();
621        for (idx, opt_key) in right_key_col.data_internal().iter().enumerate() {
622            right_indices_map.entry(opt_key).or_default().push(idx);
623        }
624
625        let mut join_indices: Vec<(usize, Option<usize>)> = Vec::new();
626        for (left_idx, left_opt_key) in left_key_col.data_internal().iter().enumerate() {
627            if let Some(right_indices) = right_indices_map.get(left_opt_key) {
628                for &right_idx in right_indices {
629                    join_indices.push((left_idx, Some(right_idx)));
630                }
631            } else {
632                join_indices.push((left_idx, None));
633            }
634        }
635
636        let (left_result_indices, right_result_indices): (Vec<usize>, Vec<Option<usize>>) =
637            join_indices.into_iter().unzip();
638
639        let mut result_columns: Vec<Box<dyn SeriesTrait>> =
640            Vec::with_capacity(self.width() + right.width() - 1);
641        let mut left_column_names: HashSet<String> = HashSet::with_capacity(self.width());
642
643        for col in &self.columns {
644            let taken_left_col = col.take_indices(&left_result_indices)?;
645            left_column_names.insert(taken_left_col.name().to_string());
646            result_columns.push(taken_left_col);
647        }
648
649        for col in &right.columns {
650            if col.name() != right_on {
651                let original_right_name = col.name();
652                let mut taken_right_col = col.take_indices_option(&right_result_indices)?;
653
654                if left_column_names.contains(original_right_name) {
655                    let new_name = format!("{}_right", original_right_name);
656                    taken_right_col.rename(&new_name);
657                    result_columns.push(taken_right_col);
658                } else {
659                    result_columns.push(taken_right_col);
660                }
661            }
662        }
663
664        DataFrame::new(result_columns)
665    }
666
667    /// 右连接操作。
668    ///
669    /// 保留右侧 DataFrame 的所有行,如果左侧没有匹配则填充空值。
670    pub fn right_join(
671        &self,
672        right: &DataFrame,
673        left_on: &str,
674        right_on: &str,
675    ) -> AxionResult<DataFrame> {
676        let left_key_col: &Series<String> = self
677            .downcast_column(left_on)
678            .map_err(|e| match e {
679                AxionError::ColumnNotFound(_) => AxionError::ColumnNotFound(format!("left key column '{}'", left_on)),
680                AxionError::TypeMismatch { expected: _, found, name } => AxionError::JoinKeyTypeError {
681                    side: "left".to_string(), name, expected: DataType::String, found,
682                },
683                other => other,
684            })?;
685
686        let right_key_col: &Series<String> = right
687            .downcast_column(right_on)
688            .map_err(|e| match e {
689                AxionError::ColumnNotFound(_) => AxionError::ColumnNotFound(format!("right key column '{}'", right_on)),
690                AxionError::TypeMismatch { expected: _, found, name } => AxionError::JoinKeyTypeError {
691                    side: "right".to_string(), name, expected: DataType::String, found,
692                },
693                other => other,
694            })?;
695
696        let mut left_indices_map: HashMap<&Option<String>, Vec<usize>> = HashMap::new();
697        for (idx, opt_key) in left_key_col.data_internal().iter().enumerate() {
698            left_indices_map.entry(opt_key).or_default().push(idx);
699        }
700
701        let mut join_indices: Vec<(Option<usize>, usize)> = Vec::new();
702        for (right_idx, right_opt_key) in right_key_col.data_internal().iter().enumerate() {
703            if let Some(left_indices) = left_indices_map.get(right_opt_key) {
704                for &left_idx in left_indices {
705                    join_indices.push((Some(left_idx), right_idx));
706                }
707            } else {
708                join_indices.push((None, right_idx));
709            }
710        }
711
712        let (left_result_indices, right_result_indices): (Vec<Option<usize>>, Vec<usize>) =
713            join_indices.into_iter().unzip();
714
715        let mut result_columns: Vec<Box<dyn SeriesTrait>> =
716            Vec::with_capacity(self.width() + right.width() - 1);
717        let mut right_column_names: HashSet<String> = HashSet::with_capacity(right.width());
718
719        for col in &right.columns {
720            let taken_right_col = col.take_indices(&right_result_indices)?;
721            right_column_names.insert(taken_right_col.name().to_string());
722            result_columns.push(taken_right_col);
723        }
724
725        for col in &self.columns {
726            if col.name() != left_on {
727                let original_left_name = col.name();
728                let mut taken_left_col = col.take_indices_option(&left_result_indices)?;
729
730                if right_column_names.contains(original_left_name) {
731                    let new_name = format!("{}_left", original_left_name);
732                    taken_left_col.rename(&new_name);
733                    result_columns.push(taken_left_col);
734                } else {
735                    result_columns.push(taken_left_col);
736                }
737            }
738        }
739
740        DataFrame::new(result_columns)
741    }
742
743    /// 外连接操作。
744    ///
745    /// 保留两个 DataFrame 的所有行,没有匹配的地方填充空值。
746    pub fn outer_join(
747        &self,
748        right: &DataFrame,
749        left_on: &str,
750        right_on: &str,
751    ) -> AxionResult<DataFrame> {
752        let left_key_col: &Series<String> = self
753            .downcast_column(left_on)
754            .map_err(|e| match e {
755                AxionError::ColumnNotFound(_) => AxionError::ColumnNotFound(format!("left key column '{}'", left_on)),
756                AxionError::TypeMismatch { expected: _, found, name } => AxionError::JoinKeyTypeError {
757                    side: "left".to_string(),
758                    name,
759                    expected: DataType::String,
760                    found,
761                },
762                other => other,
763            })?;
764        let right_key_col: &Series<String> = right
765            .downcast_column(right_on)
766            .map_err(|e| match e {
767                AxionError::ColumnNotFound(_) => AxionError::ColumnNotFound(format!("right key column '{}'", right_on)),
768                AxionError::TypeMismatch { expected: _, found, name } => AxionError::JoinKeyTypeError {
769                    side: "right".to_string(),
770                    name,
771                    expected: DataType::String,
772                    found,
773                },
774                other => other,
775            })?;
776
777        let mut right_indices_map: HashMap<&Option<String>, Vec<usize>> = HashMap::new();
778        for (idx, opt_key) in right_key_col.data_internal().iter().enumerate() {
779            right_indices_map.entry(opt_key).or_default().push(idx);
780        }
781
782        let mut join_indices: Vec<(Option<usize>, Option<usize>)> = Vec::new();
783        let mut used_right_indices: HashSet<usize> = HashSet::new();
784
785        for (left_idx, left_opt_key) in left_key_col.data_internal().iter().enumerate() {
786            if let Some(right_indices) = right_indices_map.get(left_opt_key) {
787                for &right_idx in right_indices {
788                    join_indices.push((Some(left_idx), Some(right_idx)));
789                    used_right_indices.insert(right_idx);
790                }
791            } else {
792                join_indices.push((Some(left_idx), None));
793            }
794        }
795
796        for (right_idx, _right_opt_key) in right_key_col.data_internal().iter().enumerate() {
797            if !used_right_indices.contains(&right_idx) {
798                join_indices.push((None, Some(right_idx)));
799            }
800        }
801
802        let (left_result_indices, right_result_indices): (Vec<Option<usize>>, Vec<Option<usize>>) =
803            join_indices.into_iter().unzip();
804
805        let mut result_columns: Vec<Box<dyn SeriesTrait>> =
806            Vec::with_capacity(self.width() + right.width() - 1);
807        let mut left_column_names: HashSet<String> = HashSet::with_capacity(self.width());
808
809        for col in &self.columns {
810            let taken_left_col = col.take_indices_option(&left_result_indices)?;
811            left_column_names.insert(taken_left_col.name().to_string());
812            result_columns.push(taken_left_col);
813        }
814
815        for col in &right.columns {
816            if col.name() != right_on {
817                let original_right_name = col.name();
818                let mut taken_right_col = col.take_indices_option(&right_result_indices)?;
819
820                if left_column_names.contains(original_right_name) {
821                    let new_name = format!("{}_right", original_right_name);
822                    taken_right_col.rename(&new_name);
823                    result_columns.push(taken_right_col);
824                } else {
825                    result_columns.push(taken_right_col);
826                }
827            }
828        }
829
830        DataFrame::new(result_columns)
831    }
832
833    /// 创建分组操作对象。
834    ///
835    /// # 参数
836    /// 
837    /// * `keys` - 用于分组的列名数组
838    ///
839    /// # 返回值
840    /// 
841    /// 返回 GroupBy 对象,可用于执行聚合操作
842    ///
843    /// # 示例
844    /// 
845    /// ```rust
846    /// let grouped = df.groupby(&["类别"])?;
847    /// let result = grouped.sum()?;
848    /// ```
849    pub fn groupby<'a>(&'a self, keys: &[&str]) -> AxionResult<GroupBy<'a>> {
850        let key_strings: Vec<String> = keys.iter().map(|s| s.to_string()).collect();
851        GroupBy::new(self, key_strings)
852    }
853
854    /// 对 DataFrame 进行排序。
855    ///
856    /// # 参数
857    /// 
858    /// * `by` - 用于排序的列名数组
859    /// * `descending` - 对应每列的排序方向,true 为降序,false 为升序
860    ///
861    /// # 返回值
862    /// 
863    /// 返回排序后的新 DataFrame
864    ///
865    /// # 错误
866    /// 
867    /// * `AxionError::InvalidArgument` - 列名数组和排序方向数组长度不匹配
868    /// * `AxionError::UnsupportedOperation` - 尝试对不支持排序的数据类型进行排序
869    ///
870    /// # 示例
871    /// 
872    /// ```rust
873    /// // 按年龄升序,姓名降序排序
874    /// let sorted_df = df.sort(&["年龄", "姓名"], &[false, true])?;
875    /// ```
876    pub fn sort(&self, by: &[&str], descending: &[bool]) -> AxionResult<DataFrame> {
877        if by.is_empty() {
878            return Ok(self.clone());
879        }
880        if by.len() != descending.len() {
881            return Err(AxionError::InvalidArgument(
882                "排序键数量和降序标志数量必须匹配".to_string(),
883            ));
884        }
885
886        let mut sort_key_columns: Vec<&dyn SeriesTrait> = Vec::with_capacity(by.len());
887        for key_name in by {
888            let col = self.column(key_name)?;
889            if let DataType::List(_) = col.dtype() {
890                return Err(AxionError::UnsupportedOperation(format!(
891                    "列 '{}' 的 List 类型不支持排序", key_name
892                )));
893            }
894            sort_key_columns.push(col);
895        }
896
897        let height = self.height();
898        let mut indices: Vec<usize> = (0..height).collect();
899
900        indices.sort_unstable_by(|&a_idx, &b_idx| {
901            for (i, key_col) in sort_key_columns.iter().enumerate() {
902                let order = key_col.compare_row(a_idx, b_idx);
903                let current_order = if descending[i] { order.reverse() } else { order };
904
905                if current_order != Ordering::Equal {
906                    return current_order;
907                }
908            }
909            Ordering::Equal
910        });
911
912        let mut sorted_columns: Vec<Box<dyn SeriesTrait>> = Vec::with_capacity(self.columns.len());
913        for col in &self.columns {
914            let sorted_col = col.take_indices(&indices)?;
915            sorted_columns.push(sorted_col);
916        }
917
918        DataFrame::new(sorted_columns)
919    }
920
921    /// 将 DataFrame 导出为 CSV 文件。
922    ///
923    /// # 参数
924    /// 
925    /// * `filepath` - 输出文件路径
926    /// * `options` - 可选的 CSV 写入配置
927    ///
928    /// # 错误
929    /// 
930    /// * `AxionError::IoError` - 文件创建或写入失败
931    ///
932    /// # 示例
933    /// 
934    /// ```rust
935    /// use axion::io::csv::WriteCsvOptions;
936    /// 
937    /// // 使用默认配置导出
938    /// df.to_csv("output.csv", None)?;
939    /// 
940    /// // 使用自定义配置导出
941    /// let options = WriteCsvOptions {
942    ///     has_header: true,
943    ///     delimiter: b';',
944    ///     ..Default::default()
945    /// };
946    /// df.to_csv("output.csv", Some(options))?;
947    /// ```
948    pub fn to_csv(&self, filepath: impl AsRef<Path>, options: Option<WriteCsvOptions>) -> AxionResult<()> {
949        let path_ref = filepath.as_ref();
950        let mut file_writer = File::create(path_ref)
951            .map_err(|e| AxionError::IoError(format!("无法创建或打开文件 {:?}: {}", path_ref, e)))?;
952        self.to_csv_writer(&mut file_writer, options)
953    }
954
955    /// 将 DataFrame 写入到实现了 Write trait 的写入器中。
956    ///
957    /// 这是核心的 CSV 写入逻辑。
958    ///
959    /// # 参数
960    /// 
961    /// * `writer` - 实现了 `std::io::Write` 的写入器
962    /// * `options` - 可选的 CSV 写入配置
963    pub fn to_csv_writer<W: Write>(&self, writer: &mut W, options: Option<WriteCsvOptions>) -> AxionResult<()> {
964        let opts = options.unwrap_or_default();
965
966        let mut csv_builder = csv::WriterBuilder::new();
967        csv_builder.delimiter(opts.delimiter);
968
969        csv_builder.quote_style(match opts.quote_style {
970            crate::io::csv::QuoteStyle::Always => csv::QuoteStyle::Always,
971            crate::io::csv::QuoteStyle::Necessary => csv::QuoteStyle::Necessary,
972            crate::io::csv::QuoteStyle::Never => csv::QuoteStyle::Never,
973            crate::io::csv::QuoteStyle::NonNumeric => csv::QuoteStyle::NonNumeric,
974        });
975
976        if opts.line_terminator == "\r\n" {
977            csv_builder.terminator(csv::Terminator::CRLF);
978        } else if opts.line_terminator == "\n" {
979            csv_builder.terminator(csv::Terminator::Any(b'\n'));
980        } else if opts.line_terminator.len() == 1 {
981            csv_builder.terminator(csv::Terminator::Any(opts.line_terminator.as_bytes()[0]));
982        } else {
983            return Err(AxionError::CsvError(format!(
984                "不支持的行终止符: {:?}",
985                opts.line_terminator
986            )));
987        }
988
989        let mut csv_writer = csv_builder.from_writer(writer);
990
991        if opts.has_header && self.width() > 0 {
992            if let Err(e) = csv_writer.write_record(self.columns_names()) {
993                 return Err(AxionError::from(e));
994            }
995        }
996
997        if self.width() > 0 {
998            let mut record_buffer: Vec<String> = Vec::with_capacity(self.width());
999            for row_idx in 0..self.height() {
1000                record_buffer.clear();
1001                for col_idx in 0..self.width() {
1002                    let series = self.column_at(col_idx)?;
1003                     let value_to_write: String;
1004
1005                     if series.is_null_at(row_idx) {
1006                         value_to_write = opts.na_rep.clone();
1007                     } else {
1008                         match series.get_str(row_idx) {
1009                                Some(s_val) => {
1010                                    value_to_write = s_val;
1011                                }
1012                                None => {
1013                                    return Err(AxionError::InternalError(format!(
1014                                        "无法获取位置 ({}, {}) 的字符串表示,列名: '{}'",
1015                                        row_idx, col_idx, series.name()
1016                                    )));
1017                                }
1018                            }
1019                        }
1020                     record_buffer.push(value_to_write);
1021                }
1022                if let Err(e) = csv_writer.write_record(&record_buffer) {
1023                    return Err(AxionError::from(e));
1024                }
1025            }
1026        }
1027
1028        if let Err(e) = csv_writer.flush() {
1029            return Err(AxionError::from(e));
1030        }
1031
1032        Ok(())
1033    }
1034}
1035
1036impl PartialEq for DataFrame {
1037    fn eq(&self, other: &Self) -> bool {
1038        if self.shape() != other.shape() {
1039            return false;
1040        }
1041
1042        if self.columns_names() != other.columns_names() {
1043            return false;
1044        }
1045
1046        for col_name in self.columns_names() {
1047            let self_col = self.column(col_name).unwrap();
1048            let other_col = other.column(col_name).unwrap();
1049
1050            if format!("{:?}", self_col) != format!("{:?}", other_col) {
1051                return false;
1052            }
1053        }
1054
1055        true
1056    }
1057}
1058
1059impl Debug for DataFrame {
1060    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1061        f.debug_struct("DataFrame")
1062            .field("height", &self.height)
1063            .field("columns_count", &self.columns.len())
1064            .field("schema", &self.schema)
1065            .finish()
1066    }
1067}
1068
1069impl fmt::Display for DataFrame {
1070    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1071        if self.is_empty() {
1072            return writeln!(f, "DataFrame (0x0)");
1073        }
1074
1075        const MAX_ROWS_TO_PRINT: usize = 10;
1076        const MIN_COL_WIDTH: usize = 5;
1077        const NULL_STR: &str = "null";
1078
1079        let height = self.height();
1080        let width = self.width();
1081        let num_rows_to_print = std::cmp::min(height, MAX_ROWS_TO_PRINT);
1082
1083        let col_names = self.columns_names();
1084        let dtypes: Vec<String> = self.dtypes().iter().map(|dt| format!("{:?}", dt)).collect();
1085
1086        let mut col_widths: Vec<usize> = Vec::with_capacity(width);
1087        for i in 0..width {
1088            let name_len = col_names[i].len();
1089            let type_len = dtypes[i].len();
1090            let mut max_data_len = MIN_COL_WIDTH;
1091
1092            for row_idx in 0..num_rows_to_print {
1093                if let Some(val_str) = self.columns[i].get_str(row_idx) {
1094                    max_data_len = std::cmp::max(max_data_len, val_str.len());
1095                } else {
1096                    max_data_len = std::cmp::max(max_data_len, NULL_STR.len());
1097                }
1098            }
1099            col_widths.push(std::cmp::max(MIN_COL_WIDTH, std::cmp::max(name_len, std::cmp::max(type_len, max_data_len))));
1100        }
1101
1102        write!(f, "+")?;
1103        for w in &col_widths { write!(f, "{:-<width$}+", "", width = w + 2)?; }
1104        writeln!(f)?;
1105
1106        write!(f, "|")?;
1107        for (i, name) in col_names.iter().enumerate() {
1108            write!(f, " {:<width$} |", name, width = col_widths[i])?;
1109        }
1110        writeln!(f)?;
1111
1112        write!(f, "|")?;
1113        for w in &col_widths { write!(f, "{:-<width$}|", "", width = w + 2)?; }
1114        writeln!(f)?;
1115
1116        write!(f, "|")?;
1117        for (i, dtype_str) in dtypes.iter().enumerate() {
1118            write!(f, " {:<width$} |", dtype_str, width = col_widths[i])?;
1119        }
1120        writeln!(f)?;
1121
1122        write!(f, "+")?;
1123        for w in &col_widths { write!(f, "{:=<width$}+", "", width = w + 2)?; }
1124        writeln!(f)?;
1125
1126        for row_idx in 0..num_rows_to_print {
1127            write!(f, "|")?;
1128            for (col_idx, col) in self.columns.iter().enumerate() {
1129                let val_str = col.get_str(row_idx).unwrap_or_else(|| NULL_STR.to_string());
1130                write!(f, " {:<width$} |", val_str, width = col_widths[col_idx])?;
1131            }
1132            writeln!(f)?;
1133            write!(f, "+")?;
1134            for w in &col_widths { write!(f, "{:-<width$}+", "", width = w + 2)?; }
1135            writeln!(f)?;
1136        }
1137
1138        if height > num_rows_to_print {
1139            writeln!(f, "... (还有 {} 行)", height - num_rows_to_print)?;
1140        }
1141
1142        Ok(())
1143    }
1144}
1145
1146// #[macro_export]
1147// macro_rules! df {
1148//     ( $( $col_name:literal : $col_type:ty => $col_data:expr ),* $(,)? ) => {
1149//         {
1150//             let mut columns: Vec<Box<dyn $crate::series::SeriesTrait>> = Vec::new();
1151//             $(
1152//                 let series = $crate::series::Series::<$col_type>::new($col_name.into(), $col_data);
1153//                 columns.push(Box::new(series));
1154//             )*
1155//             $crate::dataframe::DataFrame::new(columns)
1156//         }
1157//     };
1158
1159//     ( $( $col_name:literal => $col_data:expr ),* $(,)? ) => {
1160//         {
1161//             use $crate::series::IntoSeriesBox;
1162//             let mut columns: Vec<Box<dyn $crate::series::SeriesTrait>> = Vec::new();
1163//             $(
1164//                 let boxed_series = ($col_data).into_series_box($col_name.into());
1165//                 columns.push(boxed_series);
1166//             )*
1167//             $crate::dataframe::DataFrame::new(columns)
1168//         }
1169//     };
1170// }
1171