actix_redis_client/
lib.rs

1#[macro_use]
2extern crate failure;
3extern crate actix;
4pub extern crate redis;
5
6use actix::prelude::*;
7use std::marker::PhantomData;
8
9mod error;
10pub use self::error::*;
11
12/// Result type
13pub type ActixRedisClientResult<T> = Result<T, ActixRedisClientError>;
14
15/// Basic command that can be sent to Redis client
16/// The redis crate is re-exposed to make use of `redis::cmd()` function to generate commands
17pub struct Command<T> {
18    cmd: redis::Cmd,
19    _marker: PhantomData<T>,
20}
21
22impl<T> Command<T> {
23    pub fn new(cmd: redis::Cmd) -> Self {
24        Command {
25            cmd,
26            _marker: PhantomData::default(),
27        }
28    }
29}
30
31impl<T: 'static> Message for Command<T> {
32    type Result = ActixRedisClientResult<T>;
33}
34
35/// Actor to give to Actix to do the background processing of Redis messages
36pub struct RedisExecutorSync(redis::Client);
37impl RedisExecutorSync {
38    fn new(client: redis::Client) -> Self {
39        RedisExecutorSync(client)
40    }
41
42    /// Starts the executor. Give it a number of threads and a factory `Fn() -> redis::Client` that handles client creation and you're good to go.
43    pub fn start<F>(threads: usize, client_factory: F) -> Addr<Self>
44    where
45        F: Fn() -> redis::Client + Send + Sync + 'static,
46    {
47        SyncArbiter::start(threads, move || Self::new(client_factory()))
48    }
49
50    /// Accessor to retrieve current Redis connection
51    pub fn get_connection(&self) -> Result<redis::Connection, ActixRedisClientError> {
52        match self.0.get_connection() {
53            Ok(v) => Ok(v),
54            Err(e) => Err(e.into()),
55        }
56    }
57
58    /// Accessor to retrieve current PubSub Redis connection
59    pub fn get_pubsub(&self) -> Result<redis::PubSub, ActixRedisClientError> {
60        match self.0.get_pubsub() {
61            Ok(v) => Ok(v),
62            Err(e) => Err(e.into()),
63        }
64    }
65}
66
67impl Actor for RedisExecutorSync {
68    type Context = SyncContext<Self>;
69}
70
71impl<T: redis::FromRedisValue + 'static> Handler<Command<T>> for RedisExecutorSync {
72    type Result = ActixRedisClientResult<T>;
73
74    fn handle(&mut self, cmd: Command<T>, _: &mut Self::Context) -> Self::Result {
75        match cmd.cmd.query(&self.0) {
76            Ok(v) => Ok(v),
77            Err(e) => Err(e.into()),
78        }
79    }
80}