use crate::utils::{DfKitError, file_type, register_table, write_output};
use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::compute::{concat_batches, take};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::logical_expr::col;
use datafusion::prelude::SessionContext;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
9
10pub async fn view(
11 ctx: &SessionContext,
12 filename: &Path,
13 limit: Option<usize>,
14) -> Result<(), DfKitError> {
15 let df = register_table(&ctx, "t", &filename).await?;
16 let limit = limit.unwrap_or(10);
17
18 if limit > 0 {
19 df.show_limit(limit).await?;
20 } else {
21 df.show().await?;
22 }
23
24 Ok(())
25}
26
27pub async fn query(
28 ctx: &SessionContext,
29 filename: &Path,
30 sql: Option<String>,
31 output: Option<PathBuf>,
32) -> Result<(), DfKitError> {
33 let file_type = file_type(&filename)?;
34 let _ = register_table(&ctx, "t", &filename).await?;
35 let df_sql = ctx.sql(&*sql.unwrap()).await?;
36
37 if let Some(path) = output {
38 write_output(df_sql, &path, &file_type).await?;
39 println!("File written to: {}, successfully.", path.display());
40 } else {
41 df_sql.show().await?;
42 }
43
44 Ok(())
45}
46
47pub async fn convert(
48 ctx: &SessionContext,
49 filename: &Path,
50 output_filename: &Path,
51) -> Result<(), DfKitError> {
52 let df = register_table(ctx, "t", &filename).await?;
53 let output_file_type = file_type(&output_filename)?;
54
55 write_output(df, &output_filename, &output_file_type).await?;
56 Ok(())
57}
58
59pub async fn describe(ctx: &SessionContext, filename: &Path) -> Result<(), DfKitError> {
60 let df = register_table(&ctx, "t", &filename).await?;
61 let describe = df.describe().await?;
62 describe.show().await?;
63 Ok(())
64}
65
66pub async fn schema(ctx: &SessionContext, filename: &Path) -> Result<(), DfKitError> {
67 let _ = register_table(&ctx, "t", &filename).await?;
68 let sql = "SELECT column_name, data_type, is_nullable \
69 FROM information_schema.columns WHERE table_name = 't'";
70 let df = ctx.sql(sql).await?;
71 df.show().await?;
72 Ok(())
73}
74
75pub async fn count(ctx: &SessionContext, filename: &Path) -> Result<(), DfKitError> {
76 let _ = register_table(&ctx, "t", &filename).await?;
77 let sql = "SELECT COUNT(*) FROM t";
78 let df = ctx.sql(&sql).await?;
79 df.show().await?;
80
81 Ok(())
82}
83
84pub async fn sort(
85 ctx: &SessionContext,
86 filename: &Path,
87 columns: &[String],
88 descending: bool,
89 output: Option<PathBuf>,
90) -> Result<(), DfKitError> {
91 let df = register_table(ctx, "t", filename).await?;
92
93 let sort_exprs = columns
94 .iter()
95 .map(|col_name| col(col_name).sort(!descending, descending))
96 .collect();
97
98 let sorted_df = df.sort(sort_exprs)?;
99
100 if let Some(out_path) = output {
101 let format = file_type(&out_path)?;
102 write_output(sorted_df, &out_path, &format).await?;
103 println!("Sorted file written to: {}", out_path.display());
104 } else {
105 sorted_df.show().await?;
106 }
107
108 Ok(())
109}
110
111pub async fn reverse(
112 ctx: &SessionContext,
113 filename: &Path,
114 output: Option<PathBuf>,
115) -> Result<(), DfKitError> {
116 let df = register_table(ctx, "t", filename).await?;
117 let batches = df.collect().await?;
118
119 let schema = batches[0].schema();
120 let mut all_rows = vec![];
121 for batch in &batches {
122 for row in 0..batch.num_rows() {
123 all_rows.push(batch.slice(row, 1));
124 }
125 }
126
127 all_rows.reverse();
128
129 let reversed_batch = concat_batches(&schema, &all_rows)?;
130
131 let provider = MemTable::try_new(schema, vec![vec![reversed_batch]])?;
132 ctx.register_table("reversed", Arc::new(provider))?;
133 let reversed_df = ctx.table("reversed").await?;
134
135 if let Some(out_path) = output {
136 let format = file_type(&out_path)?;
137 write_output(reversed_df, &out_path, &format).await?;
138 println!("Reversed file written to: {}", out_path.display());
139 } else {
140 reversed_df.show().await?;
141 }
142
143 Ok(())
144}
145
146pub async fn dfsplit(
147 ctx: &SessionContext,
148 filename: &Path,
149 chunks: usize,
150 output_dir: &Path,
151) -> Result<(), DfKitError> {
152 if chunks == 0 {
153 return Err(DfKitError::CustomError(
154 "Chunks must be greater than 0".into(),
155 ));
156 }
157 let df = register_table(ctx, "t", filename).await?;
158 let total_rows = df.clone().count().await?;
159 let mut rows_per_chunk = total_rows / chunks; let mut remainder = total_rows % chunks;
161
162 if remainder > 0 {
163 rows_per_chunk += 1;
164 }
165
166 if chunks > total_rows {
167 return Err(DfKitError::CustomError(
168 "Chunks must be smaller than total rows".into(),
169 ));
170 }
171
172 fs::create_dir_all(output_dir)?;
173
174 let stem = filename.file_stem().unwrap().to_string_lossy();
175 let extension = filename.extension().unwrap_or_default().to_string_lossy();
176 let format = file_type(filename)?;
177
178 for i in 0..chunks {
179 if remainder > 0 && i >= remainder {
180 rows_per_chunk -= 1;
181 remainder = 0;
182 }
183 let offset = i * rows_per_chunk;
184 let chunk_df = df.clone().limit(offset, Some(rows_per_chunk))?;
185
186 let chunk_filename = format!("{}_{}.{}", stem, i + 1, extension);
187 let chunk_path = output_dir.join(chunk_filename);
188
189 write_output(chunk_df, &chunk_path, &format).await?;
190
191 println!("Written chunk {} to {}", i + 1, chunk_path.display());
192 }
193
194 Ok(())
195}
196
197pub async fn cat(
198 ctx: &SessionContext,
199 files: Vec<PathBuf>,
200 out_path: &Path,
201) -> Result<(), DfKitError> {
202 let mut dfs = vec![];
203
204 for (i, file) in files.iter().enumerate() {
205 let table_name = format!("t_{}", i);
206 let df = register_table(ctx, &table_name, file).await?;
207 dfs.push(df);
208 }
209
210 let mut final_df = dfs.remove(0);
211 for df in dfs {
212 final_df = final_df.union(df)?;
213 }
214
215 let format = file_type(&out_path)?;
216 write_output(final_df, out_path, &format).await?;
217 println!("Concatenated file written to: {}", out_path.display());
218
219 Ok(())
220}
221
222pub async fn dedup(
223 ctx: &SessionContext,
224 filename: &Path,
225 output: Option<PathBuf>,
226) -> Result<(), DfKitError> {
227 let _ = register_table(&ctx, "t", &filename).await?;
228 let df = ctx.sql("SELECT DISTINCT * FROM t").await?;
229
230 if let Some(out_path) = output {
231 let file_type = file_type(&filename)?;
232 write_output(df, &out_path, &file_type).await?;
233 println!("Deduplicated file written to: {}", out_path.display());
234 } else {
235 df.show().await?;
236 }
237
238 Ok(())
239}