1use std::{collections::VecDeque, io};
2
3use actix::prelude::*;
4use actix_rt::net::TcpStream;
5use actix_service::boxed::{self, BoxService};
6use actix_tls::connect::{ConnectError, ConnectInfo, Connection, ConnectorService};
7use backoff::{backoff::Backoff, ExponentialBackoff};
8use log::{error, info, warn};
9use redis_async::{
10 error::Error as RespError,
11 resp::{RespCodec, RespValue},
12};
13use tokio::{
14 io::{split, WriteHalf},
15 sync::oneshot,
16};
17use tokio_util::codec::FramedRead;
18
19use crate::Error;
20
/// Actor message wrapping a single RESP value to send to Redis.
///
/// `RedisActor`'s handler writes the wrapped value (`.0`) to the connection's
/// framed writer and answers with the reply, or with an error.
#[derive(Debug)]
pub struct Command(pub RespValue);
24
/// Declares `Command` as an actix message.
impl Message for Command {
    /// The reply is either a RESP value or this crate's `Error`.
    type Result = Result<RespValue, Error>;
}
28
/// Actor that owns a TCP connection to a Redis server and relays `Command`
/// messages over it.
pub struct RedisActor {
    /// Server address; passed to the connector on start and used in log messages.
    addr: String,
    /// Boxed connector service called in `started` to open the TCP stream.
    connector: BoxService<ConnectInfo<String>, Connection<String, TcpStream>, ConnectError>,
    /// Reconnect delay policy: consulted after a failed connect, reset after a
    /// successful one.
    backoff: ExponentialBackoff,
    /// Framed writer over the write half of the stream; `None` until a connection
    /// is established and cleared again in `restarting`.
    cell: Option<actix::io::FramedWrite<RespValue, WriteHalf<TcpStream>, RespCodec>>,
    /// Reply channels of commands already written, in FIFO order: pushed at the
    /// back when a command is written, popped from the front for each item read
    /// from the connection.
    queue: VecDeque<oneshot::Sender<Result<RespValue, Error>>>,
}
37
38impl RedisActor {
39 pub fn start<S: Into<String>>(addr: S) -> Addr<RedisActor> {
41 let addr = addr.into();
42
43 let backoff = ExponentialBackoff {
44 max_elapsed_time: None,
45 ..Default::default()
46 };
47
48 Supervisor::start(|_| RedisActor {
49 addr,
50 connector: boxed::service(ConnectorService::default()),
51 cell: None,
52 backoff,
53 queue: VecDeque::new(),
54 })
55 }
56}
57
58impl Actor for RedisActor {
59 type Context = Context<Self>;
60
61 fn started(&mut self, ctx: &mut Context<Self>) {
62 let req = ConnectInfo::new(self.addr.to_owned());
63 self.connector
64 .call(req)
65 .into_actor(self)
66 .map(|res, act, ctx| match res {
67 Ok(conn) => {
68 let stream = conn.into_parts().0;
69 info!("Connected to redis server: {}", act.addr);
70
71 let (r, w) = split(stream);
72
73 let framed = actix::io::FramedWrite::new(w, RespCodec, ctx);
75 act.cell = Some(framed);
76
77 ctx.add_stream(FramedRead::new(r, RespCodec));
79
80 act.backoff.reset();
81 }
82 Err(err) => {
83 error!("Can not connect to redis server: {}", err);
84 if let Some(timeout) = act.backoff.next_backoff() {
87 ctx.run_later(timeout, |_, ctx| ctx.stop());
88 }
89 }
90 })
91 .wait(ctx);
92 }
93}
94
95impl Supervised for RedisActor {
96 fn restarting(&mut self, _: &mut Self::Context) {
97 self.cell.take();
98 for tx in self.queue.drain(..) {
99 let _ = tx.send(Err(Error::Disconnected));
100 }
101 }
102}
103
104impl actix::io::WriteHandler<io::Error> for RedisActor {
105 fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
106 warn!("Redis connection dropped: {} error: {}", self.addr, err);
107 Running::Stop
108 }
109}
110
111impl StreamHandler<Result<RespValue, RespError>> for RedisActor {
112 fn handle(&mut self, msg: Result<RespValue, RespError>, ctx: &mut Self::Context) {
113 match msg {
114 Err(e) => {
115 if let Some(tx) = self.queue.pop_front() {
116 let _ = tx.send(Err(e.into()));
117 }
118 ctx.stop();
119 }
120 Ok(val) => {
121 if let Some(tx) = self.queue.pop_front() {
122 let _ = tx.send(Ok(val));
123 }
124 }
125 }
126 }
127}
128
129impl Handler<Command> for RedisActor {
130 type Result = ResponseFuture<Result<RespValue, Error>>;
131
132 fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result {
133 let (tx, rx) = oneshot::channel();
134 if let Some(ref mut cell) = self.cell {
135 self.queue.push_back(tx);
136 cell.write(msg.0);
137 } else {
138 let _ = tx.send(Err(Error::NotConnected));
139 }
140
141 Box::pin(async move { rx.await.map_err(|_| Error::Disconnected)? })
142 }
143}