1mod buffered_byte_stream;
2mod limited_copy;
3use async_std::fs::File;
4use async_std::io::prelude::*;
5use async_std::io::{BufReader, BufWriter};
6use limited_copy::copy as limited_copy;
7use buffered_byte_stream::BufferedBytesStream;
8use libflatterer::{flatten, Options};
9use std::collections::HashMap;
10use std::fs::File as StdFile;
11use std::io::{copy as std_copy, BufReader as StdBufReader};
12use surf::http::{Method, Url};
13use tempfile::TempDir;
14use tide::{http, log, utils, Body, Request, Response, StatusCode};
15use csv::Reader;
17use multer::{Constraints, Multipart, SizeLimit};
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use std::env::var;
21use std::path::PathBuf;
22use uuid::Uuid;
23use walkdir::WalkDir;
24use http_types::headers::HeaderValue;
25use tide::security::{CorsMiddleware, Origin};
26
/// Query-string parameters accepted by the `/api/convert` endpoint.
///
/// All fields are optional; defaults are applied in `convert` and
/// `run_flatterer`.
#[derive(Deserialize, Debug, Clone)]
struct Query {
    // Re-use a previous upload: the uuid part of a `flatterer-{uuid}` tmp dir.
    id: Option<String>,
    // One of "zip" (default), "csv", "xlsx", "sqlite", "preview", "fields", "tables".
    output_format: Option<String>,
    // URL to fetch the input JSON from (must start with "http" — see `download`).
    file_url: Option<String>,
    // Key of the array inside the JSON document to flatten.
    array_key: Option<String>,
    // Treat input as a JSON stream (one document per line).
    json_lines: Option<bool>,
    // Name used for the main output table (defaults to "main").
    main_table_name: Option<String>,
    inline_one_to_one: Option<bool>,
    json_schema: Option<String>,
    table_prefix: Option<String>,
    // NOTE(review): the public parameter name is (mis)spelled "seperator";
    // it maps onto libflatterer's `path_separator` option.
    path_seperator: Option<String>,
    schema_titles: Option<String>,
    // When set, only emit the fields/tables listed in uploaded CSVs.
    fields_only: Option<bool>,
    tables_only: Option<bool>,
    pushdown: Option<String>,
}
44
45fn get_app() -> tide::Result<tide::Server<()>> {
46 let mut app = tide::new();
47
48 app.with(utils::After(|res: Response| async move {
49 if let Some(err) = res.error() {
50 if res.status() == http::StatusCode::InternalServerError {
51 log::error!("Internal Error: {:?}", err)
52 } else {
53 log::error!("HTTP Error: {:?}", err)
54 }
55 }
56 Ok(res)
57 }));
58
59 let static_files = if let Ok(static_files) = var("STATIC_FILES") {
60 if let Some(static_files) = static_files.strip_suffix("/") {
61 static_files.to_owned()
62 } else {
63 static_files
64 }
65 } else {
66 "dist".to_owned()
67 };
68
69 let cors = CorsMiddleware::new()
70 .allow_methods("GET, POST, OPTIONS".parse::<HeaderValue>()?)
71 .allow_origin(Origin::from("*"))
72 .allow_credentials(false);
73
74 app.with(cors);
75
76 app.at("/api/convert").get(convert);
77 app.at("/api/convert").post(convert);
78 app.at("/api/convert").put(convert);
79 app.at("/about").serve_file(format!("{static_files}/index.html"))?;
80 app.at("/").serve_file(format!("{static_files}/index.html"))?;
81 app.at("/").serve_dir(format!("{static_files}/"))?;
82
83 Ok(app)
84}
85
86#[async_std::main]
87pub async fn main() -> tide::Result<()> {
88 env_logger::init();
89 clean_tmp()?;
90
91 let app = get_app()?;
92 let port = if let Ok(port) = var("PORT") {
93 port
94 } else {
95 "8080".to_string()
96 };
97
98 let host = if let Ok(host) = var("HOST") {
99 host
100 } else {
101 "127.0.0.1".to_string()
102 };
103
104 let open_browser = if let Ok(_) = var("OPEN_BROWSER") {
105 true
106 } else {
107 false
108 };
109
110 let path = format!("http://{}:{}", host, port);
111
112 if open_browser {
113 match open::that(&path) {
114 Ok(()) => println!("Opened browser '{}' successfully.", path),
115 Err(err) => eprintln!("An error occurred when opening browser'{}': {}", path, err),
116 }
117 } else {
118 println!("Running at '{path}'.")
119 }
120
121 app.listen(format!("http://{}:{}", host, port)).await?;
122
123 Ok(())
124}
125
126
/// One row of the `fields.csv` metadata that libflatterer produces.
/// NOTE(review): not referenced elsewhere in this file — presumably kept for
/// (de)serializing field records; confirm before removing.
#[derive(Debug, Deserialize, Serialize)]
struct FieldsRecord {
    table_name: String,
    field_name: String,
    field_type: String,
    // Optional human-readable title; may be absent in the CSV.
    field_title: Option<String>,
}
134
135
136async fn download(url_string: String, tmp_dir: PathBuf) -> tide::Result<()> {
137
138 if !url_string.starts_with("http") {
139 return Err(tide::Error::from_str(tide::StatusCode::BadRequest, "`url` is empty or does not start with `http`"))
140 }
141
142 let url = Url::parse(&url_string)?;
143 let req = surf::Request::new(Method::Get, url);
144 let client = surf::client();
145
146 let mut file_response = client.send(req).await?;
147
148 if !file_response.status().is_success() {
149 return Err(tide::Error::from_str(tide::StatusCode::BadRequest, "file download failed due to bad request status code`"))
150 }
151
152 let download_file = tmp_dir.join("download.json");
153 let file = File::create(&download_file).await?;
154 let mut writer = BufWriter::new(file);
155
156 limited_copy(&mut file_response, &mut writer).await?;
157
158 Ok(())
159}
160
161async fn multipart_upload(req: Request<()>, multipart_boundry: String, tmp_dir: PathBuf) -> tide::Result<Vec<String>> {
162
163 let body_stream = BufferedBytesStream { inner: req };
164
165 let max_size = if let Ok(max_size) = var("MAX_SIZE") {
166 match max_size.parse::<u64>() {
167 Ok(max_size) => {max_size},
168 _ => {500}
169 }
170 } else {
171 500
172 };
173
174 let constraints = Constraints::new()
175 .size_limit(
176 SizeLimit::new()
177 .whole_stream(max_size * 1024 * 1024)
178 );
179 let mut multipart = Multipart::with_constraints(body_stream, multipart_boundry.clone(), constraints);
180
181 let mut output = vec![];
182
183 while let Some(mut field) = multipart.next_field().await? {
184 let download_file;
185 let mut download_output;
186
187 if field.name() == Some("file") {
188 download_file = tmp_dir.join("download.json");
189 output.push("file".to_string());
190 }
191 else if field.name() == Some("fields") {
192 download_file = tmp_dir.join("fields.csv");
193 output.push("fields".to_string());
194 }
195 else if field.name() == Some("tables") {
196 download_file = tmp_dir.join("tables.csv");
197 output.push("tables".to_string());
198 } else {
199 break
200 }
201 download_output = File::create(&download_file).await?;
202 while let Some(chunk) = field.chunk().await? {
203 download_output.write_all(&chunk).await?;
204 }
205 }
206
207 Ok(output)
208}
209
210async fn json_request(mut req: Request<()>, tmp_dir: PathBuf) -> tide::Result<()> {
211 let download_file = tmp_dir.join("download.json");
212
213 let mut output = File::create(&download_file).await?;
214 limited_copy(&mut req, &mut output).await?;
215 Ok(())
216}
217
218fn clean_tmp() -> tide::Result<()> {
219
220 let clean_tmp_time = if let Ok(clean_tmp_time) = var("CLEAN_TMP_TIME") {
221 match clean_tmp_time.parse::<u64>() {
222 Ok(clean_tmp_time) => {clean_tmp_time},
223 _ => {3600}
224 }
225 } else {
226 3600
227 };
228
229
230 for entry in WalkDir::new("/tmp/")
231 .min_depth(1)
232 .into_iter()
233 .filter_map(|e| e.ok())
234 {
235 if !entry
236 .file_name()
237 .to_string_lossy()
238 .starts_with("flatterer-")
239 {
240 continue;
241 }
242 if entry.metadata()?.modified()?.elapsed()?.as_secs() > clean_tmp_time {
243 log::debug!("Removing tmp dir: {:?}", entry);
244
245 if entry.metadata()?.is_dir() {
246 std::fs::remove_dir_all(&entry.into_path())?;
247 }
248 }
249 }
250 Ok(())
251}
252
/// Main conversion endpoint (GET/POST/PUT on /api/convert).
///
/// Input comes from exactly one of: an existing upload referenced by
/// `?id=`, a multipart form ("file"/"fields"/"tables" parts), a raw JSON
/// body, or a `file_url` to download. New uploads are written into a
/// `flatterer-{uuid}` dir under the system temp dir; flattening output goes
/// into this request's own TempDir and is streamed back in the requested
/// `output_format` ("zip" by default).
async fn convert(req: Request<()>) -> tide::Result<Response> {
    let query: Query = req.query()?;
    // Per-request scratch dir holding the flatterer output and final zip;
    // deleted when `tmp_dir` drops at the end of the request.
    let tmp_dir = TempDir::new()?;
    let tmp_dir_path = tmp_dir.path();
    let output_path = tmp_dir_path.join("output");

    // Detect a multipart upload and capture its boundary parameter.
    let mut multipart_boundry = "".to_string();
    let mut content_type = "".to_string();

    if let Some(mime) = req.content_type() {
        content_type = mime.essence().to_string();
        if content_type == "multipart/form-data" {
            if let Some(boundry) = mime.param("boundary") {
                multipart_boundry = boundry.to_string()
            }
        }
    }

    // Accumulates either {"id": ...} on success or {"error": ...}; the
    // error key is checked below before any flattening happens.
    let mut json_output;

    if let Some(id) = &query.id {
        // Re-use a previously uploaded file; existence is validated below.
        json_output = json!({ "id": id });
    } else {
        // Fresh upload: first clear stale dirs, then create a new
        // `flatterer-{uuid}` dir (this local `tmp_dir` shadows the TempDir
        // above and is deliberately NOT auto-deleted, so the upload can be
        // re-used later via its id).
        clean_tmp()?;
        let uuid = Uuid::new_v4().hyphenated();
        let tmp_dir = std::env::temp_dir().join(format!("flatterer-{}", uuid));
        json_output = json!({ "id": uuid.to_string() });
        async_std::fs::create_dir(&tmp_dir).await?;

        // Names of the parts successfully stored ("file", "fields", "tables").
        let mut uploaded_files = vec![];

        if !multipart_boundry.is_empty() {
            // `req` is consumed by the multipart reader here.
            match multipart_upload(req, multipart_boundry, tmp_dir.clone()).await {
                Err(error) => {json_output = json!({"error": error.to_string()})}
                Ok(val) => {uploaded_files = val}
            }
        } else if content_type == "application/json" {
            if let Err(error) = json_request(req, tmp_dir.clone()).await {
                json_output = json!({"error": error.to_string()})
            }
            uploaded_files.push("file".to_string());
        }

        // A file_url download can also supply (or overwrite) the main file.
        if let Some(file_url) = &query.file_url {
            if let Err(error) = download(file_url.clone(), tmp_dir).await {
                json_output = json!({"error": error.to_string()})
            }
            uploaded_files.push("file".to_string());
        }

        if !uploaded_files.contains(&"file".to_string()) {
            json_output = json!({"error": "need to supply either an id or filename or supply data in request body"});
        }
    }

    // Resolve the upload dir and main file for whichever id we ended up
    // with, and verify the uploaded file actually exists on disk.
    let mut download_path = std::env::temp_dir();
    let mut download_file = std::env::temp_dir();
    let mut id = "".to_string();

    if let Some(id_value) = json_output.get("id") {
        if let Some(id_string) = id_value.as_str() {
            id = id_string.to_string();
            download_path.push(format!("flatterer-{}", id_string));
            download_file.push(format!("flatterer-{}", id_string));
            download_file.push("download.json");
            if !download_file.exists() {
                json_output = json!({"error": "id does not exist, you may need to ask you file to be downloaded again or to upload the file again."})
            }
        }
    }

    // Any accumulated error short-circuits with a 400 and the error JSON.
    if json_output.get("error").is_some() {
        let mut res = Response::new(StatusCode::BadRequest);
        let body = Body::from_json(&json_output)?;
        res.set_body(body);
        return Ok(res);
    }

    // Sample the first 10 KiB of the upload: used to guess the JSON layout
    // and echoed back in error/preview responses as "start".
    let mut file = File::open(download_file).await?;
    let mut buf = vec![0;10240];
    let n = file.read(&mut buf).await?;
    let start = String::from_utf8_lossy(&buf[..n]);

    // Optional key naming the array to flatten, from ?array_key=.
    let mut path = "".to_string();

    if let Some(array_key) = &query.array_key {
        path = array_key.to_owned();
    };

    let mut json_lines = query.json_lines.unwrap_or(false);

    let mut guess_text = "".to_string();

    // With no explicit hints, let libflatterer guess from the sampled
    // prefix whether the input is a JSON stream (one document per line).
    if path.is_empty() && !json_lines {
        match libflatterer::guess_array(&start) {
            Ok((guess, _)) => {
                if guess == "stream" {
                    json_lines = true;
                    guess_text = "JSON Stream".to_string()
                };
            }
            Err(err) => {
                let mut res = Response::new(StatusCode::BadRequest);
                let output = json!({"id": id, "error": err.to_string(), "start": start});
                let body = Body::from_json(&output)?;
                res.set_body(body);
                return Ok(res);
            }
        }
    }

    let output_path_copy = output_path.clone();
    let query_copy = query.clone();

    // Flattening is synchronous and heavy, so run it on the blocking pool.
    let flatterer_result = async_std::task::spawn_blocking(move || -> tide::Result<()> {
        run_flatterer(query_copy, download_path, output_path_copy, json_lines, path)?;
        Ok(())
    })
    .await;

    // Flattening failures are reported as 400 with the sampled input so the
    // UI can show context.
    if let Err(err) = flatterer_result {
        let mut res = Response::new(StatusCode::BadRequest);
        let output = json!({"id": id, "error": err.to_string(), "start": start});
        let body = Body::from_json(&output)?;
        res.set_body(body);
        return Ok(res);
    }

    let tmp_dir_path_to_move = tmp_dir_path.to_path_buf();

    let output_format = query.output_format.unwrap_or_else(|| "zip".to_string());

    // "fields": stream back the fields.csv metadata directly.
    if output_format == "fields" {
        let fields_file = File::open(output_path.join("fields.csv")).await?;
        let fields_file_buf = BufReader::new(fields_file);

        let mut res = Response::new(StatusCode::Ok);
        let body = Body::from_reader(fields_file_buf, None);
        res.set_body(body);
        res.set_content_type("text/csv");
        res.append_header(
            "Content-Disposition",
            format!("attachment; filename=\"{}\"", "fields.csv"),
        );
        return Ok(res);
    }

    // "tables": stream back the tables.csv metadata directly.
    if output_format == "tables" {
        let tables_file = File::open(output_path.join("tables.csv")).await?;
        let tables_file_buf = BufReader::new(tables_file);

        let mut res = Response::new(StatusCode::Ok);
        let body = Body::from_reader(tables_file_buf, None);
        res.set_body(body);
        res.set_content_type("text/csv");
        res.append_header(
            "Content-Disposition",
            format!("attachment; filename=\"{}\"", "tables.csv"),
        );
        return Ok(res);
    }

    // "preview": JSON with field metadata plus the first rows of each table.
    if output_format == "preview" {
        let fields_value = fields_output(output_path.clone())?;
        let preview_value = preview_output(output_path.clone(), fields_value).await?;
        let output = json!({"id": id, "preview": preview_value, "start": start, "guess_text": guess_text});
        let mut res = Response::new(StatusCode::Ok);
        let body = Body::from_json(&output)?;
        res.set_body(body);
        return Ok(res);
    }

    // "xlsx": stream the generated workbook as an attachment.
    if output_format == "xlsx" {
        let xlsx_file = File::open(output_path.join("output.xlsx")).await?;
        let xlsx_file_buf = BufReader::new(xlsx_file);

        let mut res = Response::new(StatusCode::Ok);
        let body = Body::from_reader(xlsx_file_buf, None);
        res.set_body(body);
        res.set_content_type("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet");
        res.append_header(
            "Content-Disposition",
            format!("attachment; filename=\"{}.xlsx\"", "flatterer-output"),
        );
        return Ok(res);
    }

    // "sqlite": stream the generated database file.
    if output_format == "sqlite" {
        let sqlite_file = File::open(output_path.join("sqlite.db")).await?;
        let sqlite_file_buf = BufReader::new(sqlite_file);

        let mut res = Response::new(StatusCode::Ok);
        let body = Body::from_reader(sqlite_file_buf, None);
        res.set_body(body);
        res.set_content_type("application/x-sqlite3");
        res.append_header(
            "Content-Disposition",
            format!("attachment; filename=\"{}.db\"", "flatterer"),
        );
        return Ok(res);
    }

    // "csv": stream just the main table's CSV.
    if output_format == "csv" {
        let main_table_name = query.main_table_name.unwrap_or_else(|| "main".to_string());

        let csv_file = File::open(output_path.join("csv").join(format!("{}.csv", main_table_name))).await?;
        let csv_file_buf = BufReader::new(csv_file);

        let mut res = Response::new(StatusCode::Ok);
        let body = Body::from_reader(csv_file_buf, None);
        res.set_body(body);
        res.set_content_type("text/csv");
        res.append_header(
            "Content-Disposition",
            format!("attachment; filename=\"{}.csv\"", "flatterer-output"),
        );
        return Ok(res);
    }

    // Default ("zip"): bundle the whole output dir on the blocking pool,
    // then stream the archive.
    async_std::task::spawn_blocking(move || -> tide::Result<()> {
        zip_output(output_path.clone(), tmp_dir_path_to_move.to_path_buf())?;
        Ok(())
    })
    .await?;

    let zip_file = tmp_dir_path.join("export.zip");
    let mut res = Response::new(StatusCode::Ok);
    let output = File::open(zip_file).await?;

    let body = Body::from_reader(BufReader::new(output), None); res.set_body(body);
    res.set_content_type("application/zip");
    res.append_header(
        "Content-Disposition",
        format!("attachment; filename=\"{}.zip\"", "flatterer-download"),
    );

    Ok(res)
}
493
494fn run_flatterer(
495 query: Query,
496 download_path: PathBuf,
497 output_path: PathBuf,
498 json_lines: bool,
499 path: String,
500) -> tide::Result<()> {
501 let file = StdFile::open(download_path.join("download.json"))?;
502 let reader = StdBufReader::new(file);
503
504 let output_format = query.output_format.unwrap_or_else(|| "zip".to_string());
505
506 let mut options = Options::builder().build();
507
508 if output_format != "zip" {
509 options.csv = false;
510 options.xlsx = false;
511 options.sqlite = false;
512 }
513
514 if output_format == "xlsx" {
515 options.xlsx = true;
516 }
517 if output_format == "csv" {
518 options.csv = true;
519 }
520 if output_format == "sqlite" {
521 options.sqlite = true;
522 }
523 if output_format == "preview" {
524 options.csv = true;
525 options.preview = 10;
526 }
527 options.force = true;
528 options.main_table_name = query.main_table_name.unwrap_or_else(|| "main".to_string());
529
530 options.inline_one_to_one = query.inline_one_to_one.unwrap_or(false);
531
532 options.schema = query.json_schema.unwrap_or_else(|| "".to_string());
533
534 options.table_prefix = query.table_prefix.unwrap_or_else(|| "".to_string());
535 options.path_separator = query.path_seperator.unwrap_or_else(|| "_".to_string());
536 options.schema_titles = query.schema_titles.unwrap_or_else(|| "".to_string());
537 options.json_stream = json_lines;
538
539 let fields_path = download_path.join("fields.csv");
540 if fields_path.exists() {
541 options.fields_csv = fields_path.to_string_lossy().into();
542 }
543 options.only_fields = query.fields_only.unwrap_or_else(|| false);
544
545 let tables_path = download_path.join("tables.csv");
546 if tables_path.exists() {
547 options.tables_csv = tables_path.to_string_lossy().into();
548 }
549 options.only_tables = query.tables_only.unwrap_or_else(|| false);
550
551 let pushdown = query.pushdown.unwrap_or_else(|| "".into());
552 if !pushdown.is_empty() {
553 options.pushdown = vec![pushdown];
554 }
555
556 let mut path_vec = vec![];
557
558 if !path.is_empty() && !json_lines {
559 path_vec.push(path);
560 }
561 options.path = path_vec;
562
563 flatten(
564 Box::new(reader),
565 output_path.to_string_lossy().to_string(),
566 options
567 )?;
568 Ok(())
569}
570
571fn zip_output(output_path: PathBuf, tmp_dir_path: PathBuf) -> tide::Result<()> {
572 let zip_file = tmp_dir_path.join("export.zip");
573
574 let file = StdFile::create(&zip_file)?;
575 let mut zip = zip::ZipWriter::new(file);
576
577 let options = zip::write::FileOptions::default();
578
579 for entry in WalkDir::new(output_path.clone())
580 .min_depth(1)
581 .into_iter()
582 .filter_map(|e| e.ok())
583 {
584 let path = entry.path();
585
586 if path.is_dir() {
587 zip.add_directory(
588 path.strip_prefix(output_path.clone())?.to_string_lossy(),
589 options,
590 )?;
591 } else {
592 zip.start_file(
593 path.strip_prefix(output_path.clone())?.to_string_lossy(),
594 options,
595 )?;
596 let mut file = StdFile::open(path)?;
597 std_copy(&mut file, &mut zip)?;
598 }
599 }
600 Ok(())
601}
602
603fn fields_output(output_path: PathBuf) -> tide::Result<Vec<HashMap<String, String>>> {
604 let mut csv_reader = Reader::from_path(output_path.join("fields.csv"))?;
605
606 let mut all_fields = vec![];
607
608 for result in csv_reader.deserialize() {
609 let record: HashMap<String, String> = result?;
610 all_fields.push(record)
611 }
612 Ok(all_fields)
613}
614
615async fn preview_output(output_path: PathBuf, fields: Vec<HashMap<String, String>>) -> tide::Result<Value> {
616 let mut previews = vec![];
617
618 let mut tables_reader = Reader::from_path(output_path.join("tables.csv"))?;
619
620 for row in tables_reader.deserialize() {
621 let table_row: HashMap<String, String> = row?;
622 let table = table_row.get("table_name").unwrap().clone();
623 let table_title = table_row.get("table_title").unwrap().clone();
624
625 let path = output_path.join("csv").join(format!("{}.csv", table_title));
626
627 let mut table_fields = vec![];
628
629 for field in fields.iter() {
630 if field.get("table_name").unwrap() == &table {
631 table_fields.push(field.clone());
632 }
633 }
634
635 let mut reader = Reader::from_path(path)?;
636 for (row_num, row) in reader.deserialize().enumerate() {
637 let row: Vec<String> = row?;
638 for (col_num, item) in row.iter().enumerate(){
639 table_fields[col_num].insert(format!("row {}", row_num), item.clone());
640 }
641 }
642
643 let preview = json!({"table_name": table_title, "fields": table_fields});
644
645 previews.push(preview);
646 }
647 Ok(serde_json::to_value(previews)?)
648}
649
#[cfg(test)]
mod tests {
    use super::*;
    use async_std::fs::read_to_string;
    use tide_testing::TideTestingExt;

    // End-to-end test: POST the fixture document to /api/convert with
    // output_format=preview and snapshot the returned JSON via insta.
    // The "id" field is redacted because a fresh UUID is generated per run.
    #[test]
    fn test_preview_output() {
        async_std::task::block_on(async {
            let app = get_app().unwrap();

            let body_string = read_to_string("fixtures/basic.json").await.unwrap();

            let response_body: serde_json::value::Value = app
                .post("/api/convert?output_format=preview")
                .body(tide::Body::from_string(body_string))
                .content_type("application/json")
                .recv_json()
                .await
                .unwrap();

            // sort_maps keeps the snapshot stable across HashMap orderings.
            insta::with_settings!({sort_maps => true}, {
                insta::assert_yaml_snapshot!(&response_body, {".id" => "[id]"});
            });
        })
    }
}