tokio_opengauss/client.rs
1use crate::codec::BackendMessages;
2use crate::config::{Host, SslMode};
3use crate::connection::{Request, RequestMessages};
4use crate::copy_out::CopyOutStream;
5use crate::query::RowStream;
6use crate::simple_query::SimpleQueryStream;
7#[cfg(feature = "runtime")]
8use crate::tls::MakeTlsConnect;
9use crate::tls::TlsConnect;
10use crate::types::{Oid, ToSql, Type};
11#[cfg(feature = "runtime")]
12use crate::Socket;
13use crate::{
14 copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
15 Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
16};
17use bytes::{Buf, BytesMut};
18use fallible_iterator::FallibleIterator;
19use futures::channel::mpsc;
20use futures::{future, pin_mut, ready, StreamExt, TryStreamExt};
21use parking_lot::Mutex;
22use opengauss_protocol::message::backend::Message;
23use opengauss_types::BorrowToSql;
24use std::collections::HashMap;
25use std::fmt;
26use std::sync::Arc;
27use std::task::{Context, Poll};
28use std::time::Duration;
29use tokio::io::{AsyncRead, AsyncWrite};
30
31pub struct Responses {
32 receiver: mpsc::Receiver<BackendMessages>,
33 cur: BackendMessages,
34}
35
36impl Responses {
37 pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
38 loop {
39 match self.cur.next().map_err(Error::parse)? {
40 Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
41 Some(message) => return Poll::Ready(Ok(message)),
42 None => {}
43 }
44
45 match ready!(self.receiver.poll_next_unpin(cx)) {
46 Some(messages) => self.cur = messages,
47 None => return Poll::Ready(Err(Error::closed())),
48 }
49 }
50 }
51
52 pub async fn next(&mut self) -> Result<Message, Error> {
53 future::poll_fn(|cx| self.poll_next(cx)).await
54 }
55}
56
57/// A cache of type info and prepared statements for fetching type info
58/// (corresponding to the queries in the [prepare](prepare) module).
59#[derive(Default)]
60struct CachedTypeInfo {
61 /// A statement for basic information for a type from its
62 /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
63 /// fallback).
64 typeinfo: Option<Statement>,
65 /// A statement for getting information for a composite type from its OID.
66 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
67 typeinfo_composite: Option<Statement>,
68 /// A statement for getting information for a composite type from its OID.
69 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
70 /// its fallback).
71 typeinfo_enum: Option<Statement>,
72
73 /// Cache of types already looked up.
74 types: HashMap<Oid, Type>,
75}
76
77pub struct InnerClient {
78 sender: mpsc::UnboundedSender<Request>,
79 cached_typeinfo: Mutex<CachedTypeInfo>,
80
81 /// A buffer to use when writing out postgres commands.
82 buffer: Mutex<BytesMut>,
83}
84
85impl InnerClient {
86 pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
87 let (sender, receiver) = mpsc::channel(1);
88 let request = Request { messages, sender };
89 self.sender
90 .unbounded_send(request)
91 .map_err(|_| Error::closed())?;
92
93 Ok(Responses {
94 receiver,
95 cur: BackendMessages::empty(),
96 })
97 }
98
99 pub fn typeinfo(&self) -> Option<Statement> {
100 self.cached_typeinfo.lock().typeinfo.clone()
101 }
102
103 pub fn set_typeinfo(&self, statement: &Statement) {
104 self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
105 }
106
107 pub fn typeinfo_composite(&self) -> Option<Statement> {
108 self.cached_typeinfo.lock().typeinfo_composite.clone()
109 }
110
111 pub fn set_typeinfo_composite(&self, statement: &Statement) {
112 self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
113 }
114
115 pub fn typeinfo_enum(&self) -> Option<Statement> {
116 self.cached_typeinfo.lock().typeinfo_enum.clone()
117 }
118
119 pub fn set_typeinfo_enum(&self, statement: &Statement) {
120 self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
121 }
122
123 pub fn type_(&self, oid: Oid) -> Option<Type> {
124 self.cached_typeinfo.lock().types.get(&oid).cloned()
125 }
126
127 pub fn set_type(&self, oid: Oid, type_: &Type) {
128 self.cached_typeinfo.lock().types.insert(oid, type_.clone());
129 }
130
131 pub fn clear_type_cache(&self) {
132 self.cached_typeinfo.lock().types.clear();
133 }
134
135 /// Call the given function with a buffer to be used when writing out
136 /// postgres commands.
137 pub fn with_buf<F, R>(&self, f: F) -> R
138 where
139 F: FnOnce(&mut BytesMut) -> R,
140 {
141 let mut buffer = self.buffer.lock();
142 let r = f(&mut buffer);
143 buffer.clear();
144 r
145 }
146}
147
148#[derive(Clone)]
149pub(crate) struct SocketConfig {
150 pub host: Host,
151 pub port: u16,
152 pub connect_timeout: Option<Duration>,
153 pub keepalives: bool,
154 pub keepalives_idle: Duration,
155}
156
157/// An asynchronous openGauss client.
158///
159/// The client is one half of what is returned when a connection is established. Users interact with the database
160/// through this client object.
161pub struct Client {
162 inner: Arc<InnerClient>,
163 #[cfg(feature = "runtime")]
164 socket_config: Option<SocketConfig>,
165 ssl_mode: SslMode,
166 process_id: i32,
167 secret_key: i32,
168}
169
170impl Client {
171 pub(crate) fn new(
172 sender: mpsc::UnboundedSender<Request>,
173 ssl_mode: SslMode,
174 process_id: i32,
175 secret_key: i32,
176 ) -> Client {
177 Client {
178 inner: Arc::new(InnerClient {
179 sender,
180 cached_typeinfo: Default::default(),
181 buffer: Default::default(),
182 }),
183 #[cfg(feature = "runtime")]
184 socket_config: None,
185 ssl_mode,
186 process_id,
187 secret_key,
188 }
189 }
190
191 pub(crate) fn inner(&self) -> &Arc<InnerClient> {
192 &self.inner
193 }
194
195 #[cfg(feature = "runtime")]
196 pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
197 self.socket_config = Some(socket_config);
198 }
199
200 /// Creates a new prepared statement.
201 ///
202 /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
203 /// which are set when executed. Prepared statements can only be used with the connection that created them.
204 pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
205 self.prepare_typed(query, &[]).await
206 }
207
208 /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
209 ///
210 /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
211 /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
212 pub async fn prepare_typed(
213 &self,
214 query: &str,
215 parameter_types: &[Type],
216 ) -> Result<Statement, Error> {
217 prepare::prepare(&self.inner, query, parameter_types).await
218 }
219
220 /// Executes a statement, returning a vector of the resulting rows.
221 ///
222 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
223 /// provided, 1-indexed.
224 ///
225 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
226 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
227 /// with the `prepare` method.
228 ///
229 /// # Panics
230 ///
231 /// Panics if the number of parameters provided does not match the number expected.
232 pub async fn query<T>(
233 &self,
234 statement: &T,
235 params: &[&(dyn ToSql + Sync)],
236 ) -> Result<Vec<Row>, Error>
237 where
238 T: ?Sized + ToStatement,
239 {
240 self.query_raw(statement, slice_iter(params))
241 .await?
242 .try_collect()
243 .await
244 }
245
246 /// Executes a statement which returns a single row, returning it.
247 ///
248 /// Returns an error if the query does not return exactly one row.
249 ///
250 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
251 /// provided, 1-indexed.
252 ///
253 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
254 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
255 /// with the `prepare` method.
256 ///
257 /// # Panics
258 ///
259 /// Panics if the number of parameters provided does not match the number expected.
260 pub async fn query_one<T>(
261 &self,
262 statement: &T,
263 params: &[&(dyn ToSql + Sync)],
264 ) -> Result<Row, Error>
265 where
266 T: ?Sized + ToStatement,
267 {
268 let stream = self.query_raw(statement, slice_iter(params)).await?;
269 pin_mut!(stream);
270
271 let row = match stream.try_next().await? {
272 Some(row) => row,
273 None => return Err(Error::row_count()),
274 };
275
276 if stream.try_next().await?.is_some() {
277 return Err(Error::row_count());
278 }
279
280 Ok(row)
281 }
282
283 /// Executes a statements which returns zero or one rows, returning it.
284 ///
285 /// Returns an error if the query returns more than one row.
286 ///
287 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
288 /// provided, 1-indexed.
289 ///
290 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
291 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
292 /// with the `prepare` method.
293 ///
294 /// # Panics
295 ///
296 /// Panics if the number of parameters provided does not match the number expected.
297 pub async fn query_opt<T>(
298 &self,
299 statement: &T,
300 params: &[&(dyn ToSql + Sync)],
301 ) -> Result<Option<Row>, Error>
302 where
303 T: ?Sized + ToStatement,
304 {
305 let stream = self.query_raw(statement, slice_iter(params)).await?;
306 pin_mut!(stream);
307
308 let row = match stream.try_next().await? {
309 Some(row) => row,
310 None => return Ok(None),
311 };
312
313 if stream.try_next().await?.is_some() {
314 return Err(Error::row_count());
315 }
316
317 Ok(Some(row))
318 }
319
320 /// The maximally flexible version of [`query`].
321 ///
322 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
323 /// provided, 1-indexed.
324 ///
325 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
326 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
327 /// with the `prepare` method.
328 ///
329 /// # Panics
330 ///
331 /// Panics if the number of parameters provided does not match the number expected.
332 ///
333 /// [`query`]: #method.query
334 ///
335 /// # Examples
336 ///
337 /// ```no_run
338 /// # async fn async_main(client: &tokio_opengauss::Client) -> Result<(), tokio_opengauss::Error> {
339 /// use tokio_opengauss::types::ToSql;
340 /// use futures::{pin_mut, TryStreamExt};
341 ///
342 /// let params: Vec<String> = vec![
343 /// "first param".into(),
344 /// "second param".into(),
345 /// ];
346 /// let mut it = client.query_raw(
347 /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
348 /// params,
349 /// ).await?;
350 ///
351 /// pin_mut!(it);
352 /// while let Some(row) = it.try_next().await? {
353 /// let foo: i32 = row.get("foo");
354 /// println!("foo: {}", foo);
355 /// }
356 /// # Ok(())
357 /// # }
358 /// ```
359 pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
360 where
361 T: ?Sized + ToStatement,
362 P: BorrowToSql,
363 I: IntoIterator<Item = P>,
364 I::IntoIter: ExactSizeIterator,
365 {
366 let statement = statement.__convert().into_statement(self).await?;
367 query::query(&self.inner, statement, params).await
368 }
369
370 /// Executes a statement, returning the number of rows modified.
371 ///
372 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
373 /// provided, 1-indexed.
374 ///
375 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
376 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
377 /// with the `prepare` method.
378 ///
379 /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
380 ///
381 /// # Panics
382 ///
383 /// Panics if the number of parameters provided does not match the number expected.
384 pub async fn execute<T>(
385 &self,
386 statement: &T,
387 params: &[&(dyn ToSql + Sync)],
388 ) -> Result<u64, Error>
389 where
390 T: ?Sized + ToStatement,
391 {
392 self.execute_raw(statement, slice_iter(params)).await
393 }
394
395 /// The maximally flexible version of [`execute`].
396 ///
397 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
398 /// provided, 1-indexed.
399 ///
400 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
401 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
402 /// with the `prepare` method.
403 ///
404 /// # Panics
405 ///
406 /// Panics if the number of parameters provided does not match the number expected.
407 ///
408 /// [`execute`]: #method.execute
409 pub async fn execute_raw<T, P, I>(&self, statement: &T, params: I) -> Result<u64, Error>
410 where
411 T: ?Sized + ToStatement,
412 P: BorrowToSql,
413 I: IntoIterator<Item = P>,
414 I::IntoIter: ExactSizeIterator,
415 {
416 let statement = statement.__convert().into_statement(self).await?;
417 query::execute(self.inner(), statement, params).await
418 }
419
420 /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
421 ///
422 /// openGauss does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
423 /// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
424 ///
425 /// # Panics
426 ///
427 /// Panics if the statement contains parameters.
428 pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
429 where
430 T: ?Sized + ToStatement,
431 U: Buf + 'static + Send,
432 {
433 let statement = statement.__convert().into_statement(self).await?;
434 copy_in::copy_in(self.inner(), statement).await
435 }
436
437 /// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
438 ///
439 /// openGauss does not support parameters in `COPY` statements, so this method does not take any.
440 ///
441 /// # Panics
442 ///
443 /// Panics if the statement contains parameters.
444 pub async fn copy_out<T>(&self, statement: &T) -> Result<CopyOutStream, Error>
445 where
446 T: ?Sized + ToStatement,
447 {
448 let statement = statement.__convert().into_statement(self).await?;
449 copy_out::copy_out(self.inner(), statement).await
450 }
451
452 /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
453 ///
454 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
455 /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
456 /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
457 /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
458 /// or a row of data. This preserves the framing between the separate statements in the request.
459 ///
460 /// # Warning
461 ///
462 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
463 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
464 /// them to this method!
465 pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
466 self.simple_query_raw(query).await?.try_collect().await
467 }
468
469 pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
470 simple_query::simple_query(self.inner(), query).await
471 }
472
473 /// Executes a sequence of SQL statements using the simple query protocol.
474 ///
475 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
476 /// point. This is intended for use when, for example, initializing a database schema.
477 ///
478 /// # Warning
479 ///
480 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
481 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
482 /// them to this method!
483 pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
484 simple_query::batch_execute(self.inner(), query).await
485 }
486
487 /// Begins a new database transaction.
488 ///
489 /// The transaction will roll back by default - use the `commit` method to commit it.
490 pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
491 self.batch_execute("BEGIN").await?;
492 Ok(Transaction::new(self))
493 }
494
495 /// Returns a builder for a transaction with custom settings.
496 ///
497 /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
498 /// attributes.
499 pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
500 TransactionBuilder::new(self)
501 }
502
503 /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
504 /// connection associated with this client.
505 pub fn cancel_token(&self) -> CancelToken {
506 CancelToken {
507 #[cfg(feature = "runtime")]
508 socket_config: self.socket_config.clone(),
509 ssl_mode: self.ssl_mode,
510 process_id: self.process_id,
511 secret_key: self.secret_key,
512 }
513 }
514
515 /// Attempts to cancel an in-progress query.
516 ///
517 /// The server provides no information about whether a cancellation attempt was successful or not. An error will
518 /// only be returned if the client was unable to connect to the database.
519 ///
520 /// Requires the `runtime` Cargo feature (enabled by default).
521 #[cfg(feature = "runtime")]
522 #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
523 pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
524 where
525 T: MakeTlsConnect<Socket>,
526 {
527 self.cancel_token().cancel_query(tls).await
528 }
529
530 /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
531 /// connection itself.
532 #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
533 pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
534 where
535 S: AsyncRead + AsyncWrite + Unpin,
536 T: TlsConnect<S>,
537 {
538 self.cancel_token().cancel_query_raw(stream, tls).await
539 }
540
541 /// Clears the client's type information cache.
542 ///
543 /// When user-defined types are used in a query, the client loads their definitions from the database and caches
544 /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
545 /// to flush the local cache and allow the new, updated definitions to be loaded.
546 pub fn clear_type_cache(&self) {
547 self.inner().clear_type_cache();
548 }
549
550 /// Determines if the connection to the server has already closed.
551 ///
552 /// In that case, all future queries will fail.
553 pub fn is_closed(&self) -> bool {
554 self.inner.sender.is_closed()
555 }
556
557 #[doc(hidden)]
558 pub fn __private_api_close(&mut self) {
559 self.inner.sender.close_channel()
560 }
561}
562
563impl fmt::Debug for Client {
564 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
565 f.debug_struct("Client").finish()
566 }
567}