librqbit_peer_protocol/extended/
ut_pex.rs1use std::net::{IpAddr, SocketAddr};
2
3use buffers::ByteBufOwned;
4use byteorder::{ByteOrder, BE};
5use bytes::{Bytes, BytesMut};
6use clone_to_owned::CloneToOwned;
7use serde::{Deserialize, Serialize};
8
/// One peer entry read out of a `ut_pex` message: a socket address together
/// with the flag byte that was paired with it.
///
/// The type is plain data (`u8` + `SocketAddr`), so it is cheap to copy and
/// can be compared directly.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct PexPeerInfo {
    /// Flag byte for this peer; `0` when the message carried no flag for it.
    pub flags: u8,
    /// IP address and port of the peer.
    pub addr: SocketAddr,
}
13
14impl core::fmt::Debug for PexPeerInfo {
15 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16 write!(f, "{}", self.addr)?;
17 if self.flags != 0 {
18 write!(f, ";flags={}", self.flags)?;
19 }
20 Ok(())
21 }
22}
23
24#[derive(Serialize, Default, Deserialize)]
25pub struct UtPex<B> {
26 #[serde(skip_serializing_if = "Option::is_none")]
27 added: Option<B>,
28 #[serde(rename = "added.f")]
29 #[serde(skip_serializing_if = "Option::is_none")]
30 added_f: Option<B>,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 added6: Option<B>,
33 #[serde(rename = "added6.f")]
34 #[serde(skip_serializing_if = "Option::is_none")]
35 added6_f: Option<B>,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 dropped: Option<B>,
38 #[serde(skip_serializing_if = "Option::is_none")]
39 dropped6: Option<B>,
40}
41
42impl<B> core::fmt::Debug for UtPex<B>
43where
44 B: AsRef<[u8]>,
45{
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 struct IterDebug<I>(I);
48 impl<I> core::fmt::Debug for IterDebug<I>
49 where
50 I: Iterator<Item = PexPeerInfo> + Clone,
51 {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.debug_list().entries(self.0.clone()).finish()
54 }
55 }
56 f.debug_struct("UtPex")
57 .field("added", &IterDebug(self.added_peers()))
58 .field("dropped", &IterDebug(self.dropped_peers()))
59 .finish()
60 }
61}
62
63impl<B> CloneToOwned for UtPex<B>
64where
65 B: CloneToOwned,
66{
67 type Target = UtPex<<B as CloneToOwned>::Target>;
68 fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target {
69 UtPex {
70 added: self.added.clone_to_owned(within_buffer),
71 added_f: self.added_f.clone_to_owned(within_buffer),
72 added6: self.added6.clone_to_owned(within_buffer),
73 added6_f: self.added6_f.clone_to_owned(within_buffer),
74 dropped: self.dropped.clone_to_owned(within_buffer),
75 dropped6: self.dropped6.clone_to_owned(within_buffer),
76 }
77 }
78}
79
80impl<B> UtPex<B>
81where
82 B: AsRef<[u8]>,
83{
84 fn added_peers_inner<'a>(
85 &'a self,
86 buf: &'a Option<B>,
87 flags: &'a Option<B>,
88 ip_len: usize,
89 ) -> impl Iterator<Item = PexPeerInfo> + Clone + 'a {
90 const PORT_LEN: usize = 2;
91 const DEFAULT_FLAGS: u8 = 0;
92 let addrs = buf
93 .as_ref()
94 .into_iter()
95 .flat_map(move |it| it.as_ref().chunks_exact(ip_len + PORT_LEN))
96 .map(move |c| {
97 let ip = match ip_len {
98 4 => IpAddr::from(TryInto::<[u8; 4]>::try_into(&c[..4]).unwrap()),
99 16 => IpAddr::from(TryInto::<[u8; 16]>::try_into(&c[..16]).unwrap()),
100 _ => unreachable!(),
101 };
102 let port = BE::read_u16(&c[ip_len..]);
103 SocketAddr::new(ip, port)
104 });
105 addrs.enumerate().map(move |(id, addr)| PexPeerInfo {
106 addr,
107 flags: flags
108 .as_ref()
109 .and_then(|f| f.as_ref().get(id).copied())
110 .unwrap_or(DEFAULT_FLAGS),
111 })
112 }
113
114 pub fn added_peers(&self) -> impl Iterator<Item = PexPeerInfo> + Clone + '_ {
115 self.added_peers_inner(&self.added, &self.added_f, 4)
116 .chain(self.added_peers_inner(&self.added6, &self.added6_f, 16))
117 }
118
119 pub fn dropped_peers(&self) -> impl Iterator<Item = PexPeerInfo> + Clone + '_ {
120 self.added_peers_inner(&self.dropped, &None, 4)
121 .chain(self.added_peers_inner(&self.dropped6, &None, 16))
122 }
123}
124
125impl UtPex<ByteBufOwned> {
126 pub fn from_addrs<'a, I, J>(addrs_live: I, addrs_closed: J) -> Self
127 where
128 I: IntoIterator<Item = &'a SocketAddr>,
129 J: IntoIterator<Item = &'a SocketAddr>,
130 {
131 fn addrs_to_bytes<'a, I>(addrs: I) -> (Option<ByteBufOwned>, Option<ByteBufOwned>)
132 where
133 I: IntoIterator<Item = &'a SocketAddr>,
134 {
135 let mut ipv4_addrs = BytesMut::new();
136 let mut ipv6_addrs = BytesMut::new();
137 for addr in addrs {
138 match addr {
139 SocketAddr::V4(v4) => {
140 ipv4_addrs.extend_from_slice(&v4.ip().octets());
141 ipv4_addrs.extend_from_slice(&v4.port().to_be_bytes());
142 }
143 SocketAddr::V6(v6) => {
144 ipv6_addrs.extend_from_slice(&v6.ip().octets());
145 ipv6_addrs.extend_from_slice(&v6.port().to_be_bytes());
146 }
147 }
148 }
149
150 let freeze = |buf: BytesMut| -> Option<ByteBufOwned> {
151 if !buf.is_empty() {
152 Some(buf.freeze().into())
153 } else {
154 None
155 }
156 };
157
158 (freeze(ipv4_addrs), freeze(ipv6_addrs))
159 }
160
161 let (added, added6) = addrs_to_bytes(addrs_live);
162 let (dropped, dropped6) = addrs_to_bytes(addrs_closed);
163
164 Self {
165 added,
166 added6,
167 dropped,
168 dropped6,
169 ..Default::default()
170 }
171 }
172}
173
#[cfg(test)]
mod tests {
    use bencode::{bencode_serialize_to_writer, from_bytes};
    use buffers::ByteBuf;

    use super::*;

    /// Parses a socket-address literal, panicking if it is malformed.
    fn sock(text: &str) -> SocketAddr {
        text.parse::<SocketAddr>().unwrap()
    }

    /// Decodes a hex string (two digits per byte) into raw bytes.
    fn decode_hex(s: &str) -> Vec<u8> {
        assert!(s.len() % 2 == 0);
        s.as_bytes()
            .chunks(2)
            .map(|pair| {
                let digits = std::str::from_utf8(pair).unwrap();
                u8::from_str_radix(digits, 16).unwrap()
            })
            .collect()
    }

    /// A captured message decodes to two IPv4 peers; only the first has a
    /// flag byte, the second falls back to `0`.
    #[test]
    fn test_pex_deserialization() {
        let msg = "64353a616464656431323ab99f9d14b56797f969861090373a61646465642e66323a0c00363a616464656436303a383a6164646564362e66303a373a64726f70706564303a383a64726f7070656436303a65";
        let bytes = decode_hex(msg);
        let pex = from_bytes::<UtPex<ByteBuf>>(&bytes).unwrap();

        let decoded: Vec<(SocketAddr, u8)> = pex
            .added_peers()
            .map(|peer| (peer.addr, peer.flags))
            .collect();

        assert_eq!(
            vec![
                (sock("185.159.157.20:46439"), 12),
                (sock("151.249.105.134:4240"), 0),
            ],
            decoded
        );
    }

    /// Serializing and re-parsing keeps every address; IPv4 peers come out
    /// ahead of IPv6 peers regardless of the input interleaving.
    #[test]
    fn test_pex_roundtrip() {
        let a1 = sock("185.159.157.20:46439");
        let a2 = sock("151.249.105.134:4240");
        let aa1 = sock("[5be8:dde9:7f0b:d5a7:bd01:b3be:9c69:573b]:46439");
        let aa2 = sock("[f16c:f7ec:cfa2:e1c5:9a3c:cb08:801f:36b8]:4240");

        let addrs = vec![a1, aa1, a2, aa2];
        let pex = UtPex::from_addrs(&addrs, &addrs);

        let mut bytes = Vec::new();
        bencode_serialize_to_writer(&pex, &mut bytes).unwrap();
        let pex2 = from_bytes::<UtPex<ByteBuf>>(&bytes).unwrap();

        assert_eq!(4, pex2.added_peers().count());
        assert_eq!(pex.added_peers().count(), pex2.added_peers().count());

        let expected = [a1, a2, aa1, aa2];

        let added: Vec<SocketAddr> = pex2.added_peers().map(|peer| peer.addr).collect();
        assert_eq!(&expected[..], &added[..4]);

        let dropped: Vec<SocketAddr> = pex2.dropped_peers().map(|peer| peer.addr).collect();
        assert_eq!(&expected[..], &dropped[..4]);
    }
}