tarantool_rs/client/
connection.rs1use std::{
2 fmt,
3 num::NonZeroUsize,
4 sync::{
5 atomic::{AtomicU32, Ordering},
6 Arc,
7 },
8 time::Duration,
9};
10
11use async_trait::async_trait;
12use futures::TryFutureExt;
13use lru::LruCache;
14use parking_lot::Mutex;
15use rmpv::Value;
16use tokio::time::timeout;
17use tracing::{debug, trace};
18
19use crate::{
20 builder::ConnectionBuilder,
21 client::{Executor, Stream, Transaction, TransactionBuilder},
22 codec::{
23 consts::TransactionIsolationLevel,
24 request::{EncodedRequest, Id, Request},
25 response::ResponseBody,
26 },
27 transport::DispatcherSender,
28 ExecutorExt, Result,
29};
30
31#[derive(Clone)]
39pub struct Connection {
40 inner: Arc<ConnectionInner>,
41}
42
43struct ConnectionInner {
44 dispatcher_sender: DispatcherSender,
45 next_stream_id: AtomicU32,
47 timeout: Option<Duration>,
48 transaction_timeout_secs: Option<f64>,
49 transaction_isolation_level: TransactionIsolationLevel,
50 async_rt_handle: tokio::runtime::Handle,
51 sql_statement_cache: Option<Mutex<LruCache<String, u64>>>,
54 sql_statement_cache_update_lock: Mutex<()>,
55}
56
57impl Connection {
58 pub fn builder() -> ConnectionBuilder {
60 ConnectionBuilder::default()
61 }
62
63 pub(crate) fn new(
64 dispatcher_sender: DispatcherSender,
65 timeout: Option<Duration>,
66 transaction_timeout: Option<Duration>,
67 transaction_isolation_level: TransactionIsolationLevel,
68 sql_statement_cache_capacity: usize,
69 ) -> Self {
70 Self {
71 inner: Arc::new(ConnectionInner {
72 dispatcher_sender,
73 next_stream_id: AtomicU32::new(1),
75 timeout,
76 transaction_timeout_secs: transaction_timeout.as_ref().map(Duration::as_secs_f64),
77 transaction_isolation_level,
78 async_rt_handle: tokio::runtime::Handle::current(),
81 sql_statement_cache: NonZeroUsize::new(sql_statement_cache_capacity)
82 .map(|x| Mutex::new(LruCache::new(x))),
83 sql_statement_cache_update_lock: Mutex::new(()),
84 }),
85 }
86 }
87
88 #[allow(clippy::let_underscore_future)]
90 pub(crate) fn send_request_sync_and_forget(&self, body: impl Request, stream_id: Option<u32>) {
91 let this = self.clone();
92 let req = EncodedRequest::new(body, stream_id);
93 let _ = self.inner.async_rt_handle.spawn(async move {
94 let res = futures::future::ready(req)
95 .err_into()
96 .and_then(|x| this.send_encoded_request(x))
97 .await;
98 debug!("Response for background request: {:?}", res);
99 });
100 }
101
102 pub(crate) fn next_stream_id(&self) -> u32 {
104 let next = self.inner.next_stream_id.fetch_add(1, Ordering::Relaxed);
105 if next != 0 {
106 next
107 } else {
108 self.inner.next_stream_id.fetch_add(1, Ordering::Relaxed)
109 }
110 }
111
112 pub(crate) async fn id(&self, features: Id) -> Result<()> {
115 self.send_request(features).await.map(drop)
116 }
117
118 pub(crate) fn stream(&self) -> Stream {
119 Stream::new(self.clone())
120 }
121
122 pub(crate) fn transaction_builder(&self) -> TransactionBuilder {
124 TransactionBuilder::new(
125 self.clone(),
126 self.inner.transaction_timeout_secs,
127 self.inner.transaction_isolation_level,
128 )
129 }
130
131 pub(crate) async fn transaction(&self) -> Result<Transaction> {
133 self.transaction_builder().begin().await
134 }
135
136 async fn get_cached_sql_statement_id_inner(&self, statement: &str) -> Option<u64> {
145 let cache = self.inner.sql_statement_cache.as_ref()?;
148 if let Some(stmt_id) = cache.lock().get(statement) {
149 return Some(*stmt_id);
150 }
151
152 let update_lock = self.inner.sql_statement_cache_update_lock.try_lock();
156 let stmt_id = {
157 let stmt_id = match self.prepare_sql(statement).await {
158 Ok(x) => {
159 let stmt_id = x.stmt_id();
160 trace!(statement, "Statement prepared with id {stmt_id}");
161 stmt_id
162 }
163 Err(err) => {
164 debug!("Failed to prepare statement for cache: {:#}", err);
165 return None;
166 }
167 };
168 let _ = cache.lock().put(statement.into(), stmt_id);
169 stmt_id
170 };
171 drop(update_lock);
172
173 Some(stmt_id)
174 }
175}
176
177#[async_trait]
178impl Executor for Connection {
179 async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value> {
180 let fut = self.inner.dispatcher_sender.send(request);
181 let resp = match self.inner.timeout {
182 Some(x) => timeout(x, fut).await??,
183 None => fut.await?,
184 };
185 match resp.body {
186 ResponseBody::Ok(x) => Ok(x),
187 ResponseBody::Error(x) => Err(x.into()),
188 }
189 }
190
191 fn stream(&self) -> Stream {
192 self.stream()
193 }
194
195 fn transaction_builder(&self) -> TransactionBuilder {
196 self.transaction_builder()
197 }
198
199 async fn transaction(&self) -> Result<Transaction> {
200 self.transaction().await
201 }
202
203 async fn get_cached_sql_statement_id(&self, statement: &str) -> Option<u64> {
204 self.get_cached_sql_statement_id_inner(statement).await
205 }
206}
207
208impl fmt::Debug for Connection {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 write!(f, "Connection")
211 }
212}