[][src]Crate tokio_resource_pool

A generic resource pool for the Tokio ecosystem.

Example

To use it, you need to implment Manage for your resource, and then create a Pool with its background worker, spawn the worker. Once this is done, you can request a resource by calling Pool::check_out.

use futures::{try_ready, Async, Poll};
use futures::future::{lazy, Future, FutureResult, IntoFuture};
use redis::{RedisError, RedisFuture, RedisResult};
use redis::aio::{Connection, ConnectionLike};
use tokio;

use tokio_resource_pool::{Builder, CheckOut, Manage, Pool, Status, RealDependencies};

struct RedisManager {
    client: redis::Client,
}

impl RedisManager {
    fn new(url: impl redis::IntoConnectionInfo) -> RedisResult<Self> {
        let client = redis::Client::open(url)?;
        Ok(Self { client })
    }
}

impl Manage for RedisManager {
    type Resource = Connection;

    type Dependencies = RealDependencies;

    type CheckOut = RedisCheckOut;

    type Error = RedisError;

    type CreateFuture = Box<dyn Future<Item = Self::Resource, Error = Self::Error> + Send>;

    fn create(&self) -> Self::CreateFuture {
        Box::new(self.client.get_async_connection())
    }

    fn status(&self, _: &Self::Resource) -> Status {
        Status::Valid
    }

    type RecycleFuture = RecycleFuture;

    fn recycle(&self, connection: Self::Resource) -> Self::RecycleFuture {
        let inner = redis::cmd("PING").query_async::<_, ()>(connection);
        RecycleFuture { inner }
    }
}

pub struct RecycleFuture {
    inner: RedisFuture<(Connection, ())>,
}

impl Future for RecycleFuture {
    type Item = Option<Connection>;

    type Error = RedisError;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let (connection, ()) = try_ready!(self.inner.poll());
        Ok(Async::Ready(Some(connection)))
    }
}

pub struct RedisCheckOut {
    inner: CheckOut<RedisManager>,
}

impl ConnectionLike for RedisCheckOut {
    fn req_packed_command(
        self,
        cmd: Vec<u8>,
    ) -> Box<dyn Future<Item = (Self, redis::Value), Error = RedisError> + Send> {
        let borrower = move |connection: Connection| connection.req_packed_command(cmd);
        Box::new(self.inner.lend(borrower))
    }

    fn req_packed_commands(
        self,
        cmd: Vec<u8>,
        offset: usize,
        count: usize,
    ) -> Box<dyn Future<Item = (Self, Vec<redis::Value>), Error = RedisError> + Send> {
        let borrower =
            move |connection: Connection| connection.req_packed_commands(cmd, offset, count);
        Box::new(self.inner.lend(borrower))
    }

    fn get_db(&self) -> i64 {
        self.inner.get_db()
    }
}

impl From<CheckOut<RedisManager>> for RedisCheckOut {
    fn from(inner: CheckOut<RedisManager>) -> Self {
        Self { inner }
    }
}

let manager = RedisManager::new("redis://127.0.0.1/")?;
tokio::run(lazy(move || {
    let pool = Builder::new().build(4, manager);
    tokio::spawn(
        pool.check_out()
            .and_then(|connection| {
                redis::cmd("INFO").query_async::<_, redis::InfoDict>(connection)
            })
            .map(|(_, info)| println!("{:#?}", info))
            .map_err(|error| eprintln!("error: {}", error)),
    )
}));

Alternatives

There is another resource pool called bb8. It has two significant differences.

  • The API is different. This library gives you a struct that dereferences to your resource while bb8 turns a closure from a resource to a Future that yields the resource back.

  • Reaping is done differently. This library reaps resources as soon as they are returned, while bb8 reaps them at a given interval.

Structs

Builder
CheckOut

A check out of a resource from a Pool. The resource is automatically returned when the CheckOut is dropped.

CheckOutFuture

A Future that will yield a resource from the pool on completion.

LentCheckOut

A future where a resource is lent to an opaque, asynchronous computation.

Pool

A handle to a pool through which resources are requested.

Enums

RealDependencies
Status

Traits

Dependencies
Manage

A trait for managing the lifecycle of a resource.