// libaster/proxy/cluster/init.rs

use futures::task;
use futures::unsync::mpsc::{channel, Sender};
use futures::{Async, AsyncSink, Future, Sink};

use crate::com::AsError;
use crate::com::ClusterConfig;
use crate::protocol::redis::{new_cluster_slots_cmd, slots_reply_to_replicas, Cmd};
use crate::proxy::cluster::{Cluster, ConnBuilder};

/// Progress of an [`Initializer`] as it is polled.
///
/// The initializer walks through these states in order and falls back to
/// `Connecting` whenever a step fails, so that another seed server is tried.
enum State {
    /// Initial state: resets the seed index and moves on to `Connecting`.
    Pending,
    /// A connection to the seed at the current index is about to be built.
    Connecting,
    /// A backend connection exists; the held command still has to be pushed
    /// into the held sender.
    Fetching(Sender<Cmd>, Cmd),
    /// The command was accepted by the sender; waiting until the command
    /// reports that it is done. (The variant name keeps its original spelling.)
    Waitting(Sender<Cmd>, Cmd),
    /// The command is done; its reply is ready to be turned into replicas.
    Done(Cmd),
}

/// Future that bootstraps a cluster from its configured seed servers.
///
/// Polling it tries the servers listed in the configuration one after another
/// until one of them yields a reply from which the cluster can be started.
pub struct Initializer {
    // Configuration of the cluster being initialized (name, servers, timeouts).
    cc: ClusterConfig,
    // Index into `cc.servers` of the next seed server to try.
    current: usize,
    // Current step of the initialization state machine.
    state: State,
}

impl Initializer {
    /// Builds an initializer for the cluster described by `cc`.
    ///
    /// The returned value starts in the `Pending` state with the seed index at
    /// zero; nothing else happens until it is polled as a future.
    pub fn new(cc: ClusterConfig) -> Self {
        let state = State::Pending;
        Self {
            cc,
            current: 0,
            state,
        }
    }
}

impl Future for Initializer {
    type Item = ();
    type Error = AsError;

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        loop {
            match &mut self.state {
                State::Pending => {
                    // debug!("start initialing cluster {}", self.cc.name);
                    self.current = 0;
                    self.state = State::Connecting;
                }
                State::Connecting => {
                    if self.current >= self.cc.servers.len() {
                        return Err(AsError::ClusterAllSeedsDie(self.cc.name.clone()));
                    }
                    let addr = self.cc.servers[self.current].clone();
                    // debug!("start to create connection to backend {}", &addr);
                    let (tx, _rx) = channel(0); // mock moved channel for backend is never be moved
                    let conn = ConnBuilder::new()
                        .moved(tx)
                        .cluster(self.cc.name.clone())
                        .node(addr.to_string())
                        .read_timeout(self.cc.read_timeout.clone())
                        .write_timeout(self.cc.write_timeout.clone())
                        .connect();
                    self.current += 1;

                    match conn {
                        Ok(sender) => {
                            let mut cmd = new_cluster_slots_cmd();
                            cmd.reregister(task::current());
                            self.state = State::Fetching(sender, cmd);
                        }
                        Err(err) => {
                            warn!("fail to connect to backend {} due to {}", &addr, &err);
                            self.state = State::Connecting;
                        }
                    }
                }
                State::Fetching(ref mut sender, ref cmd) => match sender.start_send(cmd.clone()) {
                    Ok(AsyncSink::NotReady(_cmd)) => {
                        // trace!("init: backend is not ready");
                        return Ok(Async::NotReady);
                    }
                    Ok(AsyncSink::Ready) => {
                        // trace!("init: backend is not ready");
                        self.state = State::Waitting(sender.clone(), cmd.clone());
                    }
                    Err(err) => {
                        let addr = self.cc.servers[self.current - 1].clone();
                        error!("fail to send CLUSTER SLOTS cmd to {} due {}", addr, err);
                        if let Err(_err) = sender.close() {
                            warn!("init fetching  connection can't be closed properly, skip");
                        }
                        self.state = State::Connecting;
                    }
                },
                State::Waitting(ref mut sender, ref mut cmd) => {
                    if !cmd.borrow().is_done() {
                        return Ok(Async::NotReady);
                    }
                    if let Err(_err) = sender.close() {
                        warn!("init waitting connection can't be closed properly, skip");
                    }
                    debug!("CLUSTER SLOTS get response as {:?}", cmd);
                    self.state = State::Done(cmd.clone());
                }
                State::Done(cmd) => match slots_reply_to_replicas(cmd.clone()) {
                    Ok(Some(replica)) => {
                        let cluster = Cluster::run(self.cc.clone(), replica);
                        match cluster {
                            Ok(_) => {
                                info!("succeed to create cluster {}", self.cc.name);
                                return Ok(Async::Ready(()));
                            }
                            Err(err) => {
                                warn!("fail to init cluster {} due to {}", self.cc.name, err);
                                self.state = State::Connecting;
                            }
                        }
                    }
                    Ok(None) => {
                        warn!("slots not full covered, this may be not allow in aster");
                        self.state = State::Connecting;
                    }
                    Err(err) => {
                        warn!("fail to parse cmd reply due {}", err);
                        self.state = State::Connecting;
                    }
                },
            }
        }
    }
}