gel_tokio/
client.rs

1use std::future::Future;
2use std::sync::Arc;
3
4use gel_dsn::gel::Config;
5use gel_protocol::common::{Capabilities, Cardinality, IoFormat};
6use gel_protocol::model::Json;
7use gel_protocol::query_arg::QueryArgs;
8use gel_protocol::QueryResult;
9use tokio::time::sleep;
10
11use crate::errors::InvalidArgumentError;
12use crate::errors::NoDataError;
13use crate::errors::{Error, ErrorKind, SHOULD_RETRY};
14use crate::options::{RetryOptions, TransactionOptions};
15use crate::raw::{Options, PoolState, Response};
16use crate::raw::{Pool, QueryCapabilities};
17use crate::state::{AliasesDelta, ConfigDelta, GlobalsDelta};
18use crate::state::{AliasesModifier, ConfigModifier, Fn, GlobalsModifier};
19use crate::transaction;
20use crate::ResultVerbose;
21
/// Gel database client.
///
/// Internally it contains a connection pool.
///
/// To create a client, use [`create_client`](crate::create_client) function (it
/// gets database connection configuration from environment). You can also use
/// [`Builder`](crate::Builder) to [`build`](`crate::Builder::new`) custom
/// [`Config`] and [create a client](Client::new) using that config.
///
/// The `with_` methods ([`with_retry_options`](crate::Client::with_retry_options), [`with_transaction_options`](crate::Client::with_transaction_options), etc.)
/// let you create a shallow copy of the client with adjusted options.
#[derive(Debug, Clone)]
pub struct Client {
    /// Options applied to calls made through this client: transaction
    /// options, retry options, session state and query annotations.
    /// The `with_*` methods build a new `Options` value for the derived
    /// client instead of mutating this one.
    options: Arc<Options>,
    /// Pool that connections are acquired from. The `with_*` methods hand a
    /// clone of this handle to the client they return.
    pool: Pool,
}
38
39impl Client {
40    /// Create a new connection pool.
41    ///
42    /// Note this does not create a connection immediately.
43    /// Use [`ensure_connected()`][Client::ensure_connected] to establish a
44    /// connection and verify that the connection is usable.
45    pub fn new(config: &Config) -> Client {
46        Client {
47            options: Default::default(),
48            pool: Pool::new(config),
49        }
50    }
51
52    /// Ensure that there is at least one working connection to the pool.
53    ///
54    /// This can be used at application startup to ensure that you have a
55    /// working connection.
56    pub async fn ensure_connected(&self) -> Result<(), Error> {
57        self.pool.acquire().await?;
58        Ok(())
59    }
60
61    /// Query with retry.
62    async fn query_helper<R, A>(
63        &self,
64        query: impl AsRef<str>,
65        arguments: &A,
66        io_format: IoFormat,
67        cardinality: Cardinality,
68    ) -> Result<Response<Vec<R>>, Error>
69    where
70        A: QueryArgs,
71        R: QueryResult,
72    {
73        let mut iteration = 0;
74        loop {
75            let mut conn = self.pool.acquire().await?;
76
77            let conn = conn.inner();
78            let state = &self.options.state;
79            let caps = Capabilities::MODIFICATIONS | Capabilities::DDL;
80            match conn
81                .query(
82                    query.as_ref(),
83                    arguments,
84                    state,
85                    &self.options.annotations,
86                    caps,
87                    io_format,
88                    cardinality,
89                )
90                .await
91            {
92                Ok(resp) => return Ok(resp),
93                Err(e) => {
94                    let allow_retry = match e.get::<QueryCapabilities>() {
95                        // Error from a weird source, or just a bug
96                        // Let's keep on the safe side
97                        None => false,
98                        Some(QueryCapabilities::Unparsed) => true,
99                        Some(QueryCapabilities::Parsed(c)) => c.is_empty(),
100                    };
101                    if allow_retry && e.has_tag(SHOULD_RETRY) {
102                        let rule = self.options.retry.get_rule(&e);
103                        iteration += 1;
104                        if iteration < rule.attempts {
105                            let duration = (rule.backoff)(iteration);
106                            log::info!("Error: {e:#}. Retrying in {duration:?}...");
107                            sleep(duration).await;
108                            continue;
109                        }
110                    }
111                    return Err(e);
112                }
113            }
114        }
115    }
116
117    /// Execute a query and return a collection of results and warnings produced by the server.
118    ///
119    /// You will usually have to specify the return type for the query:
120    ///
121    /// ```rust,ignore
122    /// let greeting: (Vec<String>, _) = conn.query_with_warnings("select 'hello'", &()).await?;
123    /// ```
124    ///
125    /// This method can be used with both static arguments, like a tuple of
126    /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
127    /// Similarly, dynamically typed results are also supported.
128    pub async fn query_verbose<R, A>(
129        &self,
130        query: impl AsRef<str> + Send,
131        arguments: &A,
132    ) -> Result<ResultVerbose<Vec<R>>, Error>
133    where
134        A: QueryArgs,
135        R: QueryResult,
136    {
137        Client::query_helper(self, query, arguments, IoFormat::Binary, Cardinality::Many)
138            .await
139            .map(|Response { data, warnings, .. }| ResultVerbose { data, warnings })
140    }
141
142    /// Execute a query and return a collection of results.
143    ///
144    /// You will usually have to specify the return type for the query:
145    ///
146    /// ```rust,ignore
147    /// let greeting = pool.query::<String, _>("SELECT 'hello'", &());
148    /// // or
149    /// let greeting: Vec<String> = pool.query("SELECT 'hello'", &());
150    ///
151    /// let two_numbers: Vec<i32> = conn.query("select {<int32>$0, <int32>$1}", &(10, 20)).await?;
152    /// ```
153    ///
154    /// This method can be used with both static arguments, like a tuple of
155    /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
156    /// Similarly, dynamically typed results are also supported.
157    pub async fn query<R, A>(
158        &self,
159        query: impl AsRef<str> + Send,
160        arguments: &A,
161    ) -> Result<Vec<R>, Error>
162    where
163        A: QueryArgs,
164        R: QueryResult,
165    {
166        Client::query_helper(self, query, arguments, IoFormat::Binary, Cardinality::Many)
167            .await
168            .map(|r| r.data)
169    }
170
171    /// Execute a query and return a single result
172    ///
173    /// You will usually have to specify the return type for the query:
174    ///
175    /// ```rust,ignore
176    /// let greeting = pool.query_single::<String, _>("SELECT 'hello'", &());
177    /// // or
178    /// let greeting: Option<String> = pool.query_single(
179    ///     "SELECT 'hello'",
180    ///     &()
181    /// );
182    /// ```
183    ///
184    /// This method can be used with both static arguments, like a tuple of
185    /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
186    /// Similarly, dynamically typed results are also supported.
187    pub async fn query_single<R, A>(
188        &self,
189        query: impl AsRef<str> + Send,
190        arguments: &A,
191    ) -> Result<Option<R>, Error>
192    where
193        A: QueryArgs,
194        R: QueryResult + Send,
195    {
196        Client::query_helper(
197            self,
198            query,
199            arguments,
200            IoFormat::Binary,
201            Cardinality::AtMostOne,
202        )
203        .await
204        .map(|x| x.data.into_iter().next())
205    }
206
207    /// Execute a query and return a single result
208    ///
209    /// The query must return exactly one element. If the query returns more
210    /// than one element, a
211    /// [`ResultCardinalityMismatchError`][crate::errors::ResultCardinalityMismatchError]
212    /// is raised. If the query returns an empty set, a
213    /// [`NoDataError`][crate::errors::NoDataError] is raised.
214    ///
215    /// You will usually have to specify the return type for the query:
216    ///
217    /// ```rust,ignore
218    /// let greeting = pool.query_required_single::<String, _>(
219    ///     "SELECT 'hello'",
220    ///     &(),
221    /// );
222    /// // or
223    /// let greeting: String = pool.query_required_single(
224    ///     "SELECT 'hello'",
225    ///     &(),
226    /// );
227    /// ```
228    ///
229    /// This method can be used with both static arguments, like a tuple of
230    /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
231    /// Similarly, dynamically typed results are also supported.
232    pub async fn query_required_single<R, A>(
233        &self,
234        query: impl AsRef<str> + Send,
235        arguments: &A,
236    ) -> Result<R, Error>
237    where
238        A: QueryArgs,
239        R: QueryResult + Send,
240    {
241        Client::query_helper(
242            self,
243            query,
244            arguments,
245            IoFormat::Binary,
246            Cardinality::AtMostOne,
247        )
248        .await
249        .and_then(|x| {
250            x.data
251                .into_iter()
252                .next()
253                .ok_or_else(|| NoDataError::with_message("query row returned zero results"))
254        })
255    }
256
257    /// Execute a query and return the result as JSON.
258    pub async fn query_json(
259        &self,
260        query: impl AsRef<str>,
261        arguments: &impl QueryArgs,
262    ) -> Result<Json, Error> {
263        let res = self
264            .query_helper::<String, _>(query, arguments, IoFormat::Json, Cardinality::Many)
265            .await?;
266
267        let json = res
268            .data
269            .into_iter()
270            .next()
271            .ok_or_else(|| NoDataError::with_message("query row returned zero results"))?;
272
273        // we trust database to produce valid json
274        Ok(Json::new_unchecked(json))
275    }
276
277    /// Execute a query and return a single result as JSON.
278    ///
279    /// The query must return exactly one element. If the query returns more
280    /// than one element, a
281    /// [`ResultCardinalityMismatchError`][crate::errors::ResultCardinalityMismatchError]
282    /// is raised.
283    ///
284    /// ```rust,ignore
285    /// let query = "select <json>(
286    ///     insert Account {
287    ///     username := <str>$0
288    ///     }) {
289    ///     username,
290    ///     id
291    ///     };";
292    /// let json_res: Option<Json> = client
293    ///  .query_single_json(query, &("SomeUserName",))
294    ///     .await?;
295    /// ```
296    pub async fn query_single_json(
297        &self,
298        query: impl AsRef<str>,
299        arguments: &impl QueryArgs,
300    ) -> Result<Option<Json>, Error> {
301        let res = self
302            .query_helper::<String, _>(query, arguments, IoFormat::Json, Cardinality::AtMostOne)
303            .await?;
304
305        // we trust database to produce valid json
306        Ok(res.data.into_iter().next().map(Json::new_unchecked))
307    }
308
309    /// Execute a query and return a single result as JSON.
310    ///
311    /// The query must return exactly one element. If the query returns more
312    /// than one element, a
313    /// [`ResultCardinalityMismatchError`][crate::errors::ResultCardinalityMismatchError]
314    /// is raised. If the query returns an empty set, a
315    /// [`NoDataError`][crate::errors::NoDataError] is raised.
316    pub async fn query_required_single_json(
317        &self,
318        query: impl AsRef<str>,
319        arguments: &impl QueryArgs,
320    ) -> Result<Json, Error> {
321        self.query_single_json(query, arguments)
322            .await?
323            .ok_or_else(|| NoDataError::with_message("query row returned zero results"))
324    }
325
326    /// Execute a query and don't expect result
327    ///
328    /// This method can be used with both static arguments, like a tuple of
329    /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
330    /// Similarly, dynamically typed results are also supported.
331    pub async fn execute<A>(&self, query: impl AsRef<str>, arguments: &A) -> Result<(), Error>
332    where
333        A: QueryArgs,
334    {
335        let mut iteration = 0;
336        loop {
337            let mut conn = self.pool.acquire().await?;
338
339            let conn = conn.inner();
340            let state = &self.options.state;
341            let caps = Capabilities::MODIFICATIONS | Capabilities::DDL;
342            match conn
343                .execute(
344                    query.as_ref(),
345                    arguments,
346                    state,
347                    &self.options.annotations,
348                    caps,
349                )
350                .await
351            {
352                Ok(_) => return Ok(()),
353                Err(e) => {
354                    let allow_retry = match e.get::<QueryCapabilities>() {
355                        // Error from a weird source, or just a bug
356                        // Let's keep on the safe side
357                        None => false,
358                        Some(QueryCapabilities::Unparsed) => true,
359                        Some(QueryCapabilities::Parsed(c)) => c.is_empty(),
360                    };
361                    if allow_retry && e.has_tag(SHOULD_RETRY) {
362                        let rule = self.options.retry.get_rule(&e);
363                        iteration += 1;
364                        if iteration < rule.attempts {
365                            let duration = (rule.backoff)(iteration);
366                            log::info!("Error: {e:#}. Retrying in {duration:?}...");
367                            sleep(duration).await;
368                            continue;
369                        }
370                    }
371                    return Err(e);
372                }
373            }
374        }
375    }
376
377    /// Execute a transaction and retry.
378    ///
379    /// Transaction body must be encompassed in the closure. The closure **may
380    /// be executed multiple times**. This includes not only database queries
381    /// but also executing the whole function, so the transaction code must be
382    /// prepared to be idempotent.
383    ///
384    /// # Example
385    ///
386    /// ```rust,no_run
387    /// # async fn main_() -> Result<(), gel_tokio::Error> {
388    /// let conn = gel_tokio::create_client().await?;
389    /// let val = conn.transaction(|mut tx| async move {
390    ///     tx.query_required_single::<i64, _>("
391    ///         WITH C := UPDATE Counter SET { value := .value + 1}
392    ///         SELECT C.value LIMIT 1
393    ///         ", &()
394    ///     ).await
395    /// }).await?;
396    /// # Ok(())
397    /// # }
398    /// ```
399    ///
400    /// # Commit and rollback
401    ///
402    /// If the closure returns [Result::Ok], the transaction is committed.
403    /// If the closure returns [Result::Err], the transaction is either retried or aborted,
404    /// depending on weather the error has `SHOULD_RETRY`` tag set.
405    ///
406    /// To manually abort a transaction, [gel_errors::UserError] can be returned:
407    ///
408    /// ```rust,no_run
409    /// use gel_errors::ErrorKind;
410    /// # async fn main_() -> Result<(), gel_tokio::Error> {
411    /// # let conn = gel_tokio::create_client().await?;
412    /// let val = conn.transaction(|mut tx| async move {
413    ///     tx.execute("UPDATE Foo SET { x := 1 };", &()).await;
414    ///     Err(gel_errors::UserError::build()) // abort transaction
415    /// }).await?;
416    /// # Ok(())
417    /// # }
418    /// ```
419    ///
420    /// # Returning custom errors
421    ///
422    /// See [this example](https://github.com/edgedb/edgedb-rust/blob/master/gel-tokio/examples/transaction_errors.rs)
423    /// and [the documentation of the `gel-errors` crate](https://docs.rs/gel-errors/latest/gel_errors/)
424    /// for how to return custom error types.
425    ///
426    /// # Panics
427    ///
428    /// Function panics when transaction object passed to the closure is not
429    /// dropped after closure exists. General rule: do not store transaction
430    /// anywhere and do not send to another coroutine. Pass to all further
431    /// function calls by reference.
432    pub async fn transaction<T, B, F>(&self, body: B) -> Result<T, Error>
433    where
434        B: FnMut(transaction::RetryingTransaction) -> F,
435        F: Future<Output = Result<T, Error>>,
436    {
437        transaction::run_and_retry(&self.pool, self.options.clone(), body).await
438    }
439
440    /// Start a transaction without the retry mechanism.
441    ///
442    /// Returns [RawTransaction] which implements [crate::QueryExecutor] and can
443    /// be used to execute queries within the transaction.
444    ///
445    /// The transaction will never retry failed queries, even if the database signals that the
446    /// query should be retried. For this reason, it is recommended to use [Client::within_transaction]
447    /// when possible.
448    ///
449    /// <div class="warning">
450    /// Transactions can fail for benign reasons and should always handle that case gracefully.
451    /// `RawTransaction` does not provide any retry mechanisms, so this responsibility falls
452    /// onto the user. For example, even only two select queries in a transaction can fail due to
453    /// concurrent modification of the database.
454    /// </div>
455    ///
456    /// # Commit and rollback
457    ///
458    /// To commit the changes made during the transaction,
459    /// [commit](crate::RawTransaction::commit) method must be called, otherwise the
460    /// transaction will roll back when [RawTransaction] is dropped.
461    ///
462    /// # Example
463    ///
464    /// ```rust,no_run
465    /// # async fn main_() -> Result<(), gel_tokio::Error> {
466    /// let conn = gel_tokio::create_client().await?;
467    /// let mut tx = conn.transaction_raw().await?;
468    /// tx.query_required_single::<i64, _>("
469    ///     WITH C := UPDATE Counter SET { value := .value + 1}
470    ///     SELECT C.value LIMIT 1
471    ///     ", &()
472    /// ).await;
473    /// tx.commit().await;
474    /// # Ok(())
475    /// # }
476    /// ```
477    #[cfg(feature = "unstable")]
478    pub async fn transaction_raw(&self) -> Result<transaction::RawTransaction, Error> {
479        crate::transaction::start(&self.pool, self.options.clone()).await
480    }
481
482    /// Returns client with adjusted options for future transactions.
483    ///
484    /// This method returns a "shallow copy" of the current client
485    /// with modified transaction options.
486    ///
487    /// Both ``self`` and returned client can be used after, but when using
488    /// them transaction options applied will be different.
489    ///
490    /// Transaction options are used by the ``transaction`` method.
491    pub fn with_transaction_options(&self, options: TransactionOptions) -> Self {
492        Client {
493            options: Arc::new(Options {
494                transaction: options,
495                retry: self.options.retry.clone(),
496                state: self.options.state.clone(),
497                annotations: self.options.annotations.clone(),
498            }),
499            pool: self.pool.clone(),
500        }
501    }
502    /// Returns client with adjusted options for future retrying
503    /// transactions.
504    ///
505    /// This method returns a "shallow copy" of the current client
506    /// with modified transaction options.
507    ///
508    /// Both ``self`` and returned client can be used after, but when using
509    /// them transaction options applied will be different.
510    pub fn with_retry_options(&self, options: RetryOptions) -> Self {
511        Client {
512            options: Arc::new(Options {
513                transaction: self.options.transaction.clone(),
514                retry: options,
515                state: self.options.state.clone(),
516                annotations: self.options.annotations.clone(),
517            }),
518            pool: self.pool.clone(),
519        }
520    }
521
522    fn with_state(&self, f: impl FnOnce(&PoolState) -> PoolState) -> Self {
523        Client {
524            options: Arc::new(Options {
525                transaction: self.options.transaction.clone(),
526                retry: self.options.retry.clone(),
527                state: Arc::new(f(&self.options.state)),
528                annotations: self.options.annotations.clone(),
529            }),
530            pool: self.pool.clone(),
531        }
532    }
533
534    /// Returns the client with the specified global variables set
535    ///
536    /// Most commonly used with `#[derive(GlobalsDelta)]`.
537    ///
538    /// Note: this method is incremental, i.e. it adds (or removes) globals
539    /// instead of setting a definite set of variables. Use
540    /// `.with_globals(Unset(["name1", "name2"]))` to unset some variables.
541    ///
542    /// This method returns a "shallow copy" of the current client
543    /// with modified global variables
544    ///
545    /// Both ``self`` and returned client can be used after, but when using
546    /// them transaction options applied will be different.
547    pub fn with_globals(&self, globals: impl GlobalsDelta) -> Self {
548        self.with_state(|s| s.with_globals(globals))
549    }
550
551    /// Returns the client with the specified global variables set
552    ///
553    /// This method returns a "shallow copy" of the current client
554    /// with modified global variables
555    ///
556    /// Both ``self`` and returned client can be used after, but when using
557    /// them transaction options applied will be different.
558    ///
559    /// This is equivalent to `.with_globals(Fn(f))` but more ergonomic as it
560    /// allows type inference for lambda.
561    pub fn with_globals_fn(&self, f: impl FnOnce(&mut GlobalsModifier)) -> Self {
562        self.with_state(|s| s.with_globals(Fn(f)))
563    }
564
565    /// Returns the client with the specified aliases set
566    ///
567    /// This method returns a "shallow copy" of the current client
568    /// with modified aliases.
569    ///
570    /// Both ``self`` and returned client can be used after, but when using
571    /// them transaction options applied will be different.
572    pub fn with_aliases(&self, aliases: impl AliasesDelta) -> Self {
573        self.with_state(|s| s.with_aliases(aliases))
574    }
575
576    /// Returns the client with the specified aliases set
577    ///
578    /// This method returns a "shallow copy" of the current client
579    /// with modified aliases.
580    ///
581    /// Both ``self`` and returned client can be used after, but when using
582    /// them transaction options applied will be different.
583    ///
584    /// This is equivalent to `.with_aliases(Fn(f))` but more ergonomic as it
585    /// allows type inference for lambda.
586    pub fn with_aliases_fn(&self, f: impl FnOnce(&mut AliasesModifier)) -> Self {
587        self.with_state(|s| s.with_aliases(Fn(f)))
588    }
589
590    /// Returns the client with the default module set or unset
591    ///
592    /// This method returns a "shallow copy" of the current client
593    /// with modified default module.
594    ///
595    /// Both ``self`` and returned client can be used after, but when using
596    /// them transaction options applied will be different.
597    pub fn with_default_module(&self, module: Option<impl Into<String>>) -> Self {
598        self.with_state(|s| s.with_default_module(module.map(|m| m.into())))
599    }
600
601    /// Returns the client with the specified config
602    ///
603    /// Note: this method is incremental, i.e. it adds (or removes) individual
604    /// settings instead of setting a definite configuration. Use
605    /// `.with_config(Unset(["name1", "name2"]))` to unset some settings.
606    ///
607    /// This method returns a "shallow copy" of the current client
608    /// with modified global variables
609    ///
610    /// Both ``self`` and returned client can be used after, but when using
611    /// them transaction options applied will be different.
612    pub fn with_config(&self, cfg: impl ConfigDelta) -> Self {
613        self.with_state(|s| s.with_config(cfg))
614    }
615
616    /// Returns the client with the specified config
617    ///
618    /// Most commonly used with `#[derive(ConfigDelta)]`.
619    ///
620    /// This method returns a "shallow copy" of the current client
621    /// with modified global variables
622    ///
623    /// Both ``self`` and returned client can be used after, but when using
624    /// them transaction options applied will be different.
625    ///
626    /// This is equivalent to `.with_config(Fn(f))` but more ergonomic as it
627    /// allows type inference for lambda.
628    pub fn with_config_fn(&self, f: impl FnOnce(&mut ConfigModifier)) -> Self {
629        self.with_state(|s| s.with_config(Fn(f)))
630    }
631
632    /// Returns the client with the specified query tag.
633    ///
634    /// This method returns a "shallow copy" of the current client
635    /// with modified query tag.
636    ///
637    /// Both ``self`` and returned client can be used after, but when using
638    /// them query tag applied will be different.
639    pub fn with_tag(&self, tag: Option<&str>) -> Result<Self, Error> {
640        const KEY: &str = "tag";
641
642        let annotations = if self.options.annotations.get(KEY).map(|s| s.as_str()) != tag {
643            let mut annotations = (*self.options.annotations).clone();
644            if let Some(tag) = tag {
645                if tag.starts_with("edgedb/") {
646                    return Err(InvalidArgumentError::with_message("reserved tag: edgedb/*"));
647                }
648                if tag.starts_with("gel/") {
649                    return Err(InvalidArgumentError::with_message("reserved tag: gel/*"));
650                }
651                if tag.len() > 128 {
652                    return Err(InvalidArgumentError::with_message(
653                        "tag too long (> 128 bytes)",
654                    ));
655                }
656                annotations.insert(KEY.to_string(), tag.to_string());
657            } else {
658                annotations.remove(KEY);
659            }
660            Arc::new(annotations)
661        } else {
662            self.options.annotations.clone()
663        };
664
665        Ok(Client {
666            options: Arc::new(Options {
667                transaction: self.options.transaction.clone(),
668                retry: self.options.retry.clone(),
669                state: self.options.state.clone(),
670                annotations,
671            }),
672            pool: self.pool.clone(),
673        })
674    }
675}