1pub mod peers;
4pub mod signed_peers;
5pub mod tokens;
6
7use std::{fmt::Debug, net::SocketAddrV4, num::NonZeroUsize};
8
9use dyn_clone::DynClone;
10use lru::LruCache;
11use tracing::debug;
12
13use crate::common::{
14 AnnouncePeerRequestArguments, AnnounceSignedPeerRequestArguments, ErrorSpecific,
15 FindNodeRequestArguments, FindNodeResponseArguments, GetImmutableResponseArguments,
16 GetMutableResponseArguments, GetPeersRequestArguments, GetPeersResponseArguments,
17 GetSignedPeersResponseArguments, GetValueRequestArguments, Id, MAX_BUCKET_SIZE_K, MutableItem,
18 NoMoreRecentValueResponseArguments, NoValuesResponseArguments, PingResponseArguments,
19 PutImmutableRequestArguments, PutMutableRequestArguments, PutRequest, PutRequestSpecific,
20 RequestTypeSpecific, ResponseSpecific, RoutingTable, SignedAnnounce, validate_immutable,
21};
22
23use peers::PeersStore;
24use signed_peers::SignedPeersStore;
25use tokens::Tokens;
26
27pub use crate::common::{MessageType, RequestSpecific};
28
29pub const MAX_INFO_HASHES: usize = 2000;
31pub const MAX_PEERS: usize = 500;
33pub const MAX_VALUES: usize = 1000;
35
36pub trait RequestFilter: Send + Sync + Debug + DynClone {
40 fn allow_request(&self, request: &RequestSpecific, from: SocketAddrV4) -> bool;
42}
43
44dyn_clone::clone_trait_object!(RequestFilter);
45
46#[derive(Debug, Clone)]
47struct DefaultFilter;
48
49impl RequestFilter for DefaultFilter {
50 fn allow_request(&self, _request: &RequestSpecific, _from: SocketAddrV4) -> bool {
51 true
52 }
53}
54
55#[derive(Debug)]
56pub struct Server {
62 tokens: Tokens,
64 peers: PeersStore,
66 signed_peers: SignedPeersStore,
68 immutable_values: LruCache<Id, Box<[u8]>>,
70 mutable_values: LruCache<Id, MutableItem>,
72 filter: Box<dyn RequestFilter>,
74}
75
76impl Default for Server {
77 fn default() -> Self {
78 Self::new(ServerSettings::default())
79 }
80}
81
82#[derive(Debug, Clone)]
83pub struct ServerSettings {
85 pub max_info_hashes: usize,
87 pub max_peers_per_info_hash: usize,
89 pub max_immutable_values: usize,
91 pub max_mutable_values: usize,
93 pub filter: Box<dyn RequestFilter>,
97}
98
99impl Default for ServerSettings {
100 fn default() -> Self {
101 Self {
102 max_info_hashes: MAX_INFO_HASHES,
103 max_peers_per_info_hash: MAX_PEERS,
104 max_mutable_values: MAX_VALUES,
105 max_immutable_values: MAX_VALUES,
106
107 filter: Box::new(DefaultFilter),
108 }
109 }
110}
111
112impl Server {
113 pub fn new(settings: ServerSettings) -> Self {
115 let tokens = Tokens::new();
116
117 Self {
118 tokens,
119 peers: PeersStore::new(
120 NonZeroUsize::new(settings.max_info_hashes).unwrap_or(
121 NonZeroUsize::new(MAX_INFO_HASHES).expect("MAX_PEERS is NonZeroUsize"),
122 ),
123 NonZeroUsize::new(settings.max_peers_per_info_hash)
124 .unwrap_or(NonZeroUsize::new(MAX_PEERS).expect("MAX_PEERS is NonZeroUsize")),
125 ),
126 signed_peers: SignedPeersStore::new(
127 NonZeroUsize::new(settings.max_info_hashes).unwrap_or(
128 NonZeroUsize::new(MAX_INFO_HASHES).expect("MAX_PEERS is NonZeroUsize"),
129 ),
130 NonZeroUsize::new(settings.max_peers_per_info_hash)
131 .unwrap_or(NonZeroUsize::new(MAX_PEERS).expect("MAX_PEERS is NonZeroUsize")),
132 ),
133
134 immutable_values: LruCache::new(
135 NonZeroUsize::new(settings.max_immutable_values)
136 .unwrap_or(NonZeroUsize::new(MAX_VALUES).expect("MAX_VALUES is NonZeroUsize")),
137 ),
138 mutable_values: LruCache::new(
139 NonZeroUsize::new(settings.max_mutable_values)
140 .unwrap_or(NonZeroUsize::new(MAX_VALUES).expect("MAX_VALUES is NonZeroUsize")),
141 ),
142 filter: settings.filter,
143 }
144 }
145
146 pub fn handle_request(
150 &mut self,
151 routing_table: &RoutingTable,
152 signing_peers_routing_table: &RoutingTable,
153 from: SocketAddrV4,
154 request: RequestSpecific,
155 ) -> Option<MessageType> {
156 if !self.filter.allow_request(&request, from) {
157 return None;
158 }
159
160 if self.tokens.should_update() {
162 self.tokens.rotate()
163 }
164
165 let requester_id = request.requester_id;
166
167 Some(match request.request_type {
168 RequestTypeSpecific::Ping => {
169 MessageType::Response(ResponseSpecific::Ping(PingResponseArguments {
170 responder_id: *routing_table.id(),
171 }))
172 }
173 RequestTypeSpecific::FindNode(FindNodeRequestArguments { target, .. }) => {
174 let mut nodes = signing_peers_routing_table.closest(target).to_vec();
176 if nodes.len() < MAX_BUCKET_SIZE_K {
177 nodes.extend_from_slice(
178 &routing_table
179 .closest(target)
180 .iter()
181 .take(MAX_BUCKET_SIZE_K - nodes.len())
182 .cloned()
183 .collect::<Vec<_>>(),
184 );
185 }
186
187 MessageType::Response(ResponseSpecific::FindNode(FindNodeResponseArguments {
188 responder_id: *routing_table.id(),
189 nodes: nodes.into(),
190 }))
191 }
192 RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash, .. }) => {
193 MessageType::Response(match self.peers.get_random_peers(&info_hash) {
194 Some(peers) => ResponseSpecific::GetPeers(GetPeersResponseArguments {
195 responder_id: *routing_table.id(),
196 token: self.tokens.generate_token(from).into(),
197 nodes: Some(routing_table.closest(info_hash)),
198 values: peers,
199 }),
200 None => ResponseSpecific::NoValues(NoValuesResponseArguments {
201 responder_id: *routing_table.id(),
202 token: self.tokens.generate_token(from).into(),
203 nodes: Some(routing_table.closest(info_hash)),
204 }),
205 })
206 }
207 RequestTypeSpecific::GetSignedPeers(GetPeersRequestArguments { info_hash, .. }) => {
208 MessageType::Response(match self.signed_peers.get_random_peers(&info_hash) {
209 Some(peers) => {
210 ResponseSpecific::GetSignedPeers(GetSignedPeersResponseArguments {
211 responder_id: *signing_peers_routing_table.id(),
212 token: self.tokens.generate_token(from).into(),
213 nodes: Some(signing_peers_routing_table.closest(info_hash)),
214 peers: peers
215 .iter()
216 .map(|p| (*p.key(), p.timestamp(), *p.signature()))
217 .collect(),
218 })
219 }
220 None => ResponseSpecific::NoValues(NoValuesResponseArguments {
221 responder_id: *signing_peers_routing_table.id(),
222 token: self.tokens.generate_token(from).into(),
223 nodes: Some(signing_peers_routing_table.closest(info_hash)),
224 }),
225 })
226 }
227 RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, .. }) => {
228 if seq.is_some() {
229 MessageType::Response(self.handle_get_mutable(routing_table, from, target, seq))
230 } else if let Some(v) = self.immutable_values.get(&target) {
231 MessageType::Response(ResponseSpecific::GetImmutable(
232 GetImmutableResponseArguments {
233 responder_id: *routing_table.id(),
234 token: self.tokens.generate_token(from).into(),
235 nodes: Some(routing_table.closest(target)),
236 v: v.clone(),
237 },
238 ))
239 } else {
240 MessageType::Response(self.handle_get_mutable(routing_table, from, target, seq))
241 }
242 }
243 RequestTypeSpecific::Put(PutRequest {
244 token,
245 put_request_type,
246 }) => match put_request_type {
247 PutRequestSpecific::AnnouncePeer(AnnouncePeerRequestArguments {
248 info_hash,
249 port,
250 implied_port,
251 ..
252 }) => {
253 if !self.tokens.validate(from, &token) {
254 debug!(
255 ?info_hash,
256 ?requester_id,
257 ?from,
258 request_type = "announce_peer",
259 "Invalid token"
260 );
261
262 return Some(MessageType::Error(ErrorSpecific {
263 code: 203,
264 description: "Bad token".to_string(),
265 }));
266 }
267
268 let peer = match implied_port {
269 Some(true) => from,
270 _ => SocketAddrV4::new(*from.ip(), port),
271 };
272
273 self.peers
274 .add_peer(info_hash, (&request.requester_id, peer));
275
276 return Some(MessageType::Response(ResponseSpecific::Ping(
277 PingResponseArguments {
278 responder_id: *routing_table.id(),
279 },
280 )));
281 }
282 PutRequestSpecific::AnnounceSignedPeer(AnnounceSignedPeerRequestArguments {
283 info_hash,
284 t,
285 k,
286 sig,
287 }) => {
288 if !self.tokens.validate(from, &token) {
289 debug!(
290 ?info_hash,
291 ?requester_id,
292 ?from,
293 request_type = "announce_signed_peer",
294 "Invalid token"
295 );
296
297 return Some(MessageType::Error(ErrorSpecific {
298 code: 203,
299 description: "Bad token".to_string(),
300 }));
301 }
302
303 match SignedAnnounce::from_dht_request(&info_hash, &k, t, &sig) {
304 Ok(peer) => {
305 self.signed_peers.add_peer(info_hash, peer);
306
307 return Some(MessageType::Response(ResponseSpecific::Ping(
308 PingResponseArguments {
309 responder_id: *routing_table.id(),
310 },
311 )));
312 }
313 Err(error) => {
314 debug!(
315 ?info_hash,
316 ?requester_id,
317 ?from,
318 ?error,
319 request_type = "announce_signed_peer",
320 "Invalid signed announce"
321 );
322
323 return Some(MessageType::Error(ErrorSpecific {
324 code: 203,
325 description: error.to_string(),
326 }));
327 }
328 }
329 }
330 PutRequestSpecific::PutImmutable(PutImmutableRequestArguments {
331 v,
332 target,
333 ..
334 }) => {
335 if !self.tokens.validate(from, &token) {
336 debug!(
337 ?target,
338 ?requester_id,
339 ?from,
340 request_type = "put_immutable",
341 "Invalid token"
342 );
343
344 return Some(MessageType::Error(ErrorSpecific {
345 code: 203,
346 description: "Bad token".to_string(),
347 }));
348 }
349
350 if v.len() > 1000 {
351 debug!(?target, ?requester_id, ?from, size = ?v.len(), "Message (v field) too big.");
352
353 return Some(MessageType::Error(ErrorSpecific {
354 code: 205,
355 description: "Message (v field) too big.".to_string(),
356 }));
357 }
358 if !validate_immutable(&v, target) {
359 debug!(?target, ?requester_id, ?from, v = ?v, "Target doesn't match the sha1 hash of v field.");
360
361 return Some(MessageType::Error(ErrorSpecific {
362 code: 203,
363 description: "Target doesn't match the sha1 hash of v field"
364 .to_string(),
365 }));
366 }
367
368 self.immutable_values.put(target, v);
369
370 return Some(MessageType::Response(ResponseSpecific::Ping(
371 PingResponseArguments {
372 responder_id: *routing_table.id(),
373 },
374 )));
375 }
376 PutRequestSpecific::PutMutable(PutMutableRequestArguments {
377 target,
378 v,
379 k,
380 seq,
381 sig,
382 salt,
383 cas,
384 ..
385 }) => {
386 if !self.tokens.validate(from, &token) {
387 debug!(
388 ?target,
389 ?requester_id,
390 ?from,
391 request_type = "put_mutable",
392 "Invalid token"
393 );
394 return Some(MessageType::Error(ErrorSpecific {
395 code: 203,
396 description: "Bad token".to_string(),
397 }));
398 }
399 if v.len() > 1000 {
400 return Some(MessageType::Error(ErrorSpecific {
401 code: 205,
402 description: "Message (v field) too big.".to_string(),
403 }));
404 }
405 if let Some(ref salt) = salt
406 && salt.len() > 64
407 {
408 return Some(MessageType::Error(ErrorSpecific {
409 code: 207,
410 description: "salt (salt field) too big.".to_string(),
411 }));
412 }
413 if let Some(previous) = self.mutable_values.get(&target) {
414 if let Some(cas) = cas
415 && previous.seq() != cas
416 {
417 debug!(
418 ?target,
419 ?requester_id,
420 ?from,
421 "CAS mismatched, re-read value and try again."
422 );
423
424 return Some(MessageType::Error(ErrorSpecific {
425 code: 301,
426 description: "CAS mismatched, re-read value and try again."
427 .to_string(),
428 }));
429 };
430
431 if seq < previous.seq() {
432 debug!(
433 ?target,
434 ?requester_id,
435 ?from,
436 "Sequence number less than current."
437 );
438
439 return Some(MessageType::Error(ErrorSpecific {
440 code: 302,
441 description: "Sequence number less than current.".to_string(),
442 }));
443 }
444 }
445
446 match MutableItem::from_dht_message(target, &k, v, seq, &sig, salt) {
447 Ok(item) => {
448 self.mutable_values.put(target, item);
449
450 MessageType::Response(ResponseSpecific::Ping(PingResponseArguments {
451 responder_id: *routing_table.id(),
452 }))
453 }
454 Err(error) => {
455 debug!(?target, ?requester_id, ?from, ?error, "Invalid signature");
456
457 MessageType::Error(ErrorSpecific {
458 code: 206,
459 description: "Invalid signature".to_string(),
460 })
461 }
462 }
463 }
464 },
465 })
466 }
467
468 fn handle_get_mutable(
470 &mut self,
471 routing_table: &RoutingTable,
472 from: SocketAddrV4,
473 target: Id,
474 seq: Option<i64>,
475 ) -> ResponseSpecific {
476 match self.mutable_values.get(&target) {
477 Some(item) => {
478 let no_more_recent_values = seq.map(|request_seq| item.seq() <= request_seq);
479
480 match no_more_recent_values {
481 Some(true) => {
482 ResponseSpecific::NoMoreRecentValue(NoMoreRecentValueResponseArguments {
483 responder_id: *routing_table.id(),
484 token: self.tokens.generate_token(from).into(),
485 nodes: Some(routing_table.closest(target)),
486 seq: item.seq(),
487 })
488 }
489 _ => ResponseSpecific::GetMutable(GetMutableResponseArguments {
490 responder_id: *routing_table.id(),
491 token: self.tokens.generate_token(from).into(),
492 nodes: Some(routing_table.closest(target)),
493 v: item.value().into(),
494 k: *item.key(),
495 seq: item.seq(),
496 sig: *item.signature(),
497 }),
498 }
499 }
500 None => ResponseSpecific::NoValues(NoValuesResponseArguments {
501 responder_id: *routing_table.id(),
502 token: self.tokens.generate_token(from).into(),
503 nodes: Some(routing_table.closest(target)),
504 }),
505 }
506 }
507}