1use std::{mem, net};
2
3use bytes::Buf;
4use bytes::BufMut;
5use cyphernet::addr::{tor, HostName, NetAddr};
6use radicle::crypto::Signature;
7use radicle::git::Oid;
8use radicle::identity::RepoId;
9use radicle::node::Address;
10use radicle::node::NodeId;
11use radicle::node::Timestamp;
12
13use crate::bounded::BoundedVec;
14use crate::service::filter::Filter;
15use crate::service::message::*;
16use crate::wire;
17
/// Numeric tag identifying the kind of a [`Message`] on the wire.
///
/// The tag is a `u16`: it is the first value written when a message is
/// encoded and the first value read when one is decoded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum MessageType {
    /// Tag of a node announcement.
    NodeAnnouncement = 2,
    /// Tag of an inventory announcement.
    InventoryAnnouncement = 4,
    /// Tag of a refs announcement.
    RefsAnnouncement = 6,
    /// Tag of a subscribe message.
    Subscribe = 8,
    /// Tag of a ping message.
    Ping = 10,
    /// Tag of a pong message.
    Pong = 12,
    /// Tag of an info message.
    Info = 14,
}
30
31impl From<MessageType> for u16 {
32 fn from(other: MessageType) -> Self {
33 other as u16
34 }
35}
36
37impl TryFrom<u16> for MessageType {
38 type Error = u16;
39
40 fn try_from(other: u16) -> Result<Self, Self::Error> {
41 match other {
42 2 => Ok(MessageType::NodeAnnouncement),
43 4 => Ok(MessageType::InventoryAnnouncement),
44 6 => Ok(MessageType::RefsAnnouncement),
45 8 => Ok(MessageType::Subscribe),
46 10 => Ok(MessageType::Ping),
47 12 => Ok(MessageType::Pong),
48 14 => Ok(MessageType::Info),
49 _ => Err(other),
50 }
51 }
52}
53
54impl Message {
55 pub const MAX_SIZE: wire::Size =
57 wire::Size::MAX - (mem::size_of::<MessageType>() as wire::Size);
58
59 pub fn type_id(&self) -> u16 {
60 match self {
61 Self::Subscribe { .. } => MessageType::Subscribe,
62 Self::Announcement(Announcement { message, .. }) => match message {
63 AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
64 AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
65 AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
66 },
67 Self::Info(_) => MessageType::Info,
68 Self::Ping { .. } => MessageType::Ping,
69 Self::Pong { .. } => MessageType::Pong,
70 }
71 .into()
72 }
73}
74
/// One-byte tag identifying the kind of host stored in an encoded address.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum AddressType {
    /// IPv4 host.
    Ipv4 = 1,
    /// IPv6 host.
    Ipv6 = 2,
    /// DNS host name.
    Dns = 3,
    /// Tor onion host.
    Onion = 4,
}
84
85impl From<AddressType> for u8 {
86 fn from(other: AddressType) -> Self {
87 other as u8
88 }
89}
90
91impl From<&Address> for AddressType {
92 fn from(a: &Address) -> Self {
93 match a.host {
94 HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
95 HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
96 HostName::Dns(_) => AddressType::Dns,
97 HostName::Tor(_) => AddressType::Onion,
98 _ => todo!(), }
100 }
101}
102
103impl TryFrom<u8> for AddressType {
104 type Error = u8;
105
106 fn try_from(other: u8) -> Result<Self, Self::Error> {
107 match other {
108 1 => Ok(AddressType::Ipv4),
109 2 => Ok(AddressType::Ipv6),
110 3 => Ok(AddressType::Dns),
111 4 => Ok(AddressType::Onion),
112 _ => Err(other),
113 }
114 }
115}
116
117impl wire::Encode for AnnouncementMessage {
118 fn encode(&self, buf: &mut impl BufMut) {
119 match self {
120 Self::Node(ann) => ann.encode(buf),
121 Self::Inventory(ann) => ann.encode(buf),
122 Self::Refs(ann) => ann.encode(buf),
123 }
124 }
125}
126
127impl wire::Encode for RefsAnnouncement {
128 fn encode(&self, buf: &mut impl BufMut) {
129 self.rid.encode(buf);
130 self.refs.encode(buf);
131 self.timestamp.encode(buf);
132 }
133}
134
135impl wire::Decode for RefsAnnouncement {
136 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
137 let rid = RepoId::decode(buf)?;
138 let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(buf)?;
139 let timestamp = Timestamp::decode(buf)?;
140
141 Ok(Self {
142 rid,
143 refs,
144 timestamp,
145 })
146 }
147}
148
149impl wire::Encode for InventoryAnnouncement {
150 fn encode(&self, buf: &mut impl BufMut) {
151 self.inventory.encode(buf);
152 self.timestamp.encode(buf);
153 }
154}
155
156impl wire::Decode for InventoryAnnouncement {
157 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
158 let inventory = BoundedVec::decode(buf)?;
159 let timestamp = Timestamp::decode(buf)?;
160
161 Ok(Self {
162 inventory,
163 timestamp,
164 })
165 }
166}
167
/// Numeric tag identifying the variant of an [`Info`] message on the wire.
///
/// The tag is converted to and from `u16` and is written as a `u16` when an
/// `Info` is encoded, so the enum uses the same `u16` representation.
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InfoType {
    /// Tag of `Info::RefsAlreadySynced`.
    RefsAlreadySynced = 1,
}
175
176impl From<InfoType> for u16 {
177 fn from(other: InfoType) -> Self {
178 other as u16
179 }
180}
181
182impl TryFrom<u16> for InfoType {
183 type Error = u16;
184
185 fn try_from(other: u16) -> Result<Self, Self::Error> {
186 match other {
187 1 => Ok(Self::RefsAlreadySynced),
188 n => Err(n),
189 }
190 }
191}
192
193impl From<Info> for InfoType {
194 fn from(info: Info) -> Self {
195 (&info).into()
196 }
197}
198
199impl From<&Info> for InfoType {
200 fn from(info: &Info) -> Self {
201 match info {
202 Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
203 }
204 }
205}
206
207impl wire::Encode for Info {
208 fn encode(&self, buf: &mut impl BufMut) {
209 u16::from(InfoType::from(self)).encode(buf);
210 match self {
211 Info::RefsAlreadySynced { rid, at } => {
212 rid.encode(buf);
213 at.encode(buf);
214 }
215 }
216 }
217}
218
219impl wire::Decode for Info {
220 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
221 let info_type = buf.try_get_u16()?;
222
223 match InfoType::try_from(info_type) {
224 Ok(InfoType::RefsAlreadySynced) => {
225 let rid = RepoId::decode(buf)?;
226 let at = Oid::decode(buf)?;
227
228 Ok(Self::RefsAlreadySynced { rid, at })
229 }
230 Err(other) => Err(wire::Invalid::InfoMessageType { actual: other }.into()),
231 }
232 }
233}
234
235impl wire::Encode for Message {
236 fn encode(&self, buf: &mut impl BufMut) {
237 let buf = &mut buf.limit(wire::Size::MAX as usize);
238
239 self.type_id().encode(buf);
240
241 match self {
242 Self::Subscribe(Subscribe {
243 filter,
244 since,
245 until,
246 }) => {
247 filter.encode(buf);
248 since.encode(buf);
249 until.encode(buf);
250 }
251 Self::Announcement(Announcement {
252 node,
253 message,
254 signature,
255 }) => {
256 node.encode(buf);
257 signature.encode(buf);
258 message.encode(buf);
259 }
260 Self::Info(info) => {
261 info.encode(buf);
262 }
263 Self::Ping(Ping { ponglen, zeroes }) => {
264 ponglen.encode(buf);
265 zeroes.encode(buf);
266 }
267 Self::Pong { zeroes } => {
268 zeroes.encode(buf);
269 }
270 }
271 }
272}
273
274impl wire::Decode for Message {
275 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
276 let type_id = buf.try_get_u16()?;
277
278 match MessageType::try_from(type_id) {
279 Ok(MessageType::Subscribe) => {
280 let filter = Filter::decode(buf)?;
281 let since = Timestamp::decode(buf)?;
282 let until = Timestamp::decode(buf)?;
283
284 Ok(Self::Subscribe(Subscribe {
285 filter,
286 since,
287 until,
288 }))
289 }
290 Ok(MessageType::NodeAnnouncement) => {
291 let node = NodeId::decode(buf)?;
292 let signature = Signature::decode(buf)?;
293 let message = NodeAnnouncement::decode(buf)?.into();
294
295 Ok(Announcement {
296 node,
297 message,
298 signature,
299 }
300 .into())
301 }
302 Ok(MessageType::InventoryAnnouncement) => {
303 let node = NodeId::decode(buf)?;
304 let signature = Signature::decode(buf)?;
305 let message = InventoryAnnouncement::decode(buf)?.into();
306
307 Ok(Announcement {
308 node,
309 message,
310 signature,
311 }
312 .into())
313 }
314 Ok(MessageType::RefsAnnouncement) => {
315 let node = NodeId::decode(buf)?;
316 let signature = Signature::decode(buf)?;
317 let message = RefsAnnouncement::decode(buf)?.into();
318
319 Ok(Announcement {
320 node,
321 message,
322 signature,
323 }
324 .into())
325 }
326 Ok(MessageType::Info) => {
327 let info = Info::decode(buf)?;
328 Ok(Self::Info(info))
329 }
330 Ok(MessageType::Ping) => {
331 let ponglen = u16::decode(buf)?;
332 let zeroes = ZeroBytes::decode(buf)?;
333 Ok(Self::Ping(Ping { ponglen, zeroes }))
334 }
335 Ok(MessageType::Pong) => {
336 let zeroes = ZeroBytes::decode(buf)?;
337 Ok(Self::Pong { zeroes })
338 }
339 Err(other) => Err(wire::Invalid::MessageType { actual: other }.into()),
340 }
341 }
342}
343
344impl wire::Encode for Address {
345 fn encode(&self, buf: &mut impl BufMut) {
346 match self.host {
347 HostName::Ip(net::IpAddr::V4(ip)) => {
348 u8::from(AddressType::Ipv4).encode(buf);
349 ip.octets().encode(buf);
350 }
351 HostName::Ip(net::IpAddr::V6(ip)) => {
352 u8::from(AddressType::Ipv6).encode(buf);
353 ip.octets().encode(buf);
354 }
355 HostName::Dns(ref dns) => {
356 u8::from(AddressType::Dns).encode(buf);
357 dns.encode(buf);
358 }
359 HostName::Tor(addr) => {
360 u8::from(AddressType::Onion).encode(buf);
361 addr.encode(buf);
362 }
363 _ => {
364 unimplemented!(
365 "Encoding not defined for addresses of the same type as the following: {:?}",
366 self.host
367 );
368 }
369 }
370 self.port().encode(buf);
371 }
372}
373
374impl wire::Decode for Address {
375 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
376 let addrtype = buf.try_get_u8()?;
377
378 let host = match AddressType::try_from(addrtype) {
379 Ok(AddressType::Ipv4) => {
380 let octets: [u8; 4] = wire::Decode::decode(buf)?;
381 let ip = net::Ipv4Addr::from(octets);
382
383 HostName::Ip(net::IpAddr::V4(ip))
384 }
385 Ok(AddressType::Ipv6) => {
386 let octets: [u8; 16] = wire::Decode::decode(buf)?;
387 let ip = net::Ipv6Addr::from(octets);
388
389 HostName::Ip(net::IpAddr::V6(ip))
390 }
391 Ok(AddressType::Dns) => {
392 let dns: String = wire::Decode::decode(buf)?;
393
394 HostName::Dns(dns)
395 }
396 Ok(AddressType::Onion) => {
397 let onion: tor::OnionAddrV3 = wire::Decode::decode(buf)?;
398
399 HostName::Tor(onion)
400 }
401 Err(other) => return Err(wire::Invalid::AddressType { actual: other }.into()),
402 };
403 let port = u16::decode(buf)?;
404
405 Ok(Self::from(NetAddr { host, port }))
406 }
407}
408
409impl wire::Encode for ZeroBytes {
410 fn encode(&self, buf: &mut impl BufMut) {
411 (self.len() as u16).encode(buf);
412 buf.put_bytes(0u8, self.len());
413 }
414}
415
416impl wire::Decode for ZeroBytes {
417 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
418 let zeroes = u16::decode(buf)?;
419 for _ in 0..zeroes {
420 _ = u8::decode(buf)?;
421 }
422 Ok(ZeroBytes::new(zeroes))
423 }
424}
425
#[cfg(test)]
mod tests {
    use qcheck_macros::quickcheck;
    use radicle::node::device::Device;
    use radicle::node::UserAgent;
    use radicle::storage::refs::RefsAt;
    use radicle::test::arbitrary;

    use crate::deserializer::Deserializer;
    use crate::prop_roundtrip;
    use crate::wire::{roundtrip, Encode as _};

    use super::*;

    // Tests generated by the `prop_roundtrip!` macro for these two types.
    prop_roundtrip!(Address);
    prop_roundtrip!(Message);

    /// A refs announcement holding `REF_REMOTE_LIMIT` entries must encode to
    /// fewer than `wire::Size::MAX` bytes.
    #[test]
    fn test_refs_ann_max_size() {
        let signer = Device::mock();
        let refs: [RefsAt; REF_REMOTE_LIMIT] = arbitrary::gen(1);
        let payload = AnnouncementMessage::Refs(RefsAnnouncement {
            rid: arbitrary::gen(1),
            refs: BoundedVec::collect_from(&mut refs.into_iter()),
            timestamp: arbitrary::gen(1),
        });
        let encoded = Message::Announcement(payload.signed(&signer)).encode_to_vec();

        assert!(encoded.len() < wire::Size::MAX as usize);
    }

    /// An inventory announcement holding `INVENTORY_LIMIT` entries must encode
    /// to fewer than `wire::Size::MAX` bytes.
    #[test]
    fn test_inv_ann_max_size() {
        let signer = Device::mock();
        let inv: [RepoId; INVENTORY_LIMIT] = arbitrary::gen(1);
        let payload = AnnouncementMessage::Inventory(InventoryAnnouncement {
            inventory: BoundedVec::collect_from(&mut inv.into_iter()),
            timestamp: arbitrary::gen(1),
        });
        let encoded = Message::Announcement(payload.signed(&signer)).encode_to_vec();

        assert!(encoded.len() < wire::Size::MAX as usize);
    }

    /// A node announcement with `ADDRESS_LIMIT` addresses and an alias of
    /// `MAX_ALIAS_LENGTH` characters must encode to fewer than
    /// `wire::Size::MAX` bytes.
    #[test]
    fn test_node_ann_max_size() {
        let signer = Device::mock();
        let addrs: [Address; ADDRESS_LIMIT] = arbitrary::gen(1);
        let alias = "@".repeat(radicle::node::MAX_ALIAS_LENGTH);
        let payload = AnnouncementMessage::Node(NodeAnnouncement {
            version: 1,
            features: Default::default(),
            alias: radicle::node::Alias::new(alias),
            addresses: BoundedVec::collect_from(&mut addrs.into_iter()),
            timestamp: arbitrary::gen(1),
            nonce: u64::MAX,
            agent: UserAgent::default(),
        });
        let encoded = Message::Announcement(payload.signed(&signer)).encode_to_vec();

        assert!(encoded.len() < wire::Size::MAX as usize);
    }

    /// Pings and pongs carrying the maximum number of zeroes encode without
    /// panicking.
    #[test]
    fn test_pingpong_encode_max_size() {
        let ping = Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
        });
        ping.encode_to_vec();

        let pong = Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
        };
        pong.encode_to_vec();
    }

    /// One zero more than the ping maximum makes encoding panic.
    #[test]
    #[should_panic(expected = "advance out of bounds")]
    fn test_ping_encode_size_overflow() {
        let ping = Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
        });
        ping.encode_to_vec();
    }

    /// One zero more than the pong maximum makes encoding panic.
    #[test]
    #[should_panic(expected = "advance out of bounds")]
    fn test_pong_encode_size_overflow() {
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
        };
        pong.encode_to_vec();
    }

    /// Messages encoded back to back into a `Deserializer` come out again in
    /// the same order and compare equal to the originals.
    #[test]
    fn prop_message_decoder() {
        fn property(items: Vec<Message>) {
            let mut decoder = Deserializer::<1048576, Message>::new(8);

            for msg in &items {
                msg.encode(&mut decoder);
            }
            for expected in items {
                let decoded = decoder.next().unwrap().unwrap();
                assert_eq!(decoded, expected);
            }
        }

        qcheck::QuickCheck::new()
            .gen(qcheck::Gen::new(16))
            .quickcheck(property as fn(items: Vec<Message>));
    }

    /// `ZeroBytes` round-trips for every length up to `Ping::MAX_PING_ZEROES`;
    /// larger lengths are discarded.
    #[quickcheck]
    fn prop_zero_bytes_encode_decode(zeroes: wire::Size) -> qcheck::TestResult {
        if zeroes <= Ping::MAX_PING_ZEROES {
            roundtrip(ZeroBytes::new(zeroes));

            qcheck::TestResult::passed()
        } else {
            qcheck::TestResult::discard()
        }
    }
}