dfkit/
commands.rs

1use crate::utils::{DfKitError, file_type, register_table, write_output};
2use datafusion::arrow::compute::concat_batches;
3use datafusion::datasource::MemTable;
4use datafusion::logical_expr::col;
5use datafusion::prelude::SessionContext;
6use std::fs;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10pub async fn view(
11    ctx: &SessionContext,
12    filename: &Path,
13    limit: Option<usize>,
14) -> Result<(), DfKitError> {
15    let df = register_table(&ctx, "t", &filename).await?;
16    let limit = limit.unwrap_or(10);
17
18    if limit > 0 {
19        df.show_limit(limit).await?;
20    } else {
21        df.show().await?;
22    }
23
24    Ok(())
25}
26
27pub async fn query(
28    ctx: &SessionContext,
29    filename: &Path,
30    sql: Option<String>,
31    output: Option<PathBuf>,
32) -> Result<(), DfKitError> {
33    let file_type = file_type(&filename)?;
34    let _ = register_table(&ctx, "t", &filename).await?;
35    let df_sql = ctx.sql(&*sql.unwrap()).await?;
36
37    if let Some(path) = output {
38        write_output(df_sql, &path, &file_type).await?;
39        println!("File written to: {}, successfully.", path.display());
40    } else {
41        df_sql.show().await?;
42    }
43
44    Ok(())
45}
46
47pub async fn convert(
48    ctx: &SessionContext,
49    filename: &Path,
50    output_filename: &Path,
51) -> Result<(), DfKitError> {
52    let df = register_table(ctx, "t", &filename).await?;
53    let output_file_type = file_type(&output_filename)?;
54
55    write_output(df, &output_filename, &output_file_type).await?;
56    Ok(())
57}
58
59pub async fn describe(ctx: &SessionContext, filename: &Path) -> Result<(), DfKitError> {
60    let df = register_table(&ctx, "t", &filename).await?;
61    let describe = df.describe().await?;
62    describe.show().await?;
63    Ok(())
64}
65
66pub async fn schema(ctx: &SessionContext, filename: &Path) -> Result<(), DfKitError> {
67    let _ = register_table(&ctx, "t", &filename).await?;
68    let sql = "SELECT column_name, data_type, is_nullable \
69                                FROM information_schema.columns WHERE table_name = 't'";
70    let df = ctx.sql(sql).await?;
71    df.show().await?;
72    Ok(())
73}
74
75pub async fn count(ctx: &SessionContext, filename: &Path) -> Result<(), DfKitError> {
76    let _ = register_table(&ctx, "t", &filename).await?;
77    let sql = "SELECT COUNT(*) FROM t";
78    let df = ctx.sql(&sql).await?;
79    df.show().await?;
80
81    Ok(())
82}
83
84pub async fn sort(
85    ctx: &SessionContext,
86    filename: &Path,
87    columns: &[String],
88    descending: bool,
89    output: Option<PathBuf>,
90) -> Result<(), DfKitError> {
91    let df = register_table(ctx, "t", filename).await?;
92
93    let sort_exprs = columns
94        .iter()
95        .map(|col_name| col(col_name).sort(!descending, descending))
96        .collect();
97
98    let sorted_df = df.sort(sort_exprs)?;
99
100    if let Some(out_path) = output {
101        let format = file_type(&out_path)?;
102        write_output(sorted_df, &out_path, &format).await?;
103        println!("Sorted file written to: {}", out_path.display());
104    } else {
105        sorted_df.show().await?;
106    }
107
108    Ok(())
109}
110
111pub async fn reverse(
112    ctx: &SessionContext,
113    filename: &Path,
114    output: Option<PathBuf>,
115) -> Result<(), DfKitError> {
116    let df = register_table(ctx, "t", filename).await?;
117    let batches = df.collect().await?;
118
119    let schema = batches[0].schema();
120    let mut all_rows = vec![];
121    for batch in &batches {
122        for row in 0..batch.num_rows() {
123            all_rows.push(batch.slice(row, 1));
124        }
125    }
126
127    all_rows.reverse();
128
129    let reversed_batch = concat_batches(&schema, &all_rows)?;
130
131    let provider = MemTable::try_new(schema, vec![vec![reversed_batch]])?;
132    ctx.register_table("reversed", Arc::new(provider))?;
133    let reversed_df = ctx.table("reversed").await?;
134
135    if let Some(out_path) = output {
136        let format = file_type(&out_path)?;
137        write_output(reversed_df, &out_path, &format).await?;
138        println!("Reversed file written to: {}", out_path.display());
139    } else {
140        reversed_df.show().await?;
141    }
142
143    Ok(())
144}
145
146pub async fn dfsplit(
147    ctx: &SessionContext,
148    filename: &Path,
149    chunks: usize,
150    output_dir: &Path,
151) -> Result<(), DfKitError> {
152    if chunks == 0 {
153        return Err(DfKitError::CustomError(
154            "Chunks must be greater than 0".into(),
155        ));
156    }
157    let df = register_table(ctx, "t", filename).await?;
158    let total_rows = df.clone().count().await?;
159    let mut rows_per_chunk = total_rows / chunks; // in the odd case, the last chunk will fill in the rest
160    let mut remainder = total_rows % chunks;
161
162    if remainder > 0 {
163        rows_per_chunk += 1;
164    }
165
166    if chunks > total_rows {
167        return Err(DfKitError::CustomError(
168            "Chunks must be smaller than total rows".into(),
169        ));
170    }
171
172    fs::create_dir_all(output_dir)?;
173
174    let stem = filename.file_stem().unwrap().to_string_lossy();
175    let extension = filename.extension().unwrap_or_default().to_string_lossy();
176    let format = file_type(filename)?;
177
178    for i in 0..chunks {
179        if remainder > 0 && i >= remainder {
180            rows_per_chunk -= 1;
181            remainder = 0;
182        }
183        let offset = i * rows_per_chunk;
184        let chunk_df = df.clone().limit(offset, Some(rows_per_chunk))?;
185
186        let chunk_filename = format!("{}_{}.{}", stem, i + 1, extension);
187        let chunk_path = output_dir.join(chunk_filename);
188
189        write_output(chunk_df, &chunk_path, &format).await?;
190
191        println!("Written chunk {} to {}", i + 1, chunk_path.display());
192    }
193
194    Ok(())
195}
196
197pub async fn cat(
198    ctx: &SessionContext,
199    files: Vec<PathBuf>,
200    out_path: &Path,
201) -> Result<(), DfKitError> {
202    let mut dfs = vec![];
203
204    for (i, file) in files.iter().enumerate() {
205        let table_name = format!("t_{}", i);
206        let df = register_table(ctx, &table_name, file).await?;
207        dfs.push(df);
208    }
209
210    let mut final_df = dfs.remove(0);
211    for df in dfs {
212        final_df = final_df.union(df)?;
213    }
214
215    let format = file_type(&out_path)?;
216    write_output(final_df, out_path, &format).await?;
217    println!("Concatenated file written to: {}", out_path.display());
218
219    Ok(())
220}
221
222pub async fn dedup(
223    ctx: &SessionContext,
224    filename: &Path,
225    output: Option<PathBuf>,
226) -> Result<(), DfKitError> {
227    let _ = register_table(&ctx, "t", &filename).await?;
228    let df = ctx.sql("SELECT DISTINCT * FROM t").await?;
229
230    if let Some(out_path) = output {
231        let file_type = file_type(&filename)?;
232        write_output(df, &out_path, &file_type).await?;
233        println!("Deduplicated file written to: {}", out_path.display());
234    } else {
235        df.show().await?;
236    }
237
238    Ok(())
239}