flatterer_web/
lib.rs

1mod buffered_byte_stream;
2mod limited_copy;
3use async_std::fs::File;
4use async_std::io::prelude::*;
5use async_std::io::{BufReader, BufWriter};
6use limited_copy::copy as limited_copy;
7use buffered_byte_stream::BufferedBytesStream;
8use libflatterer::{flatten, Options};
9use std::collections::HashMap;
10use std::fs::File as StdFile;
11use std::io::{copy as std_copy, BufReader as StdBufReader};
12use surf::http::{Method, Url};
13use tempfile::TempDir;
14use tide::{http, log, utils, Body, Request, Response, StatusCode};
15//use async_std::task;
16use csv::Reader;
17use multer::{Constraints, Multipart, SizeLimit};
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use std::env::var;
21use std::path::PathBuf;
22use uuid::Uuid;
23use walkdir::WalkDir;
24use http_types::headers::HeaderValue;
25use tide::security::{CorsMiddleware, Origin};
26
27#[derive(Deserialize, Debug, Clone)]
28struct Query {
29    id: Option<String>,
30    output_format: Option<String>,
31    file_url: Option<String>,
32    array_key: Option<String>,
33    json_lines: Option<bool>,
34    main_table_name: Option<String>,
35    inline_one_to_one: Option<bool>,
36    json_schema: Option<String>,
37    table_prefix: Option<String>,
38    path_seperator: Option<String>,
39    schema_titles: Option<String>,
40    fields_only: Option<bool>,
41    tables_only: Option<bool>,
42    pushdown: Option<String>,
43}
44
45fn get_app() -> tide::Result<tide::Server<()>> {
46    let mut app = tide::new();
47
48    app.with(utils::After(|res: Response| async move {
49        if let Some(err) = res.error() {
50            if res.status() == http::StatusCode::InternalServerError {
51                log::error!("Internal Error: {:?}", err)
52            } else {
53                log::error!("HTTP Error: {:?}", err)
54            }
55        }
56        Ok(res)
57    }));
58
59    let static_files = if let Ok(static_files) = var("STATIC_FILES") {
60        if let Some(static_files) = static_files.strip_suffix("/") {
61            static_files.to_owned()
62        } else {
63            static_files
64        }
65    } else {
66        "dist".to_owned()
67    };
68
69    let cors = CorsMiddleware::new()
70        .allow_methods("GET, POST, OPTIONS".parse::<HeaderValue>()?)
71        .allow_origin(Origin::from("*"))
72        .allow_credentials(false);
73    
74    app.with(cors);
75
76    app.at("/api/convert").get(convert);
77    app.at("/api/convert").post(convert);
78    app.at("/api/convert").put(convert);
79    app.at("/about").serve_file(format!("{static_files}/index.html"))?;
80    app.at("/").serve_file(format!("{static_files}/index.html"))?;
81    app.at("/").serve_dir(format!("{static_files}/"))?;
82
83    Ok(app)
84}
85
86#[async_std::main]
87pub async fn main() -> tide::Result<()> {
88    env_logger::init();
89    clean_tmp()?;
90
91    let app = get_app()?;
92    let port = if let Ok(port) = var("PORT") {
93        port
94    } else {
95        "8080".to_string()
96    };
97
98    let host = if let Ok(host) = var("HOST") {
99        host
100    } else {
101        "127.0.0.1".to_string()
102    };
103
104    let open_browser = if let Ok(_) = var("OPEN_BROWSER") {
105        true
106    } else {
107        false
108    };
109
110    let path = format!("http://{}:{}", host, port);
111
112    if open_browser {
113        match open::that(&path) {
114            Ok(()) => println!("Opened browser '{}' successfully.", path),
115            Err(err) => eprintln!("An error occurred when opening browser'{}': {}", path, err),
116        } 
117    } else {
118        println!("Running at '{path}'.")
119    }
120
121    app.listen(format!("http://{}:{}", host, port)).await?;
122
123    Ok(())
124}
125
126
127#[derive(Debug, Deserialize, Serialize)]
128struct FieldsRecord {
129    table_name: String,
130    field_name: String,
131    field_type: String,
132    field_title: Option<String>,
133}
134
135
136async fn download(url_string: String, tmp_dir: PathBuf) -> tide::Result<()> {
137
138    if !url_string.starts_with("http") {
139        return Err(tide::Error::from_str(tide::StatusCode::BadRequest, "`url` is empty or does not start with `http`"))
140    }
141
142    let url = Url::parse(&url_string)?;
143    let req = surf::Request::new(Method::Get, url);
144    let client = surf::client();
145
146    let mut file_response = client.send(req).await?;
147
148    if !file_response.status().is_success() {
149        return Err(tide::Error::from_str(tide::StatusCode::BadRequest, "file download failed due to bad request status code`"))
150    }
151
152    let download_file = tmp_dir.join("download.json");
153    let file = File::create(&download_file).await?;
154    let mut writer = BufWriter::new(file);
155
156    limited_copy(&mut file_response, &mut writer).await?;
157
158    Ok(())
159}
160
161async fn multipart_upload(req: Request<()>, multipart_boundry: String, tmp_dir: PathBuf) -> tide::Result<Vec<String>> {
162
163    let body_stream = BufferedBytesStream { inner: req };
164
165    let max_size = if let Ok(max_size) = var("MAX_SIZE") {
166        match max_size.parse::<u64>() {
167            Ok(max_size) => {max_size},
168            _ => {500}
169        }
170    } else {
171        500
172    };
173
174    let constraints = Constraints::new()
175    .size_limit(
176        SizeLimit::new()
177            .whole_stream(max_size * 1024 * 1024)
178    );
179    let mut multipart = Multipart::with_constraints(body_stream, multipart_boundry.clone(), constraints);
180
181    let mut output = vec![];
182
183    while let Some(mut field) = multipart.next_field().await? {
184        let download_file;
185        let mut download_output;
186
187        if field.name() == Some("file") {
188            download_file = tmp_dir.join("download.json");
189            output.push("file".to_string());
190        }
191        else if field.name() == Some("fields") {
192            download_file = tmp_dir.join("fields.csv");
193            output.push("fields".to_string());
194        }
195        else if field.name() == Some("tables") {
196            download_file = tmp_dir.join("tables.csv");
197            output.push("tables".to_string());
198        } else {
199            break
200        }
201        download_output = File::create(&download_file).await?;
202        while let Some(chunk) = field.chunk().await? {
203            download_output.write_all(&chunk).await?;
204        }
205    }
206
207    Ok(output)
208}
209
210async fn json_request(mut req: Request<()>, tmp_dir: PathBuf) -> tide::Result<()> {
211    let download_file = tmp_dir.join("download.json");
212
213    let mut output = File::create(&download_file).await?;
214    limited_copy(&mut req, &mut output).await?;
215    Ok(())
216}
217
218fn clean_tmp() -> tide::Result<()> {
219
220    let clean_tmp_time = if let Ok(clean_tmp_time) = var("CLEAN_TMP_TIME") {
221        match clean_tmp_time.parse::<u64>() {
222            Ok(clean_tmp_time) => {clean_tmp_time},
223            _ => {3600}
224        }
225    } else {
226        3600
227    };
228
229
230    for entry in WalkDir::new("/tmp/")
231        .min_depth(1)
232        .into_iter()
233        .filter_map(|e| e.ok())
234    {
235        if !entry
236            .file_name()
237            .to_string_lossy()
238            .starts_with("flatterer-")
239        {
240            continue;
241        }
242        if entry.metadata()?.modified()?.elapsed()?.as_secs() > clean_tmp_time {
243            log::debug!("Removing tmp dir: {:?}", entry);
244
245            if entry.metadata()?.is_dir() {
246                std::fs::remove_dir_all(&entry.into_path())?;
247            }
248        }
249    }
250    Ok(())
251}
252
253async fn convert(req: Request<()>) -> tide::Result<Response> {
254    let query: Query = req.query()?;
255    let tmp_dir = TempDir::new()?;
256    let tmp_dir_path = tmp_dir.path();
257    let output_path = tmp_dir_path.join("output");
258
259    let mut multipart_boundry = "".to_string();
260    let mut content_type = "".to_string();
261
262    if let Some(mime) = req.content_type() {
263        content_type = mime.essence().to_string();
264        if content_type == "multipart/form-data" {
265            if let Some(boundry) = mime.param("boundary") {
266                multipart_boundry = boundry.to_string()
267            }
268        }
269    }
270
271    let mut json_output;
272
273    if let Some(id) = &query.id {
274        json_output = json!({ "id": id });
275    } else {  
276        clean_tmp()?;
277        let uuid = Uuid::new_v4().hyphenated();
278        let tmp_dir = std::env::temp_dir().join(format!("flatterer-{}", uuid));
279        json_output = json!({ "id": uuid.to_string() });
280        async_std::fs::create_dir(&tmp_dir).await?;
281
282        let mut uploaded_files = vec![];
283
284        if !multipart_boundry.is_empty() {
285            match multipart_upload(req, multipart_boundry, tmp_dir.clone()).await {
286                 Err(error) => {json_output = json!({"error": error.to_string()})}
287                 Ok(val) => {uploaded_files = val}
288            }
289        } else if content_type == "application/json" {
290            if let Err(error) = json_request(req, tmp_dir.clone()).await {
291                json_output = json!({"error": error.to_string()})
292            }
293            uploaded_files.push("file".to_string());
294        } 
295
296        if let Some(file_url) = &query.file_url {
297            if let Err(error) = download(file_url.clone(), tmp_dir).await {
298                json_output = json!({"error": error.to_string()})
299            }
300            uploaded_files.push("file".to_string());
301        }
302
303        if !uploaded_files.contains(&"file".to_string()) {
304            json_output = json!({"error": "need to supply either an id or filename or supply data in request body"});
305        }
306    }
307
308    let mut download_path = std::env::temp_dir();
309    let mut download_file = std::env::temp_dir();
310    let mut id = "".to_string();
311
312    if let Some(id_value) = json_output.get("id") {
313        if let Some(id_string) = id_value.as_str() {
314            id = id_string.to_string();
315            download_path.push(format!("flatterer-{}", id_string));
316            download_file.push(format!("flatterer-{}", id_string));
317            download_file.push("download.json");
318            if !download_file.exists() {
319                json_output = json!({"error": "id does not exist, you may need to ask you file to be downloaded again or to upload the file again."})
320            }
321        }
322    }
323
324    if json_output.get("error").is_some() {
325        let mut res = Response::new(StatusCode::BadRequest);
326        let body = Body::from_json(&json_output)?;
327        res.set_body(body);
328        return Ok(res);
329    }
330
331    let mut file = File::open(download_file).await?;
332    let mut buf = vec![0;10240];
333    let n = file.read(&mut buf).await?;
334    let start = String::from_utf8_lossy(&buf[..n]);        
335
336    let mut path = "".to_string();
337
338    if let Some(array_key) = &query.array_key {
339        path = array_key.to_owned();
340    };
341
342    let mut json_lines = query.json_lines.unwrap_or(false);
343
344    let mut guess_text = "".to_string();
345
346    if path.is_empty() && !json_lines {
347        match libflatterer::guess_array(&start) {
348            Ok((guess, _)) => {
349                if guess == "stream" {
350                    json_lines = true;
351                    guess_text = "JSON Stream".to_string()
352                };
353            }
354            Err(err) => {
355                let mut res = Response::new(StatusCode::BadRequest);
356                let output = json!({"id": id, "error": err.to_string(), "start": start});
357                let body = Body::from_json(&output)?;
358                res.set_body(body);
359                return Ok(res);
360            }
361        }
362    }
363
364    let output_path_copy = output_path.clone();
365    let query_copy = query.clone();
366
367    let flatterer_result = async_std::task::spawn_blocking(move || -> tide::Result<()> {
368        run_flatterer(query_copy, download_path, output_path_copy, json_lines, path)?;
369        Ok(())
370    })
371    .await;
372
373    if let Err(err) = flatterer_result {
374        let mut res = Response::new(StatusCode::BadRequest);
375        let output = json!({"id": id, "error": err.to_string(), "start": start});
376        let body = Body::from_json(&output)?;
377        res.set_body(body);
378        return Ok(res);
379    }
380
381    let tmp_dir_path_to_move = tmp_dir_path.to_path_buf();
382
383    let output_format = query.output_format.unwrap_or_else(|| "zip".to_string());
384
385    if output_format == "fields" {
386        let fields_file = File::open(output_path.join("fields.csv")).await?;
387        let fields_file_buf = BufReader::new(fields_file);
388
389        let mut res = Response::new(StatusCode::Ok);
390        let body = Body::from_reader(fields_file_buf, None);
391        res.set_body(body);
392        res.set_content_type("text/csv");
393        res.append_header(
394            "Content-Disposition",
395            format!("attachment; filename=\"{}\"", "fields.csv"),
396        );
397        return Ok(res);
398    }
399
400    if output_format == "tables" {
401        let tables_file = File::open(output_path.join("tables.csv")).await?;
402        let tables_file_buf = BufReader::new(tables_file);
403
404        let mut res = Response::new(StatusCode::Ok);
405        let body = Body::from_reader(tables_file_buf, None);
406        res.set_body(body);
407        res.set_content_type("text/csv");
408        res.append_header(
409            "Content-Disposition",
410            format!("attachment; filename=\"{}\"", "tables.csv"),
411        );
412        return Ok(res);
413    }
414
415    if output_format == "preview" {
416        let fields_value = fields_output(output_path.clone())?;
417        let preview_value = preview_output(output_path.clone(), fields_value).await?;
418        let output = json!({"id": id, "preview": preview_value, "start": start, "guess_text": guess_text});
419        let mut res = Response::new(StatusCode::Ok);
420        let body = Body::from_json(&output)?;
421        res.set_body(body);
422        return Ok(res);
423    }
424
425    if output_format == "xlsx" {
426        let xlsx_file = File::open(output_path.join("output.xlsx")).await?;
427        let xlsx_file_buf = BufReader::new(xlsx_file);
428
429        let mut res = Response::new(StatusCode::Ok);
430        let body = Body::from_reader(xlsx_file_buf, None);
431        res.set_body(body);
432        res.set_content_type("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet");
433        res.append_header(
434            "Content-Disposition",
435            format!("attachment; filename=\"{}.xlsx\"", "flatterer-output"),
436        );
437        return Ok(res);
438    }
439    
440    if output_format == "sqlite" {
441        let sqlite_file = File::open(output_path.join("sqlite.db")).await?;
442        let sqlite_file_buf = BufReader::new(sqlite_file);
443
444        let mut res = Response::new(StatusCode::Ok);
445        let body = Body::from_reader(sqlite_file_buf, None);
446        res.set_body(body);
447        res.set_content_type("application/x-sqlite3");
448        res.append_header(
449            "Content-Disposition",
450            format!("attachment; filename=\"{}.db\"", "flatterer"),
451        );
452        return Ok(res);
453    }
454
455    if output_format == "csv" {
456        let main_table_name = query.main_table_name.unwrap_or_else(|| "main".to_string());
457
458        let csv_file = File::open(output_path.join("csv").join(format!("{}.csv", main_table_name))).await?;
459        let csv_file_buf = BufReader::new(csv_file);
460
461        let mut res = Response::new(StatusCode::Ok);
462        let body = Body::from_reader(csv_file_buf, None);
463        res.set_body(body);
464        res.set_content_type("text/csv");
465        res.append_header(
466            "Content-Disposition",
467            format!("attachment; filename=\"{}.csv\"", "flatterer-output"),
468        );
469        return Ok(res);
470    }
471
472    async_std::task::spawn_blocking(move || -> tide::Result<()> {
473        zip_output(output_path.clone(), tmp_dir_path_to_move.to_path_buf())?;
474        Ok(())
475    })
476    .await?;
477
478    let zip_file = tmp_dir_path.join("export.zip");
479    let mut res = Response::new(StatusCode::Ok);
480    let output = File::open(zip_file).await?;
481
482    let body = Body::from_reader(BufReader::new(output), None); // set the body length
483
484    res.set_body(body);
485    res.set_content_type("application/zip");
486    res.append_header(
487        "Content-Disposition",
488        format!("attachment; filename=\"{}.zip\"", "flatterer-download"),
489    );
490
491    Ok(res)
492}
493
494fn run_flatterer(
495    query: Query,
496    download_path: PathBuf,
497    output_path: PathBuf,
498    json_lines: bool,
499    path: String,
500) -> tide::Result<()> {
501    let file = StdFile::open(download_path.join("download.json"))?;
502    let reader = StdBufReader::new(file);
503
504    let output_format = query.output_format.unwrap_or_else(|| "zip".to_string());
505
506    let mut options = Options::builder().build();
507
508    if output_format != "zip" {
509        options.csv = false;
510        options.xlsx = false;
511        options.sqlite = false;
512    }
513
514    if output_format == "xlsx" {
515        options.xlsx = true;
516    }
517    if output_format == "csv" {
518        options.csv = true;
519    }
520    if output_format == "sqlite" {
521        options.sqlite = true;
522    }
523    if output_format == "preview" {
524        options.csv = true;
525        options.preview = 10;
526    }
527    options.force = true;
528    options.main_table_name = query.main_table_name.unwrap_or_else(|| "main".to_string());
529
530    options.inline_one_to_one = query.inline_one_to_one.unwrap_or(false);
531
532    options.schema = query.json_schema.unwrap_or_else(|| "".to_string());
533
534    options.table_prefix = query.table_prefix.unwrap_or_else(|| "".to_string());
535    options.path_separator = query.path_seperator.unwrap_or_else(|| "_".to_string());
536    options.schema_titles = query.schema_titles.unwrap_or_else(|| "".to_string());
537    options.json_stream = json_lines;
538
539    let fields_path = download_path.join("fields.csv");
540    if fields_path.exists() {
541        options.fields_csv = fields_path.to_string_lossy().into();
542    }
543    options.only_fields = query.fields_only.unwrap_or_else(|| false);
544
545    let tables_path = download_path.join("tables.csv");
546    if tables_path.exists() {
547        options.tables_csv = tables_path.to_string_lossy().into();
548    }
549    options.only_tables = query.tables_only.unwrap_or_else(|| false);
550
551    let pushdown = query.pushdown.unwrap_or_else(|| "".into());
552    if !pushdown.is_empty() {
553        options.pushdown = vec![pushdown];
554    }
555
556    let mut path_vec = vec![];
557
558    if !path.is_empty() && !json_lines {
559        path_vec.push(path);
560    }
561    options.path = path_vec;
562
563    flatten(
564        Box::new(reader),
565        output_path.to_string_lossy().to_string(),
566        options
567    )?;
568    Ok(())
569}
570
571fn zip_output(output_path: PathBuf, tmp_dir_path: PathBuf) -> tide::Result<()> {
572    let zip_file = tmp_dir_path.join("export.zip");
573
574    let file = StdFile::create(&zip_file)?;
575    let mut zip = zip::ZipWriter::new(file);
576
577    let options = zip::write::FileOptions::default();
578
579    for entry in WalkDir::new(output_path.clone())
580        .min_depth(1)
581        .into_iter()
582        .filter_map(|e| e.ok())
583    {
584        let path = entry.path();
585
586        if path.is_dir() {
587            zip.add_directory(
588                path.strip_prefix(output_path.clone())?.to_string_lossy(),
589                options,
590            )?;
591        } else {
592            zip.start_file(
593                path.strip_prefix(output_path.clone())?.to_string_lossy(),
594                options,
595            )?;
596            let mut file = StdFile::open(path)?;
597            std_copy(&mut file, &mut zip)?;
598        }
599    }
600    Ok(())
601}
602
603fn fields_output(output_path: PathBuf) -> tide::Result<Vec<HashMap<String, String>>> {
604    let mut csv_reader = Reader::from_path(output_path.join("fields.csv"))?;
605
606    let mut all_fields = vec![];
607
608    for result in csv_reader.deserialize() {
609        let record: HashMap<String, String> = result?;
610        all_fields.push(record)
611    }
612    Ok(all_fields)
613}
614
615async fn preview_output(output_path: PathBuf, fields: Vec<HashMap<String, String>>) -> tide::Result<Value> {
616    let mut previews = vec![];
617
618    let mut tables_reader = Reader::from_path(output_path.join("tables.csv"))?;
619
620    for row in tables_reader.deserialize() {
621        let table_row: HashMap<String, String> = row?;
622        let table = table_row.get("table_name").unwrap().clone();
623        let table_title = table_row.get("table_title").unwrap().clone();
624
625        let path = output_path.join("csv").join(format!("{}.csv", table_title));
626
627        let mut table_fields = vec![];
628
629        for field in fields.iter() {
630            if field.get("table_name").unwrap() == &table {
631                table_fields.push(field.clone());
632            }
633        }
634
635        let mut reader = Reader::from_path(path)?;
636        for (row_num, row) in reader.deserialize().enumerate() {
637            let row: Vec<String> = row?;
638            for (col_num, item) in row.iter().enumerate(){
639                table_fields[col_num].insert(format!("row {}", row_num), item.clone());
640            }
641        }
642
643        let preview = json!({"table_name": table_title, "fields": table_fields});
644
645        previews.push(preview);
646    }
647    Ok(serde_json::to_value(previews)?)
648}
649
#[cfg(test)]
mod tests {
    //! In-process tests that drive the full tide application.
    use super::*;
    use async_std::fs::read_to_string;
    use tide_testing::TideTestingExt;

    /// Posts `fixtures/basic.json` to `/api/convert?output_format=preview` and
    /// snapshots the JSON reply. Map keys are sorted and the random `id` is
    /// redacted so the snapshot is stable across runs.
    #[test]
    fn test_preview_output() {
        let scenario = async {
            let app = get_app().unwrap();
            let body_string = read_to_string("fixtures/basic.json").await.unwrap();
            let request_body = tide::Body::from_string(body_string);

            let response_body: serde_json::value::Value = app
                .post("/api/convert?output_format=preview")
                .body(request_body)
                .content_type("application/json")
                .recv_json()
                .await
                .unwrap();

            insta::with_settings!({sort_maps => true}, {
                insta::assert_yaml_snapshot!(&response_body, {".id" => "[id]"});
            });
        };
        async_std::task::block_on(scenario)
    }
}