planetscale_driver/
connections.rs

1use crate::{
2    request::{post, post_raw},
3    structs::{ExecuteRequest, ExecuteResponse, Session},
4    utils::to_base64,
5};
6use anyhow::Result;
7use std::{env, sync::Arc};
8use tokio::sync::Mutex;
9
10#[derive(Clone)]
11pub struct PSConnection {
12    pub(crate) host: String,
13    pub(crate) auth: String,
14    pub session: Arc<Mutex<Option<Session>>>,
15    pub client: reqwest::Client,
16}
17
18impl PSConnection {
19    /// Creates a new connection
20    pub fn new(host: &str, username: &str, password: &str) -> Self {
21        Self {
22            host: host.into(),
23            auth: format!("Basic {}", to_base64(&format!("{}:{}", username, password))),
24            session: Arc::new(Mutex::new(None)),
25            client: reqwest::Client::new(),
26        }
27    }
28
29    /// Creates a new connection from the environment variables (DATABASE_HOST, DATABASE_USERNAME, DATABASE_PASSWORD)
30    pub fn new_from_env() -> Result<Self> {
31        Ok(PSConnection::new(
32            &env::var("DATABASE_HOST")?,
33            &env::var("DATABASE_USERNAME")?,
34            &env::var("DATABASE_PASSWORD")?,
35        ))
36    }
37
38    /// Execute a SQL query
39    pub async fn execute(&self, query: &str) -> Result<()> {
40        self.execute_raw(query).await?;
41        Ok(())
42    }
43
44    /// Execute a SQL query and return the raw response
45    pub async fn execute_raw(&self, query: &str) -> Result<ExecuteResponse> {
46        let url = format!("https://{}/psdb.v1alpha1.Database/Execute", self.host);
47        let sql = ExecuteRequest {
48            query: query.into(),
49            session: self.session.lock().await.clone(),
50        };
51
52        let res: ExecuteResponse = post(self, &url, sql).await?;
53        self.session.lock().await.replace(res.session.clone());
54
55        if let Some(err) = res.error {
56            anyhow::bail!("Code: \"{}\", message: \"{}\"", err.code, err.message);
57        }
58
59        Ok(res)
60    }
61
62    /// As the name suggests, this function is making a transaction
63    pub async fn transaction<F, Fut>(&self, f: F) -> Result<()>
64    where
65        F: FnOnce(Self) -> Fut,
66        Fut: std::future::Future<Output = Result<()>>,
67    {
68        self.execute("BEGIN").await?;
69        let res = f(self.clone()).await;
70        if res.is_err() {
71            self.execute("ROLLBACK").await?;
72            return res;
73        }
74
75        self.execute("COMMIT").await?;
76        Ok(())
77    }
78
79    /// Refreshes the session
80    pub async fn refresh(&self) -> Result<()> {
81        let url = format!("https://{}/psdb.v1alpha1.Database/CreateSession", self.host);
82        let res: ExecuteResponse = post_raw(self, &url, String::from("{}")).await?;
83        self.session.lock().await.replace(res.session.clone());
84
85        Ok(())
86    }
87}