1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
pub use ps_driver_deserializer::Database;
pub use querybuilder::QueryBuilder;
pub use response::Deserializer;

use anyhow::Result;
use structs::{ExecuteRequest, ExecuteResponse, Session, VitessError};
use utils::to_base64;

mod querybuilder;
mod response;
mod structs;
mod utils;

/// Connection settings and session state for the `psdb.v1alpha1.Database` HTTP API.
///
/// The type is `Clone`; `transaction` uses this to run on its own copy of the
/// connection so that the session it opens never leaks into the original.
#[derive(Clone)]
pub struct PSConnection {
    /// Host name the endpoint URLs are built from (`https://{host}/...`), so it
    /// must not include a scheme.
    pub host: String,
    /// Complete `Authorization` header value: `Basic ` followed by the
    /// `to_base64` encoding of `username:password`.
    pub auth: String,
    /// Session returned by the most recent `execute_session` or `refresh` call;
    /// `None` until one of them has succeeded.
    pub session: Option<Session>,
    /// HTTP client used to send every request made through this connection.
    pub client: reqwest::Client,
}

impl PSConnection {
    /// Creates a new connection
    pub fn new(host: &str, username: &str, password: &str) -> Self {
        Self {
            host: host.into(),
            auth: format!("Basic {}", to_base64(&format!("{}:{}", username, password))),
            session: None,
            client: reqwest::Client::new(),
        }
    }

    /// Executes a SQL query
    pub async fn execute(&self, query: &str) -> Result<ExecuteResponse> {
        let url = format!("https://{}/psdb.v1alpha1.Database/Execute", self.host);
        let sql = ExecuteRequest {
            query: query.into(),
            session: None,
        };

        let res: ExecuteResponse = post(self, &url, sql).await?;
        Ok(res)
    }

    /// Execute a multiple SQL queries using transactions
    pub async fn transaction(&self, q: Vec<QueryBuilder>) -> Result<()> {
        let mut conn = self.clone();
        conn.execute_session("BEGIN").await?;

        for query in q {
            let res = query.execute_session(&mut conn).await?;
            if let Some(err) = res.error {
                conn.execute_session("ROLLBACK").await?;
                anyhow::bail!("Code: \"{}\", message: \"{}\"", err.code, err.message);
            }
        }

        conn.execute_session("COMMIT").await?;
        return Ok(());
    }

    pub async fn execute_session(&mut self, query: &str) -> Result<ExecuteResponse> {
        let url = format!("https://{}/psdb.v1alpha1.Database/Execute", self.host);
        let sql = ExecuteRequest {
            query: query.into(),
            session: self.session.clone(),
        };

        let res: ExecuteResponse = post(self, &url, sql).await?;
        self.session = Some(res.session.clone());

        Ok(res)
    }

    /// Refreshes the session
    pub async fn refresh(&mut self) -> Result<()> {
        let url = format!("https://{}/psdb.v1alpha1.Database/CreateSession", self.host);
        let res: ExecuteResponse = post_wob(self, &url).await?;
        self.session = Some(res.session.clone());

        Ok(())
    }
}

// MAYBE ![CFG] THIS?
async fn post<B, R>(connection: &PSConnection, url: &str, body: B) -> Result<R>
where
    B: serde::Serialize,
    R: serde::de::DeserializeOwned,
{
    let req = connection
        .client
        .post(url)
        .header("Content-Type", "application/json")
        .header("User-Agent", "database-rust/0.1.0")
        .header("Authorization", &connection.auth)
        .body(serde_json::to_string(&body)?);
    let res = req.send().await?;

    if !res.status().is_success() {
        let error: VitessError = serde_json::from_str(&res.text().await?)?;
        anyhow::bail!("Code: \"{}\", message: \"{}\"", error.code, error.message);
    }

    Ok(serde_json::from_str(&res.text().await?)?)
}

async fn post_wob<R>(connection: &PSConnection, url: &str) -> Result<R>
where
    R: serde::de::DeserializeOwned,
{
    let req = connection
        .client
        .post(url)
        .header("Content-Type", "application/json")
        .header("User-Agent", "database-rust/0.1.0")
        .header("Authorization", &connection.auth)
        .body("{}");
    let res = req.send().await?;

    if !res.status().is_success() {
        let error: VitessError = serde_json::from_str(&res.text().await?)?;
        anyhow::bail!("Code: \"{}\", message: \"{}\"", error.code, error.message);
    }

    Ok(serde_json::from_str(&res.text().await?)?)
}