tarantool_rs/client/
connection.rs

1use std::{
2    fmt,
3    num::NonZeroUsize,
4    sync::{
5        atomic::{AtomicU32, Ordering},
6        Arc,
7    },
8    time::Duration,
9};
10
11use async_trait::async_trait;
12use futures::TryFutureExt;
13use lru::LruCache;
14use parking_lot::Mutex;
15use rmpv::Value;
16use tokio::time::timeout;
17use tracing::{debug, trace};
18
19use crate::{
20    builder::ConnectionBuilder,
21    client::{Executor, Stream, Transaction, TransactionBuilder},
22    codec::{
23        consts::TransactionIsolationLevel,
24        request::{EncodedRequest, Id, Request},
25        response::ResponseBody,
26    },
27    transport::DispatcherSender,
28    ExecutorExt, Result,
29};
30
31/// Connection to Tarantool instance.
32///
33/// This type doesn't represent single TCP connection, but rather an abstraction
34/// for interaction with Tarantool instance.
35///
36/// Underling implemenation could reconnect automatically (depending on builder configuration),
37/// and could do pooling in the future (not yet implemented!).
38#[derive(Clone)]
39pub struct Connection {
40    inner: Arc<ConnectionInner>,
41}
42
43struct ConnectionInner {
44    dispatcher_sender: DispatcherSender,
45    // TODO: change how stream id assigned when dispatcher have more than one connection
46    next_stream_id: AtomicU32,
47    timeout: Option<Duration>,
48    transaction_timeout_secs: Option<f64>,
49    transaction_isolation_level: TransactionIsolationLevel,
50    async_rt_handle: tokio::runtime::Handle,
51    // TODO: tests
52    // TODO: move sql statement cache to separate type
53    sql_statement_cache: Option<Mutex<LruCache<String, u64>>>,
54    sql_statement_cache_update_lock: Mutex<()>,
55}
56
57impl Connection {
58    /// Create new [`ConnectionBuilder`].
59    pub fn builder() -> ConnectionBuilder {
60        ConnectionBuilder::default()
61    }
62
63    pub(crate) fn new(
64        dispatcher_sender: DispatcherSender,
65        timeout: Option<Duration>,
66        transaction_timeout: Option<Duration>,
67        transaction_isolation_level: TransactionIsolationLevel,
68        sql_statement_cache_capacity: usize,
69    ) -> Self {
70        Self {
71            inner: Arc::new(ConnectionInner {
72                dispatcher_sender,
73                // TODO: check if 0 is valid value
74                next_stream_id: AtomicU32::new(1),
75                timeout,
76                transaction_timeout_secs: transaction_timeout.as_ref().map(Duration::as_secs_f64),
77                transaction_isolation_level,
78                // NOTE: Safety: this method can be called only in async tokio context (because it
79                // is called only from ConnectionBuilder).
80                async_rt_handle: tokio::runtime::Handle::current(),
81                sql_statement_cache: NonZeroUsize::new(sql_statement_cache_capacity)
82                    .map(|x| Mutex::new(LruCache::new(x))),
83                sql_statement_cache_update_lock: Mutex::new(()),
84            }),
85        }
86    }
87
88    /// Synchronously send request to channel and drop response.
89    #[allow(clippy::let_underscore_future)]
90    pub(crate) fn send_request_sync_and_forget(&self, body: impl Request, stream_id: Option<u32>) {
91        let this = self.clone();
92        let req = EncodedRequest::new(body, stream_id);
93        let _ = self.inner.async_rt_handle.spawn(async move {
94            let res = futures::future::ready(req)
95                .err_into()
96                .and_then(|x| this.send_encoded_request(x))
97                .await;
98            debug!("Response for background request: {:?}", res);
99        });
100    }
101
102    // TODO: maybe other Ordering??
103    pub(crate) fn next_stream_id(&self) -> u32 {
104        let next = self.inner.next_stream_id.fetch_add(1, Ordering::Relaxed);
105        if next != 0 {
106            next
107        } else {
108            self.inner.next_stream_id.fetch_add(1, Ordering::Relaxed)
109        }
110    }
111
112    // TODO: return response from server
113    /// Send ID request ([docs](https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#iproto-id-0x49)).
114    pub(crate) async fn id(&self, features: Id) -> Result<()> {
115        self.send_request(features).await.map(drop)
116    }
117
118    pub(crate) fn stream(&self) -> Stream {
119        Stream::new(self.clone())
120    }
121
122    /// Create transaction, overriding default connection's parameters.
123    pub(crate) fn transaction_builder(&self) -> TransactionBuilder {
124        TransactionBuilder::new(
125            self.clone(),
126            self.inner.transaction_timeout_secs,
127            self.inner.transaction_isolation_level,
128        )
129    }
130
131    /// Create transaction.
132    pub(crate) async fn transaction(&self) -> Result<Transaction> {
133        self.transaction_builder().begin().await
134    }
135
136    /// Get prepared statement id from cache (if it is enabled).
137    ///
138    /// If statement not present in cache, then prepare statement and put it
139    /// to cache.
140    ///
141    /// Only one statement can be prepared at the time. All other will immediately
142    /// return None, when there is already a statement being prepared. Eventually
143    /// all statements should be allowed to prepare.
144    async fn get_cached_sql_statement_id_inner(&self, statement: &str) -> Option<u64> {
145        // Lock cache mutex (if cache is not None) and check
146        // if statement present in cache.
147        let cache = self.inner.sql_statement_cache.as_ref()?;
148        if let Some(stmt_id) = cache.lock().get(statement) {
149            return Some(*stmt_id);
150        }
151
152        // If statement not found, try to lock update lock mutex.
153        // If successful, proceed with preparing SQL statement,
154        // otherwise return None.
155        let update_lock = self.inner.sql_statement_cache_update_lock.try_lock();
156        let stmt_id = {
157            let stmt_id = match self.prepare_sql(statement).await {
158                Ok(x) => {
159                    let stmt_id = x.stmt_id();
160                    trace!(statement, "Statement prepared with id {stmt_id}");
161                    stmt_id
162                }
163                Err(err) => {
164                    debug!("Failed to prepare statement for cache: {:#}", err);
165                    return None;
166                }
167            };
168            let _ = cache.lock().put(statement.into(), stmt_id);
169            stmt_id
170        };
171        drop(update_lock);
172
173        Some(stmt_id)
174    }
175}
176
177#[async_trait]
178impl Executor for Connection {
179    async fn send_encoded_request(&self, request: EncodedRequest) -> Result<Value> {
180        let fut = self.inner.dispatcher_sender.send(request);
181        let resp = match self.inner.timeout {
182            Some(x) => timeout(x, fut).await??,
183            None => fut.await?,
184        };
185        match resp.body {
186            ResponseBody::Ok(x) => Ok(x),
187            ResponseBody::Error(x) => Err(x.into()),
188        }
189    }
190
191    fn stream(&self) -> Stream {
192        self.stream()
193    }
194
195    fn transaction_builder(&self) -> TransactionBuilder {
196        self.transaction_builder()
197    }
198
199    async fn transaction(&self) -> Result<Transaction> {
200        self.transaction().await
201    }
202
203    async fn get_cached_sql_statement_id(&self, statement: &str) -> Option<u64> {
204        self.get_cached_sql_statement_id_inner(statement).await
205    }
206}
207
208impl fmt::Debug for Connection {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        write!(f, "Connection")
211    }
212}