1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use super::{credentials_sql, RedshiftLocator};
use crate::common::*;
use crate::drivers::{
postgres::{connect, prepare_table},
postgres_shared::{pg_quote, PgCreateTable, TableName},
s3::S3Locator,
};
use crate::schema::{Column, DataType};
/// Copy CSV data already stored in S3 into a Redshift table using the
/// server-side `COPY` command, so the data never flows through this process.
///
/// `source` must be an `s3://` locator (anything else is an error), and
/// `dest` names the Redshift table to load. On success, returns the
/// destination locator in a one-element `Vec`.
///
/// Errors if the source is not S3, if argument verification fails, if the
/// schema contains types Redshift cannot import from CSV, or if the
/// connection / `COPY` statement fails.
pub(crate) async fn write_remote_data_helper(
    ctx: Context,
    source: BoxLocator,
    dest: RedshiftLocator,
    shared_args: SharedArguments<Unverified>,
    source_args: SourceArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<Vec<BoxLocator>> {
    // `COPY` can only read from S3, so reject any other locator type up front.
    let source_url = source
        .as_any()
        .downcast_ref::<S3Locator>()
        .ok_or_else(|| format_err!("not a s3:// locator: {}", source))?
        .as_url()
        .to_owned();
    let ctx = ctx.child(o!("source_url" => source_url.as_str().to_owned()));

    // Verify our arguments against the features this driver supports.
    // The source takes no driver-specific arguments, hence `Features::empty()`.
    let shared_args = shared_args.verify(RedshiftLocator::features())?;
    let _source_args = source_args.verify(Features::empty())?;
    let dest_args = dest_args.verify(RedshiftLocator::features())?;

    // Look up our arguments.
    let schema = shared_args.schema();
    let to_args = dest_args.driver_args();
    let if_exists = dest_args.if_exists().to_owned();

    // Fail early if the schema uses types `COPY ... FORMAT CSV` can't load.
    schema.verify_redshift_can_import_from_csv()?;

    // Create (or replace/verify) the destination table as `if_exists` demands.
    let table_name = dest.table_name();
    let pg_create_table =
        PgCreateTable::from_name_and_columns(table_name.to_owned(), &schema.columns)?;
    let mut client = connect(ctx.clone(), dest.url().to_owned()).await?;
    prepare_table(&ctx, &mut client, pg_create_table.clone(), &if_exists).await?;

    // Ask Redshift to pull the CSV directly from S3. `IGNOREHEADER 1` skips
    // the header row our CSV exports always include.
    let copy_sql = format!(
        "COPY {dest} FROM {source}\n{credentials}FORMAT CSV\nIGNOREHEADER 1",
        dest = TableName(table_name),
        source = pg_quote(source_url.as_str()),
        credentials = credentials_sql(to_args)?,
    );
    // BUG FIX: `&copy_sql` / `&copy_stmt` had been mangled into the HTML
    // entity `©` ("©_sql" / "©_stmt"), which does not compile.
    let copy_stmt = client.prepare(&copy_sql).compat().await?;
    client
        .execute(&copy_stmt, &[])
        .compat()
        .await
        .with_context(|_| {
            format!("error copying {} from {}", pg_create_table.name, source_url)
        })?;
    Ok(vec![dest.boxed()])
}
/// Extension trait: can this schema element be loaded into Redshift via
/// `COPY ... FORMAT CSV`? Implemented for `Table`, `Column` and `DataType`.
trait VerifyRedshiftCanImportFromCsv {
    /// Return `Ok(())` if Redshift can import this from CSV, or an error
    /// describing the unsupported element otherwise.
    fn verify_redshift_can_import_from_csv(&self) -> Result<()>;
}
impl VerifyRedshiftCanImportFromCsv for Table {
    /// A table is importable iff every one of its columns is; the first
    /// unsupported column short-circuits with its error.
    fn verify_redshift_can_import_from_csv(&self) -> Result<()> {
        self.columns
            .iter()
            .try_for_each(|column| column.verify_redshift_can_import_from_csv())
    }
}
impl VerifyRedshiftCanImportFromCsv for Column {
    /// A column is importable iff its data type is; any type error is
    /// wrapped with the offending column's name for easier diagnosis.
    fn verify_redshift_can_import_from_csv(&self) -> Result<()> {
        let type_check = self.data_type.verify_redshift_can_import_from_csv();
        type_check.with_context(|_| format!("cannot import column {:?}", self.name))?;
        Ok(())
    }
}
impl VerifyRedshiftCanImportFromCsv for DataType {
    /// Whitelist of data types the Redshift `COPY ... FORMAT CSV` path
    /// supports. The match is deliberately exhaustive (no `_` arm) so that
    /// adding a new `DataType` variant forces a decision here at compile time.
    fn verify_redshift_can_import_from_csv(&self) -> Result<()> {
        match self {
            // Types with no CSV import support in this driver.
            DataType::Array(_)
            | DataType::Decimal
            | DataType::GeoJson(_)
            | DataType::Json
            | DataType::Other(_)
            | DataType::Uuid => Err(format_err!(
                "Redshift driver does not support data type {:?}",
                self
            )),
            // Scalar types Redshift can load directly from CSV.
            DataType::Bool
            | DataType::Date
            | DataType::Float32
            | DataType::Float64
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Text
            | DataType::TimestampWithoutTimeZone
            | DataType::TimestampWithTimeZone => Ok(()),
        }
    }
}