1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
use crate::client::Config;
use anyhow::Result;
use async_trait::async_trait;

use crate::{BatchResult, ResultSet, Statement};

/// Database client. This is the main structure used to
/// communicate with the database.
pub struct Client {
    url: String,
    token: Option<String>,

    client: hrana_client::Client,
    client_future: hrana_client::ConnFut,
    stream: hrana_client::Stream,
}

impl Client {
    /// Creates a database client with JWT authentication.
    ///
    /// # Arguments
    /// * `url` - URL of the database endpoint
    /// * `token` - auth token
    pub async fn new(url: impl Into<String>, token: impl Into<String>) -> Result<Self> {
        let token = token.into();
        let token = if token.is_empty() { None } else { Some(token) };
        let url = url.into();

        let (client, client_future) = hrana_client::Client::connect(&url, token.clone()).await?;

        let stream = client.open_stream().await?;

        Ok(Self {
            url,
            token,
            client,
            client_future,
            stream,
        })
    }

    pub async fn reconnect(&mut self) -> Result<()> {
        let (client, client_future) =
            hrana_client::Client::connect(&self.url, self.token.clone()).await?;
        let stream = client.open_stream().await?;
        self.client = client;
        self.client_future = client_future;
        self.stream = stream;
        Ok(())
    }

    /// Creates a database client, given a `Url`
    ///
    /// # Arguments
    /// * `url` - `Url` object of the database endpoint. This cannot be a relative URL;
    ///
    /// # Examples
    ///
    /// ```
    /// # use libsql_client::reqwest::Client;
    /// use url::Url;
    ///
    /// let url = Url::parse("https://localhost:8080?jwt=<access token>").unwrap();
    /// let db = Client::from_url(url).unwrap();
    /// ```
    pub async fn from_url<T: TryInto<url::Url>>(url: T) -> anyhow::Result<Client>
    where
        <T as TryInto<url::Url>>::Error: std::fmt::Display,
    {
        let url: url::Url = url
            .try_into()
            .map_err(|e| anyhow::anyhow!(format!("{e}")))?;
        let url_str = if url.scheme() == "libsql" {
            let new_url = format!("wss://{}", url.as_str().strip_prefix("libsql://").unwrap());
            url::Url::parse(&new_url).unwrap().to_string()
        } else {
            url.to_string()
        };
        let mut params = url.query_pairs();
        // Try a jwt=XXX parameter first, continue if not found
        if let Some((_, token)) = params.find(|(param_key, _)| param_key == "jwt") {
            Client::new(url_str, token).await
        } else {
            Client::new(url_str, "").await
        }
    }

    /// Creates a database client from a `Config` object.
    pub async fn from_config(config: Config) -> Result<Self> {
        Self::new(config.url, config.auth_token.unwrap_or_default()).await
    }

    pub async fn shutdown(self) -> Result<()> {
        self.client.shutdown().await?;
        self.client_future.await?;
        Ok(())
    }
}

#[async_trait(?Send)]
impl crate::DatabaseClient for Client {
    async fn raw_batch(
        &self,
        stmts: impl IntoIterator<Item = impl Into<Statement>>,
    ) -> anyhow::Result<BatchResult> {
        let mut batch = hrana_client::proto::Batch::new();
        for stmt in stmts.into_iter() {
            let stmt: Statement = stmt.into();
            let mut hrana_stmt = hrana_client::proto::Stmt::new(stmt.sql, true);
            for param in stmt.args {
                hrana_stmt.bind(param);
            }
            batch.step(None, hrana_stmt);
        }

        self.stream
            .execute_batch(batch)
            .await
            .map_err(|e| anyhow::anyhow!("{}", e))
    }

    async fn execute(&self, stmt: impl Into<Statement>) -> Result<ResultSet> {
        let stmt: Statement = stmt.into();
        let mut hrana_stmt = hrana_client::proto::Stmt::new(stmt.sql, true);
        for param in stmt.args {
            hrana_stmt.bind(param);
        }

        self.stream
            .execute(hrana_stmt)
            .await
            .map(ResultSet::from)
            .map_err(|e| anyhow::anyhow!("{}", e))
    }
}