1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#[cfg(feature = "sync")]
use std::net::SocketAddr;
use bytes::Bytes;
#[cfg(feature = "sync")]
use rand::{seq::SliceRandom, thread_rng, Rng};
#[cfg(feature = "sync")]
use crate::{
core::id::Id,
core::message::{Chunk, Message, Nonce},
tcp::SyncTcpRouter,
};
/// A trait used to enable core kadcast functionality on the implementor.
#[cfg(feature = "sync")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "sync")))]
#[async_trait::async_trait]
pub trait Kadcast
where
    Self: Clone + Send + Sync + 'static,
{
    /// The number of nodes to query for peers at each search.
    const ALPHA: u16 = 3;
    /// The peer count target for this node.
    const PEER_TARGET: u16 = 10;
    /// The interval between periodic pings in seconds.
    const PING_INTERVAL_SECS: u64 = 30;
    /// The interval between periodic requests for peers while below the minimum number of peers.
    const BOOTSTRAP_INTERVAL_SECS: u64 = 10;
    /// The interval between periodic requests for peers while above the minimum number of peers.
    const DISCOVERY_INTERVAL_SECS: u64 = 60;

    /// Returns a clonable reference to the router.
    fn router(&self) -> &SyncTcpRouter;

    /// Returns `true` if the address is connected, `false` if it isn't.
    async fn is_connected(&self, addr: SocketAddr) -> bool;

    /// Connects to the address and returns if it was successful or not.
    ///
    /// Note: Kadmium assumes this method calls [`SyncTcpRouter::insert`] and
    /// [`SyncTcpRouter::set_connected`] appropriately.
    async fn connect(&self, addr: SocketAddr) -> bool;

    /// Disconnects the address and returns `true` if it was connected, returns `false` if it
    /// wasn't.
    ///
    /// Note: Kadmium assumes this method calls [`SyncTcpRouter::set_disconnected`] appropriately.
    async fn disconnect(&self, addr: SocketAddr) -> bool;

    /// Sends a message to the destination address.
    async fn unicast(&self, dst: SocketAddr, message: Message);

    /// Starts the periodic ping task.
    ///
    /// Spawns a detached background task that pings every currently-connected peer, then sleeps
    /// for [`Self::PING_INTERVAL_SECS`] seconds, forever. The task holds a clone of `self`.
    async fn ping(&self) {
        let self_clone = self.clone();

        tokio::spawn(async move {
            loop {
                // A fresh ping (nonce) is generated per peer via the router.
                for addr in self_clone.router().connected_addrs() {
                    self_clone
                        .unicast(addr, Message::Ping(self_clone.router().generate_ping()))
                        .await;
                }

                tokio::time::sleep(std::time::Duration::from_secs(Self::PING_INTERVAL_SECS)).await;
            }
        });

        // TODO: consider returning the task handle, or at least track it internally.
    }

    /// Starts the periodic peer discovery task.
    ///
    /// Spawns a detached background task that, on each round: queries up to [`Self::ALPHA`]
    /// search peers for more nodes, disconnects a random surplus of peers when above
    /// [`Self::PEER_TARGET`], connects to random known-but-disconnected addresses when below it,
    /// then sleeps for the bootstrap or discovery interval depending on whether the target has
    /// been reached.
    async fn peer(&self) {
        // TODO: a few current issues to consider:
        //
        // 1. identifiers are more likely to be in higher index buckets, not necessarily an issue
        //    so long as bucket size is above the minimum number of peers.
        // 2. the above also guarantees a search returning K nodes can indeed return K nodes, so
        //    long as K is below the minimum number of peers. If K is larger a node will return at
        //    worst min(min peers, K) and at best min(peers, K).
        //
        // Therefore: bucket size >= min peers >= K is likely ideal.

        let self_clone = self.clone();

        tokio::spawn(async move {
            loop {
                // Query up to ALPHA search peers for more nodes, connecting first if needed.
                for (_id, addr, is_connected) in
                    self_clone.router().select_search_peers(Self::ALPHA.into())
                {
                    let is_connected = if is_connected {
                        self_clone.is_connected(addr).await
                    } else {
                        self_clone.connect(addr).await
                    };

                    if is_connected {
                        self_clone
                            .unicast(
                                addr,
                                Message::FindKNodes(self_clone.router().generate_find_k_nodes()),
                            )
                            .await;
                    }
                }

                // Positive when below target (connect more), negative when above (disconnect).
                // i128 comfortably holds the difference between a u16 and a usize count.
                let peer_deficit =
                    Self::PEER_TARGET as i128 - self_clone.router().connected_addrs().len() as i128;

                if peer_deficit < 0 {
                    // Surplus: disconnect |deficit| random connected peers. The RNG is scoped to
                    // this block so it isn't held across an await point (it isn't `Send`).
                    let addrs: Vec<SocketAddr> = {
                        let mut rng = thread_rng();
                        self_clone
                            .router()
                            .connected_addrs()
                            .choose_multiple(&mut rng, peer_deficit.unsigned_abs() as usize)
                            .copied()
                            .collect()
                    };

                    for addr in addrs {
                        self_clone.disconnect(addr).await;
                    }
                } else if peer_deficit > 0 {
                    // Shortfall: attempt connections to random known-but-disconnected addresses.
                    let addrs: Vec<SocketAddr> = {
                        let mut rng = thread_rng();
                        self_clone
                            .router()
                            .disconnected_addrs()
                            .choose_multiple(&mut rng, peer_deficit as usize)
                            .copied()
                            .collect()
                    };

                    for addr in addrs {
                        self_clone.connect(addr).await;
                    }
                }

                // Check the peer counts again: poll faster while still below the target.
                let sleep_duration = std::time::Duration::from_secs(
                    if self_clone.router().connected_addrs().len() < Self::PEER_TARGET.into() {
                        Self::BOOTSTRAP_INTERVAL_SECS
                    } else {
                        Self::DISCOVERY_INTERVAL_SECS
                    },
                );

                tokio::time::sleep(sleep_duration).await;
            }
        });
    }

    // TODO: work out how and if data should be chunked (1 block per-message or multiple smaller
    // messages). Up to the caller for now.
    /// Broadcast data to the network, following the kadcast protocol.
    ///
    /// Returns the nonce shared by every chunk sent out for this broadcast, which can be used to
    /// trace the propagation.
    ///
    /// # Panics
    ///
    /// Panics if the router selects no broadcast peers.
    async fn kadcast(&self, data: Bytes) -> Nonce {
        let peers = self
            .router()
            .select_broadcast_peers(Id::BITS as u32)
            // NOTE(review): presumably this is only `None` when the routing table is empty —
            // consider whether returning a `Result` would be preferable to panicking here.
            .expect("the router selected no broadcast peers");

        // TODO: record nonce somewhere.
        let nonce = {
            let mut rng = thread_rng();
            rng.gen()
        };

        for (height, addr) in peers {
            let message = Message::Chunk(Chunk {
                // Can be used to trace the broadcast. If set differently for each peer here, it
                // will be the same within a propagation sub-tree.
                nonce,
                height,
                // Cheap as the backing storage is shared amongst instances.
                data: data.clone(),
            });

            self.unicast(addr, message).await;
        }

        nonce
    }
}