1#[macro_use]
19extern crate lalrpop_util;
20
21mod ast;
22pub mod validate;
23
24lalrpop_mod!(pub valve_grammar);
25
26use crate::validate::{
27 validate_rows_constraints, validate_rows_intra, validate_rows_trees,
28 validate_tree_foreign_keys, validate_under, ResultRow,
29};
30use crate::{ast::Expression, valve_grammar::StartParser};
31use chrono::Utc;
32use crossbeam;
33use futures::executor::block_on;
34use indoc::indoc;
35use itertools::{IntoChunks, Itertools};
36use lazy_static::lazy_static;
37use petgraph::{
38 algo::{all_simple_paths, toposort},
39 graphmap::DiGraphMap,
40 Direction,
41};
42use regex::Regex;
43use serde_json::{json, Value as SerdeValue};
44use sqlx::{
45 any::{AnyConnectOptions, AnyKind, AnyPool, AnyPoolOptions, AnyRow},
46 query as sqlx_query, Column, Row, ValueRef,
47};
48use std::{
49 collections::{BTreeMap, HashMap},
50 fs::File,
51 process,
52 str::FromStr,
53 sync::Arc,
54};
55
/// NOTE(review): not referenced in the part of the file reviewed here; presumably the number of
/// rows handled per batch when loading a table — confirm against its users.
static CHUNK_SIZE: usize = 500;

/// NOTE(review): not referenced in the part of the file reviewed here; presumably switches
/// multi-threaded processing on or off — confirm against its users.
static MULTI_THREADED: bool = true;

/// NOTE(review): presumably the placeholder token that stands in for a bind parameter in
/// generated SQL until it is rewritten into the driver's own syntax — confirm against
/// `local_sql_syntax`.
static SQL_PARAM: &str = "VALVEPARAM";

lazy_static! {
    // NOTE(review): judging by the names, the SQL type names accepted for PostgreSQL and SQLite
    // columns respectively — confirm against their users.
    static ref PG_SQL_TYPES: Vec<&'static str> = vec!["text", "varchar", "integer"];
    static ref SL_SQL_TYPES: Vec<&'static str> = vec!["text", "integer"];
}

/// A JSON object (string keys mapped to `serde_json` values), used throughout this file to hold
/// configuration and row data.
pub type ConfigMap = serde_json::Map<String, SerdeValue>;
78
/// A column's `structure` expression, kept both as written and in parsed form.
#[derive(Clone)]
pub struct ParsedStructure {
    /// The structure text as it appears in the column configuration.
    original: String,
    /// The expression the grammar parser produced for `original`.
    parsed: Expression,
}
87
88impl std::fmt::Debug for ParsedStructure {
91 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
92 write!(
93 f,
94 "{{\"parsed_structure\": {{\"original\": \"{}\", \"parsed\": {:?}}}}}",
95 &self.original, &self.parsed
96 )
97 }
98}
99
/// A condition expression together with the predicate compiled from it.
#[derive(Clone)]
pub struct CompiledCondition {
    /// The condition text as written (empty when no condition was supplied).
    original: String,
    /// The parsed form of the condition.
    parsed: Expression,
    /// Predicate over a string (presumably a cell value — confirm against callers); the
    /// `Sync + Send` bounds allow it to be shared between threads.
    compiled: Arc<dyn Fn(&str) -> bool + Sync + Send>,
}
108
109impl std::fmt::Debug for CompiledCondition {
112 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
113 write!(
114 f,
115 "{{\"compiled_condition\": {{\"original\": \"{}\", \"parsed\": {:?}}}}}",
116 &self.original, &self.parsed
117 )
118 }
119}
120
/// A rule for a column, made of a pair of compiled conditions.
pub struct ColumnRule {
    /// Compiled from the rule row's "when condition".
    when: CompiledCondition,
    /// Compiled from the rule row's "then condition".
    then: CompiledCondition,
}
128
129impl std::fmt::Debug for ColumnRule {
132 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
133 write!(f, "{{\"column_rule\": {{\"when\": {:?}, \"then\": {:?}}}}}", &self.when, &self.then)
134 }
135}
136
137pub fn read_config_files(
141 path: &str,
142 config_table: &str,
143) -> (ConfigMap, ConfigMap, ConfigMap, ConfigMap) {
144 let special_table_types = json!({
145 "table": {"required": true},
146 "column": {"required": true},
147 "datatype": {"required": true},
148 "rule": {"required": false},
149 });
150 let special_table_types = special_table_types.as_object().unwrap();
151
152 let mut specials_config = ConfigMap::new();
154 for t in special_table_types.keys() {
155 specials_config.insert(t.to_string(), SerdeValue::Null);
156 }
157
158 let mut tables_config = ConfigMap::new();
160 let rows = {
161 if path.to_lowercase().ends_with(".tsv") {
164 read_tsv_into_vector(path)
165 } else {
166 read_db_table_into_vector(path, config_table)
167 }
168 };
169
170 for mut row in rows {
171 for column in vec!["table", "path", "type"] {
172 if !row.contains_key(column) || row.get(column) == None {
173 panic!("Missing required column '{}' reading '{}'", column, path);
174 }
175 }
176
177 for column in vec!["table", "path"] {
178 if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
179 panic!("Missing required value for '{}' reading '{}'", column, path);
180 }
181 }
182
183 for column in vec!["type"] {
184 if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
185 row.remove(&column.to_string());
186 }
187 }
188
189 if let Some(SerdeValue::String(row_type)) = row.get("type") {
190 if row_type == "table" {
191 let row_path = row.get("path").unwrap();
192 if path.to_lowercase().ends_with(".tsv") && row_path != path {
193 panic!(
194 "Special 'table' path '{}' does not match this path '{}'",
195 row_path, path
196 );
197 }
198 }
199
200 if special_table_types.contains_key(row_type) {
201 match specials_config.get(row_type) {
202 Some(SerdeValue::Null) => (),
203 _ => panic!("Multiple tables with type '{}' declared in '{}'", row_type, path),
204 }
205 let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
206 specials_config
207 .insert(row_type.to_string(), SerdeValue::String(row_table.to_string()));
208 } else {
209 panic!("Unrecognized table type '{}' in '{}'", row_type, path);
210 }
211 }
212
213 row.insert(String::from("column"), SerdeValue::Object(ConfigMap::new()));
214 let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
215 tables_config.insert(row_table.to_string(), SerdeValue::Object(row));
216 }
217
218 for (table_type, table_spec) in special_table_types.iter() {
220 if let Some(SerdeValue::Bool(true)) = table_spec.get("required") {
221 if let Some(SerdeValue::Null) = specials_config.get(table_type) {
222 panic!("Missing required '{}' table in '{}'", table_type, path);
223 }
224 }
225 }
226
227 fn get_special_config(
234 table_type: &str,
235 specials_config: &ConfigMap,
236 tables_config: &ConfigMap,
237 path: &str,
238 ) -> Vec<ConfigMap> {
239 if path.to_lowercase().ends_with(".tsv") {
240 let table_name = specials_config.get(table_type).and_then(|d| d.as_str()).unwrap();
241 let path = String::from(
242 tables_config
243 .get(table_name)
244 .and_then(|t| t.get("path"))
245 .and_then(|p| p.as_str())
246 .unwrap(),
247 );
248 return read_tsv_into_vector(&path.to_string());
249 } else {
250 let mut db_table = None;
251 for (table_name, table_config) in tables_config {
252 let this_type = table_config.get("type");
253 if let Some(this_type) = this_type {
254 let this_type = this_type.as_str().unwrap();
255 if this_type == table_type {
256 db_table = Some(table_name);
257 break;
258 }
259 }
260 }
261 if db_table == None {
262 panic!("Could not determine special table name for type '{}'.", table_type);
263 }
264 let db_table = db_table.unwrap();
265 read_db_table_into_vector(path, db_table)
266 }
267 }
268
269 let mut datatypes_config = ConfigMap::new();
271 let rows = get_special_config("datatype", &specials_config, &tables_config, path);
272 for mut row in rows {
273 for column in vec!["datatype", "parent", "condition", "SQLite type", "PostgreSQL type"] {
274 if !row.contains_key(column) || row.get(column) == None {
275 panic!("Missing required column '{}' reading '{}'", column, path);
276 }
277 }
278
279 for column in vec!["datatype"] {
280 if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
281 panic!("Missing required value for '{}' reading '{}'", column, path);
282 }
283 }
284
285 for column in vec!["parent", "condition", "SQLite type", "PostgreSQL type"] {
286 if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
287 row.remove(&column.to_string());
288 }
289 }
290
291 let dt_name = row.get("datatype").and_then(|d| d.as_str()).unwrap();
292 datatypes_config.insert(dt_name.to_string(), SerdeValue::Object(row));
293 }
294
295 for dt in vec!["text", "empty", "line", "word"] {
296 if !datatypes_config.contains_key(dt) {
297 panic!("Missing required datatype: '{}'", dt);
298 }
299 }
300
301 let rows = get_special_config("column", &specials_config, &tables_config, path);
303 for mut row in rows {
304 for column in vec!["table", "column", "nulltype", "datatype"] {
305 if !row.contains_key(column) || row.get(column) == None {
306 panic!("Missing required column '{}' reading '{}'", column, path);
307 }
308 }
309
310 for column in vec!["table", "column", "datatype"] {
311 if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
312 panic!("Missing required value for '{}' reading '{}'", column, path);
313 }
314 }
315
316 for column in vec!["nulltype"] {
317 if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
318 row.remove(&column.to_string());
319 }
320 }
321
322 let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
323 if !tables_config.contains_key(row_table) {
324 panic!("Undefined table '{}' reading '{}'", row_table, path);
325 }
326
327 if let Some(SerdeValue::String(nulltype)) = row.get("nulltype") {
328 if !datatypes_config.contains_key(nulltype) {
329 panic!("Undefined nulltype '{}' reading '{}'", nulltype, path);
330 }
331 }
332
333 let datatype = row.get("datatype").and_then(|d| d.as_str()).unwrap();
334 if !datatypes_config.contains_key(datatype) {
335 panic!("Undefined datatype '{}' reading '{}'", datatype, path);
336 }
337
338 let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
339 let column_name = row.get("column").and_then(|c| c.as_str()).unwrap();
340
341 let columns_config = tables_config
342 .get_mut(row_table)
343 .and_then(|t| t.get_mut("column"))
344 .and_then(|c| c.as_object_mut())
345 .unwrap();
346 columns_config.insert(column_name.to_string(), SerdeValue::Object(row));
347 }
348
349 let mut rules_config = ConfigMap::new();
351 if let Some(SerdeValue::String(table_name)) = specials_config.get("rule") {
352 let rows = get_special_config(table_name, &specials_config, &tables_config, path);
353 for row in rows {
354 for column in vec![
355 "table",
356 "when column",
357 "when condition",
358 "then column",
359 "then condition",
360 "level",
361 "description",
362 ] {
363 if !row.contains_key(column) || row.get(column) == None {
364 panic!("Missing required column '{}' reading '{}'", column, path);
365 }
366 if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
367 panic!("Missing required value for '{}' reading '{}'", column, path);
368 }
369 }
370
371 let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
372 if !tables_config.contains_key(row_table) {
373 panic!("Undefined table '{}' reading '{}'", row_table, path);
374 }
375
376 let row_when_column = row.get("when column").and_then(|c| c.as_str()).unwrap();
379 if !rules_config.contains_key(row_table) {
380 rules_config.insert(String::from(row_table), SerdeValue::Object(ConfigMap::new()));
381 }
382
383 let table_rule_config =
384 rules_config.get_mut(row_table).and_then(|t| t.as_object_mut()).unwrap();
385 if !table_rule_config.contains_key(row_when_column) {
386 table_rule_config.insert(String::from(row_when_column), SerdeValue::Array(vec![]));
387 }
388 let column_rule_config = table_rule_config
389 .get_mut(&row_when_column.to_string())
390 .and_then(|w| w.as_array_mut())
391 .unwrap();
392 column_rule_config.push(SerdeValue::Object(row));
393 }
394 }
395
396 (specials_config, tables_config, datatypes_config, rules_config)
398}
399
400pub fn get_compiled_datatype_conditions(
404 config: &ConfigMap,
405 parser: &StartParser,
406) -> HashMap<String, CompiledCondition> {
407 let mut compiled_datatype_conditions: HashMap<String, CompiledCondition> = HashMap::new();
408 let datatypes_config = config.get("datatype").and_then(|t| t.as_object()).unwrap();
409 for (_, row) in datatypes_config.iter() {
410 let row = row.as_object().unwrap();
411 let dt_name = row.get("datatype").and_then(|d| d.as_str()).unwrap();
412 let condition = row.get("condition").and_then(|c| c.as_str());
413 let compiled_condition =
414 compile_condition(condition, parser, &compiled_datatype_conditions);
415 if let Some(_) = condition {
416 compiled_datatype_conditions.insert(dt_name.to_string(), compiled_condition);
417 }
418 }
419
420 compiled_datatype_conditions
421}
422
423pub fn get_compiled_rule_conditions(
436 config: &ConfigMap,
437 compiled_datatype_conditions: HashMap<String, CompiledCondition>,
438 parser: &StartParser,
439) -> HashMap<String, HashMap<String, Vec<ColumnRule>>> {
440 let mut compiled_rule_conditions = HashMap::new();
441 let tables_config = config.get("table").and_then(|t| t.as_object()).unwrap();
442 let rules_config = config.get("rule").and_then(|t| t.as_object()).unwrap();
443 for (rules_table, table_rules) in rules_config.iter() {
444 let table_rules = table_rules.as_object().unwrap();
445 for (_, column_rules) in table_rules.iter() {
446 let column_rules = column_rules.as_array().unwrap();
447 for row in column_rules {
448 let mut column_rule_key = None;
450 for column in vec!["when column", "then column"] {
451 let row_column = row.get(column).and_then(|c| c.as_str()).unwrap();
452 if column == "when column" {
453 column_rule_key = Some(row_column.to_string());
454 }
455 if !tables_config
456 .get(rules_table)
457 .and_then(|t| t.get("column"))
458 .and_then(|c| c.as_object())
459 .and_then(|c| Some(c.contains_key(row_column)))
460 .unwrap()
461 {
462 panic!("Undefined column '{}.{}' in rules table", rules_table, row_column);
463 }
464 }
465 let column_rule_key = column_rule_key.unwrap();
466
467 let mut when_compiled = None;
468 let mut then_compiled = None;
469 for column in vec!["when condition", "then condition"] {
470 let condition_option = row.get(column).and_then(|c| c.as_str());
471 if let Some(_) = condition_option {
472 let compiled_condition = compile_condition(
473 condition_option,
474 parser,
475 &compiled_datatype_conditions,
476 );
477 if column == "when condition" {
478 when_compiled = Some(compiled_condition);
479 } else if column == "then condition" {
480 then_compiled = Some(compiled_condition);
481 }
482 }
483 }
484
485 if let (Some(when_compiled), Some(then_compiled)) = (when_compiled, then_compiled) {
486 if !compiled_rule_conditions.contains_key(rules_table) {
487 let table_rules = HashMap::new();
488 compiled_rule_conditions.insert(rules_table.to_string(), table_rules);
489 }
490 let table_rules = compiled_rule_conditions.get_mut(rules_table).unwrap();
491 if !table_rules.contains_key(&column_rule_key) {
492 table_rules.insert(column_rule_key.to_string(), vec![]);
493 }
494 let column_rules = table_rules.get_mut(&column_rule_key).unwrap();
495 column_rules.push(ColumnRule { when: when_compiled, then: then_compiled });
496 }
497 }
498 }
499 }
500
501 compiled_rule_conditions
502}
503
504pub fn get_parsed_structure_conditions(
508 config: &ConfigMap,
509 parser: &StartParser,
510) -> HashMap<String, ParsedStructure> {
511 let mut parsed_structure_conditions = HashMap::new();
512 let tables_config = config.get("table").and_then(|t| t.as_object()).unwrap();
513 for (table, table_config) in tables_config.iter() {
514 let columns_config = table_config.get("column").and_then(|c| c.as_object()).unwrap();
515 for (_, row) in columns_config.iter() {
516 let row_table = table;
517 let structure = row.get("structure").and_then(|s| s.as_str());
518 match structure {
519 Some(structure) if structure != "" => {
520 let parsed_structure = parser.parse(structure);
521 if let Err(e) = parsed_structure {
522 panic!(
523 "While parsing structure: '{}' for column: '{}.{}' got error:\n{}",
524 structure,
525 row_table,
526 row.get("table").and_then(|t| t.as_str()).unwrap(),
527 e
528 );
529 }
530 let parsed_structure = parsed_structure.unwrap();
531 let parsed_structure = &parsed_structure[0];
532 let parsed_structure = ParsedStructure {
533 original: structure.to_string(),
534 parsed: *parsed_structure.clone(),
535 };
536 parsed_structure_conditions.insert(structure.to_string(), parsed_structure);
537 }
538 _ => (),
539 };
540 }
541 }
542
543 parsed_structure_conditions
544}
545
/// Generates the SQL that sets up the database for every configured table, optionally runs
/// and/or prints it, and returns the sorted table list together with the collected constraints.
///
/// For each table in `tables_config` this:
/// * reads the first record (the header line) of the table's TSV file at its configured `path`;
/// * rebuilds the table's `column` configuration in header order, giving any header column that
///   has no configuration a default one (nulltype `empty`, datatype `text`), and records the
///   order under `column_order`;
/// * collects the statements for the table, its `_conflict` twin and its `_view`.
///
/// Statements for a `message` table are added, and statements are executed and/or printed, only
/// when `command` is not `ValveCommand::Config` or `verbose` is set: they are executed unless
/// `command` is `Config`, and printed to stdout when `verbose` is true.
///
/// Returns `(sorted_tables, constraints_config)`, where `constraints_config` has the keys
/// `foreign`, `unique`, `primary`, `tree` and `under`, each mapping table names to the
/// constraints reported by `create_table_statement`.
///
/// # Panics
///
/// Panics when a table's file cannot be opened or is empty, when expected configuration entries
/// are missing, and when executing a statement fails. NOTE(review): nothing in this body returns
/// `Err`; failures surface as panics.
pub async fn configure_db(
    tables_config: &mut ConfigMap,
    datatypes_config: &mut ConfigMap,
    pool: &AnyPool,
    parser: &StartParser,
    verbose: bool,
    command: &ValveCommand,
) -> Result<(Vec<String>, ConfigMap), sqlx::Error> {
    // One (initially empty) object per kind of constraint, keyed by table name further below.
    let mut constraints_config = ConfigMap::new();
    constraints_config.insert(String::from("foreign"), SerdeValue::Object(ConfigMap::new()));
    constraints_config.insert(String::from("unique"), SerdeValue::Object(ConfigMap::new()));
    constraints_config.insert(String::from("primary"), SerdeValue::Object(ConfigMap::new()));
    constraints_config.insert(String::from("tree"), SerdeValue::Object(ConfigMap::new()));
    constraints_config.insert(String::from("under"), SerdeValue::Object(ConfigMap::new()));

    // Maps each table name to the list of SQL statements that set it up.
    let mut setup_statements = HashMap::new();
    // The names are cloned up front because `tables_config` is mutated inside the loop.
    let table_names: Vec<String> = tables_config.keys().cloned().collect();
    for table_name in table_names {
        let path = tables_config
            .get(&table_name)
            .and_then(|r| r.get("path"))
            .and_then(|p| p.as_str())
            .unwrap();

        // Columns that already have a configuration entry for this table.
        let defined_columns: Vec<String> = tables_config
            .get(&table_name)
            .and_then(|r| r.get("column"))
            .and_then(|v| v.as_object())
            .and_then(|o| Some(o.keys()))
            .and_then(|k| Some(k.cloned()))
            .and_then(|k| Some(k.collect()))
            .unwrap();

        // Headers are disabled so that the header line comes back as the first ordinary record.
        let mut rdr = csv::ReaderBuilder::new().has_headers(false).delimiter(b'\t').from_reader(
            File::open(path.clone()).unwrap_or_else(|err| {
                panic!("Unable to open '{}': {}", path.clone(), err);
            }),
        );
        let mut iter = rdr.records();
        let actual_columns;
        if let Some(result) = iter.next() {
            actual_columns = result.unwrap();
        } else {
            panic!("'{}' is empty", path);
        }

        // Rebuild the column configuration from the columns actually present in the file,
        // remembering the order in which they appear.
        let mut column_order = vec![];
        let mut all_columns: ConfigMap = ConfigMap::new();
        for column_name in &actual_columns {
            let column;
            if !defined_columns.contains(&column_name.to_string()) {
                // No configuration for this column: fall back to a default one.
                let mut cmap = ConfigMap::new();
                cmap.insert(String::from("table"), SerdeValue::String(table_name.to_string()));
                cmap.insert(String::from("column"), SerdeValue::String(column_name.to_string()));
                cmap.insert(String::from("nulltype"), SerdeValue::String(String::from("empty")));
                cmap.insert(String::from("datatype"), SerdeValue::String(String::from("text")));
                column = SerdeValue::Object(cmap);
            } else {
                column = tables_config
                    .get(&table_name)
                    .and_then(|r| r.get("column"))
                    .and_then(|v| v.as_object())
                    .and_then(|o| o.get(column_name))
                    .unwrap()
                    .clone();
            }
            column_order.push(SerdeValue::String(column_name.to_string()));
            all_columns.insert(column_name.to_string(), column);
        }

        // Replace the table's column configuration with the rebuilt one and record the order.
        // Configured columns that are absent from the file header are dropped by this.
        tables_config.get_mut(&table_name).and_then(|t| t.as_object_mut()).and_then(|o| {
            o.insert(String::from("column"), SerdeValue::Object(all_columns));
            o.insert(String::from("column_order"), SerdeValue::Array(column_order))
        });

        let mut table_statements = vec![];
        // Statements are generated for the table and for its "_conflict" twin; constraints are
        // only collected from the former.
        for table in vec![table_name.to_string(), format!("{}_conflict", table_name)] {
            let (mut statements, table_constraints) =
                create_table_statement(tables_config, datatypes_config, parser, &table, &pool);
            table_statements.append(&mut statements);
            if !table.ends_with("_conflict") {
                for constraint_type in vec!["foreign", "unique", "primary", "tree", "under"] {
                    let table_constraints = table_constraints.get(constraint_type).unwrap().clone();
                    constraints_config
                        .get_mut(constraint_type)
                        .and_then(|o| o.as_object_mut())
                        .and_then(|o| o.insert(table_name.to_string(), table_constraints));
                }
            }
        }

        // The view is dropped and recreated. `inner_t` is the database-specific subquery that
        // aggregates the row's messages into a JSON array.
        let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_view""#, table_name);
        let inner_t;
        if pool.any_kind() == AnyKind::Postgres {
            drop_view_sql.push_str(" CASCADE");
            inner_t = format!(
                indoc! {r#"
                    (
                    SELECT JSON_AGG(t)::TEXT FROM (
                    SELECT "column", "value", "level", "rule", "message"
                    FROM "message"
                    WHERE "table" = '{t}'
                    AND "row" = union_t."row_number"
                    ORDER BY "column", "message_id"
                    ) t
                    )
                "#},
                t = table_name,
            );
        } else {
            // NULLIF turns an empty JSON array into NULL for rows without messages.
            inner_t = format!(
                indoc! {r#"
                    (
                    SELECT NULLIF(
                    JSON_GROUP_ARRAY(
                    JSON_OBJECT(
                    'column', "column",
                    'value', "value",
                    'level', "level",
                    'rule', "rule",
                    'message', "message"
                    )
                    ),
                    '[]'
                    )
                    FROM "message"
                    WHERE "table" = '{t}'
                    AND "row" = union_t."row_number"
                    ORDER BY "column", "message_id"
                    )
                "#},
                t = table_name,
            );
        }
        drop_view_sql.push_str(";");

        // The view unions the table with its conflict table and adds the aggregated messages as
        // a "message" column.
        let create_view_sql = format!(
            indoc! {r#"
                CREATE VIEW "{t}_view" AS
                SELECT
                union_t.*,
                {inner_t} AS "message"
                FROM (
                SELECT * FROM "{t}"
                UNION ALL
                SELECT * FROM "{t}_conflict"
                ) as union_t;
            "#},
            t = table_name,
            inner_t = inner_t,
        );
        table_statements.push(drop_view_sql);
        table_statements.push(create_view_sql);

        setup_statements.insert(table_name.to_string(), table_statements);
    }

    // NOTE(review): presumably checks inter-table dependencies and orders the tables so that
    // dependencies come first — confirm against `verify_table_deps_and_sort`.
    let unsorted_tables: Vec<String> = setup_statements.keys().cloned().collect();
    let sorted_tables = verify_table_deps_and_sort(&unsorted_tables, &constraints_config);

    if *command != ValveCommand::Config || verbose {
        // Statements for the "message" table, which the views created above select from.
        let mut message_statements = vec![];
        let drop_sql = {
            let mut sql = r#"DROP TABLE IF EXISTS "message""#.to_string();
            if pool.any_kind() == AnyKind::Postgres {
                sql.push_str(" CASCADE");
            }
            sql.push_str(";");
            sql
        };
        message_statements.push(drop_sql);
        // The primary-key column definition differs between SQLite and the other backend.
        message_statements.push(format!(
            indoc! {r#"
                CREATE TABLE "message" (
                {}
                "table" TEXT,
                "row" BIGINT,
                "column" TEXT,
                "value" TEXT,
                "level" TEXT,
                "rule" TEXT,
                "message" TEXT,
                UNIQUE ("table", "row", "column", "rule")
                );
            "#},
            {
                if pool.any_kind() == AnyKind::Sqlite {
                    "\"message_id\" INTEGER PRIMARY KEY,"
                } else {
                    "\"message_id\" SERIAL PRIMARY KEY,"
                }
            },
        ));
        message_statements.push(
            r#"CREATE INDEX "message_trc_idx" ON "message"("table", "row", "column");"#.to_string(),
        );
        setup_statements.insert("message".to_string(), message_statements);

        // The message table goes first, followed by the data tables in sorted order.
        let mut tables_to_create = vec!["message".to_string()];
        tables_to_create.append(&mut sorted_tables.clone());
        for table in &tables_to_create {
            let table_statements = setup_statements.get(table).unwrap();
            // `Config` only generates (and optionally prints) the statements; every other
            // command executes them.
            if *command != ValveCommand::Config {
                for stmt in table_statements {
                    sqlx_query(stmt)
                        .execute(pool)
                        .await
                        .expect(format!("The SQL statement: {} returned an error", stmt).as_str());
                }
            }
            if verbose {
                let output = String::from(table_statements.join("\n"));
                println!("{}\n", output);
            }
        }
    }

    return Ok((sorted_tables, constraints_config));
}
789
/// The operation requested of `valve` and `configure_db`.
#[derive(Debug, PartialEq, Eq)]
pub enum ValveCommand {
    /// Build the configuration only; `configure_db` executes no SQL for this command.
    Config,
    /// Execute the generated setup statements; `valve` loads no data for this command.
    Create,
    /// Execute the generated setup statements and then have `valve` load the tables.
    Load,
}
800
801pub async fn valve(
809 table_table: &str,
810 database: &str,
811 command: &ValveCommand,
812 verbose: bool,
813 config_table: &str,
814) -> Result<String, sqlx::Error> {
815 let parser = StartParser::new();
816
817 let (specials_config, mut tables_config, mut datatypes_config, rules_config) =
818 read_config_files(&table_table.to_string(), config_table);
819
820 let connection_options;
830 if database.starts_with("postgresql://") {
831 connection_options = AnyConnectOptions::from_str(database)?;
832 } else {
833 let connection_string;
834 if !database.starts_with("sqlite://") {
835 connection_string = format!("sqlite://{}?mode=rwc", database);
836 } else {
837 connection_string = database.to_string();
838 }
839 connection_options = AnyConnectOptions::from_str(connection_string.as_str()).unwrap();
840 }
841
842 let pool = AnyPoolOptions::new().max_connections(5).connect_with(connection_options).await?;
843 if *command == ValveCommand::Load && pool.any_kind() == AnyKind::Sqlite {
844 sqlx_query("PRAGMA foreign_keys = ON").execute(&pool).await?;
845 }
846
847 let (sorted_table_list, constraints_config) =
848 configure_db(&mut tables_config, &mut datatypes_config, &pool, &parser, verbose, command)
849 .await?;
850
851 let mut config = ConfigMap::new();
852 config.insert(String::from("special"), SerdeValue::Object(specials_config.clone()));
853 config.insert(String::from("table"), SerdeValue::Object(tables_config.clone()));
854 config.insert(String::from("datatype"), SerdeValue::Object(datatypes_config.clone()));
855 config.insert(String::from("rule"), SerdeValue::Object(rules_config.clone()));
856 config.insert(String::from("constraints"), SerdeValue::Object(constraints_config.clone()));
857 let mut sorted_table_serdevalue_list: Vec<SerdeValue> = vec![];
858 for table in &sorted_table_list {
859 sorted_table_serdevalue_list.push(SerdeValue::String(table.to_string()));
860 }
861 config
862 .insert(String::from("sorted_table_list"), SerdeValue::Array(sorted_table_serdevalue_list));
863
864 let compiled_datatype_conditions = get_compiled_datatype_conditions(&config, &parser);
865 let compiled_rule_conditions =
866 get_compiled_rule_conditions(&config, compiled_datatype_conditions.clone(), &parser);
867
868 if *command == ValveCommand::Load {
869 if verbose {
870 eprintln!("{} - Processing {} tables.", Utc::now(), sorted_table_list.len());
871 }
872 load_db(&config, &pool, &compiled_datatype_conditions, &compiled_rule_conditions, verbose)
873 .await?;
874 }
875
876 let config = SerdeValue::Object(config);
877 Ok(config.to_string())
878}
879
880pub async fn insert_new_row(
883 global_config: &ConfigMap,
884 pool: &AnyPool,
885 table_name: &str,
886 row: &ConfigMap,
887) -> Result<u32, sqlx::Error> {
888 let sql = format!(r#"SELECT MAX("row_number") AS "row_number" FROM "{}_view""#, table_name);
890 let query = sqlx_query(&sql);
891 let result_row = query.fetch_one(pool).await?;
892 let result = result_row.try_get_raw("row_number").unwrap();
893 let new_row_number: i64;
894 if result.is_null() {
895 new_row_number = 1;
896 } else {
897 new_row_number = result_row.get("row_number");
898 }
899 let new_row_number = new_row_number as u32 + 1;
900
901 let mut insert_columns = vec![];
902 let mut insert_values = vec![];
903 let mut insert_params = vec![];
904 let mut messages = vec![];
905 let sorted_datatypes = get_sorted_datatypes(global_config);
906 for (column, cell) in row.iter() {
907 insert_columns.append(&mut vec![format!(r#""{}""#, column)]);
908 let cell = cell.as_object().unwrap();
909 let cell_valid = cell.get("valid").and_then(|v| v.as_bool()).unwrap();
910 let cell_value = cell.get("value").and_then(|v| v.as_str()).unwrap();
911 let mut cell_for_insert = cell.clone();
912 if cell_valid {
913 cell_for_insert.remove("value");
914 let sql_type = get_sql_type_from_global_config(
915 &global_config,
916 &table_name.to_string(),
917 &column,
918 pool,
919 )
920 .unwrap();
921 insert_values.push(cast_sql_param_from_text(&sql_type));
922 insert_params.push(String::from(cell_value));
923 } else {
924 insert_values.push(String::from("NULL"));
925 let cell_messages = sort_messages(
926 &sorted_datatypes,
927 cell.get("messages").and_then(|m| m.as_array()).unwrap(),
928 );
929 for cell_message in cell_messages {
930 messages.push(json!({
931 "column": column,
932 "value": cell_value,
933 "level": cell_message.get("level").and_then(|s| s.as_str()).unwrap(),
934 "rule": cell_message.get("rule").and_then(|s| s.as_str()).unwrap(),
935 "message": cell_message.get("message").and_then(|s| s.as_str()).unwrap(),
936 }));
937 }
938 }
939 }
940
941 let insert_stmt = local_sql_syntax(
943 &pool,
944 &format!(
945 r#"INSERT INTO "{}" ("row_number", {}) VALUES ({}, {})"#,
946 table_name,
947 insert_columns.join(", "),
948 new_row_number,
949 insert_values.join(", "),
950 ),
951 );
952
953 let mut query = sqlx_query(&insert_stmt);
954 for param in &insert_params {
955 query = query.bind(param);
956 }
957 query.execute(pool).await?;
958
959 for m in messages {
961 let column = m.get("column").and_then(|c| c.as_str()).unwrap();
962 let value = m.get("value").and_then(|c| c.as_str()).unwrap();
963 let level = m.get("level").and_then(|c| c.as_str()).unwrap();
964 let rule = m.get("rule").and_then(|c| c.as_str()).unwrap();
965 let message = m.get("message").and_then(|c| c.as_str()).unwrap();
966 let message_sql = format!(
967 r#"INSERT INTO "message"
968 ("table", "row", "column", "value", "level", "rule", "message")
969 VALUES ('{}', {}, '{}', '{}', '{}', '{}', '{}')"#,
970 table_name, new_row_number, column, value, level, rule, message
971 );
972 let query = sqlx_query(&message_sql);
973 query.execute(pool).await?;
974 }
975
976 Ok(new_row_number)
977}
978
979pub async fn update_row(
982 global_config: &ConfigMap,
983 pool: &AnyPool,
984 table_name: &str,
985 row: &ConfigMap,
986 row_number: u32,
987) -> Result<(), sqlx::Error> {
988 let mut assignments = vec![];
989 let mut params = vec![];
990 let mut messages = vec![];
991 let sorted_datatypes = get_sorted_datatypes(global_config);
992 for (column, cell) in row.iter() {
993 let cell = cell.as_object().unwrap();
994 let cell_valid = cell.get("valid").and_then(|v| v.as_bool()).unwrap();
995 let cell_value = cell.get("value").and_then(|v| v.as_str()).unwrap();
996 let mut cell_for_insert = cell.clone();
997 if cell_valid {
998 cell_for_insert.remove("value");
999 let sql_type = get_sql_type_from_global_config(
1000 &global_config,
1001 &table_name.to_string(),
1002 &column,
1003 pool,
1004 )
1005 .unwrap();
1006 assignments.push(format!(r#""{}" = {}"#, column, cast_sql_param_from_text(&sql_type)));
1007 params.push(String::from(cell_value));
1008 } else {
1009 assignments.push(format!(r#""{}" = NULL"#, column));
1010 let cell_messages = sort_messages(
1011 &sorted_datatypes,
1012 cell.get("messages").and_then(|m| m.as_array()).unwrap(),
1013 );
1014 for cell_message in cell_messages {
1015 messages.push(json!({
1016 "column": String::from(column),
1017 "value": String::from(cell_value),
1018 "level": cell_message.get("level").and_then(|s| s.as_str()).unwrap(),
1019 "rule": cell_message.get("rule").and_then(|s| s.as_str()).unwrap(),
1020 "message": cell_message.get("message").and_then(|s| s.as_str()).unwrap(),
1021 }));
1022 }
1023 }
1024 }
1025
1026 let mut update_stmt = format!(r#"UPDATE "{}" SET "#, table_name);
1028 update_stmt.push_str(&assignments.join(", "));
1029 update_stmt.push_str(&format!(r#" WHERE "row_number" = {}"#, row_number));
1030 let update_stmt = local_sql_syntax(&pool, &update_stmt);
1031
1032 let mut query = sqlx_query(&update_stmt);
1033 for param in ¶ms {
1034 query = query.bind(param);
1035 }
1036 query.execute(pool).await?;
1037
1038 let delete_sql = format!(
1041 r#"DELETE FROM "message" WHERE "table" = '{}' AND "row" = {}"#,
1042 table_name, row_number
1043 );
1044 let query = sqlx_query(&delete_sql);
1045 query.execute(pool).await?;
1046
1047 for m in messages {
1049 let column = m.get("column").and_then(|c| c.as_str()).unwrap();
1050 let value = m.get("value").and_then(|c| c.as_str()).unwrap();
1051 let level = m.get("level").and_then(|c| c.as_str()).unwrap();
1052 let rule = m.get("rule").and_then(|c| c.as_str()).unwrap();
1053 let message = m.get("message").and_then(|c| c.as_str()).unwrap();
1054 let insert_sql = format!(
1055 r#"INSERT INTO "message"
1056 ("table", "row", "column", "value", "level", "rule", "message")
1057 VALUES ('{}', {}, '{}', '{}', '{}', '{}', '{}')"#,
1058 table_name, row_number, column, value, level, rule, message
1059 );
1060 let query = sqlx_query(&insert_sql);
1061 query.execute(pool).await?;
1062 }
1063
1064 Ok(())
1065}
1066
1067fn read_tsv_into_vector(path: &str) -> Vec<ConfigMap> {
1071 let mut rdr = csv::ReaderBuilder::new().delimiter(b'\t').from_reader(
1072 File::open(path).unwrap_or_else(|err| {
1073 panic!("Unable to open '{}': {}", path, err);
1074 }),
1075 );
1076
1077 let rows: Vec<_> = rdr
1078 .deserialize()
1079 .map(|result| {
1080 let row: ConfigMap = result.expect(format!("Error reading: {}", path).as_str());
1081 row
1082 })
1083 .collect();
1084
1085 if rows.len() < 1 {
1086 panic!("No rows in {}", path);
1087 }
1088
1089 for (i, row) in rows.iter().enumerate() {
1090 let i = i + 1;
1092 for (col, val) in row {
1093 let val = val.as_str().unwrap();
1094 let trimmed_val = val.trim();
1095 if trimmed_val != val {
1096 eprintln!(
1097 "Error: Value '{}' of column '{}' in row {} of table '{}' {}",
1098 val, col, i, path, "has leading and/or trailing whitespace."
1099 );
1100 process::exit(1);
1101 }
1102 }
1103 }
1104
1105 rows
1106}
1107
1108fn read_db_table_into_vector(database: &str, config_table: &str) -> Vec<ConfigMap> {
1111 let connection_options;
1112 if database.starts_with("postgresql://") {
1113 connection_options = AnyConnectOptions::from_str(database).unwrap();
1114 } else {
1115 let connection_string;
1116 if !database.starts_with("sqlite://") {
1117 connection_string = format!("sqlite://{}?mode=ro", database);
1118 } else {
1119 connection_string = database.to_string();
1120 }
1121 connection_options = AnyConnectOptions::from_str(connection_string.as_str()).unwrap();
1122 }
1123
1124 let pool = block_on(AnyPoolOptions::new().max_connections(5).connect_with(connection_options))
1125 .unwrap();
1126
1127 let sql = format!("SELECT * FROM \"{}\"", config_table);
1128 let rows = block_on(sqlx_query(&sql).fetch_all(&pool)).unwrap();
1129 let mut table_rows = vec![];
1130 for row in rows {
1131 let mut table_row = ConfigMap::new();
1132 for column in row.columns() {
1133 let cname = column.name();
1134 if cname != "row_number" {
1135 let raw_value = row.try_get_raw(format!(r#"{}"#, cname).as_str()).unwrap();
1136 if !raw_value.is_null() {
1137 let value = get_column_value(&row, &cname, "text");
1138 table_row.insert(cname.to_string(), json!(value));
1139 } else {
1140 table_row.insert(cname.to_string(), json!(""));
1141 }
1142 }
1143 }
1144 table_rows.push(table_row);
1145 }
1146 table_rows
1147}
1148
1149fn compile_condition(
1154 condition_option: Option<&str>,
1155 parser: &StartParser,
1156 compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
1157) -> CompiledCondition {
1158 let unquoted_re = Regex::new(r#"^['"](?P<unquoted>.*)['"]$"#).unwrap();
1159 match condition_option {
1160 None => {
1165 return CompiledCondition {
1166 original: String::from(""),
1167 parsed: Expression::None,
1168 compiled: Arc::new(|_| true),
1169 }
1170 }
1171 Some("null") => {
1172 return CompiledCondition {
1173 original: String::from("null"),
1174 parsed: Expression::Null,
1175 compiled: Arc::new(|_| true),
1176 }
1177 }
1178 Some("not null") => {
1179 return CompiledCondition {
1180 original: String::from("not null"),
1181 parsed: Expression::NotNull,
1182 compiled: Arc::new(|_| true),
1183 }
1184 }
1185 Some(condition) => {
1186 let parsed_condition = parser.parse(condition);
1187 if let Err(_) = parsed_condition {
1188 panic!("ERROR: Could not parse condition: {}", condition);
1189 }
1190 let parsed_condition = parsed_condition.unwrap();
1191 if parsed_condition.len() != 1 {
1192 panic!(
1193 "ERROR: Invalid condition: '{}'. Only one condition per column is allowed.",
1194 condition
1195 );
1196 }
1197 let parsed_condition = &parsed_condition[0];
1198 match &**parsed_condition {
1199 Expression::Function(name, args) => {
1200 if name == "equals" {
1201 if let Expression::Label(label) = &*args[0] {
1202 let label = String::from(unquoted_re.replace(label, "$unquoted"));
1203 return CompiledCondition {
1204 original: condition.to_string(),
1205 parsed: *parsed_condition.clone(),
1206 compiled: Arc::new(move |x| x == label),
1207 };
1208 } else {
1209 panic!("ERROR: Invalid condition: {}", condition);
1210 }
1211 } else if vec!["exclude", "match", "search"].contains(&name.as_str()) {
1212 if let Expression::RegexMatch(pattern, flags) = &*args[0] {
1213 let mut pattern =
1214 String::from(unquoted_re.replace(pattern, "$unquoted"));
1215 let mut flags = String::from(flags);
1216 if flags != "" {
1217 flags = format!("(?{})", flags.as_str());
1218 }
1219 match name.as_str() {
1220 "exclude" => {
1221 pattern = format!("{}{}", flags, pattern);
1222 let re = Regex::new(pattern.as_str()).unwrap();
1223 return CompiledCondition {
1224 original: condition.to_string(),
1225 parsed: *parsed_condition.clone(),
1226 compiled: Arc::new(move |x| !re.is_match(x)),
1227 };
1228 }
1229 "match" => {
1230 pattern = format!("^{}{}$", flags, pattern);
1231 let re = Regex::new(pattern.as_str()).unwrap();
1232 return CompiledCondition {
1233 original: condition.to_string(),
1234 parsed: *parsed_condition.clone(),
1235 compiled: Arc::new(move |x| re.is_match(x)),
1236 };
1237 }
1238 "search" => {
1239 pattern = format!("{}{}", flags, pattern);
1240 let re = Regex::new(pattern.as_str()).unwrap();
1241 return CompiledCondition {
1242 original: condition.to_string(),
1243 parsed: *parsed_condition.clone(),
1244 compiled: Arc::new(move |x| re.is_match(x)),
1245 };
1246 }
1247 _ => panic!("Unrecognized function name: {}", name),
1248 };
1249 } else {
1250 panic!(
1251 "Argument to condition: {} is not a regular expression",
1252 condition
1253 );
1254 }
1255 } else if name == "in" {
1256 let mut alternatives: Vec<String> = vec![];
1257 for arg in args {
1258 if let Expression::Label(value) = &**arg {
1259 let value = unquoted_re.replace(value, "$unquoted");
1260 alternatives.push(value.to_string());
1261 } else {
1262 panic!("Argument: {:?} to function 'in' is not a label", arg);
1263 }
1264 }
1265 return CompiledCondition {
1266 original: condition.to_string(),
1267 parsed: *parsed_condition.clone(),
1268 compiled: Arc::new(move |x| alternatives.contains(&x.to_string())),
1269 };
1270 } else {
1271 panic!("Unrecognized function name: {}", name);
1272 }
1273 }
1274 Expression::Label(value)
1275 if compiled_datatype_conditions.contains_key(&value.to_string()) =>
1276 {
1277 let compiled_datatype_condition =
1278 compiled_datatype_conditions.get(&value.to_string()).unwrap();
1279 return CompiledCondition {
1280 original: value.to_string(),
1281 parsed: compiled_datatype_condition.parsed.clone(),
1282 compiled: compiled_datatype_condition.compiled.clone(),
1283 };
1284 }
1285 _ => {
1286 panic!("Unrecognized condition: {}", condition);
1287 }
1288 };
1289 }
1290 };
1291}
1292
1293fn get_sql_type(dt_config: &ConfigMap, datatype: &String, pool: &AnyPool) -> Option<String> {
1296 if !dt_config.contains_key(datatype) {
1297 return None;
1298 }
1299
1300 let sql_type_column = {
1301 if pool.any_kind() == AnyKind::Sqlite {
1302 "SQLite type"
1303 } else {
1304 "PostgreSQL type"
1305 }
1306 };
1307
1308 if let Some(sql_type) = dt_config.get(datatype).and_then(|d| d.get(sql_type_column)) {
1309 return Some(sql_type.as_str().and_then(|s| Some(s.to_string())).unwrap());
1310 }
1311
1312 let parent_datatype =
1313 dt_config.get(datatype).and_then(|d| d.get("parent")).and_then(|p| p.as_str()).unwrap();
1314
1315 return get_sql_type(dt_config, &parent_datatype.to_string(), pool);
1316}
1317
1318fn get_sql_type_from_global_config(
1321 global_config: &ConfigMap,
1322 table: &str,
1323 column: &str,
1324 pool: &AnyPool,
1325) -> Option<String> {
1326 let dt_config = global_config.get("datatype").and_then(|d| d.as_object()).unwrap();
1327 let normal_table_name;
1328 if let Some(s) = table.strip_suffix("_conflict") {
1329 normal_table_name = String::from(s);
1330 } else {
1331 normal_table_name = table.to_string();
1332 }
1333 let dt = global_config
1334 .get("table")
1335 .and_then(|t| t.get(normal_table_name))
1336 .and_then(|t| t.get("column"))
1337 .and_then(|c| c.get(column))
1338 .and_then(|c| c.get("datatype"))
1339 .and_then(|d| d.as_str())
1340 .and_then(|d| Some(d.to_string()))
1341 .unwrap();
1342 get_sql_type(&dt_config, &dt, pool)
1343}
1344
1345fn cast_sql_param_from_text(sql_type: &str) -> String {
1348 if sql_type.to_lowercase() == "integer" {
1349 format!("CAST(NULLIF({}, '') AS INTEGER)", SQL_PARAM)
1350 } else {
1351 String::from(SQL_PARAM)
1352 }
1353}
1354
/// Returns the SQL expression that yields `column` as text.
///
/// The column name is always double-quoted; for "integer" columns (compared
/// case-insensitively) it is additionally wrapped in `CAST(... AS TEXT)`.
fn cast_column_sql_to_text(column: &str, sql_type: &str) -> String {
    let quoted = format!(r#""{}""#, column);
    match sql_type.to_lowercase().as_str() {
        "integer" => format!("CAST({} AS TEXT)", quoted),
        _ => quoted,
    }
}
1364
1365fn get_column_value(row: &AnyRow, column: &str, sql_type: &str) -> String {
1368 if sql_type.to_lowercase() == "integer" {
1369 let value: i32 = row.get(format!(r#"{}"#, column).as_str());
1370 value.to_string()
1371 } else {
1372 let value: &str = row.get(format!(r#"{}"#, column).as_str());
1373 value.to_string()
1374 }
1375}
1376
1377fn local_sql_syntax(pool: &AnyPool, sql: &String) -> String {
1382 let rx = Regex::new(&format!(
1384 r#"('[^'\\]*(?:\\.[^'\\]*)*'|"[^"\\]*(?:\\.[^"\\]*)*")|\b{}\b"#,
1385 SQL_PARAM
1386 ))
1387 .unwrap();
1388
1389 let mut final_sql = String::from("");
1390 let mut pg_param_idx = 1;
1391 let mut saved_start = 0;
1392 for m in rx.find_iter(sql) {
1393 let this_match = &sql[m.start()..m.end()];
1394 final_sql.push_str(&sql[saved_start..m.start()]);
1395 if this_match == SQL_PARAM {
1396 if pool.any_kind() == AnyKind::Postgres {
1397 final_sql.push_str(&format!("${}", pg_param_idx));
1398 pg_param_idx += 1;
1399 } else {
1400 final_sql.push_str(&format!("?"));
1401 }
1402 } else {
1403 final_sql.push_str(&format!("{}", this_match));
1404 }
1405 saved_start = m.start() + this_match.len();
1406 }
1407 final_sql.push_str(&sql[saved_start..]);
1408 final_sql
1409}
1410
1411fn verify_table_deps_and_sort(table_list: &Vec<String>, constraints: &ConfigMap) -> Vec<String> {
1417 fn get_cycles(g: &DiGraphMap<&str, ()>) -> Result<Vec<String>, Vec<Vec<String>>> {
1418 let mut cycles = vec![];
1419 match toposort(&g, None) {
1420 Err(cycle) => {
1421 let problem_node = cycle.node_id();
1422 let neighbours = g.neighbors_directed(problem_node, Direction::Outgoing);
1423 for neighbour in neighbours {
1424 let ways_to_problem_node =
1425 all_simple_paths::<Vec<_>, _>(&g, neighbour, problem_node, 0, None);
1426 for mut way in ways_to_problem_node {
1427 let mut cycle = vec![problem_node];
1428 cycle.append(&mut way);
1429 let cycle = cycle.iter().map(|&item| item.to_string()).collect::<Vec<_>>();
1430 cycles.push(cycle);
1431 }
1432 }
1433 Err(cycles)
1434 }
1435 Ok(sorted) => {
1436 let mut sorted = sorted.iter().map(|&item| item.to_string()).collect::<Vec<_>>();
1437 sorted.reverse();
1438 Ok(sorted)
1439 }
1440 }
1441 }
1442
1443 let trees = constraints.get("tree").and_then(|t| t.as_object()).unwrap();
1444 for table_name in table_list {
1445 let mut dependency_graph = DiGraphMap::<&str, ()>::new();
1446 let table_trees = trees.get(table_name).and_then(|t| t.as_array()).unwrap();
1447 for tree in table_trees {
1448 let tree = tree.as_object().unwrap();
1449 let child = tree.get("child").and_then(|c| c.as_str()).unwrap();
1450 let parent = tree.get("parent").and_then(|p| p.as_str()).unwrap();
1451 let c_index = dependency_graph.add_node(child);
1452 let p_index = dependency_graph.add_node(parent);
1453 dependency_graph.add_edge(c_index, p_index, ());
1454 }
1455 match get_cycles(&dependency_graph) {
1456 Ok(_) => (),
1457 Err(cycles) => {
1458 let mut message = String::new();
1459 for cycle in cycles {
1460 message.push_str(
1461 format!("Cyclic dependency in table '{}': ", table_name).as_str(),
1462 );
1463 let end_index = cycle.len() - 1;
1464 for (i, child) in cycle.iter().enumerate() {
1465 if i < end_index {
1466 let dep = table_trees
1467 .iter()
1468 .find(|d| d.get("child").unwrap().as_str() == Some(child))
1469 .and_then(|d| d.as_object())
1470 .unwrap();
1471 let parent = dep.get("parent").unwrap();
1472 message.push_str(
1473 format!("tree({}) references {}", child, parent).as_str(),
1474 );
1475 }
1476 if i < (end_index - 1) {
1477 message.push_str(" and ");
1478 }
1479 }
1480 message.push_str(". ");
1481 }
1482 panic!("{}", message);
1483 }
1484 };
1485 }
1486
1487 let foreign_keys = constraints.get("foreign").and_then(|f| f.as_object()).unwrap();
1488 let under_keys = constraints.get("under").and_then(|u| u.as_object()).unwrap();
1489 let mut dependency_graph = DiGraphMap::<&str, ()>::new();
1490 for table_name in table_list {
1491 let t_index = dependency_graph.add_node(table_name);
1492 let fkeys = foreign_keys.get(table_name).and_then(|f| f.as_array()).unwrap();
1493 for fkey in fkeys {
1494 let ftable = fkey.get("ftable").and_then(|f| f.as_str()).unwrap();
1495 let f_index = dependency_graph.add_node(ftable);
1496 dependency_graph.add_edge(t_index, f_index, ());
1497 }
1498
1499 let ukeys = under_keys.get(table_name).and_then(|u| u.as_array()).unwrap();
1500 for ukey in ukeys {
1501 let ttable = ukey.get("ttable").and_then(|t| t.as_str()).unwrap();
1502 let tcolumn = ukey.get("tcolumn").and_then(|t| t.as_str()).unwrap();
1503 let value = ukey.get("value").and_then(|t| t.as_str()).unwrap();
1504 if ttable != table_name {
1505 let ttable_trees = trees.get(ttable).and_then(|t| t.as_array()).unwrap();
1506 if ttable_trees
1507 .iter()
1508 .filter(|d| d.get("child").unwrap().as_str() == Some(tcolumn))
1509 .collect::<Vec<_>>()
1510 .is_empty()
1511 {
1512 panic!(
1513 "under({}.{}, {}) refers to a non-existent tree",
1514 ttable, tcolumn, value
1515 );
1516 }
1517 let tt_index = dependency_graph.add_node(ttable);
1518 dependency_graph.add_edge(t_index, tt_index, ());
1519 }
1520 }
1521 }
1522
1523 match get_cycles(&dependency_graph) {
1524 Ok(sorted_table_list) => {
1525 return sorted_table_list;
1526 }
1527 Err(cycles) => {
1528 let mut message = String::new();
1529 for cycle in cycles {
1530 message.push_str(
1531 format!("Cyclic dependency between tables {}: ", cycle.join(", ")).as_str(),
1532 );
1533 let end_index = cycle.len() - 1;
1534 for (i, table) in cycle.iter().enumerate() {
1535 if i < end_index {
1536 let dep_name = cycle.get(i + 1).unwrap().as_str();
1537 let fkeys = foreign_keys.get(table).and_then(|f| f.as_array()).unwrap();
1538 let ukeys = under_keys.get(table).and_then(|u| u.as_array()).unwrap();
1539 let column;
1540 let ref_table;
1541 let ref_column;
1542 if let Some(dep) = fkeys
1543 .iter()
1544 .find(|d| d.get("ftable").unwrap().as_str() == Some(dep_name))
1545 .and_then(|d| d.as_object())
1546 {
1547 column = dep.get("column").unwrap();
1548 ref_table = dep.get("ftable").unwrap();
1549 ref_column = dep.get("fcolumn").unwrap();
1550 } else if let Some(dep) = ukeys
1551 .iter()
1552 .find(|d| d.get("ttable").unwrap().as_str() == Some(dep_name))
1553 .and_then(|d| d.as_object())
1554 {
1555 column = dep.get("column").unwrap();
1556 ref_table = dep.get("ttable").unwrap();
1557 ref_column = dep.get("tcolumn").unwrap();
1558 } else {
1559 panic!("{}. Unable to retrieve the details.", message);
1560 }
1561
1562 message.push_str(
1563 format!(
1564 "{}.{} depends on {}.{}",
1565 table,
1566 column,
1567 ref_table,
1568 ref_column,
1569 )
1570 .as_str(),
1571 );
1572 }
1573 if i < (end_index - 1) {
1574 message.push_str(" and ");
1575 }
1576 }
1577 message.push_str(". ");
1578 }
1579 panic!("{}", message);
1580 }
1581 };
1582}
1583
1584fn create_table_statement(
1588 tables_config: &mut ConfigMap,
1589 datatypes_config: &mut ConfigMap,
1590 parser: &StartParser,
1591 table_name: &String,
1592 pool: &AnyPool,
1593) -> (Vec<String>, SerdeValue) {
1594 let mut drop_table_sql = format!(r#"DROP TABLE IF EXISTS "{}""#, table_name);
1595 if pool.any_kind() == AnyKind::Postgres {
1596 drop_table_sql.push_str(" CASCADE");
1597 }
1598 drop_table_sql.push_str(";");
1599 let mut statements = vec![drop_table_sql];
1600 let mut create_lines = vec![
1601 format!(r#"CREATE TABLE "{}" ("#, table_name),
1602 String::from(r#" "row_number" BIGINT,"#),
1603 ];
1604
1605 let normal_table_name;
1606 if let Some(s) = table_name.strip_suffix("_conflict") {
1607 normal_table_name = String::from(s);
1608 } else {
1609 normal_table_name = table_name.to_string();
1610 }
1611
1612 let column_names = tables_config
1613 .get(&normal_table_name)
1614 .and_then(|t| t.get("column_order"))
1615 .and_then(|c| c.as_array())
1616 .unwrap()
1617 .iter()
1618 .map(|v| v.as_str().unwrap().to_string())
1619 .collect::<Vec<_>>();
1620
1621 let columns = tables_config
1622 .get(normal_table_name.as_str())
1623 .and_then(|c| c.as_object())
1624 .and_then(|o| o.get("column"))
1625 .and_then(|c| c.as_object())
1626 .unwrap();
1627
1628 let mut table_constraints = json!({
1629 "foreign": [],
1630 "unique": [],
1631 "primary": [],
1632 "tree": [],
1633 "under": [],
1634 });
1635
1636 let mut colvals: Vec<ConfigMap> = vec![];
1637 for column_name in &column_names {
1638 let column = columns.get(column_name).and_then(|c| c.as_object()).unwrap();
1639 colvals.push(column.clone());
1640 }
1641
1642 let c = colvals.len();
1643 let mut r = 0;
1644 for row in colvals {
1645 r += 1;
1646 let sql_type = get_sql_type(
1647 datatypes_config,
1648 &row.get("datatype")
1649 .and_then(|d| d.as_str())
1650 .and_then(|s| Some(s.to_string()))
1651 .unwrap(),
1652 pool,
1653 );
1654
1655 if let None = sql_type {
1656 panic!("Missing SQL type for {}", row.get("datatype").unwrap());
1657 }
1658 let sql_type = sql_type.unwrap();
1659
1660 let short_sql_type = {
1661 if sql_type.to_lowercase().as_str().starts_with("varchar(") {
1662 "VARCHAR"
1663 } else {
1664 &sql_type
1665 }
1666 };
1667
1668 if pool.any_kind() == AnyKind::Postgres {
1669 if !PG_SQL_TYPES.contains(&short_sql_type.to_lowercase().as_str()) {
1670 panic!(
1671 "Unrecognized PostgreSQL SQL type '{}' for datatype: '{}'. \
1672 Accepted SQL types for PostgreSQL are: {}",
1673 sql_type,
1674 row.get("datatype").and_then(|d| d.as_str()).unwrap(),
1675 PG_SQL_TYPES.join(", ")
1676 );
1677 }
1678 } else {
1679 if !SL_SQL_TYPES.contains(&short_sql_type.to_lowercase().as_str()) {
1680 panic!(
1681 "Unrecognized SQLite SQL type '{}' for datatype '{}'. \
1682 Accepted SQL datatypes for SQLite are: {}",
1683 sql_type,
1684 row.get("datatype").and_then(|d| d.as_str()).unwrap(),
1685 SL_SQL_TYPES.join(", ")
1686 );
1687 }
1688 }
1689
1690 let column_name = row.get("column").and_then(|s| s.as_str()).unwrap();
1691 let mut line = format!(r#" "{}" {}"#, column_name, sql_type);
1692 let structure = row.get("structure").and_then(|s| s.as_str());
1693 if let Some(structure) = structure {
1694 if structure != "" && !table_name.ends_with("_conflict") {
1695 let parsed_structure = parser.parse(structure).unwrap();
1696 for expression in parsed_structure {
1697 match *expression {
1698 Expression::Label(value) if value == "primary" => {
1699 line.push_str(" PRIMARY KEY");
1700 let primary_keys = table_constraints
1701 .get_mut("primary")
1702 .and_then(|v| v.as_array_mut())
1703 .unwrap();
1704 primary_keys.push(SerdeValue::String(column_name.to_string()));
1705 }
1706 Expression::Label(value) if value == "unique" => {
1707 line.push_str(" UNIQUE");
1708 let unique_constraints = table_constraints
1709 .get_mut("unique")
1710 .and_then(|v| v.as_array_mut())
1711 .unwrap();
1712 unique_constraints.push(SerdeValue::String(column_name.to_string()));
1713 }
1714 Expression::Function(name, args) if name == "from" => {
1715 if args.len() != 1 {
1716 panic!("Invalid foreign key: {} for: {}", structure, table_name);
1717 }
1718 match &*args[0] {
1719 Expression::Field(ftable, fcolumn) => {
1720 let foreign_keys = table_constraints
1721 .get_mut("foreign")
1722 .and_then(|v| v.as_array_mut())
1723 .unwrap();
1724 let foreign_key = json!({
1725 "column": column_name,
1726 "ftable": ftable,
1727 "fcolumn": fcolumn,
1728 });
1729 foreign_keys.push(foreign_key);
1730 }
1731 _ => {
1732 panic!("Invalid foreign key: {} for: {}", structure, table_name)
1733 }
1734 };
1735 }
1736 Expression::Function(name, args) if name == "tree" => {
1737 if args.len() != 1 {
1738 panic!(
1739 "Invalid 'tree' constraint: {} for: {}",
1740 structure, table_name
1741 );
1742 }
1743 match &*args[0] {
1744 Expression::Label(child) => {
1745 let child_datatype = columns
1746 .get(child)
1747 .and_then(|c| c.get("datatype"))
1748 .and_then(|d| d.as_str());
1749 if let None = child_datatype {
1750 panic!(
1751 "Could not determine SQL datatype for {} of tree({})",
1752 child, child
1753 );
1754 }
1755 let child_datatype = child_datatype.unwrap();
1756 let parent = column_name;
1757 let child_sql_type = get_sql_type(
1758 datatypes_config,
1759 &child_datatype.to_string(),
1760 pool,
1761 )
1762 .unwrap();
1763 if sql_type != child_sql_type {
1764 panic!(
1765 "SQL type '{}' of '{}' in 'tree({})' for table \
1766 '{}' doe snot match SQL type: '{}' of parent: '{}'.",
1767 child_sql_type,
1768 child,
1769 child,
1770 table_name,
1771 sql_type,
1772 parent
1773 );
1774 }
1775 let tree_constraints = table_constraints
1776 .get_mut("tree")
1777 .and_then(|t| t.as_array_mut())
1778 .unwrap();
1779 let entry = json!({"parent": column_name,
1780 "child": child});
1781 tree_constraints.push(entry);
1782 }
1783 _ => {
1784 panic!(
1785 "Invalid 'tree' constraint: {} for: {}",
1786 structure, table_name
1787 );
1788 }
1789 };
1790 }
1791 Expression::Function(name, args) if name == "under" => {
1792 let generic_error = format!(
1793 "Invalid 'under' constraint: {} for: {}",
1794 structure, table_name
1795 );
1796 if args.len() != 2 {
1797 panic!("{}", generic_error);
1798 }
1799 match (&*args[0], &*args[1]) {
1800 (Expression::Field(ttable, tcolumn), Expression::Label(value)) => {
1801 let under_constraints = table_constraints
1802 .get_mut("under")
1803 .and_then(|u| u.as_array_mut())
1804 .unwrap();
1805 let entry = json!({"column": column_name,
1806 "ttable": ttable,
1807 "tcolumn": tcolumn,
1808 "value": value});
1809 under_constraints.push(entry);
1810 }
1811 (_, _) => panic!("{}", generic_error),
1812 };
1813 }
1814 _ => panic!(
1815 "Unrecognized structure: {} for {}.{}",
1816 structure, table_name, column_name
1817 ),
1818 };
1819 }
1820 }
1821 }
1822 if r >= c
1823 && table_constraints
1824 .get("foreign")
1825 .and_then(|v| v.as_array())
1826 .and_then(|v| Some(v.is_empty()))
1827 .unwrap()
1828 {
1829 line.push_str("");
1830 } else {
1831 line.push_str(",");
1832 }
1833 create_lines.push(line);
1834 }
1835
1836 let foreign_keys = table_constraints.get("foreign").and_then(|v| v.as_array()).unwrap();
1837 let num_fkeys = foreign_keys.len();
1838 for (i, fkey) in foreign_keys.iter().enumerate() {
1839 create_lines.push(format!(
1840 r#" FOREIGN KEY ("{}") REFERENCES "{}"("{}"){}"#,
1841 fkey.get("column").and_then(|s| s.as_str()).unwrap(),
1842 fkey.get("ftable").and_then(|s| s.as_str()).unwrap(),
1843 fkey.get("fcolumn").and_then(|s| s.as_str()).unwrap(),
1844 if i < (num_fkeys - 1) { "," } else { "" }
1845 ));
1846 }
1847 create_lines.push(String::from(");"));
1848 statements.push(String::from(create_lines.join("\n")));
1851
1852 let tree_constraints = table_constraints.get("tree").and_then(|v| v.as_array()).unwrap();
1855 for tree in tree_constraints {
1856 let unique_keys = table_constraints.get("unique").and_then(|v| v.as_array()).unwrap();
1857 let primary_keys = table_constraints.get("primary").and_then(|v| v.as_array()).unwrap();
1858 let tree_child = tree.get("child").and_then(|c| c.as_str()).unwrap();
1859 if !unique_keys.contains(&SerdeValue::String(tree_child.to_string()))
1860 && !primary_keys.contains(&SerdeValue::String(tree_child.to_string()))
1861 {
1862 statements.push(format!(
1863 r#"CREATE UNIQUE INDEX "{}_{}_idx" ON "{}"("{}");"#,
1864 table_name, tree_child, table_name, tree_child
1865 ));
1866 }
1867 }
1868
1869 statements.push(format!(
1871 r#"CREATE UNIQUE INDEX "{}_row_number_idx" ON "{}"("row_number");"#,
1872 table_name, table_name
1873 ));
1874
1875 return (statements, table_constraints);
1876}
1877
1878fn add_message_counts(messages: &Vec<SerdeValue>, messages_stats: &mut HashMap<String, usize>) {
1882 for message in messages {
1883 let message = message.as_object().unwrap();
1884 let level = message.get("level").unwrap();
1885 if level == "error" {
1886 let current_errors = messages_stats.get("error").unwrap();
1887 messages_stats.insert("error".to_string(), current_errors + 1);
1888 } else if level == "warning" {
1889 let current_warnings = messages_stats.get("warning").unwrap();
1890 messages_stats.insert("warning".to_string(), current_warnings + 1);
1891 } else if level == "info" {
1892 let current_infos = messages_stats.get("info").unwrap();
1893 messages_stats.insert("info".to_string(), current_infos + 1);
1894 } else {
1895 eprintln!("Warning: unknown message type: {}", level);
1896 }
1897 }
1898}
1899
1900fn get_sorted_datatypes(global_config: &ConfigMap) -> Vec<&str> {
1903 let mut graph = DiGraphMap::<&str, ()>::new();
1904 let dt_config = global_config.get("datatype").and_then(|d| d.as_object()).unwrap();
1905 for (dt_name, dt_obj) in dt_config.iter() {
1906 let d_index = graph.add_node(dt_name);
1907 if let Some(parent) = dt_obj.get("parent").and_then(|p| p.as_str()) {
1908 let p_index = graph.add_node(parent);
1909 graph.add_edge(d_index, p_index, ());
1910 }
1911 }
1912
1913 let mut cycles = vec![];
1914 match toposort(&graph, None) {
1915 Err(cycle) => {
1916 let problem_node = cycle.node_id();
1917 let neighbours = graph.neighbors_directed(problem_node, Direction::Outgoing);
1918 for neighbour in neighbours {
1919 let ways_to_problem_node =
1920 all_simple_paths::<Vec<_>, _>(&graph, neighbour, problem_node, 0, None);
1921 for mut way in ways_to_problem_node {
1922 let mut cycle = vec![problem_node];
1923 cycle.append(&mut way);
1924 let cycle = cycle.iter().map(|&item| item.to_string()).collect::<Vec<_>>();
1925 cycles.push(cycle);
1926 }
1927 }
1928 panic!("Defined datatypes contain circular dependencies: {:?}", cycles);
1929 }
1930 Ok(mut sorted) => {
1931 sorted.reverse();
1932 sorted
1933 }
1934 }
1935}
1936
1937fn sort_messages(sorted_datatypes: &Vec<&str>, cell_messages: &Vec<SerdeValue>) -> Vec<SerdeValue> {
1944 let mut datatype_messages = vec![];
1945 let mut structure_messages = vec![];
1946 let mut rule_messages = vec![];
1947 for message in cell_messages {
1948 let rule = message
1949 .get("rule")
1950 .and_then(|r| Some(r.as_str().unwrap().splitn(2, ":").collect::<Vec<_>>()))
1951 .unwrap();
1952 if rule[0] == "rule" {
1953 rule_messages.push(message.clone());
1954 } else if rule[0] == "datatype" {
1955 datatype_messages.push(message.clone());
1956 } else {
1957 structure_messages.push(message.clone());
1958 }
1959 }
1960
1961 if datatype_messages.len() > 0 {
1962 datatype_messages = {
1963 let mut sorted_messages = vec![];
1964 for datatype in sorted_datatypes {
1965 let mut messages = datatype_messages
1966 .iter()
1967 .filter(|m| {
1968 m.get("rule").and_then(|r| r.as_str()).unwrap()
1969 == format!("datatype:{}", datatype)
1970 })
1971 .map(|m| m.clone())
1972 .collect::<Vec<_>>();
1973 sorted_messages.append(&mut messages);
1974 }
1975 sorted_messages
1976 }
1977 }
1978
1979 let mut messages = datatype_messages;
1980 messages.append(&mut rule_messages);
1981 messages.append(&mut structure_messages);
1982 messages
1983}
1984
1985async fn make_inserts(
1994 config: &ConfigMap,
1995 table_name: &String,
1996 rows: &mut Vec<ResultRow>,
1997 chunk_number: usize,
1998 messages_stats: &mut HashMap<String, usize>,
1999 verbose: bool,
2000 pool: &AnyPool,
2001) -> Result<
2002 ((String, Vec<String>, String, Vec<String>), (String, Vec<String>, String, Vec<String>)),
2003 sqlx::Error,
2004> {
2005 let conflict_columns = {
2006 let mut conflict_columns = vec![];
2007 let primaries = config
2008 .get("constraints")
2009 .and_then(|c| c.as_object())
2010 .and_then(|c| c.get("primary"))
2011 .and_then(|t| t.as_object())
2012 .and_then(|t| t.get(table_name))
2013 .and_then(|t| t.as_array())
2014 .unwrap();
2015
2016 let uniques = config
2017 .get("constraints")
2018 .and_then(|c| c.as_object())
2019 .and_then(|c| c.get("unique"))
2020 .and_then(|t| t.as_object())
2021 .and_then(|t| t.get(table_name))
2022 .and_then(|t| t.as_array())
2023 .unwrap();
2024
2025 let trees = config
2026 .get("constraints")
2027 .and_then(|c| c.as_object())
2028 .and_then(|o| o.get("tree"))
2029 .and_then(|t| t.as_object())
2030 .and_then(|o| o.get(table_name))
2031 .and_then(|t| t.as_array())
2032 .unwrap()
2033 .iter()
2034 .map(|v| v.as_object().unwrap())
2035 .map(|v| v.get("child").unwrap().clone())
2036 .collect::<Vec<_>>();
2037
2038 for key_columns in vec![primaries, uniques, &trees] {
2039 for column in key_columns {
2040 if !conflict_columns.contains(column) {
2041 conflict_columns.push(column.clone());
2042 }
2043 }
2044 }
2045
2046 conflict_columns
2047 };
2048
2049 fn generate_sql(
2050 config: &ConfigMap,
2051 table_name: &String,
2052 column_names: &Vec<String>,
2053 rows: &Vec<ResultRow>,
2054 messages_stats: &mut HashMap<String, usize>,
2055 verbose: bool,
2056 pool: &AnyPool,
2057 ) -> (String, Vec<String>, String, Vec<String>) {
2058 let mut lines = vec![];
2059 let mut params = vec![];
2060 let mut message_lines = vec![];
2061 let mut message_params = vec![];
2062 let sorted_datatypes = get_sorted_datatypes(config);
2063 for row in rows.iter() {
2064 let mut values = vec![format!("{}", row.row_number.unwrap())];
2065 for column in column_names {
2066 let cell = row.contents.get(column).unwrap();
2067
2068 if cell.nulltype == None && cell.valid {
2071 let sql_type =
2072 get_sql_type_from_global_config(&config, &table_name, &column, pool)
2073 .unwrap();
2074 values.push(cast_sql_param_from_text(&sql_type));
2075 params.push(cell.value.clone());
2076 } else {
2077 values.push(String::from("NULL"));
2078 }
2079
2080 if !cell.valid {
2083 if verbose {
2084 add_message_counts(&cell.messages, messages_stats);
2085 }
2086 for message in sort_messages(&sorted_datatypes, &cell.messages) {
2087 let row = row.row_number.unwrap().to_string();
2088 let message_values = vec![
2089 SQL_PARAM, &row, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM,
2090 ];
2091
2092 let message = message.as_object().unwrap();
2093 message_params.push({
2094 let normal_table_name;
2095 if let Some(s) = table_name.strip_suffix("_conflict") {
2096 normal_table_name = String::from(s);
2097 } else {
2098 normal_table_name = table_name.to_string();
2099 }
2100 normal_table_name
2101 });
2102 message_params.push(column.clone());
2103 message_params.push(cell.value.clone());
2104 message_params.push(
2105 message.get("level").and_then(|s| s.as_str()).unwrap().to_string(),
2106 );
2107 message_params.push(
2108 message.get("rule").and_then(|s| s.as_str()).unwrap().to_string(),
2109 );
2110 message_params.push(
2111 message.get("message").and_then(|s| s.as_str()).unwrap().to_string(),
2112 );
2113 let line = message_values.join(", ");
2114 let line = format!("({})", line);
2115 message_lines.push(line);
2116 }
2117 }
2118 }
2119 let line = values.join(", ");
2120 let line = format!("({})", line);
2121 lines.push(line);
2122 }
2123
2124 let mut output = String::from("");
2126 if !lines.is_empty() {
2127 output.push_str(&format!(
2128 r#"INSERT INTO "{}" ("row_number", {}) VALUES"#,
2129 table_name,
2130 {
2131 let mut all_columns = vec![];
2132 for column_name in column_names {
2133 let quoted_column_name = format!(r#""{}""#, column_name);
2134 all_columns.push(quoted_column_name);
2135 }
2136 all_columns.join(", ")
2137 }
2138 ));
2139 output.push_str("\n");
2140 output.push_str(&lines.join(",\n"));
2141 output.push_str(";");
2142 }
2143
2144 let mut message_output = String::from("");
2146 if !message_lines.is_empty() {
2147 message_output.push_str(r#"INSERT INTO "message" "#);
2148 message_output
2149 .push_str(r#"("table", "row", "column", "value", "level", "rule", "message") "#);
2150 message_output.push_str("VALUES");
2151 message_output.push_str("\n");
2152 message_output.push_str(&message_lines.join(",\n"));
2153 message_output.push_str(";");
2154 }
2155
2156 (output, params, message_output, message_params)
2157 }
2158
2159 fn has_conflict(row: &ResultRow, conflict_columns: &Vec<SerdeValue>) -> bool {
2160 for (column, cell) in &row.contents {
2161 let column = SerdeValue::String(column.to_string());
2162 if conflict_columns.contains(&column) && !cell.valid {
2163 return true;
2164 }
2165 }
2166 return false;
2167 }
2168
2169 let mut main_rows = vec![];
2170 let mut conflict_rows = vec![];
2171 for (i, row) in rows.iter_mut().enumerate() {
2172 let i = i + 1;
2174 row.row_number = Some(i as u32 + chunk_number as u32 * CHUNK_SIZE as u32);
2175 if has_conflict(&row, &conflict_columns) {
2176 conflict_rows.push(row.clone());
2177 } else {
2178 main_rows.push(row.clone());
2179 }
2180 }
2181
2182 let column_names = config
2185 .get("table")
2186 .and_then(|t| t.get(table_name))
2187 .and_then(|t| t.get("column_order"))
2188 .and_then(|c| c.as_array())
2189 .unwrap()
2190 .iter()
2191 .map(|v| v.as_str().unwrap().to_string())
2192 .collect::<Vec<_>>();
2193
2194 let (main_sql, main_params, main_message_sql, main_message_params) = generate_sql(
2195 &config,
2196 &table_name,
2197 &column_names,
2198 &main_rows,
2199 messages_stats,
2200 verbose,
2201 pool,
2202 );
2203 let (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params) =
2204 generate_sql(
2205 &config,
2206 &format!("{}_conflict", table_name),
2207 &column_names,
2208 &conflict_rows,
2209 messages_stats,
2210 verbose,
2211 pool,
2212 );
2213
2214 Ok((
2215 (main_sql, main_params, main_message_sql, main_message_params),
2216 (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params),
2217 ))
2218}
2219
2220async fn validate_rows_inter_and_insert(
2225 config: &ConfigMap,
2226 pool: &AnyPool,
2227 table_name: &String,
2228 rows: &mut Vec<ResultRow>,
2229 chunk_number: usize,
2230 messages_stats: &mut HashMap<String, usize>,
2231 verbose: bool,
2232) -> Result<(), sqlx::Error> {
2233 validate_rows_trees(config, pool, table_name, rows).await?;
2235
2236 let mut tmp_messages_stats = HashMap::new();
2246 tmp_messages_stats.insert("error".to_string(), 0);
2247 tmp_messages_stats.insert("warning".to_string(), 0);
2248 tmp_messages_stats.insert("info".to_string(), 0);
2249 let (
2250 (main_sql, main_params, main_message_sql, main_message_params),
2251 (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params),
2252 ) = make_inserts(
2253 config,
2254 table_name,
2255 rows,
2256 chunk_number,
2257 &mut tmp_messages_stats,
2258 verbose,
2259 pool,
2260 )
2261 .await?;
2262
2263 let main_sql = local_sql_syntax(&pool, &main_sql);
2264 let mut main_query = sqlx_query(&main_sql);
2265 for param in &main_params {
2266 main_query = main_query.bind(param);
2267 }
2268 let main_result = main_query.execute(pool).await;
2269 match main_result {
2270 Ok(_) => {
2271 let conflict_sql = local_sql_syntax(&pool, &conflict_sql);
2272 let mut conflict_query = sqlx_query(&conflict_sql);
2273 for param in &conflict_params {
2274 conflict_query = conflict_query.bind(param);
2275 }
2276 conflict_query.execute(pool).await?;
2277
2278 let main_message_sql = local_sql_syntax(&pool, &main_message_sql);
2279 let mut main_message_query = sqlx_query(&main_message_sql);
2280 for param in &main_message_params {
2281 main_message_query = main_message_query.bind(param);
2282 }
2283 main_message_query.execute(pool).await?;
2284
2285 let conflict_message_sql = local_sql_syntax(&pool, &conflict_message_sql);
2286 let mut conflict_message_query = sqlx_query(&conflict_message_sql);
2287 for param in &conflict_message_params {
2288 conflict_message_query = conflict_message_query.bind(param);
2289 }
2290 conflict_message_query.execute(pool).await?;
2291
2292 if verbose {
2293 let curr_errors = messages_stats.get("error").unwrap();
2294 messages_stats.insert(
2295 "error".to_string(),
2296 curr_errors + tmp_messages_stats.get("error").unwrap(),
2297 );
2298 let curr_warnings = messages_stats.get("warning").unwrap();
2299 messages_stats.insert(
2300 "warning".to_string(),
2301 curr_warnings + tmp_messages_stats.get("warning").unwrap(),
2302 );
2303 let curr_infos = messages_stats.get("info").unwrap();
2304 messages_stats.insert(
2305 "info".to_string(),
2306 curr_infos + tmp_messages_stats.get("info").unwrap(),
2307 );
2308 }
2309 }
2310 Err(_) => {
2311 validate_rows_constraints(config, pool, table_name, rows).await?;
2312
2313 let (
2314 (main_sql, main_params, main_message_sql, main_message_params),
2315 (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params),
2316 ) = make_inserts(config, table_name, rows, chunk_number, messages_stats, verbose, pool)
2317 .await?;
2318
2319 let main_sql = local_sql_syntax(&pool, &main_sql);
2320 let mut main_query = sqlx_query(&main_sql);
2321 for param in &main_params {
2322 main_query = main_query.bind(param);
2323 }
2324 main_query.execute(pool).await?;
2325
2326 let conflict_sql = local_sql_syntax(&pool, &conflict_sql);
2327 let mut conflict_query = sqlx_query(&conflict_sql);
2328 for param in &conflict_params {
2329 conflict_query = conflict_query.bind(param);
2330 }
2331 conflict_query.execute(pool).await?;
2332
2333 let main_message_sql = local_sql_syntax(&pool, &main_message_sql);
2334 let mut main_message_query = sqlx_query(&main_message_sql);
2335 for param in &main_message_params {
2336 main_message_query = main_message_query.bind(param);
2337 }
2338 main_message_query.execute(pool).await?;
2339
2340 let conflict_message_sql = local_sql_syntax(&pool, &conflict_message_sql);
2341 let mut conflict_message_query = sqlx_query(&conflict_message_sql);
2342 for param in &conflict_message_params {
2343 conflict_message_query = conflict_message_query.bind(param);
2344 }
2345 conflict_message_query.execute(pool).await?;
2346 }
2347 };
2348
2349 Ok(())
2350}
2351
2352async fn validate_and_insert_chunks(
2358 config: &ConfigMap,
2359 pool: &AnyPool,
2360 compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
2361 compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
2362 table_name: &String,
2363 chunks: &IntoChunks<csv::StringRecordsIter<'_, std::fs::File>>,
2364 headers: &csv::StringRecord,
2365 messages_stats: &mut HashMap<String, usize>,
2366 verbose: bool,
2367) -> Result<(), sqlx::Error> {
2368 if !MULTI_THREADED {
2369 for (chunk_number, chunk) in chunks.into_iter().enumerate() {
2370 let mut rows: Vec<_> = chunk.collect();
2371 let mut intra_validated_rows = validate_rows_intra(
2372 config,
2373 compiled_datatype_conditions,
2374 compiled_rule_conditions,
2375 table_name,
2376 headers,
2377 &mut rows,
2378 );
2379 validate_rows_inter_and_insert(
2380 config,
2381 pool,
2382 table_name,
2383 &mut intra_validated_rows,
2384 chunk_number,
2385 messages_stats,
2386 verbose,
2387 )
2388 .await?;
2389 }
2390 Ok(())
2391 } else {
2392 let num_cpus = num_cpus::get();
2402 let batches = chunks.into_iter().chunks(num_cpus);
2403 let mut chunk_number = 0;
2404 for batch in batches.into_iter() {
2405 let mut results = BTreeMap::new();
2406 crossbeam::scope(|scope| {
2407 let mut workers = vec![];
2408 for chunk in batch.into_iter() {
2409 let mut rows: Vec<_> = chunk.collect();
2410 workers.push(scope.spawn(move |_| {
2411 validate_rows_intra(
2412 config,
2413 compiled_datatype_conditions,
2414 compiled_rule_conditions,
2415 table_name,
2416 headers,
2417 &mut rows,
2418 )
2419 }));
2420 }
2421
2422 for worker in workers {
2423 let result = worker.join().unwrap();
2424 results.insert(chunk_number, result);
2425 chunk_number += 1;
2426 }
2427 })
2428 .expect("A child thread panicked");
2429
2430 for (chunk_number, mut intra_validated_rows) in results {
2431 validate_rows_inter_and_insert(
2432 config,
2433 pool,
2434 table_name,
2435 &mut intra_validated_rows,
2436 chunk_number,
2437 messages_stats,
2438 verbose,
2439 )
2440 .await?;
2441 }
2442 }
2443
2444 Ok(())
2445 }
2446}
2447
2448async fn load_db(
2454 config: &ConfigMap,
2455 pool: &AnyPool,
2456 compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
2457 compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
2458 verbose: bool,
2459) -> Result<(), sqlx::Error> {
2460 let mut table_list = vec![];
2461 for table in config.get("sorted_table_list").and_then(|l| l.as_array()).unwrap() {
2462 table_list.push(table.as_str().and_then(|s| Some(s.to_string())).unwrap());
2463 }
2464 let table_list = table_list; let num_tables = table_list.len();
2466 let mut total_errors = 0;
2467 let mut total_warnings = 0;
2468 let mut total_infos = 0;
2469 let mut table_num = 1;
2470 for table_name in table_list {
2471 if verbose {
2472 eprintln!(
2473 "{} - Loading table {}/{}: {}",
2474 Utc::now(),
2475 table_num,
2476 num_tables,
2477 table_name
2478 );
2479 }
2480 table_num += 1;
2481 let path = String::from(
2482 config
2483 .get("table")
2484 .and_then(|t| t.as_object())
2485 .and_then(|o| o.get(&table_name))
2486 .and_then(|n| n.get("path"))
2487 .and_then(|p| p.as_str())
2488 .unwrap(),
2489 );
2490 let mut rdr = csv::ReaderBuilder::new().has_headers(false).delimiter(b'\t').from_reader(
2491 File::open(path.clone()).unwrap_or_else(|err| {
2492 panic!("Unable to open '{}': {}", path.clone(), err);
2493 }),
2494 );
2495
2496 let mut records = rdr.records();
2498 let headers;
2499 if let Some(result) = records.next() {
2500 headers = result.unwrap();
2501 } else {
2502 panic!("'{}' is empty", path);
2503 }
2504
2505 for header in headers.iter() {
2506 if header.trim().is_empty() {
2507 panic!("One or more of the header fields is empty for table '{}'", table_name);
2508 }
2509 }
2510
2511 let mut messages_stats = HashMap::new();
2514 messages_stats.insert("error".to_string(), 0);
2515 messages_stats.insert("warning".to_string(), 0);
2516 messages_stats.insert("info".to_string(), 0);
2517
2518 let chunks = records.chunks(CHUNK_SIZE);
2521 validate_and_insert_chunks(
2522 config,
2523 pool,
2524 compiled_datatype_conditions,
2525 compiled_rule_conditions,
2526 &table_name,
2527 &chunks,
2528 &headers,
2529 &mut messages_stats,
2530 verbose,
2531 )
2532 .await?;
2533
2534 let mut recs_to_update =
2540 validate_tree_foreign_keys(config, pool, &table_name, None).await?;
2541 recs_to_update.append(&mut validate_under(config, pool, &table_name, None).await?);
2542
2543 for record in recs_to_update {
2544 let row_number = record.get("row_number").unwrap();
2545 let column_name = record.get("column").and_then(|s| s.as_str()).unwrap();
2546 let value = record.get("value").and_then(|s| s.as_str()).unwrap();
2547 let level = record.get("level").and_then(|s| s.as_str()).unwrap();
2548 let rule = record.get("rule").and_then(|s| s.as_str()).unwrap();
2549 let message = record.get("message").and_then(|s| s.as_str()).unwrap();
2550
2551 let sql = format!(
2552 r#"UPDATE "{}" SET "{}" = NULL WHERE "row_number" = {}"#,
2553 table_name, column_name, row_number
2554 );
2555 let query = sqlx_query(&sql);
2556 query.execute(pool).await?;
2557
2558 let sql = local_sql_syntax(
2559 &pool,
2560 &format!(
2561 r#"INSERT INTO "message"
2562 ("table", "row", "column", "value", "level", "rule", "message")
2563 VALUES ({}, {}, {}, {}, {}, {}, {})"#,
2564 SQL_PARAM, row_number, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM
2565 ),
2566 );
2567 let mut query = sqlx_query(&sql);
2568 query = query.bind(&table_name);
2569 query = query.bind(&column_name);
2570 query = query.bind(&value);
2571 query = query.bind(&level);
2572 query = query.bind(&rule);
2573 query = query.bind(&message);
2574 query.execute(pool).await?;
2575
2576 if verbose {
2577 let messages = vec![json!({
2579 "message": message,
2580 "level": level,
2581 })];
2582 add_message_counts(&messages, &mut messages_stats);
2583 }
2584 }
2585
2586 if verbose {
2587 let errors = messages_stats.get("error").unwrap();
2589 let warnings = messages_stats.get("warning").unwrap();
2590 let infos = messages_stats.get("info").unwrap();
2591 let status_message = format!(
2592 "{} errors, {} warnings, and {} information messages generated for {}",
2593 errors, warnings, infos, table_name
2594 );
2595 eprintln!("{} - {}", Utc::now(), status_message);
2596 total_errors += errors;
2597 total_warnings += warnings;
2598 total_infos += infos;
2599 }
2600 }
2601
2602 if verbose {
2603 eprintln!(
2604 "{} - Loading complete with {} errors, {} warnings, and {} information messages",
2605 Utc::now(),
2606 total_errors,
2607 total_warnings,
2608 total_infos
2609 );
2610 }
2611
2612 Ok(())
2613}