planetscale_driver/
connections.rs1use crate::{
2 request::{post, post_raw},
3 structs::{ExecuteRequest, ExecuteResponse, Session},
4 utils::to_base64,
5};
6use anyhow::Result;
7use std::{env, sync::Arc};
8use tokio::sync::Mutex;
9
10#[derive(Clone)]
11pub struct PSConnection {
12 pub(crate) host: String,
13 pub(crate) auth: String,
14 pub session: Arc<Mutex<Option<Session>>>,
15 pub client: reqwest::Client,
16}
17
18impl PSConnection {
19 pub fn new(host: &str, username: &str, password: &str) -> Self {
21 Self {
22 host: host.into(),
23 auth: format!("Basic {}", to_base64(&format!("{}:{}", username, password))),
24 session: Arc::new(Mutex::new(None)),
25 client: reqwest::Client::new(),
26 }
27 }
28
29 pub fn new_from_env() -> Result<Self> {
31 Ok(PSConnection::new(
32 &env::var("DATABASE_HOST")?,
33 &env::var("DATABASE_USERNAME")?,
34 &env::var("DATABASE_PASSWORD")?,
35 ))
36 }
37
38 pub async fn execute(&self, query: &str) -> Result<()> {
40 self.execute_raw(query).await?;
41 Ok(())
42 }
43
44 pub async fn execute_raw(&self, query: &str) -> Result<ExecuteResponse> {
46 let url = format!("https://{}/psdb.v1alpha1.Database/Execute", self.host);
47 let sql = ExecuteRequest {
48 query: query.into(),
49 session: self.session.lock().await.clone(),
50 };
51
52 let res: ExecuteResponse = post(self, &url, sql).await?;
53 self.session.lock().await.replace(res.session.clone());
54
55 if let Some(err) = res.error {
56 anyhow::bail!("Code: \"{}\", message: \"{}\"", err.code, err.message);
57 }
58
59 Ok(res)
60 }
61
62 pub async fn transaction<F, Fut>(&self, f: F) -> Result<()>
64 where
65 F: FnOnce(Self) -> Fut,
66 Fut: std::future::Future<Output = Result<()>>,
67 {
68 self.execute("BEGIN").await?;
69 let res = f(self.clone()).await;
70 if res.is_err() {
71 self.execute("ROLLBACK").await?;
72 return res;
73 }
74
75 self.execute("COMMIT").await?;
76 Ok(())
77 }
78
79 pub async fn refresh(&self) -> Result<()> {
81 let url = format!("https://{}/psdb.v1alpha1.Database/CreateSession", self.host);
82 let res: ExecuteResponse = post_raw(self, &url, String::from("{}")).await?;
83 self.session.lock().await.replace(res.session.clone());
84
85 Ok(())
86 }
87}