actix_redis/
redis.rs

1use std::{collections::VecDeque, io};
2
3use actix::prelude::*;
4use actix_rt::net::TcpStream;
5use actix_service::boxed::{self, BoxService};
6use actix_tls::connect::{ConnectError, ConnectInfo, Connection, ConnectorService};
7use backoff::{backoff::Backoff, ExponentialBackoff};
8use log::{error, info, warn};
9use redis_async::{
10    error::Error as RespError,
11    resp::{RespCodec, RespValue},
12};
13use tokio::{
14    io::{split, WriteHalf},
15    sync::oneshot,
16};
17use tokio_util::codec::FramedRead;
18
19use crate::Error;
20
21/// Command for sending data to Redis.
22#[derive(Debug)]
23pub struct Command(pub RespValue);
24
25impl Message for Command {
26    type Result = Result<RespValue, Error>;
27}
28
29/// Redis communication actor.
30pub struct RedisActor {
31    addr: String,
32    connector: BoxService<ConnectInfo<String>, Connection<String, TcpStream>, ConnectError>,
33    backoff: ExponentialBackoff,
34    cell: Option<actix::io::FramedWrite<RespValue, WriteHalf<TcpStream>, RespCodec>>,
35    queue: VecDeque<oneshot::Sender<Result<RespValue, Error>>>,
36}
37
38impl RedisActor {
39    /// Start new `Supervisor` with `RedisActor`.
40    pub fn start<S: Into<String>>(addr: S) -> Addr<RedisActor> {
41        let addr = addr.into();
42
43        let backoff = ExponentialBackoff {
44            max_elapsed_time: None,
45            ..Default::default()
46        };
47
48        Supervisor::start(|_| RedisActor {
49            addr,
50            connector: boxed::service(ConnectorService::default()),
51            cell: None,
52            backoff,
53            queue: VecDeque::new(),
54        })
55    }
56}
57
58impl Actor for RedisActor {
59    type Context = Context<Self>;
60
61    fn started(&mut self, ctx: &mut Context<Self>) {
62        let req = ConnectInfo::new(self.addr.to_owned());
63        self.connector
64            .call(req)
65            .into_actor(self)
66            .map(|res, act, ctx| match res {
67                Ok(conn) => {
68                    let stream = conn.into_parts().0;
69                    info!("Connected to redis server: {}", act.addr);
70
71                    let (r, w) = split(stream);
72
73                    // configure write side of the connection
74                    let framed = actix::io::FramedWrite::new(w, RespCodec, ctx);
75                    act.cell = Some(framed);
76
77                    // read side of the connection
78                    ctx.add_stream(FramedRead::new(r, RespCodec));
79
80                    act.backoff.reset();
81                }
82                Err(err) => {
83                    error!("Can not connect to redis server: {}", err);
84                    // re-connect with backoff time.
85                    // we stop current context, supervisor will restart it.
86                    if let Some(timeout) = act.backoff.next_backoff() {
87                        ctx.run_later(timeout, |_, ctx| ctx.stop());
88                    }
89                }
90            })
91            .wait(ctx);
92    }
93}
94
95impl Supervised for RedisActor {
96    fn restarting(&mut self, _: &mut Self::Context) {
97        self.cell.take();
98        for tx in self.queue.drain(..) {
99            let _ = tx.send(Err(Error::Disconnected));
100        }
101    }
102}
103
104impl actix::io::WriteHandler<io::Error> for RedisActor {
105    fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
106        warn!("Redis connection dropped: {} error: {}", self.addr, err);
107        Running::Stop
108    }
109}
110
111impl StreamHandler<Result<RespValue, RespError>> for RedisActor {
112    fn handle(&mut self, msg: Result<RespValue, RespError>, ctx: &mut Self::Context) {
113        match msg {
114            Err(e) => {
115                if let Some(tx) = self.queue.pop_front() {
116                    let _ = tx.send(Err(e.into()));
117                }
118                ctx.stop();
119            }
120            Ok(val) => {
121                if let Some(tx) = self.queue.pop_front() {
122                    let _ = tx.send(Ok(val));
123                }
124            }
125        }
126    }
127}
128
129impl Handler<Command> for RedisActor {
130    type Result = ResponseFuture<Result<RespValue, Error>>;
131
132    fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result {
133        let (tx, rx) = oneshot::channel();
134        if let Some(ref mut cell) = self.cell {
135            self.queue.push_back(tx);
136            cell.write(msg.0);
137        } else {
138            let _ = tx.send(Err(Error::NotConnected));
139        }
140
141        Box::pin(async move { rx.await.map_err(|_| Error::Disconnected)? })
142    }
143}