1use std::io::{BufRead, Read, Write};
2use std::path::Path;
3
4use csv;
5use rusqlite;
6
7use csv_utils::string_to_csv_output;
8use db_utils::{escape_columns, escape_values, AllString};
9use errors::{Result, ResultExt};
10
11pub struct Executor<W: Write> {
13 conn: rusqlite::Connection,
14 output: W,
15 delimiter: u8,
16}
17
18impl<W> Executor<W>
19where
20 W: Write,
21{
22 pub fn new<R>(
23 readers: Vec<R>,
24 output: W,
25 delimiter: u8,
26 batch_insert_number: usize,
27 ) -> Result<Executor<W>>
28 where
29 R: BufRead,
30 {
31 let conn = Self::create_database()?;
32 Self::process_csv_files(readers, delimiter, batch_insert_number, &conn)?;
33 Ok(Executor {
34 conn,
35 output,
36 delimiter,
37 })
38 }
39
40 fn create_database() -> Result<rusqlite::Connection> {
41 Ok(rusqlite::Connection::open_in_memory().chain_err(|| "Opening memory database.")?)
42 }
43
44 fn process_csv_files<R>(
45 readers: Vec<R>,
46 delimiter: u8,
47 batch_insert_number: usize,
48 conn: &rusqlite::Connection,
49 ) -> Result<()>
50 where
51 R: Read,
52 {
53 for (i, reader) in readers.into_iter().enumerate() {
54 let table_number = i + 1;
55 let mut csv_reader = csv::ReaderBuilder::new()
56 .delimiter(delimiter)
57 .from_reader(reader);
58
59 let columns = Self::get_csv_columns(&mut csv_reader)?;
60 Self::create_table(&conn, &columns, table_number)?;
61 Self::fill_data(
62 &conn,
63 &columns,
64 table_number,
65 batch_insert_number,
66 csv_reader,
67 )?;
68 }
69 Ok(())
70 }
71
72 fn get_csv_columns<R>(csv_reader: &mut csv::Reader<R>) -> Result<csv::StringRecord>
73 where
74 R: Read,
75 {
76 Ok(csv_reader
77 .headers()
78 .chain_err(|| "Reading headers")?
79 .clone())
80 }
81
82 fn create_table(
83 conn: &rusqlite::Connection,
84 columns: &csv::StringRecord,
85 table_number: usize,
86 ) -> Result<()> {
87 let quoted_columns: Vec<String> = columns
88 .iter()
89 .map(|c| format!("\"{}\" VARCHAR NULL", c))
90 .collect();
91 let create_query = format!(
92 "CREATE TABLE table{} ({})",
93 table_number,
94 quoted_columns.join(", ")
95 );
96 conn.execute(&create_query, &[])
97 .chain_err(|| format!("Error creating the database. Used query {}", create_query))?;
98 Ok(())
99 }
100
101 fn fill_data<R>(
102 conn: &rusqlite::Connection,
103 columns: &csv::StringRecord,
104 table_number: usize,
105 batch_insert_number: usize,
106 mut reader: csv::Reader<R>,
107 ) -> Result<()>
108 where
109 R: Read,
110 {
111 let quoted_columns = escape_columns(columns);
112 let insert = format!(
113 "INSERT INTO table{} ({}) VALUES\n",
114 table_number,
115 quoted_columns.join(", ")
116 );
117
118 let mut rows: Vec<String> = vec![];
119 for (i, row) in reader.records().enumerate() {
120 let row = row.chain_err(|| "Error reading row")?;
121 let db_row = escape_values(&row);
122 rows.push(format!("({})", db_row.join(", ")));
123 if i % batch_insert_number == 0 {
124 Self::batch_insert(&conn, &insert, &mut rows)?;
125 }
126 }
127 Self::batch_insert(&conn, &insert, &mut rows)?;
128 Ok(())
129 }
130
131 fn batch_insert(
133 conn: &rusqlite::Connection,
134 insert: &str,
135 rows: &mut Vec<String>,
136 ) -> Result<()> {
137 let mut batch = insert.to_owned();
138 batch.push_str(&rows.join(",\n"));
139 conn.execute(&batch, &[])
140 .chain_err(|| "Error running insert query.")?;
141 rows.clear();
142 Ok(())
143 }
144
145 fn delimiter_to_string(&self) -> String {
146 let mut delimiter = String::new();
147 delimiter.push(self.delimiter as char);
148 delimiter
149 }
150
151 pub fn write_query_results(&mut self, query: &str) -> Result<()> {
153 let delimiter = self.delimiter_to_string();
154 let mut prepared = Self::prepare_query(&self.conn, query)?;
155 let output_error = "Error writing on selected output";
156 Self::write_headers(&prepared, &mut self.output, &output_error, &delimiter)?;
157 let mut rows = prepared
158 .query(&[])
159 .chain_err(|| "Error binding parameters")?;
160 Self::write_rows(&mut rows, &mut self.output, &output_error, &delimiter)?;
161 self.output.flush().chain_err(|| "Error writing results")?;
162 Ok(())
163 }
164
165 fn prepare_query<'a>(
166 conn: &'a rusqlite::Connection,
167 query: &str,
168 ) -> Result<rusqlite::Statement<'a>> {
169 Ok(conn
170 .prepare(query)
171 .chain_err(|| format!("Error preparing query: {}", query))?)
172 }
173
174 fn write_headers(
175 prepared: &rusqlite::Statement,
176 output: &mut W,
177 output_error: &str,
178 delimiter: &str,
179 ) -> Result<()> {
180 let columns_names = prepared
181 .column_names()
182 .iter()
183 .map(|c| format!("\"{}\"", c))
184 .collect::<Vec<String>>()
185 .join(&delimiter);
186 writeln!(output, "{}", columns_names).chain_err(|| output_error)?;
187 Ok(())
188 }
189
190 fn write_rows(
191 rows: &mut rusqlite::Rows,
192 output: &mut W,
193 output_error: &str,
194 delimiter: &str,
195 ) -> Result<()> {
196 while let Some(row) = rows.next() {
197 let row = row.chain_err(|| "Error reading results")?;
198 let output_rows = (0..row.column_count())
199 .map(|r| row.get::<usize, AllString>(r).into())
200 .map(|r| string_to_csv_output(&r))
201 .collect::<Vec<String>>()
202 .join(&delimiter);
203 writeln!(output, "{}", output_rows).chain_err(|| output_error)?;
204 }
205
206 Ok(())
207 }
208
209 pub fn dump_database<P>(&self, output: P) -> Result<()>
210 where
211 P: AsRef<Path>,
212 {
213 self.conn
214 .backup(rusqlite::DatabaseName::Main, output, None)
215 .chain_err(|| "Failed to dump database")?;
216 Ok(())
217 }
218}