tokio_opengauss/
client.rs

1use crate::codec::BackendMessages;
2use crate::config::{Host, SslMode};
3use crate::connection::{Request, RequestMessages};
4use crate::copy_out::CopyOutStream;
5use crate::query::RowStream;
6use crate::simple_query::SimpleQueryStream;
7#[cfg(feature = "runtime")]
8use crate::tls::MakeTlsConnect;
9use crate::tls::TlsConnect;
10use crate::types::{Oid, ToSql, Type};
11#[cfg(feature = "runtime")]
12use crate::Socket;
13use crate::{
14    copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
15    Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
16};
17use bytes::{Buf, BytesMut};
18use fallible_iterator::FallibleIterator;
19use futures::channel::mpsc;
20use futures::{future, pin_mut, ready, StreamExt, TryStreamExt};
21use parking_lot::Mutex;
22use opengauss_protocol::message::backend::Message;
23use opengauss_types::BorrowToSql;
24use std::collections::HashMap;
25use std::fmt;
26use std::sync::Arc;
27use std::task::{Context, Poll};
28use std::time::Duration;
29use tokio::io::{AsyncRead, AsyncWrite};
30
31pub struct Responses {
32    receiver: mpsc::Receiver<BackendMessages>,
33    cur: BackendMessages,
34}
35
36impl Responses {
37    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
38        loop {
39            match self.cur.next().map_err(Error::parse)? {
40                Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
41                Some(message) => return Poll::Ready(Ok(message)),
42                None => {}
43            }
44
45            match ready!(self.receiver.poll_next_unpin(cx)) {
46                Some(messages) => self.cur = messages,
47                None => return Poll::Ready(Err(Error::closed())),
48            }
49        }
50    }
51
52    pub async fn next(&mut self) -> Result<Message, Error> {
53        future::poll_fn(|cx| self.poll_next(cx)).await
54    }
55}
56
57/// A cache of type info and prepared statements for fetching type info
58/// (corresponding to the queries in the [prepare](prepare) module).
59#[derive(Default)]
60struct CachedTypeInfo {
61    /// A statement for basic information for a type from its
62    /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
63    /// fallback).
64    typeinfo: Option<Statement>,
65    /// A statement for getting information for a composite type from its OID.
66    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
67    typeinfo_composite: Option<Statement>,
68    /// A statement for getting information for a composite type from its OID.
69    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
70    /// its fallback).
71    typeinfo_enum: Option<Statement>,
72
73    /// Cache of types already looked up.
74    types: HashMap<Oid, Type>,
75}
76
77pub struct InnerClient {
78    sender: mpsc::UnboundedSender<Request>,
79    cached_typeinfo: Mutex<CachedTypeInfo>,
80
81    /// A buffer to use when writing out postgres commands.
82    buffer: Mutex<BytesMut>,
83}
84
85impl InnerClient {
86    pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
87        let (sender, receiver) = mpsc::channel(1);
88        let request = Request { messages, sender };
89        self.sender
90            .unbounded_send(request)
91            .map_err(|_| Error::closed())?;
92
93        Ok(Responses {
94            receiver,
95            cur: BackendMessages::empty(),
96        })
97    }
98
99    pub fn typeinfo(&self) -> Option<Statement> {
100        self.cached_typeinfo.lock().typeinfo.clone()
101    }
102
103    pub fn set_typeinfo(&self, statement: &Statement) {
104        self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
105    }
106
107    pub fn typeinfo_composite(&self) -> Option<Statement> {
108        self.cached_typeinfo.lock().typeinfo_composite.clone()
109    }
110
111    pub fn set_typeinfo_composite(&self, statement: &Statement) {
112        self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
113    }
114
115    pub fn typeinfo_enum(&self) -> Option<Statement> {
116        self.cached_typeinfo.lock().typeinfo_enum.clone()
117    }
118
119    pub fn set_typeinfo_enum(&self, statement: &Statement) {
120        self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
121    }
122
123    pub fn type_(&self, oid: Oid) -> Option<Type> {
124        self.cached_typeinfo.lock().types.get(&oid).cloned()
125    }
126
127    pub fn set_type(&self, oid: Oid, type_: &Type) {
128        self.cached_typeinfo.lock().types.insert(oid, type_.clone());
129    }
130
131    pub fn clear_type_cache(&self) {
132        self.cached_typeinfo.lock().types.clear();
133    }
134
135    /// Call the given function with a buffer to be used when writing out
136    /// postgres commands.
137    pub fn with_buf<F, R>(&self, f: F) -> R
138    where
139        F: FnOnce(&mut BytesMut) -> R,
140    {
141        let mut buffer = self.buffer.lock();
142        let r = f(&mut buffer);
143        buffer.clear();
144        r
145    }
146}
147
148#[derive(Clone)]
149pub(crate) struct SocketConfig {
150    pub host: Host,
151    pub port: u16,
152    pub connect_timeout: Option<Duration>,
153    pub keepalives: bool,
154    pub keepalives_idle: Duration,
155}
156
157/// An asynchronous openGauss client.
158///
159/// The client is one half of what is returned when a connection is established. Users interact with the database
160/// through this client object.
161pub struct Client {
162    inner: Arc<InnerClient>,
163    #[cfg(feature = "runtime")]
164    socket_config: Option<SocketConfig>,
165    ssl_mode: SslMode,
166    process_id: i32,
167    secret_key: i32,
168}
169
170impl Client {
171    pub(crate) fn new(
172        sender: mpsc::UnboundedSender<Request>,
173        ssl_mode: SslMode,
174        process_id: i32,
175        secret_key: i32,
176    ) -> Client {
177        Client {
178            inner: Arc::new(InnerClient {
179                sender,
180                cached_typeinfo: Default::default(),
181                buffer: Default::default(),
182            }),
183            #[cfg(feature = "runtime")]
184            socket_config: None,
185            ssl_mode,
186            process_id,
187            secret_key,
188        }
189    }
190
191    pub(crate) fn inner(&self) -> &Arc<InnerClient> {
192        &self.inner
193    }
194
195    #[cfg(feature = "runtime")]
196    pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
197        self.socket_config = Some(socket_config);
198    }
199
200    /// Creates a new prepared statement.
201    ///
202    /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
203    /// which are set when executed. Prepared statements can only be used with the connection that created them.
204    pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
205        self.prepare_typed(query, &[]).await
206    }
207
208    /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
209    ///
210    /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
211    /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
212    pub async fn prepare_typed(
213        &self,
214        query: &str,
215        parameter_types: &[Type],
216    ) -> Result<Statement, Error> {
217        prepare::prepare(&self.inner, query, parameter_types).await
218    }
219
220    /// Executes a statement, returning a vector of the resulting rows.
221    ///
222    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
223    /// provided, 1-indexed.
224    ///
225    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
226    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
227    /// with the `prepare` method.
228    ///
229    /// # Panics
230    ///
231    /// Panics if the number of parameters provided does not match the number expected.
232    pub async fn query<T>(
233        &self,
234        statement: &T,
235        params: &[&(dyn ToSql + Sync)],
236    ) -> Result<Vec<Row>, Error>
237    where
238        T: ?Sized + ToStatement,
239    {
240        self.query_raw(statement, slice_iter(params))
241            .await?
242            .try_collect()
243            .await
244    }
245
246    /// Executes a statement which returns a single row, returning it.
247    ///
248    /// Returns an error if the query does not return exactly one row.
249    ///
250    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
251    /// provided, 1-indexed.
252    ///
253    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
254    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
255    /// with the `prepare` method.
256    ///
257    /// # Panics
258    ///
259    /// Panics if the number of parameters provided does not match the number expected.
260    pub async fn query_one<T>(
261        &self,
262        statement: &T,
263        params: &[&(dyn ToSql + Sync)],
264    ) -> Result<Row, Error>
265    where
266        T: ?Sized + ToStatement,
267    {
268        let stream = self.query_raw(statement, slice_iter(params)).await?;
269        pin_mut!(stream);
270
271        let row = match stream.try_next().await? {
272            Some(row) => row,
273            None => return Err(Error::row_count()),
274        };
275
276        if stream.try_next().await?.is_some() {
277            return Err(Error::row_count());
278        }
279
280        Ok(row)
281    }
282
283    /// Executes a statements which returns zero or one rows, returning it.
284    ///
285    /// Returns an error if the query returns more than one row.
286    ///
287    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
288    /// provided, 1-indexed.
289    ///
290    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
291    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
292    /// with the `prepare` method.
293    ///
294    /// # Panics
295    ///
296    /// Panics if the number of parameters provided does not match the number expected.
297    pub async fn query_opt<T>(
298        &self,
299        statement: &T,
300        params: &[&(dyn ToSql + Sync)],
301    ) -> Result<Option<Row>, Error>
302    where
303        T: ?Sized + ToStatement,
304    {
305        let stream = self.query_raw(statement, slice_iter(params)).await?;
306        pin_mut!(stream);
307
308        let row = match stream.try_next().await? {
309            Some(row) => row,
310            None => return Ok(None),
311        };
312
313        if stream.try_next().await?.is_some() {
314            return Err(Error::row_count());
315        }
316
317        Ok(Some(row))
318    }
319
320    /// The maximally flexible version of [`query`].
321    ///
322    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
323    /// provided, 1-indexed.
324    ///
325    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
326    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
327    /// with the `prepare` method.
328    ///
329    /// # Panics
330    ///
331    /// Panics if the number of parameters provided does not match the number expected.
332    ///
333    /// [`query`]: #method.query
334    ///
335    /// # Examples
336    ///
337    /// ```no_run
338    /// # async fn async_main(client: &tokio_opengauss::Client) -> Result<(), tokio_opengauss::Error> {
339    /// use tokio_opengauss::types::ToSql;
340    /// use futures::{pin_mut, TryStreamExt};
341    ///
342    /// let params: Vec<String> = vec![
343    ///     "first param".into(),
344    ///     "second param".into(),
345    /// ];
346    /// let mut it = client.query_raw(
347    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
348    ///     params,
349    /// ).await?;
350    ///
351    /// pin_mut!(it);
352    /// while let Some(row) = it.try_next().await? {
353    ///     let foo: i32 = row.get("foo");
354    ///     println!("foo: {}", foo);
355    /// }
356    /// # Ok(())
357    /// # }
358    /// ```
359    pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
360    where
361        T: ?Sized + ToStatement,
362        P: BorrowToSql,
363        I: IntoIterator<Item = P>,
364        I::IntoIter: ExactSizeIterator,
365    {
366        let statement = statement.__convert().into_statement(self).await?;
367        query::query(&self.inner, statement, params).await
368    }
369
370    /// Executes a statement, returning the number of rows modified.
371    ///
372    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
373    /// provided, 1-indexed.
374    ///
375    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
376    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
377    /// with the `prepare` method.
378    ///
379    /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
380    ///
381    /// # Panics
382    ///
383    /// Panics if the number of parameters provided does not match the number expected.
384    pub async fn execute<T>(
385        &self,
386        statement: &T,
387        params: &[&(dyn ToSql + Sync)],
388    ) -> Result<u64, Error>
389    where
390        T: ?Sized + ToStatement,
391    {
392        self.execute_raw(statement, slice_iter(params)).await
393    }
394
395    /// The maximally flexible version of [`execute`].
396    ///
397    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
398    /// provided, 1-indexed.
399    ///
400    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
401    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
402    /// with the `prepare` method.
403    ///
404    /// # Panics
405    ///
406    /// Panics if the number of parameters provided does not match the number expected.
407    ///
408    /// [`execute`]: #method.execute
409    pub async fn execute_raw<T, P, I>(&self, statement: &T, params: I) -> Result<u64, Error>
410    where
411        T: ?Sized + ToStatement,
412        P: BorrowToSql,
413        I: IntoIterator<Item = P>,
414        I::IntoIter: ExactSizeIterator,
415    {
416        let statement = statement.__convert().into_statement(self).await?;
417        query::execute(self.inner(), statement, params).await
418    }
419
420    /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
421    ///
422    /// openGauss does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
423    /// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
424    ///
425    /// # Panics
426    ///
427    /// Panics if the statement contains parameters.
428    pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
429    where
430        T: ?Sized + ToStatement,
431        U: Buf + 'static + Send,
432    {
433        let statement = statement.__convert().into_statement(self).await?;
434        copy_in::copy_in(self.inner(), statement).await
435    }
436
437    /// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
438    ///
439    /// openGauss does not support parameters in `COPY` statements, so this method does not take any.
440    ///
441    /// # Panics
442    ///
443    /// Panics if the statement contains parameters.
444    pub async fn copy_out<T>(&self, statement: &T) -> Result<CopyOutStream, Error>
445    where
446        T: ?Sized + ToStatement,
447    {
448        let statement = statement.__convert().into_statement(self).await?;
449        copy_out::copy_out(self.inner(), statement).await
450    }
451
452    /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
453    ///
454    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
455    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
456    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
457    /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
458    /// or a row of data. This preserves the framing between the separate statements in the request.
459    ///
460    /// # Warning
461    ///
462    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
463    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
464    /// them to this method!
465    pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
466        self.simple_query_raw(query).await?.try_collect().await
467    }
468
469    pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
470        simple_query::simple_query(self.inner(), query).await
471    }
472
473    /// Executes a sequence of SQL statements using the simple query protocol.
474    ///
475    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
476    /// point. This is intended for use when, for example, initializing a database schema.
477    ///
478    /// # Warning
479    ///
480    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
481    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
482    /// them to this method!
483    pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
484        simple_query::batch_execute(self.inner(), query).await
485    }
486
487    /// Begins a new database transaction.
488    ///
489    /// The transaction will roll back by default - use the `commit` method to commit it.
490    pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
491        self.batch_execute("BEGIN").await?;
492        Ok(Transaction::new(self))
493    }
494
495    /// Returns a builder for a transaction with custom settings.
496    ///
497    /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
498    /// attributes.
499    pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
500        TransactionBuilder::new(self)
501    }
502
503    /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
504    /// connection associated with this client.
505    pub fn cancel_token(&self) -> CancelToken {
506        CancelToken {
507            #[cfg(feature = "runtime")]
508            socket_config: self.socket_config.clone(),
509            ssl_mode: self.ssl_mode,
510            process_id: self.process_id,
511            secret_key: self.secret_key,
512        }
513    }
514
515    /// Attempts to cancel an in-progress query.
516    ///
517    /// The server provides no information about whether a cancellation attempt was successful or not. An error will
518    /// only be returned if the client was unable to connect to the database.
519    ///
520    /// Requires the `runtime` Cargo feature (enabled by default).
521    #[cfg(feature = "runtime")]
522    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
523    pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
524    where
525        T: MakeTlsConnect<Socket>,
526    {
527        self.cancel_token().cancel_query(tls).await
528    }
529
530    /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
531    /// connection itself.
532    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
533    pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
534    where
535        S: AsyncRead + AsyncWrite + Unpin,
536        T: TlsConnect<S>,
537    {
538        self.cancel_token().cancel_query_raw(stream, tls).await
539    }
540
541    /// Clears the client's type information cache.
542    ///
543    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
544    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
545    /// to flush the local cache and allow the new, updated definitions to be loaded.
546    pub fn clear_type_cache(&self) {
547        self.inner().clear_type_cache();
548    }
549
550    /// Determines if the connection to the server has already closed.
551    ///
552    /// In that case, all future queries will fail.
553    pub fn is_closed(&self) -> bool {
554        self.inner.sender.is_closed()
555    }
556
557    #[doc(hidden)]
558    pub fn __private_api_close(&mut self) {
559        self.inner.sender.close_channel()
560    }
561}
562
563impl fmt::Debug for Client {
564    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
565        f.debug_struct("Client").finish()
566    }
567}