1#![allow(missing_docs)]
6
7mod internal;
8
9use std::convert::TryInto;
10use std::net::{Ipv4Addr, SocketAddrV4};
11
12use crate::common::{ID_SIZE, Id, Node};
13
14use super::InvalidIdSize;
15
/// In-memory form of a single message, converted to and from
/// `internal::DHTMessage` for (de)serialization.
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct Message {
    /// Transaction id. Written as 4 big-endian bytes; parsed from either 2 or
    /// 4 big-endian bytes.
    pub transaction_id: u32,

    /// Optional 4-byte version tag, passed through unchanged.
    pub version: Option<[u8; 4]>,

    /// Optional IPv4 socket address carried in the serialized `ip` field
    /// (4 address octets followed by a big-endian port).
    pub requester_ip: Option<SocketAddrV4>,

    /// Request, response or error payload.
    pub message_type: MessageType,

    /// Read-only flag. Parsed as `true` when the serialized value is `> 0`
    /// and as `false` when it is absent.
    pub read_only: bool,
}
33
/// Payload kind carried by a `Message`.
#[derive(Debug, PartialEq, Clone)]
pub enum MessageType {
    /// A request: the requester's id plus request-specific arguments.
    Request(RequestSpecific),

    /// A response; the variant determines which arguments are present.
    Response(ResponseSpecific),

    /// An error carrying a numeric code and a description.
    Error(ErrorSpecific),
}
42
/// Error payload; maps to the `(code, description)` tuple stored in
/// `internal::DHTErrorSpecific::error_info`.
#[derive(Debug, PartialEq, Clone)]
pub struct ErrorSpecific {
    /// Numeric error code (first element of `error_info`).
    pub code: i32,
    /// Human-readable description (second element of `error_info`).
    pub description: String,
}
48
/// Fields shared by every request, plus the request-specific part.
#[derive(Debug, PartialEq, Clone)]
pub struct RequestSpecific {
    /// Id of the sender; serialized as the `id` argument of the request.
    pub requester_id: Id,
    /// Which request this is, together with its arguments.
    pub request_type: RequestTypeSpecific,
}
60
/// The kinds of request a `Message` can carry.
#[derive(Debug, PartialEq, Clone)]
pub enum RequestTypeSpecific {
    /// Ping; carries no arguments beyond the requester id.
    Ping,
    /// Find-node request for a target id.
    FindNode(FindNodeRequestArguments),
    /// Get-peers request for an info hash.
    GetPeers(GetPeersRequestArguments),
    /// Get-signed-peers request; uses the same argument shape as `GetPeers`.
    GetSignedPeers(GetPeersRequestArguments),
    /// Get-value request for a target id.
    GetValue(GetValueRequestArguments),

    /// Any token-carrying put request (see `PutRequestSpecific`).
    Put(PutRequest),
}
78
/// A put-style request: a token plus the specific put payload.
#[derive(Debug, PartialEq, Clone)]
pub struct PutRequest {
    /// Token bytes; serialized as the `token` argument of the request.
    pub token: Box<[u8]>,
    /// The specific put operation and its arguments.
    pub put_request_type: PutRequestSpecific,
}
87
/// The kinds of put request.
///
/// `PutImmutable` and `PutMutable` are both serialized as
/// `internal::DHTRequestSpecific::PutValue`; when parsing, the presence of
/// the `k` argument selects `PutMutable`.
#[derive(Debug, PartialEq, Clone)]
pub enum PutRequestSpecific {
    /// Announce a peer for an info hash.
    AnnouncePeer(AnnouncePeerRequestArguments),
    /// Announce a signed peer for an info hash.
    AnnounceSignedPeer(AnnounceSignedPeerRequestArguments),
    /// Store a value without key/sequence/signature fields.
    PutImmutable(PutImmutableRequestArguments),
    /// Store a value together with `k`, `seq` and `sig`.
    PutMutable(PutMutableRequestArguments),
}
106
107impl PutRequestSpecific {
108 pub fn target(&self) -> &Id {
113 match self {
114 PutRequestSpecific::AnnouncePeer(AnnouncePeerRequestArguments {
115 info_hash, ..
116 }) => info_hash,
117 PutRequestSpecific::AnnounceSignedPeer(AnnounceSignedPeerRequestArguments {
118 info_hash,
119 ..
120 }) => info_hash,
121 PutRequestSpecific::PutMutable(PutMutableRequestArguments { target, .. }) => target,
122 PutRequestSpecific::PutImmutable(PutImmutableRequestArguments { target, .. }) => target,
123 }
124 }
125}
126
/// The kinds of response a `Message` can carry. Every variant's arguments
/// include the responder's id.
#[derive(Debug, PartialEq, Clone)]
pub enum ResponseSpecific {
    /// Response carrying only the responder id.
    Ping(PingResponseArguments),
    /// Response carrying a list of nodes.
    FindNode(FindNodeResponseArguments),
    /// Response carrying peer socket addresses (`values`).
    GetPeers(GetPeersResponseArguments),
    /// Response carrying signed peer entries.
    GetSignedPeers(GetSignedPeersResponseArguments),
    /// Response carrying a value `v` only.
    GetImmutable(GetImmutableResponseArguments),
    /// Response carrying a value `v` together with `k`, `seq` and `sig`.
    GetMutable(GetMutableResponseArguments),
    /// Response carrying a token and optional nodes, but no values.
    NoValues(NoValuesResponseArguments),
    /// Response carrying a token, optional nodes and a `seq`, but no value.
    NoMoreRecentValue(NoMoreRecentValueResponseArguments),
}
138
/// Arguments of a ping response.
#[derive(Debug, PartialEq, Clone)]
pub struct PingResponseArguments {
    /// Id of the responding node.
    pub responder_id: Id,
}
144
/// Arguments of a find-node request.
#[derive(Debug, PartialEq, Clone)]
pub struct FindNodeRequestArguments {
    /// Id being looked up.
    pub target: Id,
}
150
/// Arguments of a find-node response.
#[derive(Debug, PartialEq, Clone)]
pub struct FindNodeResponseArguments {
    /// Id of the responding node.
    pub responder_id: Id,
    /// Nodes returned by the responder; always present for this response.
    pub nodes: Box<[Node]>,
}
156
/// Arguments of a get-value request.
#[derive(Debug, PartialEq, Clone)]
pub struct GetValueRequestArguments {
    /// Id of the value being requested.
    pub target: Id,
    /// Optional sequence number, forwarded as the `seq` argument.
    pub seq: Option<i64>,
    /// Optional salt. NOTE(review): the conversions in this file neither
    /// serialize this field nor populate it when parsing (it is always `None`
    /// after `Message::from_serde_message`) — confirm that is intended.
    pub salt: Option<Box<[u8]>>,
}
168
/// Arguments of a response that carries a token and optional nodes but no
/// values.
#[derive(Debug, PartialEq, Clone)]
pub struct NoValuesResponseArguments {
    /// Id of the responding node.
    pub responder_id: Id,
    /// Token bytes issued by the responder.
    pub token: Box<[u8]>,
    /// Optional list of nodes returned by the responder.
    pub nodes: Option<Box<[Node]>>,
}
175
/// Arguments of a get-peers (or get-signed-peers) request.
#[derive(Debug, PartialEq, Clone)]
pub struct GetPeersRequestArguments {
    /// Info hash whose peers are requested.
    pub info_hash: Id,
}
182
/// Arguments of a get-peers response that carries peer addresses.
#[derive(Debug, PartialEq, Clone)]
pub struct GetPeersResponseArguments {
    /// Id of the responding node.
    pub responder_id: Id,
    /// Token bytes issued by the responder.
    pub token: Box<[u8]>,
    /// Peer addresses; each is serialized as 6 bytes (IPv4 octets followed by
    /// a big-endian port).
    pub values: Vec<SocketAddrV4>,
    /// Optional list of nodes returned by the responder.
    pub nodes: Option<Box<[Node]>>,
}
190
/// Arguments of a get-signed-peers response.
#[derive(Debug, PartialEq, Clone)]
pub struct GetSignedPeersResponseArguments {
    /// Id of the responding node.
    pub responder_id: Id,
    /// Token bytes issued by the responder.
    pub token: Box<[u8]>,
    /// Signed peer entries. Each is serialized as 104 bytes: the 32-byte
    /// array, the `u64` in big-endian, then the 64-byte array.
    /// NOTE(review): presumably `(k, t, sig)` as in
    /// `AnnounceSignedPeerRequestArguments` — confirm.
    pub peers: Vec<([u8; 32], u64, [u8; 64])>,
    /// Optional list of nodes returned by the responder.
    pub nodes: Option<Box<[Node]>>,
}
198
/// Arguments of an announce-peer request.
#[derive(Debug, PartialEq, Clone)]
pub struct AnnouncePeerRequestArguments {
    /// Info hash being announced.
    pub info_hash: Id,
    /// Port being announced.
    pub port: u16,
    /// Implied-port flag. When parsing, a present field becomes
    /// `Some(value != 0)` and an absent field becomes `None`.
    pub implied_port: Option<bool>,
}
207
/// Arguments of an announce-signed-peer request.
#[derive(Debug, PartialEq, Clone)]
pub struct AnnounceSignedPeerRequestArguments {
    /// Info hash being announced.
    pub info_hash: Id,
    /// 64-bit value cast to `i64` for serialization and back to `u64` when
    /// parsing. NOTE(review): presumably a timestamp — confirm.
    pub t: u64,
    /// 32-byte value. NOTE(review): presumably the signer's public key — confirm.
    pub k: [u8; 32],
    /// 64-byte value. NOTE(review): presumably the signature — confirm.
    pub sig: [u8; 64],
}
215
/// Arguments of a get-value response that carries only a value.
#[derive(Debug, PartialEq, Clone)]
pub struct GetImmutableResponseArguments {
    /// Id of the responding node.
    pub responder_id: Id,
    /// Token bytes issued by the responder.
    pub token: Box<[u8]>,
    /// Optional list of nodes returned by the responder.
    pub nodes: Option<Box<[Node]>>,
    /// The value bytes, passed through unchanged.
    pub v: Box<[u8]>,
}
225
/// Arguments of a get-value response that carries a value together with
/// `k`, `seq` and `sig`.
#[derive(Debug, PartialEq, Clone)]
pub struct GetMutableResponseArguments {
    /// Id of the responding node.
    pub responder_id: Id,
    /// Token bytes issued by the responder.
    pub token: Box<[u8]>,
    /// Optional list of nodes returned by the responder.
    pub nodes: Option<Box<[Node]>>,
    /// The value bytes, passed through unchanged.
    pub v: Box<[u8]>,
    /// 32-byte `k` field, passed through unchanged.
    pub k: [u8; 32],
    /// Sequence number, passed through unchanged.
    pub seq: i64,
    /// 64-byte `sig` field, passed through unchanged.
    pub sig: [u8; 64],
}
238
/// Arguments of a get-value response that carries a `seq` but no value.
#[derive(Debug, PartialEq, Clone)]
pub struct NoMoreRecentValueResponseArguments {
    /// Id of the responding node.
    pub responder_id: Id,
    /// Token bytes issued by the responder.
    pub token: Box<[u8]>,
    /// Optional list of nodes returned by the responder.
    pub nodes: Option<Box<[Node]>>,
    /// Sequence number reported by the responder.
    pub seq: i64,
}
246
/// Arguments of a put-value request without `k`/`seq`/`sig`.
#[derive(Debug, PartialEq, Clone)]
pub struct PutImmutableRequestArguments {
    /// Id the value is stored under.
    pub target: Id,
    /// The value bytes.
    pub v: Box<[u8]>,
}
254
/// Arguments of a put-value request that includes `k`, `seq` and `sig`.
#[derive(Debug, PartialEq, Clone)]
pub struct PutMutableRequestArguments {
    /// Id the value is stored under.
    pub target: Id,
    /// The value bytes.
    pub v: Box<[u8]>,
    /// 32-byte `k` field; its presence is what marks a put-value request as
    /// mutable when parsing.
    pub k: [u8; 32],
    /// Sequence number.
    pub seq: i64,
    /// 64-byte `sig` field.
    pub sig: [u8; 64],
    /// Optional salt bytes, forwarded as the `salt` argument.
    pub salt: Option<Box<[u8]>>,
    /// Optional `cas` value, forwarded as the `cas` argument.
    pub cas: Option<i64>,
}
267
268impl Message {
269 fn into_serde_message(self) -> internal::DHTMessage {
270 internal::DHTMessage {
271 transaction_id: self.transaction_id.to_be_bytes().to_vec(),
272 version: self.version,
273 ip: self
274 .requester_ip
275 .map(|sockaddr| sockaddr_to_bytes(&sockaddr)),
276 read_only: if self.read_only { Some(1) } else { Some(0) },
277 variant: match self.message_type {
278 MessageType::Request(RequestSpecific {
279 requester_id,
280 request_type,
281 }) => internal::DHTMessageVariant::Request(match request_type {
282 RequestTypeSpecific::Ping => internal::DHTRequestSpecific::Ping {
283 arguments: internal::DHTPingRequestArguments {
284 id: requester_id.into(),
285 },
286 },
287 RequestTypeSpecific::FindNode(find_node_args) => {
288 internal::DHTRequestSpecific::FindNode {
289 arguments: internal::DHTFindNodeRequestArguments {
290 id: requester_id.into(),
291 target: find_node_args.target.into(),
292 },
293 }
294 }
295 RequestTypeSpecific::GetPeers(get_peers_args) => {
296 internal::DHTRequestSpecific::GetPeers {
297 arguments: internal::DHTGetPeersRequestArguments {
298 id: requester_id.into(),
299 info_hash: get_peers_args.info_hash.into(),
300 },
301 }
302 }
303 RequestTypeSpecific::GetSignedPeers(get_peers_args) => {
304 internal::DHTRequestSpecific::GetSignedPeers {
305 arguments: internal::DHTGetPeersRequestArguments {
306 id: requester_id.into(),
307 info_hash: get_peers_args.info_hash.into(),
308 },
309 }
310 }
311 RequestTypeSpecific::GetValue(get_mutable_args) => {
312 internal::DHTRequestSpecific::GetValue {
313 arguments: internal::DHTGetValueRequestArguments {
314 id: requester_id.into(),
315 target: get_mutable_args.target.into(),
316 seq: get_mutable_args.seq,
317 },
318 }
319 }
320 RequestTypeSpecific::Put(PutRequest {
321 token,
322 put_request_type,
323 }) => match put_request_type {
324 PutRequestSpecific::AnnouncePeer(announce_peer_args) => {
325 internal::DHTRequestSpecific::AnnouncePeer {
326 arguments: internal::DHTAnnouncePeerRequestArguments {
327 id: requester_id.into(),
328 token,
329
330 info_hash: announce_peer_args.info_hash.into(),
331 port: announce_peer_args.port,
332 implied_port: if announce_peer_args.implied_port.is_some() {
333 Some(1)
334 } else {
335 Some(0)
336 },
337 },
338 }
339 }
340 PutRequestSpecific::AnnounceSignedPeer(announce_signed_peer_args) => {
341 internal::DHTRequestSpecific::AnnounceSignedPeer {
342 arguments: internal::DHTAnnounceSignedPeerRequestArguments {
343 id: requester_id.into(),
344 token,
345
346 info_hash: announce_signed_peer_args.info_hash.into(),
347 t: announce_signed_peer_args.t as i64,
348 k: announce_signed_peer_args.k,
349 sig: announce_signed_peer_args.sig,
350 },
351 }
352 }
353 PutRequestSpecific::PutImmutable(put_immutable_arguments) => {
354 internal::DHTRequestSpecific::PutValue {
355 arguments: internal::DHTPutValueRequestArguments {
356 id: requester_id.into(),
357 token,
358
359 target: put_immutable_arguments.target.into(),
360 v: put_immutable_arguments.v,
361 k: None,
362 seq: None,
363 sig: None,
364 salt: None,
365 cas: None,
366 },
367 }
368 }
369 PutRequestSpecific::PutMutable(put_mutable_arguments) => {
370 internal::DHTRequestSpecific::PutValue {
371 arguments: internal::DHTPutValueRequestArguments {
372 id: requester_id.into(),
373 token,
374
375 target: put_mutable_arguments.target.into(),
376 v: put_mutable_arguments.v,
377 k: Some(put_mutable_arguments.k),
378 seq: Some(put_mutable_arguments.seq),
379 sig: Some(put_mutable_arguments.sig),
380 salt: put_mutable_arguments.salt,
381 cas: put_mutable_arguments.cas,
382 },
383 }
384 }
385 },
386 }),
387
388 MessageType::Response(res) => internal::DHTMessageVariant::Response(match res {
389 ResponseSpecific::Ping(ping_args) => internal::DHTResponseSpecific::Ping {
390 arguments: internal::DHTPingResponseArguments {
391 id: ping_args.responder_id.into(),
392 },
393 },
394 ResponseSpecific::FindNode(find_node_args) => {
395 internal::DHTResponseSpecific::FindNode {
396 arguments: internal::DHTFindNodeResponseArguments {
397 id: find_node_args.responder_id.into(),
398 nodes: nodes4_to_bytes(&find_node_args.nodes),
399 },
400 }
401 }
402 ResponseSpecific::GetPeers(get_peers_args) => {
403 internal::DHTResponseSpecific::GetPeers {
404 arguments: internal::DHTGetPeersResponseArguments {
405 id: get_peers_args.responder_id.into(),
406 token: get_peers_args.token,
407 nodes: get_peers_args
408 .nodes
409 .as_ref()
410 .map(|nodes| nodes4_to_bytes(nodes)),
411 values: peers_to_bytes(&get_peers_args.values),
412 },
413 }
414 }
415 ResponseSpecific::GetSignedPeers(get_peers_args) => {
416 internal::DHTResponseSpecific::GetSignedPeers {
417 arguments: internal::DHTGetSignedPeersResponseArguments {
418 id: get_peers_args.responder_id.into(),
419 token: get_peers_args.token,
420 nodes: get_peers_args
421 .nodes
422 .as_ref()
423 .map(|nodes| nodes4_to_bytes(nodes)),
424 peers: signed_peers_to_bytes(&get_peers_args.peers),
425 },
426 }
427 }
428 ResponseSpecific::NoValues(no_values_arguments) => {
429 internal::DHTResponseSpecific::NoValues {
430 arguments: internal::DHTNoValuesResponseArguments {
431 id: no_values_arguments.responder_id.into(),
432 token: no_values_arguments.token,
433 nodes: no_values_arguments
434 .nodes
435 .as_ref()
436 .map(|nodes| nodes4_to_bytes(nodes)),
437 },
438 }
439 }
440 ResponseSpecific::GetImmutable(get_immutable_args) => {
441 internal::DHTResponseSpecific::GetImmutable {
442 arguments: internal::DHTGetImmutableResponseArguments {
443 id: get_immutable_args.responder_id.into(),
444 token: get_immutable_args.token,
445 nodes: get_immutable_args
446 .nodes
447 .as_ref()
448 .map(|nodes| nodes4_to_bytes(nodes)),
449 v: get_immutable_args.v,
450 },
451 }
452 }
453 ResponseSpecific::GetMutable(get_mutable_args) => {
454 internal::DHTResponseSpecific::GetMutable {
455 arguments: internal::DHTGetMutableResponseArguments {
456 id: get_mutable_args.responder_id.into(),
457 token: get_mutable_args.token,
458 nodes: get_mutable_args
459 .nodes
460 .as_ref()
461 .map(|nodes| nodes4_to_bytes(nodes)),
462 v: get_mutable_args.v,
463 k: get_mutable_args.k,
464 seq: get_mutable_args.seq,
465 sig: get_mutable_args.sig,
466 },
467 }
468 }
469 ResponseSpecific::NoMoreRecentValue(args) => {
470 internal::DHTResponseSpecific::NoMoreRecentValue {
471 arguments: internal::DHTNoMoreRecentValueResponseArguments {
472 id: args.responder_id.into(),
473 token: args.token,
474 nodes: args.nodes.as_ref().map(|nodes| nodes4_to_bytes(nodes)),
475 seq: args.seq,
476 },
477 }
478 }
479 }),
480
481 MessageType::Error(err) => {
482 internal::DHTMessageVariant::Error(internal::DHTErrorSpecific {
483 error_info: (err.code, err.description),
484 })
485 }
486 },
487 }
488 }
489
490 fn from_serde_message(msg: internal::DHTMessage) -> Result<Message, DecodeMessageError> {
491 Ok(Message {
492 transaction_id: match *msg.transaction_id.as_slice() {
493 [a, b] => u16::from_be_bytes([a, b]) as u32,
494 [a, b, c, d] => u32::from_be_bytes([a, b, c, d]),
495 _ => return Err(DecodeMessageError::InvalidTransactionIdSize),
496 },
497 version: msg.version,
498 requester_ip: match msg.ip {
499 Some(ip) => Some(bytes_to_sockaddr(ip)?),
500 _ => None,
501 },
502 read_only: if let Some(read_only) = msg.read_only {
503 read_only > 0
504 } else {
505 false
506 },
507 message_type: match msg.variant {
508 internal::DHTMessageVariant::Request(req_variant) => {
509 MessageType::Request(match req_variant {
510 internal::DHTRequestSpecific::Ping { arguments } => RequestSpecific {
511 requester_id: Id::from_bytes(arguments.id)?,
512 request_type: RequestTypeSpecific::Ping,
513 },
514 internal::DHTRequestSpecific::FindNode { arguments } => RequestSpecific {
515 requester_id: Id::from_bytes(arguments.id)?,
516 request_type: RequestTypeSpecific::FindNode(FindNodeRequestArguments {
517 target: Id::from_bytes(arguments.target)?,
518 }),
519 },
520 internal::DHTRequestSpecific::GetPeers { arguments } => RequestSpecific {
521 requester_id: Id::from_bytes(arguments.id)?,
522 request_type: RequestTypeSpecific::GetPeers(GetPeersRequestArguments {
523 info_hash: Id::from_bytes(arguments.info_hash)?,
524 }),
525 },
526 internal::DHTRequestSpecific::GetSignedPeers { arguments } => {
527 RequestSpecific {
528 requester_id: Id::from_bytes(arguments.id)?,
529 request_type: RequestTypeSpecific::GetSignedPeers(
530 GetPeersRequestArguments {
531 info_hash: Id::from_bytes(arguments.info_hash)?,
532 },
533 ),
534 }
535 }
536
537 internal::DHTRequestSpecific::GetValue { arguments } => RequestSpecific {
538 requester_id: Id::from_bytes(arguments.id)?,
539
540 request_type: RequestTypeSpecific::GetValue(GetValueRequestArguments {
541 target: Id::from_bytes(arguments.target)?,
542 seq: arguments.seq,
543 salt: None,
544 }),
545 },
546 internal::DHTRequestSpecific::AnnouncePeer { arguments } => {
547 RequestSpecific {
548 requester_id: Id::from_bytes(arguments.id)?,
549 request_type: RequestTypeSpecific::Put(PutRequest {
550 token: arguments.token,
551 put_request_type: PutRequestSpecific::AnnouncePeer(
552 AnnouncePeerRequestArguments {
553 implied_port: arguments
554 .implied_port
555 .map(|implied_port| implied_port != 0),
556 info_hash: arguments.info_hash.into(),
557 port: arguments.port,
558 },
559 ),
560 }),
561 }
562 }
563 internal::DHTRequestSpecific::AnnounceSignedPeer { arguments } => {
564 RequestSpecific {
565 requester_id: Id::from_bytes(arguments.id)?,
566
567 request_type: RequestTypeSpecific::Put(PutRequest {
568 token: arguments.token,
569 put_request_type: PutRequestSpecific::AnnounceSignedPeer(
570 AnnounceSignedPeerRequestArguments {
571 info_hash: Id::from_bytes(arguments.info_hash)?,
572 t: arguments.t as u64,
573 k: arguments.k,
574 sig: arguments.sig,
575 },
576 ),
577 }),
578 }
579 }
580 internal::DHTRequestSpecific::PutValue { arguments } => {
581 if let Some(k) = arguments.k {
582 RequestSpecific {
583 requester_id: Id::from_bytes(arguments.id)?,
584
585 request_type: RequestTypeSpecific::Put(PutRequest {
586 token: arguments.token,
587 put_request_type: PutRequestSpecific::PutMutable(
588 PutMutableRequestArguments {
589 target: Id::from_bytes(arguments.target)?,
590 v: arguments.v,
591 k,
592 seq: arguments.seq.expect(
593 "Put mutable message to have sequence number",
594 ),
595 sig: arguments.sig.expect(
596 "Put mutable message to have a signature",
597 ),
598 salt: arguments.salt,
599 cas: arguments.cas,
600 },
601 ),
602 }),
603 }
604 } else {
605 RequestSpecific {
606 requester_id: Id::from_bytes(arguments.id)?,
607
608 request_type: RequestTypeSpecific::Put(PutRequest {
609 token: arguments.token,
610 put_request_type: PutRequestSpecific::PutImmutable(
611 PutImmutableRequestArguments {
612 target: Id::from_bytes(arguments.target)?,
613 v: arguments.v,
614 },
615 ),
616 }),
617 }
618 }
619 }
620 })
621 }
622
623 internal::DHTMessageVariant::Response(res_variant) => {
624 MessageType::Response(match res_variant {
625 internal::DHTResponseSpecific::Ping { arguments } => {
626 ResponseSpecific::Ping(PingResponseArguments {
627 responder_id: Id::from_bytes(arguments.id)?,
628 })
629 }
630 internal::DHTResponseSpecific::FindNode { arguments } => {
631 ResponseSpecific::FindNode(FindNodeResponseArguments {
632 responder_id: Id::from_bytes(arguments.id)?,
633 nodes: bytes_to_nodes4(&arguments.nodes)?,
634 })
635 }
636 internal::DHTResponseSpecific::GetPeers { arguments } => {
637 ResponseSpecific::GetPeers(GetPeersResponseArguments {
638 responder_id: Id::from_bytes(arguments.id)?,
639 token: arguments.token,
640 nodes: match arguments.nodes {
641 Some(nodes) => Some(bytes_to_nodes4(nodes)?),
642 None => None,
643 },
644 values: bytes_to_peers(arguments.values)?,
645 })
646 }
647 internal::DHTResponseSpecific::GetSignedPeers { arguments } => {
648 ResponseSpecific::GetSignedPeers(GetSignedPeersResponseArguments {
649 responder_id: Id::from_bytes(arguments.id)?,
650 token: arguments.token,
651 nodes: match arguments.nodes {
652 Some(nodes) => Some(bytes_to_nodes4(nodes)?),
653 None => None,
654 },
655 peers: bytes_to_signed_peers(arguments.peers)?,
656 })
657 }
658 internal::DHTResponseSpecific::NoValues { arguments } => {
659 ResponseSpecific::NoValues(NoValuesResponseArguments {
660 responder_id: Id::from_bytes(arguments.id)?,
661 token: arguments.token,
662 nodes: match arguments.nodes {
663 Some(nodes) => Some(bytes_to_nodes4(nodes)?),
664 None => None,
665 },
666 })
667 }
668 internal::DHTResponseSpecific::GetImmutable { arguments } => {
669 ResponseSpecific::GetImmutable(GetImmutableResponseArguments {
670 responder_id: Id::from_bytes(arguments.id)?,
671 token: arguments.token,
672 nodes: match arguments.nodes {
673 Some(nodes) => Some(bytes_to_nodes4(nodes)?),
674 None => None,
675 },
676 v: arguments.v,
677 })
678 }
679 internal::DHTResponseSpecific::GetMutable { arguments } => {
680 ResponseSpecific::GetMutable(GetMutableResponseArguments {
681 responder_id: Id::from_bytes(arguments.id)?,
682 token: arguments.token,
683 nodes: match arguments.nodes {
684 Some(nodes) => Some(bytes_to_nodes4(nodes)?),
685 None => None,
686 },
687 v: arguments.v,
688 k: arguments.k,
689 seq: arguments.seq,
690 sig: arguments.sig,
691 })
692 }
693 internal::DHTResponseSpecific::NoMoreRecentValue { arguments } => {
694 ResponseSpecific::NoMoreRecentValue(
695 NoMoreRecentValueResponseArguments {
696 responder_id: Id::from_bytes(arguments.id)?,
697 token: arguments.token,
698 nodes: match arguments.nodes {
699 Some(nodes) => Some(bytes_to_nodes4(nodes)?),
700 None => None,
701 },
702 seq: arguments.seq,
703 },
704 )
705 }
706 })
707 }
708
709 internal::DHTMessageVariant::Error(err) => MessageType::Error(ErrorSpecific {
710 code: err.error_info.0,
711 description: err.error_info.1,
712 }),
713 },
714 })
715 }
716
717 pub fn to_bytes(&self) -> Result<Vec<u8>, serde_bencode::Error> {
718 self.clone().into_serde_message().to_bytes()
719 }
720
721 pub fn from_bytes(bytes: &[u8]) -> Result<Message, DecodeMessageError> {
722 if bytes.len() < 15 {
723 return Err(DecodeMessageError::TooShort);
724 } else if bytes[0] != 100 {
725 return Err(DecodeMessageError::NotBencodeDictionary);
726 }
727
728 Message::from_serde_message(internal::DHTMessage::from_bytes(bytes)?)
729 }
730
731 pub fn get_author_id(&self) -> Option<Id> {
739 let id = match &self.message_type {
740 MessageType::Request(arguments) => arguments.requester_id,
741 MessageType::Response(response_variant) => match response_variant {
742 ResponseSpecific::Ping(arguments) => arguments.responder_id,
743 ResponseSpecific::FindNode(arguments) => arguments.responder_id,
744 ResponseSpecific::GetPeers(arguments) => arguments.responder_id,
745 ResponseSpecific::GetSignedPeers(arguments) => arguments.responder_id,
746 ResponseSpecific::GetImmutable(arguments) => arguments.responder_id,
747 ResponseSpecific::GetMutable(arguments) => arguments.responder_id,
748 ResponseSpecific::NoValues(arguments) => arguments.responder_id,
749 ResponseSpecific::NoMoreRecentValue(arguments) => arguments.responder_id,
750 },
751 MessageType::Error(_) => {
752 return None;
753 }
754 };
755
756 Some(id)
757 }
758
759 pub fn get_closer_nodes(&self) -> Option<&[Node]> {
761 match &self.message_type {
762 MessageType::Response(response_variant) => match response_variant {
763 ResponseSpecific::Ping(_) => None,
764 ResponseSpecific::FindNode(arguments) => Some(&arguments.nodes),
765 ResponseSpecific::GetPeers(arguments) => arguments.nodes.as_deref(),
766 ResponseSpecific::GetSignedPeers(arguments) => arguments.nodes.as_deref(),
767 ResponseSpecific::GetMutable(arguments) => arguments.nodes.as_deref(),
768 ResponseSpecific::GetImmutable(arguments) => arguments.nodes.as_deref(),
769 ResponseSpecific::NoValues(arguments) => arguments.nodes.as_deref(),
770 ResponseSpecific::NoMoreRecentValue(arguments) => arguments.nodes.as_deref(),
771 },
772 _ => None,
773 }
774 }
775
776 pub fn get_token(&self) -> Option<(Id, &[u8])> {
777 match &self.message_type {
778 MessageType::Response(response_variant) => match response_variant {
779 ResponseSpecific::Ping(_) => None,
780 ResponseSpecific::FindNode(_) => None,
781 ResponseSpecific::GetPeers(arguments) => {
782 Some((arguments.responder_id, &arguments.token))
783 }
784 ResponseSpecific::GetSignedPeers(arguments) => {
785 Some((arguments.responder_id, &arguments.token))
786 }
787 ResponseSpecific::GetImmutable(arguments) => {
788 Some((arguments.responder_id, &arguments.token))
789 }
790 ResponseSpecific::GetMutable(arguments) => {
791 Some((arguments.responder_id, &arguments.token))
792 }
793 ResponseSpecific::NoValues(arguments) => {
794 Some((arguments.responder_id, &arguments.token))
795 }
796 ResponseSpecific::NoMoreRecentValue(arguments) => {
797 Some((arguments.responder_id, &arguments.token))
798 }
799 },
800 _ => None,
801 }
802 }
803}
804
805fn bytes_to_sockaddr<T: AsRef<[u8]>>(bytes: T) -> Result<SocketAddrV4, DecodeMessageError> {
806 let bytes = bytes.as_ref();
807 match bytes.len() {
808 6 => {
809 let ip = Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3]);
810
811 let port_bytes_as_array: [u8; 2] = bytes[4..6]
812 .try_into()
813 .map_err(|_| DecodeMessageError::InvalidPortEncoding)?;
814
815 let port: u16 = u16::from_be_bytes(port_bytes_as_array);
816
817 Ok(SocketAddrV4::new(ip, port))
818 }
819 18 => Err(DecodeMessageError::Ipv6Unsupported),
820 _ => Err(DecodeMessageError::InvalidSocketAddrEncodingLength),
821 }
822}
823
/// Encodes an IPv4 socket address as 6 bytes: the 4 address octets followed
/// by the port in big-endian order.
pub fn sockaddr_to_bytes(sockaddr: &SocketAddrV4) -> [u8; 6] {
    let [a, b, c, d] = sockaddr.ip().octets();
    let [port_hi, port_lo] = sockaddr.port().to_be_bytes();

    [a, b, c, d, port_hi, port_lo]
}
833
/// Size in bytes of one encoded node: `ID_SIZE` id bytes followed by the
/// 6-byte socket address produced by `sockaddr_to_bytes`.
const NODE_BYTE_SIZE: usize = ID_SIZE + 6;
835
836fn nodes4_to_bytes(nodes: &[Node]) -> Box<[u8]> {
837 let mut bytes = Vec::with_capacity(NODE_BYTE_SIZE * nodes.len());
838
839 for node in nodes {
840 bytes.extend_from_slice(node.id().as_bytes());
841 bytes.extend_from_slice(&sockaddr_to_bytes(&node.address()));
842 }
843
844 bytes.into_boxed_slice()
845}
846
847fn bytes_to_nodes4<T: AsRef<[u8]>>(bytes: T) -> Result<Box<[Node]>, DecodeMessageError> {
848 let bytes = bytes.as_ref();
849
850 if bytes.len() % NODE_BYTE_SIZE != 0 {
851 return Err(DecodeMessageError::InvalidNodes4);
852 }
853
854 let expected_num = bytes.len() / NODE_BYTE_SIZE;
855 let mut to_ret = Vec::with_capacity(expected_num);
856 for i in 0..bytes.len() / NODE_BYTE_SIZE {
857 let i = i * NODE_BYTE_SIZE;
858 let id = Id::from_bytes(&bytes[i..i + ID_SIZE])?;
859 let sockaddr = bytes_to_sockaddr(&bytes[i + ID_SIZE..i + NODE_BYTE_SIZE])?;
860 let node = Node::new(id, sockaddr);
861 to_ret.push(node);
862 }
863
864 Ok(to_ret.into_boxed_slice())
865}
866
867fn peers_to_bytes(peers: &[SocketAddrV4]) -> Vec<serde_bytes::ByteBuf> {
868 peers
869 .iter()
870 .map(|p| serde_bytes::ByteBuf::from(sockaddr_to_bytes(p)))
871 .collect()
872}
873
874fn bytes_to_peers<T: AsRef<[serde_bytes::ByteBuf]>>(
875 bytes: T,
876) -> Result<Vec<SocketAddrV4>, DecodeMessageError> {
877 let bytes = bytes.as_ref();
878 bytes.iter().map(bytes_to_sockaddr).collect()
879}
880
881#[allow(clippy::type_complexity)]
882fn bytes_to_signed_peers<T: AsRef<[serde_bytes::ByteBuf]>>(
883 bytes: T,
884) -> Result<Vec<([u8; 32], u64, [u8; 64])>, DecodeMessageError> {
885 let bytes = bytes.as_ref();
886
887 bytes.iter().map(bytes_to_signed_peer).collect()
888}
889
890fn bytes_to_signed_peer<T: AsRef<[u8]>>(
891 bytes: T,
892) -> Result<([u8; 32], u64, [u8; 64]), DecodeMessageError> {
893 let bytes = bytes.as_ref();
894
895 if !bytes.len().is_multiple_of(104) {
896 return Err(DecodeMessageError::InvalidSignedPeersEncodingLength);
897 }
898
899 let t: [u8; 8] = bytes[32..40].try_into().expect("infallible");
900
901 Ok((
902 bytes[0..32].try_into().expect("infallible"),
903 u64::from_be_bytes(t),
904 bytes[40..104].try_into().expect("infallible"),
905 ))
906}
907
908fn signed_peers_to_bytes(peers: &[([u8; 32], u64, [u8; 64])]) -> Vec<serde_bytes::ByteBuf> {
909 peers
910 .iter()
911 .map(|p| serde_bytes::ByteBuf::from(signed_peer_to_bytes(p)))
912 .collect()
913}
914
/// Encodes one signed peer entry as 104 bytes: the 32-byte array, the `u64`
/// in big-endian order, then the 64-byte array.
fn signed_peer_to_bytes(peer: &([u8; 32], u64, [u8; 64])) -> [u8; 104] {
    let (k, t, sig) = peer;
    let mut encoded = [0u8; 104];

    // Carve the output into its three fixed-size regions.
    let (k_part, rest) = encoded.split_at_mut(32);
    let (t_part, sig_part) = rest.split_at_mut(8);

    k_part.copy_from_slice(k);
    t_part.copy_from_slice(&t.to_be_bytes());
    sig_part.copy_from_slice(sig);

    encoded
}
924
/// Errors reported while decoding a `Message` from bytes.
#[n0_error::stack_error(derive, from_sources, std_sources)]
pub enum DecodeMessageError {
    /// The input is shorter than 15 bytes.
    #[error("Expected message to be longer than 15 characters")]
    TooShort,

    /// The first byte of the input is not `b'd'`.
    #[error("Expected message to start with 'd'")]
    NotBencodeDictionary,

    /// An encoded node list has a length that is not a multiple of the
    /// per-node size.
    #[error("Wrong number of bytes for nodes")]
    InvalidNodes4,

    /// The port bytes of a socket address could not be read as 2 bytes.
    #[error("wrong number of bytes for port")]
    InvalidPortEncoding,

    /// An 18-byte socket address was encountered.
    #[error("IPv6 is not yet implemented")]
    Ipv6Unsupported,

    /// A socket address is neither 6 nor 18 bytes long.
    #[error("Wrong number of bytes for sockaddr")]
    InvalidSocketAddrEncodingLength,

    /// Error reported by `serde_bencode`.
    #[error(transparent)]
    BencodeError(serde_bencode::Error),

    /// An id field had the wrong size.
    #[error(transparent)]
    InvalidIdSize(InvalidIdSize),

    /// The transaction id is neither 2 nor 4 bytes long.
    #[error("Invalid transaction id size (expected `[u8;2]` or `[u8;4]`)")]
    InvalidTransactionIdSize,

    /// A signed peer entry has an unexpected length.
    #[error("Wrong number of bytes for signed peers")]
    InvalidSignedPeersEncodingLength,
}
968
969#[cfg(test)]
970mod tests {
971 use super::*;
972
973 #[test]
974 fn test_ping_request() {
975 let original_msg = Message {
976 transaction_id: 258,
977 version: None,
978 requester_ip: None,
979 read_only: false,
980 message_type: MessageType::Request(RequestSpecific {
981 requester_id: Id::random(),
982 request_type: RequestTypeSpecific::Ping,
983 }),
984 };
985
986 let serde_msg = original_msg.clone().into_serde_message();
987 let bytes = serde_msg.to_bytes().unwrap();
988 let parsed_serde_msg = internal::DHTMessage::from_bytes(&bytes).unwrap();
989 let parsed_msg = Message::from_serde_message(parsed_serde_msg).unwrap();
990 assert_eq!(parsed_msg, original_msg);
991 }
992
993 #[test]
994 fn test_ping_response() {
995 let original_msg = Message {
996 transaction_id: 258,
997 version: Some([0xde, 0xad, 0, 1]),
998 requester_ip: Some("99.100.101.102:1030".parse().unwrap()),
999 read_only: false,
1000 message_type: MessageType::Response(ResponseSpecific::Ping(PingResponseArguments {
1001 responder_id: Id::random(),
1002 })),
1003 };
1004
1005 let serde_msg = original_msg.clone().into_serde_message();
1006 let bytes = serde_msg.to_bytes().unwrap();
1007 let parsed_serde_msg = internal::DHTMessage::from_bytes(&bytes).unwrap();
1008 let parsed_msg = Message::from_serde_message(parsed_serde_msg).unwrap();
1009 assert_eq!(parsed_msg, original_msg);
1010 }
1011
1012 #[test]
1013 fn test_find_node_request() {
1014 let original_msg = Message {
1015 transaction_id: 258,
1016 version: Some([0x62, 0x61, 0x72, 0x66]),
1017 requester_ip: None,
1018 read_only: false,
1019 message_type: MessageType::Request(RequestSpecific {
1020 requester_id: Id::random(),
1021 request_type: RequestTypeSpecific::FindNode(FindNodeRequestArguments {
1022 target: Id::random(),
1023 }),
1024 }),
1025 };
1026
1027 let serde_msg = original_msg.clone().into_serde_message();
1028 let bytes = serde_msg.to_bytes().unwrap();
1029 let parsed_serde_msg = internal::DHTMessage::from_bytes(&bytes).unwrap();
1030 let parsed_msg = Message::from_serde_message(parsed_serde_msg).unwrap();
1031 assert_eq!(parsed_msg, original_msg);
1032 }
1033
1034 #[test]
1035 fn test_find_node_request_read_only() {
1036 let original_msg = Message {
1037 transaction_id: 258,
1038 version: Some([0x62, 0x61, 0x72, 0x66]),
1039 requester_ip: None,
1040 read_only: true,
1041 message_type: MessageType::Request(RequestSpecific {
1042 requester_id: Id::random(),
1043 request_type: RequestTypeSpecific::FindNode(FindNodeRequestArguments {
1044 target: Id::random(),
1045 }),
1046 }),
1047 };
1048
1049 let serde_msg = original_msg.clone().into_serde_message();
1050 let bytes = serde_msg.to_bytes().unwrap();
1051 let parsed_serde_msg = internal::DHTMessage::from_bytes(&bytes).unwrap();
1052 let parsed_msg = Message::from_serde_message(parsed_serde_msg).unwrap();
1053 assert_eq!(parsed_msg, original_msg);
1054 }
1055
1056 #[test]
1057 fn test_find_node_response() {
1058 let original_msg = Message {
1059 transaction_id: 258,
1060 version: Some([1, 2, 3, 4]),
1061 requester_ip: Some("50.51.52.53:5455".parse().unwrap()),
1062 read_only: false,
1063 message_type: MessageType::Response(ResponseSpecific::FindNode(
1064 FindNodeResponseArguments {
1065 responder_id: Id::random(),
1066 nodes: [Node::new(Id::random(), "49.50.52.52:5354".parse().unwrap())].into(),
1067 },
1068 )),
1069 };
1070
1071 let serde_msg = original_msg.clone().into_serde_message();
1072 let bytes = serde_msg.to_bytes().unwrap();
1073 let parsed_serde_msg = internal::DHTMessage::from_bytes(&bytes).unwrap();
1074 let parsed_msg = Message::from_serde_message(parsed_serde_msg).unwrap();
1075 assert_eq!(parsed_msg.get_author_id(), original_msg.get_author_id());
1076 assert_eq!(
1077 parsed_msg.get_closer_nodes().map(|nodes| nodes
1078 .iter()
1079 .map(|n| (n.id(), n.address()))
1080 .collect::<Vec<_>>()),
1081 original_msg.get_closer_nodes().map(|nodes| nodes
1082 .iter()
1083 .map(|n| (n.id(), n.address()))
1084 .collect::<Vec<_>>())
1085 );
1086 }
1087
1088 #[test]
1089 fn test_get_peers_request() {
1090 let original_msg = Message {
1091 transaction_id: 258,
1092 version: Some([72, 73, 0, 1]),
1093 requester_ip: None,
1094 read_only: false,
1095 message_type: MessageType::Request(RequestSpecific {
1096 requester_id: Id::random(),
1097 request_type: RequestTypeSpecific::GetPeers(GetPeersRequestArguments {
1098 info_hash: Id::random(),
1099 }),
1100 }),
1101 };
1102
1103 let serde_msg = original_msg.clone().into_serde_message();
1104 let bytes = serde_msg.to_bytes().unwrap();
1105 let parsed_serde_msg = internal::DHTMessage::from_bytes(&bytes).unwrap();
1106 let parsed_msg = Message::from_serde_message(parsed_serde_msg).unwrap();
1107 assert_eq!(parsed_msg, original_msg);
1108 }
1109
/// Round-trips a `NoValues` response carrying one closer node through the
/// serde representation and its byte encoding, then compares the transaction
/// id, version, requester ip, author id and closer nodes of both messages.
///
/// NOTE(review): full `Message` equality is intentionally not asserted here;
/// presumably the `read_only: true` flag does not survive the round trip for
/// responses — confirm against `from_serde_message`.
#[test]
fn test_get_peers_response() {
    let responder_id = Id::random();
    let closer_node = Node::new(Id::random(), "49.50.52.52:5354".parse().unwrap());
    let expected = Message {
        transaction_id: 3,
        version: Some([1, 2, 3, 4]),
        requester_ip: Some("50.51.52.53:5455".parse().unwrap()),
        read_only: true,
        message_type: MessageType::Response(ResponseSpecific::NoValues(
            NoValuesResponseArguments {
                responder_id,
                token: [99, 100, 101, 102].into(),
                nodes: Some([closer_node].into()),
            },
        )),
    };

    // Encode: Message -> serde message -> bytes.
    let encoded = expected.clone().into_serde_message().to_bytes().unwrap();

    // Decode: bytes -> serde message -> Message.
    let wire_msg = internal::DHTMessage::from_bytes(&encoded).unwrap();
    let decoded = Message::from_serde_message(wire_msg).unwrap();

    assert_eq!(decoded.transaction_id, expected.transaction_id);
    assert_eq!(decoded.version, expected.version);
    assert_eq!(decoded.requester_ip, expected.requester_ip);
    assert_eq!(decoded.get_author_id(), expected.get_author_id());

    // Compare closer nodes by their (id, address) pairs.
    let decoded_nodes = decoded.get_closer_nodes().map(|nodes| {
        nodes
            .iter()
            .map(|node| (node.id(), node.address()))
            .collect::<Vec<_>>()
    });
    let expected_nodes = expected.get_closer_nodes().map(|nodes| {
        nodes
            .iter()
            .map(|node| (node.id(), node.address()))
            .collect::<Vec<_>>()
    });
    assert_eq!(decoded_nodes, expected_nodes);
}
1148
/// Round-trips a `GetPeers` response that carries one peer value and no
/// closer nodes, and checks that the decoded message equals the original.
#[test]
fn test_get_peers_response_peers() {
    let arguments = GetPeersResponseArguments {
        responder_id: Id::random(),
        token: vec![99, 100, 101, 102].into(),
        nodes: None,
        values: ["123.123.123.123:123".parse().unwrap()].into(),
    };
    let expected = Message {
        transaction_id: 3,
        version: Some([1, 2, 3, 4]),
        requester_ip: Some("50.51.52.53:5455".parse().unwrap()),
        read_only: false,
        message_type: MessageType::Response(ResponseSpecific::GetPeers(arguments)),
    };

    // Encode: Message -> serde message -> bytes.
    let encoded = expected.clone().into_serde_message().to_bytes().unwrap();

    // Decode: bytes -> serde message -> Message.
    let wire_msg = internal::DHTMessage::from_bytes(&encoded).unwrap();
    let decoded = Message::from_serde_message(wire_msg).unwrap();

    assert_eq!(decoded, expected);
}
1172
/// Builds a serde-level `NoValues` response with `nodes: None` directly
/// (bypassing the byte encoding) and checks that converting it yields a
/// `ResponseSpecific::NoValues` message.
#[test]
fn test_get_peers_response_neither() {
    let arguments = internal::DHTNoValuesResponseArguments {
        id: Id::random().into(),
        token: vec![0, 1].into(),
        nodes: None,
    };
    let wire_msg = internal::DHTMessage {
        transaction_id: vec![1, 2],
        version: None,
        ip: None,
        read_only: None,
        variant: internal::DHTMessageVariant::Response(
            internal::DHTResponseSpecific::NoValues { arguments },
        ),
    };

    let decoded = Message::from_serde_message(wire_msg).unwrap();

    assert!(matches!(
        decoded.message_type,
        MessageType::Response(ResponseSpecific::NoValues(NoValuesResponseArguments { .. }))
    ));
}
1196
/// Round-trips a `GetValue` request (with a `seq` and no `salt`) through the
/// serde representation and its byte encoding, and checks that the decoded
/// message equals the original.
#[test]
fn test_get_immutable_request() {
    let request = RequestSpecific {
        requester_id: Id::random(),
        request_type: RequestTypeSpecific::GetValue(GetValueRequestArguments {
            target: Id::random(),
            seq: Some(1231),
            salt: None,
        }),
    };
    let expected = Message {
        transaction_id: 258,
        version: Some([72, 73, 0, 1]),
        requester_ip: None,
        read_only: false,
        message_type: MessageType::Request(request),
    };

    // Encode: Message -> serde message -> bytes.
    let encoded = expected.clone().into_serde_message().to_bytes().unwrap();

    // Decode: bytes -> serde message -> Message.
    let wire_msg = internal::DHTMessage::from_bytes(&encoded).unwrap();
    let decoded = Message::from_serde_message(wire_msg).unwrap();

    assert_eq!(decoded, expected);
}
1220
/// Round-trips a `GetImmutable` response (token and value set, no closer
/// nodes) and checks that the decoded message equals the original.
#[test]
fn test_get_immutable_response() {
    let arguments = GetImmutableResponseArguments {
        responder_id: Id::random(),
        token: [99, 100, 101, 102].into(),
        nodes: None,
        v: [99, 100, 101, 102].into(),
    };
    let expected = Message {
        transaction_id: 3,
        version: Some([1, 2, 3, 4]),
        requester_ip: Some("50.51.52.53:5455".parse().unwrap()),
        read_only: false,
        message_type: MessageType::Response(ResponseSpecific::GetImmutable(arguments)),
    };

    // Encode: Message -> serde message -> bytes.
    let encoded = expected.clone().into_serde_message().to_bytes().unwrap();

    // Decode: bytes -> serde message -> Message.
    let wire_msg = internal::DHTMessage::from_bytes(&encoded).unwrap();
    let decoded = Message::from_serde_message(wire_msg).unwrap();

    assert_eq!(decoded, expected);
}
1244
/// Round-trips a `Put` request wrapping `PutImmutable` arguments through the
/// serde representation and its byte encoding, and checks that the decoded
/// message equals the original.
#[test]
fn test_put_immutable_request() {
    let requester_id = Id::random();
    let put = PutRequest {
        token: [99, 100, 101, 102].into(),
        put_request_type: PutRequestSpecific::PutImmutable(PutImmutableRequestArguments {
            target: Id::random(),
            v: [99, 100, 101, 102].into(),
        }),
    };
    let expected = Message {
        transaction_id: 3,
        version: Some([1, 2, 3, 4]),
        requester_ip: Some("50.51.52.53:5455".parse().unwrap()),
        read_only: false,
        message_type: MessageType::Request(RequestSpecific {
            requester_id,
            request_type: RequestTypeSpecific::Put(put),
        }),
    };

    // Encode: Message -> serde message -> bytes.
    let encoded = expected.clone().into_serde_message().to_bytes().unwrap();

    // Decode: bytes -> serde message -> Message.
    let wire_msg = internal::DHTMessage::from_bytes(&encoded).unwrap();
    let decoded = Message::from_serde_message(wire_msg).unwrap();

    assert_eq!(decoded, expected);
}
1272
/// Round-trips a `Put` request wrapping `PutMutable` arguments (with both
/// `salt` and `cas` set) through the serde representation and its byte
/// encoding, and checks that the decoded message equals the original.
#[test]
fn test_put_mutable_request() {
    let requester_id = Id::random();
    let mutable_args = PutMutableRequestArguments {
        target: Id::random(),
        v: [99, 100, 101, 102].into(),
        k: [100; 32],
        seq: 100,
        sig: [0; 64],
        salt: Some([0, 2, 4, 8].into()),
        cas: Some(100),
    };
    let put = PutRequest {
        token: [99, 100, 101, 102].into(),
        put_request_type: PutRequestSpecific::PutMutable(mutable_args),
    };
    let expected = Message {
        transaction_id: 3,
        version: Some([1, 2, 3, 4]),
        requester_ip: Some("50.51.52.53:5455".parse().unwrap()),
        read_only: false,
        message_type: MessageType::Request(RequestSpecific {
            requester_id,
            request_type: RequestTypeSpecific::Put(put),
        }),
    };

    // Encode: Message -> serde message -> bytes.
    let encoded = expected.clone().into_serde_message().to_bytes().unwrap();

    // Decode: bytes -> serde message -> Message.
    let wire_msg = internal::DHTMessage::from_bytes(&encoded).unwrap();
    let decoded = Message::from_serde_message(wire_msg).unwrap();

    assert_eq!(decoded, expected);
}
1303}