1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
//! Implementation of `BigQueryLocator::write_remote_data`.
use std::{
fs::File,
process::{Command, Stdio},
};
use tempdir::TempDir;
use tokio::io;
use tokio_process::CommandExt;
use super::BigQueryLocator;
use crate::common::*;
use crate::drivers::{
bigquery_shared::{if_exists_to_bq_load_arg, BqTable, TableBigQueryExt, Usage},
gs::GsLocator,
};
/// Copy `source` to `dest` using `schema`.
///
/// The function `BigQueryLocator::write_remote_data` isn't (yet) allowed to be
/// async, because it's part of a trait. This version is an `async fn`, which
/// makes the code much clearer.
///
/// Overall flow:
///
/// 1. `bq load` the CSV data from `gs://` into either the final table or a
///    temporary table (the latter when the schema needs post-load fixups or
///    we're doing an upsert).
/// 2. If we used a temp table, run a `bq query` (plain import or `MERGE`)
///    to build the final table, then `bq rm` the temp table.
///
/// Returns the destination locator on success. Errors if `source` is not a
/// `gs://` locator, or if any `bq` subprocess fails to start or exits
/// non-zero.
pub(crate) async fn write_remote_data_helper(
    ctx: Context,
    source: BoxLocator,
    dest: BigQueryLocator,
    shared_args: SharedArguments<Unverified>,
    source_args: SourceArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<Vec<BoxLocator>> {
    // Convert the source locator into the underlying `gs://` URL. This is a bit
    // fiddly because we're downcasting `source` and relying on knowledge about
    // the `GsLocator` type, and Rust doesn't make that especially easy.
    let mut source_url = source
        .as_any()
        .downcast_ref::<GsLocator>()
        .ok_or_else(|| format_err!("not a gs:// locator: {}", source))?
        .as_url()
        .to_owned();

    // Verify our arguments.
    let shared_args = shared_args.verify(BigQueryLocator::features())?;
    let _source_args = source_args.verify(Features::empty())?;
    let dest_args = dest_args.verify(BigQueryLocator::features())?;

    // Get the arguments we care about.
    let schema = shared_args.schema();
    let temporary_storage = shared_args.temporary_storage();
    let if_exists = dest_args.if_exists();

    // If our URL looks like a directory, add a glob.
    //
    // TODO: Is this the right way to default this? Or should we make users
    // always specify `*.csv`? This should probably be part of some larger
    // `dbcrossbar` property. Elsewhere, we're trying to default to adding
    // `**/*.csv`, but that's not supported by BigQuery.
    if source_url.as_str().ends_with('/') {
        source_url = source_url.join("*.csv")?;
    }
    let ctx = ctx.child(o!("source_url" => source_url.as_str().to_owned()));

    // Decide if we need to use a temp table: either the schema contains types
    // BigQuery can't load directly from CSV, or we need a `MERGE` for upsert.
    let use_temp = !schema.bigquery_can_import_from_csv()? || if_exists.is_upsert();
    let initial_table_name = if use_temp {
        let initial_table_name =
            dest.table_name.temporary_table_name(temporary_storage)?;
        debug!(
            ctx.log(),
            "loading into temporary table {}", initial_table_name
        );
        initial_table_name
    } else {
        let initial_table_name = dest.table_name.clone();
        debug!(
            ctx.log(),
            "loading directly into final table {}", initial_table_name,
        );
        initial_table_name
    };

    // Build the information we'll need about our initial table.
    let initial_table = BqTable::for_table_name_and_columns(
        initial_table_name,
        &schema.columns,
        // `CsvLoad` generates a load-friendly intermediate schema that we'll
        // transform into the final one with SQL later.
        if use_temp {
            Usage::CsvLoad
        } else {
            Usage::FinalTable
        },
    )?;

    // Write our schema to a temp file. This actually needs to be somewhere on
    // disk, and `bq` uses various hueristics to detect that it's a file
    // containing a schema, and not just a string with schema text. (Note this
    // code is synchronous, but that's not a huge deal.)
    //
    // We use `use_temp` to decide whether to generate the final schema or a
    // temporary schema that we'll fix later.
    let tmp_dir = TempDir::new("bq_load")?;
    let initial_schema_path = tmp_dir.path().join("schema.json");
    let mut initial_schema_file = File::create(&initial_schema_path)?;
    initial_table.write_json_schema(&mut initial_schema_file)?;

    // Decide how to handle overwrites of the initial table. A temp table is
    // always safe to replace; otherwise honor the user's `--if-exists`.
    let initial_table_replace = if use_temp {
        "--replace"
    } else {
        if_exists_to_bq_load_arg(&if_exists)?
    };

    // Build and run a `bq load` command.
    debug!(ctx.log(), "running `bq load`");
    let load_child = Command::new("bq")
        // These arguments can all be represented as UTF-8 `&str`.
        .args(&[
            "load",
            "--headless",
            "--skip_leading_rows=1",
            &format!("--project_id={}", dest.project()),
            initial_table_replace,
            &initial_table.name().to_string(),
            source_url.as_str(),
        ])
        // Throw away stdout so it doesn't corrupt our output.
        .stdout(Stdio::null())
        // This argument is a path, and so it might contain non-UTF-8
        // characters. We pass it separately because Rust won't allow us to
        // create an array of mixed strings and paths.
        .arg(&initial_schema_path)
        .spawn_async()
        .context("error starting `bq load`")?;
    let status = load_child
        .compat()
        .await
        .context("error running `bq load`")?;
    if !status.success() {
        return Err(format_err!("`bq load` failed with {}", status));
    }

    // If `use_temp` is false, then we're done. Otherwise, run the update SQL to
    // build the final table (if needed).
    if use_temp {
        // Build a `BqTable` for our final table.
        let dest_table = BqTable::for_table_name_and_columns(
            dest.table_name.clone(),
            &schema.columns,
            Usage::FinalTable,
        )?;
        debug!(
            ctx.log(),
            "transforming data into final table {}",
            dest_table.name(),
        );

        // If we're doing an upsert, make sure the destination table exists,
        // because `MERGE` requires an existing target.
        if if_exists.is_upsert() {
            debug!(ctx.log(), "making sure table {} exists", dest_table.name(),);
            // Use a distinct file name here: `schema.json` in the same temp
            // dir already holds the *initial* (load) schema, and silently
            // clobbering it would break anything that rereads it later.
            let dest_schema_path = tmp_dir.path().join("dest_schema.json");
            let mut dest_schema_file = File::create(&dest_schema_path)?;
            dest_table.write_json_schema(&mut dest_schema_file)?;
            let mk_child = Command::new("bq")
                // Use `--force` to ignore existing tables.
                .args(&[
                    "mk",
                    "--headless",
                    "--force",
                    "--schema",
                    // --project_id actually makes this fail for some reason.
                ])
                // Pass separately, because paths may not be UTF-8.
                .arg(&dest_schema_path)
                .arg(&dest_table.name().to_string())
                // Throw away stdout so it doesn't corrupt our output.
                .stdout(Stdio::null())
                .spawn_async()
                .context("error starting `bq mk`")?;
            let status = mk_child.compat().await.context("error running `bq mk`")?;
            if !status.success() {
                return Err(format_err!("`bq mk` failed with {}", status));
            }
        }

        // Generate our import query: a `MERGE` for upserts, or a plain
        // transforming `SELECT` otherwise.
        let mut query = Vec::new();
        if let IfExists::Upsert(merge_keys) = &if_exists {
            dest_table.write_merge_sql(
                initial_table.name(),
                &merge_keys[..],
                &mut query,
            )?;
        } else {
            dest_table.write_import_sql(initial_table.name(), &mut query)?;
        }
        debug!(ctx.log(), "import sql: {}", String::from_utf8_lossy(&query));

        // Pipe our query text to `bq query`.
        debug!(ctx.log(), "running `bq query`");
        let mut query_command = Command::new("bq");
        query_command
            // We'll pass the query on `stdin`.
            .stdin(Stdio::piped())
            // Throw away stdout so it doesn't corrupt our output.
            .stdout(Stdio::null())
            // Run query with no output.
            .args(&[
                "query",
                "--headless",
                "--format=none",
                if_exists_to_bq_load_arg(&if_exists)?,
                "--nouse_legacy_sql",
                &format!("--project_id={}", dest.project()),
            ]);
        // A `MERGE` writes into its target itself; only non-upsert imports
        // need an explicit destination table.
        if !if_exists.is_upsert() {
            query_command.arg(&format!("--destination_table={}", dest_table.name()));
        }
        let mut query_child = query_command
            .spawn_async()
            .context("error starting `bq query`")?;
        let child_stdin = query_child
            .stdin()
            .take()
            .expect("don't have stdio that we requested");
        io::write_all(child_stdin, query)
            .compat()
            .await
            .context("error piping query to `bq query`")?;
        let status = query_child
            .compat()
            .await
            .context("error running `bq query`")?;
        if !status.success() {
            return Err(format_err!("`bq query` failed with {}", status));
        }

        // Delete temp table.
        debug!(
            ctx.log(),
            "deleting import temp table: {}",
            initial_table.name()
        );
        let rm_child = Command::new("bq")
            .args(&[
                "rm",
                "--headless",
                "-f",
                "-t",
                &format!("--project_id={}", dest.project()),
                &initial_table.name().to_string(),
            ])
            // Throw away stdout so it doesn't corrupt our output.
            .stdout(Stdio::null())
            .spawn_async()
            .context("error starting `bq rm`")?;
        let status = rm_child.compat().await.context("error running `bq rm`")?;
        if !status.success() {
            return Err(format_err!("`bq rm` failed with {}", status));
        }
    }

    Ok(vec![dest.boxed()])
}