csv_query/
executor.rs

1use std::io::{BufRead, Read, Write};
2use std::path::Path;
3
4use csv;
5use rusqlite;
6
7use csv_utils::string_to_csv_output;
8use db_utils::{escape_columns, escape_values, AllString};
9use errors::{Result, ResultExt};
10
11/// Main struct that parses the CSV and put the data into a SQLite
12pub struct Executor<W: Write> {
13    conn: rusqlite::Connection,
14    output: W,
15    delimiter: u8,
16}
17
18impl<W> Executor<W>
19where
20    W: Write,
21{
22    pub fn new<R>(
23        readers: Vec<R>,
24        output: W,
25        delimiter: u8,
26        batch_insert_number: usize,
27    ) -> Result<Executor<W>>
28    where
29        R: BufRead,
30    {
31        let conn = Self::create_database()?;
32        Self::process_csv_files(readers, delimiter, batch_insert_number, &conn)?;
33        Ok(Executor {
34            conn,
35            output,
36            delimiter,
37        })
38    }
39
40    fn create_database() -> Result<rusqlite::Connection> {
41        Ok(rusqlite::Connection::open_in_memory().chain_err(|| "Opening memory database.")?)
42    }
43
44    fn process_csv_files<R>(
45        readers: Vec<R>,
46        delimiter: u8,
47        batch_insert_number: usize,
48        conn: &rusqlite::Connection,
49    ) -> Result<()>
50    where
51        R: Read,
52    {
53        for (i, reader) in readers.into_iter().enumerate() {
54            let table_number = i + 1;
55            let mut csv_reader = csv::ReaderBuilder::new()
56                .delimiter(delimiter)
57                .from_reader(reader);
58
59            let columns = Self::get_csv_columns(&mut csv_reader)?;
60            Self::create_table(&conn, &columns, table_number)?;
61            Self::fill_data(
62                &conn,
63                &columns,
64                table_number,
65                batch_insert_number,
66                csv_reader,
67            )?;
68        }
69        Ok(())
70    }
71
72    fn get_csv_columns<R>(csv_reader: &mut csv::Reader<R>) -> Result<csv::StringRecord>
73    where
74        R: Read,
75    {
76        Ok(csv_reader
77            .headers()
78            .chain_err(|| "Reading headers")?
79            .clone())
80    }
81
82    fn create_table(
83        conn: &rusqlite::Connection,
84        columns: &csv::StringRecord,
85        table_number: usize,
86    ) -> Result<()> {
87        let quoted_columns: Vec<String> = columns
88            .iter()
89            .map(|c| format!("\"{}\" VARCHAR NULL", c))
90            .collect();
91        let create_query = format!(
92            "CREATE TABLE table{} ({})",
93            table_number,
94            quoted_columns.join(", ")
95        );
96        conn.execute(&create_query, &[])
97            .chain_err(|| format!("Error creating the database. Used query {}", create_query))?;
98        Ok(())
99    }
100
101    fn fill_data<R>(
102        conn: &rusqlite::Connection,
103        columns: &csv::StringRecord,
104        table_number: usize,
105        batch_insert_number: usize,
106        mut reader: csv::Reader<R>,
107    ) -> Result<()>
108    where
109        R: Read,
110    {
111        let quoted_columns = escape_columns(columns);
112        let insert = format!(
113            "INSERT INTO table{} ({}) VALUES\n",
114            table_number,
115            quoted_columns.join(", ")
116        );
117
118        let mut rows: Vec<String> = vec![];
119        for (i, row) in reader.records().enumerate() {
120            let row = row.chain_err(|| "Error reading row")?;
121            let db_row = escape_values(&row);
122            rows.push(format!("({})", db_row.join(", ")));
123            if i % batch_insert_number == 0 {
124                Self::batch_insert(&conn, &insert, &mut rows)?;
125            }
126        }
127        Self::batch_insert(&conn, &insert, &mut rows)?;
128        Ok(())
129    }
130
131    /// Consume rows vector and write them into sqlite
132    fn batch_insert(
133        conn: &rusqlite::Connection,
134        insert: &str,
135        rows: &mut Vec<String>,
136    ) -> Result<()> {
137        let mut batch = insert.to_owned();
138        batch.push_str(&rows.join(",\n"));
139        conn.execute(&batch, &[])
140            .chain_err(|| "Error running insert query.")?;
141        rows.clear();
142        Ok(())
143    }
144
145    fn delimiter_to_string(&self) -> String {
146        let mut delimiter = String::new();
147        delimiter.push(self.delimiter as char);
148        delimiter
149    }
150
151    /// Run the query and write its result as CSV into the specified output stream
152    pub fn write_query_results(&mut self, query: &str) -> Result<()> {
153        let delimiter = self.delimiter_to_string();
154        let mut prepared = Self::prepare_query(&self.conn, query)?;
155        let output_error = "Error writing on selected output";
156        Self::write_headers(&prepared, &mut self.output, &output_error, &delimiter)?;
157        let mut rows = prepared
158            .query(&[])
159            .chain_err(|| "Error binding parameters")?;
160        Self::write_rows(&mut rows, &mut self.output, &output_error, &delimiter)?;
161        self.output.flush().chain_err(|| "Error writing results")?;
162        Ok(())
163    }
164
165    fn prepare_query<'a>(
166        conn: &'a rusqlite::Connection,
167        query: &str,
168    ) -> Result<rusqlite::Statement<'a>> {
169        Ok(conn
170            .prepare(query)
171            .chain_err(|| format!("Error preparing query: {}", query))?)
172    }
173
174    fn write_headers(
175        prepared: &rusqlite::Statement,
176        output: &mut W,
177        output_error: &str,
178        delimiter: &str,
179    ) -> Result<()> {
180        let columns_names = prepared
181            .column_names()
182            .iter()
183            .map(|c| format!("\"{}\"", c))
184            .collect::<Vec<String>>()
185            .join(&delimiter);
186        writeln!(output, "{}", columns_names).chain_err(|| output_error)?;
187        Ok(())
188    }
189
190    fn write_rows(
191        rows: &mut rusqlite::Rows,
192        output: &mut W,
193        output_error: &str,
194        delimiter: &str,
195    ) -> Result<()> {
196        while let Some(row) = rows.next() {
197            let row = row.chain_err(|| "Error reading results")?;
198            let output_rows = (0..row.column_count())
199                .map(|r| row.get::<usize, AllString>(r).into())
200                .map(|r| string_to_csv_output(&r))
201                .collect::<Vec<String>>()
202                .join(&delimiter);
203            writeln!(output, "{}", output_rows).chain_err(|| output_error)?;
204        }
205
206        Ok(())
207    }
208
209    pub fn dump_database<P>(&self, output: P) -> Result<()>
210    where
211        P: AsRef<Path>,
212    {
213        self.conn
214            .backup(rusqlite::DatabaseName::Main, output, None)
215            .chain_err(|| "Failed to dump database")?;
216        Ok(())
217    }
218}