ydb_unofficial/sqlx/
connection.rs1use std::str::FromStr;
2use std::time::Duration;
3use super::YdbError;
4use super::database::Ydb;
5use super::executor::{YdbExecutor, YdbSchemeExecutor};
6use futures::Future;
7use sqlx_core::transaction::{Transaction, TransactionManager};
8use sqlx_core::pool::MaybePoolConnection;
9use tonic::codegen::futures_core::future::BoxFuture;
10use sqlx_core::connection::{ConnectOptions, Connection};
11use ydb_grpc_bindings::generated::ydb;
12use ydb::table::{TransactionControl, BeginTransactionRequest};
13use ydb::table::transaction_control::TxSelector;
14use ydb::table::TransactionSettings;
15use ydb::table::transaction_settings::TxMode;
16use crate::{AsciiValue, YdbTransaction};
17use crate::auth::UpdatableToken;
18use crate::client::YdbEndpoint;
19
20use crate::payload::YdbResponseWithResult;
21
22pub struct YdbConnection {
23 inner: crate::YdbConnection<UpdatableToken>,
24 options: YdbConnectOptions,
25 tx_control: TransactionControl,
26 log_options: LogOptions,
27 pub retry: bool,
28}
29#[derive(Default, Clone, Copy, Debug)]
30pub struct LogOptions {
31 level: Option<log::Level>,
32 slow: Option<(log::Level, Duration)>
33}
34
35impl LogOptions {
36 pub async fn wrap<F: Future>(self, msg: &str, f: F) -> F::Output {
37 if let Some(level) = self.level {
38 log::log!(level, "{}", msg);
39 }
40 let start = std::time::Instant::now();
41 let result = f.await;
42 if let Some((level, duration)) = self.slow {
43 if start.elapsed() > duration {
44 log::log!(level, "{} execution time exeeded", msg);
45 }
46 }
47 result
48 }
49}
50
51#[derive(Debug, Clone)]
52pub struct YdbConnectOptions {
53 endpoint: YdbEndpoint,
54 db_name: AsciiValue,
55 creds: UpdatableToken,
56 log_options: LogOptions,
57}
58
59impl YdbConnectOptions {
60 pub fn with_creds(mut self, creds: UpdatableToken) -> Self {
61 self.creds = creds;
62 self
63 }
64}
65
66impl FromStr for YdbConnectOptions {
67 type Err = sqlx_core::Error;
68
69 fn from_str(s: &str) -> Result<Self, Self::Err> {
70 let url = sqlx_core::Url::try_from(s).map_err(|e|sqlx_core::Error::Configuration(format!("Cannot parse connection string as url: {e}").into()))?;
71 Self::from_url(&url)
72 }
73}
74
75#[test]
76fn test_conn_options_from_str() {
77 let options = YdbConnectOptions::from_str("ydbs://ydb.serverless.yandexcloud.net:2135/ru-central1/some-anfslndundf908/234ndfnsdkjf").unwrap();
78 assert!(options.endpoint.ssl);
79 assert_eq!(options.endpoint.host, "ydb.serverless.yandexcloud.net");
80 assert_eq!(options.endpoint.port, 2135);
81 assert_eq!(options.db_name.as_bytes(), "/ru-central1/some-anfslndundf908/234ndfnsdkjf".as_bytes());
82}
83
84fn default_tx_control() -> TransactionControl {
85 TransactionControl {
86 commit_tx: true,
87 tx_selector: Some(TxSelector::BeginTx(TransactionSettings {
88 tx_mode: Some(TxMode::SerializableReadWrite(Default::default()))
90 }))
91 }
92}
93
94impl ConnectOptions for YdbConnectOptions {
95 type Connection = YdbConnection;
96
97 fn from_url(url: &sqlx_core::Url) -> Result<Self, sqlx_core::Error> {
98 use sqlx_core::Error::Configuration as ConfErr;
99 let ssl = match url.scheme() {
100 "ydb" | "grpc" => false,
101 "ydbs" | "grpcs" => true,
102 _ => return Err(ConfErr("Unknown scheme".into()))
103 };
104 let host = url.host_str().ok_or(ConfErr("no host".into()))?.into();
105 let port = url.port().ok_or(ConfErr("no port".into()))?;
106 let mut db_name = url.path().try_into().map_err(|e|ConfErr(format!("cannot parse database name: {e}").into()))?;
107 let endpoint = YdbEndpoint { ssl, host, port, load_factor: 0.0 };
108 let mut creds = UpdatableToken::new("".try_into().unwrap());
109 for (k,v) in url.query_pairs() {
110 match k.as_ref() {
111 "database" => {
112 db_name = v.as_ref().try_into().map_err(|e|ConfErr(format!("cannot parse database name: {e}").into()))?;
113 }
114 "token" => {
115 let token = v.as_ref().try_into().map_err(|e|ConfErr(format!("cannot parse token: {e}").into()))?;
116 creds = UpdatableToken::new(token);
117 break;
118 }
119 #[cfg(feature = "auth-sa")]
120 "sa-key" => {
121 let file = std::fs::File::open(v.as_ref()).map_err(|e|ConfErr(format!("cannot open sa file: {e}").into()))?;
122 use crate::auth::sa::*;
123 let key: ServiceAccountKey = serde_json::from_reader(file).map_err(|e|ConfErr(format!("cannot parse sa file: {e}").into()))?;
124 creds = futures::executor::block_on(async {
125 ServiceAccountCredentials::create(key).await
126 })
127 .map_err(YdbError::from)?.into();
128 break;
129 }
130 _ => {}
131 }
132 };
133 Ok(Self{endpoint, db_name, creds, log_options: Default::default()})
134 }
135
136 fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, sqlx_core::Error>>
137 where
138 Self::Connection: Sized { let channel = self.endpoint.make_endpoint().connect_lazy();
140 let mut inner = crate::YdbConnection::new(channel, self.db_name.clone(), self.creds.clone());
141 let tx_control = default_tx_control();
142 let log_options = self.log_options;
143 Box::pin(async move {
144 let _ = inner.table().await?;
145 Ok(YdbConnection { inner, options: self.clone(), tx_control, log_options, retry: true })
146 })
147 }
148
149 fn log_statements(mut self, level: log::LevelFilter) -> Self {
150 if let Some(level) = level.to_level() {
151 self.log_options.level = Some(level);
152 } else {
153 self.log_options.level = None;
154 }
155 self
156 }
157
158 fn log_slow_statements(mut self, level: log::LevelFilter, duration: std::time::Duration) -> Self {
159 if let Some(level) = level.to_level() {
160 self.log_options.slow = Some((level, duration));
161 } else {
162 self.log_options.slow = None;
163 }
164 self
165 }
166}
167
168impl Connection for YdbConnection {
169 type Database = Ydb;
170
171 type Options = YdbConnectOptions;
172
173 fn close(mut self) -> BoxFuture<'static, Result<(), sqlx_core::Error>> {
174 Box::pin(async move{
175 self.inner.close_session().await?;
176 Ok(())
177 })
178 }
179
180 fn close_hard(self) -> BoxFuture<'static, Result<(), sqlx_core::Error>> {
181 self.inner.close_session_hard();
182 Box::pin(async {Ok(())})
183 }
184
185 fn ping(&mut self) -> BoxFuture<'_, Result<(), sqlx_core::Error>> { Box::pin( async {
186 self.inner.table() .await?.keep_alive(Default::default()).await?;
188 Ok(())
189 })}
190
191 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Ydb>, sqlx_core::Error>> where Self: Sized {
192 Transaction::begin(MaybePoolConnection::Connection(self))
193 }
194 fn shrink_buffers(&mut self) {}
195 fn flush(&mut self) -> BoxFuture<'_, Result<(), sqlx_core::Error>> {
196 Box::pin(futures::future::ok(()))
197 }
198 fn should_flush(&self) -> bool {false}
199}
200
201impl YdbConnection {
202 pub fn executor(&mut self) -> Result<YdbExecutor<'_>, YdbError> {
204 let tx_control = self.tx_control.clone();
205 let log_options = self.log_options;
206 let table = self.inner.table_if_ready().ok_or(YdbError::NoSession)?;
207 let inner = YdbTransaction::new(table, tx_control);
208 Ok(YdbExecutor {retry: self.retry, inner, log_options })
209 }
210 pub fn scheme_executor(&mut self) -> Result<YdbSchemeExecutor<'_>, YdbError> {
213 let log_options = self.log_options;
214 let inner = self.inner.table_if_ready().ok_or(YdbError::NoSession)?;
215 Ok(YdbSchemeExecutor{ inner, log_options })
216 }
217 pub async fn reconnect(&mut self) -> Result<(), sqlx_core::Error> {
220 let conn = self.options.connect().await?;
221 *self = conn;
222 Ok(())
223 }
224}
225
226pub struct YdbTransactionManager;
227
228impl TransactionManager for YdbTransactionManager {
229 type Database = Ydb;
230
231 fn begin(conn: &mut YdbConnection) -> BoxFuture<'_, Result<(), sqlx_core::Error>> {Box::pin(async{
232 let tx_settings = Some(TransactionSettings{tx_mode: Some(TxMode::SerializableReadWrite(Default::default()))});
233 let response = conn.inner.table().await?.begin_transaction(BeginTransactionRequest{tx_settings, ..Default::default()}).await?;
234 let tx_id = response.into_inner().result().map_err(YdbError::from)?.tx_meta.unwrap().id;
235 conn.tx_control = TransactionControl{commit_tx: false, tx_selector: Some(TxSelector::TxId(tx_id))};
236 Ok(())
237 })}
238
239 fn commit(conn: &mut YdbConnection) -> BoxFuture<'_, Result<(), sqlx_core::Error>> { Box::pin(async {
240 conn.executor()?.inner.commit_inner().await?;
241 conn.tx_control = default_tx_control();
242 Ok(())
243 })}
244
245 fn rollback(conn: &mut YdbConnection) -> BoxFuture<'_, Result<(), sqlx_core::Error>> { Box::pin(async {
246 conn.executor()?.inner.rollback_inner().await?;
247 conn.tx_control = default_tx_control();
248 Ok(())
249 })}
250
251 fn start_rollback(conn: &mut YdbConnection) {
252 conn.tx_control = default_tx_control();
253 log::error!("start_rollback method is unimplemented");
254 }
255}