redshift_iam/redshift.rs
1use arrow::record_batch::RecordBatch;
2use connectorx::errors::ConnectorXOutError;
3use secrecy::{ExposeSecret, SecretString};
4
5/// Client for executing queries against an Amazon Redshift cluster.
6///
7/// Uses [ConnectorX](https://github.com/sfu-db/connector-x) under the hood and
8/// returns results as Arrow [`RecordBatch`]es.
9pub struct Redshift {
10 username: String,
11 password: String,
12 host: String,
13 port: Option<u16>,
14 database: String,
15 connection_string: Option<SecretString>,
16 // require_ssl: bool,
17}
18
19impl Redshift {
20 /// Creates a new `Redshift` client.
21 ///
22 /// `port` defaults to `5439` when `None`.
23 ///
24 /// # Examples
25 /// ```
26 /// use redshift_iam::Redshift;
27 /// use secrecy::ExposeSecret;
28 ///
29 /// let r = Redshift::new("alice", "s3cr3t", "my-host.example.com", None, "analytics");
30 /// let cs = r.connection_string();
31 /// assert!(cs.expose_secret().starts_with("postgresql://"));
32 /// assert!(cs.expose_secret().contains(":5439/"));
33 /// assert!(cs.expose_secret().contains("my-host.example.com"));
34 /// assert!(cs.expose_secret().contains("/analytics"));
35 /// ```
36 pub fn new(
37 username: impl ToString,
38 password: impl ToString,
39 host: impl ToString,
40 port: Option<u16>,
41 database: impl ToString,
42 ) -> Self {
43 Self {
44 username: username.to_string(),
45 password: password.to_string(),
46 host: host.to_string(),
47 port,
48 database: database.to_string(),
49 connection_string: None::<SecretString>,
50 // require_ssl: bool,
51 }
52 }
53
54 /// Builds the URL-encoded `postgresql://` connection string used by ConnectorX.
55 ///
56 /// Credentials are percent-encoded so that special characters (e.g. `@`) do not
57 /// corrupt the URL. The result is wrapped in a [`SecretString`] to prevent
58 /// accidental logging.
59 ///
60 /// # Examples
61 /// ```
62 /// use redshift_iam::Redshift;
63 /// use secrecy::ExposeSecret;
64 ///
65 /// // Special characters in passwords are percent-encoded
66 /// let r = Redshift::new("user", "p@ssword", "host", None, "db");
67 /// let cs = r.connection_string();
68 /// assert!(cs.expose_secret().contains("p%40ssword"));
69 /// assert!(cs.expose_secret().contains("cxprotocol=cursor"));
70 /// ```
71 pub fn connection_string(&self) -> SecretString {
72 if let Some(connection_string) = &self.connection_string {
73 return connection_string.clone();
74 }
75 let uri = format!(
76 "postgresql://{}:{}/{}?cxprotocol=cursor",
77 self.host,
78 self.port.unwrap_or(5439),
79 self.database,
80 );
81 let mut redshift_url = reqwest::Url::parse(&uri).unwrap();
82 // URL-encode credentials
83 redshift_url.set_username(&self.username).unwrap();
84 redshift_url.set_password(Some(&self.password)).unwrap();
85
86 SecretString::new(redshift_url.as_str().to_string().into_boxed_str())
87 }
88
89 /// Executes `query` and returns the results as a `Vec<RecordBatch>`.
90 pub fn execute(&self, query: impl ToString) -> Result<Vec<RecordBatch>, ConnectorXOutError> {
91 // could make more flexible by letting user specify output format
92 let destination = connectorx::get_arrow::get_arrow(
93 &connectorx::source_router::SourceConn::try_from(
94 self.connection_string().expose_secret(),
95 )
96 .unwrap(),
97 None,
98 &[connectorx::sql::CXQuery::Naked(query.to_string())],
99 None,
100 )?;
101 Ok(destination.arrow()?)
102 }
103}