redshift_iam/redshift.rs
1#[cfg(feature = "read_sql")]
2use arrow::record_batch::RecordBatch;
3#[cfg(feature = "read_sql")]
4use connectorx::errors::ConnectorXOutError;
5#[cfg(feature = "read_sql")]
6use secrecy::ExposeSecret;
7use secrecy::SecretString;
8
/// Client for executing queries against an Amazon Redshift cluster.
///
/// Uses [ConnectorX](https://github.com/sfu-db/connector-x) under the hood and
/// returns results as Arrow [`RecordBatch`]es.
///
/// The struct only stores connection parameters; the `postgresql://` URL is
/// assembled on demand by `connection_string`. Query execution (`execute`) is
/// compiled only when the `read_sql` feature is enabled.
pub struct Redshift {
    /// User name placed in the user-info part of the connection URL.
    username: String,
    /// Password placed in the user-info part of the connection URL.
    // NOTE(review): kept as a plain `String`, unlike `connection_string`,
    // which is wrapped in `SecretString`.
    password: String,
    /// Host of the cluster endpoint.
    host: String,
    /// TCP port; `None` means the default `5439` is used when the URL is built.
    port: Option<u16>,
    /// Name of the database, used as the path component of the URL.
    database: String,
    /// Pre-built connection string. When `Some`, `connection_string` returns a
    /// clone of it instead of building a URL from the fields above. `new`
    /// initialises it to `None`.
    connection_string: Option<SecretString>,
    // NOTE(review): placeholder for an SSL toggle that is not implemented.
    // require_ssl: bool,
}
22
23impl Redshift {
24 /// Creates a new `Redshift` client.
25 ///
26 /// `port` defaults to `5439` when `None`.
27 ///
28 /// # Examples
29 /// ```
30 /// use redshift_iam::Redshift;
31 /// use secrecy::ExposeSecret;
32 ///
33 /// let r = Redshift::new("alice", "s3cr3t", "my-host.example.com", None, "analytics");
34 /// let cs = r.connection_string();
35 /// assert!(cs.expose_secret().starts_with("postgresql://"));
36 /// assert!(cs.expose_secret().contains(":5439/"));
37 /// assert!(cs.expose_secret().contains("my-host.example.com"));
38 /// assert!(cs.expose_secret().contains("/analytics"));
39 /// ```
40 pub fn new(
41 username: impl ToString,
42 password: impl ToString,
43 host: impl ToString,
44 port: Option<u16>,
45 database: impl ToString,
46 ) -> Self {
47 Self {
48 username: username.to_string(),
49 password: password.to_string(),
50 host: host.to_string(),
51 port,
52 database: database.to_string(),
53 connection_string: None::<SecretString>,
54 // require_ssl: bool,
55 }
56 }
57
58 /// Builds the URL-encoded `postgresql://` connection string used by ConnectorX.
59 ///
60 /// Credentials are percent-encoded so that special characters (e.g. `@`) do not
61 /// corrupt the URL. The result is wrapped in a [`SecretString`] to prevent
62 /// accidental logging.
63 ///
64 /// # Examples
65 /// ```
66 /// use redshift_iam::Redshift;
67 /// use secrecy::ExposeSecret;
68 ///
69 /// // Special characters in passwords are percent-encoded
70 /// let r = Redshift::new("user", "p@ssword", "host", None, "db");
71 /// let cs = r.connection_string();
72 /// assert!(cs.expose_secret().contains("p%40ssword"));
73 /// assert!(cs.expose_secret().contains("cxprotocol=cursor"));
74 /// ```
75 pub fn connection_string(&self) -> SecretString {
76 if let Some(connection_string) = &self.connection_string {
77 return connection_string.clone();
78 }
79 let uri = format!(
80 "postgresql://{}:{}/{}?cxprotocol=cursor",
81 self.host,
82 self.port.unwrap_or(5439),
83 self.database,
84 );
85 let mut redshift_url = reqwest::Url::parse(&uri).unwrap();
86 // URL-encode credentials
87 redshift_url.set_username(&self.username).unwrap();
88 redshift_url.set_password(Some(&self.password)).unwrap();
89
90 SecretString::new(redshift_url.as_str().to_string().into_boxed_str())
91 }
92
93 /// Executes `query` and returns the results as a `Vec<RecordBatch>`.
94 #[cfg(feature = "read_sql")]
95 pub fn execute(&self, query: impl ToString) -> Result<Vec<RecordBatch>, ConnectorXOutError> {
96 // could make more flexible by letting user specify output format
97 let destination = connectorx::get_arrow::get_arrow(
98 &connectorx::source_router::SourceConn::try_from(
99 self.connection_string().expose_secret(),
100 )
101 .unwrap(),
102 None,
103 &[connectorx::sql::CXQuery::Naked(query.to_string())],
104 None,
105 )?;
106 Ok(destination.arrow()?)
107 }
108}