ontodev_valve/
lib.rs

1//! <!-- Please do not edit README.md directly. To generate a new readme from the crate documentation
2//!      in src/lib.rs, install cargo-readme using `cargo install cargo-readme` and then run:
3//!      `cargo readme > README.md` -->
4//!
5//! # valve.rs
6//! A lightweight validation engine written in rust.
7//!
8//! ## Command line usage
9//! Run:
//! ```text
11//! valve --help
12//! ```
13//! to see command line options.
14//!
15//! ## Python bindings
16//! See [valve.py](https://github.com/ontodev/valve.py)
17
18#[macro_use]
19extern crate lalrpop_util;
20
21mod ast;
22pub mod validate;
23
24lalrpop_mod!(pub valve_grammar);
25
26use crate::validate::{
27    validate_rows_constraints, validate_rows_intra, validate_rows_trees,
28    validate_tree_foreign_keys, validate_under, ResultRow,
29};
30use crate::{ast::Expression, valve_grammar::StartParser};
31use chrono::Utc;
32use crossbeam;
33use futures::executor::block_on;
34use indoc::indoc;
35use itertools::{IntoChunks, Itertools};
36use lazy_static::lazy_static;
37use petgraph::{
38    algo::{all_simple_paths, toposort},
39    graphmap::DiGraphMap,
40    Direction,
41};
42use regex::Regex;
43use serde_json::{json, Value as SerdeValue};
44use sqlx::{
45    any::{AnyConnectOptions, AnyKind, AnyPool, AnyPoolOptions, AnyRow},
46    query as sqlx_query, Column, Row, ValueRef,
47};
48use std::{
49    collections::{BTreeMap, HashMap},
50    fs::File,
51    process,
52    str::FromStr,
53    sync::Arc,
54};
55
/// The number of rows that are validated at a time by a thread.
static CHUNK_SIZE: usize = 500;

/// Run valve in multi-threaded mode.
static MULTI_THREADED: bool = true;

// Note that SQL_PARAM must be a 'word' (from the point of view of regular expressions) since in the
// local_sql_syntax() function below we are matching against it using '\b', which represents a word
// boundary. If you want to use a non-word placeholder then you must also change '\b' in the regex
// to '\B'.
/// The word (in the regex sense) placeholder to use for query parameters when binding using sqlx.
static SQL_PARAM: &str = "VALVEPARAM";

lazy_static! {
    // NOTE(review): presumably the SQL column types recognised for PostgreSQL (PG) and SQLite (SL)
    // respectively -- confirm against the code that consults these lists.
    static ref PG_SQL_TYPES: Vec<&'static str> = vec!["text", "varchar", "integer"];
    static ref SL_SQL_TYPES: Vec<&'static str> = vec!["text", "integer"];
}

/// An alias for [serde_json::Map](../serde_json/struct.Map.html)<String, [serde_json::Value](../serde_json/enum.Value.html)>.
// Note: serde_json::Map is
// [backed by a BTreeMap by default](https://docs.serde.rs/serde_json/map/index.html)
pub type ConfigMap = serde_json::Map<String, SerdeValue>;
78
79/// Represents a structure such as those found in the `structure` column of the `column` table in
80/// both its parsed format (i.e., as an [Expression](ast/enum.Expression.html)) as well as in its
81/// original format (i.e., as a plain String).
82#[derive(Clone)]
83pub struct ParsedStructure {
84    original: String,
85    parsed: Expression,
86}
87
88// We use Debug here instead of Display because we have only implemented Debug for Expressions.
89// See the comment about this in ast.rs.
90impl std::fmt::Debug for ParsedStructure {
91    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
92        write!(
93            f,
94            "{{\"parsed_structure\": {{\"original\": \"{}\", \"parsed\": {:?}}}}}",
95            &self.original, &self.parsed
96        )
97    }
98}
99
100/// Represents a condition in three different ways: (i) in String format, (ii) as a parsed
101/// [Expression](ast/enum.Expression.html), and (iii) as a pre-compiled regular expression.
102#[derive(Clone)]
103pub struct CompiledCondition {
104    original: String,
105    parsed: Expression,
106    compiled: Arc<dyn Fn(&str) -> bool + Sync + Send>,
107}
108
109// We use Debug here instead of Display because we have only implemented Debug for Expressions.
110// See the comment about this in ast.rs.
111impl std::fmt::Debug for CompiledCondition {
112    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
113        write!(
114            f,
115            "{{\"compiled_condition\": {{\"original\": \"{}\", \"parsed\": {:?}}}}}",
116            &self.original, &self.parsed
117        )
118    }
119}
120
121/// Represents a 'when-then' condition, as found in the `rule` table, as two
122/// [CompiledCondition](struct.CompiledCondition.html) structs corresponding to the when and then
123/// parts of the given rule.
124pub struct ColumnRule {
125    when: CompiledCondition,
126    then: CompiledCondition,
127}
128
129// We use Debug here instead of Display because we have only implemented Debug for Expressions.
130// See the comment about this in ast.rs.
131impl std::fmt::Debug for ColumnRule {
132    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
133        write!(f, "{{\"column_rule\": {{\"when\": {:?}, \"then\": {:?}}}}}", &self.when, &self.then)
134    }
135}
136
137/// Given the path to a configuration table (either a table.tsv file or a database containing a
138/// table named "table"), load and check the 'table', 'column', and 'datatype' tables, and return
139/// ConfigMaps corresponding to specials, tables, datatypes, and rules.
140pub fn read_config_files(
141    path: &str,
142    config_table: &str,
143) -> (ConfigMap, ConfigMap, ConfigMap, ConfigMap) {
144    let special_table_types = json!({
145        "table": {"required": true},
146        "column": {"required": true},
147        "datatype": {"required": true},
148        "rule": {"required": false},
149    });
150    let special_table_types = special_table_types.as_object().unwrap();
151
152    // Initialize the special table entries in the specials config map:
153    let mut specials_config = ConfigMap::new();
154    for t in special_table_types.keys() {
155        specials_config.insert(t.to_string(), SerdeValue::Null);
156    }
157
158    // Load the table table from the given path:
159    let mut tables_config = ConfigMap::new();
160    let rows = {
161        // Read in the configuration entry point (the "table table") from either a file or a
162        // database table.
163        if path.to_lowercase().ends_with(".tsv") {
164            read_tsv_into_vector(path)
165        } else {
166            read_db_table_into_vector(path, config_table)
167        }
168    };
169
170    for mut row in rows {
171        for column in vec!["table", "path", "type"] {
172            if !row.contains_key(column) || row.get(column) == None {
173                panic!("Missing required column '{}' reading '{}'", column, path);
174            }
175        }
176
177        for column in vec!["table", "path"] {
178            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
179                panic!("Missing required value for '{}' reading '{}'", column, path);
180            }
181        }
182
183        for column in vec!["type"] {
184            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
185                row.remove(&column.to_string());
186            }
187        }
188
189        if let Some(SerdeValue::String(row_type)) = row.get("type") {
190            if row_type == "table" {
191                let row_path = row.get("path").unwrap();
192                if path.to_lowercase().ends_with(".tsv") && row_path != path {
193                    panic!(
194                        "Special 'table' path '{}' does not match this path '{}'",
195                        row_path, path
196                    );
197                }
198            }
199
200            if special_table_types.contains_key(row_type) {
201                match specials_config.get(row_type) {
202                    Some(SerdeValue::Null) => (),
203                    _ => panic!("Multiple tables with type '{}' declared in '{}'", row_type, path),
204                }
205                let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
206                specials_config
207                    .insert(row_type.to_string(), SerdeValue::String(row_table.to_string()));
208            } else {
209                panic!("Unrecognized table type '{}' in '{}'", row_type, path);
210            }
211        }
212
213        row.insert(String::from("column"), SerdeValue::Object(ConfigMap::new()));
214        let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
215        tables_config.insert(row_table.to_string(), SerdeValue::Object(row));
216    }
217
218    // Check that all the required special tables are present
219    for (table_type, table_spec) in special_table_types.iter() {
220        if let Some(SerdeValue::Bool(true)) = table_spec.get("required") {
221            if let Some(SerdeValue::Null) = specials_config.get(table_type) {
222                panic!("Missing required '{}' table in '{}'", table_type, path);
223            }
224        }
225    }
226
227    // Helper function for extracting special configuration (other than the main 'table'
228    // configuration) from either a file or a table in the database, depending on the value of
229    // `path`. When `path` ends in '.tsv', the path of the config table corresponding to
230    // `table_type` is looked up, the TSV is read, and the rows are returned. When `path` does not
231    // end in '.tsv', the table name corresponding to `table_type` is looked up in the database
232    // indicated by `path`, the table is read, and the rows are returned.
233    fn get_special_config(
234        table_type: &str,
235        specials_config: &ConfigMap,
236        tables_config: &ConfigMap,
237        path: &str,
238    ) -> Vec<ConfigMap> {
239        if path.to_lowercase().ends_with(".tsv") {
240            let table_name = specials_config.get(table_type).and_then(|d| d.as_str()).unwrap();
241            let path = String::from(
242                tables_config
243                    .get(table_name)
244                    .and_then(|t| t.get("path"))
245                    .and_then(|p| p.as_str())
246                    .unwrap(),
247            );
248            return read_tsv_into_vector(&path.to_string());
249        } else {
250            let mut db_table = None;
251            for (table_name, table_config) in tables_config {
252                let this_type = table_config.get("type");
253                if let Some(this_type) = this_type {
254                    let this_type = this_type.as_str().unwrap();
255                    if this_type == table_type {
256                        db_table = Some(table_name);
257                        break;
258                    }
259                }
260            }
261            if db_table == None {
262                panic!("Could not determine special table name for type '{}'.", table_type);
263            }
264            let db_table = db_table.unwrap();
265            read_db_table_into_vector(path, db_table)
266        }
267    }
268
269    // Load datatype table
270    let mut datatypes_config = ConfigMap::new();
271    let rows = get_special_config("datatype", &specials_config, &tables_config, path);
272    for mut row in rows {
273        for column in vec!["datatype", "parent", "condition", "SQLite type", "PostgreSQL type"] {
274            if !row.contains_key(column) || row.get(column) == None {
275                panic!("Missing required column '{}' reading '{}'", column, path);
276            }
277        }
278
279        for column in vec!["datatype"] {
280            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
281                panic!("Missing required value for '{}' reading '{}'", column, path);
282            }
283        }
284
285        for column in vec!["parent", "condition", "SQLite type", "PostgreSQL type"] {
286            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
287                row.remove(&column.to_string());
288            }
289        }
290
291        let dt_name = row.get("datatype").and_then(|d| d.as_str()).unwrap();
292        datatypes_config.insert(dt_name.to_string(), SerdeValue::Object(row));
293    }
294
295    for dt in vec!["text", "empty", "line", "word"] {
296        if !datatypes_config.contains_key(dt) {
297            panic!("Missing required datatype: '{}'", dt);
298        }
299    }
300
301    // Load column table
302    let rows = get_special_config("column", &specials_config, &tables_config, path);
303    for mut row in rows {
304        for column in vec!["table", "column", "nulltype", "datatype"] {
305            if !row.contains_key(column) || row.get(column) == None {
306                panic!("Missing required column '{}' reading '{}'", column, path);
307            }
308        }
309
310        for column in vec!["table", "column", "datatype"] {
311            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
312                panic!("Missing required value for '{}' reading '{}'", column, path);
313            }
314        }
315
316        for column in vec!["nulltype"] {
317            if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
318                row.remove(&column.to_string());
319            }
320        }
321
322        let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
323        if !tables_config.contains_key(row_table) {
324            panic!("Undefined table '{}' reading '{}'", row_table, path);
325        }
326
327        if let Some(SerdeValue::String(nulltype)) = row.get("nulltype") {
328            if !datatypes_config.contains_key(nulltype) {
329                panic!("Undefined nulltype '{}' reading '{}'", nulltype, path);
330            }
331        }
332
333        let datatype = row.get("datatype").and_then(|d| d.as_str()).unwrap();
334        if !datatypes_config.contains_key(datatype) {
335            panic!("Undefined datatype '{}' reading '{}'", datatype, path);
336        }
337
338        let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
339        let column_name = row.get("column").and_then(|c| c.as_str()).unwrap();
340
341        let columns_config = tables_config
342            .get_mut(row_table)
343            .and_then(|t| t.get_mut("column"))
344            .and_then(|c| c.as_object_mut())
345            .unwrap();
346        columns_config.insert(column_name.to_string(), SerdeValue::Object(row));
347    }
348
349    // Load rule table if it exists
350    let mut rules_config = ConfigMap::new();
351    if let Some(SerdeValue::String(table_name)) = specials_config.get("rule") {
352        let rows = get_special_config(table_name, &specials_config, &tables_config, path);
353        for row in rows {
354            for column in vec![
355                "table",
356                "when column",
357                "when condition",
358                "then column",
359                "then condition",
360                "level",
361                "description",
362            ] {
363                if !row.contains_key(column) || row.get(column) == None {
364                    panic!("Missing required column '{}' reading '{}'", column, path);
365                }
366                if row.get(column).and_then(|c| c.as_str()).unwrap() == "" {
367                    panic!("Missing required value for '{}' reading '{}'", column, path);
368                }
369            }
370
371            let row_table = row.get("table").and_then(|t| t.as_str()).unwrap();
372            if !tables_config.contains_key(row_table) {
373                panic!("Undefined table '{}' reading '{}'", row_table, path);
374            }
375
376            // Add the rule specified in the given row to the list of rules associated with the
377            // value of the when column:
378            let row_when_column = row.get("when column").and_then(|c| c.as_str()).unwrap();
379            if !rules_config.contains_key(row_table) {
380                rules_config.insert(String::from(row_table), SerdeValue::Object(ConfigMap::new()));
381            }
382
383            let table_rule_config =
384                rules_config.get_mut(row_table).and_then(|t| t.as_object_mut()).unwrap();
385            if !table_rule_config.contains_key(row_when_column) {
386                table_rule_config.insert(String::from(row_when_column), SerdeValue::Array(vec![]));
387            }
388            let column_rule_config = table_rule_config
389                .get_mut(&row_when_column.to_string())
390                .and_then(|w| w.as_array_mut())
391                .unwrap();
392            column_rule_config.push(SerdeValue::Object(row));
393        }
394    }
395
396    // Finally, return all the configs:
397    (specials_config, tables_config, datatypes_config, rules_config)
398}
399
400/// Given the global configuration map and a parser, compile all of the datatype conditions,
401/// add them to a hash map whose keys are the text versions of the conditions and whose values
402/// are the compiled conditions, and then finally return the hash map.
403pub fn get_compiled_datatype_conditions(
404    config: &ConfigMap,
405    parser: &StartParser,
406) -> HashMap<String, CompiledCondition> {
407    let mut compiled_datatype_conditions: HashMap<String, CompiledCondition> = HashMap::new();
408    let datatypes_config = config.get("datatype").and_then(|t| t.as_object()).unwrap();
409    for (_, row) in datatypes_config.iter() {
410        let row = row.as_object().unwrap();
411        let dt_name = row.get("datatype").and_then(|d| d.as_str()).unwrap();
412        let condition = row.get("condition").and_then(|c| c.as_str());
413        let compiled_condition =
414            compile_condition(condition, parser, &compiled_datatype_conditions);
415        if let Some(_) = condition {
416            compiled_datatype_conditions.insert(dt_name.to_string(), compiled_condition);
417        }
418    }
419
420    compiled_datatype_conditions
421}
422
423/// Given the global config map, a hash map of compiled datatype conditions (indexed by the text
424/// version of the conditions), and a parser, compile all of the rule conditions, add them to a
425/// hash which has the following structure:
426/// ```
427/// {
428///      table_1: {
429///          when_column_1: [rule_1, rule_2, ...],
430///          ...
431///      },
432///      ...
433/// }
434/// ```
435pub fn get_compiled_rule_conditions(
436    config: &ConfigMap,
437    compiled_datatype_conditions: HashMap<String, CompiledCondition>,
438    parser: &StartParser,
439) -> HashMap<String, HashMap<String, Vec<ColumnRule>>> {
440    let mut compiled_rule_conditions = HashMap::new();
441    let tables_config = config.get("table").and_then(|t| t.as_object()).unwrap();
442    let rules_config = config.get("rule").and_then(|t| t.as_object()).unwrap();
443    for (rules_table, table_rules) in rules_config.iter() {
444        let table_rules = table_rules.as_object().unwrap();
445        for (_, column_rules) in table_rules.iter() {
446            let column_rules = column_rules.as_array().unwrap();
447            for row in column_rules {
448                // Compile and collect the when and then conditions.
449                let mut column_rule_key = None;
450                for column in vec!["when column", "then column"] {
451                    let row_column = row.get(column).and_then(|c| c.as_str()).unwrap();
452                    if column == "when column" {
453                        column_rule_key = Some(row_column.to_string());
454                    }
455                    if !tables_config
456                        .get(rules_table)
457                        .and_then(|t| t.get("column"))
458                        .and_then(|c| c.as_object())
459                        .and_then(|c| Some(c.contains_key(row_column)))
460                        .unwrap()
461                    {
462                        panic!("Undefined column '{}.{}' in rules table", rules_table, row_column);
463                    }
464                }
465                let column_rule_key = column_rule_key.unwrap();
466
467                let mut when_compiled = None;
468                let mut then_compiled = None;
469                for column in vec!["when condition", "then condition"] {
470                    let condition_option = row.get(column).and_then(|c| c.as_str());
471                    if let Some(_) = condition_option {
472                        let compiled_condition = compile_condition(
473                            condition_option,
474                            parser,
475                            &compiled_datatype_conditions,
476                        );
477                        if column == "when condition" {
478                            when_compiled = Some(compiled_condition);
479                        } else if column == "then condition" {
480                            then_compiled = Some(compiled_condition);
481                        }
482                    }
483                }
484
485                if let (Some(when_compiled), Some(then_compiled)) = (when_compiled, then_compiled) {
486                    if !compiled_rule_conditions.contains_key(rules_table) {
487                        let table_rules = HashMap::new();
488                        compiled_rule_conditions.insert(rules_table.to_string(), table_rules);
489                    }
490                    let table_rules = compiled_rule_conditions.get_mut(rules_table).unwrap();
491                    if !table_rules.contains_key(&column_rule_key) {
492                        table_rules.insert(column_rule_key.to_string(), vec![]);
493                    }
494                    let column_rules = table_rules.get_mut(&column_rule_key).unwrap();
495                    column_rules.push(ColumnRule { when: when_compiled, then: then_compiled });
496                }
497            }
498        }
499    }
500
501    compiled_rule_conditions
502}
503
504/// Given the global config map and a parser, parse all of the structure conditions, add them to
505/// a hash map whose keys are given by the text versions of the conditions and whose values are
506/// given by the parsed versions, and finally return the hashmap.
507pub fn get_parsed_structure_conditions(
508    config: &ConfigMap,
509    parser: &StartParser,
510) -> HashMap<String, ParsedStructure> {
511    let mut parsed_structure_conditions = HashMap::new();
512    let tables_config = config.get("table").and_then(|t| t.as_object()).unwrap();
513    for (table, table_config) in tables_config.iter() {
514        let columns_config = table_config.get("column").and_then(|c| c.as_object()).unwrap();
515        for (_, row) in columns_config.iter() {
516            let row_table = table;
517            let structure = row.get("structure").and_then(|s| s.as_str());
518            match structure {
519                Some(structure) if structure != "" => {
520                    let parsed_structure = parser.parse(structure);
521                    if let Err(e) = parsed_structure {
522                        panic!(
523                            "While parsing structure: '{}' for column: '{}.{}' got error:\n{}",
524                            structure,
525                            row_table,
526                            row.get("table").and_then(|t| t.as_str()).unwrap(),
527                            e
528                        );
529                    }
530                    let parsed_structure = parsed_structure.unwrap();
531                    let parsed_structure = &parsed_structure[0];
532                    let parsed_structure = ParsedStructure {
533                        original: structure.to_string(),
534                        parsed: *parsed_structure.clone(),
535                    };
536                    parsed_structure_conditions.insert(structure.to_string(), parsed_structure);
537                }
538                _ => (),
539            };
540        }
541    }
542
543    parsed_structure_conditions
544}
545
546/// Given config maps for tables and datatypes, a database connection pool, and a StartParser,
547/// read in the TSV files corresponding to the tables defined in the tables config, and use that
548/// information to fill in constraints information into a new config map that is then returned along
549/// with a list of the tables in the database sorted according to their mutual dependencies. If
550/// the flag `verbose` is set to true, emit SQL to create the database schema to STDOUT.
551/// If `command` is set to [ValveCommand::Create], execute the SQL statements to create the
552/// database using the given connection pool. If it is set to [ValveCommand::Load], execute the SQL
553/// to load it as well.
554pub async fn configure_db(
555    tables_config: &mut ConfigMap,
556    datatypes_config: &mut ConfigMap,
557    pool: &AnyPool,
558    parser: &StartParser,
559    verbose: bool,
560    command: &ValveCommand,
561) -> Result<(Vec<String>, ConfigMap), sqlx::Error> {
562    // This is the ConfigMap that we will be returning:
563    let mut constraints_config = ConfigMap::new();
564    constraints_config.insert(String::from("foreign"), SerdeValue::Object(ConfigMap::new()));
565    constraints_config.insert(String::from("unique"), SerdeValue::Object(ConfigMap::new()));
566    constraints_config.insert(String::from("primary"), SerdeValue::Object(ConfigMap::new()));
567    constraints_config.insert(String::from("tree"), SerdeValue::Object(ConfigMap::new()));
568    constraints_config.insert(String::from("under"), SerdeValue::Object(ConfigMap::new()));
569
570    // Begin by reading in the TSV files corresponding to the tables defined in tables_config, and
571    // use that information to create the associated database tables, while saving constraint
572    // information to constrains_config.
573    let mut setup_statements = HashMap::new();
574    let table_names: Vec<String> = tables_config.keys().cloned().collect();
575    for table_name in table_names {
576        let path = tables_config
577            .get(&table_name)
578            .and_then(|r| r.get("path"))
579            .and_then(|p| p.as_str())
580            .unwrap();
581
582        // Get the columns that have been previously configured:
583        let defined_columns: Vec<String> = tables_config
584            .get(&table_name)
585            .and_then(|r| r.get("column"))
586            .and_then(|v| v.as_object())
587            .and_then(|o| Some(o.keys()))
588            .and_then(|k| Some(k.cloned()))
589            .and_then(|k| Some(k.collect()))
590            .unwrap();
591
592        // Get the actual columns from the data itself. Note that we set has_headers to false
593        // (even though the files have header rows) in order to explicitly read the header row.
594        let mut rdr = csv::ReaderBuilder::new().has_headers(false).delimiter(b'\t').from_reader(
595            File::open(path.clone()).unwrap_or_else(|err| {
596                panic!("Unable to open '{}': {}", path.clone(), err);
597            }),
598        );
599        let mut iter = rdr.records();
600        let actual_columns;
601        if let Some(result) = iter.next() {
602            actual_columns = result.unwrap();
603        } else {
604            panic!("'{}' is empty", path);
605        }
606
607        // We use column_order to explicitly indicate the order in which the columns should appear
608        // in the table, for later reference.
609        let mut column_order = vec![];
610        let mut all_columns: ConfigMap = ConfigMap::new();
611        for column_name in &actual_columns {
612            let column;
613            if !defined_columns.contains(&column_name.to_string()) {
614                let mut cmap = ConfigMap::new();
615                cmap.insert(String::from("table"), SerdeValue::String(table_name.to_string()));
616                cmap.insert(String::from("column"), SerdeValue::String(column_name.to_string()));
617                cmap.insert(String::from("nulltype"), SerdeValue::String(String::from("empty")));
618                cmap.insert(String::from("datatype"), SerdeValue::String(String::from("text")));
619                column = SerdeValue::Object(cmap);
620            } else {
621                column = tables_config
622                    .get(&table_name)
623                    .and_then(|r| r.get("column"))
624                    .and_then(|v| v.as_object())
625                    .and_then(|o| o.get(column_name))
626                    .unwrap()
627                    .clone();
628            }
629            column_order.push(SerdeValue::String(column_name.to_string()));
630            all_columns.insert(column_name.to_string(), column);
631        }
632
633        tables_config.get_mut(&table_name).and_then(|t| t.as_object_mut()).and_then(|o| {
634            o.insert(String::from("column"), SerdeValue::Object(all_columns));
635            o.insert(String::from("column_order"), SerdeValue::Array(column_order))
636        });
637
638        // Create the table and its corresponding conflict table:
639        let mut table_statements = vec![];
640        for table in vec![table_name.to_string(), format!("{}_conflict", table_name)] {
641            let (mut statements, table_constraints) =
642                create_table_statement(tables_config, datatypes_config, parser, &table, &pool);
643            table_statements.append(&mut statements);
644            if !table.ends_with("_conflict") {
645                for constraint_type in vec!["foreign", "unique", "primary", "tree", "under"] {
646                    let table_constraints = table_constraints.get(constraint_type).unwrap().clone();
647                    constraints_config
648                        .get_mut(constraint_type)
649                        .and_then(|o| o.as_object_mut())
650                        .and_then(|o| o.insert(table_name.to_string(), table_constraints));
651                }
652            }
653        }
654
655        // Create a view as the union of the regular and conflict versions of the table:
656        let mut drop_view_sql = format!(r#"DROP VIEW IF EXISTS "{}_view""#, table_name);
657        let inner_t;
658        if pool.any_kind() == AnyKind::Postgres {
659            drop_view_sql.push_str(" CASCADE");
660            inner_t = format!(
661                indoc! {r#"
662                    (
663                      SELECT JSON_AGG(t)::TEXT FROM (
664                        SELECT "column", "value", "level", "rule", "message"
665                        FROM "message"
666                        WHERE "table" = '{t}'
667                          AND "row" = union_t."row_number"
668                        ORDER BY "column", "message_id"
669                      ) t
670                    )
671                "#},
672                t = table_name,
673            );
674        } else {
675            inner_t = format!(
676                indoc! {r#"
677                    (
678                      SELECT NULLIF(
679                        JSON_GROUP_ARRAY(
680                          JSON_OBJECT(
681                            'column', "column",
682                            'value', "value",
683                            'level', "level",
684                            'rule', "rule",
685                            'message', "message"
686                          )
687                        ),
688                        '[]'
689                      )
690                      FROM "message"
691                      WHERE "table" = '{t}'
692                        AND "row" = union_t."row_number"
693                      ORDER BY "column", "message_id"
694                    )
695                "#},
696                t = table_name,
697            );
698        }
699        drop_view_sql.push_str(";");
700
701        let create_view_sql = format!(
702            indoc! {r#"
703              CREATE VIEW "{t}_view" AS
704                SELECT
705                  union_t.*,
706                  {inner_t} AS "message"
707                FROM (
708                  SELECT * FROM "{t}"
709                  UNION ALL
710                  SELECT * FROM "{t}_conflict"
711                ) as union_t;
712            "#},
713            t = table_name,
714            inner_t = inner_t,
715        );
716        table_statements.push(drop_view_sql);
717        table_statements.push(create_view_sql);
718
719        setup_statements.insert(table_name.to_string(), table_statements);
720    }
721
722    // Sort the tables according to their foreign key dependencies so that tables are always loaded
723    // after the tables they depend on:
724    let unsorted_tables: Vec<String> = setup_statements.keys().cloned().collect();
725    let sorted_tables = verify_table_deps_and_sort(&unsorted_tables, &constraints_config);
726
727    if *command != ValveCommand::Config || verbose {
728        // Generate DDL for the message table:
729        let mut message_statements = vec![];
730        let drop_sql = {
731            let mut sql = r#"DROP TABLE IF EXISTS "message""#.to_string();
732            if pool.any_kind() == AnyKind::Postgres {
733                sql.push_str(" CASCADE");
734            }
735            sql.push_str(";");
736            sql
737        };
738        message_statements.push(drop_sql);
739        message_statements.push(format!(
740            indoc! {r#"
741                CREATE TABLE "message" (
742                  {}
743                  "table" TEXT,
744                  "row" BIGINT,
745                  "column" TEXT,
746                  "value" TEXT,
747                  "level" TEXT,
748                  "rule" TEXT,
749                  "message" TEXT,
750                  UNIQUE ("table", "row", "column", "rule")
751                );
752              "#},
753            {
754                if pool.any_kind() == AnyKind::Sqlite {
755                    "\"message_id\" INTEGER PRIMARY KEY,"
756                } else {
757                    "\"message_id\" SERIAL PRIMARY KEY,"
758                }
759            },
760        ));
761        message_statements.push(
762            r#"CREATE INDEX "message_trc_idx" ON "message"("table", "row", "column");"#.to_string(),
763        );
764        setup_statements.insert("message".to_string(), message_statements);
765
766        // Add the message table to the beginning of the list of tables to create (we add it to the
767        // beginning since the table views all reference it).
768        let mut tables_to_create = vec!["message".to_string()];
769        tables_to_create.append(&mut sorted_tables.clone());
770        for table in &tables_to_create {
771            let table_statements = setup_statements.get(table).unwrap();
772            if *command != ValveCommand::Config {
773                for stmt in table_statements {
774                    sqlx_query(stmt)
775                        .execute(pool)
776                        .await
777                        .expect(format!("The SQL statement: {} returned an error", stmt).as_str());
778                }
779            }
780            if verbose {
781                let output = String::from(table_statements.join("\n"));
782                println!("{}\n", output);
783            }
784        }
785    }
786
787    return Ok((sorted_tables, constraints_config));
788}
789
/// Various VALVE commands, used with [valve()](valve).
///
/// The commands are cumulative: each variant performs everything the previous one does plus one
/// further step. The type is a plain fieldless enum, so it is cheap to copy and compare.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ValveCommand {
    /// Configure but do not create or load.
    Config,
    /// Configure and create but do not load.
    Create,
    /// Configure, create, and load.
    Load,
}
800
801/// Given a path to a configuration table (either a table.tsv file or a database containing a
802/// table named "table"), and a directory in which to find/create a database: configure the
803/// database using the configuration which can be looked up using the table table, and
804/// optionally create and/or load it according to the value of `command` (see [ValveCommand]).
805/// If the `verbose` flag is set to true, output status messages while loading. If `config_table`
806/// is given and `table_table` indicates a database, query the table called `config_table` for the
807/// table table information. Returns the configuration map as a String.
808pub async fn valve(
809    table_table: &str,
810    database: &str,
811    command: &ValveCommand,
812    verbose: bool,
813    config_table: &str,
814) -> Result<String, sqlx::Error> {
815    let parser = StartParser::new();
816
817    let (specials_config, mut tables_config, mut datatypes_config, rules_config) =
818        read_config_files(&table_table.to_string(), config_table);
819
820    // To connect to a postgresql database listening to a unix domain socket:
821    // ----------------------------------------------------------------------
822    // let connection_options =
823    //     AnyConnectOptions::from_str("postgres:///testdb?host=/var/run/postgresql")?;
824    //
825    // To query the connection type at runtime via the pool:
826    // -----------------------------------------------------
827    // let db_type = pool.any_kind();
828
829    let connection_options;
830    if database.starts_with("postgresql://") {
831        connection_options = AnyConnectOptions::from_str(database)?;
832    } else {
833        let connection_string;
834        if !database.starts_with("sqlite://") {
835            connection_string = format!("sqlite://{}?mode=rwc", database);
836        } else {
837            connection_string = database.to_string();
838        }
839        connection_options = AnyConnectOptions::from_str(connection_string.as_str()).unwrap();
840    }
841
842    let pool = AnyPoolOptions::new().max_connections(5).connect_with(connection_options).await?;
843    if *command == ValveCommand::Load && pool.any_kind() == AnyKind::Sqlite {
844        sqlx_query("PRAGMA foreign_keys = ON").execute(&pool).await?;
845    }
846
847    let (sorted_table_list, constraints_config) =
848        configure_db(&mut tables_config, &mut datatypes_config, &pool, &parser, verbose, command)
849            .await?;
850
851    let mut config = ConfigMap::new();
852    config.insert(String::from("special"), SerdeValue::Object(specials_config.clone()));
853    config.insert(String::from("table"), SerdeValue::Object(tables_config.clone()));
854    config.insert(String::from("datatype"), SerdeValue::Object(datatypes_config.clone()));
855    config.insert(String::from("rule"), SerdeValue::Object(rules_config.clone()));
856    config.insert(String::from("constraints"), SerdeValue::Object(constraints_config.clone()));
857    let mut sorted_table_serdevalue_list: Vec<SerdeValue> = vec![];
858    for table in &sorted_table_list {
859        sorted_table_serdevalue_list.push(SerdeValue::String(table.to_string()));
860    }
861    config
862        .insert(String::from("sorted_table_list"), SerdeValue::Array(sorted_table_serdevalue_list));
863
864    let compiled_datatype_conditions = get_compiled_datatype_conditions(&config, &parser);
865    let compiled_rule_conditions =
866        get_compiled_rule_conditions(&config, compiled_datatype_conditions.clone(), &parser);
867
868    if *command == ValveCommand::Load {
869        if verbose {
870            eprintln!("{} - Processing {} tables.", Utc::now(), sorted_table_list.len());
871        }
872        load_db(&config, &pool, &compiled_datatype_conditions, &compiled_rule_conditions, verbose)
873            .await?;
874    }
875
876    let config = SerdeValue::Object(config);
877    Ok(config.to_string())
878}
879
880/// Given a global config map, a database connection pool, a table name, and a row, assign a new
881/// row number to the row and insert it to the database, then return the new row number.
882pub async fn insert_new_row(
883    global_config: &ConfigMap,
884    pool: &AnyPool,
885    table_name: &str,
886    row: &ConfigMap,
887) -> Result<u32, sqlx::Error> {
888    // The new row number to insert is the current highest row number + 1.
889    let sql = format!(r#"SELECT MAX("row_number") AS "row_number" FROM "{}_view""#, table_name);
890    let query = sqlx_query(&sql);
891    let result_row = query.fetch_one(pool).await?;
892    let result = result_row.try_get_raw("row_number").unwrap();
893    let new_row_number: i64;
894    if result.is_null() {
895        new_row_number = 1;
896    } else {
897        new_row_number = result_row.get("row_number");
898    }
899    let new_row_number = new_row_number as u32 + 1;
900
901    let mut insert_columns = vec![];
902    let mut insert_values = vec![];
903    let mut insert_params = vec![];
904    let mut messages = vec![];
905    let sorted_datatypes = get_sorted_datatypes(global_config);
906    for (column, cell) in row.iter() {
907        insert_columns.append(&mut vec![format!(r#""{}""#, column)]);
908        let cell = cell.as_object().unwrap();
909        let cell_valid = cell.get("valid").and_then(|v| v.as_bool()).unwrap();
910        let cell_value = cell.get("value").and_then(|v| v.as_str()).unwrap();
911        let mut cell_for_insert = cell.clone();
912        if cell_valid {
913            cell_for_insert.remove("value");
914            let sql_type = get_sql_type_from_global_config(
915                &global_config,
916                &table_name.to_string(),
917                &column,
918                pool,
919            )
920            .unwrap();
921            insert_values.push(cast_sql_param_from_text(&sql_type));
922            insert_params.push(String::from(cell_value));
923        } else {
924            insert_values.push(String::from("NULL"));
925            let cell_messages = sort_messages(
926                &sorted_datatypes,
927                cell.get("messages").and_then(|m| m.as_array()).unwrap(),
928            );
929            for cell_message in cell_messages {
930                messages.push(json!({
931                    "column": column,
932                    "value": cell_value,
933                    "level": cell_message.get("level").and_then(|s| s.as_str()).unwrap(),
934                    "rule": cell_message.get("rule").and_then(|s| s.as_str()).unwrap(),
935                    "message": cell_message.get("message").and_then(|s| s.as_str()).unwrap(),
936                }));
937            }
938        }
939    }
940
941    // First add the new row to the table:
942    let insert_stmt = local_sql_syntax(
943        &pool,
944        &format!(
945            r#"INSERT INTO "{}" ("row_number", {}) VALUES ({}, {})"#,
946            table_name,
947            insert_columns.join(", "),
948            new_row_number,
949            insert_values.join(", "),
950        ),
951    );
952
953    let mut query = sqlx_query(&insert_stmt);
954    for param in &insert_params {
955        query = query.bind(param);
956    }
957    query.execute(pool).await?;
958
959    // Next add any validation messages to the message table:
960    for m in messages {
961        let column = m.get("column").and_then(|c| c.as_str()).unwrap();
962        let value = m.get("value").and_then(|c| c.as_str()).unwrap();
963        let level = m.get("level").and_then(|c| c.as_str()).unwrap();
964        let rule = m.get("rule").and_then(|c| c.as_str()).unwrap();
965        let message = m.get("message").and_then(|c| c.as_str()).unwrap();
966        let message_sql = format!(
967            r#"INSERT INTO "message"
968               ("table", "row", "column", "value", "level", "rule", "message")
969               VALUES ('{}', {}, '{}', '{}', '{}', '{}', '{}')"#,
970            table_name, new_row_number, column, value, level, rule, message
971        );
972        let query = sqlx_query(&message_sql);
973        query.execute(pool).await?;
974    }
975
976    Ok(new_row_number)
977}
978
979/// Given global config map, a database connection pool, a table name, a row, and the row number to
980/// update, update the corresponding row in the database with new values as specified by `row`.
981pub async fn update_row(
982    global_config: &ConfigMap,
983    pool: &AnyPool,
984    table_name: &str,
985    row: &ConfigMap,
986    row_number: u32,
987) -> Result<(), sqlx::Error> {
988    let mut assignments = vec![];
989    let mut params = vec![];
990    let mut messages = vec![];
991    let sorted_datatypes = get_sorted_datatypes(global_config);
992    for (column, cell) in row.iter() {
993        let cell = cell.as_object().unwrap();
994        let cell_valid = cell.get("valid").and_then(|v| v.as_bool()).unwrap();
995        let cell_value = cell.get("value").and_then(|v| v.as_str()).unwrap();
996        let mut cell_for_insert = cell.clone();
997        if cell_valid {
998            cell_for_insert.remove("value");
999            let sql_type = get_sql_type_from_global_config(
1000                &global_config,
1001                &table_name.to_string(),
1002                &column,
1003                pool,
1004            )
1005            .unwrap();
1006            assignments.push(format!(r#""{}" = {}"#, column, cast_sql_param_from_text(&sql_type)));
1007            params.push(String::from(cell_value));
1008        } else {
1009            assignments.push(format!(r#""{}" = NULL"#, column));
1010            let cell_messages = sort_messages(
1011                &sorted_datatypes,
1012                cell.get("messages").and_then(|m| m.as_array()).unwrap(),
1013            );
1014            for cell_message in cell_messages {
1015                messages.push(json!({
1016                    "column": String::from(column),
1017                    "value": String::from(cell_value),
1018                    "level": cell_message.get("level").and_then(|s| s.as_str()).unwrap(),
1019                    "rule": cell_message.get("rule").and_then(|s| s.as_str()).unwrap(),
1020                    "message": cell_message.get("message").and_then(|s| s.as_str()).unwrap(),
1021                }));
1022            }
1023        }
1024    }
1025
1026    // First update the given row in the table:
1027    let mut update_stmt = format!(r#"UPDATE "{}" SET "#, table_name);
1028    update_stmt.push_str(&assignments.join(", "));
1029    update_stmt.push_str(&format!(r#" WHERE "row_number" = {}"#, row_number));
1030    let update_stmt = local_sql_syntax(&pool, &update_stmt);
1031
1032    let mut query = sqlx_query(&update_stmt);
1033    for param in &params {
1034        query = query.bind(param);
1035    }
1036    query.execute(pool).await?;
1037
1038    // Next delete any messages that had been previously inserted to the message table for the old
1039    // version of this row:
1040    let delete_sql = format!(
1041        r#"DELETE FROM "message" WHERE "table" = '{}' AND "row" = {}"#,
1042        table_name, row_number
1043    );
1044    let query = sqlx_query(&delete_sql);
1045    query.execute(pool).await?;
1046
1047    // Finally add the messages to the message table for the new version of this row:
1048    for m in messages {
1049        let column = m.get("column").and_then(|c| c.as_str()).unwrap();
1050        let value = m.get("value").and_then(|c| c.as_str()).unwrap();
1051        let level = m.get("level").and_then(|c| c.as_str()).unwrap();
1052        let rule = m.get("rule").and_then(|c| c.as_str()).unwrap();
1053        let message = m.get("message").and_then(|c| c.as_str()).unwrap();
1054        let insert_sql = format!(
1055            r#"INSERT INTO "message"
1056               ("table", "row", "column", "value", "level", "rule", "message")
1057               VALUES ('{}', {}, '{}', '{}', '{}', '{}', '{}')"#,
1058            table_name, row_number, column, value, level, rule, message
1059        );
1060        let query = sqlx_query(&insert_sql);
1061        query.execute(pool).await?;
1062    }
1063
1064    Ok(())
1065}
1066
1067/// Given a path, read a TSV file and return a vector of rows represented as ConfigMaps.
1068/// Note: Use this function to read "small" TSVs only. In particular, use this for the special
1069/// configuration tables.
1070fn read_tsv_into_vector(path: &str) -> Vec<ConfigMap> {
1071    let mut rdr = csv::ReaderBuilder::new().delimiter(b'\t').from_reader(
1072        File::open(path).unwrap_or_else(|err| {
1073            panic!("Unable to open '{}': {}", path, err);
1074        }),
1075    );
1076
1077    let rows: Vec<_> = rdr
1078        .deserialize()
1079        .map(|result| {
1080            let row: ConfigMap = result.expect(format!("Error reading: {}", path).as_str());
1081            row
1082        })
1083        .collect();
1084
1085    if rows.len() < 1 {
1086        panic!("No rows in {}", path);
1087    }
1088
1089    for (i, row) in rows.iter().enumerate() {
1090        // enumerate() begins at 0 but we want to count rows from 1:
1091        let i = i + 1;
1092        for (col, val) in row {
1093            let val = val.as_str().unwrap();
1094            let trimmed_val = val.trim();
1095            if trimmed_val != val {
1096                eprintln!(
1097                    "Error: Value '{}' of column '{}' in row {} of table '{}' {}",
1098                    val, col, i, path, "has leading and/or trailing whitespace."
1099                );
1100                process::exit(1);
1101            }
1102        }
1103    }
1104
1105    rows
1106}
1107
1108/// Given a database at the specified location, query the "table" table and return a vector of rows
1109/// represented as ConfigMaps.
1110fn read_db_table_into_vector(database: &str, config_table: &str) -> Vec<ConfigMap> {
1111    let connection_options;
1112    if database.starts_with("postgresql://") {
1113        connection_options = AnyConnectOptions::from_str(database).unwrap();
1114    } else {
1115        let connection_string;
1116        if !database.starts_with("sqlite://") {
1117            connection_string = format!("sqlite://{}?mode=ro", database);
1118        } else {
1119            connection_string = database.to_string();
1120        }
1121        connection_options = AnyConnectOptions::from_str(connection_string.as_str()).unwrap();
1122    }
1123
1124    let pool = block_on(AnyPoolOptions::new().max_connections(5).connect_with(connection_options))
1125        .unwrap();
1126
1127    let sql = format!("SELECT * FROM \"{}\"", config_table);
1128    let rows = block_on(sqlx_query(&sql).fetch_all(&pool)).unwrap();
1129    let mut table_rows = vec![];
1130    for row in rows {
1131        let mut table_row = ConfigMap::new();
1132        for column in row.columns() {
1133            let cname = column.name();
1134            if cname != "row_number" {
1135                let raw_value = row.try_get_raw(format!(r#"{}"#, cname).as_str()).unwrap();
1136                if !raw_value.is_null() {
1137                    let value = get_column_value(&row, &cname, "text");
1138                    table_row.insert(cname.to_string(), json!(value));
1139                } else {
1140                    table_row.insert(cname.to_string(), json!(""));
1141                }
1142            }
1143        }
1144        table_rows.push(table_row);
1145    }
1146    table_rows
1147}
1148
1149/// Given a condition on a datatype, if the condition is a Function, then parse it using
1150/// StartParser, create a corresponding CompiledCondition, and return it. If the condition is a
1151/// Label, then look for the CompiledCondition corresponding to it in compiled_datatype_conditions
1152/// and return it.
1153fn compile_condition(
1154    condition_option: Option<&str>,
1155    parser: &StartParser,
1156    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
1157) -> CompiledCondition {
1158    let unquoted_re = Regex::new(r#"^['"](?P<unquoted>.*)['"]$"#).unwrap();
1159    match condition_option {
1160        // The case of no condition, or a "null" or "not null" condition, will be treated specially
1161        // later during the validation phase in a way that does not utilise the associated closure.
1162        // Since we still have to assign some closure in these cases, we use a constant closure that
1163        // always returns true:
1164        None => {
1165            return CompiledCondition {
1166                original: String::from(""),
1167                parsed: Expression::None,
1168                compiled: Arc::new(|_| true),
1169            }
1170        }
1171        Some("null") => {
1172            return CompiledCondition {
1173                original: String::from("null"),
1174                parsed: Expression::Null,
1175                compiled: Arc::new(|_| true),
1176            }
1177        }
1178        Some("not null") => {
1179            return CompiledCondition {
1180                original: String::from("not null"),
1181                parsed: Expression::NotNull,
1182                compiled: Arc::new(|_| true),
1183            }
1184        }
1185        Some(condition) => {
1186            let parsed_condition = parser.parse(condition);
1187            if let Err(_) = parsed_condition {
1188                panic!("ERROR: Could not parse condition: {}", condition);
1189            }
1190            let parsed_condition = parsed_condition.unwrap();
1191            if parsed_condition.len() != 1 {
1192                panic!(
1193                    "ERROR: Invalid condition: '{}'. Only one condition per column is allowed.",
1194                    condition
1195                );
1196            }
1197            let parsed_condition = &parsed_condition[0];
1198            match &**parsed_condition {
1199                Expression::Function(name, args) => {
1200                    if name == "equals" {
1201                        if let Expression::Label(label) = &*args[0] {
1202                            let label = String::from(unquoted_re.replace(label, "$unquoted"));
1203                            return CompiledCondition {
1204                                original: condition.to_string(),
1205                                parsed: *parsed_condition.clone(),
1206                                compiled: Arc::new(move |x| x == label),
1207                            };
1208                        } else {
1209                            panic!("ERROR: Invalid condition: {}", condition);
1210                        }
1211                    } else if vec!["exclude", "match", "search"].contains(&name.as_str()) {
1212                        if let Expression::RegexMatch(pattern, flags) = &*args[0] {
1213                            let mut pattern =
1214                                String::from(unquoted_re.replace(pattern, "$unquoted"));
1215                            let mut flags = String::from(flags);
1216                            if flags != "" {
1217                                flags = format!("(?{})", flags.as_str());
1218                            }
1219                            match name.as_str() {
1220                                "exclude" => {
1221                                    pattern = format!("{}{}", flags, pattern);
1222                                    let re = Regex::new(pattern.as_str()).unwrap();
1223                                    return CompiledCondition {
1224                                        original: condition.to_string(),
1225                                        parsed: *parsed_condition.clone(),
1226                                        compiled: Arc::new(move |x| !re.is_match(x)),
1227                                    };
1228                                }
1229                                "match" => {
1230                                    pattern = format!("^{}{}$", flags, pattern);
1231                                    let re = Regex::new(pattern.as_str()).unwrap();
1232                                    return CompiledCondition {
1233                                        original: condition.to_string(),
1234                                        parsed: *parsed_condition.clone(),
1235                                        compiled: Arc::new(move |x| re.is_match(x)),
1236                                    };
1237                                }
1238                                "search" => {
1239                                    pattern = format!("{}{}", flags, pattern);
1240                                    let re = Regex::new(pattern.as_str()).unwrap();
1241                                    return CompiledCondition {
1242                                        original: condition.to_string(),
1243                                        parsed: *parsed_condition.clone(),
1244                                        compiled: Arc::new(move |x| re.is_match(x)),
1245                                    };
1246                                }
1247                                _ => panic!("Unrecognized function name: {}", name),
1248                            };
1249                        } else {
1250                            panic!(
1251                                "Argument to condition: {} is not a regular expression",
1252                                condition
1253                            );
1254                        }
1255                    } else if name == "in" {
1256                        let mut alternatives: Vec<String> = vec![];
1257                        for arg in args {
1258                            if let Expression::Label(value) = &**arg {
1259                                let value = unquoted_re.replace(value, "$unquoted");
1260                                alternatives.push(value.to_string());
1261                            } else {
1262                                panic!("Argument: {:?} to function 'in' is not a label", arg);
1263                            }
1264                        }
1265                        return CompiledCondition {
1266                            original: condition.to_string(),
1267                            parsed: *parsed_condition.clone(),
1268                            compiled: Arc::new(move |x| alternatives.contains(&x.to_string())),
1269                        };
1270                    } else {
1271                        panic!("Unrecognized function name: {}", name);
1272                    }
1273                }
1274                Expression::Label(value)
1275                    if compiled_datatype_conditions.contains_key(&value.to_string()) =>
1276                {
1277                    let compiled_datatype_condition =
1278                        compiled_datatype_conditions.get(&value.to_string()).unwrap();
1279                    return CompiledCondition {
1280                        original: value.to_string(),
1281                        parsed: compiled_datatype_condition.parsed.clone(),
1282                        compiled: compiled_datatype_condition.compiled.clone(),
1283                    };
1284                }
1285                _ => {
1286                    panic!("Unrecognized condition: {}", condition);
1287                }
1288            };
1289        }
1290    };
1291}
1292
1293/// Given the config map, the name of a datatype, and a database connection pool used to determine
1294/// the database type, climb the datatype tree (as required), and return the first 'SQL type' found.
1295fn get_sql_type(dt_config: &ConfigMap, datatype: &String, pool: &AnyPool) -> Option<String> {
1296    if !dt_config.contains_key(datatype) {
1297        return None;
1298    }
1299
1300    let sql_type_column = {
1301        if pool.any_kind() == AnyKind::Sqlite {
1302            "SQLite type"
1303        } else {
1304            "PostgreSQL type"
1305        }
1306    };
1307
1308    if let Some(sql_type) = dt_config.get(datatype).and_then(|d| d.get(sql_type_column)) {
1309        return Some(sql_type.as_str().and_then(|s| Some(s.to_string())).unwrap());
1310    }
1311
1312    let parent_datatype =
1313        dt_config.get(datatype).and_then(|d| d.get("parent")).and_then(|p| p.as_str()).unwrap();
1314
1315    return get_sql_type(dt_config, &parent_datatype.to_string(), pool);
1316}
1317
1318/// Given the global config map, a table name, a column name, and a database connection pool
1319/// used to determine the database type return the column's SQL type.
1320fn get_sql_type_from_global_config(
1321    global_config: &ConfigMap,
1322    table: &str,
1323    column: &str,
1324    pool: &AnyPool,
1325) -> Option<String> {
1326    let dt_config = global_config.get("datatype").and_then(|d| d.as_object()).unwrap();
1327    let normal_table_name;
1328    if let Some(s) = table.strip_suffix("_conflict") {
1329        normal_table_name = String::from(s);
1330    } else {
1331        normal_table_name = table.to_string();
1332    }
1333    let dt = global_config
1334        .get("table")
1335        .and_then(|t| t.get(normal_table_name))
1336        .and_then(|t| t.get("column"))
1337        .and_then(|c| c.get(column))
1338        .and_then(|c| c.get("datatype"))
1339        .and_then(|d| d.as_str())
1340        .and_then(|d| Some(d.to_string()))
1341        .unwrap();
1342    get_sql_type(&dt_config, &dt, pool)
1343}
1344
1345/// Given a SQL type, return the appropriate CAST(...) statement for casting the SQL_PARAM
1346/// from a TEXT column.
1347fn cast_sql_param_from_text(sql_type: &str) -> String {
1348    if sql_type.to_lowercase() == "integer" {
1349        format!("CAST(NULLIF({}, '') AS INTEGER)", SQL_PARAM)
1350    } else {
1351        String::from(SQL_PARAM)
1352    }
1353}
1354
/// Returns the SQL expression for reading the given column as text. The column name is always
/// wrapped in double quotes; when the SQL type is "integer" (compared case-insensitively) the
/// quoted column is additionally wrapped in a CAST(... AS TEXT).
fn cast_column_sql_to_text(column: &str, sql_type: &str) -> String {
    let quoted_column = format!(r#""{}""#, column);
    match sql_type.to_lowercase().as_str() {
        "integer" => format!("CAST({} AS TEXT)", quoted_column),
        _ => quoted_column,
    }
}
1364
1365/// Given a database row, the name of a column, and it's SQL type, return the value of that column
1366/// from the given row as a String.
1367fn get_column_value(row: &AnyRow, column: &str, sql_type: &str) -> String {
1368    if sql_type.to_lowercase() == "integer" {
1369        let value: i32 = row.get(format!(r#"{}"#, column).as_str());
1370        value.to_string()
1371    } else {
1372        let value: &str = row.get(format!(r#"{}"#, column).as_str());
1373        value.to_string()
1374    }
1375}
1376
1377/// Given a SQL string, possibly with unbound parameters represented by the placeholder string
1378/// SQL_PARAM, and given a database pool, if the pool is of type Sqlite, then change the syntax used
1379/// for unbound parameters to Sqlite syntax, which uses "?", otherwise use Postgres syntax, which
1380/// uses numbered parameters, i.e., $1, $2, ...
1381fn local_sql_syntax(pool: &AnyPool, sql: &String) -> String {
1382    // Do not replace instances of SQL_PARAM if they are within quotation marks.
1383    let rx = Regex::new(&format!(
1384        r#"('[^'\\]*(?:\\.[^'\\]*)*'|"[^"\\]*(?:\\.[^"\\]*)*")|\b{}\b"#,
1385        SQL_PARAM
1386    ))
1387    .unwrap();
1388
1389    let mut final_sql = String::from("");
1390    let mut pg_param_idx = 1;
1391    let mut saved_start = 0;
1392    for m in rx.find_iter(sql) {
1393        let this_match = &sql[m.start()..m.end()];
1394        final_sql.push_str(&sql[saved_start..m.start()]);
1395        if this_match == SQL_PARAM {
1396            if pool.any_kind() == AnyKind::Postgres {
1397                final_sql.push_str(&format!("${}", pg_param_idx));
1398                pg_param_idx += 1;
1399            } else {
1400                final_sql.push_str(&format!("?"));
1401            }
1402        } else {
1403            final_sql.push_str(&format!("{}", this_match));
1404        }
1405        saved_start = m.start() + this_match.len();
1406    }
1407    final_sql.push_str(&sql[saved_start..]);
1408    final_sql
1409}
1410
1411/// Takes as arguments a list of tables and a configuration map describing all of the constraints
1412/// between tables. After validating that there are no cycles amongst the foreign, tree, and
1413/// under dependencies, returns the list of tables sorted according to their foreign key
1414/// dependencies, such that if table_a depends on table_b, then table_b comes before table_a in the
1415/// list that is returned.
1416fn verify_table_deps_and_sort(table_list: &Vec<String>, constraints: &ConfigMap) -> Vec<String> {
1417    fn get_cycles(g: &DiGraphMap<&str, ()>) -> Result<Vec<String>, Vec<Vec<String>>> {
1418        let mut cycles = vec![];
1419        match toposort(&g, None) {
1420            Err(cycle) => {
1421                let problem_node = cycle.node_id();
1422                let neighbours = g.neighbors_directed(problem_node, Direction::Outgoing);
1423                for neighbour in neighbours {
1424                    let ways_to_problem_node =
1425                        all_simple_paths::<Vec<_>, _>(&g, neighbour, problem_node, 0, None);
1426                    for mut way in ways_to_problem_node {
1427                        let mut cycle = vec![problem_node];
1428                        cycle.append(&mut way);
1429                        let cycle = cycle.iter().map(|&item| item.to_string()).collect::<Vec<_>>();
1430                        cycles.push(cycle);
1431                    }
1432                }
1433                Err(cycles)
1434            }
1435            Ok(sorted) => {
1436                let mut sorted = sorted.iter().map(|&item| item.to_string()).collect::<Vec<_>>();
1437                sorted.reverse();
1438                Ok(sorted)
1439            }
1440        }
1441    }
1442
1443    let trees = constraints.get("tree").and_then(|t| t.as_object()).unwrap();
1444    for table_name in table_list {
1445        let mut dependency_graph = DiGraphMap::<&str, ()>::new();
1446        let table_trees = trees.get(table_name).and_then(|t| t.as_array()).unwrap();
1447        for tree in table_trees {
1448            let tree = tree.as_object().unwrap();
1449            let child = tree.get("child").and_then(|c| c.as_str()).unwrap();
1450            let parent = tree.get("parent").and_then(|p| p.as_str()).unwrap();
1451            let c_index = dependency_graph.add_node(child);
1452            let p_index = dependency_graph.add_node(parent);
1453            dependency_graph.add_edge(c_index, p_index, ());
1454        }
1455        match get_cycles(&dependency_graph) {
1456            Ok(_) => (),
1457            Err(cycles) => {
1458                let mut message = String::new();
1459                for cycle in cycles {
1460                    message.push_str(
1461                        format!("Cyclic dependency in table '{}': ", table_name).as_str(),
1462                    );
1463                    let end_index = cycle.len() - 1;
1464                    for (i, child) in cycle.iter().enumerate() {
1465                        if i < end_index {
1466                            let dep = table_trees
1467                                .iter()
1468                                .find(|d| d.get("child").unwrap().as_str() == Some(child))
1469                                .and_then(|d| d.as_object())
1470                                .unwrap();
1471                            let parent = dep.get("parent").unwrap();
1472                            message.push_str(
1473                                format!("tree({}) references {}", child, parent).as_str(),
1474                            );
1475                        }
1476                        if i < (end_index - 1) {
1477                            message.push_str(" and ");
1478                        }
1479                    }
1480                    message.push_str(". ");
1481                }
1482                panic!("{}", message);
1483            }
1484        };
1485    }
1486
1487    let foreign_keys = constraints.get("foreign").and_then(|f| f.as_object()).unwrap();
1488    let under_keys = constraints.get("under").and_then(|u| u.as_object()).unwrap();
1489    let mut dependency_graph = DiGraphMap::<&str, ()>::new();
1490    for table_name in table_list {
1491        let t_index = dependency_graph.add_node(table_name);
1492        let fkeys = foreign_keys.get(table_name).and_then(|f| f.as_array()).unwrap();
1493        for fkey in fkeys {
1494            let ftable = fkey.get("ftable").and_then(|f| f.as_str()).unwrap();
1495            let f_index = dependency_graph.add_node(ftable);
1496            dependency_graph.add_edge(t_index, f_index, ());
1497        }
1498
1499        let ukeys = under_keys.get(table_name).and_then(|u| u.as_array()).unwrap();
1500        for ukey in ukeys {
1501            let ttable = ukey.get("ttable").and_then(|t| t.as_str()).unwrap();
1502            let tcolumn = ukey.get("tcolumn").and_then(|t| t.as_str()).unwrap();
1503            let value = ukey.get("value").and_then(|t| t.as_str()).unwrap();
1504            if ttable != table_name {
1505                let ttable_trees = trees.get(ttable).and_then(|t| t.as_array()).unwrap();
1506                if ttable_trees
1507                    .iter()
1508                    .filter(|d| d.get("child").unwrap().as_str() == Some(tcolumn))
1509                    .collect::<Vec<_>>()
1510                    .is_empty()
1511                {
1512                    panic!(
1513                        "under({}.{}, {}) refers to a non-existent tree",
1514                        ttable, tcolumn, value
1515                    );
1516                }
1517                let tt_index = dependency_graph.add_node(ttable);
1518                dependency_graph.add_edge(t_index, tt_index, ());
1519            }
1520        }
1521    }
1522
1523    match get_cycles(&dependency_graph) {
1524        Ok(sorted_table_list) => {
1525            return sorted_table_list;
1526        }
1527        Err(cycles) => {
1528            let mut message = String::new();
1529            for cycle in cycles {
1530                message.push_str(
1531                    format!("Cyclic dependency between tables {}: ", cycle.join(", ")).as_str(),
1532                );
1533                let end_index = cycle.len() - 1;
1534                for (i, table) in cycle.iter().enumerate() {
1535                    if i < end_index {
1536                        let dep_name = cycle.get(i + 1).unwrap().as_str();
1537                        let fkeys = foreign_keys.get(table).and_then(|f| f.as_array()).unwrap();
1538                        let ukeys = under_keys.get(table).and_then(|u| u.as_array()).unwrap();
1539                        let column;
1540                        let ref_table;
1541                        let ref_column;
1542                        if let Some(dep) = fkeys
1543                            .iter()
1544                            .find(|d| d.get("ftable").unwrap().as_str() == Some(dep_name))
1545                            .and_then(|d| d.as_object())
1546                        {
1547                            column = dep.get("column").unwrap();
1548                            ref_table = dep.get("ftable").unwrap();
1549                            ref_column = dep.get("fcolumn").unwrap();
1550                        } else if let Some(dep) = ukeys
1551                            .iter()
1552                            .find(|d| d.get("ttable").unwrap().as_str() == Some(dep_name))
1553                            .and_then(|d| d.as_object())
1554                        {
1555                            column = dep.get("column").unwrap();
1556                            ref_table = dep.get("ttable").unwrap();
1557                            ref_column = dep.get("tcolumn").unwrap();
1558                        } else {
1559                            panic!("{}. Unable to retrieve the details.", message);
1560                        }
1561
1562                        message.push_str(
1563                            format!(
1564                                "{}.{} depends on {}.{}",
1565                                table,
1566                                column,
1567                                ref_table,
1568                                ref_column,
1569                            )
1570                            .as_str(),
1571                        );
1572                    }
1573                    if i < (end_index - 1) {
1574                        message.push_str(" and ");
1575                    }
1576                }
1577                message.push_str(". ");
1578            }
1579            panic!("{}", message);
1580        }
1581    };
1582}
1583
1584/// Given the config maps for tables and datatypes, and a table name, generate a SQL schema string,
1585/// including each column C and its matching C_meta column, then return the schema string as well as
1586/// a list of the table's constraints.
1587fn create_table_statement(
1588    tables_config: &mut ConfigMap,
1589    datatypes_config: &mut ConfigMap,
1590    parser: &StartParser,
1591    table_name: &String,
1592    pool: &AnyPool,
1593) -> (Vec<String>, SerdeValue) {
1594    let mut drop_table_sql = format!(r#"DROP TABLE IF EXISTS "{}""#, table_name);
1595    if pool.any_kind() == AnyKind::Postgres {
1596        drop_table_sql.push_str(" CASCADE");
1597    }
1598    drop_table_sql.push_str(";");
1599    let mut statements = vec![drop_table_sql];
1600    let mut create_lines = vec![
1601        format!(r#"CREATE TABLE "{}" ("#, table_name),
1602        String::from(r#"  "row_number" BIGINT,"#),
1603    ];
1604
1605    let normal_table_name;
1606    if let Some(s) = table_name.strip_suffix("_conflict") {
1607        normal_table_name = String::from(s);
1608    } else {
1609        normal_table_name = table_name.to_string();
1610    }
1611
1612    let column_names = tables_config
1613        .get(&normal_table_name)
1614        .and_then(|t| t.get("column_order"))
1615        .and_then(|c| c.as_array())
1616        .unwrap()
1617        .iter()
1618        .map(|v| v.as_str().unwrap().to_string())
1619        .collect::<Vec<_>>();
1620
1621    let columns = tables_config
1622        .get(normal_table_name.as_str())
1623        .and_then(|c| c.as_object())
1624        .and_then(|o| o.get("column"))
1625        .and_then(|c| c.as_object())
1626        .unwrap();
1627
1628    let mut table_constraints = json!({
1629        "foreign": [],
1630        "unique": [],
1631        "primary": [],
1632        "tree": [],
1633        "under": [],
1634    });
1635
1636    let mut colvals: Vec<ConfigMap> = vec![];
1637    for column_name in &column_names {
1638        let column = columns.get(column_name).and_then(|c| c.as_object()).unwrap();
1639        colvals.push(column.clone());
1640    }
1641
1642    let c = colvals.len();
1643    let mut r = 0;
1644    for row in colvals {
1645        r += 1;
1646        let sql_type = get_sql_type(
1647            datatypes_config,
1648            &row.get("datatype")
1649                .and_then(|d| d.as_str())
1650                .and_then(|s| Some(s.to_string()))
1651                .unwrap(),
1652            pool,
1653        );
1654
1655        if let None = sql_type {
1656            panic!("Missing SQL type for {}", row.get("datatype").unwrap());
1657        }
1658        let sql_type = sql_type.unwrap();
1659
1660        let short_sql_type = {
1661            if sql_type.to_lowercase().as_str().starts_with("varchar(") {
1662                "VARCHAR"
1663            } else {
1664                &sql_type
1665            }
1666        };
1667
1668        if pool.any_kind() == AnyKind::Postgres {
1669            if !PG_SQL_TYPES.contains(&short_sql_type.to_lowercase().as_str()) {
1670                panic!(
1671                    "Unrecognized PostgreSQL SQL type '{}' for datatype: '{}'. \
1672                     Accepted SQL types for PostgreSQL are: {}",
1673                    sql_type,
1674                    row.get("datatype").and_then(|d| d.as_str()).unwrap(),
1675                    PG_SQL_TYPES.join(", ")
1676                );
1677            }
1678        } else {
1679            if !SL_SQL_TYPES.contains(&short_sql_type.to_lowercase().as_str()) {
1680                panic!(
1681                    "Unrecognized SQLite SQL type '{}' for datatype '{}'. \
1682                     Accepted SQL datatypes for SQLite are: {}",
1683                    sql_type,
1684                    row.get("datatype").and_then(|d| d.as_str()).unwrap(),
1685                    SL_SQL_TYPES.join(", ")
1686                );
1687            }
1688        }
1689
1690        let column_name = row.get("column").and_then(|s| s.as_str()).unwrap();
1691        let mut line = format!(r#"  "{}" {}"#, column_name, sql_type);
1692        let structure = row.get("structure").and_then(|s| s.as_str());
1693        if let Some(structure) = structure {
1694            if structure != "" && !table_name.ends_with("_conflict") {
1695                let parsed_structure = parser.parse(structure).unwrap();
1696                for expression in parsed_structure {
1697                    match *expression {
1698                        Expression::Label(value) if value == "primary" => {
1699                            line.push_str(" PRIMARY KEY");
1700                            let primary_keys = table_constraints
1701                                .get_mut("primary")
1702                                .and_then(|v| v.as_array_mut())
1703                                .unwrap();
1704                            primary_keys.push(SerdeValue::String(column_name.to_string()));
1705                        }
1706                        Expression::Label(value) if value == "unique" => {
1707                            line.push_str(" UNIQUE");
1708                            let unique_constraints = table_constraints
1709                                .get_mut("unique")
1710                                .and_then(|v| v.as_array_mut())
1711                                .unwrap();
1712                            unique_constraints.push(SerdeValue::String(column_name.to_string()));
1713                        }
1714                        Expression::Function(name, args) if name == "from" => {
1715                            if args.len() != 1 {
1716                                panic!("Invalid foreign key: {} for: {}", structure, table_name);
1717                            }
1718                            match &*args[0] {
1719                                Expression::Field(ftable, fcolumn) => {
1720                                    let foreign_keys = table_constraints
1721                                        .get_mut("foreign")
1722                                        .and_then(|v| v.as_array_mut())
1723                                        .unwrap();
1724                                    let foreign_key = json!({
1725                                        "column": column_name,
1726                                        "ftable": ftable,
1727                                        "fcolumn": fcolumn,
1728                                    });
1729                                    foreign_keys.push(foreign_key);
1730                                }
1731                                _ => {
1732                                    panic!("Invalid foreign key: {} for: {}", structure, table_name)
1733                                }
1734                            };
1735                        }
1736                        Expression::Function(name, args) if name == "tree" => {
1737                            if args.len() != 1 {
1738                                panic!(
1739                                    "Invalid 'tree' constraint: {} for: {}",
1740                                    structure, table_name
1741                                );
1742                            }
1743                            match &*args[0] {
1744                                Expression::Label(child) => {
1745                                    let child_datatype = columns
1746                                        .get(child)
1747                                        .and_then(|c| c.get("datatype"))
1748                                        .and_then(|d| d.as_str());
1749                                    if let None = child_datatype {
1750                                        panic!(
1751                                            "Could not determine SQL datatype for {} of tree({})",
1752                                            child, child
1753                                        );
1754                                    }
1755                                    let child_datatype = child_datatype.unwrap();
1756                                    let parent = column_name;
1757                                    let child_sql_type = get_sql_type(
1758                                        datatypes_config,
1759                                        &child_datatype.to_string(),
1760                                        pool,
1761                                    )
1762                                    .unwrap();
1763                                    if sql_type != child_sql_type {
1764                                        panic!(
1765                                            "SQL type '{}' of '{}' in 'tree({})' for table \
1766                                             '{}' doe snot match SQL type: '{}' of parent: '{}'.",
1767                                            child_sql_type,
1768                                            child,
1769                                            child,
1770                                            table_name,
1771                                            sql_type,
1772                                            parent
1773                                        );
1774                                    }
1775                                    let tree_constraints = table_constraints
1776                                        .get_mut("tree")
1777                                        .and_then(|t| t.as_array_mut())
1778                                        .unwrap();
1779                                    let entry = json!({"parent": column_name,
1780                                                       "child": child});
1781                                    tree_constraints.push(entry);
1782                                }
1783                                _ => {
1784                                    panic!(
1785                                        "Invalid 'tree' constraint: {} for: {}",
1786                                        structure, table_name
1787                                    );
1788                                }
1789                            };
1790                        }
1791                        Expression::Function(name, args) if name == "under" => {
1792                            let generic_error = format!(
1793                                "Invalid 'under' constraint: {} for: {}",
1794                                structure, table_name
1795                            );
1796                            if args.len() != 2 {
1797                                panic!("{}", generic_error);
1798                            }
1799                            match (&*args[0], &*args[1]) {
1800                                (Expression::Field(ttable, tcolumn), Expression::Label(value)) => {
1801                                    let under_constraints = table_constraints
1802                                        .get_mut("under")
1803                                        .and_then(|u| u.as_array_mut())
1804                                        .unwrap();
1805                                    let entry = json!({"column": column_name,
1806                                                       "ttable": ttable,
1807                                                       "tcolumn": tcolumn,
1808                                                       "value": value});
1809                                    under_constraints.push(entry);
1810                                }
1811                                (_, _) => panic!("{}", generic_error),
1812                            };
1813                        }
1814                        _ => panic!(
1815                            "Unrecognized structure: {} for {}.{}",
1816                            structure, table_name, column_name
1817                        ),
1818                    };
1819                }
1820            }
1821        }
1822        if r >= c
1823            && table_constraints
1824                .get("foreign")
1825                .and_then(|v| v.as_array())
1826                .and_then(|v| Some(v.is_empty()))
1827                .unwrap()
1828        {
1829            line.push_str("");
1830        } else {
1831            line.push_str(",");
1832        }
1833        create_lines.push(line);
1834    }
1835
1836    let foreign_keys = table_constraints.get("foreign").and_then(|v| v.as_array()).unwrap();
1837    let num_fkeys = foreign_keys.len();
1838    for (i, fkey) in foreign_keys.iter().enumerate() {
1839        create_lines.push(format!(
1840            r#"  FOREIGN KEY ("{}") REFERENCES "{}"("{}"){}"#,
1841            fkey.get("column").and_then(|s| s.as_str()).unwrap(),
1842            fkey.get("ftable").and_then(|s| s.as_str()).unwrap(),
1843            fkey.get("fcolumn").and_then(|s| s.as_str()).unwrap(),
1844            if i < (num_fkeys - 1) { "," } else { "" }
1845        ));
1846    }
1847    create_lines.push(String::from(");"));
1848    // We are done generating the lines for the 'create table' statement. Join them and add the
1849    // result to the statements to return:
1850    statements.push(String::from(create_lines.join("\n")));
1851
1852    // Loop through the tree constraints and if any of their associated child columns do not already
1853    // have an associated unique or primary index, create one implicitly here:
1854    let tree_constraints = table_constraints.get("tree").and_then(|v| v.as_array()).unwrap();
1855    for tree in tree_constraints {
1856        let unique_keys = table_constraints.get("unique").and_then(|v| v.as_array()).unwrap();
1857        let primary_keys = table_constraints.get("primary").and_then(|v| v.as_array()).unwrap();
1858        let tree_child = tree.get("child").and_then(|c| c.as_str()).unwrap();
1859        if !unique_keys.contains(&SerdeValue::String(tree_child.to_string()))
1860            && !primary_keys.contains(&SerdeValue::String(tree_child.to_string()))
1861        {
1862            statements.push(format!(
1863                r#"CREATE UNIQUE INDEX "{}_{}_idx" ON "{}"("{}");"#,
1864                table_name, tree_child, table_name, tree_child
1865            ));
1866        }
1867    }
1868
1869    // Finally, create a further unique index on row_number:
1870    statements.push(format!(
1871        r#"CREATE UNIQUE INDEX "{}_row_number_idx" ON "{}"("row_number");"#,
1872        table_name, table_name
1873    ));
1874
1875    return (statements, table_constraints);
1876}
1877
1878/// Given a list of messages and a HashMap, messages_stats, with which to collect counts of
1879/// message types, count the various message types encountered in the list and increment the counts
1880/// in messages_stats accordingly.
1881fn add_message_counts(messages: &Vec<SerdeValue>, messages_stats: &mut HashMap<String, usize>) {
1882    for message in messages {
1883        let message = message.as_object().unwrap();
1884        let level = message.get("level").unwrap();
1885        if level == "error" {
1886            let current_errors = messages_stats.get("error").unwrap();
1887            messages_stats.insert("error".to_string(), current_errors + 1);
1888        } else if level == "warning" {
1889            let current_warnings = messages_stats.get("warning").unwrap();
1890            messages_stats.insert("warning".to_string(), current_warnings + 1);
1891        } else if level == "info" {
1892            let current_infos = messages_stats.get("info").unwrap();
1893            messages_stats.insert("info".to_string(), current_infos + 1);
1894        } else {
1895            eprintln!("Warning: unknown message type: {}", level);
1896        }
1897    }
1898}
1899
1900/// Given a global config map, return a list of defined datatype names sorted from the most generic
1901/// to the most specific. This function will panic if circular dependencies are encountered.
1902fn get_sorted_datatypes(global_config: &ConfigMap) -> Vec<&str> {
1903    let mut graph = DiGraphMap::<&str, ()>::new();
1904    let dt_config = global_config.get("datatype").and_then(|d| d.as_object()).unwrap();
1905    for (dt_name, dt_obj) in dt_config.iter() {
1906        let d_index = graph.add_node(dt_name);
1907        if let Some(parent) = dt_obj.get("parent").and_then(|p| p.as_str()) {
1908            let p_index = graph.add_node(parent);
1909            graph.add_edge(d_index, p_index, ());
1910        }
1911    }
1912
1913    let mut cycles = vec![];
1914    match toposort(&graph, None) {
1915        Err(cycle) => {
1916            let problem_node = cycle.node_id();
1917            let neighbours = graph.neighbors_directed(problem_node, Direction::Outgoing);
1918            for neighbour in neighbours {
1919                let ways_to_problem_node =
1920                    all_simple_paths::<Vec<_>, _>(&graph, neighbour, problem_node, 0, None);
1921                for mut way in ways_to_problem_node {
1922                    let mut cycle = vec![problem_node];
1923                    cycle.append(&mut way);
1924                    let cycle = cycle.iter().map(|&item| item.to_string()).collect::<Vec<_>>();
1925                    cycles.push(cycle);
1926                }
1927            }
1928            panic!("Defined datatypes contain circular dependencies: {:?}", cycles);
1929        }
1930        Ok(mut sorted) => {
1931            sorted.reverse();
1932            sorted
1933        }
1934    }
1935}
1936
1937/// Given a sorted list of datatypes and a list of messages for a given cell of some table, sort
1938/// the messages in the following way and return the sorted list of messages:
1939/// 1. Messages pertaining to datatype rule violations, sorted according to the order specified in
1940///    `sorted_datatypes`, followed by:
1941/// 2. Messages pertaining to violations of one of the rules in the rule table, followed by:
1942/// 3. Messages pertaining to structure violations.
1943fn sort_messages(sorted_datatypes: &Vec<&str>, cell_messages: &Vec<SerdeValue>) -> Vec<SerdeValue> {
1944    let mut datatype_messages = vec![];
1945    let mut structure_messages = vec![];
1946    let mut rule_messages = vec![];
1947    for message in cell_messages {
1948        let rule = message
1949            .get("rule")
1950            .and_then(|r| Some(r.as_str().unwrap().splitn(2, ":").collect::<Vec<_>>()))
1951            .unwrap();
1952        if rule[0] == "rule" {
1953            rule_messages.push(message.clone());
1954        } else if rule[0] == "datatype" {
1955            datatype_messages.push(message.clone());
1956        } else {
1957            structure_messages.push(message.clone());
1958        }
1959    }
1960
1961    if datatype_messages.len() > 0 {
1962        datatype_messages = {
1963            let mut sorted_messages = vec![];
1964            for datatype in sorted_datatypes {
1965                let mut messages = datatype_messages
1966                    .iter()
1967                    .filter(|m| {
1968                        m.get("rule").and_then(|r| r.as_str()).unwrap()
1969                            == format!("datatype:{}", datatype)
1970                    })
1971                    .map(|m| m.clone())
1972                    .collect::<Vec<_>>();
1973                sorted_messages.append(&mut messages);
1974            }
1975            sorted_messages
1976        }
1977    }
1978
1979    let mut messages = datatype_messages;
1980    messages.append(&mut rule_messages);
1981    messages.append(&mut structure_messages);
1982    messages
1983}
1984
1985/// Given a configuration map, a table name, a number of rows, their corresponding chunk number,
1986/// and a database connection pool used to determine the database type, return two four-place
1987/// tuples, corresponding to the normal and conflict tables, respectively. Each of these contains
1988/// (i) a SQL string for an insert statement to the table, (ii) parameters to bind to that SQL
/// statement, (iii) a SQL string for an insert statement to the message table, and (iv) parameters
1990/// to bind to that SQL statement. If the verbose flag is set, the number of errors, warnings,
1991/// and information messages generated are added to messages_stats, the contents of which will
1992/// later be written to stderr.
1993async fn make_inserts(
1994    config: &ConfigMap,
1995    table_name: &String,
1996    rows: &mut Vec<ResultRow>,
1997    chunk_number: usize,
1998    messages_stats: &mut HashMap<String, usize>,
1999    verbose: bool,
2000    pool: &AnyPool,
2001) -> Result<
2002    ((String, Vec<String>, String, Vec<String>), (String, Vec<String>, String, Vec<String>)),
2003    sqlx::Error,
2004> {
2005    let conflict_columns = {
2006        let mut conflict_columns = vec![];
2007        let primaries = config
2008            .get("constraints")
2009            .and_then(|c| c.as_object())
2010            .and_then(|c| c.get("primary"))
2011            .and_then(|t| t.as_object())
2012            .and_then(|t| t.get(table_name))
2013            .and_then(|t| t.as_array())
2014            .unwrap();
2015
2016        let uniques = config
2017            .get("constraints")
2018            .and_then(|c| c.as_object())
2019            .and_then(|c| c.get("unique"))
2020            .and_then(|t| t.as_object())
2021            .and_then(|t| t.get(table_name))
2022            .and_then(|t| t.as_array())
2023            .unwrap();
2024
2025        let trees = config
2026            .get("constraints")
2027            .and_then(|c| c.as_object())
2028            .and_then(|o| o.get("tree"))
2029            .and_then(|t| t.as_object())
2030            .and_then(|o| o.get(table_name))
2031            .and_then(|t| t.as_array())
2032            .unwrap()
2033            .iter()
2034            .map(|v| v.as_object().unwrap())
2035            .map(|v| v.get("child").unwrap().clone())
2036            .collect::<Vec<_>>();
2037
2038        for key_columns in vec![primaries, uniques, &trees] {
2039            for column in key_columns {
2040                if !conflict_columns.contains(column) {
2041                    conflict_columns.push(column.clone());
2042                }
2043            }
2044        }
2045
2046        conflict_columns
2047    };
2048
2049    fn generate_sql(
2050        config: &ConfigMap,
2051        table_name: &String,
2052        column_names: &Vec<String>,
2053        rows: &Vec<ResultRow>,
2054        messages_stats: &mut HashMap<String, usize>,
2055        verbose: bool,
2056        pool: &AnyPool,
2057    ) -> (String, Vec<String>, String, Vec<String>) {
2058        let mut lines = vec![];
2059        let mut params = vec![];
2060        let mut message_lines = vec![];
2061        let mut message_params = vec![];
2062        let sorted_datatypes = get_sorted_datatypes(config);
2063        for row in rows.iter() {
2064            let mut values = vec![format!("{}", row.row_number.unwrap())];
2065            for column in column_names {
2066                let cell = row.contents.get(column).unwrap();
2067
2068                // Insert the value of the cell into the column unless it is invalid or has the
2069                // nulltype field set, in which case insert NULL:
2070                if cell.nulltype == None && cell.valid {
2071                    let sql_type =
2072                        get_sql_type_from_global_config(&config, &table_name, &column, pool)
2073                            .unwrap();
2074                    values.push(cast_sql_param_from_text(&sql_type));
2075                    params.push(cell.value.clone());
2076                } else {
2077                    values.push(String::from("NULL"));
2078                }
2079
2080                // If the cell isn't valid, generate values and params to be used for the insert to
2081                // the message table:
2082                if !cell.valid {
2083                    if verbose {
2084                        add_message_counts(&cell.messages, messages_stats);
2085                    }
2086                    for message in sort_messages(&sorted_datatypes, &cell.messages) {
2087                        let row = row.row_number.unwrap().to_string();
2088                        let message_values = vec![
2089                            SQL_PARAM, &row, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM,
2090                        ];
2091
2092                        let message = message.as_object().unwrap();
2093                        message_params.push({
2094                            let normal_table_name;
2095                            if let Some(s) = table_name.strip_suffix("_conflict") {
2096                                normal_table_name = String::from(s);
2097                            } else {
2098                                normal_table_name = table_name.to_string();
2099                            }
2100                            normal_table_name
2101                        });
2102                        message_params.push(column.clone());
2103                        message_params.push(cell.value.clone());
2104                        message_params.push(
2105                            message.get("level").and_then(|s| s.as_str()).unwrap().to_string(),
2106                        );
2107                        message_params.push(
2108                            message.get("rule").and_then(|s| s.as_str()).unwrap().to_string(),
2109                        );
2110                        message_params.push(
2111                            message.get("message").and_then(|s| s.as_str()).unwrap().to_string(),
2112                        );
2113                        let line = message_values.join(", ");
2114                        let line = format!("({})", line);
2115                        message_lines.push(line);
2116                    }
2117                }
2118            }
2119            let line = values.join(", ");
2120            let line = format!("({})", line);
2121            lines.push(line);
2122        }
2123
2124        // Generate the SQL output for the insert to the table:
2125        let mut output = String::from("");
2126        if !lines.is_empty() {
2127            output.push_str(&format!(
2128                r#"INSERT INTO "{}" ("row_number", {}) VALUES"#,
2129                table_name,
2130                {
2131                    let mut all_columns = vec![];
2132                    for column_name in column_names {
2133                        let quoted_column_name = format!(r#""{}""#, column_name);
2134                        all_columns.push(quoted_column_name);
2135                    }
2136                    all_columns.join(", ")
2137                }
2138            ));
2139            output.push_str("\n");
2140            output.push_str(&lines.join(",\n"));
2141            output.push_str(";");
2142        }
2143
2144        // Generate the output for the insert to the message table:
2145        let mut message_output = String::from("");
2146        if !message_lines.is_empty() {
2147            message_output.push_str(r#"INSERT INTO "message" "#);
2148            message_output
2149                .push_str(r#"("table", "row", "column", "value", "level", "rule", "message") "#);
2150            message_output.push_str("VALUES");
2151            message_output.push_str("\n");
2152            message_output.push_str(&message_lines.join(",\n"));
2153            message_output.push_str(";");
2154        }
2155
2156        (output, params, message_output, message_params)
2157    }
2158
2159    fn has_conflict(row: &ResultRow, conflict_columns: &Vec<SerdeValue>) -> bool {
2160        for (column, cell) in &row.contents {
2161            let column = SerdeValue::String(column.to_string());
2162            if conflict_columns.contains(&column) && !cell.valid {
2163                return true;
2164            }
2165        }
2166        return false;
2167    }
2168
2169    let mut main_rows = vec![];
2170    let mut conflict_rows = vec![];
2171    for (i, row) in rows.iter_mut().enumerate() {
2172        // enumerate begins at 0 but we need to begin at 1:
2173        let i = i + 1;
2174        row.row_number = Some(i as u32 + chunk_number as u32 * CHUNK_SIZE as u32);
2175        if has_conflict(&row, &conflict_columns) {
2176            conflict_rows.push(row.clone());
2177        } else {
2178            main_rows.push(row.clone());
2179        }
2180    }
2181
2182    // Use the "column_order" field of the table config for this table to retrieve the column names
2183    // in the correct order:
2184    let column_names = config
2185        .get("table")
2186        .and_then(|t| t.get(table_name))
2187        .and_then(|t| t.get("column_order"))
2188        .and_then(|c| c.as_array())
2189        .unwrap()
2190        .iter()
2191        .map(|v| v.as_str().unwrap().to_string())
2192        .collect::<Vec<_>>();
2193
2194    let (main_sql, main_params, main_message_sql, main_message_params) = generate_sql(
2195        &config,
2196        &table_name,
2197        &column_names,
2198        &main_rows,
2199        messages_stats,
2200        verbose,
2201        pool,
2202    );
2203    let (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params) =
2204        generate_sql(
2205            &config,
2206            &format!("{}_conflict", table_name),
2207            &column_names,
2208            &conflict_rows,
2209            messages_stats,
2210            verbose,
2211            pool,
2212        );
2213
2214    Ok((
2215        (main_sql, main_params, main_message_sql, main_message_params),
2216        (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params),
2217    ))
2218}
2219
2220/// Given a configuration map, a database connection pool, a table name, some rows to validate,
2221/// and the chunk number corresponding to the rows, do inter-row validation on the rows and insert
2222/// them to the table. If the verbose flag is set to true, error/warning/info stats will be
2223/// collected in messages_stats and later written to stderr.
2224async fn validate_rows_inter_and_insert(
2225    config: &ConfigMap,
2226    pool: &AnyPool,
2227    table_name: &String,
2228    rows: &mut Vec<ResultRow>,
2229    chunk_number: usize,
2230    messages_stats: &mut HashMap<String, usize>,
2231    verbose: bool,
2232) -> Result<(), sqlx::Error> {
2233    // First, do the tree validation:
2234    validate_rows_trees(config, pool, table_name, rows).await?;
2235
2236    // Try to insert the rows to the db first without validating unique and foreign constraints.
2237    // If there are constraint violations this will cause a database error, in which case we then
2238    // explicitly do the constraint validation and insert the resulting rows.
2239    // Note that instead of passing messages_stats here, we are going to initialize an empty map
2240    // and pass that instead. The reason is that if a database error gets thrown, and then we
2241    // redo the validation later, some of the messages will be double-counted. So to avoid that
2242    // we send an empty map here, and in the case of no database error, we will just add the
2243    // contents of the temporary map to messages_stats (in the Ok branch of the match statement
2244    // below).
2245    let mut tmp_messages_stats = HashMap::new();
2246    tmp_messages_stats.insert("error".to_string(), 0);
2247    tmp_messages_stats.insert("warning".to_string(), 0);
2248    tmp_messages_stats.insert("info".to_string(), 0);
2249    let (
2250        (main_sql, main_params, main_message_sql, main_message_params),
2251        (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params),
2252    ) = make_inserts(
2253        config,
2254        table_name,
2255        rows,
2256        chunk_number,
2257        &mut tmp_messages_stats,
2258        verbose,
2259        pool,
2260    )
2261    .await?;
2262
2263    let main_sql = local_sql_syntax(&pool, &main_sql);
2264    let mut main_query = sqlx_query(&main_sql);
2265    for param in &main_params {
2266        main_query = main_query.bind(param);
2267    }
2268    let main_result = main_query.execute(pool).await;
2269    match main_result {
2270        Ok(_) => {
2271            let conflict_sql = local_sql_syntax(&pool, &conflict_sql);
2272            let mut conflict_query = sqlx_query(&conflict_sql);
2273            for param in &conflict_params {
2274                conflict_query = conflict_query.bind(param);
2275            }
2276            conflict_query.execute(pool).await?;
2277
2278            let main_message_sql = local_sql_syntax(&pool, &main_message_sql);
2279            let mut main_message_query = sqlx_query(&main_message_sql);
2280            for param in &main_message_params {
2281                main_message_query = main_message_query.bind(param);
2282            }
2283            main_message_query.execute(pool).await?;
2284
2285            let conflict_message_sql = local_sql_syntax(&pool, &conflict_message_sql);
2286            let mut conflict_message_query = sqlx_query(&conflict_message_sql);
2287            for param in &conflict_message_params {
2288                conflict_message_query = conflict_message_query.bind(param);
2289            }
2290            conflict_message_query.execute(pool).await?;
2291
2292            if verbose {
2293                let curr_errors = messages_stats.get("error").unwrap();
2294                messages_stats.insert(
2295                    "error".to_string(),
2296                    curr_errors + tmp_messages_stats.get("error").unwrap(),
2297                );
2298                let curr_warnings = messages_stats.get("warning").unwrap();
2299                messages_stats.insert(
2300                    "warning".to_string(),
2301                    curr_warnings + tmp_messages_stats.get("warning").unwrap(),
2302                );
2303                let curr_infos = messages_stats.get("info").unwrap();
2304                messages_stats.insert(
2305                    "info".to_string(),
2306                    curr_infos + tmp_messages_stats.get("info").unwrap(),
2307                );
2308            }
2309        }
2310        Err(_) => {
2311            validate_rows_constraints(config, pool, table_name, rows).await?;
2312
2313            let (
2314                (main_sql, main_params, main_message_sql, main_message_params),
2315                (conflict_sql, conflict_params, conflict_message_sql, conflict_message_params),
2316            ) = make_inserts(config, table_name, rows, chunk_number, messages_stats, verbose, pool)
2317                .await?;
2318
2319            let main_sql = local_sql_syntax(&pool, &main_sql);
2320            let mut main_query = sqlx_query(&main_sql);
2321            for param in &main_params {
2322                main_query = main_query.bind(param);
2323            }
2324            main_query.execute(pool).await?;
2325
2326            let conflict_sql = local_sql_syntax(&pool, &conflict_sql);
2327            let mut conflict_query = sqlx_query(&conflict_sql);
2328            for param in &conflict_params {
2329                conflict_query = conflict_query.bind(param);
2330            }
2331            conflict_query.execute(pool).await?;
2332
2333            let main_message_sql = local_sql_syntax(&pool, &main_message_sql);
2334            let mut main_message_query = sqlx_query(&main_message_sql);
2335            for param in &main_message_params {
2336                main_message_query = main_message_query.bind(param);
2337            }
2338            main_message_query.execute(pool).await?;
2339
2340            let conflict_message_sql = local_sql_syntax(&pool, &conflict_message_sql);
2341            let mut conflict_message_query = sqlx_query(&conflict_message_sql);
2342            for param in &conflict_message_params {
2343                conflict_message_query = conflict_message_query.bind(param);
2344            }
2345            conflict_message_query.execute(pool).await?;
2346        }
2347    };
2348
2349    Ok(())
2350}
2351
2352/// Given a configuration map, a database connection pool, maps for compiled datatype and rule
2353/// conditions, a table name, a number of chunks of rows to insert into the table in the database,
2354/// and the headers of the rows to be inserted, validate each chunk and insert the validated rows
2355/// to the table. If the verbose flag is set to true, error/warning/info stats will be collected in
2356/// messages_stats and later written to stderr.
2357async fn validate_and_insert_chunks(
2358    config: &ConfigMap,
2359    pool: &AnyPool,
2360    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
2361    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
2362    table_name: &String,
2363    chunks: &IntoChunks<csv::StringRecordsIter<'_, std::fs::File>>,
2364    headers: &csv::StringRecord,
2365    messages_stats: &mut HashMap<String, usize>,
2366    verbose: bool,
2367) -> Result<(), sqlx::Error> {
2368    if !MULTI_THREADED {
2369        for (chunk_number, chunk) in chunks.into_iter().enumerate() {
2370            let mut rows: Vec<_> = chunk.collect();
2371            let mut intra_validated_rows = validate_rows_intra(
2372                config,
2373                compiled_datatype_conditions,
2374                compiled_rule_conditions,
2375                table_name,
2376                headers,
2377                &mut rows,
2378            );
2379            validate_rows_inter_and_insert(
2380                config,
2381                pool,
2382                table_name,
2383                &mut intra_validated_rows,
2384                chunk_number,
2385                messages_stats,
2386                verbose,
2387            )
2388            .await?;
2389        }
2390        Ok(())
2391    } else {
2392        // Here is how this works. First of all note that we are given a number of chunks of rows,
2393        // where the number of rows in each chunk is determined by CHUNK_SIZE (defined above). We
2394        // then divide the chunks into batches, where the number of chunks in each batch is
2395        // determined by the number of CPUs present on the system. We then iterate over the
2396        // batches one by one, assigning each chunk in a given batch to a worker thread whose
2397        // job is to perform intra-row validation on that chunk. The workers work in parallel, one
2398        // per CPU, and after all the workers have completed and their results have been collected,
2399        // we then perform inter-row validation on the chunks in the batch, this time serially.
2400        // Once this is done, we move on to the next batch and continue in this fashion.
2401        let num_cpus = num_cpus::get();
2402        let batches = chunks.into_iter().chunks(num_cpus);
2403        let mut chunk_number = 0;
2404        for batch in batches.into_iter() {
2405            let mut results = BTreeMap::new();
2406            crossbeam::scope(|scope| {
2407                let mut workers = vec![];
2408                for chunk in batch.into_iter() {
2409                    let mut rows: Vec<_> = chunk.collect();
2410                    workers.push(scope.spawn(move |_| {
2411                        validate_rows_intra(
2412                            config,
2413                            compiled_datatype_conditions,
2414                            compiled_rule_conditions,
2415                            table_name,
2416                            headers,
2417                            &mut rows,
2418                        )
2419                    }));
2420                }
2421
2422                for worker in workers {
2423                    let result = worker.join().unwrap();
2424                    results.insert(chunk_number, result);
2425                    chunk_number += 1;
2426                }
2427            })
2428            .expect("A child thread panicked");
2429
2430            for (chunk_number, mut intra_validated_rows) in results {
2431                validate_rows_inter_and_insert(
2432                    config,
2433                    pool,
2434                    table_name,
2435                    &mut intra_validated_rows,
2436                    chunk_number,
2437                    messages_stats,
2438                    verbose,
2439                )
2440                .await?;
2441            }
2442        }
2443
2444        Ok(())
2445    }
2446}
2447
2448/// Given a configuration map, a database connection pool, a parser, HashMaps representing
2449/// compiled datatype and rule conditions, and a HashMap representing parsed structure conditions,
2450/// read in the data TSV files corresponding to each configured table, then validate and load all of
2451/// the corresponding data rows. If the verbose flag is set to true, output progress messages to
2452/// stderr during load.
2453async fn load_db(
2454    config: &ConfigMap,
2455    pool: &AnyPool,
2456    compiled_datatype_conditions: &HashMap<String, CompiledCondition>,
2457    compiled_rule_conditions: &HashMap<String, HashMap<String, Vec<ColumnRule>>>,
2458    verbose: bool,
2459) -> Result<(), sqlx::Error> {
2460    let mut table_list = vec![];
2461    for table in config.get("sorted_table_list").and_then(|l| l.as_array()).unwrap() {
2462        table_list.push(table.as_str().and_then(|s| Some(s.to_string())).unwrap());
2463    }
2464    let table_list = table_list; // Change the table_list to read only after populating it.
2465    let num_tables = table_list.len();
2466    let mut total_errors = 0;
2467    let mut total_warnings = 0;
2468    let mut total_infos = 0;
2469    let mut table_num = 1;
2470    for table_name in table_list {
2471        if verbose {
2472            eprintln!(
2473                "{} - Loading table {}/{}: {}",
2474                Utc::now(),
2475                table_num,
2476                num_tables,
2477                table_name
2478            );
2479        }
2480        table_num += 1;
2481        let path = String::from(
2482            config
2483                .get("table")
2484                .and_then(|t| t.as_object())
2485                .and_then(|o| o.get(&table_name))
2486                .and_then(|n| n.get("path"))
2487                .and_then(|p| p.as_str())
2488                .unwrap(),
2489        );
2490        let mut rdr = csv::ReaderBuilder::new().has_headers(false).delimiter(b'\t').from_reader(
2491            File::open(path.clone()).unwrap_or_else(|err| {
2492                panic!("Unable to open '{}': {}", path.clone(), err);
2493            }),
2494        );
2495
2496        // Extract the headers, which we will need later:
2497        let mut records = rdr.records();
2498        let headers;
2499        if let Some(result) = records.next() {
2500            headers = result.unwrap();
2501        } else {
2502            panic!("'{}' is empty", path);
2503        }
2504
2505        for header in headers.iter() {
2506            if header.trim().is_empty() {
2507                panic!("One or more of the header fields is empty for table '{}'", table_name);
2508            }
2509        }
2510
2511        // HashMap used to report info about the number of error/warning/info messages for this
2512        // table when the verbose flag is set to true:
2513        let mut messages_stats = HashMap::new();
2514        messages_stats.insert("error".to_string(), 0);
2515        messages_stats.insert("warning".to_string(), 0);
2516        messages_stats.insert("info".to_string(), 0);
2517
2518        // Split the data into chunks of size CHUNK_SIZE before passing them to the validation
2519        // logic:
2520        let chunks = records.chunks(CHUNK_SIZE);
2521        validate_and_insert_chunks(
2522            config,
2523            pool,
2524            compiled_datatype_conditions,
2525            compiled_rule_conditions,
2526            &table_name,
2527            &chunks,
2528            &headers,
2529            &mut messages_stats,
2530            verbose,
2531        )
2532        .await?;
2533
2534        // We need to wait until all of the rows for a table have been loaded before validating the
2535        // "foreign" constraints on a table's trees, since this checks if the values of one column
2536        // (the tree's parent) are all contained in another column (the tree's child):
2537        // We also need to wait before validating a table's "under" constraints. Although the tree
2538        // associated with such a constraint need not be defined on the same table, it can be.
2539        let mut recs_to_update =
2540            validate_tree_foreign_keys(config, pool, &table_name, None).await?;
2541        recs_to_update.append(&mut validate_under(config, pool, &table_name, None).await?);
2542
2543        for record in recs_to_update {
2544            let row_number = record.get("row_number").unwrap();
2545            let column_name = record.get("column").and_then(|s| s.as_str()).unwrap();
2546            let value = record.get("value").and_then(|s| s.as_str()).unwrap();
2547            let level = record.get("level").and_then(|s| s.as_str()).unwrap();
2548            let rule = record.get("rule").and_then(|s| s.as_str()).unwrap();
2549            let message = record.get("message").and_then(|s| s.as_str()).unwrap();
2550
2551            let sql = format!(
2552                r#"UPDATE "{}" SET "{}" = NULL WHERE "row_number" = {}"#,
2553                table_name, column_name, row_number
2554            );
2555            let query = sqlx_query(&sql);
2556            query.execute(pool).await?;
2557
2558            let sql = local_sql_syntax(
2559                &pool,
2560                &format!(
2561                    r#"INSERT INTO "message"
2562                       ("table", "row", "column", "value", "level", "rule", "message")
2563                       VALUES ({}, {}, {}, {}, {}, {}, {})"#,
2564                    SQL_PARAM, row_number, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM, SQL_PARAM
2565                ),
2566            );
2567            let mut query = sqlx_query(&sql);
2568            query = query.bind(&table_name);
2569            query = query.bind(&column_name);
2570            query = query.bind(&value);
2571            query = query.bind(&level);
2572            query = query.bind(&rule);
2573            query = query.bind(&message);
2574            query.execute(pool).await?;
2575
2576            if verbose {
2577                // Add the generated message to messages_stats:
2578                let messages = vec![json!({
2579                    "message": message,
2580                    "level": level,
2581                })];
2582                add_message_counts(&messages, &mut messages_stats);
2583            }
2584        }
2585
2586        if verbose {
2587            // Output a report on the messages generated to stderr:
2588            let errors = messages_stats.get("error").unwrap();
2589            let warnings = messages_stats.get("warning").unwrap();
2590            let infos = messages_stats.get("info").unwrap();
2591            let status_message = format!(
2592                "{} errors, {} warnings, and {} information messages generated for {}",
2593                errors, warnings, infos, table_name
2594            );
2595            eprintln!("{} - {}", Utc::now(), status_message);
2596            total_errors += errors;
2597            total_warnings += warnings;
2598            total_infos += infos;
2599        }
2600    }
2601
2602    if verbose {
2603        eprintln!(
2604            "{} - Loading complete with {} errors, {} warnings, and {} information messages",
2605            Utc::now(),
2606            total_errors,
2607            total_warnings,
2608            total_infos
2609        );
2610    }
2611
2612    Ok(())
2613}