Skip to main content

redshift_iam/
redshift.rs

1#[cfg(feature = "read_sql")]
2use arrow::record_batch::RecordBatch;
3#[cfg(feature = "read_sql")]
4use connectorx::errors::ConnectorXOutError;
5#[cfg(feature = "read_sql")]
6use secrecy::ExposeSecret;
7use secrecy::SecretString;
8
9/// Client for executing queries against an Amazon Redshift cluster.
10///
11/// Uses [ConnectorX](https://github.com/sfu-db/connector-x) under the hood and
12/// returns results as Arrow [`RecordBatch`]es.
13pub struct Redshift {
14    username: String,
15    password: String,
16    host: String,
17    port: Option<u16>,
18    database: String,
19    connection_string: Option<SecretString>,
20    // require_ssl: bool,
21}
22
23impl Redshift {
24    /// Creates a new `Redshift` client.
25    ///
26    /// `port` defaults to `5439` when `None`.
27    ///
28    /// # Examples
29    /// ```
30    /// use redshift_iam::Redshift;
31    /// use secrecy::ExposeSecret;
32    ///
33    /// let r = Redshift::new("alice", "s3cr3t", "my-host.example.com", None, "analytics");
34    /// let cs = r.connection_string();
35    /// assert!(cs.expose_secret().starts_with("postgresql://"));
36    /// assert!(cs.expose_secret().contains(":5439/"));
37    /// assert!(cs.expose_secret().contains("my-host.example.com"));
38    /// assert!(cs.expose_secret().contains("/analytics"));
39    /// ```
40    pub fn new(
41        username: impl ToString,
42        password: impl ToString,
43        host: impl ToString,
44        port: Option<u16>,
45        database: impl ToString,
46    ) -> Self {
47        Self {
48            username: username.to_string(),
49            password: password.to_string(),
50            host: host.to_string(),
51            port,
52            database: database.to_string(),
53            connection_string: None::<SecretString>,
54            // require_ssl: bool,
55        }
56    }
57
58    /// Builds the URL-encoded `postgresql://` connection string used by ConnectorX.
59    ///
60    /// Credentials are percent-encoded so that special characters (e.g. `@`) do not
61    /// corrupt the URL. The result is wrapped in a [`SecretString`] to prevent
62    /// accidental logging.
63    ///
64    /// # Examples
65    /// ```
66    /// use redshift_iam::Redshift;
67    /// use secrecy::ExposeSecret;
68    ///
69    /// // Special characters in passwords are percent-encoded
70    /// let r = Redshift::new("user", "p@ssword", "host", None, "db");
71    /// let cs = r.connection_string();
72    /// assert!(cs.expose_secret().contains("p%40ssword"));
73    /// assert!(cs.expose_secret().contains("cxprotocol=cursor"));
74    /// ```
75    pub fn connection_string(&self) -> SecretString {
76        if let Some(connection_string) = &self.connection_string {
77            return connection_string.clone();
78        }
79        let uri = format!(
80            "postgresql://{}:{}/{}?cxprotocol=cursor",
81            self.host,
82            self.port.unwrap_or(5439),
83            self.database,
84        );
85        let mut redshift_url = reqwest::Url::parse(&uri).unwrap();
86        // URL-encode credentials
87        redshift_url.set_username(&self.username).unwrap();
88        redshift_url.set_password(Some(&self.password)).unwrap();
89
90        SecretString::new(redshift_url.as_str().to_string().into_boxed_str())
91    }
92
93    /// Executes `query` and returns the results as a `Vec<RecordBatch>`.
94    #[cfg(feature = "read_sql")]
95    pub fn execute(&self, query: impl ToString) -> Result<Vec<RecordBatch>, ConnectorXOutError> {
96        // could make more flexible by letting user specify output format
97        let destination = connectorx::get_arrow::get_arrow(
98            &connectorx::source_router::SourceConn::try_from(
99                self.connection_string().expose_secret(),
100            )
101            .unwrap(),
102            None,
103            &[connectorx::sql::CXQuery::Naked(query.to_string())],
104            None,
105        )?;
106        Ok(destination.arrow()?)
107    }
108}