Skip to main content

redshift_iam/
redshift.rs

1use arrow::record_batch::RecordBatch;
2use connectorx::errors::ConnectorXOutError;
3use secrecy::{ExposeSecret, SecretString};
4
5/// Client for executing queries against an Amazon Redshift cluster.
6///
7/// Uses [ConnectorX](https://github.com/sfu-db/connector-x) under the hood and
8/// returns results as Arrow [`RecordBatch`]es.
9pub struct Redshift {
10    username: String,
11    password: String,
12    host: String,
13    port: Option<u16>,
14    database: String,
15    connection_string: Option<SecretString>,
16    // require_ssl: bool,
17}
18
19impl Redshift {
20    /// Creates a new `Redshift` client.
21    ///
22    /// `port` defaults to `5439` when `None`.
23    ///
24    /// # Examples
25    /// ```
26    /// use redshift_iam::Redshift;
27    /// use secrecy::ExposeSecret;
28    ///
29    /// let r = Redshift::new("alice", "s3cr3t", "my-host.example.com", None, "analytics");
30    /// let cs = r.connection_string();
31    /// assert!(cs.expose_secret().starts_with("postgresql://"));
32    /// assert!(cs.expose_secret().contains(":5439/"));
33    /// assert!(cs.expose_secret().contains("my-host.example.com"));
34    /// assert!(cs.expose_secret().contains("/analytics"));
35    /// ```
36    pub fn new(
37        username: impl ToString,
38        password: impl ToString,
39        host: impl ToString,
40        port: Option<u16>,
41        database: impl ToString,
42    ) -> Self {
43        Self {
44            username: username.to_string(),
45            password: password.to_string(),
46            host: host.to_string(),
47            port,
48            database: database.to_string(),
49            connection_string: None::<SecretString>,
50            // require_ssl: bool,
51        }
52    }
53
54    /// Builds the URL-encoded `postgresql://` connection string used by ConnectorX.
55    ///
56    /// Credentials are percent-encoded so that special characters (e.g. `@`) do not
57    /// corrupt the URL. The result is wrapped in a [`SecretString`] to prevent
58    /// accidental logging.
59    ///
60    /// # Examples
61    /// ```
62    /// use redshift_iam::Redshift;
63    /// use secrecy::ExposeSecret;
64    ///
65    /// // Special characters in passwords are percent-encoded
66    /// let r = Redshift::new("user", "p@ssword", "host", None, "db");
67    /// let cs = r.connection_string();
68    /// assert!(cs.expose_secret().contains("p%40ssword"));
69    /// assert!(cs.expose_secret().contains("cxprotocol=cursor"));
70    /// ```
71    pub fn connection_string(&self) -> SecretString {
72        if let Some(connection_string) = &self.connection_string {
73            return connection_string.clone();
74        }
75        let uri = format!(
76            "postgresql://{}:{}/{}?cxprotocol=cursor",
77            self.host,
78            self.port.unwrap_or(5439),
79            self.database,
80        );
81        let mut redshift_url = reqwest::Url::parse(&uri).unwrap();
82        // URL-encode credentials
83        redshift_url.set_username(&self.username).unwrap();
84        redshift_url.set_password(Some(&self.password)).unwrap();
85
86        SecretString::new(redshift_url.as_str().to_string().into_boxed_str())
87    }
88
89    /// Executes `query` and returns the results as a `Vec<RecordBatch>`.
90    pub fn execute(&self, query: impl ToString) -> Result<Vec<RecordBatch>, ConnectorXOutError> {
91        // could make more flexible by letting user specify output format
92        let destination = connectorx::get_arrow::get_arrow(
93            &connectorx::source_router::SourceConn::try_from(
94                self.connection_string().expose_secret(),
95            )
96            .unwrap(),
97            None,
98            &[connectorx::sql::CXQuery::Naked(query.to_string())],
99            None,
100        )?;
101        Ok(destination.arrow()?)
102    }
103}