rustydht_lib/dht/
operations.rs

1use crate::common::{Id, Node};
2use crate::dht::DHT;
3use crate::errors::RustyDHTError;
4use crate::packets;
5use crate::packets::MessageBuilder;
6use crate::storage::buckets::Buckets;
7use crate::storage::node_wrapper::NodeWrapper;
8use futures::StreamExt;
9use log::{debug, error, info, trace, warn};
10use std::collections::HashSet;
11use std::net::SocketAddr;
12use std::time::{Duration, Instant};
13
14/// Announce that you are a peer for a specific info_hash, returning the nodes
15/// that were successfully announced to.
16///
17/// # Arguments
18/// * `dht` - DHT instance that will be used to communicate
19/// * `info_hash` - Id of the torrent
20/// * `port` - optional port that other peers should use to contact your peer.
21/// If omitted, `implied_port` will be set true on the announce messages and
22/// * `timeout` - the maximum amount of time that will be spent searching for
23/// peers close to `info_hash` before announcing to them. This means that this
24/// function can actually take a bit longer than `timeout`, since it will take
25/// a moment after `timeout` has elapsed to announce to the nodes.
26pub async fn announce_peer(
27    dht: &DHT,
28    info_hash: Id,
29    port: Option<u16>,
30    timeout: Duration,
31) -> Result<Vec<Node>, RustyDHTError> {
32    let mut to_ret = Vec::new();
33
34    // Figure out which nodes we want to announce to
35    let get_peers_result = get_peers(dht, info_hash, timeout).await?;
36
37    trace!(target:"rustydht_lib::operations::announce_peer", "{} nodes responded to get_peers", get_peers_result.responders.len());
38
39    let announce_builder = MessageBuilder::new_announce_peer_request()
40        .sender_id(dht.get_id())
41        .read_only(dht.get_settings().read_only)
42        .target(info_hash)
43        .port(port.unwrap_or(0))
44        .implied_port(port.is_none());
45
46    // Prepare to send packets to the nearest 8
47    let mut todos = futures::stream::FuturesUnordered::new();
48    for responder in get_peers_result.responders().into_iter().take(8) {
49        let builder = announce_builder.clone();
50        todos.push(async move {
51            let announce_req = builder
52                .token(responder.token)
53                .build()
54                .expect("Failed to build announce_peer request");
55            match dht
56                .send_request(
57                    announce_req,
58                    responder.node.address,
59                    Some(responder.node.id),
60                    Some(Duration::from_secs(5)),
61                )
62                .await
63            {
64                Ok(_) => Ok(responder.node),
65                Err(e) => Err(e),
66            }
67        });
68    }
69
70    // Execute the futures, handle their results
71    while let Some(announce_result) = todos.next().await {
72        match announce_result {
73            Ok(node) => {
74                to_ret.push(node);
75            }
76
77            Err(e) => match e {
78                RustyDHTError::TimeoutError(_) => {
79                    debug!(target: "rustydht_lib::operations::announce_peer", "announce_peer timed out: {}", e);
80                }
81
82                _ => {
83                    warn!(target: "rustydht_lib::operations::announce_peer", "Error sending announce_peer: {}", e);
84                }
85            },
86        }
87    }
88
89    Ok(to_ret)
90}
91
/// Use the DHT to find the closest nodes to the target as possible.
///
/// This runs until it stops making progress or `timeout` has elapsed.
pub async fn find_node(
    dht: &DHT,
    target: Id,
    timeout: Duration,
) -> Result<Vec<Node>, RustyDHTError> {
    // Local routing table (bucket size 8) centered on the lookup target,
    // kept separate from the DHT's own buckets so this lookup can track
    // candidates nearer to `target` than the main table retains.
    let mut buckets = Buckets::new(target, 8);
    let dht_settings = dht.get_settings();

    let find_node_result = tokio::time::timeout(timeout, async {
        // Ids of the nearest nodes from the previous round; when a round
        // yields the same set, the search has converged and we stop early.
        let mut best_ids = Vec::new();
        loop {
            // Seed our buckets with the main buckets from the DHT
            for node_wrapper in dht.get_nodes() {
                if !buckets.contains(&node_wrapper.node.id) {
                    buckets.add(node_wrapper, None);
                }
            }

            // Grab a few nodes closest to our target
            let nearest = buckets.get_nearest_nodes(&target, None);
            if nearest.is_empty() {
                // If there are no nodes in the buckets yet, DHT may still be bootstrapping. Give it a moment and try again
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
            let best_ids_current: Vec<Id> = nearest.iter().map(|nw| nw.node.id).collect();
            if best_ids == best_ids_current {
                break;
            }
            best_ids = best_ids_current;

            // Get ready to send find_node to all of those closest nodes
            let request_builder = MessageBuilder::new_find_node_request()
                .target(target)
                .read_only(dht_settings.read_only)
                .sender_id(dht.get_id());
            let mut todos = futures::stream::FuturesUnordered::new();
            for node in nearest {
                todos.push(dht.send_request(
                    request_builder
                        .clone()
                        .build()
                        .expect("Failed to build find_node request"),
                    node.node.address,
                    Some(node.node.id),
                    Some(Duration::from_secs(5))
                ));
            }

            // Send find_node to nearest nodes, handle their responses
            let started_sending_time = Instant::now();
            while let Some(request_result) = todos.next().await {
                match request_result {
                    Ok(message) => match message.message_type {
                        packets::MessageType::Response(
                            packets::ResponseSpecific::FindNodeResponse(args),
                        ) => {
                            // Fold newly-learned nodes into our local buckets;
                            // they become candidates for the next round.
                            for node in args.nodes {
                                if !buckets.contains(&node.id) {
                                    trace!(target: "rustydht_lib::operations::find_node", "Node {:?} is a candidate for buckets", node);
                                    buckets.add(NodeWrapper::new(node), None);
                                }
                            }
                        }

                        _ => {
                            error!(target: "rustydht_lib::operations::find_node", "Got wrong packet type back: {:?}", message);
                        }
                    },
                    Err(e) => {
                        warn!(target: "rustydht_lib::operations::find_node", "Error sending find_node request: {}", e);
                    }
                }
            }

            // Ensure that our next round of packet sending starts at least 1s from the last
            // to prevent us from hitting other nodes too hard.
            // i.e. don't be a jerk.
            let since_sent = Instant::now().saturating_duration_since(started_sending_time);
            let desired_interval = Duration::from_millis(1000);
            let needed_sleep_interval = desired_interval.saturating_sub(since_sent);
            if needed_sleep_interval != Duration::ZERO {
                tokio::time::sleep(needed_sleep_interval).await;
            }
        }
    })
    .await;

    // Hitting the timeout is not an error — we just return what we found.
    if let Err(timeout) = find_node_result {
        debug!(target: "rustydht_lib::operations::find_node", "Timed out after {:?}", timeout);
    }

    Ok(buckets
        .get_nearest_nodes(&target, None)
        .into_iter()
        .map(|nw| nw.node.clone())
        .collect())
}
193
/// Use the DHT to retrieve peers for the given info_hash.
///
/// Returns all the results so far after `timeout` has elapsed
/// or the operation stops making progress (whichever happens first).
pub async fn get_peers(
    dht: &DHT,
    info_hash: Id,
    timeout: Duration,
) -> Result<GetPeersResult, RustyDHTError> {
    // HashSet deduplicates peer SocketAddrs reported by multiple nodes.
    let mut unique_peers = HashSet::new();
    // Nodes that replied with a GetPeersResponse (and their announce tokens).
    let mut responders = Vec::new();
    // Local routing table (bucket size 8) centered on the target info_hash.
    let mut buckets = Buckets::new(info_hash, 8);
    let dht_settings = dht.get_settings();

    // Hack to aid in bootstrapping
    find_node(dht, info_hash, Duration::from_secs(5)).await?;

    let get_peers_result = tokio::time::timeout(timeout,
    async {
        // Ids of the nearest nodes from the previous round; when a round
        // yields the same set, the search has converged and we stop early.
        let mut best_ids = Vec::new();
        loop {
            // Populate our buckets with the main buckets from the DHT
            for node_wrapper in dht.get_nodes() {
                if !buckets.contains(&node_wrapper.node.id) {
                    buckets.add(node_wrapper, None);
                }
            }

            // Grab a few nodes closest to our target info_hash
            let nearest = buckets.get_nearest_nodes(&info_hash, None);
            if nearest.len() <= 5 {
                // If there are no/few nodes in the buckets yet, DHT may still be bootstrapping. Give it a moment and try again
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
            let best_ids_current: Vec<Id> = nearest.iter().map(|nw| nw.node.id).collect();
            if best_ids == best_ids_current {
                break;
            }
            best_ids = best_ids_current;

            // Get ready to send get_peers to all of those closest nodes
            let request_builder = MessageBuilder::new_get_peers_request()
                .target(info_hash)
                .read_only(dht_settings.read_only)
                .sender_id(dht.get_id());
            let mut todos = futures::stream::FuturesUnordered::new();
            for node in nearest {
                let node_clone = node.clone();
                let request_builder_clone = request_builder.clone();
                todos.push(async move {
                    // Pair the reply with the node it came from so we can
                    // record the responder (and its token) below.
                    match dht.send_request(
                        request_builder_clone
                            .build()
                            .expect("Failed to build get_peers request"),
                        node_clone.node.address,
                        Some(node_clone.node.id),
                        Some(Duration::from_secs(5))
                    ).await {
                        Ok(reply) => Ok((node_clone.node, reply)),
                        Err(e) => Err(e)
                    }
                });
            }

            // Send get_peers to nearest nodes, handle their responses
            let started_sending_time = Instant::now();
            while let Some(request_result) = todos.next().await {
                match request_result {
                    Ok(result) => match result.1.message_type {
                        packets::MessageType::Response(
                            packets::ResponseSpecific::GetPeersResponse(args),
                        ) => {
                            // Remember this responder and its token in case the
                            // caller wants to announce_peer to it later.
                            responders.push(GetPeersResponder{
                                node: result.0,
                                token: args.token
                            });

                            // A node replies with either closer nodes or actual peers.
                            match args.values {
                            packets::GetPeersResponseValues::Nodes(n) => {
                                debug!(target: "rustydht_lib::operations::get_peers", "Got {} nodes", n.len());
                                for node in n {
                                    if !buckets.contains(&node.id) {
                                        trace!(target: "rustydht_lib::operations::get_peers", "Node {:?} is a candidate for buckets", node);
                                        buckets.add(NodeWrapper::new(node), None);
                                    }
                                }
                            }
                            packets::GetPeersResponseValues::Peers(p) => {
                                info!(target: "rustydht_lib::operations::get_peers", "Got {} peers", p.len());
                                for peer in p {
                                    unique_peers.insert(peer);
                                }
                            }
                        }},
                        _ => {
                            error!(target: "rustydht_lib::operations::get_peers", "Got wrong packet type back: {:?}", result.1);
                        }
                    },
                    Err(e) => {
                        warn!(target: "rustydht_lib::operations::get_peers", "Error sending get_peers request: {}", e);
                    }
                }
            }

            // Ensure that our next round of packet sending starts at least 1s from the last
            // to prevent us from hitting other nodes too hard.
            // i.e. don't be a jerk.
            let since_sent = Instant::now().saturating_duration_since(started_sending_time);
            let desired_interval = Duration::from_millis(1000);
            let needed_sleep_interval = desired_interval.saturating_sub(since_sent);
            if needed_sleep_interval != Duration::ZERO {
                tokio::time::sleep(needed_sleep_interval).await;
            }
        }
    }).await;

    // Hitting the timeout is not an error — we just return what we found.
    if let Err(timeout) = get_peers_result {
        debug!(target: "rustydht_lib::operations::get_peers", "Timed out after {:?}, returning current results", timeout);
    }

    Ok(GetPeersResult::new(
        info_hash,
        unique_peers.into_iter().collect(),
        responders,
    ))
}
321
/// Represents the results of a [get_peers](crate::dht::operations::get_peers) operation
pub struct GetPeersResult {
    /// The info_hash that peers were requested for
    info_hash: Id,
    /// Deduplicated peer addresses found for the info_hash
    peers: Vec<SocketAddr>,
    /// Nodes that responded to get_peers, sorted nearest-first to info_hash
    responders: Vec<GetPeersResponder>,
}
328
329impl GetPeersResult {
330    pub fn new(
331        info_hash: Id,
332        peers: Vec<SocketAddr>,
333        mut responders: Vec<GetPeersResponder>,
334    ) -> GetPeersResult {
335        responders.sort_unstable_by(|a, b| {
336            let a_dist = a.node.id.xor(&info_hash);
337            let b_dist = b.node.id.xor(&info_hash);
338            a_dist.partial_cmp(&b_dist).unwrap()
339        });
340        GetPeersResult {
341            info_hash,
342            peers,
343            responders,
344        }
345    }
346
347    /// The info_hash of the torrent that get_peers was attempting to get peers for
348    pub fn info_hash(self) -> Id {
349        self.info_hash
350    }
351
352    /// Vector full of any peers that were found for the info_hash
353    pub fn peers(self) -> Vec<SocketAddr> {
354        self.peers
355    }
356
357    /// Vector of information about the DHT nodes that responded to get_peers
358    ///
359    /// This is sorted by distance of the Node to the info_hash, from nearest to farthest.
360    pub fn responders(self) -> Vec<GetPeersResponder> {
361        self.responders
362    }
363}
364
/// Represents the response of a node to a get_peers request, including its Id, IP address,
/// and the token it replied with. This is helpful in case we want to follow up with
/// an announce_peer request.
pub struct GetPeersResponder {
    /// The node that responded to get_peers
    node: Node,
    /// Opaque token the node returned; must be echoed back in announce_peer
    token: Vec<u8>,
}
372
373impl GetPeersResponder {
374    pub fn new(node: Node, token: Vec<u8>) -> GetPeersResponder {
375        GetPeersResponder { node, token }
376    }
377
378    pub fn node(self) -> Node {
379        self.node
380    }
381
382    pub fn token(self) -> Vec<u8> {
383        self.token
384    }
385}