1pub mod back;
2pub mod fetcher;
3pub mod front;
4pub mod init;
5pub mod redirect;
6
7use crate::com::create_reuse_port_listener;
8use crate::com::meta::meta_init;
9use crate::com::set_read_write_timeout;
10use crate::com::AsError;
11use crate::com::ClusterConfig;
12use crate::protocol::redis::{new_read_only_cmd, RedisHandleCodec, RedisNodeCodec};
13use crate::protocol::redis::{Cmd, ReplicaLayout, SLOTS_COUNT};
14use crate::proxy::cluster::fetcher::SingleFlightTrigger;
15use crate::utils::crc::crc16;
16
17#[cfg(feature = "metrics")]
18use crate::metrics::{front_conn_incr, thread_incr};
19
20use futures::future::ok;
22use futures::future::*;
23use futures::task;
24use futures::unsync::mpsc::{channel, Sender};
25use futures::AsyncSink;
26use futures::{Sink, Stream};
27
28use tokio::net::TcpStream;
29use tokio::prelude::FutureExt;
30use tokio::runtime::current_thread;
31use tokio::timer::Interval;
32use tokio_codec::Decoder;
33
34use rand::prelude::*;
35
36use std::cell::{Cell, RefCell};
37use std::collections::{HashMap, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::rc::{Rc, Weak};
40use std::thread::{self, JoinHandle};
41use std::time::{Duration, Instant};
42
/// Target of a cluster redirection: the slot concerned and the address of
/// the node that should receive the command.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Redirect {
    /// Redirection carrying a `Move` target for `slot`.
    Move { slot: usize, to: String },
    /// Redirection carrying an `Ask` target for `slot`.
    Ask { slot: usize, to: String },
}

impl Redirect {
    /// Returns `true` for the `Ask` variant and `false` for `Move`.
    pub(crate) fn is_ask(&self) -> bool {
        // Spell out both variants so adding a third one fails to compile here.
        match *self {
            Redirect::Ask { .. } => true,
            Redirect::Move { .. } => false,
        }
    }
}
57
58#[derive(Clone)]
59pub struct Redirection {
60 pub target: Redirect,
61 pub cmd: Cmd,
62}
63
64impl Redirection {
65 pub(crate) fn new(is_move: bool, slot: usize, to: String, cmd: Cmd) -> Redirection {
66 if is_move {
67 Redirection {
68 target: Redirect::Move { slot, to },
69 cmd,
70 }
71 } else {
72 Redirection {
73 target: Redirect::Ask { slot, to },
74 cmd,
75 }
76 }
77 }
78}
79
80pub use Redirect::{Ask, Move};
81
/// Field-less placeholder type; nothing in the visible part of this file
/// constructs or uses it.
pub struct RedirectFuture {}
83
84pub struct Cluster {
85 pub cc: ClusterConfig,
86
87 slots: RefCell<Slots>,
88 conns: RefCell<Conns>,
89 hash_tag: Vec<u8>,
90 read_from_slave: bool,
91
92 moved: Sender<Redirection>,
93 fetch: RefCell<Option<Rc<SingleFlightTrigger>>>,
94 latest: RefCell<Instant>,
95}
96
97impl Cluster {
98 pub(crate) fn run(cc: ClusterConfig, replica: ReplicaLayout) -> Result<(), AsError> {
99 let addr = cc
100 .listen_addr
101 .clone()
102 .parse::<SocketAddr>()
103 .expect("parse socket never fail");
104 let fut = ok::<ClusterConfig, AsError>(cc)
105 .and_then(|mut cc| {
106 let read_from_slave = cc.read_from_slave.clone().unwrap_or(false);
107 let hash_tag = cc
108 .hash_tag
109 .clone()
110 .map(|x| x.as_bytes().to_vec())
111 .unwrap_or(vec![]);
112 let mut slots = Slots::default();
113 let (masters, replicas) = replica;
114 slots.try_update_all(masters, replicas);
115 let (moved, moved_rx) = channel(10240);
116
117 let all_masters = slots.get_all_masters();
118 let mut all_lived = HashSet::new();
119 cc.servers = all_masters.iter().map(|x| x.clone()).collect();
120 let all_servers = cc.servers.clone();
121 let mut conns = Conns::default();
122 for master in all_servers.into_iter() {
123 let conn = ConnBuilder::new()
124 .moved(moved.clone())
125 .cluster(cc.name.clone())
126 .node(master.clone())
127 .read_timeout(cc.read_timeout.clone())
128 .write_timeout(cc.write_timeout.clone())
129 .connect()?;
130 conns.insert(&master, conn);
131 all_lived.insert(master.clone());
132 }
133
134 if read_from_slave {
135 let all_slaves = slots.get_all_replicas();
136 for slave in all_slaves.into_iter() {
137 let conn = ConnBuilder::new()
138 .moved(moved.clone())
139 .cluster(cc.name.clone())
140 .node(slave.clone())
141 .read_timeout(cc.read_timeout.clone())
142 .write_timeout(cc.write_timeout.clone())
143 .replica(true)
144 .connect()?;
145 conns.insert(&slave, conn);
146 all_lived.insert(slave.clone());
147 }
148 }
149 let cluster = Cluster {
150 cc,
151 hash_tag,
152 read_from_slave,
153 moved,
154 slots: RefCell::new(slots),
155 conns: RefCell::new(conns),
156 fetch: RefCell::new(None),
157 latest: RefCell::new(Instant::now()),
158 };
159 Ok((cluster, moved_rx))
160 })
161 .and_then(|(cluster, moved_rx)| {
162 let rc_cluster = Rc::new(cluster);
163 let redirect_handler = redirect::RedirectHandler::new(rc_cluster.clone(), moved_rx);
164 current_thread::spawn(redirect_handler);
165 Ok(rc_cluster)
166 })
167 .and_then(|rc_cluster| {
168 let interval_millis = rc_cluster.cc.fetch_interval.unwrap_or(1000);
169 let interval = Interval::new(
170 Instant::now() + Duration::from_millis(interval_millis),
171 Duration::from_millis(interval_millis),
172 );
173 let interval_stream = interval
174 .map(|_| fetcher::TriggerBy::Interval)
175 .map_err(|_| AsError::None);
176
177 let (tx, rx) = channel(1024);
178 let trigger = Rc::new(SingleFlightTrigger::new(1, tx));
179 let _ = rc_cluster.fetch.borrow_mut().replace(trigger);
180 let trigger = rx.map_err(|_| AsError::None);
181 let fetch =
182 fetcher::Fetch::new(rc_cluster.clone(), trigger.select(interval_stream));
183 current_thread::spawn(fetch);
184 Ok(rc_cluster)
185 })
186 .and_then(move |cluster| {
187 let listen = create_reuse_port_listener(&addr).expect("bind never fail");
188 let service = listen
189 .incoming()
190 .for_each(move |sock| {
191 let cluster = cluster.clone();
192 if let Err(err) = sock.set_nodelay(true) {
193 warn!(
194 "cluster {} fail to set nodelay but skip, due to {:?}",
195 cluster.cc.name, err
196 );
197 }
198 let client_str = match sock.peer_addr() {
199 Ok(client) => format!("{}", client),
200 Err(err) => {
201 error!(
202 "cluster {} fail to get client name due to {:?}",
203 cluster.cc.name, err
204 );
205 "unknown".to_string()
206 }
207 };
208
209 #[cfg(feature = "metrics")]
210 front_conn_incr(&cluster.cc.name);
211 let codec = RedisHandleCodec {};
212 let (output, input) = codec.framed(sock).split();
213 let fut = front::Front::new(client_str, cluster, input, output);
214 current_thread::spawn(fut);
215 Ok(())
216 })
217 .map_err(|err| {
218 error!("fail to accept incomming sock due {}", err);
219 });
220 current_thread::spawn(service);
221 Ok(())
222 });
223 current_thread::spawn(
224 fut.map_err(|err| error!("fail to create cluster server due to {}", err)),
225 );
226 Ok(())
227 }
228}
229
230impl Cluster {
231 fn get_addr(&self, slot: usize, is_read: bool) -> String {
232 if self.read_from_slave && is_read {
234 if let Some(replica) = self.slots.borrow().get_replica(slot) {
235 if replica != "" {
236 return replica.to_string();
237 }
238 }
239 }
240 self.slots
241 .borrow()
242 .get_master(slot)
243 .map(|x| x.to_string())
244 .expect("master addr never be empty")
245 }
246
247 fn get_random_master(&self, exclusive: &str) -> String {
248 self.slots
249 .borrow_mut()
250 .get_random_master(exclusive)
251 .map(|x| x.to_string())
252 .expect("master addr never be empty")
253 }
254
255 pub fn trigger_fetch(&self, trigger_by: fetcher::TriggerBy) {
256 if let Some(trigger) = self.fetch.borrow().clone() {
257 if trigger.try_trigger() {
258 info!("succeed trigger fetch process by {:?}", trigger_by);
259 return;
260 }
261 } else {
262 warn!("fail to trigger fetch process due to trigger event uninitialed");
263 }
264 }
265
266 pub fn dispatch_to(&self, addr: &str, cmd: Cmd) -> Result<AsyncSink<Cmd>, AsError> {
267 if !cmd.borrow().can_cycle() {
268 cmd.set_error(AsError::ClusterFailDispatch);
269 return Ok(AsyncSink::Ready);
270 }
271 let mut conns = self.conns.borrow_mut();
272 loop {
273 if let Some(sender) = conns.get_mut(addr).map(|x| x.sender()) {
274 match sender.start_send(cmd) {
275 Ok(ret) => {
276 return Ok(ret);
277 }
278 Err(_se) => {
279 warn!("fail to send to backend {} ", addr);
280 return Err(AsError::BackendClosedError(addr.to_string()));
281 }
282 }
283 } else {
284 self.connect(&addr, &mut conns)?;
285 }
286 }
287 }
288
289 fn inner_dispatch_all(&self, cmds: &mut VecDeque<Cmd>) -> Result<usize, AsError> {
290 let mut count = 0usize;
291 loop {
292 if cmds.is_empty() {
293 return Ok(count);
294 }
295 let cmd = cmds.pop_front().expect("cmds pop front never be empty");
296 if !cmd.borrow().can_cycle() {
297 cmd.set_error(AsError::RequestReachMaxCycle);
298 continue;
299 }
300 let slot = {
301 let hash_tag = self.hash_tag.as_ref();
302 let signed = cmd.borrow().key_hash(hash_tag, crc16) as usize;
303 signed % SLOTS_COUNT
304 };
305
306 let addr = if !cmd.is_retry() {
307 self.get_addr(slot, cmd.borrow().is_read())
308 } else {
309 let cmd_borrow = cmd.borrow();
310 let exclusive = cmd_borrow
311 .reply
312 .as_ref()
313 .map(|x| String::from_utf8_lossy(&x.data[1..x.data.len() - 2]).to_string())
314 .unwrap_or("".to_string());
315 self.get_random_master(&exclusive)
316 };
317 let mut conns = self.conns.borrow_mut();
318
319 if let Some(sender) = conns.get_mut(&addr).map(|x| x.sender()) {
320 match sender.start_send(cmd) {
321 Ok(AsyncSink::Ready) => {
322 count += 1;
324 }
325 Ok(AsyncSink::NotReady(cmd)) => {
326 cmd.borrow_mut().add_cycle();
327 cmds.push_front(cmd);
328 return Ok(count);
329 }
330 Err(se) => {
331 let cmd = se.into_inner();
332 cmd.borrow_mut().add_cycle();
333 cmds.push_front(cmd);
334 self.connect(&addr, &mut conns)?;
335 return Ok(count);
336 }
337 }
338 } else {
339 cmds.push_front(cmd);
340 self.connect(&addr, &mut conns)?;
341 return Ok(count);
342 }
343 }
344 }
345
346 pub fn dispatch_all(&self, cmds: &mut VecDeque<Cmd>) -> Result<usize, AsError> {
347 let count = self.inner_dispatch_all(cmds)?;
348 if count != 0 {
349 self.latest.replace(Instant::now());
350 }
351 Ok(count)
352 }
353
354 pub(crate) fn since_latest(&self) -> Duration {
355 self.latest.borrow().elapsed()
356 }
357
358 pub(crate) fn try_update_all_slots(&self, layout: ReplicaLayout) -> bool {
359 let (masters, replicas) = layout;
360 let updated = self.slots.borrow_mut().try_update_all(masters, replicas);
361 if updated {
362 info!("skip to update cluster cc due to unnecessary");
363 }
364 updated
365 }
366
367 pub(crate) fn update_slot(&self, slot: usize, addr: String) {
368 debug_assert!(slot <= SLOTS_COUNT);
369 self.slots.borrow_mut().update_slot(slot, addr);
370 }
371
372 pub(crate) fn connect(&self, addr: &str, conns: &mut Conns) -> Result<(), AsError> {
373 let is_replica = !self.slots.borrow().is_master(addr);
374
375 let sender = ConnBuilder::new()
376 .moved(self.moved.clone())
377 .cluster(self.cc.name.clone())
378 .node(addr.to_string())
379 .read_timeout(self.cc.read_timeout.clone())
380 .write_timeout(self.cc.write_timeout.clone())
381 .fetch(
382 self.fetch
383 .borrow()
384 .as_ref()
385 .map(|x| Rc::downgrade(x))
386 .unwrap_or(Weak::new()),
387 )
388 .replica(is_replica)
389 .connect()?;
390 conns.insert(&addr, sender);
391 Ok(())
392 }
393}
394
395pub(crate) struct Conns {
396 inner: HashMap<String, Conn<Sender<Cmd>>>,
397}
398
399impl Conns {
400 fn get_mut(&mut self, s: &str) -> Option<&mut Conn<Sender<Cmd>>> {
401 self.inner.get_mut(s)
402 }
403
404 fn insert(&mut self, s: &str, sender: Sender<Cmd>) {
405 let conn = Conn {
406 addr: s.to_string(),
407 sender,
408 };
409 self.inner.insert(s.to_string(), conn);
410 }
411}
412
413impl Default for Conns {
414 fn default() -> Conns {
415 Conns {
416 inner: HashMap::new(),
417 }
418 }
419}
420
/// A backend connection: the node address and the handle used to send to it.
#[allow(unused)]
struct Conn<S> {
    addr: String,
    sender: S,
}

impl<S> Conn<S> {
    /// Mutable access to the send handle.
    fn sender(&mut self) -> &mut S {
        let Conn { sender, .. } = self;
        sender
    }
}
432
433struct Slots {
434 masters: Vec<String>,
435 replicas: Vec<Replica>,
436 rng: ThreadRng,
437
438 all_masters: HashSet<String>,
439 all_replicas: HashSet<String>,
440}
441
442impl Slots {
443 fn try_update_all(&mut self, masters: Vec<String>, replicas: Vec<Vec<String>>) -> bool {
444 let mut changed = false;
445 for i in 0..SLOTS_COUNT {
446 if self.masters[i] != masters[i] {
447 changed = true;
448 self.masters[i] = masters[i].clone();
449 self.all_masters.insert(masters[i].clone());
450 }
451 }
452
453 for i in 0..SLOTS_COUNT {
454 let len_not_eqal = self.replicas[i].addrs.len() != replicas[i].len();
455 if len_not_eqal || self.replicas[i].addrs.as_slice() != replicas[i].as_slice() {
456 self.replicas[i] = Replica {
457 addrs: replicas[i].clone(),
458 current: Cell::new(0),
459 };
460 self.all_replicas.extend(replicas[i].clone().into_iter());
461 changed = true;
462 }
463 }
464
465 changed
466 }
467
468 fn update_slot(&mut self, slot: usize, addr: String) {
469 self.masters[slot] = addr.clone();
470 self.all_masters.insert(addr);
471 }
472
473 fn get_random_master(&mut self, exclusive: &str) -> Option<&str> {
474 loop {
475 let cursor: usize = self.rng.gen_range(0, self.masters.len());
476 if self
477 .masters
478 .get(cursor)
479 .map(|x| x == exclusive)
480 .unwrap_or(false)
481 {
482 continue;
483 }
484 return self.masters.get(cursor).map(|x| x.as_str());
485 }
486 }
487
488 fn get_master(&self, slot: usize) -> Option<&str> {
489 self.masters.get(slot).map(|x| x.as_str())
490 }
491
492 fn get_replica(&self, slot: usize) -> Option<&str> {
493 self.replicas.get(slot).map(|x| x.get_replica())
494 }
495
496 fn get_all_masters(&self) -> HashSet<String> {
497 self.all_masters.clone()
498 }
499
500 fn get_all_replicas(&self) -> HashSet<String> {
501 self.all_replicas.clone()
502 }
503
504 fn is_master(&self, addr: &str) -> bool {
505 self.all_masters.contains(addr)
506 }
507}
508
509impl Default for Slots {
510 fn default() -> Slots {
511 let mut masters = Vec::with_capacity(SLOTS_COUNT);
512 masters.resize(SLOTS_COUNT, "".to_string());
513 let mut replicas = Vec::with_capacity(SLOTS_COUNT);
514 replicas.resize(SLOTS_COUNT, Replica::default());
515 Slots {
516 rng: thread_rng(),
517 masters,
518 replicas,
519 all_masters: HashSet::new(),
520 all_replicas: HashSet::new(),
521 }
522 }
523}
524
/// Replica addresses of one slot plus a round-robin cursor.
///
/// The default value has no addresses and the cursor at zero.
#[derive(Clone, Default)]
struct Replica {
    addrs: Vec<String>,
    /// Index of the address handed out by the next `get_replica` call.
    current: Cell<usize>,
}

impl Replica {
    /// Returns the next address in rotation and advances the cursor, or an
    /// empty string when there are no replicas.
    fn get_replica(&self) -> &str {
        match self.addrs.len() {
            0 => "",
            len => {
                let picked = self.current.get();
                self.current.set((picked + 1) % len);
                &self.addrs[picked]
            }
        }
    }
}
553
554pub(crate) struct ConnBuilder {
555 cluster: Option<String>,
556 node: Option<String>,
557 moved: Option<Sender<Redirection>>,
558 rt: Option<u64>,
559 wt: Option<u64>,
560 replica: bool,
561 fetch: Weak<SingleFlightTrigger>,
562}
563
564impl ConnBuilder {
565 pub(crate) fn new() -> ConnBuilder {
566 ConnBuilder {
567 cluster: None,
568 node: None,
569 moved: None,
570 rt: Some(1000),
571 wt: Some(1000),
572 replica: false,
573 fetch: Weak::new(),
574 }
575 }
576
577 pub(crate) fn fetch(self, fetch: Weak<SingleFlightTrigger>) -> Self {
578 let mut cb = self;
579 cb.fetch = fetch;
580 cb
581 }
582
583 pub(crate) fn cluster(self, cluster: String) -> Self {
584 let mut cb = self;
585 cb.cluster = Some(cluster);
586 cb
587 }
588
589 pub(crate) fn moved(self, moved: Sender<Redirection>) -> Self {
590 let mut cb = self;
591 cb.moved = Some(moved);
592 cb
593 }
594
595 pub(crate) fn read_timeout(self, rt: Option<u64>) -> Self {
596 let mut cb = self;
597 cb.rt = rt;
598 cb
599 }
600
601 pub(crate) fn write_timeout(self, wt: Option<u64>) -> Self {
602 let mut cb = self;
603 cb.wt = wt;
604 cb
605 }
606
607 pub(crate) fn node(self, node: String) -> Self {
608 let mut cb = self;
609 cb.node = Some(node);
610 cb
611 }
612
613 pub(crate) fn replica(self, is_replica: bool) -> Self {
614 let mut cb = self;
615 cb.replica = is_replica;
616 cb
617 }
618
619 pub(crate) fn check_valid(&self) -> bool {
620 true && self.node.is_some() && self.cluster.is_some() && self.moved.is_some()
621 }
622
623 pub(crate) fn connect(self) -> Result<Sender<Cmd>, AsError> {
624 if !self.check_valid() {
625 error!(
626 "fail to open connection to backend {} due param is valid",
627 self.node.as_ref().map(|x| x.as_ref()).unwrap_or("unknown")
628 );
629 return Err(AsError::BadConfig("backend connection config".to_string()));
630 }
631
632 let node_addr = self.node.expect("addr must be checked first");
633 let node_addr_clone = node_addr.clone();
634 let cluster = self
635 .cluster
636 .expect("cluster name must be checked first")
637 .to_string();
638 let rt = self.rt.clone();
639 let wt = self.wt.clone();
640 let moved = self.moved.expect("must be checked first");
641 let fetch = self.fetch.clone();
642
643 let (mut tx, rx) = channel(1024 * 8);
644 let amt = lazy(|| -> Result<(), ()> { Ok(()) })
645 .and_then(move |_| {
646 let node_clone = node_addr.clone();
647 node_addr
648 .as_str()
649 .parse()
650 .map_err(|err| error!("fail to parse addr {} due to {:?}", node_clone, err))
651 })
652 .and_then(|addr| {
653 let report_addr = format!("{:?}", &addr);
654 TcpStream::connect(&addr)
655 .timeout(Duration::from_millis(100))
656 .map_err(move |err| error!("fail to connect to {} {:?}", &report_addr, err))
657 })
658 .then(move |sock| {
659 if let Ok(sock) = sock {
660 let sock =
661 set_read_write_timeout(sock, rt, wt).expect("set timeout must be ok");
662 if let Err(_) = sock.set_nodelay(true) {
663 warn!("fail to set set nodelay when connect to backend but ignore");
664 }
665
666 let codec = RedisNodeCodec {};
667 let (sink, stream) = codec.framed(sock).split();
668 let backend =
669 back::Back::new(cluster, node_addr_clone, rx, sink, stream, moved);
670 current_thread::spawn(backend);
671 } else {
672 error!("fail to conenct to backend {}", node_addr_clone);
673 let blackhole = back::Blackhole::new(node_addr_clone, rx);
674 current_thread::spawn(blackhole);
675 if let Some(trigger) = fetch.upgrade() {
676 trigger.try_trigger();
677 }
678 }
679 Ok(())
680 });
681 current_thread::spawn(amt);
682 if self.replica {
683 let mut cmd = new_read_only_cmd();
684 cmd.reregister(task::current());
685 Self::silence_send_req(cmd, &mut tx);
686 }
687 Ok(tx)
688 }
689
690 fn silence_send_req(cmd: Cmd, tx: &mut Sender<Cmd>) {
691 match tx.start_send(cmd) {
692 Ok(AsyncSink::Ready) => {
693 debug!("success dispatch to read only replica node");
694 }
695 Ok(AsyncSink::NotReady(_)) => {
696 warn!("fail to initial backend connection of replica due to send fail");
697 }
698 Err(err) => {
699 warn!(
700 "fail to initial backend connection of replica due to {}",
701 err
702 );
703 }
704 }
705 }
706}
707
708pub fn run(cc: ClusterConfig, ip: Option<String>) -> Vec<JoinHandle<()>> {
709 let worker = cc.thread.unwrap_or(4);
710 (0..worker)
711 .into_iter()
712 .map(|_index| {
713 let builder = thread::Builder::new();
714 let cc = cc.clone();
715 let ip = ip.clone();
716 builder
717 .name(cc.name.clone())
718 .spawn(move || {
719 meta_init(cc.clone(), ip);
720
721 #[cfg(feature = "metrics")]
722 thread_incr();
723
724 current_thread::block_on_all(
725 init::Initializer::new(cc)
726 .map_err(|err| error!("fail to init cluster due to {}", err)),
727 )
728 .unwrap();
729 })
730 .expect("fail to spawn worker thread")
731 })
732 .collect()
733}