//! libaster/proxy/cluster.rs — Redis cluster proxy core.

1pub mod back;
2pub mod fetcher;
3pub mod front;
4pub mod init;
5pub mod redirect;
6
7use crate::com::create_reuse_port_listener;
8use crate::com::meta::meta_init;
9use crate::com::set_read_write_timeout;
10use crate::com::AsError;
11use crate::com::ClusterConfig;
12use crate::protocol::redis::{new_read_only_cmd, RedisHandleCodec, RedisNodeCodec};
13use crate::protocol::redis::{Cmd, ReplicaLayout, SLOTS_COUNT};
14use crate::proxy::cluster::fetcher::SingleFlightTrigger;
15use crate::utils::crc::crc16;
16
17#[cfg(feature = "metrics")]
18use crate::metrics::{front_conn_incr, thread_incr};
19
20// use failure::Error;
21use futures::future::ok;
22use futures::future::*;
23use futures::task;
24use futures::unsync::mpsc::{channel, Sender};
25use futures::AsyncSink;
26use futures::{Sink, Stream};
27
28use tokio::net::TcpStream;
29use tokio::prelude::FutureExt;
30use tokio::runtime::current_thread;
31use tokio::timer::Interval;
32use tokio_codec::Decoder;
33
34use rand::prelude::*;
35
36use std::cell::{Cell, RefCell};
37use std::collections::{HashMap, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::rc::{Rc, Weak};
40use std::thread::{self, JoinHandle};
41use std::time::{Duration, Instant};
42
/// A Redis cluster redirection, parsed from a `-MOVED` or `-ASK` error reply.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Redirect {
    /// Slot ownership has moved permanently to `to`; the slot table should
    /// be updated before replaying the command.
    Move { slot: usize, to: String },
    /// One-off redirect: replay this command against `to` (with ASKING),
    /// without touching the slot table.
    Ask { slot: usize, to: String },
}

impl Redirect {
    /// True for the transient `Ask` variant.
    pub(crate) fn is_ask(&self) -> bool {
        // Exhaustive match (no `_` arm) so adding a variant forces a review here.
        match self {
            Redirect::Ask { .. } => true,
            Redirect::Move { .. } => false,
        }
    }
}
57
58#[derive(Clone)]
59pub struct Redirection {
60    pub target: Redirect,
61    pub cmd: Cmd,
62}
63
64impl Redirection {
65    pub(crate) fn new(is_move: bool, slot: usize, to: String, cmd: Cmd) -> Redirection {
66        if is_move {
67            Redirection {
68                target: Redirect::Move { slot, to },
69                cmd,
70            }
71        } else {
72            Redirection {
73                target: Redirect::Ask { slot, to },
74                cmd,
75            }
76        }
77    }
78}
79
80pub use Redirect::{Ask, Move};
81
82pub struct RedirectFuture {}
83
/// Per-thread state of one Redis-cluster proxy instance.
///
/// All interior mutability is single-threaded (`RefCell`); the whole proxy
/// runs on a `current_thread` executor.
pub struct Cluster {
    pub cc: ClusterConfig,

    // Slot -> node routing table.
    slots: RefCell<Slots>,
    // Live backend connections keyed by node address.
    conns: RefCell<Conns>,
    // Byte delimiters marking the hash-tag section of keys ({...} in vanilla Redis).
    hash_tag: Vec<u8>,
    // When true, read commands may be routed to replicas.
    read_from_slave: bool,

    // Channel feeding MOVED/ASK redirections to the redirect handler task.
    moved: Sender<Redirection>,
    // Deduplicating trigger for topology fetches; None until startup wires it.
    fetch: RefCell<Option<Rc<SingleFlightTrigger>>>,
    // Instant of the most recent successful dispatch (see since_latest).
    latest: RefCell<Instant>,
}
96
impl Cluster {
    /// Bring one proxy instance up on the current thread's reactor.
    ///
    /// The setup is a futures-0.1 chain spawned onto the `current_thread`
    /// executor in four stages: build the `Cluster` state and pre-connect
    /// backends, spawn the MOVED/ASK redirect handler, start the periodic
    /// topology fetcher, then accept client sockets forever.  Errors inside
    /// the chain only log; this function returns once the chain is spawned.
    pub(crate) fn run(cc: ClusterConfig, replica: ReplicaLayout) -> Result<(), AsError> {
        let addr = cc
            .listen_addr
            .clone()
            .parse::<SocketAddr>()
            .expect("parse socket never fail");
        let fut = ok::<ClusterConfig, AsError>(cc)
            .and_then(|mut cc| {
                // Stage 1: materialize the slot table and one connection per node.
                let read_from_slave = cc.read_from_slave.clone().unwrap_or(false);
                let hash_tag = cc
                    .hash_tag
                    .clone()
                    .map(|x| x.as_bytes().to_vec())
                    .unwrap_or(vec![]);
                let mut slots = Slots::default();
                let (masters, replicas) = replica;
                slots.try_update_all(masters, replicas);
                // Channel carrying redirected commands back into the handler.
                let (moved, moved_rx) = channel(10240);

                let all_masters = slots.get_all_masters();
                let mut all_lived = HashSet::new();
                // cc.servers is rewritten to the discovered master set.
                cc.servers = all_masters.iter().map(|x| x.clone()).collect();
                let all_servers = cc.servers.clone();
                let mut conns = Conns::default();
                for master in all_servers.into_iter() {
                    let conn = ConnBuilder::new()
                        .moved(moved.clone())
                        .cluster(cc.name.clone())
                        .node(master.clone())
                        .read_timeout(cc.read_timeout.clone())
                        .write_timeout(cc.write_timeout.clone())
                        .connect()?;
                    conns.insert(&master, conn);
                    all_lived.insert(master.clone());
                }

                if read_from_slave {
                    // Also pre-connect every replica when replica reads are enabled.
                    let all_slaves = slots.get_all_replicas();
                    for slave in all_slaves.into_iter() {
                        let conn = ConnBuilder::new()
                            .moved(moved.clone())
                            .cluster(cc.name.clone())
                            .node(slave.clone())
                            .read_timeout(cc.read_timeout.clone())
                            .write_timeout(cc.write_timeout.clone())
                            .replica(true)
                            .connect()?;
                        conns.insert(&slave, conn);
                        all_lived.insert(slave.clone());
                    }
                }
                let cluster = Cluster {
                    cc,
                    hash_tag,
                    read_from_slave,
                    moved,
                    slots: RefCell::new(slots),
                    conns: RefCell::new(conns),
                    // fetch is wired in stage 3, once the trigger channel exists.
                    fetch: RefCell::new(None),
                    latest: RefCell::new(Instant::now()),
                };
                Ok((cluster, moved_rx))
            })
            .and_then(|(cluster, moved_rx)| {
                // Stage 2: spawn the MOVED/ASK redirect handler.
                let rc_cluster = Rc::new(cluster);
                let redirect_handler = redirect::RedirectHandler::new(rc_cluster.clone(), moved_rx);
                current_thread::spawn(redirect_handler);
                Ok(rc_cluster)
            })
            .and_then(|rc_cluster| {
                // Stage 3: topology fetcher, driven by a fixed interval merged
                // with on-demand single-flight triggers.
                let interval_millis = rc_cluster.cc.fetch_interval.unwrap_or(1000);
                let interval = Interval::new(
                    Instant::now() + Duration::from_millis(interval_millis),
                    Duration::from_millis(interval_millis),
                );
                let interval_stream = interval
                    .map(|_| fetcher::TriggerBy::Interval)
                    .map_err(|_| AsError::None);

                let (tx, rx) = channel(1024);
                let trigger = Rc::new(SingleFlightTrigger::new(1, tx));
                let _ = rc_cluster.fetch.borrow_mut().replace(trigger);
                let trigger = rx.map_err(|_| AsError::None);
                let fetch =
                    fetcher::Fetch::new(rc_cluster.clone(), trigger.select(interval_stream));
                current_thread::spawn(fetch);
                Ok(rc_cluster)
            })
            .and_then(move |cluster| {
                // Stage 4: accept loop; every client socket becomes a Front task.
                let listen = create_reuse_port_listener(&addr).expect("bind never fail");
                let service = listen
                    .incoming()
                    .for_each(move |sock| {
                        let cluster = cluster.clone();
                        // NODELAY failure is non-fatal: log and keep serving.
                        if let Err(err) = sock.set_nodelay(true) {
                            warn!(
                                "cluster {} fail to set nodelay but skip, due to {:?}",
                                cluster.cc.name, err
                            );
                        }
                        // Client name is for logging only; fall back to "unknown".
                        let client_str = match sock.peer_addr() {
                            Ok(client) => format!("{}", client),
                            Err(err) => {
                                error!(
                                    "cluster {} fail to get client name due to {:?}",
                                    cluster.cc.name, err
                                );
                                "unknown".to_string()
                            }
                        };

                        #[cfg(feature = "metrics")]
                        front_conn_incr(&cluster.cc.name);
                        let codec = RedisHandleCodec {};
                        let (output, input) = codec.framed(sock).split();
                        let fut = front::Front::new(client_str, cluster, input, output);
                        current_thread::spawn(fut);
                        Ok(())
                    })
                    .map_err(|err| {
                        error!("fail to accept incomming sock due {}", err);
                    });
                current_thread::spawn(service);
                Ok(())
            });
        current_thread::spawn(
            fut.map_err(|err| error!("fail to create cluster server due to {}", err)),
        );
        Ok(())
    }
}
229
230impl Cluster {
231    fn get_addr(&self, slot: usize, is_read: bool) -> String {
232        // trace!("get slot={} and is_read={}", slot, is_read);
233        if self.read_from_slave && is_read {
234            if let Some(replica) = self.slots.borrow().get_replica(slot) {
235                if replica != "" {
236                    return replica.to_string();
237                }
238            }
239        }
240        self.slots
241            .borrow()
242            .get_master(slot)
243            .map(|x| x.to_string())
244            .expect("master addr never be empty")
245    }
246
247    fn get_random_master(&self, exclusive: &str) -> String {
248        self.slots
249            .borrow_mut()
250            .get_random_master(exclusive)
251            .map(|x| x.to_string())
252            .expect("master addr never be empty")
253    }
254
255    pub fn trigger_fetch(&self, trigger_by: fetcher::TriggerBy) {
256        if let Some(trigger) = self.fetch.borrow().clone() {
257            if trigger.try_trigger() {
258                info!("succeed trigger fetch process by {:?}", trigger_by);
259                return;
260            }
261        } else {
262            warn!("fail to trigger fetch process due to trigger event uninitialed");
263        }
264    }
265
266    pub fn dispatch_to(&self, addr: &str, cmd: Cmd) -> Result<AsyncSink<Cmd>, AsError> {
267        if !cmd.borrow().can_cycle() {
268            cmd.set_error(AsError::ClusterFailDispatch);
269            return Ok(AsyncSink::Ready);
270        }
271        let mut conns = self.conns.borrow_mut();
272        loop {
273            if let Some(sender) = conns.get_mut(addr).map(|x| x.sender()) {
274                match sender.start_send(cmd) {
275                    Ok(ret) => {
276                        return Ok(ret);
277                    }
278                    Err(_se) => {
279                        warn!("fail to send to backend {} ", addr);
280                        return Err(AsError::BackendClosedError(addr.to_string()));
281                    }
282                }
283            } else {
284                self.connect(&addr, &mut conns)?;
285            }
286        }
287    }
288
289    fn inner_dispatch_all(&self, cmds: &mut VecDeque<Cmd>) -> Result<usize, AsError> {
290        let mut count = 0usize;
291        loop {
292            if cmds.is_empty() {
293                return Ok(count);
294            }
295            let cmd = cmds.pop_front().expect("cmds pop front never be empty");
296            if !cmd.borrow().can_cycle() {
297                cmd.set_error(AsError::RequestReachMaxCycle);
298                continue;
299            }
300            let slot = {
301                let hash_tag = self.hash_tag.as_ref();
302                let signed = cmd.borrow().key_hash(hash_tag, crc16) as usize;
303                signed % SLOTS_COUNT
304            };
305
306            let addr = if !cmd.is_retry() {
307                self.get_addr(slot, cmd.borrow().is_read())
308            } else {
309                let cmd_borrow = cmd.borrow();
310                let exclusive = cmd_borrow
311                    .reply
312                    .as_ref()
313                    .map(|x| String::from_utf8_lossy(&x.data[1..x.data.len() - 2]).to_string())
314                    .unwrap_or("".to_string());
315                self.get_random_master(&exclusive)
316            };
317            let mut conns = self.conns.borrow_mut();
318
319            if let Some(sender) = conns.get_mut(&addr).map(|x| x.sender()) {
320                match sender.start_send(cmd) {
321                    Ok(AsyncSink::Ready) => {
322                        // trace!("success start command into backend");
323                        count += 1;
324                    }
325                    Ok(AsyncSink::NotReady(cmd)) => {
326                        cmd.borrow_mut().add_cycle();
327                        cmds.push_front(cmd);
328                        return Ok(count);
329                    }
330                    Err(se) => {
331                        let cmd = se.into_inner();
332                        cmd.borrow_mut().add_cycle();
333                        cmds.push_front(cmd);
334                        self.connect(&addr, &mut conns)?;
335                        return Ok(count);
336                    }
337                }
338            } else {
339                cmds.push_front(cmd);
340                self.connect(&addr, &mut conns)?;
341                return Ok(count);
342            }
343        }
344    }
345
346    pub fn dispatch_all(&self, cmds: &mut VecDeque<Cmd>) -> Result<usize, AsError> {
347        let count = self.inner_dispatch_all(cmds)?;
348        if count != 0 {
349            self.latest.replace(Instant::now());
350        }
351        Ok(count)
352    }
353
354    pub(crate) fn since_latest(&self) -> Duration {
355        self.latest.borrow().elapsed()
356    }
357
358    pub(crate) fn try_update_all_slots(&self, layout: ReplicaLayout) -> bool {
359        let (masters, replicas) = layout;
360        let updated = self.slots.borrow_mut().try_update_all(masters, replicas);
361        if updated {
362            info!("skip to update cluster cc due to unnecessary");
363        }
364        updated
365    }
366
367    pub(crate) fn update_slot(&self, slot: usize, addr: String) {
368        debug_assert!(slot <= SLOTS_COUNT);
369        self.slots.borrow_mut().update_slot(slot, addr);
370    }
371
372    pub(crate) fn connect(&self, addr: &str, conns: &mut Conns) -> Result<(), AsError> {
373        let is_replica = !self.slots.borrow().is_master(addr);
374
375        let sender = ConnBuilder::new()
376            .moved(self.moved.clone())
377            .cluster(self.cc.name.clone())
378            .node(addr.to_string())
379            .read_timeout(self.cc.read_timeout.clone())
380            .write_timeout(self.cc.write_timeout.clone())
381            .fetch(
382                self.fetch
383                    .borrow()
384                    .as_ref()
385                    .map(|x| Rc::downgrade(x))
386                    .unwrap_or(Weak::new()),
387            )
388            .replica(is_replica)
389            .connect()?;
390        conns.insert(&addr, sender);
391        Ok(())
392    }
393}
394
395pub(crate) struct Conns {
396    inner: HashMap<String, Conn<Sender<Cmd>>>,
397}
398
399impl Conns {
400    fn get_mut(&mut self, s: &str) -> Option<&mut Conn<Sender<Cmd>>> {
401        self.inner.get_mut(s)
402    }
403
404    fn insert(&mut self, s: &str, sender: Sender<Cmd>) {
405        let conn = Conn {
406            addr: s.to_string(),
407            sender,
408        };
409        self.inner.insert(s.to_string(), conn);
410    }
411}
412
413impl Default for Conns {
414    fn default() -> Conns {
415        Conns {
416            inner: HashMap::new(),
417        }
418    }
419}
420
/// A single backend connection: the remote address plus the sink used to
/// push commands toward it.
#[allow(unused)]
struct Conn<S> {
    addr: String,
    sender: S,
}

impl<S> Conn<S> {
    /// Mutable access to the underlying sender.
    fn sender(&mut self) -> &mut S {
        &mut self.sender
    }
}
432
/// Slot routing table: per-slot master and replica set, plus cumulative
/// address books used for connection management.
struct Slots {
    // masters[slot] is the owning address ("" until populated).
    masters: Vec<String>,
    // replicas[slot] is the round-robin replica set for that slot.
    replicas: Vec<Replica>,
    // RNG backing get_random_master.
    rng: ThreadRng,

    // Every address ever observed in either role; entries are never removed.
    all_masters: HashSet<String>,
    all_replicas: HashSet<String>,
}
441
442impl Slots {
443    fn try_update_all(&mut self, masters: Vec<String>, replicas: Vec<Vec<String>>) -> bool {
444        let mut changed = false;
445        for i in 0..SLOTS_COUNT {
446            if self.masters[i] != masters[i] {
447                changed = true;
448                self.masters[i] = masters[i].clone();
449                self.all_masters.insert(masters[i].clone());
450            }
451        }
452
453        for i in 0..SLOTS_COUNT {
454            let len_not_eqal = self.replicas[i].addrs.len() != replicas[i].len();
455            if len_not_eqal || self.replicas[i].addrs.as_slice() != replicas[i].as_slice() {
456                self.replicas[i] = Replica {
457                    addrs: replicas[i].clone(),
458                    current: Cell::new(0),
459                };
460                self.all_replicas.extend(replicas[i].clone().into_iter());
461                changed = true;
462            }
463        }
464
465        changed
466    }
467
468    fn update_slot(&mut self, slot: usize, addr: String) {
469        self.masters[slot] = addr.clone();
470        self.all_masters.insert(addr);
471    }
472
473    fn get_random_master(&mut self, exclusive: &str) -> Option<&str> {
474        loop {
475            let cursor: usize = self.rng.gen_range(0, self.masters.len());
476            if self
477                .masters
478                .get(cursor)
479                .map(|x| x == exclusive)
480                .unwrap_or(false)
481            {
482                continue;
483            }
484            return self.masters.get(cursor).map(|x| x.as_str());
485        }
486    }
487
488    fn get_master(&self, slot: usize) -> Option<&str> {
489        self.masters.get(slot).map(|x| x.as_str())
490    }
491
492    fn get_replica(&self, slot: usize) -> Option<&str> {
493        self.replicas.get(slot).map(|x| x.get_replica())
494    }
495
496    fn get_all_masters(&self) -> HashSet<String> {
497        self.all_masters.clone()
498    }
499
500    fn get_all_replicas(&self) -> HashSet<String> {
501        self.all_replicas.clone()
502    }
503
504    fn is_master(&self, addr: &str) -> bool {
505        self.all_masters.contains(addr)
506    }
507}
508
509impl Default for Slots {
510    fn default() -> Slots {
511        let mut masters = Vec::with_capacity(SLOTS_COUNT);
512        masters.resize(SLOTS_COUNT, "".to_string());
513        let mut replicas = Vec::with_capacity(SLOTS_COUNT);
514        replicas.resize(SLOTS_COUNT, Replica::default());
515        Slots {
516            rng: thread_rng(),
517            masters,
518            replicas,
519            all_masters: HashSet::new(),
520            all_replicas: HashSet::new(),
521        }
522    }
523}
524
/// Replica set for one slot, with a round-robin cursor for read balancing.
// `Default` is derivable (empty Vec, cursor 0), matching the old manual impl
// (`Vec::with_capacity(0)` allocates nothing, same as `Vec::default()`).
#[derive(Clone, Default)]
struct Replica {
    addrs: Vec<String>,
    // Next index to hand out; Cell keeps get_replica callable through &self.
    current: Cell<usize>,
}

impl Replica {
    /// Round-robin pick of the next replica address, or "" when none exist.
    fn get_replica(&self) -> &str {
        if self.addrs.is_empty() {
            return "";
        }

        // Read the cursor once (the original read it twice), then advance it.
        let current = self.current.get();
        self.current.set((current + 1) % self.addrs.len());
        &self.addrs[current]
    }
}
553
/// Builder collecting everything needed to open one backend connection.
/// `node`, `cluster` and `moved` are mandatory (see `check_valid`).
pub(crate) struct ConnBuilder {
    cluster: Option<String>,
    node: Option<String>,
    // Redirect channel handed to the spawned backend task.
    moved: Option<Sender<Redirection>>,
    // Read/write socket timeouts in milliseconds; default 1000 (see new()).
    rt: Option<u64>,
    wt: Option<u64>,
    // When true, a READONLY command is issued right after connecting.
    replica: bool,
    // Weak trigger used to request a topology fetch if the connect fails.
    fetch: Weak<SingleFlightTrigger>,
}
563
564impl ConnBuilder {
565    pub(crate) fn new() -> ConnBuilder {
566        ConnBuilder {
567            cluster: None,
568            node: None,
569            moved: None,
570            rt: Some(1000),
571            wt: Some(1000),
572            replica: false,
573            fetch: Weak::new(),
574        }
575    }
576
577    pub(crate) fn fetch(self, fetch: Weak<SingleFlightTrigger>) -> Self {
578        let mut cb = self;
579        cb.fetch = fetch;
580        cb
581    }
582
583    pub(crate) fn cluster(self, cluster: String) -> Self {
584        let mut cb = self;
585        cb.cluster = Some(cluster);
586        cb
587    }
588
589    pub(crate) fn moved(self, moved: Sender<Redirection>) -> Self {
590        let mut cb = self;
591        cb.moved = Some(moved);
592        cb
593    }
594
595    pub(crate) fn read_timeout(self, rt: Option<u64>) -> Self {
596        let mut cb = self;
597        cb.rt = rt;
598        cb
599    }
600
601    pub(crate) fn write_timeout(self, wt: Option<u64>) -> Self {
602        let mut cb = self;
603        cb.wt = wt;
604        cb
605    }
606
607    pub(crate) fn node(self, node: String) -> Self {
608        let mut cb = self;
609        cb.node = Some(node);
610        cb
611    }
612
613    pub(crate) fn replica(self, is_replica: bool) -> Self {
614        let mut cb = self;
615        cb.replica = is_replica;
616        cb
617    }
618
619    pub(crate) fn check_valid(&self) -> bool {
620        true && self.node.is_some() && self.cluster.is_some() && self.moved.is_some()
621    }
622
623    pub(crate) fn connect(self) -> Result<Sender<Cmd>, AsError> {
624        if !self.check_valid() {
625            error!(
626                "fail to open connection to backend {} due param is valid",
627                self.node.as_ref().map(|x| x.as_ref()).unwrap_or("unknown")
628            );
629            return Err(AsError::BadConfig("backend connection config".to_string()));
630        }
631
632        let node_addr = self.node.expect("addr must be checked first");
633        let node_addr_clone = node_addr.clone();
634        let cluster = self
635            .cluster
636            .expect("cluster name must be checked first")
637            .to_string();
638        let rt = self.rt.clone();
639        let wt = self.wt.clone();
640        let moved = self.moved.expect("must be checked first");
641        let fetch = self.fetch.clone();
642
643        let (mut tx, rx) = channel(1024 * 8);
644        let amt = lazy(|| -> Result<(), ()> { Ok(()) })
645            .and_then(move |_| {
646                let node_clone = node_addr.clone();
647                node_addr
648                    .as_str()
649                    .parse()
650                    .map_err(|err| error!("fail to parse addr {} due to {:?}", node_clone, err))
651            })
652            .and_then(|addr| {
653                let report_addr = format!("{:?}", &addr);
654                TcpStream::connect(&addr)
655                    .timeout(Duration::from_millis(100))
656                    .map_err(move |err| error!("fail to connect to {} {:?}", &report_addr, err))
657            })
658            .then(move |sock| {
659                if let Ok(sock) = sock {
660                    let sock =
661                        set_read_write_timeout(sock, rt, wt).expect("set timeout must be ok");
662                    if let Err(_) = sock.set_nodelay(true) {
663                        warn!("fail to set set nodelay when connect to backend but ignore");
664                    }
665
666                    let codec = RedisNodeCodec {};
667                    let (sink, stream) = codec.framed(sock).split();
668                    let backend =
669                        back::Back::new(cluster, node_addr_clone, rx, sink, stream, moved);
670                    current_thread::spawn(backend);
671                } else {
672                    error!("fail to conenct to backend {}", node_addr_clone);
673                    let blackhole = back::Blackhole::new(node_addr_clone, rx);
674                    current_thread::spawn(blackhole);
675                    if let Some(trigger) = fetch.upgrade() {
676                        trigger.try_trigger();
677                    }
678                }
679                Ok(())
680            });
681        current_thread::spawn(amt);
682        if self.replica {
683            let mut cmd = new_read_only_cmd();
684            cmd.reregister(task::current());
685            Self::silence_send_req(cmd, &mut tx);
686        }
687        Ok(tx)
688    }
689
690    fn silence_send_req(cmd: Cmd, tx: &mut Sender<Cmd>) {
691        match tx.start_send(cmd) {
692            Ok(AsyncSink::Ready) => {
693                debug!("success dispatch to read only replica node");
694            }
695            Ok(AsyncSink::NotReady(_)) => {
696                warn!("fail to initial backend connection of replica due to send fail");
697            }
698            Err(err) => {
699                warn!(
700                    "fail to initial backend connection of replica due to {}",
701                    err
702                );
703            }
704        }
705    }
706}
707
708pub fn run(cc: ClusterConfig, ip: Option<String>) -> Vec<JoinHandle<()>> {
709    let worker = cc.thread.unwrap_or(4);
710    (0..worker)
711        .into_iter()
712        .map(|_index| {
713            let builder = thread::Builder::new();
714            let cc = cc.clone();
715            let ip = ip.clone();
716            builder
717                .name(cc.name.clone())
718                .spawn(move || {
719                    meta_init(cc.clone(), ip);
720
721                    #[cfg(feature = "metrics")]
722                    thread_incr();
723
724                    current_thread::block_on_all(
725                        init::Initializer::new(cc)
726                            .map_err(|err| error!("fail to init cluster due to {}", err)),
727                    )
728                    .unwrap();
729                })
730                .expect("fail to spawn worker thread")
731        })
732        .collect()
733}