// flatterer_web/lib.rs

use std::collections::HashMap;
use std::env::var;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;

use actix_files::Files;
use actix_multipart::form::tempfile::TempFile;
use actix_multipart::form::MultipartForm;
use actix_web::body::BoxBody;
use actix_web::middleware::Logger;
use actix_web::{web::{self}, App, Either, HttpResponse, HttpServer, Responder, Result};
use csv::Reader;
use futures::StreamExt;
use libflatterer::{flatten, Options};
use serde::Deserialize;
use serde_json::{json, Value};
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use uuid::Uuid;
use walkdir::WalkDir;
19
20#[derive(Debug, MultipartForm)]
21struct UploadForm {
22    #[multipart(limit = "100MB")]
23    file: Option<TempFile>,
24    #[multipart(limit = "100MB")]
25    fields: Option<TempFile>,
26    #[multipart(limit = "100MB")]
27    tables: Option<TempFile>,
28}
29
30#[derive(Deserialize, Debug, Clone)]
31struct Query {
32    id: Option<String>,
33    output_format: Option<String>,
34    file_url: Option<String>,
35    array_key: Option<String>,
36    json_lines: Option<bool>,
37    main_table_name: Option<String>,
38    inline_one_to_one: Option<bool>,
39    json_schema: Option<String>,
40    table_prefix: Option<String>,
41    path_seperator: Option<String>,
42    schema_titles: Option<String>,
43    fields_only: Option<bool>,
44    tables_only: Option<bool>,
45    pushdown: Option<String>,
46}
47
48fn run_flatterer(
49    query: Query,
50    download_path: PathBuf,
51    output_path: PathBuf,
52    json_lines: bool,
53    path: String,
54) -> Result<()> {
55    let file = std::fs::File::open(download_path.join("download.json"))?;
56    let reader = std::io::BufReader::new(file);
57
58    let output_format = query.output_format.unwrap_or_else(|| "zip".to_string());
59
60    let mut options = Options::builder().build();
61
62    if output_format != "zip" {
63        options.csv = false;
64        options.xlsx = false;
65        options.sqlite = false;
66    }
67
68    if output_format == "xlsx" {
69        options.xlsx = true;
70    }
71    if output_format == "csv" {
72        options.csv = true;
73    }
74    if output_format == "sqlite" {
75        options.sqlite = true;
76    }
77    if output_format == "preview" {
78        options.csv = true;
79        options.preview = 10;
80    }
81    options.force = true;
82    options.main_table_name = query.main_table_name.unwrap_or_else(|| "main".to_string());
83
84    options.inline_one_to_one = query.inline_one_to_one.unwrap_or(false);
85
86    options.schema = query.json_schema.unwrap_or_else(|| "".to_string());
87
88    options.table_prefix = query.table_prefix.unwrap_or_else(|| "".to_string());
89    options.path_separator = query.path_seperator.unwrap_or_else(|| "_".to_string());
90    options.schema_titles = query.schema_titles.unwrap_or_else(|| "".to_string());
91    options.json_stream = json_lines;
92
93    let fields_path = download_path.join("fields.csv");
94    if fields_path.exists() {
95        options.fields_csv = fields_path.to_string_lossy().into();
96    }
97    options.only_fields = query.fields_only.unwrap_or_else(|| false);
98
99    let tables_path = download_path.join("tables.csv");
100    if tables_path.exists() {
101        options.tables_csv = tables_path.to_string_lossy().into();
102    }
103    options.only_tables = query.tables_only.unwrap_or_else(|| false);
104
105    let pushdown = query.pushdown.unwrap_or_else(|| "".into());
106    if !pushdown.is_empty() {
107        options.pushdown = vec![pushdown];
108    }
109
110    let mut path_vec = vec![];
111
112    if !path.is_empty() && !json_lines {
113        path_vec.push(path);
114    }
115    options.path = path_vec;
116
117    flatten(
118        Box::new(reader),
119        output_path.to_string_lossy().to_string(),
120        options
121    ).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
122    Ok(())
123}
124
125async fn download(url_string: String, tmp_dir: PathBuf) -> eyre::Result<()> {
126
127    if !url_string.starts_with("http") {
128        // return Err(tide::Error::from_str(tide::StatusCode::BadRequest, "`url` is empty or does not start with `http`"))
129        return Err(eyre::eyre!("`url` is empty or does not start with `http`"))
130    }
131    let download_file = tmp_dir.join("download.json");
132
133    let mut file = tokio::fs::File::create(&download_file).await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
134
135    let mut stream = reqwest::get(&url_string).await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?.bytes_stream();
136
137    while let Some(item) = stream.next().await {
138        tokio::io::copy(&mut item?.as_ref(), &mut file).await?;
139    }
140
141    Ok(())
142}
143
144fn fields_output(output_path: PathBuf) -> csv::Result<Vec<HashMap<String, String>>> {
145    let mut csv_reader = Reader::from_path(output_path.join("fields.csv"))?;
146
147    let mut all_fields = vec![];
148
149    for result in csv_reader.deserialize() {
150        let record: HashMap<String, String> = result?;
151        all_fields.push(record)
152    }
153    Ok(all_fields)
154}
155
156async fn preview_output(output_path: PathBuf, fields: Vec<HashMap<String, String>>) -> csv::Result<Value> {
157    let mut previews = vec![];
158
159    let mut tables_reader = Reader::from_path(output_path.join("tables.csv"))?;
160
161    for row in tables_reader.deserialize() {
162        let table_row: HashMap<String, String> = row?;
163        let table = table_row.get("table_name").unwrap().clone();
164        let table_title = table_row.get("table_title").unwrap().clone();
165
166        let path = output_path.join("csv").join(format!("{}.csv", table_title));
167
168        let mut table_fields = vec![];
169
170        for field in fields.iter() {
171            if field.get("table_name").unwrap() == &table {
172                table_fields.push(field.clone());
173            }
174        }
175
176        let mut reader = Reader::from_path(path)?;
177        for (row_num, row) in reader.deserialize().enumerate() {
178            let row: Vec<String> = row?;
179            for (col_num, item) in row.iter().enumerate(){
180                table_fields[col_num].insert(format!("row {}", row_num), item.clone());
181            }
182        }
183
184        let preview = json!({"table_name": table_title, "fields": table_fields});
185
186        previews.push(preview);
187    }
188    Ok(serde_json::to_value(previews).expect("should not have issue converting to value"))
189}
190
191fn zip_output(output_path: PathBuf, tmp_dir_path: PathBuf) -> Result<()> {
192    let zip_file = tmp_dir_path.join("export.zip");
193
194    let file = File::create(&zip_file)?;
195    let mut zip = zip::ZipWriter::new(file);
196
197    let options = zip::write::FileOptions::default();
198
199    for entry in WalkDir::new(output_path.clone())
200        .min_depth(1)
201        .into_iter()
202        .filter_map(|e| e.ok())
203    {
204        let path = entry.path();
205
206        if path.is_dir() {
207            zip.add_directory(
208                path.strip_prefix(output_path.clone()).expect("known to be a directory").to_string_lossy(),
209                options,
210            ).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
211        } else {
212            zip.start_file(
213                path.strip_prefix(output_path.clone()).expect("known to be a file").to_string_lossy(),
214                options,
215            ).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
216            let mut file = File::open(path)?;
217            std::io::copy(&mut file, &mut zip)?;
218        }
219    }
220    Ok(())
221}
222
223
224fn internal_error_json(error: String) -> HttpResponse<BoxBody> {
225    HttpResponse::InternalServerError().body(json!({"error": error}).to_string())
226}
227
228fn bad_request_json(error_json: Value) -> HttpResponse<BoxBody> {
229    HttpResponse::BadRequest().body(error_json.to_string())
230}
231
232async fn convert(query: web::Query<Query>) -> Either<HttpResponse<BoxBody>, impl Responder> {
233    process(query, None).await
234}
235
236async fn get_input(query: web::Query<Query>, MultipartForm(form): MultipartForm<UploadForm>) -> Either<HttpResponse<BoxBody>, impl Responder> {
237    process(query, Some(form)).await
238}
239
240
241async fn process(query: web::Query<Query>, upload_form: Option<UploadForm>) -> Either<HttpResponse<BoxBody>, impl Responder> {
242    let tmp_dir = TempDir::new();
243    if let Err(e) = tmp_dir {
244        return Either::Left(internal_error_json(format!("Error creating temp dir: {:?}", e)));
245    } 
246    let tmp_dir_path = tmp_dir.expect("just checked").path().to_owned();
247
248    let output_path = tmp_dir_path.join("output");
249
250    let mut json_output;
251
252    if let Some(id) = &query.id {
253        json_output = json!({ "id": id });
254    } else {  
255        let mut uploaded_files = vec![];
256        let clean_tmp_result = clean_tmp();
257        if let Err(e) = clean_tmp_result {
258            return Either::Left(internal_error_json(format!("Error cleaning tmp dir: {:?}", e)));
259        }
260        let uuid = Uuid::new_v4().hyphenated();
261        let tmp_dir = std::env::temp_dir().join(format!("flatterer-{}", uuid));
262        json_output = json!({ "id": uuid.to_string() });
263        let create_dir_result = std::fs::create_dir(&tmp_dir);
264        if let Err(e) = create_dir_result {
265            return Either::Left(internal_error_json(format!("Error creating tmp dir: {:?}", e)));
266        }
267
268        if let Some(form) = upload_form {
269            if form.file.is_some() {
270                let file = form.file.unwrap();
271                let file_parsist_result = file.file.persist(tmp_dir.join("download.json"));
272                if let Err(e) = file_parsist_result {
273                    return Either::Left(internal_error_json(format!("Error persisting file: {:?}", e)));
274                }
275                uploaded_files.push("file".to_string());
276            }
277
278            if form.fields.is_some() {
279                let fields = form.fields.unwrap();
280                let file_parsist_result = fields.file.persist(tmp_dir.join("fields.csv"));
281                if let Err(e) = file_parsist_result {
282                    return Either::Left(internal_error_json(format!("Error persisting file: {:?}", e)));
283                }
284                uploaded_files.push("fields".to_string());
285            }
286
287            if form.tables.is_some() {
288                let tables = form.tables.unwrap();
289                let file_parsist_result = tables.file.persist(tmp_dir.join("tables.csv"));
290                if let Err(e) = file_parsist_result {
291                    return Either::Left(internal_error_json(format!("Error persisting file: {:?}", e)));
292                }
293                uploaded_files.push("tables".to_string());
294            }
295
296            if let Some(file_url) = &query.file_url {
297                if let Err(error) = download(file_url.clone(), tmp_dir).await {
298                    json_output = json!({"error": error.to_string()})
299                }
300                uploaded_files.push("file".to_string());
301            }
302        }
303
304        if !uploaded_files.contains(&"file".to_string()) {
305            json_output = json!({"error": "need to supply either an id or filename or supply data in request body"});
306        }
307    }
308
309    let mut download_path = std::env::temp_dir();
310    let mut download_file = std::env::temp_dir();
311    let mut id = "".to_string();
312
313    if let Some(id_value) = json_output.get("id") {
314        if let Some(id_string) = id_value.as_str() {
315            id = id_string.to_string();
316            download_path.push(format!("flatterer-{}", id_string));
317            download_file.push(format!("flatterer-{}", id_string));
318            download_file.push("download.json");
319            if !download_file.exists() {
320                json_output = json!({"error": "id does not exist, you may need to ask you file to be downloaded again or to upload the file again."})
321            }
322        }
323    }
324
325    if json_output.get("error").is_some() {
326        // return Either::Left(generate_json_error(format!("Error creating temp dir")));
327        return Either::Left(bad_request_json(json_output))
328    }
329
330    let file_result = File::open(download_file);
331    if file_result.is_err() {
332        return Either::Left(internal_error_json(format!("Error opening file: {:?}", file_result.err().unwrap())));
333    }
334    let mut file = file_result.unwrap();
335
336    let mut buf = vec![0;10240];
337    let read_result = file.read(&mut buf);
338    if read_result.is_err() {
339        return Either::Left(internal_error_json(format!("Error reading file: {:?}", read_result.err().unwrap())));
340    }
341    let n = read_result.unwrap();
342    let start = String::from_utf8_lossy(&buf[..n]);        
343
344    let mut path = "".to_string();
345
346    if let Some(array_key) = &query.array_key {
347        path = array_key.to_owned();
348    };
349
350    let mut json_lines = query.json_lines.unwrap_or(false);
351
352    let mut guess_text = "".to_string();
353
354    if path.is_empty() && !json_lines {
355        match libflatterer::guess_array(&start) {
356            Ok((guess, _)) => {
357                if guess == "stream" {
358                    json_lines = true;
359                    guess_text = "JSON Stream".to_string()
360                };
361            }
362            Err(err) => {
363                let output = json!({"id": id, "error": err.to_string(), "start": start});
364                return Either::Left(bad_request_json(output))
365            }
366        }
367    }
368
369    let output_path_copy = output_path.clone();
370    let query_copy = query.clone().into_inner();
371
372    if let Err(err) = run_flatterer(query_copy, download_path, output_path_copy, json_lines, path) {
373        let output = json!({"id": id, "error": err.to_string(), "start": start});
374        return Either::Left(bad_request_json(output))
375    }
376
377    let tmp_dir_path_to_move = tmp_dir_path.to_path_buf();
378
379    let output_format = query.output_format.clone().unwrap_or_else(|| "zip".to_string());
380
381    if output_format == "fields" {
382        return Either::Right(actix_files::NamedFile::open_async(output_path.join("fields.csv")).await);
383    }
384
385    if output_format == "tables" {
386        return Either::Right(actix_files::NamedFile::open_async(output_path.join("tables.csv")).await);
387    }
388
389    if output_format == "preview" {
390        let fields_value_result = fields_output(output_path.clone());
391        if let Err(e) = fields_value_result {
392            return Either::Left(internal_error_json(format!("Error reading fields.csv: {:?}", e)));
393        }
394        let fields_value = fields_value_result.unwrap();
395
396        let preview_value_result = preview_output(output_path.clone(), fields_value).await;
397        if let Err(e) = preview_value_result {
398            return Either::Left(internal_error_json(format!("Error creating preview: {:?}", e)));
399        }
400
401        let preview_value = preview_value_result.expect("just checked");
402        let output = json!({"id": id, "preview": preview_value, "start": start, "guess_text": guess_text});
403
404        return Either::Left(HttpResponse::Ok().body(output.to_string()));
405
406    }
407
408    if output_format == "xlsx" {
409        return Either::Right(actix_files::NamedFile::open_async(output_path.join("output.xlsx")).await);
410    }
411    
412    if output_format == "sqlite" {
413        return Either::Right(actix_files::NamedFile::open_async(output_path.join("sqlite.db")).await);
414    }
415
416    if output_format == "csv" {
417        let main_table_name = query.main_table_name.clone().unwrap_or_else(|| "main".to_string());
418        return Either::Right(actix_files::NamedFile::open_async(output_path.join("csv").join(format!("{}.csv", main_table_name))).await);
419    }
420
421    let zip_output_result = zip_output(output_path.clone(), tmp_dir_path_to_move.to_path_buf());
422
423    if let Err(e) = zip_output_result {
424        return Either::Left(internal_error_json(format!("Error zipping output: {:?}", e)));
425    }
426
427    return Either::Right(actix_files::NamedFile::open_async( tmp_dir_path.join("export.zip")).await);
428}
429
430
431fn clean_tmp() ->  std::io::Result<()> {
432
433    let clean_tmp_time = if let Ok(clean_tmp_time) = var("CLEAN_TMP_TIME") {
434        match clean_tmp_time.parse::<u64>() {
435            Ok(clean_tmp_time) => {clean_tmp_time},
436            _ => {3600}
437        }
438    } else {
439        3600
440    };
441
442
443    for entry in WalkDir::new("/tmp/")
444        .min_depth(1)
445        .into_iter()
446        .filter_map(|e| e.ok())
447    {
448        if !entry
449            .file_name()
450            .to_string_lossy()
451            .starts_with("flatterer-")
452        {
453            continue;
454        }
455        if entry.metadata()?.modified()?.elapsed().map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "elapsed time not able to be calculated"))?.as_secs() > clean_tmp_time {
456            log::debug!("Removing tmp dir: {:?}", entry);
457
458            if entry.metadata()?.is_dir() {
459                std::fs::remove_dir_all(&entry.into_path())?;
460            }
461        }
462    }
463    Ok(())
464}
465
466
467#[actix_web::main]
468pub async fn main() -> std::io::Result<()> {
469    env_logger::init();
470    clean_tmp()?;
471
472    let port = if let Ok(port) = var("PORT") {
473        port
474    } else {
475        "8080".to_string()
476    };
477
478    let host = if let Ok(host) = var("HOST") {
479        host
480    } else {
481        "127.0.0.1".to_string()
482    };
483
484
485
486    let open_browser = if let Ok(_) = var("OPEN_BROWSER") {
487        true
488    } else {
489        false
490    };
491
492    let path = format!("http://{}:{}", host, port);
493
494    if open_browser {
495        match open::that(&path) {
496            Ok(()) => println!("Opened browser '{}' successfully.", path),
497            Err(err) => eprintln!("An error occurred when opening browser'{}': {}", path, err),
498        } 
499    } else {
500        println!("Running at '{path}'.")
501    }
502
503    let static_files = if let Ok(static_files) = var("STATIC_FILES") {
504        if let Some(static_files) = static_files.strip_suffix("/") {
505            static_files.to_owned()
506        } else {
507            static_files
508        }
509    } else {
510        "dist".to_owned()
511    };
512
513    let port: u16 = port.parse().expect("PORT must be a valid u16 integer");
514
515    HttpServer::new(move || {
516        App::new()
517            .wrap(Logger::default())
518            .service(
519                web::resource("/api/get_input")
520                .route(web::post().to(get_input))
521                .route(web::get().to(get_input))
522                .route(web::put().to(get_input))
523            )
524            .service(
525                web::resource("/api/convert")
526                .route(web::post().to(convert))
527                .route(web::get().to(convert))
528                .route(web::put().to(convert))
529            )
530            .service(Files::new("/", static_files.clone()).index_file("index.html"))
531    })
532    .bind((host, port))?
533    .run()
534    .await
535}