ydb_unofficial/sqlx/
connection.rs

1use std::str::FromStr;
2use std::time::Duration;
3use super::YdbError;
4use super::database::Ydb;
5use super::executor::{YdbExecutor, YdbSchemeExecutor};
6use futures::Future;
7use sqlx_core::transaction::{Transaction, TransactionManager};
8use sqlx_core::pool::MaybePoolConnection;
9use tonic::codegen::futures_core::future::BoxFuture;
10use sqlx_core::connection::{ConnectOptions, Connection};
11use ydb_grpc_bindings::generated::ydb;
12use ydb::table::{TransactionControl, BeginTransactionRequest};
13use ydb::table::transaction_control::TxSelector;
14use ydb::table::TransactionSettings;
15use ydb::table::transaction_settings::TxMode;
16use crate::{AsciiValue, YdbTransaction};
17use crate::auth::UpdatableToken;
18use crate::client::YdbEndpoint;
19
20use crate::payload::YdbResponseWithResult;
21
22pub struct YdbConnection {
23    inner: crate::YdbConnection<UpdatableToken>,
24    options: YdbConnectOptions,
25    tx_control: TransactionControl,
26    log_options: LogOptions,
27    pub retry: bool,
28}
29#[derive(Default, Clone, Copy, Debug)]
30pub struct LogOptions {
31    level: Option<log::Level>,
32    slow: Option<(log::Level, Duration)>
33}
34
35impl LogOptions {
36    pub async fn wrap<F: Future>(self, msg: &str, f: F) -> F::Output {
37        if let Some(level) = self.level {
38            log::log!(level, "{}", msg);
39        }
40        let start = std::time::Instant::now();
41        let result = f.await;
42        if let Some((level, duration)) = self.slow {
43            if start.elapsed() > duration {
44                log::log!(level, "{} execution time exeeded", msg);
45            }
46        }
47        result
48    }
49}
50
51#[derive(Debug, Clone)]
52pub struct YdbConnectOptions {
53    endpoint: YdbEndpoint,
54    db_name: AsciiValue,
55    creds: UpdatableToken,
56    log_options: LogOptions,
57}
58
59impl YdbConnectOptions {
60    pub fn with_creds(mut self, creds: UpdatableToken) -> Self {
61        self.creds = creds;
62        self
63    }
64}
65
66impl FromStr for YdbConnectOptions {
67    type Err = sqlx_core::Error;
68
69    fn from_str(s: &str) -> Result<Self, Self::Err> {
70        let url = sqlx_core::Url::try_from(s).map_err(|e|sqlx_core::Error::Configuration(format!("Cannot parse connection string as url: {e}").into()))?;
71        Self::from_url(&url)
72    }
73}
74
75#[test]
76fn test_conn_options_from_str() {
77    let options = YdbConnectOptions::from_str("ydbs://ydb.serverless.yandexcloud.net:2135/ru-central1/some-anfslndundf908/234ndfnsdkjf").unwrap();
78    assert!(options.endpoint.ssl);
79    assert_eq!(options.endpoint.host, "ydb.serverless.yandexcloud.net");
80    assert_eq!(options.endpoint.port, 2135);
81    assert_eq!(options.db_name.as_bytes(), "/ru-central1/some-anfslndundf908/234ndfnsdkjf".as_bytes());
82}
83
84fn default_tx_control() -> TransactionControl {
85    TransactionControl { 
86        commit_tx: true, 
87        tx_selector: Some(TxSelector::BeginTx(TransactionSettings { 
88            //TODO: продумать разные варианты TxMode
89            tx_mode: Some(TxMode::SerializableReadWrite(Default::default())) 
90        }))
91    }
92}
93
94impl ConnectOptions for YdbConnectOptions {
95    type Connection = YdbConnection;
96
97    fn from_url(url: &sqlx_core::Url) -> Result<Self, sqlx_core::Error> {
98        use sqlx_core::Error::Configuration as ConfErr;
99        let ssl = match url.scheme() {
100            "ydb" | "grpc" => false,
101            "ydbs" | "grpcs" => true,
102            _ => return Err(ConfErr("Unknown scheme".into()))
103        };
104        let host = url.host_str().ok_or(ConfErr("no host".into()))?.into();
105        let port = url.port().ok_or(ConfErr("no port".into()))?;
106        let mut db_name = url.path().try_into().map_err(|e|ConfErr(format!("cannot parse database name: {e}").into()))?;
107        let endpoint = YdbEndpoint { ssl, host, port, load_factor: 0.0 };
108        let mut creds = UpdatableToken::new("".try_into().unwrap());
109        for (k,v) in url.query_pairs() {
110            match k.as_ref() {
111                "database" => {
112                    db_name = v.as_ref().try_into().map_err(|e|ConfErr(format!("cannot parse database name: {e}").into()))?;
113                }
114                "token" => {
115                    let token = v.as_ref().try_into().map_err(|e|ConfErr(format!("cannot parse token: {e}").into()))?;
116                    creds = UpdatableToken::new(token);
117                    break;
118                }
119                #[cfg(feature = "auth-sa")]
120                "sa-key" => {
121                    let file = std::fs::File::open(v.as_ref()).map_err(|e|ConfErr(format!("cannot open sa file: {e}").into()))?;
122                    use crate::auth::sa::*;
123                    let key: ServiceAccountKey = serde_json::from_reader(file).map_err(|e|ConfErr(format!("cannot parse sa file: {e}").into()))?;
124                    creds = futures::executor::block_on(async {
125                        ServiceAccountCredentials::create(key).await
126                    })
127                    .map_err(YdbError::from)?.into();
128                    break;
129                }
130                _ => {}
131            }
132        };
133        Ok(Self{endpoint, db_name, creds, log_options: Default::default()})
134    }
135
136    fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, sqlx_core::Error>>
137    where
138        Self::Connection: Sized { //TODO: реализовать подключение к разным эндпойнтам из discovery (чтобы pool подключался как надо)
139        let channel = self.endpoint.make_endpoint().connect_lazy();
140        let mut inner = crate::YdbConnection::new(channel, self.db_name.clone(), self.creds.clone());
141        let tx_control = default_tx_control();
142        let log_options = self.log_options;
143        Box::pin(async move {
144            let _ = inner.table().await?;
145            Ok(YdbConnection { inner, options: self.clone(), tx_control, log_options, retry: true })
146        })
147    }
148
149    fn log_statements(mut self, level: log::LevelFilter) -> Self {
150        if let Some(level) = level.to_level() {
151            self.log_options.level = Some(level);
152        } else {
153            self.log_options.level = None;
154        }
155        self
156    }
157
158    fn log_slow_statements(mut self, level: log::LevelFilter, duration: std::time::Duration) -> Self {
159        if let Some(level) = level.to_level() {
160            self.log_options.slow = Some((level, duration));
161        } else {
162            self.log_options.slow = None;
163        }
164        self
165    }
166}
167
168impl Connection for YdbConnection {
169    type Database = Ydb;
170
171    type Options = YdbConnectOptions;
172
173    fn close(mut self) -> BoxFuture<'static, Result<(), sqlx_core::Error>> {
174        Box::pin(async move{
175            self.inner.close_session().await?;
176            Ok(())
177        })
178    }
179
180    fn close_hard(self) -> BoxFuture<'static, Result<(), sqlx_core::Error>> {
181        self.inner.close_session_hard();
182        Box::pin(async {Ok(())})
183    }
184
185    fn ping(&mut self) -> BoxFuture<'_, Result<(), sqlx_core::Error>> { Box::pin( async {
186        self.inner.table() //коль скоро мы в асинхронной функции, можем и восстановить сессию. Поэтому table()
187            .await?.keep_alive(Default::default()).await?;
188        Ok(())
189    })}
190
191    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Ydb>, sqlx_core::Error>> where Self: Sized {
192        Transaction::begin(MaybePoolConnection::Connection(self))
193    }
194    fn shrink_buffers(&mut self) {}
195    fn flush(&mut self) -> BoxFuture<'_, Result<(), sqlx_core::Error>> {
196        Box::pin(futures::future::ok(()))
197    }
198    fn should_flush(&self) -> bool {false}
199}
200
201impl YdbConnection {
202    /// Retrieve DML executor, that can select/insert/update values in existing tables, but cannot modify their definitions
203    pub fn executor(&mut self) -> Result<YdbExecutor<'_>, YdbError> {
204        let tx_control = self.tx_control.clone();
205        let log_options = self.log_options;
206        let table = self.inner.table_if_ready().ok_or(YdbError::NoSession)?;
207        let inner = YdbTransaction::new(table, tx_control);
208        Ok(YdbExecutor {retry: self.retry, inner, log_options })
209    }
210    /// Retrieve DDL executor, that makes operations on tables (create, delete, replace tables/indexes/etc).
211    /// Note that DDL executor cannot fetch results, prepare and describe (never can used in sqlx macro). Parameter binding also unavailable
212    pub fn scheme_executor(&mut self) -> Result<YdbSchemeExecutor<'_>, YdbError> {
213        let log_options = self.log_options;
214        let inner = self.inner.table_if_ready().ok_or(YdbError::NoSession)?;
215        Ok(YdbSchemeExecutor{ inner, log_options })
216    }
217    /// Reconnect to Ydb if received [YdbError::NoSession] received
218    /// Sometimes Ydb service can invalidate connection with Session. An if you use single connection, you need to reconnect them
219    pub async fn reconnect(&mut self) -> Result<(), sqlx_core::Error> {
220        let conn = self.options.connect().await?;
221        *self = conn;
222        Ok(())
223    }
224}
225
226pub struct YdbTransactionManager;
227
228impl TransactionManager for YdbTransactionManager {
229    type Database = Ydb;
230
231    fn begin(conn: &mut YdbConnection) -> BoxFuture<'_, Result<(), sqlx_core::Error>> {Box::pin(async{
232        let tx_settings = Some(TransactionSettings{tx_mode: Some(TxMode::SerializableReadWrite(Default::default()))});
233        let response = conn.inner.table().await?.begin_transaction(BeginTransactionRequest{tx_settings, ..Default::default()}).await?;
234        let tx_id = response.into_inner().result().map_err(YdbError::from)?.tx_meta.unwrap().id;
235        conn.tx_control = TransactionControl{commit_tx: false, tx_selector: Some(TxSelector::TxId(tx_id))};
236        Ok(())
237    })}
238
239    fn commit(conn: &mut YdbConnection) -> BoxFuture<'_, Result<(), sqlx_core::Error>> { Box::pin(async { 
240        conn.executor()?.inner.commit_inner().await?;
241        conn.tx_control = default_tx_control();
242        Ok(())
243    })}
244
245    fn rollback(conn: &mut YdbConnection) -> BoxFuture<'_, Result<(), sqlx_core::Error>> { Box::pin(async {
246        conn.executor()?.inner.rollback_inner().await?;
247        conn.tx_control = default_tx_control();
248        Ok(())
249    })}
250
251    fn start_rollback(conn: &mut YdbConnection) {
252        conn.tx_control = default_tx_control();
253        log::error!("start_rollback method is unimplemented");
254    }
255}