1use crate::common::{Id, Node};
2use crate::dht::DHT;
3use crate::errors::RustyDHTError;
4use crate::packets;
5use crate::packets::MessageBuilder;
6use crate::storage::buckets::Buckets;
7use crate::storage::node_wrapper::NodeWrapper;
8use futures::StreamExt;
9use log::{debug, error, info, trace, warn};
10use std::collections::HashSet;
11use std::net::SocketAddr;
12use std::time::{Duration, Instant};
13
14pub async fn announce_peer(
27 dht: &DHT,
28 info_hash: Id,
29 port: Option<u16>,
30 timeout: Duration,
31) -> Result<Vec<Node>, RustyDHTError> {
32 let mut to_ret = Vec::new();
33
34 let get_peers_result = get_peers(dht, info_hash, timeout).await?;
36
37 trace!(target:"rustydht_lib::operations::announce_peer", "{} nodes responded to get_peers", get_peers_result.responders.len());
38
39 let announce_builder = MessageBuilder::new_announce_peer_request()
40 .sender_id(dht.get_id())
41 .read_only(dht.get_settings().read_only)
42 .target(info_hash)
43 .port(port.unwrap_or(0))
44 .implied_port(port.is_none());
45
46 let mut todos = futures::stream::FuturesUnordered::new();
48 for responder in get_peers_result.responders().into_iter().take(8) {
49 let builder = announce_builder.clone();
50 todos.push(async move {
51 let announce_req = builder
52 .token(responder.token)
53 .build()
54 .expect("Failed to build announce_peer request");
55 match dht
56 .send_request(
57 announce_req,
58 responder.node.address,
59 Some(responder.node.id),
60 Some(Duration::from_secs(5)),
61 )
62 .await
63 {
64 Ok(_) => Ok(responder.node),
65 Err(e) => Err(e),
66 }
67 });
68 }
69
70 while let Some(announce_result) = todos.next().await {
72 match announce_result {
73 Ok(node) => {
74 to_ret.push(node);
75 }
76
77 Err(e) => match e {
78 RustyDHTError::TimeoutError(_) => {
79 debug!(target: "rustydht_lib::operations::announce_peer", "announce_peer timed out: {}", e);
80 }
81
82 _ => {
83 warn!(target: "rustydht_lib::operations::announce_peer", "Error sending announce_peer: {}", e);
84 }
85 },
86 }
87 }
88
89 Ok(to_ret)
90}
91
92pub async fn find_node(
96 dht: &DHT,
97 target: Id,
98 timeout: Duration,
99) -> Result<Vec<Node>, RustyDHTError> {
100 let mut buckets = Buckets::new(target, 8);
101 let dht_settings = dht.get_settings();
102
103 let find_node_result = tokio::time::timeout(timeout, async {
104 let mut best_ids = Vec::new();
105 loop {
106 for node_wrapper in dht.get_nodes() {
108 if !buckets.contains(&node_wrapper.node.id) {
109 buckets.add(node_wrapper, None);
110 }
111 }
112
113 let nearest = buckets.get_nearest_nodes(&target, None);
115 if nearest.is_empty() {
116 tokio::time::sleep(Duration::from_secs(1)).await;
118 continue;
119 }
120 let best_ids_current: Vec<Id> = nearest.iter().map(|nw| nw.node.id).collect();
121 if best_ids == best_ids_current {
122 break;
123 }
124 best_ids = best_ids_current;
125
126 let request_builder = MessageBuilder::new_find_node_request()
128 .target(target)
129 .read_only(dht_settings.read_only)
130 .sender_id(dht.get_id());
131 let mut todos = futures::stream::FuturesUnordered::new();
132 for node in nearest {
133 todos.push(dht.send_request(
134 request_builder
135 .clone()
136 .build()
137 .expect("Failed to build find_node request"),
138 node.node.address,
139 Some(node.node.id),
140 Some(Duration::from_secs(5))
141 ));
142 }
143
144 let started_sending_time = Instant::now();
146 while let Some(request_result) = todos.next().await {
147 match request_result {
148 Ok(message) => match message.message_type {
149 packets::MessageType::Response(
150 packets::ResponseSpecific::FindNodeResponse(args),
151 ) => {
152 for node in args.nodes {
153 if !buckets.contains(&node.id) {
154 trace!(target: "rustydht_lib::operations::find_node", "Node {:?} is a candidate for buckets", node);
155 buckets.add(NodeWrapper::new(node), None);
156 }
157 }
158 }
159
160 _ => {
161 error!(target: "rustydht_lib::operations::find_node", "Got wrong packet type back: {:?}", message);
162 }
163 },
164 Err(e) => {
165 warn!(target: "rustydht_lib::operations::find_node", "Error sending find_node request: {}", e);
166 }
167 }
168 }
169
170 let since_sent = Instant::now().saturating_duration_since(started_sending_time);
174 let desired_interval = Duration::from_millis(1000);
175 let needed_sleep_interval = desired_interval.saturating_sub(since_sent);
176 if needed_sleep_interval != Duration::ZERO {
177 tokio::time::sleep(needed_sleep_interval).await;
178 }
179 }
180 })
181 .await;
182
183 if let Err(timeout) = find_node_result {
184 debug!(target: "rustydht_lib::operations::find_node", "Timed out after {:?}", timeout);
185 }
186
187 Ok(buckets
188 .get_nearest_nodes(&target, None)
189 .into_iter()
190 .map(|nw| nw.node.clone())
191 .collect())
192}
193
194pub async fn get_peers(
199 dht: &DHT,
200 info_hash: Id,
201 timeout: Duration,
202) -> Result<GetPeersResult, RustyDHTError> {
203 let mut unique_peers = HashSet::new();
204 let mut responders = Vec::new();
205 let mut buckets = Buckets::new(info_hash, 8);
206 let dht_settings = dht.get_settings();
207
208 find_node(dht, info_hash, Duration::from_secs(5)).await?;
210
211 let get_peers_result = tokio::time::timeout(timeout,
212 async {
213 let mut best_ids = Vec::new();
214 loop {
215 for node_wrapper in dht.get_nodes() {
217 if !buckets.contains(&node_wrapper.node.id) {
218 buckets.add(node_wrapper, None);
219 }
220 }
221
222 let nearest = buckets.get_nearest_nodes(&info_hash, None);
224 if nearest.len() <= 5 {
225 tokio::time::sleep(Duration::from_secs(1)).await;
227 continue;
228 }
229 let best_ids_current: Vec<Id> = nearest.iter().map(|nw| nw.node.id).collect();
230 if best_ids == best_ids_current {
231 break;
232 }
233 best_ids = best_ids_current;
234
235 let request_builder = MessageBuilder::new_get_peers_request()
237 .target(info_hash)
238 .read_only(dht_settings.read_only)
239 .sender_id(dht.get_id());
240 let mut todos = futures::stream::FuturesUnordered::new();
241 for node in nearest {
242 let node_clone = node.clone();
243 let request_builder_clone = request_builder.clone();
244 todos.push(async move {
245 match dht.send_request(
246 request_builder_clone
247 .build()
248 .expect("Failed to build get_peers request"),
249 node_clone.node.address,
250 Some(node_clone.node.id),
251 Some(Duration::from_secs(5))
252 ).await {
253 Ok(reply) => Ok((node_clone.node, reply)),
254 Err(e) => Err(e)
255 }
256 });
257 }
258
259 let started_sending_time = Instant::now();
261 while let Some(request_result) = todos.next().await {
262 match request_result {
263 Ok(result) => match result.1.message_type {
264 packets::MessageType::Response(
265 packets::ResponseSpecific::GetPeersResponse(args),
266 ) => {
267 responders.push(GetPeersResponder{
268 node: result.0,
269 token: args.token
270 });
271
272 match args.values {
273 packets::GetPeersResponseValues::Nodes(n) => {
274 debug!(target: "rustydht_lib::operations::get_peers", "Got {} nodes", n.len());
275 for node in n {
276 if !buckets.contains(&node.id) {
277 trace!(target: "rustydht_lib::operations::get_peers", "Node {:?} is a candidate for buckets", node);
278 buckets.add(NodeWrapper::new(node), None);
279 }
280 }
281 }
282 packets::GetPeersResponseValues::Peers(p) => {
283 info!(target: "rustydht_lib::operations::get_peers", "Got {} peers", p.len());
284 for peer in p {
285 unique_peers.insert(peer);
286 }
287 }
288 }},
289 _ => {
290 error!(target: "rustydht_lib::operations::get_peers", "Got wrong packet type back: {:?}", result.1);
291 }
292 },
293 Err(e) => {
294 warn!(target: "rustydht_lib::operations::get_peers", "Error sending get_peers request: {}", e);
295 }
296 }
297 }
298
299 let since_sent = Instant::now().saturating_duration_since(started_sending_time);
303 let desired_interval = Duration::from_millis(1000);
304 let needed_sleep_interval = desired_interval.saturating_sub(since_sent);
305 if needed_sleep_interval != Duration::ZERO {
306 tokio::time::sleep(needed_sleep_interval).await;
307 }
308 }
309 }).await;
310
311 if let Err(timeout) = get_peers_result {
312 debug!(target: "rustydht_lib::operations::get_peers", "Timed out after {:?}, returning current results", timeout);
313 }
314
315 Ok(GetPeersResult::new(
316 info_hash,
317 unique_peers.into_iter().collect(),
318 responders,
319 ))
320}
321
322pub struct GetPeersResult {
324 info_hash: Id,
325 peers: Vec<SocketAddr>,
326 responders: Vec<GetPeersResponder>,
327}
328
329impl GetPeersResult {
330 pub fn new(
331 info_hash: Id,
332 peers: Vec<SocketAddr>,
333 mut responders: Vec<GetPeersResponder>,
334 ) -> GetPeersResult {
335 responders.sort_unstable_by(|a, b| {
336 let a_dist = a.node.id.xor(&info_hash);
337 let b_dist = b.node.id.xor(&info_hash);
338 a_dist.partial_cmp(&b_dist).unwrap()
339 });
340 GetPeersResult {
341 info_hash,
342 peers,
343 responders,
344 }
345 }
346
347 pub fn info_hash(self) -> Id {
349 self.info_hash
350 }
351
352 pub fn peers(self) -> Vec<SocketAddr> {
354 self.peers
355 }
356
357 pub fn responders(self) -> Vec<GetPeersResponder> {
361 self.responders
362 }
363}
364
365pub struct GetPeersResponder {
369 node: Node,
370 token: Vec<u8>,
371}
372
373impl GetPeersResponder {
374 pub fn new(node: Node, token: Vec<u8>) -> GetPeersResponder {
375 GetPeersResponder { node, token }
376 }
377
378 pub fn node(self) -> Node {
379 self.node
380 }
381
382 pub fn token(self) -> Vec<u8> {
383 self.token
384 }
385}