1use std::collections::VecDeque;
2use std::time::Duration;
3
4use crate::proto_compiled::{raft_client::RaftClient, ClusterInfoReq};
5use crate::{Id, Uri};
6use tokio::sync::mpsc::error::TrySendError;
7use tonic::transport::{Channel, Endpoint};
8use tower::discover::Change;
9
10mod service;
11
12pub struct Connector {
14 f: Box<dyn Fn(&Uri) -> Endpoint + 'static + Send>,
15}
16impl Connector {
17 pub fn new(f: impl Fn(&Uri) -> Endpoint + 'static + Send) -> Self {
18 Self { f: Box::new(f) }
19 }
20 pub fn connect(self, uri: Uri) -> Gateway {
21 Gateway::new(uri.into(), self.f)
22 }
23}
24
25#[derive(Clone)]
29pub struct Gateway {
30 chan: Channel,
31}
32impl Gateway {
33 fn new(id: Id, f: impl Fn(&Uri) -> Endpoint + 'static + Send) -> Self {
34 let (chan, tx) = Channel::balance_channel::<Id>(16);
35 tokio::spawn(async move {
36 let mut cur_leader: Option<Id> = None;
37 let mut new_leader: Option<Id> = None;
38 let mut membership = vec![id];
39 let mut change_queue = VecDeque::new();
40 'outer: loop {
41 for member in &membership {
42 let e = f(member.clone().uri());
43 if let Ok(mut conn) = RaftClient::connect(e).await {
44 let req = ClusterInfoReq {};
45 if let Ok(res) = conn.request_cluster_info(req).await {
46 let res = res.into_inner();
47 if let Some(leader) = res.leader_id {
48 let leader_id: Id = leader.parse().unwrap();
49 let mut xs = vec![];
50 for x in res.membership {
51 xs.push(x.parse().unwrap());
52 }
53 new_leader = Some(leader_id.clone());
54 membership = Self::sort(leader_id, xs);
55 break;
56 }
57 }
58 }
59 }
60 if new_leader != cur_leader {
61 if let Some(ref new_leader) = new_leader {
62 let insert = Change::Insert(new_leader.clone(), f(new_leader.uri()));
63 change_queue.push_back(insert);
64 if let Some(ref cur_leader) = cur_leader {
65 let remove = Change::Remove(cur_leader.clone());
66 change_queue.push_back(remove);
67 }
68 }
69 }
70 loop {
71 if let Some(change) = change_queue.pop_front() {
72 let msg = match &change {
73 Change::Insert(k, e) => Change::Insert(k.clone(), e.clone()),
74 Change::Remove(k) => Change::Remove(k.clone()),
75 };
76 match tx.try_send(msg) {
77 Ok(()) => match change {
78 Change::Insert(k, _) => {
79 cur_leader = Some(k.clone());
80 }
81 Change::Remove(_) => {}
82 },
83 Err(TrySendError::Full(_)) => {
84 change_queue.push_front(change);
85 break;
86 }
87 Err(TrySendError::Closed(_)) => {
88 break 'outer;
89 }
90 }
91 } else {
92 break;
93 }
94 }
95 tokio::time::sleep(Duration::from_secs(1)).await;
96 } });
98 Self { chan }
99 }
100 fn sort(awared_leader: Id, awared_membership: Vec<Id>) -> Vec<Id> {
101 let mut v = vec![];
102 for member in awared_membership {
103 let rank = if member == awared_leader { 0 } else { 1 };
104 v.push((rank, member.to_owned()))
105 }
106 v.sort_by_key(|x| x.0); let mut r = vec![];
108 for (_, id) in v {
109 r.push(id)
110 }
111 r
112 }
113}