1use actix_web::{web::{self}, App, Either, HttpResponse, HttpServer, Responder, Result};
2use futures::StreamExt;
3use actix_web::body::BoxBody;
4use csv::Reader;
5use actix_files::Files;
6use std::fs::File;
7use std::{collections::HashMap, env::var};
8use walkdir::WalkDir;
9use actix_web::middleware::Logger;
10use serde::Deserialize;
11use tempfile::TempDir;
12use uuid::Uuid;
13use serde_json::{json, Value};
14use std::path::PathBuf;
15use actix_multipart::form::tempfile::TempFile;
16use actix_multipart::form::MultipartForm;
17use std::io::Read;
18use libflatterer::{flatten, Options};
19
/// Multipart payload accepted by the `get_input` handler.
///
/// Every part is optional and individually capped at 100MB by its
/// `#[multipart(limit = ...)]` attribute. `process` persists the parts into
/// the per-request `flatterer-{id}` directory.
#[derive(Debug, MultipartForm)]
struct UploadForm {
    /// The JSON input; persisted as `download.json`.
    #[multipart(limit = "100MB")]
    file: Option<TempFile>,
    /// Optional fields definition; persisted as `fields.csv`.
    #[multipart(limit = "100MB")]
    fields: Option<TempFile>,
    /// Optional tables definition; persisted as `tables.csv`.
    #[multipart(limit = "100MB")]
    tables: Option<TempFile>,
}
29
/// Query-string parameters shared by the `get_input` and `convert` handlers.
///
/// All parameters are optional; the defaults mentioned below are the ones
/// applied by `run_flatterer` and `process`.
#[derive(Deserialize, Debug, Clone)]
struct Query {
    /// Id of an earlier upload; `process` looks for
    /// `flatterer-{id}/download.json` under the system temp directory.
    id: Option<String>,
    /// Requested output. Defaults to `zip`; the visible code also handles
    /// `xlsx`, `csv`, `sqlite`, `preview`, `fields` and `tables`.
    output_format: Option<String>,
    /// URL to fetch the JSON input from (must start with `http`).
    file_url: Option<String>,
    /// Passed to flatterer as the single `path` entry, unless JSON lines
    /// mode is active.
    array_key: Option<String>,
    /// Forces JSON stream mode (`options.json_stream`). When unset and no
    /// `array_key` is given, `process` may enable it based on `guess_array`.
    json_lines: Option<bool>,
    /// Name of the main table; defaults to `main`.
    main_table_name: Option<String>,
    /// Forwarded to `options.inline_one_to_one`; defaults to `false`.
    inline_one_to_one: Option<bool>,
    /// Forwarded to `options.schema`; defaults to the empty string.
    json_schema: Option<String>,
    /// Forwarded to `options.table_prefix`; defaults to the empty string.
    table_prefix: Option<String>,
    /// Forwarded to `options.path_separator`; defaults to `_`.
    /// NOTE: the query key is spelled `path_seperator` (sic), so clients
    /// must use this spelling.
    path_seperator: Option<String>,
    /// Forwarded to `options.schema_titles`; defaults to the empty string.
    schema_titles: Option<String>,
    /// Forwarded to `options.only_fields`; defaults to `false`.
    fields_only: Option<bool>,
    /// Forwarded to `options.only_tables`; defaults to `false`.
    tables_only: Option<bool>,
    /// When non-empty, becomes the single entry of `options.pushdown`.
    pushdown: Option<String>,
}
47
48fn run_flatterer(
49 query: Query,
50 download_path: PathBuf,
51 output_path: PathBuf,
52 json_lines: bool,
53 path: String,
54) -> Result<()> {
55 let file = std::fs::File::open(download_path.join("download.json"))?;
56 let reader = std::io::BufReader::new(file);
57
58 let output_format = query.output_format.unwrap_or_else(|| "zip".to_string());
59
60 let mut options = Options::builder().build();
61
62 if output_format != "zip" {
63 options.csv = false;
64 options.xlsx = false;
65 options.sqlite = false;
66 }
67
68 if output_format == "xlsx" {
69 options.xlsx = true;
70 }
71 if output_format == "csv" {
72 options.csv = true;
73 }
74 if output_format == "sqlite" {
75 options.sqlite = true;
76 }
77 if output_format == "preview" {
78 options.csv = true;
79 options.preview = 10;
80 }
81 options.force = true;
82 options.main_table_name = query.main_table_name.unwrap_or_else(|| "main".to_string());
83
84 options.inline_one_to_one = query.inline_one_to_one.unwrap_or(false);
85
86 options.schema = query.json_schema.unwrap_or_else(|| "".to_string());
87
88 options.table_prefix = query.table_prefix.unwrap_or_else(|| "".to_string());
89 options.path_separator = query.path_seperator.unwrap_or_else(|| "_".to_string());
90 options.schema_titles = query.schema_titles.unwrap_or_else(|| "".to_string());
91 options.json_stream = json_lines;
92
93 let fields_path = download_path.join("fields.csv");
94 if fields_path.exists() {
95 options.fields_csv = fields_path.to_string_lossy().into();
96 }
97 options.only_fields = query.fields_only.unwrap_or_else(|| false);
98
99 let tables_path = download_path.join("tables.csv");
100 if tables_path.exists() {
101 options.tables_csv = tables_path.to_string_lossy().into();
102 }
103 options.only_tables = query.tables_only.unwrap_or_else(|| false);
104
105 let pushdown = query.pushdown.unwrap_or_else(|| "".into());
106 if !pushdown.is_empty() {
107 options.pushdown = vec![pushdown];
108 }
109
110 let mut path_vec = vec![];
111
112 if !path.is_empty() && !json_lines {
113 path_vec.push(path);
114 }
115 options.path = path_vec;
116
117 flatten(
118 Box::new(reader),
119 output_path.to_string_lossy().to_string(),
120 options
121 ).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
122 Ok(())
123}
124
125async fn download(url_string: String, tmp_dir: PathBuf) -> eyre::Result<()> {
126
127 if !url_string.starts_with("http") {
128 return Err(eyre::eyre!("`url` is empty or does not start with `http`"))
130 }
131 let download_file = tmp_dir.join("download.json");
132
133 let mut file = tokio::fs::File::create(&download_file).await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
134
135 let mut stream = reqwest::get(&url_string).await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?.bytes_stream();
136
137 while let Some(item) = stream.next().await {
138 tokio::io::copy(&mut item?.as_ref(), &mut file).await?;
139 }
140
141 Ok(())
142}
143
144fn fields_output(output_path: PathBuf) -> csv::Result<Vec<HashMap<String, String>>> {
145 let mut csv_reader = Reader::from_path(output_path.join("fields.csv"))?;
146
147 let mut all_fields = vec![];
148
149 for result in csv_reader.deserialize() {
150 let record: HashMap<String, String> = result?;
151 all_fields.push(record)
152 }
153 Ok(all_fields)
154}
155
156async fn preview_output(output_path: PathBuf, fields: Vec<HashMap<String, String>>) -> csv::Result<Value> {
157 let mut previews = vec![];
158
159 let mut tables_reader = Reader::from_path(output_path.join("tables.csv"))?;
160
161 for row in tables_reader.deserialize() {
162 let table_row: HashMap<String, String> = row?;
163 let table = table_row.get("table_name").unwrap().clone();
164 let table_title = table_row.get("table_title").unwrap().clone();
165
166 let path = output_path.join("csv").join(format!("{}.csv", table_title));
167
168 let mut table_fields = vec![];
169
170 for field in fields.iter() {
171 if field.get("table_name").unwrap() == &table {
172 table_fields.push(field.clone());
173 }
174 }
175
176 let mut reader = Reader::from_path(path)?;
177 for (row_num, row) in reader.deserialize().enumerate() {
178 let row: Vec<String> = row?;
179 for (col_num, item) in row.iter().enumerate(){
180 table_fields[col_num].insert(format!("row {}", row_num), item.clone());
181 }
182 }
183
184 let preview = json!({"table_name": table_title, "fields": table_fields});
185
186 previews.push(preview);
187 }
188 Ok(serde_json::to_value(previews).expect("should not have issue converting to value"))
189}
190
191fn zip_output(output_path: PathBuf, tmp_dir_path: PathBuf) -> Result<()> {
192 let zip_file = tmp_dir_path.join("export.zip");
193
194 let file = File::create(&zip_file)?;
195 let mut zip = zip::ZipWriter::new(file);
196
197 let options = zip::write::FileOptions::default();
198
199 for entry in WalkDir::new(output_path.clone())
200 .min_depth(1)
201 .into_iter()
202 .filter_map(|e| e.ok())
203 {
204 let path = entry.path();
205
206 if path.is_dir() {
207 zip.add_directory(
208 path.strip_prefix(output_path.clone()).expect("known to be a directory").to_string_lossy(),
209 options,
210 ).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
211 } else {
212 zip.start_file(
213 path.strip_prefix(output_path.clone()).expect("known to be a file").to_string_lossy(),
214 options,
215 ).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
216 let mut file = File::open(path)?;
217 std::io::copy(&mut file, &mut zip)?;
218 }
219 }
220 Ok(())
221}
222
223
224fn internal_error_json(error: String) -> HttpResponse<BoxBody> {
225 HttpResponse::InternalServerError().body(json!({"error": error}).to_string())
226}
227
228fn bad_request_json(error_json: Value) -> HttpResponse<BoxBody> {
229 HttpResponse::BadRequest().body(error_json.to_string())
230}
231
232async fn convert(query: web::Query<Query>) -> Either<HttpResponse<BoxBody>, impl Responder> {
233 process(query, None).await
234}
235
236async fn get_input(query: web::Query<Query>, MultipartForm(form): MultipartForm<UploadForm>) -> Either<HttpResponse<BoxBody>, impl Responder> {
237 process(query, Some(form)).await
238}
239
240
241async fn process(query: web::Query<Query>, upload_form: Option<UploadForm>) -> Either<HttpResponse<BoxBody>, impl Responder> {
242 let tmp_dir = TempDir::new();
243 if let Err(e) = tmp_dir {
244 return Either::Left(internal_error_json(format!("Error creating temp dir: {:?}", e)));
245 }
246 let tmp_dir_path = tmp_dir.expect("just checked").path().to_owned();
247
248 let output_path = tmp_dir_path.join("output");
249
250 let mut json_output;
251
252 if let Some(id) = &query.id {
253 json_output = json!({ "id": id });
254 } else {
255 let mut uploaded_files = vec![];
256 let clean_tmp_result = clean_tmp();
257 if let Err(e) = clean_tmp_result {
258 return Either::Left(internal_error_json(format!("Error cleaning tmp dir: {:?}", e)));
259 }
260 let uuid = Uuid::new_v4().hyphenated();
261 let tmp_dir = std::env::temp_dir().join(format!("flatterer-{}", uuid));
262 json_output = json!({ "id": uuid.to_string() });
263 let create_dir_result = std::fs::create_dir(&tmp_dir);
264 if let Err(e) = create_dir_result {
265 return Either::Left(internal_error_json(format!("Error creating tmp dir: {:?}", e)));
266 }
267
268 if let Some(form) = upload_form {
269 if form.file.is_some() {
270 let file = form.file.unwrap();
271 let file_parsist_result = file.file.persist(tmp_dir.join("download.json"));
272 if let Err(e) = file_parsist_result {
273 return Either::Left(internal_error_json(format!("Error persisting file: {:?}", e)));
274 }
275 uploaded_files.push("file".to_string());
276 }
277
278 if form.fields.is_some() {
279 let fields = form.fields.unwrap();
280 let file_parsist_result = fields.file.persist(tmp_dir.join("fields.csv"));
281 if let Err(e) = file_parsist_result {
282 return Either::Left(internal_error_json(format!("Error persisting file: {:?}", e)));
283 }
284 uploaded_files.push("fields".to_string());
285 }
286
287 if form.tables.is_some() {
288 let tables = form.tables.unwrap();
289 let file_parsist_result = tables.file.persist(tmp_dir.join("tables.csv"));
290 if let Err(e) = file_parsist_result {
291 return Either::Left(internal_error_json(format!("Error persisting file: {:?}", e)));
292 }
293 uploaded_files.push("tables".to_string());
294 }
295
296 if let Some(file_url) = &query.file_url {
297 if let Err(error) = download(file_url.clone(), tmp_dir).await {
298 json_output = json!({"error": error.to_string()})
299 }
300 uploaded_files.push("file".to_string());
301 }
302 }
303
304 if !uploaded_files.contains(&"file".to_string()) {
305 json_output = json!({"error": "need to supply either an id or filename or supply data in request body"});
306 }
307 }
308
309 let mut download_path = std::env::temp_dir();
310 let mut download_file = std::env::temp_dir();
311 let mut id = "".to_string();
312
313 if let Some(id_value) = json_output.get("id") {
314 if let Some(id_string) = id_value.as_str() {
315 id = id_string.to_string();
316 download_path.push(format!("flatterer-{}", id_string));
317 download_file.push(format!("flatterer-{}", id_string));
318 download_file.push("download.json");
319 if !download_file.exists() {
320 json_output = json!({"error": "id does not exist, you may need to ask you file to be downloaded again or to upload the file again."})
321 }
322 }
323 }
324
325 if json_output.get("error").is_some() {
326 return Either::Left(bad_request_json(json_output))
328 }
329
330 let file_result = File::open(download_file);
331 if file_result.is_err() {
332 return Either::Left(internal_error_json(format!("Error opening file: {:?}", file_result.err().unwrap())));
333 }
334 let mut file = file_result.unwrap();
335
336 let mut buf = vec![0;10240];
337 let read_result = file.read(&mut buf);
338 if read_result.is_err() {
339 return Either::Left(internal_error_json(format!("Error reading file: {:?}", read_result.err().unwrap())));
340 }
341 let n = read_result.unwrap();
342 let start = String::from_utf8_lossy(&buf[..n]);
343
344 let mut path = "".to_string();
345
346 if let Some(array_key) = &query.array_key {
347 path = array_key.to_owned();
348 };
349
350 let mut json_lines = query.json_lines.unwrap_or(false);
351
352 let mut guess_text = "".to_string();
353
354 if path.is_empty() && !json_lines {
355 match libflatterer::guess_array(&start) {
356 Ok((guess, _)) => {
357 if guess == "stream" {
358 json_lines = true;
359 guess_text = "JSON Stream".to_string()
360 };
361 }
362 Err(err) => {
363 let output = json!({"id": id, "error": err.to_string(), "start": start});
364 return Either::Left(bad_request_json(output))
365 }
366 }
367 }
368
369 let output_path_copy = output_path.clone();
370 let query_copy = query.clone().into_inner();
371
372 if let Err(err) = run_flatterer(query_copy, download_path, output_path_copy, json_lines, path) {
373 let output = json!({"id": id, "error": err.to_string(), "start": start});
374 return Either::Left(bad_request_json(output))
375 }
376
377 let tmp_dir_path_to_move = tmp_dir_path.to_path_buf();
378
379 let output_format = query.output_format.clone().unwrap_or_else(|| "zip".to_string());
380
381 if output_format == "fields" {
382 return Either::Right(actix_files::NamedFile::open_async(output_path.join("fields.csv")).await);
383 }
384
385 if output_format == "tables" {
386 return Either::Right(actix_files::NamedFile::open_async(output_path.join("tables.csv")).await);
387 }
388
389 if output_format == "preview" {
390 let fields_value_result = fields_output(output_path.clone());
391 if let Err(e) = fields_value_result {
392 return Either::Left(internal_error_json(format!("Error reading fields.csv: {:?}", e)));
393 }
394 let fields_value = fields_value_result.unwrap();
395
396 let preview_value_result = preview_output(output_path.clone(), fields_value).await;
397 if let Err(e) = preview_value_result {
398 return Either::Left(internal_error_json(format!("Error creating preview: {:?}", e)));
399 }
400
401 let preview_value = preview_value_result.expect("just checked");
402 let output = json!({"id": id, "preview": preview_value, "start": start, "guess_text": guess_text});
403
404 return Either::Left(HttpResponse::Ok().body(output.to_string()));
405
406 }
407
408 if output_format == "xlsx" {
409 return Either::Right(actix_files::NamedFile::open_async(output_path.join("output.xlsx")).await);
410 }
411
412 if output_format == "sqlite" {
413 return Either::Right(actix_files::NamedFile::open_async(output_path.join("sqlite.db")).await);
414 }
415
416 if output_format == "csv" {
417 let main_table_name = query.main_table_name.clone().unwrap_or_else(|| "main".to_string());
418 return Either::Right(actix_files::NamedFile::open_async(output_path.join("csv").join(format!("{}.csv", main_table_name))).await);
419 }
420
421 let zip_output_result = zip_output(output_path.clone(), tmp_dir_path_to_move.to_path_buf());
422
423 if let Err(e) = zip_output_result {
424 return Either::Left(internal_error_json(format!("Error zipping output: {:?}", e)));
425 }
426
427 return Either::Right(actix_files::NamedFile::open_async( tmp_dir_path.join("export.zip")).await);
428}
429
430
431fn clean_tmp() -> std::io::Result<()> {
432
433 let clean_tmp_time = if let Ok(clean_tmp_time) = var("CLEAN_TMP_TIME") {
434 match clean_tmp_time.parse::<u64>() {
435 Ok(clean_tmp_time) => {clean_tmp_time},
436 _ => {3600}
437 }
438 } else {
439 3600
440 };
441
442
443 for entry in WalkDir::new("/tmp/")
444 .min_depth(1)
445 .into_iter()
446 .filter_map(|e| e.ok())
447 {
448 if !entry
449 .file_name()
450 .to_string_lossy()
451 .starts_with("flatterer-")
452 {
453 continue;
454 }
455 if entry.metadata()?.modified()?.elapsed().map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "elapsed time not able to be calculated"))?.as_secs() > clean_tmp_time {
456 log::debug!("Removing tmp dir: {:?}", entry);
457
458 if entry.metadata()?.is_dir() {
459 std::fs::remove_dir_all(&entry.into_path())?;
460 }
461 }
462 }
463 Ok(())
464}
465
466
467#[actix_web::main]
468pub async fn main() -> std::io::Result<()> {
469 env_logger::init();
470 clean_tmp()?;
471
472 let port = if let Ok(port) = var("PORT") {
473 port
474 } else {
475 "8080".to_string()
476 };
477
478 let host = if let Ok(host) = var("HOST") {
479 host
480 } else {
481 "127.0.0.1".to_string()
482 };
483
484
485
486 let open_browser = if let Ok(_) = var("OPEN_BROWSER") {
487 true
488 } else {
489 false
490 };
491
492 let path = format!("http://{}:{}", host, port);
493
494 if open_browser {
495 match open::that(&path) {
496 Ok(()) => println!("Opened browser '{}' successfully.", path),
497 Err(err) => eprintln!("An error occurred when opening browser'{}': {}", path, err),
498 }
499 } else {
500 println!("Running at '{path}'.")
501 }
502
503 let static_files = if let Ok(static_files) = var("STATIC_FILES") {
504 if let Some(static_files) = static_files.strip_suffix("/") {
505 static_files.to_owned()
506 } else {
507 static_files
508 }
509 } else {
510 "dist".to_owned()
511 };
512
513 let port: u16 = port.parse().expect("PORT must be a valid u16 integer");
514
515 HttpServer::new(move || {
516 App::new()
517 .wrap(Logger::default())
518 .service(
519 web::resource("/api/get_input")
520 .route(web::post().to(get_input))
521 .route(web::get().to(get_input))
522 .route(web::put().to(get_input))
523 )
524 .service(
525 web::resource("/api/convert")
526 .route(web::post().to(convert))
527 .route(web::get().to(convert))
528 .route(web::put().to(convert))
529 )
530 .service(Files::new("/", static_files.clone()).index_file("index.html"))
531 })
532 .bind((host, port))?
533 .run()
534 .await
535}