Struct ockam::SqlxDatabase

source ·
pub struct SqlxDatabase {
    pub pool: Arc<Pool<Sqlite>>,
}
Expand description

The SqlxDatabase struct is used to create a database:

  • at a given path
  • with a given schema / or migrations applied to an existing schema

We use sqlx as our primary interface for interacting with the database The database driver is currently Sqlite

Fields§

§pool: Arc<Pool<Sqlite>>

Pool of connections to the database

Implementations§

source§

impl SqlxDatabase

source

pub async fn create(path: impl AsRef<Path>) -> Result<SqlxDatabase, Error>

Constructor for a database persisted on disk

source

pub async fn create_with_migration( path: impl AsRef<Path>, migration_set: impl MigrationSet ) -> Result<SqlxDatabase, Error>

Constructor for a database persisted on disk, with a specific schema / migration

source

pub async fn create_no_migration( path: impl AsRef<Path> ) -> Result<SqlxDatabase, Error>

Constructor for a database persisted on disk without migration

source

pub async fn in_memory(usage: &str) -> Result<SqlxDatabase, Error>

Create a nodes database in memory => this database is deleted on an ockam reset command! (contrary to the application database below)

source

pub async fn application_in_memory(usage: &str) -> Result<SqlxDatabase, Error>

Create an application database in memory The application database which contains the application configurations => this database is NOT deleted on an ockam reset command!

source

pub async fn in_memory_with_migration( usage: &str, migration_set: impl MigrationSet ) -> Result<SqlxDatabase, Error>

Create an in-memory database with a specific migration

source

pub fn map_sql_err(err: Error) -> Error

Map a sqlx error into an ockam error

source

pub fn map_decode_err(err: Error) -> Error

Map a minicbor decode error into an ockam error

Methods from Deref<Target = Pool<Sqlite>>§

source

pub fn acquire( &self ) -> impl Future<Output = Result<PoolConnection<DB>, Error>> + 'static

Retrieves a connection from the pool.

The total time this method is allowed to execute is capped by PoolOptions::acquire_timeout. If that timeout elapses, this will return Error::PoolClosed.

§Note: Cancellation/Timeout May Drop Connections

If acquire is cancelled or times out after it acquires a connection from the idle queue or opens a new one, it will drop that connection because we don’t want to assume it is safe to return to the pool, and testing it to see if it’s safe to release could introduce subtle bugs if not implemented correctly. To avoid that entirely, we’ve decided to not gracefully handle cancellation here.

However, if your workload is sensitive to dropped connections such as using an in-memory SQLite database with a pool size of 1, you can pretty easily ensure that a cancelled acquire() call will never drop connections by tweaking your PoolOptions:

This should eliminate any potential .await points between acquiring a connection and returning it.

source

pub fn try_acquire(&self) -> Option<PoolConnection<DB>>

Attempts to retrieve a connection from the pool if there is one available.

Returns None immediately if there are no idle connections available in the pool or there are tasks waiting for a connection which have yet to wake.

source

pub async fn begin(&self) -> Result<Transaction<'static, DB>, Error>

Retrieves a connection and immediately begins a new transaction.

source

pub async fn try_begin(&self) -> Result<Option<Transaction<'static, DB>>, Error>

Attempts to retrieve a connection and immediately begins a new transaction if successful.

source

pub fn close(&self) -> impl Future<Output = ()>

Shut down the connection pool, immediately waking all tasks waiting for a connection.

Upon calling this method, any currently waiting or subsequent calls to Pool::acquire and the like will immediately return Error::PoolClosed and no new connections will be opened. Checked-out connections are unaffected, but will be gracefully closed on-drop rather than being returned to the pool.

Returns a Future which can be .awaited to ensure all connections are gracefully closed. It will first close any idle connections currently waiting in the pool, then wait for all checked-out connections to be returned or closed.

Waiting for connections to be gracefully closed is optional, but will allow the database server to clean up the resources sooner rather than later. This is especially important for tests that create a new pool every time, otherwise you may see errors about connection limits being exhausted even when running tests in a single thread.

If the returned Future is not run to completion, any remaining connections will be dropped when the last handle for the given pool instance is dropped, which could happen in a task spawned by Pool internally and so may be unpredictable otherwise.

.close() may be safely called and .awaited on multiple handles concurrently.

source

pub fn is_closed(&self) -> bool

Returns true if .close() has been called on the pool, false otherwise.

source

pub fn close_event(&self) -> CloseEvent

Get a future that resolves when Pool::close() is called.

If the pool is already closed, the future resolves immediately.

This can be used to cancel long-running operations that hold onto a PoolConnection so they don’t prevent the pool from closing (which would otherwise wait until all connections are returned).

§Examples

These examples use Postgres and Tokio, but should suffice to demonstrate the concept.

Do something when the pool is closed:

use sqlx::PgPool;

let pool = PgPool::connect("postgresql://...").await?;

let pool2 = pool.clone();

tokio::spawn(async move {
    // Demonstrates that `CloseEvent` is itself a `Future` you can wait on.
    // This lets you implement any kind of on-close event that you like.
    pool2.close_event().await;

    println!("Pool is closing!");

    // Imagine maybe recording application statistics or logging a report, etc.
});

// The rest of the application executes normally...

// Close the pool before the application exits...
pool.close().await;

Cancel a long-running operation:

use sqlx::{Executor, PgPool};

let pool = PgPool::connect("postgresql://...").await?;

let pool2 = pool.clone();

tokio::spawn(async move {
    pool2.close_event().do_until(async {
        // This statement normally won't return for 30 days!
        // (Assuming the connection doesn't time out first, of course.)
        pool2.execute("SELECT pg_sleep('30 days')").await;

        // If the pool is closed before the statement completes, this won't be printed.
        // This is because `.do_until()` cancels the future it's given if the
        // pool is closed first.
        println!("Waited!");
    }).await;
});

// This normally wouldn't return until the above statement completed and the connection
// was returned to the pool. However, thanks to `.do_until()`, the operation was
// cancelled as soon as we called `.close().await`.
pool.close().await;
source

pub fn size(&self) -> u32

Returns the number of connections currently active. This includes idle connections.

source

pub fn num_idle(&self) -> usize

Returns the number of connections active and idle (not in use).

As of 0.6.0, this has been fixed to use a separate atomic counter and so should be fine to call even at high load.

This previously called [crossbeam::queue::ArrayQueue::len()] which waits for the head and tail pointers to be in a consistent state, which may never happen at high levels of churn.

source

pub fn connect_options( &self ) -> Arc<<<DB as Database>::Connection as Connection>::Options>

Gets a clone of the connection options for this pool

source

pub fn set_connect_options( &self, connect_options: <<DB as Database>::Connection as Connection>::Options )

Updates the connection options this pool will use when opening any future connections. Any existing open connection in the pool will be left as-is.

source

pub fn options(&self) -> &PoolOptions<DB>

Get the options for this pool

Trait Implementations§

source§

impl Clone for SqlxDatabase

source§

fn clone(&self) -> SqlxDatabase

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl Debug for SqlxDatabase

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
source§

impl Deref for SqlxDatabase

§

type Target = Pool<Sqlite>

The resulting type after dereferencing.
source§

fn deref(&self) -> &<SqlxDatabase as Deref>::Target

Dereferences the value.

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<D> AsyncTryClone for D
where D: Clone + Sync,

source§

fn async_try_clone<'life0, 'async_trait>( &'life0 self ) -> Pin<Box<dyn Future<Output = Result<D, Error>> + Send + 'async_trait>>
where 'life0: 'async_trait, D: 'async_trait,

Try cloning a object and return an Err in case of failure.
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoEither for T

source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more