actix_redis_client/
lib.rs1#[macro_use]
2extern crate failure;
3extern crate actix;
4pub extern crate redis;
5
6use actix::prelude::*;
7use std::marker::PhantomData;
8
9mod error;
10pub use self::error::*;
11
12pub type ActixRedisClientResult<T> = Result<T, ActixRedisClientError>;
14
15pub struct Command<T> {
18 cmd: redis::Cmd,
19 _marker: PhantomData<T>,
20}
21
22impl<T> Command<T> {
23 pub fn new(cmd: redis::Cmd) -> Self {
24 Command {
25 cmd,
26 _marker: PhantomData::default(),
27 }
28 }
29}
30
31impl<T: 'static> Message for Command<T> {
32 type Result = ActixRedisClientResult<T>;
33}
34
35pub struct RedisExecutorSync(redis::Client);
37impl RedisExecutorSync {
38 fn new(client: redis::Client) -> Self {
39 RedisExecutorSync(client)
40 }
41
42 pub fn start<F>(threads: usize, client_factory: F) -> Addr<Self>
44 where
45 F: Fn() -> redis::Client + Send + Sync + 'static,
46 {
47 SyncArbiter::start(threads, move || Self::new(client_factory()))
48 }
49
50 pub fn get_connection(&self) -> Result<redis::Connection, ActixRedisClientError> {
52 match self.0.get_connection() {
53 Ok(v) => Ok(v),
54 Err(e) => Err(e.into()),
55 }
56 }
57
58 pub fn get_pubsub(&self) -> Result<redis::PubSub, ActixRedisClientError> {
60 match self.0.get_pubsub() {
61 Ok(v) => Ok(v),
62 Err(e) => Err(e.into()),
63 }
64 }
65}
66
67impl Actor for RedisExecutorSync {
68 type Context = SyncContext<Self>;
69}
70
71impl<T: redis::FromRedisValue + 'static> Handler<Command<T>> for RedisExecutorSync {
72 type Result = ActixRedisClientResult<T>;
73
74 fn handle(&mut self, cmd: Command<T>, _: &mut Self::Context) -> Self::Result {
75 match cmd.cmd.query(&self.0) {
76 Ok(v) => Ok(v),
77 Err(e) => Err(e.into()),
78 }
79 }
80}