1use std::{mem, net};
2
3use bytes::Buf;
4use bytes::BufMut;
5use cyphernet::addr::{tor, HostName, NetAddr};
6use radicle::crypto::Signature;
7use radicle::git::Oid;
8use radicle::identity::RepoId;
9use radicle::node::Address;
10use radicle::node::NodeId;
11use radicle::node::Timestamp;
12
13use crate::bounded::BoundedVec;
14use crate::service::filter::Filter;
15use crate::service::message::*;
16use crate::wire;
17
18#[repr(u16)]
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum MessageType {
22 NodeAnnouncement = 2,
23 InventoryAnnouncement = 4,
24 RefsAnnouncement = 6,
25 Subscribe = 8,
26 Ping = 10,
27 Pong = 12,
28 Info = 14,
29}
30
31impl From<MessageType> for u16 {
32 fn from(other: MessageType) -> Self {
33 other as u16
34 }
35}
36
37impl TryFrom<u16> for MessageType {
38 type Error = u16;
39
40 fn try_from(other: u16) -> Result<Self, Self::Error> {
41 match other {
42 2 => Ok(MessageType::NodeAnnouncement),
43 4 => Ok(MessageType::InventoryAnnouncement),
44 6 => Ok(MessageType::RefsAnnouncement),
45 8 => Ok(MessageType::Subscribe),
46 10 => Ok(MessageType::Ping),
47 12 => Ok(MessageType::Pong),
48 14 => Ok(MessageType::Info),
49 _ => Err(other),
50 }
51 }
52}
53
54impl Message {
55 pub const MAX_SIZE: wire::Size =
57 wire::Size::MAX - (mem::size_of::<MessageType>() as wire::Size);
58
59 pub fn type_id(&self) -> u16 {
60 match self {
61 Self::Subscribe { .. } => MessageType::Subscribe,
62 Self::Announcement(Announcement { message, .. }) => match message {
63 AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
64 AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
65 AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
66 },
67 Self::Info(_) => MessageType::Info,
68 Self::Ping { .. } => MessageType::Ping,
69 Self::Pong { .. } => MessageType::Pong,
70 }
71 .into()
72 }
73}
74
75#[repr(u8)]
77#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub enum AddressType {
79 Ipv4 = 1,
80 Ipv6 = 2,
81 Dns = 3,
82 Onion = 4,
83}
84
85impl From<AddressType> for u8 {
86 fn from(other: AddressType) -> Self {
87 other as u8
88 }
89}
90
91impl From<&Address> for AddressType {
92 fn from(a: &Address) -> Self {
93 match a.host {
94 HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
95 HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
96 HostName::Dns(_) => AddressType::Dns,
97 HostName::Tor(_) => AddressType::Onion,
98 _ => todo!(), }
100 }
101}
102
103impl TryFrom<u8> for AddressType {
104 type Error = u8;
105
106 fn try_from(other: u8) -> Result<Self, Self::Error> {
107 match other {
108 1 => Ok(AddressType::Ipv4),
109 2 => Ok(AddressType::Ipv6),
110 3 => Ok(AddressType::Dns),
111 4 => Ok(AddressType::Onion),
112 _ => Err(other),
113 }
114 }
115}
116
117impl wire::Encode for AnnouncementMessage {
118 fn encode(&self, buf: &mut impl BufMut) {
119 match self {
120 Self::Node(ann) => ann.encode(buf),
121 Self::Inventory(ann) => ann.encode(buf),
122 Self::Refs(ann) => ann.encode(buf),
123 }
124 }
125}
126
127impl wire::Encode for RefsAnnouncement {
128 fn encode(&self, buf: &mut impl BufMut) {
129 self.rid.encode(buf);
130 self.refs.encode(buf);
131 self.timestamp.encode(buf);
132 }
133}
134
135impl wire::Decode for RefsAnnouncement {
136 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
137 let rid = RepoId::decode(buf)?;
138 let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(buf)?;
139 let timestamp = Timestamp::decode(buf)?;
140
141 Ok(Self {
142 rid,
143 refs,
144 timestamp,
145 })
146 }
147}
148
149impl wire::Encode for InventoryAnnouncement {
150 fn encode(&self, buf: &mut impl BufMut) {
151 self.inventory.encode(buf);
152 self.timestamp.encode(buf);
153 }
154}
155
156impl wire::Decode for InventoryAnnouncement {
157 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
158 let inventory = BoundedVec::decode(buf)?;
159 let timestamp = Timestamp::decode(buf)?;
160
161 Ok(Self {
162 inventory,
163 timestamp,
164 })
165 }
166}
167
168#[repr(u8)]
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172pub enum InfoType {
173 RefsAlreadySynced = 1,
174}
175
176impl From<InfoType> for u16 {
177 fn from(other: InfoType) -> Self {
178 other as u16
179 }
180}
181
182impl TryFrom<u16> for InfoType {
183 type Error = u16;
184
185 fn try_from(other: u16) -> Result<Self, Self::Error> {
186 match other {
187 1 => Ok(Self::RefsAlreadySynced),
188 n => Err(n),
189 }
190 }
191}
192
193impl From<Info> for InfoType {
194 fn from(info: Info) -> Self {
195 (&info).into()
196 }
197}
198
199impl From<&Info> for InfoType {
200 fn from(info: &Info) -> Self {
201 match info {
202 Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
203 }
204 }
205}
206
207impl wire::Encode for Info {
208 fn encode(&self, buf: &mut impl BufMut) {
209 u16::from(InfoType::from(self)).encode(buf);
210 match self {
211 Info::RefsAlreadySynced { rid, at } => {
212 rid.encode(buf);
213 at.encode(buf);
214 }
215 }
216 }
217}
218
219impl wire::Decode for Info {
220 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
221 let info_type = buf.try_get_u16()?;
222
223 match InfoType::try_from(info_type) {
224 Ok(InfoType::RefsAlreadySynced) => {
225 let rid = RepoId::decode(buf)?;
226 let at = Oid::decode(buf)?;
227
228 Ok(Self::RefsAlreadySynced { rid, at })
229 }
230 Err(other) => Err(wire::Error::UnknownInfoType(other)),
231 }
232 }
233}
234
235impl wire::Encode for Message {
236 fn encode(&self, buf: &mut impl BufMut) {
237 self.type_id().encode(buf);
238
239 match self {
240 Self::Subscribe(Subscribe {
241 filter,
242 since,
243 until,
244 }) => {
245 filter.encode(buf);
246 since.encode(buf);
247 until.encode(buf);
248 }
249 Self::Announcement(Announcement {
250 node,
251 message,
252 signature,
253 }) => {
254 node.encode(buf);
255 signature.encode(buf);
256 message.encode(buf);
257 }
258 Self::Info(info) => {
259 info.encode(buf);
260 }
261 Self::Ping(Ping { ponglen, zeroes }) => {
262 ponglen.encode(buf);
263 zeroes.encode(buf);
264 }
265 Self::Pong { zeroes } => {
266 zeroes.encode(buf);
267 }
268 }
269 }
270}
271
272impl wire::Decode for Message {
273 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
274 let type_id = buf.try_get_u16()?;
275
276 match MessageType::try_from(type_id) {
277 Ok(MessageType::Subscribe) => {
278 let filter = Filter::decode(buf)?;
279 let since = Timestamp::decode(buf)?;
280 let until = Timestamp::decode(buf)?;
281
282 Ok(Self::Subscribe(Subscribe {
283 filter,
284 since,
285 until,
286 }))
287 }
288 Ok(MessageType::NodeAnnouncement) => {
289 let node = NodeId::decode(buf)?;
290 let signature = Signature::decode(buf)?;
291 let message = NodeAnnouncement::decode(buf)?.into();
292
293 Ok(Announcement {
294 node,
295 message,
296 signature,
297 }
298 .into())
299 }
300 Ok(MessageType::InventoryAnnouncement) => {
301 let node = NodeId::decode(buf)?;
302 let signature = Signature::decode(buf)?;
303 let message = InventoryAnnouncement::decode(buf)?.into();
304
305 Ok(Announcement {
306 node,
307 message,
308 signature,
309 }
310 .into())
311 }
312 Ok(MessageType::RefsAnnouncement) => {
313 let node = NodeId::decode(buf)?;
314 let signature = Signature::decode(buf)?;
315 let message = RefsAnnouncement::decode(buf)?.into();
316
317 Ok(Announcement {
318 node,
319 message,
320 signature,
321 }
322 .into())
323 }
324 Ok(MessageType::Info) => {
325 let info = Info::decode(buf)?;
326 Ok(Self::Info(info))
327 }
328 Ok(MessageType::Ping) => {
329 let ponglen = u16::decode(buf)?;
330 let zeroes = ZeroBytes::decode(buf)?;
331 Ok(Self::Ping(Ping { ponglen, zeroes }))
332 }
333 Ok(MessageType::Pong) => {
334 let zeroes = ZeroBytes::decode(buf)?;
335 Ok(Self::Pong { zeroes })
336 }
337 Err(other) => Err(wire::Error::UnknownMessageType(other)),
338 }
339 }
340}
341
342impl wire::Encode for Address {
343 fn encode(&self, buf: &mut impl BufMut) {
344 match self.host {
345 HostName::Ip(net::IpAddr::V4(ip)) => {
346 u8::from(AddressType::Ipv4).encode(buf);
347 ip.octets().encode(buf);
348 }
349 HostName::Ip(net::IpAddr::V6(ip)) => {
350 u8::from(AddressType::Ipv6).encode(buf);
351 ip.octets().encode(buf);
352 }
353 HostName::Dns(ref dns) => {
354 u8::from(AddressType::Dns).encode(buf);
355 dns.encode(buf);
356 }
357 HostName::Tor(addr) => {
358 u8::from(AddressType::Onion).encode(buf);
359 addr.encode(buf);
360 }
361 _ => {
362 unimplemented!(
363 "Encoding not defined for addresses of the same type as the following: {:?}",
364 self.host
365 );
366 }
367 }
368 self.port().encode(buf);
369 }
370}
371
372impl wire::Decode for Address {
373 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
374 let addrtype = buf.try_get_u8()?;
375
376 let host = match AddressType::try_from(addrtype) {
377 Ok(AddressType::Ipv4) => {
378 let octets: [u8; 4] = wire::Decode::decode(buf)?;
379 let ip = net::Ipv4Addr::from(octets);
380
381 HostName::Ip(net::IpAddr::V4(ip))
382 }
383 Ok(AddressType::Ipv6) => {
384 let octets: [u8; 16] = wire::Decode::decode(buf)?;
385 let ip = net::Ipv6Addr::from(octets);
386
387 HostName::Ip(net::IpAddr::V6(ip))
388 }
389 Ok(AddressType::Dns) => {
390 let dns: String = wire::Decode::decode(buf)?;
391
392 HostName::Dns(dns)
393 }
394 Ok(AddressType::Onion) => {
395 let onion: tor::OnionAddrV3 = wire::Decode::decode(buf)?;
396
397 HostName::Tor(onion)
398 }
399 Err(other) => return Err(wire::Error::UnknownAddressType(other)),
400 };
401 let port = u16::decode(buf)?;
402
403 Ok(Self::from(NetAddr { host, port }))
404 }
405}
406
407impl wire::Encode for ZeroBytes {
408 fn encode(&self, buf: &mut impl BufMut) {
409 (self.len() as u16).encode(buf);
410 buf.put_bytes(0u8, self.len());
411 }
412}
413
414impl wire::Decode for ZeroBytes {
415 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
416 let zeroes = u16::decode(buf)?;
417 for _ in 0..zeroes {
418 _ = u8::decode(buf)?;
419 }
420 Ok(ZeroBytes::new(zeroes))
421 }
422}
423
424#[cfg(test)]
425mod tests {
426 use super::*;
427 use qcheck_macros::quickcheck;
428 use radicle::node::device::Device;
429 use radicle::node::UserAgent;
430 use radicle::storage::refs::RefsAt;
431
432 use crate::deserializer::Deserializer;
433 use crate::wire::{self, Encode};
434 use radicle::test::arbitrary;
435
436 #[test]
437 fn test_refs_ann_max_size() {
438 let signer = Device::mock();
439 let refs: [RefsAt; REF_REMOTE_LIMIT] = arbitrary::gen(1);
440 let ann = AnnouncementMessage::Refs(RefsAnnouncement {
441 rid: arbitrary::gen(1),
442 refs: BoundedVec::collect_from(&mut refs.into_iter()),
443 timestamp: arbitrary::gen(1),
444 });
445 let ann = ann.signed(&signer);
446 let msg = Message::Announcement(ann);
447 let data = wire::serialize(&msg);
448
449 assert!(data.len() < wire::Size::MAX as usize);
450 }
451
452 #[test]
453 fn test_inv_ann_max_size() {
454 let signer = Device::mock();
455 let inv: [RepoId; INVENTORY_LIMIT] = arbitrary::gen(1);
456 let ann = AnnouncementMessage::Inventory(InventoryAnnouncement {
457 inventory: BoundedVec::collect_from(&mut inv.into_iter()),
458 timestamp: arbitrary::gen(1),
459 });
460 let ann = ann.signed(&signer);
461 let msg = Message::Announcement(ann);
462 let data = wire::serialize(&msg);
463
464 assert!(data.len() < wire::Size::MAX as usize);
465 }
466
467 #[test]
468 fn test_node_ann_max_size() {
469 let signer = Device::mock();
470 let addrs: [Address; ADDRESS_LIMIT] = arbitrary::gen(1);
471 let alias = ['@'; radicle::node::MAX_ALIAS_LENGTH];
472 let ann = AnnouncementMessage::Node(NodeAnnouncement {
473 version: 1,
474 features: Default::default(),
475 alias: radicle::node::Alias::new(String::from_iter(alias)),
476 addresses: BoundedVec::collect_from(&mut addrs.into_iter()),
477 timestamp: arbitrary::gen(1),
478 nonce: u64::MAX,
479 agent: UserAgent::default(),
480 });
481 let ann = ann.signed(&signer);
482 let msg = Message::Announcement(ann);
483 let data = wire::serialize(&msg);
484
485 assert!(data.len() < wire::Size::MAX as usize);
486 }
487
488 #[test]
489 fn test_pingpong_encode_max_size() {
490 wire::serialize(&Message::Ping(Ping {
491 ponglen: 0,
492 zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
493 }));
494
495 wire::serialize(&Message::Pong {
496 zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
497 });
498 }
499
500 #[test]
501 #[should_panic(expected = "advance out of bounds")]
502 fn test_ping_encode_size_overflow() {
503 wire::serialize(&Message::Ping(Ping {
504 ponglen: 0,
505 zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
506 }));
507 }
508
509 #[test]
510 #[should_panic(expected = "advance out of bounds")]
511 fn test_pong_encode_size_overflow() {
512 wire::serialize(&Message::Pong {
513 zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
514 });
515 }
516
517 #[quickcheck]
518 fn prop_message_encode_decode(message: Message) {
519 let encoded = &wire::serialize(&message);
520 let decoded = wire::deserialize::<Message>(encoded).unwrap();
521
522 assert_eq!(message, decoded);
523 }
524
525 #[test]
526 fn prop_message_decoder() {
527 fn property(items: Vec<Message>) {
528 let mut decoder = Deserializer::<1048576, Message>::new(8);
529
530 for item in &items {
531 item.encode(&mut decoder);
532 }
533 for item in items {
534 assert_eq!(decoder.next().unwrap().unwrap(), item);
535 }
536 }
537
538 qcheck::QuickCheck::new()
539 .gen(qcheck::Gen::new(16))
540 .quickcheck(property as fn(items: Vec<Message>));
541 }
542
543 #[test]
544 fn prop_zero_bytes_encode_decode() {
545 fn property(zeroes: wire::Size) {
546 if zeroes > Ping::MAX_PING_ZEROES {
547 return;
548 }
549
550 let zeroes = ZeroBytes::new(zeroes);
551
552 assert_eq!(
553 wire::deserialize::<ZeroBytes>(&wire::serialize(&zeroes)).unwrap(),
554 zeroes
555 );
556 }
557
558 qcheck::QuickCheck::new()
559 .gen(qcheck::Gen::new(16))
560 .quickcheck(property as fn(zeroes: wire::Size));
561 }
562
563 #[quickcheck]
564 fn prop_addr(addr: Address) {
565 assert_eq!(
566 wire::deserialize::<Address>(&wire::serialize(&addr)).unwrap(),
567 addr
568 );
569 }
570}