lol_core/gateway/
mod.rs

1use std::collections::VecDeque;
2use std::time::Duration;
3
4use crate::proto_compiled::{raft_client::RaftClient, ClusterInfoReq};
5use crate::{Id, Uri};
6use tokio::sync::mpsc::error::TrySendError;
7use tonic::transport::{Channel, Endpoint};
8use tower::discover::Change;
9
10mod service;
11
12/// Gateway builder.
13pub struct Connector {
14    f: Box<dyn Fn(&Uri) -> Endpoint + 'static + Send>,
15}
16impl Connector {
17    pub fn new(f: impl Fn(&Uri) -> Endpoint + 'static + Send) -> Self {
18        Self { f: Box::new(f) }
19    }
20    pub fn connect(self, uri: Uri) -> Gateway {
21        Gateway::new(uri.into(), self.f)
22    }
23}
24
25/// Gateway is like `Channel` but updates the destination when cluster membership is changed.
26/// With Gateway, users don't need to remember the current membership but can transparently
27/// interact with the cluster through the current leader.
28#[derive(Clone)]
29pub struct Gateway {
30    chan: Channel,
31}
32impl Gateway {
33    fn new(id: Id, f: impl Fn(&Uri) -> Endpoint + 'static + Send) -> Self {
34        let (chan, tx) = Channel::balance_channel::<Id>(16);
35        tokio::spawn(async move {
36            let mut cur_leader: Option<Id> = None;
37            let mut new_leader: Option<Id> = None;
38            let mut membership = vec![id];
39            let mut change_queue = VecDeque::new();
40            'outer: loop {
41                for member in &membership {
42                    let e = f(member.clone().uri());
43                    if let Ok(mut conn) = RaftClient::connect(e).await {
44                        let req = ClusterInfoReq {};
45                        if let Ok(res) = conn.request_cluster_info(req).await {
46                            let res = res.into_inner();
47                            if let Some(leader) = res.leader_id {
48                                let leader_id: Id = leader.parse().unwrap();
49                                let mut xs = vec![];
50                                for x in res.membership {
51                                    xs.push(x.parse().unwrap());
52                                }
53                                new_leader = Some(leader_id.clone());
54                                membership = Self::sort(leader_id, xs);
55                                break;
56                            }
57                        }
58                    }
59                }
60                if new_leader != cur_leader {
61                    if let Some(ref new_leader) = new_leader {
62                        let insert = Change::Insert(new_leader.clone(), f(new_leader.uri()));
63                        change_queue.push_back(insert);
64                        if let Some(ref cur_leader) = cur_leader {
65                            let remove = Change::Remove(cur_leader.clone());
66                            change_queue.push_back(remove);
67                        }
68                    }
69                }
70                loop {
71                    if let Some(change) = change_queue.pop_front() {
72                        let msg = match &change {
73                            Change::Insert(k, e) => Change::Insert(k.clone(), e.clone()),
74                            Change::Remove(k) => Change::Remove(k.clone()),
75                        };
76                        match tx.try_send(msg) {
77                            Ok(()) => match change {
78                                Change::Insert(k, _) => {
79                                    cur_leader = Some(k.clone());
80                                }
81                                Change::Remove(_) => {}
82                            },
83                            Err(TrySendError::Full(_)) => {
84                                change_queue.push_front(change);
85                                break;
86                            }
87                            Err(TrySendError::Closed(_)) => {
88                                break 'outer;
89                            }
90                        }
91                    } else {
92                        break;
93                    }
94                }
95                tokio::time::sleep(Duration::from_secs(1)).await;
96            } // outer loop
97        });
98        Self { chan }
99    }
100    fn sort(awared_leader: Id, awared_membership: Vec<Id>) -> Vec<Id> {
101        let mut v = vec![];
102        for member in awared_membership {
103            let rank = if member == awared_leader { 0 } else { 1 };
104            v.push((rank, member.to_owned()))
105        }
106        v.sort_by_key(|x| x.0); // leader first
107        let mut r = vec![];
108        for (_, id) in v {
109            r.push(id)
110        }
111        r
112    }
113}