1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
use crate::link::local::{LinkError, LinkRx, LinkTx};
use crate::link::network;
use crate::link::network::Network;
use crate::NodeId;
use std::collections::HashMap;
use std::time::Duration;
use std::{io, thread};
use tokio::net::TcpListener;
use tokio::task;
use tokio::task::JoinHandle;
use tokio::time;
use tokio::time::error::Elapsed;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("I/O")]
Io(#[from] io::Error),
#[error("Network {0}")]
Network(#[from] network::Error),
#[error("Timeout")]
Timeout(#[from] Elapsed),
#[error("Link error = {0}")]
Link(#[from] LinkError),
}
pub struct Cluster {
node_id: NodeId,
/// Address list of replicas that this node should connect to
listeners: Vec<(NodeId, String)>,
/// Address at which this server is listening on
listen: String,
/// Replica handles returned by the router
replicas: HashMap<NodeId, (LinkTx, LinkRx)>,
}
impl Cluster {
pub fn new<A: Into<String>>(id: NodeId, listen: &str, listeners: Vec<(NodeId, A)>) -> Cluster {
let listeners = listeners.into_iter().map(|v| (v.0, v.1.into())).collect();
Cluster {
node_id: id,
listeners,
listen: listen.to_owned(),
replicas: HashMap::new(),
}
}
pub fn add_replica_router_handle(&mut self, id: NodeId, replica: (LinkTx, LinkRx)) {
self.replicas.insert(id, replica);
}
/// Connection initiating replica
async fn network_connect(_remote: &str, _id: &str) -> Result<Network, Error> {
unimplemented!();
// let socket = TcpStream::connect(remote).await?;
// let mut network = Network::new(Box::new(socket), 1024 * 1024);
// network.send_connect(Connect::new(id)).await?;
// Ok(network)
}
async fn client(
remote_addr: &str,
remote_id: NodeId,
connector_id: NodeId,
mut link_tx: LinkTx,
mut link_rx: LinkRx,
) {
// Reconnect and run indefinitely
loop {
// Connect to remote replica on 'remote_addr'
let network =
match Cluster::network_connect(remote_addr, &connector_id.to_string()).await {
Ok(n) => n,
Err(e) => {
error!("Failed to connect to replica. Error = {:?}", e);
time::sleep(Duration::from_secs(1)).await;
continue;
}
};
// Create a replica representing 'remote_id'
let mut replica = Replica::new(remote_id, network);
if let Err(e) = replica.run(&mut link_tx, &mut link_rx).await {
error!("Replica stopped! Error = {:?}", e);
time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
async fn accept(&mut self) {
let listener = TcpListener::bind(&self.listen).await.unwrap();
let delay = Duration::from_millis(100);
let mut tasks: HashMap<NodeId, JoinHandle<(LinkTx, LinkRx)>> = HashMap::new();
info!("Waiting for connections on {}. Server = {}", self.listen, self.node_id);
loop {
let (stream, _addr) = listener.accept().await.unwrap();
let mut network = Network::new(Box::new(stream), 1024 * 1024, 100);
let replica_id: NodeId = match network.read_connect(100).await {
Ok(connect) => connect.client_id.parse().unwrap(),
Err(e) => {
error!("Error awaiting connect from replica = {:?}", e);
time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let (link_tx, link_rx) = match self.replicas.remove(&replica_id) {
Some(v) => v,
None => match tasks.get_mut(&replica_id) {
Some(t) => t.await.unwrap(),
None => {
error!("Router handles not found for replica = {}", replica_id);
time::sleep(Duration::from_secs(1)).await;
continue;
}
},
};
// let replica_id_clone = replica_id.clone();
let t = task::spawn(async move {
let mut link_tx = link_tx;
let mut link_rx = link_rx;
// Create a replica representing 'remote_id'
let mut replica = Replica::new(replica_id, network);
if let Err(e) = replica.run(&mut link_tx, &mut link_rx).await {
error!("Replica stopped! Error = {:?}", e);
}
(link_tx, link_rx)
});
tasks.insert(replica_id, t);
time::sleep(delay).await;
}
}
/// Spawns client connections
async fn spawn_connectors(&mut self) {
for (remote_id, remote_addr) in self.listeners.clone() {
let (link_tx, link_rx) = match self.replicas.remove(&remote_id) {
Some(l) => l,
None => panic!("Failed to fetch router links for replica = {}", remote_id),
};
let connector_id = self.node_id.clone();
task::spawn(async move {
Cluster::client(&remote_addr, remote_id, connector_id, link_tx, link_rx).await;
});
}
}
/// Start the cluster
#[tokio::main(flavor = "current_thread")]
pub async fn start_inner(&mut self) {
self.spawn_connectors().await;
self.accept().await;
}
pub fn spawn(mut self) {
let t2 = thread::Builder::new().name(format!("cluster-{}", self.node_id));
t2.spawn(move || error!("Cluster done! Reason = {:?}", self.start_inner()))
.unwrap();
}
}
pub struct Replica {
/// Replica id of node this connection is representing
id: NodeId,
/// Link to read from and write to network
network: Network,
}
impl Replica {
fn new(id: NodeId, network: Network) -> Replica {
Replica { id, network }
}
pub async fn run(&mut self, _link_tx: &mut LinkTx, link_rx: &mut LinkRx) -> Result<(), Error> {
unimplemented!()
// link_tx.replica_subscribe("#").unwrap();
// self.network.set_keepalive(10);
// Note:
// Shouldn't result in bounded queue deadlocks because of blocking n/w send
// loop {
// select! {
// o = self.network.readb() => {
// let packets = o?.freeze();
// debug!("::{} {:10} {:20} size = {} bytes", self.id, "read", "remote", packets.len());
// // link_tx.async_send_replica_data(packets).await?;
// }
// // Receive from router when previous when state isn't in collision
// // due to previously received data request
// notification = link_rx.next() => {
// let message = match notification? {
// Some(v) => v,
// None => continue
// };
// let write = match message {
// Notification::ReplicaData{payload, ..} => payload,
// Notification::ReplicaAcks{payload, ..} => payload,
// v => unreachable!("Expecting only data or device acks. Received = {:?}", v)
// };
// debug!("::{} {:10} {:20} size = {} bytes", self.id, "write", "remote", write.len());
// self.network.write_all(&write).await?;
// }
// }
// }
}
}