libaster/proxy/cluster/
redirect.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use futures::unsync::mpsc::Receiver;
use futures::{Async, AsyncSink, Future, Stream};

use crate::com::AsError;
use crate::proxy::cluster::fetcher;
use crate::proxy::cluster::{Cluster, Redirect, Redirection};

use std::rc::Rc;

pub struct RedirectHandler {
    cluster: Rc<Cluster>,
    moved_rx: Receiver<Redirection>,
    store: Option<Redirection>,
}

impl RedirectHandler {
    pub fn new(cluster: Rc<Cluster>, moved_rx: Receiver<Redirection>) -> RedirectHandler {
        RedirectHandler {
            cluster,
            moved_rx,
            store: None,
        }
    }
}

impl Future for RedirectHandler {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        loop {
            if let Some(Redirection { target, cmd }) = self.store.take() {
                if !cmd.borrow().can_cycle() {
                    cmd.set_error(AsError::RequestReachMaxCycle);
                    continue;
                }

                let (slot, to, is_move) = match target {
                    Redirect::Move { slot, to } => (slot, to, true),
                    Redirect::Ask { slot, to } => (slot, to, false),
                };
                if is_move {
                    info!(
                        "cluster {} slot {} was moved to {}",
                        self.cluster.cc.borrow().name,
                        slot,
                        to
                    );
                    if self.cluster.update_slot(slot, to.clone()) {
                        self.cluster.trigger_fetch(fetcher::TriggerBy::Moved);
                    }
                }
                let rc_cmd = cmd.clone();
                match self.cluster.dispatch_to(&to, cmd) {
                    Ok(AsyncSink::NotReady(cmd)) => {
                        self.store = Some(Redirection::new(is_move, slot, to, cmd));
                        return Ok(Async::NotReady);
                    }
                    Ok(AsyncSink::Ready) => {
                        rc_cmd.borrow_mut().add_cycle();
                        std::mem::drop(rc_cmd);
                    }
                    Err(err) => {
                        error!("fail to dispath moved cmd to backend {} due to {}", to, err);
                    }
                }
            }

            match self.moved_rx.poll() {
                Ok(Async::Ready(Some(redirection))) => {
                    self.store = Some(redirection);
                }
                Ok(Async::Ready(None)) => {
                    info!("succeed to exits redirection handler");
                    return Ok(Async::Ready(()));
                }
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Err(err) => {
                    error!("fail to poll from moved channel due to {:?}", err);
                    return Err(());
                }
            }
        }
    }
}