Skip to main content

n0_mainline/core/
server.rs

1//! Modules needed only for nodes running in server mode (not read-only).
2
3pub mod peers;
4pub mod signed_peers;
5pub mod tokens;
6
7use std::{fmt::Debug, net::SocketAddrV4, num::NonZeroUsize};
8
9use dyn_clone::DynClone;
10use lru::LruCache;
11use tracing::debug;
12
13use crate::common::{
14    AnnouncePeerRequestArguments, AnnounceSignedPeerRequestArguments, ErrorSpecific,
15    FindNodeRequestArguments, FindNodeResponseArguments, GetImmutableResponseArguments,
16    GetMutableResponseArguments, GetPeersRequestArguments, GetPeersResponseArguments,
17    GetSignedPeersResponseArguments, GetValueRequestArguments, Id, MAX_BUCKET_SIZE_K, MutableItem,
18    NoMoreRecentValueResponseArguments, NoValuesResponseArguments, PingResponseArguments,
19    PutImmutableRequestArguments, PutMutableRequestArguments, PutRequest, PutRequestSpecific,
20    RequestTypeSpecific, ResponseSpecific, RoutingTable, SignedAnnounce, validate_immutable,
21};
22
23use peers::PeersStore;
24use signed_peers::SignedPeersStore;
25use tokens::Tokens;
26
27pub use crate::common::{MessageType, RequestSpecific};
28
/// Default cap on the number of distinct info_hashes the server keeps peers for.
pub const MAX_INFO_HASHES: usize = 2_000;
/// Default cap on the number of peers kept for any single info_hash.
pub const MAX_PEERS: usize = 500;
/// Default cap on stored items; applied separately to the Immutable and the Mutable store.
pub const MAX_VALUES: usize = 1_000;
35
36/// A trait for filtering incoming requests to a DHT node and
37/// decide whether to allow handling it or rate limit or ban
38/// the requester, or prohibit specific requests' details.
39pub trait RequestFilter: Send + Sync + Debug + DynClone {
40    /// Returns true if the request from this source is allowed.
41    fn allow_request(&self, request: &RequestSpecific, from: SocketAddrV4) -> bool;
42}
43
44dyn_clone::clone_trait_object!(RequestFilter);
45
46#[derive(Debug, Clone)]
47struct DefaultFilter;
48
49impl RequestFilter for DefaultFilter {
50    fn allow_request(&self, _request: &RequestSpecific, _from: SocketAddrV4) -> bool {
51        true
52    }
53}
54
55#[derive(Debug)]
56/// A server that handles incoming requests.
57///
58/// Supports [BEP_005](https://www.bittorrent.org/beps/bep_0005.html) and [BEP_0044](https://www.bittorrent.org/beps/bep_0044.html).
59///
60/// But it doesn't implement any rate-limiting or blocking.
61pub struct Server {
62    /// Tokens generator
63    tokens: Tokens,
64    /// Peers store
65    peers: PeersStore,
66    /// signed peers store
67    signed_peers: SignedPeersStore,
68    /// Immutable values store
69    immutable_values: LruCache<Id, Box<[u8]>>,
70    /// Mutable values store
71    mutable_values: LruCache<Id, MutableItem>,
72    /// Filter requests before handling them.
73    filter: Box<dyn RequestFilter>,
74}
75
76impl Default for Server {
77    fn default() -> Self {
78        Self::new(ServerSettings::default())
79    }
80}
81
82#[derive(Debug, Clone)]
83/// Settings for the default dht server.
84pub struct ServerSettings {
85    /// The maximum info_hashes for which to store peers.
86    pub max_info_hashes: usize,
87    /// The maximum peers to store per info_hash.
88    pub max_peers_per_info_hash: usize,
89    /// Maximum number of immutable values to store.
90    pub max_immutable_values: usize,
91    /// Maximum number of mutable values to store.
92    pub max_mutable_values: usize,
93    /// Filter requests before handling them.
94    ///
95    /// Defaults to a function that always returns true.
96    pub filter: Box<dyn RequestFilter>,
97}
98
99impl Default for ServerSettings {
100    fn default() -> Self {
101        Self {
102            max_info_hashes: MAX_INFO_HASHES,
103            max_peers_per_info_hash: MAX_PEERS,
104            max_mutable_values: MAX_VALUES,
105            max_immutable_values: MAX_VALUES,
106
107            filter: Box::new(DefaultFilter),
108        }
109    }
110}
111
112impl Server {
113    /// Creates a new [Server]
114    pub fn new(settings: ServerSettings) -> Self {
115        let tokens = Tokens::new();
116
117        Self {
118            tokens,
119            peers: PeersStore::new(
120                NonZeroUsize::new(settings.max_info_hashes).unwrap_or(
121                    NonZeroUsize::new(MAX_INFO_HASHES).expect("MAX_PEERS is NonZeroUsize"),
122                ),
123                NonZeroUsize::new(settings.max_peers_per_info_hash)
124                    .unwrap_or(NonZeroUsize::new(MAX_PEERS).expect("MAX_PEERS is NonZeroUsize")),
125            ),
126            signed_peers: SignedPeersStore::new(
127                NonZeroUsize::new(settings.max_info_hashes).unwrap_or(
128                    NonZeroUsize::new(MAX_INFO_HASHES).expect("MAX_PEERS is NonZeroUsize"),
129                ),
130                NonZeroUsize::new(settings.max_peers_per_info_hash)
131                    .unwrap_or(NonZeroUsize::new(MAX_PEERS).expect("MAX_PEERS is NonZeroUsize")),
132            ),
133
134            immutable_values: LruCache::new(
135                NonZeroUsize::new(settings.max_immutable_values)
136                    .unwrap_or(NonZeroUsize::new(MAX_VALUES).expect("MAX_VALUES is NonZeroUsize")),
137            ),
138            mutable_values: LruCache::new(
139                NonZeroUsize::new(settings.max_mutable_values)
140                    .unwrap_or(NonZeroUsize::new(MAX_VALUES).expect("MAX_VALUES is NonZeroUsize")),
141            ),
142            filter: settings.filter,
143        }
144    }
145
146    /// Returns an optional response or an error for a request.
147    ///
148    /// Passed to the Rpc to send back to the requester.
149    pub fn handle_request(
150        &mut self,
151        routing_table: &RoutingTable,
152        signing_peers_routing_table: &RoutingTable,
153        from: SocketAddrV4,
154        request: RequestSpecific,
155    ) -> Option<MessageType> {
156        if !self.filter.allow_request(&request, from) {
157            return None;
158        }
159
160        // Lazily rotate secrets before handling a request
161        if self.tokens.should_update() {
162            self.tokens.rotate()
163        }
164
165        let requester_id = request.requester_id;
166
167        Some(match request.request_type {
168            RequestTypeSpecific::Ping => {
169                MessageType::Response(ResponseSpecific::Ping(PingResponseArguments {
170                    responder_id: *routing_table.id(),
171                }))
172            }
173            RequestTypeSpecific::FindNode(FindNodeRequestArguments { target, .. }) => {
174                // prioritize nodes supporting signed peers..
175                let mut nodes = signing_peers_routing_table.closest(target).to_vec();
176                if nodes.len() < MAX_BUCKET_SIZE_K {
177                    nodes.extend_from_slice(
178                        &routing_table
179                            .closest(target)
180                            .iter()
181                            .take(MAX_BUCKET_SIZE_K - nodes.len())
182                            .cloned()
183                            .collect::<Vec<_>>(),
184                    );
185                }
186
187                MessageType::Response(ResponseSpecific::FindNode(FindNodeResponseArguments {
188                    responder_id: *routing_table.id(),
189                    nodes: nodes.into(),
190                }))
191            }
192            RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash, .. }) => {
193                MessageType::Response(match self.peers.get_random_peers(&info_hash) {
194                    Some(peers) => ResponseSpecific::GetPeers(GetPeersResponseArguments {
195                        responder_id: *routing_table.id(),
196                        token: self.tokens.generate_token(from).into(),
197                        nodes: Some(routing_table.closest(info_hash)),
198                        values: peers,
199                    }),
200                    None => ResponseSpecific::NoValues(NoValuesResponseArguments {
201                        responder_id: *routing_table.id(),
202                        token: self.tokens.generate_token(from).into(),
203                        nodes: Some(routing_table.closest(info_hash)),
204                    }),
205                })
206            }
207            RequestTypeSpecific::GetSignedPeers(GetPeersRequestArguments { info_hash, .. }) => {
208                MessageType::Response(match self.signed_peers.get_random_peers(&info_hash) {
209                    Some(peers) => {
210                        ResponseSpecific::GetSignedPeers(GetSignedPeersResponseArguments {
211                            responder_id: *signing_peers_routing_table.id(),
212                            token: self.tokens.generate_token(from).into(),
213                            nodes: Some(signing_peers_routing_table.closest(info_hash)),
214                            peers: peers
215                                .iter()
216                                .map(|p| (*p.key(), p.timestamp(), *p.signature()))
217                                .collect(),
218                        })
219                    }
220                    None => ResponseSpecific::NoValues(NoValuesResponseArguments {
221                        responder_id: *signing_peers_routing_table.id(),
222                        token: self.tokens.generate_token(from).into(),
223                        nodes: Some(signing_peers_routing_table.closest(info_hash)),
224                    }),
225                })
226            }
227            RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, .. }) => {
228                if seq.is_some() {
229                    MessageType::Response(self.handle_get_mutable(routing_table, from, target, seq))
230                } else if let Some(v) = self.immutable_values.get(&target) {
231                    MessageType::Response(ResponseSpecific::GetImmutable(
232                        GetImmutableResponseArguments {
233                            responder_id: *routing_table.id(),
234                            token: self.tokens.generate_token(from).into(),
235                            nodes: Some(routing_table.closest(target)),
236                            v: v.clone(),
237                        },
238                    ))
239                } else {
240                    MessageType::Response(self.handle_get_mutable(routing_table, from, target, seq))
241                }
242            }
243            RequestTypeSpecific::Put(PutRequest {
244                token,
245                put_request_type,
246            }) => match put_request_type {
247                PutRequestSpecific::AnnouncePeer(AnnouncePeerRequestArguments {
248                    info_hash,
249                    port,
250                    implied_port,
251                    ..
252                }) => {
253                    if !self.tokens.validate(from, &token) {
254                        debug!(
255                            ?info_hash,
256                            ?requester_id,
257                            ?from,
258                            request_type = "announce_peer",
259                            "Invalid token"
260                        );
261
262                        return Some(MessageType::Error(ErrorSpecific {
263                            code: 203,
264                            description: "Bad token".to_string(),
265                        }));
266                    }
267
268                    let peer = match implied_port {
269                        Some(true) => from,
270                        _ => SocketAddrV4::new(*from.ip(), port),
271                    };
272
273                    self.peers
274                        .add_peer(info_hash, (&request.requester_id, peer));
275
276                    return Some(MessageType::Response(ResponseSpecific::Ping(
277                        PingResponseArguments {
278                            responder_id: *routing_table.id(),
279                        },
280                    )));
281                }
282                PutRequestSpecific::AnnounceSignedPeer(AnnounceSignedPeerRequestArguments {
283                    info_hash,
284                    t,
285                    k,
286                    sig,
287                }) => {
288                    if !self.tokens.validate(from, &token) {
289                        debug!(
290                            ?info_hash,
291                            ?requester_id,
292                            ?from,
293                            request_type = "announce_signed_peer",
294                            "Invalid token"
295                        );
296
297                        return Some(MessageType::Error(ErrorSpecific {
298                            code: 203,
299                            description: "Bad token".to_string(),
300                        }));
301                    }
302
303                    match SignedAnnounce::from_dht_request(&info_hash, &k, t, &sig) {
304                        Ok(peer) => {
305                            self.signed_peers.add_peer(info_hash, peer);
306
307                            return Some(MessageType::Response(ResponseSpecific::Ping(
308                                PingResponseArguments {
309                                    responder_id: *routing_table.id(),
310                                },
311                            )));
312                        }
313                        Err(error) => {
314                            debug!(
315                                ?info_hash,
316                                ?requester_id,
317                                ?from,
318                                ?error,
319                                request_type = "announce_signed_peer",
320                                "Invalid signed announce"
321                            );
322
323                            return Some(MessageType::Error(ErrorSpecific {
324                                code: 203,
325                                description: error.to_string(),
326                            }));
327                        }
328                    }
329                }
330                PutRequestSpecific::PutImmutable(PutImmutableRequestArguments {
331                    v,
332                    target,
333                    ..
334                }) => {
335                    if !self.tokens.validate(from, &token) {
336                        debug!(
337                            ?target,
338                            ?requester_id,
339                            ?from,
340                            request_type = "put_immutable",
341                            "Invalid token"
342                        );
343
344                        return Some(MessageType::Error(ErrorSpecific {
345                            code: 203,
346                            description: "Bad token".to_string(),
347                        }));
348                    }
349
350                    if v.len() > 1000 {
351                        debug!(?target, ?requester_id, ?from, size = ?v.len(), "Message (v field) too big.");
352
353                        return Some(MessageType::Error(ErrorSpecific {
354                            code: 205,
355                            description: "Message (v field) too big.".to_string(),
356                        }));
357                    }
358                    if !validate_immutable(&v, target) {
359                        debug!(?target, ?requester_id, ?from, v = ?v, "Target doesn't match the sha1 hash of v field.");
360
361                        return Some(MessageType::Error(ErrorSpecific {
362                            code: 203,
363                            description: "Target doesn't match the sha1 hash of v field"
364                                .to_string(),
365                        }));
366                    }
367
368                    self.immutable_values.put(target, v);
369
370                    return Some(MessageType::Response(ResponseSpecific::Ping(
371                        PingResponseArguments {
372                            responder_id: *routing_table.id(),
373                        },
374                    )));
375                }
376                PutRequestSpecific::PutMutable(PutMutableRequestArguments {
377                    target,
378                    v,
379                    k,
380                    seq,
381                    sig,
382                    salt,
383                    cas,
384                    ..
385                }) => {
386                    if !self.tokens.validate(from, &token) {
387                        debug!(
388                            ?target,
389                            ?requester_id,
390                            ?from,
391                            request_type = "put_mutable",
392                            "Invalid token"
393                        );
394                        return Some(MessageType::Error(ErrorSpecific {
395                            code: 203,
396                            description: "Bad token".to_string(),
397                        }));
398                    }
399                    if v.len() > 1000 {
400                        return Some(MessageType::Error(ErrorSpecific {
401                            code: 205,
402                            description: "Message (v field) too big.".to_string(),
403                        }));
404                    }
405                    if let Some(ref salt) = salt
406                        && salt.len() > 64
407                    {
408                        return Some(MessageType::Error(ErrorSpecific {
409                            code: 207,
410                            description: "salt (salt field) too big.".to_string(),
411                        }));
412                    }
413                    if let Some(previous) = self.mutable_values.get(&target) {
414                        if let Some(cas) = cas
415                            && previous.seq() != cas
416                        {
417                            debug!(
418                                ?target,
419                                ?requester_id,
420                                ?from,
421                                "CAS mismatched, re-read value and try again."
422                            );
423
424                            return Some(MessageType::Error(ErrorSpecific {
425                                code: 301,
426                                description: "CAS mismatched, re-read value and try again."
427                                    .to_string(),
428                            }));
429                        };
430
431                        if seq < previous.seq() {
432                            debug!(
433                                ?target,
434                                ?requester_id,
435                                ?from,
436                                "Sequence number less than current."
437                            );
438
439                            return Some(MessageType::Error(ErrorSpecific {
440                                code: 302,
441                                description: "Sequence number less than current.".to_string(),
442                            }));
443                        }
444                    }
445
446                    match MutableItem::from_dht_message(target, &k, v, seq, &sig, salt) {
447                        Ok(item) => {
448                            self.mutable_values.put(target, item);
449
450                            MessageType::Response(ResponseSpecific::Ping(PingResponseArguments {
451                                responder_id: *routing_table.id(),
452                            }))
453                        }
454                        Err(error) => {
455                            debug!(?target, ?requester_id, ?from, ?error, "Invalid signature");
456
457                            MessageType::Error(ErrorSpecific {
458                                code: 206,
459                                description: "Invalid signature".to_string(),
460                            })
461                        }
462                    }
463                }
464            },
465        })
466    }
467
468    /// Handle get mutable request
469    fn handle_get_mutable(
470        &mut self,
471        routing_table: &RoutingTable,
472        from: SocketAddrV4,
473        target: Id,
474        seq: Option<i64>,
475    ) -> ResponseSpecific {
476        match self.mutable_values.get(&target) {
477            Some(item) => {
478                let no_more_recent_values = seq.map(|request_seq| item.seq() <= request_seq);
479
480                match no_more_recent_values {
481                    Some(true) => {
482                        ResponseSpecific::NoMoreRecentValue(NoMoreRecentValueResponseArguments {
483                            responder_id: *routing_table.id(),
484                            token: self.tokens.generate_token(from).into(),
485                            nodes: Some(routing_table.closest(target)),
486                            seq: item.seq(),
487                        })
488                    }
489                    _ => ResponseSpecific::GetMutable(GetMutableResponseArguments {
490                        responder_id: *routing_table.id(),
491                        token: self.tokens.generate_token(from).into(),
492                        nodes: Some(routing_table.closest(target)),
493                        v: item.value().into(),
494                        k: *item.key(),
495                        seq: item.seq(),
496                        sig: *item.signature(),
497                    }),
498                }
499            }
500            None => ResponseSpecific::NoValues(NoValuesResponseArguments {
501                responder_id: *routing_table.id(),
502                token: self.tokens.generate_token(from).into(),
503                nodes: Some(routing_table.closest(target)),
504            }),
505        }
506    }
507}