gel_tokio/client.rs
1use std::future::Future;
2use std::sync::Arc;
3
4use gel_dsn::gel::Config;
5use gel_protocol::common::{Capabilities, Cardinality, IoFormat};
6use gel_protocol::model::Json;
7use gel_protocol::query_arg::QueryArgs;
8use gel_protocol::QueryResult;
9use tokio::time::sleep;
10
11use crate::errors::InvalidArgumentError;
12use crate::errors::NoDataError;
13use crate::errors::{Error, ErrorKind, SHOULD_RETRY};
14use crate::options::{RetryOptions, TransactionOptions};
15use crate::raw::{Options, PoolState, Response};
16use crate::raw::{Pool, QueryCapabilities};
17use crate::state::{AliasesDelta, ConfigDelta, GlobalsDelta};
18use crate::state::{AliasesModifier, ConfigModifier, Fn, GlobalsModifier};
19use crate::transaction;
20use crate::ResultVerbose;
21
22/// Gel database client.
23///
24/// Internally it contains a connection pool.
25///
26/// To create a client, use [`create_client`](crate::create_client) function (it
27/// gets database connection configuration from environment). You can also use
28/// [`Builder`](crate::Builder) to [`build`](`crate::Builder::new`) custom
29/// [`Config`] and [create a client](Client::new) using that config.
30///
31/// The `with_` methods ([`with_retry_options`](crate::Client::with_retry_options), [`with_transaction_options`](crate::Client::with_transaction_options), etc.)
32/// let you create a shallow copy of the client with adjusted options.
33#[derive(Debug, Clone)]
34pub struct Client {
35 options: Arc<Options>,
36 pool: Pool,
37}
38
39impl Client {
40 /// Create a new connection pool.
41 ///
42 /// Note this does not create a connection immediately.
43 /// Use [`ensure_connected()`][Client::ensure_connected] to establish a
44 /// connection and verify that the connection is usable.
45 pub fn new(config: &Config) -> Client {
46 Client {
47 options: Default::default(),
48 pool: Pool::new(config),
49 }
50 }
51
52 /// Ensure that there is at least one working connection to the pool.
53 ///
54 /// This can be used at application startup to ensure that you have a
55 /// working connection.
56 pub async fn ensure_connected(&self) -> Result<(), Error> {
57 self.pool.acquire().await?;
58 Ok(())
59 }
60
61 /// Query with retry.
62 async fn query_helper<R, A>(
63 &self,
64 query: impl AsRef<str>,
65 arguments: &A,
66 io_format: IoFormat,
67 cardinality: Cardinality,
68 ) -> Result<Response<Vec<R>>, Error>
69 where
70 A: QueryArgs,
71 R: QueryResult,
72 {
73 let mut iteration = 0;
74 loop {
75 let mut conn = self.pool.acquire().await?;
76
77 let conn = conn.inner();
78 let state = &self.options.state;
79 let caps = Capabilities::MODIFICATIONS | Capabilities::DDL;
80 match conn
81 .query(
82 query.as_ref(),
83 arguments,
84 state,
85 &self.options.annotations,
86 caps,
87 io_format,
88 cardinality,
89 )
90 .await
91 {
92 Ok(resp) => return Ok(resp),
93 Err(e) => {
94 let allow_retry = match e.get::<QueryCapabilities>() {
95 // Error from a weird source, or just a bug
96 // Let's keep on the safe side
97 None => false,
98 Some(QueryCapabilities::Unparsed) => true,
99 Some(QueryCapabilities::Parsed(c)) => c.is_empty(),
100 };
101 if allow_retry && e.has_tag(SHOULD_RETRY) {
102 let rule = self.options.retry.get_rule(&e);
103 iteration += 1;
104 if iteration < rule.attempts {
105 let duration = (rule.backoff)(iteration);
106 log::info!("Error: {e:#}. Retrying in {duration:?}...");
107 sleep(duration).await;
108 continue;
109 }
110 }
111 return Err(e);
112 }
113 }
114 }
115 }
116
117 /// Execute a query and return a collection of results and warnings produced by the server.
118 ///
119 /// You will usually have to specify the return type for the query:
120 ///
121 /// ```rust,ignore
122 /// let greeting: (Vec<String>, _) = conn.query_with_warnings("select 'hello'", &()).await?;
123 /// ```
124 ///
125 /// This method can be used with both static arguments, like a tuple of
126 /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
127 /// Similarly, dynamically typed results are also supported.
128 pub async fn query_verbose<R, A>(
129 &self,
130 query: impl AsRef<str> + Send,
131 arguments: &A,
132 ) -> Result<ResultVerbose<Vec<R>>, Error>
133 where
134 A: QueryArgs,
135 R: QueryResult,
136 {
137 Client::query_helper(self, query, arguments, IoFormat::Binary, Cardinality::Many)
138 .await
139 .map(|Response { data, warnings, .. }| ResultVerbose { data, warnings })
140 }
141
142 /// Execute a query and return a collection of results.
143 ///
144 /// You will usually have to specify the return type for the query:
145 ///
146 /// ```rust,ignore
147 /// let greeting = pool.query::<String, _>("SELECT 'hello'", &());
148 /// // or
149 /// let greeting: Vec<String> = pool.query("SELECT 'hello'", &());
150 ///
151 /// let two_numbers: Vec<i32> = conn.query("select {<int32>$0, <int32>$1}", &(10, 20)).await?;
152 /// ```
153 ///
154 /// This method can be used with both static arguments, like a tuple of
155 /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
156 /// Similarly, dynamically typed results are also supported.
157 pub async fn query<R, A>(
158 &self,
159 query: impl AsRef<str> + Send,
160 arguments: &A,
161 ) -> Result<Vec<R>, Error>
162 where
163 A: QueryArgs,
164 R: QueryResult,
165 {
166 Client::query_helper(self, query, arguments, IoFormat::Binary, Cardinality::Many)
167 .await
168 .map(|r| r.data)
169 }
170
171 /// Execute a query and return a single result
172 ///
173 /// You will usually have to specify the return type for the query:
174 ///
175 /// ```rust,ignore
176 /// let greeting = pool.query_single::<String, _>("SELECT 'hello'", &());
177 /// // or
178 /// let greeting: Option<String> = pool.query_single(
179 /// "SELECT 'hello'",
180 /// &()
181 /// );
182 /// ```
183 ///
184 /// This method can be used with both static arguments, like a tuple of
185 /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
186 /// Similarly, dynamically typed results are also supported.
187 pub async fn query_single<R, A>(
188 &self,
189 query: impl AsRef<str> + Send,
190 arguments: &A,
191 ) -> Result<Option<R>, Error>
192 where
193 A: QueryArgs,
194 R: QueryResult + Send,
195 {
196 Client::query_helper(
197 self,
198 query,
199 arguments,
200 IoFormat::Binary,
201 Cardinality::AtMostOne,
202 )
203 .await
204 .map(|x| x.data.into_iter().next())
205 }
206
207 /// Execute a query and return a single result
208 ///
209 /// The query must return exactly one element. If the query returns more
210 /// than one element, a
211 /// [`ResultCardinalityMismatchError`][crate::errors::ResultCardinalityMismatchError]
212 /// is raised. If the query returns an empty set, a
213 /// [`NoDataError`][crate::errors::NoDataError] is raised.
214 ///
215 /// You will usually have to specify the return type for the query:
216 ///
217 /// ```rust,ignore
218 /// let greeting = pool.query_required_single::<String, _>(
219 /// "SELECT 'hello'",
220 /// &(),
221 /// );
222 /// // or
223 /// let greeting: String = pool.query_required_single(
224 /// "SELECT 'hello'",
225 /// &(),
226 /// );
227 /// ```
228 ///
229 /// This method can be used with both static arguments, like a tuple of
230 /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
231 /// Similarly, dynamically typed results are also supported.
232 pub async fn query_required_single<R, A>(
233 &self,
234 query: impl AsRef<str> + Send,
235 arguments: &A,
236 ) -> Result<R, Error>
237 where
238 A: QueryArgs,
239 R: QueryResult + Send,
240 {
241 Client::query_helper(
242 self,
243 query,
244 arguments,
245 IoFormat::Binary,
246 Cardinality::AtMostOne,
247 )
248 .await
249 .and_then(|x| {
250 x.data
251 .into_iter()
252 .next()
253 .ok_or_else(|| NoDataError::with_message("query row returned zero results"))
254 })
255 }
256
257 /// Execute a query and return the result as JSON.
258 pub async fn query_json(
259 &self,
260 query: impl AsRef<str>,
261 arguments: &impl QueryArgs,
262 ) -> Result<Json, Error> {
263 let res = self
264 .query_helper::<String, _>(query, arguments, IoFormat::Json, Cardinality::Many)
265 .await?;
266
267 let json = res
268 .data
269 .into_iter()
270 .next()
271 .ok_or_else(|| NoDataError::with_message("query row returned zero results"))?;
272
273 // we trust database to produce valid json
274 Ok(Json::new_unchecked(json))
275 }
276
277 /// Execute a query and return a single result as JSON.
278 ///
279 /// The query must return exactly one element. If the query returns more
280 /// than one element, a
281 /// [`ResultCardinalityMismatchError`][crate::errors::ResultCardinalityMismatchError]
282 /// is raised.
283 ///
284 /// ```rust,ignore
285 /// let query = "select <json>(
286 /// insert Account {
287 /// username := <str>$0
288 /// }) {
289 /// username,
290 /// id
291 /// };";
292 /// let json_res: Option<Json> = client
293 /// .query_single_json(query, &("SomeUserName",))
294 /// .await?;
295 /// ```
296 pub async fn query_single_json(
297 &self,
298 query: impl AsRef<str>,
299 arguments: &impl QueryArgs,
300 ) -> Result<Option<Json>, Error> {
301 let res = self
302 .query_helper::<String, _>(query, arguments, IoFormat::Json, Cardinality::AtMostOne)
303 .await?;
304
305 // we trust database to produce valid json
306 Ok(res.data.into_iter().next().map(Json::new_unchecked))
307 }
308
309 /// Execute a query and return a single result as JSON.
310 ///
311 /// The query must return exactly one element. If the query returns more
312 /// than one element, a
313 /// [`ResultCardinalityMismatchError`][crate::errors::ResultCardinalityMismatchError]
314 /// is raised. If the query returns an empty set, a
315 /// [`NoDataError`][crate::errors::NoDataError] is raised.
316 pub async fn query_required_single_json(
317 &self,
318 query: impl AsRef<str>,
319 arguments: &impl QueryArgs,
320 ) -> Result<Json, Error> {
321 self.query_single_json(query, arguments)
322 .await?
323 .ok_or_else(|| NoDataError::with_message("query row returned zero results"))
324 }
325
326 /// Execute a query and don't expect result
327 ///
328 /// This method can be used with both static arguments, like a tuple of
329 /// scalars, and with dynamic arguments [`gel_protocol::value::Value`].
330 /// Similarly, dynamically typed results are also supported.
331 pub async fn execute<A>(&self, query: impl AsRef<str>, arguments: &A) -> Result<(), Error>
332 where
333 A: QueryArgs,
334 {
335 let mut iteration = 0;
336 loop {
337 let mut conn = self.pool.acquire().await?;
338
339 let conn = conn.inner();
340 let state = &self.options.state;
341 let caps = Capabilities::MODIFICATIONS | Capabilities::DDL;
342 match conn
343 .execute(
344 query.as_ref(),
345 arguments,
346 state,
347 &self.options.annotations,
348 caps,
349 )
350 .await
351 {
352 Ok(_) => return Ok(()),
353 Err(e) => {
354 let allow_retry = match e.get::<QueryCapabilities>() {
355 // Error from a weird source, or just a bug
356 // Let's keep on the safe side
357 None => false,
358 Some(QueryCapabilities::Unparsed) => true,
359 Some(QueryCapabilities::Parsed(c)) => c.is_empty(),
360 };
361 if allow_retry && e.has_tag(SHOULD_RETRY) {
362 let rule = self.options.retry.get_rule(&e);
363 iteration += 1;
364 if iteration < rule.attempts {
365 let duration = (rule.backoff)(iteration);
366 log::info!("Error: {e:#}. Retrying in {duration:?}...");
367 sleep(duration).await;
368 continue;
369 }
370 }
371 return Err(e);
372 }
373 }
374 }
375 }
376
377 /// Execute a transaction and retry.
378 ///
379 /// Transaction body must be encompassed in the closure. The closure **may
380 /// be executed multiple times**. This includes not only database queries
381 /// but also executing the whole function, so the transaction code must be
382 /// prepared to be idempotent.
383 ///
384 /// # Example
385 ///
386 /// ```rust,no_run
387 /// # async fn main_() -> Result<(), gel_tokio::Error> {
388 /// let conn = gel_tokio::create_client().await?;
389 /// let val = conn.transaction(|mut tx| async move {
390 /// tx.query_required_single::<i64, _>("
391 /// WITH C := UPDATE Counter SET { value := .value + 1}
392 /// SELECT C.value LIMIT 1
393 /// ", &()
394 /// ).await
395 /// }).await?;
396 /// # Ok(())
397 /// # }
398 /// ```
399 ///
400 /// # Commit and rollback
401 ///
402 /// If the closure returns [Result::Ok], the transaction is committed.
403 /// If the closure returns [Result::Err], the transaction is either retried or aborted,
404 /// depending on weather the error has `SHOULD_RETRY`` tag set.
405 ///
406 /// To manually abort a transaction, [gel_errors::UserError] can be returned:
407 ///
408 /// ```rust,no_run
409 /// use gel_errors::ErrorKind;
410 /// # async fn main_() -> Result<(), gel_tokio::Error> {
411 /// # let conn = gel_tokio::create_client().await?;
412 /// let val = conn.transaction(|mut tx| async move {
413 /// tx.execute("UPDATE Foo SET { x := 1 };", &()).await;
414 /// Err(gel_errors::UserError::build()) // abort transaction
415 /// }).await?;
416 /// # Ok(())
417 /// # }
418 /// ```
419 ///
420 /// # Returning custom errors
421 ///
422 /// See [this example](https://github.com/edgedb/edgedb-rust/blob/master/gel-tokio/examples/transaction_errors.rs)
423 /// and [the documentation of the `gel-errors` crate](https://docs.rs/gel-errors/latest/gel_errors/)
424 /// for how to return custom error types.
425 ///
426 /// # Panics
427 ///
428 /// Function panics when transaction object passed to the closure is not
429 /// dropped after closure exists. General rule: do not store transaction
430 /// anywhere and do not send to another coroutine. Pass to all further
431 /// function calls by reference.
432 pub async fn transaction<T, B, F>(&self, body: B) -> Result<T, Error>
433 where
434 B: FnMut(transaction::RetryingTransaction) -> F,
435 F: Future<Output = Result<T, Error>>,
436 {
437 transaction::run_and_retry(&self.pool, self.options.clone(), body).await
438 }
439
440 /// Start a transaction without the retry mechanism.
441 ///
442 /// Returns [RawTransaction] which implements [crate::QueryExecutor] and can
443 /// be used to execute queries within the transaction.
444 ///
445 /// The transaction will never retry failed queries, even if the database signals that the
446 /// query should be retried. For this reason, it is recommended to use [Client::within_transaction]
447 /// when possible.
448 ///
449 /// <div class="warning">
450 /// Transactions can fail for benign reasons and should always handle that case gracefully.
451 /// `RawTransaction` does not provide any retry mechanisms, so this responsibility falls
452 /// onto the user. For example, even only two select queries in a transaction can fail due to
453 /// concurrent modification of the database.
454 /// </div>
455 ///
456 /// # Commit and rollback
457 ///
458 /// To commit the changes made during the transaction,
459 /// [commit](crate::RawTransaction::commit) method must be called, otherwise the
460 /// transaction will roll back when [RawTransaction] is dropped.
461 ///
462 /// # Example
463 ///
464 /// ```rust,no_run
465 /// # async fn main_() -> Result<(), gel_tokio::Error> {
466 /// let conn = gel_tokio::create_client().await?;
467 /// let mut tx = conn.transaction_raw().await?;
468 /// tx.query_required_single::<i64, _>("
469 /// WITH C := UPDATE Counter SET { value := .value + 1}
470 /// SELECT C.value LIMIT 1
471 /// ", &()
472 /// ).await;
473 /// tx.commit().await;
474 /// # Ok(())
475 /// # }
476 /// ```
477 #[cfg(feature = "unstable")]
478 pub async fn transaction_raw(&self) -> Result<transaction::RawTransaction, Error> {
479 crate::transaction::start(&self.pool, self.options.clone()).await
480 }
481
482 /// Returns client with adjusted options for future transactions.
483 ///
484 /// This method returns a "shallow copy" of the current client
485 /// with modified transaction options.
486 ///
487 /// Both ``self`` and returned client can be used after, but when using
488 /// them transaction options applied will be different.
489 ///
490 /// Transaction options are used by the ``transaction`` method.
491 pub fn with_transaction_options(&self, options: TransactionOptions) -> Self {
492 Client {
493 options: Arc::new(Options {
494 transaction: options,
495 retry: self.options.retry.clone(),
496 state: self.options.state.clone(),
497 annotations: self.options.annotations.clone(),
498 }),
499 pool: self.pool.clone(),
500 }
501 }
502 /// Returns client with adjusted options for future retrying
503 /// transactions.
504 ///
505 /// This method returns a "shallow copy" of the current client
506 /// with modified transaction options.
507 ///
508 /// Both ``self`` and returned client can be used after, but when using
509 /// them transaction options applied will be different.
510 pub fn with_retry_options(&self, options: RetryOptions) -> Self {
511 Client {
512 options: Arc::new(Options {
513 transaction: self.options.transaction.clone(),
514 retry: options,
515 state: self.options.state.clone(),
516 annotations: self.options.annotations.clone(),
517 }),
518 pool: self.pool.clone(),
519 }
520 }
521
522 fn with_state(&self, f: impl FnOnce(&PoolState) -> PoolState) -> Self {
523 Client {
524 options: Arc::new(Options {
525 transaction: self.options.transaction.clone(),
526 retry: self.options.retry.clone(),
527 state: Arc::new(f(&self.options.state)),
528 annotations: self.options.annotations.clone(),
529 }),
530 pool: self.pool.clone(),
531 }
532 }
533
534 /// Returns the client with the specified global variables set
535 ///
536 /// Most commonly used with `#[derive(GlobalsDelta)]`.
537 ///
538 /// Note: this method is incremental, i.e. it adds (or removes) globals
539 /// instead of setting a definite set of variables. Use
540 /// `.with_globals(Unset(["name1", "name2"]))` to unset some variables.
541 ///
542 /// This method returns a "shallow copy" of the current client
543 /// with modified global variables
544 ///
545 /// Both ``self`` and returned client can be used after, but when using
546 /// them transaction options applied will be different.
547 pub fn with_globals(&self, globals: impl GlobalsDelta) -> Self {
548 self.with_state(|s| s.with_globals(globals))
549 }
550
551 /// Returns the client with the specified global variables set
552 ///
553 /// This method returns a "shallow copy" of the current client
554 /// with modified global variables
555 ///
556 /// Both ``self`` and returned client can be used after, but when using
557 /// them transaction options applied will be different.
558 ///
559 /// This is equivalent to `.with_globals(Fn(f))` but more ergonomic as it
560 /// allows type inference for lambda.
561 pub fn with_globals_fn(&self, f: impl FnOnce(&mut GlobalsModifier)) -> Self {
562 self.with_state(|s| s.with_globals(Fn(f)))
563 }
564
565 /// Returns the client with the specified aliases set
566 ///
567 /// This method returns a "shallow copy" of the current client
568 /// with modified aliases.
569 ///
570 /// Both ``self`` and returned client can be used after, but when using
571 /// them transaction options applied will be different.
572 pub fn with_aliases(&self, aliases: impl AliasesDelta) -> Self {
573 self.with_state(|s| s.with_aliases(aliases))
574 }
575
576 /// Returns the client with the specified aliases set
577 ///
578 /// This method returns a "shallow copy" of the current client
579 /// with modified aliases.
580 ///
581 /// Both ``self`` and returned client can be used after, but when using
582 /// them transaction options applied will be different.
583 ///
584 /// This is equivalent to `.with_aliases(Fn(f))` but more ergonomic as it
585 /// allows type inference for lambda.
586 pub fn with_aliases_fn(&self, f: impl FnOnce(&mut AliasesModifier)) -> Self {
587 self.with_state(|s| s.with_aliases(Fn(f)))
588 }
589
590 /// Returns the client with the default module set or unset
591 ///
592 /// This method returns a "shallow copy" of the current client
593 /// with modified default module.
594 ///
595 /// Both ``self`` and returned client can be used after, but when using
596 /// them transaction options applied will be different.
597 pub fn with_default_module(&self, module: Option<impl Into<String>>) -> Self {
598 self.with_state(|s| s.with_default_module(module.map(|m| m.into())))
599 }
600
601 /// Returns the client with the specified config
602 ///
603 /// Note: this method is incremental, i.e. it adds (or removes) individual
604 /// settings instead of setting a definite configuration. Use
605 /// `.with_config(Unset(["name1", "name2"]))` to unset some settings.
606 ///
607 /// This method returns a "shallow copy" of the current client
608 /// with modified global variables
609 ///
610 /// Both ``self`` and returned client can be used after, but when using
611 /// them transaction options applied will be different.
612 pub fn with_config(&self, cfg: impl ConfigDelta) -> Self {
613 self.with_state(|s| s.with_config(cfg))
614 }
615
616 /// Returns the client with the specified config
617 ///
618 /// Most commonly used with `#[derive(ConfigDelta)]`.
619 ///
620 /// This method returns a "shallow copy" of the current client
621 /// with modified global variables
622 ///
623 /// Both ``self`` and returned client can be used after, but when using
624 /// them transaction options applied will be different.
625 ///
626 /// This is equivalent to `.with_config(Fn(f))` but more ergonomic as it
627 /// allows type inference for lambda.
628 pub fn with_config_fn(&self, f: impl FnOnce(&mut ConfigModifier)) -> Self {
629 self.with_state(|s| s.with_config(Fn(f)))
630 }
631
632 /// Returns the client with the specified query tag.
633 ///
634 /// This method returns a "shallow copy" of the current client
635 /// with modified query tag.
636 ///
637 /// Both ``self`` and returned client can be used after, but when using
638 /// them query tag applied will be different.
639 pub fn with_tag(&self, tag: Option<&str>) -> Result<Self, Error> {
640 const KEY: &str = "tag";
641
642 let annotations = if self.options.annotations.get(KEY).map(|s| s.as_str()) != tag {
643 let mut annotations = (*self.options.annotations).clone();
644 if let Some(tag) = tag {
645 if tag.starts_with("edgedb/") {
646 return Err(InvalidArgumentError::with_message("reserved tag: edgedb/*"));
647 }
648 if tag.starts_with("gel/") {
649 return Err(InvalidArgumentError::with_message("reserved tag: gel/*"));
650 }
651 if tag.len() > 128 {
652 return Err(InvalidArgumentError::with_message(
653 "tag too long (> 128 bytes)",
654 ));
655 }
656 annotations.insert(KEY.to_string(), tag.to_string());
657 } else {
658 annotations.remove(KEY);
659 }
660 Arc::new(annotations)
661 } else {
662 self.options.annotations.clone()
663 };
664
665 Ok(Client {
666 options: Arc::new(Options {
667 transaction: self.options.transaction.clone(),
668 retry: self.options.retry.clone(),
669 state: self.options.state.clone(),
670 annotations,
671 }),
672 pool: self.pool.clone(),
673 })
674 }
675}