1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use crate::client::Config;
use anyhow::Result;
use async_trait::async_trait;
use crate::{BatchResult, ResultSet, Statement};
/// Database client backed by a `hrana_client` connection.
///
/// Instances are built by [`Client::new`] (or the `from_url` / `from_config`
/// wrappers around it), which connects and opens one stream on the connection.
pub struct Client {
/// Connection handle returned by `hrana_client::Client::connect`.
client: hrana_client::Client,
/// Future returned alongside the handle by `connect`; it is awaited in
/// `shutdown` after the handle itself has been shut down.
client_future: hrana_client::ConnFut,
/// Stream opened on `client` in `new`; `execute` and `raw_batch` send their
/// statements through it.
stream: hrana_client::Stream,
}
impl Client {
    /// Connects to `url` and opens a stream on the new connection.
    ///
    /// # Arguments
    /// * `url` - address handed to `hrana_client::Client::connect`.
    /// * `token` - authentication token; an empty string means "no token"
    ///   and is passed to `connect` as `None`.
    ///
    /// # Errors
    /// Returns an error if connecting or opening the stream fails.
    pub async fn new(url: impl Into<String>, token: impl Into<String>) -> Result<Self> {
        // An empty token is the caller's way of saying "unauthenticated".
        let token = Some(token.into()).filter(|t| !t.is_empty());
        let url = url.into();
        let (client, client_future) = hrana_client::Client::connect(&url, token).await?;
        let stream = client.open_stream().await?;
        Ok(Self {
            client,
            client_future,
            stream,
        })
    }

    /// Builds a client from anything convertible into a [`url::Url`].
    ///
    /// URLs with the `libsql` scheme are rewritten to `wss://`; every other
    /// scheme is passed through unchanged. The value of the first `jwt` query
    /// parameter, if any, is used as the authentication token. The query
    /// string itself is left in the URL that is handed to [`Client::new`].
    ///
    /// # Errors
    /// Returns an error if `url` cannot be converted into a [`url::Url`], if a
    /// `libsql` URL does not have the `libsql://<host>` form or cannot be
    /// re-parsed as a `wss://` URL, or if [`Client::new`] fails.
    pub async fn from_url<T: TryInto<url::Url>>(url: T) -> anyhow::Result<Client>
    where
        <T as TryInto<url::Url>>::Error: std::fmt::Display,
    {
        let url: url::Url = url.try_into().map_err(|e| anyhow::anyhow!("{}", e))?;

        let url_str = if url.scheme() == "libsql" {
            // A `libsql:` URL without an authority (e.g. `libsql:foo`) has no
            // `//` after the scheme, so this must be a recoverable error
            // rather than a panic. The URL is kept out of the messages because
            // it may carry the `jwt` token.
            let rest = url.as_str().strip_prefix("libsql://").ok_or_else(|| {
                anyhow::anyhow!("invalid libsql URL: expected the form `libsql://<host>`")
            })?;
            url::Url::parse(&format!("wss://{}", rest))
                .map_err(|e| anyhow::anyhow!("invalid libsql URL: {}", e))?
                .to_string()
        } else {
            url.to_string()
        };

        // A missing `jwt` parameter yields an empty token, which `new` treats
        // as "no token".
        let token = url
            .query_pairs()
            .find(|(param_key, _)| param_key == "jwt")
            .map(|(_, value)| value.into_owned())
            .unwrap_or_default();

        Client::new(url_str, token).await
    }

    /// Builds a client from `config`, using its `url` and `auth_token`.
    ///
    /// When `auth_token` is `None`, the token type's default value is passed
    /// to [`Client::new`] instead.
    ///
    /// # Errors
    /// Propagates any error from [`Client::new`].
    pub async fn from_config(config: Config) -> Result<Self> {
        Self::new(config.url, config.auth_token.unwrap_or_default()).await
    }

    /// Shuts the client down, consuming it.
    ///
    /// The connection handle is shut down first; the connection future
    /// obtained at connect time is awaited afterwards.
    ///
    /// # Errors
    /// Returns the first error produced by either step.
    pub async fn shutdown(self) -> Result<()> {
        self.client.shutdown().await?;
        self.client_future.await?;
        Ok(())
    }
}
#[async_trait(?Send)]
impl crate::DatabaseClient for Client {
    /// Sends all of `stmts` as one batch over the client's stream.
    ///
    /// Each statement becomes one batch step, added with no condition (`None`),
    /// in iteration order.
    ///
    /// # Errors
    /// Any error from executing the batch is converted into an `anyhow` error
    /// carrying its display text.
    async fn raw_batch(
        &self,
        stmts: impl IntoIterator<Item = impl Into<Statement>>,
    ) -> anyhow::Result<BatchResult> {
        let mut batch = hrana_client::proto::Batch::new();
        for item in stmts {
            let statement: Statement = item.into();
            let mut step = hrana_client::proto::Stmt::new(statement.sql, true);
            statement.args.into_iter().for_each(|arg| {
                step.bind(arg);
            });
            batch.step(None, step);
        }

        match self.stream.execute_batch(batch).await {
            Ok(result) => Ok(result),
            Err(err) => Err(anyhow::anyhow!("{}", err)),
        }
    }

    /// Runs a single statement over the client's stream and returns its result
    /// converted into a [`ResultSet`].
    ///
    /// # Errors
    /// Any error from executing the statement is converted into an `anyhow`
    /// error carrying its display text.
    async fn execute(&self, stmt: impl Into<Statement>) -> Result<ResultSet> {
        let statement: Statement = stmt.into();
        let mut request = hrana_client::proto::Stmt::new(statement.sql, true);
        statement.args.into_iter().for_each(|arg| {
            request.bind(arg);
        });

        match self.stream.execute(request).await {
            Ok(rows) => Ok(ResultSet::from(rows)),
            Err(err) => Err(anyhow::anyhow!("{}", err)),
        }
    }
}